feat: 改进缓存管理器

This commit is contained in:
2025-12-08 16:50:44 +08:00
parent 483670b23d
commit 42708e8a09
4 changed files with 112 additions and 79 deletions

View File

@@ -41,7 +41,7 @@ class MemScreen(Screen):
print(puzzle_info)
return shim.puzzle2widget[puzzle_info["puzzle"]](atom = self.procession.current_atom, alia = puzzle_info["alia"])
except (KeyError, StopIteration, AttributeError) as e:
print(f"Fission error: {e}")
print(f"调度展开出错: {e}")
return Static("无法生成谜题")
#print(shim.puzzle2widget[puzzle_info["puzzle"]])
@@ -50,7 +50,7 @@ class MemScreen(Screen):
with Center():
yield Static(f"当前进度: {self.procession.process()}/{self.procession.total_length()}")
self.mount(self.current_widget()) # type: ignore
#yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def on_mount(self):
@@ -63,12 +63,6 @@ class MemScreen(Screen):
"""朗读当前内容"""
pass
def action_precache_current(self):
"""预缓存当前单元集的音频"""
#from .precache import PrecachingScreen
#precache_screen = PrecachingScreen(self.nucleon_file)
#self.app.push_screen(precache_screen)
def action_toggle_dark(self):
self.app.action_toggle_dark()

View File

