refactor(synctool): 改进同步方案

This commit is contained in:
2025-12-21 21:18:31 +08:00
parent d8fc18166d
commit ee0646ac79
5 changed files with 25 additions and 486 deletions

1
.gitignore vendored
View File

@@ -20,6 +20,7 @@ data/global/
data/orbital/ data/orbital/
config/config_dev.toml config/config_dev.toml
AGENTS.md AGENTS.md
*.log.1
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@@ -42,6 +42,7 @@ template_dir = "./data/template"
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO) audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置 [providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声) voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
@@ -50,12 +51,11 @@ voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-
url = "" url = ""
key = "" key = ""
[sync.webdav] # WebDAV 同步设置 [providers.sync.webdav] # WebDAV 同步设置
enabled = false
url = "" url = ""
username = "" username = ""
password = "" password = ""
remote_path = "/heurams/" remote_path = "/heurams/"
sync_mode = "bidirectional" # bidirectional/upload_only/download_only
conflict_strategy = "newer" # newer/ask/keep_both
verify_ssl = true verify_ssl = true
[sync]

View File

@@ -20,7 +20,6 @@ class SyncScreen(Screen):
def __init__(self, nucleons: list = [], desc: str = ""): def __init__(self, nucleons: list = [], desc: str = ""):
super().__init__(name=None, id=None, classes=None) super().__init__(name=None, id=None, classes=None)
self.sync_service = None self.sync_service = None
self.sync_config = {}
self.is_syncing = False self.is_syncing = False
self.is_paused = False self.is_paused = False
self.log_messages = [] self.log_messages = []
@@ -30,35 +29,29 @@ class SyncScreen(Screen):
yield Header(show_clock=True) yield Header(show_clock=True)
with ScrollableContainer(id="sync_container"): with ScrollableContainer(id="sync_container"):
# 标题和连接状态 # 标题和连接状态
yield Static("WebDAV 同步工具", classes="title") yield Static("同步工具", classes="title")
yield Static("", id="status_label", classes="status") yield Static("", id="status_label", classes="status")
# 配置信息 # 配置信息
yield Static("服务器配置", classes="section_title") yield Static(f"同步协议: {config_var.get()['services']['sync']}")
yield Static("服务器配置:", classes="section_title")
with Horizontal(classes="config_info"): with Horizontal(classes="config_info"):
yield Static("URL:", classes="config_label") yield Static("远程服务器:", classes="config_label")
yield Static("", id="server_url", classes="config_value") yield Static("", id="server_url", classes="config_value")
with Horizontal(classes="config_info"): with Horizontal(classes="config_info"):
yield Static("远程路径:", classes="config_label") yield Static("远程路径:", classes="config_label")
yield Static("", id="remote_path", classes="config_value") yield Static("", id="remote_path", classes="config_value")
with Horizontal(classes="config_info"):
yield Static("同步模式:", classes="config_label")
yield Static("", id="sync_mode", classes="config_value")
# 控制按钮
yield Static("控制面板", classes="section_title")
with Horizontal(classes="control_buttons"): with Horizontal(classes="control_buttons"):
yield Button("测试连接", id="test_connection", variant="primary") yield Button("测试连接", id="test_connection", variant="primary")
yield Button("开始同步", id="start_sync", variant="success") yield Button("开始同步", id="start_sync", variant="success")
yield Button("暂停", id="pause_sync", variant="warning", disabled=True) yield Button("暂停", id="pause_sync", variant="warning", disabled=True)
yield Button("取消", id="cancel_sync", variant="error", disabled=True) yield Button("取消", id="cancel_sync", variant="error", disabled=True)
# 进度显示
yield Static("同步进度", classes="section_title") yield Static("同步进度", classes="section_title")
yield ProgressBar(id="progress_bar", show_percentage=True, total=100) yield ProgressBar(id="progress_bar", show_percentage=True, total=100)
yield Static("", id="progress_label", classes="progress_text") yield Static("", id="progress_label", classes="progress_text")
# 日志输出
yield Static("同步日志", classes="section_title") yield Static("同步日志", classes="section_title")
yield Static("", id="log_output", classes="log_output") yield Static("", id="log_output", classes="log_output")
@@ -66,50 +59,22 @@ class SyncScreen(Screen):
def on_mount(self): def on_mount(self):
"""挂载时初始化状态""" """挂载时初始化状态"""
self.load_config()
self.update_ui_from_config() self.update_ui_from_config()
self.log_message("同步工具已启动") self.log_message("同步工具已启动")
def load_config(self):
"""从配置文件加载同步设置"""
try:
from heurams.context import config_var
config_data = config_var.get().data
self.sync_config = config_data.get("sync", {}).get("webdav", {})
# 创建同步服务实例
from heurams.services.sync_service import create_sync_service_from_config
self.sync_service = create_sync_service_from_config()
except Exception as e:
self.log_message(f"加载配置失败: {e}", is_error=True)
self.sync_config = {}
def update_ui_from_config(self): def update_ui_from_config(self):
"""更新 UI 显示配置信息""" """更新 UI 显示配置信息"""
try: try:
sync_cfg: dict = config_var.get()['providers']['sync']['webdav']
# 更新服务器 URL # 更新服务器 URL
url = self.sync_config.get("url", "未配置") url = sync_cfg.get("url", "未配置")
url_widget = self.query_one("#server_url") url_widget = self.query_one("#server_url")
url_widget.update(url if url else "未配置") # type: ignore url_widget.update(url) # type: ignore
# 更新远程路径 # 更新远程路径
remote_path = self.sync_config.get("remote_path", "/heurams/") remote_path = sync_cfg.get("remote_path", "/")
path_widget = self.query_one("#remote_path") path_widget = self.query_one("#remote_path")
path_widget.update(remote_path) # type: ignore path_widget.update(remote_path) # type: ignore
# 更新同步模式
sync_mode = self.sync_config.get("sync_mode", "bidirectional")
mode_widget = self.query_one("#sync_mode")
mode_map = {
"bidirectional": "双向同步",
"upload_only": "仅上传",
"download_only": "仅下载",
}
mode_widget.update(mode_map.get(sync_mode, sync_mode)) # type: ignore
# 更新状态标签 # 更新状态标签
status_widget = self.query_one("#status_label") status_widget = self.query_one("#status_label")
if self.sync_service and self.sync_service.client: if self.sync_service and self.sync_service.client:

