refactor: 部分更改数据结构

This commit is contained in:
2025-12-28 06:16:21 +08:00
parent b5f30ec4ee
commit c13b8bed98
5 changed files with 211 additions and 0 deletions

View File

@@ -0,0 +1 @@
from .repo import *

View File

@@ -0,0 +1,95 @@
from collections import UserList
from typing import Any
class Lict(UserList): #TODO: 优化同步(惰性同步), 当前性能为 O(n)
""""列典" 对象
同时兼容字典和列表大多数 API, 两边数据同步的容器
列表数据是 dictobj.items() 的格式
支持根据字典或列表初始化
限制要求:
- 键名一定唯一, 且仅能为字符串
- 值一定是引用对象
- 不使用并发
- 不在乎列表顺序语义和列表索引查找, 因此 sort, index 等功能不可用
- append 的元组中, 表示键名的元素不能重复, 否则会导致覆盖行为
"""
def __init__(self, initlist = None, initdict = None):
self.dicted_data = {}
if initdict != None:
initlist = list(initdict.items())
super().__init__(initlist=initlist)
self._sync_based_on_list()
def _sync_based_on_dict(self):
self.data = list(self.dicted_data.items())
def _sync_based_on_list(self):
self.dicted_data = {}
for i in self.data:
self.dicted_data[i[0]] = i[1]
def __getitem__(self, i):
if isinstance(i, str):
return self.dicted_data[i]
else:
return super().__getitem__(i)
def __setitem__(self, i, item):
if isinstance(i, str):
self.dicted_data[i] = item
self._sync_based_on_dict()
else:
if item != (item[0], item[1]):
raise NotImplementedError
super().__setitem__(i, item)
self._sync_based_on_list()
def __delitem__(self, i):
if isinstance(i, str):
del self.dicted_data[i]
self._sync_based_on_dict()
else:
super().__delitem__(i)
self._sync_based_on_list()
def append(self, item: Any) -> None:
if item != (item[0], item[1]):
raise NotImplementedError
super().append(item)
self._sync_based_on_list()
def insert(self, i: int, item: Any) -> None:
if item != (item[0], item[1]):
raise NotImplementedError
super().insert(i, item)
self._sync_based_on_list()
def pop(self, i: int = -1) -> Any:
super().pop(i)
self._sync_based_on_list()
def remove(self, item: Any) -> None:
if isinstance(item, str):
item = (item, self.dicted_data[item])
if item != (item[0], item[1]):
raise NotImplementedError
super().remove(item)
self._sync_based_on_list()
def clear(self) -> None:
super().clear()
self._sync_based_on_list()
def index(self):
raise NotImplementedError
def extend(self):
raise NotImplementedError
def sort(self):
raise NotImplementedError
def reverse(self):
raise NotImplementedError

View File

View File

@@ -0,0 +1,107 @@
from pathlib import Path
import heurams.kernel.particles as pt
import toml
import json
from .lict import Lict
class Repo():
file_mapping = {
"orbital": "orbital.toml",
"payload": "payload.toml",
"algodata": "algodata.json",
"manifest": "manifest.toml",
"formation": "formation.toml",
}
default_save_list = ["algodata"]
def __init__(self, orbital, payload, manifest, formation, algodata, source = None) -> None:
self.orbital: pt.Orbital = orbital
self.payload: dict = payload
self.manifest: dict = manifest
self.formation: dict = formation
self.algodata: dict = algodata
self.source: Path | None = source # 若存在, 指向 repo 所在 dir
self.database = {
"orbital": self.orbital,
"payload": self.payload,
"manifest": self.manifest,
"formation": self.formation,
"algodata": self.algodata,
"source": self.source,
}
self.particles = dict()
def __len__(self):
return len(self.payload)
def persist_to_repodir(self, save_list: list | None = None, source: Path | None= None):
if save_list == None:
save_list = self.default_save_list
if self.source != None and source == None:
source = self.source
if source == None:
raise FileNotFoundError("不存在仓库到文件的映射")
for keyname in save_list:
filename = self.file_mapping[keyname]
with open(source / filename, 'w') as f:
if filename.endswith("toml"):
toml.dump(self.database[keyname], f)
elif filename.endswith("json"):
json.dump(self.database[keyname], f)
else:
raise ValueError(f"不支持的文件类型: {filename}")
def export_to_single_dict(self):
return self.database
@classmethod
def create_new_repo(cls, source = None):
default_database = {
"orbital": {"puzzles": {}, "schedule": {}},
"payload": {},
"algodata": {},
"manifest": {},
"formation": {"annotation": {}, "unified": {}},
"source": source
}
return Repo(**default_database)
@classmethod
def create_from_repodir(cls, source: Path):
database = {}
for keyname, filename in cls.file_mapping.items():
with open(source / filename, "r") as f:
if filename.endswith("toml"):
database[keyname] = toml.load(f)
elif filename.endswith("json"):
database[keyname] = json.load(f)
else:
raise ValueError(f"不支持的文件类型: {filename}")
database["source"] = source
return Repo(**database)
@classmethod
def create_from_single_dict(cls, dictdata, source: Path | None = None):
database = dictdata
database["source"] = source
return Repo(**database)
@classmethod
def check_repodir(cls, source: Path):
try:
cls.create_from_repodir(source)
return 1
except:
return 0
def _load(self):
pass
def _load_nucleons(self):
self.particles["nucleons"] = Lict(initdict = self.payload)
self.particles["electrons"] = Lict(initdict = self.algodata)
mtx["list"] =
for ident, payload in self.payload:
res.append()

View File

@@ -0,0 +1,8 @@
from typing import Any, Iterable, TypedDict
from typing_extensions import Self
from .lict import Lict
class ParticalsContainer(TypedDict):
nucleons: Lict
electrons: Lict
atoms: Lict