@@ -19,14 +19,22 @@ import pathlib
import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
from textual.worker import Worker, get_current_worker
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕"""
"""预缓存音频文件屏幕
缓存记忆单元音频文件, 全部(默认) 或部分记忆单元(可选参数传入)
Args:
nucleons (list): 可选列表, 仅包含 Nucleon 对象
desc (list): 可选字符串, 包含对此次调用的文字描述
"""
BINDINGS = [("q", "go_back", "返回")]
def __init__(self, nucleon_file = None):
def __init__(self, nucleons: list = [], desc: str = ""):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.nucleons = nucleons
self.is_precaching = False
self.current_file = ""
self.current_item = ""
@@ -34,17 +42,21 @@ class PrecachingScreen(Screen):
self.total = 0
self.processed = 0
self.precache_worker = None
self.desc = desc
for i in nucleons:
i: pt.Nucleon
i.do_eval()
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="precache_container"):
yield Label("[b]音频预缓存[/b]", classes="title-label")
if self.nucleon_file:
yield Static(f"目标单元: [b]{self.nucleon_file.name}[/b]", classes="target-info")
yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info")
if self.nucleons:
yield Static(f"目标单元归属: [b]{self.desc}[/b]", classes="target-info")
yield Static(f"单元数量: {len(self.nucleons)}", classes="target-info")
else:
yield Static("目标: 所有单元", classes="target-info")
yield Static("目标: 所有单元", classes="target-info")
yield Static(id="status", classes="status-info")
yield Static(id="current_item", classes="current-item")
@@ -76,65 +88,71 @@ class PrecachingScreen(Screen):
progress_bar.progress = progress
progress_bar.advance(0) # 刷新显示
def precache_single_text(self, text: str):
"""预缓存单文本的音频"""
cache_dir = pathlib.Path("./cache/voice/")
def precache_by_text(self, text: str):
"""预缓存单文本的音频"""
from heurams.context import rootdir, workdir, config_var
cache_dir = pathlib.Path(config_var.get()["paths"]["cache_dir"])
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache.exists():
try:
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache_file.exists():
try: # TODO: 调用模块消除tts耦合
import edge_tts as tts
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache))
return True
communicate.save_sync(str(cache_file))
return 1
except Exception as e:
print(f"预缓存失败 '{text}': {e}")
return False
return True
def precache_file(self, path: pathlib.Path):
"""预缓存单个文件的所有内容"""
self.current_file = path.name
total_items = len(pt.load_nucleon(path))
for idx, nucleon in enumerate(nucleon_union.nucleons):
# 检查是否被取消
return 0
return 1
def precache_by_nucleon(self, nucleon: pt.Nucleon):
"""依据 Nucleon 缓存"""
return self.precache_by_text(nucleon.metadata['formation']['tts_text'])
#print(f"TTS 缓存: {nucleon.metadata['formation']['tts_text']}")
def precache_by_list(self, nucleons: list):
"""依据 Nucleons 列表缓存"""
for idx, nucleon in enumerate(nucleons):
worker = get_current_worker()
if worker and worker.is_cancelled:
if worker and worker.is_cancelled: # 函数在worker中执行且已被取消
return False
text = nucleon['content'].replace('/', '')
self.current_item = text[:50] + "..." if len(text) > 50 else text
text = nucleon.metadata['formation']['tts_text']
self.current_item = text[:30] + "..." if len(text) > 50 else text
self.processed += 1
# 更新进度
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
self.update_status(
f"处理中: {nucleon_union.name} ({idx+1}/{total_items})",
self.current_item,
progress
f"处理 ({idx + 1}/{len(nucleons)})"
)
# 预缓存音频
success = self.precache_single_text(text)
if not success:
self.update_status("错误", f"处理失败: {self.current_item}")
time.sleep(1) # 短暂暂停以便用户看到错误信息
return True
ret = self.precache_by_nucleon(nucleon)
if not ret:
self.update_status(
"出错",
f"处理失败, 跳过: {self.current_item}",
)
import time
time.sleep(1)
return True
def precache_by_nucleons(self):
return self.precache_by_list(self.nucleons)
def precache_by_filepath(self, path: pathlib.Path):
"""预缓存单个文件的所有内容"""
return self.precache_by_list(pt.load_nucleon(path)[0])
def precache_all_files(self):
"""预缓存所有文件"""
nucleon_path = pathlib.Path("./nucleon")
nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"]
from heurams.context import rootdir, workdir, config_var
nucleon_path = pathlib.Path(config_var.get()["paths"]["nucleon_dir"])
nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"] # TODO: 解耦合
# 计算总项目数
self.total = 0
nu = list()
for file in nucleon_files:
try:
nu += pt.load_nucleon(file)
nu += pt.load_nucleon(file)[0]
self.total = len(nu)
except:
continue
@@ -144,38 +162,22 @@ class PrecachingScreen(Screen):
for file in nucleon_files:
try:
nu += pt.load_nucleon(file)
if not self.precache_file(nu):
nu += pt.load_nucleon(file)[0]
if not self.precache_by_list(nu):
break # 用户取消
except Exception as e:
print(f"处理文件失败 {file}: {e}")
continue
self.is_precaching = False
self.update_status("完成", "所有音频文件已预缓存", 100)
def precache_single_file(self):
"""预缓存单个文件"""
if not self.nucleon_file:
return
self.total = len(self.nucleon_file.nucleons)
self.processed = 0
self.is_precaching = True
success = self.precache_file(self.nucleon_file)
self.is_precaching = False
if success:
self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100)
else:
self.update_status("已取消", "预缓存操作被用户取消", 0)
self.update_status("完成", "所有单元的音频已被预缓存", 100)
def on_button_pressed(self, event: Button.Pressed) -> None:
event.stop()
if event.button.id == "start_precache" and not self.is_precaching:
# 开始预缓存
if self.nucleon_file:
self.precache_worker = self.run_worker(self.precache_single_file, thread=True)
if self.nucleons:
self.precache_worker = self.run_worker(self.precache_by_nucleons, thread=True)
else:
self.precache_worker = self.run_worker(self.precache_all_files, thread=True)
@@ -189,7 +191,9 @@ class PrecachingScreen(Screen):
elif event.button.id == "clear_cache":
# 清空缓存
try:
shutil.rmtree("./cache/voice", ignore_errors=True)
import shutil
from heurams.context import rootdir, workdir, config_var
shutil.rmtree(f"{config_var.get()["paths"]["cache_dir"]}", ignore_errors=True)
self.update_status("已清空", "音频缓存已清空", 0)
except Exception as e:
self.update_status("错误", f"清空缓存失败: {e}")

View File

@@ -18,7 +18,6 @@ from heurams.context import *
class PreparationScreen(Screen):
BINDINGS = [
("q", "go_back", "返回"),
("escape", "quit_app", "退出"),
("p", "precache", "预缓存音频")
]
@@ -69,7 +68,10 @@ class PreparationScreen(Screen):
def action_precache(self):
from ..screens.precache import PrecachingScreen
precache_screen = PrecachingScreen(self.nucleon_file)
lst = list()
for i in self.nucleons_with_orbital:
lst.append(i[0])
precache_screen = PrecachingScreen(lst)
self.app.push_screen(precache_screen)
def action_quit_app(self):

View File

@@ -29,7 +29,40 @@ class Nucleon:
def __hash__(self):
return hash(self.ident)
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
# eval 环境设置
def eval_with_env(s: str):
try:
nucleon = self
ret = str(eval(s))
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
return modifier(data[5:])
return data
traverse(self.payload, eval_with_env)
traverse(self.metadata, eval_with_env)
@staticmethod
def placeholder():
"""生成一个占位原子核"""