feat: 进一步改进

This commit is contained in:
2025-12-31 00:57:07 +08:00
parent b65dad6a1f
commit eaa38fb880
43 changed files with 251 additions and 528 deletions

View File

@@ -1,61 +0,0 @@
# [调试] 将更改保存到文件
persist_to_file = 1
# [调试] 覆写时间, 设为 -1 以禁用
daystamp_override = -1
timestamp_override = -1
# [调试] 一键通过
quick_pass = 1
# 对于每个项目的默认新记忆原子数量
scheduled_num = 8
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[algorithm]
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
[puzzles] # 谜题默认配置
[puzzles.mcq]
max_riddles_num = 2
[puzzles.cloze]
min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
nucleon_dir = "./data/nucleon"
electron_dir = "./data/electron"
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
orbital_dir = "./data/orbital"
cache_dir = "./data/cache"
template_dir = "./data/template"
[services] # 定义服务到提供者的映射
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = ""
key = ""
[providers.sync.webdav] # WebDAV 同步设置
url = ""
username = ""
password = ""
remote_path = "/heurams/"
verify_ssl = true
[sync]

View File

@@ -61,7 +61,7 @@ class DashboardScreen(Screen):
Returns:
dict: 包含显示文本的字典,键为行号
"""
from heurams.kernel.particles.loader import load_electron, load_nucleon
from heurams.kernel.repository.particle_loader import load_electron, load_nucleon
result = {}
filestem = pathlib.Path(filename).stem

View File

@@ -8,7 +8,7 @@ from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, Static
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
import heurams.kernel.evaluators as pz
from heurams.context import config_var
from heurams.kernel.reactor import *
from heurams.services.logger import get_logger

View File

@@ -5,7 +5,7 @@ from typing import TypedDict
import heurams.interface.widgets as pzw
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
import heurams.kernel.evaluators as pz
staging = {} # 细粒度缓存区, 是 ident -> quality 的封装

View File

@@ -8,7 +8,7 @@ from textual.widget import Widget
from textual.widgets import Button, Label
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
import heurams.kernel.evaluators as pz
from heurams.services.logger import get_logger
from .base_puzzle_widget import BasePuzzleWidget

View File

@@ -6,7 +6,7 @@ from textual.widget import Widget
from textual.widgets import Button, Label
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
import heurams.kernel.evaluators as pz
from heurams.services.hasher import hash
from heurams.services.logger import get_logger

View File

@@ -2,17 +2,20 @@ from heurams.services.logger import get_logger
from .sm2 import SM2Algorithm
from .sm15m import SM15MAlgorithm
from .base import BaseAlgorithm
logger = get_logger(__name__)
__all__ = [
"SM2Algorithm",
"BaseAlgorithm",
"SM15MAlgorithm",
]
algorithms = {
"SM-2": SM2Algorithm,
"SM-15M": SM15MAlgorithm,
# "SM-15M": SM15MAlgorithm,
"Base": BaseAlgorithm,
}
logger.debug("算法模块初始化完成, 注册的算法: %s", list(algorithms.keys()))

View File

@@ -10,7 +10,6 @@ class BaseAlgorithm:
algo_name = "BaseAlgorithm"
class AlgodataDict(TypedDict):
efactor: float
real_rept: int
rept: int
interval: int
@@ -52,7 +51,7 @@ class BaseAlgorithm:
return 1
@classmethod
def rate(cls, algodata) -> str:
def get_rating(cls, algodata) -> str:
"""获取评分信息"""
logger.debug(
"BaseAlgorithm.rate 被调用, algodata keys: %s",
@@ -68,3 +67,12 @@ class BaseAlgorithm:
list(algodata.keys()) if algodata else [],
)
return -1
@classmethod
def check_integrity(cls, algodata):
try:
cls.AlgodataDict(**algodata[cls.algo_name])
return 1
except:
return 0

View File

@@ -116,7 +116,7 @@ class SM2Algorithm(BaseAlgorithm):
return result
@classmethod
def rate(cls, algodata):
def get_rating(cls, algodata):
efactor = algodata[cls.algo_name]["efactor"]
logger.debug("SM2.rate: efactor=%f", efactor)
return str(efactor)

View File

@@ -1,20 +1,20 @@
"""
Puzzle 模块 - 谜题生成系统
Evaluator 模块 - 生成评估模块
提供多种类型的谜题生成器, 支持从字符串字典等数据源导入题目
提供多种类型的辅助评估生成器, 支持从字符串字典等数据源导入题目
"""
from heurams.services.logger import get_logger
logger = get_logger(__name__)
from .base import BasePuzzle
from .base import BaseEvaluator
from .cloze import ClozePuzzle
from .mcq import MCQPuzzle
from .recognition import RecognitionPuzzle
__all__ = [
"BasePuzzle",
"BaseEvaluator",
"ClozePuzzle",
"MCQPuzzle",
"RecognitionPuzzle",
@@ -24,12 +24,12 @@ puzzles = {
"mcq": MCQPuzzle,
"cloze": ClozePuzzle,
"recognition": RecognitionPuzzle,
"base": BasePuzzle,
"base": BaseEvaluator,
}
@staticmethod
def create_by_dict(config_dict: dict) -> BasePuzzle:
def create_by_dict(config_dict: dict) -> BaseEvaluator:
"""
根据配置字典创建谜题

