diff --git a/.gitignore b/.gitignore index b0fbf27..67b9206 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,6 @@ .vscode .directory __pycache__/ -scripts/ .idea cache nucleon/test.toml diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..580d0e0 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,9 @@ +[project.scripts] +heurams = "src.__main__:main" + +[project] +name = "heurams" +version = "0.4.0" + +[tool.setuptools] +packages = ["src"] \ No newline at end of file diff --git a/src/heurams/__init__.py b/src/heurams/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/__main__.py b/src/heurams/__main__.py new file mode 100644 index 0000000..1f425a1 --- /dev/null +++ b/src/heurams/__main__.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +from textual.app import App +import screens +import os + +class AppLauncher(App): + CSS_PATH = "styles.css" + TITLE = "潜进 - 辅助记忆调度器" + BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")] + SCREENS = { + "dashboard": screens.DashboardScreen, + } + + def on_mount(self) -> None: + self.push_screen("dashboard") + +if __name__ == "__main__": + script_dir = os.path.dirname(os.path.abspath(__file__)) + os.chdir(script_dir) + os.makedirs("electron", exist_ok=True) + os.makedirs("nucleon", exist_ok=True) + os.makedirs("cache/voice", exist_ok=True) + app = AppLauncher() + app.run() \ No newline at end of file diff --git a/src/heurams/core/particles.py b/src/heurams/core/particles.py new file mode 100644 index 0000000..6835014 --- /dev/null +++ b/src/heurams/core/particles.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 +import pathlib +import toml +import time +import heurams.services.timer as timer +from typing import List + +class Electron: + """电子: 记忆分析元数据及算法""" + algorithm = "SM-2" # 暂时使用 SM-2 算法进行记忆拟合, 考虑 SM-15 替代 + + def __init__(self, content: str, metadata: dict): + self.content = content + self.metadata = metadata + if metadata == {}: + # print("NULL") + self._default_init() + + def _default_init(self): + defaults = { + 'efactor': 2.5, # 易度系数, 越大越简单, 最大为5 + 'real_rept': 0, # (实际)重复次数 + 'rept': 0, # (有效)重复次数 + 'interval': 0, # 最佳间隔 + 'last_date': 0, # 上一次复习的时间戳 + 'next_date': 0, # 将要复习的时间戳 + 'is_activated': 0, # 激活状态 + # *NOTE: 此处"时间戳"是以天为单位的整数, 即 UNIX 时间戳除以一天的秒数取整 + 'last_modify': time.time() # 最后修改时间戳(此处是UNIX时间戳) + } + self.metadata = defaults + + def activate(self): + self.metadata['is_activated'] = 1 + self.metadata['last_modify'] = time.time() + + def modify(self, var: str, value): + if var in self.metadata: + self.metadata[var] = value + self.metadata['last_modify'] = time.time() + else: + print(f"警告: '{var}' 非已知元数据字段") + + def revisor(self, quality: int = 5, is_new_activation: bool = False): + """SM-2 算法迭代决策机制实现 + 根据 quality(0 ~ 5) 进行参数迭代最佳间隔 + quality 由主程序评估 + + Args: + quality (int): 记忆保留率量化参数 + """ + print(f"REVISOR: {quality}, {is_new_activation}") + if quality == -1: + return -1 + + self.metadata['efactor'] = self.metadata['efactor'] + ( + 0.1 - (5 - quality) * (0.08 + (5 - quality) * 0.02) + ) + self.metadata['efactor'] = max(1.3, self.metadata['efactor']) + + if quality < 3: + # 若保留率低于 3,重置重复次数 + self.metadata['rept'] = 0 + self.metadata['interval'] = 0 # 设为0,以便下面重新计算 I(1) + else: + self.metadata['rept'] += 1 + + self.metadata['real_rept'] += 1 + + if is_new_activation: # 初次激活 + self.metadata['rept'] = 0 + self.metadata['efactor'] = 2.5 + + if self.metadata['rept'] == 0: # 刚被重置或初次激活后复习 + self.metadata['interval'] = 1 # I(1) + elif self.metadata['rept'] == 1: + self.metadata['interval'] = 6 # I(2) 经验公式 + else: + self.metadata['interval'] = round( + self.metadata['interval'] * self.metadata['efactor'] + ) + + self.metadata['last_date'] = timer.get_daystamp() + self.metadata['next_date'] = timer.get_daystamp() + self.metadata['interval'] + self.metadata['last_modify'] = time.time() + + def __str__(self): + return ( + f"记忆单元预览 \n" + f"内容: '{self.content}' \n" + f"易度系数: {self.metadata['efactor']:.2f} \n" + f"已经重复的次数: {self.metadata['rept']} \n" + f"下次间隔: {self.metadata['interval']} 天 \n" + f"下次复习日期时间戳: {self.metadata['next_date']}" + ) + + def __eq__(self, other): + if self.content == other.content: + return True + return False + + def __hash__(self): + return hash(self.content) + + def __getitem__(self, key): + if key == "content": + return self.content + if key in self.metadata: + return self.metadata[key] + else: + raise KeyError(f"Key '{key}' not found in metadata.") + + def __setitem__(self, key, value): + if key == "content": + raise AttributeError("content 应为只读") + self.metadata[key] = value + self.metadata['last_modify'] = time.time() + + def __iter__(self): + yield from self.metadata.keys() + + def __len__(self): + return len(self.metadata) + + @staticmethod + def placeholder(): + return Electron("电子对象样例内容", {}) + + +class Nucleon: + """核子: 材料元数据""" + + def __init__(self, content: str, data: dict): + self.metadata = data + self.content = content + + def __getitem__(self, key): + if key == "content": + return self.content + if key in self.metadata: + return self.metadata[key] + else: + raise KeyError(f"Key '{key}' not found in metadata.") + + def __iter__(self): + yield from self.metadata.keys() + + def __len__(self): + return len(self.metadata) + + def __hash__(self): + return hash(self.content) + + @staticmethod + def placeholder(): + return Nucleon("核子对象样例内容", {}) + + +class NucleonUnion(): + """ + 替代原有 NucleonFile 类, 支持复杂逻辑 + + Attributes: + path (Path): 对应于 NucleonUnion 实例的文件路径。 + name (str): 核联对象的显示名称,从文件名中派生。 + nucleons (list): 内部核子对象的列表。 + nucleons_dict (dict): 内部核子对象的字典,以核子内容作为键。 + keydata (dict): 核子对象字典键名的翻译。 + testdata (dict): 记忆测试项目的元数据。 + + Parameters: + path (Path): 包含核子数据的文件路径。 + """ + + def __init__(self, path: pathlib.Path): + self.path = path + self.name = path.name.replace(path.suffix, "") + with open(path, 'r') as f: + all = toml.load(f) + lst = list() + for i in all.keys(): + if "attr" in i: + continue + if "data" in i: + continue + lst.append(Nucleon(i, all[i])) + self.keydata = all["keydata"] + self.testdata = all["testdata"] + self.nucleons: List[Nucleon] = lst + self.nucleons_dict = {i.content: i for i in lst} + + def __len__(self): + return len(self.nucleons) + + def linked_electron_union(self): + if (self.path.parent / '..' / 'electron' / self.path.name).exists(): + return ElectronUnion(self.path.parent / '..' / 'electron' / self.path.name) + else: + return 0 + + def save(self): + with open(self.path, 'w') as f: + tmp = {i.content: i.metadata for i in self.nucleons} + toml.dump(tmp, f) + + +class ElectronUnion: + """取代原有 ElectronFile 类, 以支持复杂逻辑""" + def __init__(self, path): + self.path = path + print(path) + self.name = path.name.replace(path.suffix, "") + with open(path, 'r') as f: + all = toml.load(f) + lst = list() + for i in all.keys(): + if i != "total": + lst.append(Electron(i, all[i])) + self.total = all.get("total", {"last_date": 0}) + self.electrons = lst + self.electrons_dict = {i.content: i for i in lst} + + def sync(self): + """同步 electrons_dict 中新增对到 electrons 中, 仅用于缺省初始化不存在映射时调用""" + self.electrons = self.electrons_dict.values() + + def __len__(self): + return len(self.electrons) + + def linked_nucleon_union(self): + return NucleonUnion(self.path.parent / '..' / 'nucleon' / self.path.name) + + def save(self): + # print(1) + self.total["last_date"] = timer.get_daystamp() + with open(self.path, 'w') as f: + tmp = {i.content: i.metadata for i in self.electrons} + tmp["total"] = self.total + # print(tmp) + toml.dump(tmp, f) + + +class Atom: + @staticmethod + def placeholder(): + return (Electron.placeholder(), Nucleon.placeholder(), {}) + + @staticmethod + def advanced_placeholder(): + return ( + Electron("两只黄鹤鸣翠柳", {}), + Nucleon( + "两只黄鹤鸣翠柳", + { + "note": [], + "translation": "臣子李密陈言:我因命运不好,小时候遭遇到了不幸", + "keyword_note": { + "险衅": "凶险祸患(这里指命运不好)", + "夙": "早时,这里指年幼的时候", + "闵": "通'悯',指可忧患的事", + "凶": "不幸,指丧父" + } + } + ), + { + "keydata": { + "note": "笔记", + "keyword_note": "关键词翻译", + "translation": "语句翻译" + }, + "testdata": { + "additional_inf": ["translation", "note", "keyword_note"], + "fill_blank_test": ["translation"], + "draw_card_test": ["keyword_note"] + }, + "is_new_activation": 0 + } + ) \ No newline at end of file diff --git a/src/heurams/core/puzzles/__init__.py b/src/heurams/core/puzzles/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/core/puzzles/base.py b/src/heurams/core/puzzles/base.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/core/puzzles/blank.py b/src/heurams/core/puzzles/blank.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/core/puzzles/factory.py b/src/heurams/core/puzzles/factory.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/core/puzzles/selection.py b/src/heurams/core/puzzles/selection.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/interface/screens/dashboard.py b/src/heurams/interface/screens/dashboard.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/interface/screens/memory.py b/src/heurams/interface/screens/memory.py new file mode 100644 index 0000000..5e9c6d7 --- /dev/null +++ b/src/heurams/interface/screens/memory.py @@ -0,0 +1,140 @@ + +class MemScreen(Screen): + BINDINGS = [ + ("d", "toggle_dark", "改变色调"), + ("q", "pop_screen", "返回主菜单"), + ("v", "play_voice", "朗读"), + # ("p", "precache_current", "预缓存当前单元集"), # 新增预缓存快捷键 + ] + if config.get("quick_pass"): + BINDINGS.append(("k", "quick_pass", "快速通过[调试]")) + btn = dict() + + def __init__( + self, + nucleon_file: pt.NucleonUnion, + electron_file: pt.ElectronUnion, + tasked_num, + ): + super().__init__(name=None, id=None, classes=None) + self.nucleon_file = nucleon_file + self.electron_file = electron_file + self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num) + self.stage = 1 + self.stage += self.reactor.set_round_templated(self.stage) + first_forward = self.reactor.forward() + print(first_forward) + if first_forward == -1: + self.stage = 3 + self.reactor.set_round_templated(3) + print(self.reactor.forward()) + #self._forward_judge(first_forward) + self.compo = next(self.reactor.current_appar) + self.feedback_state = 0 # 默认状态 + self.feedback_state_map = { + 0: "", + 255: "回答有误, 请重试. 或者重新学习此单元", + } + + def compose(self) -> ComposeResult: + if type(self.compo).__name__ == "Recognition": + self.action_play_voice() + yield Header(show_clock=True) + with Center(): + yield Static( + f"当前进度: {len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}" + ) + yield Label(self.feedback_state_map[self.feedback_state]) + yield from self.compo.compose() + if self.feedback_state == 255: + yield Button("重新学习此单元", id="re-recognize", variant="warning") + yield Footer() + + def on_mount(self): + pass + + def on_button_pressed(self, event): + try: + if event.button.id == "re-recognize": + return + except: + pass + ret = self.compo.handler(event, "button") + self._forward_judge(ret) + def _forward_judge(self, ret): + self.feedback_state = 0 + if ret == -1: + return + if ret == 0: + try: + self.compo = next(self.reactor.current_appar) + self.refresh_ui() + except StopIteration: + nxt = self.reactor.forward(1) + try: + self.compo = next(self.reactor.current_appar) + except: + pass + if nxt == -1: + if self.reactor.round_set == 0: + if self.stage == 4: + if config.get("save"): + self.reactor.save() + self.compo = compo.Finished( + self, None, pt.Atom.placeholder() + ) + self.refresh_ui() + else: + self.reactor.set_round_templated(self.stage) + self.reactor.forward(1) + self.stage += 1 + self.compo = next(self.reactor.current_appar) + self.refresh_ui() + return + return + else: + self.refresh_ui() + return + if ret >= 1: + if ret == 2: + self.feedback_state = 255 # 表示错误 + else: + self.feedback_state = 0 + self.refresh_ui() + return + + def refresh_ui(self): + self.call_later(self.recompose) + print(type(self.compo).__name__) + + def action_play_voice(self): + def play(): + cache_dir = pathlib.Path(f"./cache/voice/") + cache_dir.mkdir(parents=True, exist_ok=True) + cache = cache_dir / f"{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav" + if not cache.exists(): + import edge_tts as tts + communicate = tts.Communicate( + self.reactor.current_atom[1].content.replace("/", ""), + "zh-CN-XiaoxiaoNeural", + ) + communicate.save_sync( + f"./cache/voice/{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav" + ) + playsound(str(cache)) + + threading.Thread(target=play).start() + + def action_precache_current(self): + """预缓存当前单元集的音频""" + precache_screen = PrecachingScreen(self.nucleon_file) + self.app.push_screen(precache_screen) + + def action_quick_pass(self): + self.reactor.report(self.reactor.current_atom, 5) + self._forward_judge(0) + def action_toggle_dark(self): + self.app.action_toggle_dark() + + def action_pop_screen(self): + self.app.pop_screen() diff --git a/src/heurams/interface/screens/nucleon_editor.py b/src/heurams/interface/screens/nucleon_editor.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/interface/screens/precache.py b/src/heurams/interface/screens/precache.py new file mode 100644 index 0000000..2005180 --- /dev/null +++ b/src/heurams/interface/screens/precache.py @@ -0,0 +1,185 @@ + +class PrecachingScreen(Screen): + """预缓存音频文件屏幕""" + BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")] + + def __init__(self, nucleon_file = None): + super().__init__(name=None, id=None, classes=None) + self.nucleon_file = nucleon_file + self.is_precaching = False + self.current_file = "" + self.current_item = "" + self.progress = 0 + self.total = 0 + self.processed = 0 + self.precache_worker = None + + def compose(self) -> ComposeResult: + yield Header(show_clock=True) + with Container(id="precache_container"): + yield Label("[b]音频预缓存[/b]", classes="title-label") + + if self.nucleon_file: + yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info") + yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info") + else: + yield Static("目标: 所有单元集", classes="target-info") + + yield Static(id="status", classes="status-info") + yield Static(id="current_item", classes="current-item") + yield ProgressBar(total=100, show_eta=False, id="progress_bar") + + with Horizontal(classes="button-group"): + if not self.is_precaching: + yield Button("开始预缓存", id="start_precache", variant="primary") + else: + yield Button("取消预缓存", id="cancel_precache", variant="error") + yield Button("清空缓存", id="clear_cache", variant="warning") + yield Button("返回", id="go_back", variant="default") + yield Footer() + + def on_mount(self): + """挂载时初始化状态""" + self.update_status("就绪", "等待开始...") + + def update_status(self, status, current_item="", progress=None): + """更新状态显示""" + status_widget = self.query_one("#status", Static) + item_widget = self.query_one("#current_item", Static) + progress_bar = self.query_one("#progress_bar", ProgressBar) + + status_widget.update(f"状态: {status}") + item_widget.update(f"当前项目: {current_item}" if current_item else "") + + if progress is not None: + progress_bar.progress = progress + progress_bar.advance(0) # 刷新显示 + + def precache_single_text(self, text: str): + """预缓存单个文本的音频""" + cache_dir = pathlib.Path("./cache/voice/") + cache_dir.mkdir(parents=True, exist_ok=True) + cache = cache_dir / f"{aux.get_md5(text)}.wav" + if not cache.exists(): + try: + import edge_tts as tts + communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural") + communicate.save_sync(str(cache)) + return True + except Exception as e: + print(f"预缓存失败 '{text}': {e}") + return False + return True + + def precache_file(self, nucleon_union: pt.NucleonUnion): + """预缓存单个文件的所有内容""" + self.current_file = nucleon_union.name + total_items = len(nucleon_union.nucleons) + + for idx, nucleon in enumerate(nucleon_union.nucleons): + # 检查是否被取消 + worker = get_current_worker() + if worker and worker.is_cancelled: + return False + + text = nucleon['content'].replace('/', '') + self.current_item = text[:50] + "..." if len(text) > 50 else text + self.processed += 1 + + # 更新进度 + progress = int((self.processed / self.total) * 100) if self.total > 0 else 0 + self.update_status( + f"处理中: {nucleon_union.name} ({idx+1}/{total_items})", + self.current_item, + progress + ) + + # 预缓存音频 + success = self.precache_single_text(text) + if not success: + self.update_status("错误", f"处理失败: {self.current_item}") + time.sleep(1) # 短暂暂停以便用户看到错误信息 + + return True + + def precache_all_files(self): + """预缓存所有文件""" + nucleon_path = pathlib.Path("./nucleon") + nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"] + + # 计算总项目数 + self.total = 0 + for file in nucleon_files: + try: + nu = pt.NucleonUnion(file) + self.total += len(nu.nucleons) + except: + continue + + self.processed = 0 + self.is_precaching = True + + for file in nucleon_files: + try: + nu = pt.NucleonUnion(file) + if not self.precache_file(nu): + break # 用户取消 + except Exception as e: + print(f"处理文件失败 {file}: {e}") + continue + + self.is_precaching = False + self.update_status("完成", "所有音频文件已预缓存", 100) + + def precache_single_file(self): + """预缓存单个文件""" + if not self.nucleon_file: + return + + self.total = len(self.nucleon_file.nucleons) + self.processed = 0 + self.is_precaching = True + + success = self.precache_file(self.nucleon_file) + + self.is_precaching = False + if success: + self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100) + else: + self.update_status("已取消", "预缓存操作被用户取消", 0) + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "start_precache" and not self.is_precaching: + # 开始预缓存 + if self.nucleon_file: + self.precache_worker = self.run_worker(self.precache_single_file, thread=True) + else: + self.precache_worker = self.run_worker(self.precache_all_files, thread=True) + + elif event.button.id == "cancel_precache" and self.is_precaching: + # 取消预缓存 + if self.precache_worker: + self.precache_worker.cancel() + self.is_precaching = False + self.update_status("已取消", "预缓存操作被用户取消", 0) + + elif event.button.id == "clear_cache": + # 清空缓存 + try: + shutil.rmtree("./cache/voice", ignore_errors=True) + self.update_status("已清空", "音频缓存已清空", 0) + except Exception as e: + self.update_status("错误", f"清空缓存失败: {e}") + + elif event.button.id == "go_back": + self.action_go_back() + + def action_go_back(self): + if self.is_precaching and self.precache_worker: + self.precache_worker.cancel() + self.app.pop_screen() + + def action_quit_app(self): + if self.is_precaching and self.precache_worker: + self.precache_worker.cancel() + self.app.exit() diff --git a/src/heurams/interface/screens/preparation.py b/src/heurams/interface/screens/preparation.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/interface/screens/register.py b/src/heurams/interface/screens/register.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/services/audio_service.py b/src/heurams/services/audio_service.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/services/cache.py b/src/heurams/services/cache.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/services/config.py b/src/heurams/services/config.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/services/hash.py b/src/heurams/services/hash.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/services/timer.py b/src/heurams/services/timer.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/services/tts_service.py b/src/heurams/services/tts_service.py new file mode 100644 index 0000000..e69de29 diff --git a/src/heurams/services/version.py b/src/heurams/services/version.py new file mode 100644 index 0000000..e69de29