fix: 部分修复重构数据格式差异

This commit is contained in:
2026-01-04 02:11:07 +08:00
parent 55c656e8f9
commit a689604021
14 changed files with 371 additions and 192 deletions

View File

@@ -28,7 +28,7 @@ logger = get_logger(__name__)
class MemScreen(Screen):
BINDINGS = [
("q", "pop_screen", "返回"),
# ("p", "prev", "复习上一个"),
#("p", "prev", "复习上一个"),
("d", "toggle_dark", ""),
("v", "play_voice", "朗读"),
("0,1,2,3", "app.push_screen('about')", ""),
@@ -40,65 +40,61 @@ class MemScreen(Screen):
def __init__(
self,
atoms: list,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
phaser: Phaser,
name = None,
id = None,
classes = None,
) -> None:
super().__init__(name, id, classes)
self.atoms = atoms
self.phaser = Phaser(atoms)
# logger.debug(self.phaser.state)
self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom
# logger.debug(self.phaser.state)
# self.procession.forward(1)
def on_mount(self):
self.load_puzzle()
pass
def puzzle_widget(self):
try:
logger.debug(self.phaser.state)
logger.debug(self.procession.cursor)
logger.debug(self.atom)
self.fission = Fission(self.atom, self.phaser.state)
puzzle_debug = next(self.fission.generate())
# logger.debug(puzzle_debug)
return shim.puzzle2widget[puzzle_debug["puzzle"]](
atom=self.atom, alia=puzzle_debug["alia"]
)
except (KeyError, StopIteration, AttributeError) as e:
logger.debug(f"调度展开出错: {e}")
return Static("无法生成谜题")
# logger.debug(shim.puzzle2widget[puzzle_debug["puzzle"]])
self.phaser = phaser
self.update_state()
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with ScrollableContainer():
yield Label(self._get_progress_text(), id="progress")
# self.mount(self.current_widget()) # type: ignore
yield ScrollableContainer(id="puzzle-container")
# yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def update_state(self):
"""更新状态机"""
self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom # type: ignore
def on_mount(self):
self.mount_puzzle()
self.update_display()
def puzzle_widget(self):
try:
self.fission = self.procession.get_fission()
puzzle = self.fission.get_current_puzzle()
# logger.debug(puzzle_debug)
return shim.puzzle2widget[puzzle["puzzle"]]( # type: ignore
atom=self.atom, alia=puzzle["alia"] # type: ignore
)
except (KeyError, StopIteration, AttributeError) as e:
logger.debug(f"调度展开出错: {e}")
return Static(f"无法生成谜题 {e}")
# logger.debug(shim.puzzle2widget[puzzle_debug["puzzle"]])
def _get_progress_text(self):
return f"当前进度: {self.procession.process() + 1}/{self.procession.total_length()}"
def update_display(self):
"""更新进度显示"""
progress_widget = self.query_one("#progress")
progress_widget.update(self._get_progress_text()) # type: ignore
def load_puzzle(self):
self.atom: pt.Atom = self.procession.current_atom
def mount_puzzle(self):
"""挂载当前谜题组件"""
container = self.query_one("#puzzle-container")
for i in container.children:
i.remove()
container.mount(self.puzzle_widget())
def load_finished_widget(self):
def mount_finished_widget(self):
"""挂载已完成组件"""
container = self.query_one("#puzzle-container")
for i in container.children:
i.remove()
@@ -109,39 +105,6 @@ class MemScreen(Screen):
def on_button_pressed(self, event):
event.stop()
def watch_rating(self, old_rating, new_rating) -> None:
if self.procession == 0:
return
if new_rating == -1:
return
forwards = 1 if new_rating >= 4 else 0
self.rating = -1
logger.debug(f"试图前进: {"允许" if forwards else "禁止"}")
if forwards:
ret = self.procession.forward(1)
if ret == 0: # 若结束了此次队列
self.procession = self.phaser.current_procession() # type: ignore
if self.procession == 0: # 若所有队列都结束了
logger.debug(f"记忆进程结束")
for i in self.atoms:
i: pt.Atom
i.revise()
i.persist("electron")
self.load_finished_widget()
return
else:
logger.debug(f"建立新队列 {self.procession.phase}")
self.load_puzzle()
else: # 若不通过
self.procession.append()
self.update_display()
def action_quick_pass(self):
self.rating = 5
self.atom.minimize(5)
self.atom.registry["electron"].activate()
self.atom.lock(1)
def action_play_voice(self):
self.run_worker(self.play_voice, exclusive=True, thread=True)
@@ -155,20 +118,13 @@ class MemScreen(Screen):
path = Path(config_var.get()["paths"]["cache_dir"])
path = (
path
/ f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
/ f"{get_md5(self.atom.registry['nucleon']["tts_text"])}.wav"
)
if path.exists():
play_by_path(path)
else:
from heurams.services.tts_service import convertor
convertor(
self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path
self.atom.registry["nucleon"]["tts_text"], path
)
play_by_path(path)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()
play_by_path(path)

View File

@@ -0,0 +1,167 @@
"""队列式记忆工作界面"""
from enum import Enum, auto
from textual.app import ComposeResult
from textual.containers import Center, ScrollableContainer
from textual.reactive import reactive
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, Static
import heurams.kernel.evaluators as pz
import heurams.kernel.particles as pt
from heurams.context import config_var
from heurams.kernel.reactor import *
from heurams.services.logger import get_logger
from .. import shim
class AtomState(Enum):
FAILED = auto()
NORMAL = auto()
logger = get_logger(__name__)
class MemScreen(Screen):
BINDINGS = [
("q", "pop_screen", "返回"),
#("p", "prev", "复习上一个"),
("d", "toggle_dark", ""),
("v", "play_voice", "朗读"),
("0,1,2,3", "app.push_screen('about')", ""),
]
if config_var.get()["quick_pass"]:
BINDINGS.append(("k", "quick_pass", "跳过"))
rating = reactive(-1)
def __init__(
self,
atoms: list,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.atoms = atoms
self.phaser = Phaser(atoms)
self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom # type: ignore
def on_mount(self):
self.load_puzzle()
pass
def puzzle_widget(self):
try:
logger.debug(self.phaser.state)
logger.debug(self.procession.cursor)
logger.debug(self.atom)
self.fission = Fission(self.atom, self.phaser.state)
puzzle_debug = self.fission.get_puzzles()
# logger.debug(puzzle_debug)
return shim.puzzle2widget[puzzle_debug["puzzle"]](
atom=self.atom, alia=puzzle_debug["alia"]
)
except (KeyError, StopIteration, AttributeError) as e:
logger.debug(f"调度展开出错: {e}")
return Static("无法生成谜题")
# logger.debug(shim.puzzle2widget[puzzle_debug["puzzle"]])
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with ScrollableContainer():
yield Label(self._get_progress_text(), id="progress")
# self.mount(self.current_widget()) # type: ignore
yield ScrollableContainer(id="puzzle-container")
# yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def _get_progress_text(self):
return f"当前进度: {self.procession.process() + 1}/{self.procession.total_length()}"
def update_display(self):
progress_widget = self.query_one("#progress")
progress_widget.update(self._get_progress_text()) # type: ignore
def load_puzzle(self):
self.atom: pt.Atom = self.procession.current_atom # type: ignore
container = self.query_one("#puzzle-container")
for i in container.children:
i.remove()
container.mount(self.puzzle_widget())
def load_finished_widget(self):
container = self.query_one("#puzzle-container")
for i in container.children:
i.remove()
from heurams.interface.widgets.finished import Finished
container.mount(Finished())
def on_button_pressed(self, event):
event.stop()
def watch_rating(self, old_rating, new_rating) -> None:
if self.procession == 0:
return
if new_rating == -1:
return
forwards = 1 if new_rating >= 4 else 0
self.rating = -1
logger.debug(f"试图前进: {"允许" if forwards else "禁止"}")
if forwards:
ret = self.procession.forward(1)
if ret == 0: # 若结束了此次队列
self.procession = self.phaser.current_procession() # type: ignore
if self.procession == 0: # 若所有队列都结束了
logger.debug(f"记忆进程结束")
self.load_finished_widget()
return
else:
logger.debug(f"建立新队列 {self.procession.phase}")
self.load_puzzle()
else: # 若不通过
self.procession.append()
self.update_display()
def action_quick_pass(self):
self.rating = 5
self.atom.minimize(5)
self.atom.registry["electron"].activate()
self.atom.lock(1)
def action_play_voice(self):
self.run_worker(self.play_voice, exclusive=True, thread=True)
def play_voice(self):
"""朗读当前内容"""
from pathlib import Path
from heurams.services.audio_service import play_by_path
from heurams.services.hasher import get_md5
path = Path(config_var.get()["paths"]["cache_dir"])
path = (
path
/ f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
)
if path.exists():
play_by_path(path)
else:
from heurams.services.tts_service import convertor
convertor(
self.atom.registry["nucleon"].metadata["formation"]["tts_text"], path
)
play_by_path(path)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()

View File

@@ -127,8 +127,9 @@ class PreparationScreen(Screen):
if left_new >= 0:
atoms_to_provide.append(i)
from .memoqueue import MemScreen
memscreen = MemScreen(atoms_to_provide)
import heurams.kernel.reactor as rt
pheser = rt.Phaser(atoms_to_provide)
memscreen = MemScreen(pheser)
self.app.push_screen(memscreen)
elif event.button.id == "precache_button":

View File

@@ -1,37 +1,10 @@
"""Kernel 操作辅助函数库"""
import random
from typing import TypedDict
import heurams.interface.widgets as pzw
import heurams.kernel.evaluators as pz
import heurams.kernel.particles as pt
staging = {} # 细粒度缓存区, 是 ident -> quality 的封装
def report_to_staging(atom: pt.Atom, quality):
staging[atom.ident] = min(quality, staging[atom.ident])
def clear():
staging = dict()
def deploy_to_electron():
for atom_ident, quality in staging.items():
if pt.atom_registry[atom_ident].registry["electron"].is_activated:
pt.atom_registry[atom_ident].registry["electron"].revisor(quality=quality)
else:
pt.atom_registry[atom_ident].registry["electron"].revisor(
quality=quality, is_new_activation=True
)
clear()
puzzle2widget = {
pz.RecognitionPuzzle: pzw.Recognition,
pz.ClozePuzzle: pzw.ClozePuzzle,
pz.MCQPuzzle: pzw.MCQPuzzle,
pz.BasePuzzle: pzw.BasePuzzleWidget,
pz.BaseEvaluator: pzw.BasePuzzleWidget,
}

View File

@@ -0,0 +1,119 @@
import re
from typing import Dict, List, TypedDict
from textual.containers import Center
from textual.message import Message
from textual.reactive import reactive
from textual.widget import Widget
from textual.widgets import Button, Label, Markdown, Static
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .base_puzzle_widget import BasePuzzleWidget
logger = get_logger(__name__)
class RecognitionConfig(TypedDict):
__origin__: str
__hint__: str
primary: str
secondary: List[str]
top_dim: List[str]
class Recognition(BasePuzzleWidget):
def __init__(
self,
*children: Widget,
atom: pt.Atom,
alia: str = "",
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True,
) -> None:
super().__init__(
*children,
atom=atom,
name=name,
id=id,
classes=classes,
disabled=disabled,
markup=markup,
)
if alia == "":
alia = "Recognition"
self.alia = alia
def compose(self):
from heurams.context import config_var
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"]
if autovoice:
self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
delim = self.atom.registry["nucleon"]["delimiter"]
replace_dict = {
", ": ",",
". ": ".",
"; ": ";",
": ": ":",
f"{delim},": ",",
f".{delim}": ".",
f"{delim};": ";",
f";{delim}": ";",
f":{delim}": ":",
}
nucleon = self.atom.registry["nucleon"]
metadata = self.atom.registry["nucleon"].metadata
primary = cfg["primary"]
with Center():
for i in cfg["top_dim"]:
yield Static(f"[dim]{i}[/]")
yield Label("")
for old, new in replace_dict.items():
primary = primary.replace(old, new)
primary_splited = re.split(r"(?<=[,;:|])", cfg["primary"])
for item in primary_splited:
with Center():
yield Label(
f"[b][b]{item.replace(delim, ' ')}[/][/]",
id="sentence" + str(hash(item)),
)
for item in cfg["secondary"]:
if isinstance(item, list):
for j in item:
yield Markdown(f"### {metadata['annotation'][item]}: {j}")
continue
if isinstance(item, Dict):
total = ""
for j, k in item.items(): # type: ignore
total += f"> **{j}**: {k} \n"
yield Markdown(total)
if isinstance(item, str):
yield Markdown(item)
with Center():
yield Button("我已知晓", id="ok")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "ok":
self.screen.rating = 5 # type: ignore
self.handler(5)
def handler(self, rating):
if not self.atom.registry["runtime"]["locked"]:
if not self.atom.registry["electron"].is_activated():
self.atom.registry["electron"].activate()
logger.debug(f"激活原子 {self.atom}")
self.atom.lock(1)
self.atom.minimize(5)
else:
pass

View File

@@ -49,71 +49,10 @@ class Recognition(BasePuzzleWidget):
self.alia = alia
def compose(self):
from heurams.context import config_var
autovoice = config_var.get()["interface"]["memorizor"]["autovoice"]
if autovoice:
self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"]
replace_dict = {
", ": ",",
". ": ".",
"; ": ";",
": ": ":",
f"{delim},": ",",
f".{delim}": ".",
f"{delim};": ";",
f";{delim}": ";",
f":{delim}": ":",
}
nucleon = self.atom.registry["nucleon"]
metadata = self.atom.registry["nucleon"].metadata
primary = cfg["primary"]
with Center():
for i in cfg["top_dim"]:
yield Static(f"[dim]{i}[/]")
yield Label("")
for old, new in replace_dict.items():
primary = primary.replace(old, new)
primary_splited = re.split(r"(?<=[,;:|])", cfg["primary"])
for item in primary_splited:
with Center():
yield Label(
f"[b][b]{item.replace(delim, ' ')}[/][/]",
id="sentence" + str(hash(item)),
)
for item in cfg["secondary"]:
if isinstance(item, list):
for j in item:
yield Markdown(f"### {metadata['annotation'][item]}: {j}")
continue
if isinstance(item, Dict):
total = ""
for j, k in item.items(): # type: ignore
total += f"> **{j}**: {k} \n"
yield Markdown(total)
if isinstance(item, str):
yield Markdown(item)
with Center():
yield Button("我已知晓", id="ok")
yield Button("我已知晓", id="ok")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "ok":
self.screen.rating = 5 # type: ignore
self.handler(5)
pass
def handler(self, rating):
if not self.atom.registry["runtime"]["locked"]:
if not self.atom.registry["electron"].is_activated():
self.atom.registry["electron"].activate()
logger.debug(f"激活原子 {self.atom}")
self.atom.lock(1)
self.atom.minimize(5)
else:
pass
pass

View File

@@ -1,7 +1,9 @@
from copy import deepcopy
from logging import config
from heurams.services.logger import get_logger
from heurams.utils.evalizor import Evalizer
from heurams.context import config_var
logger = get_logger(__name__)
@@ -11,7 +13,9 @@ class Nucleon:
def __init__(self, ident, payload, common):
self.ident = ident
env = {"payload": payload}
env = {"payload": payload,
"default": config_var.get()['puzzles'],
"nucleon": (payload | common)}
self.evalizer = Evalizer(environment=env)
self.data: dict = self.evalizer(deepcopy((payload | common))) # type: ignore

View File

@@ -11,6 +11,7 @@ class Fission:
"""单原子调度展开器"""
def __init__(self, atom: pt.Atom, phase_state=PhaserState.RECOGNITION):
self.cursor = 0
self.logger = get_logger(__name__)
self.atom = atom
@@ -19,7 +20,7 @@ class Fission:
phase_state.value if isinstance(phase_state, PhaserState) else phase_state
)
self.orbital_schedule = atom.registry["phases"][phase_value] # type: ignore
self.orbital_schedule = atom.registry['orbital']["phases"][phase_value] # type: ignore
self.orbital_puzzles = atom.registry["nucleon"]["puzzles"]
self.puzzles = list()
@@ -33,6 +34,7 @@ class Fission:
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item,
"finished": 0,
}
)
possibility -= 1
@@ -42,10 +44,27 @@ class Fission:
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item,
"finished": 0,
}
)
self.logger.debug(f"orbital 项处理完成: {item}")
def get_puzzles_list(self):
yield from self.puzzles
def get_puzzles(self):
return self.puzzles
def get_current_puzzle(self, forward = 0):
if forward:
if len(self.puzzles) <= self.cursor + 1:
return 0
self.cursor += 1
return self.puzzles[self.cursor]
else:
return self.puzzles[self.cursor]
def check_passed(self):
for i in self.puzzles:
if i["finished"] == 0:
return 0
return 1

View File

@@ -3,6 +3,7 @@ from heurams.services.logger import get_logger
from transitions import Machine
from tabulate import tabulate as tabu
from .fission import Fission
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
@@ -116,6 +117,9 @@ class Procession(Machine):
logger.debug("Procession.is_empty: %s", empty)
return empty
def get_fission(self):
return Fission(atom=self.current_atom, phase_state=self.phase) # type: ignore
def __repr__(self):
from heurams.services.textproc import truncate
dic = [