feat: 更新状态机

This commit is contained in:
2026-01-05 05:25:14 +08:00
parent 65486794b7
commit e1c935f348
19 changed files with 314 additions and 140 deletions

View File

@@ -1 +1,123 @@
# Reactor - 记忆程状态机模块
# Reactor - 记忆程状态机模块
Reactor 是 HeurAMS 的记忆流程状态机模块, 和界面 (interface) 的实现是解耦的, 以便后期与其他框架的适配.
得益于 Pickle, 状态机模块支持快照!
## Phaser - 全局阶段控制器
在一次队列记忆流程中, Phaser 代表记忆流程本身.
### 属性
#### 状态属性
其有状态属性:
- unsure - 用于初始化
- *quick_review - 复习逾期的单元
- *recognition - 辨识新单元
- *final_review - 复习所有逾期的和新辨认的单元
- finished - 表示完成
> 逾期的: 指 SM-2 算法间隔显示应该复习的单元
带 * 的属性表示实际的记忆阶段, 由 repo 中 schedule.toml 中 schedule 列表显式声明, 运行过程中可以选择性执行, "空的" Procession 会被直接跳过.
在初始化 Procession 时, 每个 Procession 被赋予一个不重复的状态属性 作为"阶段状态"属性, 以此标识 Procession 的阶段属性, 因为每个 Procession 管理一个阶段下的复习进程.
你可以用 state 属性获取 Phaser 的当前状态.
#### Procession 属性
储存一个顺序列表, 保存所有构造的 Procession.
顺序与 repo 中 schedule.toml 中 schedule 列表中的顺序完全相同
### 初始化
Phaser 接受一个存储 Atom 对象的列表, 作为组织记忆流程的材料
在内部, 根据是否激活将其分为 new_atoms 与 old_atoms.
因此, 如果你传入的列表中有算法上"无所事事"的 Atom, 流程会对其进行"加强复习"
由此创建 Procession.
### 直接输出呈现形式
Phaser 的 __repr__ 定义了此对象"官方的显示"用作直观的调试.
其以 ascii 表格形式输出, 格式也符合 markdown 表格规范, 你可以直接复制到 markdown.
示例:
```text
| Type | State | Processions | Current Procession |
|:-------|:--------|:-----------------------|:---------------------|
| Phaser | unsure | ['新记忆', '总体复习'] | 新记忆 |
```
| Type | State | Processions | Current Procession |
|:-------|:--------|:-----------------------|:---------------------|
| Phaser | unsure | ['新记忆', '总体复习'] | 新记忆 |
### 方法
作为一个 Transition Machine 对象的继承, 其拥有 Machine 对象拥有的所有方法.
除此之外, 它也拥有一些其他方法.
#### current_procession(self)
用于查询当前的 Procession, 并且根据当前 Procession 更新自身状态.
返回一个 Procession 对象, 是当前阶段的 Procession.
内部运作是返回第一个状态不为 finished 的 Procession, 并将自身状态变更为 Procession 的"阶段状态"属性
若所有 Procession 都已完成, 将返回一个"阶段状态"为 finished 的 Procession 占位符对象(它不在 procession 属性中), 并更新自身状态为 finished.
## Procession - 阶段管理器
### 属性
#### 状态属性
其有状态属性:
- active - 标识未完成, 初始化的默认属性
- finished - 完成了
#### 其他属性
- current_atom: 当前记忆原子的引用
- atoms: 队列中所有原子列表
- cursor: 指针, 是当前原子在 atoms 列表中的索引
- phase: "阶段属性"
> 注意区分 "Phaser" 和 "Phase", 其中 "Phase" 表示 "Phaser State".
- name_: 阶段的命名
- state: 当前状态属性
### 初始化
接受一个 atoms 列表与 phase_state (PhaserState Enum 类型)对象
### 直接输出呈现形式
同 Phaser, 但显示数据有所不同
与 Phaser 不同, Procession 显示队列会对过长的 atom.ident 进行缩略(末尾 `>` 符号)
```text
| Type | Name | State | Progress | Queue | Current Atom |
|:-----------|:-------|:--------|:-----------|:-----------------------|:------------------------------|
| Procession | 新记忆 | active | 1 / 2 | ['秦孝公>', '君臣固>'] | 秦孝公据崤函之固, 拥雍州之地, |
```
| Type | Name | State | Progress | Queue | Current Atom |
|:-----------|:-------|:--------|:-----------|:-----------------------|:------------------------------|
| Procession | 新记忆 | active | 1 / 2 | ['秦孝公>', '君臣固>'] | 秦孝公据崤函之固, 拥雍州之地, |
### 方法
作为一个 Transition Machine 对象的继承, 其拥有 Machine 对象拥有的所有方法.
除此之外, 它也拥有一些其他方法.
#### forward(self, step=1)
移动 cursor 并依情况更新 current_atom 和状态属性
无论 Procession 是否处于完成状态, forward 操作都是可逆的, 你可以传入负数, 此时已完成的 Procession 会自动"重启".
#### append(self, atom=None)
追加(回忆失败的)原子(默认为当前原子, 传入 None 会自动转化为当前原子)到队列末端
如果这个原子已经处于队列末端, 不会重复追加, 除非队列只剩下这个原子还没完成(此时最多重复两个)
#### process(self)
返回 cursor 值
#### __len__(self)
返回剩余原子量(而不是原子总量)
可以使用 len 函数调用
获取原子总量请用 len(obj.atoms), 或者 total_length(self) 方法
#### total_length(self)
返回队列原子总量
#### is_empty(self)
判断是否为空队列(传入原子列表对象是空列表的队列)
#### get_fission(self)
获取当前原子的 Fission 对象, 用于单原子调度展开
## Fission - 单原子调度控制器
### 属性
#### 状态属性
- exammode: 测试模式(默认)
- retronly: 仅回顾模式
#### 其他属性
- cursor
- atom
- current_puzzle
- orbital_schedule
- orbital_puzzles
- puzzles
### 初始化
接受 atom 对象和 phase 参数
### 方法
#### get_puzzles(self)

