重构预缓存实用程序, 并保留分离式旧版本

This commit is contained in:
2025-10-02 14:55:24 +08:00
parent e96832a60c
commit 156f558a45
2 changed files with 233 additions and 9 deletions

View File

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
# 音频预缓存实用程序, 独立于主程序之外, 但依赖其他组件
# 音频预缓存实用程序(旧版), 独立于主程序之外, 但依赖其他组件
import particles as pt
import auxiliary as aux
import edge_tts as tts
@@ -45,7 +45,7 @@ def walk(path_str: str):
walk(path_str)
if __name__ == "__main__":
print("音频预缓存实用程序")
print("音频预缓存实用程序(旧版)")
print("A: 全部缓存")
print("C: 清空缓存")

View File

@@ -16,6 +16,7 @@ from textual.widgets import (
)
from textual.containers import Container, Horizontal, Center
from textual.screen import Screen
from textual.worker import Worker, get_current_worker
import pathlib
import threading
import edge_tts as tts
@@ -26,14 +27,202 @@ import auxiliary as aux
import compositions as compo
import builtins
import metadata
import time
import shutil
config = aux.ConfigFile("config.toml")
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕"""
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
def __init__(self, nucleon_file = None):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.is_precaching = False
self.current_file = ""
self.current_item = ""
self.progress = 0
self.total = 0
self.processed = 0
self.precache_worker = None
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="precache_container"):
yield Label("[b]音频预缓存[/b]", classes="title-label")
if self.nucleon_file:
yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info")
yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info")
else:
yield Static("目标: 所有单元集", classes="target-info")
yield Static(id="status", classes="status-info")
yield Static(id="current_item", classes="current-item")
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
with Horizontal(classes="button-group"):
if not self.is_precaching:
yield Button("开始预缓存", id="start_precache", variant="primary")
else:
yield Button("取消预缓存", id="cancel_precache", variant="error")
yield Button("清空缓存", id="clear_cache", variant="warning")
yield Button("返回", id="go_back", variant="default")
yield Footer()
def on_mount(self):
"""挂载时初始化状态"""
self.update_status("就绪", "等待开始...")
def update_status(self, status, current_item="", progress=None):
"""更新状态显示"""
status_widget = self.query_one("#status", Static)
item_widget = self.query_one("#current_item", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
status_widget.update(f"状态: {status}")
item_widget.update(f"当前项目: {current_item}" if current_item else "")
if progress is not None:
progress_bar.progress = progress
progress_bar.advance(0) # 刷新显示
def precache_single_text(self, text: str):
"""预缓存单个文本的音频"""
cache_dir = pathlib.Path("./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(text)}.wav"
if not cache.exists():
try:
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache))
return True
except Exception as e:
print(f"预缓存失败 '{text}': {e}")
return False
return True
def precache_file(self, nucleon_union: pt.NucleonUnion):
"""预缓存单个文件的所有内容"""
self.current_file = nucleon_union.name
total_items = len(nucleon_union.nucleons)
for idx, nucleon in enumerate(nucleon_union.nucleons):
# 检查是否被取消
worker = get_current_worker()
if worker and worker.is_cancelled:
return False
text = nucleon['content'].replace('/', '')
self.current_item = text[:50] + "..." if len(text) > 50 else text
self.processed += 1
# 更新进度
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
self.update_status(
f"处理中: {nucleon_union.name} ({idx+1}/{total_items})",
self.current_item,
progress
)
# 预缓存音频
success = self.precache_single_text(text)
if not success:
self.update_status("错误", f"处理失败: {self.current_item}")
time.sleep(1) # 短暂暂停以便用户看到错误信息
return True
def precache_all_files(self):
"""预缓存所有文件"""
nucleon_path = pathlib.Path("./nucleon")
nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"]
# 计算总项目数
self.total = 0
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
self.total += len(nu.nucleons)
except:
continue
self.processed = 0
self.is_precaching = True
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
if not self.precache_file(nu):
break # 用户取消
except Exception as e:
print(f"处理文件失败 {file}: {e}")
continue
self.is_precaching = False
self.update_status("完成", "所有音频文件已预缓存", 100)
def precache_single_file(self):
"""预缓存单个文件"""
if not self.nucleon_file:
return
self.total = len(self.nucleon_file.nucleons)
self.processed = 0
self.is_precaching = True
success = self.precache_file(self.nucleon_file)
self.is_precaching = False
if success:
self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100)
else:
self.update_status("已取消", "预缓存操作被用户取消", 0)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "start_precache" and not self.is_precaching:
# 开始预缓存
if self.nucleon_file:
self.precache_worker = self.run_worker(self.precache_single_file, thread=True)
else:
self.precache_worker = self.run_worker(self.precache_all_files, thread=True)
elif event.button.id == "cancel_precache" and self.is_precaching:
# 取消预缓存
if self.precache_worker:
self.precache_worker.cancel()
self.is_precaching = False
self.update_status("已取消", "预缓存操作被用户取消", 0)
elif event.button.id == "clear_cache":
# 清空缓存
try:
shutil.rmtree("./cache/voice", ignore_errors=True)
self.update_status("已清空", "音频缓存已清空", 0)
except Exception as e:
self.update_status("错误", f"清空缓存失败: {e}")
elif event.button.id == "go_back":
self.action_go_back()
def action_go_back(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.pop_screen()
def action_quit_app(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.exit()
class MemScreen(Screen):
BINDINGS = [
("d", "toggle_dark", "改变色调"),
("q", "pop_screen", "返回主菜单"),
("v", "play_voice", "朗读"),
# ("p", "precache_current", "预缓存当前单元集"), # 新增预缓存快捷键
]
if config.get("quick_pass"):
BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
@@ -46,6 +235,8 @@ class MemScreen(Screen):
tasked_num,
):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
self.stage = 1
self.stage += self.reactor.set_round_templated(self.stage)
@@ -151,6 +342,11 @@ class MemScreen(Screen):
threading.Thread(target=play).start()
def action_precache_current(self):
"""预缓存当前单元集的音频"""
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_quick_pass(self):
self.reactor.report(self.reactor.current_atom, 5)
self._forward_judge(0)
@@ -161,7 +357,11 @@ class MemScreen(Screen):
self.app.pop_screen()
class PreparationScreen(Screen):
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
BINDINGS = [
("q", "go_back", "返回"),
("escape", "quit_app", "退出"),
("p", "precache", "预缓存音频"), # 新增预缓存快捷键
]
def __init__(
self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion
@@ -177,12 +377,20 @@ class PreparationScreen(Screen):
yield Label(f"内容源文件对象: ./nucleon/[b]{self.nucleon_file.name}[/b].toml")
yield Label(f"元数据文件对象: ./electron/[b]{self.electron_file.name}[/b].toml")
yield Label(f"\n单元数量:{len(self.nucleon_file)}\n")
yield Button(
"开始记忆",
id="start_memorizing_button",
variant="primary",
classes="start-button",
)
yield Button(
"预缓存音频",
id="precache_button",
variant="success",
classes="precache-button",
)
yield Static(f"\n单元预览:\n")
yield Markdown(self._get_full_content().replace("/", ""), classes="full")
yield Footer()
@@ -196,6 +404,11 @@ class PreparationScreen(Screen):
def action_go_back(self):
self.app.pop_screen()
def action_precache(self):
"""预缓存当前单元集的音频"""
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_quit_app(self):
self.app.exit()
@@ -205,6 +418,8 @@ class PreparationScreen(Screen):
self.nucleon_file, self.electron_file, config.get("tasked_number", 6)
)
self.app.push_screen(newscr)
elif event.button.id == "precache_button":
self.action_precache()
class NewNucleonScreen(Screen):
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
@@ -249,6 +464,8 @@ class NewNucleonScreen(Screen):
pass
class DashboardScreen(Screen):
#BINDINGS = [("p", "precache_all", "预缓存所有音频")] # 新增全局预缓存快捷键
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Container(
@@ -258,6 +475,7 @@ class DashboardScreen(Screen):
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
ListView(id="file-list", classes="file-list-view"),
#Button("新建空的单元集", id="new_nucleon_button"),
Button("预缓存所有音频", id="precache_all_button", variant="success"),
Label(f"\"潜进\" 开放源代码软件项目 | 版本 {metadata.ver} {metadata.stage.capitalize()} | Wang Zhiyu 2025"),
)
yield Footer()
@@ -322,7 +540,13 @@ class DashboardScreen(Screen):
if event.button.id == "new_nucleon_button":
newscr = NewNucleonScreen()
self.app.push_screen(newscr)
elif event.button.id == "precache_all_button":
self.action_precache_all()
def action_precache_all(self):
"""预缓存所有单元集的音频"""
precache_screen = PrecachingScreen()
self.app.push_screen(precache_screen)
def action_quit_app(self) -> None:
self.app.exit()