style: 代码格式化

This commit is contained in:
2025-12-13 21:47:37 +08:00
parent a0b327cdbb
commit baa7ac8ee9
64 changed files with 755 additions and 573 deletions

View File

@@ -2,6 +2,7 @@
全局上下文管理模块
以及基准路径
"""
from contextvars import ContextVar
import pathlib
from heurams.services.config import ConfigFile
@@ -11,17 +12,22 @@ from heurams.services.config import ConfigFile
# 数据文件路径规定: 以运行目录为准
rootdir = pathlib.Path(__file__).parent
print(f'rootdir: {rootdir}')
print(f"rootdir: {rootdir}")
workdir = pathlib.Path.cwd()
print(f'workdir: {workdir}')
config_var: ContextVar[ConfigFile] = ContextVar('config_var', default=ConfigFile(rootdir / "default" / "config" / "config.toml"))
print(f"workdir: {workdir}")
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(rootdir / "default" / "config" / "config.toml")
)
try:
config_var: ContextVar[ConfigFile] = ContextVar('config_var', default=ConfigFile(workdir / "config" / "config.toml")) # 配置文件
print('已加载自定义用户配置')
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(workdir / "config" / "config.toml")
) # 配置文件
print("已加载自定义用户配置")
except:
print('未能加载自定义用户配置')
print("未能加载自定义用户配置")
# runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据
#runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据
class ConfigContext:
"""
@@ -33,14 +39,14 @@ class ConfigContext:
... get_daystamp() # 使用 test_config
>>> get_daystamp() # 恢复原配置
"""
def __init__(self, config_provider: ConfigFile):
self.config_provider = config_provider
self._token = None
def __enter__(self):
self._token = config_var.set(self.config_provider)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
config_var.reset(self._token) # type: ignore
config_var.reset(self._token) # type: ignore

View File

@@ -4,17 +4,20 @@ from .screens.dashboard import DashboardScreen
from .screens.nucreator import NucleonCreatorScreen
from .screens.precache import PrecachingScreen
from .screens.about import AboutScreen
class HeurAMSApp(App):
TITLE = "潜进"
#CSS_PATH = str(cxt.rootdir / "interface" / "css" / "main.css")
# CSS_PATH = str(cxt.rootdir / "interface" / "css" / "main.css")
SUB_TITLE = "启发式先进记忆调度器"
BINDINGS = [("q", "quit", "退出"),
("d", "toggle_dark", "改变色调"),
("1", "app.push_screen('dashboard')", "仪表盘"),
("2", "app.push_screen('precache_all')", "缓存管理器"),
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
("0", "app.push_screen('about')", "版本信息"),
]
BINDINGS = [
("q", "quit", "退出"),
("d", "toggle_dark", "改变色调"),
("1", "app.push_screen('dashboard')", "仪表盘"),
("2", "app.push_screen('precache_all')", "缓存管理器"),
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
("0", "app.push_screen('about')", "版本信息"),
]
SCREENS = {
"dashboard": DashboardScreen,
"nucleon_creator": NucleonCreatorScreen,
@@ -28,16 +31,19 @@ class HeurAMSApp(App):
def on_button_pressed(self, event: Button.Pressed) -> None:
self.exit(event.button.id)
def environment_check():
from pathlib import Path
for i in config_var.get()["paths"].values():
i = Path(i)
if not i.exists():
print(f"创建 {i}")
i.mkdir(exist_ok = True, parents = True)
i.mkdir(exist_ok=True, parents=True)
else:
print(f"找到 {i}")
def is_subdir(parent, child):
try:
child.relative_to(parent)
@@ -45,12 +51,14 @@ def is_subdir(parent, child):
except:
return 0
# 开发模式
from heurams.context import rootdir, workdir, config_var
from pathlib import Path
from heurams.context import rootdir
import os
if is_subdir(Path(rootdir),Path(os.getcwd())):
if is_subdir(Path(rootdir), Path(os.getcwd())):
os.chdir(Path(rootdir) / ".." / "..")
print(f'转入开发数据目录: {Path(rootdir)/".."/".."}')

View File

@@ -14,6 +14,7 @@ from textual.screen import Screen
import heurams.services.version as version
from heurams.context import *
class AboutScreen(Screen):
def compose(self) -> ComposeResult:

View File

@@ -21,6 +21,7 @@ from .about import AboutScreen
import pathlib
class DashboardScreen(Screen):
def compose(self) -> ComposeResult:
@@ -31,7 +32,9 @@ class DashboardScreen(Screen):
Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'),
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
ListView(id="union-list", classes="union-list-view"),
Label(f'"潜进" 开放源代码软件项目 | 版本 {version.ver} {version.codename.capitalize()} | Wang Zhiyu 2025'),
Label(
f'"潜进" 开放源代码软件项目 | 版本 {version.ver} {version.codename.capitalize()} | Wang Zhiyu 2025'
),
)
yield Footer()
@@ -46,17 +49,20 @@ class DashboardScreen(Screen):
res[0] = f"{filename}\0"
from heurams.kernel.particles.loader import load_electron
import heurams.kernel.particles as pt
electron_file_path = (pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (filestem + ".json"))
if electron_file_path.exists(): # 未找到则创建电子文件 (json)
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
filestem + ".json"
)
if electron_file_path.exists(): # 未找到则创建电子文件 (json)
pass
else:
electron_file_path.touch()
with open(electron_file_path, 'w') as f:
with open(electron_file_path, "w") as f:
f.write("{}")
electron_dict = load_electron(path=electron_file_path) # TODO: 取消硬编码扩展名
electron_dict = load_electron(path=electron_file_path) # TODO: 取消硬编码扩展名
is_due = 0
is_activated = 0
nextdate = 0x3f3f3f3f
nextdate = 0x3F3F3F3F
for i in electron_dict.values():
i: pt.Electron
if i.is_due():
@@ -78,12 +84,18 @@ class DashboardScreen(Screen):
if len(probe["nucleon"]):
for file in probe["nucleon"]:
text = self.item_desc_generator(file)
union_list_widget.append(ListItem(
Label(text[0] + '\n' + text[1]),
))
union_list_widget.append(
ListItem(
Label(text[0] + "\n" + text[1]),
)
)
else:
union_list_widget.append(
ListItem(Static("在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集."))
ListItem(
Static(
"在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集."
)
)
)
union_list_widget.disabled = True
@@ -95,27 +107,36 @@ class DashboardScreen(Screen):
if "未找到任何 .toml 文件" in str(selected_label.renderable):
return
selected_filename = pathlib.Path(str(selected_label.renderable)
.partition('\0')[0] # 文件名末尾截断, 保留文件名
.replace('*', "")) # 去除markdown加粗
selected_filename = pathlib.Path(
str(selected_label.renderable)
.partition("\0")[0] # 文件名末尾截断, 保留文件名
.replace("*", "")
) # 去除markdown加粗
nucleon_file_path = pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) / selected_filename
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (str(selected_filename.stem) + ".json")
nucleon_file_path = (
pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) / selected_filename
)
electron_file_path = pathlib.Path(config_var.get()["paths"]["electron_dir"]) / (
str(selected_filename.stem) + ".json"
)
self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path))
def on_button_pressed(self, event) -> None:
if event.button.id == "new_nucleon_button":
# 切换到创建单元
from .nucreator import NucleonCreatorScreen
newscr = NucleonCreatorScreen()
self.app.push_screen(newscr)
elif event.button.id == "precache_all_button":
# 切换到缓存管理器
from .precache import PrecachingScreen
precache_screen = PrecachingScreen()
self.app.push_screen(precache_screen)
elif event.button.id == "about_button":
from .about import AboutScreen
about_screen = AboutScreen()
self.app.push_screen(about_screen)

View File

@@ -12,10 +12,12 @@ import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
from .. import shim
class AtomState(Enum):
FAILED = auto()
NORMAL = auto()
class MemScreen(Screen):
BINDINGS = [
("q", "pop_screen", "返回"),
@@ -27,15 +29,21 @@ class MemScreen(Screen):
if config_var.get()["quick_pass"]:
BINDINGS.append(("k", "quick_pass", "跳过"))
rating = reactive(-1)
def __init__(self, atoms: list, name: str | None = None, id: str | None = None, classes: str | None = None) -> None:
def __init__(
self,
atoms: list,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.atoms = atoms
self.phaser = Phaser(atoms)
#print(self.phaser.state)
self.procession: Procession = self.phaser.current_procession() # type: ignore
#print(self.phaser.state)
#self.procession.forward(1)
# print(self.phaser.state)
self.procession: Procession = self.phaser.current_procession() # type: ignore
# print(self.phaser.state)
# self.procession.forward(1)
def on_mount(self):
self.load_puzzle()
@@ -45,22 +53,26 @@ class MemScreen(Screen):
try:
print(self.phaser.state)
self.fission = Fission(self.procession.current_atom, self.phaser.state)
#print(1)
# print(1)
puzzle_info = next(self.fission.generate())
print(puzzle_info)
return shim.puzzle2widget[puzzle_info["puzzle"]](atom = self.procession.current_atom, alia = puzzle_info["alia"])
return shim.puzzle2widget[puzzle_info["puzzle"]](
atom=self.procession.current_atom, alia=puzzle_info["alia"]
)
except (KeyError, StopIteration, AttributeError) as e:
print(f"调度展开出错: {e}")
return Static("无法生成谜题")
#print(shim.puzzle2widget[puzzle_info["puzzle"]])
# print(shim.puzzle2widget[puzzle_info["puzzle"]])
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Center():
yield Static(f"当前进度: {self.procession.process()}/{self.procession.total_length()}")
#self.mount(self.current_widget()) # type: ignore
yield Static(
f"当前进度: {self.procession.process()}/{self.procession.total_length()}"
)
# self.mount(self.current_widget()) # type: ignore
yield Container(id="puzzle-container")
#yield Button("重新学习此单元", id="re-recognize", variant="warning")
# yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def load_puzzle(self):

View File

@@ -14,6 +14,7 @@ from textual.screen import Screen
from heurams.services.version import ver
class NucleonCreatorScreen(Screen):
BINDINGS = [("q", "go_back", "返回")]
@@ -23,18 +24,20 @@ class NucleonCreatorScreen(Screen):
def search_templates(self):
from pathlib import Path
from heurams.context import config_var
template_dir = Path(config_var.get()['paths']['template_dir'])
template_dir = Path(config_var.get()["paths"]["template_dir"])
templates = list()
for i in template_dir.iterdir():
if i.name.endswith('.toml'):
if i.name.endswith(".toml"):
try:
import toml
with open(i, 'r') as f:
with open(i, "r") as f:
dic = toml.load(f)
desc = dic['__metadata__.attribution']['desc']
templates.append(desc + ' (' + i.name + ')')
desc = dic["__metadata__.attribution"]["desc"]
templates.append(desc + " (" + i.name + ")")
except Exception as e:
templates.append(f'无描述模板 ({i.name})')
templates.append(f"无描述模板 ({i.name})")
print(e)
print(templates)
return templates
@@ -43,10 +46,14 @@ class NucleonCreatorScreen(Screen):
yield Header(show_clock=True)
with Container(id="vice_container"):
yield Label(f"[b]空白单元集创建向导\n")
yield Markdown("> 提示: 你可能注意到当选中文本框时底栏和操作按键绑定将被覆盖 \n只需选中(使用鼠标或 Tab)选择框即可恢复底栏功能")
yield Markdown(
"> 提示: 你可能注意到当选中文本框时底栏和操作按键绑定将被覆盖 \n只需选中(使用鼠标或 Tab)选择框即可恢复底栏功能"
)
yield Markdown("1. 键入单元集名称")
yield Input(placeholder="单元集名称")
yield Markdown("> 单元集名称不应与现有单元集重复. \n> 新的单元集文件将创建在 ./nucleon/你输入的名称.toml")
yield Markdown(
"> 单元集名称不应与现有单元集重复. \n> 新的单元集文件将创建在 ./nucleon/你输入的名称.toml"
)
yield Label(f"\n")
yield Markdown("2. 选择单元集模板")
LINES = self.search_templates()
@@ -79,5 +86,5 @@ class NucleonCreatorScreen(Screen):
def on_button_pressed(self, event) -> None:
event.stop()
if event.button.id == 'submit_button':
if event.button.id == "submit_button":
pass

View File

@@ -21,17 +21,19 @@ import heurams.services.hasher as hasher
from heurams.context import *
from textual.worker import Worker, get_current_worker
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕
缓存记忆单元音频文件, 全部(默认) 或部分记忆单元(可选参数传入)
Args:
nucleons (list): 可选列表, 仅包含 Nucleon 对象
desc (list): 可选字符串, 包含对此次调用的文字描述
"""
BINDINGS = [("q", "go_back", "返回")]
def __init__(self, nucleons: list = [], desc: str = ""):
super().__init__(name=None, id=None, classes=None)
self.nucleons = nucleons
@@ -47,23 +49,23 @@ class PrecachingScreen(Screen):
for i in nucleons:
i: pt.Nucleon
i.do_eval()
#print("完成 EVAL")
# print("完成 EVAL")
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="precache_container"):
yield Label("[b]音频预缓存[/b]", classes="title-label")
if self.nucleons:
yield Static(f"目标单元归属: [b]{self.desc}[/b]", classes="target-info")
yield Static(f"单元数量: {len(self.nucleons)}", classes="target-info")
else:
yield Static("目标: 所有单元", classes="target-info")
yield Static(id="status", classes="status-info")
yield Static(id="current_item", classes="current-item")
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
with Horizontal(classes="button-group"):
if not self.is_precaching:
yield Button("开始预缓存", id="start_precache", variant="primary")
@@ -71,9 +73,9 @@ class PrecachingScreen(Screen):
yield Button("取消预缓存", id="cancel_precache", variant="error")
yield Button("清空缓存", id="clear_cache", variant="warning")
yield Button("返回", id="go_back", variant="default")
yield Static("若您离开此界面, 未完成的缓存进程会自动停止.")
yield Static("缓存程序支持 \"断点续传\".")
yield Static('缓存程序支持 "断点续传".')
yield Footer()
@@ -86,10 +88,10 @@ class PrecachingScreen(Screen):
status_widget = self.query_one("#status", Static)
item_widget = self.query_one("#current_item", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
status_widget.update(f"状态: {status}")
item_widget.update(f"当前项目: {current_item}" if current_item else "")
if progress is not None:
progress_bar.progress = progress
progress_bar.advance(0) # 刷新显示
@@ -97,12 +99,14 @@ class PrecachingScreen(Screen):
def precache_by_text(self, text: str):
"""预缓存单段文本的音频"""
from heurams.context import rootdir, workdir, config_var
cache_dir = pathlib.Path(config_var.get()["paths"]["cache_dir"])
cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache_file.exists():
try: # TODO: 调用模块消除tts耦合
try: # TODO: 调用模块消除tts耦合
import edge_tts as tts
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache_file))
return 1
@@ -110,34 +114,30 @@ class PrecachingScreen(Screen):
print(f"预缓存失败 '{text}': {e}")
return 0
return 1
def precache_by_nucleon(self, nucleon: pt.Nucleon):
"""依据 Nucleon 缓存"""
#print(nucleon.metadata['formation']['tts_text'])
ret = self.precache_by_text(nucleon.metadata['formation']['tts_text'])
# print(nucleon.metadata['formation']['tts_text'])
ret = self.precache_by_text(nucleon.metadata["formation"]["tts_text"])
return ret
#print(f"TTS 缓存: {nucleon.metadata['formation']['tts_text']}")
# print(f"TTS 缓存: {nucleon.metadata['formation']['tts_text']}")
def precache_by_list(self, nucleons: list):
"""依据 Nucleons 列表缓存"""
for idx, nucleon in enumerate(nucleons):
#print(f"PROC: {nucleon}")
# print(f"PROC: {nucleon}")
worker = get_current_worker()
if worker and worker.is_cancelled: # 函数在worker中执行且已被取消
if worker and worker.is_cancelled: # 函数在worker中执行且已被取消
return False
text = nucleon.metadata['formation']['tts_text']
#self.current_item = text[:30] + "..." if len(text) > 50 else text
#print(text)
text = nucleon.metadata["formation"]["tts_text"]
# self.current_item = text[:30] + "..." if len(text) > 50 else text
# print(text)
self.processed += 1
#print(self.processed)
#print(self.total)
# print(self.processed)
# print(self.total)
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
#print(progress)
self.update_status(
f"正处理 ({idx + 1}/{len(nucleons)})",
text,
progress
)
# print(progress)
self.update_status(f"正处理 ({idx + 1}/{len(nucleons)})", text, progress)
ret = self.precache_by_nucleon(nucleon)
if not ret:
self.update_status(
@@ -145,6 +145,7 @@ class PrecachingScreen(Screen):
f"处理失败, 跳过: {self.current_item}",
)
import time
time.sleep(1)
if self.cancel_flag:
worker.cancel()
@@ -153,9 +154,9 @@ class PrecachingScreen(Screen):
return True
def precache_by_nucleons(self):
#print("开始缓存")
# print("开始缓存")
ret = self.precache_by_list(self.nucleons)
#print(f"返回 {ret}")
# print(f"返回 {ret}")
return ret
def precache_by_filepath(self, path: pathlib.Path):
@@ -165,13 +166,15 @@ class PrecachingScreen(Screen):
lst.append(i[0])
return self.precache_by_list(lst)
def precache_all_files(self):
"""预缓存所有文件"""
from heurams.context import rootdir, workdir, config_var
nucleon_path = pathlib.Path(config_var.get()["paths"]["nucleon_dir"])
nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"] # TODO: 解耦合
nucleon_files = [
f for f in nucleon_path.iterdir() if f.suffix == ".toml"
] # TODO: 解耦合
# 计算总项目数
self.total = 0
nu = list()
@@ -186,16 +189,26 @@ class PrecachingScreen(Screen):
i: pt.Nucleon
i.do_eval()
return self.precache_by_list(nu)
def on_button_pressed(self, event: Button.Pressed) -> None:
event.stop()
if event.button.id == "start_precache" and not self.is_precaching:
# 开始预缓存
if self.nucleons:
self.precache_worker = self.run_worker(self.precache_by_nucleons, thread=True, exclusive=True, exit_on_error=True)
self.precache_worker = self.run_worker(
self.precache_by_nucleons,
thread=True,
exclusive=True,
exit_on_error=True,
)
else:
self.precache_worker = self.run_worker(self.precache_all_files, thread=True, exclusive=True, exit_on_error=True)
self.precache_worker = self.run_worker(
self.precache_all_files,
thread=True,
exclusive=True,
exit_on_error=True,
)
elif event.button.id == "cancel_precache" and self.is_precaching:
# 取消预缓存
if self.precache_worker:
@@ -204,20 +217,23 @@ class PrecachingScreen(Screen):
self.processed = 0
self.progress = 0
self.update_status("已取消", "预缓存操作被用户取消", 0)
elif event.button.id == "clear_cache":
# 清空缓存
try:
import shutil
from heurams.context import rootdir, workdir, config_var
shutil.rmtree(f"{config_var.get()["paths"]["cache_dir"]}", ignore_errors=True)
shutil.rmtree(
f"{config_var.get()["paths"]["cache_dir"]}", ignore_errors=True
)
self.update_status("已清空", "音频缓存已清空", 0)
except Exception as e:
self.update_status("错误", f"清空缓存失败: {e}")
self.cancel_flag = 1
self.processed = 0
self.progress = 0
elif event.button.id == "go_back":
self.action_go_back()

