feat: 完成部分界面重构

This commit is contained in:
2026-01-02 06:12:49 +08:00
parent 9b32a01a10
commit eced6130f1
26 changed files with 700 additions and 323 deletions

View File

@@ -31,13 +31,11 @@ max_riddles_num = 2
min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
nucleon_dir = "./data/nucleon"
electron_dir = "./data/electron"
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
orbital_dir = "./data/orbital"
cache_dir = "./data/cache"
template_dir = "./data/template"
data = "./data"
cache = "./data/cache"
config = "./data/config"
global = "./data/global"
repo = "./data/repo"
[services] # 定义服务到提供者的映射
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts

View File

@@ -1,3 +1,3 @@
title = "测试单元: 过秦论"
title = "测试单元 (过秦论)"
author = "__heurams__"
desc = "高考古诗文: 过秦论"

View File

@@ -21,10 +21,27 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 1,
"id": "a5ed9864",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[01;34m.\u001b[0m\n",
"├── \u001b[00mrepo.ipynb\u001b[0m\n",
"└── \u001b[01;34mtest_repo\u001b[0m\n",
" ├── \u001b[00malgodata.json\u001b[0m\n",
" ├── \u001b[00mmanifest.toml\u001b[0m\n",
" ├── \u001b[00mpayload.toml\u001b[0m\n",
" ├── \u001b[00mschedule.toml\u001b[0m\n",
" └── \u001b[00mtypedef.toml\u001b[0m\n",
"\n",
"2 directories, 6 files\n"
]
}
],
"source": [
"!tree # 了解文件结构"
]
@@ -39,10 +56,18 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 2,
"id": "9777730e",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"zsh:1: no matches found: heurams.log*\n"
]
}
],
"source": [
"!rm -rf test_new_repo\n",
"!rm -rf heurams.log*"
@@ -60,10 +85,21 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 3,
"id": "bf1b00c8",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"欢迎使用 HeurAMS 及其组件!\n",
"rootdir: /mnt/data/Devel/HeurAMS/HeurAMS/src/heurams\n",
"workdir: /mnt/data/Devel/HeurAMS/HeurAMS/examples\n",
"未能加载自定义用户配置\n"
]
}
],
"source": [
"import heurams.kernel.repolib as repolib # 这是 RepoLib 子模块, 用于管理和结构化 repo(中文含义: 仓库) 数据结构与本地文件间的联系\n",
"import heurams.kernel.particles as pt # 这是 Particles(中文含义: 粒子) 子模块, 用于运行时的记忆管理操作\n",
@@ -82,10 +118,18 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 4,
"id": "897b62d7",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"这是一个 合规 的 repo!\n"
]
}
],
"source": [
"is_vaild = repolib.Repo.check_repodir(Path(\"./test_repo\"))\n",
"print(f\"这是一个 {'合规' if is_vaild else '不合规'} 的 repo!\")"
@@ -102,7 +146,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 5,
"id": "708ae7e4",
"metadata": {},
"outputs": [],
@@ -123,10 +167,67 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 6,
"id": "a11115fb",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'algodata': [('君臣固守以窥周室,', {}), ('秦孝公据崤函之固, 拥雍州之地,', {})],\n",
" 'manifest': {'author': '__heurams__',\n",
" 'desc': '高考古诗文: 过秦论',\n",
" 'title': '测试单元: 过秦论'},\n",
" 'payload': [('君臣固守以窥周室,',\n",
" {'content': '君臣/固守/以窥/周室,/',\n",
" 'keyword_note': {'窥': '窥视'},\n",
" 'note': [],\n",
" 'translation': '君臣牢固地守卫着,借以窥视周王室的权力,'}),\n",
" ('秦孝公据崤函之固, 拥雍州之地,',\n",
" {'content': '秦孝公/据/崤函/之固/, 拥/雍州/之地,/',\n",
" 'keyword_note': {'崤函': '崤山和函谷关', '据': '占据', '雍州': '古代九州之一'},\n",
" 'note': [],\n",
" 'translation': '秦孝公占据着崤山和函谷关的险固地势,拥有雍州的土地,'})],\n",
" 'schedule': {'phases': {'final_review': [['FillBlank', '0.7'],\n",
" ['SelectMeaning', '0.7'],\n",
" ['Recognition', '1.0']],\n",
" 'quick_review': [['FillBlank', '1.0'],\n",
" ['SelectMeaning', '0.5'],\n",
" ['Recognition', '1.0']],\n",
" 'recognition': [['Recognition', '1.0']]},\n",
" 'schedule': ['quick_review', 'recognition', 'final_review']},\n",
" 'source': PosixPath('test_repo'),\n",
" 'typedef': {'annotation': {'content': '内容',\n",
" 'delimiter': '分隔符',\n",
" 'keyword_note': '关键词翻译',\n",
" 'note': '笔记',\n",
" 'translation': '语句翻译',\n",
" 'tts_text': '文本转语音文本'},\n",
" 'common': {'delimiter': '/',\n",
" 'tts_text': \"eval:payload['content'].replace('/', '')\"},\n",
" 'puzzles': {'FillBlank': {'__hint__': '',\n",
" '__origin__': 'cloze',\n",
" 'delimiter': \"eval:metadata['formation']['delimiter']\",\n",
" 'min_denominator': \"eval:default['cloze']['min_denominator']\",\n",
" 'text': \"eval:payload['content']\"},\n",
" 'Recognition': {'__hint__': '',\n",
" '__origin__': 'recognition',\n",
" 'primary': \"eval:payload['content']\",\n",
" 'secondary': [\"eval:payload['keyword_note']\",\n",
" \"eval:payload['note']\"],\n",
" 'top_dim': [\"eval:payload['translation']\"]},\n",
" 'SelectMeaning': {'__hint__': \"eval:payload['content']\",\n",
" '__origin__': 'mcq',\n",
" 'jammer': \"eval:list(payload['keyword_note'].values())\",\n",
" 'mapping': \"eval:payload['keyword_note']\",\n",
" 'max_riddles_num': \"eval:default['mcq']['max_riddles_num']\",\n",
" 'prefix': '选择正确项: ',\n",
" 'primary': \"eval:payload['content']\"}},\n",
" '古文句': {}}}\n"
]
}
],
"source": [
"test_repo_dic = test_repo.export_to_single_dict()\n",
"from pprint import pprint\n",
@@ -157,10 +258,34 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 7,
"id": "05eeaacc",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\u001b[01;34m.\u001b[0m\n",
"├── \u001b[00mheurams.log\u001b[0m\n",
"├── \u001b[00mrepo.ipynb\u001b[0m\n",
"├── \u001b[01;34mtest_new_repo\u001b[0m\n",
"│   ├── \u001b[00malgodata.json\u001b[0m\n",
"│   ├── \u001b[00mmanifest.toml\u001b[0m\n",
"│   ├── \u001b[00mpayload.toml\u001b[0m\n",
"│   ├── \u001b[00mschedule.toml\u001b[0m\n",
"│   └── \u001b[00mtypedef.toml\u001b[0m\n",
"└── \u001b[01;34mtest_repo\u001b[0m\n",
" ├── \u001b[00malgodata.json\u001b[0m\n",
" ├── \u001b[00mmanifest.toml\u001b[0m\n",
" ├── \u001b[00mpayload.toml\u001b[0m\n",
" ├── \u001b[00mschedule.toml\u001b[0m\n",
" └── \u001b[00mtypedef.toml\u001b[0m\n",
"\n",
"3 directories, 12 files\n"
]
}
],
"source": [
"test_repo.persist_to_repodir(save_list=[\"schedule\", \"payload\", \"manifest\", \"typedef\", \"algodata\"], source=Path(\"test_new_repo\"))\n",
"!tree"
@@ -196,10 +321,19 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 8,
"id": "7e88bd7c",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('age', 12), ('enemy', 'jerry'), ('name', 'tom')]\n",
"[('age', 12), ('enemy', 'jerry'), ('name', 'tom')]\n"
]
}
],
"source": [
"from heurams.utils.lict import Lict\n",
"lct = Lict() # 空的\n",
@@ -221,10 +355,18 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 9,
"id": "248f6cba",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'name': 'tom', 'age': 12, 'enemy': 'jerry'}\n"
]
}
],
"source": [
"print(lct.dicted_data)"
]
@@ -243,10 +385,20 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 10,
"id": "a0eb07a7",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('age', 12), ('enemy', 'jerry'), ('name', 'tom')]\n",
"[('age', 12), ('enemy', 'jerry'), ('name', 'tom'), ('type', 'cat')]\n",
"[('age', 12), ('enemy', 'jerry'), ('is_human', False), ('name', 'tom'), ('type', 'cat')]\n"
]
}
],
"source": [
"# 由于 jupyter 的环境处理, 请不要重复运行此单元格, 如果想再看一遍, 请重启 jupyter 后再全部运行\n",
"\n",
@@ -278,10 +430,18 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 11,
"id": "0ab442d4",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'age': 12, 'enemy': 'jerry', 'is_human': False, 'name': 'tom', 'type': 'cat', 'enemy_2': 'spike'}\n"
]
}
],
"source": [
"# 由于 jupyter 的环境处理, 请不要重复运行此单元格, 如果想再看一遍, 请重启 jupyter 后再全部运行\n",
"\n",
@@ -303,10 +463,49 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 12,
"id": "f3ca752f",
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('age', 12), ('enemy', 'jerry'), ('enemy_2', 'spike'), ('is_human', False), ('name', 'tom'), ('type', 'cat')]\n",
"{'age': 12, 'enemy': 'jerry', 'is_human': False, 'name': 'tom', 'type': 'cat', 'enemy_2': 'spike'}\n",
"------\n",
"('age', 12)\n",
"('enemy', 'jerry')\n",
"('enemy_2', 'spike')\n",
"('is_human', False)\n",
"('name', 'tom')\n",
"('type', 'cat')\n",
"6\n",
"('type', 'cat')\n",
"[('age', 12), ('enemy', 'jerry'), ('enemy_2', 'spike'), ('is_human', False), ('name', 'tom')]\n",
"('name', 'tom')\n",
"[('age', 12), ('enemy', 'jerry'), ('enemy_2', 'spike'), ('is_human', False)]\n",
"('is_human', False)\n",
"[('age', 12), ('enemy', 'jerry'), ('enemy_2', 'spike')]\n",
"('enemy_2', 'spike')\n",
"[('age', 12), ('enemy', 'jerry')]\n",
"('enemy', 'jerry')\n",
"[('age', 12)]\n",
"('age', 12)\n",
"[]\n"
]
},
{
"data": {
"text/plain": [
"Ellipsis"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"lct = Lict(initdict={'age': 12, 'enemy': 'jerry', 'is_human': False, 'name': 'tom', 'type': 'cat', 'enemy_2': 'spike'})\n",
"print(lct)\n",
@@ -332,7 +531,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 13,
"id": "773bf99c",
"metadata": {},
"outputs": [],
@@ -340,6 +539,131 @@
"!rm -rf test_new_repo\n",
"!rm -rf heurams.log*"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "8645c5a2",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{ 'content': '君臣/固守/以窥/周室,/',\n",
" 'delimiter': '/',\n",
" 'keyword_note': {'窥': '窥视'},\n",
" 'note': [],\n",
" 'translation': '君臣牢固地守卫着,借以窥视周王室的权力,',\n",
" 'tts_text': '君臣固守以窥周室,'}\n",
"{ 'SM-2': { 'efactor': 2.5,\n",
" 'interval': 1,\n",
" 'is_activated': 1,\n",
" 'last_date': 20454,\n",
" 'last_modify': 1767274438.752494,\n",
" 'next_date': 20455,\n",
" 'real_rept': 1,\n",
" 'rept': 0}}\n",
"{ 'content': '秦孝公/据/崤函/之固/, 拥/雍州/之地,/',\n",
" 'delimiter': '/',\n",
" 'keyword_note': {'崤函': '崤山和函谷关', '据': '占据', '雍州': '古代九州之一'},\n",
" 'note': [],\n",
" 'translation': '秦孝公占据着崤山和函谷关的险固地势,拥有雍州的土地,',\n",
" 'tts_text': '秦孝公据崤函之固, 拥雍州之地,'}\n",
"{ 'SM-2': { 'efactor': 2.5,\n",
" 'interval': 1,\n",
" 'is_activated': 1,\n",
" 'last_date': 20454,\n",
" 'last_modify': 1767274438.7534873,\n",
" 'next_date': 20455,\n",
" 'real_rept': 1,\n",
" 'rept': 0}}\n",
"{ 'algodata': [ ( '君臣固守以窥周室,',\n",
" { 'SM-2': { 'efactor': 2.5,\n",
" 'interval': 1,\n",
" 'is_activated': 1,\n",
" 'last_date': 20454,\n",
" 'last_modify': 1767274438.752494,\n",
" 'next_date': 20455,\n",
" 'real_rept': 1,\n",
" 'rept': 0}}),\n",
" ( '秦孝公据崤函之固, 拥雍州之地,',\n",
" { 'SM-2': { 'efactor': 2.5,\n",
" 'interval': 1,\n",
" 'is_activated': 1,\n",
" 'last_date': 20454,\n",
" 'last_modify': 1767274438.7534873,\n",
" 'next_date': 20455,\n",
" 'real_rept': 1,\n",
" 'rept': 0}})],\n",
" 'manifest': { 'author': '__heurams__',\n",
" 'desc': '高考古诗文: 过秦论',\n",
" 'title': '测试单元: 过秦论'},\n",
" 'payload': [ ( '君臣固守以窥周室,',\n",
" { 'content': '君臣/固守/以窥/周室,/',\n",
" 'keyword_note': {'窥': '窥视'},\n",
" 'note': [],\n",
" 'translation': '君臣牢固地守卫着,借以窥视周王室的权力,'}),\n",
" ( '秦孝公据崤函之固, 拥雍州之地,',\n",
" { 'content': '秦孝公/据/崤函/之固/, 拥/雍州/之地,/',\n",
" 'keyword_note': { '崤函': '崤山和函谷关',\n",
" '据': '占据',\n",
" '雍州': '古代九州之一'},\n",
" 'note': [],\n",
" 'translation': '秦孝公占据着崤山和函谷关的险固地势,拥有雍州的土地,'})],\n",
" 'schedule': { 'phases': { 'final_review': [ ['FillBlank', '0.7'],\n",
" ['SelectMeaning', '0.7'],\n",
" ['Recognition', '1.0']],\n",
" 'quick_review': [ ['FillBlank', '1.0'],\n",
" ['SelectMeaning', '0.5'],\n",
" ['Recognition', '1.0']],\n",
" 'recognition': [['Recognition', '1.0']]},\n",
" 'schedule': [ 'quick_review',\n",
" 'recognition',\n",
" 'final_review']},\n",
" 'source': PosixPath('test_repo'),\n",
" 'typedef': { 'annotation': { 'content': '内容',\n",
" 'delimiter': '分隔符',\n",
" 'keyword_note': '关键词翻译',\n",
" 'note': '笔记',\n",
" 'translation': '语句翻译',\n",
" 'tts_text': '文本转语音文本'},\n",
" 'common': { 'delimiter': '/',\n",
" 'tts_text': \"eval:payload['content'].replace('/', \"\n",
" \"'')\"},\n",
" 'puzzles': { 'FillBlank': { '__hint__': '',\n",
" '__origin__': 'cloze',\n",
" 'delimiter': \"eval:metadata['formation']['delimiter']\",\n",
" 'min_denominator': \"eval:default['cloze']['min_denominator']\",\n",
" 'text': \"eval:payload['content']\"},\n",
" 'Recognition': { '__hint__': '',\n",
" '__origin__': 'recognition',\n",
" 'primary': \"eval:payload['content']\",\n",
" 'secondary': [ \"eval:payload['keyword_note']\",\n",
" \"eval:payload['note']\"],\n",
" 'top_dim': [ \"eval:payload['translation']\"]},\n",
" 'SelectMeaning': { '__hint__': \"eval:payload['content']\",\n",
" '__origin__': 'mcq',\n",
" 'jammer': \"eval:list(payload['keyword_note'].values())\",\n",
" 'mapping': \"eval:payload['keyword_note']\",\n",
" 'max_riddles_num': \"eval:default['mcq']['max_riddles_num']\",\n",
" 'prefix': '选择正确项: ',\n",
" 'primary': \"eval:payload['content']\"}},\n",
" '古文句': {}}}\n"
]
}
],
"source": [
"repo = repolib.Repo.create_from_repodir(Path('./test_repo'))\n",
"for i in repo.ident_index:\n",
" n = pt.Nucleon.create_on_nucleonic_data(nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i))\n",
" e = pt.Electron.create_on_electonic_data(electronic_data=repo.electronic_data_lict.get_itemic_unit(i))\n",
" e.activate()\n",
" e.revisor(5, True)\n",
" print(repr(n))\n",
" print(repr(e))\n",
"print(repo)"
]
}
],
"metadata": {

12
examples/simplemem.py Normal file
View File

@@ -0,0 +1,12 @@
import heurams.kernel.repolib as repolib
import heurams.kernel.particles as pt
from pathlib import Path
repo = repolib.Repo.create_from_repodir(Path('./test_repo'))
for i in repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data(nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i))
e = pt.Electron.create_on_electonic_data(electronic_data=repo.electronic_data_lict.get_itemic_unit(i))
e.activate()
e.revisor(5, True)
print(repr(n))
print(repr(e))
print(repo)

