This commit is contained in:
2025-12-16 14:09:58 +08:00
parent ac6aa75a15
commit eb163048d6
6 changed files with 97 additions and 32 deletions

View File

@@ -85,13 +85,17 @@ class Atom:
# 如果无法获取配置或元数据,使用空字典
logger.debug("无法获取配置或元数据,使用空字典")
pass
try:
eval_value = eval(s)
if isinstance(eval_value, (list, dict)):
ret = eval_value
else:
ret = str(eval_value)
logger.debug("eval 执行成功: '%s' -> '%s'", s, str(ret)[:50] + '...' if len(ret) > 50 else ret)
try:
# 为 eval 提供全局变量,包含 default 和 metadata
globals_dict = {'default': default, 'metadata': metadata, '__builtins__': __builtins__}
eval_value = eval(s, globals_dict, {})
# 返回原始值,保持类型
ret = eval_value
# 安全日志记录:将 ret 转换为字符串并截断
ret_str = str(ret)
if len(ret_str) > 50:
ret_str = ret_str[:50] + '...'
logger.debug("eval 执行成功: '%s' -> '%s'", s, ret_str)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
@@ -143,17 +147,33 @@ class Atom:
logger.debug("持久化路径: %s, 格式: %s", path, self.registry[key + "_fmt"])
path.parent.mkdir(parents=True, exist_ok=True)
if self.registry[key + "_fmt"] == "toml":
with open(path, "r+") as f:
f.seek(0)
f.truncate()
toml.dump(self.registry[key], f)
obj = self.registry[key]
# 转换为与测试兼容的格式ident, payload, __metadata__ 键
if hasattr(obj, 'ident') and hasattr(obj, 'payload') and hasattr(obj, 'metadata'):
data = {"ident": obj.ident, "payload": obj.payload, "__metadata__": obj.metadata}
else:
data = obj # 回退到直接转储
with open(path, "w") as f:
toml.dump(data, f)
logger.debug("TOML 数据已保存到: %s", path)
elif self.registry[key + "_fmt"] == "json":
with open(path, "r+") as f:
origin = json.load(f)
f.seek(0)
f.truncate()
origin[self.ident] = self.registry[key].algodata
# 读取现有 JSON 数据,如果文件不存在或无效,则使用空字典
origin = {}
if path.exists():
try:
with open(path, "r") as f:
origin = json.load(f)
if not isinstance(origin, dict):
logger.warning("JSON 文件内容不是字典,重置为空字典")
origin = {}
except (json.JSONDecodeError, IOError) as e:
logger.warning("无法读取 JSON 文件 %s: %s,将创建新文件", path, e)
origin = {}
# 更新特定标识符的数据,仅保留字符串键以确保 JSON 可序列化
algodata = self.registry[key].algodata
filtered_algodata = {k: v for k, v in algodata.items() if isinstance(k, str)}
origin[self.ident] = filtered_algodata
with open(path, "w") as f:
json.dump(origin, f, indent=2, ensure_ascii=False)
logger.debug("JSON 数据已保存到: %s", path)
else:

View File

@@ -23,10 +23,28 @@ class Electron:
self.algo = algorithms[algo_name]
logger.debug("使用的算法类: %s", self.algo.__name__)
if self.algo not in self.algodata.keys():
self.algodata[self.algo.algo_name] = {}
logger.debug("算法键 '%s' 不存在,已创建空字典", self.algo)
if not self.algodata[self.algo.algo_name]:
# 确保 algodata 包含字符串键(算法名称)和类对象键
algo_name_str = self.algo.algo_name
# 如果字符串键不存在,但类对象键存在,则使用类对象键的值
if algo_name_str not in self.algodata and self.algo in self.algodata:
self.algodata[algo_name_str] = self.algodata[self.algo]
logger.debug("从类对象键复制数据到字符串键")
# 如果字符串键不存在,创建空字典
if algo_name_str not in self.algodata:
self.algodata[algo_name_str] = {}
logger.debug("算法键 '%s' 不存在,已创建空字典", algo_name_str)
# 确保类对象键指向字符串键的字典
if self.algo not in self.algodata:
self.algodata[self.algo] = self.algodata[algo_name_str]
logger.debug("设置类对象键指向相同字典")
else:
# 如果类对象键存在,但指向不同的字典,则将其更新为指向字符串键的字典
if self.algodata[self.algo] is not self.algodata[algo_name_str]:
self.algodata[self.algo] = self.algodata[algo_name_str]
logger.debug("更新类对象键指向字符串键的字典")
# 仅当字符串键字典为空时,才使用默认值初始化
if not self.algodata[algo_name_str]:
logger.debug("算法数据为空,使用默认值初始化")
self._default_init(self.algo.defaults)
else:
@@ -36,7 +54,10 @@ class Electron:
def _default_init(self, defaults: dict):
"""默认初始化包装"""
logger.debug("Electron._default_init: 使用默认值keys: %s", list(defaults.keys()))
self.algodata[self.algo.algo_name] = defaults.copy()
algo_name_str = self.algo.algo_name
self.algodata[algo_name_str] = defaults.copy()
# 确保类对象键也指向同一个字典
self.algodata[self.algo] = self.algodata[algo_name_str]
def activate(self):
"""激活此电子"""