View File

@@ -15,15 +15,11 @@ import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
class PreparationScreen(Screen):
BINDINGS = [
("q", "go_back", "返回"),
("p", "precache", "预缓存音频")
]
def __init__(
self, nucleon_file: pathlib.Path, electron_file: pathlib.Path
) -> None:
class PreparationScreen(Screen):
BINDINGS = [("q", "go_back", "返回"), ("p", "precache", "预缓存音频")]
def __init__(self, nucleon_file: pathlib.Path, electron_file: pathlib.Path) -> None:
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
@@ -39,10 +35,10 @@ class PreparationScreen(Screen):
yield Label(f"\n单元数量: {len(self.nucleons_with_orbital)}\n")
yield Button(
"开始记忆",
id="start_memorizing_button",
variant="primary",
classes="start-button",
"开始记忆",
id="start_memorizing_button",
variant="primary",
classes="start-button",
)
yield Button(
"预缓存音频",
@@ -68,6 +64,7 @@ class PreparationScreen(Screen):
def action_precache(self):
from ..screens.precache import PrecachingScreen
lst = list()
for i in self.nucleons_with_orbital:
lst.append(i[0])
@@ -97,8 +94,8 @@ class PreparationScreen(Screen):
atom.link("orbital_path", None)
atoms.append(atom)
from .memorizor import MemScreen
memscreen = MemScreen(atoms)
self.app.push_screen(memscreen)
elif event.button.id == "precache_button":
self.action_precache()

View File

@@ -1,24 +1,36 @@
"""Kernel 操作先进函数库"""
import random
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
import heurams.interface.widgets as pzw
from typing import TypedDict
staging = {} # 细粒度缓存区, 是 ident -> quality 的封装
staging = {} # 细粒度缓存区, 是 ident -> quality 的封装
def report_to_staging(atom: pt.Atom, quality):
staging[atom.ident] = min(quality, staging[atom.ident])
def clear():
staging = dict()
def deploy_to_electron():
for atom_ident, quality in staging.items():
if pt.atom_registry[atom_ident].registry['electron'].is_activated:
pt.atom_registry[atom_ident].registry['electron'].revisor(quality=quality)
if pt.atom_registry[atom_ident].registry["electron"].is_activated:
pt.atom_registry[atom_ident].registry["electron"].revisor(quality=quality)
else:
pt.atom_registry[atom_ident].registry['electron'].revisor(quality=quality, is_new_activation=True)
pt.atom_registry[atom_ident].registry["electron"].revisor(
quality=quality, is_new_activation=True
)
clear()
puzzle2widget = {
pz.RecognitionPuzzle: pzw.Recognition,
pz.ClozePuzzle: pzw.ClozePuzzle,
pz.MCQPuzzle: pzw.MCQPuzzle,
pz.BasePuzzle: pzw.BasePuzzleWidget,
}
}

