feat(interface): 改进仪表盘

This commit is contained in:
2026-01-22 06:13:01 +08:00
parent 4ec062b116
commit f6753cd0a3
14 changed files with 249 additions and 248 deletions

1
.gitignore vendored
View File

@@ -7,6 +7,7 @@ __pycache__/
cache/
data/repo/cngk
data/repo/eotgk
data/repo/evtgk
*.egg-info/
build/
dist/

View File

@@ -1,56 +0,0 @@
# [调试] 将更改保存到文件
persist_to_file = 1
# [调试] 覆写时间, 设为 -1 以禁用
daystamp_override = -1
timestamp_override = -1
# [调试] 一键通过
quick_pass = 1
# 对于每个项目的默认新记忆原子数量
scheduled_num = 8
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[algorithm]
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
[puzzles] # 谜题默认配置
[puzzles.mcq]
max_riddles_num = 2
[puzzles.cloze]
min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
data = "./data"
[services] # 定义服务到提供者的映射
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = ""
key = ""
[providers.sync.webdav] # WebDAV 同步设置
url = ""
username = ""
password = ""
remote_path = "/heurams/"
verify_ssl = true
[sync]

View File

@@ -4,7 +4,6 @@
"""
import pathlib
import shutil
from contextvars import ContextVar
from heurams.services.config import ConfigFile
@@ -15,40 +14,19 @@ from heurams.services.logger import get_logger
# 数据文件路径规定: 以运行目录为准
rootdir = pathlib.Path(__file__).parent
print(f"项目根目录: {rootdir}")
workdir = pathlib.Path.cwd()
#print(f"项目根目录: {rootdir}")
#print(f"工作目录: {workdir}")
logger = get_logger(__name__)
logger.debug(f"项目根目录: {rootdir}")
workdir = pathlib.Path.cwd()
print(f"工作目录: {workdir}")
logger.debug(f"工作目录: {workdir}")
if pathlib.Path(workdir / "data" / "config" / "config_dev.toml").exists():
print("使用开发设置")
logger.debug("使用开发设置")
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var",
default=ConfigFile(workdir / "data" / "config" / "config_dev.toml"),
)
else:
try:
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var",
default=ConfigFile(workdir / "data" / "config" / "config.toml"),
) # 配置文件
except Exception as e:
input("按下回车以创建新的配置文件, 或按下 Ctrl + C 以终止程序 ")
(workdir / "data" / "config").mkdir(parents=True, exist_ok=True)
(workdir / "data" / "config" / "config").unlink(missing_ok=True)
shutil.copy(
(rootdir / "default" / "config" / "config.toml"),
workdir / "data" / "config" / "config.toml",
)
finally:
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var",
default=ConfigFile(workdir / "data" / "config" / "config.toml"),
) # 配置文件
(workdir / "data" / "config").mkdir(parents=True, exist_ok=True)
config_var: ContextVar[ConfigFile] = ContextVar(
"config_var",
default=ConfigFile(workdir / "data" / "config" / "config.toml"),
)
class ConfigContext:
"""

View File

@@ -1,56 +0,0 @@
# [调试] 将更改保存到文件
persist_to_file = 1
# [调试] 覆写时间, 设为 -1 以禁用
daystamp_override = -1
timestamp_override = -1
# [调试] 一键通过
quick_pass = 1
# 对于每个项目的默认新记忆原子数量
scheduled_num = 8
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[algorithm]
default = "SM-2" # 主要算法; 可选项: SM-2, SM-15M, FSRS
[puzzles] # 谜题默认配置
[puzzles.mcq]
max_riddles_num = 2
[puzzles.cloze]
min_denominator = 3
[paths] # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
data = "./data"
[services] # 定义服务到提供者的映射
audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Android Termux), mpg123(TODO)
tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai
sync = "webdav" # 可选项: 留空, webdav
[providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = ""
key = ""
[providers.sync.webdav] # WebDAV 同步设置
url = ""
username = ""
password = ""
remote_path = "/heurams/"
verify_ssl = true
[sync]

View File

@@ -1,12 +1,15 @@
from typing import Type
from time import sleep
print("欢迎使用基本用户界面!")
print("加载配置... ", end="", flush=True)
from heurams.context import *
print("已完成!")
print("加载用户界面框架... ", end="", flush=True)
from textual.app import App
from textual.driver import Driver
from textual.widgets import Button
print("已完成!")
from heurams.context import config_var
from heurams.services.logger import get_logger
print("加载用户界面布局... ", end="", flush=True)
from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen
from .screens.llmchat import LLMChatScreen
@@ -16,27 +19,9 @@ from .screens.radio import RadioScreen
from .screens.repocreator import RepoCreatorScreen
from .screens.repoeditor import RepoEditorScreen
from .screens.synctool import SyncScreen
logger = get_logger(__name__)
def environment_check():
from pathlib import Path
logger.debug("检查环境路径")
subdir = ["cache/voice", "repo", "global", "config"]
for i in subdir:
i = Path(config_var.get()["paths"]["data"]) / i
if not i.exists():
logger.info("创建目录: %s", i)
print(f"创建 {i}")
i.mkdir(exist_ok=True, parents=True)
else:
logger.debug("目录已存在: %s", i)
print(f"找到 {i}")
logger.debug("环境检查完成")
print("已完成!")
print(f"组件目录: {rootdir}")
print(f"工作目录: {workdir}")
class HeurAMSApp(App):
TITLE = "潜进"
CSS_PATH = "css/main.tcss"
@@ -61,7 +46,6 @@ class HeurAMSApp(App):
}
def on_mount(self) -> None:
environment_check()
self.push_screen("dashboard")
def on_button_pressed(self, event: Button.Pressed) -> None:

View File

@@ -1,20 +1,29 @@
from textual.app import App
from textual.widgets import Button
from heurams.interface import *
from heurams.context import config_var
from heurams.interface import HeurAMSApp
from heurams.services.logger import get_logger
from .screens.about import AboutScreen
from .screens.dashboard import DashboardScreen
from .screens.precache import PrecachingScreen
from .screens.repocreator import RepoCreatorScreen
logger = get_logger(__name__)
def environment_check():
from pathlib import Path
logger.debug("检查环境路径")
subdir = ["cache/voice", "repo", "global", "config"]
for i in subdir:
i = Path(config_var.get()["paths"]["data"]) / i
if not i.exists():
logger.info("创建目录: %s", i)
print(f"创建 {i}")
i.mkdir(exist_ok=True, parents=True)
else:
logger.debug("目录已存在: %s", i)
print(f"找到 {i}")
logger.debug("环境检查完成")
def main():
environment_check()
app = HeurAMSApp()
app.run()
app.run(inline=False)
if __name__ == "__main__":
main()

View File

@@ -1,7 +1,20 @@
NavigatorScreen {
align: center middle;
}
.repo_listitem {
layout: grid;
grid-size: 2;
}
.repo_listitem_btn {
dock: right;
offset: -5% 0
}
#dialog {
grid-size: 2;
grid-gutter: 1 1;
@@ -40,6 +53,10 @@ LLMChatScreen {
background: $surface;
}
#dashboardtop {
height: 4
}
#input-container {
height: 3;
margin-top: 1;

View File

