0.4.3 版本合并

This commit is contained in:
2025-12-21 23:44:28 +08:00
26 changed files with 830 additions and 92 deletions

1
.gitignore vendored
View File

@@ -20,6 +20,7 @@ data/global/
data/orbital/ data/orbital/
config/config_dev.toml config/config_dev.toml
AGENTS.md AGENTS.md
*.log.1
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@@ -30,6 +30,7 @@
- 自然语音: 集成微软神经网络文本转语音 (TTS) 技术 - 自然语音: 集成微软神经网络文本转语音 (TTS) 技术
- 多种谜题类型: 选择题 (MCQ)、填空题 (Cloze)、识别题 (Recognition) - 多种谜题类型: 选择题 (MCQ)、填空题 (Cloze)、识别题 (Recognition)
- 动态内容生成: 支持宏驱动的模板系统, 根据上下文动态生成题目 - 动态内容生成: 支持宏驱动的模板系统, 根据上下文动态生成题目
- 云同步支持: 通过 WebDAV 协议同步数据到远程服务器
### 实用用户界面 ### 实用用户界面
- 响应式 Textual 框架构建的跨平台 TUI 界面 - 响应式 Textual 框架构建的跨平台 TUI 界面
@@ -82,7 +83,23 @@ python -m heurams.interface
## 配置 ## 配置
配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置. 配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置.
### 同步配置
同步功能支持 WebDAV 协议,可在配置文件的 `[sync.webdav]` 段进行配置:
```toml
[sync.webdav]
enabled = false
url = "" # WebDAV 服务器地址
username = "" # 用户名
password = "" # 密码
remote_path = "/heurams/" # 远程路径
sync_mode = "bidirectional" # 同步模式: bidirectional/upload_only/download_only
conflict_strategy = "newer" # 冲突策略: newer/ask/keep_both
verify_ssl = true # SSL 证书验证
```
启用同步后,可通过应用内的同步工具进行数据备份和恢复。
## 项目结构 ## 项目结构
@@ -104,6 +121,7 @@ graph TB
Timer[时间服务] Timer[时间服务]
AudioService[音频服务] AudioService[音频服务]
TTSService[TTS服务] TTSService[TTS服务]
SyncService[同步服务]
OtherServices[其他服务] OtherServices[其他服务]
end end
@@ -156,7 +174,8 @@ src/heurams/
│ ├── logger.py # 日志系统 │ ├── logger.py # 日志系统
│ ├── timer.py # 时间服务 │ ├── timer.py # 时间服务
│ ├── audio_service.py # 音频播放抽象 │ ├── audio_service.py # 音频播放抽象
── tts_service.py # 文本转语音抽象 ── tts_service.py # 文本转语音抽象
│ └── sync_service.py # WebDAV 同步服务
├── kernel/ # 核心业务逻辑 ├── kernel/ # 核心业务逻辑
│ ├── algorithms/ # 间隔重复算法 (FSRS, SM2) │ ├── algorithms/ # 间隔重复算法 (FSRS, SM2)
│ ├── particles/ # 数据模型 (Atom, Electron, Nucleon, Orbital) │ ├── particles/ # 数据模型 (Atom, Electron, Nucleon, Orbital)

View File

