refactor: 完成 0.4.0 版本更新

完成 0.4.0 版本更新, 为了消除此前提交消息风格不一致与错误提交超大文件的问题, 维持代码统计数据的准确性和提交消息风格的一致性, 重新初始化仓库; 旧的提交历史在 HeurAMS-legacy 仓库(https://gitea.imwangzhiyu.xyz/ajax/HeurAMS-legacy)
This commit is contained in:
2025-12-17 22:31:38 +08:00
commit 2f23cfe174
89 changed files with 6112 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
# Kernel - HeurAMS 核心
记忆规划相关算法与数据结构, 可脱离业务层

View File

@@ -0,0 +1,15 @@
from .sm2 import SM2Algorithm
from heurams.services.logger import get_logger
logger = get_logger(__name__)
__all__ = [
"SM2Algorithm",
]
algorithms = {
"SM-2": SM2Algorithm,
"supermemo2": SM2Algorithm,
}
logger.debug("算法模块初始化完成, 注册的算法: %s", list(algorithms.keys()))

View File

@@ -0,0 +1,69 @@
import heurams.services.timer as timer
from typing import TypedDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class BaseAlgorithm:
algo_name = "BaseAlgorithm"
class AlgodataDict(TypedDict):
efactor: float
real_rept: int
rept: int
interval: int
last_date: int
next_date: int
is_activated: int
last_modify: float
defaults = {
"real_rept": 0,
"rept": 0,
"interval": 0,
"last_date": 0,
"next_date": 0,
"is_activated": 0,
"last_modify": timer.get_timestamp(),
}
@classmethod
def revisor(
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
) -> None:
"""迭代记忆数据"""
logger.debug(
"BaseAlgorithm.revisor 被调用, algodata keys: %s, feedback: %d, is_new_activation: %s",
list(algodata.keys()) if algodata else [],
feedback,
is_new_activation,
)
pass
@classmethod
def is_due(cls, algodata) -> int:
"""是否应该复习"""
logger.debug(
"BaseAlgorithm.is_due 被调用, algodata keys: %s",
list(algodata.keys()) if algodata else [],
)
return 1
@classmethod
def rate(cls, algodata) -> str:
"""获取评分信息"""
logger.debug(
"BaseAlgorithm.rate 被调用, algodata keys: %s",
list(algodata.keys()) if algodata else [],
)
return ""
@classmethod
def nextdate(cls, algodata) -> int:
"""获取下一次记忆时间戳"""
logger.debug(
"BaseAlgorithm.nextdate 被调用, algodata keys: %s",
list(algodata.keys()) if algodata else [],
)
return -1

View File

@@ -0,0 +1,6 @@
# FSRS 算法模块, 尚未就绪
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.info("FSRS算法模块尚未实现")

View File

@@ -0,0 +1,126 @@
from .base import BaseAlgorithm
import heurams.services.timer as timer
from typing import TypedDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class SM2Algorithm(BaseAlgorithm):
algo_name = "SM-2"
class AlgodataDict(TypedDict):
efactor: float
real_rept: int
rept: int
interval: int
last_date: int
next_date: int
is_activated: int
last_modify: float
defaults = {
"efactor": 2.5,
"real_rept": 0,
"rept": 0,
"interval": 0,
"last_date": 0,
"next_date": 0,
"is_activated": 0,
"last_modify": timer.get_timestamp(),
}
@classmethod
def revisor(
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
Args:
quality (int): 记忆保留率量化参数
"""
logger.debug(
"SM2.revisor 开始, feedback: %d, is_new_activation: %s",
feedback,
is_new_activation,
)
if feedback == -1:
logger.debug("feedback 为 -1, 跳过更新")
return
algodata[cls.algo_name]["efactor"] = algodata[cls.algo_name]["efactor"] + (
0.1 - (5 - feedback) * (0.08 + (5 - feedback) * 0.02)
)
algodata[cls.algo_name]["efactor"] = max(
1.3, algodata[cls.algo_name]["efactor"]
)
logger.debug("更新 efactor: %f", algodata[cls.algo_name]["efactor"])
if feedback < 3:
algodata[cls.algo_name]["rept"] = 0
algodata[cls.algo_name]["interval"] = 0
logger.debug("feedback < 3, 重置 rept 和 interval")
else:
algodata[cls.algo_name]["rept"] += 1
logger.debug("递增 rept: %d", algodata[cls.algo_name]["rept"])
algodata[cls.algo_name]["real_rept"] += 1
logger.debug("递增 real_rept: %d", algodata[cls.algo_name]["real_rept"])
if is_new_activation:
algodata[cls.algo_name]["rept"] = 0
algodata[cls.algo_name]["efactor"] = 2.5
logger.debug("新激活, 重置 rept 和 efactor")
if algodata[cls.algo_name]["rept"] == 0:
algodata[cls.algo_name]["interval"] = 1
logger.debug("rept=0, 设置 interval=1")
elif algodata[cls.algo_name]["rept"] == 1:
algodata[cls.algo_name]["interval"] = 6
logger.debug("rept=1, 设置 interval=6")
else:
algodata[cls.algo_name]["interval"] = round(
algodata[cls.algo_name]["interval"] * algodata[cls.algo_name]["efactor"]
)
logger.debug(
"rept>1, 计算 interval: %d", algodata[cls.algo_name]["interval"]
)
algodata[cls.algo_name]["last_date"] = timer.get_daystamp()
algodata[cls.algo_name]["next_date"] = (
timer.get_daystamp() + algodata[cls.algo_name]["interval"]
)
algodata[cls.algo_name]["last_modify"] = timer.get_timestamp()
logger.debug(
"更新日期: last_date=%d, next_date=%d, last_modify=%f",
algodata[cls.algo_name]["last_date"],
algodata[cls.algo_name]["next_date"],
algodata[cls.algo_name]["last_modify"],
)
@classmethod
def is_due(cls, algodata):
result = algodata[cls.algo_name]["next_date"] <= timer.get_daystamp()
logger.debug(
"SM2.is_due: next_date=%d, current_daystamp=%d, result=%s",
algodata[cls.algo_name]["next_date"],
timer.get_daystamp(),
result,
)
return result
@classmethod
def rate(cls, algodata):
efactor = algodata[cls.algo_name]["efactor"]
logger.debug("SM2.rate: efactor=%f", efactor)
return str(efactor)
@classmethod
def nextdate(cls, algodata) -> int:
next_date = algodata[cls.algo_name]["next_date"]
logger.debug("SM2.nextdate: %d", next_date)
return next_date

View File

@@ -0,0 +1,29 @@
"""
Particle 模块 - 粒子对象系统
提供闪卡所需对象, 使用物理学粒子的领域驱动设计
"""
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("粒子模块已加载")
from .electron import Electron
from .nucleon import Nucleon
from .orbital import Orbital
from .atom import Atom, atom_registry
from .probe import probe_all, probe_by_filename
from .loader import load_nucleon, load_electron
__all__ = [
"Electron",
"Nucleon",
"Orbital",
"Atom",
"probe_all",
"probe_by_filename",
"load_nucleon",
"load_electron",
"atom_registry",
]

View File

@@ -0,0 +1,238 @@
from .electron import Electron
from .nucleon import Nucleon
from .orbital import Orbital
from typing import TypedDict
import pathlib
import typing
import toml
import json
import bidict
from heurams.context import config_var
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class AtomRegister_runtime(TypedDict):
locked: bool # 只读锁定标识符
min_rate: int # 最低评分
newact: bool # 新激活
class AtomRegister(TypedDict):
nucleon: Nucleon
nucleon_path: pathlib.Path
nucleon_fmt: str
electron: Electron
electron_path: pathlib.Path
electron_fmt: str
orbital: Orbital
orbital_path: pathlib.Path
orbital_fmt: str
runtime: AtomRegister_runtime
class Atom:
"""
统一处理一系列对象的所有信息与持久化:
关联电子 (算法数据)
关联核子 (内容数据)
关联轨道 (策略数据)
以及关联路径
"""
def __init__(self, ident=""):
logger.debug("创建 Atom 实例, ident: '%s'", ident)
self.ident = ident
atom_registry[ident] = self
logger.debug("Atom 已注册到全局注册表, 当前注册表大小: %d", len(atom_registry))
# self.is_evaled = False
self.registry: AtomRegister = { # type: ignore
"nucleon": None,
"nucleon_path": None,
"nucleon_fmt": "toml",
"electron": None,
"electron_path": None,
"electron_fmt": "json",
"orbital": None,
"orbital_path": None, # 允许设置为 None, 此时使用 nucleon 文件内的推荐配置
"orbital_fmt": "toml",
"runtime": {"locked": False, "min_rate": 0x3F3F3F3F, "newact": False},
}
self.do_eval()
logger.debug("Atom 初始化完成")
def link(self, key, value):
logger.debug("Atom.link: key='%s', value type: %s", key, type(value).__name__)
if key in self.registry.keys():
self.registry[key] = value
logger.debug("'%s' 已链接, 触发 do_eval", key)
self.do_eval()
if key == 'electron':
if self.registry['electron'].is_activated() == 0:
self.registry['runtime']['newact'] = True
else:
logger.error("尝试链接不受支持的键: '%s'", key)
raise ValueError("不受支持的原子元数据链接操作")
def minimize(self, rating):
"""效果等同于 self.registry['runtime']['min_rate'] = min(rating, self.registry['runtime']['min_rate'])
Args:
rating (int): 评分
"""
self.registry["runtime"]["min_rate"] = min(
rating, self.registry["runtime"]["min_rate"]
)
def lock(self, locked=-1):
logger.debug(f"锁定参数 {locked}")
"""锁定, 效果等同于 self.registry['runtime']['locked'] = locked 或者返回是否锁定"""
if locked == 1:
self.registry["runtime"]["locked"] = True
return 1
elif locked == 0:
self.registry["runtime"]["locked"] = False
return 1
elif locked == -1:
return self.registry["runtime"]["locked"]
return 0
def revise(self):
"""执行最终评分
PuzzleWidget 的 handler 除了测试, 严禁直接执行 Electron 的 revisor 函数, 否则造成逻辑混乱
"""
if self.registry["runtime"]["locked"]:
logger.debug(f"允许总评分: {self.registry['runtime']['min_rate']}")
self.registry["electron"].revisor(self.registry["runtime"]["min_rate"], is_new_activation=self.registry["runtime"]["newact"])
else:
logger.debug("禁止总评分")
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Atom.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
# 初始化默认值
nucleon = self.registry["nucleon"]
default = {}
metadata = {}
try:
default = config_var.get()["puzzles"]
metadata = nucleon.metadata
except Exception:
# 如果无法获取配置或元数据, 使用空字典
logger.debug("无法获取配置或元数据, 使用空字典")
pass
try:
eval_value = eval(s)
if isinstance(eval_value, (list, dict)):
ret = eval_value
else:
ret = str(eval_value)
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
# 如果 nucleon 存在且有 do_eval 方法, 调用它
nucleon = self.registry["nucleon"]
if nucleon is not None and hasattr(nucleon, "do_eval"):
nucleon.do_eval()
logger.debug("已调用 nucleon.do_eval")
# 如果 electron 存在且其 algodata 包含 eval 字符串, 遍历它
electron = self.registry["electron"]
if electron is not None and hasattr(electron, "algodata"):
traverse(electron.algodata, eval_with_env)
logger.debug("已处理 electron algodata eval")
# 如果 orbital 存在且是字典, 遍历它
orbital = self.registry["orbital"]
if orbital is not None and isinstance(orbital, dict):
traverse(orbital, eval_with_env)
logger.debug("orbital eval 完成")
logger.debug("Atom.do_eval 完成")
def persist(self, key):
logger.debug("Atom.persist: key='%s'", key)
path: pathlib.Path | None = self.registry[key + "_path"]
if isinstance(path, pathlib.Path):
path = typing.cast(pathlib.Path, path)
logger.debug("持久化路径: %s, 格式: %s", path, self.registry[key + "_fmt"])
path.parent.mkdir(parents=True, exist_ok=True)
if self.registry[key + "_fmt"] == "toml":
with open(path, "r+") as f:
f.seek(0)
f.truncate()
toml.dump(self.registry[key], f)
logger.debug("TOML 数据已保存到: %s", path)
elif self.registry[key + "_fmt"] == "json":
with open(path, "r+") as f:
origin = json.load(f)
f.seek(0)
f.truncate()
origin[self.ident] = self.registry[key].algodata
json.dump(origin, f, indent=2, ensure_ascii=False)
logger.debug("JSON 数据已保存到: %s", path)
else:
logger.error("不受支持的持久化格式: %s", self.registry[key + "_fmt"])
raise KeyError("不受支持的持久化格式")
else:
logger.error("路径未初始化: %s_path", key)
raise TypeError("对未初始化的路径对象操作")
def __getitem__(self, key):
logger.debug("Atom.__getitem__: key='%s'", key)
if key in self.registry:
value = self.registry[key]
logger.debug("返回 value type: %s", type(value).__name__)
return value
logger.error("不支持的键: '%s'", key)
raise KeyError(f"不支持的键: {key}")
def __setitem__(self, key, value):
logger.debug(
"Atom.__setitem__: key='%s', value type: %s", key, type(value).__name__
)
if key in self.registry:
self.registry[key] = value
logger.debug("'%s' 已设置", key)
else:
logger.error("不支持的键: '%s'", key)
raise KeyError(f"不支持的键: {key}")
@staticmethod
def placeholder():
return (Electron.placeholder(), Nucleon.placeholder(), {})
atom_registry: bidict.bidict[str, Atom] = bidict.bidict()

View File

@@ -0,0 +1,151 @@
import heurams.services.timer as timer
from heurams.context import config_var
from heurams.kernel.algorithms import algorithms
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Electron:
"""电子: 记忆分析元数据及算法"""
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = "supermemo2"):
"""初始化电子对象 (记忆数据)
Args:
ident: 算法的唯一标识符, 用于区分不同的算法实例, 使用 algodata[ident] 获取
algodata: 算法数据字典, 包含算法的各项参数和设置
algo: 使用的算法模块标识
"""
logger.debug(
"创建 Electron 实例, ident: '%s', algo_name: '%s'", ident, algo_name
)
self.algodata = algodata
self.ident = ident
self.algo = algorithms[algo_name]
logger.debug("使用的算法类: %s", self.algo.__name__)
if self.algo not in self.algodata.keys():
self.algodata[self.algo.algo_name] = {}
logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo)
if not self.algodata[self.algo.algo_name]:
logger.debug("算法数据为空, 使用默认值初始化")
self._default_init(self.algo.defaults)
else:
logger.debug("算法数据已存在, 跳过默认初始化")
logger.debug(
"Electron 初始化完成, algodata keys: %s", list(self.algodata.keys())
)
def _default_init(self, defaults: dict):
"""默认初始化包装"""
logger.debug(
"Electron._default_init: 使用默认值, keys: %s", list(defaults.keys())
)
self.algodata[self.algo.algo_name] = defaults.copy()
def activate(self):
"""激活此电子"""
logger.debug("Electron.activate: 激活 ident='%s'", self.ident)
self.algodata[self.algo.algo_name]["is_activated"] = 1
self.algodata[self.algo.algo_name]["last_modify"] = timer.get_timestamp()
logger.debug("电子已激活, is_activated=1")
def modify(self, var: str, value):
"""修改 algodata[algo] 中子字典数据"""
logger.debug("Electron.modify: var='%s', value=%s", var, value)
if var in self.algodata[self.algo.algo_name]:
self.algodata[self.algo.algo_name][var] = value
self.algodata[self.algo.algo_name]["last_modify"] = timer.get_timestamp()
logger.debug("变量 '%s' 已修改, 更新 last_modify", var)
else:
logger.warning("'%s' 非已知元数据字段", var)
print(f"警告: '{var}' 非已知元数据字段")
def is_due(self):
"""是否应该复习"""
logger.debug("Electron.is_due: 检查 ident='%s'", self.ident)
result = self.algo.is_due(self.algodata)
logger.debug("is_due 结果: %s", result)
return result and self.is_activated()
def is_activated(self):
result = self.algodata[self.algo.algo_name]["is_activated"]
logger.debug("Electron.is_activated: ident='%s', 结果: %d", self.ident, result)
return result
def get_rate(self):
"评价"
try:
logger.debug("Electron.rate: ident='%s'", self.ident)
result = self.algo.rate(self.algodata)
logger.debug("rate 结果: %s", result)
return result
except:
return 0
def nextdate(self) -> int:
logger.debug("Electron.nextdate: ident='%s'", self.ident)
result = self.algo.nextdate(self.algodata)
logger.debug("nextdate 结果: %d", result)
return result
def revisor(self, quality: int = 5, is_new_activation: bool = False):
"""算法迭代决策机制实现
Args:
quality (int): 记忆保留率量化参数 (0-5)
is_new_activation (bool): 是否为初次激活
"""
logger.debug(
"Electron.revisor: ident='%s', quality=%d, is_new_activation=%s",
self.ident,
quality,
is_new_activation,
)
self.algo.revisor(self.algodata, quality, is_new_activation)
logger.debug(
"revisor 完成, 更新后的 algodata: %s", self.algodata.get(self.algo, {})
)
def __str__(self):
return (
f"记忆单元预览 \n"
f"标识符: '{self.ident}' \n"
f"算法: {self.algo} \n"
f"易度系数: {self.algodata[self.algo.algo_name]['efactor']:.2f} \n"
f"已经重复的次数: {self.algodata[self.algo.algo_name]['rept']} \n"
f"下次间隔: {self.algodata[self.algo.algo_name]['interval']}\n"
f"下次复习日期时间戳: {self.algodata[self.algo.algo_name]['next_date']}"
)
def __eq__(self, other):
if self.ident == other.ident:
return True
return False
def __hash__(self):
return hash(self.ident)
def __getitem__(self, key):
if key == "ident":
return self.ident
if key in self.algodata[self.algo.algo_name]:
return self.algodata[self.algo.algo_name][key]
else:
raise KeyError(f"'{key}' 未在 algodata[self.algo] 中")
def __setitem__(self, key, value):
if key == "ident":
raise AttributeError("ident 应为只读")
self.algodata[self.algo.algo_name][key] = value
self.algodata[self.algo.algo_name]["last_modify"] = timer.get_timestamp()
def __len__(self):
"""仅返回当前算法的配置数量"""
return len(self.algodata[self.algo.algo_name])
@staticmethod
def placeholder():
"""生成一个电子占位符"""
return Electron("电子对象样例内容", {})

View File

@@ -0,0 +1,71 @@
from .nucleon import Nucleon
from .electron import Electron
import heurams.services.hasher as hasher
import pathlib
import toml
import json
from copy import deepcopy
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def load_nucleon(path: pathlib.Path, fmt="toml"):
logger.debug("load_nucleon: 加载文件 %s, 格式: %s", path, fmt)
with open(path, "r") as f:
dictdata = dict()
dictdata = toml.load(f) # type: ignore
logger.debug("TOML 解析成功, keys: %s", list(dictdata.keys()))
lst = list()
nested_data = dict()
# 修正 toml 解析器的不管嵌套行为
for key, value in dictdata.items():
if "__metadata__" in key: # 以免影响句号
if "." in key:
parts = key.split(".")
current = nested_data
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
current[parts[-1]] = value
logger.debug("处理元数据键: %s", key)
else:
nested_data[key] = value
logger.debug("嵌套数据处理完成, keys: %s", list(nested_data.keys()))
# print(nested_data)
for item, attr in nested_data.items():
if item == "__metadata__":
continue
logger.debug("处理项目: %s", item)
lst.append(
(
Nucleon(item, attr, deepcopy(nested_data["__metadata__"])),
deepcopy(nested_data["__metadata__"]["orbital"]),
)
)
logger.debug("load_nucleon 完成, 加载了 %d 个 Nucleon 对象", len(lst))
return lst
def load_electron(path: pathlib.Path, fmt="json") -> dict:
"""从文件路径加载电子对象
Args:
path (pathlib.Path): 路径
fmt (str): 文件格式(可选, 默认 json)
Returns:
dict: 键名是电子对象名称, 值是电子对象
"""
logger.debug("load_electron: 加载文件 %s, 格式: %s", path, fmt)
with open(path, "r") as f:
dictdata = dict()
dictdata = json.load(f) # type: ignore
logger.debug("JSON 解析成功, keys: %s", list(dictdata.keys()))
dic = dict()
for item, attr in dictdata.items():
logger.debug("处理电子项目: %s", item)
dic[item] = Electron(item, attr)
logger.debug("load_electron 完成, 加载了 %d 个 Electron 对象", len(dic))
return dic

View File

@@ -0,0 +1,104 @@
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Nucleon:
"""原子核: 材料元数据"""
def __init__(self, ident: str, payload: dict, metadata: dict = {}):
"""初始化原子核 (记忆内容)
Args:
ident: 唯一标识符
payload: 记忆内容信息
metadata: 可选元数据信息
"""
logger.debug(
"创建 Nucleon 实例, ident: '%s', payload keys: %s, metadata keys: %s",
ident,
list(payload.keys()) if payload else [],
list(metadata.keys()) if metadata else [],
)
self.metadata = metadata
self.payload = payload
self.ident = ident
logger.debug("Nucleon 初始化完成")
def __getitem__(self, key):
logger.debug("Nucleon.__getitem__: key='%s'", key)
if key == "ident":
logger.debug("返回 ident: '%s'", self.ident)
return self.ident
if key in self.payload:
value = self.payload[key]
logger.debug(
"返回 payload['%s'], value type: %s", key, type(value).__name__
)
return value
else:
logger.error("'%s' 未在 payload 中找到", key)
raise KeyError(f"Key '{key}' not found in payload.")
def __iter__(self):
yield from self.payload.keys()
def __len__(self):
return len(self.payload)
def __hash__(self):
return hash(self.ident)
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Nucleon.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
try:
nucleon = self
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
traverse(self.payload, eval_with_env)
traverse(self.metadata, eval_with_env)
logger.debug("Nucleon.do_eval 完成")
@staticmethod
def placeholder():
"""生成一个占位原子核"""
logger.debug("创建 Nucleon 占位符")
return Nucleon("核子对象样例内容", {})

View File

@@ -0,0 +1,29 @@
from typing import TypedDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("Orbital 类型定义模块已加载")
class OrbitalSchedule(TypedDict):
quick_review: list
recognition: list
final_review: list
class Orbital(TypedDict):
schedule: OrbitalSchedule
puzzles: dict
"""一份示例
["__metadata__.orbital.puzzles"] # 谜题定义
"Recognition" = { __origin__ = "recognition", __hint__ = "", primary = "eval:nucleon['content']", secondery = ["eval:nucleon['keyword_note']", "eval:nucleon['note']"], top_dim = ["eval:nucleon['translation']"] }
"SelectMeaning" = { __origin__ = "mcq", __hint__ = "eval:nucleon['content']", jammer = "eval:nucleon['keyword_note']", max_riddles_num = "eval:default['mcq']['max_riddles_num']", prefix = "选择正确项: " }
"FillBlank" = { __origin__ = "cloze", __hint__ = "", text = "eval:nucleon['content']", delimiter = "eval:metadata['formation']['delimiter']", min_denominator = "eval:default['cloze']['min_denominator']"}
["__metadata__.orbital.schedule"] # 内置的推荐学习方案
quick_review = [["FillBlank", "1.0"], ["SelectMeaning", "0.5"], ["recognition", "1.0"]]
recognition = [["recognition", "1.0"]]
final_review = [["FillBlank", "0.7"], ["SelectMeaning", "0.7"], ["recognition", "1.0"]]
"""

View File

@@ -0,0 +1,61 @@
from heurams.context import config_var
import pathlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def probe_by_filename(filename):
"""探测指定文件 (无扩展名) 的所有信息"""
logger.debug("probe_by_filename: 探测文件 '%s'", filename)
paths: dict = config_var.get().get("paths")
logger.debug("配置路径: %s", paths)
formats = ["toml", "json"]
result = {}
for item, attr in paths.items():
for i in formats:
attr: pathlib.Path = pathlib.Path(attr) / filename + "." + i
if attr.exists():
logger.debug("找到文件: %s", attr)
result[item.replace("_dir", "")] = str(attr)
else:
logger.debug("文件不存在: %s", attr)
logger.debug("probe_by_filename 结果: %s", result)
return result
def probe_all(is_stem=1):
"""依据目录探测所有信息
Args:
is_stem (boolean): 是否**删除**文件扩展名
Returns:
dict: 有三项, 每一项的键名都是文件组类型, 值都是文件组列表, 只包含文件名
"""
logger.debug("probe_all: 开始探测, is_stem=%d", is_stem)
paths: dict = config_var.get().get("paths")
logger.debug("配置路径: %s", paths)
result = {}
for item, attr in paths.items():
attr: pathlib.Path = pathlib.Path(attr)
result[item.replace("_dir", "")] = list()
logger.debug("扫描目录: %s", attr)
file_count = 0
for i in attr.iterdir():
if not i.is_dir():
file_count += 1
if is_stem:
result[item.replace("_dir", "")].append(str(i.stem))
else:
result[item.replace("_dir", "")].append(str(i.name))
logger.debug("目录 %s 中找到 %d 个文件", attr, file_count)
logger.debug("probe_all 完成, 结果 keys: %s", list(result.keys()))
return result
if __name__ == "__main__":
import os
print(os.getcwd())
print(probe_all())

View File

@@ -0,0 +1,63 @@
"""
Puzzle 模块 - 谜题生成系统
提供多种类型的谜题生成器, 支持从字符串、字典等数据源导入题目
"""
from heurams.services.logger import get_logger
logger = get_logger(__name__)
from .base import BasePuzzle
from .cloze import ClozePuzzle
from .mcq import MCQPuzzle
from .recognition import RecognitionPuzzle
__all__ = [
"BasePuzzle",
"ClozePuzzle",
"MCQPuzzle",
"RecognitionPuzzle",
]
puzzles = {
"mcq": MCQPuzzle,
"cloze": ClozePuzzle,
"recognition": RecognitionPuzzle,
"base": BasePuzzle,
}
@staticmethod
def create_by_dict(config_dict: dict) -> BasePuzzle:
"""
根据配置字典创建谜题
Args:
config_dict: 配置字典, 包含谜题类型和参数
Returns:
BasePuzzle: 谜题实例
Raises:
ValueError: 当配置无效时抛出
"""
logger.debug(
"puzzles.create_by_dict: config_dict keys=%s", list(config_dict.keys())
)
puzzle_type = config_dict.get("type")
if puzzle_type == "cloze":
return puzzles["cloze"](
text=config_dict["text"],
min_denominator=config_dict.get("min_denominator", 7),
)
elif puzzle_type == "mcq":
return puzzles["mcq"](
mapping=config_dict["mapping"],
jammer=config_dict.get("jammer", []),
max_riddles_num=config_dict.get("max_riddles_num", 2),
prefix=config_dict.get("prefix", ""),
)
else:
raise ValueError(f"未知的谜题类型: {puzzle_type}")

View File

@@ -0,0 +1,16 @@
# base.py
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class BasePuzzle:
"""谜题基类"""
def refresh(self):
logger.debug("BasePuzzle.refresh 被调用(未实现)")
raise NotImplementedError("谜题对象未实现 refresh 方法")
def __str__(self):
logger.debug("BasePuzzle.__str__ 被调用")
return f"谜题: {type(self).__name__}"

View File

@@ -0,0 +1,55 @@
from .base import BasePuzzle
import random
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class ClozePuzzle(BasePuzzle):
"""填空题谜题生成器
Args:
text: 原始字符串(需要 delimiter 分割句子, 末尾应有 delimiter)
min_denominator: 最小概率倒数(如占所有可生成填空数的 1/7 中的 7, 若期望值小于 1, 则取 1)
"""
def __init__(self, text: str, min_denominator: int, delimiter: str = "/"):
logger.debug(
"ClozePuzzle.__init__: text length=%d, min_denominator=%d, delimiter='%s'",
len(text),
min_denominator,
delimiter,
)
self.text = text
self.min_denominator = min_denominator
self.wording = "填空题 - 尚未刷新谜题"
self.answer = ["填空题 - 尚未刷新谜题"]
self.delimiter = delimiter
logger.debug("ClozePuzzle 初始化完成")
def refresh(self): # 刷新谜题
logger.debug("ClozePuzzle.refresh 开始")
placeholder = "___SLASH___"
tmp_text = self.text.replace(self.delimiter, placeholder)
words = tmp_text.split(placeholder)
if not words:
logger.warning("ClozePuzzle.refresh: 无单词可处理")
return
words = [word for word in words if word]
logger.debug("ClozePuzzle.refresh: 分割出 %d 个单词", len(words))
num_blanks = min(max(1, len(words) // self.min_denominator), len(words))
logger.debug("ClozePuzzle.refresh: 需要生成 %d 个填空", num_blanks)
indices_to_blank = random.sample(range(len(words)), num_blanks)
indices_to_blank.sort()
blanked_words = list(words)
answer = list()
for index in indices_to_blank:
blanked_words[index] = "__" * len(words[index])
answer.append(words[index])
self.answer = answer
self.wording = "".join(blanked_words)
logger.debug("ClozePuzzle.refresh 完成, 生成 %d 个填空", len(answer))
def __str__(self):
logger.debug("ClozePuzzle.__str__ 被调用")
return f"{self.wording}\n{str(self.answer)}"

View File

@@ -0,0 +1,242 @@
# mcq.py
from .base import BasePuzzle
import random
from typing import List, Dict, Optional, Union
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class MCQPuzzle(BasePuzzle):
"""选择题谜题生成器
该类用于生成和管理选择题谜题, 支持多个题目同时生成,
每个题目包含问题, 正确答案和干扰项选项.
Attributes:
prefix (str): 题目前缀文本
mapping (Dict[str, str]): 问题和正确答案的映射字典
jammer (List[str]): 干扰项列表
max_riddles_num (int): 最大题目数量限制
wording (Union[str, List[str]]): 题目文本内容
answer (Union[str, List[str]]): 正确答案列表
options (List[List[str]]): 每个题目的选项列表
"""
def __init__(
self,
mapping: Dict[str, str],
jammer: List[str],
max_riddles_num: int = 2,
prefix: str = "",
) -> None:
"""初始化选择题谜题生成器
Args:
mapping: 问题和正确答案的映射字典, 键为问题, 值为正确答案
jammer: 干扰项列表, 用于生成错误选项
max_riddles_num: 每次生成的最大题目数量, 范围限制在1-5之间
prefix: 题目前缀文本, 会显示在每个题目之前
"""
logger.debug(
"MCQPuzzle.__init__: mapping size=%d, jammer size=%d, max_riddles_num=%d",
len(mapping),
len(jammer),
max_riddles_num,
)
self.prefix = prefix
self.mapping = mapping
self.max_riddles_num = max(1, min(max_riddles_num, 5))
# 初始化干扰项, 确保至少有4个选项
self._init_jammer(jammer)
# 初始化题目状态
self._reset_puzzle_state()
def _init_jammer(self, jammer: List[str]) -> None:
"""初始化干扰项列表
合并传入的干扰项和所有正确答案, 确保去重后至少有4个干扰项.
Args:
jammer: 传入的干扰项列表
"""
# 合并正确答案和传入的干扰项, 并去重
logger.debug(f"答案映射: {self.mapping}, {type(self.mapping)}")
logger.debug(f"干扰项: {jammer}, {type(jammer)}")
unique_jammers = set(jammer + list(self.mapping.values()))
self.jammer = list(unique_jammers)
# 确保至少有4个干扰项
while len(self.jammer) < 4:
self.jammer.append(" " * (4 - len(self.jammer)))
unique_jammers = set(jammer + list(self.mapping.values()))
def _reset_puzzle_state(self) -> None:
"""重置谜题状态为初始值
将题目文本, 答案和选项重置为默认状态.
"""
self.wording: Union[str, List[str]] = "选择题 - 尚未刷新谜题"
self.answer: Union[str, List[str]] = ["选择题 - 尚未刷新谜题"]
self.options: List[List[str]] = []
def refresh(self) -> None:
"""刷新谜题, 生成指定数量的选择题
从mapping中随机选择指定数量的问题, 为每个问题生成包含正确答案
和干扰项的选项列表, 并更新谜题状态.
Raises:
ValueError: 当mapping为空时不会抛出异常, 但会设置空谜题状态
"""
logger.debug("MCQPuzzle.refresh 开始, mapping size=%d", len(self.mapping))
if not self.mapping:
self._set_empty_puzzle()
return
num_questions = min(self.max_riddles_num, len(self.mapping))
selected_questions = random.sample(list(self.mapping.items()), num_questions)
puzzles: List[str] = []
answers: List[str] = []
all_options: List[List[str]] = []
for question, correct_answer in selected_questions:
options = self._generate_options(correct_answer)
puzzles.append(question)
answers.append(correct_answer)
all_options.append(options)
self.wording = self._format_questions(puzzles)
self.answer = answers
self.options = all_options
def _set_empty_puzzle(self) -> None:
"""设置为空谜题状态
当没有可用的题目时, 设置相应的提示信息.
"""
self.wording = "无可用题目"
self.answer = ["无答案"]
self.options = []
def _generate_options(self, correct_answer: str) -> List[str]:
"""为单个问题生成选项列表(包含正确答案和干扰项)
Args:
correct_answer: 当前问题的正确答案
Returns:
包含4个选项的列表, 其中一个是正确答案, 三个是干扰项
Note:
如果可用干扰项不足3个, 会使用重复的干扰项填充
"""
options = [correct_answer]
# 获取可用的干扰项(排除正确答案)
available_jammers = [
jammer for jammer in self.jammer if jammer != correct_answer
]
# 选择3个干扰项
if len(available_jammers) >= 3:
selected_jammers = random.sample(available_jammers, 3)
else:
selected_jammers = random.choices(available_jammers, k=3)
options.extend(selected_jammers)
random.shuffle(options)
return options
def _format_questions(self, puzzles: List[str]) -> List[str]:
"""格式化问题列表为可读的文本
Args:
puzzles: 原始问题文本列表
Returns:
格式化后的问题文本列表, 包含编号和前缀
Example:
输入: ["问题1", "问题2"]
输出: ["前缀:\\n 1. 问题1", "前缀:\\n 2. 问题2"]
"""
if not puzzles:
return []
formatted_questions = []
for i, puzzle in enumerate(puzzles, 1):
question_text = (
f"{self.prefix}:\n {i}. {puzzle}" if self.prefix else f"{i}. {puzzle}"
)
formatted_questions.append(question_text)
return formatted_questions
def __str__(self) -> str:
"""返回谜题的字符串表示
Returns:
包含所有问题和正确答案的格式化字符串
Example:
选择题 - 尚未刷新谜题
正确答案: 选择题 - 尚未刷新谜题
"""
if isinstance(self.wording, list):
question_text = "\n".join(self.wording)
else:
question_text = self.wording
if isinstance(self.answer, list):
answer_text = ", ".join(self.answer)
else:
answer_text = str(self.answer)
return f"{question_text}\n正确答案: {answer_text}"
def get_question_count(self) -> int:
"""获取当前生成的题目数量
Returns:
当前题目的数量, 如果尚未刷新则返回 0
"""
if isinstance(self.wording, list):
return len(self.wording)
elif self.wording == "选择题 - 尚未刷新谜题" or self.wording == "无可用题目":
return 0
else:
return 1
def get_correct_answer_for_question(self, question_index: int) -> Optional[str]:
"""获取指定题目的正确答案
Args:
question_index: 题目索引(从0开始)
Returns:
指定题目的正确答案, 如果索引无效则返回None
"""
if not isinstance(self.answer, list):
return None
if 0 <= question_index < len(self.answer):
return self.answer[question_index]
return None
def get_options_for_question(self, question_index: int) -> Optional[List[str]]:
"""获取指定题目的选项列表
Args:
question_index: 题目索引(从0开始)
Returns:
指定题目的选项列表, 如果索引无效则返回None
"""
if 0 <= question_index < len(self.options):
return self.options[question_index]
return None

View File

@@ -0,0 +1,18 @@
# mcq.py
from .base import BasePuzzle
import random
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class RecognitionPuzzle(BasePuzzle):
"""识别占位符"""
def __init__(self) -> None:
logger.debug("RecognitionPuzzle.__init__")
super().__init__()
def refresh(self):
logger.debug("RecognitionPuzzle.refresh空实现")
pass

View File

@@ -0,0 +1,11 @@
from .states import PhaserState, ProcessionState
from .procession import Procession
from .fission import Fission
from .phaser import Phaser
from heurams.services.logger import get_logger
logger = get_logger(__name__)
__all__ = ["PhaserState", "ProcessionState", "Procession", "Fission", "Phaser"]
logger.debug("反应堆模块已加载")

View File

@@ -0,0 +1,43 @@
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as puz
import random
from .states import PhaserState
from heurams.services.logger import get_logger
class Fission:
"""裂变器: 单原子调度展开器"""
def __init__(self, atom: pt.Atom, phase=PhaserState.RECOGNITION):
self.logger = get_logger(__name__)
self.atom = atom
# print(f"{phase.value}")
self.orbital_schedule = atom.registry["orbital"]["schedule"][phase.value] # type: ignore
self.orbital_puzzles = atom.registry["orbital"]["puzzles"]
# print(self.orbital_schedule)
self.puzzles = list()
for item, possibility in self.orbital_schedule: # type: ignore
print(f"ad:{item}")
self.logger.debug(f"开始处理 orbital 项: {item}")
if not isinstance(possibility, float):
possibility = float(possibility)
while possibility > 1:
self.puzzles.append(
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item,
}
)
possibility -= 1
if random.random() <= possibility:
self.puzzles.append(
{
"puzzle": puz.puzzles[self.orbital_puzzles[item]["__origin__"]],
"alia": item,
}
)
print(f"ok:{item}")
self.logger.debug(f"orbital 项处理完成: {item}")
def generate(self):
yield from self.puzzles

View File

@@ -0,0 +1,50 @@
# 移相器类定义
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
from .procession import Procession
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Phaser:
"""移相器: 全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
logger.debug("Phaser.__init__: 原子数量=%d", len(atoms))
new_atoms = list()
old_atoms = list()
self.state = PhaserState.UNSURE
for i in atoms:
if not i.registry["electron"].is_activated():
new_atoms.append(i)
else:
old_atoms.append(i)
logger.debug("新原子数量=%d, 旧原子数量=%d", len(new_atoms), len(old_atoms))
self.processions = list()
if len(old_atoms):
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
)
logger.debug("创建初始复习 Procession")
if len(new_atoms):
self.processions.append(
Procession(new_atoms, PhaserState.RECOGNITION, "新记忆")
)
logger.debug("创建新记忆 Procession")
self.processions.append(Procession(atoms, PhaserState.FINAL_REVIEW, "总体复习"))
logger.debug("创建总体复习 Procession")
logger.debug("Phaser 初始化完成, processions 数量=%d", len(self.processions))
def current_procession(self):
logger.debug("Phaser.current_procession 被调用")
for i in self.processions:
i: Procession
if not i.state == ProcessionState.FINISHED:
self.state = i.phase
logger.debug("找到未完成的 Procession: phase=%s", i.phase)
return i
self.state = PhaserState.FINISHED
logger.debug("所有 Procession 已完成, 状态设置为 FINISHED")
return 0

View File

@@ -0,0 +1,73 @@
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Procession:
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
logger.debug(
"Procession.__init__: 原子数量=%d, phase=%s, name='%s'",
len(atoms),
phase.value,
name,
)
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
self.name = name
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
logger.debug("Procession 初始化完成, 队列长度=%d", len(self.queue))
def forward(self, step=1):
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor == len(self.queue):
self.state = ProcessionState.FINISHED
logger.debug("Procession 已完成")
else:
self.state = ProcessionState.RUNNING
try:
logger.debug("cursor 更新为: %d", self.cursor)
self.current_atom = self.queue[self.cursor]
logger.debug("当前原子更新为: %s", self.current_atom.ident)
return 1 # 成功
except IndexError as e:
logger.debug("IndexError: %s", e)
self.state = ProcessionState.FINISHED
logger.debug("Procession 因索引错误而完成")
return 0
def append(self, atom=None):
if atom == None:
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
if self.queue[len(self.queue) - 1] != atom or len(self) <= 1:
self.queue.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
else:
logger.debug("原子未追加(重复或队列长度<=1")
def __len__(self):
length = len(self.queue) - self.cursor
logger.debug("Procession.__len__: 剩余长度=%d", length)
return length
def process(self):
logger.debug("Procession.process: cursor=%d", self.cursor)
return self.cursor
def total_length(self):
total = len(self.queue)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
empty = len(self.queue)
logger.debug("Procession.is_empty: %d", empty)
return empty

View File

@@ -0,0 +1,20 @@
from enum import Enum, auto
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class PhaserState(Enum):
UNSURE = "unsure"
QUICK_REVIEW = "quick_review"
RECOGNITION = "recognition"
FINAL_REVIEW = "final_review"
FINISHED = "finished"
class ProcessionState(Enum):
RUNNING = auto()
FINISHED = auto()
logger.debug("状态枚举定义已加载")