View File

@@ -4,7 +4,7 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
class BasePuzzle:
class BaseEvaluator:
"""谜题基类"""
def refresh(self):

View File

@@ -2,12 +2,12 @@ import random
from heurams.services.logger import get_logger
from .base import BasePuzzle
from .base import BaseEvaluator
logger = get_logger(__name__)
class ClozePuzzle(BasePuzzle):
class ClozePuzzle(BaseEvaluator):
"""填空题谜题生成器
Args:

View File

@@ -0,0 +1,11 @@
import random
from heurams.services.logger import get_logger
from .base import BaseEvaluator
logger = get_logger(__name__)
class GuessEvaluator(BaseEvaluator):
def __init__(self):
super().__init__()

View File

@@ -4,12 +4,12 @@ from typing import Dict, List, Optional, Union
from heurams.services.logger import get_logger
from .base import BasePuzzle
from .base import BaseEvaluator
logger = get_logger(__name__)
class MCQPuzzle(BasePuzzle):
class MCQPuzzle(BaseEvaluator):
"""选择题谜题生成器
该类用于生成和管理选择题谜题, 支持多个题目同时生成,

View File

@@ -3,12 +3,12 @@ import random
from heurams.services.logger import get_logger
from .base import BasePuzzle
from .base import BaseEvaluator
logger = get_logger(__name__)
class RecognitionPuzzle(BasePuzzle):
class RecognitionPuzzle(BaseEvaluator):
"""识别占位符"""
def __init__(self) -> None:

View File

@@ -1,29 +0,0 @@
"""
Particle 模块 - 粒子对象系统
提供闪卡所需对象, 使用物理学粒子的领域驱动设计
"""
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("粒子模块已加载")
from .atom import Atom, atom_registry
from .electron import Electron
from .loader import load_electron, load_nucleon
from .nucleon import Nucleon
from .orbital import Orbital
from .probe import probe_all, probe_by_filename
__all__ = [
"Electron",
"Nucleon",
"Orbital",
"Atom",
"probe_all",
"probe_by_filename",
"load_nucleon",
"load_electron",
"atom_registry",
]

View File

