试图移植

This commit is contained in:
2025-11-02 23:42:53 +08:00
parent f86187868b
commit 0e08fb3a41
18 changed files with 689 additions and 156 deletions

View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
ListView,
ListItem,
Button,
Static,
)
from textual.containers import Container
from textual.screen import Screen
from heurams.kernel.particles import *
from heurams.context import *
class DashboardScreen(Screen):
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Container(
Label(f'欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
Label(f"当前的 UNIX 日时间戳: 0"),
Label(f'包含时间戳修正: UTC+0'),
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
ListView(id="union-list", classes="union-list-view"),
Label(f'"潜进" 开放源代码软件项目 | 版本 0.0.1 | Wang Zhiyu 2025'),
)
yield Footer()
def item_desc_generator(self, path) -> dict:
res = dict()
res[0] = f"{path.name}\0"
res[1] = f""
res[1] = " 尚未激活"
return res
def on_mount(self) -> None:
union_list_widget = self.query_one("#union-list", ListView)
probe = probe_all(0)
if len(probe["nucleon"]):
for file in probe["nucleon"]:
text = self.item_desc_generator(file)
union_list_widget.append(ListItem(
Label(text[0] + '\n' + text[1]),
))
else:
union_list_widget.append(
ListItem(Static("在 ./nucleon/ 中未找到任何内容源数据文件.\n请放置文件后重启应用.\n或者新建空的单元集."))
)
union_list_widget.disabled = True
def on_list_view_selected(self, event) -> None:
if not isinstance(event.item, ListItem):
return
selected_label = event.item.query_one(Label)
if "未找到任何 .toml 文件" in str(selected_label.renderable):
return
selected_filename = str(selected_label.renderable).partition('\0')[0].replace('*', "")
# 这里需要导入相应的文件处理逻辑
# nucleon_file = pt.NucleonUnion(pathlib.Path("./nucleon") / selected_filename)
# electron_file_path = pathlib.Path("./electron") / selected_filename
# if electron_file_path.exists():
# pass
# else:
# electron_file_path.touch()
# electron_file = pt.ElectronUnion(pathlib.Path("./electron") / selected_filename)
# self.app.push_screen(PreparationScreen(nucleon_file, electron_file))
def on_button_pressed(self, event) -> None:
if event.button.id == "new_nucleon_button":
from .nucleon_creator import NucleonCreatorScreen
newscr = NucleonCreatorScreen()
self.app.push_screen(newscr)
elif event.button.id == "precache_all_button":
self.action_precache_all()
def action_precache_all(self):
"""预缓存所有单元集的音频"""
from .precache import PrecachingScreen
precache_screen = PrecachingScreen()
self.app.push_screen(precache_screen)
def action_quit_app(self) -> None:
self.app.exit()

View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import Header, Footer, Label, Static, Button
from textual.containers import Center
from textual.screen import Screen
from enum import Enum, auto
from heurams.kernel.reactor import *
class AtomState(Enum):
FAILED = auto()
NORMAL = auto()
class MemScreen(Screen):
BINDINGS = [
("d", "toggle_dark", "改变色调"),
("q", "pop_screen", "返回主菜单"),
("v", "play_voice", "朗读"),
]
def __init__(self, atoms: list):
super().__init__(name=None, id=None, classes=None)
self.atoms = atoms
self.phaser = Phaser(atoms)
self.procession: Procession = self.phaser.current_procession() # type: ignore
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Center():
yield Static(f"当前进度: {self.procession.process()}/{self.procession.total_length()}")
yield Label()
yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def on_mount(self):
pass
def on_button_pressed(self, event):
pass
def action_play_voice(self):
"""朗读当前内容"""
pass
def action_precache_current(self):
"""预缓存当前单元集的音频"""
from .precache import PrecachingScreen
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()

View File

@@ -1,140 +0,0 @@
class MemScreen(Screen):
BINDINGS = [
("d", "toggle_dark", "改变色调"),
("q", "pop_screen", "返回主菜单"),
("v", "play_voice", "朗读"),
# ("p", "precache_current", "预缓存当前单元集"), # 新增预缓存快捷键
]
if config.get("quick_pass"):
BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
btn = dict()
def __init__(
self,
nucleon_file: pt.NucleonUnion,
electron_file: pt.ElectronUnion,
tasked_num,
):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
self.stage = 1
self.stage += self.reactor.set_round_templated(self.stage)
first_forward = self.reactor.forward()
print(first_forward)
if first_forward == -1:
self.stage = 3
self.reactor.set_round_templated(3)
print(self.reactor.forward())
#self._forward_judge(first_forward)
self.compo = next(self.reactor.current_appar)
self.feedback_state = 0 # 默认状态
self.feedback_state_map = {
0: "",
255: "回答有误, 请重试. 或者重新学习此单元",
}
def compose(self) -> ComposeResult:
if type(self.compo).__name__ == "Recognition":
self.action_play_voice()
yield Header(show_clock=True)
with Center():
yield Static(
f"当前进度: {len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
)
yield Label(self.feedback_state_map[self.feedback_state])
yield from self.compo.compose()
if self.feedback_state == 255:
yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def on_mount(self):
pass
def on_button_pressed(self, event):
try:
if event.button.id == "re-recognize":
return
except:
pass
ret = self.compo.handler(event, "button")
self._forward_judge(ret)
def _forward_judge(self, ret):
self.feedback_state = 0
if ret == -1:
return
if ret == 0:
try:
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
except StopIteration:
nxt = self.reactor.forward(1)
try:
self.compo = next(self.reactor.current_appar)
except:
pass
if nxt == -1:
if self.reactor.round_set == 0:
if self.stage == 4:
if config.get("save"):
self.reactor.save()
self.compo = compo.Finished(
self, None, pt.Atom.placeholder()
)
self.refresh_ui()
else:
self.reactor.set_round_templated(self.stage)
self.reactor.forward(1)
self.stage += 1
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
return
return
else:
self.refresh_ui()
return
if ret >= 1:
if ret == 2:
self.feedback_state = 255 # 表示错误
else:
self.feedback_state = 0
self.refresh_ui()
return
def refresh_ui(self):
self.call_later(self.recompose)
print(type(self.compo).__name__)
def action_play_voice(self):
def play():
cache_dir = pathlib.Path(f"./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
if not cache.exists():
import edge_tts as tts
communicate = tts.Communicate(
self.reactor.current_atom[1].content.replace("/", ""),
"zh-CN-XiaoxiaoNeural",
)
communicate.save_sync(
f"./cache/voice/{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
)
playsound(str(cache))
threading.Thread(target=play).start()
def action_precache_current(self):
"""预缓存当前单元集的音频"""
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_quick_pass(self):
self.reactor.report(self.reactor.current_atom, 5)
self._forward_judge(0)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()

View File

@@ -0,0 +1,55 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
Input,
Select,
Button,
Markdown,
)
from textual.containers import Container
from textual.screen import Screen
class NucleonCreatorScreen(Screen):
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
def __init__(self) -> None:
super().__init__(name=None, id=None, classes=None)
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="vice_container"):
yield Label(f"[b]新建空的单元集\n")
yield Markdown("1. 键入单元集名称")
yield Input(placeholder="单元集名称")
yield Markdown("> 单元集名称不应与现有单元集重复, 新的单元集文件将创建在 ./nucleon/你输入的名称.toml")
yield Label(f"\n")
yield Markdown("2. 选择单元集类型")
LINES = """
单一字符串
主字符串(带有附加属性)
动态单元集(使用宏)
""".splitlines()
yield Select.from_values(LINES, prompt="选择类型")
yield Label(f"\n")
yield Markdown("3. 输入附加元数据 (可选)")
yield Input(placeholder="作者")
yield Input(placeholder="内容描述")
yield Button(
"新建空单元集",
id="submit_button",
variant="primary",
classes="start-button",
)
yield Footer()
def action_go_back(self):
self.app.pop_screen()
def action_quit_app(self):
self.app.exit()
def on_button_pressed(self, event) -> None:
pass

