552 lines
20 KiB
Python
552 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
from textual.app import App, ComposeResult
|
|
from textual.widgets import (
|
|
Header,
|
|
Footer,
|
|
Input,
|
|
ListView,
|
|
ProgressBar,
|
|
DirectoryTree,
|
|
ListItem,
|
|
Label,
|
|
Markdown,
|
|
Static,
|
|
Button,
|
|
Select,
|
|
)
|
|
from textual.containers import Container, Horizontal, Center
|
|
from textual.screen import Screen
|
|
from textual.worker import Worker, get_current_worker
|
|
import pathlib
|
|
import threading
|
|
import edge_tts as tts
|
|
from playsound import playsound
|
|
import particles as pt
|
|
from reactor import Reactor, Apparatus, Glimpse
|
|
import auxiliary as aux
|
|
import compositions as compo
|
|
import builtins
|
|
import metadata
|
|
import time
|
|
import shutil
|
|
|
|
# Module-level configuration object built from "config.toml"; the screens
# defined below query it with config.get(...).
config = aux.ConfigFile("config.toml")
|
|
|
|
class PrecachingScreen(Screen):
    """Screen that pre-generates speech audio for nucleon contents.

    With ``nucleon_file`` given, only that union is processed; with ``None``
    every ``.toml`` file under ``./nucleon`` is processed.  Synthesis runs in
    a thread worker, so every widget update issued from the worker is
    marshalled onto the UI thread through ``_call_ui``.
    """

    BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]

    def __init__(self, nucleon_file=None):
        """Create the screen.

        Args:
            nucleon_file: The union to precache, or ``None`` for all unions.
        """
        super().__init__(name=None, id=None, classes=None)
        self.nucleon_file = nucleon_file
        self.is_precaching = False
        self.current_file = ""
        self.current_item = ""
        self.progress = 0
        self.total = 0
        self.processed = 0
        self.precache_worker = None

    def compose(self) -> ComposeResult:
        """Build the widget tree.

        NOTE(review): the container nesting was reconstructed from a paste
        that had lost its indentation -- confirm against the upstream file.
        """
        yield Header(show_clock=True)
        with Container(id="precache_container"):
            yield Label("[b]音频预缓存[/b]", classes="title-label")

            # Compare against None explicitly: the union type is used with
            # len() elsewhere in this file, so an empty union would
            # presumably be falsy and be mistaken for "all unions".
            if self.nucleon_file is not None:
                yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info")
                yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info")
            else:
                yield Static("目标: 所有单元集", classes="target-info")

            yield Static(id="status", classes="status-info")
            yield Static(id="current_item", classes="current-item")
            yield ProgressBar(total=100, show_eta=False, id="progress_bar")

            with Horizontal(classes="button-group"):
                # compose() runs once, so both buttons are always created and
                # _set_precaching() decides which of the two is visible.
                yield Button("开始预缓存", id="start_precache", variant="primary")
                yield Button("取消预缓存", id="cancel_precache", variant="error")
                yield Button("清空缓存", id="clear_cache", variant="warning")
                yield Button("返回", id="go_back", variant="default")
        yield Footer()

    def on_mount(self):
        """Initialise the status display and button visibility."""
        self._set_precaching(False)
        self.update_status("就绪", "等待开始...")

    def _set_precaching(self, active):
        """Record the busy flag and show the matching start/cancel button.

        Must run on the UI thread because it touches widgets.
        """
        self.is_precaching = active
        self.query_one("#start_precache", Button).display = not active
        self.query_one("#cancel_precache", Button).display = active

    def _call_ui(self, callback, *args):
        """Run ``callback(*args)`` on the UI thread.

        Widgets are not thread-safe, so worker threads go through
        ``App.call_from_thread``.  That method raises ``RuntimeError`` when it
        is invoked on the app's own thread (or when the app is not running);
        the direct call below then matches what the code did before.
        """
        try:
            self.app.call_from_thread(callback, *args)
        except RuntimeError:
            callback(*args)

    def _finish(self, status, current_item, progress):
        """Leave the busy state and show the final status (UI thread only)."""
        self._set_precaching(False)
        self.update_status(status, current_item, progress)

    def update_status(self, status, current_item="", progress=None):
        """Update the status line, the current-item line and the progress bar.

        Args:
            status: Text shown after the status prefix.
            current_item: Text shown for the current item; empty clears it.
            progress: New progress value (0-100), or ``None`` to leave the
                bar untouched.
        """
        status_widget = self.query_one("#status", Static)
        item_widget = self.query_one("#current_item", Static)
        progress_bar = self.query_one("#progress_bar", ProgressBar)

        status_widget.update(f"状态: {status}")
        item_widget.update(f"当前项目: {current_item}" if current_item else "")

        if progress is not None:
            progress_bar.progress = progress
            progress_bar.advance(0)  # force a redraw

    def precache_single_text(self, text: str):
        """Synthesise ``text`` into the voice cache unless it is already there.

        Returns:
            ``True`` when the audio file exists afterwards, ``False`` when
            synthesis failed.
        """
        cache_dir = pathlib.Path("./cache/voice/")
        cache_dir.mkdir(parents=True, exist_ok=True)
        cache = cache_dir / f"{aux.get_md5(text)}.wav"
        if cache.exists():
            return True
        try:
            communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
            communicate.save_sync(str(cache))
        except Exception as e:
            # Remove whatever was written so that a partial file is not
            # treated as a valid cache entry on the next run.
            cache.unlink(missing_ok=True)
            print(f"预缓存失败 '{text}': {e}")
            return False
        return True

    def precache_file(self, nucleon_union: pt.NucleonUnion):
        """Precache every nucleon of one union; intended for a thread worker.

        Returns:
            ``False`` when the worker was cancelled, ``True`` otherwise.
        """
        self.current_file = nucleon_union.name
        total_items = len(nucleon_union.nucleons)

        for idx, nucleon in enumerate(nucleon_union.nucleons):
            # Stop as soon as cancellation has been requested.
            worker = get_current_worker()
            if worker and worker.is_cancelled:
                return False

            text = nucleon["content"].replace("/", "")
            self.current_item = text[:50] + "..." if len(text) > 50 else text
            self.processed += 1

            progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
            self._call_ui(
                self.update_status,
                f"处理中: {nucleon_union.name} ({idx+1}/{total_items})",
                self.current_item,
                progress,
            )

            success = self.precache_single_text(text)
            # After a cancellation the screen may already be gone, so the
            # failure is only reported while the run is still live.
            if not success and not worker.is_cancelled:
                self._call_ui(self.update_status, "错误", f"处理失败: {self.current_item}")
                time.sleep(1)  # keep the error visible for a moment

        return True

    def precache_all_files(self):
        """Precache every ``.toml`` union under ``./nucleon`` (thread worker)."""
        nucleon_path = pathlib.Path("./nucleon")

        # Load each union once, counting its items as we go; unreadable files
        # are reported and skipped.
        self.total = 0
        unions = []
        if nucleon_path.is_dir():
            for file in nucleon_path.iterdir():
                if file.suffix != ".toml":
                    continue
                try:
                    nu = pt.NucleonUnion(file)
                    self.total += len(nu.nucleons)
                except Exception as e:
                    print(f"处理文件失败 {file}: {e}")
                    continue
                unions.append((file, nu))

        self.processed = 0
        self.is_precaching = True

        for file, nu in unions:
            try:
                if not self.precache_file(nu):
                    # Cancelled: whoever cancelled has already updated the UI
                    # (or left the screen), so do not report completion.
                    return
            except Exception as e:
                print(f"处理文件失败 {file}: {e}")
                continue

        self._call_ui(self._finish, "完成", "所有音频文件已预缓存", 100)

    def precache_single_file(self):
        """Precache the union given at construction time (thread worker)."""
        if self.nucleon_file is None:
            return

        self.total = len(self.nucleon_file.nucleons)
        self.processed = 0
        self.is_precaching = True

        if self.precache_file(self.nucleon_file):
            self._call_ui(
                self._finish,
                "完成",
                f"'{self.nucleon_file.name}' 音频文件已预缓存",
                100,
            )
        # On cancellation the cancel handler has already shown the
        # "cancelled" status, or the screen has been left.

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dispatch the start / cancel / clear-cache / back buttons."""
        button_id = event.button.id

        if button_id == "start_precache" and not self.is_precaching:
            # Flip the flag here, on the UI thread, so that a second click
            # cannot start another worker before the first one has begun.
            self._set_precaching(True)
            if self.nucleon_file is not None:
                target = self.precache_single_file
            else:
                target = self.precache_all_files
            self.precache_worker = self.run_worker(target, thread=True)

        elif button_id == "cancel_precache" and self.is_precaching:
            if self.precache_worker:
                self.precache_worker.cancel()
            self._set_precaching(False)
            self.update_status("已取消", "预缓存操作被用户取消", 0)

        elif button_id == "clear_cache":
            try:
                shutil.rmtree("./cache/voice")
            except FileNotFoundError:
                pass  # nothing has been cached yet
            except OSError as e:
                self.update_status("错误", f"清空缓存失败: {e}")
                return
            self.update_status("已清空", "音频缓存已清空", 0)

        elif button_id == "go_back":
            self.action_go_back()

    def action_go_back(self):
        """Cancel a running worker, then return to the previous screen."""
        if self.is_precaching and self.precache_worker:
            self.precache_worker.cancel()
        self.app.pop_screen()

    def action_quit_app(self):
        """Cancel a running worker, then exit the application."""
        if self.is_precaching and self.precache_worker:
            self.precache_worker.cancel()
        self.app.exit()
|
|
|
|
|
|
class MemScreen(Screen):
    """Screen that drives one memorisation session.

    A ``Reactor`` supplies the sequence of atoms and, per atom, an iterator of
    composition objects (``reactor.current_appar``).  The composition that is
    currently shown lives in ``self.compo``; its ``handler`` return code is
    interpreted by ``_forward_judge``.
    """

    BINDINGS = [
        ("d", "toggle_dark", "改变色调"),
        ("q", "pop_screen", "返回主菜单"),
        ("v", "play_voice", "朗读"),
        # ("p", "precache_current", "预缓存当前单元集"),  # precache shortcut (disabled)
    ]
    if config.get("quick_pass"):
        BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
    btn = dict()

    def __init__(
        self,
        nucleon_file: pt.NucleonUnion,
        electron_file: pt.ElectronUnion,
        tasked_num,
    ):
        """Set up the reactor and position it on the first composition.

        Args:
            nucleon_file: Content source of the session.
            electron_file: Metadata paired with ``nucleon_file``.
            tasked_num: Forwarded unchanged to ``Reactor``.
        """
        super().__init__(name=None, id=None, classes=None)
        self.nucleon_file = nucleon_file
        self.electron_file = electron_file
        self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
        self.stage = 1
        self.stage += self.reactor.set_round_templated(self.stage)
        first_forward = self.reactor.forward()
        print(first_forward)
        if first_forward == -1:
            # The first round offered nothing to step into; switch to round
            # template 3 and step forward again.
            self.stage = 3
            self.reactor.set_round_templated(3)
            print(self.reactor.forward())
        self.compo = next(self.reactor.current_appar)
        self.feedback_state = 0  # default: no feedback message
        self.feedback_state_map = {
            0: "",
            255: "回答有误, 请重试. 或者重新学习此单元",
        }

    def compose(self) -> ComposeResult:
        """Render progress, feedback and the widgets of the current composition.

        NOTE(review): which widgets sit inside ``Center`` was reconstructed
        from a paste that had lost its indentation -- confirm upstream.
        """
        if type(self.compo).__name__ == "Recognition":
            self.action_play_voice()
        yield Header(show_clock=True)
        with Center():
            yield Static(
                f"当前进度: {len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
            )
        yield Label(self.feedback_state_map[self.feedback_state])
        yield from self.compo.compose()
        if self.feedback_state == 255:
            yield Button("重新学习此单元", id="re-recognize", variant="warning")
        yield Footer()

    def on_mount(self):
        """Nothing to do on mount."""
        pass

    def on_button_pressed(self, event):
        """Forward a button press to the current composition.

        The "re-recognize" button is recognised but currently triggers no
        action.
        """
        # Tolerate events that carry no button/id instead of using a bare
        # ``except``.
        button_id = getattr(getattr(event, "button", None), "id", None)
        if button_id == "re-recognize":
            return
        ret = self.compo.handler(event, "button")
        self._forward_judge(ret)

    def _forward_judge(self, ret):
        """React to a composition handler's return code.

        As the code below treats them: ``-1`` does nothing, ``0`` moves on to
        the next composition, ``2`` shows the "wrong answer" feedback and any
        other value ``>= 1`` just refreshes the screen.
        """
        self.feedback_state = 0
        if ret == -1:
            return
        if ret == 0:
            try:
                self.compo = next(self.reactor.current_appar)
            except StopIteration:
                # The current atom has no compositions left.
                self._advance_atom()
            else:
                self.refresh_ui()
            return
        if ret >= 1:
            self.feedback_state = 255 if ret == 2 else 0
            self.refresh_ui()

    def _advance_atom(self):
        """Step to the next atom, the next stage, or the finished view."""
        nxt = self.reactor.forward(1)
        try:
            self.compo = next(self.reactor.current_appar)
        except Exception:
            # Best effort, as before: when nothing could be fetched the
            # branches below decide what is shown.
            pass

        if nxt != -1:
            self.refresh_ui()
            return
        if self.reactor.round_set != 0:
            return

        if self.stage == 4:
            if config.get("save"):
                self.reactor.save()
            self.compo = compo.Finished(self, None, pt.Atom.placeholder())
            self.refresh_ui()
            return

        self.reactor.set_round_templated(self.stage)
        self.reactor.forward(1)
        self.stage += 1
        self.compo = next(self.reactor.current_appar)
        self.refresh_ui()

    def refresh_ui(self):
        """Schedule a recompose so the screen reflects ``self.compo``."""
        self.call_later(self.recompose)
        print(type(self.compo).__name__)

    def action_play_voice(self):
        """Speak the current atom's content on a background thread.

        The audio is synthesised into ``./cache/voice/`` on first use and
        played from that cache afterwards.
        """

        def play():
            # Read the text once so that the cache key, the synthesis input
            # and the played file all refer to the same atom.
            text = self.reactor.current_atom[1].content.replace("/", "")
            cache_dir = pathlib.Path("./cache/voice/")
            cache_dir.mkdir(parents=True, exist_ok=True)
            cache = cache_dir / f"{aux.get_md5(text)}.wav"
            if not cache.exists():
                communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
                try:
                    communicate.save_sync(str(cache))
                except Exception:
                    # Do not leave a partial file behind: it would be taken
                    # for a valid cache entry next time.
                    cache.unlink(missing_ok=True)
                    raise
            playsound(str(cache))

        threading.Thread(target=play).start()

    def action_precache_current(self):
        """Open the precaching screen for the current union."""
        precache_screen = PrecachingScreen(self.nucleon_file)
        self.app.push_screen(precache_screen)

    def action_quick_pass(self):
        """Debug shortcut: report the current atom with score 5 and move on."""
        self.reactor.report(self.reactor.current_atom, 5)
        self._forward_judge(0)

    def action_toggle_dark(self):
        """Delegate the dark-mode toggle to the app."""
        self.app.action_toggle_dark()

    def action_pop_screen(self):
        """Return to the previous screen."""
        self.app.pop_screen()
|
|
|
|
class PreparationScreen(Screen):
    """Overview of a selected union, shown before memorising starts.

    Displays the union's name, its source files and item count, a preview of
    all contents, and buttons to start memorising or to precache audio.
    """

    BINDINGS = [
        ("q", "go_back", "返回"),
        ("escape", "quit_app", "退出"),
        ("p", "precache", "预缓存音频"),  # precache shortcut
    ]

    def __init__(
        self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion
    ) -> None:
        """Remember the content union and its paired metadata union."""
        super().__init__(name=None, id=None, classes=None)
        self.nucleon_file = nucleon_file
        self.electron_file = electron_file

    def compose(self) -> ComposeResult:
        """Build the widget tree.

        NOTE(review): the container nesting was reconstructed from a paste
        that had lost its indentation -- confirm against the upstream file.
        """
        union_name = self.nucleon_file.name
        start_button = Button(
            "开始记忆",
            id="start_memorizing_button",
            variant="primary",
            classes="start-button",
        )
        precache_button = Button(
            "预缓存音频",
            id="precache_button",
            variant="success",
            classes="precache-button",
        )

        yield Header(show_clock=True)
        with Container(id="vice_container"):
            yield Label(f"准备就绪: [b]{union_name}[/b]\n")
            yield Label(f"内容源文件对象: ./nucleon/[b]{union_name}[/b].toml")
            yield Label(f"元数据文件对象: ./electron/[b]{self.electron_file.name}[/b].toml")
            yield Label(f"\n单元数量:{len(self.nucleon_file)}\n")

            yield start_button
            yield precache_button

            yield Static("\n单元预览:\n")
            preview = self._get_full_content().replace("/", "")
            yield Markdown(preview, classes="full")
        yield Footer()

    def _get_full_content(self):
        """Return every nucleon's content as one Markdown bullet list."""
        return "".join(
            " - " + entry["content"] + " \n" for entry in self.nucleon_file.nucleons
        )

    def action_go_back(self):
        """Return to the previous screen."""
        self.app.pop_screen()

    def action_precache(self):
        """Open the precaching screen for this union."""
        self.app.push_screen(PrecachingScreen(self.nucleon_file))

    def action_quit_app(self):
        """Exit the application."""
        self.app.exit()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Start memorising or open the precaching screen."""
        pressed = event.button.id
        if pressed == "precache_button":
            self.action_precache()
            return
        if pressed != "start_memorizing_button":
            return
        tasked_number = config.get("tasked_number", 6)
        session = MemScreen(self.nucleon_file, self.electron_file, tasked_number)
        self.app.push_screen(session)
|
|
|
|
class NewNucleonScreen(Screen):
    """Form for creating a new, empty union.

    Only the form is implemented here: the submit button has no effect yet
    (``on_button_pressed`` is a stub).
    """

    BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]

    def __init__(self) -> None:
        super().__init__(name=None, id=None, classes=None)

    def compose(self) -> ComposeResult:
        """Build the form.

        NOTE(review): the container nesting was reconstructed from a paste
        that had lost its indentation -- confirm against the upstream file.
        """
        yield Header(show_clock=True)
        with Container(id="vice_container"):
            yield Label("[b]新建空的单元集\n")
            yield Markdown("1. 键入单元集名称")
            yield Input(placeholder="单元集名称")
            yield Markdown("> 单元集名称不应与现有单元集重复, 新的单元集文件将创建在 ./nucleon/你输入的名称.toml")
            yield Label("\n")
            yield Markdown("2. 选择单元集类型")
            # Explicit tuple instead of splitting a triple-quoted string: the
            # split produced a leading empty string, which showed up as a
            # blank entry in the drop-down.
            type_choices = (
                "单一字符串",
                "主字符串(带有附加属性)",
                "动态单元集(使用宏)",
            )
            yield Select.from_values(type_choices, prompt="选择类型")
            yield Label("\n")
            yield Markdown("3. 输入附加元数据 (可选)")
            yield Input(placeholder="作者")
            yield Input(placeholder="内容描述")
            yield Button(
                "新建空单元集",
                id="submit_button",
                variant="primary",
                classes="start-button",
            )
        yield Footer()

    def action_go_back(self):
        """Return to the previous screen."""
        self.app.pop_screen()

    def action_quit_app(self):
        """Exit the application."""
        self.app.exit()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Stub: submitting the form is not implemented."""
        pass
