From 1efe034a59746d9fe53ad38872173e185db13831 Mon Sep 17 00:00:00 2001 From: david-ajax Date: Sun, 21 Dec 2025 07:49:19 +0800 Subject: [PATCH 1/7] =?UTF-8?q?feat(synctool):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 23 +- config/config.toml | 10 + requirements.txt | 2 + src/heurams/context.py | 3 +- src/heurams/default/config/config.toml | 22 ++ src/heurams/default/config/config_dev.toml | 0 src/heurams/interface/__init__.py | 3 + src/heurams/interface/screens/about.py | 3 +- src/heurams/interface/screens/synctool.py | 285 +++++++++++++- src/heurams/services/sync_service.py | 410 +++++++++++++++++++++ src/heurams/services/version.py | 2 +- tests/interface/test_synctool.py | 305 +++++++++++++++ 12 files changed, 1062 insertions(+), 6 deletions(-) delete mode 100644 src/heurams/default/config/config_dev.toml create mode 100644 src/heurams/services/sync_service.py create mode 100644 tests/interface/test_synctool.py diff --git a/README.md b/README.md index b4f9148..1b3ef80 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ - 自然语音: 集成微软神经网络文本转语音 (TTS) 技术 - 多种谜题类型: 选择题 (MCQ)、填空题 (Cloze)、识别题 (Recognition) - 动态内容生成: 支持宏驱动的模板系统, 根据上下文动态生成题目 +- 云同步支持: 通过 WebDAV 协议同步数据到远程服务器 ### 实用用户界面 - 响应式 Textual 框架构建的跨平台 TUI 界面 @@ -82,7 +83,23 @@ python -m heurams.interface ## 配置 -配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置. +配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置. + +### 同步配置 +同步功能支持 WebDAV 协议,可在配置文件的 `[sync.webdav]` 段进行配置: +```toml +[sync.webdav] +enabled = false +url = "" # WebDAV 服务器地址 +username = "" # 用户名 +password = "" # 密码 +remote_path = "/heurams/" # 远程路径 +sync_mode = "bidirectional" # 同步模式: bidirectional/upload_only/download_only +conflict_strategy = "newer" # 冲突策略: newer/ask/keep_both +verify_ssl = true # SSL 证书验证 +``` + +启用同步后,可通过应用内的同步工具进行数据备份和恢复。 ## 项目结构 @@ -104,6 +121,7 @@ graph TB Timer[时间服务] AudioService[音频服务] TTSService[TTS服务] + SyncService[同步服务] OtherServices[其他服务] end @@ -156,7 +174,8 @@ src/heurams/ │ ├── logger.py # 日志系统 │ ├── timer.py # 时间服务 │ ├── audio_service.py # 音频播放抽象 -│ └── tts_service.py # 文本转语音抽象 +│ ├── tts_service.py # 文本转语音抽象 +│ └── sync_service.py # WebDAV 同步服务 ├── kernel/ # 核心业务逻辑 │ ├── algorithms/ # 间隔重复算法 (FSRS, SM2) │ ├── particles/ # 数据模型 (Atom, Electron, Nucleon, Orbital) diff --git a/config/config.toml b/config/config.toml index a24b7a0..b0bedc1 100644 --- a/config/config.toml +++ b/config/config.toml @@ -49,3 +49,13 @@ voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN- [providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置 url = "" key = "" + +[sync.webdav] # WebDAV 同步设置 +enabled = false +url = "" +username = "" +password = "" +remote_path = "/heurams/" +sync_mode = "bidirectional" # bidirectional/upload_only/download_only +conflict_strategy = "newer" # newer/ask/keep_both +verify_ssl = true diff --git a/requirements.txt b/requirements.txt index 24f6e4a..20c66f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,5 @@ bidict==0.23.1 playsound==1.2.2 textual==5.3.0 toml==0.10.2 +requests>=2.31.0 +webdavclient3>=3.0.0 diff --git a/src/heurams/context.py b/src/heurams/context.py index 80f7feb..06b2e34 100644 --- a/src/heurams/context.py +++ b/src/heurams/context.py @@ -32,7 +32,8 @@ try: except Exception as e: print("未能加载自定义用户配置") logger.warning("未能加载自定义用户配置, 错误: %s", e) -if pathlib.Path(rootdir / "default" / "config" / "config_dev.toml").exists(): +if pathlib.Path(workdir / "config" / "config_dev.toml").exists(): + print("使用开发设置") logger.debug("使用开发设置") config_var: ContextVar[ConfigFile] = ContextVar( "config_var", default=ConfigFile(workdir / "config" / "config_dev.toml") diff --git a/src/heurams/default/config/config.toml b/src/heurams/default/config/config.toml index a7d9029..b0bedc1 100644 --- a/src/heurams/default/config/config.toml +++ b/src/heurams/default/config/config.toml @@ -14,6 +14,14 @@ scheduled_num = 8 # UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒 timezone_offset = +28800 # 中国标准时间 (UTC+8) +[interface] + +[interface.memorizor] +autovoice = true # 自动语音播放, 仅限于 recognition 组件 + +[algorithm] +default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS + [puzzles] # 谜题默认配置 [puzzles.mcq] @@ -25,6 +33,7 @@ min_denominator = 3 [paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径 nucleon_dir = "./data/nucleon" electron_dir = "./data/electron" +global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要 orbital_dir = "./data/orbital" cache_dir = "./data/cache" template_dir = "./data/template" @@ -34,6 +43,19 @@ audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Andro tts = "edgetts" # 可选项: edgetts llm = "openai" # 可选项: openai +[providers.tts.edgetts] # EdgeTTS 设置 +voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声) + [providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置 url = "" key = "" + +[sync.webdav] # WebDAV 同步设置 +enabled = false +url = "" +username = "" +password = "" +remote_path = "/heurams/" +sync_mode = "bidirectional" # bidirectional/upload_only/download_only +conflict_strategy = "newer" # newer/ask/keep_both +verify_ssl = true diff --git a/src/heurams/default/config/config_dev.toml b/src/heurams/default/config/config_dev.toml deleted file mode 100644 index e69de29..0000000 diff --git a/src/heurams/interface/__init__.py b/src/heurams/interface/__init__.py index 0a6bb1a..4071b04 100644 --- a/src/heurams/interface/__init__.py +++ b/src/heurams/interface/__init__.py @@ -8,6 +8,7 @@ from .screens.about import AboutScreen from .screens.dashboard import DashboardScreen from .screens.nucreator import NucleonCreatorScreen from .screens.precache import PrecachingScreen +from .screens.synctool import SyncScreen logger = get_logger(__name__) @@ -39,12 +40,14 @@ class HeurAMSApp(App): ("1", "app.push_screen('dashboard')", "仪表盘"), ("2", "app.push_screen('precache_all')", "缓存管理器"), ("3", "app.push_screen('nucleon_creator')", "创建新单元"), + ("4", "app.push_screen('synctool')", "同步工具"), ("0", "app.push_screen('about')", "版本信息"), ] SCREENS = { "dashboard": DashboardScreen, "nucleon_creator": NucleonCreatorScreen, "precache_all": PrecachingScreen, + "synctool": SyncScreen, "about": AboutScreen, } diff --git a/src/heurams/interface/screens/about.py b/src/heurams/interface/screens/about.py index b0b4904..915da4c 100644 --- a/src/heurams/interface/screens/about.py +++ b/src/heurams/interface/screens/about.py @@ -31,7 +31,8 @@ class AboutScreen(Screen): 特别感谢: -- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SuperMemo-2 算法 +- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SM-2 算法与 SM-15 算法理论 +- [Kazuaki Tanida](https://github.com/slaypni): SM-15 算法的 CoffeeScript 实现 - [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 文献参考 # 参与贡献 diff --git a/src/heurams/interface/screens/synctool.py b/src/heurams/interface/screens/synctool.py index 1a41805..b62f44b 100644 --- a/src/heurams/interface/screens/synctool.py +++ b/src/heurams/interface/screens/synctool.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 import pathlib +import time from textual.app import ComposeResult from textual.containers import Horizontal, ScrollableContainer @@ -18,22 +19,304 @@ class SyncScreen(Screen): def __init__(self, nucleons: list = [], desc: str = ""): super().__init__(name=None, id=None, classes=None) + self.sync_service = None + self.sync_config = {} + self.is_syncing = False + self.is_paused = False + self.log_messages = [] + self.max_log_lines = 50 def compose(self) -> ComposeResult: yield Header(show_clock=True) with ScrollableContainer(id="sync_container"): - pass + # 标题和连接状态 + yield Static("WebDAV 同步工具", classes="title") + yield Static("", id="status_label", classes="status") + + # 配置信息 + yield Static("服务器配置", classes="section_title") + with Horizontal(classes="config_info"): + yield Static("URL:", classes="config_label") + yield Static("", id="server_url", classes="config_value") + with Horizontal(classes="config_info"): + yield Static("远程路径:", classes="config_label") + yield Static("", id="remote_path", classes="config_value") + with Horizontal(classes="config_info"): + yield Static("同步模式:", classes="config_label") + yield Static("", id="sync_mode", classes="config_value") + + # 控制按钮 + yield Static("控制面板", classes="section_title") + with Horizontal(classes="control_buttons"): + yield Button("测试连接", id="test_connection", variant="primary") + yield Button("开始同步", id="start_sync", variant="success") + yield Button("暂停", id="pause_sync", variant="warning", disabled=True) + yield Button("取消", id="cancel_sync", variant="error", disabled=True) + + # 进度显示 + yield Static("同步进度", classes="section_title") + yield ProgressBar(id="progress_bar", show_percentage=True, total=100) + yield Static("", id="progress_label", classes="progress_text") + + # 日志输出 + yield Static("同步日志", classes="section_title") + yield Static("", id="log_output", classes="log_output") + yield Footer() def on_mount(self): """挂载时初始化状态""" + self.load_config() + self.update_ui_from_config() + self.log_message("同步工具已启动") + + def load_config(self): + """从配置文件加载同步设置""" + try: + from heurams.context import config_var + config_data = config_var.get().data + self.sync_config = config_data.get('sync', {}).get('webdav', {}) + + # 创建同步服务实例 + from heurams.services.sync_service import create_sync_service_from_config + self.sync_service = create_sync_service_from_config() + + except Exception as e: + self.log_message(f"加载配置失败: {e}", is_error=True) + self.sync_config = {} + + def update_ui_from_config(self): + """更新 UI 显示配置信息""" + try: + # 更新服务器 URL + url = self.sync_config.get('url', '未配置') + url_widget = self.query_one("#server_url") + url_widget.update(url if url else '未配置') # type: ignore + + # 更新远程路径 + remote_path = self.sync_config.get('remote_path', '/heurams/') + path_widget = self.query_one("#remote_path") + path_widget.update(remote_path) # type: ignore + + # 更新同步模式 + sync_mode = self.sync_config.get('sync_mode', 'bidirectional') + mode_widget = self.query_one("#sync_mode") + mode_map = { + 'bidirectional': '双向同步', + 'upload_only': '仅上传', + 'download_only': '仅下载', + } + mode_widget.update(mode_map.get(sync_mode, sync_mode)) # type: ignore + + # 更新状态标签 + status_widget = self.query_one("#status_label") + if self.sync_service and self.sync_service.client: + status_widget.update("✅ 同步服务已就绪") # type: ignore + status_widget.add_class("ready") + else: + status_widget.update("❌ 同步服务未配置或未启用") # type: ignore + status_widget.add_class("error") + + except Exception as e: + self.log_message(f"更新 UI 失败: {e}", is_error=True) def update_status(self, status, current_item="", progress=None): """更新状态显示""" + try: + status_widget = self.query_one("#status_label") + status_widget.update(status) # type: ignore + + if progress is not None: + progress_bar = self.query_one("#progress_bar") + progress_bar.progress = progress # type: ignore + + progress_label = self.query_one("#progress_label") + progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore + + except Exception as e: + self.log_message(f"更新状态失败: {e}", is_error=True) + + def log_message(self, message: str, is_error: bool = False): + """添加日志消息并更新显示""" + timestamp = time.strftime("%H:%M:%S") + prefix = "[ERROR]" if is_error else "[INFO]" + log_line = f"{timestamp} {prefix} {message}" + + self.log_messages.append(log_line) + # 保持日志行数不超过最大值 + if len(self.log_messages) > self.max_log_lines: + self.log_messages = self.log_messages[-self.max_log_lines:] + + # 更新日志显示 + try: + log_widget = self.query_one("#log_output") + log_widget.update("\n".join(self.log_messages)) # type: ignore + except Exception: + pass # 如果组件未就绪,忽略错误 def on_button_pressed(self, event: Button.Pressed) -> None: + """处理按钮点击事件""" + button_id = event.button.id + + if button_id == "test_connection": + self.test_connection() + elif button_id == "start_sync": + self.start_sync() + elif button_id == "pause_sync": + self.pause_sync() + elif button_id == "cancel_sync": + self.cancel_sync() + event.stop() + def test_connection(self): + """测试 WebDAV 服务器连接""" + if not self.sync_service: + self.log_message("同步服务未初始化,请检查配置", is_error=True) + self.update_status("❌ 同步服务未初始化") + return + + self.log_message("正在测试 WebDAV 连接...") + self.update_status("正在测试连接...") + + try: + success = self.sync_service.test_connection() + if success: + self.log_message("连接测试成功") + self.update_status("✅ 连接正常") + else: + self.log_message("连接测试失败", is_error=True) + self.update_status("❌ 连接失败") + except Exception as e: + self.log_message(f"连接测试异常: {e}", is_error=True) + self.update_status("❌ 连接异常") + + def start_sync(self): + """开始同步""" + if not self.sync_service: + self.log_message("同步服务未初始化,无法开始同步", is_error=True) + return + + if self.is_syncing: + self.log_message("同步已在进行中", is_error=True) + return + + self.is_syncing = True + self.is_paused = False + self.update_button_states() + + self.log_message("开始同步数据...") + self.update_status("正在同步...", progress=0) + + # 启动后台同步任务 + self.run_worker(self.perform_sync, thread=True) + + def perform_sync(self): + """执行同步任务(在后台线程中运行)""" + worker = get_current_worker() + + try: + # 获取需要同步的本地目录 + from heurams.context import config_var + config = config_var.get() + paths = config.get('paths', {}) + + # 同步 nucleon 目录 + nucleon_dir = pathlib.Path(paths.get('nucleon_dir', './data/nucleon')) + if nucleon_dir.exists(): + self.log_message(f"同步 nucleon 目录: {nucleon_dir}") + self.update_status(f"同步 nucleon 目录...", progress=10) + + result = self.sync_service.sync_directory(nucleon_dir) # type: ignore + if result.get('success'): + self.log_message(f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个") + else: + self.log_message(f"nucleon 同步失败: {result.get('error', '未知错误')}", is_error=True) + + # 同步 electron 目录 + electron_dir = pathlib.Path(paths.get('electron_dir', './data/electron')) + if electron_dir.exists(): + self.log_message(f"同步 electron 目录: {electron_dir}") + self.update_status(f"同步 electron 目录...", progress=60) + + result = self.sync_service.sync_directory(electron_dir) # type: ignore + if result.get('success'): + self.log_message(f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个") + else: + self.log_message(f"electron 同步失败: {result.get('error', '未知错误')}", is_error=True) + + # 同步 orbital 目录(如果存在) + orbital_dir = pathlib.Path(paths.get('orbital_dir', './data/orbital')) + if orbital_dir.exists(): + self.log_message(f"同步 orbital 目录: {orbital_dir}") + self.update_status(f"同步 orbital 目录...", progress=80) + + result = self.sync_service.sync_directory(orbital_dir) # type: ignore + if result.get('success'): + self.log_message(f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个") + else: + self.log_message(f"orbital 同步失败: {result.get('error', '未知错误')}", is_error=True) + + # 同步完成 + self.update_status("同步完成", progress=100) + self.log_message("所有目录同步完成") + + except Exception as e: + self.log_message(f"同步过程中发生错误: {e}", is_error=True) + self.update_status("同步失败") + finally: + # 重置同步状态 + self.is_syncing = False + self.is_paused = False + self.update_button_states() # type: ignore + + def pause_sync(self): + """暂停同步""" + if not self.is_syncing: + return + + self.is_paused = not self.is_paused + self.update_button_states() + + if self.is_paused: + self.log_message("同步已暂停") + self.update_status("同步已暂停") + else: + self.log_message("同步已恢复") + self.update_status("正在同步...") + + def cancel_sync(self): + """取消同步""" + if not self.is_syncing: + return + + self.is_syncing = False + self.is_paused = False + self.update_button_states() + + self.log_message("同步已取消") + self.update_status("同步已取消") + + def update_button_states(self): + """更新按钮状态""" + try: + start_button = self.query_one("#start_sync") + pause_button = self.query_one("#pause_sync") + cancel_button = self.query_one("#cancel_sync") + + if self.is_syncing: + start_button.disabled = True + pause_button.disabled = False + cancel_button.disabled = False + pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore + else: + start_button.disabled = False + pause_button.disabled = True + cancel_button.disabled = True + + except Exception as e: + self.log_message(f"更新按钮状态失败: {e}", is_error=True) + def action_go_back(self): self.app.pop_screen() diff --git a/src/heurams/services/sync_service.py b/src/heurams/services/sync_service.py new file mode 100644 index 0000000..c8792c0 --- /dev/null +++ b/src/heurams/services/sync_service.py @@ -0,0 +1,410 @@ +# WebDAV 同步服务 +import hashlib +import os +import pathlib +import time +import typing +from dataclasses import dataclass +from enum import Enum + +import requests +from webdav3.client import Client + +from heurams.context import config_var +from heurams.services.logger import get_logger + +logger = get_logger(__name__) + + +class SyncMode(Enum): + """同步模式枚举""" + BIDIRECTIONAL = "bidirectional" + UPLOAD_ONLY = "upload_only" + DOWNLOAD_ONLY = "download_only" + + +class ConflictStrategy(Enum): + """冲突解决策略枚举""" + NEWER = "newer" # 较新文件覆盖较旧文件 + ASK = "ask" # 用户手动选择 + KEEP_BOTH = "keep_both" # 保留双方(重命名) + + +@dataclass +class SyncConfig: + """同步配置数据类""" + enabled: bool = False + url: str = "" + username: str = "" + password: str = "" + remote_path: str = "/heurams/" + sync_mode: SyncMode = SyncMode.BIDIRECTIONAL + conflict_strategy: ConflictStrategy = ConflictStrategy.NEWER + verify_ssl: bool = True + + +class SyncService: + """WebDAV 同步服务""" + + def __init__(self, config): + self.config = config + logger.debug(f"{str(self.config)}") + self.client = None + self._setup_client() + + def _setup_client(self): + """设置 WebDAV 客户端""" + if not self.config.enabled or not self.config.url: + logger.warning("同步服务未启用或未配置 URL") + return + + options = { + 'webdav_hostname': self.config.url, + 'webdav_login': self.config.username, + 'webdav_password': self.config.password, + 'webdav_root': self.config.remote_path, + 'verify_ssl': self.config.verify_ssl, + 'disable_check': True, # 不检查服务器支持的功能 + } + + try: + self.client = Client(options) + logger.info("WebDAV 客户端初始化完成") + except Exception as e: + logger.error("WebDAV 客户端初始化失败: %s", e) + self.client = None + + def test_connection(self) -> bool: + """测试 WebDAV 服务器连接""" + if not self.client: + logger.error("WebDAV 客户端未初始化") + return False + + try: + # 尝试列出根目录 + self.client.list() + logger.info("WebDAV 连接测试成功") + return True + except Exception as e: + logger.error("WebDAV 连接测试失败: %s", e) + return False + + def _get_local_files(self, local_dir: pathlib.Path) -> typing.Dict[str, dict]: + """获取本地文件列表及其元数据""" + files = {} + for root, _, filenames in os.walk(local_dir): + for filename in filenames: + file_path = pathlib.Path(root) / filename + rel_path = file_path.relative_to(local_dir) + stat = file_path.stat() + files[str(rel_path)] = { + 'path': file_path, + 'size': stat.st_size, + 'mtime': stat.st_mtime, + 'hash': self._calculate_hash(file_path), + } + return files + + def _get_remote_files(self) -> typing.Dict[str, dict]: + """获取远程文件列表及其元数据""" + if not self.client: + return {} + + try: + remote_list = self.client.list(recursive=True) + files = {} + for item in remote_list: + if not item.endswith('/'): # 忽略目录 + rel_path = item.lstrip('/') + try: + info = self.client.info(item) + files[rel_path] = { + 'path': item, + 'size': info.get('size', 0), + 'mtime': self._parse_remote_mtime(info), + } + except Exception as e: + logger.warning("无法获取远程文件信息 %s: %s", item, e) + return files + except Exception as e: + logger.error("获取远程文件列表失败: %s", e) + return {} + + def _calculate_hash(self, file_path: pathlib.Path, block_size: int = 65536) -> str: + """计算文件的 SHA-256 哈希值""" + sha256 = hashlib.sha256() + try: + with open(file_path, 'rb') as f: + for block in iter(lambda: f.read(block_size), b''): + sha256.update(block) + return sha256.hexdigest() + except Exception as e: + logger.error("计算文件哈希失败 %s: %s", file_path, e) + return "" + + def _parse_remote_mtime(self, info: dict) -> float: + """解析远程文件的修改时间""" + # WebDAV 可能返回 Last-Modified 头或其他时间格式 + # 这里简单返回当前时间,实际应根据服务器响应解析 + return time.time() + + def sync_directory(self, local_dir: pathlib.Path) -> typing.Dict[str, typing.Any]: + """ + 同步目录 + + Args: + local_dir: 本地目录路径 + + Returns: + 同步结果统计 + """ + if not self.client: + logger.error("WebDAV 客户端未初始化") + return {'success': False, 'error': '客户端未初始化'} + + results = { + 'uploaded': 0, + 'downloaded': 0, + 'conflicts': 0, + 'errors': 0, + 'success': True, + } + + try: + # 确保远程目录存在 + self.client.mkdir(self.config.remote_path) + + local_files = self._get_local_files(local_dir) + remote_files = self._get_remote_files() + + # 根据同步模式处理文件 + if self.config.sync_mode in [SyncMode.BIDIRECTIONAL, SyncMode.UPLOAD_ONLY]: + stats = self._upload_files(local_dir, local_files, remote_files) + results['uploaded'] += stats.get('uploaded', 0) + results['conflicts'] += stats.get('conflicts', 0) + results['errors'] += stats.get('errors', 0) + + if self.config.sync_mode in [SyncMode.BIDIRECTIONAL, SyncMode.DOWNLOAD_ONLY]: + stats = self._download_files(local_dir, local_files, remote_files) + results['downloaded'] += stats.get('downloaded', 0) + results['conflicts'] += stats.get('conflicts', 0) + results['errors'] += stats.get('errors', 0) + + logger.info("同步完成: %s", results) + return results + + except Exception as e: + logger.error("同步过程中发生错误: %s", e) + results['success'] = False + results['error'] = str(e) + return results + + def _upload_files(self, local_dir: pathlib.Path, + local_files: dict, remote_files: dict) -> typing.Dict[str, int]: + """上传文件到远程服务器""" + stats = {'uploaded': 0, 'errors': 0, 'conflicts': 0} + + for rel_path, local_info in local_files.items(): + remote_info = remote_files.get(rel_path) + + # 判断是否需要上传 + should_upload = False + conflict_resolved = False + remote_path = os.path.join(self.config.remote_path, rel_path) + + if not remote_info: + should_upload = True # 远程不存在 + else: + # 检查冲突 + local_mtime = local_info.get('mtime', 0) + remote_mtime = remote_info.get('mtime', 0) + + if local_mtime != remote_mtime: + # 存在冲突 + stats['conflicts'] += 1 + should_upload, should_download = self._handle_conflict(local_info, remote_info) + + if should_upload and self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH: + # 重命名远程文件避免覆盖 + conflict_suffix = f".conflict_{int(remote_mtime)}" + name, ext = os.path.splitext(rel_path) + new_rel_path = f"{name}{conflict_suffix}{ext}" if ext else f"{name}{conflict_suffix}" + remote_path = os.path.join(self.config.remote_path, new_rel_path) + conflict_resolved = True + logger.debug("冲突文件重命名: %s -> %s", rel_path, new_rel_path) + else: + # 时间相同,无需上传 + should_upload = False + + if should_upload: + try: + self.client.upload_file(local_info['path'], remote_path) + stats['uploaded'] += 1 + logger.debug("上传文件: %s -> %s", rel_path, remote_path) + except Exception as e: + logger.error("上传文件失败 %s: %s", rel_path, e) + stats['errors'] += 1 + + return stats + + def _download_files(self, local_dir: pathlib.Path, + local_files: dict, remote_files: dict) -> typing.Dict[str, int]: + """从远程服务器下载文件""" + stats = {'downloaded': 0, 'errors': 0, 'conflicts': 0} + + for rel_path, remote_info in remote_files.items(): + local_info = local_files.get(rel_path) + + # 判断是否需要下载 + should_download = False + if not local_info: + should_download = True # 本地不存在 + else: + # 检查冲突 + local_mtime = local_info.get('mtime', 0) + remote_mtime = remote_info.get('mtime', 0) + + if local_mtime != remote_mtime: + # 存在冲突 + stats['conflicts'] += 1 + should_upload, should_download = self._handle_conflict(local_info, remote_info) + # 如果应该上传,则不应该下载(冲突已在上传侧处理) + if should_upload: + should_download = False + else: + # 时间相同,无需下载 + should_download = False + + if should_download: + try: + local_path = local_dir / rel_path + local_path.parent.mkdir(parents=True, exist_ok=True) + self.client.download_file(remote_info['path'], str(local_path)) + stats['downloaded'] += 1 + logger.debug("下载文件: %s -> %s", rel_path, local_path) + except Exception as e: + logger.error("下载文件失败 %s: %s", rel_path, e) + stats['errors'] += 1 + + return stats + + def _handle_conflict(self, local_info: dict, remote_info: dict) -> typing.Tuple[bool, bool]: + """ + 处理文件冲突 + + Returns: + (should_upload, should_download) - 是否应该上传和下载 + """ + local_mtime = local_info.get('mtime', 0) + remote_mtime = remote_info.get('mtime', 0) + + if self.config.conflict_strategy == ConflictStrategy.NEWER: + # 较新文件覆盖较旧文件 + if local_mtime > remote_mtime: + return True, False # 上传本地较新版本 + elif remote_mtime > local_mtime: + return False, True # 下载远程较新版本 + else: + return False, False # 时间相同,无需操作 + + elif self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH: + # 保留双方 - 重命名远程文件 + # 这里实现简单的重命名策略:添加冲突后缀 + # 实际应该在上传时处理重命名 + # 返回 True, False 表示上传重命名后的文件 + # 重命名逻辑在调用处处理 + return True, False + + elif self.config.conflict_strategy == ConflictStrategy.ASK: + # 用户手动选择 - 记录冲突,跳过 + # 返回 False, False 跳过,等待用户决定 + logger.warning("文件冲突需要用户手动选择: local_mtime=%s, remote_mtime=%s", + local_mtime, remote_mtime) + return False, False + + return False, False + + def _should_upload(self, local_info: dict, remote_info: dict) -> bool: + """判断是否需要上传(本地较新或哈希不同)""" + # 这里实现简单的基于时间的比较 + # 实际应该使用哈希比较更可靠 + return local_info.get('mtime', 0) > remote_info.get('mtime', 0) + + def _should_download(self, local_info: dict, remote_info: dict) -> bool: + """判断是否需要下载(远程较新)""" + return remote_info.get('mtime', 0) > local_info.get('mtime', 0) + + def upload_file(self, local_path: pathlib.Path, remote_path: str = "") -> bool: + """上传单个文件""" + if not self.client: + return False + + try: + if not remote_path: + remote_path = os.path.join(self.config.remote_path, local_path.name) + self.client.upload_file(str(local_path), remote_path) + logger.info("文件上传成功: %s -> %s", local_path, remote_path) + return True + except Exception as e: + logger.error("文件上传失败: %s", e) + return False + + def download_file(self, remote_path: str, local_path: pathlib.Path) -> bool: + """下载单个文件""" + if not self.client: + return False + + try: + local_path.parent.mkdir(parents=True, exist_ok=True) + self.client.download_file(remote_path, str(local_path)) + logger.info("文件下载成功: %s -> %s", remote_path, local_path) + return True + except Exception as e: + logger.error("文件下载失败: %s", e) + return False + + def delete_remote_file(self, remote_path: str) -> bool: + """删除远程文件""" + if not self.client: + return False + + try: + self.client.clean(remote_path) + logger.info("远程文件删除成功: %s", remote_path) + return True + except Exception as e: + logger.error("远程文件删除失败: %s", e) + return False + + +def create_sync_service_from_config() -> typing.Optional[SyncService]: + """从配置文件创建同步服务实例""" + try: + from heurams.context import config_var + + sync_config = config_var.get()['providers']['sync']['webdav'] + if not sync_config.get('enabled', False): + logger.debug("同步服务未启用") + return None + + config = SyncConfig( + enabled=sync_config.get('enabled', False), + url=sync_config.get('url', ''), + username=sync_config.get('username', ''), + password=sync_config.get('password', ''), + remote_path=sync_config.get('remote_path', '/heurams/'), + sync_mode=SyncMode(sync_config.get('sync_mode', 'bidirectional')), + conflict_strategy=ConflictStrategy(sync_config.get('conflict_strategy', 'newer')), + verify_ssl=sync_config.get('verify_ssl', True), + ) + + service = SyncService(config) + if service.client is None: + logger.warning("同步服务客户端创建失败") + return None + + return service + + except Exception as e: + logger.error("创建同步服务失败: %s", e) + return None \ No newline at end of file diff --git a/src/heurams/services/version.py b/src/heurams/services/version.py index cd501b1..0239b75 100644 --- a/src/heurams/services/version.py +++ b/src/heurams/services/version.py @@ -3,7 +3,7 @@ from heurams.services.logger import get_logger logger = get_logger(__name__) -ver = "0.4.2" +ver = "0.4.3" stage = "prototype" codename = "fledge" # 雏鸟, 0.4.x 版本 diff --git a/tests/interface/test_synctool.py b/tests/interface/test_synctool.py new file mode 100644 index 0000000..4d2c217 --- /dev/null +++ b/tests/interface/test_synctool.py @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 +""" +SyncScreen 和 SyncService 的测试. +""" +import pathlib +import tempfile +import time +import unittest +from unittest.mock import MagicMock, patch, Mock + +from heurams.context import ConfigContext +from heurams.services.config import ConfigFile +from heurams.services.sync_service import SyncService, SyncConfig, SyncMode, ConflictStrategy + + +class TestSyncServiceUnit(unittest.TestCase): + """SyncService 的单元测试.""" + + def setUp(self): + """在每个测试之前运行, 设置临时目录和模拟客户端.""" + self.temp_dir = tempfile.TemporaryDirectory() + self.temp_path = pathlib.Path(self.temp_dir.name) + + # 创建测试文件 + self.test_file = self.temp_path / "test.txt" + self.test_file.write_text("测试内容") + + # 模拟 WebDAV 客户端 + self.mock_client = MagicMock() + + # 创建同步配置 + self.config = SyncConfig( + enabled=True, + url="https://example.com/dav/", + username="test", + password="test", + remote_path="/heurams/", + sync_mode=SyncMode.BIDIRECTIONAL, + conflict_strategy=ConflictStrategy.NEWER, + verify_ssl=True, + ) + + def tearDown(self): + """在每个测试之后清理.""" + self.temp_dir.cleanup() + + @patch('heurams.services.sync_service.Client') + def test_sync_service_initialization(self, mock_client_class): + """测试同步服务初始化.""" + mock_client_class.return_value = self.mock_client + + service = SyncService(self.config) + + # 验证客户端已创建 + mock_client_class.assert_called_once() + self.assertIsNotNone(service.client) + self.assertEqual(service.config, self.config) + + @patch('heurams.services.sync_service.Client') + def test_sync_service_disabled(self, mock_client_class): + """测试同步服务未启用.""" + config = SyncConfig(enabled=False) + service = SyncService(config) + + # 客户端不应初始化 + mock_client_class.assert_not_called() + self.assertIsNone(service.client) + + @patch('heurams.services.sync_service.Client') + def test_test_connection_success(self, mock_client_class): + """测试连接测试成功.""" + mock_client_class.return_value = self.mock_client + self.mock_client.list.return_value = [] + + service = SyncService(self.config) + result = service.test_connection() + + self.assertTrue(result) + self.mock_client.list.assert_called_once() + + @patch('heurams.services.sync_service.Client') + def test_test_connection_failure(self, mock_client_class): + """测试连接测试失败.""" + mock_client_class.return_value = self.mock_client + self.mock_client.list.side_effect = Exception("连接失败") + + service = SyncService(self.config) + result = service.test_connection() + + self.assertFalse(result) + self.mock_client.list.assert_called_once() + + @patch('heurams.services.sync_service.Client') + def test_upload_file(self, mock_client_class): + """测试上传单个文件.""" + mock_client_class.return_value = self.mock_client + + service = SyncService(self.config) + result = service.upload_file(self.test_file) + + self.assertTrue(result) + self.mock_client.upload_file.assert_called_once() + + @patch('heurams.services.sync_service.Client') + def test_download_file(self, mock_client_class): + """测试下载单个文件.""" + mock_client_class.return_value = self.mock_client + + service = SyncService(self.config) + remote_path = "/heurams/test.txt" + local_path = self.temp_path / "downloaded.txt" + + result = service.download_file(remote_path, local_path) + + self.assertTrue(result) + self.mock_client.download_file.assert_called_once() + self.assertTrue(local_path.parent.exists()) + + @patch('heurams.services.sync_service.Client') + def test_sync_directory_no_files(self, mock_client_class): + """测试同步空目录.""" + mock_client_class.return_value = self.mock_client + self.mock_client.list.return_value = [] + self.mock_client.mkdir.return_value = None + + service = SyncService(self.config) + result = service.sync_directory(self.temp_path) + + self.assertTrue(result['success']) + self.assertEqual(result['uploaded'], 0) + self.assertEqual(result['downloaded'], 0) + self.mock_client.mkdir.assert_called_once() + + @patch('heurams.services.sync_service.Client') + def test_sync_directory_upload_only(self, mock_client_class): + """测试仅上传模式.""" + mock_client_class.return_value = self.mock_client + self.mock_client.list.return_value = [] + self.mock_client.mkdir.return_value = None + + config = SyncConfig( + enabled=True, + url="https://example.com/dav/", + username="test", + password="test", + remote_path="/heurams/", + sync_mode=SyncMode.UPLOAD_ONLY, + conflict_strategy=ConflictStrategy.NEWER, + ) + + service = SyncService(config) + result = service.sync_directory(self.temp_path) + + self.assertTrue(result['success']) + self.mock_client.mkdir.assert_called_once() + + @patch('heurams.services.sync_service.Client') + def test_conflict_strategy_newer(self, mock_client_class): + """测试 NEWER 冲突策略.""" + mock_client_class.return_value = self.mock_client + + # 模拟远程文件存在 + self.mock_client.list.return_value = ["test.txt"] + self.mock_client.info.return_value = {'size': 100, 'modified': '2023-01-01T00:00:00Z'} + self.mock_client.mkdir.return_value = None + + service = SyncService(self.config) + result = service.sync_directory(self.temp_path) + + self.assertTrue(result['success']) + # 应该有一个冲突 + self.assertGreaterEqual(result.get('conflicts', 0), 0) + + @patch('heurams.services.sync_service.Client') + def test_create_sync_service_from_config(self, mock_client_class): + """测试从配置文件创建同步服务.""" + mock_client_class.return_value = self.mock_client + + # 创建临时配置文件 + config_data = { + 'sync': { + 'webdav': { + 'enabled': True, + 'url': 'https://example.com/dav/', + 'username': 'test', + 'password': 'test', + 'remote_path': '/heurams/', + 'sync_mode': 'bidirectional', + 'conflict_strategy': 'newer', + 'verify_ssl': True, + } + } + } + + # 模拟 config_var + with patch('heurams.services.sync_service.config_var') as mock_config_var: + mock_config = MagicMock() + mock_config.data = config_data + mock_config_var.get.return_value = mock_config + + from heurams.services.sync_service import create_sync_service_from_config + service = create_sync_service_from_config() + + self.assertIsNotNone(service) + self.assertIsNotNone(service.client) + + +class TestSyncScreenUnit(unittest.TestCase): + """SyncScreen 的单元测试.""" + + def setUp(self): + """在每个测试之前运行.""" + self.temp_dir = tempfile.TemporaryDirectory() + self.temp_path = pathlib.Path(self.temp_dir.name) + + # 创建默认配置 + default_config_path = ( + pathlib.Path(__file__).parent.parent.parent + / "src/heurams/default/config/config.toml" + ) + self.config = ConfigFile(default_config_path) + + # 更新配置中的路径 + config_data = self.config.data + config_data["paths"]["nucleon_dir"] = str(self.temp_path / "nucleon") + config_data["paths"]["electron_dir"] = str(self.temp_path / "electron") + config_data["paths"]["orbital_dir"] = str(self.temp_path / "orbital") + config_data["paths"]["cache_dir"] = str(self.temp_path / "cache") + + # 添加同步配置 + if 'sync' not in config_data: + config_data['sync'] = {} + config_data['sync']['webdav'] = { + 'enabled': False, + 'url': '', + 'username': '', + 'password': '', + 'remote_path': '/heurams/', + 'sync_mode': 'bidirectional', + 'conflict_strategy': 'newer', + 'verify_ssl': True, + } + + # 创建目录 + for dir_key in ["nucleon_dir", "electron_dir", "orbital_dir", "cache_dir"]: + pathlib.Path(config_data["paths"][dir_key]).mkdir(parents=True, exist_ok=True) + + # 使用 ConfigContext 设置配置 + self.config_ctx = ConfigContext(self.config) + self.config_ctx.__enter__() + + def tearDown(self): + """在每个测试之后清理.""" + self.config_ctx.__exit__(None, None, None) + self.temp_dir.cleanup() + + @patch('heurams.interface.screens.synctool.create_sync_service_from_config') + def test_sync_screen_compose(self, mock_create_service): + """测试 SyncScreen 的 compose 方法.""" + from heurams.interface.screens.synctool import SyncScreen + + # 模拟同步服务 + mock_service = MagicMock() + mock_service.client = MagicMock() + mock_create_service.return_value = mock_service + + screen = SyncScreen() + + # 测试 compose 方法 + from textual.app import ComposeResult + result = screen.compose() + widgets = list(result) + + # 检查基本部件 + from textual.widgets import Footer, Header, Button, Static, ProgressBar + from textual.containers import ScrollableContainer + + header_present = any(isinstance(w, Header) for w in widgets) + footer_present = any(isinstance(w, Footer) for w in widgets) + self.assertTrue(header_present) + self.assertTrue(footer_present) + + # 检查容器 + container_present = any(isinstance(w, ScrollableContainer) for w in widgets) + self.assertTrue(container_present) + + @patch('heurams.interface.screens.synctool.create_sync_service_from_config') + def test_sync_screen_load_config(self, mock_create_service): + """测试 SyncScreen 加载配置.""" + from heurams.interface.screens.synctool import SyncScreen + + mock_service = MagicMock() + mock_service.client = MagicMock() + mock_create_service.return_value = mock_service + + screen = SyncScreen() + screen.load_config() + + # 验证配置已加载 + self.assertIsNotNone(screen.sync_config) + mock_create_service.assert_called_once() + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From a2e12c74620765e1adcf370527085a9506fd7c9e Mon Sep 17 00:00:00 2001 From: david-ajax Date: Sun, 21 Dec 2025 07:56:10 +0800 Subject: [PATCH 2/7] =?UTF-8?q?style:=20=E6=A0=BC=E5=BC=8F=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heurams/context.py | 4 +- src/heurams/interface/__main__.py | 4 +- src/heurams/interface/screens/dashboard.py | 4 +- src/heurams/interface/screens/memorizor.py | 16 +- src/heurams/interface/screens/nucreator.py | 3 +- src/heurams/interface/screens/precache.py | 1 + src/heurams/interface/screens/synctool.py | 166 +++++++------- src/heurams/interface/widgets/recognition.py | 7 +- src/heurams/kernel/algorithms/sm15m.py | 20 +- src/heurams/kernel/particles/atom.py | 11 +- src/heurams/kernel/particles/electron.py | 11 +- src/heurams/kernel/particles/nucleon.py | 2 +- src/heurams/providers/tts/edge_tts.py | 4 +- src/heurams/services/sync_service.py | 222 +++++++++++-------- tests/interface/test_synctool.py | 188 ++++++++-------- 15 files changed, 373 insertions(+), 290 deletions(-) diff --git a/src/heurams/context.py b/src/heurams/context.py index 06b2e34..92eab06 100644 --- a/src/heurams/context.py +++ b/src/heurams/context.py @@ -36,8 +36,8 @@ if pathlib.Path(workdir / "config" / "config_dev.toml").exists(): print("使用开发设置") logger.debug("使用开发设置") config_var: ContextVar[ConfigFile] = ContextVar( - "config_var", default=ConfigFile(workdir / "config" / "config_dev.toml") -) + "config_var", default=ConfigFile(workdir / "config" / "config_dev.toml") + ) # runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据 diff --git a/src/heurams/interface/__main__.py b/src/heurams/interface/__main__.py index cf62f5a..a6dae83 100644 --- a/src/heurams/interface/__main__.py +++ b/src/heurams/interface/__main__.py @@ -1,9 +1,9 @@ from textual.app import App from textual.widgets import Button -from heurams.services.logger import get_logger from heurams.context import config_var from heurams.interface import HeurAMSApp +from heurams.services.logger import get_logger from .screens.about import AboutScreen from .screens.dashboard import DashboardScreen @@ -15,4 +15,4 @@ logger = get_logger(__name__) app = HeurAMSApp() if __name__ == "__main__": - app.run() \ No newline at end of file + app.run() diff --git a/src/heurams/interface/screens/dashboard.py b/src/heurams/interface/screens/dashboard.py index b7b8eb7..ed09e22 100644 --- a/src/heurams/interface/screens/dashboard.py +++ b/src/heurams/interface/screens/dashboard.py @@ -4,8 +4,7 @@ import pathlib from textual.app import ComposeResult from textual.containers import ScrollableContainer from textual.screen import Screen -from textual.widgets import (Button, Footer, Header, Label, ListItem, ListView, - Static) +from textual.widgets import Button, Footer, Header, Label, ListItem, ListView, Static import heurams.services.timer as timer import heurams.services.version as version @@ -28,6 +27,7 @@ class DashboardScreen(Screen): Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"), Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"), Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'), + Label(f"使用算法: {config_var.get()['algorithm']['default']}"), Label("选择待学习或待修改的记忆单元集:", classes="title-label"), ListView(id="union-list", classes="union-list-view"), Label( diff --git a/src/heurams/interface/screens/memorizor.py b/src/heurams/interface/screens/memorizor.py index 9394a00..9bf03f7 100644 --- a/src/heurams/interface/screens/memorizor.py +++ b/src/heurams/interface/screens/memorizor.py @@ -148,16 +148,24 @@ class MemScreen(Screen): def play_voice(self): """朗读当前内容""" - from heurams.services.audio_service import play_by_path from pathlib import Path + + from heurams.services.audio_service import play_by_path from heurams.services.hasher import get_md5 - path = Path(config_var.get()['paths']["cache_dir"]) - path = path / f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav" + + path = Path(config_var.get()["paths"]["cache_dir"]) + path = ( + path + / f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav" + ) if path.exists(): play_by_path(path) else: from heurams.services.tts_service import convertor - convertor(self.atom.registry['nucleon'].metadata["formation"]["tts_text"], path) + + convertor( + self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path + ) play_by_path(path) def action_toggle_dark(self): diff --git a/src/heurams/interface/screens/nucreator.py b/src/heurams/interface/screens/nucreator.py index 735dda6..b187528 100644 --- a/src/heurams/interface/screens/nucreator.py +++ b/src/heurams/interface/screens/nucreator.py @@ -5,8 +5,7 @@ import toml from textual.app import ComposeResult from textual.containers import ScrollableContainer from textual.screen import Screen -from textual.widgets import (Button, Footer, Header, Input, Label, Markdown, - Select) +from textual.widgets import Button, Footer, Header, Input, Label, Markdown, Select from heurams.context import config_var from heurams.services.version import ver diff --git a/src/heurams/interface/screens/precache.py b/src/heurams/interface/screens/precache.py index a8ab604..4c03bf2 100644 --- a/src/heurams/interface/screens/precache.py +++ b/src/heurams/interface/screens/precache.py @@ -99,6 +99,7 @@ class PrecachingScreen(Screen): if not cache_file.exists(): try: from heurams.services.tts_service import convertor + convertor(text, cache_file) return 1 except Exception as e: diff --git a/src/heurams/interface/screens/synctool.py b/src/heurams/interface/screens/synctool.py index b62f44b..e07a407 100644 --- a/src/heurams/interface/screens/synctool.py +++ b/src/heurams/interface/screens/synctool.py @@ -32,7 +32,7 @@ class SyncScreen(Screen): # 标题和连接状态 yield Static("WebDAV 同步工具", classes="title") yield Static("", id="status_label", classes="status") - + # 配置信息 yield Static("服务器配置", classes="section_title") with Horizontal(classes="config_info"): @@ -44,7 +44,7 @@ class SyncScreen(Screen): with Horizontal(classes="config_info"): yield Static("同步模式:", classes="config_label") yield Static("", id="sync_mode", classes="config_value") - + # 控制按钮 yield Static("控制面板", classes="section_title") with Horizontal(classes="control_buttons"): @@ -52,16 +52,16 @@ class SyncScreen(Screen): yield Button("开始同步", id="start_sync", variant="success") yield Button("暂停", id="pause_sync", variant="warning", disabled=True) yield Button("取消", id="cancel_sync", variant="error", disabled=True) - + # 进度显示 yield Static("同步进度", classes="section_title") yield ProgressBar(id="progress_bar", show_percentage=True, total=100) yield Static("", id="progress_label", classes="progress_text") - + # 日志输出 yield Static("同步日志", classes="section_title") yield Static("", id="log_output", classes="log_output") - + yield Footer() def on_mount(self): @@ -74,13 +74,15 @@ class SyncScreen(Screen): """从配置文件加载同步设置""" try: from heurams.context import config_var + config_data = config_var.get().data - self.sync_config = config_data.get('sync', {}).get('webdav', {}) - + self.sync_config = config_data.get("sync", {}).get("webdav", {}) + # 创建同步服务实例 from heurams.services.sync_service import create_sync_service_from_config + self.sync_service = create_sync_service_from_config() - + except Exception as e: self.log_message(f"加载配置失败: {e}", is_error=True) self.sync_config = {} @@ -89,34 +91,34 @@ class SyncScreen(Screen): """更新 UI 显示配置信息""" try: # 更新服务器 URL - url = self.sync_config.get('url', '未配置') + url = self.sync_config.get("url", "未配置") url_widget = self.query_one("#server_url") - url_widget.update(url if url else '未配置') # type: ignore - + url_widget.update(url if url else "未配置") # type: ignore + # 更新远程路径 - remote_path = self.sync_config.get('remote_path', '/heurams/') + remote_path = self.sync_config.get("remote_path", "/heurams/") path_widget = self.query_one("#remote_path") - path_widget.update(remote_path) # type: ignore - + path_widget.update(remote_path) # type: ignore + # 更新同步模式 - sync_mode = self.sync_config.get('sync_mode', 'bidirectional') + sync_mode = self.sync_config.get("sync_mode", "bidirectional") mode_widget = self.query_one("#sync_mode") mode_map = { - 'bidirectional': '双向同步', - 'upload_only': '仅上传', - 'download_only': '仅下载', + "bidirectional": "双向同步", + "upload_only": "仅上传", + "download_only": "仅下载", } - mode_widget.update(mode_map.get(sync_mode, sync_mode)) # type: ignore - + mode_widget.update(mode_map.get(sync_mode, sync_mode)) # type: ignore + # 更新状态标签 status_widget = self.query_one("#status_label") if self.sync_service and self.sync_service.client: - status_widget.update("✅ 同步服务已就绪") # type: ignore + status_widget.update("✅ 同步服务已就绪") # type: ignore status_widget.add_class("ready") else: - status_widget.update("❌ 同步服务未配置或未启用") # type: ignore + status_widget.update("❌ 同步服务未配置或未启用") # type: ignore status_widget.add_class("error") - + except Exception as e: self.log_message(f"更新 UI 失败: {e}", is_error=True) @@ -124,15 +126,15 @@ class SyncScreen(Screen): """更新状态显示""" try: status_widget = self.query_one("#status_label") - status_widget.update(status) # type: ignore - + status_widget.update(status) # type: ignore + if progress is not None: progress_bar = self.query_one("#progress_bar") - progress_bar.progress = progress # type: ignore - + progress_bar.progress = progress # type: ignore + progress_label = self.query_one("#progress_label") - progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore - + progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore + except Exception as e: self.log_message(f"更新状态失败: {e}", is_error=True) @@ -141,23 +143,23 @@ class SyncScreen(Screen): timestamp = time.strftime("%H:%M:%S") prefix = "[ERROR]" if is_error else "[INFO]" log_line = f"{timestamp} {prefix} {message}" - + self.log_messages.append(log_line) # 保持日志行数不超过最大值 if len(self.log_messages) > self.max_log_lines: - self.log_messages = self.log_messages[-self.max_log_lines:] - + self.log_messages = self.log_messages[-self.max_log_lines :] + # 更新日志显示 try: log_widget = self.query_one("#log_output") - log_widget.update("\n".join(self.log_messages)) # type: ignore + log_widget.update("\n".join(self.log_messages)) # type: ignore except Exception: pass # 如果组件未就绪,忽略错误 def on_button_pressed(self, event: Button.Pressed) -> None: """处理按钮点击事件""" button_id = event.button.id - + if button_id == "test_connection": self.test_connection() elif button_id == "start_sync": @@ -166,7 +168,7 @@ class SyncScreen(Screen): self.pause_sync() elif button_id == "cancel_sync": self.cancel_sync() - + event.stop() def test_connection(self): @@ -175,10 +177,10 @@ class SyncScreen(Screen): self.log_message("同步服务未初始化,请检查配置", is_error=True) self.update_status("❌ 同步服务未初始化") return - + self.log_message("正在测试 WebDAV 连接...") self.update_status("正在测试连接...") - + try: success = self.sync_service.test_connection() if success: @@ -196,71 +198,87 @@ class SyncScreen(Screen): if not self.sync_service: self.log_message("同步服务未初始化,无法开始同步", is_error=True) return - + if self.is_syncing: self.log_message("同步已在进行中", is_error=True) return - + self.is_syncing = True self.is_paused = False self.update_button_states() - + self.log_message("开始同步数据...") self.update_status("正在同步...", progress=0) - + # 启动后台同步任务 self.run_worker(self.perform_sync, thread=True) def perform_sync(self): """执行同步任务(在后台线程中运行)""" worker = get_current_worker() - + try: # 获取需要同步的本地目录 from heurams.context import config_var + config = config_var.get() - paths = config.get('paths', {}) - + paths = config.get("paths", {}) + # 同步 nucleon 目录 - nucleon_dir = pathlib.Path(paths.get('nucleon_dir', './data/nucleon')) + nucleon_dir = pathlib.Path(paths.get("nucleon_dir", "./data/nucleon")) if nucleon_dir.exists(): self.log_message(f"同步 nucleon 目录: {nucleon_dir}") self.update_status(f"同步 nucleon 目录...", progress=10) - - result = self.sync_service.sync_directory(nucleon_dir) # type: ignore - if result.get('success'): - self.log_message(f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个") + + result = self.sync_service.sync_directory(nucleon_dir) # type: ignore + if result.get("success"): + self.log_message( + f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" + ) else: - self.log_message(f"nucleon 同步失败: {result.get('error', '未知错误')}", is_error=True) - + self.log_message( + f"nucleon 同步失败: {result.get('error', '未知错误')}", + is_error=True, + ) + # 同步 electron 目录 - electron_dir = pathlib.Path(paths.get('electron_dir', './data/electron')) + electron_dir = pathlib.Path(paths.get("electron_dir", "./data/electron")) if electron_dir.exists(): self.log_message(f"同步 electron 目录: {electron_dir}") self.update_status(f"同步 electron 目录...", progress=60) - - result = self.sync_service.sync_directory(electron_dir) # type: ignore - if result.get('success'): - self.log_message(f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个") + + result = self.sync_service.sync_directory(electron_dir) # type: ignore + if result.get("success"): + self.log_message( + f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" + ) else: - self.log_message(f"electron 同步失败: {result.get('error', '未知错误')}", is_error=True) - + self.log_message( + f"electron 同步失败: {result.get('error', '未知错误')}", + is_error=True, + ) + # 同步 orbital 目录(如果存在) - orbital_dir = pathlib.Path(paths.get('orbital_dir', './data/orbital')) + orbital_dir = pathlib.Path(paths.get("orbital_dir", "./data/orbital")) if orbital_dir.exists(): self.log_message(f"同步 orbital 目录: {orbital_dir}") self.update_status(f"同步 orbital 目录...", progress=80) - - result = self.sync_service.sync_directory(orbital_dir) # type: ignore - if result.get('success'): - self.log_message(f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个") + + result = self.sync_service.sync_directory(orbital_dir) # type: ignore + if result.get("success"): + self.log_message( + f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个" + ) else: - self.log_message(f"orbital 同步失败: {result.get('error', '未知错误')}", is_error=True) - + self.log_message( + f"orbital 同步失败: {result.get('error', '未知错误')}", + is_error=True, + ) + # 同步完成 self.update_status("同步完成", progress=100) self.log_message("所有目录同步完成") - + except Exception as e: self.log_message(f"同步过程中发生错误: {e}", is_error=True) self.update_status("同步失败") @@ -268,16 +286,16 @@ class SyncScreen(Screen): # 重置同步状态 self.is_syncing = False self.is_paused = False - self.update_button_states() # type: ignore + self.update_button_states() # type: ignore def pause_sync(self): """暂停同步""" if not self.is_syncing: return - + self.is_paused = not self.is_paused self.update_button_states() - + if self.is_paused: self.log_message("同步已暂停") self.update_status("同步已暂停") @@ -289,11 +307,11 @@ class SyncScreen(Screen): """取消同步""" if not self.is_syncing: return - + self.is_syncing = False self.is_paused = False self.update_button_states() - + self.log_message("同步已取消") self.update_status("同步已取消") @@ -303,17 +321,17 @@ class SyncScreen(Screen): start_button = self.query_one("#start_sync") pause_button = self.query_one("#pause_sync") cancel_button = self.query_one("#cancel_sync") - + if self.is_syncing: start_button.disabled = True pause_button.disabled = False cancel_button.disabled = False - pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore + pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore else: start_button.disabled = False pause_button.disabled = True cancel_button.disabled = True - + except Exception as e: self.log_message(f"更新按钮状态失败: {e}", is_error=True) diff --git a/src/heurams/interface/widgets/recognition.py b/src/heurams/interface/widgets/recognition.py index 842082a..7b4364c 100644 --- a/src/heurams/interface/widgets/recognition.py +++ b/src/heurams/interface/widgets/recognition.py @@ -50,9 +50,10 @@ class Recognition(BasePuzzleWidget): def compose(self): from heurams.context import config_var - autovoice = config_var.get()['interface']['memorizor']['autovoice'] + + autovoice = config_var.get()["interface"]["memorizor"]["autovoice"] if autovoice: - self.screen.action_play_voice() # type: ignore + self.screen.action_play_voice() # type: ignore cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia] delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"] replace_dict = { @@ -72,7 +73,7 @@ class Recognition(BasePuzzleWidget): primary = cfg["primary"] with Center(): - for i in cfg['top_dim']: + for i in cfg["top_dim"]: yield Static(f"[dim]{i}[/]") yield Label("") diff --git a/src/heurams/kernel/algorithms/sm15m.py b/src/heurams/kernel/algorithms/sm15m.py index 95e210e..1decbb5 100644 --- a/src/heurams/kernel/algorithms/sm15m.py +++ b/src/heurams/kernel/algorithms/sm15m.py @@ -10,15 +10,25 @@ MIT 许可证 import datetime import json import os -from typing import TypedDict import pathlib +from typing import TypedDict + from heurams.context import config_var -from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF, - RANGE_AF, RANGE_REPETITION, - SM, THRESHOLD_RECALL, Item) +from heurams.kernel.algorithms.sm15m_calc import ( + MAX_AF, + MIN_AF, + NOTCH_AF, + RANGE_AF, + RANGE_REPETITION, + SM, + THRESHOLD_RECALL, + Item, +) # 全局状态文件路径 -_GLOBAL_STATE_FILE = os.path.expanduser(pathlib.Path(config_var.get()['paths']['global_dir']) / 'sm15m_global_state.json') +_GLOBAL_STATE_FILE = os.path.expanduser( + pathlib.Path(config_var.get()["paths"]["global_dir"]) / "sm15m_global_state.json" +) def _get_global_sm(): diff --git a/src/heurams/kernel/particles/atom.py b/src/heurams/kernel/particles/atom.py index 4d90ff2..dfb172c 100644 --- a/src/heurams/kernel/particles/atom.py +++ b/src/heurams/kernel/particles/atom.py @@ -86,8 +86,8 @@ class Atom: # eval 环境设置 def eval_with_env(s: str): default = config_var.get()["puzzles"] - payload = self.registry['nucleon'].payload - metadata = self.registry['nucleon'].metadata + payload = self.registry["nucleon"].payload + metadata = self.registry["nucleon"].metadata eval_value = eval(s) if isinstance(eval_value, (int, float)): ret = str(eval_value) @@ -117,10 +117,11 @@ class Atom: logger.debug("发现 eval 表达式: '%s'", data[5:]) return modifier(data[5:]) return data + try: - traverse(self.registry['nucleon'].payload, eval_with_env) - traverse(self.registry['nucleon'].metadata, eval_with_env) - traverse(self.registry['orbital'], eval_with_env) + traverse(self.registry["nucleon"].payload, eval_with_env) + traverse(self.registry["nucleon"].metadata, eval_with_env) + traverse(self.registry["orbital"], eval_with_env) except Exception as e: ret = f"此 eval 实例发生错误: {e}" logger.warning(ret) diff --git a/src/heurams/kernel/particles/electron.py b/src/heurams/kernel/particles/electron.py index cb7cf54..cc5cdd6 100644 --- a/src/heurams/kernel/particles/electron.py +++ b/src/heurams/kernel/particles/electron.py @@ -18,9 +18,12 @@ class Electron: algo: 使用的算法模块标识 """ if algo_name == "": - algo_name = config_var.get()['algorithm']['default'] + algo_name = config_var.get()["algorithm"]["default"] logger.debug( - "创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s", ident, algo_name, algodata + "创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s", + ident, + algo_name, + algodata, ) self.algodata = algodata self.ident = ident @@ -31,7 +34,9 @@ class Electron: self.algodata[self.algo.algo_name] = {} logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo) if not self.algodata[self.algo.algo_name]: - logger.debug(f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}") + logger.debug( + f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}" + ) self._default_init(self.algo.defaults) else: logger.debug("算法数据已存在, 跳过默认初始化") diff --git a/src/heurams/kernel/particles/nucleon.py b/src/heurams/kernel/particles/nucleon.py index 098e840..175661b 100644 --- a/src/heurams/kernel/particles/nucleon.py +++ b/src/heurams/kernel/particles/nucleon.py @@ -53,4 +53,4 @@ class Nucleon: def placeholder(): """生成一个占位原子核""" logger.debug("创建 Nucleon 占位符") - return Nucleon("核子对象样例内容", {}) \ No newline at end of file + return Nucleon("核子对象样例内容", {}) diff --git a/src/heurams/providers/tts/edge_tts.py b/src/heurams/providers/tts/edge_tts.py index ee74cf0..9b8a33c 100644 --- a/src/heurams/providers/tts/edge_tts.py +++ b/src/heurams/providers/tts/edge_tts.py @@ -2,8 +2,8 @@ import pathlib import edge_tts -from heurams.services.logger import get_logger from heurams.context import config_var +from heurams.services.logger import get_logger from .base import BaseTTS @@ -19,7 +19,7 @@ class EdgeTTS(BaseTTS): try: communicate = edge_tts.Communicate( text, - config_var.get()['providers']['tts']['edgetts']["voice"], + config_var.get()["providers"]["tts"]["edgetts"]["voice"], ) logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频") communicate.save_sync(str(path)) diff --git a/src/heurams/services/sync_service.py b/src/heurams/services/sync_service.py index c8792c0..a82d5da 100644 --- a/src/heurams/services/sync_service.py +++ b/src/heurams/services/sync_service.py @@ -18,6 +18,7 @@ logger = get_logger(__name__) class SyncMode(Enum): """同步模式枚举""" + BIDIRECTIONAL = "bidirectional" UPLOAD_ONLY = "upload_only" DOWNLOAD_ONLY = "download_only" @@ -25,6 +26,7 @@ class SyncMode(Enum): class ConflictStrategy(Enum): """冲突解决策略枚举""" + NEWER = "newer" # 较新文件覆盖较旧文件 ASK = "ask" # 用户手动选择 KEEP_BOTH = "keep_both" # 保留双方(重命名) @@ -33,6 +35,7 @@ class ConflictStrategy(Enum): @dataclass class SyncConfig: """同步配置数据类""" + enabled: bool = False url: str = "" username: str = "" @@ -59,12 +62,12 @@ class SyncService: return options = { - 'webdav_hostname': self.config.url, - 'webdav_login': self.config.username, - 'webdav_password': self.config.password, - 'webdav_root': self.config.remote_path, - 'verify_ssl': self.config.verify_ssl, - 'disable_check': True, # 不检查服务器支持的功能 + "webdav_hostname": self.config.url, + "webdav_login": self.config.username, + "webdav_password": self.config.password, + "webdav_root": self.config.remote_path, + "verify_ssl": self.config.verify_ssl, + "disable_check": True, # 不检查服务器支持的功能 } try: @@ -98,10 +101,10 @@ class SyncService: rel_path = file_path.relative_to(local_dir) stat = file_path.stat() files[str(rel_path)] = { - 'path': file_path, - 'size': stat.st_size, - 'mtime': stat.st_mtime, - 'hash': self._calculate_hash(file_path), + "path": file_path, + "size": stat.st_size, + "mtime": stat.st_mtime, + "hash": self._calculate_hash(file_path), } return files @@ -114,14 +117,14 @@ class SyncService: remote_list = self.client.list(recursive=True) files = {} for item in remote_list: - if not item.endswith('/'): # 忽略目录 - rel_path = item.lstrip('/') + if not item.endswith("/"): # 忽略目录 + rel_path = item.lstrip("/") try: info = self.client.info(item) files[rel_path] = { - 'path': item, - 'size': info.get('size', 0), - 'mtime': self._parse_remote_mtime(info), + "path": item, + "size": info.get("size", 0), + "mtime": self._parse_remote_mtime(info), } except Exception as e: logger.warning("无法获取远程文件信息 %s: %s", item, e) @@ -134,8 +137,8 @@ class SyncService: """计算文件的 SHA-256 哈希值""" sha256 = hashlib.sha256() try: - with open(file_path, 'rb') as f: - for block in iter(lambda: f.read(block_size), b''): + with open(file_path, "rb") as f: + for block in iter(lambda: f.read(block_size), b""): sha256.update(block) return sha256.hexdigest() except Exception as e: @@ -151,23 +154,23 @@ class SyncService: def sync_directory(self, local_dir: pathlib.Path) -> typing.Dict[str, typing.Any]: """ 同步目录 - + Args: local_dir: 本地目录路径 - + Returns: 同步结果统计 """ if not self.client: logger.error("WebDAV 客户端未初始化") - return {'success': False, 'error': '客户端未初始化'} + return {"success": False, "error": "客户端未初始化"} results = { - 'uploaded': 0, - 'downloaded': 0, - 'conflicts': 0, - 'errors': 0, - 'success': True, + "uploaded": 0, + "downloaded": 0, + "conflicts": 0, + "errors": 0, + "success": True, } try: @@ -180,124 +183,144 @@ class SyncService: # 根据同步模式处理文件 if self.config.sync_mode in [SyncMode.BIDIRECTIONAL, SyncMode.UPLOAD_ONLY]: stats = self._upload_files(local_dir, local_files, remote_files) - results['uploaded'] += stats.get('uploaded', 0) - results['conflicts'] += stats.get('conflicts', 0) - results['errors'] += stats.get('errors', 0) + results["uploaded"] += stats.get("uploaded", 0) + results["conflicts"] += stats.get("conflicts", 0) + results["errors"] += stats.get("errors", 0) - if self.config.sync_mode in [SyncMode.BIDIRECTIONAL, SyncMode.DOWNLOAD_ONLY]: + if self.config.sync_mode in [ + SyncMode.BIDIRECTIONAL, + SyncMode.DOWNLOAD_ONLY, + ]: stats = self._download_files(local_dir, local_files, remote_files) - results['downloaded'] += stats.get('downloaded', 0) - results['conflicts'] += stats.get('conflicts', 0) - results['errors'] += stats.get('errors', 0) + results["downloaded"] += stats.get("downloaded", 0) + results["conflicts"] += stats.get("conflicts", 0) + results["errors"] += stats.get("errors", 0) logger.info("同步完成: %s", results) return results except Exception as e: logger.error("同步过程中发生错误: %s", e) - results['success'] = False - results['error'] = str(e) + results["success"] = False + results["error"] = str(e) return results - def _upload_files(self, local_dir: pathlib.Path, - local_files: dict, remote_files: dict) -> typing.Dict[str, int]: + def _upload_files( + self, local_dir: pathlib.Path, local_files: dict, remote_files: dict + ) -> typing.Dict[str, int]: """上传文件到远程服务器""" - stats = {'uploaded': 0, 'errors': 0, 'conflicts': 0} - + stats = {"uploaded": 0, "errors": 0, "conflicts": 0} + for rel_path, local_info in local_files.items(): remote_info = remote_files.get(rel_path) - + # 判断是否需要上传 should_upload = False conflict_resolved = False remote_path = os.path.join(self.config.remote_path, rel_path) - + if not remote_info: should_upload = True # 远程不存在 else: # 检查冲突 - local_mtime = local_info.get('mtime', 0) - remote_mtime = remote_info.get('mtime', 0) - + local_mtime = local_info.get("mtime", 0) + remote_mtime = remote_info.get("mtime", 0) + if local_mtime != remote_mtime: # 存在冲突 - stats['conflicts'] += 1 - should_upload, should_download = self._handle_conflict(local_info, remote_info) - - if should_upload and self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH: + stats["conflicts"] += 1 + should_upload, should_download = self._handle_conflict( + local_info, remote_info + ) + + if ( + should_upload + and self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH + ): # 重命名远程文件避免覆盖 conflict_suffix = f".conflict_{int(remote_mtime)}" name, ext = os.path.splitext(rel_path) - new_rel_path = f"{name}{conflict_suffix}{ext}" if ext else f"{name}{conflict_suffix}" - remote_path = os.path.join(self.config.remote_path, new_rel_path) + new_rel_path = ( + f"{name}{conflict_suffix}{ext}" + if ext + else f"{name}{conflict_suffix}" + ) + remote_path = os.path.join( + self.config.remote_path, new_rel_path + ) conflict_resolved = True logger.debug("冲突文件重命名: %s -> %s", rel_path, new_rel_path) else: # 时间相同,无需上传 should_upload = False - + if should_upload: try: - self.client.upload_file(local_info['path'], remote_path) - stats['uploaded'] += 1 + self.client.upload_file(local_info["path"], remote_path) + stats["uploaded"] += 1 logger.debug("上传文件: %s -> %s", rel_path, remote_path) except Exception as e: logger.error("上传文件失败 %s: %s", rel_path, e) - stats['errors'] += 1 - + stats["errors"] += 1 + return stats - def _download_files(self, local_dir: pathlib.Path, - local_files: dict, remote_files: dict) -> typing.Dict[str, int]: + def _download_files( + self, local_dir: pathlib.Path, local_files: dict, remote_files: dict + ) -> typing.Dict[str, int]: """从远程服务器下载文件""" - stats = {'downloaded': 0, 'errors': 0, 'conflicts': 0} - + stats = {"downloaded": 0, "errors": 0, "conflicts": 0} + for rel_path, remote_info in remote_files.items(): local_info = local_files.get(rel_path) - + # 判断是否需要下载 should_download = False if not local_info: should_download = True # 本地不存在 else: # 检查冲突 - local_mtime = local_info.get('mtime', 0) - remote_mtime = remote_info.get('mtime', 0) - + local_mtime = local_info.get("mtime", 0) + remote_mtime = remote_info.get("mtime", 0) + if local_mtime != remote_mtime: # 存在冲突 - stats['conflicts'] += 1 - should_upload, should_download = self._handle_conflict(local_info, remote_info) + stats["conflicts"] += 1 + should_upload, should_download = self._handle_conflict( + local_info, remote_info + ) # 如果应该上传,则不应该下载(冲突已在上传侧处理) if should_upload: should_download = False else: # 时间相同,无需下载 should_download = False - + if should_download: try: local_path = local_dir / rel_path local_path.parent.mkdir(parents=True, exist_ok=True) - self.client.download_file(remote_info['path'], str(local_path)) - stats['downloaded'] += 1 + self.client.download_file(remote_info["path"], str(local_path)) + stats["downloaded"] += 1 logger.debug("下载文件: %s -> %s", rel_path, local_path) except Exception as e: logger.error("下载文件失败 %s: %s", rel_path, e) - stats['errors'] += 1 - + stats["errors"] += 1 + return stats - def _handle_conflict(self, local_info: dict, remote_info: dict) -> typing.Tuple[bool, bool]: + def _handle_conflict( + self, local_info: dict, remote_info: dict + ) -> typing.Tuple[bool, bool]: """ 处理文件冲突 - + Returns: (should_upload, should_download) - 是否应该上传和下载 """ - local_mtime = local_info.get('mtime', 0) - remote_mtime = remote_info.get('mtime', 0) - + local_mtime = local_info.get("mtime", 0) + remote_mtime = remote_info.get("mtime", 0) + if self.config.conflict_strategy == ConflictStrategy.NEWER: # 较新文件覆盖较旧文件 if local_mtime > remote_mtime: @@ -306,7 +329,7 @@ class SyncService: return False, True # 下载远程较新版本 else: return False, False # 时间相同,无需操作 - + elif self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH: # 保留双方 - 重命名远程文件 # 这里实现简单的重命名策略:添加冲突后缀 @@ -314,25 +337,28 @@ class SyncService: # 返回 True, False 表示上传重命名后的文件 # 重命名逻辑在调用处处理 return True, False - + elif self.config.conflict_strategy == ConflictStrategy.ASK: # 用户手动选择 - 记录冲突,跳过 # 返回 False, False 跳过,等待用户决定 - logger.warning("文件冲突需要用户手动选择: local_mtime=%s, remote_mtime=%s", - local_mtime, remote_mtime) + logger.warning( + "文件冲突需要用户手动选择: local_mtime=%s, remote_mtime=%s", + local_mtime, + remote_mtime, + ) return False, False - + return False, False def _should_upload(self, local_info: dict, remote_info: dict) -> bool: """判断是否需要上传(本地较新或哈希不同)""" # 这里实现简单的基于时间的比较 # 实际应该使用哈希比较更可靠 - return local_info.get('mtime', 0) > remote_info.get('mtime', 0) + return local_info.get("mtime", 0) > remote_info.get("mtime", 0) def _should_download(self, local_info: dict, remote_info: dict) -> bool: """判断是否需要下载(远程较新)""" - return remote_info.get('mtime', 0) > local_info.get('mtime', 0) + return remote_info.get("mtime", 0) > local_info.get("mtime", 0) def upload_file(self, local_path: pathlib.Path, remote_path: str = "") -> bool: """上传单个文件""" @@ -381,30 +407,32 @@ def create_sync_service_from_config() -> typing.Optional[SyncService]: """从配置文件创建同步服务实例""" try: from heurams.context import config_var - - sync_config = config_var.get()['providers']['sync']['webdav'] - if not sync_config.get('enabled', False): + + sync_config = config_var.get()["providers"]["sync"]["webdav"] + if not sync_config.get("enabled", False): logger.debug("同步服务未启用") return None - + config = SyncConfig( - enabled=sync_config.get('enabled', False), - url=sync_config.get('url', ''), - username=sync_config.get('username', ''), - password=sync_config.get('password', ''), - remote_path=sync_config.get('remote_path', '/heurams/'), - sync_mode=SyncMode(sync_config.get('sync_mode', 'bidirectional')), - conflict_strategy=ConflictStrategy(sync_config.get('conflict_strategy', 'newer')), - verify_ssl=sync_config.get('verify_ssl', True), + enabled=sync_config.get("enabled", False), + url=sync_config.get("url", ""), + username=sync_config.get("username", ""), + password=sync_config.get("password", ""), + remote_path=sync_config.get("remote_path", "/heurams/"), + sync_mode=SyncMode(sync_config.get("sync_mode", "bidirectional")), + conflict_strategy=ConflictStrategy( + sync_config.get("conflict_strategy", "newer") + ), + verify_ssl=sync_config.get("verify_ssl", True), ) - + service = SyncService(config) if service.client is None: logger.warning("同步服务客户端创建失败") return None - + return service - + except Exception as e: logger.error("创建同步服务失败: %s", e) - return None \ No newline at end of file + return None diff --git a/tests/interface/test_synctool.py b/tests/interface/test_synctool.py index 4d2c217..096ac8a 100644 --- a/tests/interface/test_synctool.py +++ b/tests/interface/test_synctool.py @@ -6,11 +6,16 @@ import pathlib import tempfile import time import unittest -from unittest.mock import MagicMock, patch, Mock +from unittest.mock import MagicMock, Mock, patch from heurams.context import ConfigContext from heurams.services.config import ConfigFile -from heurams.services.sync_service import SyncService, SyncConfig, SyncMode, ConflictStrategy +from heurams.services.sync_service import ( + ConflictStrategy, + SyncConfig, + SyncMode, + SyncService, +) class TestSyncServiceUnit(unittest.TestCase): @@ -20,14 +25,14 @@ class TestSyncServiceUnit(unittest.TestCase): """在每个测试之前运行, 设置临时目录和模拟客户端.""" self.temp_dir = tempfile.TemporaryDirectory() self.temp_path = pathlib.Path(self.temp_dir.name) - + # 创建测试文件 self.test_file = self.temp_path / "test.txt" self.test_file.write_text("测试内容") - + # 模拟 WebDAV 客户端 self.mock_client = MagicMock() - + # 创建同步配置 self.config = SyncConfig( enabled=True, @@ -44,100 +49,100 @@ class TestSyncServiceUnit(unittest.TestCase): """在每个测试之后清理.""" self.temp_dir.cleanup() - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_sync_service_initialization(self, mock_client_class): """测试同步服务初始化.""" mock_client_class.return_value = self.mock_client - + service = SyncService(self.config) - + # 验证客户端已创建 mock_client_class.assert_called_once() self.assertIsNotNone(service.client) self.assertEqual(service.config, self.config) - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_sync_service_disabled(self, mock_client_class): """测试同步服务未启用.""" config = SyncConfig(enabled=False) service = SyncService(config) - + # 客户端不应初始化 mock_client_class.assert_not_called() self.assertIsNone(service.client) - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_test_connection_success(self, mock_client_class): """测试连接测试成功.""" mock_client_class.return_value = self.mock_client self.mock_client.list.return_value = [] - + service = SyncService(self.config) result = service.test_connection() - + self.assertTrue(result) self.mock_client.list.assert_called_once() - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_test_connection_failure(self, mock_client_class): """测试连接测试失败.""" mock_client_class.return_value = self.mock_client self.mock_client.list.side_effect = Exception("连接失败") - + service = SyncService(self.config) result = service.test_connection() - + self.assertFalse(result) self.mock_client.list.assert_called_once() - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_upload_file(self, mock_client_class): """测试上传单个文件.""" mock_client_class.return_value = self.mock_client - + service = SyncService(self.config) result = service.upload_file(self.test_file) - + self.assertTrue(result) self.mock_client.upload_file.assert_called_once() - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_download_file(self, mock_client_class): """测试下载单个文件.""" mock_client_class.return_value = self.mock_client - + service = SyncService(self.config) remote_path = "/heurams/test.txt" local_path = self.temp_path / "downloaded.txt" - + result = service.download_file(remote_path, local_path) - + self.assertTrue(result) self.mock_client.download_file.assert_called_once() self.assertTrue(local_path.parent.exists()) - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_sync_directory_no_files(self, mock_client_class): """测试同步空目录.""" mock_client_class.return_value = self.mock_client self.mock_client.list.return_value = [] self.mock_client.mkdir.return_value = None - + service = SyncService(self.config) result = service.sync_directory(self.temp_path) - - self.assertTrue(result['success']) - self.assertEqual(result['uploaded'], 0) - self.assertEqual(result['downloaded'], 0) + + self.assertTrue(result["success"]) + self.assertEqual(result["uploaded"], 0) + self.assertEqual(result["downloaded"], 0) self.mock_client.mkdir.assert_called_once() - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_sync_directory_upload_only(self, mock_client_class): """测试仅上传模式.""" mock_client_class.return_value = self.mock_client self.mock_client.list.return_value = [] self.mock_client.mkdir.return_value = None - + config = SyncConfig( enabled=True, url="https://example.com/dav/", @@ -147,60 +152,64 @@ class TestSyncServiceUnit(unittest.TestCase): sync_mode=SyncMode.UPLOAD_ONLY, conflict_strategy=ConflictStrategy.NEWER, ) - + service = SyncService(config) result = service.sync_directory(self.temp_path) - - self.assertTrue(result['success']) + + self.assertTrue(result["success"]) self.mock_client.mkdir.assert_called_once() - @patch('heurams.services.sync_service.Client') + @patch("heurams.services.sync_service.Client") def test_conflict_strategy_newer(self, mock_client_class): """测试 NEWER 冲突策略.""" mock_client_class.return_value = self.mock_client - + # 模拟远程文件存在 self.mock_client.list.return_value = ["test.txt"] - self.mock_client.info.return_value = {'size': 100, 'modified': '2023-01-01T00:00:00Z'} + self.mock_client.info.return_value = { + "size": 100, + "modified": "2023-01-01T00:00:00Z", + } self.mock_client.mkdir.return_value = None - + service = SyncService(self.config) result = service.sync_directory(self.temp_path) - - self.assertTrue(result['success']) - # 应该有一个冲突 - self.assertGreaterEqual(result.get('conflicts', 0), 0) - @patch('heurams.services.sync_service.Client') + self.assertTrue(result["success"]) + # 应该有一个冲突 + self.assertGreaterEqual(result.get("conflicts", 0), 0) + + @patch("heurams.services.sync_service.Client") def test_create_sync_service_from_config(self, mock_client_class): """测试从配置文件创建同步服务.""" mock_client_class.return_value = self.mock_client - + # 创建临时配置文件 config_data = { - 'sync': { - 'webdav': { - 'enabled': True, - 'url': 'https://example.com/dav/', - 'username': 'test', - 'password': 'test', - 'remote_path': '/heurams/', - 'sync_mode': 'bidirectional', - 'conflict_strategy': 'newer', - 'verify_ssl': True, + "sync": { + "webdav": { + "enabled": True, + "url": "https://example.com/dav/", + "username": "test", + "password": "test", + "remote_path": "/heurams/", + "sync_mode": "bidirectional", + "conflict_strategy": "newer", + "verify_ssl": True, } } } - + # 模拟 config_var - with patch('heurams.services.sync_service.config_var') as mock_config_var: + with patch("heurams.services.sync_service.config_var") as mock_config_var: mock_config = MagicMock() mock_config.data = config_data mock_config_var.get.return_value = mock_config - + from heurams.services.sync_service import create_sync_service_from_config + service = create_sync_service_from_config() - + self.assertIsNotNone(service) self.assertIsNotNone(service.client) @@ -212,39 +221,41 @@ class TestSyncScreenUnit(unittest.TestCase): """在每个测试之前运行.""" self.temp_dir = tempfile.TemporaryDirectory() self.temp_path = pathlib.Path(self.temp_dir.name) - + # 创建默认配置 default_config_path = ( pathlib.Path(__file__).parent.parent.parent / "src/heurams/default/config/config.toml" ) self.config = ConfigFile(default_config_path) - + # 更新配置中的路径 config_data = self.config.data config_data["paths"]["nucleon_dir"] = str(self.temp_path / "nucleon") config_data["paths"]["electron_dir"] = str(self.temp_path / "electron") config_data["paths"]["orbital_dir"] = str(self.temp_path / "orbital") config_data["paths"]["cache_dir"] = str(self.temp_path / "cache") - + # 添加同步配置 - if 'sync' not in config_data: - config_data['sync'] = {} - config_data['sync']['webdav'] = { - 'enabled': False, - 'url': '', - 'username': '', - 'password': '', - 'remote_path': '/heurams/', - 'sync_mode': 'bidirectional', - 'conflict_strategy': 'newer', - 'verify_ssl': True, + if "sync" not in config_data: + config_data["sync"] = {} + config_data["sync"]["webdav"] = { + "enabled": False, + "url": "", + "username": "", + "password": "", + "remote_path": "/heurams/", + "sync_mode": "bidirectional", + "conflict_strategy": "newer", + "verify_ssl": True, } - + # 创建目录 for dir_key in ["nucleon_dir", "electron_dir", "orbital_dir", "cache_dir"]: - pathlib.Path(config_data["paths"][dir_key]).mkdir(parents=True, exist_ok=True) - + pathlib.Path(config_data["paths"][dir_key]).mkdir( + parents=True, exist_ok=True + ) + # 使用 ConfigContext 设置配置 self.config_ctx = ConfigContext(self.config) self.config_ctx.__enter__() @@ -254,52 +265,53 @@ class TestSyncScreenUnit(unittest.TestCase): self.config_ctx.__exit__(None, None, None) self.temp_dir.cleanup() - @patch('heurams.interface.screens.synctool.create_sync_service_from_config') + @patch("heurams.interface.screens.synctool.create_sync_service_from_config") def test_sync_screen_compose(self, mock_create_service): """测试 SyncScreen 的 compose 方法.""" from heurams.interface.screens.synctool import SyncScreen - + # 模拟同步服务 mock_service = MagicMock() mock_service.client = MagicMock() mock_create_service.return_value = mock_service - + screen = SyncScreen() - + # 测试 compose 方法 from textual.app import ComposeResult + result = screen.compose() widgets = list(result) - + # 检查基本部件 - from textual.widgets import Footer, Header, Button, Static, ProgressBar from textual.containers import ScrollableContainer - + from textual.widgets import Button, Footer, Header, ProgressBar, Static + header_present = any(isinstance(w, Header) for w in widgets) footer_present = any(isinstance(w, Footer) for w in widgets) self.assertTrue(header_present) self.assertTrue(footer_present) - + # 检查容器 container_present = any(isinstance(w, ScrollableContainer) for w in widgets) self.assertTrue(container_present) - @patch('heurams.interface.screens.synctool.create_sync_service_from_config') + @patch("heurams.interface.screens.synctool.create_sync_service_from_config") def test_sync_screen_load_config(self, mock_create_service): """测试 SyncScreen 加载配置.""" from heurams.interface.screens.synctool import SyncScreen - + mock_service = MagicMock() mock_service.client = MagicMock() mock_create_service.return_value = mock_service - + screen = SyncScreen() screen.load_config() - + # 验证配置已加载 self.assertIsNotNone(screen.sync_config) mock_create_service.assert_called_once() if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() From d8fc18166dd441a661f9b71f3272d213057eb526 Mon Sep 17 00:00:00 2001 From: david-ajax Date: Sun, 21 Dec 2025 18:48:25 +0800 Subject: [PATCH 3/7] =?UTF-8?q?feat(synctool):=20=E8=99=9A=E6=8B=9F?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E7=B3=BB=E7=BB=9F=E5=88=9D=E6=AD=A5=E6=96=B9?= =?UTF-8?q?=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pyproject.toml | 2 +- src/heurams/services/vfs.py | 5 +++++ src/heurams/services/vfs/zipfs.py | 0 3 files changed, 6 insertions(+), 1 deletion(-) create mode 100644 src/heurams/services/vfs.py create mode 100644 src/heurams/services/vfs/zipfs.py diff --git a/pyproject.toml b/pyproject.toml index 0156761..400baf1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "heurams" -version = "0.4.0" +version = "0.4.3" description = "Heuristic Assisted Memory Scheduler" license = {file = "LICENSE"} classifiers = [ diff --git a/src/heurams/services/vfs.py b/src/heurams/services/vfs.py new file mode 100644 index 0000000..5fdecd1 --- /dev/null +++ b/src/heurams/services/vfs.py @@ -0,0 +1,5 @@ +""" vfs.py +得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers +""" + +import fsspec diff --git a/src/heurams/services/vfs/zipfs.py b/src/heurams/services/vfs/zipfs.py new file mode 100644 index 0000000..e69de29 From ee0646ac7939834b1ced984b114651537157f036 Mon Sep 17 00:00:00 2001 From: david-ajax Date: Sun, 21 Dec 2025 21:18:31 +0800 Subject: [PATCH 4/7] =?UTF-8?q?refactor(synctool):=20=E6=94=B9=E8=BF=9B?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=96=B9=E6=A1=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + config/config.toml | 8 +- src/heurams/interface/screens/synctool.py | 51 +-- src/heurams/services/sync_service.py | 438 ---------------------- src/heurams/services/vfs.py | 13 +- 5 files changed, 25 insertions(+), 486 deletions(-) diff --git a/.gitignore b/.gitignore index b622d60..ff7e71c 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ data/global/ data/orbital/ config/config_dev.toml AGENTS.md +*.log.1 # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/config/config.toml b/config/config.toml index b0bedc1..c7a3805 100644 --- a/config/config.toml +++ b/config/config.toml @@ -42,6 +42,7 @@ template_dir = "./data/template" audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO) tts = "edgetts" # 可选项: edgetts llm = "openai" # 可选项: openai +sync = "webdav" # 可选项: 留空, webdav [providers.tts.edgetts] # EdgeTTS 设置 voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声) @@ -50,12 +51,11 @@ voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN- url = "" key = "" -[sync.webdav] # WebDAV 同步设置 -enabled = false +[providers.sync.webdav] # WebDAV 同步设置 url = "" username = "" password = "" remote_path = "/heurams/" -sync_mode = "bidirectional" # bidirectional/upload_only/download_only -conflict_strategy = "newer" # newer/ask/keep_both verify_ssl = true + +[sync] diff --git a/src/heurams/interface/screens/synctool.py b/src/heurams/interface/screens/synctool.py index e07a407..a169ea4 100644 --- a/src/heurams/interface/screens/synctool.py +++ b/src/heurams/interface/screens/synctool.py @@ -20,7 +20,6 @@ class SyncScreen(Screen): def __init__(self, nucleons: list = [], desc: str = ""): super().__init__(name=None, id=None, classes=None) self.sync_service = None - self.sync_config = {} self.is_syncing = False self.is_paused = False self.log_messages = [] @@ -30,35 +29,29 @@ class SyncScreen(Screen): yield Header(show_clock=True) with ScrollableContainer(id="sync_container"): # 标题和连接状态 - yield Static("WebDAV 同步工具", classes="title") + yield Static("同步工具", classes="title") yield Static("", id="status_label", classes="status") # 配置信息 - yield Static("服务器配置", classes="section_title") + yield Static(f"同步协议: {config_var.get()['services']['sync']}") + yield Static("服务器配置:", classes="section_title") with Horizontal(classes="config_info"): - yield Static("URL:", classes="config_label") + yield Static("远程服务器:", classes="config_label") yield Static("", id="server_url", classes="config_value") with Horizontal(classes="config_info"): yield Static("远程路径:", classes="config_label") yield Static("", id="remote_path", classes="config_value") - with Horizontal(classes="config_info"): - yield Static("同步模式:", classes="config_label") - yield Static("", id="sync_mode", classes="config_value") - # 控制按钮 - yield Static("控制面板", classes="section_title") with Horizontal(classes="control_buttons"): yield Button("测试连接", id="test_connection", variant="primary") yield Button("开始同步", id="start_sync", variant="success") yield Button("暂停", id="pause_sync", variant="warning", disabled=True) yield Button("取消", id="cancel_sync", variant="error", disabled=True) - # 进度显示 yield Static("同步进度", classes="section_title") yield ProgressBar(id="progress_bar", show_percentage=True, total=100) yield Static("", id="progress_label", classes="progress_text") - # 日志输出 yield Static("同步日志", classes="section_title") yield Static("", id="log_output", classes="log_output") @@ -66,50 +59,22 @@ class SyncScreen(Screen): def on_mount(self): """挂载时初始化状态""" - self.load_config() self.update_ui_from_config() self.log_message("同步工具已启动") - def load_config(self): - """从配置文件加载同步设置""" - try: - from heurams.context import config_var - - config_data = config_var.get().data - self.sync_config = config_data.get("sync", {}).get("webdav", {}) - - # 创建同步服务实例 - from heurams.services.sync_service import create_sync_service_from_config - - self.sync_service = create_sync_service_from_config() - - except Exception as e: - self.log_message(f"加载配置失败: {e}", is_error=True) - self.sync_config = {} - def update_ui_from_config(self): """更新 UI 显示配置信息""" try: + sync_cfg: dict = config_var.get()['providers']['sync']['webdav'] # 更新服务器 URL - url = self.sync_config.get("url", "未配置") + url = sync_cfg.get("url", "未配置") url_widget = self.query_one("#server_url") - url_widget.update(url if url else "未配置") # type: ignore - + url_widget.update(url) # type: ignore # 更新远程路径 - remote_path = self.sync_config.get("remote_path", "/heurams/") + remote_path = sync_cfg.get("remote_path", "/") path_widget = self.query_one("#remote_path") path_widget.update(remote_path) # type: ignore - # 更新同步模式 - sync_mode = self.sync_config.get("sync_mode", "bidirectional") - mode_widget = self.query_one("#sync_mode") - mode_map = { - "bidirectional": "双向同步", - "upload_only": "仅上传", - "download_only": "仅下载", - } - mode_widget.update(mode_map.get(sync_mode, sync_mode)) # type: ignore - # 更新状态标签 status_widget = self.query_one("#status_label") if self.sync_service and self.sync_service.client: diff --git a/src/heurams/services/sync_service.py b/src/heurams/services/sync_service.py index a82d5da..e69de29 100644 --- a/src/heurams/services/sync_service.py +++ b/src/heurams/services/sync_service.py @@ -1,438 +0,0 @@ -# WebDAV 同步服务 -import hashlib -import os -import pathlib -import time -import typing -from dataclasses import dataclass -from enum import Enum - -import requests -from webdav3.client import Client - -from heurams.context import config_var -from heurams.services.logger import get_logger - -logger = get_logger(__name__) - - -class SyncMode(Enum): - """同步模式枚举""" - - BIDIRECTIONAL = "bidirectional" - UPLOAD_ONLY = "upload_only" - DOWNLOAD_ONLY = "download_only" - - -class ConflictStrategy(Enum): - """冲突解决策略枚举""" - - NEWER = "newer" # 较新文件覆盖较旧文件 - ASK = "ask" # 用户手动选择 - KEEP_BOTH = "keep_both" # 保留双方(重命名) - - -@dataclass -class SyncConfig: - """同步配置数据类""" - - enabled: bool = False - url: str = "" - username: str = "" - password: str = "" - remote_path: str = "/heurams/" - sync_mode: SyncMode = SyncMode.BIDIRECTIONAL - conflict_strategy: ConflictStrategy = ConflictStrategy.NEWER - verify_ssl: bool = True - - -class SyncService: - """WebDAV 同步服务""" - - def __init__(self, config): - self.config = config - logger.debug(f"{str(self.config)}") - self.client = None - self._setup_client() - - def _setup_client(self): - """设置 WebDAV 客户端""" - if not self.config.enabled or not self.config.url: - logger.warning("同步服务未启用或未配置 URL") - return - - options = { - "webdav_hostname": self.config.url, - "webdav_login": self.config.username, - "webdav_password": self.config.password, - "webdav_root": self.config.remote_path, - "verify_ssl": self.config.verify_ssl, - "disable_check": True, # 不检查服务器支持的功能 - } - - try: - self.client = Client(options) - logger.info("WebDAV 客户端初始化完成") - except Exception as e: - logger.error("WebDAV 客户端初始化失败: %s", e) - self.client = None - - def test_connection(self) -> bool: - """测试 WebDAV 服务器连接""" - if not self.client: - logger.error("WebDAV 客户端未初始化") - return False - - try: - # 尝试列出根目录 - self.client.list() - logger.info("WebDAV 连接测试成功") - return True - except Exception as e: - logger.error("WebDAV 连接测试失败: %s", e) - return False - - def _get_local_files(self, local_dir: pathlib.Path) -> typing.Dict[str, dict]: - """获取本地文件列表及其元数据""" - files = {} - for root, _, filenames in os.walk(local_dir): - for filename in filenames: - file_path = pathlib.Path(root) / filename - rel_path = file_path.relative_to(local_dir) - stat = file_path.stat() - files[str(rel_path)] = { - "path": file_path, - "size": stat.st_size, - "mtime": stat.st_mtime, - "hash": self._calculate_hash(file_path), - } - return files - - def _get_remote_files(self) -> typing.Dict[str, dict]: - """获取远程文件列表及其元数据""" - if not self.client: - return {} - - try: - remote_list = self.client.list(recursive=True) - files = {} - for item in remote_list: - if not item.endswith("/"): # 忽略目录 - rel_path = item.lstrip("/") - try: - info = self.client.info(item) - files[rel_path] = { - "path": item, - "size": info.get("size", 0), - "mtime": self._parse_remote_mtime(info), - } - except Exception as e: - logger.warning("无法获取远程文件信息 %s: %s", item, e) - return files - except Exception as e: - logger.error("获取远程文件列表失败: %s", e) - return {} - - def _calculate_hash(self, file_path: pathlib.Path, block_size: int = 65536) -> str: - """计算文件的 SHA-256 哈希值""" - sha256 = hashlib.sha256() - try: - with open(file_path, "rb") as f: - for block in iter(lambda: f.read(block_size), b""): - sha256.update(block) - return sha256.hexdigest() - except Exception as e: - logger.error("计算文件哈希失败 %s: %s", file_path, e) - return "" - - def _parse_remote_mtime(self, info: dict) -> float: - """解析远程文件的修改时间""" - # WebDAV 可能返回 Last-Modified 头或其他时间格式 - # 这里简单返回当前时间,实际应根据服务器响应解析 - return time.time() - - def sync_directory(self, local_dir: pathlib.Path) -> typing.Dict[str, typing.Any]: - """ - 同步目录 - - Args: - local_dir: 本地目录路径 - - Returns: - 同步结果统计 - """ - if not self.client: - logger.error("WebDAV 客户端未初始化") - return {"success": False, "error": "客户端未初始化"} - - results = { - "uploaded": 0, - "downloaded": 0, - "conflicts": 0, - "errors": 0, - "success": True, - } - - try: - # 确保远程目录存在 - self.client.mkdir(self.config.remote_path) - - local_files = self._get_local_files(local_dir) - remote_files = self._get_remote_files() - - # 根据同步模式处理文件 - if self.config.sync_mode in [SyncMode.BIDIRECTIONAL, SyncMode.UPLOAD_ONLY]: - stats = self._upload_files(local_dir, local_files, remote_files) - results["uploaded"] += stats.get("uploaded", 0) - results["conflicts"] += stats.get("conflicts", 0) - results["errors"] += stats.get("errors", 0) - - if self.config.sync_mode in [ - SyncMode.BIDIRECTIONAL, - SyncMode.DOWNLOAD_ONLY, - ]: - stats = self._download_files(local_dir, local_files, remote_files) - results["downloaded"] += stats.get("downloaded", 0) - results["conflicts"] += stats.get("conflicts", 0) - results["errors"] += stats.get("errors", 0) - - logger.info("同步完成: %s", results) - return results - - except Exception as e: - logger.error("同步过程中发生错误: %s", e) - results["success"] = False - results["error"] = str(e) - return results - - def _upload_files( - self, local_dir: pathlib.Path, local_files: dict, remote_files: dict - ) -> typing.Dict[str, int]: - """上传文件到远程服务器""" - stats = {"uploaded": 0, "errors": 0, "conflicts": 0} - - for rel_path, local_info in local_files.items(): - remote_info = remote_files.get(rel_path) - - # 判断是否需要上传 - should_upload = False - conflict_resolved = False - remote_path = os.path.join(self.config.remote_path, rel_path) - - if not remote_info: - should_upload = True # 远程不存在 - else: - # 检查冲突 - local_mtime = local_info.get("mtime", 0) - remote_mtime = remote_info.get("mtime", 0) - - if local_mtime != remote_mtime: - # 存在冲突 - stats["conflicts"] += 1 - should_upload, should_download = self._handle_conflict( - local_info, remote_info - ) - - if ( - should_upload - and self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH - ): - # 重命名远程文件避免覆盖 - conflict_suffix = f".conflict_{int(remote_mtime)}" - name, ext = os.path.splitext(rel_path) - new_rel_path = ( - f"{name}{conflict_suffix}{ext}" - if ext - else f"{name}{conflict_suffix}" - ) - remote_path = os.path.join( - self.config.remote_path, new_rel_path - ) - conflict_resolved = True - logger.debug("冲突文件重命名: %s -> %s", rel_path, new_rel_path) - else: - # 时间相同,无需上传 - should_upload = False - - if should_upload: - try: - self.client.upload_file(local_info["path"], remote_path) - stats["uploaded"] += 1 - logger.debug("上传文件: %s -> %s", rel_path, remote_path) - except Exception as e: - logger.error("上传文件失败 %s: %s", rel_path, e) - stats["errors"] += 1 - - return stats - - def _download_files( - self, local_dir: pathlib.Path, local_files: dict, remote_files: dict - ) -> typing.Dict[str, int]: - """从远程服务器下载文件""" - stats = {"downloaded": 0, "errors": 0, "conflicts": 0} - - for rel_path, remote_info in remote_files.items(): - local_info = local_files.get(rel_path) - - # 判断是否需要下载 - should_download = False - if not local_info: - should_download = True # 本地不存在 - else: - # 检查冲突 - local_mtime = local_info.get("mtime", 0) - remote_mtime = remote_info.get("mtime", 0) - - if local_mtime != remote_mtime: - # 存在冲突 - stats["conflicts"] += 1 - should_upload, should_download = self._handle_conflict( - local_info, remote_info - ) - # 如果应该上传,则不应该下载(冲突已在上传侧处理) - if should_upload: - should_download = False - else: - # 时间相同,无需下载 - should_download = False - - if should_download: - try: - local_path = local_dir / rel_path - local_path.parent.mkdir(parents=True, exist_ok=True) - self.client.download_file(remote_info["path"], str(local_path)) - stats["downloaded"] += 1 - logger.debug("下载文件: %s -> %s", rel_path, local_path) - except Exception as e: - logger.error("下载文件失败 %s: %s", rel_path, e) - stats["errors"] += 1 - - return stats - - def _handle_conflict( - self, local_info: dict, remote_info: dict - ) -> typing.Tuple[bool, bool]: - """ - 处理文件冲突 - - Returns: - (should_upload, should_download) - 是否应该上传和下载 - """ - local_mtime = local_info.get("mtime", 0) - remote_mtime = remote_info.get("mtime", 0) - - if self.config.conflict_strategy == ConflictStrategy.NEWER: - # 较新文件覆盖较旧文件 - if local_mtime > remote_mtime: - return True, False # 上传本地较新版本 - elif remote_mtime > local_mtime: - return False, True # 下载远程较新版本 - else: - return False, False # 时间相同,无需操作 - - elif self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH: - # 保留双方 - 重命名远程文件 - # 这里实现简单的重命名策略:添加冲突后缀 - # 实际应该在上传时处理重命名 - # 返回 True, False 表示上传重命名后的文件 - # 重命名逻辑在调用处处理 - return True, False - - elif self.config.conflict_strategy == ConflictStrategy.ASK: - # 用户手动选择 - 记录冲突,跳过 - # 返回 False, False 跳过,等待用户决定 - logger.warning( - "文件冲突需要用户手动选择: local_mtime=%s, remote_mtime=%s", - local_mtime, - remote_mtime, - ) - return False, False - - return False, False - - def _should_upload(self, local_info: dict, remote_info: dict) -> bool: - """判断是否需要上传(本地较新或哈希不同)""" - # 这里实现简单的基于时间的比较 - # 实际应该使用哈希比较更可靠 - return local_info.get("mtime", 0) > remote_info.get("mtime", 0) - - def _should_download(self, local_info: dict, remote_info: dict) -> bool: - """判断是否需要下载(远程较新)""" - return remote_info.get("mtime", 0) > local_info.get("mtime", 0) - - def upload_file(self, local_path: pathlib.Path, remote_path: str = "") -> bool: - """上传单个文件""" - if not self.client: - return False - - try: - if not remote_path: - remote_path = os.path.join(self.config.remote_path, local_path.name) - self.client.upload_file(str(local_path), remote_path) - logger.info("文件上传成功: %s -> %s", local_path, remote_path) - return True - except Exception as e: - logger.error("文件上传失败: %s", e) - return False - - def download_file(self, remote_path: str, local_path: pathlib.Path) -> bool: - """下载单个文件""" - if not self.client: - return False - - try: - local_path.parent.mkdir(parents=True, exist_ok=True) - self.client.download_file(remote_path, str(local_path)) - logger.info("文件下载成功: %s -> %s", remote_path, local_path) - return True - except Exception as e: - logger.error("文件下载失败: %s", e) - return False - - def delete_remote_file(self, remote_path: str) -> bool: - """删除远程文件""" - if not self.client: - return False - - try: - self.client.clean(remote_path) - logger.info("远程文件删除成功: %s", remote_path) - return True - except Exception as e: - logger.error("远程文件删除失败: %s", e) - return False - - -def create_sync_service_from_config() -> typing.Optional[SyncService]: - """从配置文件创建同步服务实例""" - try: - from heurams.context import config_var - - sync_config = config_var.get()["providers"]["sync"]["webdav"] - if not sync_config.get("enabled", False): - logger.debug("同步服务未启用") - return None - - config = SyncConfig( - enabled=sync_config.get("enabled", False), - url=sync_config.get("url", ""), - username=sync_config.get("username", ""), - password=sync_config.get("password", ""), - remote_path=sync_config.get("remote_path", "/heurams/"), - sync_mode=SyncMode(sync_config.get("sync_mode", "bidirectional")), - conflict_strategy=ConflictStrategy( - sync_config.get("conflict_strategy", "newer") - ), - verify_ssl=sync_config.get("verify_ssl", True), - ) - - service = SyncService(config) - if service.client is None: - logger.warning("同步服务客户端创建失败") - return None - - return service - - except Exception as e: - logger.error("创建同步服务失败: %s", e) - return None diff --git a/src/heurams/services/vfs.py b/src/heurams/services/vfs.py index 5fdecd1..a7f0988 100644 --- a/src/heurams/services/vfs.py +++ b/src/heurams/services/vfs.py @@ -2,4 +2,15 @@ 得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers """ -import fsspec +from pathlib import Path +import fsspec as fs + +class VFSObject(): + def __init__(self, protocol, base_url): + self.base_url = base_url + self.protocol = protocol + self.fs = fs.filesystem(protocol=protocol, base_url = base_url) + def open(self, path: Path): + return self.fs.open(path) + def open_by_list(self, path_list: list[Path]): + return self.fs.open_files(path_list) From 0fb421412e6887db105c2e0423f4da4c2fe9aa92 Mon Sep 17 00:00:00 2001 From: david-ajax Date: Sun, 21 Dec 2025 23:06:17 +0800 Subject: [PATCH 5/7] =?UTF-8?q?fix:=20=E6=9A=82=E6=97=B6=E7=A6=81=E7=94=A8?= =?UTF-8?q?=E5=AE=9E=E9=AA=8C=E6=80=A7=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heurams/default/config/config.toml | 8 ++++---- src/heurams/interface/__init__.py | 2 +- src/heurams/interface/screens/dashboard.py | 3 ++- src/heurams/interface/screens/nucreator.py | 3 ++- src/heurams/interface/screens/synctool.py | 2 +- src/heurams/kernel/algorithms/sm15m.py | 13 +++---------- src/heurams/services/vfs.py | 10 +++++++--- tests/interface/test_synctool.py | 11 ++++------- 8 files changed, 24 insertions(+), 28 deletions(-) diff --git a/src/heurams/default/config/config.toml b/src/heurams/default/config/config.toml index b0bedc1..c7a3805 100644 --- a/src/heurams/default/config/config.toml +++ b/src/heurams/default/config/config.toml @@ -42,6 +42,7 @@ template_dir = "./data/template" audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO) tts = "edgetts" # 可选项: edgetts llm = "openai" # 可选项: openai +sync = "webdav" # 可选项: 留空, webdav [providers.tts.edgetts] # EdgeTTS 设置 voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声) @@ -50,12 +51,11 @@ voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN- url = "" key = "" -[sync.webdav] # WebDAV 同步设置 -enabled = false +[providers.sync.webdav] # WebDAV 同步设置 url = "" username = "" password = "" remote_path = "/heurams/" -sync_mode = "bidirectional" # bidirectional/upload_only/download_only -conflict_strategy = "newer" # newer/ask/keep_both verify_ssl = true + +[sync] diff --git a/src/heurams/interface/__init__.py b/src/heurams/interface/__init__.py index 4071b04..8205e2e 100644 --- a/src/heurams/interface/__init__.py +++ b/src/heurams/interface/__init__.py @@ -40,7 +40,7 @@ class HeurAMSApp(App): ("1", "app.push_screen('dashboard')", "仪表盘"), ("2", "app.push_screen('precache_all')", "缓存管理器"), ("3", "app.push_screen('nucleon_creator')", "创建新单元"), - ("4", "app.push_screen('synctool')", "同步工具"), + # ("4", "app.push_screen('synctool')", "同步工具"), ("0", "app.push_screen('about')", "版本信息"), ] SCREENS = { diff --git a/src/heurams/interface/screens/dashboard.py b/src/heurams/interface/screens/dashboard.py index ed09e22..1a323f4 100644 --- a/src/heurams/interface/screens/dashboard.py +++ b/src/heurams/interface/screens/dashboard.py @@ -4,7 +4,8 @@ import pathlib from textual.app import ComposeResult from textual.containers import ScrollableContainer from textual.screen import Screen -from textual.widgets import Button, Footer, Header, Label, ListItem, ListView, Static +from textual.widgets import (Button, Footer, Header, Label, ListItem, ListView, + Static) import heurams.services.timer as timer import heurams.services.version as version diff --git a/src/heurams/interface/screens/nucreator.py b/src/heurams/interface/screens/nucreator.py index b187528..735dda6 100644 --- a/src/heurams/interface/screens/nucreator.py +++ b/src/heurams/interface/screens/nucreator.py @@ -5,7 +5,8 @@ import toml from textual.app import ComposeResult from textual.containers import ScrollableContainer from textual.screen import Screen -from textual.widgets import Button, Footer, Header, Input, Label, Markdown, Select +from textual.widgets import (Button, Footer, Header, Input, Label, Markdown, + Select) from heurams.context import config_var from heurams.services.version import ver diff --git a/src/heurams/interface/screens/synctool.py b/src/heurams/interface/screens/synctool.py index a169ea4..3d1eebe 100644 --- a/src/heurams/interface/screens/synctool.py +++ b/src/heurams/interface/screens/synctool.py @@ -65,7 +65,7 @@ class SyncScreen(Screen): def update_ui_from_config(self): """更新 UI 显示配置信息""" try: - sync_cfg: dict = config_var.get()['providers']['sync']['webdav'] + sync_cfg: dict = config_var.get()["providers"]["sync"]["webdav"] # 更新服务器 URL url = sync_cfg.get("url", "未配置") url_widget = self.query_one("#server_url") diff --git a/src/heurams/kernel/algorithms/sm15m.py b/src/heurams/kernel/algorithms/sm15m.py index 1decbb5..fc5abd7 100644 --- a/src/heurams/kernel/algorithms/sm15m.py +++ b/src/heurams/kernel/algorithms/sm15m.py @@ -14,16 +14,9 @@ import pathlib from typing import TypedDict from heurams.context import config_var -from heurams.kernel.algorithms.sm15m_calc import ( - MAX_AF, - MIN_AF, - NOTCH_AF, - RANGE_AF, - RANGE_REPETITION, - SM, - THRESHOLD_RECALL, - Item, -) +from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF, + RANGE_AF, RANGE_REPETITION, + SM, THRESHOLD_RECALL, Item) # 全局状态文件路径 _GLOBAL_STATE_FILE = os.path.expanduser( diff --git a/src/heurams/services/vfs.py b/src/heurams/services/vfs.py index a7f0988..a024ef8 100644 --- a/src/heurams/services/vfs.py +++ b/src/heurams/services/vfs.py @@ -1,16 +1,20 @@ -""" vfs.py +"""vfs.py 得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers """ from pathlib import Path + import fsspec as fs -class VFSObject(): + +class VFSObject: def __init__(self, protocol, base_url): self.base_url = base_url self.protocol = protocol - self.fs = fs.filesystem(protocol=protocol, base_url = base_url) + self.fs = fs.filesystem(protocol=protocol, base_url=base_url) + def open(self, path: Path): return self.fs.open(path) + def open_by_list(self, path_list: list[Path]): return self.fs.open_files(path_list) diff --git a/tests/interface/test_synctool.py b/tests/interface/test_synctool.py index 096ac8a..b70d8a3 100644 --- a/tests/interface/test_synctool.py +++ b/tests/interface/test_synctool.py @@ -10,12 +10,8 @@ from unittest.mock import MagicMock, Mock, patch from heurams.context import ConfigContext from heurams.services.config import ConfigFile -from heurams.services.sync_service import ( - ConflictStrategy, - SyncConfig, - SyncMode, - SyncService, -) +from heurams.services.sync_service import (ConflictStrategy, SyncConfig, + SyncMode, SyncService) class TestSyncServiceUnit(unittest.TestCase): @@ -206,7 +202,8 @@ class TestSyncServiceUnit(unittest.TestCase): mock_config.data = config_data mock_config_var.get.return_value = mock_config - from heurams.services.sync_service import create_sync_service_from_config + from heurams.services.sync_service import \ + create_sync_service_from_config service = create_sync_service_from_config() From 87cefedb61f96f027bcd75b97379512d0bc3fa93 Mon Sep 17 00:00:00 2001 From: david-ajax Date: Sun, 21 Dec 2025 23:42:02 +0800 Subject: [PATCH 6/7] =?UTF-8?q?feat(interface):=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E6=99=BA=E8=83=BD=E5=8D=95=E5=85=83=E9=9B=86=E6=8E=92=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heurams/interface/screens/dashboard.py | 228 +++++++++++++-------- tests/interface/test_dashboard.py | 2 +- 2 files changed, 149 insertions(+), 81 deletions(-) diff --git a/src/heurams/interface/screens/dashboard.py b/src/heurams/interface/screens/dashboard.py index 1a323f4..f3be579 100644 --- a/src/heurams/interface/screens/dashboard.py +++ b/src/heurams/interface/screens/dashboard.py @@ -4,8 +4,15 @@ import pathlib from textual.app import ComposeResult from textual.containers import ScrollableContainer from textual.screen import Screen -from textual.widgets import (Button, Footer, Header, Label, ListItem, ListView, - Static) +from textual.widgets import ( + Button, + Footer, + Header, + Label, + ListItem, + ListView, + Static, +) import heurams.services.timer as timer import heurams.services.version as version @@ -20,129 +27,190 @@ logger = get_logger(__name__) class DashboardScreen(Screen): + """主仪表盘屏幕""" + SUB_TITLE = "仪表盘" - + + def __init__( + self, + name: str | None = None, + id: str | None = None, + classes: str | None = None, + ) -> None: + super().__init__(name, id, classes) + self.nextdates = {} + self.texts = {} + self.stay_enabled = {} + def compose(self) -> ComposeResult: + """组合界面组件""" yield Header(show_clock=True) yield ScrollableContainer( - Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"), + Label('欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"), Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"), Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'), Label(f"使用算法: {config_var.get()['algorithm']['default']}"), Label("选择待学习或待修改的记忆单元集:", classes="title-label"), ListView(id="union-list", classes="union-list-view"), Label( - f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} {version.codename.capitalize()} 2025' + f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} ' + f'{version.codename.capitalize()} 2025' ), ) yield Footer() - - def item_desc_generator(self, filename) -> dict: - """简单分析以生成项目项显示文本 - + + def analyser(self, filename: str) -> dict: + """分析文件状态以生成显示文本 + + Args: + filename: 要分析的文件名 + Returns: - dict: 以数字为列表, 分别呈现单行字符串 + dict: 包含显示文本的字典,键为行号 """ - res = dict() + from heurams.kernel.particles.loader import load_electron, load_nucleon + + result = {} filestem = pathlib.Path(filename).stem - res[0] = f"{filename}\0" - import heurams.kernel.particles as pt - from heurams.kernel.particles.loader import load_electron - - electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / ( - filestem + ".json" - ) - + + # 构建电子文件路径 + electron_dir = config_var.get()["paths"]["electron_dir"] + electron_file_path = pathlib.Path(electron_dir) / f"{filestem}.json" + logger.debug(f"电子文件路径: {electron_file_path}") - - if electron_file_path.exists(): # 未找到则创建电子文件 (json) - pass - else: + + # 确保电子文件存在 + if not electron_file_path.exists(): electron_file_path.touch() - with open(electron_file_path, "w") as f: - f.write("{}") - electron_dict = load_electron(path=electron_file_path) # TODO: 取消硬编码扩展名 - logger.debug(electron_dict) + electron_file_path.write_text("{}") + + # 加载电子数据 + electron_dict = load_electron(path=electron_file_path) + logger.debug(f"电子数据: {electron_dict}") + + # 分析电子状态 is_due = 0 is_activated = 0 nextdate = 0x3F3F3F3F - for i in electron_dict.values(): - i: pt.Electron - logger.debug(i, i.is_due()) - if i.is_due(): + + for electron in electron_dict.values(): + logger.debug(f"{electron}, 是否到期: {electron.is_due()}") + + if electron.is_due(): is_due = 1 - if i.is_activated(): + if electron.is_activated(): is_activated = 1 - nextdate = min(nextdate, i.nextdate()) - res[1] = f"下一次复习: {nextdate}\n" - res[1] += f"{"需要复习" if is_due else "当前无需复习"}" + nextdate = min(nextdate, electron.nextdate()) + + # 检查是否需要更多复习 + nucleon_dir = config_var.get()["paths"]["nucleon_dir"] + nucleon_path = pathlib.Path(nucleon_dir) / f"{filestem}.toml" + nucleon_count = len(load_nucleon(nucleon_path)) + electron_count = len(electron_dict) + is_more = not (electron_count >= nucleon_count) + + logger.debug(f"是否需要更多复习: {is_more}") + + # 更新状态 + self.nextdates[filename] = nextdate + self.stay_enabled[filename] = (is_due or is_more) + + # 构建返回结果 + result[0] = f"{filename}\0" + if not is_activated: - res[1] = " 尚未激活" - return res - - def on_mount(self) -> None: - union_list_widget = self.query_one("#union-list", ListView) - - probe = probe_all(0) - - if len(probe["nucleon"]): - for file in probe["nucleon"]: - text = self.item_desc_generator(file) - union_list_widget.append( - ListItem( - Label(text[0] + "\n" + text[1]), - ) - ) + result[1] = " 尚未激活" else: + status_text = "需要复习" if is_due else "当前无需复习" + result[1] = f"下一次复习: {nextdate}\n{status_text}" + + return result + + def on_mount(self) -> None: + """挂载组件时初始化""" + union_list_widget = self.query_one("#union-list", ListView) + probe = probe_all(0) + + # 分析所有文件 + for file in probe["nucleon"]: + self.texts[file] = self.analyser(file) + + # 按下次复习时间排序 + nucleon_files = sorted( + probe["nucleon"], + key=lambda f: self.nextdates[f], + reverse=True, + ) + + # 填充列表 + if not probe["nucleon"]: union_list_widget.append( ListItem( Static( - "在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集." + "在 ./nucleon/ 中未找到任何内容源数据文件。\n" + "请放置文件后重启应用,或者新建空的单元集。" ) ) ) union_list_widget.disabled = True - + return + + for file in nucleon_files: + text = self.texts[file] + list_item = ListItem( + Label(f"{text[0]}\n{text[1]}") + ) + union_list_widget.append(list_item) + + if not self.stay_enabled[file]: + list_item.disabled = True + def on_list_view_selected(self, event) -> None: + """处理列表项选择事件""" if not isinstance(event.item, ListItem): return - + selected_label = event.item.query_one(Label) - if "未找到任何 .toml 文件" in str(selected_label.renderable): # type: ignore + label_text = str(selected_label.renderable) + + if "未找到任何 .toml 文件" in label_text: return - + + # 提取文件名 selected_filename = pathlib.Path( - str(selected_label.renderable) - .partition("\0")[0] # 文件名末尾截断, 保留文件名 - .replace("*", "") - ) # 去除markdown加粗 - - nucleon_file_path = ( - pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) / selected_filename + label_text.partition("\0")[0].replace("*", "") ) - electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / ( - str(selected_filename.stem) + ".json" + + # 构建文件路径 + nucleon_dir = config_var.get()["paths"]["nucleon_dir"] + electron_dir = config_var.get()["paths"]["electron_dir"] + + nucleon_file_path = pathlib.Path(nucleon_dir) / selected_filename + electron_file_path = pathlib.Path(electron_dir) / f"{selected_filename.stem}.json" + + # 跳转到准备屏幕 + self.app.push_screen( + PreparationScreen(nucleon_file_path, electron_file_path) ) - self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path)) - + def on_button_pressed(self, event) -> None: - if event.button.id == "new_nucleon_button": - # 切换到创建单元 + """处理按钮点击事件""" + button_id = event.button.id + + if button_id == "new_nucleon_button": from .nucreator import NucleonCreatorScreen - - newscr = NucleonCreatorScreen() - self.app.push_screen(newscr) - elif event.button.id == "precache_all_button": - # 切换到缓存管理器 + new_screen = NucleonCreatorScreen() + self.app.push_screen(new_screen) + + elif button_id == "precache_all_button": from .precache import PrecachingScreen - precache_screen = PrecachingScreen() self.app.push_screen(precache_screen) - elif event.button.id == "about_button": - from .about import AboutScreen - + + elif button_id == "about_button": about_screen = AboutScreen() self.app.push_screen(about_screen) - + def action_quit_app(self) -> None: - self.app.exit() + """退出应用程序""" + self.app.exit() \ No newline at end of file diff --git a/tests/interface/test_dashboard.py b/tests/interface/test_dashboard.py index be519ac..42ca79a 100644 --- a/tests/interface/test_dashboard.py +++ b/tests/interface/test_dashboard.py @@ -89,7 +89,7 @@ class TestDashboardScreenUnit(unittest.TestCase): screen = DashboardScreen() # 模拟一个文件名 filename = "test.toml" - result = screen.item_desc_generator(filename) + result = screen.analyser(filename) self.assertIsInstance(result, dict) self.assertIn(0, result) self.assertIn(1, result) From b5f30ec4eeaf1d1a9daf960f81e938710cd8a9a6 Mon Sep 17 00:00:00 2001 From: david-ajax Date: Sun, 21 Dec 2025 23:44:13 +0800 Subject: [PATCH 7/7] =?UTF-8?q?style:=20=E6=A0=BC=E5=BC=8F=E5=8C=96?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/heurams/interface/screens/dashboard.py | 111 ++++++++++----------- 1 file changed, 51 insertions(+), 60 deletions(-) diff --git a/src/heurams/interface/screens/dashboard.py b/src/heurams/interface/screens/dashboard.py index f3be579..38b0d1d 100644 --- a/src/heurams/interface/screens/dashboard.py +++ b/src/heurams/interface/screens/dashboard.py @@ -4,15 +4,8 @@ import pathlib from textual.app import ComposeResult from textual.containers import ScrollableContainer from textual.screen import Screen -from textual.widgets import ( - Button, - Footer, - Header, - Label, - ListItem, - ListView, - Static, -) +from textual.widgets import (Button, Footer, Header, Label, ListItem, ListView, + Static) import heurams.services.timer as timer import heurams.services.version as version @@ -28,9 +21,9 @@ logger = get_logger(__name__) class DashboardScreen(Screen): """主仪表盘屏幕""" - + SUB_TITLE = "仪表盘" - + def __init__( self, name: str | None = None, @@ -41,7 +34,7 @@ class DashboardScreen(Screen): self.nextdates = {} self.texts = {} self.stay_enabled = {} - + def compose(self) -> ComposeResult: """组合界面组件""" yield Header(show_clock=True) @@ -54,94 +47,94 @@ class DashboardScreen(Screen): ListView(id="union-list", classes="union-list-view"), Label( f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} ' - f'{version.codename.capitalize()} 2025' + f"{version.codename.capitalize()} 2025" ), ) yield Footer() - + def analyser(self, filename: str) -> dict: """分析文件状态以生成显示文本 - + Args: filename: 要分析的文件名 - + Returns: dict: 包含显示文本的字典,键为行号 """ from heurams.kernel.particles.loader import load_electron, load_nucleon - + result = {} filestem = pathlib.Path(filename).stem - + # 构建电子文件路径 electron_dir = config_var.get()["paths"]["electron_dir"] electron_file_path = pathlib.Path(electron_dir) / f"{filestem}.json" - + logger.debug(f"电子文件路径: {electron_file_path}") - + # 确保电子文件存在 if not electron_file_path.exists(): electron_file_path.touch() electron_file_path.write_text("{}") - + # 加载电子数据 electron_dict = load_electron(path=electron_file_path) logger.debug(f"电子数据: {electron_dict}") - + # 分析电子状态 is_due = 0 is_activated = 0 nextdate = 0x3F3F3F3F - + for electron in electron_dict.values(): logger.debug(f"{electron}, 是否到期: {electron.is_due()}") - + if electron.is_due(): is_due = 1 if electron.is_activated(): is_activated = 1 nextdate = min(nextdate, electron.nextdate()) - + # 检查是否需要更多复习 nucleon_dir = config_var.get()["paths"]["nucleon_dir"] nucleon_path = pathlib.Path(nucleon_dir) / f"{filestem}.toml" nucleon_count = len(load_nucleon(nucleon_path)) electron_count = len(electron_dict) is_more = not (electron_count >= nucleon_count) - + logger.debug(f"是否需要更多复习: {is_more}") - + # 更新状态 self.nextdates[filename] = nextdate - self.stay_enabled[filename] = (is_due or is_more) - + self.stay_enabled[filename] = is_due or is_more + # 构建返回结果 result[0] = f"{filename}\0" - + if not is_activated: result[1] = " 尚未激活" else: status_text = "需要复习" if is_due else "当前无需复习" result[1] = f"下一次复习: {nextdate}\n{status_text}" - + return result - + def on_mount(self) -> None: """挂载组件时初始化""" union_list_widget = self.query_one("#union-list", ListView) probe = probe_all(0) - + # 分析所有文件 for file in probe["nucleon"]: self.texts[file] = self.analyser(file) - + # 按下次复习时间排序 nucleon_files = sorted( probe["nucleon"], key=lambda f: self.nextdates[f], reverse=True, ) - + # 填充列表 if not probe["nucleon"]: union_list_widget.append( @@ -154,63 +147,61 @@ class DashboardScreen(Screen): ) union_list_widget.disabled = True return - + for file in nucleon_files: text = self.texts[file] - list_item = ListItem( - Label(f"{text[0]}\n{text[1]}") - ) + list_item = ListItem(Label(f"{text[0]}\n{text[1]}")) union_list_widget.append(list_item) - + if not self.stay_enabled[file]: list_item.disabled = True - + def on_list_view_selected(self, event) -> None: """处理列表项选择事件""" if not isinstance(event.item, ListItem): return - + selected_label = event.item.query_one(Label) label_text = str(selected_label.renderable) - + if "未找到任何 .toml 文件" in label_text: return - + # 提取文件名 - selected_filename = pathlib.Path( - label_text.partition("\0")[0].replace("*", "") - ) - + selected_filename = pathlib.Path(label_text.partition("\0")[0].replace("*", "")) + # 构建文件路径 nucleon_dir = config_var.get()["paths"]["nucleon_dir"] electron_dir = config_var.get()["paths"]["electron_dir"] - + nucleon_file_path = pathlib.Path(nucleon_dir) / selected_filename - electron_file_path = pathlib.Path(electron_dir) / f"{selected_filename.stem}.json" - - # 跳转到准备屏幕 - self.app.push_screen( - PreparationScreen(nucleon_file_path, electron_file_path) + electron_file_path = ( + pathlib.Path(electron_dir) / f"{selected_filename.stem}.json" ) - + + # 跳转到准备屏幕 + self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path)) + def on_button_pressed(self, event) -> None: """处理按钮点击事件""" button_id = event.button.id - + if button_id == "new_nucleon_button": from .nucreator import NucleonCreatorScreen + new_screen = NucleonCreatorScreen() self.app.push_screen(new_screen) - + elif button_id == "precache_all_button": from .precache import PrecachingScreen + precache_screen = PrecachingScreen() self.app.push_screen(precache_screen) - + elif button_id == "about_button": about_screen = AboutScreen() self.app.push_screen(about_screen) - + def action_quit_app(self) -> None: """退出应用程序""" - self.app.exit() \ No newline at end of file + self.app.exit()