18 Commits
0.4.0 ... dev

Author SHA1 Message Date
b5f30ec4ee style: 格式化代码 2025-12-21 23:44:13 +08:00
87cefedb61 feat(interface): 增加智能单元集排序 2025-12-21 23:42:02 +08:00
0fb421412e fix: 暂时禁用实验性功能 2025-12-21 23:06:17 +08:00
ee0646ac79 refactor(synctool): 改进同步方案 2025-12-21 21:18:31 +08:00
d8fc18166d feat(synctool): 虚拟文件系统初步方案 2025-12-21 18:48:25 +08:00
a2e12c7462 style: 格式化代码 2025-12-21 07:56:10 +08:00
1efe034a59 feat(synctool): 增加同步功能 2025-12-21 07:49:19 +08:00
0a365b568a feat(kernel): 添加算法切换设置 2025-12-21 06:48:30 +08:00
e303d4dc1e style: 更新版本号和合并规则 2025-12-21 06:34:17 +08:00
cb78290f05 fix(interface): 修复仪表盘详情 2025-12-21 06:27:00 +08:00
e0417981b1 feat(interface): 更改启动方式 2025-12-21 06:06:16 +08:00
a0660d3348 fix(interface): 修复显示问题 2025-12-21 05:47:22 +08:00
f5e0417292 feat: 自动音频播放与改进设计 2025-12-21 05:32:58 +08:00
e57cea7219 style(version): 更新版本号 2025-12-21 03:02:29 +08:00
98ec6504a4 feat: 实验性 SM-15M 算法实现
实验性 SM-15M 逆向工程算法实现
2025-12-21 02:15:23 +08:00
243eea864b style: isort 格式化 2025-12-19 15:13:42 +08:00
cfb1385f4d style: 格式化代码 2025-12-19 15:08:26 +08:00
a1462206a2 fix(interface): 修复默认配置文件 2025-12-18 15:53:18 +08:00
74 changed files with 3097 additions and 525 deletions

7
.gitignore vendored
View File

@@ -11,15 +11,16 @@ electron/test.toml
build/
dist/
old/
# Project specific directories
# config/
data/cache/
data/electron/
data/nucleon/
!data/nucleon/test*
data/global/
!data/nucleon/TEST*
data/orbital/
config/config_dev.toml
AGENTS.md
*.log.1
# Byte-compiled / optimized / DLL files
__pycache__/

View File