View File

@@ -1,7 +1,24 @@
from textual.widget import Widget
import heurams.kernel.particles as pt
class BasePuzzleWidget(Widget):
def __init__(self, *children: Widget, atom: pt.Atom, name: str | None = None, id: str | None = None, classes: str | None = None, disabled: bool = False, markup: bool = True) -> None:
super().__init__(*children, name=name, id=id, classes=classes, disabled=disabled, markup=markup)
self.atom = atom
def __init__(
self,
*children: Widget,
atom: pt.Atom,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True
) -> None:
super().__init__(
*children,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup
)
self.atom = atom

View File

@@ -9,9 +9,28 @@ import heurams.kernel.particles as pt
from .base_puzzle_widget import BasePuzzleWidget
from textual.message import Message
class BasicEvaluation(BasePuzzleWidget):
def __init__(self, *children: Widget, atom: pt.Atom, alia: str = "", name: str | None = None, id: str | None = None, classes: str | None = None, disabled: bool = False, markup: bool = True) -> None:
super().__init__(*children, atom=atom, name=name, id=id, classes=classes, disabled=disabled, markup=markup)
def __init__(
self,
*children: Widget,
atom: pt.Atom,
alia: str = "",
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True,
) -> None:
super().__init__(
*children,
atom=atom,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup,
)
class RatingChanged(Message):
def __init__(self, rating: int) -> None:
@@ -31,10 +50,10 @@ class BasicEvaluation(BasePuzzleWidget):
def compose(self):
# 显示主要内容
yield Label(self.atom.registry["nucleon"]["content"], id="main")
# 显示评估说明(可选)
yield Static("请评估你对这个内容的记忆程度:", classes="instruction")
# 按钮容器
with Container(id="button_container"):
btn = {}
@@ -56,7 +75,7 @@ class BasicEvaluation(BasePuzzleWidget):
btn["0"] = Button(
"完全空白", variant="error", id="feedback_0", classes="choice"
)
# 布局按钮
yield Horizontal(btn["5"], btn["4"])
yield Horizontal(btn["3"], btn["2"])
@@ -68,13 +87,15 @@ class BasicEvaluation(BasePuzzleWidget):
if button_id in self.feedback_mapping:
feedback_info = self.feedback_mapping[button_id]
self.post_message(self.RatingChanged(
rating=feedback_info["rating"],
))
self.post_message(
self.RatingChanged(
rating=feedback_info["rating"],
)
)
event.button.add_class("selected")
self.disable_other_buttons(button_id)
def disable_other_buttons(self, selected_button_id: str) -> None:
@@ -91,6 +112,8 @@ class BasicEvaluation(BasePuzzleWidget):
button_id = f"feedback_{event.key}"
if button_id in self.feedback_mapping:
# 模拟按钮点击
self.post_message(self.RatingChanged(
rating=self.feedback_mapping[button_id]["rating"],
))
self.post_message(
self.RatingChanged(
rating=self.feedback_mapping[button_id]["rating"],
)
)

