diff --git a/src/heurams/interface/screens/memorizor.py b/src/heurams/interface/screens/memorizor.py index e69574a..8fa49ff 100644 --- a/src/heurams/interface/screens/memorizor.py +++ b/src/heurams/interface/screens/memorizor.py @@ -41,7 +41,7 @@ class MemScreen(Screen): print(puzzle_info) return shim.puzzle2widget[puzzle_info["puzzle"]](atom = self.procession.current_atom, alia = puzzle_info["alia"]) except (KeyError, StopIteration, AttributeError) as e: - print(f"Fission error: {e}") + print(f"调度展开出错: {e}") return Static("无法生成谜题") #print(shim.puzzle2widget[puzzle_info["puzzle"]]) @@ -50,7 +50,7 @@ class MemScreen(Screen): with Center(): yield Static(f"当前进度: {self.procession.process()}/{self.procession.total_length()}") self.mount(self.current_widget()) # type: ignore - #yield Button("重新学习此单元", id="re-recognize", variant="warning") + yield Button("重新学习此单元", id="re-recognize", variant="warning") yield Footer() def on_mount(self): @@ -63,12 +63,6 @@ class MemScreen(Screen): """朗读当前内容""" pass - def action_precache_current(self): - """预缓存当前单元集的音频""" - #from .precache import PrecachingScreen - #precache_screen = PrecachingScreen(self.nucleon_file) - #self.app.push_screen(precache_screen) - def action_toggle_dark(self): self.app.action_toggle_dark() diff --git a/src/heurams/interface/screens/precache.py b/src/heurams/interface/screens/precache.py index 3b5ec97..5dabe6d 100644 --- a/src/heurams/interface/screens/precache.py +++ b/src/heurams/interface/screens/precache.py @@ -19,14 +19,22 @@ import pathlib import heurams.kernel.particles as pt import heurams.services.hasher as hasher from heurams.context import * +from textual.worker import Worker, get_current_worker class PrecachingScreen(Screen): - """预缓存音频文件屏幕""" + """预缓存音频文件屏幕 + + 缓存记忆单元音频文件, 全部(默认) 或部分记忆单元(可选参数传入) + + Args: + nucleons (list): 可选列表, 仅包含 Nucleon 对象 + desc (list): 可选字符串, 包含对此次调用的文字描述 + """ BINDINGS = [("q", "go_back", "返回")] - def __init__(self, nucleon_file = None): + def __init__(self, nucleons: list = [], desc: str = ""): super().__init__(name=None, id=None, classes=None) - self.nucleon_file = nucleon_file + self.nucleons = nucleons self.is_precaching = False self.current_file = "" self.current_item = "" @@ -34,17 +42,21 @@ class PrecachingScreen(Screen): self.total = 0 self.processed = 0 self.precache_worker = None + self.desc = desc + for i in nucleons: + i: pt.Nucleon + i.do_eval() def compose(self) -> ComposeResult: yield Header(show_clock=True) with Container(id="precache_container"): yield Label("[b]音频预缓存[/b]", classes="title-label") - if self.nucleon_file: - yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info") - yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info") + if self.nucleons: + yield Static(f"目标单元归属: [b]{self.desc}[/b]", classes="target-info") + yield Static(f"单元数量: {len(self.nucleons)}", classes="target-info") else: - yield Static("目标: 所有单元集", classes="target-info") + yield Static("目标: 所有单元", classes="target-info") yield Static(id="status", classes="status-info") yield Static(id="current_item", classes="current-item") @@ -76,65 +88,71 @@ class PrecachingScreen(Screen): progress_bar.progress = progress progress_bar.advance(0) # 刷新显示 - def precache_single_text(self, text: str): - """预缓存单个文本的音频""" - cache_dir = pathlib.Path("./cache/voice/") + def precache_by_text(self, text: str): + """预缓存单段文本的音频""" + from heurams.context import rootdir, workdir, config_var + cache_dir = pathlib.Path(config_var.get()["paths"]["cache_dir"]) cache_dir.mkdir(parents=True, exist_ok=True) - cache = cache_dir / f"{hasher.get_md5(text)}.wav" - if not cache.exists(): - try: + cache_file = cache_dir / f"{hasher.get_md5(text)}.wav" + if not cache_file.exists(): + try: # TODO: 调用模块消除tts耦合 import edge_tts as tts communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural") - communicate.save_sync(str(cache)) - return True + communicate.save_sync(str(cache_file)) + return 1 except Exception as e: print(f"预缓存失败 '{text}': {e}") - return False - return True - - def precache_file(self, path: pathlib.Path): - """预缓存单个文件的所有内容""" - self.current_file = path.name - total_items = len(pt.load_nucleon(path)) - - for idx, nucleon in enumerate(nucleon_union.nucleons): - # 检查是否被取消 + return 0 + return 1 + + def precache_by_nucleon(self, nucleon: pt.Nucleon): + """依据 Nucleon 缓存""" + return self.precache_by_text(nucleon.metadata['formation']['tts_text']) + #print(f"TTS 缓存: {nucleon.metadata['formation']['tts_text']}") + + def precache_by_list(self, nucleons: list): + """依据 Nucleons 列表缓存""" + for idx, nucleon in enumerate(nucleons): worker = get_current_worker() - if worker and worker.is_cancelled: + if worker and worker.is_cancelled: # 函数在worker中执行且已被取消 return False - - text = nucleon['content'].replace('/', '') - self.current_item = text[:50] + "..." if len(text) > 50 else text + text = nucleon.metadata['formation']['tts_text'] + self.current_item = text[:30] + "..." if len(text) > 50 else text self.processed += 1 - - # 更新进度 progress = int((self.processed / self.total) * 100) if self.total > 0 else 0 self.update_status( - f"处理中: {nucleon_union.name} ({idx+1}/{total_items})", - self.current_item, - progress + f"正处理 ({idx + 1}/{len(nucleons)})" ) - - # 预缓存音频 - success = self.precache_single_text(text) - if not success: - self.update_status("错误", f"处理失败: {self.current_item}") - time.sleep(1) # 短暂暂停以便用户看到错误信息 - - return True + ret = self.precache_by_nucleon(nucleon) + if not ret: + self.update_status( + "出错", + f"处理失败, 跳过: {self.current_item}", + ) + import time + time.sleep(1) + return True + + def precache_by_nucleons(self): + return self.precache_by_list(self.nucleons) + + def precache_by_filepath(self, path: pathlib.Path): + """预缓存单个文件的所有内容""" + return self.precache_by_list(pt.load_nucleon(path)[0]) def precache_all_files(self): """预缓存所有文件""" - nucleon_path = pathlib.Path("./nucleon") - nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"] + from heurams.context import rootdir, workdir, config_var + nucleon_path = pathlib.Path(config_var.get()["paths"]["nucleon_dir"]) + nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"] # TODO: 解耦合 # 计算总项目数 self.total = 0 nu = list() for file in nucleon_files: try: - nu += pt.load_nucleon(file) + nu += pt.load_nucleon(file)[0] self.total = len(nu) except: continue @@ -144,38 +162,22 @@ class PrecachingScreen(Screen): for file in nucleon_files: try: - nu += pt.load_nucleon(file) - if not self.precache_file(nu): + nu += pt.load_nucleon(file)[0] + if not self.precache_by_list(nu): break # 用户取消 except Exception as e: print(f"处理文件失败 {file}: {e}") continue self.is_precaching = False - self.update_status("完成", "所有音频文件已预缓存", 100) - - def precache_single_file(self): - """预缓存单个文件""" - if not self.nucleon_file: - return - - self.total = len(self.nucleon_file.nucleons) - self.processed = 0 - self.is_precaching = True - - success = self.precache_file(self.nucleon_file) - - self.is_precaching = False - if success: - self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100) - else: - self.update_status("已取消", "预缓存操作被用户取消", 0) + self.update_status("完成", "所有单元的音频已被预缓存", 100) def on_button_pressed(self, event: Button.Pressed) -> None: + event.stop() if event.button.id == "start_precache" and not self.is_precaching: # 开始预缓存 - if self.nucleon_file: - self.precache_worker = self.run_worker(self.precache_single_file, thread=True) + if self.nucleons: + self.precache_worker = self.run_worker(self.precache_by_nucleons, thread=True) else: self.precache_worker = self.run_worker(self.precache_all_files, thread=True) @@ -189,7 +191,9 @@ class PrecachingScreen(Screen): elif event.button.id == "clear_cache": # 清空缓存 try: - shutil.rmtree("./cache/voice", ignore_errors=True) + import shutil + from heurams.context import rootdir, workdir, config_var + shutil.rmtree(f"{config_var.get()["paths"]["cache_dir"]}", ignore_errors=True) self.update_status("已清空", "音频缓存已清空", 0) except Exception as e: self.update_status("错误", f"清空缓存失败: {e}") diff --git a/src/heurams/interface/screens/preparation.py b/src/heurams/interface/screens/preparation.py index d336ace..6775216 100644 --- a/src/heurams/interface/screens/preparation.py +++ b/src/heurams/interface/screens/preparation.py @@ -18,7 +18,6 @@ from heurams.context import * class PreparationScreen(Screen): BINDINGS = [ ("q", "go_back", "返回"), - ("escape", "quit_app", "退出"), ("p", "precache", "预缓存音频") ] @@ -69,7 +68,10 @@ class PreparationScreen(Screen): def action_precache(self): from ..screens.precache import PrecachingScreen - precache_screen = PrecachingScreen(self.nucleon_file) + lst = list() + for i in self.nucleons_with_orbital: + lst.append(i[0]) + precache_screen = PrecachingScreen(lst) self.app.push_screen(precache_screen) def action_quit_app(self): diff --git a/src/heurams/kernel/particles/nucleon.py b/src/heurams/kernel/particles/nucleon.py index 1589df2..ce5b18e 100644 --- a/src/heurams/kernel/particles/nucleon.py +++ b/src/heurams/kernel/particles/nucleon.py @@ -29,7 +29,40 @@ class Nucleon: def __hash__(self): return hash(self.ident) - + + def do_eval(self): + """ + 执行并以结果替换当前单元的所有 eval 语句 + TODO: 带有限制的 eval, 异步/多线程执行避免堵塞 + """ + # eval 环境设置 + def eval_with_env(s: str): + try: + nucleon = self + ret = str(eval(s)) + except Exception as e: + ret = f"此 eval 实例发生错误: {e}" + return ret + + def traverse(data, modifier): + if isinstance(data, dict): + for key, value in data.items(): + data[key] = traverse(value, modifier) + return data + elif isinstance(data, list): + for i, item in enumerate(data): + data[i] = traverse(item, modifier) + return data + elif isinstance(data, tuple): + return tuple(traverse(item, modifier) for item in data) + else: + if isinstance(data, str): + if data.startswith("eval:"): + return modifier(data[5:]) + return data + + traverse(self.payload, eval_with_env) + traverse(self.metadata, eval_with_env) @staticmethod def placeholder(): """生成一个占位原子核"""