实现 Reactor

This commit is contained in:
2025-11-02 04:58:20 +08:00
parent 5aee812ffc
commit 2640299cd2
12 changed files with 148 additions and 215 deletions

View File

@@ -0,0 +1 @@
print("Hello from HeurAMS Program :)")

View File

@@ -25,9 +25,21 @@ class BaseAlgorithm:
}
@classmethod
def revisor(cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False):
def revisor(cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False) -> None:
"""迭代记忆数据"""
pass
@classmethod
def is_due(cls, algodata):
return 1
def is_due(cls, algodata) -> int:
"""是否应该复习"""
return 1
@classmethod
def rate(cls, algodata) -> str:
"""获取评分信息"""
return ""
@classmethod
def nextdate(cls, algodata) -> int:
"""获取下一次记忆时间戳"""
return -1

View File

@@ -70,4 +70,12 @@ class SM2Algorithm(BaseAlgorithm):
@classmethod
def is_due(cls, algodata):
return (algodata[cls.algo_name]['next_date'] <= timer.get_daystamp())
return (algodata[cls.algo_name]['next_date'] <= timer.get_daystamp())
@classmethod
def rate(cls, algodata):
return str(algodata[cls.algo_name]['efactor'])
@classmethod
def nextdate(cls, algodata):
return algodata[cls.algo_name]['next_date']

View File

