feat(reactor): 状态机进一步改进

This commit is contained in:
2026-01-04 00:28:44 +08:00
parent 94aaef386b
commit 55c656e8f9
15 changed files with 83 additions and 245 deletions

View File

@@ -12,6 +12,7 @@ import heurams.kernel.particles as pt
import heurams.services.hasher as hasher
from heurams.context import *
cache_dir = pathlib.Path(config_var.get()["paths"]["data"]) / "cache" / 'voice'
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕
@@ -88,7 +89,6 @@ class PrecachingScreen(Screen):
"""预缓存单段文本的音频"""
from heurams.context import config_var, rootdir, workdir
cache_dir = pathlib.Path(config_var.get()["paths"]["data"]) / "cache"
cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache_file.exists():
@@ -205,7 +205,7 @@ class PrecachingScreen(Screen):
from heurams.context import config_var, rootdir, workdir
shutil.rmtree(
f"{config_var.get()["paths"]["data"]}/'cache'", ignore_errors=True
cache_dir, ignore_errors=True
)
self.update_status("已清空", "音频缓存已清空", 0)
except Exception as e:

View File

@@ -1,4 +1,4 @@
from .atom import Atom
from .electron import Electron
from .nucleon import Nucleon
from .orbital import Orbital
#from .orbital import Orbital

View File

@@ -126,12 +126,15 @@ class Phaser(Machine):
logger.debug("所有 Procession 已完成, 状态设置为 FINISHED")
return None
@property
def state(self):
"""获取当前状态值"""
current_state = self.get_model_state(self)
# 将字符串状态转换为PhaserState枚举
for phase in PhaserState:
if phase.value == current_state:
return phase
return PhaserState.UNSURE
def __repr__(self):
from heurams.services.textproc import truncate
from tabulate import tabulate as tabu
lst = [
{
"Type": "Phaser",
"State": self.state,
"Processions": list(map(lambda f: (f.name_), self.processions)),
"Current Procession": "None" if not self.current_procession() else self.current_procession().name_, # type: ignore
},
]
return str(tabu(lst, headers="keys")) + '\n'

View File

@@ -1,6 +1,7 @@
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from transitions import Machine
from tabulate import tabulate as tabu
from .states import PhaserState, ProcessionState
@@ -10,19 +11,19 @@ logger = get_logger(__name__)
class Procession(Machine):
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase_state: PhaserState, name: str = ""):
def __init__(self, atoms: list, phase_state: PhaserState, name_: str = ""):
logger.debug(
"Procession.__init__: 原子数量=%d, phase=%s, name='%s'",
len(atoms),
phase_state.value,
name,
name_,
)
self.current_atom: pt.Atom | None
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0] if atoms else None
self.cursor = 0
self.name = name
self.name_ = name_
self.phase = phase_state
states = [
@@ -61,9 +62,10 @@ class Procession(Machine):
logger.debug("Procession 进入 FINISHED 状态")
def forward(self, step=1):
"""将记忆原子指针向前移动并依情况更新原子(返回 1)或完成队列(返回 0)
"""
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor >= len(self.queue):
if self.state != ProcessionState.FINISHED.value:
self.finish() # 触发状态转换
@@ -78,10 +80,11 @@ class Procession(Machine):
self.current_atom.ident if self.current_atom else "None",
)
return 1 # 成功
return 0
def append(self, atom=None):
"""追加(回忆失败的)原子(默认为当前原子)到队列末端
"""
if atom is None:
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
@@ -113,15 +116,16 @@ class Procession(Machine):
logger.debug("Procession.is_empty: %s", empty)
return empty
@property
def state(self):
"""获取当前状态值"""
return self.get_model_state(self)
@state.setter
def state(self, value):
"""设置状态值"""
if value == ProcessionState.RUNNING.value:
self.restart()
elif value == ProcessionState.FINISHED.value:
self.finish()
def __repr__(self):
from heurams.services.textproc import truncate
dic = [
{
"Type": "Procession",
"Name": self.name_,
"State": self.state,
"Progress": f"{self.cursor + 1} / {len(self.queue)}",
"Queue": list(map(lambda f: truncate(f.ident), self.queue)),
"Current Atom": self.current_atom.ident, # type: ignore
}
]
return str(tabu(dic, headers="keys")) + '\n'

View File

@@ -1,12 +0,0 @@
from heurams.services.logger import get_logger
from .fission import Fission
from .phaser import Phaser
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
__all__ = ["PhaserState", "ProcessionState", "Procession", "Fission", "Phaser"]
logger.debug("反应堆模块已加载")

View File