View File

@@ -1,7 +1,28 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
Input,
Select,
Button,
Markdown,
Static,
ProgressBar,
)
from textual.containers import Container, Horizontal, Center
from textual.containers import Container
from textual.screen import Screen
import pathlib
import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕"""
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
BINDINGS = [("q", "go_back", "返回")]
def __init__(self, nucleon_file = None):
super().__init__(name=None, id=None, classes=None)
@@ -59,7 +80,7 @@ class PrecachingScreen(Screen):
"""预缓存单个文本的音频"""
cache_dir = pathlib.Path("./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(text)}.wav"
cache = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache.exists():
try:
import edge_tts as tts
@@ -71,10 +92,10 @@ class PrecachingScreen(Screen):
return False
return True
def precache_file(self, nucleon_union: pt.NucleonUnion):
def precache_file(self, path: pathlib.Path):
"""预缓存单个文件的所有内容"""
self.current_file = nucleon_union.name
total_items = len(nucleon_union.nucleons)
self.current_file = path.name
total_items = len(pt.load_nucleon(path))
for idx, nucleon in enumerate(nucleon_union.nucleons):
# 检查是否被取消
@@ -102,6 +123,7 @@ class PrecachingScreen(Screen):
return True
def precache_all_files(self):
"""预缓存所有文件"""
nucleon_path = pathlib.Path("./nucleon")
@@ -109,10 +131,11 @@ class PrecachingScreen(Screen):
# 计算总项目数
self.total = 0
nu = list()
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
self.total += len(nu.nucleons)
nu += pt.load_nucleon(file)
self.total = len(nu)
except:
continue
@@ -121,7 +144,7 @@ class PrecachingScreen(Screen):
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
nu += pt.load_nucleon(file)
if not self.precache_file(nu):
break # 用户取消
except Exception as e:

View File

@@ -0,0 +1,83 @@
#!/usr/bin/env python3
from textual.app import ComposeResult
from textual.widgets import (
Header,
Footer,
Label,
Static,
Button,
Markdown,
)
from textual.containers import Container
from textual.screen import Screen
import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
class PreparationScreen(Screen):
BINDINGS = [
("q", "go_back", "返回"),
("escape", "quit_app", "退出"),
("p", "precache", "预缓存音频")
]
def __init__(
self, nucleon_file: pathlib.Path, electron_file: pathlib.Path
) -> None:
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.nucleon_union = pt.load_nucleon(self.nucleon_file)
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="vice_container"):
yield Label(f"准备就绪: [b]{self.nucleon_file.stem}[/b]\n")
yield Label(f"内容源文件对象: ./nucleon/[b]{self.nucleon_file.name}[/b].toml")
yield Label(f"元数据文件对象: ./electron/[b]{self.electron_file.name}[/b].toml")
yield Label(f"\n单元数量:{len(self.nucleon_union)}\n")
yield Button(
"开始记忆",
id="start_memorizing_button",
variant="primary",
classes="start-button",
)
yield Button(
"预缓存音频",
id="precache_button",
variant="success",
classes="precache-button",
)
yield Static(f"\n单元预览:\n")
yield Markdown(self._get_full_content().replace("/", ""), classes="full")
yield Footer()
def _get_full_content(self):
content = ""
for i in self.nucleon_union:
content += " - " + i["content"] + " \n"
return content
def action_go_back(self):
self.app.pop_screen()
def action_precache(self):
from ..screens.precache import PrecachingScreen
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_quit_app(self):
self.app.exit()
def on_button_pressed(self, event) -> None:
if event.button.id == "start_memorizing_button":
from .memorizor import MemScreen
newscr = MemScreen(
self.nucleon_file, self.electron_file, 6
)
self.app.push_screen(newscr)
elif event.button.id == "precache_button":
self.action_precache()