分离算法

This commit is contained in:
2025-10-23 23:45:40 +08:00
parent 3fef587ff5
commit 809a6dbe75
9 changed files with 181 additions and 81 deletions

View File

@@ -0,0 +1,33 @@
from typing import Dict, Any, Callable, TypedDict
class AlgorithmConfig(TypedDict):
algo_name: str
defaults: Dict[str, Any]
revisor: Callable[[dict, int, bool], None]
_algorithms: Dict[str, AlgorithmConfig] = {}
def register_algorithm(algo_name: str, defaults: Dict[str, Any], revisor: Callable):
_algorithms[algo_name] = {
'algo_name': algo_name,
'defaults': defaults,
'revisor': revisor
}
def get_algorithm(algo_name: str) -> AlgorithmConfig:
if algo_name not in _algorithms:
raise ValueError(f"算法 {algo_name}' 未找到, 可用值: {list_algorithms()}")
return _algorithms[algo_name]
def list_algorithms() -> list[str]:
return list(_algorithms.keys())
# 导入注册
from . import supermemo2
register_algorithm(
algo_name=supermemo2.algo_name,
defaults=supermemo2.defaults,
revisor=supermemo2.revisor
)
__all__ = ['get_algorithm', 'list_algorithms', 'register_algorithm', 'AlgorithmConfig']

View File