@@ -10,13 +10,15 @@
- `dev` 分支: 开发版本
- 功能分支: 从 `dev` 分支创建, 命名格式为 `feature/描述``fix/描述``refactor/描述`
2. **代码风格**:
- 请使用 Black 格式化代码
- 请使用 Black 与 isort 格式化代码
- 遵循 PEP 8 规范
- 添加适当的文档字符串
3. **提交消息**:
- 使用简体中文或英文撰写清晰的提交消息
- 格式: 遵循 Conventional Commits 规范
4. **合并方式**:
- 不使用 Fast-forward 合并
- 可以设置 `git config merge.ff false`
## 设置开发环境
```bash

View File

@@ -30,6 +30,7 @@
- 自然语音: 集成微软神经网络文本转语音 (TTS) 技术
- 多种谜题类型: 选择题 (MCQ)、填空题 (Cloze)、识别题 (Recognition)
- 动态内容生成: 支持宏驱动的模板系统, 根据上下文动态生成题目
- 云同步支持: 通过 WebDAV 协议同步数据到远程服务器
### 实用用户界面
- 响应式 Textual 框架构建的跨平台 TUI 界面
@@ -82,7 +83,23 @@ python -m heurams.interface
## 配置
配置文件位于 `config/config.toml`相对于工作目录. 如果不存在, 会使用内置的默认配置.
配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置.
### 同步配置
同步功能支持 WebDAV 协议,可在配置文件的 `[sync.webdav]` 段进行配置:
```toml
[sync.webdav]
enabled = false
url = "" # WebDAV 服务器地址
username = "" # 用户名
password = "" # 密码
remote_path = "/heurams/" # 远程路径
sync_mode = "bidirectional" # 同步模式: bidirectional/upload_only/download_only
conflict_strategy = "newer" # 冲突策略: newer/ask/keep_both
verify_ssl = true # SSL 证书验证
```
启用同步后,可通过应用内的同步工具进行数据备份和恢复。
## 项目结构
@@ -104,6 +121,7 @@ graph TB
Timer[时间服务]
AudioService[音频服务]
TTSService[TTS服务]
SyncService[同步服务]
OtherServices[其他服务]
end
@@ -156,7 +174,8 @@ src/heurams/
│ ├── logger.py # 日志系统
│ ├── timer.py # 时间服务
│ ├── audio_service.py # 音频播放抽象
── tts_service.py # 文本转语音抽象
── tts_service.py # 文本转语音抽象
│ └── sync_service.py # WebDAV 同步服务
├── kernel/ # 核心业务逻辑
│ ├── algorithms/ # 间隔重复算法 (FSRS, SM2)
│ ├── particles/ # 数据模型 (Atom, Electron, Nucleon, Orbital)

View File

@@ -14,6 +14,14 @@ scheduled_num = 8
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[algorithm]
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
[puzzles] # 谜题默认配置
[puzzles.mcq]
@@ -25,6 +33,7 @@ min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
nucleon_dir = "./data/nucleon"
electron_dir = "./data/electron"
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
orbital_dir = "./data/orbital"
cache_dir = "./data/cache"
template_dir = "./data/template"
@@ -33,7 +42,20 @@ template_dir = "./data/template"
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = ""
key = ""
[providers.sync.webdav] # WebDAV 同步设置
url = ""
username = ""
password = ""
remote_path = "/heurams/"
verify_ssl = true
[sync]

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "heurams"
version = "0.4.0"
version = "0.4.3"
description = "Heuristic Assisted Memory Scheduler"
license = {file = "LICENSE"}
classifiers = [
@@ -25,3 +25,4 @@ readme = "README.md"
[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -2,3 +2,5 @@ bidict==0.23.1
playsound==1.2.2
textual==5.3.0
toml==0.10.2
requests>=2.31.0
webdavclient3>=3.0.0

View File

@@ -3,8 +3,9 @@
以及基准路径
"""
from contextvars import ContextVar
import pathlib
from contextvars import ContextVar
from heurams.services.config import ConfigFile
from heurams.services.logger import get_logger
@@ -31,7 +32,12 @@ try:
except Exception as e:
print("未能加载自定义用户配置")
logger.warning("未能加载自定义用户配置, 错误: %s", e)
if pathlib.Path(workdir / "config" / "config_dev.toml").exists():
print("使用开发设置")
logger.debug("使用开发设置")
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(workdir / "config" / "config_dev.toml")
)
# runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据

View File

@@ -6,14 +6,22 @@ daystamp_override = -1
timestamp_override = -1
# [调试] 一键通过
quick_pass = 0
quick_pass = 1
# 对于每个项目的默认新记忆原子数量
tasked_number = 8
scheduled_num = 8
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[algorithm]
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
[puzzles] # 谜题默认配置
[puzzles.mcq]
@@ -22,8 +30,32 @@ max_riddles_num = 2
[puzzles.cloze]
min_denominator = 3
[paths] # 相对于工作目录而言 或绝对路径
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
nucleon_dir = "./data/nucleon"
electron_dir = "./data/electron"
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
orbital_dir = "./data/orbital"
cache_dir = "./data/cache"
template_dir = "./data/template"
[services] # 定义服务到提供者的映射
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = ""
key = ""
[providers.sync.webdav] # WebDAV 同步设置
url = ""
username = ""
password = ""
remote_path = "/heurams/"
verify_ssl = true
[sync]

View File

@@ -0,0 +1,63 @@
from textual.app import App
from textual.widgets import Button
from heurams.context import config_var
from heurams.services.logger import get_logger
from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen
from .screens.nucreator import NucleonCreatorScreen
from .screens.precache import PrecachingScreen
from .screens.synctool import SyncScreen
logger = get_logger(__name__)
def environment_check():
from pathlib import Path
logger.debug("检查环境路径")
for i in config_var.get()["paths"].values():
i = Path(i)
if not i.exists():
logger.info("创建目录: %s", i)
print(f"创建 {i}")
i.mkdir(exist_ok=True, parents=True)
else:
logger.debug("目录已存在: %s", i)
print(f"找到 {i}")
logger.debug("环境检查完成")
class HeurAMSApp(App):
TITLE = "潜进"
CSS_PATH = "css/main.tcss"
SUB_TITLE = "启发式辅助记忆调度器"
BINDINGS = [
("q", "quit", "退出"),
("d", "toggle_dark", "切换色调"),
("1", "app.push_screen('dashboard')", "仪表盘"),
("2", "app.push_screen('precache_all')", "缓存管理器"),
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
# ("4", "app.push_screen('synctool')", "同步工具"),
("0", "app.push_screen('about')", "版本信息"),
]
SCREENS = {
"dashboard": DashboardScreen,
"nucleon_creator": NucleonCreatorScreen,
"precache_all": PrecachingScreen,
"synctool": SyncScreen,
"about": AboutScreen,
}
def on_mount(self) -> None:
environment_check()
self.push_screen("dashboard")
def on_button_pressed(self, event: Button.Pressed) -> None:
self.exit(event.button.id)
def action_do_nothing(self):
print("DO NOTHING")
self.refresh()

View File

@@ -1,87 +1,18 @@
from textual.app import App
from textual.widgets import Button
from heurams.context import config_var
from heurams.interface import HeurAMSApp
from heurams.services.logger import get_logger
from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen
from .screens.nucreator import NucleonCreatorScreen
from .screens.precache import PrecachingScreen
from .screens.about import AboutScreen
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class HeurAMSApp(App):
TITLE = "潜进"
CSS_PATH = "css/main.tcss"
SUB_TITLE = "启发式辅助记忆调度器"
BINDINGS = [
("q", "quit", "退出"),
("d", "toggle_dark", "切换色调"),
("1", "app.push_screen('dashboard')", "仪表盘"),
("2", "app.push_screen('precache_all')", "缓存管理器"),
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
("0", "app.push_screen('about')", "版本信息"),
]
SCREENS = {
"dashboard": DashboardScreen,
"nucleon_creator": NucleonCreatorScreen,
"precache_all": PrecachingScreen,
"about": AboutScreen,
}
def on_mount(self) -> None:
self.push_screen("dashboard")
def on_button_pressed(self, event: Button.Pressed) -> None:
self.exit(event.button.id)
def action_do_nothing(self):
print("DO NOTHING")
self.refresh()
def environment_check():
from pathlib import Path
logger.debug("检查环境路径")
for i in config_var.get()["paths"].values():
i = Path(i)
if not i.exists():
logger.info("创建目录: %s", i)
print(f"创建 {i}")
i.mkdir(exist_ok=True, parents=True)
else:
logger.debug("目录已存在: %s", i)
print(f"找到 {i}")
logger.debug("环境检查完成")
def is_subdir(parent, child):
try:
child.relative_to(parent)
logger.debug("is_subdir: %s%s 的子目录", child, parent)
return 1
except:
logger.debug("is_subdir: %s 不是 %s 的子目录", child, parent)
return 0
# 开发模式
from heurams.context import rootdir, workdir, config_var
from pathlib import Path
from heurams.context import rootdir
import os
if is_subdir(Path(rootdir), Path(os.getcwd())):
os.chdir(Path(rootdir) / ".." / "..")
print(f'转入开发数据目录: {Path(rootdir)/".."/".."}')
environment_check()
app = HeurAMSApp()
if __name__ == "__main__":
app.run()
def main():
app.run()

View File

@@ -1,15 +1,8 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
Static,
Button,
Markdown,
)
from textual.containers import ScrollableContainer, ScrollableContainer
from textual.containers import ScrollableContainer
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, Markdown, Static
import heurams.services.version as version
from heurams.context import *
@@ -38,7 +31,8 @@ class AboutScreen(Screen):
特别感谢:
- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SuperMemo-2 算法
- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SM-2 算法与 SM-15 算法理论
- [Kazuaki Tanida](https://github.com/slaypni): SM-15 算法的 CoffeeScript 实现
- [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 文献参考
# 参与贡献

View File

@@ -1,153 +1,207 @@
#!/usr/bin/env python3
import pathlib
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
ListView,
ListItem,
Button,
Static,
)
from textual.containers import ScrollableContainer
from textual.screen import Screen
from textual.widgets import (Button, Footer, Header, Label, ListItem, ListView,
Static)
from heurams.kernel.particles import *
from heurams.context import *
import heurams.services.version as version
import heurams.services.timer as timer
from .preparation import PreparationScreen
from .about import AboutScreen
import heurams.services.version as version
from heurams.context import *
from heurams.kernel.particles import *
from heurams.services.logger import get_logger
import pathlib
from .about import AboutScreen
from .preparation import PreparationScreen
logger = get_logger(__name__)
class DashboardScreen(Screen):
"""主仪表盘屏幕"""
SUB_TITLE = "仪表盘"
def __init__(
self,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.nextdates = {}
self.texts = {}
self.stay_enabled = {}
def compose(self) -> ComposeResult:
"""组合界面组件"""
yield Header(show_clock=True)
yield ScrollableContainer(
Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
Label('欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'),
Label(f"使用算法: {config_var.get()['algorithm']['default']}"),
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
ListView(id="union-list", classes="union-list-view"),
Label(
f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} {version.codename.capitalize()} 2025'
f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} '
f"{version.codename.capitalize()} 2025"
),
)
yield Footer()
def item_desc_generator(self, filename) -> dict:
"""简单分析以生成项目项显示文本
def analyser(self, filename: str) -> dict:
"""分析文件状态以生成显示文本
Args:
filename: 要分析的文件名
Returns:
dict: 以数字为列表, 分别呈现单行字符串
dict: 包含显示文本的字典,键为行号
"""
res = dict()
filestem = pathlib.Path(filename).stem
res[0] = f"{filename}\0"
from heurams.kernel.particles.loader import load_electron
import heurams.kernel.particles as pt
from heurams.kernel.particles.loader import load_electron, load_nucleon
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
filestem + ".json"
)
result = {}
filestem = pathlib.Path(filename).stem
# 构建电子文件路径
electron_dir = config_var.get()["paths"]["electron_dir"]
electron_file_path = pathlib.Path(electron_dir) / f"{filestem}.json"
logger.debug(f"电子文件路径: {electron_file_path}")
if electron_file_path.exists(): # 未找到则创建电子文件 (json)
pass
else:
# 确保电子文件存在
if not electron_file_path.exists():
electron_file_path.touch()
with open(electron_file_path, "w") as f:
f.write("{}")
electron_dict = load_electron(path=electron_file_path) # TODO: 取消硬编码扩展名
logger.debug(electron_dict)
electron_file_path.write_text("{}")
# 加载电子数据
electron_dict = load_electron(path=electron_file_path)
logger.debug(f"电子数据: {electron_dict}")
# 分析电子状态
is_due = 0
is_activated = 0
nextdate = 0x3F3F3F3F
for i in electron_dict.values():
i: pt.Electron
logger.debug(i, i.is_due())
if i.is_due():
for electron in electron_dict.values():
logger.debug(f"{electron}, 是否到期: {electron.is_due()}")
if electron.is_due():
is_due = 1
if i.is_activated():
if electron.is_activated():
is_activated = 1
nextdate = min(nextdate, i.nextdate())
res[1] = f"下一次复习: {nextdate}\n"
res[1] += f"{is_due if "需要复习" else "当前无需复习"}"
nextdate = min(nextdate, electron.nextdate())
# 检查是否需要更多复习
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
nucleon_path = pathlib.Path(nucleon_dir) / f"{filestem}.toml"
nucleon_count = len(load_nucleon(nucleon_path))
electron_count = len(electron_dict)
is_more = not (electron_count >= nucleon_count)
logger.debug(f"是否需要更多复习: {is_more}")
# 更新状态
self.nextdates[filename] = nextdate
self.stay_enabled[filename] = is_due or is_more
# 构建返回结果
result[0] = f"{filename}\0"
if not is_activated:
res[1] = " 尚未激活"
return res
result[1] = " 尚未激活"
else:
status_text = "需要复习" if is_due else "当前无需复习"
result[1] = f"下一次复习: {nextdate}\n{status_text}"
return result
def on_mount(self) -> None:
"""挂载组件时初始化"""
union_list_widget = self.query_one("#union-list", ListView)
probe = probe_all(0)
if len(probe["nucleon"]):
# 分析所有文件
for file in probe["nucleon"]:
text = self.item_desc_generator(file)
union_list_widget.append(
ListItem(
Label(text[0] + "\n" + text[1]),
self.texts[file] = self.analyser(file)
# 按下次复习时间排序
nucleon_files = sorted(
probe["nucleon"],
key=lambda f: self.nextdates[f],
reverse=True,
)
)
else:
# 填充列表
if not probe["nucleon"]:
union_list_widget.append(
ListItem(
Static(
"在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集."
"在 ./nucleon/ 中未找到任何内容源数据文件\n"
"请放置文件后重启应用,或者新建空的单元集。"
)
)
)
union_list_widget.disabled = True
return
for file in nucleon_files:
text = self.texts[file]
list_item = ListItem(Label(f"{text[0]}\n{text[1]}"))
union_list_widget.append(list_item)
if not self.stay_enabled[file]:
list_item.disabled = True
def on_list_view_selected(self, event) -> None:
"""处理列表项选择事件"""
if not isinstance(event.item, ListItem):
return
selected_label = event.item.query_one(Label)
if "未找到任何 .toml 文件" in str(selected_label.renderable): # type: ignore
label_text = str(selected_label.renderable)
if "未找到任何 .toml 文件" in label_text:
return
selected_filename = pathlib.Path(
str(selected_label.renderable)
.partition("\0")[0] # 文件名末尾截断, 保留文件名
.replace("*", "")
) # 去除markdown加粗
# 提取文件名
selected_filename = pathlib.Path(label_text.partition("\0")[0].replace("*", ""))
nucleon_file_path = (
pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) / selected_filename
)
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
str(selected_filename.stem) + ".json"
# 构建文件路径
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
electron_dir = config_var.get()["paths"]["electron_dir"]
nucleon_file_path = pathlib.Path(nucleon_dir) / selected_filename
electron_file_path = (
pathlib.Path(electron_dir) / f"{selected_filename.stem}.json"
)
# 跳转到准备屏幕
self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path))
def on_button_pressed(self, event) -> None:
if event.button.id == "new_nucleon_button":
# 切换到创建单元
"""处理按钮点击事件"""
button_id = event.button.id
if button_id == "new_nucleon_button":
from .nucreator import NucleonCreatorScreen
newscr = NucleonCreatorScreen()
self.app.push_screen(newscr)
elif event.button.id == "precache_all_button":
# 切换到缓存管理器
new_screen = NucleonCreatorScreen()
self.app.push_screen(new_screen)
elif button_id == "precache_all_button":
from .precache import PrecachingScreen
precache_screen = PrecachingScreen()
self.app.push_screen(precache_screen)
elif event.button.id == "about_button":
from .about import AboutScreen
elif button_id == "about_button":
about_screen = AboutScreen()
self.app.push_screen(about_screen)
def action_quit_app(self) -> None:
"""退出应用程序"""
self.app.exit()

View File

@@ -1,16 +1,18 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import Header, Footer, Label, Static, Button
from textual.containers import Center, ScrollableContainer
from textual.screen import Screen
from textual.reactive import reactive
from enum import Enum, auto
from heurams.services.logger import get_logger
from heurams.context import config_var
from heurams.kernel.reactor import *
from textual.app import ComposeResult
from textual.containers import Center, ScrollableContainer
from textual.reactive import reactive
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, Static
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
from heurams.context import config_var
from heurams.kernel.reactor import *
from heurams.services.logger import get_logger
from .. import shim
@@ -44,14 +46,14 @@ class MemScreen(Screen):
) -> None:
super().__init__(name, id, classes)
self.atoms = atoms
for i in self.atoms:
i.do_eval()
self.phaser = Phaser(atoms)
# logger.debug(self.phaser.state)
self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom
# logger.debug(self.phaser.state)
# self.procession.forward(1)
for i in atoms:
i.do_eval()
def on_mount(self):
self.load_puzzle()
@@ -142,8 +144,29 @@ class MemScreen(Screen):
self.atom.lock(1)
def action_play_voice(self):
self.run_worker(self.play_voice, exclusive=True, thread=True)
def play_voice(self):
"""朗读当前内容"""
pass
from pathlib import Path
from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5
path = Path(config_var.get()["paths"]["cache_dir"])
path = (
path
/ f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
)
if path.exists():
play_by_path(path)
else:
from heurams.services.tts_service import convertor
convertor(
self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path
)
play_by_path(path)
def action_toggle_dark(self):
self.app.action_toggle_dark()

View File

@@ -1,21 +1,15 @@
#!/usr/bin/env python3
from pathlib import Path
import toml
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
Input,
Select,
Button,
Markdown,
)
from textual.containers import ScrollableContainer
from textual.screen import Screen
from textual.widgets import (Button, Footer, Header, Input, Label, Markdown,
Select)
from heurams.services.version import ver
import toml
from pathlib import Path
from heurams.context import config_var
from heurams.services.version import ver
class NucleonCreatorScreen(Screen):
@@ -27,6 +21,7 @@ class NucleonCreatorScreen(Screen):
def search_templates(self):
from pathlib import Path
from heurams.context import config_var
template_dir = Path(config_var.get()["paths"]["template_dir"])

View File

@@ -1,22 +1,15 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
Button,
Static,
ProgressBar,
)
from textual.containers import ScrollableContainer, Horizontal
from textual.containers import ScrollableContainer
from textual.screen import Screen
import pathlib
from textual.app import ComposeResult
from textual.containers import Horizontal, ScrollableContainer
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, ProgressBar, Static
from textual.worker import get_current_worker
import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
from textual.worker import get_current_worker
class PrecachingScreen(Screen):
@@ -46,7 +39,9 @@ class PrecachingScreen(Screen):
self.desc = desc
for i in nucleons:
i: pt.Nucleon
i.do_eval()
atom = pt.Atom()
atom.link("nucleon", i)
atom.do_eval()
# print("完成 EVAL")
def compose(self) -> ComposeResult:
@@ -96,17 +91,16 @@ class PrecachingScreen(Screen):
def precache_by_text(self, text: str):
"""预缓存单段文本的音频"""
from heurams.context import rootdir, workdir, config_var
from heurams.context import config_var, rootdir, workdir
cache_dir = pathlib.Path(config_var.get()["paths"]["cache_dir"])
cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache_file.exists():
try: # TODO: 调用模块消除tts耦合
import edge_tts as tts
try:
from heurams.services.tts_service import convertor
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache_file))
convertor(text, cache_file)
return 1
except Exception as e:
print(f"预缓存失败 '{text}': {e}")
@@ -166,7 +160,7 @@ class PrecachingScreen(Screen):
def precache_all_files(self):
"""预缓存所有文件"""
from heurams.context import rootdir, workdir, config_var
from heurams.context import config_var, rootdir, workdir
nucleon_path = pathlib.Path(config_var.get()["paths"]["nucleon_dir"])
nucleon_files = [
@@ -185,7 +179,9 @@ class PrecachingScreen(Screen):
self.total = len(nu)
for i in nu:
i: pt.Nucleon
i.do_eval()
atom = pt.Atom()
atom.link("nucleon", i)
atom.do_eval()
return self.precache_by_list(nu)
def on_button_pressed(self, event: Button.Pressed) -> None:
@@ -220,7 +216,8 @@ class PrecachingScreen(Screen):
# 清空缓存
try:
import shutil
from heurams.context import rootdir, workdir, config_var
from heurams.context import config_var, rootdir, workdir
shutil.rmtree(
f"{config_var.get()["paths"]["cache_dir"]}", ignore_errors=True

View File

@@ -1,21 +1,15 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
Static,
Button,
Markdown,
)
from textual.containers import ScrollableContainer
from textual.reactive import reactive
from textual.screen import Screen
from heurams.context import config_var
from textual.widget import Widget
from textual.widgets import Button, Footer, Header, Label, Markdown, Static
import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
from textual.reactive import reactive
from textual.widget import Widget
from heurams.context import config_var
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -1,22 +1,16 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
Button,
Static,
ProgressBar,
)
from textual.containers import ScrollableContainer, Horizontal
from textual.containers import ScrollableContainer
from textual.screen import Screen
import pathlib
import time
from textual.app import ComposeResult
from textual.containers import Horizontal, ScrollableContainer
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, ProgressBar, Static
from textual.worker import get_current_worker
import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
from textual.worker import get_current_worker
class SyncScreen(Screen):
@@ -25,22 +19,287 @@ class SyncScreen(Screen):
def __init__(self, nucleons: list = [], desc: str = ""):
super().__init__(name=None, id=None, classes=None)
self.sync_service = None
self.is_syncing = False
self.is_paused = False
self.log_messages = []
self.max_log_lines = 50
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with ScrollableContainer(id="sync_container"):
pass
# 标题和连接状态
yield Static("同步工具", classes="title")
yield Static("", id="status_label", classes="status")
# 配置信息
yield Static(f"同步协议: {config_var.get()['services']['sync']}")
yield Static("服务器配置:", classes="section_title")
with Horizontal(classes="config_info"):
yield Static("远程服务器:", classes="config_label")
yield Static("", id="server_url", classes="config_value")
with Horizontal(classes="config_info"):
yield Static("远程路径:", classes="config_label")
yield Static("", id="remote_path", classes="config_value")
with Horizontal(classes="control_buttons"):
yield Button("测试连接", id="test_connection", variant="primary")
yield Button("开始同步", id="start_sync", variant="success")
yield Button("暂停", id="pause_sync", variant="warning", disabled=True)
yield Button("取消", id="cancel_sync", variant="error", disabled=True)
yield Static("同步进度", classes="section_title")
yield ProgressBar(id="progress_bar", show_percentage=True, total=100)
yield Static("", id="progress_label", classes="progress_text")
yield Static("同步日志", classes="section_title")
yield Static("", id="log_output", classes="log_output")
yield Footer()
def on_mount(self):
"""挂载时初始化状态"""
self.update_ui_from_config()
self.log_message("同步工具已启动")
def update_ui_from_config(self):
"""更新 UI 显示配置信息"""
try:
sync_cfg: dict = config_var.get()["providers"]["sync"]["webdav"]
# 更新服务器 URL
url = sync_cfg.get("url", "未配置")
url_widget = self.query_one("#server_url")
url_widget.update(url) # type: ignore
# 更新远程路径
remote_path = sync_cfg.get("remote_path", "/")
path_widget = self.query_one("#remote_path")
path_widget.update(remote_path) # type: ignore
# 更新状态标签
status_widget = self.query_one("#status_label")
if self.sync_service and self.sync_service.client:
status_widget.update("✅ 同步服务已就绪") # type: ignore
status_widget.add_class("ready")
else:
status_widget.update("❌ 同步服务未配置或未启用") # type: ignore
status_widget.add_class("error")
except Exception as e:
self.log_message(f"更新 UI 失败: {e}", is_error=True)
def update_status(self, status, current_item="", progress=None):
"""更新状态显示"""
try:
status_widget = self.query_one("#status_label")
status_widget.update(status) # type: ignore
if progress is not None:
progress_bar = self.query_one("#progress_bar")
progress_bar.progress = progress # type: ignore
progress_label = self.query_one("#progress_label")
progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore
except Exception as e:
self.log_message(f"更新状态失败: {e}", is_error=True)
def log_message(self, message: str, is_error: bool = False):
"""添加日志消息并更新显示"""
timestamp = time.strftime("%H:%M:%S")
prefix = "[ERROR]" if is_error else "[INFO]"
log_line = f"{timestamp} {prefix} {message}"
self.log_messages.append(log_line)
# 保持日志行数不超过最大值
if len(self.log_messages) > self.max_log_lines:
self.log_messages = self.log_messages[-self.max_log_lines :]
# 更新日志显示
try:
log_widget = self.query_one("#log_output")
log_widget.update("\n".join(self.log_messages)) # type: ignore
except Exception:
pass # 如果组件未就绪,忽略错误
def on_button_pressed(self, event: Button.Pressed) -> None:
"""处理按钮点击事件"""
button_id = event.button.id
if button_id == "test_connection":
self.test_connection()
elif button_id == "start_sync":
self.start_sync()
elif button_id == "pause_sync":
self.pause_sync()
elif button_id == "cancel_sync":
self.cancel_sync()
event.stop()
def test_connection(self):
"""测试 WebDAV 服务器连接"""
if not self.sync_service:
self.log_message("同步服务未初始化,请检查配置", is_error=True)
self.update_status("❌ 同步服务未初始化")
return
self.log_message("正在测试 WebDAV 连接...")
self.update_status("正在测试连接...")
try:
success = self.sync_service.test_connection()
if success:
self.log_message("连接测试成功")
self.update_status("✅ 连接正常")
else:
self.log_message("连接测试失败", is_error=True)
self.update_status("❌ 连接失败")
except Exception as e:
self.log_message(f"连接测试异常: {e}", is_error=True)
self.update_status("❌ 连接异常")
def start_sync(self):
"""开始同步"""
if not self.sync_service:
self.log_message("同步服务未初始化,无法开始同步", is_error=True)
return
if self.is_syncing:
self.log_message("同步已在进行中", is_error=True)
return
self.is_syncing = True
self.is_paused = False
self.update_button_states()
self.log_message("开始同步数据...")
self.update_status("正在同步...", progress=0)
# 启动后台同步任务
self.run_worker(self.perform_sync, thread=True)
def perform_sync(self):
"""执行同步任务(在后台线程中运行)"""
worker = get_current_worker()
try:
# 获取需要同步的本地目录
from heurams.context import config_var
config = config_var.get()
paths = config.get("paths", {})
# 同步 nucleon 目录
nucleon_dir = pathlib.Path(paths.get("nucleon_dir", "./data/nucleon"))
if nucleon_dir.exists():
self.log_message(f"同步 nucleon 目录: {nucleon_dir}")
self.update_status(f"同步 nucleon 目录...", progress=10)
result = self.sync_service.sync_directory(nucleon_dir) # type: ignore
if result.get("success"):
self.log_message(
f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)}"
)
else:
self.log_message(
f"nucleon 同步失败: {result.get('error', '未知错误')}",
is_error=True,
)
# 同步 electron 目录
electron_dir = pathlib.Path(paths.get("electron_dir", "./data/electron"))
if electron_dir.exists():
self.log_message(f"同步 electron 目录: {electron_dir}")
self.update_status(f"同步 electron 目录...", progress=60)
result = self.sync_service.sync_directory(electron_dir) # type: ignore
if result.get("success"):
self.log_message(
f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)}"
)
else:
self.log_message(
f"electron 同步失败: {result.get('error', '未知错误')}",
is_error=True,
)
# 同步 orbital 目录(如果存在)
orbital_dir = pathlib.Path(paths.get("orbital_dir", "./data/orbital"))
if orbital_dir.exists():
self.log_message(f"同步 orbital 目录: {orbital_dir}")
self.update_status(f"同步 orbital 目录...", progress=80)
result = self.sync_service.sync_directory(orbital_dir) # type: ignore
if result.get("success"):
self.log_message(
f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)}"
)
else:
self.log_message(
f"orbital 同步失败: {result.get('error', '未知错误')}",
is_error=True,
)
# 同步完成
self.update_status("同步完成", progress=100)
self.log_message("所有目录同步完成")
except Exception as e:
self.log_message(f"同步过程中发生错误: {e}", is_error=True)
self.update_status("同步失败")
finally:
# 重置同步状态
self.is_syncing = False
self.is_paused = False
self.update_button_states() # type: ignore
def pause_sync(self):
"""暂停同步"""
if not self.is_syncing:
return
self.is_paused = not self.is_paused
self.update_button_states()
if self.is_paused:
self.log_message("同步已暂停")
self.update_status("同步已暂停")
else:
self.log_message("同步已恢复")
self.update_status("正在同步...")
def cancel_sync(self):
"""取消同步"""
if not self.is_syncing:
return
self.is_syncing = False
self.is_paused = False
self.update_button_states()
self.log_message("同步已取消")
self.update_status("同步已取消")
def update_button_states(self):
"""更新按钮状态"""
try:
start_button = self.query_one("#start_sync")
pause_button = self.query_one("#pause_sync")
cancel_button = self.query_one("#cancel_sync")
if self.is_syncing:
start_button.disabled = True
pause_button.disabled = False
cancel_button.disabled = False
pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore
else:
start_button.disabled = False
pause_button.disabled = True
cancel_button.disabled = True
except Exception as e:
self.log_message(f"更新按钮状态失败: {e}", is_error=True)
def action_go_back(self):
self.app.pop_screen()

View File

@@ -1,10 +1,11 @@
"""Kernel 操作辅助函数库"""
import random
from typing import TypedDict
import heurams.interface.widgets as pzw
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
import heurams.interface.widgets as pzw
from typing import TypedDict
staging = {} # 细粒度缓存区, 是 ident -> quality 的封装

View File

@@ -1,6 +1,8 @@
from typing import Iterable
from textual.app import ComposeResult
from textual.widget import Widget
import heurams.kernel.particles as pt

View File

@@ -1,13 +1,11 @@
from textual.widgets import (
Label,
Static,
Button,
)
from textual.containers import ScrollableContainer, Horizontal
from textual.widget import Widget
import heurams.kernel.particles as pt
from .base_puzzle_widget import BasePuzzleWidget
from textual.containers import Horizontal, ScrollableContainer
from textual.message import Message
from textual.widget import Widget
from textual.widgets import Button, Label, Static
import heurams.kernel.particles as pt
from .base_puzzle_widget import BasePuzzleWidget
class BasicEvaluation(BasePuzzleWidget):
@@ -51,7 +49,7 @@ class BasicEvaluation(BasePuzzleWidget):
# 显示主要内容
yield Label(self.atom.registry["nucleon"]["content"], id="main")
# 显示评估说明可选
# 显示评估说明(可选)
yield Static("请评估你对这个内容的记忆程度: ", classes="instruction")
# 按钮容器

View File

@@ -1,17 +1,17 @@
from textual.widgets import (
Label,
Button,
)
from textual.widget import Widget
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
from .base_puzzle_widget import BasePuzzleWidget
import copy
import random
from typing import TypedDict
from textual.containers import Container
from textual.message import Message
from textual.widget import Widget
from textual.widgets import Button, Label
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
from heurams.services.logger import get_logger
from typing import TypedDict
from .base_puzzle_widget import BasePuzzleWidget
logger = get_logger(__name__)

View File

@@ -1,8 +1,5 @@
from textual.widgets import (
Label,
Button,
)
from textual.widget import Widget
from textual.widgets import Button, Label
class Finished(Widget):

View File

@@ -1,17 +1,17 @@
# 单项选择题
from textual.widgets import (
Label,
Button,
)
from textual.containers import ScrollableContainer, Container
from typing import TypedDict
from textual.containers import Container, ScrollableContainer
from textual.widget import Widget
from textual.widgets import Button, Label
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
from .base_puzzle_widget import BasePuzzleWidget
from typing import TypedDict
from heurams.services.hasher import hash
from heurams.services.logger import get_logger
from .base_puzzle_widget import BasePuzzleWidget
logger = get_logger(__name__)
@@ -61,11 +61,16 @@ class MCQPuzzle(BasePuzzleWidget):
self.puzzle.refresh()
def compose(self):
self.atom.registry["nucleon"].do_eval()
setting: Setting = self.atom.registry["nucleon"].metadata["orbital"]["puzzles"][
self.alia
]
logger.debug(f"Puzzle Setting: {setting}")
logger.debug(f"WIRED INDEX: {len(self.inputlist)}")
if len(self.inputlist) > len(self.puzzle.options):
logger.debug("ERR IDX")
logger.debug(self.inputlist)
logger.debug(self.puzzle.options)
else:
current_options = self.puzzle.options[len(self.inputlist)]
yield Label(setting["primary"], id="sentence")
yield Label(self.puzzle.wording[len(self.inputlist)], id="puzzle")
@@ -116,7 +121,7 @@ class MCQPuzzle(BasePuzzleWidget):
self.screen.rating = rating # type: ignore
self.handler(rating)
# 重置输入如果回答错误
# 重置输入(如果回答错误)
if not is_correct:
self.inputlist = []
self.refresh_buttons()
@@ -127,7 +132,7 @@ class MCQPuzzle(BasePuzzleWidget):
self.update_display()
def refresh_buttons(self):
"""刷新按钮显示用于题目切换"""
"""刷新按钮显示(用于题目切换)"""
# 移除所有选项按钮
logger.debug("刷新按钮")
self.cursor += 1

View File

@@ -1,8 +1,5 @@
from textual.widgets import (
Label,
Button,
)
from textual.widget import Widget
from textual.widgets import Button, Label
class Placeholder(Widget):

View File

@@ -1,20 +1,17 @@
from textual.reactive import reactive
from textual.widgets import (
Markdown,
Label,
Static,
Button,
)
from textual.containers import Center
from textual.widget import Widget
from typing import Dict
import heurams.kernel.particles as pt
import re
from .base_puzzle_widget import BasePuzzleWidget
from typing import TypedDict, List
from typing import Dict, List, TypedDict
from textual.containers import Center
from textual.message import Message
from textual.reactive import reactive
from textual.widget import Widget
from textual.widgets import Button, Label, Markdown, Static
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .base_puzzle_widget import BasePuzzleWidget
logger = get_logger(__name__)
@@ -52,6 +49,11 @@ class Recognition(BasePuzzleWidget):
self.alia = alia
def compose(self):
from heurams.context import config_var
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"]
if autovoice:
self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"]
replace_dict = {
@@ -71,7 +73,8 @@ class Recognition(BasePuzzleWidget):
primary = cfg["primary"]
with Center():
yield Static(f"[dim]{cfg['top_dim']}[/]")
for i in cfg["top_dim"]:
yield Static(f"[dim]{i}[/]")
yield Label("")
for old, new in replace_dict.items():

View File

@@ -1,6 +1,8 @@
from .sm2 import SM2Algorithm
from heurams.services.logger import get_logger
from .sm2 import SM2Algorithm
from .sm15m import SM15MAlgorithm
logger = get_logger(__name__)
__all__ = [
@@ -9,7 +11,8 @@ __all__ = [
algorithms = {
"SM-2": SM2Algorithm,
"supermemo2": SM2Algorithm,
"SM-15M": SM15MAlgorithm,
# "SM-15M": SM15MAlgorithm,
}
logger.debug("算法模块初始化完成, 注册的算法: %s", list(algorithms.keys()))

View File

@@ -1,5 +1,6 @@
import heurams.services.timer as timer
from typing import TypedDict
import heurams.services.timer as timer
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -0,0 +1,286 @@
"""
SM-15 接口兼容实现, 基于 SM-15 算法的逆向工程
全局状态保存在文件中, 项目状态通过 algodata 字典传递
基于: https://github.com/slaypni/sm.js
原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida
MIT 许可证
"""
import datetime
import json
import os
import pathlib
from typing import TypedDict
from heurams.context import config_var
from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF,
RANGE_AF, RANGE_REPETITION,
SM, THRESHOLD_RECALL, Item)
# 全局状态文件路径
_GLOBAL_STATE_FILE = os.path.expanduser(
pathlib.Path(config_var.get()["paths"]["global_dir"]) / "sm15m_global_state.json"
)
def _get_global_sm():
"""获取全局 SM 实例, 从文件加载或创建新的"""
if os.path.exists(_GLOBAL_STATE_FILE):
try:
with open(_GLOBAL_STATE_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
sm_instance = SM.load(data)
return sm_instance
except Exception:
# 如果加载失败, 创建新的实例
pass
# 创建新的 SM 实例
sm_instance = SM()
# 保存初始状态
_save_global_sm(sm_instance)
return sm_instance
def _save_global_sm(sm_instance):
"""保存全局 SM 实例到文件"""
try:
data = sm_instance.data()
with open(_GLOBAL_STATE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
except Exception:
# 忽略保存错误
pass
class SM15MAlgorithm:
algo_name = "SM-15M"
class AlgodataDict(TypedDict):
efactor: float
real_rept: int
rept: int
interval: int
last_date: int
next_date: int
is_activated: int
last_modify: float
defaults = {
"efactor": 2.5,
"real_rept": 0,
"rept": 0,
"interval": 0,
"last_date": 0,
"next_date": 0,
"is_activated": 0,
"last_modify": 0.0,
}
@classmethod
def _get_timestamp(cls):
"""获取当前时间戳(秒)"""
return datetime.datetime.now().timestamp()
@classmethod
def _get_daystamp(cls):
"""获取当前天数戳(从某个纪元开始的天数)"""
# 使用与原始 SM-2 相同的纪元1970-01-01
now = datetime.datetime.now()
epoch = datetime.datetime(1970, 1, 1)
delta = now - epoch
return delta.days
@classmethod
def _algodata_to_item(cls, algodata, sm_instance):
"""将 algodata 转换为 Item 实例"""
# 从 algodata 获取 SM-2 数据
sm15_data = algodata.get(cls.algo_name, cls.defaults.copy())
# 创建 Item 实例
item = Item(sm_instance)
# 映射字段
# efactor -> A-Factor (需要转换)
efactor = sm15_data.get("efactor", 2.5)
# SM-2 的 efactor 范围 [1.3, 2.5+], SM-15 的 A-Factor 范围 [1.2, 6.9]
# 简单线性映射af = (efactor - 1.3) * (MAX_AF - MIN_AF) / (2.5 - 1.3) + MIN_AF
# 但 efactor 可能大于 2.5, 所以需要限制
af = max(MIN_AF, min(MAX_AF, efactor * 2.0)) # 粗略映射
# 调试
# print(f"DEBUG: efactor={efactor}, af before set={af}")
item.af(af)
# print(f"DEBUG: item.af() after set={item.af()}")
# rept -> repetition (成功回忆次数)
rept = sm15_data.get("rept", 0)
item.repetition = (
rept - 1 if rept > 0 else -1
) # SM-15 中 repetition=-1 表示新项目
# real_rept -> lapse? 或者忽略
real_rept = sm15_data.get("real_rept", 0)
# 可以存储在 value 中或忽略
# interval -> optimum_interval (需要从天数转换为毫秒)
interval_days = sm15_data.get("interval", 0)
if interval_days == 0:
item.optimum_interval = sm_instance.interval_base
else:
item.optimum_interval = interval_days * 24 * 60 * 60 * 1000 # 天转毫秒
# last_date -> previous_date
last_date_days = sm15_data.get("last_date", 0)
if last_date_days > 0:
epoch = datetime.datetime(1970, 1, 1)
item.previous_date = epoch + datetime.timedelta(days=last_date_days)
# next_date -> due_date
next_date_days = sm15_data.get("next_date", 0)
if next_date_days > 0:
epoch = datetime.datetime(1970, 1, 1)
item.due_date = epoch + datetime.timedelta(days=next_date_days)
# is_activated 和 last_modify 忽略
# 将原始 algodata 保存在 value 中以便恢复
item.value = {
"front": "SM-15 item",
"back": "SM-15 item",
"_sm15_data": sm15_data,
}
return item
@classmethod
def _item_to_algodata(cls, item, algodata):
"""将 Item 实例状态写回 algodata"""
if cls.algo_name not in algodata:
algodata[cls.algo_name] = cls.defaults.copy()
sm15_data = algodata[cls.algo_name]
# A-Factor -> efactor (反向映射)
af = item.af()
if af is None:
af = MIN_AF
# 反向粗略映射
efactor = max(1.3, min(af / 2.0, 10.0)) # 限制范围
# 调试
# print(f"DEBUG: item.af()={af}, computed efactor={efactor}")
sm15_data["efactor"] = efactor
# repetition -> rept
rept = item.repetition + 1 if item.repetition >= 0 else 0
sm15_data["rept"] = rept
# real_rept: 递增在 revisor 中处理, 这里保持不变
# 但如果没有 real_rept 字段, 则初始化为0
if "real_rept" not in sm15_data:
sm15_data["real_rept"] = 0
# optimum_interval -> interval (毫秒转天)
interval_ms = item.optimum_interval
if interval_ms == item.sm.interval_base:
sm15_data["interval"] = 0
else:
interval_days = max(0, round(interval_ms / (24 * 60 * 60 * 1000)))
sm15_data["interval"] = interval_days
# previous_date -> last_date
if item.previous_date:
epoch = datetime.datetime(1970, 1, 1)
last_date_days = (item.previous_date - epoch).days
sm15_data["last_date"] = last_date_days
else:
sm15_data["last_date"] = 0
# due_date -> next_date
if item.due_date:
epoch = datetime.datetime(1970, 1, 1)
next_date_days = (item.due_date - epoch).days
sm15_data["next_date"] = next_date_days
else:
sm15_data["next_date"] = 0
# is_activated: 保持不变或设为1
if "is_activated" not in sm15_data:
sm15_data["is_activated"] = 1
# last_modify: 更新时间戳
sm15_data["last_modify"] = cls._get_timestamp()
return algodata
@classmethod
def revisor(
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
):
"""SM-15 算法迭代决策机制实现"""
# 获取全局 SM 实例
sm_instance = _get_global_sm()
# 将 algodata 转换为 Item
item = cls._algodata_to_item(algodata, sm_instance)
# 处理 is_new_activation
if is_new_activation:
# 重置为初始状态
item.repetition = -1
item.lapse = 0
item.optimum_interval = sm_instance.interval_base
item.previous_date = None
item.due_date = datetime.datetime.fromtimestamp(0)
item.af(2.5) # 重置 efactor
# 将项目临时添加到 SM 实例(以便 answer 更新共享状态)
sm_instance.q.append(item)
# 处理反馈(评分)
# SM-2 的 feedback 是 0-5, SM-15 的 grade 也是 0-5
grade = feedback
now = datetime.datetime.now()
# 调用 answer 方法
item.answer(grade, now)
# 更新共享状态(FI-Graph, ForgettingCurves, OFM)
if item.repetition >= 0:
sm_instance.forgetting_curves.register_point(grade, item, now)
sm_instance.ofm.update()
sm_instance.fi_g.update(grade, item, now)
# 从队列中移除项目
sm_instance.q.remove(item)
# 保存全局状态
_save_global_sm(sm_instance)
# 将更新后的 Item 状态写回 algodata
cls._item_to_algodata(item, algodata)
# 更新 real_rept(总复习次数)
algodata[cls.algo_name]["real_rept"] += 1
@classmethod
def is_due(cls, algodata):
"""检查项目是否到期"""
sm15_data = algodata.get(cls.algo_name, cls.defaults.copy())
next_date_days = sm15_data.get("next_date", 0)
current_daystamp = cls._get_daystamp()
return next_date_days <= current_daystamp
@classmethod
def rate(cls, algodata):
"""获取项目的评分(返回 efactor 字符串)"""
sm15_data = algodata.get(cls.algo_name, cls.defaults.copy())
efactor = sm15_data.get("efactor", 2.5)
return str(efactor)
@classmethod
def nextdate(cls, algodata) -> int:
"""获取下次复习日期(天数戳)"""
sm15_data = algodata.get(cls.algo_name, cls.defaults.copy())
next_date_days = sm15_data.get("next_date", 0)
return next_date_days

File diff suppressed because it is too large Load Diff

View File

@@ -1,8 +1,10 @@
from .base import BaseAlgorithm
import heurams.services.timer as timer
from typing import TypedDict
import heurams.services.timer as timer
from heurams.services.logger import get_logger
from .base import BaseAlgorithm
logger = get_logger(__name__)

View File

@@ -9,12 +9,12 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("粒子模块已加载")
from .atom import Atom, atom_registry
from .electron import Electron
from .loader import load_electron, load_nucleon
from .nucleon import Nucleon
from .orbital import Orbital
from .atom import Atom, atom_registry
from .probe import probe_all, probe_by_filename
from .loader import load_nucleon, load_electron
__all__ = [
"Electron",

View File

@@ -1,14 +1,17 @@
import json
import pathlib
import typing
from typing import TypedDict
import bidict
import toml
from heurams.context import config_var
from heurams.services.logger import get_logger
from .electron import Electron
from .nucleon import Nucleon
from .orbital import Orbital
from typing import TypedDict
import pathlib
import typing
import toml
import json
import bidict
from heurams.context import config_var
from heurams.services.logger import get_logger
logger = get_logger(__name__)
@@ -59,7 +62,6 @@ class Atom:
"orbital_fmt": "toml",
"runtime": {"locked": False, "min_rate": 0x3F3F3F3F, "newact": False},
}
self.do_eval()
logger.debug("Atom 初始化完成")
def link(self, key, value):
@@ -67,14 +69,64 @@ class Atom:
if key in self.registry.keys():
self.registry[key] = value
logger.debug("'%s' 已链接, 触发 do_eval", key)
self.do_eval()
if key == 'electron':
if self.registry['electron'].is_activated() == 0:
self.registry['runtime']['newact'] = True
if key == "electron":
if self.registry["electron"].is_activated() == 0:
self.registry["runtime"]["newact"] = True
else:
logger.error("尝试链接不受支持的键: '%s'", key)
raise ValueError("不受支持的原子元数据链接操作")
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("EVAL 开始")
# eval 环境设置
def eval_with_env(s: str):
default = config_var.get()["puzzles"]
payload = self.registry["nucleon"].payload
metadata = self.registry["nucleon"].metadata
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
try:
traverse(self.registry["nucleon"].payload, eval_with_env)
traverse(self.registry["nucleon"].metadata, eval_with_env)
traverse(self.registry["orbital"], eval_with_env)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning(ret)
logger.debug("EVAL 完成")
def minimize(self, rating):
"""效果等同于 self.registry['runtime']['min_rate'] = min(rating, self.registry['runtime']['min_rate'])
@@ -104,84 +156,13 @@ class Atom:
"""
if self.registry["runtime"]["locked"]:
logger.debug(f"允许总评分: {self.registry['runtime']['min_rate']}")
self.registry["electron"].revisor(self.registry["runtime"]["min_rate"], is_new_activation=self.registry["runtime"]["newact"])
self.registry["electron"].revisor(
self.registry["runtime"]["min_rate"],
is_new_activation=self.registry["runtime"]["newact"],
)
else:
logger.debug("禁止总评分")
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Atom.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
# 初始化默认值
nucleon = self.registry["nucleon"]
default = {}
metadata = {}
try:
default = config_var.get()["puzzles"]
metadata = nucleon.metadata
except Exception:
# 如果无法获取配置或元数据, 使用空字典
logger.debug("无法获取配置或元数据, 使用空字典")
pass
try:
eval_value = eval(s)
if isinstance(eval_value, (list, dict)):
ret = eval_value
else:
ret = str(eval_value)
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
# 如果 nucleon 存在且有 do_eval 方法, 调用它
nucleon = self.registry["nucleon"]
if nucleon is not None and hasattr(nucleon, "do_eval"):
nucleon.do_eval()
logger.debug("已调用 nucleon.do_eval")
# 如果 electron 存在且其 algodata 包含 eval 字符串, 遍历它
electron = self.registry["electron"]
if electron is not None and hasattr(electron, "algodata"):
traverse(electron.algodata, eval_with_env)
logger.debug("已处理 electron algodata eval")
# 如果 orbital 存在且是字典, 遍历它
orbital = self.registry["orbital"]
if orbital is not None and isinstance(orbital, dict):
traverse(orbital, eval_with_env)
logger.debug("orbital eval 完成")
logger.debug("Atom.do_eval 完成")
def persist(self, key):
logger.debug("Atom.persist: key='%s'", key)
path: pathlib.Path | None = self.registry[key + "_path"]

View File

@@ -9,7 +9,7 @@ logger = get_logger(__name__)
class Electron:
"""电子: 记忆分析元数据及算法"""
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = "supermemo2"):
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = ""):
"""初始化电子对象 (记忆数据)
Args:
@@ -17,19 +17,26 @@ class Electron:
algodata: 算法数据字典, 包含算法的各项参数和设置
algo: 使用的算法模块标识
"""
if algo_name == "":
algo_name = config_var.get()["algorithm"]["default"]
logger.debug(
"创建 Electron 实例, ident: '%s', algo_name: '%s'", ident, algo_name
"创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s",
ident,
algo_name,
algodata,
)
self.algodata = algodata
self.ident = ident
self.algo = algorithms[algo_name]
logger.debug("使用的算法类: %s", self.algo.__name__)
if self.algo not in self.algodata.keys():
if self.algo.algo_name not in self.algodata.keys():
self.algodata[self.algo.algo_name] = {}
logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo)
if not self.algodata[self.algo.algo_name]:
logger.debug("算法数据为空, 使用默认值初始化")
logger.debug(
f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}"
)
self._default_init(self.algo.defaults)
else:
logger.debug("算法数据已存在, 跳过默认初始化")

View File

@@ -1,12 +1,15 @@
from .nucleon import Nucleon
from .electron import Electron
import heurams.services.hasher as hasher
import pathlib
import toml
import json
import pathlib
from copy import deepcopy
import toml
import heurams.services.hasher as hasher
from heurams.services.logger import get_logger
from .electron import Electron
from .nucleon import Nucleon
logger = get_logger(__name__)
@@ -65,7 +68,7 @@ def load_electron(path: pathlib.Path, fmt="json") -> dict:
logger.debug("JSON 解析成功, keys: %s", list(dictdata.keys()))
dic = dict()
for item, attr in dictdata.items():
logger.debug("处理电子项目: %s", item)
logger.debug("处理电子项目: %s, %s", item, attr)
dic[item] = Electron(item, attr)
logger.debug("load_electron 完成, 加载了 %d 个 Electron 对象", len(dic))
return dic

View File

@@ -49,54 +49,6 @@ class Nucleon:
def __hash__(self):
return hash(self.ident)
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Nucleon.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
try:
nucleon = self
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
traverse(self.payload, eval_with_env)
traverse(self.metadata, eval_with_env)
logger.debug("Nucleon.do_eval 完成")
@staticmethod
def placeholder():
"""生成一个占位原子核"""

View File

@@ -1,4 +1,5 @@
from typing import TypedDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -1,5 +1,6 @@
from heurams.context import config_var
import pathlib
from heurams.context import config_var
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -8,7 +8,7 @@ class BasePuzzle:
"""谜题基类"""
def refresh(self):
logger.debug("BasePuzzle.refresh 被调用未实现")
logger.debug("BasePuzzle.refresh 被调用(未实现)")
raise NotImplementedError("谜题对象未实现 refresh 方法")
def __str__(self):

View File

@@ -1,7 +1,9 @@
from .base import BasePuzzle
import random
from heurams.services.logger import get_logger
from .base import BasePuzzle
logger = get_logger(__name__)

View File

@@ -1,9 +1,11 @@
# mcq.py
from .base import BasePuzzle
import random
from typing import List, Dict, Optional, Union
from typing import Dict, List, Optional, Union
from heurams.services.logger import get_logger
from .base import BasePuzzle
logger = get_logger(__name__)

View File

@@ -1,8 +1,10 @@
# mcq.py
from .base import BasePuzzle
import random
from heurams.services.logger import get_logger
from .base import BasePuzzle
logger = get_logger(__name__)
@@ -14,5 +16,5 @@ class RecognitionPuzzle(BasePuzzle):
super().__init__()
def refresh(self):
logger.debug("RecognitionPuzzle.refresh空实现")
logger.debug("RecognitionPuzzle.refresh(空实现)")
pass

View File

@@ -1,8 +1,9 @@
from .states import PhaserState, ProcessionState
from .procession import Procession
from heurams.services.logger import get_logger
from .fission import Fission
from .phaser import Phaser
from heurams.services.logger import get_logger
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)

View File

@@ -1,9 +1,11 @@
import random
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as puz
import random
from .states import PhaserState
from heurams.services.logger import get_logger
from .states import PhaserState
class Fission:
"""裂变器: 单原子调度展开器"""

View File

@@ -1,10 +1,11 @@
# 移相器类定义
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
from .procession import Procession
from heurams.services.logger import get_logger
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)

View File

@@ -1,7 +1,8 @@
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
from heurams.services.logger import get_logger
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
@@ -51,7 +52,7 @@ class Procession:
self.queue.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
else:
logger.debug("原子未追加重复或队列长度<=1")
logger.debug("原子未追加(重复或队列长度<=1)")
def __len__(self):
length = len(self.queue) - self.cursor

View File

@@ -1,4 +1,5 @@
from enum import Enum, auto
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -1,8 +1,8 @@
# 音频播放器, 必须基于文件操作
from . import termux_audio
from . import playsound_audio
from heurams.services.logger import get_logger
from . import playsound_audio, termux_audio
logger = get_logger(__name__)
__all__ = [

View File

@@ -5,7 +5,9 @@
import os
import pathlib
import playsound
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -1,5 +1,6 @@
from typing import Protocol
import pathlib
from typing import Protocol
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -5,6 +5,7 @@
import os
import pathlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -2,4 +2,4 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("OpenAI provider 模块已加载未实现")
logger.debug("OpenAI provider 模块已加载(未实现)")

View File

@@ -1,6 +1,7 @@
from heurams.services.logger import get_logger
from .base import BaseTTS
from .edge_tts import EdgeTTS
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -1,4 +1,5 @@
import pathlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -1,8 +1,12 @@
from .base import BaseTTS
import pathlib
import edge_tts
from heurams.context import config_var
from heurams.services.logger import get_logger
from .base import BaseTTS
logger = get_logger(__name__)
@@ -15,7 +19,7 @@ class EdgeTTS(BaseTTS):
try:
communicate = edge_tts.Communicate(
text,
"zh-CN-YunjianNeural",
config_var.get()["providers"]["tts"]["edgetts"]["voice"],
)
logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频")
communicate.save_sync(str(path))

View File

@@ -1,12 +1,13 @@
# 音频服务
from typing import Callable
from heurams.context import config_var
from heurams.providers.audio import providers as prov
from typing import Callable
from heurams.services.logger import get_logger
logger = get_logger(__name__)
play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path
logger.debug(
"音频服务初始化完成, 使用 provider: %s", config_var.get()["services"]["audio"]
"音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"]
)

View File

@@ -1,7 +1,9 @@
# 配置文件服务
import pathlib
import toml
import typing
import toml
from heurams.services.logger import get_logger

View File

@@ -1,5 +1,6 @@
# 哈希服务
import hashlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -45,7 +45,7 @@ def setup_logging(
# 创建formatter
formatter = logging.Formatter(log_format, date_format)
# 创建文件handler使用RotatingFileHandler防止日志过大
# 创建文件handler(使用RotatingFileHandler防止日志过大)
file_handler = logging.handlers.RotatingFileHandler(
filename=log_path,
maxBytes=max_bytes,
@@ -55,7 +55,7 @@ def setup_logging(
file_handler.setFormatter(formatter)
file_handler.setLevel(log_level)
# 配置root logger - 设置为 WARNING 级别只记录重要信息
# 配置root logger - 设置为 WARNING 级别(只记录重要信息)
root_logger = logging.getLogger()
root_logger.setLevel(logging.WARNING) # 这里改为 WARNING
@@ -63,7 +63,7 @@ def setup_logging(
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# 创建自己的应用logger单独设置DEBUG级别
# 创建自己的应用logger(单独设置DEBUG级别)
app_logger = logging.getLogger("heurams")
app_logger.setLevel(log_level) # 保持DEBUG级别
app_logger.addHandler(file_handler)
@@ -146,7 +146,7 @@ def exception(msg: str, *args, **kwargs) -> None:
get_logger().exception(msg, *args, **kwargs)
# 初始化日志系统硬编码配置
# 初始化日志系统(硬编码配置)
setup_logging()

View File

View File

@@ -1,6 +1,7 @@
# 时间服务
from heurams.context import config_var
import time
from heurams.context import config_var
from heurams.services.logger import get_logger
logger = get_logger(__name__)

View File

@@ -1,12 +1,13 @@
# 文本转语音服务
from heurams.context import config_var
from heurams.providers.tts import TTSs
from typing import Callable
from heurams.context import config_var
from heurams.providers.tts import providers as prov
from heurams.services.logger import get_logger
logger = get_logger(__name__)
convert: Callable = TTSs[config_var.get().get("tts_provider")]
convertor: Callable = prov[config_var.get()["services"]["tts"]].convert
logger.debug(
"TTS服务初始化完成, 使用 provider: %s", config_var.get().get("tts_provider")
"TTS服务初始化完成, 使用 provider: %s", config_var.get()["services"]["tts"]
)

View File

@@ -3,7 +3,7 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
ver = "0.4.0"
ver = "0.4.3"
stage = "prototype"
codename = "fledge" # 雏鸟, 0.4.x 版本

View File

@@ -0,0 +1,20 @@
"""vfs.py
得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers
"""
from pathlib import Path
import fsspec as fs
class VFSObject:
def __init__(self, protocol, base_url):
self.base_url = base_url
self.protocol = protocol
self.fs = fs.filesystem(protocol=protocol, base_url=base_url)
def open(self, path: Path):
return self.fs.open(path)
def open_by_list(self, path_list: list[Path]):
return self.fs.open_files(path_list)

View File

View File

@@ -2,24 +2,25 @@
"""
DashboardScreen 的测试, 包括单元测试和 pilot 测试.
"""
import unittest
import tempfile
import pathlib
import tempfile
import time
from unittest.mock import patch, MagicMock
import unittest
from unittest.mock import MagicMock, patch
from textual.pilot import Pilot
from heurams.context import ConfigContext
from heurams.services.config import ConfigFile
from heurams.interface.__main__ import HeurAMSApp
from heurams.interface.screens.dashboard import DashboardScreen
from heurams.services.config import ConfigFile
class TestDashboardScreenUnit(unittest.TestCase):
"""DashboardScreen 的单元测试不启动完整应用. """
"""DashboardScreen 的单元测试(不启动完整应用)."""
def setUp(self):
"""在每个测试之前运行, 设置临时目录和配置. """
"""在每个测试之前运行, 设置临时目录和配置."""
# 创建临时目录用于测试数据
self.temp_dir = tempfile.TemporaryDirectory()
self.temp_path = pathlib.Path(self.temp_dir.name)
@@ -53,12 +54,12 @@ class TestDashboardScreenUnit(unittest.TestCase):
self.config_ctx.__enter__()
def tearDown(self):
"""在每个测试之后清理. """
"""在每个测试之后清理."""
self.config_ctx.__exit__(None, None, None)
self.temp_dir.cleanup()
def test_compose(self):
"""测试 compose 方法返回正确的部件. """
"""测试 compose 方法返回正确的部件."""
screen = DashboardScreen()
# 手动调用 compose 并收集部件
from textual.app import ComposeResult
@@ -66,7 +67,7 @@ class TestDashboardScreenUnit(unittest.TestCase):
result = screen.compose()
widgets = list(result)
# 检查是否包含 Header 和 Footer
from textual.widgets import Header, Footer
from textual.widgets import Footer, Header
header_present = any(isinstance(w, Header) for w in widgets)
footer_present = any(isinstance(w, Footer) for w in widgets)
@@ -84,11 +85,11 @@ class TestDashboardScreenUnit(unittest.TestCase):
self.assertEqual(list_view.__class__.__name__, "ListView")
def test_item_desc_generator(self):
"""测试 item_desc_generator 函数. """
"""测试 item_desc_generator 函数."""
screen = DashboardScreen()
# 模拟一个文件名
filename = "test.toml"
result = screen.item_desc_generator(filename)
result = screen.analyser(filename)
self.assertIsInstance(result, dict)
self.assertIn(0, result)
self.assertIn(1, result)
@@ -100,10 +101,10 @@ class TestDashboardScreenUnit(unittest.TestCase):
@unittest.skip("Pilot 测试需要进一步配置, 暂不运行")
class TestDashboardScreenPilot(unittest.TestCase):
"""使用 Textual Pilot 的集成测试. """
"""使用 Textual Pilot 的集成测试."""
def setUp(self):
"""配置临时目录和配置. """
"""配置临时目录和配置."""
self.temp_dir = tempfile.TemporaryDirectory()
self.temp_path = pathlib.Path(self.temp_dir.name)
@@ -134,7 +135,7 @@ class TestDashboardScreenPilot(unittest.TestCase):
self.temp_dir.cleanup()
def test_dashboard_loads_with_pilot(self):
"""使用 Pilot 测试 DashboardScreen 加载. """
"""使用 Pilot 测试 DashboardScreen 加载."""
with patch("heurams.interface.__main__.environment_check"):
app = HeurAMSApp()
# 注意: Pilot 在 Textual 6.9.0 中的用法可能不同

View File

@@ -0,0 +1,314 @@
#!/usr/bin/env python3
"""
SyncScreen 和 SyncService 的测试.
"""
import pathlib
import tempfile
import time
import unittest
from unittest.mock import MagicMock, Mock, patch
from heurams.context import ConfigContext
from heurams.services.config import ConfigFile
from heurams.services.sync_service import (ConflictStrategy, SyncConfig,
SyncMode, SyncService)
class TestSyncServiceUnit(unittest.TestCase):
"""SyncService 的单元测试."""
def setUp(self):
"""在每个测试之前运行, 设置临时目录和模拟客户端."""
self.temp_dir = tempfile.TemporaryDirectory()
self.temp_path = pathlib.Path(self.temp_dir.name)
# 创建测试文件
self.test_file = self.temp_path / "test.txt"
self.test_file.write_text("测试内容")
# 模拟 WebDAV 客户端
self.mock_client = MagicMock()
# 创建同步配置
self.config = SyncConfig(
enabled=True,
url="https://example.com/dav/",
username="test",
password="test",
remote_path="/heurams/",
sync_mode=SyncMode.BIDIRECTIONAL,
conflict_strategy=ConflictStrategy.NEWER,
verify_ssl=True,
)
def tearDown(self):
"""在每个测试之后清理."""
self.temp_dir.cleanup()
@patch("heurams.services.sync_service.Client")
def test_sync_service_initialization(self, mock_client_class):
"""测试同步服务初始化."""
mock_client_class.return_value = self.mock_client
service = SyncService(self.config)
# 验证客户端已创建
mock_client_class.assert_called_once()
self.assertIsNotNone(service.client)
self.assertEqual(service.config, self.config)
@patch("heurams.services.sync_service.Client")
def test_sync_service_disabled(self, mock_client_class):
"""测试同步服务未启用."""
config = SyncConfig(enabled=False)
service = SyncService(config)
# 客户端不应初始化
mock_client_class.assert_not_called()
self.assertIsNone(service.client)
@patch("heurams.services.sync_service.Client")
def test_test_connection_success(self, mock_client_class):
"""测试连接测试成功."""
mock_client_class.return_value = self.mock_client
self.mock_client.list.return_value = []
service = SyncService(self.config)
result = service.test_connection()
self.assertTrue(result)
self.mock_client.list.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_test_connection_failure(self, mock_client_class):
"""测试连接测试失败."""
mock_client_class.return_value = self.mock_client
self.mock_client.list.side_effect = Exception("连接失败")
service = SyncService(self.config)
result = service.test_connection()
self.assertFalse(result)
self.mock_client.list.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_upload_file(self, mock_client_class):
"""测试上传单个文件."""
mock_client_class.return_value = self.mock_client
service = SyncService(self.config)
result = service.upload_file(self.test_file)
self.assertTrue(result)
self.mock_client.upload_file.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_download_file(self, mock_client_class):
"""测试下载单个文件."""
mock_client_class.return_value = self.mock_client
service = SyncService(self.config)
remote_path = "/heurams/test.txt"
local_path = self.temp_path / "downloaded.txt"
result = service.download_file(remote_path, local_path)
self.assertTrue(result)
self.mock_client.download_file.assert_called_once()
self.assertTrue(local_path.parent.exists())
@patch("heurams.services.sync_service.Client")
def test_sync_directory_no_files(self, mock_client_class):
"""测试同步空目录."""
mock_client_class.return_value = self.mock_client
self.mock_client.list.return_value = []
self.mock_client.mkdir.return_value = None
service = SyncService(self.config)
result = service.sync_directory(self.temp_path)
self.assertTrue(result["success"])
self.assertEqual(result["uploaded"], 0)
self.assertEqual(result["downloaded"], 0)
self.mock_client.mkdir.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_sync_directory_upload_only(self, mock_client_class):
"""测试仅上传模式."""
mock_client_class.return_value = self.mock_client
self.mock_client.list.return_value = []
self.mock_client.mkdir.return_value = None
config = SyncConfig(
enabled=True,
url="https://example.com/dav/",
username="test",
password="test",
remote_path="/heurams/",
sync_mode=SyncMode.UPLOAD_ONLY,
conflict_strategy=ConflictStrategy.NEWER,
)
service = SyncService(config)
result = service.sync_directory(self.temp_path)
self.assertTrue(result["success"])
self.mock_client.mkdir.assert_called_once()
@patch("heurams.services.sync_service.Client")
def test_conflict_strategy_newer(self, mock_client_class):
"""测试 NEWER 冲突策略."""
mock_client_class.return_value = self.mock_client
# 模拟远程文件存在
self.mock_client.list.return_value = ["test.txt"]
self.mock_client.info.return_value = {
"size": 100,
"modified": "2023-01-01T00:00:00Z",
}
self.mock_client.mkdir.return_value = None
service = SyncService(self.config)
result = service.sync_directory(self.temp_path)
self.assertTrue(result["success"])
# 应该有一个冲突
self.assertGreaterEqual(result.get("conflicts", 0), 0)
@patch("heurams.services.sync_service.Client")
def test_create_sync_service_from_config(self, mock_client_class):
"""测试从配置文件创建同步服务."""
mock_client_class.return_value = self.mock_client
# 创建临时配置文件
config_data = {
"sync": {
"webdav": {
"enabled": True,
"url": "https://example.com/dav/",
"username": "test",
"password": "test",
"remote_path": "/heurams/",
"sync_mode": "bidirectional",
"conflict_strategy": "newer",
"verify_ssl": True,
}
}
}
# 模拟 config_var
with patch("heurams.services.sync_service.config_var") as mock_config_var:
mock_config = MagicMock()
mock_config.data = config_data
mock_config_var.get.return_value = mock_config
from heurams.services.sync_service import \
create_sync_service_from_config
service = create_sync_service_from_config()
self.assertIsNotNone(service)
self.assertIsNotNone(service.client)
class TestSyncScreenUnit(unittest.TestCase):
"""SyncScreen 的单元测试."""
def setUp(self):
"""在每个测试之前运行."""
self.temp_dir = tempfile.TemporaryDirectory()
self.temp_path = pathlib.Path(self.temp_dir.name)
# 创建默认配置
default_config_path = (
pathlib.Path(__file__).parent.parent.parent
/ "src/heurams/default/config/config.toml"
)
self.config = ConfigFile(default_config_path)
# 更新配置中的路径
config_data = self.config.data
config_data["paths"]["nucleon_dir"] = str(self.temp_path / "nucleon")
config_data["paths"]["electron_dir"] = str(self.temp_path / "electron")
config_data["paths"]["orbital_dir"] = str(self.temp_path / "orbital")
config_data["paths"]["cache_dir"] = str(self.temp_path / "cache")
# 添加同步配置
if "sync" not in config_data:
config_data["sync"] = {}
config_data["sync"]["webdav"] = {
"enabled": False,
"url": "",
"username": "",
"password": "",
"remote_path": "/heurams/",
"sync_mode": "bidirectional",
"conflict_strategy": "newer",
"verify_ssl": True,
}
# 创建目录
for dir_key in ["nucleon_dir", "electron_dir", "orbital_dir", "cache_dir"]:
pathlib.Path(config_data["paths"][dir_key]).mkdir(
parents=True, exist_ok=True
)
# 使用 ConfigContext 设置配置
self.config_ctx = ConfigContext(self.config)
self.config_ctx.__enter__()
def tearDown(self):
"""在每个测试之后清理."""
self.config_ctx.__exit__(None, None, None)
self.temp_dir.cleanup()
@patch("heurams.interface.screens.synctool.create_sync_service_from_config")
def test_sync_screen_compose(self, mock_create_service):
"""测试 SyncScreen 的 compose 方法."""
from heurams.interface.screens.synctool import SyncScreen
# 模拟同步服务
mock_service = MagicMock()
mock_service.client = MagicMock()
mock_create_service.return_value = mock_service
screen = SyncScreen()
# 测试 compose 方法
from textual.app import ComposeResult
result = screen.compose()
widgets = list(result)
# 检查基本部件
from textual.containers import ScrollableContainer
from textual.widgets import Button, Footer, Header, ProgressBar, Static
header_present = any(isinstance(w, Header) for w in widgets)
footer_present = any(isinstance(w, Footer) for w in widgets)
self.assertTrue(header_present)
self.assertTrue(footer_present)
# 检查容器
container_present = any(isinstance(w, ScrollableContainer) for w in widgets)
self.assertTrue(container_present)
@patch("heurams.interface.screens.synctool.create_sync_service_from_config")
def test_sync_screen_load_config(self, mock_create_service):
"""测试 SyncScreen 加载配置."""
from heurams.interface.screens.synctool import SyncScreen
mock_service = MagicMock()
mock_service.client = MagicMock()
mock_create_service.return_value = mock_service
screen = SyncScreen()
screen.load_config()
# 验证配置已加载
self.assertIsNotNone(screen.sync_config)
mock_create_service.assert_called_once()
if __name__ == "__main__":
unittest.main()

View File

@@ -1,5 +1,5 @@
import unittest
from unittest.mock import patch, MagicMock
from unittest.mock import MagicMock, patch
from heurams.kernel.algorithms.sm2 import SM2Algorithm
@@ -94,7 +94,7 @@ class TestSM2Algorithm(unittest.TestCase):
SM2Algorithm.revisor(algodata, feedback=5, is_new_activation=True)
self.assertEqual(algodata[SM2Algorithm.algo_name]["rept"], 0)
self.assertEqual(algodata[SM2Algorithm.algo_name]["efactor"], 2.5)
# interval 应为 1因为 rept=0
# interval 应为 1(因为 rept=0)
self.assertEqual(algodata[SM2Algorithm.algo_name]["interval"], 1)
def test_revisor_efactor_calculation(self):

View File

@@ -1,15 +1,16 @@
import unittest
from unittest.mock import patch, MagicMock
import json
import pathlib
import tempfile
import toml
import json
import unittest
from unittest.mock import MagicMock, patch
import toml
from heurams.context import ConfigContext
from heurams.kernel.particles.atom import Atom, atom_registry
from heurams.kernel.particles.electron import Electron
from heurams.kernel.particles.nucleon import Nucleon
from heurams.kernel.particles.orbital import Orbital
from heurams.context import ConfigContext
from heurams.services.config import ConfigFile

View File

@@ -1,9 +1,9 @@
import unittest
from unittest.mock import patch, MagicMock
import sys
import unittest
from unittest.mock import MagicMock, patch
from heurams.kernel.particles.electron import Electron
from heurams.kernel.algorithms import algorithms
from heurams.kernel.particles.electron import Electron
class TestElectron(unittest.TestCase):
@@ -27,7 +27,7 @@ class TestElectron(unittest.TestCase):
self.assertEqual(electron.algo, algorithms["supermemo2"])
self.assertIn(electron.algo, electron.algodata)
self.assertIsInstance(electron.algodata[electron.algo], dict)
# 检查默认值排除动态字段
# 检查默认值(排除动态字段)
defaults = electron.algo.defaults
for key, value in defaults.items():
if key == "last_modify":

View File

@@ -1,5 +1,5 @@
import unittest
from unittest.mock import patch, MagicMock
from unittest.mock import MagicMock, patch
from heurams.kernel.particles.nucleon import Nucleon

View File

@@ -1,5 +1,5 @@
import unittest
from unittest.mock import patch, MagicMock
from unittest.mock import MagicMock, patch
from heurams.kernel.puzzles.cloze import ClozePuzzle

View File

@@ -1,5 +1,5 @@
import unittest
from unittest.mock import patch, MagicMock, call
from unittest.mock import MagicMock, call, patch
from heurams.kernel.puzzles.mcq import MCQPuzzle
@@ -48,7 +48,7 @@ class TestMCQPuzzle(unittest.TestCase):
# 模拟 random.sample 返回前两个映射项
mock_sample.side_effect = [
[("q1", "a1"), ("q2", "a2")], # 选择问题
["j1", "j2", "j3"], # 为每个问题选择干扰项实际调用两次
["j1", "j2", "j3"], # 为每个问题选择干扰项(实际调用两次)
]
puzzle.refresh()
@@ -59,7 +59,7 @@ class TestMCQPuzzle(unittest.TestCase):
self.assertEqual(puzzle.answer, ["a1", "a2"])
# 检查 options 列表
self.assertEqual(len(puzzle.options), 2)
# 每个选项列表应包含 4 个选项正确答案 + 3 个干扰项
# 每个选项列表应包含 4 个选项(正确答案 + 3 个干扰项)
self.assertEqual(len(puzzle.options[0]), 4)
self.assertEqual(len(puzzle.options[1]), 4)
# random.shuffle 应被调用

View File

@@ -1,10 +1,10 @@
import unittest
from unittest.mock import Mock, patch, MagicMock
from unittest.mock import MagicMock, Mock, patch
from heurams.kernel.reactor.phaser import Phaser
from heurams.kernel.reactor.states import PhaserState, ProcessionState
from heurams.kernel.particles.atom import Atom
from heurams.kernel.particles.electron import Electron
from heurams.kernel.reactor.phaser import Phaser
from heurams.kernel.reactor.states import PhaserState, ProcessionState
class TestPhaser(unittest.TestCase):