@@ -11,7 +11,6 @@ from heurams.services.logger import get_logger
from .electron import Electron
from .nucleon import Nucleon
from .orbital import Orbital
logger = get_logger(__name__)
@@ -19,19 +18,12 @@ logger = get_logger(__name__)
class AtomRegister_runtime(TypedDict):
locked: bool # 只读锁定标识符
min_rate: int # 最低评分
newact: bool # 新激活
new_activation: bool # 新激活
class AtomRegister(TypedDict):
nucleon: Nucleon
nucleon_path: pathlib.Path
nucleon_fmt: str
electron: Electron
electron_path: pathlib.Path
electron_fmt: str
orbital: Orbital
orbital_path: pathlib.Path
orbital_fmt: str
runtime: AtomRegister_runtime
@@ -44,50 +36,37 @@ class Atom:
以及关联路径
"""
def __init__(self, ident=""):
logger.debug("创建 Atom 实例, ident: '%s'", ident)
self.ident = ident
atom_registry[ident] = self
logger.debug("Atom 已注册到全局注册表, 当前注册表大小: %d", len(atom_registry))
# self.is_evaled = False
default_runtime = {
"locked": False,
"min_rate": 0x3F3F3F3F,
"new_activation": False,
}
def __init__(self, nucleon_obj = None, electron_obj = None, orbital_obj = None):
self.registry: AtomRegister = { # type: ignore
"nucleon": None,
"nucleon_path": None,
"nucleon_fmt": "toml",
"electron": None,
"electron_path": None,
"electron_fmt": "json",
"orbital": None,
"orbital_path": None, # 允许设置为 None, 此时使用 nucleon 文件内的推荐配置
"orbital_fmt": "toml",
"runtime": {"locked": False, "min_rate": 0x3F3F3F3F, "newact": False},
"ident": nucleon_obj["ident"], # type: ignore
"nucleon": nucleon_obj,
"electron": electron_obj,
"orbital": orbital_obj,
"runtime": dict(),
}
logger.debug("Atom 初始化完成")
def link(self, key, value):
logger.debug("Atom.link: key='%s', value type: %s", key, type(value).__name__)
if key in self.registry.keys():
self.registry[key] = value
logger.debug("'%s' 已链接, 触发 do_eval", key)
if key == "electron":
if self.registry["electron"].is_activated() == 0:
self.registry["runtime"]["newact"] = True
else:
logger.error("尝试链接不受支持的键: '%s'", key)
raise ValueError("不受支持的原子元数据链接操作")
if self.registry["electron"].is_activated() == 0:
self.registry["runtime"]["new_activation"] = True
def init_runtime(self):
self.registry['runtime'] = AtomRegister_runtime(**self.default_runtime)
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("EVAL 开始")
# eval 环境设置
def eval_with_env(s: str):
default = config_var.get()["puzzles"]
payload = self.registry["nucleon"].payload
metadata = self.registry["nucleon"].metadata
nucleon = self.registry["nucleon"]
payload = nucleon # 兼容历史遗留问题
metadata = nucleon # 兼容历史遗留问题
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
@@ -158,39 +137,11 @@ class Atom:
logger.debug(f"允许总评分: {self.registry['runtime']['min_rate']}")
self.registry["electron"].revisor(
self.registry["runtime"]["min_rate"],
is_new_activation=self.registry["runtime"]["newact"],
is_new_activation=self.registry["runtime"]["new_activation"],
)
else:
logger.debug("禁止总评分")
def persist(self, key):
logger.debug("Atom.persist: key='%s'", key)
path: pathlib.Path | None = self.registry[key + "_path"]
if isinstance(path, pathlib.Path):
path = typing.cast(pathlib.Path, path)
logger.debug("持久化路径: %s, 格式: %s", path, self.registry[key + "_fmt"])
path.parent.mkdir(parents=True, exist_ok=True)
if self.registry[key + "_fmt"] == "toml":
with open(path, "r+") as f:
f.seek(0)
f.truncate()
toml.dump(self.registry[key], f)
logger.debug("TOML 数据已保存到: %s", path)
elif self.registry[key + "_fmt"] == "json":
with open(path, "r+") as f:
origin = json.load(f)
f.seek(0)
f.truncate()
origin[self.ident] = self.registry[key].algodata
json.dump(origin, f, indent=2, ensure_ascii=False)
logger.debug("JSON 数据已保存到: %s", path)
else:
logger.error("不受支持的持久化格式: %s", self.registry[key + "_fmt"])
raise KeyError("不受支持的持久化格式")
else:
logger.error("路径未初始化: %s_path", key)
raise TypeError("对未初始化的路径对象操作")
def __getitem__(self, key):
logger.debug("Atom.__getitem__: key='%s'", key)
if key in self.registry:
@@ -211,9 +162,3 @@ class Atom:
logger.error("不支持的键: '%s'", key)
raise KeyError(f"不支持的键: {key}")
@staticmethod
def placeholder():
return (Electron.placeholder(), Nucleon.placeholder(), {})
atom_registry: bidict.bidict[str, Atom] = bidict.bidict()

View File

@@ -1,100 +1,71 @@
from typing import TypedDict
import heurams.services.timer as timer
from heurams.context import config_var
from heurams.kernel.algorithms import algorithms
from heurams.services.logger import get_logger
import heurams.kernel.algorithms as algolib
from copy import deepcopy
logger = get_logger(__name__)
class QueryType(TypedDict):
is_due: bool
is_activated: bool
rating: int
nextdate: int
lastdate: int
class Electron:
"""电子: 记忆分析元数据及算法"""
"""电子: 单算法支持的记忆数据包装"""
def __init__(self, ident: str, algodata: dict = {}, algo_name: str = ""):
"""初始化电子对象 (记忆数据)
Args:
ident: 算法的唯一标识符, 用于区分不同的算法实例, 使用 algodata[ident] 获取
algodata: 算法数据字典, 包含算法的各项参数和设置
algo: 使用的算法模块标识
algodata: 算法数据字典引用, 包含算法的各项参数和设置
algo_name: 使用的算法模块标识
"""
if algo_name == "":
algo_name = config_var.get()["algorithm"]["default"]
logger.debug(
"创建 Electron 实例, ident: '%s', algo_name: '%s', algodata: %s",
ident,
algo_name,
algodata,
)
algo_name = "sm-2"
self.algodata = algodata
self.ident = ident
self.algo = algorithms[algo_name]
logger.debug("使用的算法类: %s", self.algo.__name__)
self.algo: algolib.BaseAlgorithm = algorithms[algo_name]
self.query = dict()
if self.algo.algo_name not in self.algodata.keys():
self.algodata[self.algo.algo_name] = {}
logger.debug("算法键 '%s' 不存在, 已创建空字典", self.algo)
if not self.algodata[self.algo.algo_name]:
logger.debug(
f"算法数据为空, 使用默认值初始化{self.algodata[self.algo.algo_name]}"
)
self._default_init(self.algo.defaults)
else:
logger.debug("算法数据已存在, 跳过默认初始化")
logger.debug(
"Electron 初始化完成, algodata keys: %s", list(self.algodata.keys())
)
def _default_init(self, defaults: dict):
"""默认初始化包装"""
logger.debug(
"Electron._default_init: 使用默认值, keys: %s", list(defaults.keys())
)
self.algodata[self.algo.algo_name] = defaults.copy()
if self.algo.check_integrity(self.algodata):
self.algodata[self.algo.algo_name] = deepcopy(self.algo.defaults)
def activate(self):
"""激活此电子"""
logger.debug("Electron.activate: 激活 ident='%s'", self.ident)
self.algodata[self.algo.algo_name]["is_activated"] = 1
self.algodata[self.algo.algo_name]["last_modify"] = timer.get_timestamp()
logger.debug("电子已激活, is_activated=1")
def modify(self, var: str, value):
def modify(self, key, value):
"""修改 algodata[algo] 中子字典数据"""
logger.debug("Electron.modify: var='%s', value=%s", var, value)
if var in self.algodata[self.algo.algo_name]:
self.algodata[self.algo.algo_name][var] = value
if key in self.algodata[self.algo.algo_name]:
self.algodata[self.algo.algo_name][key] = value
self.algodata[self.algo.algo_name]["last_modify"] = timer.get_timestamp()
logger.debug("变量 '%s' 已修改, 更新 last_modify", var)
else:
logger.warning("'%s' 非已知元数据字段", var)
print(f"警告: '{var}' 非已知元数据字段")
raise AttributeError("不存在的子键")
def is_due(self):
"""是否应该复习"""
logger.debug("Electron.is_due: 检查 ident='%s'", self.ident)
result = self.algo.is_due(self.algodata)
logger.debug("is_due 结果: %s", result)
return result and self.is_activated()
def is_activated(self):
result = self.algodata[self.algo.algo_name]["is_activated"]
logger.debug("Electron.is_activated: ident='%s', 结果: %d", self.ident, result)
return result
def get_rate(self):
"评价"
def get_rating(self):
try:
logger.debug("Electron.rate: ident='%s'", self.ident)
result = self.algo.rate(self.algodata)
logger.debug("rate 结果: %s", result)
result = self.algo.get_rating(self.algodata)
return result
except:
return 0
def nextdate(self) -> int:
logger.debug("Electron.nextdate: ident='%s'", self.ident)
result = self.algo.nextdate(self.algodata)
logger.debug("nextdate 结果: %d", result)
return result
def revisor(self, quality: int = 5, is_new_activation: bool = False):
@@ -104,32 +75,7 @@ class Electron:
quality (int): 记忆保留率量化参数 (0-5)
is_new_activation (bool): 是否为初次激活
"""
logger.debug(
"Electron.revisor: ident='%s', quality=%d, is_new_activation=%s",
self.ident,
quality,
is_new_activation,
)
self.algo.revisor(self.algodata, quality, is_new_activation)
logger.debug(
"revisor 完成, 更新后的 algodata: %s", self.algodata.get(self.algo, {})
)
def __str__(self):
return (
f"记忆单元预览 \n"
f"标识符: '{self.ident}' \n"
f"算法: {self.algo} \n"
f"易度系数: {self.algodata[self.algo.algo_name]['efactor']:.2f} \n"
f"已经重复的次数: {self.algodata[self.algo.algo_name]['rept']} \n"
f"下次间隔: {self.algodata[self.algo.algo_name]['interval']}\n"
f"下次复习日期时间戳: {self.algodata[self.algo.algo_name]['next_date']}"
)
def __eq__(self, other):
if self.ident == other.ident:
return True
return False
def __hash__(self):
return hash(self.ident)
@@ -151,8 +97,11 @@ class Electron:
def __len__(self):
"""仅返回当前算法的配置数量"""
return len(self.algodata[self.algo.algo_name])
@staticmethod
def placeholder():
"""生成一个电子占位符"""
return Electron("电子对象样例内容", {})
def create_on_electonic_data(electronic_data: tuple, algo_name: str = ""):
_data = electronic_data
ident = _data[0]
algodata = _data[1]
ident = ident
return Electron(ident, algodata, algo_name)

