diff --git a/.gitignore b/.gitignore index b622d60..ff7e71c 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ data/global/ data/orbital/ config/config_dev.toml AGENTS.md +*.log.1 # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/README.md b/README.md index b4f9148..1b3ef80 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ - 自然语音: 集成微软神经网络文本转语音 (TTS) 技术 - 多种谜题类型: 选择题 (MCQ)、填空题 (Cloze)、识别题 (Recognition) - 动态内容生成: 支持宏驱动的模板系统, 根据上下文动态生成题目 +- 云同步支持: 通过 WebDAV 协议同步数据到远程服务器 ### 实用用户界面 - 响应式 Textual 框架构建的跨平台 TUI 界面 @@ -82,7 +83,23 @@ python -m heurams.interface ## 配置 -配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置. +配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置. + +### 同步配置 +同步功能支持 WebDAV 协议,可在配置文件的 `[sync.webdav]` 段进行配置: +```toml +[sync.webdav] +enabled = false +url = "" # WebDAV 服务器地址 +username = "" # 用户名 +password = "" # 密码 +remote_path = "/heurams/" # 远程路径 +sync_mode = "bidirectional" # 同步模式: bidirectional/upload_only/download_only +conflict_strategy = "newer" # 冲突策略: newer/ask/keep_both +verify_ssl = true # SSL 证书验证 +``` + +启用同步后,可通过应用内的同步工具进行数据备份和恢复。 ## 项目结构 @@ -104,6 +121,7 @@ graph TB Timer[时间服务] AudioService[音频服务] TTSService[TTS服务] + SyncService[同步服务] OtherServices[其他服务] end @@ -156,7 +174,8 @@ src/heurams/ │ ├── logger.py # 日志系统 │ ├── timer.py # 时间服务 │ ├── audio_service.py # 音频播放抽象 -│ └── tts_service.py # 文本转语音抽象 +│ ├── tts_service.py # 文本转语音抽象 +│ └── sync_service.py # WebDAV 同步服务 ├── kernel/ # 核心业务逻辑 │ ├── algorithms/ # 间隔重复算法 (FSRS, SM2) │ ├── particles/ # 数据模型 (Atom, Electron, Nucleon, Orbital) diff --git a/config/config.toml b/config/config.toml index a24b7a0..c7a3805 100644 --- a/config/config.toml +++ b/config/config.toml @@ -42,6 +42,7 @@ template_dir = "./data/template" audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO) tts = "edgetts" # 可选项: edgetts llm = "openai" # 可选项: openai +sync = "webdav" # 可选项: 留空, webdav [providers.tts.edgetts] # EdgeTTS 设置 voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声) @@ -49,3 +50,12 @@ voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN- [providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置 url = "" key = "" + +[providers.sync.webdav] # WebDAV 同步设置 +url = "" +username = "" +password = "" +remote_path = "/heurams/" +verify_ssl = true + +[sync] diff --git a/pyproject.toml b/pyproject.toml index 0156761..400baf1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "heurams" -version = "0.4.0" +version = "0.4.3" description = "Heuristic Assisted Memory Scheduler" license = {file = "LICENSE"} classifiers = [ diff --git a/requirements.txt b/requirements.txt index 24f6e4a..20c66f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,5 @@ bidict==0.23.1 playsound==1.2.2 textual==5.3.0 toml==0.10.2 +requests>=2.31.0 +webdavclient3>=3.0.0 diff --git a/src/heurams/context.py b/src/heurams/context.py index 80f7feb..92eab06 100644 --- a/src/heurams/context.py +++ b/src/heurams/context.py @@ -32,11 +32,12 @@ try: except Exception as e: print("未能加载自定义用户配置") logger.warning("未能加载自定义用户配置, 错误: %s", e) -if pathlib.Path(rootdir / "default" / "config" / "config_dev.toml").exists(): +if pathlib.Path(workdir / "config" / "config_dev.toml").exists(): + print("使用开发设置") logger.debug("使用开发设置") config_var: ContextVar[ConfigFile] = ContextVar( - "config_var", default=ConfigFile(workdir / "config" / "config_dev.toml") -) + "config_var", default=ConfigFile(workdir / "config" / "config_dev.toml") + ) # runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据 diff --git a/src/heurams/default/config/config.toml b/src/heurams/default/config/config.toml index a7d9029..c7a3805 100644 --- a/src/heurams/default/config/config.toml +++ b/src/heurams/default/config/config.toml @@ -14,6 +14,14 @@ scheduled_num = 8 # UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒 timezone_offset = +28800 # 中国标准时间 (UTC+8) +[interface] + +[interface.memorizor] +autovoice = true # 自动语音播放, 仅限于 recognition 组件 + +[algorithm] +default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS + [puzzles] # 谜题默认配置 [puzzles.mcq] @@ -25,6 +33,7 @@ min_denominator = 3 [paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径 nucleon_dir = "./data/nucleon" electron_dir = "./data/electron" +global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要 orbital_dir = "./data/orbital" cache_dir = "./data/cache" template_dir = "./data/template" @@ -33,7 +42,20 @@ template_dir = "./data/template" audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO) tts = "edgetts" # 可选项: edgetts llm = "openai" # 可选项: openai +sync = "webdav" # 可选项: 留空, webdav + +[providers.tts.edgetts] # EdgeTTS 设置 +voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声) [providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置 url = "" key = "" + +[providers.sync.webdav] # WebDAV 同步设置 +url = "" +username = "" +password = "" +remote_path = "/heurams/" +verify_ssl = true + +[sync] diff --git a/src/heurams/interface/__init__.py b/src/heurams/interface/__init__.py index 0a6bb1a..8205e2e 100644 --- a/src/heurams/interface/__init__.py +++ b/src/heurams/interface/__init__.py @@ -8,6 +8,7 @@ from .screens.about import AboutScreen from .screens.dashboard import DashboardScreen from .screens.nucreator import NucleonCreatorScreen from .screens.precache import PrecachingScreen +from .screens.synctool import SyncScreen logger = get_logger(__name__) @@ -39,12 +40,14 @@ class HeurAMSApp(App): ("1", "app.push_screen('dashboard')", "仪表盘"), ("2", "app.push_screen('precache_all')", "缓存管理器"), ("3", "app.push_screen('nucleon_creator')", "创建新单元"), + # ("4", "app.push_screen('synctool')", "同步工具"), ("0", "app.push_screen('about')", "版本信息"), ] SCREENS = { "dashboard": DashboardScreen, "nucleon_creator": NucleonCreatorScreen, "precache_all": PrecachingScreen, + "synctool": SyncScreen, "about": AboutScreen, } diff --git a/src/heurams/interface/__main__.py b/src/heurams/interface/__main__.py index cf62f5a..a6dae83 100644 --- a/src/heurams/interface/__main__.py +++ b/src/heurams/interface/__main__.py @@ -1,9 +1,9 @@ from textual.app import App from textual.widgets import Button -from heurams.services.logger import get_logger from heurams.context import config_var from heurams.interface import HeurAMSApp +from heurams.services.logger import get_logger from .screens.about import AboutScreen from .screens.dashboard import DashboardScreen @@ -15,4 +15,4 @@ logger = get_logger(__name__) app = HeurAMSApp() if __name__ == "__main__": - app.run() \ No newline at end of file + app.run() diff --git a/src/heurams/interface/screens/about.py b/src/heurams/interface/screens/about.py index b0b4904..915da4c 100644 --- a/src/heurams/interface/screens/about.py +++ b/src/heurams/interface/screens/about.py @@ -31,7 +31,8 @@ class AboutScreen(Screen): 特别感谢: -- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SuperMemo-2 算法 +- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SM-2 算法与 SM-15 算法理论 +- [Kazuaki Tanida](https://github.com/slaypni): SM-15 算法的 CoffeeScript 实现 - [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 文献参考 # 参与贡献 diff --git a/src/heurams/interface/screens/dashboard.py b/src/heurams/interface/screens/dashboard.py index b7b8eb7..38b0d1d 100644 --- a/src/heurams/interface/screens/dashboard.py +++ b/src/heurams/interface/screens/dashboard.py @@ -20,128 +20,188 @@ logger = get_logger(__name__) class DashboardScreen(Screen): + """主仪表盘屏幕""" + SUB_TITLE = "仪表盘" + def __init__( + self, + name: str | None = None, + id: str | None = None, + classes: str | None = None, + ) -> None: + super().__init__(name, id, classes) + self.nextdates = {} + self.texts = {} + self.stay_enabled = {} + def compose(self) -> ComposeResult: + """组合界面组件""" yield Header(show_clock=True) yield ScrollableContainer( - Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"), + Label('欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"), Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"), Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'), + Label(f"使用算法: {config_var.get()['algorithm']['default']}"), Label("选择待学习或待修改的记忆单元集:", classes="title-label"), ListView(id="union-list", classes="union-list-view"), Label( - f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} {version.codename.capitalize()} 2025' + f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} ' + f"{version.codename.capitalize()} 2025" ), ) yield Footer() - def item_desc_generator(self, filename) -> dict: - """简单分析以生成项目项显示文本 + def analyser(self, filename: str) -> dict: + """分析文件状态以生成显示文本 + + Args: + filename: 要分析的文件名 Returns: - dict: 以数字为列表, 分别呈现单行字符串 + dict: 包含显示文本的字典,键为行号 """ - res = dict() - filestem = pathlib.Path(filename).stem - res[0] = f"{filename}\0" - import heurams.kernel.particles as pt - from heurams.kernel.particles.loader import load_electron + from heurams.kernel.particles.loader import load_electron, load_nucleon - electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / ( - filestem + ".json" - ) + result = {} + filestem = pathlib.Path(filename).stem + + # 构建电子文件路径 + electron_dir = config_var.get()["paths"]["electron_dir"] + electron_file_path = pathlib.Path(electron_dir) / f"{filestem}.json" logger.debug(f"电子文件路径: {electron_file_path}") - if electron_file_path.exists(): # 未找到则创建电子文件 (json) - pass - else: + # 确保电子文件存在 + if not electron_file_path.exists(): electron_file_path.touch() - with open(electron_file_path, "w") as f: - f.write("{}") - electron_dict = load_electron(path=electron_file_path) # TODO: 取消硬编码扩展名 - logger.debug(electron_dict) + electron_file_path.write_text("{}") + + # 加载电子数据 + electron_dict = load_electron(path=electron_file_path) + logger.debug(f"电子数据: {electron_dict}") + + # 分析电子状态 is_due = 0 is_activated = 0 nextdate = 0x3F3F3F3F - for i in electron_dict.values(): - i: pt.Electron - logger.debug(i, i.is_due()) - if i.is_due(): + + for electron in electron_dict.values(): + logger.debug(f"{electron}, 是否到期: {electron.is_due()}") + + if electron.is_due(): is_due = 1 - if i.is_activated(): + if electron.is_activated(): is_activated = 1 - nextdate = min(nextdate, i.nextdate()) - res[1] = f"下一次复习: {nextdate}\n" - res[1] += f"{"需要复习" if is_due else "当前无需复习"}" + nextdate = min(nextdate, electron.nextdate()) + + # 检查是否需要更多复习 + nucleon_dir = config_var.get()["paths"]["nucleon_dir"] + nucleon_path = pathlib.Path(nucleon_dir) / f"{filestem}.toml" + nucleon_count = len(load_nucleon(nucleon_path)) + electron_count = len(electron_dict) + is_more = not (electron_count >= nucleon_count) + + logger.debug(f"是否需要更多复习: {is_more}") + + # 更新状态 + self.nextdates[filename] = nextdate + self.stay_enabled[filename] = is_due or is_more + + # 构建返回结果 + result[0] = f"{filename}\0" + if not is_activated: - res[1] = " 尚未激活" - return res + result[1] = " 尚未激活" + else: + status_text = "需要复习" if is_due else "当前无需复习" + result[1] = f"下一次复习: {nextdate}\n{status_text}" + + return result def on_mount(self) -> None: + """挂载组件时初始化""" union_list_widget = self.query_one("#union-list", ListView) - probe = probe_all(0) - if len(probe["nucleon"]): - for file in probe["nucleon"]: - text = self.item_desc_generator(file) - union_list_widget.append( - ListItem( - Label(text[0] + "\n" + text[1]), - ) - ) - else: + # 分析所有文件 + for file in probe["nucleon"]: + self.texts[file] = self.analyser(file) + + # 按下次复习时间排序 + nucleon_files = sorted( + probe["nucleon"], + key=lambda f: self.nextdates[f], + reverse=True, + ) + + # 填充列表 + if not probe["nucleon"]: union_list_widget.append( ListItem( Static( - "在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集." + "在 ./nucleon/ 中未找到任何内容源数据文件。\n" + "请放置文件后重启应用,或者新建空的单元集。" ) ) ) union_list_widget.disabled = True + return + + for file in nucleon_files: + text = self.texts[file] + list_item = ListItem(Label(f"{text[0]}\n{text[1]}")) + union_list_widget.append(list_item) + + if not self.stay_enabled[file]: + list_item.disabled = True def on_list_view_selected(self, event) -> None: + """处理列表项选择事件""" if not isinstance(event.item, ListItem): return selected_label = event.item.query_one(Label) - if "未找到任何 .toml 文件" in str(selected_label.renderable): # type: ignore + label_text = str(selected_label.renderable) + + if "未找到任何 .toml 文件" in label_text: return - selected_filename = pathlib.Path( - str(selected_label.renderable) - .partition("\0")[0] # 文件名末尾截断, 保留文件名 - .replace("*", "") - ) # 去除markdown加粗 + # 提取文件名 + selected_filename = pathlib.Path(label_text.partition("\0")[0].replace("*", "")) - nucleon_file_path = ( - pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) / selected_filename - ) - electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / ( - str(selected_filename.stem) + ".json" + # 构建文件路径 + nucleon_dir = config_var.get()["paths"]["nucleon_dir"] + electron_dir = config_var.get()["paths"]["electron_dir"] + + nucleon_file_path = pathlib.Path(nucleon_dir) / selected_filename + electron_file_path = ( + pathlib.Path(electron_dir) / f"{selected_filename.stem}.json" ) + + # 跳转到准备屏幕 self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path)) def on_button_pressed(self, event) -> None: - if event.button.id == "new_nucleon_button": - # 切换到创建单元 + """处理按钮点击事件""" + button_id = event.button.id + + if button_id == "new_nucleon_button": from .nucreator import NucleonCreatorScreen - newscr = NucleonCreatorScreen() - self.app.push_screen(newscr) - elif event.button.id == "precache_all_button": - # 切换到缓存管理器 + new_screen = NucleonCreatorScreen() + self.app.push_screen(new_screen) + + elif button_id == "precache_all_button": from .precache import PrecachingScreen precache_screen = PrecachingScreen() self.app.push_screen(precache_screen) - elif event.button.id == "about_button": - from .about import AboutScreen + elif button_id == "about_button": about_screen = AboutScreen() self.app.push_screen(about_screen) def action_quit_app(self) -> None: + """退出应用程序""" self.app.exit() diff --git a/src/heurams/interface/screens/memorizor.py b/src/heurams/interface/screens/memorizor.py index 9394a00..9bf03f7 100644 --- a/src/heurams/interface/screens/memorizor.py +++ b/src/heurams/interface/screens/memorizor.py @@ -148,16 +148,24 @@ class MemScreen(Screen): def play_voice(self): """朗读当前内容""" - from heurams.services.audio_service import play_by_path from pathlib import Path + + from heurams.services.audio_service import play_by_path from heurams.services.hasher import get_md5 - path = Path(config_var.get()['paths']["cache_dir"]) - path = path / f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav" + + path = Path(config_var.get()["paths"]["cache_dir"]) + path = ( + path + / f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav" + ) if path.exists(): play_by_path(path) else: from heurams.services.tts_service import convertor - convertor(self.atom.registry['nucleon'].metadata["formation"]["tts_text"], path) + + convertor( + self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path + ) play_by_path(path) def action_toggle_dark(self): diff --git a/src/heurams/interface/screens/precache.py b/src/heurams/interface/screens/precache.py index a8ab604..4c03bf2 100644 --- a/src/heurams/interface/screens/precache.py +++ b/src/heurams/interface/screens/precache.py @@ -99,6 +99,7 @@ class PrecachingScreen(Screen): if not cache_file.exists(): try: from heurams.services.tts_service import convertor + convertor(text, cache_file) return 1 except Exception as e: diff --git a/src/heurams/interface/screens/synctool.py b/src/heurams/interface/screens/synctool.py index 1a41805..3d1eebe 100644 --- a/src/heurams/interface/screens/synctool.py +++ b/src/heurams/interface/screens/synctool.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import pathlib +import time from textual.app import ComposeResult from textual.containers import Horizontal, ScrollableContainer @@ -18,22 +19,287 @@ class SyncScreen(Screen): def __init__(self, nucleons: list = [], desc: str = ""): super().__init__(name=None, id=None, classes=None) + self.sync_service = None + self.is_syncing = False + self.is_paused = False + self.log_messages = [] + self.max_log_lines = 50 def compose(self) -> ComposeResult: yield Header(show_clock=True) with ScrollableContainer(id="sync_container"): - pass + # 标题和连接状态 + yield Static("同步工具", classes="title") + yield Static("", id="status_label", classes="status") + + # 配置信息 + yield Static(f"同步协议: {config_var.get()['services']['sync']}") + yield Static("服务器配置:", classes="section_title") + with Horizontal(classes="config_info"): + yield Static("远程服务器:", classes="config_label") + yield Static("", id="server_url", classes="config_value") + with Horizontal(classes="config_info"): + yield Static("远程路径:", classes="config_label") + yield Static("", id="remote_path", classes="config_value") + + with Horizontal(classes="control_buttons"): + yield Button("测试连接", id="test_connection", variant="primary") + yield Button("开始同步", id="start_sync", variant="success") + yield Button("暂停", id="pause_sync", variant="warning", disabled=True) + yield Button("取消", id="cancel_sync", variant="error", disabled=True) + + yield Static("同步进度", classes="section_title") + yield ProgressBar(id="progress_bar", show_percentage=True, total=100) + yield Static("", id="progress_label", classes="progress_text") + + yield Static("同步日志", classes="section_title") + yield Static("", id="log_output", classes="log_output") + yield Footer() def on_mount(self): """挂载时初始化状态""" + self.update_ui_from_config() + self.log_message("同步工具已启动") + + def update_ui_from_config(self): + """更新 UI 显示配置信息""" + try: + sync_cfg: dict = config_var.get()["providers"]["sync"]["webdav"] + # 更新服务器 URL + url = sync_cfg.get("url", "未配置") + url_widget = self.query_one("#server_url") + url_widget.update(url) # type: ignore + # 更新远程路径 + remote_path = sync_cfg.get("remote_path", "/") + path_widget = self.query_one("#remote_path") + path_widget.update(remote_path) # type: ignore + + # 更新状态标签 + status_widget = self.query_one("#status_label") + if self.sync_service and self.sync_service.client: + status_widget.update("✅ 同步服务已就绪") # type: ignore + status_widget.add_class("ready") + else: + status_widget.update("❌ 同步服务未配置或未启用") # type: ignore + status_widget.add_class("error") + + except Exception as e: + self.log_message(f"更新 UI 失败: {e}", is_error=True) def update_status(self, status, current_item="", progress=None): """更新状态显示""" + try: + status_widget = self.query_one("#status_label") + status_widget.update(status) # type: ignore + + if progress is not None: + progress_bar = self.query_one("#progress_bar") + progress_bar.progress = progress # type: ignore + + progress_label = self.query_one("#progress_label") + progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore + + except Exception as e: + self.log_message(f"更新状态失败: {e}", is_error=True) + + def log_message(self, message: str, is_error: bool = False): + """添加日志消息并更新显示""" + timestamp = time.strftime("%H:%M:%S") + prefix = "[ERROR]" if is_error else "[INFO]" + log_line = f"{timestamp} {prefix} {message}" + + self.log_messages.append(log_line) + # 保持日志行数不超过最大值 + if len(self.log_messages) > self.max_log_lines: + self.log_messages = self.log_messages[-self.max_log_lines :] + + # 更新日志显示 + try: + log_widget = self.query_one("#log_output") + log_widget.update("\n".join(self.log_messages)) # type: ignore + except Exception: + pass # 如果组件未就绪,忽略错误 def on_button_pressed(self, event: Button.Pressed) -> None: + """处理按钮点击事件""" + button_id = event.button.id + + if button_id == "test_connection": + self.test_connection() + elif button_id == "start_sync": + self.start_sync() + elif button_id == "pause_sync": + self.pause_sync() + elif button_id == "cancel_sync": + self.cancel_sync() + event.stop() + def test_connection(self): + """测试 WebDAV 服务器连接""" + if not self.sync_service: + self.log_message("同步服务未初始化,请检查配置", is_error=True) + self.update_status("❌ 同步服务未初始化") + return + + self.log_message("正在测试 WebDAV 连接...") + self.update_status("正在测试连接...") + + try: + success = self.sync_service.test_connection() + if success: + self.log_message("连接测试成功") + self.update_status("✅ 连接正常") + else: + self.log_message("连接测试失败", is_error=True) + self.update_status("❌ 连接失败") + except Exception as e: + self.log_message(f"连接测试异常: {e}", is_error=True) + self.update_status("❌ 连接异常") + + def start_sync(self): + """开始同步""" + if not self.sync_service: + self.log_message("同步服务未初始化,无法开始同步", is_error=True) + return + + if self.is_syncing: + self.log_message("同步已在进行中", is_error=True) + return + + self.is_syncing = True + self.is_paused = False + self.update_button_states() + + self.log_message("开始同步数据...") + self.update_status("正在同步...", progress=0) + + # 启动后台同步任务 + self.run_worker(self.perform_sync, thread=True) + + def perform_sync(self): + """执行同步任务(在后台线程中运行)""" + worker = get_current_worker() + + try: + # 获取需要同步的本地目录 + from heurams.context import config_var + + config = config_var.get() + paths = config.get("paths", {}) + + # 同步 nucleon 目录 + nucleon_dir = pathlib.Path(paths.get("nucleon_dir", "./data/nucleon")) + if nucleon_dir.exists(): + self.log_message(f"同步 nucleon 目录: {nucleon_dir}") + self.update_status(f"同步 nucleon 目录...", progress=10) + + result = self.sync_service.sync_directory(nucleon_dir) # type: ignore + if result.get("success"): + self.log_message( + f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" + ) + else: + self.log_message( + f"nucleon 同步失败: {result.get('error', '未知错误')}", + is_error=True, + ) + + # 同步 electron 目录 + electron_dir = pathlib.Path(paths.get("electron_dir", "./data/electron")) + if electron_dir.exists(): + self.log_message(f"同步 electron 目录: {electron_dir}") + self.update_status(f"同步 electron 目录...", progress=60) + + result = self.sync_service.sync_directory(electron_dir) # type: ignore + if result.get("success"): + self.log_message( + f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" + ) + else: + self.log_message( + f"electron 同步失败: {result.get('error', '未知错误')}", + is_error=True, + ) + + # 同步 orbital 目录(如果存在) + orbital_dir = pathlib.Path(paths.get("orbital_dir", "./data/orbital")) + if orbital_dir.exists(): + self.log_message(f"同步 orbital 目录: {orbital_dir}") + self.update_status(f"同步 orbital 目录...", progress=80) + + result = self.sync_service.sync_directory(orbital_dir) # type: ignore + if result.get("success"): + self.log_message( + f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" + ) + else: + self.log_message( + f"orbital 同步失败: {result.get('error', '未知错误')}", + is_error=True, + ) + + # 同步完成 + self.update_status("同步完成", progress=100) + self.log_message("所有目录同步完成") + + except Exception as e: + self.log_message(f"同步过程中发生错误: {e}", is_error=True) + self.update_status("同步失败") + finally: + # 重置同步状态 + self.is_syncing = False + self.is_paused = False + self.update_button_states() # type: ignore + + def pause_sync(self): + """暂停同步""" + if not self.is_syncing: + return + + self.is_paused = not self.is_paused + self.update_button_states() + + if self.is_paused: + self.log_message("同步已暂停") + self.update_status("同步已暂停") + else: + self.log_message("同步已恢复") + self.update_status("正在同步...") + + def cancel_sync(self): + """取消同步""" + if not self.is_syncing: + return + + self.is_syncing = False + self.is_paused = False + self.update_button_states() + + self.log_message("同步已取消") + self.update_status("同步已取消") + + def update_button_states(self): + """更新按钮状态""" + try: + start_button = self.query_one("#start_sync") + pause_button = self.query_one("#pause_sync") + cancel_button = self.query_one("#cancel_sync") + + if self.is_syncing: + start_button.disabled = True + pause_button.disabled = False + cancel_button.disabled = False + pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore + else: + start_button.disabled = False + pause_button.disabled = True + cancel_button.disabled = True + + except Exception as e: + self.log_message(f"更新按钮状态失败: {e}", is_error=True) + def action_go_back(self): self.app.pop_screen() diff --git a/src/heurams/interface/widgets/recognition.py b/src/heurams/interface/widgets/recognition.py index 842082a..7b4364c 100644 --- a/src/heurams/interface/widgets/recognition.py +++ b/src/heurams/interface/widgets/recognition.py @@ -50,9 +50,10 @@ class Recognition(BasePuzzleWidget): def compose(self): from heurams.context import config_var - autovoice = config_var.get()['interface']['memorizor']['autovoice'] + + autovoice = config_var.get()["interface"]["memorizor"]["autovoice"] if autovoice: - self.screen.action_play_voice() # type: ignore + self.screen.action_play_voice() # type: ignore cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia] delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"] replace_dict = { @@ -72,7 +73,7 @@ class Recognition(BasePuzzleWidget): primary = cfg["primary"] with Center(): - for i in cfg['top_dim']: + for i in cfg["top_dim"]: yield Static(f"[dim]{i}[/]") yield Label("") diff --git a/src/heurams/kernel/algorithms/sm15m.py b/src/heurams/kernel/algorithms/sm15m.py index 95e210e..fc5abd7 100644 --- a/src/heurams/kernel/algorithms/sm15m.py +++ b/src/heurams/kernel/algorithms/sm15m.py @@ -10,15 +10,18 @@ MIT 许可证 import datetime import json import os -from typing import TypedDict import pathlib +from typing import TypedDict + from heurams.context import config_var from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF, RANGE_AF, RANGE_REPETITION, SM, THRESHOLD_RECALL, Item) # 全局状态文件路径 -_GLOBAL_STATE_FILE = os.path.expanduser(pathlib.Path(config_var.get()['paths']['global_dir']) / 'sm15m_global_state.json') +_GLOBAL_STATE_FILE = os.path.expanduser( + pathlib.Path(config_var.get()["paths"]["global_dir"]) / "sm15m_global_state.json" +) def _get_global_sm(): diff --git a/src/heurams/kernel/particles/atom.py b/src/heurams/kernel/particles/atom.py index 4d90ff2..dfb172c 100644 --- a/src/heurams/kernel/particles/atom.py +++ b/src/heurams/kernel/particles/atom.py @@ -86,8 +86,8 @@ class Atom: # eval 环境设置 def eval_with_env(s: str): default = config_var.get()["puzzles"] - payload = self.registry['nucleon'].payload - metadata = self.registry['nucleon'].metadata + payload = self.registry["nucleon"].payload + metadata = self.registry["nucleon"].metadata eval_value = eval(s) if isinstance(eval_value, (int, float)): ret = str(eval_value) @@ -117,10 +117,11 @@ class Atom: logger.debug("发现 eval 表达式: '%s'", data[5:]) return modifier(data[5:]) return data + try: - traverse(self.registry['nucleon'].payload, eval_with_env) - traverse(self.registry['nucleon'].metadata, eval_with_env) - traverse(self.registry['orbital'], eval_with_env) + traverse(self.registry["nucleon"].payload, eval_with_env) + traverse(self.registry["nucleon"].metadata, eval_with_env) + traverse(self.registry["orbital"], eval_with_env) except Exception as e: ret = f"此 eval 实例发生错误: {e}" logger.warning(ret) diff --git a/src/heurams/kernel/particles/electron.py b/src/heurams/kernel/particles/electron.py index cb7cf54..cc5cdd6 100644 --- a/src/heurams/kernel/particles/electron.py +++ b/src/heurams/kernel/particles/electron.py @@ -18,9 +18,12 @@ class Electron: algo: 使用的算法模块标识 """ if algo_name == "": - algo_name = config_var.get()['algorithm']['default'] + algo_name = config_var.get()["algorithm"]["default"] logger.debug( - "创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s", ident, algo_name, algodata + "创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s", + ident, + algo_name, + algodata, ) self.algodata = algodata self.ident = ident @@ -31,7 +34,9 @@ class Electron: self.algodata[self.algo.algo_name] = {} logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo) if not self.algodata[self.algo.algo_name]: - logger.debug(f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}") + logger.debug( + f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}" + ) self._default_init(self.algo.defaults) else: logger.debug("算法数据已存在, 跳过默认初始化") diff --git a/src/heurams/kernel/particles/nucleon.py b/src/heurams/kernel/particles/nucleon.py index 098e840..175661b 100644 --- a/src/heurams/kernel/particles/nucleon.py +++ b/src/heurams/kernel/particles/nucleon.py @@ -53,4 +53,4 @@ class Nucleon: def placeholder(): """生成一个占位原子核""" logger.debug("创建 Nucleon 占位符") - return Nucleon("核子对象样例内容", {}) \ No newline at end of file + return Nucleon("核子对象样例内容", {}) diff --git a/src/heurams/providers/tts/edge_tts.py b/src/heurams/providers/tts/edge_tts.py index ee74cf0..9b8a33c 100644 --- a/src/heurams/providers/tts/edge_tts.py +++ b/src/heurams/providers/tts/edge_tts.py @@ -2,8 +2,8 @@ import pathlib import edge_tts -from heurams.services.logger import get_logger from heurams.context import config_var +from heurams.services.logger import get_logger from .base import BaseTTS @@ -19,7 +19,7 @@ class EdgeTTS(BaseTTS): try: communicate = edge_tts.Communicate( text, - config_var.get()['providers']['tts']['edgetts']["voice"], + config_var.get()["providers"]["tts"]["edgetts"]["voice"], ) logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频") communicate.save_sync(str(path)) diff --git a/src/heurams/default/config/config_dev.toml b/src/heurams/services/sync_service.py similarity index 100% rename from src/heurams/default/config/config_dev.toml rename to src/heurams/services/sync_service.py diff --git a/src/heurams/services/version.py b/src/heurams/services/version.py index cd501b1..0239b75 100644 --- a/src/heurams/services/version.py +++ b/src/heurams/services/version.py @@ -3,7 +3,7 @@ from heurams.services.logger import get_logger logger = get_logger(__name__) -ver = "0.4.2" +ver = "0.4.3" stage = "prototype" codename = "fledge" # 雏鸟, 0.4.x 版本 diff --git a/src/heurams/services/vfs.py b/src/heurams/services/vfs.py new file mode 100644 index 0000000..a024ef8 --- /dev/null +++ b/src/heurams/services/vfs.py @@ -0,0 +1,20 @@ +"""vfs.py +得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers +""" + +from pathlib import Path + +import fsspec as fs + + +class VFSObject: + def __init__(self, protocol, base_url): + self.base_url = base_url + self.protocol = protocol + self.fs = fs.filesystem(protocol=protocol, base_url=base_url) + + def open(self, path: Path): + return self.fs.open(path) + + def open_by_list(self, path_list: list[Path]): + return self.fs.open_files(path_list) diff --git a/src/heurams/services/vfs/zipfs.py b/src/heurams/services/vfs/zipfs.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/interface/test_dashboard.py b/tests/interface/test_dashboard.py index be519ac..42ca79a 100644 --- a/tests/interface/test_dashboard.py +++ b/tests/interface/test_dashboard.py @@ -89,7 +89,7 @@ class TestDashboardScreenUnit(unittest.TestCase): screen = DashboardScreen() # 模拟一个文件名 filename = "test.toml" - result = screen.item_desc_generator(filename) + result = screen.analyser(filename) self.assertIsInstance(result, dict) self.assertIn(0, result) self.assertIn(1, result) diff --git a/tests/interface/test_synctool.py b/tests/interface/test_synctool.py new file mode 100644 index 0000000..b70d8a3 --- /dev/null +++ b/tests/interface/test_synctool.py @@ -0,0 +1,314 @@ +#!/usr/bin/env python3 +""" +SyncScreen 和 SyncService 的测试. +""" +import pathlib +import tempfile +import time +import unittest +from unittest.mock import MagicMock, Mock, patch + +from heurams.context import ConfigContext +from heurams.services.config import ConfigFile +from heurams.services.sync_service import (ConflictStrategy, SyncConfig, + SyncMode, SyncService) + + +class TestSyncServiceUnit(unittest.TestCase): + """SyncService 的单元测试.""" + + def setUp(self): + """在每个测试之前运行, 设置临时目录和模拟客户端.""" + self.temp_dir = tempfile.TemporaryDirectory() + self.temp_path = pathlib.Path(self.temp_dir.name) + + # 创建测试文件 + self.test_file = self.temp_path / "test.txt" + self.test_file.write_text("测试内容") + + # 模拟 WebDAV 客户端 + self.mock_client = MagicMock() + + # 创建同步配置 + self.config = SyncConfig( + enabled=True, + url="https://example.com/dav/", + username="test", + password="test", + remote_path="/heurams/", + sync_mode=SyncMode.BIDIRECTIONAL, + conflict_strategy=ConflictStrategy.NEWER, + verify_ssl=True, + ) + + def tearDown(self): + """在每个测试之后清理.""" + self.temp_dir.cleanup() + + @patch("heurams.services.sync_service.Client") + def test_sync_service_initialization(self, mock_client_class): + """测试同步服务初始化.""" + mock_client_class.return_value = self.mock_client + + service = SyncService(self.config) + + # 验证客户端已创建 + mock_client_class.assert_called_once() + self.assertIsNotNone(service.client) + self.assertEqual(service.config, self.config) + + @patch("heurams.services.sync_service.Client") + def test_sync_service_disabled(self, mock_client_class): + """测试同步服务未启用.""" + config = SyncConfig(enabled=False) + service = SyncService(config) + + # 客户端不应初始化 + mock_client_class.assert_not_called() + self.assertIsNone(service.client) + + @patch("heurams.services.sync_service.Client") + def test_test_connection_success(self, mock_client_class): + """测试连接测试成功.""" + mock_client_class.return_value = self.mock_client + self.mock_client.list.return_value = [] + + service = SyncService(self.config) + result = service.test_connection() + + self.assertTrue(result) + self.mock_client.list.assert_called_once() + + @patch("heurams.services.sync_service.Client") + def test_test_connection_failure(self, mock_client_class): + """测试连接测试失败.""" + mock_client_class.return_value = self.mock_client + self.mock_client.list.side_effect = Exception("连接失败") + + service = SyncService(self.config) + result = service.test_connection() + + self.assertFalse(result) + self.mock_client.list.assert_called_once() + + @patch("heurams.services.sync_service.Client") + def test_upload_file(self, mock_client_class): + """测试上传单个文件.""" + mock_client_class.return_value = self.mock_client + + service = SyncService(self.config) + result = service.upload_file(self.test_file) + + self.assertTrue(result) + self.mock_client.upload_file.assert_called_once() + + @patch("heurams.services.sync_service.Client") + def test_download_file(self, mock_client_class): + """测试下载单个文件.""" + mock_client_class.return_value = self.mock_client + + service = SyncService(self.config) + remote_path = "/heurams/test.txt" + local_path = self.temp_path / "downloaded.txt" + + result = service.download_file(remote_path, local_path) + + self.assertTrue(result) + self.mock_client.download_file.assert_called_once() + self.assertTrue(local_path.parent.exists()) + + @patch("heurams.services.sync_service.Client") + def test_sync_directory_no_files(self, mock_client_class): + """测试同步空目录.""" + mock_client_class.return_value = self.mock_client + self.mock_client.list.return_value = [] + self.mock_client.mkdir.return_value = None + + service = SyncService(self.config) + result = service.sync_directory(self.temp_path) + + self.assertTrue(result["success"]) + self.assertEqual(result["uploaded"], 0) + self.assertEqual(result["downloaded"], 0) + self.mock_client.mkdir.assert_called_once() + + @patch("heurams.services.sync_service.Client") + def test_sync_directory_upload_only(self, mock_client_class): + """测试仅上传模式.""" + mock_client_class.return_value = self.mock_client + self.mock_client.list.return_value = [] + self.mock_client.mkdir.return_value = None + + config = SyncConfig( + enabled=True, + url="https://example.com/dav/", + username="test", + password="test", + remote_path="/heurams/", + sync_mode=SyncMode.UPLOAD_ONLY, + conflict_strategy=ConflictStrategy.NEWER, + ) + + service = SyncService(config) + result = service.sync_directory(self.temp_path) + + self.assertTrue(result["success"]) + self.mock_client.mkdir.assert_called_once() + + @patch("heurams.services.sync_service.Client") + def test_conflict_strategy_newer(self, mock_client_class): + """测试 NEWER 冲突策略.""" + mock_client_class.return_value = self.mock_client + + # 模拟远程文件存在 + self.mock_client.list.return_value = ["test.txt"] + self.mock_client.info.return_value = { + "size": 100, + "modified": "2023-01-01T00:00:00Z", + } + self.mock_client.mkdir.return_value = None + + service = SyncService(self.config) + result = service.sync_directory(self.temp_path) + + self.assertTrue(result["success"]) + # 应该有一个冲突 + self.assertGreaterEqual(result.get("conflicts", 0), 0) + + @patch("heurams.services.sync_service.Client") + def test_create_sync_service_from_config(self, mock_client_class): + """测试从配置文件创建同步服务.""" + mock_client_class.return_value = self.mock_client + + # 创建临时配置文件 + config_data = { + "sync": { + "webdav": { + "enabled": True, + "url": "https://example.com/dav/", + "username": "test", + "password": "test", + "remote_path": "/heurams/", + "sync_mode": "bidirectional", + "conflict_strategy": "newer", + "verify_ssl": True, + } + } + } + + # 模拟 config_var + with patch("heurams.services.sync_service.config_var") as mock_config_var: + mock_config = MagicMock() + mock_config.data = config_data + mock_config_var.get.return_value = mock_config + + from heurams.services.sync_service import \ + create_sync_service_from_config + + service = create_sync_service_from_config() + + self.assertIsNotNone(service) + self.assertIsNotNone(service.client) + + +class TestSyncScreenUnit(unittest.TestCase): + """SyncScreen 的单元测试.""" + + def setUp(self): + """在每个测试之前运行.""" + self.temp_dir = tempfile.TemporaryDirectory() + self.temp_path = pathlib.Path(self.temp_dir.name) + + # 创建默认配置 + default_config_path = ( + pathlib.Path(__file__).parent.parent.parent + / "src/heurams/default/config/config.toml" + ) + self.config = ConfigFile(default_config_path) + + # 更新配置中的路径 + config_data = self.config.data + config_data["paths"]["nucleon_dir"] = str(self.temp_path / "nucleon") + config_data["paths"]["electron_dir"] = str(self.temp_path / "electron") + config_data["paths"]["orbital_dir"] = str(self.temp_path / "orbital") + config_data["paths"]["cache_dir"] = str(self.temp_path / "cache") + + # 添加同步配置 + if "sync" not in config_data: + config_data["sync"] = {} + config_data["sync"]["webdav"] = { + "enabled": False, + "url": "", + "username": "", + "password": "", + "remote_path": "/heurams/", + "sync_mode": "bidirectional", + "conflict_strategy": "newer", + "verify_ssl": True, + } + + # 创建目录 + for dir_key in ["nucleon_dir", "electron_dir", "orbital_dir", "cache_dir"]: + pathlib.Path(config_data["paths"][dir_key]).mkdir( + parents=True, exist_ok=True + ) + + # 使用 ConfigContext 设置配置 + self.config_ctx = ConfigContext(self.config) + self.config_ctx.__enter__() + + def tearDown(self): + """在每个测试之后清理.""" + self.config_ctx.__exit__(None, None, None) + self.temp_dir.cleanup() + + @patch("heurams.interface.screens.synctool.create_sync_service_from_config") + def test_sync_screen_compose(self, mock_create_service): + """测试 SyncScreen 的 compose 方法.""" + from heurams.interface.screens.synctool import SyncScreen + + # 模拟同步服务 + mock_service = MagicMock() + mock_service.client = MagicMock() + mock_create_service.return_value = mock_service + + screen = SyncScreen() + + # 测试 compose 方法 + from textual.app import ComposeResult + + result = screen.compose() + widgets = list(result) + + # 检查基本部件 + from textual.containers import ScrollableContainer + from textual.widgets import Button, Footer, Header, ProgressBar, Static + + header_present = any(isinstance(w, Header) for w in widgets) + footer_present = any(isinstance(w, Footer) for w in widgets) + self.assertTrue(header_present) + self.assertTrue(footer_present) + + # 检查容器 + container_present = any(isinstance(w, ScrollableContainer) for w in widgets) + self.assertTrue(container_present) + + @patch("heurams.interface.screens.synctool.create_sync_service_from_config") + def test_sync_screen_load_config(self, mock_create_service): + """测试 SyncScreen 加载配置.""" + from heurams.interface.screens.synctool import SyncScreen + + mock_service = MagicMock() + mock_service.client = MagicMock() + mock_create_service.return_value = mock_service + + screen = SyncScreen() + screen.load_config() + + # 验证配置已加载 + self.assertIsNotNone(screen.sync_config) + mock_create_service.assert_called_once() + + +if __name__ == "__main__": + unittest.main()