View File

@@ -1,4 +1,5 @@
from heurams.services.logger import get_logger
from heurams.context import config_var
logger = get_logger(__name__)
@@ -52,14 +53,36 @@ class Nucleon:
# eval 环境设置
def eval_with_env(s: str):
# 初始化默认值
nucleon = self
default = {}
metadata = {}
try:
nucleon = self
eval_value = eval(s)
default = config_var.get()["puzzles"]
metadata = nucleon.metadata
except Exception:
# 如果无法获取配置或元数据,使用空字典
logger.debug("无法获取配置或元数据,使用空字典")
pass
try:
# 为 eval 提供全局变量,包含 default、metadata 和 nucleon
globals_dict = {
'default': default,
'metadata': metadata,
'nucleon': nucleon,
'__builtins__': __builtins__
}
eval_value = eval(s, globals_dict, {})
# 对于整数和浮点数,转换为字符串;其他类型保持原样
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug("eval 执行成功: '%s' -> '%s'", s, str(ret)[:50] + '...' if len(ret) > 50 else ret)
# 安全日志记录:将 ret 转换为字符串并截断
ret_str = str(ret)
if len(ret_str) > 50:
ret_str = ret_str[:50] + '...'
logger.debug("eval 执行成功: '%s' -> '%s'", s, ret_str)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)

View File

@@ -62,11 +62,9 @@ class MCQPuzzle(BasePuzzle):
unique_jammers = set(jammer + list(self.mapping.values()))
self.jammer = list(unique_jammers)
# 确保至少有4个干扰项
# 确保至少有4个干扰项,使用三个空格作为默认干扰项
while len(self.jammer) < 4:
self.jammer.append(" " * (4 - len(self.jammer)))
unique_jammers = set(jammer + list(self.mapping.values()))
self.jammer.append(" ")
def _reset_puzzle_state(self) -> None:
"""重置谜题状态为初始值

View File

@@ -108,7 +108,9 @@ class TestAtom(unittest.TestCase):
self.assertTrue(test_path.exists())
with open(test_path, 'r') as f:
data = json.load(f)
self.assertIn("supermemo2", data)
# 电子数据使用算法名称 "SM-2" 作为键
self.assertIn("test_persist_json", data)
self.assertIn("SM-2", data["test_persist_json"])
def test_persist_invalid_format(self):
"""测试无效持久化格式"""
@@ -158,7 +160,7 @@ class TestAtom(unittest.TestCase):
# do_eval 应该在链接时自动调用
# 检查 eval 表达式是否被求值
self.assertEqual(nucleon.payload["content"], "hello world")
self.assertEqual(nucleon.payload["number"], "5")
self.assertEqual(nucleon.payload["number"], 5)
def test_do_eval_with_config_access(self):
"""测试 do_eval 访问配置"""

View File

@@ -45,10 +45,11 @@ class TestMCQPuzzle(unittest.TestCase):
mapping = {"q1": "a1", "q2": "a2", "q3": "a3"}
jammer = ["j1", "j2", "j3", "j4"]
puzzle = MCQPuzzle(mapping, jammer, max_riddles_num=2)
# 模拟 random.sample 返回前两个映射项
# 模拟 random.sample 返回前两个映射项,以及每个问题的干扰项
mock_sample.side_effect = [
[("q1", "a1"), ("q2", "a2")], # 选择问题
["j1", "j2", "j3"], # 为每个问题选择干扰项(实际调用两次)
["j1", "j2", "j3"], # 第一个问题干扰项
["j1", "j2", "j3"], # 第二个问题的干扰项(可以使用相同的列表)
]
puzzle.refresh()