diff --git a/precache.py b/precache.py index f83f822..0145793 100644 --- a/precache.py +++ b/precache.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -# 音频预缓存实用程序, 独立于主程序之外, 但依赖其他组件 +# 音频预缓存实用程序(旧版), 独立于主程序之外, 但依赖其他组件 import particles as pt import auxiliary as aux import edge_tts as tts @@ -45,7 +45,7 @@ def walk(path_str: str): walk(path_str) if __name__ == "__main__": - print("音频预缓存实用程序") + print("音频预缓存实用程序(旧版)") print("A: 全部缓存") print("C: 清空缓存") diff --git a/screens.py b/screens.py index 8621dbb..9bf9c17 100644 --- a/screens.py +++ b/screens.py @@ -16,6 +16,7 @@ from textual.widgets import ( ) from textual.containers import Container, Horizontal, Center from textual.screen import Screen +from textual.worker import Worker, get_current_worker import pathlib import threading import edge_tts as tts @@ -26,14 +27,202 @@ import auxiliary as aux import compositions as compo import builtins import metadata +import time +import shutil config = aux.ConfigFile("config.toml") +class PrecachingScreen(Screen): + """预缓存音频文件屏幕""" + BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")] + + def __init__(self, nucleon_file = None): + super().__init__(name=None, id=None, classes=None) + self.nucleon_file = nucleon_file + self.is_precaching = False + self.current_file = "" + self.current_item = "" + self.progress = 0 + self.total = 0 + self.processed = 0 + self.precache_worker = None + + def compose(self) -> ComposeResult: + yield Header(show_clock=True) + with Container(id="precache_container"): + yield Label("[b]音频预缓存[/b]", classes="title-label") + + if self.nucleon_file: + yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info") + yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info") + else: + yield Static("目标: 所有单元集", classes="target-info") + + yield Static(id="status", classes="status-info") + yield Static(id="current_item", classes="current-item") + yield ProgressBar(total=100, show_eta=False, id="progress_bar") + + with Horizontal(classes="button-group"): + if not self.is_precaching: + yield Button("开始预缓存", id="start_precache", variant="primary") + else: + yield Button("取消预缓存", id="cancel_precache", variant="error") + yield Button("清空缓存", id="clear_cache", variant="warning") + yield Button("返回", id="go_back", variant="default") + yield Footer() + + def on_mount(self): + """挂载时初始化状态""" + self.update_status("就绪", "等待开始...") + + def update_status(self, status, current_item="", progress=None): + """更新状态显示""" + status_widget = self.query_one("#status", Static) + item_widget = self.query_one("#current_item", Static) + progress_bar = self.query_one("#progress_bar", ProgressBar) + + status_widget.update(f"状态: {status}") + item_widget.update(f"当前项目: {current_item}" if current_item else "") + + if progress is not None: + progress_bar.progress = progress + progress_bar.advance(0) # 刷新显示 + + def precache_single_text(self, text: str): + """预缓存单个文本的音频""" + cache_dir = pathlib.Path("./cache/voice/") + cache_dir.mkdir(parents=True, exist_ok=True) + cache = cache_dir / f"{aux.get_md5(text)}.wav" + if not cache.exists(): + try: + communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural") + communicate.save_sync(str(cache)) + return True + except Exception as e: + print(f"预缓存失败 '{text}': {e}") + return False + return True + + def precache_file(self, nucleon_union: pt.NucleonUnion): + """预缓存单个文件的所有内容""" + self.current_file = nucleon_union.name + total_items = len(nucleon_union.nucleons) + + for idx, nucleon in enumerate(nucleon_union.nucleons): + # 检查是否被取消 + worker = get_current_worker() + if worker and worker.is_cancelled: + return False + + text = nucleon['content'].replace('/', '') + self.current_item = text[:50] + "..." if len(text) > 50 else text + self.processed += 1 + + # 更新进度 + progress = int((self.processed / self.total) * 100) if self.total > 0 else 0 + self.update_status( + f"处理中: {nucleon_union.name} ({idx+1}/{total_items})", + self.current_item, + progress + ) + + # 预缓存音频 + success = self.precache_single_text(text) + if not success: + self.update_status("错误", f"处理失败: {self.current_item}") + time.sleep(1) # 短暂暂停以便用户看到错误信息 + + return True + + def precache_all_files(self): + """预缓存所有文件""" + nucleon_path = pathlib.Path("./nucleon") + nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"] + + # 计算总项目数 + self.total = 0 + for file in nucleon_files: + try: + nu = pt.NucleonUnion(file) + self.total += len(nu.nucleons) + except: + continue + + self.processed = 0 + self.is_precaching = True + + for file in nucleon_files: + try: + nu = pt.NucleonUnion(file) + if not self.precache_file(nu): + break # 用户取消 + except Exception as e: + print(f"处理文件失败 {file}: {e}") + continue + + self.is_precaching = False + self.update_status("完成", "所有音频文件已预缓存", 100) + + def precache_single_file(self): + """预缓存单个文件""" + if not self.nucleon_file: + return + + self.total = len(self.nucleon_file.nucleons) + self.processed = 0 + self.is_precaching = True + + success = self.precache_file(self.nucleon_file) + + self.is_precaching = False + if success: + self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100) + else: + self.update_status("已取消", "预缓存操作被用户取消", 0) + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "start_precache" and not self.is_precaching: + # 开始预缓存 + if self.nucleon_file: + self.precache_worker = self.run_worker(self.precache_single_file, thread=True) + else: + self.precache_worker = self.run_worker(self.precache_all_files, thread=True) + + elif event.button.id == "cancel_precache" and self.is_precaching: + # 取消预缓存 + if self.precache_worker: + self.precache_worker.cancel() + self.is_precaching = False + self.update_status("已取消", "预缓存操作被用户取消", 0) + + elif event.button.id == "clear_cache": + # 清空缓存 + try: + shutil.rmtree("./cache/voice", ignore_errors=True) + self.update_status("已清空", "音频缓存已清空", 0) + except Exception as e: + self.update_status("错误", f"清空缓存失败: {e}") + + elif event.button.id == "go_back": + self.action_go_back() + + def action_go_back(self): + if self.is_precaching and self.precache_worker: + self.precache_worker.cancel() + self.app.pop_screen() + + def action_quit_app(self): + if self.is_precaching and self.precache_worker: + self.precache_worker.cancel() + self.app.exit() + + class MemScreen(Screen): BINDINGS = [ ("d", "toggle_dark", "改变色调"), ("q", "pop_screen", "返回主菜单"), ("v", "play_voice", "朗读"), + # ("p", "precache_current", "预缓存当前单元集"), # 新增预缓存快捷键 ] if config.get("quick_pass"): BINDINGS.append(("k", "quick_pass", "快速通过[调试]")) @@ -46,6 +235,8 @@ class MemScreen(Screen): tasked_num, ): super().__init__(name=None, id=None, classes=None) + self.nucleon_file = nucleon_file + self.electron_file = electron_file self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num) self.stage = 1 self.stage += self.reactor.set_round_templated(self.stage) @@ -151,6 +342,11 @@ class MemScreen(Screen): threading.Thread(target=play).start() + def action_precache_current(self): + """预缓存当前单元集的音频""" + precache_screen = PrecachingScreen(self.nucleon_file) + self.app.push_screen(precache_screen) + def action_quick_pass(self): self.reactor.report(self.reactor.current_atom, 5) self._forward_judge(0) @@ -161,7 +357,11 @@ class MemScreen(Screen): self.app.pop_screen() class PreparationScreen(Screen): - BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")] + BINDINGS = [ + ("q", "go_back", "返回"), + ("escape", "quit_app", "退出"), + ("p", "precache", "预缓存音频"), # 新增预缓存快捷键 + ] def __init__( self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion @@ -177,12 +377,20 @@ class PreparationScreen(Screen): yield Label(f"内容源文件对象: ./nucleon/[b]{self.nucleon_file.name}[/b].toml") yield Label(f"元数据文件对象: ./electron/[b]{self.electron_file.name}[/b].toml") yield Label(f"\n单元数量:{len(self.nucleon_file)}\n") + yield Button( - "开始记忆", - id="start_memorizing_button", - variant="primary", - classes="start-button", + "开始记忆", + id="start_memorizing_button", + variant="primary", + classes="start-button", ) + yield Button( + "预缓存音频", + id="precache_button", + variant="success", + classes="precache-button", + ) + yield Static(f"\n单元预览:\n") yield Markdown(self._get_full_content().replace("/", ""), classes="full") yield Footer() @@ -196,6 +404,11 @@ class PreparationScreen(Screen): def action_go_back(self): self.app.pop_screen() + def action_precache(self): + """预缓存当前单元集的音频""" + precache_screen = PrecachingScreen(self.nucleon_file) + self.app.push_screen(precache_screen) + def action_quit_app(self): self.app.exit() @@ -205,6 +418,8 @@ class PreparationScreen(Screen): self.nucleon_file, self.electron_file, config.get("tasked_number", 6) ) self.app.push_screen(newscr) + elif event.button.id == "precache_button": + self.action_precache() class NewNucleonScreen(Screen): BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")] @@ -249,6 +464,8 @@ class NewNucleonScreen(Screen): pass class DashboardScreen(Screen): + #BINDINGS = [("p", "precache_all", "预缓存所有音频")] # 新增全局预缓存快捷键 + def compose(self) -> ComposeResult: yield Header(show_clock=True) yield Container( @@ -258,6 +475,7 @@ class DashboardScreen(Screen): Label("选择待学习或待修改的记忆单元集:", classes="title-label"), ListView(id="file-list", classes="file-list-view"), #Button("新建空的单元集", id="new_nucleon_button"), + Button("预缓存所有音频", id="precache_all_button", variant="success"), Label(f"\"潜进\" 开放源代码软件项目 | 版本 {metadata.ver} {metadata.stage.capitalize()} | Wang Zhiyu 2025"), ) yield Footer() @@ -322,7 +540,13 @@ class DashboardScreen(Screen): if event.button.id == "new_nucleon_button": newscr = NewNucleonScreen() self.app.push_screen(newscr) + elif event.button.id == "precache_all_button": + self.action_precache_all() + + def action_precache_all(self): + """预缓存所有单元集的音频""" + precache_screen = PrecachingScreen() + self.app.push_screen(precache_screen) def action_quit_app(self) -> None: - self.app.exit() - + self.app.exit() \ No newline at end of file