View File

@@ -10,10 +10,29 @@ import copy
import random
from textual.message import Message
class ClozePuzzle(BasePuzzleWidget):
def __init__(self, *children: Widget, atom: pt.Atom, alia: str = "", name: str | None = None, id: str | None = None, classes: str | None = None, disabled: bool = False, markup: bool = True) -> None:
super().__init__(*children, atom=atom, name=name, id=id, classes=classes, disabled=disabled, markup=markup)
def __init__(
self,
*children: Widget,
atom: pt.Atom,
alia: str = "",
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True,
) -> None:
super().__init__(
*children,
atom=atom,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup,
)
self.inputlist = list()
self.hashtable = {}
self.alia = alia
@@ -21,7 +40,11 @@ class ClozePuzzle(BasePuzzleWidget):
def _work(self):
cfg = self.atom.registry["orbital"]["puzzles"][self.alia]
self.puzzle = pz.ClozePuzzle(text=cfg["content"], delimiter=cfg["delimiter"], min_denominator=cfg["min_denominator"])
self.puzzle = pz.ClozePuzzle(
text=cfg["content"],
delimiter=cfg["delimiter"],
min_denominator=cfg["min_denominator"],
)
self.puzzle.refresh()
self.ans = copy.copy(self.puzzle.answer)
random.shuffle(self.ans)
@@ -35,6 +58,7 @@ class ClozePuzzle(BasePuzzleWidget):
class InputChanged(Message):
"""输入变化消息"""
def __init__(self, current_input: list, max_length: int) -> None:
self.current_input = current_input # 当前输入
self.max_length = max_length # 最大长度
@@ -51,16 +75,17 @@ class ClozePuzzle(BasePuzzleWidget):
def update_preview(self):
preview = self.query_one("#inputpreview")
preview.update(f"当前输入: {self.inputlist}") # type: ignore
self.post_message(self.InputChanged(
current_input=self.inputlist.copy(),
max_length=len(self.puzzle.answer)
))
preview.update(f"当前输入: {self.inputlist}") # type: ignore
self.post_message(
self.InputChanged(
current_input=self.inputlist.copy(), max_length=len(self.puzzle.answer)
)
)
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id
if button_id == "delete":
if len(self.inputlist) > 0:
self.inputlist.pop()
@@ -69,17 +94,17 @@ class ClozePuzzle(BasePuzzleWidget):
answer_text = self.hashtable[button_id]
self.inputlist.append(answer_text)
self.update_preview()
if len(self.inputlist) >= len(self.puzzle.answer):
is_correct = self.inputlist == self.puzzle.answer
rating = 4 if is_correct else 2
self.post_message(self.RatingChanged(
atom=self.atom,
rating=rating,
is_correct=is_correct
))
self.post_message(
self.RatingChanged(
atom=self.atom, rating=rating, is_correct=is_correct
)
)
if not is_correct:
self.inputlist = []
self.update_preview()
self.update_preview()

View File

@@ -4,10 +4,27 @@ from textual.widgets import (
)
from textual.widget import Widget
class Finished(Widget):
def __init__(self, *children: Widget, alia = "", name: str | None = None, id: str | None = None, classes: str | None = None, disabled: bool = False, markup: bool = True) -> None:
def __init__(
self,
*children: Widget,
alia="",
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True
) -> None:
self.alia = alia
super().__init__(*children, name=name, id=id, classes=classes, disabled=disabled, markup=markup)
super().__init__(
*children,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup
)
def compose(self):
yield Label("本次记忆进程结束", id="finished_msg")
@@ -15,5 +32,5 @@ class Finished(Widget):
def on_button_pressed(self, event):
button_id = event.button.id
if button_id == 'back-to-menu':
if button_id == "back-to-menu":
self.app.pop_screen()

View File

@@ -1,137 +1,119 @@
from textual.app import App, ComposeResult
from textual.events import Event
from textual.widgets import (
Collapsible,
Header,
Footer,
Markdown,
ListView,
ListItem,
Label,
Static,
Button,
)
from textual.containers import Container, Horizontal, Center
from textual.screen import Screen
from textual.widget import Widget
from typing import Tuple, Dict
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
from .base_puzzle_widget import BasePuzzleWidget
import copy
import random
from textual.message import Message
from typing import TypedDict
class Setting(TypedDict):
__origin__: str
__hint__: str
primary: str # 显示的提示文本
mapping: dict # 谜题到答案的映射
jammer: list # 干扰项
max_riddles_num: int # 最大谜题数量
prefix: str # 提示词前缀
class MCQPuzzle(BasePuzzleWidget):
def __init__(self, *children: Widget, atom: pt.Atom, alia: str = "", name: str | None = None, id: str | None = None, classes: str | None = None, disabled: bool = False, markup: bool = True) -> None:
super().__init__(*children, atom=atom, name=name, id=id, classes=classes, disabled=disabled, markup=markup)
def __init__(
self,
*children: Widget,
atom: pt.Atom,
alia: str = "",
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True,
) -> None:
super().__init__(
*children,
atom=atom,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup,
)
self.inputlist = []
self.alia = alia
self.hashtable = {}
self._work()
self._load()
def _work(self):
def _load(self):
cfg = self.atom.registry["orbital"]["puzzles"][self.alia]
self.puzzle = pz.MCQPuzzle(cfg["mapping"], cfg["jammer"], cfg["max_riddles_num"], cfg['prefix'])
self.puzzle = pz.MCQPuzzle(
cfg["mapping"], cfg["jammer"], cfg["max_riddles_num"], cfg["prefix"]
)
self.puzzle.refresh()
class PuzzleCompleted(Message):
"""选择题完成消息"""
def __init__(self, atom: pt.Atom, rating: int, is_correct: bool, user_answers: list, correct_answers: list) -> None:
self.atom = atom
self.rating = rating # 评分
self.is_correct = is_correct # 是否正确
self.user_answers = user_answers # 用户答案
self.correct_answers = correct_answers # 正确答案
super().__init__()
class InputChanged(Message):
"""输入变化消息"""
def __init__(self, current_input: list, current_question: int, total_questions: int, current_question_text: str) -> None:
self.current_input = current_input # 当前输入
self.current_question = current_question # 当前题号
self.total_questions = total_questions # 总题数
self.current_question_text = current_question_text # 当前问题文本
self.progress = current_question / total_questions # 进度
super().__init__()
class QuestionAdvanced(Message):
"""题目切换消息"""
def __init__(self, question_index: int, question_text: str, options: list) -> None:
self.question_index = question_index # 题目索引
self.question_text = question_text
self.options = options # 选项列表
super().__init__()
def compose(self):
yield Label(self.atom[1].content.replace("/",""), id="sentence")
setting: Setting = self.atom.registry["nucleon"].metadata["orbital"]["puzzle"][
self.alia
]
yield Label(setting["primary"], id="sentence")
yield Label(self.puzzle.wording[len(self.inputlist)], id="puzzle")
yield Label(f"当前输入: {self.inputlist}", id="inputpreview")
# 渲染当前问题的选项
current_options = self.puzzle.options[len(self.inputlist)]
for i in current_options:
self.hashtable[str(hash(i))] = i
yield Button(i, id=f"select{hash(i)}")
yield Button("退格", id="delete")
def update_display(self):
# 更新预览标签
preview = self.query_one("#inputpreview")
preview.update(f"当前输入: {self.inputlist}") # type: ignore
preview.update(f"当前输入: {self.inputlist}") # type: ignore
# 更新问题标签
puzzle_label = self.query_one("#puzzle")
current_question_index = len(self.inputlist)
if current_question_index < len(self.puzzle.wording):
puzzle_label.update(self.puzzle.wording[current_question_index]) # type: ignore
puzzle_label.update(self.puzzle.wording[current_question_index]) # type: ignore
# 发送输入变化消息
self.post_message(self.InputChanged(
current_input=self.inputlist.copy(),
current_question=current_question_index,
total_questions=len(self.puzzle.answer),
current_question_text=self.puzzle.wording[current_question_index] if current_question_index < len(self.puzzle.wording) else ""
))
# 如果还有下一题,发送题目切换消息
if current_question_index < len(self.puzzle.options):
self.post_message(self.QuestionAdvanced(
question_index=current_question_index,
question_text=self.puzzle.wording[current_question_index],
options=self.puzzle.options[current_question_index]
))
def on_button_pressed(self, event: Button.Pressed) -> None:
"""处理按钮点击事件"""
button_id = event.button.id
if button_id == "delete":
# 退格处理
if len(self.inputlist) > 0:
self.inputlist.pop()
self.refresh_buttons()
self.update_display()
elif button_id.startswith("select"): # type: ignore
elif button_id.startswith("select"): # type: ignore
# 选项选择处理
answer_text = self.hashtable[button_id[6:]] # type: ignore
answer_text = self.hashtable[button_id[6:]] # type: ignore
self.inputlist.append(answer_text)
# 检查是否完成所有题目
if len(self.inputlist) >= len(self.puzzle.answer):
is_correct = self.inputlist == self.puzzle.answer
rating = 4 if is_correct else 2
# 发送完成消息
self.post_message(self.PuzzleCompleted(
atom=self.atom,
rating=rating,
is_correct=is_correct,
user_answers=self.inputlist.copy(),
correct_answers=self.puzzle.answer.copy()
))
self.post_message(
self.PuzzleCompleted(
atom=self.atom,
rating=rating,
is_correct=is_correct,
user_answers=self.inputlist.copy(),
correct_answers=self.puzzle.answer.copy(),
)
)
# 重置输入(如果回答错误)
if not is_correct:
self.inputlist = []
@@ -145,10 +127,14 @@ class MCQPuzzle(BasePuzzleWidget):
def refresh_buttons(self):
"""刷新按钮显示(用于题目切换)"""
# 移除所有选项按钮
buttons_to_remove = [child for child in self.children if hasattr(child, 'id') and child.id and child.id.startswith('select')]
buttons_to_remove = [
child
for child in self.children
if hasattr(child, "id") and child.id and child.id.startswith("select")
]
for button in buttons_to_remove:
self.remove_child(button) # type: ignore
self.remove_child(button) # type: ignore
# 添加当前题目的选项按钮
current_question_index = len(self.inputlist)
if current_question_index < len(self.puzzle.options):