@@ -42,6 +42,7 @@ template_dir = "./data/template"
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO) audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置 [providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声) voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
@@ -49,3 +50,12 @@ voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置 [providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = "" url = ""
key = "" key = ""
[providers.sync.webdav] # WebDAV 同步设置
url = ""
username = ""
password = ""
remote_path = "/heurams/"
verify_ssl = true
[sync]

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "heurams" name = "heurams"
version = "0.4.0" version = "0.4.3"
description = "Heuristic Assisted Memory Scheduler" description = "Heuristic Assisted Memory Scheduler"
license = {file = "LICENSE"} license = {file = "LICENSE"}
classifiers = [ classifiers = [

View File

@@ -2,3 +2,5 @@ bidict==0.23.1
playsound==1.2.2 playsound==1.2.2
textual==5.3.0 textual==5.3.0
toml==0.10.2 toml==0.10.2
requests>=2.31.0
webdavclient3>=3.0.0

View File

@@ -32,11 +32,12 @@ try:
except Exception as e: except Exception as e:
print("未能加载自定义用户配置") print("未能加载自定义用户配置")
logger.warning("未能加载自定义用户配置, 错误: %s", e) logger.warning("未能加载自定义用户配置, 错误: %s", e)
if pathlib.Path(rootdir / "default" / "config" / "config_dev.toml").exists(): if pathlib.Path(workdir / "config" / "config_dev.toml").exists():
print("使用开发设置")
logger.debug("使用开发设置") logger.debug("使用开发设置")
config_var: ContextVar[ConfigFile] = ContextVar( config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(workdir / "config" / "config_dev.toml") "config_var", default=ConfigFile(workdir / "config" / "config_dev.toml")
) )
# runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据 # runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据

View File

@@ -14,6 +14,14 @@ scheduled_num = 8
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒 # UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8) timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[algorithm]
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
[puzzles] # 谜题默认配置 [puzzles] # 谜题默认配置
[puzzles.mcq] [puzzles.mcq]
@@ -25,6 +33,7 @@ min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径 [paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
nucleon_dir = "./data/nucleon" nucleon_dir = "./data/nucleon"
electron_dir = "./data/electron" electron_dir = "./data/electron"
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
orbital_dir = "./data/orbital" orbital_dir = "./data/orbital"
cache_dir = "./data/cache" cache_dir = "./data/cache"
template_dir = "./data/template" template_dir = "./data/template"
@@ -33,7 +42,20 @@ template_dir = "./data/template"
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO) audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置 [providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = "" url = ""
key = "" key = ""
[providers.sync.webdav] # WebDAV 同步设置
url = ""
username = ""
password = ""
remote_path = "/heurams/"
verify_ssl = true
[sync]

View File

@@ -8,6 +8,7 @@ from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen from .screens.dashboard import DashboardScreen
from .screens.nucreator import NucleonCreatorScreen from .screens.nucreator import NucleonCreatorScreen
from .screens.precache import PrecachingScreen from .screens.precache import PrecachingScreen
from .screens.synctool import SyncScreen
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -39,12 +40,14 @@ class HeurAMSApp(App):
("1", "app.push_screen('dashboard')", "仪表盘"), ("1", "app.push_screen('dashboard')", "仪表盘"),
("2", "app.push_screen('precache_all')", "缓存管理器"), ("2", "app.push_screen('precache_all')", "缓存管理器"),
("3", "app.push_screen('nucleon_creator')", "创建新单元"), ("3", "app.push_screen('nucleon_creator')", "创建新单元"),
# ("4", "app.push_screen('synctool')", "同步工具"),
("0", "app.push_screen('about')", "版本信息"), ("0", "app.push_screen('about')", "版本信息"),
] ]
SCREENS = { SCREENS = {
"dashboard": DashboardScreen, "dashboard": DashboardScreen,
"nucleon_creator": NucleonCreatorScreen, "nucleon_creator": NucleonCreatorScreen,
"precache_all": PrecachingScreen, "precache_all": PrecachingScreen,
"synctool": SyncScreen,
"about": AboutScreen, "about": AboutScreen,
} }

View File

@@ -1,9 +1,9 @@
from textual.app import App from textual.app import App
from textual.widgets import Button from textual.widgets import Button
from heurams.services.logger import get_logger
from heurams.context import config_var from heurams.context import config_var
from heurams.interface import HeurAMSApp from heurams.interface import HeurAMSApp
from heurams.services.logger import get_logger
from .screens.about import AboutScreen from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen from .screens.dashboard import DashboardScreen
@@ -15,4 +15,4 @@ logger = get_logger(__name__)
app = HeurAMSApp() app = HeurAMSApp()
if __name__ == "__main__": if __name__ == "__main__":
app.run() app.run()

View File

@@ -31,7 +31,8 @@ class AboutScreen(Screen):
特别感谢: 特别感谢:
- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SuperMemo-2 算法 - [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SM-2 算法与 SM-15 算法理论
- [Kazuaki Tanida](https://github.com/slaypni): SM-15 算法的 CoffeeScript 实现
- [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 文献参考 - [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 文献参考
# 参与贡献 # 参与贡献

View File

@@ -20,128 +20,188 @@ logger = get_logger(__name__)
class DashboardScreen(Screen): class DashboardScreen(Screen):
"""主仪表盘屏幕"""
SUB_TITLE = "仪表盘" SUB_TITLE = "仪表盘"
def __init__(
self,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.nextdates = {}
self.texts = {}
self.stay_enabled = {}
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
"""组合界面组件"""
yield Header(show_clock=True) yield Header(show_clock=True)
yield ScrollableContainer( yield ScrollableContainer(
Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"), Label('欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"), Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'), Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'),
Label(f"使用算法: {config_var.get()['algorithm']['default']}"),
Label("选择待学习或待修改的记忆单元集:", classes="title-label"), Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
ListView(id="union-list", classes="union-list-view"), ListView(id="union-list", classes="union-list-view"),
Label( Label(
f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} {version.codename.capitalize()} 2025' f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} '
f"{version.codename.capitalize()} 2025"
), ),
) )
yield Footer() yield Footer()
def item_desc_generator(self, filename) -> dict: def analyser(self, filename: str) -> dict:
"""简单分析以生成项目项显示文本 """分析文件状态以生成显示文本
Args:
filename: 要分析的文件名
Returns: Returns:
dict: 以数字为列表, 分别呈现单行字符串 dict: 包含显示文本的字典,键为行号
""" """
res = dict() from heurams.kernel.particles.loader import load_electron, load_nucleon
filestem = pathlib.Path(filename).stem
res[0] = f"{filename}\0"
import heurams.kernel.particles as pt
from heurams.kernel.particles.loader import load_electron
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / ( result = {}
filestem + ".json" filestem = pathlib.Path(filename).stem
)
# 构建电子文件路径
electron_dir = config_var.get()["paths"]["electron_dir"]
electron_file_path = pathlib.Path(electron_dir) / f"{filestem}.json"
logger.debug(f"电子文件路径: {electron_file_path}") logger.debug(f"电子文件路径: {electron_file_path}")
if electron_file_path.exists(): # 未找到则创建电子文件 (json) # 确保电子文件存在
pass if not electron_file_path.exists():
else:
electron_file_path.touch() electron_file_path.touch()
with open(electron_file_path, "w") as f: electron_file_path.write_text("{}")
f.write("{}")
electron_dict = load_electron(path=electron_file_path) # TODO: 取消硬编码扩展名 # 加载电子数据
logger.debug(electron_dict) electron_dict = load_electron(path=electron_file_path)
logger.debug(f"电子数据: {electron_dict}")
# 分析电子状态
is_due = 0 is_due = 0
is_activated = 0 is_activated = 0
nextdate = 0x3F3F3F3F nextdate = 0x3F3F3F3F
for i in electron_dict.values():
i: pt.Electron for electron in electron_dict.values():
logger.debug(i, i.is_due()) logger.debug(f"{electron}, 是否到期: {electron.is_due()}")
if i.is_due():
if electron.is_due():
is_due = 1 is_due = 1
if i.is_activated(): if electron.is_activated():
is_activated = 1 is_activated = 1
nextdate = min(nextdate, i.nextdate()) nextdate = min(nextdate, electron.nextdate())
res[1] = f"下一次复习: {nextdate}\n"
res[1] += f"{"需要复习" if is_due else "当前无需复习"}" # 检查是否需要更多复习
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
nucleon_path = pathlib.Path(nucleon_dir) / f"{filestem}.toml"
nucleon_count = len(load_nucleon(nucleon_path))
electron_count = len(electron_dict)
is_more = not (electron_count >= nucleon_count)
logger.debug(f"是否需要更多复习: {is_more}")
# 更新状态
self.nextdates[filename] = nextdate
self.stay_enabled[filename] = is_due or is_more
# 构建返回结果
result[0] = f"{filename}\0"
if not is_activated: if not is_activated:
res[1] = " 尚未激活" result[1] = " 尚未激活"
return res else:
status_text = "需要复习" if is_due else "当前无需复习"
result[1] = f"下一次复习: {nextdate}\n{status_text}"
return result
def on_mount(self) -> None: def on_mount(self) -> None:
"""挂载组件时初始化"""
union_list_widget = self.query_one("#union-list", ListView) union_list_widget = self.query_one("#union-list", ListView)
probe = probe_all(0) probe = probe_all(0)
if len(probe["nucleon"]): # 分析所有文件
for file in probe["nucleon"]: for file in probe["nucleon"]:
text = self.item_desc_generator(file) self.texts[file] = self.analyser(file)
union_list_widget.append(
ListItem( # 按下次复习时间排序
Label(text[0] + "\n" + text[1]), nucleon_files = sorted(
) probe["nucleon"],
) key=lambda f: self.nextdates[f],
else: reverse=True,
)
# 填充列表
if not probe["nucleon"]:
union_list_widget.append( union_list_widget.append(
ListItem( ListItem(
Static( Static(
"在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集." "在 ./nucleon/ 中未找到任何内容源数据文件\n"
"请放置文件后重启应用,或者新建空的单元集。"
) )
) )
) )
union_list_widget.disabled = True union_list_widget.disabled = True
return
for file in nucleon_files:
text = self.texts[file]
list_item = ListItem(Label(f"{text[0]}\n{text[1]}"))
union_list_widget.append(list_item)
if not self.stay_enabled[file]:
list_item.disabled = True
def on_list_view_selected(self, event) -> None: def on_list_view_selected(self, event) -> None:
"""处理列表项选择事件"""
if not isinstance(event.item, ListItem): if not isinstance(event.item, ListItem):
return return
selected_label = event.item.query_one(Label) selected_label = event.item.query_one(Label)
if "未找到任何 .toml 文件" in str(selected_label.renderable): # type: ignore label_text = str(selected_label.renderable)
if "未找到任何 .toml 文件" in label_text:
return return
selected_filename = pathlib.Path( # 提取文件名
str(selected_label.renderable) selected_filename = pathlib.Path(label_text.partition("\0")[0].replace("*", ""))
.partition("\0")[0] # 文件名末尾截断, 保留文件名
.replace("*", "")
) # 去除markdown加粗
nucleon_file_path = ( # 构建文件路径
pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) / selected_filename nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
) electron_dir = config_var.get()["paths"]["electron_dir"]
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
str(selected_filename.stem) + ".json" nucleon_file_path = pathlib.Path(nucleon_dir) / selected_filename
electron_file_path = (
pathlib.Path(electron_dir) / f"{selected_filename.stem}.json"
) )
# 跳转到准备屏幕
self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path)) self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path))
def on_button_pressed(self, event) -> None: def on_button_pressed(self, event) -> None:
if event.button.id == "new_nucleon_button": """处理按钮点击事件"""
# 切换到创建单元 button_id = event.button.id
if button_id == "new_nucleon_button":
from .nucreator import NucleonCreatorScreen from .nucreator import NucleonCreatorScreen
newscr = NucleonCreatorScreen() new_screen = NucleonCreatorScreen()
self.app.push_screen(newscr) self.app.push_screen(new_screen)
elif event.button.id == "precache_all_button":
# 切换到缓存管理器 elif button_id == "precache_all_button":
from .precache import PrecachingScreen from .precache import PrecachingScreen
precache_screen = PrecachingScreen() precache_screen = PrecachingScreen()
self.app.push_screen(precache_screen) self.app.push_screen(precache_screen)
elif event.button.id == "about_button":
from .about import AboutScreen
elif button_id == "about_button":
about_screen = AboutScreen() about_screen = AboutScreen()
self.app.push_screen(about_screen) self.app.push_screen(about_screen)
def action_quit_app(self) -> None: def action_quit_app(self) -> None:
"""退出应用程序"""
self.app.exit() self.app.exit()