@@ -1,12 +1,14 @@
"""仪表盘界面"""
from functools import reduce
import pathlib
from pathlib import Path
from textual.app import ComposeResult
from textual.containers import ScrollableContainer
from textual.containers import ScrollableContainer, Container, Horizontal, Vertical
from textual.screen import Screen
from textual.widgets import Button, Footer, Header, Label, ListItem, ListView, Static
from textual.layouts import horizontal
import heurams.kernel.particles as pt
import heurams.services.timer as timer
@@ -42,21 +44,35 @@ class DashboardScreen(Screen):
self.repostat = {}
self.title2dirname = {}
self.title2repo = {}
self.dirname2repo = {}
self._load_data()
def compose(self) -> ComposeResult:
"""组合界面组件"""
yield Header(show_clock=True)
yield ScrollableContainer(
Label('欢迎使用 "潜进" 启发式辅助记忆调度器', classes="title-label"),
Label(
f"当前 UNIX 日时间戳: {timer.get_daystamp()} (UTC+{config_var.get()['timezone_offset'] / 3600})"
),
Label(f"全局算法设置: {config_var.get()['algorithm']['default']}"),
Label("选择待学习或待修改的项目:", classes="title-label"),
ListView(id="repo-list", classes="repo-list-view"),
Label(f'"潜进" 启发式辅助记忆调度器 版本 {version.ver} '),
)
with ScrollableContainer():
yield Horizontal(
Vertical(
Label('欢迎使用 "潜进" 启发式辅助记忆调度器'),
Label(
f"当前 UNIX 日时间戳: {timer.get_daystamp()} (UTC+{config_var.get()['timezone_offset'] / 3600})"
),
Label(f"全局算法设置: {config_var.get()['algorithm']['default']}"),
Label("选择待学习或待修改的项目:"),
classes="column",
),
Vertical(
Label(f"已加载 {len(self.repostat)} 个单元集", classes='dataview'),
Label(f"共计 {reduce(lambda x, y: x + y, map(lambda x: x.get('unit_sum'), self.repostat.values()))} 个单元", classes='dataview'),
Label(f"已激活 {reduce(lambda x, y: x + y, map(lambda x: x.get('activated_sum'), self.repostat.values()))} 个单元", classes='dataview'),
Label(""),
classes="column",
),
id="dashboardtop"
)
yield ListView(id="repo-list", classes="repo-list-view")
yield Label(f'"潜进" 启发式辅助记忆调度器 版本 {version.ver} ')
yield Footer()
def _load_data(self):
@@ -105,6 +121,7 @@ class DashboardScreen(Screen):
self.repostat[dirname] = stat
self.title2dirname[title] = dirname
self.title2repo[title] = repo
self.dirname2repo[dirname] = repo
def on_mount(self) -> None:
"""挂载组件时初始化"""
@@ -132,7 +149,7 @@ class DashboardScreen(Screen):
for repotitle in repotitles:
prompt = self.repostat[self.title2dirname[repotitle]]["prompt"]
list_item = ListItem(Label(prompt))
list_item = ListItem(Label(prompt), Button(f"开始学习", flat=True, variant="primary", classes="repo_listitem_btn", id=f"launch_{self.repostat[self.title2dirname[repotitle]]['dirname']}"), classes="repo_listitem")
repo_list_widget.append(list_item)
# if not self.stay_enabled[repodir]:
@@ -169,6 +186,9 @@ class DashboardScreen(Screen):
self.app.push_screen(NavigatorScreen())
def on_button_pressed(self, event: Button.Pressed) -> None:
logger.debug(f"event.button.id: {event.button.id}")
"""处理按钮点击事件"""
if event.button.id == "navigator-button":
self.action_open_navigator()
if str(event.button.id).startswith("launch_"): # type: ignore
from .preparation import launch
launch(repo=self.dirname2repo[event.button.id[7:]], app=self.app, scheduled_num=-1) # type: ignore
# TODO: 这样启动的记忆实例的状态机无法绑定到 PreparationScreen 中

View File

@@ -123,36 +123,41 @@ class PreparationScreen(Screen):
event.stop()
logger.debug("按下按钮")
if event.button.id == "start_memorizing_button":
atoms = list()
for i in self.repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data(
nucleonic_data=self.repo.nucleonic_data_lict.get_itemic_unit(i)
)
e = pt.Electron.create_on_electonic_data(
electronic_data=self.repo.electronic_data_lict.get_itemic_unit(i)
)
a = pt.Atom(n, e, self.repo.orbitic_data)
atoms.append(a)
atoms_to_provide = list()
left_new = self.scheduled_num
for i in atoms:
i: pt.Atom
if i.registry["electron"].is_activated():
if i.registry["electron"].is_due():
atoms_to_provide.append(i)
else:
left_new -= 1
if left_new >= 0:
atoms_to_provide.append(i)
import heurams.kernel.reactor as rt
from .memoqueue import MemScreen
pheser = rt.Phaser(atoms_to_provide)
save_func = self.repo.persist_to_repodir
memscreen = MemScreen(pheser, save_func, repo=self.repo)
self.app.push_screen(memscreen)
launch(repo=self.repo, app=self.app, scheduled_num=self.scheduled_num)
elif event.button.id == "precache_button":
self.action_precache()
def launch(repo, app, scheduled_num):
if scheduled_num == -1:
scheduled_num = config_var.get()["scheduled_num"]
atoms = list()
for i in repo.ident_index:
n = pt.Nucleon.create_on_nucleonic_data(
nucleonic_data=repo.nucleonic_data_lict.get_itemic_unit(i)
)
e = pt.Electron.create_on_electonic_data(
electronic_data=repo.electronic_data_lict.get_itemic_unit(i)
)
a = pt.Atom(n, e, repo.orbitic_data)
atoms.append(a)
atoms_to_provide = list()
left_new = scheduled_num
for i in atoms:
i: pt.Atom
if i.registry["electron"].is_activated():
if i.registry["electron"].is_due():
atoms_to_provide.append(i)
else:
left_new -= 1
if left_new >= 0:
atoms_to_provide.append(i)
import heurams.kernel.reactor as rt
from .memoqueue import MemScreen
pheser = rt.Phaser(atoms_to_provide)
save_func = repo.persist_to_repodir
memscreen = MemScreen(pheser, save_func, repo=repo)
app.push_screen(memscreen)