@@ -0,0 +1,66 @@
import heurams.services.timer as timer
from typing import TypedDict
algo_name = "supermemo2"
class AlgodataDict(TypedDict):
efactor: float
real_rept: int
rept: int
interval: int
last_date: int
next_date: int
is_activated: int
last_modify: float
defaults = {
'efactor': 2.5,
'real_rept': 0,
'rept': 0,
'interval': 0,
'last_date': 0,
'next_date': 0,
'is_activated': 0,
'last_modify': timer.get_timestamp()
}
def revisor(algodata: dict, feedback: int = 5, is_new_activation: bool = False):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
Args:
quality (int): 记忆保留率量化参数
"""
if feedback == -1:
return
algodata[algo_name]['efactor'] = algodata[algo_name]['efactor'] + (
0.1 - (5 - feedback) * (0.08 + (5 - feedback) * 0.02)
)
algodata[algo_name]['efactor'] = max(1.3, algodata[algo_name]['efactor'])
if feedback < 3:
algodata[algo_name]['rept'] = 0
algodata[algo_name]['interval'] = 0
else:
algodata[algo_name]['rept'] += 1
algodata[algo_name]['real_rept'] += 1
if is_new_activation:
algodata[algo_name]['rept'] = 0
algodata[algo_name]['efactor'] = 2.5
if algodata[algo_name]['rept'] == 0:
algodata[algo_name]['interval'] = 1
elif algodata[algo_name]['rept'] == 1:
algodata[algo_name]['interval'] = 6
else:
algodata[algo_name]['interval'] = round(
algodata[algo_name]['interval'] * algodata[algo_name]['efactor']
)
algodata[algo_name]['last_date'] = timer.get_daystamp()
algodata[algo_name]['next_date'] = timer.get_daystamp() + algodata[algo_name]['interval']
algodata[algo_name]['last_modify'] = timer.get_timestamp()

View File

@@ -1,10 +1,22 @@
from .electron import Electron
from .nucleon import Nucleon
from typing import TypedDict
import pathlib
import typing
import toml
import json
class AtomRegister(TypedDict):
nucleon: Nucleon
nucleon_path: pathlib.Path
nucleon_fmt: str
electron: Electron
electron_path: pathlib.Path
electron_fmt: str
orbital: dict
orbital_path: pathlib.Path
orbital_fmt: str
class Atom():
"""
统一处理一系列对象的所有信息与持久化:
@@ -14,9 +26,11 @@ class Atom():
以及关联路径
"""
def __init__(self, ident = ""):
self.ident = ident
self.register = {
self.register: AtomRegister = { # type: ignore
"nucleon": None,
"nucleon_path": None,
"nucleon_fmt": "toml",
@@ -49,6 +63,17 @@ class Atom():
raise KeyError("不受支持的持久化格式")
else:
raise TypeError("对未初始化的路径对象操作")
def __getitem__(self, key):
if key in self.register:
return self.register[key]
raise KeyError(f"不支持的键: {key}")
def __setitem__(self, key, value):
if key in self.register:
self.register[key] = value
else:
raise KeyError(f"不支持的键: {key}")
@staticmethod
def placeholder():

View File

@@ -1,49 +1,40 @@
import heurams.services.timer as timer
from heurams.context import config_var
from heurams.kernel.algorithms import get_algorithm
class Electron:
"""电子: 记忆分析元数据及算法"""
algo = "SM-2"
def __init__(self, ident: str, algodata: dict = {}):
def __init__(self, ident: str, algodata: dict = {}, algo: str = "supermemo2"):
"""初始化电子对象 (记忆数据)
Args:
ident: 算法的唯一标识符, 用于区分不同的算法实例, 使用 algodata[ident] 获取
algodata: 算法数据字典, 包含算法的各项参数和设置
algo: 使用的算法模块标识
"""
self.algodata = algodata
self.ident = ident
self.algo = algo
algorithm_config = get_algorithm(self.algo)
if self.algo not in self.algodata.keys():
self.algodata[self.algo] = {}
if algodata[self.algo] == {}:
self._default_init()
if not self.algodata[self.algo]:
self._default_init(algorithm_config['defaults'])
def _default_init(self):
"""默认初始化包装
"""
defaults = {
'efactor': 2.5, # 易度系数, 越大越简单, 最大为5
'real_rept': 0, # (实际)重复次数
'rept': 0, # (有效)重复次数
'interval': 0, # 最佳间隔
'last_date': 0, # 上一次复习的时间戳
'next_date': 0, # 将要复习的时间戳
'is_activated': 0, # 激活状态
# *NOTE: 此处"时间戳"是以天为单位的整数, 即 UNIX 时间戳除以一天的秒数取整
'last_modify': timer.get_timestamp() # 最后修改时间戳(此处是UNIX时间戳)
}
self.algodata[self.algo] = defaults
def _default_init(self, defaults: dict):
"""默认初始化包装"""
self.algodata[self.algo] = defaults.copy()
def activate(self):
"""激活此电子
"""
"""激活此电子"""
self.algodata[self.algo]['is_activated'] = 1
self.algodata[self.algo]['last_modify'] = timer.get_timestamp()
def modify(self, var: str, value):
"""修改 algodata[algo] 中子字典数据
"""
"""修改 algodata[algo] 中子字典数据"""
if var in self.algodata[self.algo]:
self.algodata[self.algo][var] = value
self.algodata[self.algo]['last_modify'] = timer.get_timestamp()
@@ -51,52 +42,20 @@ class Electron:
print(f"警告: '{var}' 非已知元数据字段")
def revisor(self, quality: int = 5, is_new_activation: bool = False):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
"""算法迭代决策机制实现
Args:
quality (int): 记忆保留率量化参数
quality (int): 记忆保留率量化参数 (0-5)
is_new_activation (bool): 是否为初次激活
"""
print(f"REVISOR: {quality}, {is_new_activation}")
if quality == -1:
return -1
self.algodata[self.algo]['efactor'] = self.algodata[self.algo]['efactor'] + (
0.1 - (5 - quality) * (0.08 + (5 - quality) * 0.02)
)
self.algodata[self.algo]['efactor'] = max(1.3, self.algodata[self.algo]['efactor'])
if quality < 3:
# 若保留率低于 3重置重复次数
self.algodata[self.algo]['rept'] = 0
self.algodata[self.algo]['interval'] = 0 # 设为0以便下面重新计算 I(1)
else:
self.algodata[self.algo]['rept'] += 1
self.algodata[self.algo]['real_rept'] += 1
if is_new_activation: # 初次激活
self.algodata[self.algo]['rept'] = 0
self.algodata[self.algo]['efactor'] = 2.5
if self.algodata[self.algo]['rept'] == 0: # 刚被重置或初次激活后复习
self.algodata[self.algo]['interval'] = 1 # I(1)
elif self.algodata[self.algo]['rept'] == 1:
self.algodata[self.algo]['interval'] = 6 # I(2) 经验公式
else:
self.algodata[self.algo]['interval'] = round(
self.algodata[self.algo]['interval'] * self.algodata[self.algo]['efactor']
)
self.algodata[self.algo]['last_date'] = timer.get_daystamp()
self.algodata[self.algo]['next_date'] = timer.get_daystamp() + self.algodata[self.algo]['interval']
self.algodata[self.algo]['last_modify'] = timer.get_timestamp()
algorithm_config = get_algorithm(self.algo)
algorithm_config['revisor'](self.algodata, quality, is_new_activation)
def __str__(self):
return (
f"记忆单元预览 \n"
f"标识符: '{self.ident}' \n"
f"算法: {self.algo} \n"
f"易度系数: {self.algodata[self.algo]['efactor']:.2f} \n"
f"已经重复的次数: {self.algodata[self.algo]['rept']} \n"
f"下次间隔: {self.algodata[self.algo]['interval']}\n"
@@ -126,12 +85,10 @@ class Electron:
self.algodata[self.algo]['last_modify'] = timer.get_timestamp()
def __len__(self):
"""仅返回当前算法的配置数量
"""
"""仅返回当前算法的配置数量"""
return len(self.algodata[self.algo])
@staticmethod
def placeholder():
"""生成一个电子占位符
"""
return Electron("电子对象样例内容", {})
"""生成一个电子占位符"""
return Electron("电子对象样例内容", {})

View File

@@ -5,7 +5,38 @@ import heurams.services.timer as timer
from typing import Tuple
from .apparatus import Apparatus
class Reactor():
class Core():
"""堆芯: 记忆流程核心状态机"""
def __init__(self, atoms: list, stage = ""):
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
def forward(self, step = 1):
self.cursor += step
try:
self.current_atom = self.queue[self.cursor]
return 1 # 成功
except IndexError:
return 0
def append(self, atom):
self.queue.append(self.current_atom)
def __len__(self):
return (len(self.queue) - self.cursor)
def is_empty(self):
return len(self.queue)
class Fission():
"""裂变器: 单原子调度展开器"""
def __init__(self, atom: pt.Atom):
self.atom = atom
atom.register["orbital"]
class Reactork():
"""反应堆对象, 处理和分配一次文件记忆流程的资源与策略"""
def __init__(self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion, screen, tasked_num):
# 导入原子对象

View File

@@ -1 +0,0 @@
# 轮次编排

View File

@@ -1,11 +0,0 @@
# 核心流程状态机
import heurams.kernel.particles as pt
import heurams.services.timer as timer
from typing import Tuple
from .apparatus import Apparatus
class Reactor():
def __init__(self, atoms):
# atoms 是原子列表