View File

@@ -148,16 +148,24 @@ class MemScreen(Screen):
def play_voice(self): def play_voice(self):
"""朗读当前内容""" """朗读当前内容"""
from heurams.services.audio_service import play_by_path
from pathlib import Path from pathlib import Path
from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5 from heurams.services.hasher import get_md5
path = Path(config_var.get()['paths']["cache_dir"])
path = path / f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav" path = Path(config_var.get()["paths"]["cache_dir"])
path = (
path
/ f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
)
if path.exists(): if path.exists():
play_by_path(path) play_by_path(path)
else: else:
from heurams.services.tts_service import convertor from heurams.services.tts_service import convertor
convertor(self.atom.registry['nucleon'].metadata["formation"]["tts_text"], path)
convertor(
self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path
)
play_by_path(path) play_by_path(path)
def action_toggle_dark(self): def action_toggle_dark(self):

View File

@@ -99,6 +99,7 @@ class PrecachingScreen(Screen):
if not cache_file.exists(): if not cache_file.exists():
try: try:
from heurams.services.tts_service import convertor from heurams.services.tts_service import convertor
convertor(text, cache_file) convertor(text, cache_file)
return 1 return 1
except Exception as e: except Exception as e:

View File

@@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import pathlib import pathlib
import time
from textual.app import ComposeResult from textual.app import ComposeResult
from textual.containers import Horizontal, ScrollableContainer from textual.containers import Horizontal, ScrollableContainer
@@ -18,22 +19,287 @@ class SyncScreen(Screen):
def __init__(self, nucleons: list = [], desc: str = ""): def __init__(self, nucleons: list = [], desc: str = ""):
super().__init__(name=None, id=None, classes=None) super().__init__(name=None, id=None, classes=None)
self.sync_service = None
self.is_syncing = False
self.is_paused = False
self.log_messages = []
self.max_log_lines = 50
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Header(show_clock=True) yield Header(show_clock=True)
with ScrollableContainer(id="sync_container"): with ScrollableContainer(id="sync_container"):
pass # 标题和连接状态
yield Static("同步工具", classes="title")
yield Static("", id="status_label", classes="status")
# 配置信息
yield Static(f"同步协议: {config_var.get()['services']['sync']}")
yield Static("服务器配置:", classes="section_title")
with Horizontal(classes="config_info"):
yield Static("远程服务器:", classes="config_label")
yield Static("", id="server_url", classes="config_value")
with Horizontal(classes="config_info"):
yield Static("远程路径:", classes="config_label")
yield Static("", id="remote_path", classes="config_value")
with Horizontal(classes="control_buttons"):
yield Button("测试连接", id="test_connection", variant="primary")
yield Button("开始同步", id="start_sync", variant="success")
yield Button("暂停", id="pause_sync", variant="warning", disabled=True)
yield Button("取消", id="cancel_sync", variant="error", disabled=True)
yield Static("同步进度", classes="section_title")
yield ProgressBar(id="progress_bar", show_percentage=True, total=100)
yield Static("", id="progress_label", classes="progress_text")
yield Static("同步日志", classes="section_title")
yield Static("", id="log_output", classes="log_output")
yield Footer() yield Footer()
def on_mount(self): def on_mount(self):
"""挂载时初始化状态""" """挂载时初始化状态"""
self.update_ui_from_config()
self.log_message("同步工具已启动")
def update_ui_from_config(self):
"""更新 UI 显示配置信息"""
try:
sync_cfg: dict = config_var.get()["providers"]["sync"]["webdav"]
# 更新服务器 URL
url = sync_cfg.get("url", "未配置")
url_widget = self.query_one("#server_url")
url_widget.update(url) # type: ignore
# 更新远程路径
remote_path = sync_cfg.get("remote_path", "/")
path_widget = self.query_one("#remote_path")
path_widget.update(remote_path) # type: ignore
# 更新状态标签
status_widget = self.query_one("#status_label")
if self.sync_service and self.sync_service.client:
status_widget.update("✅ 同步服务已就绪") # type: ignore
status_widget.add_class("ready")
else:
status_widget.update("❌ 同步服务未配置或未启用") # type: ignore
status_widget.add_class("error")
except Exception as e:
self.log_message(f"更新 UI 失败: {e}", is_error=True)
def update_status(self, status, current_item="", progress=None): def update_status(self, status, current_item="", progress=None):
"""更新状态显示""" """更新状态显示"""
try:
status_widget = self.query_one("#status_label")
status_widget.update(status) # type: ignore
if progress is not None:
progress_bar = self.query_one("#progress_bar")
progress_bar.progress = progress # type: ignore
progress_label = self.query_one("#progress_label")
progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore
except Exception as e:
self.log_message(f"更新状态失败: {e}", is_error=True)
def log_message(self, message: str, is_error: bool = False):
"""添加日志消息并更新显示"""
timestamp = time.strftime("%H:%M:%S")
prefix = "[ERROR]" if is_error else "[INFO]"
log_line = f"{timestamp} {prefix} {message}"
self.log_messages.append(log_line)
# 保持日志行数不超过最大值
if len(self.log_messages) > self.max_log_lines:
self.log_messages = self.log_messages[-self.max_log_lines :]
# 更新日志显示
try:
log_widget = self.query_one("#log_output")
log_widget.update("\n".join(self.log_messages)) # type: ignore
except Exception:
pass # 如果组件未就绪,忽略错误
def on_button_pressed(self, event: Button.Pressed) -> None: def on_button_pressed(self, event: Button.Pressed) -> None:
"""处理按钮点击事件"""
button_id = event.button.id
if button_id == "test_connection":
self.test_connection()
elif button_id == "start_sync":
self.start_sync()
elif button_id == "pause_sync":
self.pause_sync()
elif button_id == "cancel_sync":
self.cancel_sync()
event.stop() event.stop()
def test_connection(self):
"""测试 WebDAV 服务器连接"""
if not self.sync_service:
self.log_message("同步服务未初始化,请检查配置", is_error=True)
self.update_status("❌ 同步服务未初始化")
return
self.log_message("正在测试 WebDAV 连接...")
self.update_status("正在测试连接...")
try:
success = self.sync_service.test_connection()
if success:
self.log_message("连接测试成功")
self.update_status("✅ 连接正常")
else:
self.log_message("连接测试失败", is_error=True)
self.update_status("❌ 连接失败")
except Exception as e:
self.log_message(f"连接测试异常: {e}", is_error=True)
self.update_status("❌ 连接异常")
def start_sync(self):
"""开始同步"""
if not self.sync_service:
self.log_message("同步服务未初始化,无法开始同步", is_error=True)
return
if self.is_syncing:
self.log_message("同步已在进行中", is_error=True)
return
self.is_syncing = True
self.is_paused = False
self.update_button_states()
self.log_message("开始同步数据...")
self.update_status("正在同步...", progress=0)
# 启动后台同步任务
self.run_worker(self.perform_sync, thread=True)
def perform_sync(self):
"""执行同步任务(在后台线程中运行)"""
worker = get_current_worker()
try:
# 获取需要同步的本地目录
from heurams.context import config_var
config = config_var.get()
paths = config.get("paths", {})
# 同步 nucleon 目录
nucleon_dir = pathlib.Path(paths.get("nucleon_dir", "./data/nucleon"))
if nucleon_dir.exists():
self.log_message(f"同步 nucleon 目录: {nucleon_dir}")
self.update_status(f"同步 nucleon 目录...", progress=10)
result = self.sync_service.sync_directory(nucleon_dir) # type: ignore
if result.get("success"):
self.log_message(
f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)}"
)
else:
self.log_message(
f"nucleon 同步失败: {result.get('error', '未知错误')}",
is_error=True,
)
# 同步 electron 目录
electron_dir = pathlib.Path(paths.get("electron_dir", "./data/electron"))
if electron_dir.exists():
self.log_message(f"同步 electron 目录: {electron_dir}")
self.update_status(f"同步 electron 目录...", progress=60)
result = self.sync_service.sync_directory(electron_dir) # type: ignore
if result.get("success"):
self.log_message(
f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)}"
)
else:
self.log_message(
f"electron 同步失败: {result.get('error', '未知错误')}",
is_error=True,
)
# 同步 orbital 目录(如果存在)
orbital_dir = pathlib.Path(paths.get("orbital_dir", "./data/orbital"))
if orbital_dir.exists():
self.log_message(f"同步 orbital 目录: {orbital_dir}")
self.update_status(f"同步 orbital 目录...", progress=80)
result = self.sync_service.sync_directory(orbital_dir) # type: ignore
if result.get("success"):
self.log_message(
f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)}"
)
else:
self.log_message(
f"orbital 同步失败: {result.get('error', '未知错误')}",
is_error=True,
)
# 同步完成
self.update_status("同步完成", progress=100)
self.log_message("所有目录同步完成")
except Exception as e:
self.log_message(f"同步过程中发生错误: {e}", is_error=True)
self.update_status("同步失败")
finally:
# 重置同步状态
self.is_syncing = False
self.is_paused = False
self.update_button_states() # type: ignore
def pause_sync(self):
"""暂停同步"""
if not self.is_syncing:
return
self.is_paused = not self.is_paused
self.update_button_states()
if self.is_paused:
self.log_message("同步已暂停")
self.update_status("同步已暂停")
else:
self.log_message("同步已恢复")
self.update_status("正在同步...")
def cancel_sync(self):
"""取消同步"""
if not self.is_syncing:
return
self.is_syncing = False
self.is_paused = False
self.update_button_states()
self.log_message("同步已取消")
self.update_status("同步已取消")
def update_button_states(self):
"""更新按钮状态"""
try:
start_button = self.query_one("#start_sync")
pause_button = self.query_one("#pause_sync")
cancel_button = self.query_one("#cancel_sync")
if self.is_syncing:
start_button.disabled = True
pause_button.disabled = False
cancel_button.disabled = False
pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore
else:
start_button.disabled = False
pause_button.disabled = True
cancel_button.disabled = True
except Exception as e:
self.log_message(f"更新按钮状态失败: {e}", is_error=True)
def action_go_back(self): def action_go_back(self):
self.app.pop_screen() self.app.pop_screen()