View File

@@ -5,6 +5,7 @@
import pathlib
from contextvars import ContextVar
import shutil
from heurams.services.config import ConfigFile
from heurams.services.logger import get_logger
@@ -14,32 +15,33 @@ from heurams.services.logger import get_logger
# 数据文件路径规定: 以运行目录为准
rootdir = pathlib.Path(__file__).parent
print(f"rootdir: {rootdir}")
print(f"项目根目录: {rootdir}")
logger = get_logger(__name__)
logger.debug(f"项目根目录: {rootdir}")
workdir = pathlib.Path.cwd()
print(f"workdir: {workdir}")
print(f"工作目录: {workdir}")
logger.debug(f"工作目录: {workdir}")
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(rootdir / "default" / "config" / "config.toml")
)
try:
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(workdir / "config" / "config.toml")
) # 配置文件
print("已加载自定义用户配置")
logger.info("已加载自定义用户配置, 路径: %s", workdir / "config" / "config.toml")
except Exception as e:
print("未能加载自定义用户配置")
logger.warning("未能加载自定义用户配置, 错误: %s", e)
if pathlib.Path(workdir / "config" / "config_dev.toml").exists():
if pathlib.Path(workdir / "data" / "config" / "config_dev.toml").exists():
print("使用开发设置")
logger.debug("使用开发设置")
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(workdir / "config" / "config_dev.toml")
"config_var", default=ConfigFile(workdir / "data" / "config" / "config_dev.toml")
)
# runtime_var: ContextVar = ContextVar('runtime_var', default=dict()) # 运行时共享数据
else:
try:
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(workdir / "data" / "config" / "config.toml")
) # 配置文件
except Exception as e:
input("按下回车以创建新的配置文件, 或按下 Ctrl + C 以终止程序 ")
(workdir / "data" / 'config').mkdir(parents=True, exist_ok=True)
(workdir / "data" / 'config' / 'config').unlink(missing_ok=True)
shutil.copy((rootdir / 'default' / 'config' / 'config.toml'), workdir / "data" / "config" / "config.toml")
finally:
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var", default=ConfigFile(workdir / "data" / "config" / "config.toml")
) # 配置文件
class ConfigContext:
"""

View File

@@ -31,12 +31,7 @@ max_riddles_num = 2
min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
nucleon_dir = "./data/nucleon"
electron_dir = "./data/electron"
global_dir = "./data/global" # 全局数据路径, SM-15 等算法需要
orbital_dir = "./data/orbital"
cache_dir = "./data/cache"
template_dir = "./data/template"
data = "./data"
[services] # 定义服务到提供者的映射
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)

View File

@@ -7,12 +7,11 @@ from heurams.services.logger import get_logger
from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen
from .screens.precache import PrecachingScreen
from .screens.repocreator import NucleonCreatorScreen
from .screens.repocreator import RepoCreatorScreen
from .screens.synctool import SyncScreen
logger = get_logger(__name__)
def environment_check():
from pathlib import Path
@@ -39,13 +38,13 @@ class HeurAMSApp(App):
("d", "toggle_dark", "切换色调"),
("1", "app.push_screen('dashboard')", "仪表盘"),
("2", "app.push_screen('precache_all')", "缓存管理器"),
("3", "app.push_screen('nucleon_creator')", "创建新单元"),
("3", "app.push_screen('nucleon_creator')", "创建新仓库"),
# ("4", "app.push_screen('synctool')", "同步工具"),
("0", "app.push_screen('about')", "版本信息"),
]
SCREENS = {
"dashboard": DashboardScreen,
"nucleon_creator": NucleonCreatorScreen,
"repo_creator": RepoCreatorScreen,
"precache_all": PrecachingScreen,
"synctool": SyncScreen,
"about": AboutScreen,

View File

@@ -8,7 +8,7 @@ from heurams.services.logger import get_logger
from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen
from .screens.precache import PrecachingScreen
from .screens.repocreator import NucleonCreatorScreen
from .screens.repocreator import RepoCreatorScreen
logger = get_logger(__name__)

View File

@@ -12,6 +12,7 @@ import heurams.services.timer as timer
import heurams.services.version as version
from heurams.context import *
from heurams.kernel.particles import *
from heurams.kernel.repolib import *
from heurams.services.logger import get_logger
from .about import AboutScreen
@@ -32,9 +33,10 @@ class DashboardScreen(Screen):
classes: str | None = None,
) -> None:
super().__init__(name, id, classes)
self.nextdates = {}
self.texts = {}
self.stay_enabled = {}
self.repostat = {}
self.title2dirname = {}
self.title2repo = {}
self._load_data()
def compose(self) -> ComposeResult:
"""组合界面组件"""
@@ -44,119 +46,86 @@ class DashboardScreen(Screen):
Label(f"当前 UNIX 日时间戳: {timer.get_daystamp()}"),
Label(f'时区修正: UTC+{config_var.get()["timezone_offset"] / 3600}'),
Label(f"使用算法: {config_var.get()['algorithm']['default']}"),
Label("选择待学习或待修改的记忆单元集:", classes="title-label"),
ListView(id="union-list", classes="union-list-view"),
Label("选择待学习或待修改的仓库:", classes="title-label"),
ListView(id="repo-list", classes="repo-list-view"),
Label(
f'"潜进" 启发式辅助记忆调度器 | 版本 {version.ver} '
f"{version.codename.capitalize()} 2025"
),
)
yield Footer()
def analyser(self, filename: str) -> dict:
"""分析文件状态以生成显示文本
def _load_data(self):
self.repo_dirs = Repo.probe_vaild_repos_in_dir(Path(config_var.get()['paths']['data']) / 'repo')
for repo_dir in self.repo_dirs:
repo = Repo.create_from_repodir(repo_dir)
self._analyse_repo(repo)
Args:
filename: 要分析的文件名
Returns:
dict: 包含显示文本的字典,键为行号
"""
from heurams.kernel.repository.particle_loader import (load_electron,
load_nucleon)
result = {}
filestem = pathlib.Path(filename).stem
# 构建电子文件路径
electron_dir = config_var.get()["paths"]["electron_dir"]
electron_file_path = pathlib.Path(electron_dir) / f"{filestem}.json"
logger.debug(f"电子文件路径: {electron_file_path}")
# 确保电子文件存在
if not electron_file_path.exists():
electron_file_path.touch()
electron_file_path.write_text("{}")
# 加载电子数据
electron_dict = load_electron(path=electron_file_path)
logger.debug(f"电子数据: {electron_dict}")
# 分析电子状态
def _analyse_repo(self, repo: Repo):
dirname = repo.source.name # type: ignore
title = repo.manifest["title"]
is_due = 0
is_activated = 0
nextdate = 0x3F3F3F3F
for electron in electron_dict.values():
logger.debug(f"{electron}, 是否到期: {electron.is_due()}")
if electron.is_due():
is_due = 1
unit_sum = len(repo)
activated_sum = 0
nextdate = 0x3f3f3f3f
is_unfinished = (unit_sum > activated_sum)
for i in repo.ident_index:
nucleon = pt.Nucleon.create_on_nucleonic_data(nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i))
electron = pt.Electron.create_on_electonic_data(electronic_data=repo.electronic_data_lict.get_itemic_unit(i))
if electron.is_activated():
is_activated = 1
nextdate = min(nextdate, electron.nextdate())
# 检查是否需要更多复习
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
nucleon_path = pathlib.Path(nucleon_dir) / f"{filestem}.toml"
nucleon_count = len(load_nucleon(nucleon_path))
electron_count = len(electron_dict)
is_more = not (electron_count >= nucleon_count)
logger.debug(f"是否需要更多复习: {is_more}")
# 更新状态
self.nextdates[filename] = nextdate
self.stay_enabled[filename] = is_due or is_more
# 构建返回结果
result[0] = f"{filename}\0"
if not is_activated:
result[1] = " 尚未激活"
else:
status_text = "需要复习" if is_due else "当前无需复习"
result[1] = f"下一次复习: {nextdate}\n{status_text}"
return result
activated_sum += 1
if electron.is_due():
is_due = 1
nextdate = min(nextdate, electron.nextdate())
if is_unfinished:
nextdate = min(nextdate, timer.get_daystamp())
need_to_study = is_due or is_unfinished
prompt = f"{title}\0\n 进度: {activated_sum}/{unit_sum}\n {"需要学习" if need_to_study else "无需操作"}"
stat = {
"is_due": is_due,
"unit_sum": unit_sum,
"title": title,
"activated_sum": activated_sum,
"nextdate": nextdate,
"is_unfinished": is_unfinished,
"need_to_study": need_to_study,
"prompt": prompt,
"dirname": dirname,
}
self.repostat[dirname] = stat
self.title2dirname[title] = dirname
self.title2repo[title] = repo
def on_mount(self) -> None:
"""挂载组件时初始化"""
union_list_widget = self.query_one("#union-list", ListView)
probe = probe_all(0)
# 分析所有文件
for file in probe["nucleon"]:
self.texts[file] = self.analyser(file)
repo_list_widget = self.query_one("#repo-list", ListView)
# 按下次复习时间排序
nucleon_files = sorted(
probe["nucleon"],
key=lambda f: self.nextdates[f],
repodirs = sorted(
self.repo_dirs,
key=lambda f: self.repostat[f.name]['nextdate'],
reverse=True,
)
repotitles = map(lambda f: self.repostat[f.name]['title'], repodirs)
# 填充列表
if not probe["nucleon"]:
union_list_widget.append(
if not repodirs:
repo_list_widget.append(
ListItem(
Static(
"在 ./nucleon/ 中未找到任何内容源数据文件。\n"
"放置文件后重启应用或者新建空的单元集。"
"在 ./data/repo/ 中未找到任何仓库.\n"
"导入仓库后重启应用, 或者新建空的仓库."
)
)
)
union_list_widget.disabled = True
repo_list_widget.disabled = True
return
for file in nucleon_files:
text = self.texts[file]
list_item = ListItem(Label(f"{text[0]}\n{text[1]}"))
union_list_widget.append(list_item)
for repotitle in repotitles:
prompt = self.repostat[self.title2dirname[repotitle]]['prompt']
list_item = ListItem(Label(prompt))
repo_list_widget.append(list_item)
if not self.stay_enabled[file]:
list_item.disabled = True
#if not self.stay_enabled[repodir]:
# list_item.disabled = True
def on_list_view_selected(self, event) -> None:
"""处理列表项选择事件"""
@@ -166,32 +135,25 @@ class DashboardScreen(Screen):
selected_label = event.item.query_one(Label)
label_text = str(selected_label.renderable)
if "未找到任何 .toml 文件" in label_text:
if "未找到任何仓库" in label_text:
return
# 提取文件名
selected_filename = pathlib.Path(label_text.partition("\0")[0].replace("*", ""))
selected_repotitle = label_text.partition("\0")[0].replace("*", "")
selected_repo = self.title2repo[label_text.partition("\0")[0].replace("*", "")]
# 构建文件路径
nucleon_dir = config_var.get()["paths"]["nucleon_dir"]
electron_dir = config_var.get()["paths"]["electron_dir"]
nucleon_file_path = pathlib.Path(nucleon_dir) / selected_filename
electron_file_path = (
pathlib.Path(electron_dir) / f"{selected_filename.stem}.json"
)
# 跳转到准备屏幕
self.app.push_screen(PreparationScreen(nucleon_file_path, electron_file_path))
self.app.push_screen(PreparationScreen(selected_repo, self.repostat[self.title2dirname[selected_repotitle]]))
def on_button_pressed(self, event) -> None:
"""处理按钮点击事件"""
button_id = event.button.id
if button_id == "new_nucleon_button":
from .repocreator import NucleonCreatorScreen
from .repocreator import RepoCreatorScreen
new_screen = NucleonCreatorScreen()
new_screen = RepoCreatorScreen()
self.app.push_screen(new_screen)
elif button_id == "precache_all_button":

View File

@@ -12,6 +12,7 @@ import heurams.services.hasher as hasher
from heurams.context import *
from heurams.context import config_var
from heurams.services.logger import get_logger
from heurams.kernel.repolib import *
logger = get_logger(__name__)
@@ -29,24 +30,19 @@ class PreparationScreen(Screen):
scheduled_num = reactive(config_var.get()["scheduled_num"])
def __init__(self, nucleon_file: pathlib.Path, electron_file: pathlib.Path) -> None:
def __init__(self, repo: Repo, repostat: dict) -> None:
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.nucleons_with_orbital = pt.load_nucleon(self.nucleon_file)
self.electrons = pt.load_electron(self.electron_file)
self.repo = repo
self.repostat = repostat
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with ScrollableContainer(id="vice_container"):
yield Label(f"准备就绪: [b]{self.nucleon_file.stem}[/b]\n")
yield Label(f"准备就绪: [b]{self.repostat['title']}[/b]\n")
yield Label(
f"内容源文件: {config_var.get()['paths']['nucleon_dir']}/[b]{self.nucleon_file.name}[/b]"
f"仓库路径: {config_var.get()['paths']['data']}/repo/[b]{self.repostat['dirname']}[/b]"
)
yield Label(
f"元数据文件: {config_var.get()['paths']['electron_dir']}/[b]{self.electron_file.name}[/b]"
)
yield Label(f"\n单元数量: {len(self.nucleons_with_orbital)}\n")
yield Label(f"\n单元数量: {len(self.repo)}\n")
yield Label(f"单次记忆数量: {self.scheduled_num}", id="schnum_label")
yield Button(
@@ -76,10 +72,9 @@ class PreparationScreen(Screen):
def _get_full_content(self):
content = ""
for nucleon, orbital in self.nucleons_with_orbital:
nucleon: pt.Nucleon
# print(nucleon.payload)
content += " - " + nucleon["content"] + " \n"
for i in self.repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data(nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(i))
content += f"- {n['content']} \n"
return content
def action_go_back(self):

View File

@@ -13,9 +13,9 @@ from heurams.context import config_var
from heurams.services.version import ver
class NucleonCreatorScreen(Screen):
class RepoCreatorScreen(Screen):
BINDINGS = [("q", "go_back", "返回")]
SUB_TITLE = "单元集创建向导"
SUB_TITLE = "仓库创建向导"
def __init__(self) -> None:
super().__init__(name=None, id=None, classes=None)

View File

@@ -20,7 +20,7 @@ from heurams.kernel.algorithms.sm15m_calc import (MAX_AF, MIN_AF, NOTCH_AF,
# 全局状态文件路径
_GLOBAL_STATE_FILE = os.path.expanduser(
pathlib.Path(config_var.get()["paths"]["global_dir"]) / "sm15m_global_state.json"
pathlib.Path(config_var.get()["paths"]["data"]) / 'global' / "sm15m_global_state.json"
)

View File

@@ -21,7 +21,7 @@ class Electron:
algo_name: 使用的算法模块标识
"""
if algo_name == "":
algo_name = "sm-2"
algo_name = "SM-2"
self.algodata = algodata
self.ident = ident
self.algo: algolib.BaseAlgorithm = algorithms[algo_name]
@@ -29,6 +29,11 @@ class Electron:
if not self.algo.check_integrity(self.algodata):
self.algodata[self.algo.algo_name] = deepcopy(self.algo.defaults)
def __repr__(self):
from pprint import pformat
s = pformat(self.algodata, indent=4)
return s
def activate(self):
"""激活此电子"""
self.algodata[self.algo.algo_name]["is_activated"] = 1

