Files
HeurAMS/legacy/screens.py
2025-10-11 22:05:25 +08:00

552 lines
20 KiB
Python

#!/usr/bin/env python3
from textual.app import App, ComposeResult
from textual.widgets import (
Header,
Footer,
Input,
ListView,
ProgressBar,
DirectoryTree,
ListItem,
Label,
Markdown,
Static,
Button,
Select,
)
from textual.containers import Container, Horizontal, Center
from textual.screen import Screen
from textual.worker import Worker, get_current_worker
import pathlib
import threading
from playsound import playsound
import particles as pt
from reactor import Reactor, Apparatus, Glimpse
import auxiliary as aux
import compositions as compo
import builtins
import metadata
import time
import shutil
config = aux.ConfigFile("config.toml")
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕"""
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
def __init__(self, nucleon_file = None):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.is_precaching = False
self.current_file = ""
self.current_item = ""
self.progress = 0
self.total = 0
self.processed = 0
self.precache_worker = None
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="precache_container"):
yield Label("[b]音频预缓存[/b]", classes="title-label")
if self.nucleon_file:
yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info")
yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info")
else:
yield Static("目标: 所有单元集", classes="target-info")
yield Static(id="status", classes="status-info")
yield Static(id="current_item", classes="current-item")
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
with Horizontal(classes="button-group"):
if not self.is_precaching:
yield Button("开始预缓存", id="start_precache", variant="primary")
else:
yield Button("取消预缓存", id="cancel_precache", variant="error")
yield Button("清空缓存", id="clear_cache", variant="warning")
yield Button("返回", id="go_back", variant="default")
yield Footer()
def on_mount(self):
"""挂载时初始化状态"""
self.update_status("就绪", "等待开始...")
def update_status(self, status, current_item="", progress=None):
"""更新状态显示"""
status_widget = self.query_one("#status", Static)
item_widget = self.query_one("#current_item", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
status_widget.update(f"状态: {status}")
item_widget.update(f"当前项目: {current_item}" if current_item else "")
if progress is not None:
progress_bar.progress = progress
progress_bar.advance(0) # 刷新显示
def precache_single_text(self, text: str):
"""预缓存单个文本的音频"""
cache_dir = pathlib.Path("./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(text)}.wav"
if not cache.exists():
try:
import edge_tts as tts
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache))
return True
except Exception as e:
print(f"预缓存失败 '{text}': {e}")
return False
return True
def precache_file(self, nucleon_union: pt.NucleonUnion):
"""预缓存单个文件的所有内容"""
self.current_file = nucleon_union.name
total_items = len(nucleon_union.nucleons)
for idx, nucleon in enumerate(nucleon_union.nucleons):
# 检查是否被取消
worker = get_current_worker()
if worker and worker.is_cancelled:
return False
text = nucleon['content'].replace('/', '')
self.current_item = text[:50] + "..." if len(text) > 50 else text
self.processed += 1
# 更新进度
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
self.update_status(
f"处理中: {nucleon_union.name} ({idx+1}/{total_items})",
self.current_item,
progress
)
# 预缓存音频
success = self.precache_single_text(text)
if not success:
self.update_status("错误", f"处理失败: {self.current_item}")
time.sleep(1) # 短暂暂停以便用户看到错误信息
return True
def precache_all_files(self):
"""预缓存所有文件"""
nucleon_path = pathlib.Path("./nucleon")
nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"]
# 计算总项目数
self.total = 0
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
self.total += len(nu.nucleons)
except:
continue
self.processed = 0
self.is_precaching = True
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
if not self.precache_file(nu):
break # 用户取消
except Exception as e:
print(f"处理文件失败 {file}: {e}")
continue
self.is_precaching = False
self.update_status("完成", "所有音频文件已预缓存", 100)
def precache_single_file(self):
"""预缓存单个文件"""
if not self.nucleon_file:
return
self.total = len(self.nucleon_file.nucleons)
self.processed = 0
self.is_precaching = True
success = self.precache_file(self.nucleon_file)
self.is_precaching = False
if success:
self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100)
else:
self.update_status("已取消", "预缓存操作被用户取消", 0)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "start_precache" and not self.is_precaching:
# 开始预缓存
if self.nucleon_file:
self.precache_worker = self.run_worker(self.precache_single_file, thread=True)
else:
self.precache_worker = self.run_worker(self.precache_all_files, thread=True)
elif event.button.id == "cancel_precache" and self.is_precaching:
# 取消预缓存
if self.precache_worker:
self.precache_worker.cancel()
self.is_precaching = False
self.update_status("已取消", "预缓存操作被用户取消", 0)
elif event.button.id == "clear_cache":
# 清空缓存
try:
shutil.rmtree("./cache/voice", ignore_errors=True)
self.update_status("已清空", "音频缓存已清空", 0)
except Exception as e:
self.update_status("错误", f"清空缓存失败: {e}")
elif event.button.id == "go_back":
self.action_go_back()
def action_go_back(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.pop_screen()
def action_quit_app(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.exit()
class MemScreen(Screen):
BINDINGS = [
("d", "toggle_dark", "改变色调"),
("q", "pop_screen", "返回主菜单"),
("v", "play_voice", "朗读"),
# ("p", "precache_current", "预缓存当前单元集"), # 新增预缓存快捷键
]
if config.get("quick_pass"):
BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
btn = dict()
def __init__(
self,
nucleon_file: pt.NucleonUnion,
electron_file: pt.ElectronUnion,
tasked_num,
):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
self.stage = 1
self.stage += self.reactor.set_round_templated(self.stage)
first_forward = self.reactor.forward()
print(first_forward)
if first_forward == -1:
self.stage = 3
self.reactor.set_round_templated(3)
print(self.reactor.forward())
#self._forward_judge(first_forward)
self.compo = next(self.reactor.current_appar)
self.feedback_state = 0 # 默认状态
self.feedback_state_map = {
0: "",
255: "回答有误, 请重试. 或者重新学习此单元",
}
def compose(self) -> ComposeResult:
if type(self.compo).__name__ == "Recognition":
self.action_play_voice()
yield Header(show_clock=True)
with Center():
yield Static(
f"当前进度: {len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
)
yield Label(self.feedback_state_map[self.feedback_state])
yield from self.compo.compose()
if self.feedback_state == 255:
yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def on_mount(self):
pass
def on_button_pressed(self, event):
try:
if event.button.id == "re-recognize":
return
except:
pass
ret = self.compo.handler(event, "button")
self._forward_judge(ret)
def _forward_judge(self, ret):
self.feedback_state = 0
if ret == -1:
return
if ret == 0:
try:
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
except StopIteration:
nxt = self.reactor.forward(1)
try:
self.compo = next(self.reactor.current_appar)
except:
pass
if nxt == -1:
if self.reactor.round_set == 0:
if self.stage == 4:
if config.get("save"):
self.reactor.save()
self.compo = compo.Finished(
self, None, pt.Atom.placeholder()
)
self.refresh_ui()
else:
self.reactor.set_round_templated(self.stage)
self.reactor.forward(1)
self.stage += 1
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
return
return
else:
self.refresh_ui()
return
if ret >= 1:
if ret == 2:
self.feedback_state = 255 # 表示错误
else:
self.feedback_state = 0
self.refresh_ui()
return
def refresh_ui(self):
self.call_later(self.recompose)
print(type(self.compo).__name__)
def action_play_voice(self):
def play():
cache_dir = pathlib.Path(f"./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
if not cache.exists():
import edge_tts as tts
communicate = tts.Communicate(
self.reactor.current_atom[1].content.replace("/", ""),
"zh-CN-XiaoxiaoNeural",
)
communicate.save_sync(
f"./cache/voice/{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
)
playsound(str(cache))
threading.Thread(target=play).start()
def action_precache_current(self):
"""预缓存当前单元集的音频"""
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_quick_pass(self):
self.reactor.report(self.reactor.current_atom, 5)
self._forward_judge(0)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()
class PreparationScreen(Screen):
BINDINGS = [
("q", "go_back", "返回"),
("escape", "quit_app", "退出"),
("p", "precache", "预缓存音频"), # 新增预缓存快捷键
]
def __init__(
self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion
) -> None:
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="vice_container"):
yield Label(f"准备就绪: [b]{self.nucleon_file.name}[/b]\n")
yield Label(f"内容源文件对象: ./nucleon/[b]{self.nucleon_file.name}[/b].toml")
yield Label(f"元数据文件对象: ./electron/[b]{self.electron_file.name}[/b].toml")
yield Label(f"\n单元数量:{len(self.nucleon_file)}\n")
yield Button(
"开始记忆",
id="start_memorizing_button",
variant="primary",
classes="start-button",
)
yield Button(
"预缓存音频",
id="precache_button",
variant="success",
classes="precache-button",
)
yield Static(f"\n单元预览:\n")
yield Markdown(self._get_full_content().replace("/", ""), classes="full")
yield Footer()
def _get_full_content(self):
content = ""
for i in self.nucleon_file.nucleons:
content += " - " + i["content"] + " \n"
return content
def action_go_back(self):
self.app.pop_screen()
def action_precache(self):
"""预缓存当前单元集的音频"""
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_quit_app(self):
self.app.exit()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "start_memorizing_button":
newscr = MemScreen(
self.nucleon_file, self.electron_file, config.get("tasked_number", 6)
)
self.app.push_screen(newscr)
elif event.button.id == "precache_button":
self.action_precache()
class NewNucleonScreen(Screen):
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
def __init__(self) -> None:
super().__init__(name=None, id=None, classes=None)
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="vice_container"):
yield Label(f"[b]新建空的单元集\n")
yield Markdown("1. 键入单元集名称")
yield Input(placeholder="单元集名称")
yield Markdown("> 单元集名称不应与现有单元集重复, 新的单元集文件将创建在 ./nucleon/你输入的名称.toml")
yield Label(f"\n")
yield Markdown("2. 选择单元集类型")
LINES = """
单一字符串
主字符串(带有附加属性)
动态单元集(使用宏)
""".splitlines()
yield Select.from_values(LINES, prompt="选择类型")
yield Label(f"\n")
yield Markdown("3. 输入附加元数据 (可选)")
yield Input(placeholder="作者")
yield Input(placeholder="内容描述")
yield Button(
"新建空单元集",
id="submit_button",
variant="primary",
classes="start-button",
)
yield Footer()
def action_go_back(self):
self.app.pop_screen()
def action_quit_app(self):
self.app.exit()
def on_button_pressed(self, event: Button.Pressed) -> None:
pass
class DashboardScreen(Screen):
#BINDINGS = [("p", "precache_all", "预缓存所有音频")] # 新增全局预缓存快捷键
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Container(
Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器, 版本 {metadata.ver}, 使用 {pt.Electron.algorithm} 调度算法', classes="title-label"),
Label(f"当前的 UNIX 日时间戳: {aux.get_daystamp()}"),
Label(f'包含时间戳修正: UTC+{config.get("timezone_offset")/3600}'),
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
ListView(id="file-list", classes="file-list-view"),
#Button("新建空的单元集", id="new_nucleon_button"),
Button("音频预缓存实用程序", id="precache_all_button"),
Label(f"\"潜进\" 开放源代码软件项目 | 版本 {metadata.ver} {metadata.stage.capitalize()} | Wang Zhiyu 2025"),
)
yield Footer()
def item_desc_generator(self, path) -> dict:
gmp = Glimpse(pt.NucleonUnion(path))
res = dict()
res[0] = f"{gmp.name}.toml\0"
res[1] = f""
if gmp.is_initialized:
res[1] += f" 已激活单元: {gmp.activated_num}/{gmp.total_num}\n"
res[1] += f" 下一次复习: {gmp.next_date} (最后复习于 {gmp.lastest_date})\n"
res[1] += f" 系数均值: {gmp.avg_efactor}"
else:
res[1] = " 尚未激活"
return res
def on_mount(self) -> None:
file_list_widget = self.query_one("#file-list", ListView)
nucleon_path = pathlib.Path("./nucleon")
nucleon_files = sorted(
[f for f in nucleon_path.iterdir() if f.suffix == ".toml"],
key=lambda f: Glimpse(pt.NucleonUnion(f)).next_date,
reverse=True
)
if nucleon_files:
for file in nucleon_files:
text = self.item_desc_generator(pathlib.Path(file))
file_list_widget.append(ListItem(
Label(text[0] + '\n' + text[1]),
))
else:
file_list_widget.append(
ListItem(Static("在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集."))
)
file_list_widget.disabled = True
def on_list_view_selected(self, event: ListView.Selected) -> None:
if not isinstance(event.item, ListItem):
return
selected_label = event.item.query_one(Label)
if "未找到任何 .toml 文件" in str(selected_label.renderable):
return
selected_filename = str(selected_label.renderable).partition('\0')[0].replace('*', "")
nucleon_file = pt.NucleonUnion(
pathlib.Path("./nucleon") / selected_filename
)
electron_file_path = pathlib.Path("./electron") / selected_filename
if electron_file_path.exists():
pass
else:
electron_file_path.touch()
electron_file = pt.ElectronUnion(
pathlib.Path("./electron") / selected_filename
)
self.app.push_screen(PreparationScreen(nucleon_file, electron_file))
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "new_nucleon_button":
newscr = NewNucleonScreen()
self.app.push_screen(newscr)
elif event.button.id == "precache_all_button":
self.action_precache_all()
def action_precache_all(self):
"""预缓存所有单元集的音频"""
precache_screen = PrecachingScreen()
self.app.push_screen(precache_screen)
def action_quit_app(self) -> None:
self.app.exit()