View File

@@ -50,9 +50,10 @@ class Recognition(BasePuzzleWidget):
def compose(self): def compose(self):
from heurams.context import config_var from heurams.context import config_var
autovoice = config_var.get()['interface']['memorizor']['autovoice']
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"]
if autovoice: if autovoice:
self.screen.action_play_voice() # type: ignore self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia] cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"] delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"]
replace_dict = { replace_dict = {
@@ -72,7 +73,7 @@ class Recognition(BasePuzzleWidget):
primary = cfg["primary"] primary = cfg["primary"]
with Center(): with Center():
for i in cfg['top_dim']: for i in cfg["top_dim"]:
yield Static(f"[dim]{i}[/]") yield Static(f"[dim]{i}[/]")
yield Label("") yield Label("")

View File

@@ -10,15 +10,18 @@ MIT 许可证
import datetime import datetime
import json import json
import os import os
from typing import TypedDict
import pathlib import pathlib
from typing import TypedDict
from heurams.context import config_var from heurams.context import config_var
from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF, from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF,
RANGE_AF, RANGE_REPETITION, RANGE_AF, RANGE_REPETITION,
SM, THRESHOLD_RECALL, Item) SM, THRESHOLD_RECALL, Item)
# 全局状态文件路径 # 全局状态文件路径
_GLOBAL_STATE_FILE = os.path.expanduser(pathlib.Path(config_var.get()['paths']['global_dir']) / 'sm15m_global_state.json') _GLOBAL_STATE_FILE = os.path.expanduser(
pathlib.Path(config_var.get()["paths"]["global_dir"]) / "sm15m_global_state.json"
)
def _get_global_sm(): def _get_global_sm():