View File

@@ -1,74 +0,0 @@
import json
import pathlib
from copy import deepcopy
import toml
import heurams.services.hasher as hasher
from heurams.services.logger import get_logger
from .electron import Electron
from .nucleon import Nucleon
logger = get_logger(__name__)
def load_nucleon(path: pathlib.Path, fmt="toml"):
logger.debug("load_nucleon: 加载文件 %s, 格式: %s", path, fmt)
with open(path, "r") as f:
dictdata = dict()
dictdata = toml.load(f) # type: ignore
logger.debug("TOML 解析成功, keys: %s", list(dictdata.keys()))
lst = list()
nested_data = dict()
# 修正 toml 解析器的不管嵌套行为
for key, value in dictdata.items():
if "__metadata__" in key: # 以免影响句号
if "." in key:
parts = key.split(".")
current = nested_data
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
current[parts[-1]] = value
logger.debug("处理元数据键: %s", key)
else:
nested_data[key] = value
logger.debug("嵌套数据处理完成, keys: %s", list(nested_data.keys()))
# print(nested_data)
for item, attr in nested_data.items():
if item == "__metadata__":
continue
logger.debug("处理项目: %s", item)
lst.append(
(
Nucleon(item, attr, deepcopy(nested_data["__metadata__"])),
deepcopy(nested_data["__metadata__"]["orbital"]),
)
)
logger.debug("load_nucleon 完成, 加载了 %d 个 Nucleon 对象", len(lst))
return lst
def load_electron(path: pathlib.Path, fmt="json") -> dict:
"""从文件路径加载电子对象
Args:
path (pathlib.Path): 路径
fmt (str): 文件格式(可选, 默认 json)
Returns:
dict: 键名是电子对象名称, 值是电子对象
"""
logger.debug("load_electron: 加载文件 %s, 格式: %s", path, fmt)
with open(path, "r") as f:
dictdata = dict()
dictdata = json.load(f) # type: ignore
logger.debug("JSON 解析成功, keys: %s", list(dictdata.keys()))
dic = dict()
for item, attr in dictdata.items():
logger.debug("处理电子项目: %s, %s", item, attr)
dic[item] = Electron(item, attr)
logger.debug("load_electron 完成, 加载了 %d 个 Electron 对象", len(dic))
return dic