@@ -1,45 +0,0 @@
import random
import heurams.kernel.evaluators as puz
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .states import PhaserState
class Fission:
"""裂变器: 单原子调度展开器"""
def __init__(self, atom: pt.Atom, phase=PhaserState.RECOGNITION):
self.logger = get_logger(__name__)
self.atom = atom
# print(f"{phase.value}")
self.orbital_schedule = atom.registry["orbital"]["schedule"][phase.value] # type: ignore
self.orbital_puzzles = atom.registry["orbital"]["puzzles"]
# print(self.orbital_schedule)
self.puzzles = list()
for item, possibility in self.orbital_schedule: # type: ignore
print(f"ad:{item}")
self.logger.debug(f"开始处理 orbital 项: {item}")
if not isinstance(possibility, float):
possibility = float(possibility)
while possibility > 1:
self.puzzles.append(
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item,
}
)
possibility -= 1
if random.random() <= possibility:
self.puzzles.append(
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item,
}
)
print(f"ok:{item}")
self.logger.debug(f"orbital 项处理完成: {item}")
def generate(self):
yield from self.puzzles

View File

@@ -1,51 +0,0 @@
# 移相器类定义
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
class Phaser:
"""全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
logger.debug("Phaser.__init__: 原子数量=%d", len(atoms))
new_atoms = list()
old_atoms = list()
self.state = PhaserState.UNSURE
for i in atoms:
if not i.registry["electron"].is_activated():
new_atoms.append(i)
else:
old_atoms.append(i)
logger.debug("新原子数量=%d, 旧原子数量=%d", len(new_atoms), len(old_atoms))
self.processions = list()
if len(old_atoms):
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
)
logger.debug("创建初始复习 Procession")
if len(new_atoms):
self.processions.append(
Procession(new_atoms, PhaserState.RECOGNITION, "新记忆")
)
logger.debug("创建新记忆 Procession")
self.processions.append(Procession(atoms, PhaserState.FINAL_REVIEW, "总体复习"))
logger.debug("创建总体复习 Procession")
logger.debug("Phaser 初始化完成, processions 数量=%d", len(self.processions))
def current_procession(self):
logger.debug("Phaser.current_procession 被调用")
for i in self.processions:
i: Procession
if not i.state == ProcessionState.FINISHED:
self.state = i.phase
logger.debug("找到未完成的 Procession: phase=%s", i.phase)
return i
self.state = PhaserState.FINISHED
logger.debug("所有 Procession 已完成, 状态设置为 FINISHED")
return 0

View File

@@ -1,74 +0,0 @@
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
class Procession:
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
logger.debug(
"Procession.__init__: 原子数量=%d, phase=%s, name='%s'",
len(atoms),
phase.value,
name,
)
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
self.name = name
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
logger.debug("Procession 初始化完成, 队列长度=%d", len(self.queue))
def forward(self, step=1):
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor == len(self.queue):
self.state = ProcessionState.FINISHED
logger.debug("Procession 已完成")
else:
self.state = ProcessionState.RUNNING
try:
logger.debug("cursor 更新为: %d", self.cursor)
self.current_atom = self.queue[self.cursor]
logger.debug("当前原子更新为: %s", self.current_atom.ident)
return 1 # 成功
except IndexError as e:
logger.debug("IndexError: %s", e)
self.state = ProcessionState.FINISHED
logger.debug("Procession 因索引错误而完成")
return 0
def append(self, atom=None):
if atom == None:
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
if self.queue[len(self.queue) - 1] != atom or len(self) <= 1:
self.queue.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
else:
logger.debug("原子未追加(重复或队列长度<=1)")
def __len__(self):
length = len(self.queue) - self.cursor
logger.debug("Procession.__len__: 剩余长度=%d", length)
return length
def process(self):
logger.debug("Procession.process: cursor=%d", self.cursor)
return self.cursor
def total_length(self):
total = len(self.queue)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
empty = len(self.queue)
logger.debug("Procession.is_empty: %d", empty)
return empty

View File

@@ -1,21 +0,0 @@
from enum import Enum, auto
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class PhaserState(Enum):
UNSURE = "unsure"
QUICK_REVIEW = "quick_review"
RECOGNITION = "recognition"
FINAL_REVIEW = "final_review"
FINISHED = "finished"
class ProcessionState(Enum):
RUNNING = auto()
FINISHED = auto()
logger.debug("状态枚举定义已加载")

View File

@@ -0,0 +1,4 @@
def truncate(text):
if len(text) <= 3:
return text
return text[:3] + ">"

View File

@@ -22,7 +22,7 @@ class Lict(UserList): # TODO: 优化同步(惰性同步), 当前性能为 O(n)
self,
initlist: list | None = None,
initdict: dict | None = None,
forced_order=True,
forced_order=False,
):
self.dicted_data = {}
if initdict != None: