Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d347471cc0 | |||
| 142d30347b |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -20,7 +20,6 @@ data/global/
|
|||||||
data/orbital/
|
data/orbital/
|
||||||
config/config_dev.toml
|
config/config_dev.toml
|
||||||
AGENTS.md
|
AGENTS.md
|
||||||
*.log.1
|
|
||||||
|
|
||||||
# Byte-compiled / optimized / DLL files
|
# Byte-compiled / optimized / DLL files
|
||||||
__pycache__/
|
__pycache__/
|
||||||
|
|||||||
21
README.md
21
README.md
@@ -30,7 +30,6 @@
|
|||||||
- 自然语音: 集成微软神经网络文本转语音 (TTS) 技术
|
- 自然语音: 集成微软神经网络文本转语音 (TTS) 技术
|
||||||
- 多种谜题类型: 选择题 (MCQ)、填空题 (Cloze)、识别题 (Recognition)
|
- 多种谜题类型: 选择题 (MCQ)、填空题 (Cloze)、识别题 (Recognition)
|
||||||
- 动态内容生成: 支持宏驱动的模板系统, 根据上下文动态生成题目
|
- 动态内容生成: 支持宏驱动的模板系统, 根据上下文动态生成题目
|
||||||
- 云同步支持: 通过 WebDAV 协议同步数据到远程服务器
|
|
||||||
|
|
||||||
### 实用用户界面
|
### 实用用户界面
|
||||||
- 响应式 Textual 框架构建的跨平台 TUI 界面
|
- 响应式 Textual 框架构建的跨平台 TUI 界面
|
||||||
@@ -85,22 +84,6 @@ python -m heurams.interface
|
|||||||
|
|
||||||
配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置.
|
配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置.
|
||||||
|
|
||||||
### 同步配置
|
|
||||||
同步功能支持 WebDAV 协议,可在配置文件的 `[sync.webdav]` 段进行配置:
|
|
||||||
```toml
|
|
||||||
[sync.webdav]
|
|
||||||
enabled = false
|
|
||||||
url = "" # WebDAV 服务器地址
|
|
||||||
username = "" # 用户名
|
|
||||||
password = "" # 密码
|
|
||||||
remote_path = "/heurams/" # 远程路径
|
|
||||||
sync_mode = "bidirectional" # 同步模式: bidirectional/upload_only/download_only
|
|
||||||
conflict_strategy = "newer" # 冲突策略: newer/ask/keep_both
|
|
||||||
verify_ssl = true # SSL 证书验证
|
|
||||||
```
|
|
||||||
|
|
||||||
启用同步后,可通过应用内的同步工具进行数据备份和恢复。
|
|
||||||
|
|
||||||
## 项目结构
|
## 项目结构
|
||||||
|
|
||||||
### 架构图
|
### 架构图
|
||||||
@@ -121,7 +104,6 @@ graph TB
|
|||||||
Timer[时间服务]
|
Timer[时间服务]
|
||||||
AudioService[音频服务]
|
AudioService[音频服务]
|
||||||
TTSService[TTS服务]
|
TTSService[TTS服务]
|
||||||
SyncService[同步服务]
|
|
||||||
OtherServices[其他服务]
|
OtherServices[其他服务]
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -174,8 +156,7 @@ src/heurams/
|
|||||||
│ ├── logger.py # 日志系统
|
│ ├── logger.py # 日志系统
|
||||||
│ ├── timer.py # 时间服务
|
│ ├── timer.py # 时间服务
|
||||||
│ ├── audio_service.py # 音频播放抽象
|
│ ├── audio_service.py # 音频播放抽象
|
||||||
│ ├── tts_service.py # 文本转语音抽象
|
│ └── tts_service.py # 文本转语音抽象
|
||||||
│ └── sync_service.py # WebDAV 同步服务
|
|
||||||
├── kernel/ # 核心业务逻辑
|
├── kernel/ # 核心业务逻辑
|
||||||
│ ├── algorithms/ # 间隔重复算法 (FSRS, SM2)
|
│ ├── algorithms/ # 间隔重复算法 (FSRS, SM2)
|
||||||
│ ├── particles/ # 数据模型 (Atom, Electron, Nucleon, Orbital)
|
│ ├── particles/ # 数据模型 (Atom, Electron, Nucleon, Orbital)
|
||||||
|
|||||||
@@ -42,7 +42,6 @@ template_dir = "./data/template"
|
|||||||
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
|
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
|
||||||
tts = "edgetts" # 可选项: edgetts
|
tts = "edgetts" # 可选项: edgetts
|
||||||
llm = "openai" # 可选项: openai
|
llm = "openai" # 可选项: openai
|
||||||
sync = "webdav" # 可选项: 留空, webdav
|
|
||||||
|
|
||||||
[providers.tts.edgetts] # EdgeTTS 设置
|
[providers.tts.edgetts] # EdgeTTS 设置
|
||||||
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
|
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
|
||||||
@@ -50,12 +49,3 @@ voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-
|
|||||||
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
|
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
|
||||||
url = ""
|
url = ""
|
||||||
key = ""
|
key = ""
|
||||||
|
|
||||||
[providers.sync.webdav] # WebDAV 同步设置
|
|
||||||
url = ""
|
|
||||||
username = ""
|
|
||||||
password = ""
|
|
||||||
remote_path = "/heurams/"
|
|
||||||
verify_ssl = true
|
|
||||||
|
|
||||||
[sync]
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "heurams"
|
name = "heurams"
|
||||||
version = "0.4.3"
|
version = "0.4.0"
|
||||||
description = "Heuristic Assisted Memory Scheduler"
|
description = "Heuristic Assisted Memory Scheduler"
|
||||||
license = {file = "LICENSE"}
|
license = {file = "LICENSE"}
|
||||||
classifiers = [
|
classifiers = [
|
||||||
|
|||||||
@@ -2,5 +2,3 @@ bidict==0.23.1
|
|||||||
playsound==1.2.2
|
playsound==1.2.2
|
||||||
textual==5.3.0
|
textual==5.3.0
|
||||||
toml==0.10.2
|
toml==0.10.2
|
||||||
requests>=2.31.0
|
|
||||||
webdavclient3>=3.0.0
|
|
||||||
|
|||||||
@@ -32,8 +32,7 @@ try:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print("未能加载自定义用户配置")
|
print("未能加载自定义用户配置")
|
||||||
logger.warning("未能加载自定义用户配置, 错误: %s", e)
|
logger.warning("未能加载自定义用户配置, 错误: %s", e)
|
||||||
if pathlib.Path(workdir / "config" / "config_dev.toml").exists():
|
if pathlib.Path(rootdir / "default" / "config" / "config_dev.toml").exists():
|
||||||
print("使用开发设置")
|
|
||||||
logger.debug("使用开发设置")
|
logger.debug("使用开发设置")
|
||||||
config_var: ContextVar[ConfigFile] = ContextVar(
|
config_var: ContextVar[ConfigFile] = ContextVar(
|
||||||
"config_var", default=ConfigFile(workdir / "config" / "config_dev.toml")
|
"config_var", default=ConfigFile(workdir / "config" / "config_dev.toml")
|
||||||
|
|||||||
@@ -14,14 +14,6 @@ scheduled_num = 8
|
|||||||
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
|
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
|
||||||
timezone_offset = +28800 # 中国标准时间 (UTC+8)
|
timezone_offset = +28800 # 中国标准时间 (UTC+8)
|
||||||
|
|
||||||
[interface]
|
|
||||||
|
|
||||||
[interface.memorizor]
|
|
||||||
autovoice = true # 自动语音播放, 仅限于 recognition 组件
|
|
||||||
|
|
||||||
[algorithm]
|
|
||||||
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
|
|
||||||
|
|
||||||
[puzzles] # 谜题默认配置
|
[puzzles] # 谜题默认配置
|
||||||
|
|
||||||
[puzzles.mcq]
|
[puzzles.mcq]
|
||||||
@@ -33,7 +25,6 @@ min_denominator = 3
|
|||||||
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
|
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
|
||||||
nucleon_dir = "./data/nucleon"
|
nucleon_dir = "./data/nucleon"
|
||||||
electron_dir = "./data/electron"
|
electron_dir = "./data/electron"
|
||||||
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
|
|
||||||
orbital_dir = "./data/orbital"
|
orbital_dir = "./data/orbital"
|
||||||
cache_dir = "./data/cache"
|
cache_dir = "./data/cache"
|
||||||
template_dir = "./data/template"
|
template_dir = "./data/template"
|
||||||
@@ -42,20 +33,7 @@ template_dir = "./data/template"
|
|||||||
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
|
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
|
||||||
tts = "edgetts" # 可选项: edgetts
|
tts = "edgetts" # 可选项: edgetts
|
||||||
llm = "openai" # 可选项: openai
|
llm = "openai" # 可选项: openai
|
||||||
sync = "webdav" # 可选项: 留空, webdav
|
|
||||||
|
|
||||||
[providers.tts.edgetts] # EdgeTTS 设置
|
|
||||||
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
|
|
||||||
|
|
||||||
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
|
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
|
||||||
url = ""
|
url = ""
|
||||||
key = ""
|
key = ""
|
||||||
|
|
||||||
[providers.sync.webdav] # WebDAV 同步设置
|
|
||||||
url = ""
|
|
||||||
username = ""
|
|
||||||
password = ""
|
|
||||||
remote_path = "/heurams/"
|
|
||||||
verify_ssl = true
|
|
||||||
|
|
||||||
[sync]
|
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ from .screens.about import AboutScreen
|
|||||||
from .screens.dashboard import DashboardScreen
|
from .screens.dashboard import DashboardScreen
|
||||||
from .screens.nucreator import NucleonCreatorScreen
|
from .screens.nucreator import NucleonCreatorScreen
|
||||||
from .screens.precache import PrecachingScreen
|
from .screens.precache import PrecachingScreen
|
||||||
from .screens.synctool import SyncScreen
|
|
||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
@@ -40,14 +39,12 @@ class HeurAMSApp(App):
|
|||||||
("1", "app.push_screen('dashboard')", "仪表盘"),
|
("1", "app.push_screen('dashboard')", "仪表盘"),
|
||||||
("2", "app.push_screen('precache_all')", "缓存管理器"),
|
("2", "app.push_screen('precache_all')", "缓存管理器"),
|
||||||
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
|
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
|
||||||
# ("4", "app.push_screen('synctool')", "同步工具"),
|
|
||||||
("0", "app.push_screen('about')", "版本信息"),
|
("0", "app.push_screen('about')", "版本信息"),
|
||||||
]
|
]
|
||||||
SCREENS = {
|
SCREENS = {
|
||||||
"dashboard": DashboardScreen,
|
"dashboard": DashboardScreen,
|
||||||
"nucleon_creator": NucleonCreatorScreen,
|
"nucleon_creator": NucleonCreatorScreen,
|
||||||
"precache_all": PrecachingScreen,
|
"precache_all": PrecachingScreen,
|
||||||
"synctool": SyncScreen,
|
|
||||||
"about": AboutScreen,
|
"about": AboutScreen,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
from textual.app import App
|
from textual.app import App
|
||||||
from textual.widgets import Button
|
from textual.widgets import Button
|
||||||
|
|
||||||
|
from heurams.services.logger import get_logger
|
||||||
from heurams.context import config_var
|
from heurams.context import config_var
|
||||||
from heurams.interface import HeurAMSApp
|
from heurams.interface import HeurAMSApp
|
||||||
from heurams.services.logger import get_logger
|
|
||||||
|
|
||||||
from .screens.about import AboutScreen
|
from .screens.about import AboutScreen
|
||||||
from .screens.dashboard import DashboardScreen
|
from .screens.dashboard import DashboardScreen
|
||||||
|
|||||||
@@ -31,8 +31,7 @@ class AboutScreen(Screen):
|
|||||||
|
|
||||||
特别感谢:
|
特别感谢:
|
||||||
|
|
||||||
- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SM-2 算法与 SM-15 算法理论
|
- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SuperMemo-2 算法
|
||||||
- [Kazuaki Tanida](https://github.com/slaypni): SM-15 算法的 CoffeeScript 实现
|
|
||||||
- [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 文献参考
|
- [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 文献参考
|
||||||
|
|
||||||
# 参与贡献
|
# 参与贡献
|
||||||
|
|||||||
@@ -20,188 +20,128 @@ logger = get_logger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
class DashboardScreen(Screen):
|
class DashboardScreen(Screen):
|
||||||
"""主仪表盘屏幕"""
|
|
||||||
|
|
||||||
SUB_TITLE = "仪表盘"
|
SUB_TITLE = "仪表盘"
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
name: str | None = None,
|
|
||||||
id: str | None = None,
|
|
||||||
classes: str | None = None,
|
|
||||||
) -> None:
|
|
||||||
super().__init__(name, id, classes)
|
|
||||||
self.nextdates = {}
|
|
||||||
self.texts = {}
|
|
||||||
self.stay_enabled = {}
|
|
||||||
|
|
||||||
def compose(self) -> ComposeResult:
|
def compose(self) -> ComposeResult:
|
||||||
"""组合界面组件"""
|
|
||||||
yield Header(show_clock=True)
|
yield Header(show_clock=True)
|
||||||
yield ScrollableContainer(
|
yield ScrollableContainer(
|
||||||
Label('欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
|
Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
|
||||||
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
|
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
|
||||||
Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'),
|
Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'),
|
||||||
Label(f"使用算法: {config_var.get()['algorithm']['default']}"),
|
|
||||||
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
|
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
|
||||||
ListView(id="union-list", classes="union-list-view"),
|
ListView(id="union-list", classes="union-list-view"),
|
||||||
Label(
|
Label(
|
||||||
f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} '
|
f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} {version.codename.capitalize()} 2025'
|
||||||
f"{version.codename.capitalize()} 2025"
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
yield Footer()
|
yield Footer()
|
||||||
|
|
||||||
def analyser(self, filename: str) -> dict:
|
def item_desc_generator(self, filename) -> dict:
|
||||||
"""分析文件状态以生成显示文本
|
"""简单分析以生成项目项显示文本
|
||||||
|
|
||||||
Args:
|
|
||||||
filename: 要分析的文件名
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: 包含显示文本的字典,键为行号
|
dict: 以数字为列表, 分别呈现单行字符串
|
||||||
"""
|
"""
|
||||||
from heurams.kernel.particles.loader import load_electron, load_nucleon
|
res = dict()
|
||||||
|
|
||||||
result = {}
|
|
||||||
filestem = pathlib.Path(filename).stem
|
filestem = pathlib.Path(filename).stem
|
||||||
|
res[0] = f"{filename}\0"
|
||||||
|
import heurams.kernel.particles as pt
|
||||||
|
from heurams.kernel.particles.loader import load_electron
|
||||||
|
|
||||||
# 构建电子文件路径
|
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
|
||||||
electron_dir = config_var.get()["paths"]["electron_dir"]
|
filestem + ".json"
|
||||||
electron_file_path = pathlib.Path(electron_dir) / f"{filestem}.json"
|
)
|
||||||
|
|
||||||
logger.debug(f"电子文件路径: {electron_file_path}")
|
logger.debug(f"电子文件路径: {electron_file_path}")
|
||||||
|
|
||||||
# 确保电子文件存在
|
if electron_file_path.exists(): # 未找到则创建电子文件 (json)
|
||||||
if not electron_file_path.exists():
|
pass
|
||||||
|
else:
|
||||||
electron_file_path.touch()
|
electron_file_path.touch()
|
||||||
electron_file_path.write_text("{}")
|
with open(electron_file_path, "w") as f:
|
||||||
|
f.write("{}")
|
||||||
# 加载电子数据
|
electron_dict = load_electron(path=electron_file_path) # TODO: 取消硬编码扩展名
|
||||||
electron_dict = load_electron(path=electron_file_path)
|
logger.debug(electron_dict)
|
||||||
logger.debug(f"电子数据: {electron_dict}")
|
|
||||||
|
|
||||||
# 分析电子状态
|
|
||||||
is_due = 0
|
is_due = 0
|
||||||
is_activated = 0
|
is_activated = 0
|
||||||
nextdate = 0x3F3F3F3F
|
nextdate = 0x3F3F3F3F
|
||||||
|
for i in electron_dict.values():
|
||||||
for electron in electron_dict.values():
|
i: pt.Electron
|
||||||
logger.debug(f"{electron}, 是否到期: {electron.is_due()}")
|
logger.debug(i, i.is_due())
|
||||||
|
if i.is_due():
|
||||||
if electron.is_due():
|
|
||||||
is_due = 1
|
is_due = 1
|
||||||
if electron.is_activated():
|
if i.is_activated():
|
||||||
is_activated = 1
|
is_activated = 1
|
||||||
nextdate = min(nextdate, electron.nextdate())
|
nextdate = min(nextdate, i.nextdate())
|
||||||
|
res[1] = f"下一次复习: {nextdate}\n"
|
||||||
# 检查是否需要更多复习
|
res[1] += f"{"需要复习" if is_due else "当前无需复习"}"
|
||||||
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
|
|
||||||
nucleon_path = pathlib.Path(nucleon_dir) / f"{filestem}.toml"
|
|
||||||
nucleon_count = len(load_nucleon(nucleon_path))
|
|
||||||
electron_count = len(electron_dict)
|
|
||||||
is_more = not (electron_count >= nucleon_count)
|
|
||||||
|
|
||||||
logger.debug(f"是否需要更多复习: {is_more}")
|
|
||||||
|
|
||||||
# 更新状态
|
|
||||||
self.nextdates[filename] = nextdate
|
|
||||||
self.stay_enabled[filename] = is_due or is_more
|
|
||||||
|
|
||||||
# 构建返回结果
|
|
||||||
result[0] = f"{filename}\0"
|
|
||||||
|
|
||||||
if not is_activated:
|
if not is_activated:
|
||||||
result[1] = " 尚未激活"
|
res[1] = " 尚未激活"
|
||||||
else:
|
return res
|
||||||
status_text = "需要复习" if is_due else "当前无需复习"
|
|
||||||
result[1] = f"下一次复习: {nextdate}\n{status_text}"
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
def on_mount(self) -> None:
|
def on_mount(self) -> None:
|
||||||
"""挂载组件时初始化"""
|
|
||||||
union_list_widget = self.query_one("#union-list", ListView)
|
union_list_widget = self.query_one("#union-list", ListView)
|
||||||
|
|
||||||
probe = probe_all(0)
|
probe = probe_all(0)
|
||||||
|
|
||||||
# 分析所有文件
|
if len(probe["nucleon"]):
|
||||||
for file in probe["nucleon"]:
|
for file in probe["nucleon"]:
|
||||||
self.texts[file] = self.analyser(file)
|
text = self.item_desc_generator(file)
|
||||||
|
union_list_widget.append(
|
||||||
# 按下次复习时间排序
|
ListItem(
|
||||||
nucleon_files = sorted(
|
Label(text[0] + "\n" + text[1]),
|
||||||
probe["nucleon"],
|
|
||||||
key=lambda f: self.nextdates[f],
|
|
||||||
reverse=True,
|
|
||||||
)
|
)
|
||||||
|
)
|
||||||
# 填充列表
|
else:
|
||||||
if not probe["nucleon"]:
|
|
||||||
union_list_widget.append(
|
union_list_widget.append(
|
||||||
ListItem(
|
ListItem(
|
||||||
Static(
|
Static(
|
||||||
"在 ./nucleon/ 中未找到任何内容源数据文件。\n"
|
"在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集."
|
||||||
"请放置文件后重启应用,或者新建空的单元集。"
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
union_list_widget.disabled = True
|
union_list_widget.disabled = True
|
||||||
return
|
|
||||||
|
|
||||||
for file in nucleon_files:
|
|
||||||
text = self.texts[file]
|
|
||||||
list_item = ListItem(Label(f"{text[0]}\n{text[1]}"))
|
|
||||||
union_list_widget.append(list_item)
|
|
||||||
|
|
||||||
if not self.stay_enabled[file]:
|
|
||||||
list_item.disabled = True
|
|
||||||
|
|
||||||
def on_list_view_selected(self, event) -> None:
|
def on_list_view_selected(self, event) -> None:
|
||||||
"""处理列表项选择事件"""
|
|
||||||
if not isinstance(event.item, ListItem):
|
if not isinstance(event.item, ListItem):
|
||||||
return
|
return
|
||||||
|
|
||||||
selected_label = event.item.query_one(Label)
|
selected_label = event.item.query_one(Label)
|
||||||
label_text = str(selected_label.renderable)
|
if "未找到任何 .toml 文件" in str(selected_label.renderable): # type: ignore
|
||||||
|
|
||||||
if "未找到任何 .toml 文件" in label_text:
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# 提取文件名
|
selected_filename = pathlib.Path(
|
||||||
selected_filename = pathlib.Path(label_text.partition("\0")[0].replace("*", ""))
|
str(selected_label.renderable)
|
||||||
|
.partition("\0")[0] # 文件名末尾截断, 保留文件名
|
||||||
|
.replace("*", "")
|
||||||
|
) # 去除markdown加粗
|
||||||
|
|
||||||
# 构建文件路径
|
nucleon_file_path = (
|
||||||
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
|
pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) / selected_filename
|
||||||
electron_dir = config_var.get()["paths"]["electron_dir"]
|
)
|
||||||
|
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
|
||||||
nucleon_file_path = pathlib.Path(nucleon_dir) / selected_filename
|
str(selected_filename.stem) + ".json"
|
||||||
electron_file_path = (
|
|
||||||
pathlib.Path(electron_dir) / f"{selected_filename.stem}.json"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# 跳转到准备屏幕
|
|
||||||
self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path))
|
self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path))
|
||||||
|
|
||||||
def on_button_pressed(self, event) -> None:
|
def on_button_pressed(self, event) -> None:
|
||||||
"""处理按钮点击事件"""
|
if event.button.id == "new_nucleon_button":
|
||||||
button_id = event.button.id
|
# 切换到创建单元
|
||||||
|
|
||||||
if button_id == "new_nucleon_button":
|
|
||||||
from .nucreator import NucleonCreatorScreen
|
from .nucreator import NucleonCreatorScreen
|
||||||
|
|
||||||
new_screen = NucleonCreatorScreen()
|
newscr = NucleonCreatorScreen()
|
||||||
self.app.push_screen(new_screen)
|
self.app.push_screen(newscr)
|
||||||
|
elif event.button.id == "precache_all_button":
|
||||||
elif button_id == "precache_all_button":
|
# 切换到缓存管理器
|
||||||
from .precache import PrecachingScreen
|
from .precache import PrecachingScreen
|
||||||
|
|
||||||
precache_screen = PrecachingScreen()
|
precache_screen = PrecachingScreen()
|
||||||
self.app.push_screen(precache_screen)
|
self.app.push_screen(precache_screen)
|
||||||
|
elif event.button.id == "about_button":
|
||||||
|
from .about import AboutScreen
|
||||||
|
|
||||||
elif button_id == "about_button":
|
|
||||||
about_screen = AboutScreen()
|
about_screen = AboutScreen()
|
||||||
self.app.push_screen(about_screen)
|
self.app.push_screen(about_screen)
|
||||||
|
|
||||||
def action_quit_app(self) -> None:
|
def action_quit_app(self) -> None:
|
||||||
"""退出应用程序"""
|
|
||||||
self.app.exit()
|
self.app.exit()
|
||||||
|
|||||||
@@ -148,24 +148,16 @@ class MemScreen(Screen):
|
|||||||
|
|
||||||
def play_voice(self):
|
def play_voice(self):
|
||||||
"""朗读当前内容"""
|
"""朗读当前内容"""
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from heurams.services.audio_service import play_by_path
|
from heurams.services.audio_service import play_by_path
|
||||||
|
from pathlib import Path
|
||||||
from heurams.services.hasher import get_md5
|
from heurams.services.hasher import get_md5
|
||||||
|
path = Path(config_var.get()['paths']["cache_dir"])
|
||||||
path = Path(config_var.get()["paths"]["cache_dir"])
|
path = path / f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
|
||||||
path = (
|
|
||||||
path
|
|
||||||
/ f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
|
|
||||||
)
|
|
||||||
if path.exists():
|
if path.exists():
|
||||||
play_by_path(path)
|
play_by_path(path)
|
||||||
else:
|
else:
|
||||||
from heurams.services.tts_service import convertor
|
from heurams.services.tts_service import convertor
|
||||||
|
convertor(self.atom.registry['nucleon'].metadata["formation"]["tts_text"], path)
|
||||||
convertor(
|
|
||||||
self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path
|
|
||||||
)
|
|
||||||
play_by_path(path)
|
play_by_path(path)
|
||||||
|
|
||||||
def action_toggle_dark(self):
|
def action_toggle_dark(self):
|
||||||
|
|||||||
@@ -99,7 +99,6 @@ class PrecachingScreen(Screen):
|
|||||||
if not cache_file.exists():
|
if not cache_file.exists():
|
||||||
try:
|
try:
|
||||||
from heurams.services.tts_service import convertor
|
from heurams.services.tts_service import convertor
|
||||||
|
|
||||||
convertor(text, cache_file)
|
convertor(text, cache_file)
|
||||||
return 1
|
return 1
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import pathlib
|
import pathlib
|
||||||
import time
|
|
||||||
|
|
||||||
from textual.app import ComposeResult
|
from textual.app import ComposeResult
|
||||||
from textual.containers import Horizontal, ScrollableContainer
|
from textual.containers import Horizontal, ScrollableContainer
|
||||||
@@ -19,287 +18,22 @@ class SyncScreen(Screen):
|
|||||||
|
|
||||||
def __init__(self, nucleons: list = [], desc: str = ""):
|
def __init__(self, nucleons: list = [], desc: str = ""):
|
||||||
super().__init__(name=None, id=None, classes=None)
|
super().__init__(name=None, id=None, classes=None)
|
||||||
self.sync_service = None
|
|
||||||
self.is_syncing = False
|
|
||||||
self.is_paused = False
|
|
||||||
self.log_messages = []
|
|
||||||
self.max_log_lines = 50
|
|
||||||
|
|
||||||
def compose(self) -> ComposeResult:
|
def compose(self) -> ComposeResult:
|
||||||
yield Header(show_clock=True)
|
yield Header(show_clock=True)
|
||||||
with ScrollableContainer(id="sync_container"):
|
with ScrollableContainer(id="sync_container"):
|
||||||
# 标题和连接状态
|
pass
|
||||||
yield Static("同步工具", classes="title")
|
|
||||||
yield Static("", id="status_label", classes="status")
|
|
||||||
|
|
||||||
# 配置信息
|
|
||||||
yield Static(f"同步协议: {config_var.get()['services']['sync']}")
|
|
||||||
yield Static("服务器配置:", classes="section_title")
|
|
||||||
with Horizontal(classes="config_info"):
|
|
||||||
yield Static("远程服务器:", classes="config_label")
|
|
||||||
yield Static("", id="server_url", classes="config_value")
|
|
||||||
with Horizontal(classes="config_info"):
|
|
||||||
yield Static("远程路径:", classes="config_label")
|
|
||||||
yield Static("", id="remote_path", classes="config_value")
|
|
||||||
|
|
||||||
with Horizontal(classes="control_buttons"):
|
|
||||||
yield Button("测试连接", id="test_connection", variant="primary")
|
|
||||||
yield Button("开始同步", id="start_sync", variant="success")
|
|
||||||
yield Button("暂停", id="pause_sync", variant="warning", disabled=True)
|
|
||||||
yield Button("取消", id="cancel_sync", variant="error", disabled=True)
|
|
||||||
|
|
||||||
yield Static("同步进度", classes="section_title")
|
|
||||||
yield ProgressBar(id="progress_bar", show_percentage=True, total=100)
|
|
||||||
yield Static("", id="progress_label", classes="progress_text")
|
|
||||||
|
|
||||||
yield Static("同步日志", classes="section_title")
|
|
||||||
yield Static("", id="log_output", classes="log_output")
|
|
||||||
|
|
||||||
yield Footer()
|
yield Footer()
|
||||||
|
|
||||||
def on_mount(self):
|
def on_mount(self):
|
||||||
"""挂载时初始化状态"""
|
"""挂载时初始化状态"""
|
||||||
self.update_ui_from_config()
|
|
||||||
self.log_message("同步工具已启动")
|
|
||||||
|
|
||||||
def update_ui_from_config(self):
|
|
||||||
"""更新 UI 显示配置信息"""
|
|
||||||
try:
|
|
||||||
sync_cfg: dict = config_var.get()["providers"]["sync"]["webdav"]
|
|
||||||
# 更新服务器 URL
|
|
||||||
url = sync_cfg.get("url", "未配置")
|
|
||||||
url_widget = self.query_one("#server_url")
|
|
||||||
url_widget.update(url) # type: ignore
|
|
||||||
# 更新远程路径
|
|
||||||
remote_path = sync_cfg.get("remote_path", "/")
|
|
||||||
path_widget = self.query_one("#remote_path")
|
|
||||||
path_widget.update(remote_path) # type: ignore
|
|
||||||
|
|
||||||
# 更新状态标签
|
|
||||||
status_widget = self.query_one("#status_label")
|
|
||||||
if self.sync_service and self.sync_service.client:
|
|
||||||
status_widget.update("✅ 同步服务已就绪") # type: ignore
|
|
||||||
status_widget.add_class("ready")
|
|
||||||
else:
|
|
||||||
status_widget.update("❌ 同步服务未配置或未启用") # type: ignore
|
|
||||||
status_widget.add_class("error")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.log_message(f"更新 UI 失败: {e}", is_error=True)
|
|
||||||
|
|
||||||
def update_status(self, status, current_item="", progress=None):
|
def update_status(self, status, current_item="", progress=None):
|
||||||
"""更新状态显示"""
|
"""更新状态显示"""
|
||||||
try:
|
|
||||||
status_widget = self.query_one("#status_label")
|
|
||||||
status_widget.update(status) # type: ignore
|
|
||||||
|
|
||||||
if progress is not None:
|
|
||||||
progress_bar = self.query_one("#progress_bar")
|
|
||||||
progress_bar.progress = progress # type: ignore
|
|
||||||
|
|
||||||
progress_label = self.query_one("#progress_label")
|
|
||||||
progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.log_message(f"更新状态失败: {e}", is_error=True)
|
|
||||||
|
|
||||||
def log_message(self, message: str, is_error: bool = False):
|
|
||||||
"""添加日志消息并更新显示"""
|
|
||||||
timestamp = time.strftime("%H:%M:%S")
|
|
||||||
prefix = "[ERROR]" if is_error else "[INFO]"
|
|
||||||
log_line = f"{timestamp} {prefix} {message}"
|
|
||||||
|
|
||||||
self.log_messages.append(log_line)
|
|
||||||
# 保持日志行数不超过最大值
|
|
||||||
if len(self.log_messages) > self.max_log_lines:
|
|
||||||
self.log_messages = self.log_messages[-self.max_log_lines :]
|
|
||||||
|
|
||||||
# 更新日志显示
|
|
||||||
try:
|
|
||||||
log_widget = self.query_one("#log_output")
|
|
||||||
log_widget.update("\n".join(self.log_messages)) # type: ignore
|
|
||||||
except Exception:
|
|
||||||
pass # 如果组件未就绪,忽略错误
|
|
||||||
|
|
||||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||||
"""处理按钮点击事件"""
|
|
||||||
button_id = event.button.id
|
|
||||||
|
|
||||||
if button_id == "test_connection":
|
|
||||||
self.test_connection()
|
|
||||||
elif button_id == "start_sync":
|
|
||||||
self.start_sync()
|
|
||||||
elif button_id == "pause_sync":
|
|
||||||
self.pause_sync()
|
|
||||||
elif button_id == "cancel_sync":
|
|
||||||
self.cancel_sync()
|
|
||||||
|
|
||||||
event.stop()
|
event.stop()
|
||||||
|
|
||||||
def test_connection(self):
|
|
||||||
"""测试 WebDAV 服务器连接"""
|
|
||||||
if not self.sync_service:
|
|
||||||
self.log_message("同步服务未初始化,请检查配置", is_error=True)
|
|
||||||
self.update_status("❌ 同步服务未初始化")
|
|
||||||
return
|
|
||||||
|
|
||||||
self.log_message("正在测试 WebDAV 连接...")
|
|
||||||
self.update_status("正在测试连接...")
|
|
||||||
|
|
||||||
try:
|
|
||||||
success = self.sync_service.test_connection()
|
|
||||||
if success:
|
|
||||||
self.log_message("连接测试成功")
|
|
||||||
self.update_status("✅ 连接正常")
|
|
||||||
else:
|
|
||||||
self.log_message("连接测试失败", is_error=True)
|
|
||||||
self.update_status("❌ 连接失败")
|
|
||||||
except Exception as e:
|
|
||||||
self.log_message(f"连接测试异常: {e}", is_error=True)
|
|
||||||
self.update_status("❌ 连接异常")
|
|
||||||
|
|
||||||
def start_sync(self):
|
|
||||||
"""开始同步"""
|
|
||||||
if not self.sync_service:
|
|
||||||
self.log_message("同步服务未初始化,无法开始同步", is_error=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
if self.is_syncing:
|
|
||||||
self.log_message("同步已在进行中", is_error=True)
|
|
||||||
return
|
|
||||||
|
|
||||||
self.is_syncing = True
|
|
||||||
self.is_paused = False
|
|
||||||
self.update_button_states()
|
|
||||||
|
|
||||||
self.log_message("开始同步数据...")
|
|
||||||
self.update_status("正在同步...", progress=0)
|
|
||||||
|
|
||||||
# 启动后台同步任务
|
|
||||||
self.run_worker(self.perform_sync, thread=True)
|
|
||||||
|
|
||||||
def perform_sync(self):
|
|
||||||
"""执行同步任务(在后台线程中运行)"""
|
|
||||||
worker = get_current_worker()
|
|
||||||
|
|
||||||
try:
|
|
||||||
# 获取需要同步的本地目录
|
|
||||||
from heurams.context import config_var
|
|
||||||
|
|
||||||
config = config_var.get()
|
|
||||||
paths = config.get("paths", {})
|
|
||||||
|
|
||||||
# 同步 nucleon 目录
|
|
||||||
nucleon_dir = pathlib.Path(paths.get("nucleon_dir", "./data/nucleon"))
|
|
||||||
if nucleon_dir.exists():
|
|
||||||
self.log_message(f"同步 nucleon 目录: {nucleon_dir}")
|
|
||||||
self.update_status(f"同步 nucleon 目录...", progress=10)
|
|
||||||
|
|
||||||
result = self.sync_service.sync_directory(nucleon_dir) # type: ignore
|
|
||||||
if result.get("success"):
|
|
||||||
self.log_message(
|
|
||||||
f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.log_message(
|
|
||||||
f"nucleon 同步失败: {result.get('error', '未知错误')}",
|
|
||||||
is_error=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# 同步 electron 目录
|
|
||||||
electron_dir = pathlib.Path(paths.get("electron_dir", "./data/electron"))
|
|
||||||
if electron_dir.exists():
|
|
||||||
self.log_message(f"同步 electron 目录: {electron_dir}")
|
|
||||||
self.update_status(f"同步 electron 目录...", progress=60)
|
|
||||||
|
|
||||||
result = self.sync_service.sync_directory(electron_dir) # type: ignore
|
|
||||||
if result.get("success"):
|
|
||||||
self.log_message(
|
|
||||||
f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.log_message(
|
|
||||||
f"electron 同步失败: {result.get('error', '未知错误')}",
|
|
||||||
is_error=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# 同步 orbital 目录(如果存在)
|
|
||||||
orbital_dir = pathlib.Path(paths.get("orbital_dir", "./data/orbital"))
|
|
||||||
if orbital_dir.exists():
|
|
||||||
self.log_message(f"同步 orbital 目录: {orbital_dir}")
|
|
||||||
self.update_status(f"同步 orbital 目录...", progress=80)
|
|
||||||
|
|
||||||
result = self.sync_service.sync_directory(orbital_dir) # type: ignore
|
|
||||||
if result.get("success"):
|
|
||||||
self.log_message(
|
|
||||||
f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.log_message(
|
|
||||||
f"orbital 同步失败: {result.get('error', '未知错误')}",
|
|
||||||
is_error=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
# 同步完成
|
|
||||||
self.update_status("同步完成", progress=100)
|
|
||||||
self.log_message("所有目录同步完成")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.log_message(f"同步过程中发生错误: {e}", is_error=True)
|
|
||||||
self.update_status("同步失败")
|
|
||||||
finally:
|
|
||||||
# 重置同步状态
|
|
||||||
self.is_syncing = False
|
|
||||||
self.is_paused = False
|
|
||||||
self.update_button_states() # type: ignore
|
|
||||||
|
|
||||||
def pause_sync(self):
|
|
||||||
"""暂停同步"""
|
|
||||||
if not self.is_syncing:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.is_paused = not self.is_paused
|
|
||||||
self.update_button_states()
|
|
||||||
|
|
||||||
if self.is_paused:
|
|
||||||
self.log_message("同步已暂停")
|
|
||||||
self.update_status("同步已暂停")
|
|
||||||
else:
|
|
||||||
self.log_message("同步已恢复")
|
|
||||||
self.update_status("正在同步...")
|
|
||||||
|
|
||||||
def cancel_sync(self):
|
|
||||||
"""取消同步"""
|
|
||||||
if not self.is_syncing:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.is_syncing = False
|
|
||||||
self.is_paused = False
|
|
||||||
self.update_button_states()
|
|
||||||
|
|
||||||
self.log_message("同步已取消")
|
|
||||||
self.update_status("同步已取消")
|
|
||||||
|
|
||||||
def update_button_states(self):
|
|
||||||
"""更新按钮状态"""
|
|
||||||
try:
|
|
||||||
start_button = self.query_one("#start_sync")
|
|
||||||
pause_button = self.query_one("#pause_sync")
|
|
||||||
cancel_button = self.query_one("#cancel_sync")
|
|
||||||
|
|
||||||
if self.is_syncing:
|
|
||||||
start_button.disabled = True
|
|
||||||
pause_button.disabled = False
|
|
||||||
cancel_button.disabled = False
|
|
||||||
pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore
|
|
||||||
else:
|
|
||||||
start_button.disabled = False
|
|
||||||
pause_button.disabled = True
|
|
||||||
cancel_button.disabled = True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.log_message(f"更新按钮状态失败: {e}", is_error=True)
|
|
||||||
|
|
||||||
def action_go_back(self):
|
def action_go_back(self):
|
||||||
self.app.pop_screen()
|
self.app.pop_screen()
|
||||||
|
|
||||||
|
|||||||
@@ -50,8 +50,7 @@ class Recognition(BasePuzzleWidget):
|
|||||||
|
|
||||||
def compose(self):
|
def compose(self):
|
||||||
from heurams.context import config_var
|
from heurams.context import config_var
|
||||||
|
autovoice = config_var.get()['interface']['memorizor']['autovoice']
|
||||||
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"]
|
|
||||||
if autovoice:
|
if autovoice:
|
||||||
self.screen.action_play_voice() # type: ignore
|
self.screen.action_play_voice() # type: ignore
|
||||||
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
|
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
|
||||||
@@ -73,7 +72,7 @@ class Recognition(BasePuzzleWidget):
|
|||||||
primary = cfg["primary"]
|
primary = cfg["primary"]
|
||||||
|
|
||||||
with Center():
|
with Center():
|
||||||
for i in cfg["top_dim"]:
|
for i in cfg['top_dim']:
|
||||||
yield Static(f"[dim]{i}[/]")
|
yield Static(f"[dim]{i}[/]")
|
||||||
yield Label("")
|
yield Label("")
|
||||||
|
|
||||||
|
|||||||
@@ -10,18 +10,15 @@ MIT 许可证
|
|||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import pathlib
|
|
||||||
from typing import TypedDict
|
from typing import TypedDict
|
||||||
|
import pathlib
|
||||||
from heurams.context import config_var
|
from heurams.context import config_var
|
||||||
from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF,
|
from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF,
|
||||||
RANGE_AF, RANGE_REPETITION,
|
RANGE_AF, RANGE_REPETITION,
|
||||||
SM, THRESHOLD_RECALL, Item)
|
SM, THRESHOLD_RECALL, Item)
|
||||||
|
|
||||||
# 全局状态文件路径
|
# 全局状态文件路径
|
||||||
_GLOBAL_STATE_FILE = os.path.expanduser(
|
_GLOBAL_STATE_FILE = os.path.expanduser(pathlib.Path(config_var.get()['paths']['global_dir']) / 'sm15m_global_state.json')
|
||||||
pathlib.Path(config_var.get()["paths"]["global_dir"]) / "sm15m_global_state.json"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _get_global_sm():
|
def _get_global_sm():
|
||||||
|
|||||||
@@ -86,8 +86,8 @@ class Atom:
|
|||||||
# eval 环境设置
|
# eval 环境设置
|
||||||
def eval_with_env(s: str):
|
def eval_with_env(s: str):
|
||||||
default = config_var.get()["puzzles"]
|
default = config_var.get()["puzzles"]
|
||||||
payload = self.registry["nucleon"].payload
|
payload = self.registry['nucleon'].payload
|
||||||
metadata = self.registry["nucleon"].metadata
|
metadata = self.registry['nucleon'].metadata
|
||||||
eval_value = eval(s)
|
eval_value = eval(s)
|
||||||
if isinstance(eval_value, (int, float)):
|
if isinstance(eval_value, (int, float)):
|
||||||
ret = str(eval_value)
|
ret = str(eval_value)
|
||||||
@@ -117,11 +117,10 @@ class Atom:
|
|||||||
logger.debug("发现 eval 表达式: '%s'", data[5:])
|
logger.debug("发现 eval 表达式: '%s'", data[5:])
|
||||||
return modifier(data[5:])
|
return modifier(data[5:])
|
||||||
return data
|
return data
|
||||||
|
|
||||||
try:
|
try:
|
||||||
traverse(self.registry["nucleon"].payload, eval_with_env)
|
traverse(self.registry['nucleon'].payload, eval_with_env)
|
||||||
traverse(self.registry["nucleon"].metadata, eval_with_env)
|
traverse(self.registry['nucleon'].metadata, eval_with_env)
|
||||||
traverse(self.registry["orbital"], eval_with_env)
|
traverse(self.registry['orbital'], eval_with_env)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
ret = f"此 eval 实例发生错误: {e}"
|
ret = f"此 eval 实例发生错误: {e}"
|
||||||
logger.warning(ret)
|
logger.warning(ret)
|
||||||
|
|||||||
@@ -18,12 +18,9 @@ class Electron:
|
|||||||
algo: 使用的算法模块标识
|
algo: 使用的算法模块标识
|
||||||
"""
|
"""
|
||||||
if algo_name == "":
|
if algo_name == "":
|
||||||
algo_name = config_var.get()["algorithm"]["default"]
|
algo_name = config_var.get()['algorithm']['default']
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s",
|
"创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s", ident, algo_name, algodata
|
||||||
ident,
|
|
||||||
algo_name,
|
|
||||||
algodata,
|
|
||||||
)
|
)
|
||||||
self.algodata = algodata
|
self.algodata = algodata
|
||||||
self.ident = ident
|
self.ident = ident
|
||||||
@@ -34,9 +31,7 @@ class Electron:
|
|||||||
self.algodata[self.algo.algo_name] = {}
|
self.algodata[self.algo.algo_name] = {}
|
||||||
logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo)
|
logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo)
|
||||||
if not self.algodata[self.algo.algo_name]:
|
if not self.algodata[self.algo.algo_name]:
|
||||||
logger.debug(
|
logger.debug(f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}")
|
||||||
f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}"
|
|
||||||
)
|
|
||||||
self._default_init(self.algo.defaults)
|
self._default_init(self.algo.defaults)
|
||||||
else:
|
else:
|
||||||
logger.debug("算法数据已存在, 跳过默认初始化")
|
logger.debug("算法数据已存在, 跳过默认初始化")
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ import pathlib
|
|||||||
|
|
||||||
import edge_tts
|
import edge_tts
|
||||||
|
|
||||||
from heurams.context import config_var
|
|
||||||
from heurams.services.logger import get_logger
|
from heurams.services.logger import get_logger
|
||||||
|
from heurams.context import config_var
|
||||||
|
|
||||||
from .base import BaseTTS
|
from .base import BaseTTS
|
||||||
|
|
||||||
@@ -19,7 +19,7 @@ class EdgeTTS(BaseTTS):
|
|||||||
try:
|
try:
|
||||||
communicate = edge_tts.Communicate(
|
communicate = edge_tts.Communicate(
|
||||||
text,
|
text,
|
||||||
config_var.get()["providers"]["tts"]["edgetts"]["voice"],
|
config_var.get()['providers']['tts']['edgetts']["voice"],
|
||||||
)
|
)
|
||||||
logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频")
|
logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频")
|
||||||
communicate.save_sync(str(path))
|
communicate.save_sync(str(path))
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ from heurams.services.logger import get_logger
|
|||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
ver = "0.4.3"
|
ver = "0.4.2"
|
||||||
stage = "prototype"
|
stage = "prototype"
|
||||||
codename = "fledge" # 雏鸟, 0.4.x 版本
|
codename = "fledge" # 雏鸟, 0.4.x 版本
|
||||||
|
|
||||||
|
|||||||
@@ -1,20 +0,0 @@
|
|||||||
"""vfs.py
|
|
||||||
得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers
|
|
||||||
"""
|
|
||||||
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import fsspec as fs
|
|
||||||
|
|
||||||
|
|
||||||
class VFSObject:
|
|
||||||
def __init__(self, protocol, base_url):
|
|
||||||
self.base_url = base_url
|
|
||||||
self.protocol = protocol
|
|
||||||
self.fs = fs.filesystem(protocol=protocol, base_url=base_url)
|
|
||||||
|
|
||||||
def open(self, path: Path):
|
|
||||||
return self.fs.open(path)
|
|
||||||
|
|
||||||
def open_by_list(self, path_list: list[Path]):
|
|
||||||
return self.fs.open_files(path_list)
|
|
||||||
@@ -89,7 +89,7 @@ class TestDashboardScreenUnit(unittest.TestCase):
|
|||||||
screen = DashboardScreen()
|
screen = DashboardScreen()
|
||||||
# 模拟一个文件名
|
# 模拟一个文件名
|
||||||
filename = "test.toml"
|
filename = "test.toml"
|
||||||
result = screen.analyser(filename)
|
result = screen.item_desc_generator(filename)
|
||||||
self.assertIsInstance(result, dict)
|
self.assertIsInstance(result, dict)
|
||||||
self.assertIn(0, result)
|
self.assertIn(0, result)
|
||||||
self.assertIn(1, result)
|
self.assertIn(1, result)
|
||||||
|
|||||||
@@ -1,314 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
SyncScreen 和 SyncService 的测试.
|
|
||||||
"""
|
|
||||||
import pathlib
|
|
||||||
import tempfile
|
|
||||||
import time
|
|
||||||
import unittest
|
|
||||||
from unittest.mock import MagicMock, Mock, patch
|
|
||||||
|
|
||||||
from heurams.context import ConfigContext
|
|
||||||
from heurams.services.config import ConfigFile
|
|
||||||
from heurams.services.sync_service import (ConflictStrategy, SyncConfig,
|
|
||||||
SyncMode, SyncService)
|
|
||||||
|
|
||||||
|
|
||||||
class TestSyncServiceUnit(unittest.TestCase):
|
|
||||||
"""SyncService 的单元测试."""
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
"""在每个测试之前运行, 设置临时目录和模拟客户端."""
|
|
||||||
self.temp_dir = tempfile.TemporaryDirectory()
|
|
||||||
self.temp_path = pathlib.Path(self.temp_dir.name)
|
|
||||||
|
|
||||||
# 创建测试文件
|
|
||||||
self.test_file = self.temp_path / "test.txt"
|
|
||||||
self.test_file.write_text("测试内容")
|
|
||||||
|
|
||||||
# 模拟 WebDAV 客户端
|
|
||||||
self.mock_client = MagicMock()
|
|
||||||
|
|
||||||
# 创建同步配置
|
|
||||||
self.config = SyncConfig(
|
|
||||||
enabled=True,
|
|
||||||
url="https://example.com/dav/",
|
|
||||||
username="test",
|
|
||||||
password="test",
|
|
||||||
remote_path="/heurams/",
|
|
||||||
sync_mode=SyncMode.BIDIRECTIONAL,
|
|
||||||
conflict_strategy=ConflictStrategy.NEWER,
|
|
||||||
verify_ssl=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
"""在每个测试之后清理."""
|
|
||||||
self.temp_dir.cleanup()
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_sync_service_initialization(self, mock_client_class):
|
|
||||||
"""测试同步服务初始化."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
|
|
||||||
service = SyncService(self.config)
|
|
||||||
|
|
||||||
# 验证客户端已创建
|
|
||||||
mock_client_class.assert_called_once()
|
|
||||||
self.assertIsNotNone(service.client)
|
|
||||||
self.assertEqual(service.config, self.config)
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_sync_service_disabled(self, mock_client_class):
|
|
||||||
"""测试同步服务未启用."""
|
|
||||||
config = SyncConfig(enabled=False)
|
|
||||||
service = SyncService(config)
|
|
||||||
|
|
||||||
# 客户端不应初始化
|
|
||||||
mock_client_class.assert_not_called()
|
|
||||||
self.assertIsNone(service.client)
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_test_connection_success(self, mock_client_class):
|
|
||||||
"""测试连接测试成功."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
self.mock_client.list.return_value = []
|
|
||||||
|
|
||||||
service = SyncService(self.config)
|
|
||||||
result = service.test_connection()
|
|
||||||
|
|
||||||
self.assertTrue(result)
|
|
||||||
self.mock_client.list.assert_called_once()
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_test_connection_failure(self, mock_client_class):
|
|
||||||
"""测试连接测试失败."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
self.mock_client.list.side_effect = Exception("连接失败")
|
|
||||||
|
|
||||||
service = SyncService(self.config)
|
|
||||||
result = service.test_connection()
|
|
||||||
|
|
||||||
self.assertFalse(result)
|
|
||||||
self.mock_client.list.assert_called_once()
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_upload_file(self, mock_client_class):
|
|
||||||
"""测试上传单个文件."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
|
|
||||||
service = SyncService(self.config)
|
|
||||||
result = service.upload_file(self.test_file)
|
|
||||||
|
|
||||||
self.assertTrue(result)
|
|
||||||
self.mock_client.upload_file.assert_called_once()
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_download_file(self, mock_client_class):
|
|
||||||
"""测试下载单个文件."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
|
|
||||||
service = SyncService(self.config)
|
|
||||||
remote_path = "/heurams/test.txt"
|
|
||||||
local_path = self.temp_path / "downloaded.txt"
|
|
||||||
|
|
||||||
result = service.download_file(remote_path, local_path)
|
|
||||||
|
|
||||||
self.assertTrue(result)
|
|
||||||
self.mock_client.download_file.assert_called_once()
|
|
||||||
self.assertTrue(local_path.parent.exists())
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_sync_directory_no_files(self, mock_client_class):
|
|
||||||
"""测试同步空目录."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
self.mock_client.list.return_value = []
|
|
||||||
self.mock_client.mkdir.return_value = None
|
|
||||||
|
|
||||||
service = SyncService(self.config)
|
|
||||||
result = service.sync_directory(self.temp_path)
|
|
||||||
|
|
||||||
self.assertTrue(result["success"])
|
|
||||||
self.assertEqual(result["uploaded"], 0)
|
|
||||||
self.assertEqual(result["downloaded"], 0)
|
|
||||||
self.mock_client.mkdir.assert_called_once()
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_sync_directory_upload_only(self, mock_client_class):
|
|
||||||
"""测试仅上传模式."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
self.mock_client.list.return_value = []
|
|
||||||
self.mock_client.mkdir.return_value = None
|
|
||||||
|
|
||||||
config = SyncConfig(
|
|
||||||
enabled=True,
|
|
||||||
url="https://example.com/dav/",
|
|
||||||
username="test",
|
|
||||||
password="test",
|
|
||||||
remote_path="/heurams/",
|
|
||||||
sync_mode=SyncMode.UPLOAD_ONLY,
|
|
||||||
conflict_strategy=ConflictStrategy.NEWER,
|
|
||||||
)
|
|
||||||
|
|
||||||
service = SyncService(config)
|
|
||||||
result = service.sync_directory(self.temp_path)
|
|
||||||
|
|
||||||
self.assertTrue(result["success"])
|
|
||||||
self.mock_client.mkdir.assert_called_once()
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_conflict_strategy_newer(self, mock_client_class):
|
|
||||||
"""测试 NEWER 冲突策略."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
|
|
||||||
# 模拟远程文件存在
|
|
||||||
self.mock_client.list.return_value = ["test.txt"]
|
|
||||||
self.mock_client.info.return_value = {
|
|
||||||
"size": 100,
|
|
||||||
"modified": "2023-01-01T00:00:00Z",
|
|
||||||
}
|
|
||||||
self.mock_client.mkdir.return_value = None
|
|
||||||
|
|
||||||
service = SyncService(self.config)
|
|
||||||
result = service.sync_directory(self.temp_path)
|
|
||||||
|
|
||||||
self.assertTrue(result["success"])
|
|
||||||
# 应该有一个冲突
|
|
||||||
self.assertGreaterEqual(result.get("conflicts", 0), 0)
|
|
||||||
|
|
||||||
@patch("heurams.services.sync_service.Client")
|
|
||||||
def test_create_sync_service_from_config(self, mock_client_class):
|
|
||||||
"""测试从配置文件创建同步服务."""
|
|
||||||
mock_client_class.return_value = self.mock_client
|
|
||||||
|
|
||||||
# 创建临时配置文件
|
|
||||||
config_data = {
|
|
||||||
"sync": {
|
|
||||||
"webdav": {
|
|
||||||
"enabled": True,
|
|
||||||
"url": "https://example.com/dav/",
|
|
||||||
"username": "test",
|
|
||||||
"password": "test",
|
|
||||||
"remote_path": "/heurams/",
|
|
||||||
"sync_mode": "bidirectional",
|
|
||||||
"conflict_strategy": "newer",
|
|
||||||
"verify_ssl": True,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# 模拟 config_var
|
|
||||||
with patch("heurams.services.sync_service.config_var") as mock_config_var:
|
|
||||||
mock_config = MagicMock()
|
|
||||||
mock_config.data = config_data
|
|
||||||
mock_config_var.get.return_value = mock_config
|
|
||||||
|
|
||||||
from heurams.services.sync_service import \
|
|
||||||
create_sync_service_from_config
|
|
||||||
|
|
||||||
service = create_sync_service_from_config()
|
|
||||||
|
|
||||||
self.assertIsNotNone(service)
|
|
||||||
self.assertIsNotNone(service.client)
|
|
||||||
|
|
||||||
|
|
||||||
class TestSyncScreenUnit(unittest.TestCase):
|
|
||||||
"""SyncScreen 的单元测试."""
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
"""在每个测试之前运行."""
|
|
||||||
self.temp_dir = tempfile.TemporaryDirectory()
|
|
||||||
self.temp_path = pathlib.Path(self.temp_dir.name)
|
|
||||||
|
|
||||||
# 创建默认配置
|
|
||||||
default_config_path = (
|
|
||||||
pathlib.Path(__file__).parent.parent.parent
|
|
||||||
/ "src/heurams/default/config/config.toml"
|
|
||||||
)
|
|
||||||
self.config = ConfigFile(default_config_path)
|
|
||||||
|
|
||||||
# 更新配置中的路径
|
|
||||||
config_data = self.config.data
|
|
||||||
config_data["paths"]["nucleon_dir"] = str(self.temp_path / "nucleon")
|
|
||||||
config_data["paths"]["electron_dir"] = str(self.temp_path / "electron")
|
|
||||||
config_data["paths"]["orbital_dir"] = str(self.temp_path / "orbital")
|
|
||||||
config_data["paths"]["cache_dir"] = str(self.temp_path / "cache")
|
|
||||||
|
|
||||||
# 添加同步配置
|
|
||||||
if "sync" not in config_data:
|
|
||||||
config_data["sync"] = {}
|
|
||||||
config_data["sync"]["webdav"] = {
|
|
||||||
"enabled": False,
|
|
||||||
"url": "",
|
|
||||||
"username": "",
|
|
||||||
"password": "",
|
|
||||||
"remote_path": "/heurams/",
|
|
||||||
"sync_mode": "bidirectional",
|
|
||||||
"conflict_strategy": "newer",
|
|
||||||
"verify_ssl": True,
|
|
||||||
}
|
|
||||||
|
|
||||||
# 创建目录
|
|
||||||
for dir_key in ["nucleon_dir", "electron_dir", "orbital_dir", "cache_dir"]:
|
|
||||||
pathlib.Path(config_data["paths"][dir_key]).mkdir(
|
|
||||||
parents=True, exist_ok=True
|
|
||||||
)
|
|
||||||
|
|
||||||
# 使用 ConfigContext 设置配置
|
|
||||||
self.config_ctx = ConfigContext(self.config)
|
|
||||||
self.config_ctx.__enter__()
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
"""在每个测试之后清理."""
|
|
||||||
self.config_ctx.__exit__(None, None, None)
|
|
||||||
self.temp_dir.cleanup()
|
|
||||||
|
|
||||||
@patch("heurams.interface.screens.synctool.create_sync_service_from_config")
|
|
||||||
def test_sync_screen_compose(self, mock_create_service):
|
|
||||||
"""测试 SyncScreen 的 compose 方法."""
|
|
||||||
from heurams.interface.screens.synctool import SyncScreen
|
|
||||||
|
|
||||||
# 模拟同步服务
|
|
||||||
mock_service = MagicMock()
|
|
||||||
mock_service.client = MagicMock()
|
|
||||||
mock_create_service.return_value = mock_service
|
|
||||||
|
|
||||||
screen = SyncScreen()
|
|
||||||
|
|
||||||
# 测试 compose 方法
|
|
||||||
from textual.app import ComposeResult
|
|
||||||
|
|
||||||
result = screen.compose()
|
|
||||||
widgets = list(result)
|
|
||||||
|
|
||||||
# 检查基本部件
|
|
||||||
from textual.containers import ScrollableContainer
|
|
||||||
from textual.widgets import Button, Footer, Header, ProgressBar, Static
|
|
||||||
|
|
||||||
header_present = any(isinstance(w, Header) for w in widgets)
|
|
||||||
footer_present = any(isinstance(w, Footer) for w in widgets)
|
|
||||||
self.assertTrue(header_present)
|
|
||||||
self.assertTrue(footer_present)
|
|
||||||
|
|
||||||
# 检查容器
|
|
||||||
container_present = any(isinstance(w, ScrollableContainer) for w in widgets)
|
|
||||||
self.assertTrue(container_present)
|
|
||||||
|
|
||||||
@patch("heurams.interface.screens.synctool.create_sync_service_from_config")
|
|
||||||
def test_sync_screen_load_config(self, mock_create_service):
|
|
||||||
"""测试 SyncScreen 加载配置."""
|
|
||||||
from heurams.interface.screens.synctool import SyncScreen
|
|
||||||
|
|
||||||
mock_service = MagicMock()
|
|
||||||
mock_service.client = MagicMock()
|
|
||||||
mock_create_service.return_value = mock_service
|
|
||||||
|
|
||||||
screen = SyncScreen()
|
|
||||||
screen.load_config()
|
|
||||||
|
|
||||||
# 验证配置已加载
|
|
||||||
self.assertIsNotNone(screen.sync_config)
|
|
||||||
mock_create_service.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
||||||
Reference in New Issue
Block a user