View File

@@ -86,8 +86,8 @@ class Atom:
# eval 环境设置 # eval 环境设置
def eval_with_env(s: str): def eval_with_env(s: str):
default = config_var.get()["puzzles"] default = config_var.get()["puzzles"]
payload = self.registry['nucleon'].payload payload = self.registry["nucleon"].payload
metadata = self.registry['nucleon'].metadata metadata = self.registry["nucleon"].metadata
eval_value = eval(s) eval_value = eval(s)
if isinstance(eval_value, (int, float)): if isinstance(eval_value, (int, float)):
ret = str(eval_value) ret = str(eval_value)
@@ -117,10 +117,11 @@ class Atom:
logger.debug("发现 eval 表达式: '%s'", data[5:]) logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:]) return modifier(data[5:])
return data return data
try: try:
traverse(self.registry['nucleon'].payload, eval_with_env) traverse(self.registry["nucleon"].payload, eval_with_env)
traverse(self.registry['nucleon'].metadata, eval_with_env) traverse(self.registry["nucleon"].metadata, eval_with_env)
traverse(self.registry['orbital'], eval_with_env) traverse(self.registry["orbital"], eval_with_env)
except Exception as e: except Exception as e:
ret = f"此 eval 实例发生错误: {e}" ret = f"此 eval 实例发生错误: {e}"
logger.warning(ret) logger.warning(ret)