View File

@@ -1,30 +1,30 @@
from functools import reduce
import random
import heurams.kernel.evaluators as puz
import heurams.kernel.puzzles as puz
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .states import PhaserState
from transitions import Machine
from .states import FissionState, PhaserState
logger = get_logger(__name__)
class Fission:
class Fission(Machine):
"""单原子调度展开器"""
def __init__(self, atom: pt.Atom, phase_state=PhaserState.RECOGNITION):
def __init__(self, atom: pt.Atom, phase=PhaserState.RECOGNITION):
self.phase = phase
self.cursor = 0
self.logger = get_logger(__name__)
self.atom = atom
# NOTE: phase 为 PhaserState 枚举实例需要获取其value
phase_value = (
phase_state.value if isinstance(phase_state, PhaserState) else phase_state
)
self.orbital_schedule = atom.registry["orbital"]["phases"][phase_value] # type: ignore
self.orbital_puzzles = atom.registry["nucleon"]["puzzles"]
self.current_puzzle: puz.BasePuzzle
# phase 为 PhaserState 枚举实例, 需要获取其value
phase_value = phase.value
orbital_schedule = atom.registry["orbital"]["phases"][phase_value] # type: ignore
orbital_puzzles = atom.registry["nucleon"]["puzzles"]
self.puzzles = list()
for item, possibility in self.orbital_schedule: # type: ignore
self.min_ratings = []
for item, possibility in orbital_schedule: # type: ignore
self.logger.debug(f"开始处理: {item}")
if not isinstance(possibility, float):
possibility = float(possibility)
@@ -32,7 +32,7 @@ class Fission:
while possibility > 1:
self.puzzles.append(
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"puzzle": puz.puzzles[orbital_puzzles[item]["__origin__"]],
"alia": item,
}
)
@@ -41,27 +41,55 @@ class Fission:
if random.random() <= possibility:
self.puzzles.append(
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"puzzle": puz.puzzles[orbital_puzzles[item]["__origin__"]],
"alia": item,
}
)
states = [
{"name": FissionState.EXAMMODE.value, "on_enter": "on_exammode"},
{"name": FissionState.RETRONLY.value, "on_enter": "on_retronly"},
]
self.logger.debug(f"orbital 项处理完成: {item}")
transitions = [
{
"trigger": "finish",
"source": FissionState.EXAMMODE.value,
"dest": FissionState.RETRONLY.value,
},
]
Machine.__init__(
self,
states=states,
transitions=transitions,
initial="Evaluator_0",
)
def get_puzzles(self):
if self.state == 'retronly':
return [puz.puzzles['recognition']]
return self.puzzles
def get_current_puzzle(self, forward=0):
if forward:
if len(self.puzzles) <= self.cursor + 1:
return 0
self.cursor += 1
return self.puzzles[self.cursor]
else:
return self.puzzles[self.cursor]
def get_current_puzzle(self):
if self.state == 'retronly':
return puz.puzzles['recognition']
return self.current_puzzle
def report(self, rating):
self.min_ratings[self.cursor] = min(rating, self.min_ratings[self.cursor])
def get_quality(self):
if self.is_state("exammode", self):
return reduce(lambda x,y: min(x, y), self.min_ratings)
return -1
def check_passed(self):
for i in self.puzzles:
if i["finished"] == 0:
return 0
return 1
def forward(self, step=1):
"""将谜题指针向前移动并依情况更新或完成"""
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor >= len(self.puzzles):
if self.state != 'retronly':
self.finish()
else:
self.current_puzzle = self.puzzles[self.cursor]