|
|
|
|
class DashboardScreen(Screen):
    """Main menu: lists the unions found in ``./nucleon`` and opens them."""

    #BINDINGS = [("p", "precache_all", "预缓存所有音频")]  # global precache shortcut (disabled)

    def compose(self) -> ComposeResult:
        """Build the dashboard; the file list itself is filled in on mount."""
        yield Header(show_clock=True)
        yield Container(
            Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器, 版本 {metadata.ver}, 使用 {pt.Electron.algorithm} 调度算法', classes="title-label"),
            Label(f"当前的 UNIX 日时间戳: {aux.get_daystamp()}"),
            Label(f'包含时间戳修正: UTC+{config.get("timezone_offset")/3600}'),
            Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
            ListView(id="file-list", classes="file-list-view"),
            #Button("新建空的单元集", id="new_nucleon_button"),
            Button("预缓存所有音频", id="precache_all_button", variant="success"),
            Label(f"\"潜进\" 开放源代码软件项目 | 版本 {metadata.ver} {metadata.stage.capitalize()} | Wang Zhiyu 2025"),
        )
        yield Footer()

    def _describe_glimpse(self, gmp) -> dict:
        """Format one ``Glimpse`` as ``{0: file-name line, 1: details}``.

        Entry 0 ends with a NUL character; ``on_list_view_selected`` splits
        on it to recover the file name from the rendered label.
        """
        res = dict()
        res[0] = f"{gmp.name}.toml\0"
        if gmp.is_initialized:
            res[1] = (
                f" 已激活单元: {gmp.activated_num}/{gmp.total_num}\n"
                f" 下一次复习: {gmp.next_date} (最后复习于 {gmp.lastest_date})\n"
                f" 系数均值: {gmp.avg_efactor}"
            )
        else:
            res[1] = " 尚未激活"
        return res

    def item_desc_generator(self, path) -> dict:
        """Return the two-part list description for the union at ``path``."""
        return self._describe_glimpse(Glimpse(pt.NucleonUnion(path)))

    def on_mount(self) -> None:
        """Fill the list with one entry per union, ordered by ``next_date``
        (largest first)."""
        file_list_widget = self.query_one("#file-list", ListView)
        nucleon_path = pathlib.Path("./nucleon")

        # A missing directory is treated like an empty one instead of
        # crashing the dashboard.
        if nucleon_path.is_dir():
            toml_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"]
        else:
            toml_files = []

        # Load every union once; the same Glimpse serves both as sort key and
        # as the source of the description.
        glimpses = [Glimpse(pt.NucleonUnion(f)) for f in toml_files]
        glimpses.sort(key=lambda g: g.next_date, reverse=True)

        if glimpses:
            for gmp in glimpses:
                text = self._describe_glimpse(gmp)
                file_list_widget.append(ListItem(
                    Label(text[0] + '\n' + text[1]),
                ))
        else:
            file_list_widget.append(
                ListItem(Static("在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集."))
            )
            file_list_widget.disabled = True

    def on_list_view_selected(self, event: ListView.Selected) -> None:
        """Open the preparation screen for the selected union."""
        if not isinstance(event.item, ListItem):
            return

        selected_label = event.item.query_one(Label)
        # NOTE(review): this text differs from the placeholder that on_mount
        # actually shows; the placeholder case relies on the list being
        # disabled there.
        if "未找到任何 .toml 文件" in str(selected_label.renderable):
            return

        # The file name is everything before the NUL separator.
        selected_filename = str(selected_label.renderable).partition('\0')[0].replace('*', "")
        nucleon_file = pt.NucleonUnion(
            pathlib.Path("./nucleon") / selected_filename
        )

        electron_file_path = pathlib.Path("./electron") / selected_filename
        if not electron_file_path.exists():
            # Create the directory as well, so that a first run without
            # ./electron does not fail.
            electron_file_path.parent.mkdir(parents=True, exist_ok=True)
            electron_file_path.touch()
        electron_file = pt.ElectronUnion(electron_file_path)

        self.app.push_screen(PreparationScreen(nucleon_file, electron_file))

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Open the new-union form or the precache-all screen."""
        if event.button.id == "new_nucleon_button":
            newscr = NewNucleonScreen()
            self.app.push_screen(newscr)
        elif event.button.id == "precache_all_button":
            self.action_precache_all()

    def action_precache_all(self):
        """Open the precaching screen for all unions."""
        precache_screen = PrecachingScreen()
        self.app.push_screen(precache_screen)

    def action_quit_app(self) -> None:
        """Exit the application."""
        self.app.exit()