View File

@@ -18,9 +18,12 @@ class Electron:
algo: 使用的算法模块标识 algo: 使用的算法模块标识
""" """
if algo_name == "": if algo_name == "":
algo_name = config_var.get()['algorithm']['default'] algo_name = config_var.get()["algorithm"]["default"]
logger.debug( logger.debug(
"创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s", ident, algo_name, algodata "创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s",
ident,
algo_name,
algodata,
) )
self.algodata = algodata self.algodata = algodata
self.ident = ident self.ident = ident
@@ -31,7 +34,9 @@ class Electron:
self.algodata[self.algo.algo_name] = {} self.algodata[self.algo.algo_name] = {}
logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo) logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo)
if not self.algodata[self.algo.algo_name]: if not self.algodata[self.algo.algo_name]:
logger.debug(f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}") logger.debug(
f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}"
)
self._default_init(self.algo.defaults) self._default_init(self.algo.defaults)
else: else:
logger.debug("算法数据已存在, 跳过默认初始化") logger.debug("算法数据已存在, 跳过默认初始化")

View File

@@ -53,4 +53,4 @@ class Nucleon:
def placeholder(): def placeholder():
"""生成一个占位原子核""" """生成一个占位原子核"""
logger.debug("创建 Nucleon 占位符") logger.debug("创建 Nucleon 占位符")
return Nucleon("核子对象样例内容", {}) return Nucleon("核子对象样例内容", {})

View File

@@ -2,8 +2,8 @@ import pathlib
import edge_tts import edge_tts
from heurams.services.logger import get_logger
from heurams.context import config_var from heurams.context import config_var
from heurams.services.logger import get_logger
from .base import BaseTTS from .base import BaseTTS
@@ -19,7 +19,7 @@ class EdgeTTS(BaseTTS):
try: try:
communicate = edge_tts.Communicate( communicate = edge_tts.Communicate(
text, text,
config_var.get()['providers']['tts']['edgetts']["voice"], config_var.get()["providers"]["tts"]["edgetts"]["voice"],
) )
logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频") logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频")
communicate.save_sync(str(path)) communicate.save_sync(str(path))