View File

@@ -1,3 +1,4 @@
from click import style
import heurams.kernel.particles as pt
from heurams.kernel.particles.placeholders import AtomPlaceholder
from heurams.services.logger import get_logger
@@ -27,7 +28,7 @@ class Phaser(Machine):
logger.debug("新原子数量=%d, 旧原子数量=%d", len(new_atoms), len(old_atoms))
self.processions = list()
# TODO: 改进为基于配置文件的可复习阶段管理
# TODO: 改进为基于配置文件的可复习阶段
if len(old_atoms):
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
@@ -111,7 +112,7 @@ class Phaser(Machine):
for i in self.processions:
i: Procession
if i.state != ProcessionState.FINISHED.value:
# 根据当前procession的phase更新Phaser状态
#if i.phase == PhaserState.UNSURE: 此判断是不必要的 因为没有这种 Procession
if i.phase == PhaserState.QUICK_REVIEW:
self.to_quick_review()
elif i.phase == PhaserState.RECOGNITION:
@@ -139,4 +140,4 @@ class Phaser(Machine):
"Current Procession": "None" if not self.current_procession() else self.current_procession().name_, # type: ignore
},
]
return str(tabu(lst, headers="keys")) + "\n"
return str(tabu(tabular_data=lst, headers="keys", tablefmt="pipe")) + "\n"

View File