View File

@@ -5,7 +5,7 @@ from typing import TypedDict
from textual.containers import Container
from textual.message import Message
from textual.widget import Widget
from textual.widgets import Button, Label
from textual.widgets import Button, Label, Markdown
import heurams.kernel.particles as pt
import heurams.kernel.puzzles as pz
@@ -66,7 +66,7 @@ class ClozePuzzle(BasePuzzleWidget):
def compose(self):
yield Label(self.puzzle.wording, id="sentence")
yield Label(f"当前输入: {self.inputlist}", id="inputpreview")
yield Markdown(f"> {self.listprint(self.inputlist)}", id="inputpreview")
# 渲染当前问题的选项
with Container(id="btn-container"):
for i in self.ans:
@@ -77,9 +77,18 @@ class ClozePuzzle(BasePuzzleWidget):
yield Button("退格", id="delete")
def listprint(self, lst):
s = ""
if lst:
lastone = lst[-1]
for i in lst[:-1]:
s += (i + ' ')
s += f" `{lastone}`"
return s
def update_display(self):
preview = self.query_one("#inputpreview")
preview.update(f"当前输入: {self.inputlist}") # type: ignore
preview.update(f"> {self.listprint(self.inputlist)}") # type: ignore
def on_button_pressed(self, event: Button.Pressed) -> None:
button_id = event.button.id

View File

@@ -67,9 +67,7 @@ class Recognition(BasePuzzleWidget):
f";{delim}": ";",
f":{delim}": ":",
}
nucleon = self.atom.registry["nucleon"]
metadata = self.atom.registry["nucleon"]
primary = cfg["primary"]
with Center():

View File

@@ -49,13 +49,16 @@ class Fission(Machine):
self.min_ratings = []
for item, possibility in orbital_schedule: # type: ignore
logger.debug(f"开始处理: {item}")
puzzle = puz.puzzles[orbital_puzzles[item]["__origin__"]]
if not isinstance(possibility, float):
possibility = float(possibility)
while possibility > 1:
self.puzzles_inf.append(
{
"puzzle": puz.puzzles[orbital_puzzles[item]["__origin__"]],
"puzzle": puzzle,
"alia": item,
}
)
@@ -64,7 +67,7 @@ class Fission(Machine):
if random.random() <= possibility:
self.puzzles_inf.append(
{
"puzzle": puz.puzzles[orbital_puzzles[item]["__origin__"]],
"puzzle": puzzle,
"alia": item,
}
)

View File

@@ -6,15 +6,81 @@ import toml
from heurams.services.logger import get_logger
default_config = {
"persist_to_file": 1, # 将更改保存到文件
"daystamp_override": -1, # 覆写时间, 设为 -1 以禁用
"timestamp_override": -1, # 覆写时间, 设为 -1 以禁用
"quick_pass": 1, # 启用用于测试的快速通过
"scheduled_num": 8, # 对于每个项目的默认新记忆原子数量
"timezone_offset": 28800, # UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
# 28800 是中国标准时间 (UTC+8)
"interface": {
"memorizor": {
"autovoice": True # 自动语音播放, 仅限于 recognition 组件
}
},
"algorithm": {
"default": "SM-2" # 主要算法
# 可选项: SM-2, SM-15M, FSRS
},
"puzzles": { # 谜题默认配置
"mcq": {
"max_riddles_num": 2
},
"cloze": {
"min_denominator": 3
}
},
"paths": { # 相对于配置文件的 ".." (即工作目录) 而言 或绝对路径
"data": "./data"
},
"services": { # 定义服务到提供者的映射
"audio": "playsound", # 可选项: playsound(通用), termux(仅用于 Android Termux), mpg123(TODO)
"tts": "edgetts", # 可选项: edgetts
"llm": "openai", # 可选项: openai
"sync": "webdav" # 可选项: 留空, webdav
},
"providers": {
"tts": {
"edgetts": { # EdgeTTS 设置
"voice": "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
}
},
"llm": {
"openai": { # 与 OpenAI 相容的语言模型接口服务设置
"url": "",
"key": ""
}
},
"sync": {
"webdav": { # WebDAV 同步设置
"url": "",
"username": "",
"password": "",
"remote_path": "/heurams/",
"verify_ssl": True
}
}
},
}
class ConfigFile:
def __init__(self, path: pathlib.Path):
self.logger = get_logger(__name__)
self.path = path
self.data = dict()
if not self.path.exists():
self.path.touch()
self.logger.debug("创建配置文件: %s", self.path)
self.data = dict()
self.data = default_config
self.valid_configfile = 1
# 考虑到可能临时编辑格式错误, 所以不覆写格式错误的配置文件, 而是提示损坏并使用默认配置
self._load()
def _load(self):
@@ -26,7 +92,8 @@ class ConfigFile:
except toml.TomlDecodeError as e:
print(f"{e}")
self.logger.error("TOML解析错误: %s", e)
self.data = {}
self.data = default_config
self.valid_configfile = 0
def modify(self, key: str, value: typing.Any):
"""修改配置值并保存"""
@@ -36,10 +103,13 @@ class ConfigFile:
def save(self, path: typing.Union[str, pathlib.Path] = ""):
"""保存配置到文件"""
save_path = pathlib.Path(path) if path else self.path
with open(save_path, "w") as f:
toml.dump(self.data, f)
self.logger.debug("配置文件已保存: %s", save_path)
if self.valid_configfile:
save_path = pathlib.Path(path) if path else self.path
with open(save_path, "w") as f:
toml.dump(self.data, f)
self.logger.debug("配置文件已保存: %s", save_path)
else:
pass
def get(self, key: str, default: typing.Any = None) -> typing.Any:
"""获取配置值, 如果不存在返回默认值"""