View File

@@ -6,8 +6,24 @@ from textual.widget import Widget
class Placeholder(Widget):
def __init__(self, *children: Widget, name: str | None = None, alia: str = "", id: str | None = None, classes: str | None = None, disabled: bool = False, markup: bool = True) -> None:
super().__init__(*children, name=name, id=id, classes=classes, disabled=disabled, markup=markup)
def __init__(
self,
*children: Widget,
name: str | None = None,
alia: str = "",
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True
) -> None:
super().__init__(
*children,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup
)
def compose(self):
yield Label("示例标签", id="testlabel")

View File

@@ -14,6 +14,7 @@ from .base_puzzle_widget import BasePuzzleWidget
from typing import TypedDict, List
from textual.message import Message
class RecognitionConfig(TypedDict):
__origin__: str
__hint__: str
@@ -21,16 +22,35 @@ class RecognitionConfig(TypedDict):
secondary: List[str]
top_dim: List[str]
class Recognition(BasePuzzleWidget):
def __init__(self, *children: Widget, atom: pt.Atom, alia: str = "", name: str | None = None, id: str | None = None, classes: str | None = None, disabled: bool = False, markup: bool = True) -> None:
super().__init__(*children, atom=atom, name=name, id=id, classes=classes, disabled=disabled, markup=markup)
def __init__(
self,
*children: Widget,
atom: pt.Atom,
alia: str = "",
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True,
) -> None:
super().__init__(
*children,
atom=atom,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup,
)
if alia == "":
alia = "Recognition"
self.alia = alia
def compose(self):
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
delim = self.atom.registry['nucleon'].metadata["formation"]["delimiter"]
delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"]
replace_dict = {
", ": ",",
". ": ".",
@@ -43,8 +63,8 @@ class Recognition(BasePuzzleWidget):
f":{delim}": ":",
}
nucleon = self.atom.registry['nucleon']
metadata = self.atom.registry['nucleon'].metadata
nucleon = self.atom.registry["nucleon"]
metadata = self.atom.registry["nucleon"].metadata
primary = cfg["primary"]
with Center():
@@ -53,7 +73,7 @@ class Recognition(BasePuzzleWidget):
for old, new in replace_dict.items():
primary = primary.replace(old, new)
primary_splited = re.split(r"(?<=[,;:|])", cfg['primary'])
primary_splited = re.split(r"(?<=[,;:|])", cfg["primary"])
for item in primary_splited:
with Center():
yield Label(
@@ -68,7 +88,7 @@ class Recognition(BasePuzzleWidget):
continue
if isinstance(item, Dict):
total = ""
for j, k in item.items(): # type: ignore
for j, k in item.items(): # type: ignore
total += f"> **{j}**: {k} \n"
yield Markdown(total)
if isinstance(item, str):
@@ -79,4 +99,4 @@ class Recognition(BasePuzzleWidget):
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "ok":
self.screen.rating = 5 # type: ignore
self.screen.rating = 5 # type: ignore

View File

@@ -1,10 +1,10 @@
from .sm2 import SM2Algorithm
__all__ = [
'SM2Algorithm',
"SM2Algorithm",
]
algorithms = {
"SM-2": SM2Algorithm,
"supermemo2": SM2Algorithm,
}
}

View File

@@ -1,12 +1,13 @@
import heurams.services.timer as timer
from typing import TypedDict
class BaseAlgorithm:
algo_name = "BaseAlgorithm"
class AlgodataDict(TypedDict):
efactor: float
real_rept: int
real_rept: int
rept: int
interval: int
last_date: int
@@ -15,31 +16,33 @@ class BaseAlgorithm:
last_modify: float
defaults = {
'real_rept': 0,
'rept': 0,
'interval': 0,
'last_date': 0,
'next_date': 0,
'is_activated': 0,
'last_modify': timer.get_timestamp()
"real_rept": 0,
"rept": 0,
"interval": 0,
"last_date": 0,
"next_date": 0,
"is_activated": 0,
"last_modify": timer.get_timestamp(),
}
@classmethod
def revisor(cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False) -> None:
def revisor(
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
) -> None:
"""迭代记忆数据"""
pass
@classmethod
def is_due(cls, algodata) -> int:
"""是否应该复习"""
return 1
@classmethod
def rate(cls, algodata) -> str:
"""获取评分信息"""
return ""
@classmethod
def nextdate(cls, algodata) -> int:
"""获取下一次记忆时间戳"""
return -1
return -1

View File

@@ -1 +1 @@
# FSRS 算法模块, 尚未就绪
# FSRS 算法模块, 尚未就绪

View File