View File

@@ -2,55 +2,55 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Nucleon:
"""原子核: 材料元数据"""
"""原子核: 带有运行时隔离的半只读材料元数据容器
"""
def __init__(self, ident: str, payload: dict, metadata: dict = {}):
"""初始化原子核 (记忆内容)
Args:
ident: 唯一标识符
payload: 记忆内容信息
metadata: 可选元数据信息
"""
logger.debug(
"创建 Nucleon 实例, ident: '%s', payload keys: %s, metadata keys: %s",
ident,
list(payload.keys()) if payload else [],
list(metadata.keys()) if metadata else [],
)
self.metadata = metadata
self.payload = payload
def __init__(self, ident, payload, common):
self.ident = ident
logger.debug("Nucleon 初始化完成")
self.payload = payload
self.common = common
self.rtlayer = dict() # 运行时层
def __getitem__(self, key):
logger.debug("Nucleon.__getitem__: key='%s'", key)
if key == "ident":
logger.debug("返回 ident: '%s'", self.ident)
return self.ident
if key in self.payload:
value = self.payload[key]
logger.debug(
"返回 payload['%s'], value type: %s", key, type(value).__name__
)
return value
merged = self.rtlayer | self.payload | self.common
return merged[key]
def __setitem__(self, key, value):
if key == "ident":
raise AttributeError("ident 应为只读")
else:
logger.error("'%s' 未在 payload 中找到", key)
raise KeyError(f"Key '{key}' not found in payload.")
self.rtlayer[key] = value
def __delitem__(self, key):
raise AttributeError("Nucleon 包含的数据被设计为无法删除")
def __iter__(self):
yield from self.payload.keys()
merged = self.rtlayer | self.payload | self.common
return iter(merged)
def __contains__(self, key):
return key in (self.rtlayer | self.payload | self.common)
def get(self, key, default=None):
if key in self:
return self[key]
return default
def __len__(self):
return len(self.payload)
return len(self.rtlayer | self.payload | self.common)
def __hash__(self):
return hash(self.ident)
def __repr__(self):
return f"""RUNTIME:{repr(self.rtlayer)}
PAYLOAD:{repr(self.payload)}
COMMON:{repr(self.common)}"""
@staticmethod
def placeholder():
"""生成一个占位原子核"""
logger.debug("创建 Nucleon 占位符")
return Nucleon("核子对象样例内容", {})
def create_on_nucleonic_data(nucleonic_data: tuple):
_data = nucleonic_data
payload = _data[1][0]
common = _data[1][1]
ident = _data[0] #TODO:实现eval
return Nucleon(ident, payload, common)

