0.4.2 版本合并

This commit is contained in:
2025-12-21 06:49:38 +08:00
23 changed files with 200 additions and 234 deletions

4
.gitignore vendored
View File

@@ -15,8 +15,10 @@ old/
data/cache/
data/electron/
data/nucleon/
!data/nucleon/test*
data/global/
!data/nucleon/TEST*
data/orbital/
config/config_dev.toml
AGENTS.md
# Byte-compiled / optimized / DLL files

View File

@@ -10,13 +10,15 @@
- `dev` 分支: 开发版本
- 功能分支: 从 `dev` 分支创建, 命名格式为 `feature/描述``fix/描述``refactor/描述`
2. **代码风格**:
- 请使用 Black 格式化代码
- 请使用 Black 与 isort 格式化代码
- 遵循 PEP 8 规范
- 添加适当的文档字符串
3. **提交消息**:
- 使用简体中文或英文撰写清晰的提交消息
- 格式: 遵循 Conventional Commits 规范
4. **合并方式**:
- 不使用 Fast-forward 合并
- 可以设置 `git config merge.ff false`
## 设置开发环境
```bash

View File

@@ -14,6 +14,14 @@ scheduled_num = 8
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[algorithm]
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
[puzzles] # 谜题默认配置
[puzzles.mcq]
@@ -25,6 +33,7 @@ min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
nucleon_dir = "./data/nucleon"
electron_dir = "./data/electron"
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
orbital_dir = "./data/orbital"
cache_dir = "./data/cache"
template_dir = "./data/template"
@@ -34,6 +43,9 @@ audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Andro
tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai
[providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = ""
key = ""

View File

@@ -25,3 +25,4 @@ readme = "README.md"
[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -32,7 +32,11 @@ try:
except Exception as e:
print("未能加载自定义用户配置")
logger.warning("未能加载自定义用户配置, 错误: %s", e)
if pathlib.Path(rootdir / "default" / "config" / "config_dev.toml").exists():
logger.debug("使用开发设置")
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(workdir / "config" / "config_dev.toml")
)
# runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据

View File

@@ -0,0 +1,60 @@
from textual.app import App
from textual.widgets import Button
from heurams.context import config_var
from heurams.services.logger import get_logger
from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen
from .screens.nucreator import NucleonCreatorScreen
from .screens.precache import PrecachingScreen
logger = get_logger(__name__)
def environment_check():
from pathlib import Path
logger.debug("检查环境路径")
for i in config_var.get()["paths"].values():
i = Path(i)
if not i.exists():
logger.info("创建目录: %s", i)
print(f"创建 {i}")
i.mkdir(exist_ok=True, parents=True)
else:
logger.debug("目录已存在: %s", i)
print(f"找到 {i}")
logger.debug("环境检查完成")
class HeurAMSApp(App):
TITLE = "潜进"
CSS_PATH = "css/main.tcss"
SUB_TITLE = "启发式辅助记忆调度器"
BINDINGS = [
("q", "quit", "退出"),
("d", "toggle_dark", "切换色调"),
("1", "app.push_screen('dashboard')", "仪表盘"),
("2", "app.push_screen('precache_all')", "缓存管理器"),
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
("0", "app.push_screen('about')", "版本信息"),
]
SCREENS = {
"dashboard": DashboardScreen,
"nucleon_creator": NucleonCreatorScreen,
"precache_all": PrecachingScreen,
"about": AboutScreen,
}
def on_mount(self) -> None:
environment_check()
self.push_screen("dashboard")
def on_button_pressed(self, event: Button.Pressed) -> None:
self.exit(event.button.id)
def action_do_nothing(self):
print("DO NOTHING")
self.refresh()

View File

@@ -2,6 +2,8 @@ from textual.app import App
from textual.widgets import Button
from heurams.services.logger import get_logger
from heurams.context import config_var
from heurams.interface import HeurAMSApp
from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen
@@ -10,80 +12,7 @@ from .screens.precache import PrecachingScreen
logger = get_logger(__name__)
class HeurAMSApp(App):
TITLE = "潜进"
CSS_PATH = "css/main.tcss"
SUB_TITLE = "启发式辅助记忆调度器"
BINDINGS = [
("q", "quit", "退出"),
("d", "toggle_dark", "切换色调"),
("1", "app.push_screen('dashboard')", "仪表盘"),
("2", "app.push_screen('precache_all')", "缓存管理器"),
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
("0", "app.push_screen('about')", "版本信息"),
]
SCREENS = {
"dashboard": DashboardScreen,
"nucleon_creator": NucleonCreatorScreen,
"precache_all": PrecachingScreen,
"about": AboutScreen,
}
def on_mount(self) -> None:
self.push_screen("dashboard")
def on_button_pressed(self, event: Button.Pressed) -> None:
self.exit(event.button.id)
def action_do_nothing(self):
print("DO NOTHING")
self.refresh()
def environment_check():
from pathlib import Path
logger.debug("检查环境路径")
for i in config_var.get()["paths"].values():
i = Path(i)
if not i.exists():
logger.info("创建目录: %s", i)
print(f"创建 {i}")
i.mkdir(exist_ok=True, parents=True)
else:
logger.debug("目录已存在: %s", i)
print(f"找到 {i}")
logger.debug("环境检查完成")
def is_subdir(parent, child):
try:
child.relative_to(parent)
logger.debug("is_subdir: %s%s 的子目录", child, parent)
return 1
except:
logger.debug("is_subdir: %s 不是 %s 的子目录", child, parent)
return 0
import os
from pathlib import Path
# 开发模式
from heurams.context import config_var, rootdir, workdir
if is_subdir(Path(rootdir), Path(os.getcwd())):
os.chdir(Path(rootdir) / ".." / "..")
print(f'转入开发数据目录: {Path(rootdir)/".."/".."}')
environment_check()
app = HeurAMSApp()
if __name__ == "__main__":
app.run()
def main():
app.run()
app.run()

View File

@@ -74,7 +74,7 @@ class DashboardScreen(Screen):
is_activated = 1
nextdate = min(nextdate, i.nextdate())
res[1] = f"下一次复习: {nextdate}\n"
res[1] += f"{is_due if "需要复习" else "当前无需复习"}"
res[1] += f"{"需要复习" if is_due else "当前无需复习"}"
if not is_activated:
res[1] = " 尚未激活"
return res

View File

@@ -46,14 +46,14 @@ class MemScreen(Screen):
) -> None:
super().__init__(name, id, classes)
self.atoms = atoms
for i in self.atoms:
i.do_eval()
self.phaser = Phaser(atoms)
# logger.debug(self.phaser.state)
self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom
# logger.debug(self.phaser.state)
# self.procession.forward(1)
for i in atoms:
i.do_eval()
def on_mount(self):
self.load_puzzle()
@@ -144,8 +144,21 @@ class MemScreen(Screen):
self.atom.lock(1)
def action_play_voice(self):
self.run_worker(self.play_voice, exclusive=True, thread=True)
def play_voice(self):
"""朗读当前内容"""
pass
from heurams.services.audio_service import play_by_path
from pathlib import Path
from heurams.services.hasher import get_md5
path = Path(config_var.get()['paths']["cache_dir"])
path = path / f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
if path.exists():
play_by_path(path)
else:
from heurams.services.tts_service import convertor
convertor(self.atom.registry['nucleon'].metadata["formation"]["tts_text"], path)
play_by_path(path)
def action_toggle_dark(self):
self.app.action_toggle_dark()

View File

@@ -39,7 +39,9 @@ class PrecachingScreen(Screen):
self.desc = desc
for i in nucleons:
i: pt.Nucleon
i.do_eval()
atom = pt.Atom()
atom.link("nucleon", i)
atom.do_eval()
# print("完成 EVAL")
def compose(self) -> ComposeResult:
@@ -95,11 +97,9 @@ class PrecachingScreen(Screen):
cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache_file.exists():
try: # TODO: 调用模块消除tts耦合
import edge_tts as tts
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache_file))
try:
from heurams.services.tts_service import convertor
convertor(text, cache_file)
return 1
except Exception as e:
print(f"预缓存失败 '{text}': {e}")
@@ -178,7 +178,9 @@ class PrecachingScreen(Screen):
self.total = len(nu)
for i in nu:
i: pt.Nucleon
i.do_eval()
atom = pt.Atom()
atom.link("nucleon", i)
atom.do_eval()
return self.precache_by_list(nu)
def on_button_pressed(self, event: Button.Pressed) -> None:

View File

@@ -61,12 +61,17 @@ class MCQPuzzle(BasePuzzleWidget):
self.puzzle.refresh()
def compose(self):
self.atom.registry["nucleon"].do_eval()
setting: Setting = self.atom.registry["nucleon"].metadata["orbital"]["puzzles"][
self.alia
]
logger.debug(f"Puzzle Setting: {setting}")
current_options = self.puzzle.options[len(self.inputlist)]
logger.debug(f"WIRED INDEX: {len(self.inputlist)}")
if len(self.inputlist) > len(self.puzzle.options):
logger.debug("ERR IDX")
logger.debug(self.inputlist)
logger.debug(self.puzzle.options)
else:
current_options = self.puzzle.options[len(self.inputlist)]
yield Label(setting["primary"], id="sentence")
yield Label(self.puzzle.wording[len(self.inputlist)], id="puzzle")
yield Label(f"当前输入: {self.inputlist}", id="inputpreview")

View File

@@ -49,6 +49,10 @@ class Recognition(BasePuzzleWidget):
self.alia = alia
def compose(self):
from heurams.context import config_var
autovoice = config_var.get()['interface']['memorizor']['autovoice']
if autovoice:
self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"]
replace_dict = {
@@ -68,7 +72,8 @@ class Recognition(BasePuzzleWidget):
primary = cfg["primary"]
with Center():
yield Static(f"[dim]{cfg['top_dim']}[/]")
for i in cfg['top_dim']:
yield Static(f"[dim]{i}[/]")
yield Label("")
for old, new in replace_dict.items():

View File

@@ -2,21 +2,23 @@
SM-15 接口兼容实现, 基于 SM-15 算法的逆向工程
全局状态保存在文件中, 项目状态通过 algodata 字典传递
基于: https://github.com/kazuaki/sm.js
原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida (MIT 许可证)
基于: https://github.com/slaypni/sm.js
原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida
MIT 许可证
"""
import datetime
import json
import os
from typing import TypedDict
import pathlib
from heurams.context import config_var
from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF,
RANGE_AF, RANGE_REPETITION,
SM, THRESHOLD_RECALL, Item)
# 全局状态文件路径
_GLOBAL_STATE_FILE = os.path.expanduser("~/.sm15_global_state.json")
_GLOBAL_STATE_FILE = os.path.expanduser(pathlib.Path(config_var.get()['paths']['global_dir']) / 'sm15m_global_state.json')
def _get_global_sm():

View File

@@ -1,5 +1,5 @@
"""
基于: https://github.com/kazuaki/sm.js
基于: https://github.com/slaypni/sm.js
原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida
MIT 许可证

View File

@@ -62,7 +62,6 @@ class Atom:
"orbital_fmt": "toml",
"runtime": {"locked": False, "min_rate": 0x3F3F3F3F, "newact": False},
}
self.do_eval()
logger.debug("Atom 初始化完成")
def link(self, key, value):
@@ -70,7 +69,6 @@ class Atom:
if key in self.registry.keys():
self.registry[key] = value
logger.debug("'%s' 已链接, 触发 do_eval", key)
self.do_eval()
if key == "electron":
if self.registry["electron"].is_activated() == 0:
self.registry["runtime"]["newact"] = True
@@ -78,6 +76,56 @@ class Atom:
logger.error("尝试链接不受支持的键: '%s'", key)
raise ValueError("不受支持的原子元数据链接操作")
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("EVAL 开始")
# eval 环境设置
def eval_with_env(s: str):
default = config_var.get()["puzzles"]
payload = self.registry['nucleon'].payload
metadata = self.registry['nucleon'].metadata
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
try:
traverse(self.registry['nucleon'].payload, eval_with_env)
traverse(self.registry['nucleon'].metadata, eval_with_env)
traverse(self.registry['orbital'], eval_with_env)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning(ret)
logger.debug("EVAL 完成")
def minimize(self, rating):
"""效果等同于 self.registry['runtime']['min_rate'] = min(rating, self.registry['runtime']['min_rate'])
@@ -114,80 +162,6 @@ class Atom:
else:
logger.debug("禁止总评分")
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Atom.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
# 初始化默认值
nucleon = self.registry["nucleon"]
default = {}
metadata = {}
try:
default = config_var.get()["puzzles"]
metadata = nucleon.metadata
except Exception:
# 如果无法获取配置或元数据, 使用空字典
logger.debug("无法获取配置或元数据, 使用空字典")
pass
try:
eval_value = eval(s)
if isinstance(eval_value, (list, dict)):
ret = eval_value
else:
ret = str(eval_value)
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
# 如果 nucleon 存在且有 do_eval 方法, 调用它
nucleon = self.registry["nucleon"]
if nucleon is not None and hasattr(nucleon, "do_eval"):
nucleon.do_eval()
logger.debug("已调用 nucleon.do_eval")
# 如果 electron 存在且其 algodata 包含 eval 字符串, 遍历它
electron = self.registry["electron"]
if electron is not None and hasattr(electron, "algodata"):
traverse(electron.algodata, eval_with_env)
logger.debug("已处理 electron algodata eval")
# 如果 orbital 存在且是字典, 遍历它
orbital = self.registry["orbital"]
if orbital is not None and isinstance(orbital, dict):
traverse(orbital, eval_with_env)
logger.debug("orbital eval 完成")
logger.debug("Atom.do_eval 完成")
def persist(self, key):
logger.debug("Atom.persist: key='%s'", key)
path: pathlib.Path | None = self.registry[key + "_path"]

View File

@@ -9,7 +9,7 @@ logger = get_logger(__name__)
class Electron:
"""电子: 记忆分析元数据及算法"""
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = "SM-2"):
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = ""):
"""初始化电子对象 (记忆数据)
Args:
@@ -17,19 +17,21 @@ class Electron:
algodata: 算法数据字典, 包含算法的各项参数和设置
algo: 使用的算法模块标识
"""
if algo_name == "":
algo_name = config_var.get()['algorithm']['default']
logger.debug(
"创建 Electron 实例, ident: '%s', algo_name: '%s'", ident, algo_name
"创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s", ident, algo_name, algodata
)
self.algodata = algodata
self.ident = ident
self.algo = algorithms[algo_name]
logger.debug("使用的算法类: %s", self.algo.__name__)
if self.algo not in self.algodata.keys():
if self.algo.algo_name not in self.algodata.keys():
self.algodata[self.algo.algo_name] = {}
logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo)
if not self.algodata[self.algo.algo_name]:
logger.debug("算法数据为空, 使用默认值初始化")
logger.debug(f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}")
self._default_init(self.algo.defaults)
else:
logger.debug("算法数据已存在, 跳过默认初始化")

View File

@@ -68,7 +68,7 @@ def load_electron(path: pathlib.Path, fmt="json") -> dict:
logger.debug("JSON 解析成功, keys: %s", list(dictdata.keys()))
dic = dict()
for item, attr in dictdata.items():
logger.debug("处理电子项目: %s", item)
logger.debug("处理电子项目: %s, %s", item, attr)
dic[item] = Electron(item, attr)
logger.debug("load_electron 完成, 加载了 %d 个 Electron 对象", len(dic))
return dic

View File

@@ -49,56 +49,8 @@ class Nucleon:
def __hash__(self):
return hash(self.ident)
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Nucleon.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
try:
nucleon = self
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
traverse(self.payload, eval_with_env)
traverse(self.metadata, eval_with_env)
logger.debug("Nucleon.do_eval 完成")
@staticmethod
def placeholder():
"""生成一个占位原子核"""
logger.debug("创建 Nucleon 占位符")
return Nucleon("核子对象样例内容", {})
return Nucleon("核子对象样例内容", {})

View File

@@ -3,6 +3,7 @@ import pathlib
import edge_tts
from heurams.services.logger import get_logger
from heurams.context import config_var
from .base import BaseTTS
@@ -18,7 +19,7 @@ class EdgeTTS(BaseTTS):
try:
communicate = edge_tts.Communicate(
text,
"zh-CN-YunjianNeural",
config_var.get()['providers']['tts']['edgetts']["voice"],
)
logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频")
communicate.save_sync(str(path))

View File

@@ -9,5 +9,5 @@ logger = get_logger(__name__)
play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path
logger.debug(
"音频服务初始化完成, 使用 provider: %s", config_var.get()["services"]["audio"]
"音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"]
)

View File

@@ -2,12 +2,12 @@
from typing import Callable
from heurams.context import config_var
from heurams.providers.tts import TTSs
from heurams.providers.tts import providers as prov
from heurams.services.logger import get_logger
logger = get_logger(__name__)
convert: Callable = TTSs[config_var.get().get("tts_provider")]
convertor: Callable = prov[config_var.get()["services"]["tts"]].convert
logger.debug(
"TTS服务初始化完成, 使用 provider: %s", config_var.get().get("tts_provider")
"TTS服务初始化完成, 使用 provider: %s", config_var.get()["services"]["tts"]
)

View File

@@ -3,7 +3,7 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
ver = "0.4.1"
ver = "0.4.2"
stage = "prototype"
codename = "fledge" # 雏鸟, 0.4.x 版本