@@ -2,12 +2,13 @@ from .base import BaseAlgorithm
import heurams.services.timer as timer
from typing import TypedDict
class SM2Algorithm(BaseAlgorithm):
algo_name = "SM-2"
class AlgodataDict(TypedDict):
efactor: float
real_rept: int
real_rept: int
rept: int
interval: int
last_date: int
@@ -16,66 +17,72 @@ class SM2Algorithm(BaseAlgorithm):
last_modify: float
defaults = {
'efactor': 2.5,
'real_rept': 0,
'rept': 0,
'interval': 0,
'last_date': 0,
'next_date': 0,
'is_activated': 0,
'last_modify': timer.get_timestamp()
"efactor": 2.5,
"real_rept": 0,
"rept": 0,
"interval": 0,
"last_date": 0,
"next_date": 0,
"is_activated": 0,
"last_modify": timer.get_timestamp(),
}
@classmethod
def revisor(cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
Args:
quality (int): 记忆保留率量化参数
@classmethod
def revisor(
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
Args:
quality (int): 记忆保留率量化参数
"""
if feedback == -1:
return
algodata[cls.algo_name]['efactor'] = algodata[cls.algo_name]['efactor'] + (
algodata[cls.algo_name]["efactor"] = algodata[cls.algo_name]["efactor"] + (
0.1 - (5 - feedback) * (0.08 + (5 - feedback) * 0.02)
)
algodata[cls.algo_name]['efactor'] = max(1.3, algodata[cls.algo_name]['efactor'])
algodata[cls.algo_name]["efactor"] = max(
1.3, algodata[cls.algo_name]["efactor"]
)
if feedback < 3:
algodata[cls.algo_name]['rept'] = 0
algodata[cls.algo_name]['interval'] = 0
algodata[cls.algo_name]["rept"] = 0
algodata[cls.algo_name]["interval"] = 0
else:
algodata[cls.algo_name]['rept'] += 1
algodata[cls.algo_name]["rept"] += 1
algodata[cls.algo_name]['real_rept'] += 1
algodata[cls.algo_name]["real_rept"] += 1
if is_new_activation:
algodata[cls.algo_name]['rept'] = 0
algodata[cls.algo_name]['efactor'] = 2.5
algodata[cls.algo_name]["rept"] = 0
algodata[cls.algo_name]["efactor"] = 2.5
if algodata[cls.algo_name]['rept'] == 0:
algodata[cls.algo_name]['interval'] = 1
elif algodata[cls.algo_name]['rept'] == 1:
algodata[cls.algo_name]['interval'] = 6
if algodata[cls.algo_name]["rept"] == 0:
algodata[cls.algo_name]["interval"] = 1
elif algodata[cls.algo_name]["rept"] == 1:
algodata[cls.algo_name]["interval"] = 6
else:
algodata[cls.algo_name]['interval'] = round(
algodata[cls.algo_name]['interval'] * algodata[cls.algo_name]['efactor']
algodata[cls.algo_name]["interval"] = round(
algodata[cls.algo_name]["interval"] * algodata[cls.algo_name]["efactor"]
)
algodata[cls.algo_name]['last_date'] = timer.get_daystamp()
algodata[cls.algo_name]['next_date'] = timer.get_daystamp() + algodata[cls.algo_name]['interval']
algodata[cls.algo_name]['last_modify'] = timer.get_timestamp()
algodata[cls.algo_name]["last_date"] = timer.get_daystamp()
algodata[cls.algo_name]["next_date"] = (
timer.get_daystamp() + algodata[cls.algo_name]["interval"]
)
algodata[cls.algo_name]["last_modify"] = timer.get_timestamp()
@classmethod
def is_due(cls, algodata):
return (algodata[cls.algo_name]['next_date'] <= timer.get_daystamp())
return algodata[cls.algo_name]["next_date"] <= timer.get_daystamp()
@classmethod
def rate(cls, algodata):
return str(algodata[cls.algo_name]['efactor'])
return str(algodata[cls.algo_name]["efactor"])
@classmethod
def nextdate(cls, algodata) -> int:
return algodata[cls.algo_name]['next_date']
return algodata[cls.algo_name]["next_date"]

View File

@@ -21,4 +21,4 @@ __all__ = [
"load_nucleon",
"load_electron",
"atom_registry",
]
]

View File

@@ -9,6 +9,7 @@ import json
import bidict
from heurams.context import config_var
class AtomRegister(TypedDict):
nucleon: Nucleon
nucleon_path: pathlib.Path
@@ -21,7 +22,8 @@ class AtomRegister(TypedDict):
orbital_fmt: str
runtime: dict
class Atom():
class Atom:
"""
统一处理一系列对象的所有信息与持久化:
关联电子 (算法数据)
@@ -30,11 +32,11 @@ class Atom():
以及关联路径
"""
def __init__(self, ident = ""):
def __init__(self, ident=""):
self.ident = ident
atom_registry[ident] = self
# self.is_evaled = False
self.registry: AtomRegister = { # type: ignore
self.registry: AtomRegister = { # type: ignore
"nucleon": None,
"nucleon_path": None,
"nucleon_fmt": "toml",
@@ -42,7 +44,7 @@ class Atom():
"electron_path": None,
"electron_fmt": "json",
"orbital": None,
"orbital_path": None, # 允许设置为 None, 此时使用 nucleon 文件内的推荐配置
"orbital_path": None, # 允许设置为 None, 此时使用 nucleon 文件内的推荐配置
"orbital_fmt": "toml",
}
self.do_eval()
@@ -53,16 +55,17 @@ class Atom():
self.do_eval()
else:
raise ValueError("不受支持的原子元数据链接操作")
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
# eval 环境设置
def eval_with_env(s: str):
try:
nucleon = self.registry['nucleon']
nucleon = self.registry["nucleon"]
default = config_var.get()["puzzles"]
metadata = nucleon.metadata
except:
@@ -72,7 +75,7 @@ class Atom():
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
@@ -89,10 +92,9 @@ class Atom():
if data.startswith("eval:"):
return modifier(data[5:])
return data
traverse(self.registry["nucleon"], eval_with_env)
traverse(self.registry["orbital"], eval_with_env)
def persist(self, key):
path: pathlib.Path | None = self.registry[key + "_path"]
@@ -109,7 +111,7 @@ class Atom():
raise KeyError("不受支持的持久化格式")
else:
raise TypeError("对未初始化的路径对象操作")
def __getitem__(self, key):
if key in self.registry:
return self.registry[key]
@@ -124,5 +126,6 @@ class Atom():
@staticmethod
def placeholder():
return (Electron.placeholder(), Nucleon.placeholder(), {})
atom_registry: bidict.bidict[str, Atom] = bidict.bidict()

View File

@@ -2,12 +2,13 @@ import heurams.services.timer as timer
from heurams.context import config_var
from heurams.kernel.algorithms import algorithms
class Electron:
"""电子: 记忆分析元数据及算法"""
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = "supermemo2"):
"""初始化电子对象 (记忆数据)
Args:
ident: 算法的唯一标识符, 用于区分不同的算法实例, 使用 algodata[ident] 获取
algodata: 算法数据字典, 包含算法的各项参数和设置
@@ -28,34 +29,34 @@ class Electron:
def activate(self):
"""激活此电子"""
self.algodata[self.algo]['is_activated'] = 1
self.algodata[self.algo]['last_modify'] = timer.get_timestamp()
self.algodata[self.algo]["is_activated"] = 1
self.algodata[self.algo]["last_modify"] = timer.get_timestamp()
def modify(self, var: str, value):
"""修改 algodata[algo] 中子字典数据"""
if var in self.algodata[self.algo]:
self.algodata[self.algo][var] = value
self.algodata[self.algo]['last_modify'] = timer.get_timestamp()
self.algodata[self.algo]["last_modify"] = timer.get_timestamp()
else:
print(f"警告: '{var}' 非已知元数据字段")
def is_due(self):
"""是否应该复习"""
return self.algo.is_due(self.algodata)
def is_activated(self):
return self.algodata[self.algo]['is_activated']
return self.algodata[self.algo]["is_activated"]
def rate(self):
"评价"
return self.algo.rate(self.algodata)
def nextdate(self) -> int:
return self.algo.nextdate(self.algodata)
def revisor(self, quality: int = 5, is_new_activation: bool = False):
"""算法迭代决策机制实现
Args:
quality (int): 记忆保留率量化参数 (0-5)
is_new_activation (bool): 是否为初次激活
@@ -93,7 +94,7 @@ class Electron:
if key == "ident":
raise AttributeError("ident 应为只读")
self.algodata[self.algo][key] = value
self.algodata[self.algo]['last_modify'] = timer.get_timestamp()
self.algodata[self.algo]["last_modify"] = timer.get_timestamp()
def __len__(self):
"""仅返回当前算法的配置数量"""
@@ -102,4 +103,4 @@ class Electron:
@staticmethod
def placeholder():
"""生成一个电子占位符"""
return Electron("电子对象样例内容", {})
return Electron("电子对象样例内容", {})

View File

@@ -6,17 +6,18 @@ import toml
import json
from copy import deepcopy
def load_nucleon(path: pathlib.Path, fmt = "toml"):
def load_nucleon(path: pathlib.Path, fmt="toml"):
with open(path, "r") as f:
dictdata = dict()
dictdata = toml.load(f) # type: ignore
dictdata = toml.load(f) # type: ignore
lst = list()
nested_data = dict()
# 修正 toml 解析器的不管嵌套行为
for key, value in dictdata.items():
if "__metadata__" in key: # 以免影响句号
if '.' in key:
parts = key.split('.')
if "__metadata__" in key: # 以免影响句号
if "." in key:
parts = key.split(".")
current = nested_data
for part in parts[:-1]:
if part not in current:
@@ -29,23 +30,31 @@ def load_nucleon(path: pathlib.Path, fmt = "toml"):
for item, attr in nested_data.items():
if item == "__metadata__":
continue
lst.append((Nucleon(hasher.hash(item), attr, deepcopy(nested_data['__metadata__'])), deepcopy(nested_data["__metadata__"]["orbital"])))
lst.append(
(
Nucleon(
hasher.hash(item), attr, deepcopy(nested_data["__metadata__"])
),
deepcopy(nested_data["__metadata__"]["orbital"]),
)
)
return lst
def load_electron(path: pathlib.Path, fmt = "json") -> dict:
def load_electron(path: pathlib.Path, fmt="json") -> dict:
"""从文件路径加载电子对象
Args:
path (pathlib.Path): 路径
fmt (str): 文件格式(可选, 默认 json)
Returns:
dict: 键名是电子对象名称, 值是电子对象
"""
with open(path, "r") as f:
dictdata = dict()
dictdata = json.load(f) # type: ignore
dictdata = json.load(f) # type: ignore
dic = dict()
for item, attr in dictdata.items():
dic[item] = (Electron(hasher.hash(item), attr))
return dic
dic[item] = Electron(hasher.hash(item), attr)
return dic

View File

@@ -29,12 +29,13 @@ class Nucleon:
def __hash__(self):
return hash(self.ident)
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
# eval 环境设置
def eval_with_env(s: str):
try:
@@ -43,7 +44,7 @@ class Nucleon:
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
@@ -60,9 +61,10 @@ class Nucleon:
if data.startswith("eval:"):
return modifier(data[5:])
return data
traverse(self.payload, eval_with_env)
traverse(self.metadata, eval_with_env)
@staticmethod
def placeholder():
"""生成一个占位原子核"""

View File

@@ -1,14 +1,16 @@
from typing import TypedDict
class OrbitalSchedule(TypedDict):
quick_review: list
recognition: list
final_review: list
class Orbital(TypedDict):
schedule: OrbitalSchedule
puzzles: dict
"""一份示例
["__metadata__.orbital.puzzles"] # 谜题定义
@@ -20,4 +22,4 @@ class Orbital(TypedDict):
quick_review = [["FillBlank", "1.0"], ["SelectMeaning", "0.5"], ["recognition", "1.0"]]
recognition = [["recognition", "1.0"]]
final_review = [["FillBlank", "0.7"], ["SelectMeaning", "0.7"], ["recognition", "1.0"]]
"""
"""

View File

@@ -1,6 +1,7 @@
from heurams.context import config_var
import pathlib
def probe_by_filename(filename):
"""探测指定文件 (无扩展名) 的所有信息"""
paths: dict = config_var.get().get("paths")
@@ -8,17 +9,18 @@ def probe_by_filename(filename):
result = {}
for item, attr in paths.items():
for i in formats:
attr: pathlib.Path = pathlib.Path(attr) / filename + '.' + i
attr: pathlib.Path = pathlib.Path(attr) / filename + "." + i
if attr.exists():
result[item.replace("_dir", "")] = str(attr)
return result
def probe_all(is_stem = 1):
def probe_all(is_stem=1):
"""依据目录探测所有信息
Args:
is_stem (boolean): 是否**删除**文件扩展名
Returns:
dict: 有三项, 每一项的键名都是文件组类型, 值都是文件组列表, 只包含文件名
"""
@@ -35,7 +37,9 @@ def probe_all(is_stem = 1):
result[item.replace("_dir", "")].append(str(i.name))
return result
if __name__ == "__main__":
import os
print(os.getcwd())
print(probe_all())

View File

@@ -10,10 +10,10 @@ from .mcq import MCQPuzzle
from .recognition import RecognitionPuzzle
__all__ = [
'BasePuzzle',
'ClozePuzzle',
'MCQPuzzle',
'RecognitionPuzzle',
"BasePuzzle",
"ClozePuzzle",
"MCQPuzzle",
"RecognitionPuzzle",
]
puzzles = {
@@ -23,6 +23,7 @@ puzzles = {
"base": BasePuzzle,
}
@staticmethod
def create_by_dict(config_dict: dict) -> BasePuzzle:
"""
@@ -37,19 +38,19 @@ def create_by_dict(config_dict: dict) -> BasePuzzle:
Raises:
ValueError: 当配置无效时抛出
"""
puzzle_type = config_dict.get('type')
puzzle_type = config_dict.get("type")
if puzzle_type == 'cloze':
if puzzle_type == "cloze":
return puzzles["cloze"](
text=config_dict['text'],
min_denominator=config_dict.get('min_denominator', 7)
text=config_dict["text"],
min_denominator=config_dict.get("min_denominator", 7),
)
elif puzzle_type == 'mcq':
elif puzzle_type == "mcq":
return puzzles["mcq"](
mapping=config_dict['mapping'],
jammer=config_dict.get('jammer', []),
max_riddles_num=config_dict.get('max_riddles_num', 2),
prefix=config_dict.get('prefix', '')
mapping=config_dict["mapping"],
jammer=config_dict.get("jammer", []),
max_riddles_num=config_dict.get("max_riddles_num", 2),
prefix=config_dict.get("prefix", ""),
)
else:
raise ValueError(f"未知的谜题类型: {puzzle_type}")
raise ValueError(f"未知的谜题类型: {puzzle_type}")

View File

@@ -1,9 +1,9 @@
# base.py
class BasePuzzle:
"""谜题基类"""
def refresh(self):
raise NotImplementedError("谜题对象未实现 refresh 方法")
def __str__(self):
return f"谜题: {type(self).__name__}"
return f"谜题: {type(self).__name__}"

View File

@@ -1,9 +1,10 @@
from .base import BasePuzzle
import random
class ClozePuzzle(BasePuzzle):
"""填空题谜题生成器
Args:
text: 原始字符串(需要 delimiter 分割句子, 末尾应有 delimiter)
min_denominator: 最小概率倒数(如占所有可生成填空数的 1/7 中的 7, 若期望值小于 1, 则取 1)

View File

@@ -2,15 +2,12 @@
from .base import BasePuzzle
import random
class MCQPuzzle(BasePuzzle):
"""选择题谜题生成器"""
def __init__(
self,
mapping: dict,
jammer: list,
max_riddles_num: int = 2,
prefix: str = ""
self, mapping: dict, jammer: list, max_riddles_num: int = 2, prefix: str = ""
):
self.prefix = prefix
self.mapping = mapping
@@ -29,18 +26,16 @@ class MCQPuzzle(BasePuzzle):
self.answer = ["无答案"]
self.options = []
return
num_questions = min(self.max_riddles_num, len(self.mapping))
questions = random.sample(list(self.mapping.items()), num_questions)
puzzles = []
answers = []
all_options = []
for question, correct_answer in questions:
options = [correct_answer]
available_jammers = [
j for j in self.jammer if j != correct_answer
]
available_jammers = [j for j in self.jammer if j != correct_answer]
if len(available_jammers) >= 3:
selected_jammers = random.sample(available_jammers, 3)
else:
@@ -50,14 +45,14 @@ class MCQPuzzle(BasePuzzle):
puzzles.append(question)
answers.append(correct_answer)
all_options.append(options)
question_texts = []
for i, puzzle in enumerate(puzzles):
question_texts.append(f"{self.prefix}:\n {i+1}. {puzzle}")
self.wording = question_texts
self.answer = answers
self.options = all_options
def __str__(self):
return f"{self.wording}\n正确答案: {', '.join(self.answer)}"
return f"{self.wording}\n正确答案: {', '.join(self.answer)}"

View File

@@ -2,11 +2,12 @@
from .base import BasePuzzle
import random
class RecognitionPuzzle(BasePuzzle):
"""识别占位符"""
def __init__(self) -> None:
super().__init__()
def refresh(self):
pass
pass

View File

@@ -3,10 +3,4 @@ from .procession import Procession
from .fission import Fission
from .phaser import Phaser
__all__ = [
"PhaserState",
"ProcessionState",
"Procession",
"Fission",
"Phaser"
]
__all__ = ["PhaserState", "ProcessionState", "Procession", "Fission", "Phaser"]

View File

@@ -3,27 +3,34 @@ import heurams.kernel.puzzles as puz
import random
from .states import PhaserState
class Fission():
class Fission:
"""裂变器: 单原子调度展开器"""
def __init__(self, atom: pt.Atom, phase = PhaserState.RECOGNITION):
def __init__(self, atom: pt.Atom, phase=PhaserState.RECOGNITION):
self.atom = atom
self.orbital_schedule = atom.registry["orbital"]["schedule"][phase.value] # type: ignore
self.orbital_puzzles = atom.registry["orbital"]["puzzles"]
#print(self.orbital_schedule)
self.orbital_schedule = atom.registry["orbital"]["schedule"][phase.value] # type: ignore
self.orbital_puzzles = atom.registry["orbital"]["puzzles"]
# print(self.orbital_schedule)
self.puzzles = list()
for item, possibility in self.orbital_schedule: # type: ignore
for item, possibility in self.orbital_schedule: # type: ignore
if not isinstance(possibility, float):
possibility = float(possibility)
while possibility > 1:
self.puzzles.append({
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item
})
self.puzzles.append(
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item,
}
)
possibility -= 1
if random.random() <= possibility:
self.puzzles.append({
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item
})
self.puzzles.append(
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item,
}
)
def generate(self):
yield from self.puzzles

View File

@@ -4,8 +4,10 @@ import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
from .procession import Procession
class Phaser():
class Phaser:
"""移相器: 全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
new_atoms = list()
old_atoms = list()
@@ -17,10 +19,14 @@ class Phaser():
old_atoms.append(i)
self.processions = list()
if len(old_atoms):
self.processions.append(Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习"))
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
)
if len(new_atoms):
self.processions.append(Procession(new_atoms,PhaserState.RECOGNITION, "新记忆"))
self.processions.append(Procession(atoms,PhaserState.FINAL_REVIEW, "总体复习"))
self.processions.append(
Procession(new_atoms, PhaserState.RECOGNITION, "新记忆")
)
self.processions.append(Procession(atoms, PhaserState.FINAL_REVIEW, "总体复习"))
def current_procession(self):
for i in self.processions:
@@ -29,4 +35,4 @@ class Phaser():
self.state = i.phase
return i
self.state = PhaserState.FINISHED
return 0
return 0

View File

@@ -1,8 +1,10 @@
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
class Procession():
class Procession:
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
self.atoms = atoms
self.queue = atoms.copy()
@@ -12,7 +14,7 @@ class Procession():
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
def forward(self, step = 1):
def forward(self, step=1):
self.cursor += step
if self.cursor == len(self.queue):
self.state = ProcessionState.FINISHED
@@ -20,22 +22,22 @@ class Procession():
self.state = ProcessionState.RUNNING
try:
self.current_atom = self.queue[self.cursor]
return 1 # 成功
return 1 # 成功
except IndexError as e:
print(f"{e}")
return 0
def append(self, atom = None):
def append(self, atom=None):
if atom == None:
atom = self.current_atom
if self.queue[len(self.queue) - 1] != atom or len(self) <= 1:
self.queue.append(atom)
self.queue.append(atom)
def __len__(self):
return (len(self.queue) - self.cursor)
return len(self.queue) - self.cursor
def process(self):
return (self.cursor)
return self.cursor
def total_length(self):
return len(self.queue)

View File

@@ -1,5 +1,6 @@
from enum import Enum, auto
class PhaserState(Enum):
UNSURE = "unsure"
QUICK_REVIEW = "quick_review"
@@ -7,6 +8,7 @@ class PhaserState(Enum):
FINAL_REVIEW = "final_review"
FINISHED = "finished"
class ProcessionState(Enum):
RUNNING = auto()
FINISHED = auto()
FINISHED = auto()

View File

@@ -7,7 +7,4 @@ __all__ = [
"playsound_audio",
]
providers = {
"termux": termux_audio,
"playsound": playsound_audio
}
providers = {"termux": termux_audio, "playsound": playsound_audio}

View File

@@ -1,4 +1,4 @@
""" 通用音频适配器
"""通用音频适配器
基于 playsound 库的音频播放器, 在绝大多数 python 环境上提供音频服务
注意: 在未配置 pulseaudio 的 termux 不可用
"""
@@ -7,5 +7,6 @@ import os
import pathlib
import playsound
def play_by_path(path: pathlib.Path):
playsound.playsound(str(path))
playsound.playsound(str(path))

View File

@@ -1,5 +1,6 @@
from typing import Protocol
import pathlib
class PlayFunctionProtocol(Protocol):
def __call__(self, path: pathlib.Path) -> None: ...
def __call__(self, path: pathlib.Path) -> None: ...

View File

@@ -1,11 +1,13 @@
""" Termux 音频适配
"""Termux 音频适配
适配 Termux 的 play-audio 命令, 以在 android 上提供可用的播放体验
无需配置 pulseaudio
"""
import os
import pathlib
#from .protocol import PlayFunctionProtocol
# from .protocol import PlayFunctionProtocol
def play_by_path(path: pathlib.Path):
os.system(f"play-audio {path}")
os.system(f"play-audio {path}")

View File

@@ -1 +1 @@
# 大语言模型
# 大语言模型

View File

@@ -8,5 +8,5 @@ __all__ = [
providers = {
"basetts": BaseTTS,
"edgetts": EdgeTTS,
}
"edgetts": EdgeTTS,
}

View File

@@ -1,9 +1,10 @@
import pathlib
class BaseTTS:
name = "BaseTTS"
@classmethod
def convert(cls, text: str, path: pathlib.Path | str = "") -> pathlib.Path:
"""path 是可选参数, 不填则自动返回生成文件路径"""
return path # type: ignore
return path # type: ignore

View File

@@ -2,6 +2,7 @@ from .base import BaseTTS
import pathlib
import edge_tts
class EdgeTTS(BaseTTS):
name = "EdgeTTS"
@@ -12,4 +13,4 @@ class EdgeTTS(BaseTTS):
"zh-CN-YunjianNeural",
)
communicate.save_sync(str(path))
return path # type: ignore
return path # type: ignore

View File

@@ -3,4 +3,4 @@ from heurams.context import config_var
from heurams.providers.audio import providers as prov
from typing import Callable
play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path
play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path

View File

@@ -3,6 +3,7 @@ import pathlib
import toml
import typing
class ConfigFile:
def __init__(self, path: pathlib.Path):
self.path = path
@@ -13,7 +14,7 @@ class ConfigFile:
def _load(self):
"""从文件加载配置数据"""
with open(self.path, 'r') as f:
with open(self.path, "r") as f:
try:
self.data = toml.load(f)
except toml.TomlDecodeError as e:
@@ -28,20 +29,20 @@ class ConfigFile:
def save(self, path: typing.Union[str, pathlib.Path] = ""):
"""保存配置到文件"""
save_path = pathlib.Path(path) if path else self.path
with open(save_path, 'w') as f:
with open(save_path, "w") as f:
toml.dump(self.data, f)
def get(self, key: str, default: typing.Any = None) -> typing.Any:
"""获取配置值,如果不存在返回默认值"""
return self.data.get(key, default)
def __getitem__(self, key: str) -> typing.Any:
return self.data[key]
def __setitem__(self, key: str, value: typing.Any):
self.data[key] = value
self.save()
def __contains__(self, key: str) -> bool:
"""支持 in 语法"""
return key in self.data
return key in self.data

View File

@@ -1,8 +1,10 @@
# 哈希服务
import hashlib
def get_md5(text):
return hashlib.md5(text.encode('utf-8')).hexdigest()
return hashlib.md5(text.encode("utf-8")).hexdigest()
def hash(text):
return hashlib.md5(text.encode('utf-8')).hexdigest()
return hashlib.md5(text.encode("utf-8")).hexdigest()

View File

@@ -2,19 +2,21 @@
from heurams.context import config_var
import time
def get_daystamp() -> int:
"""获取当前日戳(以天为单位的整数时间戳)"""
time_override = config_var.get().get("daystamp_override", -1)
if time_override != -1:
return int(time_override)
return int((time.time() + config_var.get().get("timezone_offset")) // (24 * 3600))
def get_timestamp() -> float:
"""获取 UNIX 时间戳"""
# 搞这个类的原因是要支持可复现操作
time_override = config_var.get().get("timestamp_override", -1)
if time_override != -1:
return float(time_override)
return time.time()
return time.time()

View File

@@ -3,4 +3,4 @@ from heurams.context import config_var
from heurams.providers.tts import TTSs
from typing import Callable
convert: Callable = TTSs[config_var.get().get("tts_provider")]
convert: Callable = TTSs[config_var.get().get("tts_provider")]

View File

@@ -2,4 +2,4 @@
ver = "0.4.0"
stage = "prototype"
codename = "fledge" # 雏鸟, 0.4.x 版本
codename = "fledge" # 雏鸟, 0.4.x 版本