@@ -40,9 +40,18 @@ class Electron:
print(f"警告: '{var}' 非已知元数据字段")
def is_due(self):
"""是否应该复习
"""
"""是否应该复习"""
return self.algo.is_due(self.algodata)
def is_activated(self):
return self.algodata[self.algo]['is_activated']
def rate(self):
"评价"
return self.algo.rate(self.algodata)
def nextdate(self):
return self.algo.nextdate(self.algodata)
def revisor(self, quality: int = 5, is_new_activation: bool = False):
"""算法迭代决策机制实现

View File

@@ -0,0 +1,12 @@
from .states import PhaserState, ProcessionState
from .procession import Procession
from .fission import Fission
from .phaser import Phaser
__all__ = [
"PhaserState",
"ProcessionState",
"Procession",
"Fission",
"Phaser"
]

View File

@@ -1 +0,0 @@
# 原子的生命周期管理器

View File

@@ -1,173 +0,0 @@
# 核心流程状态机
import heurams.kernel.particles as pt
import heurams.services.timer as timer
from typing import Tuple
class Procession():
"""队列: 记忆流程核心状态机"""
def __init__(self, atoms: list, phase: str = ""):
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
self.phase = phase
def forward(self, step = 1):
self.cursor += step
try:
self.current_atom = self.queue[self.cursor]
return 1 # 成功
except IndexError:
return 0
def append(self, atom = None):
if atom == None:
self.queue.append(self.current_atom)
else:
self.queue.append(atom)
def __len__(self):
return (len(self.queue) - self.cursor)
def is_empty(self):
return len(self.queue)
class Fission():
"""裂变器: 单原子调度展开器"""
def __init__(self, atom: pt.Atom, stage = ""):
self.atom = atom
atom.register["orbital"]["puzzle_config"]
class Phaser():
"""移相器: 全局调度阶段管理器"""
class Reactork():
"""反应堆对象, 处理和分配一次文件记忆流程的资源与策略"""
def __init__(self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion, screen, tasked_num):
# 导入原子对象
self.stage = 0
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.tasked_num = tasked_num
self.atoms_new = list()
self.atoms_review = list()
counter = self.tasked_num
self.screen = screen
self.electron_dict = electron_file.electrons_dict
self.quality_dict = {}
def electron_dict_get_fallback(key) -> pt.Electron:
value = self.electron_dict.get(key)
# 如果值不存在,则设置默认值
if value is None:
value = pt.Electron(key, {}) # 获取默认值
self.electron_dict[key] = value # 将默认值存入字典
electron_file.sync()
return value # 返回获取的值(可能是默认值)
for nucleon in nucleon_file.nucleons:
# atom = (Electron, Nucleon, Positron) 即 (记忆元数据, 内容元数据, 运行时数据)
atom = (electron_dict_get_fallback(nucleon.content), nucleon, {}) # 使用 "Positron" 代称 atom[2]
atom[2]["testdata"] = nucleon_file.testdata
atom[2]["keydata"] = nucleon_file.keydata
if atom[0]["is_activated"] == 0:
if counter > 0:
atom[2]["is_new_activation"] = 1
atom[0]["is_activated"] = 1
self.atoms_new.append(atom)
counter -= 1
else:
atom[2]["is_new_activation"] = 0
if int(atom[0]["next_date"]) <= timer.get_daystamp():
atom[0]["last_date"] = timer.get_daystamp()
self.atoms_review.append(atom)
# 设置运行时
self.index: int
self.procession: list
self.failed: list
self.round_title: str
self.current_atom: Tuple[pt.Electron, pt.Nucleon, dict]
self.round_set = 0
self.current_atom = pt.Atom.placeholder()
#print(self.atoms_new)
def set_round(self, title, procession):
self.round_set = 1
self.round_title = title
self.procession = procession
self.failed = list()
self.index = -1
def set_round_templated(self, stage):
titles = {
1: "复习模式",
2: "新记忆模式",
3: "总复习模式"
}
processions = {
1: self.atoms_review,
2: self.atoms_new,
3: (self.atoms_new + self.atoms_review)
}
self.stage = stage
ret = 1
if stage == 1 and len(processions[1]) == 0:
stage = 2
ret = 2
if stage == 1 and len(processions[2]) == 0:
stage = 3
ret = 3
self.set_round(title=titles[stage], procession=processions[stage])
return ret
def forward(self, step = 1):
"""
返回值规则:
1: 重定向至 failed
-1: 此轮已完成
0: 下一个记忆单元
"""
if self.index + step >= len(self.procession):
if len(self.failed) > 0:
self.procession = self.failed
self.index = -1
self.forward(step)
if "- 额外复习" not in self.round_title:
self.round_title += " - 额外复习"
self.failed = list()
return 1 # 自动重定向到 failed
else:
self.round_set = 0
return -1 # 此轮已完成
self.index += step
self.current_atom = self.procession[self.index]
self.current_appar = Apparatus(self.screen, self, self.current_atom, (self.stage == 1)).iterator()
return 0
def save(self):
self._deploy_report()
print("Progress saved")
# self.nucleon_file.save()
if self.electron_file.total["last_date"] < timer.get_daystamp():
self.electron_file.save()
def _deploy_report(self):
"部署所有 _report"
for e, q in self.quality_dict.items():
if q == -1:
e.revisor(5, True)
continue
e.revisor(q)
def report(self, atom, quality):
"向反应器和最低质量记录汇报"
if atom in self.atoms_new:
self.quality_dict[atom[0]] = -1
print(self.quality_dict)
return
self.quality_dict[atom[0]] = min(quality, self.quality_dict.get(atom[0], 5))
if quality <= 3:
self.failed.append(atom)
print(self.quality_dict)

View File

@@ -0,0 +1,20 @@
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as puz
import random
from .states import PhaserState
class Fission():
"""裂变器: 单原子调度展开器"""
def __init__(self, atom: pt.Atom, phase = PhaserState.RECOGNITION):
self.atom = atom
self.orbital = atom.register["orbital"]["puzzle_config"][phase]
self.puzzles = list()
for item, possibility in self.orbital:
while possibility > 1:
self.puzzles.append(puz.puzzles[item])
possibility -= 1
if random.random() <= possibility:
self.puzzles.append(puz.puzzles[item])
def iterator(self):
yield from self.puzzles

View File

@@ -1,35 +0,0 @@
# 轻量状态查看器
import heurams.kernel.particles as pt
class Glimpse():
"""轻量级只读, 用于状态指示"""
def __init__(self, nucleon_union: pt.NucleonUnion):
self.name = nucleon_union.name
self.nuc_u = nucleon_union
self.elt_u = self.nuc_u.linked_electron_union()
self.lastest_date = -1
self.next_date = 0x3f3f3f3f
self.avg_efactor = 0
self.total_num = 0
self.activated_num = 0
self.is_initialized = 0
if self.elt_u != 0:
self.is_initialized = 1
self.total_num = len(self.elt_u.electrons)
for i in self.elt_u.electrons:
if i['next_date'] > 0:
self.next_date = min(self.next_date, i['next_date'])
self.lastest_date = max(self.lastest_date, i['last_date'])
if i['is_activated']:
self.avg_efactor += i['efactor']
self.activated_num += 1
if self.next_date == 0x3f3f3f3f:
self.next_date = -1
self.is_initialized = 0
if self.activated_num == 0:
return
self.avg_efactor = round(self.avg_efactor / self.activated_num, 2)
if self.next_date == 0x3f3f3f3f:
self.next_date = -1
return

View File

@@ -0,0 +1,31 @@
# 移相器类定义
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
from .procession import Procession
class Phaser():
"""移相器: 全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
new_atoms = list()
old_atoms = list()
self.state = PhaserState.UNSURE
for i in atoms:
if not i.register["electron"].is_activated():
new_atoms.append(i)
else:
old_atoms.append(i)
self.processions = list()
if len(old_atoms):
self.processions.append(Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习"))
if len(new_atoms):
self.processions.append(Procession(new_atoms,PhaserState.RECOGNITION, "新记忆"))
self.processions.append(Procession(atoms,PhaserState.FINAL_REVIEW, "总体复习"))
def current_procession(self):
for i in self.processions:
if not i.state == ProcessionState.FINISHED:
self.state = i.phase
return i
self.state = PhaserState.FINISHED
return 0

View File

@@ -0,0 +1,37 @@
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
class Procession():
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
self.name = name
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
def forward(self, step = 1):
self.cursor += step
if self.cursor == len(self.queue):
self.state: ProcessionState = ProcessionState.FINISHED
else:
self.state: ProcessionState = ProcessionState.RUNNING
try:
self.current_atom = self.queue[self.cursor]
return 1 # 成功
except IndexError:
return 0
def append(self, atom = None):
if atom == None:
self.queue.append(self.current_atom)
else:
self.queue.append(atom)
def __len__(self):
return (len(self.queue) - self.cursor)
def is_empty(self):
return len(self.queue)

View File

@@ -0,0 +1,12 @@
from enum import Enum, auto
class PhaserState(Enum):
UNSURE = "unsure"
QUICK_REVIEW = "quick_review"
RECOGNITION = "recognition"
FINAL_REVIEW = "final_review"
FINISHED = "finished"
class ProcessionState(Enum):
RUNNING = auto()
FINISHED = auto()