View File

@@ -1,438 +0,0 @@
# WebDAV 同步服务
import hashlib
import os
import pathlib
import time
import typing
from dataclasses import dataclass
from enum import Enum
import requests
from webdav3.client import Client
from heurams.context import config_var
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class SyncMode(Enum):
"""同步模式枚举"""
BIDIRECTIONAL = "bidirectional"
UPLOAD_ONLY = "upload_only"
DOWNLOAD_ONLY = "download_only"
class ConflictStrategy(Enum):
"""冲突解决策略枚举"""
NEWER = "newer" # 较新文件覆盖较旧文件
ASK = "ask" # 用户手动选择
KEEP_BOTH = "keep_both" # 保留双方(重命名)
@dataclass
class SyncConfig:
"""同步配置数据类"""
enabled: bool = False
url: str = ""
username: str = ""
password: str = ""
remote_path: str = "/heurams/"
sync_mode: SyncMode = SyncMode.BIDIRECTIONAL
conflict_strategy: ConflictStrategy = ConflictStrategy.NEWER
verify_ssl: bool = True
class SyncService:
"""WebDAV 同步服务"""
def __init__(self, config):
self.config = config
logger.debug(f"{str(self.config)}")
self.client = None
self._setup_client()
def _setup_client(self):
"""设置 WebDAV 客户端"""
if not self.config.enabled or not self.config.url:
logger.warning("同步服务未启用或未配置 URL")
return
options = {
"webdav_hostname": self.config.url,
"webdav_login": self.config.username,
"webdav_password": self.config.password,
"webdav_root": self.config.remote_path,
"verify_ssl": self.config.verify_ssl,
"disable_check": True, # 不检查服务器支持的功能
}
try:
self.client = Client(options)
logger.info("WebDAV 客户端初始化完成")
except Exception as e:
logger.error("WebDAV 客户端初始化失败: %s", e)
self.client = None
def test_connection(self) -> bool:
"""测试 WebDAV 服务器连接"""
if not self.client:
logger.error("WebDAV 客户端未初始化")
return False
try:
# 尝试列出根目录
self.client.list()
logger.info("WebDAV 连接测试成功")
return True
except Exception as e:
logger.error("WebDAV 连接测试失败: %s", e)
return False
def _get_local_files(self, local_dir: pathlib.Path) -> typing.Dict[str, dict]:
"""获取本地文件列表及其元数据"""
files = {}
for root, _, filenames in os.walk(local_dir):
for filename in filenames:
file_path = pathlib.Path(root) / filename
rel_path = file_path.relative_to(local_dir)
stat = file_path.stat()
files[str(rel_path)] = {
"path": file_path,
"size": stat.st_size,
"mtime": stat.st_mtime,
"hash": self._calculate_hash(file_path),
}
return files
def _get_remote_files(self) -> typing.Dict[str, dict]:
"""获取远程文件列表及其元数据"""
if not self.client:
return {}
try:
remote_list = self.client.list(recursive=True)
files = {}
for item in remote_list:
if not item.endswith("/"): # 忽略目录
rel_path = item.lstrip("/")
try:
info = self.client.info(item)
files[rel_path] = {
"path": item,
"size": info.get("size", 0),
"mtime": self._parse_remote_mtime(info),
}
except Exception as e:
logger.warning("无法获取远程文件信息 %s: %s", item, e)
return files
except Exception as e:
logger.error("获取远程文件列表失败: %s", e)
return {}
def _calculate_hash(self, file_path: pathlib.Path, block_size: int = 65536) -> str:
"""计算文件的 SHA-256 哈希值"""
sha256 = hashlib.sha256()
try:
with open(file_path, "rb") as f:
for block in iter(lambda: f.read(block_size), b""):
sha256.update(block)
return sha256.hexdigest()
except Exception as e:
logger.error("计算文件哈希失败 %s: %s", file_path, e)
return ""
def _parse_remote_mtime(self, info: dict) -> float:
"""解析远程文件的修改时间"""
# WebDAV 可能返回 Last-Modified 头或其他时间格式
# 这里简单返回当前时间,实际应根据服务器响应解析
return time.time()
def sync_directory(self, local_dir: pathlib.Path) -> typing.Dict[str, typing.Any]:
"""
同步目录
Args:
local_dir: 本地目录路径
Returns:
同步结果统计
"""
if not self.client:
logger.error("WebDAV 客户端未初始化")
return {"success": False, "error": "客户端未初始化"}
results = {
"uploaded": 0,
"downloaded": 0,
"conflicts": 0,
"errors": 0,
"success": True,
}
try:
# 确保远程目录存在
self.client.mkdir(self.config.remote_path)
local_files = self._get_local_files(local_dir)
remote_files = self._get_remote_files()
# 根据同步模式处理文件
if self.config.sync_mode in [SyncMode.BIDIRECTIONAL, SyncMode.UPLOAD_ONLY]:
stats = self._upload_files(local_dir, local_files, remote_files)
results["uploaded"] += stats.get("uploaded", 0)
results["conflicts"] += stats.get("conflicts", 0)
results["errors"] += stats.get("errors", 0)
if self.config.sync_mode in [
SyncMode.BIDIRECTIONAL,
SyncMode.DOWNLOAD_ONLY,
]:
stats = self._download_files(local_dir, local_files, remote_files)
results["downloaded"] += stats.get("downloaded", 0)
results["conflicts"] += stats.get("conflicts", 0)
results["errors"] += stats.get("errors", 0)
logger.info("同步完成: %s", results)
return results
except Exception as e:
logger.error("同步过程中发生错误: %s", e)
results["success"] = False
results["error"] = str(e)
return results
def _upload_files(
self, local_dir: pathlib.Path, local_files: dict, remote_files: dict
) -> typing.Dict[str, int]:
"""上传文件到远程服务器"""
stats = {"uploaded": 0, "errors": 0, "conflicts": 0}
for rel_path, local_info in local_files.items():
remote_info = remote_files.get(rel_path)
# 判断是否需要上传
should_upload = False
conflict_resolved = False
remote_path = os.path.join(self.config.remote_path, rel_path)
if not remote_info:
should_upload = True # 远程不存在
else:
# 检查冲突
local_mtime = local_info.get("mtime", 0)
remote_mtime = remote_info.get("mtime", 0)
if local_mtime != remote_mtime:
# 存在冲突
stats["conflicts"] += 1
should_upload, should_download = self._handle_conflict(
local_info, remote_info
)
if (
should_upload
and self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH
):
# 重命名远程文件避免覆盖
conflict_suffix = f".conflict_{int(remote_mtime)}"
name, ext = os.path.splitext(rel_path)
new_rel_path = (
f"{name}{conflict_suffix}{ext}"
if ext
else f"{name}{conflict_suffix}"
)
remote_path = os.path.join(
self.config.remote_path, new_rel_path
)
conflict_resolved = True
logger.debug("冲突文件重命名: %s -> %s", rel_path, new_rel_path)
else:
# 时间相同,无需上传
should_upload = False
if should_upload:
try:
self.client.upload_file(local_info["path"], remote_path)
stats["uploaded"] += 1
logger.debug("上传文件: %s -> %s", rel_path, remote_path)
except Exception as e:
logger.error("上传文件失败 %s: %s", rel_path, e)
stats["errors"] += 1
return stats
def _download_files(
self, local_dir: pathlib.Path, local_files: dict, remote_files: dict
) -> typing.Dict[str, int]:
"""从远程服务器下载文件"""
stats = {"downloaded": 0, "errors": 0, "conflicts": 0}
for rel_path, remote_info in remote_files.items():
local_info = local_files.get(rel_path)
# 判断是否需要下载
should_download = False
if not local_info:
should_download = True # 本地不存在
else:
# 检查冲突
local_mtime = local_info.get("mtime", 0)
remote_mtime = remote_info.get("mtime", 0)
if local_mtime != remote_mtime:
# 存在冲突
stats["conflicts"] += 1
should_upload, should_download = self._handle_conflict(
local_info, remote_info
)
# 如果应该上传,则不应该下载(冲突已在上传侧处理)
if should_upload:
should_download = False
else:
# 时间相同,无需下载
should_download = False
if should_download:
try:
local_path = local_dir / rel_path
local_path.parent.mkdir(parents=True, exist_ok=True)
self.client.download_file(remote_info["path"], str(local_path))
stats["downloaded"] += 1
logger.debug("下载文件: %s -> %s", rel_path, local_path)
except Exception as e:
logger.error("下载文件失败 %s: %s", rel_path, e)
stats["errors"] += 1
return stats
def _handle_conflict(
self, local_info: dict, remote_info: dict
) -> typing.Tuple[bool, bool]:
"""
处理文件冲突
Returns:
(should_upload, should_download) - 是否应该上传和下载
"""
local_mtime = local_info.get("mtime", 0)
remote_mtime = remote_info.get("mtime", 0)
if self.config.conflict_strategy == ConflictStrategy.NEWER:
# 较新文件覆盖较旧文件
if local_mtime > remote_mtime:
return True, False # 上传本地较新版本
elif remote_mtime > local_mtime:
return False, True # 下载远程较新版本
else:
return False, False # 时间相同,无需操作
elif self.config.conflict_strategy == ConflictStrategy.KEEP_BOTH:
# 保留双方 - 重命名远程文件
# 这里实现简单的重命名策略:添加冲突后缀
# 实际应该在上传时处理重命名
# 返回 True, False 表示上传重命名后的文件
# 重命名逻辑在调用处处理
return True, False
elif self.config.conflict_strategy == ConflictStrategy.ASK:
# 用户手动选择 - 记录冲突,跳过
# 返回 False, False 跳过,等待用户决定
logger.warning(
"文件冲突需要用户手动选择: local_mtime=%s, remote_mtime=%s",
local_mtime,
remote_mtime,
)
return False, False
return False, False
def _should_upload(self, local_info: dict, remote_info: dict) -> bool:
"""判断是否需要上传(本地较新或哈希不同)"""
# 这里实现简单的基于时间的比较
# 实际应该使用哈希比较更可靠
return local_info.get("mtime", 0) > remote_info.get("mtime", 0)
def _should_download(self, local_info: dict, remote_info: dict) -> bool:
"""判断是否需要下载(远程较新)"""
return remote_info.get("mtime", 0) > local_info.get("mtime", 0)
def upload_file(self, local_path: pathlib.Path, remote_path: str = "") -> bool:
"""上传单个文件"""
if not self.client:
return False
try:
if not remote_path:
remote_path = os.path.join(self.config.remote_path, local_path.name)
self.client.upload_file(str(local_path), remote_path)
logger.info("文件上传成功: %s -> %s", local_path, remote_path)
return True
except Exception as e:
logger.error("文件上传失败: %s", e)
return False
def download_file(self, remote_path: str, local_path: pathlib.Path) -> bool:
"""下载单个文件"""
if not self.client:
return False
try:
local_path.parent.mkdir(parents=True, exist_ok=True)
self.client.download_file(remote_path, str(local_path))
logger.info("文件下载成功: %s -> %s", remote_path, local_path)
return True
except Exception as e:
logger.error("文件下载失败: %s", e)
return False
def delete_remote_file(self, remote_path: str) -> bool:
"""删除远程文件"""
if not self.client:
return False
try:
self.client.clean(remote_path)
logger.info("远程文件删除成功: %s", remote_path)
return True
except Exception as e:
logger.error("远程文件删除失败: %s", e)
return False
def create_sync_service_from_config() -> typing.Optional[SyncService]:
"""从配置文件创建同步服务实例"""
try:
from heurams.context import config_var
sync_config = config_var.get()["providers"]["sync"]["webdav"]
if not sync_config.get("enabled", False):
logger.debug("同步服务未启用")
return None
config = SyncConfig(
enabled=sync_config.get("enabled", False),
url=sync_config.get("url", ""),
username=sync_config.get("username", ""),
password=sync_config.get("password", ""),
remote_path=sync_config.get("remote_path", "/heurams/"),
sync_mode=SyncMode(sync_config.get("sync_mode", "bidirectional")),
conflict_strategy=ConflictStrategy(
sync_config.get("conflict_strategy", "newer")
),
verify_ssl=sync_config.get("verify_ssl", True),
)
service = SyncService(config)
if service.client is None:
logger.warning("同步服务客户端创建失败")
return None
return service
except Exception as e:
logger.error("创建同步服务失败: %s", e)
return None

View File

@@ -2,4 +2,15 @@
得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers 得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers
""" """
import fsspec from pathlib import Path
import fsspec as fs
class VFSObject():
def __init__(self, protocol, base_url):
self.base_url = base_url
self.protocol = protocol
self.fs = fs.filesystem(protocol=protocol, base_url = base_url)
def open(self, path: Path):
return self.fs.open(path)
def open_by_list(self, path_list: list[Path]):
return self.fs.open_files(path_list)