View File

@@ -2,8 +2,7 @@
"""
将符合条件的CSV转为符合Payload需要的TOML格式
使用命令: python3 csv2payload.py <CSV路径> <生成TOML路径, 默认为文件名相同, 后缀为toml的TOML文件>
使用命令: python3 csv2payload.py <CSV路径> <生成TOML路径, 默认为文件名相同, 后缀为toml的TOML文件> [-r: 可选参数, 表示按照索引打乱顺序的随机整数种子]
转换规则:
1. `ident` 列用作 TOML 的 section 标题(`[ident]`)
@@ -64,19 +63,23 @@ meaning = "狗发出的声音"
- 生成序列基于原始 CSV 中 `ident` 为空的行出现的顺序
- 所有值都保留为字符串类型,符合 TOML 字符串格式要求
- 如果 CSV 包含更多列,它们也会以相同方式转换为键值对
- 支持 `-r` 参数指定随机种子来打乱 section 顺序
"""
import csv
import sys
import os
import random
import argparse
from pathlib import Path
def csv_to_toml(csv_path, toml_path=None):
def csv_to_toml(csv_path, toml_path=None, random_seed=None):
"""
将CSV文件转换为TOML格式
Args:
csv_path (str): 输入CSV文件路径
toml_path (str): 输出TOML文件路径默认为相同目录下同名文件
random_seed (int): 随机种子用于打乱section顺序None表示不打乱
"""
# 检查CSV文件是否存在
csv_file = Path(csv_path)
@@ -104,6 +107,12 @@ def csv_to_toml(csv_path, toml_path=None):
print("错误: CSV文件为空或格式不正确")
sys.exit(1)
# 如果指定了随机种子,设置随机种子并打乱行顺序
if random_seed is not None:
random.seed(random_seed)
random.shuffle(rows)
print(f"提示: 使用随机种子 {random_seed} 打乱了 section 顺序")
# 生成TOML内容
toml_content = []
idx_counter = 1
@@ -143,16 +152,26 @@ def csv_to_toml(csv_path, toml_path=None):
def main():
"""主函数"""
if len(sys.argv) < 2:
print("用法: python3 csv2payload.py <CSV路径> [<TOML路径>]")
print("示例: python3 csv2payload.py input.csv output.toml")
print("示例: python3 csv2payload.py input.csv # 自动生成input.toml")
sys.exit(1)
parser = argparse.ArgumentParser(
description='将CSV文件转换为TOML格式支持随机打乱section顺序',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
示例:
%(prog)s input.csv output.toml
%(prog)s input.csv # 自动生成input.toml
%(prog)s input.csv -r 42 # 使用种子42打乱顺序
%(prog)s input.csv -r 123 output.toml # 指定种子和输出路径
'''
)
csv_path = sys.argv[1]
toml_path = sys.argv[2] if len(sys.argv) > 2 else None
parser.add_argument('csv_path', help='输入的CSV文件路径')
parser.add_argument('toml_path', nargs='?', help='输出的TOML文件路径默认为CSV同名文件')
parser.add_argument('-r', '--random-seed', type=int,
help='随机种子用于打乱TOML section的顺序')
csv_to_toml(csv_path, toml_path)
args = parser.parse_args()
csv_to_toml(args.csv_path, args.toml_path, args.random_seed)
if __name__ == "__main__":
main()