View File

@@ -41,7 +41,9 @@ class Nucleon:
return len(self.data)
def __repr__(self):
return repr(self.data)
from pprint import pformat
s = pformat(self.data, indent=4)
return s
@staticmethod
def create_on_nucleonic_data(nucleonic_data: tuple):

View File

@@ -1,12 +0,0 @@
from heurams.services.logger import get_logger
from .fission import Fission
from .phaser import Phaser
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
__all__ = ["PhaserState", "ProcessionState", "Procession", "Fission", "Phaser"]
logger.debug("反应堆模块已加载")

View File

@@ -1,51 +1,24 @@
# 移相器类定义
from enum import Enum
from typing import Any, Sequence, Type
from transitions import Machine, State, Event, EventData
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
class Phaser(Machine):
def __init__(self, schedule: list):
state_words = ["init"] + schedule.copy()
state_objects = list()
for name in state_words:
state_objects.append(State(
name=name,
on_enter=["on_enter"]
))
Machine.__init__(self, states=state_objects, initial="init", send_event=True)
self.add_ordered_transitions(loop=False)
from .procession import Procession
from .states import PhaserState, ProcessionState
def on_enter(self, event_data: EventData):
print(event_data.transition.source, "->", event_data.transition.dest) # type: ignore
logger = get_logger(__name__)
class Phaser:
"""移相器: 全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
logger.debug("Phaser.__init__: 原子数量=%d", len(atoms))
new_atoms = list()
old_atoms = list()
self.state = PhaserState.UNSURE
for i in atoms:
if not i.registry["electron"].is_activated():
new_atoms.append(i)
else:
old_atoms.append(i)
logger.debug("新原子数量=%d, 旧原子数量=%d", len(new_atoms), len(old_atoms))
self.processions = list()
if len(old_atoms):
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
)
logger.debug("创建初始复习 Procession")
if len(new_atoms):
self.processions.append(
Procession(new_atoms, PhaserState.RECOGNITION, "新记忆")
)
logger.debug("创建新记忆 Procession")
self.processions.append(Procession(atoms, PhaserState.FINAL_REVIEW, "总体复习"))
logger.debug("创建总体复习 Procession")
logger.debug("Phaser 初始化完成, processions 数量=%d", len(self.processions))
def current_procession(self):
logger.debug("Phaser.current_procession 被调用")
for i in self.processions:
i: Procession
if not i.state == ProcessionState.FINISHED:
self.state = i.phase
logger.debug("找到未完成的 Procession: phase=%s", i.phase)
return i
self.state = PhaserState.FINISHED
logger.debug("所有 Procession 已完成, 状态设置为 FINISHED")
return 0
if __name__ == "__main__":
p = Phaser(["a", "b"])
p.next_state()
p.next_state()
print(p.state)

View File

@@ -1,74 +1,24 @@
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from enum import Enum
from typing import Any, Sequence, Type
from transitions import Machine, State, Event, EventData
from .states import PhaserState, ProcessionState
class Phaser(Machine):
def __init__(self, schedule: list):
state_words = ["init"] + schedule.copy()
state_objects = list()
for name in state_words:
state_objects.append(State(
name=name,
on_enter=["on_enter"]
))
Machine.__init__(self, states=state_objects, initial="init", send_event=True)
self.add_ordered_transitions(loop=False)
logger = get_logger(__name__)
def on_enter(self, event_data: EventData):
print(event_data.transition.source, "->", event_data.transition.dest) # type: ignore
class Procession:
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
logger.debug(
"Procession.__init__: 原子数量=%d, phase=%s, name='%s'",
len(atoms),
phase.value,
name,
)
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
self.name = name
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
logger.debug("Procession 初始化完成, 队列长度=%d", len(self.queue))
def forward(self, step=1):
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor == len(self.queue):
self.state = ProcessionState.FINISHED
logger.debug("Procession 已完成")
else:
self.state = ProcessionState.RUNNING
try:
logger.debug("cursor 更新为: %d", self.cursor)
self.current_atom = self.queue[self.cursor]
logger.debug("当前原子更新为: %s", self.current_atom.ident)
return 1 # 成功
except IndexError as e:
logger.debug("IndexError: %s", e)
self.state = ProcessionState.FINISHED
logger.debug("Procession 因索引错误而完成")
return 0
def append(self, atom=None):
if atom == None:
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
if self.queue[len(self.queue) - 1] != atom or len(self) <= 1:
self.queue.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
else:
logger.debug("原子未追加(重复或队列长度<=1)")
def __len__(self):
length = len(self.queue) - self.cursor
logger.debug("Procession.__len__: 剩余长度=%d", length)
return length
def process(self):
logger.debug("Procession.process: cursor=%d", self.cursor)
return self.cursor
def total_length(self):
total = len(self.queue)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
empty = len(self.queue)
logger.debug("Procession.is_empty: %d", empty)
return empty
if __name__ == "__main__":
p = Phaser(["a", "b"])
p.next_state()
p.next_state()
print(p.state)

View File

@@ -0,0 +1,12 @@
from heurams.services.logger import get_logger
from .fission import Fission
from .phaser import Phaser
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
__all__ = ["PhaserState", "ProcessionState", "Procession", "Fission", "Phaser"]
logger.debug("反应堆模块已加载")

View File

@@ -0,0 +1,51 @@
# 移相器类定义
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .procession import Procession
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
class Phaser:
"""全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
logger.debug("Phaser.__init__: 原子数量=%d", len(atoms))
new_atoms = list()
old_atoms = list()
self.state = PhaserState.UNSURE
for i in atoms:
if not i.registry["electron"].is_activated():
new_atoms.append(i)
else:
old_atoms.append(i)
logger.debug("新原子数量=%d, 旧原子数量=%d", len(new_atoms), len(old_atoms))
self.processions = list()
if len(old_atoms):
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
)
logger.debug("创建初始复习 Procession")
if len(new_atoms):
self.processions.append(
Procession(new_atoms, PhaserState.RECOGNITION, "新记忆")
)
logger.debug("创建新记忆 Procession")
self.processions.append(Procession(atoms, PhaserState.FINAL_REVIEW, "总体复习"))
logger.debug("创建总体复习 Procession")
logger.debug("Phaser 初始化完成, processions 数量=%d", len(self.processions))
def current_procession(self):
logger.debug("Phaser.current_procession 被调用")
for i in self.processions:
i: Procession
if not i.state == ProcessionState.FINISHED:
self.state = i.phase
logger.debug("找到未完成的 Procession: phase=%s", i.phase)
return i
self.state = PhaserState.FINISHED
logger.debug("所有 Procession 已完成, 状态设置为 FINISHED")
return 0

