Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1a738252d6 | |||
| b5f30ec4ee | |||
| 87cefedb61 | |||
| 0fb421412e | |||
| ee0646ac79 | |||
| d8fc18166d | |||
| a2e12c7462 | |||
| 1efe034a59 | |||
| d347471cc0 | |||
| 0a365b568a | |||
| e303d4dc1e | |||
| cb78290f05 | |||
| e0417981b1 | |||
| a0660d3348 | |||
| f5e0417292 | |||
| 142d30347b | |||
| e57cea7219 | |||
| 98ec6504a4 | |||
| 243eea864b | |||
| cfb1385f4d | |||
| a1462206a2 |
7
.gitignore
vendored
7
.gitignore
vendored
@@ -11,15 +11,16 @@ electron/test.toml
|
||||
build/
|
||||
dist/
|
||||
old/
|
||||
|
||||
# Project specific directories
|
||||
# config/
|
||||
data/cache/
|
||||
data/electron/
|
||||
data/nucleon/
|
||||
!data/nucleon/test*
|
||||
data/global/
|
||||
!data/nucleon/TEST*
|
||||
data/orbital/
|
||||
config/config_dev.toml
|
||||
AGENTS.md
|
||||
*.log.1
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
|
||||
@@ -10,13 +10,15 @@
|
||||
- `dev` 分支: 开发版本
|
||||
- 功能分支: 从 `dev` 分支创建, 命名格式为 `feature/描述` 或 `fix/描述` 或 `refactor/描述`
|
||||
2. **代码风格**:
|
||||
- 请使用 Black 格式化代码
|
||||
- 请使用 Black 与 isort 格式化代码
|
||||
- 遵循 PEP 8 规范
|
||||
- 添加适当的文档字符串
|
||||
3. **提交消息**:
|
||||
- 使用简体中文或英文撰写清晰的提交消息
|
||||
- 格式: 遵循 Conventional Commits 规范
|
||||
|
||||
4. **合并方式**:
|
||||
- 不使用 Fast-forward 合并
|
||||
- 可以设置 `git config merge.ff false`
|
||||
## 设置开发环境
|
||||
|
||||
```bash
|
||||
|
||||
23
README.md
23
README.md
@@ -30,6 +30,7 @@
|
||||
- 自然语音: 集成微软神经网络文本转语音 (TTS) 技术
|
||||
- 多种谜题类型: 选择题 (MCQ)、填空题 (Cloze)、识别题 (Recognition)
|
||||
- 动态内容生成: 支持宏驱动的模板系统, 根据上下文动态生成题目
|
||||
- 云同步支持: 通过 WebDAV 协议同步数据到远程服务器
|
||||
|
||||
### 实用用户界面
|
||||
- 响应式 Textual 框架构建的跨平台 TUI 界面
|
||||
@@ -82,7 +83,23 @@ python -m heurams.interface
|
||||
|
||||
## 配置
|
||||
|
||||
配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置.
|
||||
配置文件位于 `config/config.toml`(相对于工作目录). 如果不存在, 会使用内置的默认配置.
|
||||
|
||||
### 同步配置
|
||||
同步功能支持 WebDAV 协议,可在配置文件的 `[sync.webdav]` 段进行配置:
|
||||
```toml
|
||||
[sync.webdav]
|
||||
enabled = false
|
||||
url = "" # WebDAV 服务器地址
|
||||
username = "" # 用户名
|
||||
password = "" # 密码
|
||||
remote_path = "/heurams/" # 远程路径
|
||||
sync_mode = "bidirectional" # 同步模式: bidirectional/upload_only/download_only
|
||||
conflict_strategy = "newer" # 冲突策略: newer/ask/keep_both
|
||||
verify_ssl = true # SSL 证书验证
|
||||
```
|
||||
|
||||
启用同步后,可通过应用内的同步工具进行数据备份和恢复。
|
||||
|
||||
## 项目结构
|
||||
|
||||
@@ -104,6 +121,7 @@ graph TB
|
||||
Timer[时间服务]
|
||||
AudioService[音频服务]
|
||||
TTSService[TTS服务]
|
||||
SyncService[同步服务]
|
||||
OtherServices[其他服务]
|
||||
end
|
||||
|
||||
@@ -156,7 +174,8 @@ src/heurams/
|
||||
│ ├── logger.py # 日志系统
|
||||
│ ├── timer.py # 时间服务
|
||||
│ ├── audio_service.py # 音频播放抽象
|
||||
│ └── tts_service.py # 文本转语音抽象
|
||||
│ ├── tts_service.py # 文本转语音抽象
|
||||
│ └── sync_service.py # WebDAV 同步服务
|
||||
├── kernel/ # 核心业务逻辑
|
||||
│ ├── algorithms/ # 间隔重复算法 (FSRS, SM2)
|
||||
│ ├── particles/ # 数据模型 (Atom, Electron, Nucleon, Orbital)
|
||||
|
||||
@@ -14,6 +14,14 @@ scheduled_num = 8
|
||||
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
|
||||
timezone_offset = +28800 # 中国标准时间 (UTC+8)
|
||||
|
||||
[interface]
|
||||
|
||||
[interface.memorizor]
|
||||
autovoice = true # 自动语音播放, 仅限于 recognition 组件
|
||||
|
||||
[algorithm]
|
||||
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
|
||||
|
||||
[puzzles] # 谜题默认配置
|
||||
|
||||
[puzzles.mcq]
|
||||
@@ -25,6 +33,7 @@ min_denominator = 3
|
||||
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
|
||||
nucleon_dir = "./data/nucleon"
|
||||
electron_dir = "./data/electron"
|
||||
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
|
||||
orbital_dir = "./data/orbital"
|
||||
cache_dir = "./data/cache"
|
||||
template_dir = "./data/template"
|
||||
@@ -33,7 +42,20 @@ template_dir = "./data/template"
|
||||
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
|
||||
tts = "edgetts" # 可选项: edgetts
|
||||
llm = "openai" # 可选项: openai
|
||||
sync = "webdav" # 可选项: 留空, webdav
|
||||
|
||||
[providers.tts.edgetts] # EdgeTTS 设置
|
||||
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
|
||||
|
||||
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
|
||||
url = ""
|
||||
key = ""
|
||||
|
||||
[providers.sync.webdav] # WebDAV 同步设置
|
||||
url = ""
|
||||
username = ""
|
||||
password = ""
|
||||
remote_path = "/heurams/"
|
||||
verify_ssl = true
|
||||
|
||||
[sync]
|
||||
|
||||
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "heurams"
|
||||
version = "0.4.0"
|
||||
version = "0.4.3"
|
||||
description = "Heuristic Assisted Memory Scheduler"
|
||||
license = {file = "LICENSE"}
|
||||
classifiers = [
|
||||
@@ -25,3 +25,4 @@ readme = "README.md"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
|
||||
|
||||
@@ -2,3 +2,5 @@ bidict==0.23.1
|
||||
playsound==1.2.2
|
||||
textual==5.3.0
|
||||
toml==0.10.2
|
||||
requests>=2.31.0
|
||||
webdavclient3>=3.0.0
|
||||
|
||||
@@ -3,8 +3,9 @@
|
||||
以及基准路径
|
||||
"""
|
||||
|
||||
from contextvars import ContextVar
|
||||
import pathlib
|
||||
from contextvars import ContextVar
|
||||
|
||||
from heurams.services.config import ConfigFile
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
@@ -31,7 +32,12 @@ try:
|
||||
except Exception as e:
|
||||
print("未能加载自定义用户配置")
|
||||
logger.warning("未能加载自定义用户配置, 错误: %s", e)
|
||||
|
||||
if pathlib.Path(workdir / "config" / "config_dev.toml").exists():
|
||||
print("使用开发设置")
|
||||
logger.debug("使用开发设置")
|
||||
config_var: ContextVar[ConfigFile] = ContextVar(
|
||||
"config_var", default=ConfigFile(workdir / "config" / "config_dev.toml")
|
||||
)
|
||||
# runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据
|
||||
|
||||
|
||||
|
||||
@@ -6,14 +6,22 @@ daystamp_override = -1
|
||||
timestamp_override = -1
|
||||
|
||||
# [调试] 一键通过
|
||||
quick_pass = 0
|
||||
quick_pass = 1
|
||||
|
||||
# 对于每个项目的默认新记忆原子数量
|
||||
tasked_number = 8
|
||||
scheduled_num = 8
|
||||
|
||||
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
|
||||
timezone_offset = +28800 # 中国标准时间 (UTC+8)
|
||||
|
||||
[interface]
|
||||
|
||||
[interface.memorizor]
|
||||
autovoice = true # 自动语音播放, 仅限于 recognition 组件
|
||||
|
||||
[algorithm]
|
||||
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
|
||||
|
||||
[puzzles] # 谜题默认配置
|
||||
|
||||
[puzzles.mcq]
|
||||
@@ -22,8 +30,32 @@ max_riddles_num = 2
|
||||
[puzzles.cloze]
|
||||
min_denominator = 3
|
||||
|
||||
[paths] # 相对于工作目录而言 或绝对路径
|
||||
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
|
||||
nucleon_dir = "./data/nucleon"
|
||||
electron_dir = "./data/electron"
|
||||
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
|
||||
orbital_dir = "./data/orbital"
|
||||
cache_dir = "./data/cache"
|
||||
template_dir = "./data/template"
|
||||
|
||||
[services] # 定义服务到提供者的映射
|
||||
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
|
||||
tts = "edgetts" # 可选项: edgetts
|
||||
llm = "openai" # 可选项: openai
|
||||
sync = "webdav" # 可选项: 留空, webdav
|
||||
|
||||
[providers.tts.edgetts] # EdgeTTS 设置
|
||||
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
|
||||
|
||||
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
|
||||
url = ""
|
||||
key = ""
|
||||
|
||||
[providers.sync.webdav] # WebDAV 同步设置
|
||||
url = ""
|
||||
username = ""
|
||||
password = ""
|
||||
remote_path = "/heurams/"
|
||||
verify_ssl = true
|
||||
|
||||
[sync]
|
||||
|
||||
63
src/heurams/interface/__init__.py
Normal file
63
src/heurams/interface/__init__.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from textual.app import App
|
||||
from textual.widgets import Button
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .screens.about import AboutScreen
|
||||
from .screens.dashboard import DashboardScreen
|
||||
from .screens.nucreator import NucleonCreatorScreen
|
||||
from .screens.precache import PrecachingScreen
|
||||
from .screens.synctool import SyncScreen
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
def environment_check():
|
||||
from pathlib import Path
|
||||
|
||||
logger.debug("检查环境路径")
|
||||
|
||||
for i in config_var.get()["paths"].values():
|
||||
i = Path(i)
|
||||
if not i.exists():
|
||||
logger.info("创建目录: %s", i)
|
||||
print(f"创建 {i}")
|
||||
i.mkdir(exist_ok=True, parents=True)
|
||||
else:
|
||||
logger.debug("目录已存在: %s", i)
|
||||
print(f"找到 {i}")
|
||||
logger.debug("环境检查完成")
|
||||
|
||||
|
||||
class HeurAMSApp(App):
|
||||
TITLE = "潜进"
|
||||
CSS_PATH = "css/main.tcss"
|
||||
SUB_TITLE = "启发式辅助记忆调度器"
|
||||
BINDINGS = [
|
||||
("q", "quit", "退出"),
|
||||
("d", "toggle_dark", "切换色调"),
|
||||
("1", "app.push_screen('dashboard')", "仪表盘"),
|
||||
("2", "app.push_screen('precache_all')", "缓存管理器"),
|
||||
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
|
||||
# ("4", "app.push_screen('synctool')", "同步工具"),
|
||||
("0", "app.push_screen('about')", "版本信息"),
|
||||
]
|
||||
SCREENS = {
|
||||
"dashboard": DashboardScreen,
|
||||
"nucleon_creator": NucleonCreatorScreen,
|
||||
"precache_all": PrecachingScreen,
|
||||
"synctool": SyncScreen,
|
||||
"about": AboutScreen,
|
||||
}
|
||||
|
||||
def on_mount(self) -> None:
|
||||
environment_check()
|
||||
self.push_screen("dashboard")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
self.exit(event.button.id)
|
||||
|
||||
def action_do_nothing(self):
|
||||
print("DO NOTHING")
|
||||
self.refresh()
|
||||
@@ -1,87 +1,18 @@
|
||||
from textual.app import App
|
||||
from textual.widgets import Button
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.interface import HeurAMSApp
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .screens.about import AboutScreen
|
||||
from .screens.dashboard import DashboardScreen
|
||||
from .screens.nucreator import NucleonCreatorScreen
|
||||
from .screens.precache import PrecachingScreen
|
||||
from .screens.about import AboutScreen
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class HeurAMSApp(App):
|
||||
TITLE = "潜进"
|
||||
CSS_PATH = "css/main.tcss"
|
||||
SUB_TITLE = "启发式辅助记忆调度器"
|
||||
BINDINGS = [
|
||||
("q", "quit", "退出"),
|
||||
("d", "toggle_dark", "切换色调"),
|
||||
("1", "app.push_screen('dashboard')", "仪表盘"),
|
||||
("2", "app.push_screen('precache_all')", "缓存管理器"),
|
||||
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
|
||||
("0", "app.push_screen('about')", "版本信息"),
|
||||
]
|
||||
SCREENS = {
|
||||
"dashboard": DashboardScreen,
|
||||
"nucleon_creator": NucleonCreatorScreen,
|
||||
"precache_all": PrecachingScreen,
|
||||
"about": AboutScreen,
|
||||
}
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.push_screen("dashboard")
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
self.exit(event.button.id)
|
||||
|
||||
def action_do_nothing(self):
|
||||
print("DO NOTHING")
|
||||
self.refresh()
|
||||
|
||||
|
||||
def environment_check():
|
||||
from pathlib import Path
|
||||
|
||||
logger.debug("检查环境路径")
|
||||
|
||||
for i in config_var.get()["paths"].values():
|
||||
i = Path(i)
|
||||
if not i.exists():
|
||||
logger.info("创建目录: %s", i)
|
||||
print(f"创建 {i}")
|
||||
i.mkdir(exist_ok=True, parents=True)
|
||||
else:
|
||||
logger.debug("目录已存在: %s", i)
|
||||
print(f"找到 {i}")
|
||||
logger.debug("环境检查完成")
|
||||
|
||||
|
||||
def is_subdir(parent, child):
|
||||
try:
|
||||
child.relative_to(parent)
|
||||
logger.debug("is_subdir: %s 是 %s 的子目录", child, parent)
|
||||
return 1
|
||||
except:
|
||||
logger.debug("is_subdir: %s 不是 %s 的子目录", child, parent)
|
||||
return 0
|
||||
|
||||
|
||||
# 开发模式
|
||||
from heurams.context import rootdir, workdir, config_var
|
||||
from pathlib import Path
|
||||
from heurams.context import rootdir
|
||||
import os
|
||||
|
||||
if is_subdir(Path(rootdir), Path(os.getcwd())):
|
||||
os.chdir(Path(rootdir) / ".." / "..")
|
||||
print(f'转入开发数据目录: {Path(rootdir)/".."/".."}')
|
||||
|
||||
environment_check()
|
||||
|
||||
app = HeurAMSApp()
|
||||
|
||||
if __name__ == "__main__":
|
||||
app.run()
|
||||
|
||||
|
||||
def main():
|
||||
app.run()
|
||||
|
||||
@@ -1,15 +1,8 @@
|
||||
#!/usr/bin/env python3
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import (
|
||||
Header,
|
||||
Footer,
|
||||
Label,
|
||||
Static,
|
||||
Button,
|
||||
Markdown,
|
||||
)
|
||||
from textual.containers import ScrollableContainer, ScrollableContainer
|
||||
from textual.containers import ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Button, Footer, Header, Label, Markdown, Static
|
||||
|
||||
import heurams.services.version as version
|
||||
from heurams.context import *
|
||||
@@ -38,7 +31,8 @@ class AboutScreen(Screen):
|
||||
|
||||
特别感谢:
|
||||
|
||||
- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SuperMemo-2 算法
|
||||
- [Piotr A. Woźniak](https://supermemo.guru/wiki/Piotr_Wozniak): SM-2 算法与 SM-15 算法理论
|
||||
- [Kazuaki Tanida](https://github.com/slaypni): SM-15 算法的 CoffeeScript 实现
|
||||
- [Thoughts Memo](https://www.zhihu.com/people/L.M.Sherlock): 文献参考
|
||||
|
||||
# 参与贡献
|
||||
|
||||
@@ -1,153 +1,207 @@
|
||||
#!/usr/bin/env python3
|
||||
import pathlib
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import (
|
||||
Header,
|
||||
Footer,
|
||||
Label,
|
||||
ListView,
|
||||
ListItem,
|
||||
Button,
|
||||
Static,
|
||||
)
|
||||
from textual.containers import ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import (Button, Footer, Header, Label, ListItem, ListView,
|
||||
Static)
|
||||
|
||||
from heurams.kernel.particles import *
|
||||
from heurams.context import *
|
||||
import heurams.services.version as version
|
||||
import heurams.services.timer as timer
|
||||
from .preparation import PreparationScreen
|
||||
from .about import AboutScreen
|
||||
import heurams.services.version as version
|
||||
from heurams.context import *
|
||||
from heurams.kernel.particles import *
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
import pathlib
|
||||
from .about import AboutScreen
|
||||
from .preparation import PreparationScreen
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class DashboardScreen(Screen):
|
||||
"""主仪表盘屏幕"""
|
||||
|
||||
SUB_TITLE = "仪表盘"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str | None = None,
|
||||
id: str | None = None,
|
||||
classes: str | None = None,
|
||||
) -> None:
|
||||
super().__init__(name, id, classes)
|
||||
self.nextdates = {}
|
||||
self.texts = {}
|
||||
self.stay_enabled = {}
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
"""组合界面组件"""
|
||||
yield Header(show_clock=True)
|
||||
yield ScrollableContainer(
|
||||
Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
|
||||
Label('欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
|
||||
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
|
||||
Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'),
|
||||
Label(f"使用算法: {config_var.get()['algorithm']['default']}"),
|
||||
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
|
||||
ListView(id="union-list", classes="union-list-view"),
|
||||
Label(
|
||||
f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} {version.codename.capitalize()} 2025'
|
||||
f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} '
|
||||
f"{version.codename.capitalize()} 2025"
|
||||
),
|
||||
)
|
||||
yield Footer()
|
||||
|
||||
def item_desc_generator(self, filename) -> dict:
|
||||
"""简单分析以生成项目项显示文本
|
||||
def analyser(self, filename: str) -> dict:
|
||||
"""分析文件状态以生成显示文本
|
||||
|
||||
Args:
|
||||
filename: 要分析的文件名
|
||||
|
||||
Returns:
|
||||
dict: 以数字为列表, 分别呈现单行字符串
|
||||
dict: 包含显示文本的字典,键为行号
|
||||
"""
|
||||
res = dict()
|
||||
filestem = pathlib.Path(filename).stem
|
||||
res[0] = f"{filename}\0"
|
||||
from heurams.kernel.particles.loader import load_electron
|
||||
import heurams.kernel.particles as pt
|
||||
from heurams.kernel.particles.loader import load_electron, load_nucleon
|
||||
|
||||
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
|
||||
filestem + ".json"
|
||||
)
|
||||
result = {}
|
||||
filestem = pathlib.Path(filename).stem
|
||||
|
||||
# 构建电子文件路径
|
||||
electron_dir = config_var.get()["paths"]["electron_dir"]
|
||||
electron_file_path = pathlib.Path(electron_dir) / f"{filestem}.json"
|
||||
|
||||
logger.debug(f"电子文件路径: {electron_file_path}")
|
||||
|
||||
if electron_file_path.exists(): # 未找到则创建电子文件 (json)
|
||||
pass
|
||||
else:
|
||||
# 确保电子文件存在
|
||||
if not electron_file_path.exists():
|
||||
electron_file_path.touch()
|
||||
with open(electron_file_path, "w") as f:
|
||||
f.write("{}")
|
||||
electron_dict = load_electron(path=electron_file_path) # TODO: 取消硬编码扩展名
|
||||
logger.debug(electron_dict)
|
||||
electron_file_path.write_text("{}")
|
||||
|
||||
# 加载电子数据
|
||||
electron_dict = load_electron(path=electron_file_path)
|
||||
logger.debug(f"电子数据: {electron_dict}")
|
||||
|
||||
# 分析电子状态
|
||||
is_due = 0
|
||||
is_activated = 0
|
||||
nextdate = 0x3F3F3F3F
|
||||
for i in electron_dict.values():
|
||||
i: pt.Electron
|
||||
logger.debug(i, i.is_due())
|
||||
if i.is_due():
|
||||
|
||||
for electron in electron_dict.values():
|
||||
logger.debug(f"{electron}, 是否到期: {electron.is_due()}")
|
||||
|
||||
if electron.is_due():
|
||||
is_due = 1
|
||||
if i.is_activated():
|
||||
if electron.is_activated():
|
||||
is_activated = 1
|
||||
nextdate = min(nextdate, i.nextdate())
|
||||
res[1] = f"下一次复习: {nextdate}\n"
|
||||
res[1] += f"{is_due if "需要复习" else "当前无需复习"}"
|
||||
nextdate = min(nextdate, electron.nextdate())
|
||||
|
||||
# 检查是否需要更多复习
|
||||
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
|
||||
nucleon_path = pathlib.Path(nucleon_dir) / f"{filestem}.toml"
|
||||
nucleon_count = len(load_nucleon(nucleon_path))
|
||||
electron_count = len(electron_dict)
|
||||
is_more = not (electron_count >= nucleon_count)
|
||||
|
||||
logger.debug(f"是否需要更多复习: {is_more}")
|
||||
|
||||
# 更新状态
|
||||
self.nextdates[filename] = nextdate
|
||||
self.stay_enabled[filename] = is_due or is_more
|
||||
|
||||
# 构建返回结果
|
||||
result[0] = f"{filename}\0"
|
||||
|
||||
if not is_activated:
|
||||
res[1] = " 尚未激活"
|
||||
return res
|
||||
result[1] = " 尚未激活"
|
||||
else:
|
||||
status_text = "需要复习" if is_due else "当前无需复习"
|
||||
result[1] = f"下一次复习: {nextdate}\n{status_text}"
|
||||
|
||||
return result
|
||||
|
||||
def on_mount(self) -> None:
|
||||
"""挂载组件时初始化"""
|
||||
union_list_widget = self.query_one("#union-list", ListView)
|
||||
|
||||
probe = probe_all(0)
|
||||
|
||||
if len(probe["nucleon"]):
|
||||
# 分析所有文件
|
||||
for file in probe["nucleon"]:
|
||||
text = self.item_desc_generator(file)
|
||||
union_list_widget.append(
|
||||
ListItem(
|
||||
Label(text[0] + "\n" + text[1]),
|
||||
self.texts[file] = self.analyser(file)
|
||||
|
||||
# 按下次复习时间排序
|
||||
nucleon_files = sorted(
|
||||
probe["nucleon"],
|
||||
key=lambda f: self.nextdates[f],
|
||||
reverse=True,
|
||||
)
|
||||
)
|
||||
else:
|
||||
|
||||
# 填充列表
|
||||
if not probe["nucleon"]:
|
||||
union_list_widget.append(
|
||||
ListItem(
|
||||
Static(
|
||||
"在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集."
|
||||
"在 ./nucleon/ 中未找到任何内容源数据文件。\n"
|
||||
"请放置文件后重启应用,或者新建空的单元集。"
|
||||
)
|
||||
)
|
||||
)
|
||||
union_list_widget.disabled = True
|
||||
return
|
||||
|
||||
for file in nucleon_files:
|
||||
text = self.texts[file]
|
||||
list_item = ListItem(Label(f"{text[0]}\n{text[1]}"))
|
||||
union_list_widget.append(list_item)
|
||||
|
||||
if not self.stay_enabled[file]:
|
||||
list_item.disabled = True
|
||||
|
||||
def on_list_view_selected(self, event) -> None:
|
||||
"""处理列表项选择事件"""
|
||||
if not isinstance(event.item, ListItem):
|
||||
return
|
||||
|
||||
selected_label = event.item.query_one(Label)
|
||||
if "未找到任何 .toml 文件" in str(selected_label.renderable): # type: ignore
|
||||
label_text = str(selected_label.renderable)
|
||||
|
||||
if "未找到任何 .toml 文件" in label_text:
|
||||
return
|
||||
|
||||
selected_filename = pathlib.Path(
|
||||
str(selected_label.renderable)
|
||||
.partition("\0")[0] # 文件名末尾截断, 保留文件名
|
||||
.replace("*", "")
|
||||
) # 去除markdown加粗
|
||||
# 提取文件名
|
||||
selected_filename = pathlib.Path(label_text.partition("\0")[0].replace("*", ""))
|
||||
|
||||
nucleon_file_path = (
|
||||
pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) / selected_filename
|
||||
)
|
||||
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
|
||||
str(selected_filename.stem) + ".json"
|
||||
# 构建文件路径
|
||||
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
|
||||
electron_dir = config_var.get()["paths"]["electron_dir"]
|
||||
|
||||
nucleon_file_path = pathlib.Path(nucleon_dir) / selected_filename
|
||||
electron_file_path = (
|
||||
pathlib.Path(electron_dir) / f"{selected_filename.stem}.json"
|
||||
)
|
||||
|
||||
# 跳转到准备屏幕
|
||||
self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path))
|
||||
|
||||
def on_button_pressed(self, event) -> None:
|
||||
if event.button.id == "new_nucleon_button":
|
||||
# 切换到创建单元
|
||||
"""处理按钮点击事件"""
|
||||
button_id = event.button.id
|
||||
|
||||
if button_id == "new_nucleon_button":
|
||||
from .nucreator import NucleonCreatorScreen
|
||||
|
||||
newscr = NucleonCreatorScreen()
|
||||
self.app.push_screen(newscr)
|
||||
elif event.button.id == "precache_all_button":
|
||||
# 切换到缓存管理器
|
||||
new_screen = NucleonCreatorScreen()
|
||||
self.app.push_screen(new_screen)
|
||||
|
||||
elif button_id == "precache_all_button":
|
||||
from .precache import PrecachingScreen
|
||||
|
||||
precache_screen = PrecachingScreen()
|
||||
self.app.push_screen(precache_screen)
|
||||
elif event.button.id == "about_button":
|
||||
from .about import AboutScreen
|
||||
|
||||
elif button_id == "about_button":
|
||||
about_screen = AboutScreen()
|
||||
self.app.push_screen(about_screen)
|
||||
|
||||
def action_quit_app(self) -> None:
|
||||
"""退出应用程序"""
|
||||
self.app.exit()
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
#!/usr/bin/env python3
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import Header, Footer, Label, Static, Button
|
||||
from textual.containers import Center, ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
from textual.reactive import reactive
|
||||
from enum import Enum, auto
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
from heurams.context import config_var
|
||||
from heurams.kernel.reactor import *
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Center, ScrollableContainer
|
||||
from textual.reactive import reactive
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Button, Footer, Header, Label, Static
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.kernel.puzzles as pz
|
||||
from heurams.context import config_var
|
||||
from heurams.kernel.reactor import *
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .. import shim
|
||||
|
||||
|
||||
@@ -44,14 +46,14 @@ class MemScreen(Screen):
|
||||
) -> None:
|
||||
super().__init__(name, id, classes)
|
||||
self.atoms = atoms
|
||||
for i in self.atoms:
|
||||
i.do_eval()
|
||||
self.phaser = Phaser(atoms)
|
||||
# logger.debug(self.phaser.state)
|
||||
self.procession: Procession = self.phaser.current_procession() # type: ignore
|
||||
self.atom: pt.Atom = self.procession.current_atom
|
||||
# logger.debug(self.phaser.state)
|
||||
# self.procession.forward(1)
|
||||
for i in atoms:
|
||||
i.do_eval()
|
||||
|
||||
def on_mount(self):
|
||||
self.load_puzzle()
|
||||
@@ -142,8 +144,29 @@ class MemScreen(Screen):
|
||||
self.atom.lock(1)
|
||||
|
||||
def action_play_voice(self):
|
||||
self.run_worker(self.play_voice, exclusive=True, thread=True)
|
||||
|
||||
def play_voice(self):
|
||||
"""朗读当前内容"""
|
||||
pass
|
||||
from pathlib import Path
|
||||
|
||||
from heurams.services.audio_service import play_by_path
|
||||
from heurams.services.hasher import get_md5
|
||||
|
||||
path = Path(config_var.get()["paths"]["cache_dir"])
|
||||
path = (
|
||||
path
|
||||
/ f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
|
||||
)
|
||||
if path.exists():
|
||||
play_by_path(path)
|
||||
else:
|
||||
from heurams.services.tts_service import convertor
|
||||
|
||||
convertor(
|
||||
self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path
|
||||
)
|
||||
play_by_path(path)
|
||||
|
||||
def action_toggle_dark(self):
|
||||
self.app.action_toggle_dark()
|
||||
|
||||
@@ -1,21 +1,15 @@
|
||||
#!/usr/bin/env python3
|
||||
from pathlib import Path
|
||||
|
||||
import toml
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import (
|
||||
Header,
|
||||
Footer,
|
||||
Label,
|
||||
Input,
|
||||
Select,
|
||||
Button,
|
||||
Markdown,
|
||||
)
|
||||
from textual.containers import ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import (Button, Footer, Header, Input, Label, Markdown,
|
||||
Select)
|
||||
|
||||
from heurams.services.version import ver
|
||||
import toml
|
||||
from pathlib import Path
|
||||
from heurams.context import config_var
|
||||
from heurams.services.version import ver
|
||||
|
||||
|
||||
class NucleonCreatorScreen(Screen):
|
||||
@@ -27,6 +21,7 @@ class NucleonCreatorScreen(Screen):
|
||||
|
||||
def search_templates(self):
|
||||
from pathlib import Path
|
||||
|
||||
from heurams.context import config_var
|
||||
|
||||
template_dir = Path(config_var.get()["paths"]["template_dir"])
|
||||
|
||||
@@ -1,22 +1,15 @@
|
||||
#!/usr/bin/env python3
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import (
|
||||
Header,
|
||||
Footer,
|
||||
Label,
|
||||
Button,
|
||||
Static,
|
||||
ProgressBar,
|
||||
)
|
||||
from textual.containers import ScrollableContainer, Horizontal
|
||||
from textual.containers import ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
import pathlib
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Horizontal, ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Button, Footer, Header, Label, ProgressBar, Static
|
||||
from textual.worker import get_current_worker
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.services.hasher as hasher
|
||||
from heurams.context import *
|
||||
from textual.worker import get_current_worker
|
||||
|
||||
|
||||
class PrecachingScreen(Screen):
|
||||
@@ -46,7 +39,9 @@ class PrecachingScreen(Screen):
|
||||
self.desc = desc
|
||||
for i in nucleons:
|
||||
i: pt.Nucleon
|
||||
i.do_eval()
|
||||
atom = pt.Atom()
|
||||
atom.link("nucleon", i)
|
||||
atom.do_eval()
|
||||
# print("完成 EVAL")
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
@@ -96,17 +91,16 @@ class PrecachingScreen(Screen):
|
||||
|
||||
def precache_by_text(self, text: str):
|
||||
"""预缓存单段文本的音频"""
|
||||
from heurams.context import rootdir, workdir, config_var
|
||||
from heurams.context import config_var, rootdir, workdir
|
||||
|
||||
cache_dir = pathlib.Path(config_var.get()["paths"]["cache_dir"])
|
||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
|
||||
if not cache_file.exists():
|
||||
try: # TODO: 调用模块消除tts耦合
|
||||
import edge_tts as tts
|
||||
try:
|
||||
from heurams.services.tts_service import convertor
|
||||
|
||||
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
|
||||
communicate.save_sync(str(cache_file))
|
||||
convertor(text, cache_file)
|
||||
return 1
|
||||
except Exception as e:
|
||||
print(f"预缓存失败 '{text}': {e}")
|
||||
@@ -166,7 +160,7 @@ class PrecachingScreen(Screen):
|
||||
|
||||
def precache_all_files(self):
|
||||
"""预缓存所有文件"""
|
||||
from heurams.context import rootdir, workdir, config_var
|
||||
from heurams.context import config_var, rootdir, workdir
|
||||
|
||||
nucleon_path = pathlib.Path(config_var.get()["paths"]["nucleon_dir"])
|
||||
nucleon_files = [
|
||||
@@ -185,7 +179,9 @@ class PrecachingScreen(Screen):
|
||||
self.total = len(nu)
|
||||
for i in nu:
|
||||
i: pt.Nucleon
|
||||
i.do_eval()
|
||||
atom = pt.Atom()
|
||||
atom.link("nucleon", i)
|
||||
atom.do_eval()
|
||||
return self.precache_by_list(nu)
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
@@ -220,7 +216,8 @@ class PrecachingScreen(Screen):
|
||||
# 清空缓存
|
||||
try:
|
||||
import shutil
|
||||
from heurams.context import rootdir, workdir, config_var
|
||||
|
||||
from heurams.context import config_var, rootdir, workdir
|
||||
|
||||
shutil.rmtree(
|
||||
f"{config_var.get()["paths"]["cache_dir"]}", ignore_errors=True
|
||||
|
||||
@@ -1,21 +1,15 @@
|
||||
#!/usr/bin/env python3
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import (
|
||||
Header,
|
||||
Footer,
|
||||
Label,
|
||||
Static,
|
||||
Button,
|
||||
Markdown,
|
||||
)
|
||||
from textual.containers import ScrollableContainer
|
||||
from textual.reactive import reactive
|
||||
from textual.screen import Screen
|
||||
from heurams.context import config_var
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Button, Footer, Header, Label, Markdown, Static
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.services.hasher as hasher
|
||||
from heurams.context import *
|
||||
from textual.reactive import reactive
|
||||
from textual.widget import Widget
|
||||
from heurams.context import config_var
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -1,22 +1,16 @@
|
||||
#!/usr/bin/env python3
|
||||
from textual.app import ComposeResult
|
||||
from textual.widgets import (
|
||||
Header,
|
||||
Footer,
|
||||
Label,
|
||||
Button,
|
||||
Static,
|
||||
ProgressBar,
|
||||
)
|
||||
from textual.containers import ScrollableContainer, Horizontal
|
||||
from textual.containers import ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
import pathlib
|
||||
import time
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.containers import Horizontal, ScrollableContainer
|
||||
from textual.screen import Screen
|
||||
from textual.widgets import Button, Footer, Header, Label, ProgressBar, Static
|
||||
from textual.worker import get_current_worker
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.services.hasher as hasher
|
||||
from heurams.context import *
|
||||
from textual.worker import get_current_worker
|
||||
|
||||
|
||||
class SyncScreen(Screen):
|
||||
@@ -25,22 +19,287 @@ class SyncScreen(Screen):
|
||||
|
||||
def __init__(self, nucleons: list = [], desc: str = ""):
|
||||
super().__init__(name=None, id=None, classes=None)
|
||||
self.sync_service = None
|
||||
self.is_syncing = False
|
||||
self.is_paused = False
|
||||
self.log_messages = []
|
||||
self.max_log_lines = 50
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
with ScrollableContainer(id="sync_container"):
|
||||
pass
|
||||
# 标题和连接状态
|
||||
yield Static("同步工具", classes="title")
|
||||
yield Static("", id="status_label", classes="status")
|
||||
|
||||
# 配置信息
|
||||
yield Static(f"同步协议: {config_var.get()['services']['sync']}")
|
||||
yield Static("服务器配置:", classes="section_title")
|
||||
with Horizontal(classes="config_info"):
|
||||
yield Static("远程服务器:", classes="config_label")
|
||||
yield Static("", id="server_url", classes="config_value")
|
||||
with Horizontal(classes="config_info"):
|
||||
yield Static("远程路径:", classes="config_label")
|
||||
yield Static("", id="remote_path", classes="config_value")
|
||||
|
||||
with Horizontal(classes="control_buttons"):
|
||||
yield Button("测试连接", id="test_connection", variant="primary")
|
||||
yield Button("开始同步", id="start_sync", variant="success")
|
||||
yield Button("暂停", id="pause_sync", variant="warning", disabled=True)
|
||||
yield Button("取消", id="cancel_sync", variant="error", disabled=True)
|
||||
|
||||
yield Static("同步进度", classes="section_title")
|
||||
yield ProgressBar(id="progress_bar", show_percentage=True, total=100)
|
||||
yield Static("", id="progress_label", classes="progress_text")
|
||||
|
||||
yield Static("同步日志", classes="section_title")
|
||||
yield Static("", id="log_output", classes="log_output")
|
||||
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self):
|
||||
"""挂载时初始化状态"""
|
||||
self.update_ui_from_config()
|
||||
self.log_message("同步工具已启动")
|
||||
|
||||
def update_ui_from_config(self):
|
||||
"""更新 UI 显示配置信息"""
|
||||
try:
|
||||
sync_cfg: dict = config_var.get()["providers"]["sync"]["webdav"]
|
||||
# 更新服务器 URL
|
||||
url = sync_cfg.get("url", "未配置")
|
||||
url_widget = self.query_one("#server_url")
|
||||
url_widget.update(url) # type: ignore
|
||||
# 更新远程路径
|
||||
remote_path = sync_cfg.get("remote_path", "/")
|
||||
path_widget = self.query_one("#remote_path")
|
||||
path_widget.update(remote_path) # type: ignore
|
||||
|
||||
# 更新状态标签
|
||||
status_widget = self.query_one("#status_label")
|
||||
if self.sync_service and self.sync_service.client:
|
||||
status_widget.update("✅ 同步服务已就绪") # type: ignore
|
||||
status_widget.add_class("ready")
|
||||
else:
|
||||
status_widget.update("❌ 同步服务未配置或未启用") # type: ignore
|
||||
status_widget.add_class("error")
|
||||
|
||||
except Exception as e:
|
||||
self.log_message(f"更新 UI 失败: {e}", is_error=True)
|
||||
|
||||
def update_status(self, status, current_item="", progress=None):
|
||||
"""更新状态显示"""
|
||||
try:
|
||||
status_widget = self.query_one("#status_label")
|
||||
status_widget.update(status) # type: ignore
|
||||
|
||||
if progress is not None:
|
||||
progress_bar = self.query_one("#progress_bar")
|
||||
progress_bar.progress = progress # type: ignore
|
||||
|
||||
progress_label = self.query_one("#progress_label")
|
||||
progress_label.update(f"{progress}% - {current_item}" if current_item else f"{progress}%") # type: ignore
|
||||
|
||||
except Exception as e:
|
||||
self.log_message(f"更新状态失败: {e}", is_error=True)
|
||||
|
||||
def log_message(self, message: str, is_error: bool = False):
|
||||
"""添加日志消息并更新显示"""
|
||||
timestamp = time.strftime("%H:%M:%S")
|
||||
prefix = "[ERROR]" if is_error else "[INFO]"
|
||||
log_line = f"{timestamp} {prefix} {message}"
|
||||
|
||||
self.log_messages.append(log_line)
|
||||
# 保持日志行数不超过最大值
|
||||
if len(self.log_messages) > self.max_log_lines:
|
||||
self.log_messages = self.log_messages[-self.max_log_lines :]
|
||||
|
||||
# 更新日志显示
|
||||
try:
|
||||
log_widget = self.query_one("#log_output")
|
||||
log_widget.update("\n".join(self.log_messages)) # type: ignore
|
||||
except Exception:
|
||||
pass # 如果组件未就绪,忽略错误
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
"""处理按钮点击事件"""
|
||||
button_id = event.button.id
|
||||
|
||||
if button_id == "test_connection":
|
||||
self.test_connection()
|
||||
elif button_id == "start_sync":
|
||||
self.start_sync()
|
||||
elif button_id == "pause_sync":
|
||||
self.pause_sync()
|
||||
elif button_id == "cancel_sync":
|
||||
self.cancel_sync()
|
||||
|
||||
event.stop()
|
||||
|
||||
def test_connection(self):
|
||||
"""测试 WebDAV 服务器连接"""
|
||||
if not self.sync_service:
|
||||
self.log_message("同步服务未初始化,请检查配置", is_error=True)
|
||||
self.update_status("❌ 同步服务未初始化")
|
||||
return
|
||||
|
||||
self.log_message("正在测试 WebDAV 连接...")
|
||||
self.update_status("正在测试连接...")
|
||||
|
||||
try:
|
||||
success = self.sync_service.test_connection()
|
||||
if success:
|
||||
self.log_message("连接测试成功")
|
||||
self.update_status("✅ 连接正常")
|
||||
else:
|
||||
self.log_message("连接测试失败", is_error=True)
|
||||
self.update_status("❌ 连接失败")
|
||||
except Exception as e:
|
||||
self.log_message(f"连接测试异常: {e}", is_error=True)
|
||||
self.update_status("❌ 连接异常")
|
||||
|
||||
def start_sync(self):
|
||||
"""开始同步"""
|
||||
if not self.sync_service:
|
||||
self.log_message("同步服务未初始化,无法开始同步", is_error=True)
|
||||
return
|
||||
|
||||
if self.is_syncing:
|
||||
self.log_message("同步已在进行中", is_error=True)
|
||||
return
|
||||
|
||||
self.is_syncing = True
|
||||
self.is_paused = False
|
||||
self.update_button_states()
|
||||
|
||||
self.log_message("开始同步数据...")
|
||||
self.update_status("正在同步...", progress=0)
|
||||
|
||||
# 启动后台同步任务
|
||||
self.run_worker(self.perform_sync, thread=True)
|
||||
|
||||
def perform_sync(self):
|
||||
"""执行同步任务(在后台线程中运行)"""
|
||||
worker = get_current_worker()
|
||||
|
||||
try:
|
||||
# 获取需要同步的本地目录
|
||||
from heurams.context import config_var
|
||||
|
||||
config = config_var.get()
|
||||
paths = config.get("paths", {})
|
||||
|
||||
# 同步 nucleon 目录
|
||||
nucleon_dir = pathlib.Path(paths.get("nucleon_dir", "./data/nucleon"))
|
||||
if nucleon_dir.exists():
|
||||
self.log_message(f"同步 nucleon 目录: {nucleon_dir}")
|
||||
self.update_status(f"同步 nucleon 目录...", progress=10)
|
||||
|
||||
result = self.sync_service.sync_directory(nucleon_dir) # type: ignore
|
||||
if result.get("success"):
|
||||
self.log_message(
|
||||
f"nucleon 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个"
|
||||
)
|
||||
else:
|
||||
self.log_message(
|
||||
f"nucleon 同步失败: {result.get('error', '未知错误')}",
|
||||
is_error=True,
|
||||
)
|
||||
|
||||
# 同步 electron 目录
|
||||
electron_dir = pathlib.Path(paths.get("electron_dir", "./data/electron"))
|
||||
if electron_dir.exists():
|
||||
self.log_message(f"同步 electron 目录: {electron_dir}")
|
||||
self.update_status(f"同步 electron 目录...", progress=60)
|
||||
|
||||
result = self.sync_service.sync_directory(electron_dir) # type: ignore
|
||||
if result.get("success"):
|
||||
self.log_message(
|
||||
f"electron 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个"
|
||||
)
|
||||
else:
|
||||
self.log_message(
|
||||
f"electron 同步失败: {result.get('error', '未知错误')}",
|
||||
is_error=True,
|
||||
)
|
||||
|
||||
# 同步 orbital 目录(如果存在)
|
||||
orbital_dir = pathlib.Path(paths.get("orbital_dir", "./data/orbital"))
|
||||
if orbital_dir.exists():
|
||||
self.log_message(f"同步 orbital 目录: {orbital_dir}")
|
||||
self.update_status(f"同步 orbital 目录...", progress=80)
|
||||
|
||||
result = self.sync_service.sync_directory(orbital_dir) # type: ignore
|
||||
if result.get("success"):
|
||||
self.log_message(
|
||||
f"orbital 同步完成: 上传 {result.get('uploaded', 0)} 个, 下载 {result.get('downloaded', 0)} 个"
|
||||
)
|
||||
else:
|
||||
self.log_message(
|
||||
f"orbital 同步失败: {result.get('error', '未知错误')}",
|
||||
is_error=True,
|
||||
)
|
||||
|
||||
# 同步完成
|
||||
self.update_status("同步完成", progress=100)
|
||||
self.log_message("所有目录同步完成")
|
||||
|
||||
except Exception as e:
|
||||
self.log_message(f"同步过程中发生错误: {e}", is_error=True)
|
||||
self.update_status("同步失败")
|
||||
finally:
|
||||
# 重置同步状态
|
||||
self.is_syncing = False
|
||||
self.is_paused = False
|
||||
self.update_button_states() # type: ignore
|
||||
|
||||
def pause_sync(self):
|
||||
"""暂停同步"""
|
||||
if not self.is_syncing:
|
||||
return
|
||||
|
||||
self.is_paused = not self.is_paused
|
||||
self.update_button_states()
|
||||
|
||||
if self.is_paused:
|
||||
self.log_message("同步已暂停")
|
||||
self.update_status("同步已暂停")
|
||||
else:
|
||||
self.log_message("同步已恢复")
|
||||
self.update_status("正在同步...")
|
||||
|
||||
def cancel_sync(self):
|
||||
"""取消同步"""
|
||||
if not self.is_syncing:
|
||||
return
|
||||
|
||||
self.is_syncing = False
|
||||
self.is_paused = False
|
||||
self.update_button_states()
|
||||
|
||||
self.log_message("同步已取消")
|
||||
self.update_status("同步已取消")
|
||||
|
||||
def update_button_states(self):
|
||||
"""更新按钮状态"""
|
||||
try:
|
||||
start_button = self.query_one("#start_sync")
|
||||
pause_button = self.query_one("#pause_sync")
|
||||
cancel_button = self.query_one("#cancel_sync")
|
||||
|
||||
if self.is_syncing:
|
||||
start_button.disabled = True
|
||||
pause_button.disabled = False
|
||||
cancel_button.disabled = False
|
||||
pause_button.label = "继续" if self.is_paused else "暂停" # type: ignore
|
||||
else:
|
||||
start_button.disabled = False
|
||||
pause_button.disabled = True
|
||||
cancel_button.disabled = True
|
||||
|
||||
except Exception as e:
|
||||
self.log_message(f"更新按钮状态失败: {e}", is_error=True)
|
||||
|
||||
def action_go_back(self):
|
||||
self.app.pop_screen()
|
||||
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
"""Kernel 操作辅助函数库"""
|
||||
|
||||
import random
|
||||
from typing import TypedDict
|
||||
|
||||
import heurams.interface.widgets as pzw
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.kernel.puzzles as pz
|
||||
import heurams.interface.widgets as pzw
|
||||
from typing import TypedDict
|
||||
|
||||
staging = {} # 细粒度缓存区, 是 ident -> quality 的封装
|
||||
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
from typing import Iterable
|
||||
|
||||
from textual.app import ComposeResult
|
||||
from textual.widget import Widget
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
|
||||
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
from textual.widgets import (
|
||||
Label,
|
||||
Static,
|
||||
Button,
|
||||
)
|
||||
from textual.containers import ScrollableContainer, Horizontal
|
||||
from textual.widget import Widget
|
||||
import heurams.kernel.particles as pt
|
||||
from .base_puzzle_widget import BasePuzzleWidget
|
||||
from textual.containers import Horizontal, ScrollableContainer
|
||||
from textual.message import Message
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Button, Label, Static
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
|
||||
from .base_puzzle_widget import BasePuzzleWidget
|
||||
|
||||
|
||||
class BasicEvaluation(BasePuzzleWidget):
|
||||
@@ -51,7 +49,7 @@ class BasicEvaluation(BasePuzzleWidget):
|
||||
# 显示主要内容
|
||||
yield Label(self.atom.registry["nucleon"]["content"], id="main")
|
||||
|
||||
# 显示评估说明(可选)
|
||||
# 显示评估说明(可选)
|
||||
yield Static("请评估你对这个内容的记忆程度: ", classes="instruction")
|
||||
|
||||
# 按钮容器
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
from textual.widgets import (
|
||||
Label,
|
||||
Button,
|
||||
)
|
||||
from textual.widget import Widget
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.kernel.puzzles as pz
|
||||
from .base_puzzle_widget import BasePuzzleWidget
|
||||
import copy
|
||||
import random
|
||||
from typing import TypedDict
|
||||
|
||||
from textual.containers import Container
|
||||
from textual.message import Message
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Button, Label
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.kernel.puzzles as pz
|
||||
from heurams.services.logger import get_logger
|
||||
from typing import TypedDict
|
||||
|
||||
from .base_puzzle_widget import BasePuzzleWidget
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
from textual.widgets import (
|
||||
Label,
|
||||
Button,
|
||||
)
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Button, Label
|
||||
|
||||
|
||||
class Finished(Widget):
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
# 单项选择题
|
||||
from textual.widgets import (
|
||||
Label,
|
||||
Button,
|
||||
)
|
||||
from textual.containers import ScrollableContainer, Container
|
||||
from typing import TypedDict
|
||||
|
||||
from textual.containers import Container, ScrollableContainer
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Button, Label
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.kernel.puzzles as pz
|
||||
from .base_puzzle_widget import BasePuzzleWidget
|
||||
from typing import TypedDict
|
||||
from heurams.services.hasher import hash
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .base_puzzle_widget import BasePuzzleWidget
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -61,11 +61,16 @@ class MCQPuzzle(BasePuzzleWidget):
|
||||
self.puzzle.refresh()
|
||||
|
||||
def compose(self):
|
||||
self.atom.registry["nucleon"].do_eval()
|
||||
setting: Setting = self.atom.registry["nucleon"].metadata["orbital"]["puzzles"][
|
||||
self.alia
|
||||
]
|
||||
logger.debug(f"Puzzle Setting: {setting}")
|
||||
logger.debug(f"WIRED INDEX: {len(self.inputlist)}")
|
||||
if len(self.inputlist) > len(self.puzzle.options):
|
||||
logger.debug("ERR IDX")
|
||||
logger.debug(self.inputlist)
|
||||
logger.debug(self.puzzle.options)
|
||||
else:
|
||||
current_options = self.puzzle.options[len(self.inputlist)]
|
||||
yield Label(setting["primary"], id="sentence")
|
||||
yield Label(self.puzzle.wording[len(self.inputlist)], id="puzzle")
|
||||
@@ -116,7 +121,7 @@ class MCQPuzzle(BasePuzzleWidget):
|
||||
|
||||
self.screen.rating = rating # type: ignore
|
||||
self.handler(rating)
|
||||
# 重置输入(如果回答错误)
|
||||
# 重置输入(如果回答错误)
|
||||
if not is_correct:
|
||||
self.inputlist = []
|
||||
self.refresh_buttons()
|
||||
@@ -127,7 +132,7 @@ class MCQPuzzle(BasePuzzleWidget):
|
||||
self.update_display()
|
||||
|
||||
def refresh_buttons(self):
|
||||
"""刷新按钮显示(用于题目切换)"""
|
||||
"""刷新按钮显示(用于题目切换)"""
|
||||
# 移除所有选项按钮
|
||||
logger.debug("刷新按钮")
|
||||
self.cursor += 1
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
from textual.widgets import (
|
||||
Label,
|
||||
Button,
|
||||
)
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Button, Label
|
||||
|
||||
|
||||
class Placeholder(Widget):
|
||||
|
||||
@@ -1,20 +1,17 @@
|
||||
from textual.reactive import reactive
|
||||
from textual.widgets import (
|
||||
Markdown,
|
||||
Label,
|
||||
Static,
|
||||
Button,
|
||||
)
|
||||
from textual.containers import Center
|
||||
from textual.widget import Widget
|
||||
from typing import Dict
|
||||
import heurams.kernel.particles as pt
|
||||
import re
|
||||
from .base_puzzle_widget import BasePuzzleWidget
|
||||
from typing import TypedDict, List
|
||||
from typing import Dict, List, TypedDict
|
||||
|
||||
from textual.containers import Center
|
||||
from textual.message import Message
|
||||
from textual.reactive import reactive
|
||||
from textual.widget import Widget
|
||||
from textual.widgets import Button, Label, Markdown, Static
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .base_puzzle_widget import BasePuzzleWidget
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -52,6 +49,11 @@ class Recognition(BasePuzzleWidget):
|
||||
self.alia = alia
|
||||
|
||||
def compose(self):
|
||||
from heurams.context import config_var
|
||||
|
||||
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"]
|
||||
if autovoice:
|
||||
self.screen.action_play_voice() # type: ignore
|
||||
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
|
||||
delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"]
|
||||
replace_dict = {
|
||||
@@ -71,7 +73,8 @@ class Recognition(BasePuzzleWidget):
|
||||
primary = cfg["primary"]
|
||||
|
||||
with Center():
|
||||
yield Static(f"[dim]{cfg['top_dim']}[/]")
|
||||
for i in cfg["top_dim"]:
|
||||
yield Static(f"[dim]{i}[/]")
|
||||
yield Label("")
|
||||
|
||||
for old, new in replace_dict.items():
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
from .sm2 import SM2Algorithm
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .sm2 import SM2Algorithm
|
||||
from .sm15m import SM15MAlgorithm
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
__all__ = [
|
||||
@@ -9,7 +11,8 @@ __all__ = [
|
||||
|
||||
algorithms = {
|
||||
"SM-2": SM2Algorithm,
|
||||
"supermemo2": SM2Algorithm,
|
||||
"SM-15M": SM15MAlgorithm,
|
||||
# "SM-15M": SM15MAlgorithm,
|
||||
}
|
||||
|
||||
logger.debug("算法模块初始化完成, 注册的算法: %s", list(algorithms.keys()))
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import heurams.services.timer as timer
|
||||
from typing import TypedDict
|
||||
|
||||
import heurams.services.timer as timer
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
286
src/heurams/kernel/algorithms/sm15m.py
Normal file
286
src/heurams/kernel/algorithms/sm15m.py
Normal file
@@ -0,0 +1,286 @@
|
||||
"""
|
||||
SM-15 接口兼容实现, 基于 SM-15 算法的逆向工程
|
||||
全局状态保存在文件中, 项目状态通过 algodata 字典传递
|
||||
|
||||
基于: https://github.com/slaypni/sm.js
|
||||
原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida
|
||||
MIT 许可证
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
from typing import TypedDict
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF,
|
||||
RANGE_AF, RANGE_REPETITION,
|
||||
SM, THRESHOLD_RECALL, Item)
|
||||
|
||||
# 全局状态文件路径
|
||||
_GLOBAL_STATE_FILE = os.path.expanduser(
|
||||
pathlib.Path(config_var.get()["paths"]["global_dir"]) / "sm15m_global_state.json"
|
||||
)
|
||||
|
||||
|
||||
def _get_global_sm():
|
||||
"""获取全局 SM 实例, 从文件加载或创建新的"""
|
||||
if os.path.exists(_GLOBAL_STATE_FILE):
|
||||
try:
|
||||
with open(_GLOBAL_STATE_FILE, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
sm_instance = SM.load(data)
|
||||
return sm_instance
|
||||
except Exception:
|
||||
# 如果加载失败, 创建新的实例
|
||||
pass
|
||||
|
||||
# 创建新的 SM 实例
|
||||
sm_instance = SM()
|
||||
# 保存初始状态
|
||||
_save_global_sm(sm_instance)
|
||||
return sm_instance
|
||||
|
||||
|
||||
def _save_global_sm(sm_instance):
|
||||
"""保存全局 SM 实例到文件"""
|
||||
try:
|
||||
data = sm_instance.data()
|
||||
with open(_GLOBAL_STATE_FILE, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, indent=2)
|
||||
except Exception:
|
||||
# 忽略保存错误
|
||||
pass
|
||||
|
||||
|
||||
class SM15MAlgorithm:
|
||||
algo_name = "SM-15M"
|
||||
|
||||
class AlgodataDict(TypedDict):
|
||||
efactor: float
|
||||
real_rept: int
|
||||
rept: int
|
||||
interval: int
|
||||
last_date: int
|
||||
next_date: int
|
||||
is_activated: int
|
||||
last_modify: float
|
||||
|
||||
defaults = {
|
||||
"efactor": 2.5,
|
||||
"real_rept": 0,
|
||||
"rept": 0,
|
||||
"interval": 0,
|
||||
"last_date": 0,
|
||||
"next_date": 0,
|
||||
"is_activated": 0,
|
||||
"last_modify": 0.0,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _get_timestamp(cls):
|
||||
"""获取当前时间戳(秒)"""
|
||||
return datetime.datetime.now().timestamp()
|
||||
|
||||
@classmethod
|
||||
def _get_daystamp(cls):
|
||||
"""获取当前天数戳(从某个纪元开始的天数)"""
|
||||
# 使用与原始 SM-2 相同的纪元:1970-01-01
|
||||
now = datetime.datetime.now()
|
||||
epoch = datetime.datetime(1970, 1, 1)
|
||||
delta = now - epoch
|
||||
return delta.days
|
||||
|
||||
@classmethod
|
||||
def _algodata_to_item(cls, algodata, sm_instance):
|
||||
"""将 algodata 转换为 Item 实例"""
|
||||
# 从 algodata 获取 SM-2 数据
|
||||
sm15_data = algodata.get(cls.algo_name, cls.defaults.copy())
|
||||
|
||||
# 创建 Item 实例
|
||||
item = Item(sm_instance)
|
||||
|
||||
# 映射字段
|
||||
# efactor -> A-Factor (需要转换)
|
||||
efactor = sm15_data.get("efactor", 2.5)
|
||||
# SM-2 的 efactor 范围 [1.3, 2.5+], SM-15 的 A-Factor 范围 [1.2, 6.9]
|
||||
# 简单线性映射:af = (efactor - 1.3) * (MAX_AF - MIN_AF) / (2.5 - 1.3) + MIN_AF
|
||||
# 但 efactor 可能大于 2.5, 所以需要限制
|
||||
af = max(MIN_AF, min(MAX_AF, efactor * 2.0)) # 粗略映射
|
||||
# 调试
|
||||
# print(f"DEBUG: efactor={efactor}, af before set={af}")
|
||||
item.af(af)
|
||||
# print(f"DEBUG: item.af() after set={item.af()}")
|
||||
|
||||
# rept -> repetition (成功回忆次数)
|
||||
rept = sm15_data.get("rept", 0)
|
||||
item.repetition = (
|
||||
rept - 1 if rept > 0 else -1
|
||||
) # SM-15 中 repetition=-1 表示新项目
|
||||
|
||||
# real_rept -> lapse? 或者忽略
|
||||
real_rept = sm15_data.get("real_rept", 0)
|
||||
# 可以存储在 value 中或忽略
|
||||
|
||||
# interval -> optimum_interval (需要从天数转换为毫秒)
|
||||
interval_days = sm15_data.get("interval", 0)
|
||||
if interval_days == 0:
|
||||
item.optimum_interval = sm_instance.interval_base
|
||||
else:
|
||||
item.optimum_interval = interval_days * 24 * 60 * 60 * 1000 # 天转毫秒
|
||||
|
||||
# last_date -> previous_date
|
||||
last_date_days = sm15_data.get("last_date", 0)
|
||||
if last_date_days > 0:
|
||||
epoch = datetime.datetime(1970, 1, 1)
|
||||
item.previous_date = epoch + datetime.timedelta(days=last_date_days)
|
||||
|
||||
# next_date -> due_date
|
||||
next_date_days = sm15_data.get("next_date", 0)
|
||||
if next_date_days > 0:
|
||||
epoch = datetime.datetime(1970, 1, 1)
|
||||
item.due_date = epoch + datetime.timedelta(days=next_date_days)
|
||||
|
||||
# is_activated 和 last_modify 忽略
|
||||
|
||||
# 将原始 algodata 保存在 value 中以便恢复
|
||||
item.value = {
|
||||
"front": "SM-15 item",
|
||||
"back": "SM-15 item",
|
||||
"_sm15_data": sm15_data,
|
||||
}
|
||||
|
||||
return item
|
||||
|
||||
@classmethod
|
||||
def _item_to_algodata(cls, item, algodata):
|
||||
"""将 Item 实例状态写回 algodata"""
|
||||
if cls.algo_name not in algodata:
|
||||
algodata[cls.algo_name] = cls.defaults.copy()
|
||||
|
||||
sm15_data = algodata[cls.algo_name]
|
||||
|
||||
# A-Factor -> efactor (反向映射)
|
||||
af = item.af()
|
||||
if af is None:
|
||||
af = MIN_AF
|
||||
# 反向粗略映射
|
||||
efactor = max(1.3, min(af / 2.0, 10.0)) # 限制范围
|
||||
# 调试
|
||||
# print(f"DEBUG: item.af()={af}, computed efactor={efactor}")
|
||||
sm15_data["efactor"] = efactor
|
||||
|
||||
# repetition -> rept
|
||||
rept = item.repetition + 1 if item.repetition >= 0 else 0
|
||||
sm15_data["rept"] = rept
|
||||
|
||||
# real_rept: 递增在 revisor 中处理, 这里保持不变
|
||||
# 但如果没有 real_rept 字段, 则初始化为0
|
||||
if "real_rept" not in sm15_data:
|
||||
sm15_data["real_rept"] = 0
|
||||
|
||||
# optimum_interval -> interval (毫秒转天)
|
||||
interval_ms = item.optimum_interval
|
||||
if interval_ms == item.sm.interval_base:
|
||||
sm15_data["interval"] = 0
|
||||
else:
|
||||
interval_days = max(0, round(interval_ms / (24 * 60 * 60 * 1000)))
|
||||
sm15_data["interval"] = interval_days
|
||||
|
||||
# previous_date -> last_date
|
||||
if item.previous_date:
|
||||
epoch = datetime.datetime(1970, 1, 1)
|
||||
last_date_days = (item.previous_date - epoch).days
|
||||
sm15_data["last_date"] = last_date_days
|
||||
else:
|
||||
sm15_data["last_date"] = 0
|
||||
|
||||
# due_date -> next_date
|
||||
if item.due_date:
|
||||
epoch = datetime.datetime(1970, 1, 1)
|
||||
next_date_days = (item.due_date - epoch).days
|
||||
sm15_data["next_date"] = next_date_days
|
||||
else:
|
||||
sm15_data["next_date"] = 0
|
||||
|
||||
# is_activated: 保持不变或设为1
|
||||
if "is_activated" not in sm15_data:
|
||||
sm15_data["is_activated"] = 1
|
||||
|
||||
# last_modify: 更新时间戳
|
||||
sm15_data["last_modify"] = cls._get_timestamp()
|
||||
|
||||
return algodata
|
||||
|
||||
@classmethod
|
||||
def revisor(
|
||||
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
|
||||
):
|
||||
"""SM-15 算法迭代决策机制实现"""
|
||||
# 获取全局 SM 实例
|
||||
sm_instance = _get_global_sm()
|
||||
|
||||
# 将 algodata 转换为 Item
|
||||
item = cls._algodata_to_item(algodata, sm_instance)
|
||||
|
||||
# 处理 is_new_activation
|
||||
if is_new_activation:
|
||||
# 重置为初始状态
|
||||
item.repetition = -1
|
||||
item.lapse = 0
|
||||
item.optimum_interval = sm_instance.interval_base
|
||||
item.previous_date = None
|
||||
item.due_date = datetime.datetime.fromtimestamp(0)
|
||||
item.af(2.5) # 重置 efactor
|
||||
|
||||
# 将项目临时添加到 SM 实例(以便 answer 更新共享状态)
|
||||
sm_instance.q.append(item)
|
||||
|
||||
# 处理反馈(评分)
|
||||
# SM-2 的 feedback 是 0-5, SM-15 的 grade 也是 0-5
|
||||
grade = feedback
|
||||
now = datetime.datetime.now()
|
||||
|
||||
# 调用 answer 方法
|
||||
item.answer(grade, now)
|
||||
|
||||
# 更新共享状态(FI-Graph, ForgettingCurves, OFM)
|
||||
if item.repetition >= 0:
|
||||
sm_instance.forgetting_curves.register_point(grade, item, now)
|
||||
sm_instance.ofm.update()
|
||||
sm_instance.fi_g.update(grade, item, now)
|
||||
|
||||
# 从队列中移除项目
|
||||
sm_instance.q.remove(item)
|
||||
|
||||
# 保存全局状态
|
||||
_save_global_sm(sm_instance)
|
||||
|
||||
# 将更新后的 Item 状态写回 algodata
|
||||
cls._item_to_algodata(item, algodata)
|
||||
|
||||
# 更新 real_rept(总复习次数)
|
||||
algodata[cls.algo_name]["real_rept"] += 1
|
||||
|
||||
@classmethod
|
||||
def is_due(cls, algodata):
|
||||
"""检查项目是否到期"""
|
||||
sm15_data = algodata.get(cls.algo_name, cls.defaults.copy())
|
||||
next_date_days = sm15_data.get("next_date", 0)
|
||||
current_daystamp = cls._get_daystamp()
|
||||
return next_date_days <= current_daystamp
|
||||
|
||||
@classmethod
|
||||
def rate(cls, algodata):
|
||||
"""获取项目的评分(返回 efactor 字符串)"""
|
||||
sm15_data = algodata.get(cls.algo_name, cls.defaults.copy())
|
||||
efactor = sm15_data.get("efactor", 2.5)
|
||||
return str(efactor)
|
||||
|
||||
@classmethod
|
||||
def nextdate(cls, algodata) -> int:
|
||||
"""获取下次复习日期(天数戳)"""
|
||||
sm15_data = algodata.get(cls.algo_name, cls.defaults.copy())
|
||||
next_date_days = sm15_data.get("next_date", 0)
|
||||
return next_date_days
|
||||
1573
src/heurams/kernel/algorithms/sm15m_calc.py
Normal file
1573
src/heurams/kernel/algorithms/sm15m_calc.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,8 +1,10 @@
|
||||
from .base import BaseAlgorithm
|
||||
import heurams.services.timer as timer
|
||||
from typing import TypedDict
|
||||
|
||||
import heurams.services.timer as timer
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .base import BaseAlgorithm
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
|
||||
@@ -9,12 +9,12 @@ from heurams.services.logger import get_logger
|
||||
logger = get_logger(__name__)
|
||||
logger.debug("粒子模块已加载")
|
||||
|
||||
from .atom import Atom, atom_registry
|
||||
from .electron import Electron
|
||||
from .loader import load_electron, load_nucleon
|
||||
from .nucleon import Nucleon
|
||||
from .orbital import Orbital
|
||||
from .atom import Atom, atom_registry
|
||||
from .probe import probe_all, probe_by_filename
|
||||
from .loader import load_nucleon, load_electron
|
||||
|
||||
__all__ = [
|
||||
"Electron",
|
||||
|
||||
@@ -1,14 +1,17 @@
|
||||
import json
|
||||
import pathlib
|
||||
import typing
|
||||
from typing import TypedDict
|
||||
|
||||
import bidict
|
||||
import toml
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .electron import Electron
|
||||
from .nucleon import Nucleon
|
||||
from .orbital import Orbital
|
||||
from typing import TypedDict
|
||||
import pathlib
|
||||
import typing
|
||||
import toml
|
||||
import json
|
||||
import bidict
|
||||
from heurams.context import config_var
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -59,7 +62,6 @@ class Atom:
|
||||
"orbital_fmt": "toml",
|
||||
"runtime": {"locked": False, "min_rate": 0x3F3F3F3F, "newact": False},
|
||||
}
|
||||
self.do_eval()
|
||||
logger.debug("Atom 初始化完成")
|
||||
|
||||
def link(self, key, value):
|
||||
@@ -67,14 +69,64 @@ class Atom:
|
||||
if key in self.registry.keys():
|
||||
self.registry[key] = value
|
||||
logger.debug("键 '%s' 已链接, 触发 do_eval", key)
|
||||
self.do_eval()
|
||||
if key == 'electron':
|
||||
if self.registry['electron'].is_activated() == 0:
|
||||
self.registry['runtime']['newact'] = True
|
||||
if key == "electron":
|
||||
if self.registry["electron"].is_activated() == 0:
|
||||
self.registry["runtime"]["newact"] = True
|
||||
else:
|
||||
logger.error("尝试链接不受支持的键: '%s'", key)
|
||||
raise ValueError("不受支持的原子元数据链接操作")
|
||||
|
||||
def do_eval(self):
|
||||
"""
|
||||
执行并以结果替换当前单元的所有 eval 语句
|
||||
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
|
||||
"""
|
||||
logger.debug("EVAL 开始")
|
||||
|
||||
# eval 环境设置
|
||||
def eval_with_env(s: str):
|
||||
default = config_var.get()["puzzles"]
|
||||
payload = self.registry["nucleon"].payload
|
||||
metadata = self.registry["nucleon"].metadata
|
||||
eval_value = eval(s)
|
||||
if isinstance(eval_value, (int, float)):
|
||||
ret = str(eval_value)
|
||||
else:
|
||||
ret = eval_value
|
||||
logger.debug(
|
||||
"eval 执行成功: '%s' -> '%s'",
|
||||
s,
|
||||
str(ret)[:50] + "..." if len(ret) > 50 else ret,
|
||||
)
|
||||
return ret
|
||||
|
||||
def traverse(data, modifier):
|
||||
if isinstance(data, dict):
|
||||
for key, value in data.items():
|
||||
data[key] = traverse(value, modifier)
|
||||
return data
|
||||
elif isinstance(data, list):
|
||||
for i, item in enumerate(data):
|
||||
data[i] = traverse(item, modifier)
|
||||
return data
|
||||
elif isinstance(data, tuple):
|
||||
return tuple(traverse(item, modifier) for item in data)
|
||||
else:
|
||||
if isinstance(data, str):
|
||||
if data.startswith("eval:"):
|
||||
logger.debug("发现 eval 表达式: '%s'", data[5:])
|
||||
return modifier(data[5:])
|
||||
return data
|
||||
|
||||
try:
|
||||
traverse(self.registry["nucleon"].payload, eval_with_env)
|
||||
traverse(self.registry["nucleon"].metadata, eval_with_env)
|
||||
traverse(self.registry["orbital"], eval_with_env)
|
||||
except Exception as e:
|
||||
ret = f"此 eval 实例发生错误: {e}"
|
||||
logger.warning(ret)
|
||||
logger.debug("EVAL 完成")
|
||||
|
||||
def minimize(self, rating):
|
||||
"""效果等同于 self.registry['runtime']['min_rate'] = min(rating, self.registry['runtime']['min_rate'])
|
||||
|
||||
@@ -104,84 +156,13 @@ class Atom:
|
||||
"""
|
||||
if self.registry["runtime"]["locked"]:
|
||||
logger.debug(f"允许总评分: {self.registry['runtime']['min_rate']}")
|
||||
self.registry["electron"].revisor(self.registry["runtime"]["min_rate"], is_new_activation=self.registry["runtime"]["newact"])
|
||||
self.registry["electron"].revisor(
|
||||
self.registry["runtime"]["min_rate"],
|
||||
is_new_activation=self.registry["runtime"]["newact"],
|
||||
)
|
||||
else:
|
||||
logger.debug("禁止总评分")
|
||||
|
||||
def do_eval(self):
|
||||
"""
|
||||
执行并以结果替换当前单元的所有 eval 语句
|
||||
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
|
||||
"""
|
||||
logger.debug("Atom.do_eval 开始")
|
||||
|
||||
# eval 环境设置
|
||||
def eval_with_env(s: str):
|
||||
# 初始化默认值
|
||||
nucleon = self.registry["nucleon"]
|
||||
default = {}
|
||||
metadata = {}
|
||||
try:
|
||||
default = config_var.get()["puzzles"]
|
||||
metadata = nucleon.metadata
|
||||
except Exception:
|
||||
# 如果无法获取配置或元数据, 使用空字典
|
||||
logger.debug("无法获取配置或元数据, 使用空字典")
|
||||
pass
|
||||
try:
|
||||
eval_value = eval(s)
|
||||
if isinstance(eval_value, (list, dict)):
|
||||
ret = eval_value
|
||||
else:
|
||||
ret = str(eval_value)
|
||||
logger.debug(
|
||||
"eval 执行成功: '%s' -> '%s'",
|
||||
s,
|
||||
str(ret)[:50] + "..." if len(ret) > 50 else ret,
|
||||
)
|
||||
except Exception as e:
|
||||
ret = f"此 eval 实例发生错误: {e}"
|
||||
logger.warning("eval 执行错误: '%s' -> %s", s, e)
|
||||
return ret
|
||||
|
||||
def traverse(data, modifier):
|
||||
if isinstance(data, dict):
|
||||
for key, value in data.items():
|
||||
data[key] = traverse(value, modifier)
|
||||
return data
|
||||
elif isinstance(data, list):
|
||||
for i, item in enumerate(data):
|
||||
data[i] = traverse(item, modifier)
|
||||
return data
|
||||
elif isinstance(data, tuple):
|
||||
return tuple(traverse(item, modifier) for item in data)
|
||||
else:
|
||||
if isinstance(data, str):
|
||||
if data.startswith("eval:"):
|
||||
logger.debug("发现 eval 表达式: '%s'", data[5:])
|
||||
return modifier(data[5:])
|
||||
return data
|
||||
|
||||
# 如果 nucleon 存在且有 do_eval 方法, 调用它
|
||||
nucleon = self.registry["nucleon"]
|
||||
if nucleon is not None and hasattr(nucleon, "do_eval"):
|
||||
nucleon.do_eval()
|
||||
logger.debug("已调用 nucleon.do_eval")
|
||||
|
||||
# 如果 electron 存在且其 algodata 包含 eval 字符串, 遍历它
|
||||
electron = self.registry["electron"]
|
||||
if electron is not None and hasattr(electron, "algodata"):
|
||||
traverse(electron.algodata, eval_with_env)
|
||||
logger.debug("已处理 electron algodata eval")
|
||||
|
||||
# 如果 orbital 存在且是字典, 遍历它
|
||||
orbital = self.registry["orbital"]
|
||||
if orbital is not None and isinstance(orbital, dict):
|
||||
traverse(orbital, eval_with_env)
|
||||
logger.debug("orbital eval 完成")
|
||||
|
||||
logger.debug("Atom.do_eval 完成")
|
||||
|
||||
def persist(self, key):
|
||||
logger.debug("Atom.persist: key='%s'", key)
|
||||
path: pathlib.Path | None = self.registry[key + "_path"]
|
||||
|
||||
@@ -9,7 +9,7 @@ logger = get_logger(__name__)
|
||||
class Electron:
|
||||
"""电子: 记忆分析元数据及算法"""
|
||||
|
||||
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = "supermemo2"):
|
||||
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = ""):
|
||||
"""初始化电子对象 (记忆数据)
|
||||
|
||||
Args:
|
||||
@@ -17,19 +17,26 @@ class Electron:
|
||||
algodata: 算法数据字典, 包含算法的各项参数和设置
|
||||
algo: 使用的算法模块标识
|
||||
"""
|
||||
if algo_name == "":
|
||||
algo_name = config_var.get()["algorithm"]["default"]
|
||||
logger.debug(
|
||||
"创建 Electron 实例, ident: '%s', algo_name: '%s'", ident, algo_name
|
||||
"创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s",
|
||||
ident,
|
||||
algo_name,
|
||||
algodata,
|
||||
)
|
||||
self.algodata = algodata
|
||||
self.ident = ident
|
||||
self.algo = algorithms[algo_name]
|
||||
logger.debug("使用的算法类: %s", self.algo.__name__)
|
||||
|
||||
if self.algo not in self.algodata.keys():
|
||||
if self.algo.algo_name not in self.algodata.keys():
|
||||
self.algodata[self.algo.algo_name] = {}
|
||||
logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo)
|
||||
if not self.algodata[self.algo.algo_name]:
|
||||
logger.debug("算法数据为空, 使用默认值初始化")
|
||||
logger.debug(
|
||||
f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}"
|
||||
)
|
||||
self._default_init(self.algo.defaults)
|
||||
else:
|
||||
logger.debug("算法数据已存在, 跳过默认初始化")
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
from .nucleon import Nucleon
|
||||
from .electron import Electron
|
||||
import heurams.services.hasher as hasher
|
||||
import pathlib
|
||||
import toml
|
||||
import json
|
||||
import pathlib
|
||||
from copy import deepcopy
|
||||
|
||||
import toml
|
||||
|
||||
import heurams.services.hasher as hasher
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .electron import Electron
|
||||
from .nucleon import Nucleon
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -65,7 +68,7 @@ def load_electron(path: pathlib.Path, fmt="json") -> dict:
|
||||
logger.debug("JSON 解析成功, keys: %s", list(dictdata.keys()))
|
||||
dic = dict()
|
||||
for item, attr in dictdata.items():
|
||||
logger.debug("处理电子项目: %s", item)
|
||||
logger.debug("处理电子项目: %s, %s", item, attr)
|
||||
dic[item] = Electron(item, attr)
|
||||
logger.debug("load_electron 完成, 加载了 %d 个 Electron 对象", len(dic))
|
||||
return dic
|
||||
|
||||
@@ -49,54 +49,6 @@ class Nucleon:
|
||||
def __hash__(self):
|
||||
return hash(self.ident)
|
||||
|
||||
def do_eval(self):
|
||||
"""
|
||||
执行并以结果替换当前单元的所有 eval 语句
|
||||
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
|
||||
"""
|
||||
logger.debug("Nucleon.do_eval 开始")
|
||||
|
||||
# eval 环境设置
|
||||
def eval_with_env(s: str):
|
||||
try:
|
||||
nucleon = self
|
||||
eval_value = eval(s)
|
||||
if isinstance(eval_value, (int, float)):
|
||||
ret = str(eval_value)
|
||||
else:
|
||||
ret = eval_value
|
||||
logger.debug(
|
||||
"eval 执行成功: '%s' -> '%s'",
|
||||
s,
|
||||
str(ret)[:50] + "..." if len(ret) > 50 else ret,
|
||||
)
|
||||
except Exception as e:
|
||||
ret = f"此 eval 实例发生错误: {e}"
|
||||
logger.warning("eval 执行错误: '%s' -> %s", s, e)
|
||||
return ret
|
||||
|
||||
def traverse(data, modifier):
|
||||
if isinstance(data, dict):
|
||||
for key, value in data.items():
|
||||
data[key] = traverse(value, modifier)
|
||||
return data
|
||||
elif isinstance(data, list):
|
||||
for i, item in enumerate(data):
|
||||
data[i] = traverse(item, modifier)
|
||||
return data
|
||||
elif isinstance(data, tuple):
|
||||
return tuple(traverse(item, modifier) for item in data)
|
||||
else:
|
||||
if isinstance(data, str):
|
||||
if data.startswith("eval:"):
|
||||
logger.debug("发现 eval 表达式: '%s'", data[5:])
|
||||
return modifier(data[5:])
|
||||
return data
|
||||
|
||||
traverse(self.payload, eval_with_env)
|
||||
traverse(self.metadata, eval_with_env)
|
||||
logger.debug("Nucleon.do_eval 完成")
|
||||
|
||||
@staticmethod
|
||||
def placeholder():
|
||||
"""生成一个占位原子核"""
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from typing import TypedDict
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from heurams.context import config_var
|
||||
import pathlib
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -8,7 +8,7 @@ class BasePuzzle:
|
||||
"""谜题基类"""
|
||||
|
||||
def refresh(self):
|
||||
logger.debug("BasePuzzle.refresh 被调用(未实现)")
|
||||
logger.debug("BasePuzzle.refresh 被调用(未实现)")
|
||||
raise NotImplementedError("谜题对象未实现 refresh 方法")
|
||||
|
||||
def __str__(self):
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
from .base import BasePuzzle
|
||||
import random
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .base import BasePuzzle
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
# mcq.py
|
||||
from .base import BasePuzzle
|
||||
import random
|
||||
from typing import List, Dict, Optional, Union
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .base import BasePuzzle
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
# mcq.py
|
||||
from .base import BasePuzzle
|
||||
import random
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .base import BasePuzzle
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -14,5 +16,5 @@ class RecognitionPuzzle(BasePuzzle):
|
||||
super().__init__()
|
||||
|
||||
def refresh(self):
|
||||
logger.debug("RecognitionPuzzle.refresh(空实现)")
|
||||
logger.debug("RecognitionPuzzle.refresh(空实现)")
|
||||
pass
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
from .states import PhaserState, ProcessionState
|
||||
from .procession import Procession
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .fission import Fission
|
||||
from .phaser import Phaser
|
||||
from heurams.services.logger import get_logger
|
||||
from .procession import Procession
|
||||
from .states import PhaserState, ProcessionState
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import random
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
import heurams.kernel.puzzles as puz
|
||||
import random
|
||||
from .states import PhaserState
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .states import PhaserState
|
||||
|
||||
|
||||
class Fission:
|
||||
"""裂变器: 单原子调度展开器"""
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
# 移相器类定义
|
||||
|
||||
import heurams.kernel.particles as pt
|
||||
from .states import PhaserState, ProcessionState
|
||||
from .procession import Procession
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .procession import Procession
|
||||
from .states import PhaserState, ProcessionState
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import heurams.kernel.particles as pt
|
||||
from .states import PhaserState, ProcessionState
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .states import PhaserState, ProcessionState
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -51,7 +52,7 @@ class Procession:
|
||||
self.queue.append(atom)
|
||||
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
|
||||
else:
|
||||
logger.debug("原子未追加(重复或队列长度<=1)")
|
||||
logger.debug("原子未追加(重复或队列长度<=1)")
|
||||
|
||||
def __len__(self):
|
||||
length = len(self.queue) - self.cursor
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from enum import Enum, auto
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
# 音频播放器, 必须基于文件操作
|
||||
from . import termux_audio
|
||||
from . import playsound_audio
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from . import playsound_audio, termux_audio
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
__all__ = [
|
||||
|
||||
@@ -5,7 +5,9 @@
|
||||
|
||||
import os
|
||||
import pathlib
|
||||
|
||||
import playsound
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from typing import Protocol
|
||||
import pathlib
|
||||
from typing import Protocol
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
import os
|
||||
import pathlib
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -2,4 +2,4 @@ from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
logger.debug("OpenAI provider 模块已加载(未实现)")
|
||||
logger.debug("OpenAI provider 模块已加载(未实现)")
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .base import BaseTTS
|
||||
from .edge_tts import EdgeTTS
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import pathlib
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
from .base import BaseTTS
|
||||
import pathlib
|
||||
|
||||
import edge_tts
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
from .base import BaseTTS
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
@@ -15,7 +19,7 @@ class EdgeTTS(BaseTTS):
|
||||
try:
|
||||
communicate = edge_tts.Communicate(
|
||||
text,
|
||||
"zh-CN-YunjianNeural",
|
||||
config_var.get()["providers"]["tts"]["edgetts"]["voice"],
|
||||
)
|
||||
logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频")
|
||||
communicate.save_sync(str(path))
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
# 音频服务
|
||||
from typing import Callable
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.providers.audio import providers as prov
|
||||
from typing import Callable
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path
|
||||
logger.debug(
|
||||
"音频服务初始化完成, 使用 provider: %s", config_var.get()["services"]["audio"]
|
||||
"音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"]
|
||||
)
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
# 配置文件服务
|
||||
import pathlib
|
||||
import toml
|
||||
import typing
|
||||
|
||||
import toml
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# 哈希服务
|
||||
import hashlib
|
||||
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -45,7 +45,7 @@ def setup_logging(
|
||||
# 创建formatter
|
||||
formatter = logging.Formatter(log_format, date_format)
|
||||
|
||||
# 创建文件handler(使用RotatingFileHandler防止日志过大)
|
||||
# 创建文件handler(使用RotatingFileHandler防止日志过大)
|
||||
file_handler = logging.handlers.RotatingFileHandler(
|
||||
filename=log_path,
|
||||
maxBytes=max_bytes,
|
||||
@@ -55,7 +55,7 @@ def setup_logging(
|
||||
file_handler.setFormatter(formatter)
|
||||
file_handler.setLevel(log_level)
|
||||
|
||||
# 配置root logger - 设置为 WARNING 级别(只记录重要信息)
|
||||
# 配置root logger - 设置为 WARNING 级别(只记录重要信息)
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.WARNING) # 这里改为 WARNING
|
||||
|
||||
@@ -63,7 +63,7 @@ def setup_logging(
|
||||
for handler in root_logger.handlers[:]:
|
||||
root_logger.removeHandler(handler)
|
||||
|
||||
# 创建自己的应用logger(单独设置DEBUG级别)
|
||||
# 创建自己的应用logger(单独设置DEBUG级别)
|
||||
app_logger = logging.getLogger("heurams")
|
||||
app_logger.setLevel(log_level) # 保持DEBUG级别
|
||||
app_logger.addHandler(file_handler)
|
||||
@@ -146,7 +146,7 @@ def exception(msg: str, *args, **kwargs) -> None:
|
||||
get_logger().exception(msg, *args, **kwargs)
|
||||
|
||||
|
||||
# 初始化日志系统(硬编码配置)
|
||||
# 初始化日志系统(硬编码配置)
|
||||
setup_logging()
|
||||
|
||||
|
||||
|
||||
0
src/heurams/services/sync_service.py
Normal file
0
src/heurams/services/sync_service.py
Normal file
@@ -1,6 +1,7 @@
|
||||
# 时间服务
|
||||
from heurams.context import config_var
|
||||
import time
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
# 文本转语音服务
|
||||
from heurams.context import config_var
|
||||
from heurams.providers.tts import TTSs
|
||||
from typing import Callable
|
||||
|
||||
from heurams.context import config_var
|
||||
from heurams.providers.tts import providers as prov
|
||||
from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
convert: Callable = TTSs[config_var.get().get("tts_provider")]
|
||||
convertor: Callable = prov[config_var.get()["services"]["tts"]].convert
|
||||
logger.debug(
|
||||
"TTS服务初始化完成, 使用 provider: %s", config_var.get().get("tts_provider")
|
||||
"TTS服务初始化完成, 使用 provider: %s", config_var.get()["services"]["tts"]
|
||||
)
|
||||
|
||||
@@ -3,7 +3,7 @@ from heurams.services.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
ver = "0.4.0"
|
||||
ver = "0.4.3"
|
||||
stage = "prototype"
|
||||
codename = "fledge" # 雏鸟, 0.4.x 版本
|
||||
|
||||
|
||||
20
src/heurams/services/vfs.py
Normal file
20
src/heurams/services/vfs.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""vfs.py
|
||||
得益于 FSSpec, 无需实现大部分虚拟文件系统的 Providers
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import fsspec as fs
|
||||
|
||||
|
||||
class VFSObject:
|
||||
def __init__(self, protocol, base_url):
|
||||
self.base_url = base_url
|
||||
self.protocol = protocol
|
||||
self.fs = fs.filesystem(protocol=protocol, base_url=base_url)
|
||||
|
||||
def open(self, path: Path):
|
||||
return self.fs.open(path)
|
||||
|
||||
def open_by_list(self, path_list: list[Path]):
|
||||
return self.fs.open_files(path_list)
|
||||
0
src/heurams/services/vfs/zipfs.py
Normal file
0
src/heurams/services/vfs/zipfs.py
Normal file
@@ -2,21 +2,22 @@
|
||||
"""
|
||||
DashboardScreen 的测试, 包括单元测试和 pilot 测试.
|
||||
"""
|
||||
import unittest
|
||||
import tempfile
|
||||
import pathlib
|
||||
import tempfile
|
||||
import time
|
||||
from unittest.mock import patch, MagicMock
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from textual.pilot import Pilot
|
||||
|
||||
from heurams.context import ConfigContext
|
||||
from heurams.services.config import ConfigFile
|
||||
from heurams.interface.__main__ import HeurAMSApp
|
||||
from heurams.interface.screens.dashboard import DashboardScreen
|
||||
from heurams.services.config import ConfigFile
|
||||
|
||||
|
||||
class TestDashboardScreenUnit(unittest.TestCase):
|
||||
"""DashboardScreen 的单元测试(不启动完整应用). """
|
||||
"""DashboardScreen 的单元测试(不启动完整应用)."""
|
||||
|
||||
def setUp(self):
|
||||
"""在每个测试之前运行, 设置临时目录和配置."""
|
||||
@@ -66,7 +67,7 @@ class TestDashboardScreenUnit(unittest.TestCase):
|
||||
result = screen.compose()
|
||||
widgets = list(result)
|
||||
# 检查是否包含 Header 和 Footer
|
||||
from textual.widgets import Header, Footer
|
||||
from textual.widgets import Footer, Header
|
||||
|
||||
header_present = any(isinstance(w, Header) for w in widgets)
|
||||
footer_present = any(isinstance(w, Footer) for w in widgets)
|
||||
@@ -88,7 +89,7 @@ class TestDashboardScreenUnit(unittest.TestCase):
|
||||
screen = DashboardScreen()
|
||||
# 模拟一个文件名
|
||||
filename = "test.toml"
|
||||
result = screen.item_desc_generator(filename)
|
||||
result = screen.analyser(filename)
|
||||
self.assertIsInstance(result, dict)
|
||||
self.assertIn(0, result)
|
||||
self.assertIn(1, result)
|
||||
|
||||
314
tests/interface/test_synctool.py
Normal file
314
tests/interface/test_synctool.py
Normal file
@@ -0,0 +1,314 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
SyncScreen 和 SyncService 的测试.
|
||||
"""
|
||||
import pathlib
|
||||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
from heurams.context import ConfigContext
|
||||
from heurams.services.config import ConfigFile
|
||||
from heurams.services.sync_service import (ConflictStrategy, SyncConfig,
|
||||
SyncMode, SyncService)
|
||||
|
||||
|
||||
class TestSyncServiceUnit(unittest.TestCase):
|
||||
"""SyncService 的单元测试."""
|
||||
|
||||
def setUp(self):
|
||||
"""在每个测试之前运行, 设置临时目录和模拟客户端."""
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.temp_path = pathlib.Path(self.temp_dir.name)
|
||||
|
||||
# 创建测试文件
|
||||
self.test_file = self.temp_path / "test.txt"
|
||||
self.test_file.write_text("测试内容")
|
||||
|
||||
# 模拟 WebDAV 客户端
|
||||
self.mock_client = MagicMock()
|
||||
|
||||
# 创建同步配置
|
||||
self.config = SyncConfig(
|
||||
enabled=True,
|
||||
url="https://example.com/dav/",
|
||||
username="test",
|
||||
password="test",
|
||||
remote_path="/heurams/",
|
||||
sync_mode=SyncMode.BIDIRECTIONAL,
|
||||
conflict_strategy=ConflictStrategy.NEWER,
|
||||
verify_ssl=True,
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
"""在每个测试之后清理."""
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_sync_service_initialization(self, mock_client_class):
|
||||
"""测试同步服务初始化."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
|
||||
service = SyncService(self.config)
|
||||
|
||||
# 验证客户端已创建
|
||||
mock_client_class.assert_called_once()
|
||||
self.assertIsNotNone(service.client)
|
||||
self.assertEqual(service.config, self.config)
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_sync_service_disabled(self, mock_client_class):
|
||||
"""测试同步服务未启用."""
|
||||
config = SyncConfig(enabled=False)
|
||||
service = SyncService(config)
|
||||
|
||||
# 客户端不应初始化
|
||||
mock_client_class.assert_not_called()
|
||||
self.assertIsNone(service.client)
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_test_connection_success(self, mock_client_class):
|
||||
"""测试连接测试成功."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
self.mock_client.list.return_value = []
|
||||
|
||||
service = SyncService(self.config)
|
||||
result = service.test_connection()
|
||||
|
||||
self.assertTrue(result)
|
||||
self.mock_client.list.assert_called_once()
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_test_connection_failure(self, mock_client_class):
|
||||
"""测试连接测试失败."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
self.mock_client.list.side_effect = Exception("连接失败")
|
||||
|
||||
service = SyncService(self.config)
|
||||
result = service.test_connection()
|
||||
|
||||
self.assertFalse(result)
|
||||
self.mock_client.list.assert_called_once()
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_upload_file(self, mock_client_class):
|
||||
"""测试上传单个文件."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
|
||||
service = SyncService(self.config)
|
||||
result = service.upload_file(self.test_file)
|
||||
|
||||
self.assertTrue(result)
|
||||
self.mock_client.upload_file.assert_called_once()
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_download_file(self, mock_client_class):
|
||||
"""测试下载单个文件."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
|
||||
service = SyncService(self.config)
|
||||
remote_path = "/heurams/test.txt"
|
||||
local_path = self.temp_path / "downloaded.txt"
|
||||
|
||||
result = service.download_file(remote_path, local_path)
|
||||
|
||||
self.assertTrue(result)
|
||||
self.mock_client.download_file.assert_called_once()
|
||||
self.assertTrue(local_path.parent.exists())
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_sync_directory_no_files(self, mock_client_class):
|
||||
"""测试同步空目录."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
self.mock_client.list.return_value = []
|
||||
self.mock_client.mkdir.return_value = None
|
||||
|
||||
service = SyncService(self.config)
|
||||
result = service.sync_directory(self.temp_path)
|
||||
|
||||
self.assertTrue(result["success"])
|
||||
self.assertEqual(result["uploaded"], 0)
|
||||
self.assertEqual(result["downloaded"], 0)
|
||||
self.mock_client.mkdir.assert_called_once()
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_sync_directory_upload_only(self, mock_client_class):
|
||||
"""测试仅上传模式."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
self.mock_client.list.return_value = []
|
||||
self.mock_client.mkdir.return_value = None
|
||||
|
||||
config = SyncConfig(
|
||||
enabled=True,
|
||||
url="https://example.com/dav/",
|
||||
username="test",
|
||||
password="test",
|
||||
remote_path="/heurams/",
|
||||
sync_mode=SyncMode.UPLOAD_ONLY,
|
||||
conflict_strategy=ConflictStrategy.NEWER,
|
||||
)
|
||||
|
||||
service = SyncService(config)
|
||||
result = service.sync_directory(self.temp_path)
|
||||
|
||||
self.assertTrue(result["success"])
|
||||
self.mock_client.mkdir.assert_called_once()
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_conflict_strategy_newer(self, mock_client_class):
|
||||
"""测试 NEWER 冲突策略."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
|
||||
# 模拟远程文件存在
|
||||
self.mock_client.list.return_value = ["test.txt"]
|
||||
self.mock_client.info.return_value = {
|
||||
"size": 100,
|
||||
"modified": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
self.mock_client.mkdir.return_value = None
|
||||
|
||||
service = SyncService(self.config)
|
||||
result = service.sync_directory(self.temp_path)
|
||||
|
||||
self.assertTrue(result["success"])
|
||||
# 应该有一个冲突
|
||||
self.assertGreaterEqual(result.get("conflicts", 0), 0)
|
||||
|
||||
@patch("heurams.services.sync_service.Client")
|
||||
def test_create_sync_service_from_config(self, mock_client_class):
|
||||
"""测试从配置文件创建同步服务."""
|
||||
mock_client_class.return_value = self.mock_client
|
||||
|
||||
# 创建临时配置文件
|
||||
config_data = {
|
||||
"sync": {
|
||||
"webdav": {
|
||||
"enabled": True,
|
||||
"url": "https://example.com/dav/",
|
||||
"username": "test",
|
||||
"password": "test",
|
||||
"remote_path": "/heurams/",
|
||||
"sync_mode": "bidirectional",
|
||||
"conflict_strategy": "newer",
|
||||
"verify_ssl": True,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# 模拟 config_var
|
||||
with patch("heurams.services.sync_service.config_var") as mock_config_var:
|
||||
mock_config = MagicMock()
|
||||
mock_config.data = config_data
|
||||
mock_config_var.get.return_value = mock_config
|
||||
|
||||
from heurams.services.sync_service import \
|
||||
create_sync_service_from_config
|
||||
|
||||
service = create_sync_service_from_config()
|
||||
|
||||
self.assertIsNotNone(service)
|
||||
self.assertIsNotNone(service.client)
|
||||
|
||||
|
||||
class TestSyncScreenUnit(unittest.TestCase):
|
||||
"""SyncScreen 的单元测试."""
|
||||
|
||||
def setUp(self):
|
||||
"""在每个测试之前运行."""
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.temp_path = pathlib.Path(self.temp_dir.name)
|
||||
|
||||
# 创建默认配置
|
||||
default_config_path = (
|
||||
pathlib.Path(__file__).parent.parent.parent
|
||||
/ "src/heurams/default/config/config.toml"
|
||||
)
|
||||
self.config = ConfigFile(default_config_path)
|
||||
|
||||
# 更新配置中的路径
|
||||
config_data = self.config.data
|
||||
config_data["paths"]["nucleon_dir"] = str(self.temp_path / "nucleon")
|
||||
config_data["paths"]["electron_dir"] = str(self.temp_path / "electron")
|
||||
config_data["paths"]["orbital_dir"] = str(self.temp_path / "orbital")
|
||||
config_data["paths"]["cache_dir"] = str(self.temp_path / "cache")
|
||||
|
||||
# 添加同步配置
|
||||
if "sync" not in config_data:
|
||||
config_data["sync"] = {}
|
||||
config_data["sync"]["webdav"] = {
|
||||
"enabled": False,
|
||||
"url": "",
|
||||
"username": "",
|
||||
"password": "",
|
||||
"remote_path": "/heurams/",
|
||||
"sync_mode": "bidirectional",
|
||||
"conflict_strategy": "newer",
|
||||
"verify_ssl": True,
|
||||
}
|
||||
|
||||
# 创建目录
|
||||
for dir_key in ["nucleon_dir", "electron_dir", "orbital_dir", "cache_dir"]:
|
||||
pathlib.Path(config_data["paths"][dir_key]).mkdir(
|
||||
parents=True, exist_ok=True
|
||||
)
|
||||
|
||||
# 使用 ConfigContext 设置配置
|
||||
self.config_ctx = ConfigContext(self.config)
|
||||
self.config_ctx.__enter__()
|
||||
|
||||
def tearDown(self):
|
||||
"""在每个测试之后清理."""
|
||||
self.config_ctx.__exit__(None, None, None)
|
||||
self.temp_dir.cleanup()
|
||||
|
||||
@patch("heurams.interface.screens.synctool.create_sync_service_from_config")
|
||||
def test_sync_screen_compose(self, mock_create_service):
|
||||
"""测试 SyncScreen 的 compose 方法."""
|
||||
from heurams.interface.screens.synctool import SyncScreen
|
||||
|
||||
# 模拟同步服务
|
||||
mock_service = MagicMock()
|
||||
mock_service.client = MagicMock()
|
||||
mock_create_service.return_value = mock_service
|
||||
|
||||
screen = SyncScreen()
|
||||
|
||||
# 测试 compose 方法
|
||||
from textual.app import ComposeResult
|
||||
|
||||
result = screen.compose()
|
||||
widgets = list(result)
|
||||
|
||||
# 检查基本部件
|
||||
from textual.containers import ScrollableContainer
|
||||
from textual.widgets import Button, Footer, Header, ProgressBar, Static
|
||||
|
||||
header_present = any(isinstance(w, Header) for w in widgets)
|
||||
footer_present = any(isinstance(w, Footer) for w in widgets)
|
||||
self.assertTrue(header_present)
|
||||
self.assertTrue(footer_present)
|
||||
|
||||
# 检查容器
|
||||
container_present = any(isinstance(w, ScrollableContainer) for w in widgets)
|
||||
self.assertTrue(container_present)
|
||||
|
||||
@patch("heurams.interface.screens.synctool.create_sync_service_from_config")
|
||||
def test_sync_screen_load_config(self, mock_create_service):
|
||||
"""测试 SyncScreen 加载配置."""
|
||||
from heurams.interface.screens.synctool import SyncScreen
|
||||
|
||||
mock_service = MagicMock()
|
||||
mock_service.client = MagicMock()
|
||||
mock_create_service.return_value = mock_service
|
||||
|
||||
screen = SyncScreen()
|
||||
screen.load_config()
|
||||
|
||||
# 验证配置已加载
|
||||
self.assertIsNotNone(screen.sync_config)
|
||||
mock_create_service.assert_called_once()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,5 +1,5 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from heurams.kernel.algorithms.sm2 import SM2Algorithm
|
||||
|
||||
@@ -94,7 +94,7 @@ class TestSM2Algorithm(unittest.TestCase):
|
||||
SM2Algorithm.revisor(algodata, feedback=5, is_new_activation=True)
|
||||
self.assertEqual(algodata[SM2Algorithm.algo_name]["rept"], 0)
|
||||
self.assertEqual(algodata[SM2Algorithm.algo_name]["efactor"], 2.5)
|
||||
# interval 应为 1(因为 rept=0)
|
||||
# interval 应为 1(因为 rept=0)
|
||||
self.assertEqual(algodata[SM2Algorithm.algo_name]["interval"], 1)
|
||||
|
||||
def test_revisor_efactor_calculation(self):
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import json
|
||||
import pathlib
|
||||
import tempfile
|
||||
import toml
|
||||
import json
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import toml
|
||||
|
||||
from heurams.context import ConfigContext
|
||||
from heurams.kernel.particles.atom import Atom, atom_registry
|
||||
from heurams.kernel.particles.electron import Electron
|
||||
from heurams.kernel.particles.nucleon import Nucleon
|
||||
from heurams.kernel.particles.orbital import Orbital
|
||||
from heurams.context import ConfigContext
|
||||
from heurams.services.config import ConfigFile
|
||||
|
||||
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
import sys
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from heurams.kernel.particles.electron import Electron
|
||||
from heurams.kernel.algorithms import algorithms
|
||||
from heurams.kernel.particles.electron import Electron
|
||||
|
||||
|
||||
class TestElectron(unittest.TestCase):
|
||||
@@ -27,7 +27,7 @@ class TestElectron(unittest.TestCase):
|
||||
self.assertEqual(electron.algo, algorithms["supermemo2"])
|
||||
self.assertIn(electron.algo, electron.algodata)
|
||||
self.assertIsInstance(electron.algodata[electron.algo], dict)
|
||||
# 检查默认值(排除动态字段)
|
||||
# 检查默认值(排除动态字段)
|
||||
defaults = electron.algo.defaults
|
||||
for key, value in defaults.items():
|
||||
if key == "last_modify":
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from heurams.kernel.particles.nucleon import Nucleon
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from heurams.kernel.puzzles.cloze import ClozePuzzle
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock, call
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
from heurams.kernel.puzzles.mcq import MCQPuzzle
|
||||
|
||||
@@ -48,7 +48,7 @@ class TestMCQPuzzle(unittest.TestCase):
|
||||
# 模拟 random.sample 返回前两个映射项
|
||||
mock_sample.side_effect = [
|
||||
[("q1", "a1"), ("q2", "a2")], # 选择问题
|
||||
["j1", "j2", "j3"], # 为每个问题选择干扰项(实际调用两次)
|
||||
["j1", "j2", "j3"], # 为每个问题选择干扰项(实际调用两次)
|
||||
]
|
||||
puzzle.refresh()
|
||||
|
||||
@@ -59,7 +59,7 @@ class TestMCQPuzzle(unittest.TestCase):
|
||||
self.assertEqual(puzzle.answer, ["a1", "a2"])
|
||||
# 检查 options 列表
|
||||
self.assertEqual(len(puzzle.options), 2)
|
||||
# 每个选项列表应包含 4 个选项(正确答案 + 3 个干扰项)
|
||||
# 每个选项列表应包含 4 个选项(正确答案 + 3 个干扰项)
|
||||
self.assertEqual(len(puzzle.options[0]), 4)
|
||||
self.assertEqual(len(puzzle.options[1]), 4)
|
||||
# random.shuffle 应被调用
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import unittest
|
||||
from unittest.mock import Mock, patch, MagicMock
|
||||
from unittest.mock import MagicMock, Mock, patch
|
||||
|
||||
from heurams.kernel.reactor.phaser import Phaser
|
||||
from heurams.kernel.reactor.states import PhaserState, ProcessionState
|
||||
from heurams.kernel.particles.atom import Atom
|
||||
from heurams.kernel.particles.electron import Electron
|
||||
from heurams.kernel.reactor.phaser import Phaser
|
||||
from heurams.kernel.reactor.states import PhaserState, ProcessionState
|
||||
|
||||
|
||||
class TestPhaser(unittest.TestCase):
|
||||
|
||||
Reference in New Issue
Block a user