fix(interface): 完成主要屏幕重构

This commit is contained in:
2026-01-04 03:46:45 +08:00
parent a689604021
commit 5d883b015e
7 changed files with 98 additions and 301 deletions

View File

@@ -17,9 +17,9 @@ def environment_check():
from pathlib import Path
logger.debug("检查环境路径")
for i in config_var.get()["paths"].values():
i = Path(i)
subdir = ['cache/voice', 'repo', 'global', 'config']
for i in subdir:
i = Path(config_var.get()['paths']['data']) / i
if not i.exists():
logger.info("创建目录: %s", i)
print(f"创建 {i}")

View File

@@ -42,10 +42,9 @@ class DashboardScreen(Screen):
yield Header(show_clock=True)
yield ScrollableContainer(
Label('欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'),
Label(f"使用算法: {config_var.get()['algorithm']['default']}"),
Label("选择待学习或待修改的仓库:", classes="title-label"),
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()} (UTC+{config_var.get()["timezone_offset"] / 3600})"),
Label(f"全局算法设置: {config_var.get()['algorithm']['default']}"),
Label("选择待学习或待修改的项目:", classes="title-label"),
ListView(id="repo-list", classes="repo-list-view"),
Label(f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} '),
)

View File

@@ -28,7 +28,7 @@ logger = get_logger(__name__)
class MemScreen(Screen):
BINDINGS = [
("q", "pop_screen", "返回"),
#("p", "prev", "复习上一个"),
("p", "prev", "查看上一个"),
("d", "toggle_dark", ""),
("v", "play_voice", "朗读"),
("0,1,2,3", "app.push_screen('about')", ""),
@@ -115,7 +115,7 @@ class MemScreen(Screen):
from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5
path = Path(config_var.get()["paths"]["cache_dir"])
path = Path(config_var.get()["paths"]['data']) / 'cache' / 'voice'
path = (
path
/ f"{get_md5(self.atom.registry['nucleon']["tts_text"])}.wav"
@@ -127,4 +127,29 @@ class MemScreen(Screen):
convertor(
self.atom.registry["nucleon"]["tts_text"], path
)
play_by_path(path)
play_by_path(path)
def watch_rating(self, old_rating, new_rating) -> None:
self.update_state() # 刷新状态
if self.procession == None: # 已经完成记忆
return
if new_rating == -1: # 安全值
return
forwards = 1 if new_rating >= 4 else 0 # 准许前进
self.rating = -1
logger.debug(f"试图前进: {"允许" if forwards else "禁止"}")
if forwards:
ret = self.procession.forward(1)
if ret == 0: # 若结束了此次队列
self.update_state()
if self.procession == 0: # 若所有队列都结束了
logger.debug(f"记忆进程结束")
self.mount_finished_widget()
return
else:
logger.debug(f"建立新队列 {self.procession.phase}")
self.update_state()
self.mount_puzzle()
else: # 若不通过
self.procession.append()
self.update_display()

View File

@@ -1,167 +0,0 @@
"""队列式记忆工作界面"""
from enum import Enum, auto
from textual.app import ComposeResult
from textual.containers import Center, ScrollableContainer
from textual.reactive import reactive
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, Static
import heurams.kernel.evaluators as pz
import heurams.kernel.particles as pt
from heurams.context import config_var
from heurams.kernel.reactor import *
from heurams.services.logger import get_logger
from .. import shim
class AtomState(Enum):
FAILED = auto()
NORMAL = auto()
logger = get_logger(__name__)
class MemScreen(Screen):
BINDINGS = [
("q", "pop_screen", "返回"),
#("p", "prev", "复习上一个"),
("d", "toggle_dark", ""),
("v", "play_voice", "朗读"),
("0,1,2,3", "app.push_screen('about')", ""),
]
if config_var.get()["quick_pass"]:
BINDINGS.append(("k", "quick_pass", "跳过"))
rating = reactive(-1)
def __init__(
self,
atoms: list,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.atoms = atoms
self.phaser = Phaser(atoms)
self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom # type: ignore
def on_mount(self):
self.load_puzzle()
pass
def puzzle_widget(self):
try:
logger.debug(self.phaser.state)
logger.debug(self.procession.cursor)
logger.debug(self.atom)
self.fission = Fission(self.atom, self.phaser.state)
puzzle_debug = self.fission.get_puzzles()
# logger.debug(puzzle_debug)
return shim.puzzle2widget[puzzle_debug["puzzle"]](
atom=self.atom, alia=puzzle_debug["alia"]
)
except (KeyError, StopIteration, AttributeError) as e:
logger.debug(f"调度展开出错: {e}")
return Static("无法生成谜题")
# logger.debug(shim.puzzle2widget[puzzle_debug["puzzle"]])
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with ScrollableContainer():
yield Label(self._get_progress_text(), id="progress")
# self.mount(self.current_widget()) # type: ignore
yield ScrollableContainer(id="puzzle-container")
# yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def _get_progress_text(self):
return f"当前进度: {self.procession.process() + 1}/{self.procession.total_length()}"
def update_display(self):
progress_widget = self.query_one("#progress")
progress_widget.update(self._get_progress_text()) # type: ignore
def load_puzzle(self):
self.atom: pt.Atom = self.procession.current_atom # type: ignore
container = self.query_one("#puzzle-container")
for i in container.children:
i.remove()
container.mount(self.puzzle_widget())
def load_finished_widget(self):
container = self.query_one("#puzzle-container")
for i in container.children:
i.remove()
from heurams.interface.widgets.finished import Finished
container.mount(Finished())
def on_button_pressed(self, event):
event.stop()
def watch_rating(self, old_rating, new_rating) -> None:
if self.procession == 0:
return
if new_rating == -1:
return
forwards = 1 if new_rating >= 4 else 0
self.rating = -1
logger.debug(f"试图前进: {"允许" if forwards else "禁止"}")
if forwards:
ret = self.procession.forward(1)
if ret == 0: # 若结束了此次队列
self.procession = self.phaser.current_procession() # type: ignore
if self.procession == 0: # 若所有队列都结束了
logger.debug(f"记忆进程结束")
self.load_finished_widget()
return
else:
logger.debug(f"建立新队列 {self.procession.phase}")
self.load_puzzle()
else: # 若不通过
self.procession.append()
self.update_display()
def action_quick_pass(self):
self.rating = 5
self.atom.minimize(5)
self.atom.registry["electron"].activate()
self.atom.lock(1)
def action_play_voice(self):
self.run_worker(self.play_voice, exclusive=True, thread=True)
def play_voice(self):
"""朗读当前内容"""
from pathlib import Path
from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5
path = Path(config_var.get()["paths"]["cache_dir"])
path = (
path
/ f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
)
if path.exists():
play_by_path(path)
else:
from heurams.services.tts_service import convertor
convertor(
self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path
)
play_by_path(path)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()

View File

@@ -1,119 +0,0 @@
import re
from typing import Dict, List, TypedDict
from textual.containers import Center
from textual.message import Message
from textual.reactive import reactive
from textual.widget import Widget
from textual.widgets import Button, Label, Markdown, Static
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .base_puzzle_widget import BasePuzzleWidget
logger = get_logger(__name__)
class RecognitionConfig(TypedDict):
__origin__: str
__hint__: str
primary: str
secondary: List[str]
top_dim: List[str]
class Recognition(BasePuzzleWidget):
def __init__(
self,
*children: Widget,
atom: pt.Atom,
alia: str = "",
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True,
) -> None:
super().__init__(
*children,
atom=atom,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup,
)
if alia == "":
alia = "Recognition"
self.alia = alia
def compose(self):
from heurams.context import config_var
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"]
if autovoice:
self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
delim = self.atom.registry["nucleon"]["delimiter"]
replace_dict = {
", ": ",",
". ": ".",
"; ": ";",
": ": ":",
f"{delim},": ",",
f".{delim}": ".",
f"{delim};": ";",
f";{delim}": ";",
f":{delim}": ":",
}
nucleon = self.atom.registry["nucleon"]
metadata = self.atom.registry["nucleon"].metadata
primary = cfg["primary"]
with Center():
for i in cfg["top_dim"]:
yield Static(f"[dim]{i}[/]")
yield Label("")
for old, new in replace_dict.items():
primary = primary.replace(old, new)
primary_splited = re.split(r"(?<=[,;:|])", cfg["primary"])
for item in primary_splited:
with Center():
yield Label(
f"[b][b]{item.replace(delim, ' ')}[/][/]",
id="sentence" + str(hash(item)),
)
for item in cfg["secondary"]:
if isinstance(item, list):
for j in item:
yield Markdown(f"### {metadata['annotation'][item]}: {j}")
continue
if isinstance(item, Dict):
total = ""
for j, k in item.items(): # type: ignore
total += f"> **{j}**: {k} \n"
yield Markdown(total)
if isinstance(item, str):
yield Markdown(item)
with Center():
yield Button("我已知晓", id="ok")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "ok":
self.screen.rating = 5 # type: ignore
self.handler(5)
def handler(self, rating):
if not self.atom.registry["runtime"]["locked"]:
if not self.atom.registry["electron"].is_activated():
self.atom.registry["electron"].activate()
logger.debug(f"激活原子 {self.atom}")
self.atom.lock(1)
self.atom.minimize(5)
else:
pass

View File

@@ -49,10 +49,71 @@ class Recognition(BasePuzzleWidget):
self.alia = alia
def compose(self):
yield Button("我已知晓", id="ok")
from heurams.context import config_var
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"]
if autovoice:
self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["nucleon"]["puzzles"][self.alia]
delim = self.atom.registry["nucleon"]["delimiter"]
replace_dict = {
", ": ",",
". ": ".",
"; ": ";",
": ": ":",
f"{delim},": ",",
f".{delim}": ".",
f"{delim};": ";",
f";{delim}": ";",
f":{delim}": ":",
}
nucleon = self.atom.registry["nucleon"]
metadata = self.atom.registry["nucleon"]
primary = cfg["primary"]
with Center():
for i in cfg["top_dim"]:
yield Static(f"[dim]{i}[/]")
yield Label("")
for old, new in replace_dict.items():
primary = primary.replace(old, new)
primary_splited = re.split(r"(?<=[,;:|])", cfg["primary"])
for item in primary_splited:
with Center():
yield Label(
f"[b][b]{item.replace(delim, ' ')}[/][/]",
id="sentence" + str(hash(item)),
)
for item in cfg["secondary"]:
if isinstance(item, list):
for j in item:
yield Markdown(f"### {metadata['annotation'][item]}: {j}")
continue
if isinstance(item, Dict):
total = ""
for j, k in item.items(): # type: ignore
total += f"> **{j}**: {k} \n"
yield Markdown(total)
if isinstance(item, str):
yield Markdown(item)
with Center():
yield Button("我已知晓", id="ok")
def on_button_pressed(self, event: Button.Pressed) -> None:
pass
if event.button.id == "ok":
self.screen.rating = 5 # type: ignore
self.handler(5)
def handler(self, rating):
pass
if not self.atom.registry["runtime"]["locked"]:
if not self.atom.registry["electron"].is_activated():
self.atom.registry["electron"].activate()
logger.debug(f"激活原子 {self.atom}")
self.atom.lock(1)
self.atom.minimize(5)
else:
pass

View File

@@ -30,6 +30,4 @@ class Evalizer:
def eval_with_env(self, s: str):
ret = eval(s, globals(), self.env)
if not isinstance(ret, str):
ret = str(ret)
return ret