@@ -21,27 +21,26 @@ class Procession(Machine):
)
self.current_atom: pt.Atom | None
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0] if atoms else None
self.cursor = 0
self.name_ = name_
self.phase = phase_state
states = [
{"name": ProcessionState.RUNNING.value, "on_enter": "on_running"},
{"name": ProcessionState.ACTIVE.value, "on_enter": "on_active"},
{"name": ProcessionState.FINISHED.value, "on_enter": "on_finished"},
]
transitions = [
{
"trigger": "finish",
"source": ProcessionState.RUNNING.value,
"source": ProcessionState.ACTIVE.value,
"dest": ProcessionState.FINISHED.value,
},
{
"trigger": "restart",
"source": ProcessionState.FINISHED.value,
"dest": ProcessionState.RUNNING.value,
"dest": ProcessionState.ACTIVE.value,
},
]
@@ -49,14 +48,14 @@ class Procession(Machine):
self,
states=states,
transitions=transitions,
initial=ProcessionState.RUNNING.value,
initial=ProcessionState.ACTIVE.value,
)
logger.debug("Procession 初始化完成, 队列长度=%d", len(self.queue))
logger.debug("Procession 初始化完成, 队列长度=%d", len(self.atoms))
def on_running(self):
"""进入RUNNING状态时的回调"""
logger.debug("Procession 进入 RUNNING 状态")
def on_active(self):
"""进入active状态时的回调"""
logger.debug("Procession 进入 active 状态")
def on_finished(self):
"""进入FINISHED状态时的回调"""
@@ -66,21 +65,19 @@ class Procession(Machine):
"""将记忆原子指针向前移动并依情况更新原子(返回 1)或完成队列(返回 0)"""
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor >= len(self.queue):
if self.cursor >= len(self.atoms):
if self.state != ProcessionState.FINISHED.value:
self.finish() # 触发状态转换
logger.debug("Procession 已完成")
else:
if self.state != ProcessionState.RUNNING.value:
self.restart() # 确保在RUNNING状态
self.current_atom = self.queue[self.cursor]
if self.state != ProcessionState.ACTIVE.value:
self.restart() # 确保在active状态
self.current_atom = self.atoms[self.cursor]
logger.debug("cursor 更新为: %d", self.cursor)
logger.debug(
"当前原子更新为: %s",
self.current_atom.ident if self.current_atom else "None",
)
return 1 # 成功
return 0
def append(self, atom=None):
"""追加(回忆失败的)原子(默认为当前原子)到队列末端"""
@@ -88,16 +85,16 @@ class Procession(Machine):
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
if not self.queue or self.queue[-1] != atom or len(self) <= 1:
self.queue.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
if not self.atoms or self.atoms[-1] != atom or len(self) <= 1:
self.atoms.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.atoms))
else:
logger.debug("原子未追加(重复或队列长度<=1)")
def __len__(self):
if not self.queue:
if not self.atoms:
return 0
length = len(self.queue) - self.cursor
length = len(self.atoms) - self.cursor
logger.debug("Procession.__len__: 剩余长度=%d", length)
return length
@@ -106,17 +103,17 @@ class Procession(Machine):
return self.cursor
def total_length(self):
total = len(self.queue)
total = len(self.atoms)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
empty = len(self.queue) == 0
empty = len(self.atoms) == 0
logger.debug("Procession.is_empty: %s", empty)
return empty
def get_fission(self):
return Fission(atom=self.current_atom, phase_state=self.phase) # type: ignore
return Fission(atom=self.current_atom, phase=self.phase) # type: ignore
def __repr__(self):
from heurams.services.textproc import truncate
@@ -126,9 +123,9 @@ class Procession(Machine):
"Type": "Procession",
"Name": self.name_,
"State": self.state,
"Progress": f"{self.cursor + 1} / {len(self.queue)}",
"Queue": list(map(lambda f: truncate(f.ident), self.queue)),
"Progress": f"{self.cursor + 1} / {len(self.atoms)}",
"Queue": list(map(lambda f: truncate(f.ident), self.atoms)),
"Current Atom": self.current_atom.ident, # type: ignore
}
]
return str(tabu(dic, headers="keys")) + "\n"
return str(tabu(dic, headers="keys", tablefmt='pipe')) + "\n"

View File

@@ -12,10 +12,12 @@ class PhaserState(Enum):
FINAL_REVIEW = "final_review"
FINISHED = "finished"
class ProcessionState(Enum):
RUNNING = "running"
ACTIVE = "active"
FINISHED = "finished"
class FissionState(Enum):
EXAMMODE = "exammode"
RETRONLY = "retronly"
logger.debug("状态枚举定义已加载")