View File

@@ -0,0 +1,74 @@
import heurams.kernel.particles as pt
from heurams.services.logger import get_logger
from .states import PhaserState, ProcessionState
logger = get_logger(__name__)
class Procession:
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
logger.debug(
"Procession.__init__: 原子数量=%d, phase=%s, name='%s'",
len(atoms),
phase.value,
name,
)
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
self.cursor = 0
self.name = name
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
logger.debug("Procession 初始化完成, 队列长度=%d", len(self.queue))
def forward(self, step=1):
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor == len(self.queue):
self.state = ProcessionState.FINISHED
logger.debug("Procession 已完成")
else:
self.state = ProcessionState.RUNNING
try:
logger.debug("cursor 更新为: %d", self.cursor)
self.current_atom = self.queue[self.cursor]
logger.debug("当前原子更新为: %s", self.current_atom.ident)
return 1 # 成功
except IndexError as e:
logger.debug("IndexError: %s", e)
self.state = ProcessionState.FINISHED
logger.debug("Procession 因索引错误而完成")
return 0
def append(self, atom=None):
if atom == None:
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
if self.queue[len(self.queue) - 1] != atom or len(self) <= 1:
self.queue.append(atom)
logger.debug("原子已追加到队列, 新队列长度=%d", len(self.queue))
else:
logger.debug("原子未追加(重复或队列长度<=1)")
def __len__(self):
length = len(self.queue) - self.cursor
logger.debug("Procession.__len__: 剩余长度=%d", length)
return length
def process(self):
logger.debug("Procession.process: cursor=%d", self.cursor)
return self.cursor
def total_length(self):
total = len(self.queue)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
empty = len(self.queue)
logger.debug("Procession.is_empty: %d", empty)
return empty