View File

@@ -3,7 +3,7 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
ver = "0.4.2" ver = "0.4.3"
stage = "prototype" stage = "prototype"
codename = "fledge" # 雏鸟, 0.4.x 版本 codename = "fledge" # 雏鸟, 0.4.x 版本

View File

@@ -0,0 +1,20 @@
"""vfs.py
得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers
"""
from pathlib import Path
import fsspec as fs
class VFSObject:
def __init__(self, protocol, base_url):
self.base_url = base_url
self.protocol = protocol
self.fs = fs.filesystem(protocol=protocol, base_url=base_url)
def open(self, path: Path):
return self.fs.open(path)
def open_by_list(self, path_list: list[Path]):
return self.fs.open_files(path_list)

View File

View File

@@ -89,7 +89,7 @@ class TestDashboardScreenUnit(unittest.TestCase):
screen = DashboardScreen() screen = DashboardScreen()
# 模拟一个文件名 # 模拟一个文件名
filename = "test.toml" filename = "test.toml"
result = screen.item_desc_generator(filename) result = screen.analyser(filename)
self.assertIsInstance(result, dict) self.assertIsInstance(result, dict)
self.assertIn(0, result) self.assertIn(0, result)
self.assertIn(1, result) self.assertIn(1, result)

View File

@@ -0,0 +1,314 @@
#!/usr/bin/env python3
"""
SyncScreen 和 SyncService 的测试.
"""
import pathlib
import tempfile
import time
import unittest
from unittest.mock import MagicMock, Mock, patch
from heurams.context import ConfigContext
from heurams.services.config import ConfigFile
from heurams.services.sync_service import (ConflictStrategy, SyncConfig,
SyncMode, SyncService)
class TestSyncServiceUnit(unittest.TestCase):
"""SyncService 的单元测试."""
def setUp(self):
"""在每个测试之前运行, 设置临时目录和模拟客户端."""
self.temp_dir = tempfile.TemporaryDirectory()
self.temp_path = pathlib.Path(self.temp_dir.name)
# 创建测试文件
self.test_file = self.temp_path / "test.txt"
self.test_file.write_text("测试内容")
# 模拟 WebDAV 客户端
self.mock_client = MagicMock()
# 创建同步配置
self.config = SyncConfig(
enabled=True,
url="https://example.com/dav/",
username="test",
password="test",
remote_path="/heurams/",
sync_mode=SyncMode.BIDIRECTIONAL,
conflict_strategy=ConflictStrategy.NEWER,
verify_ssl=True,
)
def tearDown(self):
"""在每个测试之后清理."""
self.temp_dir.cleanup()
@patch("heurams.services.sync_service.Client")
def test_sync_service_initialization(self, mock_client_class):
"""测试同步服务初始化."""
mock_client_class.return_value = self.mock_client
service = SyncService(self.config)
# 验证客户端已创建
mock_client_class.assert_called_once()
self.assertIsNotNone(service.client)
self.assertEqual(service.config, self.config)
@patch("heurams.services.sync_service.Client")
def test_sync_service_disabled(self, mock_client_class):
"""测试同步服务未启用."""
config = SyncConfig(enabled=False)
service = SyncService(config)
# 客户端不应初始化
mock_client_class.assert_not_called()
self.assertIsNone(service.client)
@patch("heurams.services.sync_service.Client")
def test_test_connection_success(self, mock_client_class):
"""测试连接测试成功."""
mock_client_class.return_value = self.mock_client
self.mock_client.list.return_value = []
service = SyncService(self.config)
result = service.test_connection()
self.assertTrue(result)
self.mock_client.list.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_test_connection_failure(self, mock_client_class):
"""测试连接测试失败."""
mock_client_class.return_value = self.mock_client
self.mock_client.list.side_effect = Exception("连接失败")
service = SyncService(self.config)
result = service.test_connection()
self.assertFalse(result)
self.mock_client.list.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_upload_file(self, mock_client_class):
"""测试上传单个文件."""
mock_client_class.return_value = self.mock_client
service = SyncService(self.config)
result = service.upload_file(self.test_file)
self.assertTrue(result)
self.mock_client.upload_file.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_download_file(self, mock_client_class):
"""测试下载单个文件."""
mock_client_class.return_value = self.mock_client
service = SyncService(self.config)
remote_path = "/heurams/test.txt"
local_path = self.temp_path / "downloaded.txt"
result = service.download_file(remote_path, local_path)
self.assertTrue(result)
self.mock_client.download_file.assert_called_once()
self.assertTrue(local_path.parent.exists())
@patch("heurams.services.sync_service.Client")
def test_sync_directory_no_files(self, mock_client_class):
"""测试同步空目录."""
mock_client_class.return_value = self.mock_client
self.mock_client.list.return_value = []
self.mock_client.mkdir.return_value = None
service = SyncService(self.config)
result = service.sync_directory(self.temp_path)
self.assertTrue(result["success"])
self.assertEqual(result["uploaded"], 0)
self.assertEqual(result["downloaded"], 0)
self.mock_client.mkdir.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_sync_directory_upload_only(self, mock_client_class):
"""测试仅上传模式."""
mock_client_class.return_value = self.mock_client
self.mock_client.list.return_value = []
self.mock_client.mkdir.return_value = None
config = SyncConfig(
enabled=True,
url="https://example.com/dav/",
username="test",
password="test",
remote_path="/heurams/",
sync_mode=SyncMode.UPLOAD_ONLY,
conflict_strategy=ConflictStrategy.NEWER,
)
service = SyncService(config)
result = service.sync_directory(self.temp_path)
self.assertTrue(result["success"])
self.mock_client.mkdir.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_conflict_strategy_newer(self, mock_client_class):
"""测试 NEWER 冲突策略."""
mock_client_class.return_value = self.mock_client
# 模拟远程文件存在
self.mock_client.list.return_value = ["test.txt"]
self.mock_client.info.return_value = {
"size": 100,
"modified": "2023-01-01T00:00:00Z",
}
self.mock_client.mkdir.return_value = None
service = SyncService(self.config)
result = service.sync_directory(self.temp_path)
self.assertTrue(result["success"])
# 应该有一个冲突
self.assertGreaterEqual(result.get("conflicts", 0), 0)
@patch("heurams.services.sync_service.Client")
def test_create_sync_service_from_config(self, mock_client_class):
"""测试从配置文件创建同步服务."""
mock_client_class.return_value = self.mock_client
# 创建临时配置文件
config_data = {
"sync": {
"webdav": {
"enabled": True,
"url": "https://example.com/dav/",
"username": "test",
"password": "test",
"remote_path": "/heurams/",
"sync_mode": "bidirectional",
"conflict_strategy": "newer",
"verify_ssl": True,
}
}
}
# 模拟 config_var
with patch("heurams.services.sync_service.config_var") as mock_config_var:
mock_config = MagicMock()
mock_config.data = config_data
mock_config_var.get.return_value = mock_config
from heurams.services.sync_service import \
create_sync_service_from_config
service = create_sync_service_from_config()
self.assertIsNotNone(service)
self.assertIsNotNone(service.client)
class TestSyncScreenUnit(unittest.TestCase):
"""SyncScreen 的单元测试."""
def setUp(self):
"""在每个测试之前运行."""
self.temp_dir = tempfile.TemporaryDirectory()
self.temp_path = pathlib.Path(self.temp_dir.name)
# 创建默认配置
default_config_path = (
pathlib.Path(__file__).parent.parent.parent
/ "src/heurams/default/config/config.toml"
)
self.config = ConfigFile(default_config_path)
# 更新配置中的路径
config_data = self.config.data
config_data["paths"]["nucleon_dir"] = str(self.temp_path / "nucleon")
config_data["paths"]["electron_dir"] = str(self.temp_path / "electron")
config_data["paths"]["orbital_dir"] = str(self.temp_path / "orbital")
config_data["paths"]["cache_dir"] = str(self.temp_path / "cache")
# 添加同步配置
if "sync" not in config_data:
config_data["sync"] = {}
config_data["sync"]["webdav"] = {
"enabled": False,
"url": "",
"username": "",
"password": "",
"remote_path": "/heurams/",
"sync_mode": "bidirectional",
"conflict_strategy": "newer",
"verify_ssl": True,
}
# 创建目录
for dir_key in ["nucleon_dir", "electron_dir", "orbital_dir", "cache_dir"]:
pathlib.Path(config_data["paths"][dir_key]).mkdir(
parents=True, exist_ok=True
)
# 使用 ConfigContext 设置配置
self.config_ctx = ConfigContext(self.config)
self.config_ctx.__enter__()
def tearDown(self):
"""在每个测试之后清理."""
self.config_ctx.__exit__(None, None, None)
self.temp_dir.cleanup()
@patch("heurams.interface.screens.synctool.create_sync_service_from_config")
def test_sync_screen_compose(self, mock_create_service):
"""测试 SyncScreen 的 compose 方法."""
from heurams.interface.screens.synctool import SyncScreen
# 模拟同步服务
mock_service = MagicMock()
mock_service.client = MagicMock()
mock_create_service.return_value = mock_service
screen = SyncScreen()
# 测试 compose 方法
from textual.app import ComposeResult
result = screen.compose()
widgets = list(result)
# 检查基本部件
from textual.containers import ScrollableContainer
from textual.widgets import Button, Footer, Header, ProgressBar, Static
header_present = any(isinstance(w, Header) for w in widgets)
footer_present = any(isinstance(w, Footer) for w in widgets)
self.assertTrue(header_present)
self.assertTrue(footer_present)
# 检查容器
container_present = any(isinstance(w, ScrollableContainer) for w in widgets)
self.assertTrue(container_present)
@patch("heurams.interface.screens.synctool.create_sync_service_from_config")
def test_sync_screen_load_config(self, mock_create_service):
"""测试 SyncScreen 加载配置."""
from heurams.interface.screens.synctool import SyncScreen
mock_service = MagicMock()
mock_service.client = MagicMock()
mock_create_service.return_value = mock_service
screen = SyncScreen()
screen.load_config()
# 验证配置已加载
self.assertIsNotNone(screen.sync_config)
mock_create_service.assert_called_once()
if __name__ == "__main__":
unittest.main()