View File

@@ -1,30 +0,0 @@
from typing import TypedDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("Orbital 类型定义模块已加载")
class OrbitalSchedule(TypedDict):
quick_review: list
recognition: list
final_review: list
class Orbital(TypedDict):
schedule: OrbitalSchedule
puzzles: dict
"""一份示例
["__metadata__.orbital.puzzles"] # 谜题定义
"Recognition" = { __origin__ = "recognition", __hint__ = "", primary = "eval:nucleon['content']", secondery = ["eval:nucleon['keyword_note']", "eval:nucleon['note']"], top_dim = ["eval:nucleon['translation']"] }
"SelectMeaning" = { __origin__ = "mcq", __hint__ = "eval:nucleon['content']", jammer = "eval:nucleon['keyword_note']", max_riddles_num = "eval:default['mcq']['max_riddles_num']", prefix = "选择正确项: " }
"FillBlank" = { __origin__ = "cloze", __hint__ = "", text = "eval:nucleon['content']", delimiter = "eval:metadata['formation']['delimiter']", min_denominator = "eval:default['cloze']['min_denominator']"}
["__metadata__.orbital.schedule"] # 内置的推荐学习方案
quick_review = [["FillBlank", "1.0"], ["SelectMeaning", "0.5"], ["recognition", "1.0"]]
recognition = [["recognition", "1.0"]]
final_review = [["FillBlank", "0.7"], ["SelectMeaning", "0.7"], ["recognition", "1.0"]]
"""

View File

@@ -1,62 +0,0 @@
import pathlib
from heurams.context import config_var
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def probe_by_filename(filename):
"""探测指定文件 (无扩展名) 的所有信息"""
logger.debug("probe_by_filename: 探测文件 '%s'", filename)
paths: dict = config_var.get().get("paths")
logger.debug("配置路径: %s", paths)
formats = ["toml", "json"]
result = {}
for item, attr in paths.items():
for i in formats:
attr: pathlib.Path = pathlib.Path(attr) / filename + "." + i
if attr.exists():
logger.debug("找到文件: %s", attr)
result[item.replace("_dir", "")] = str(attr)
else:
logger.debug("文件不存在: %s", attr)
logger.debug("probe_by_filename 结果: %s", result)
return result
def probe_all(is_stem=1):
"""依据目录探测所有信息
Args:
is_stem (boolean): 是否**删除**文件扩展名
Returns:
dict: 有三项, 每一项的键名都是文件组类型, 值都是文件组列表, 只包含文件名
"""
logger.debug("probe_all: 开始探测, is_stem=%d", is_stem)
paths: dict = config_var.get().get("paths")
logger.debug("配置路径: %s", paths)
result = {}
for item, attr in paths.items():
attr: pathlib.Path = pathlib.Path(attr)
result[item.replace("_dir", "")] = list()
logger.debug("扫描目录: %s", attr)
file_count = 0
for i in attr.iterdir():
if not i.is_dir():
file_count += 1
if is_stem:
result[item.replace("_dir", "")].append(str(i.stem))
else:
result[item.replace("_dir", "")].append(str(i.name))
logger.debug("目录 %s 中找到 %d 个文件", attr, file_count)
logger.debug("probe_all 完成, 结果 keys: %s", list(result.keys()))
return result
if __name__ == "__main__":
import os
print(os.getcwd())
print(probe_all())