View File

@@ -1,6 +1,7 @@
import json
from functools import reduce
from pathlib import Path
from typing import TypedDict
import toml
@@ -8,6 +9,10 @@ import heurams.kernel.particles as pt
from ...utils.lict import Lict
class RepoManifest(TypedDict):
title: str
author: str
desc: str
class Repo:
file_mapping = {
@@ -38,7 +43,7 @@ class Repo:
source=None,
) -> None:
self.schedule: dict = schedule
self.manifest: dict = manifest
self.manifest: RepoManifest = manifest # type: ignore
self.typedef: dict = typedef
self.payload: Lict = payload
self.algodata: Lict = algodata
@@ -60,8 +65,11 @@ class Repo:
self._nucleonic_proc,
self.payload))
)
self.electronic_data_lict = self.algodata
self.orbitic_data = self.schedule
self.ident_index = self.nucleonic_data_lict.keys()
for i in self.ident_index:
self.algodata.append_new((i, {}))
self.electronic_data_lict = self.algodata
def _nucleonic_proc(self, unit):
ident = unit[0]
@@ -78,6 +86,11 @@ class Repo:
def __len__(self):
return len(self.payload)
def __repr__(self):
from pprint import pformat
s = pformat(self.database, indent=4)
return s
def persist_to_repodir(
self, save_list: list | None = None, source: Path | None = None
):
@@ -151,3 +164,12 @@ class Repo:
return 1
except:
return 0
@classmethod
def probe_vaild_repos_in_dir(cls, folder: Path):
lst = list()
for i in folder.iterdir():
if i.is_dir():
if cls.check_repodir(i):
lst.append(i)
return lst

View File

@@ -1,4 +1,4 @@
class Evalizer:
class Evalizer():
"""几乎无副作用的模板系统
接受环境信息并创建一个模板解析工具, 工具传入参数支持list, dict及其嵌套
@@ -6,7 +6,6 @@ class Evalizer:
"""
# TODO: 弃用风险极高的 eval
# 理论上已经限制了全局函数 但eval仍有风险
# TODO: 异步/多线程执行避免堵塞
def __init__(self, environment: dict) -> None:
self.env = environment
@@ -19,6 +18,8 @@ class Evalizer:
return list(map(self.travel, anyobj))
elif isinstance(anyobj, dict):
return dict(map(self.travel, anyobj.items()))
elif isinstance(anyobj, tuple):
return tuple(map(self.travel, anyobj))
elif isinstance(anyobj, str):
if anyobj.startswith("eval:"):
return self.eval_with_env(anyobj[5:])
@@ -28,7 +29,7 @@ class Evalizer:
return anyobj
def eval_with_env(self, s: str):
ret = eval(s, {}, self.env)
ret = eval(s, globals(), self.env)
if not isinstance(ret, str):
ret = str(ret)
return ret

View File

@@ -52,6 +52,9 @@ class Lict(UserList): # TODO: 优化同步(惰性同步), 当前性能为 O(n)
else:
return super().__getitem__(i)
def get_itemic_unit(self, i):
return (i, self.dicted_data[i])
def __setitem__(self, i, item):
if isinstance(i, str):
self.dicted_data[i] = item
@@ -81,6 +84,15 @@ class Lict(UserList): # TODO: 优化同步(惰性同步), 当前性能为 O(n)
if self.forced_order:
self.data.sort()
def append_new(self, item: Any):
if item != (item[0], item[1]):
raise NotImplementedError
if item[0] not in self:
super().append(item)
self._sync_based_on_list()
if self.forced_order:
self.data.sort()
def insert(self, i: int, item: Any) -> None:
if item != (item[0], item[1]): # 确保 item 是遵从限制的元组
raise NotImplementedError

View File

@@ -3,7 +3,7 @@ from unittest.mock import MagicMock, Mock, patch
from heurams.kernel.particles.atom import Atom
from heurams.kernel.particles.electron import Electron
from heurams.kernel.reactor.phaser import Phaser
from heurams.kernel.reactor.procession import Phaser
from heurams.kernel.reactor.states import PhaserState, ProcessionState