View File

@@ -1,7 +1,7 @@
import random
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as puz
import heurams.kernel.evaluators as puz
from heurams.services.logger import get_logger
from .states import PhaserState

View File

@@ -0,0 +1,5 @@
from ...utils.lict import Lict
def merge(x: Lict, y: Lict):
return Lict(list(x.values()) + list(y.values()))

View File

@@ -1,44 +1,56 @@
from functools import reduce
from pathlib import Path
import heurams.kernel.particles as pt
import toml
import json
from .lict import Lict
from ...utils.lict import Lict
from .refvar import RefVar
class Repo():
file_mapping = {
"orbital": "orbital.toml",
"schedule": "schedule.toml",
"payload": "payload.toml",
"algodata": "algodata.json",
"manifest": "manifest.toml",
"formation": "formation.toml",
"typedef": "typedef.toml",
}
type_mapping = {
"orbital": "dict",
"schedule": "dict",
"payload": "lict",
"algodata": "lict",
"manifest": "dict",
"formation": "dict",
"typedef": "dict",
}
default_save_list = ["algodata"]
def __init__(self, orbital, payload, manifest, formation, algodata, source = None) -> None:
self.orbital: pt.Orbital = orbital
self.payload: Lict = payload
def __init__(self, schedule: dict, payload: Lict, manifest: dict, typedef: dict, algodata: Lict, source = None) -> None:
self.schedule: dict = schedule
self.manifest: dict = manifest
self.formation: dict = formation
self.typedef: dict = typedef
self.payload: Lict = payload
self.algodata: Lict = algodata
self.source: Path | None = source # 若存在, 指向 repo 所在 dir
self.database = {
"orbital": self.orbital,
"schedule": self.schedule,
"payload": self.payload,
"manifest": self.manifest,
"formation": self.formation,
"typedef": self.typedef,
"algodata": self.algodata,
"source": self.source,
}
self.generate_particles_data()
def generate_particles_data(self):
self.nucleonic_data_lict = Lict(list(map(self._attach(self.typedef), self.payload)))
self.electronic_data_lict = self.algodata
@staticmethod
def _attach(value):
def inner(x):
return (x, value)
return inner
def __len__(self):
return len(self.payload)
@@ -70,11 +82,11 @@ class Repo():
@classmethod
def create_new_repo(cls, source = None):
default_database = {
"orbital": {"puzzles": {}, "schedule": {}},
"payload": [],
"algodata": [],
"schedule": {},
"payload": Lict([]),
"algodata": Lict([]),
"manifest": {},
"formation": {"annotation": {}, "unified": {}},
"typedef": {},
"source": source
}
return Repo(**default_database)

View File

@@ -3,8 +3,8 @@ from heurams.services.logger import get_logger
logger = get_logger(__name__)
ver = "0.4.3"
ver = "0.5.0"
stage = "prototype"
codename = "fledge" # 雏鸟, 0.4.x 版本
codename = "fulcrom" # 支点
logger.info("HeurAMS 版本: %s (%s), 阶段: %s", ver, codename, stage)

View File

@@ -1,2 +0,0 @@
# Utils - 实用工具
脚本与部分分离式工具函数

View File

View File

@@ -0,0 +1,28 @@
from typing import Any
class Evalizer():
def __init__(self, environment: dict) -> None:
self.env = environment
def __call__(self, *args: Any, **kwds: Any) -> Any:
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
# eval 环境设置
def eval_with_env(s: str):
default = config_var.get()["puzzles"]
payload = self.registry["nucleon"].payload
metadata = self.registry["nucleon"].metadata
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
return ret