91 Commits

Author SHA1 Message Date
aa99aa7686 上下文管理器与进一步移植 2025-10-13 00:28:15 +08:00
6dac9d5a7c 初次移植 2025-10-12 00:01:52 +08:00
f19fcac6c4 准备重构 2025-10-11 22:05:25 +08:00
ebeb62f72d 性能改进 2025-10-07 23:30:16 +08:00
d71d8eb1ec 更新 2025-10-07 23:23:17 +08:00
4ac12479cc 修改 2025-10-07 03:00:42 +08:00
7fc53e4113 算法与交互改进, 以及同步原型 2025-10-07 02:49:26 +08:00
06c62e284d 更新版本号 2025-10-03 13:41:41 +08:00
d5d31eb5fe 改进 2025-10-02 16:49:27 +08:00
156f558a45 重构预缓存实用程序, 并保留分离式旧版本 2025-10-02 14:55:24 +08:00
e96832a60c 若干改进 2025-10-02 14:40:40 +08:00
e1a5de74ad 更新自述文件 2025-10-02 00:59:02 +08:00
90339be30e 更新自述文件 2025-10-02 00:58:30 +08:00
4da80d26a3 添加规划计算器实用程序, 并添加 shebang 和一些优化 2025-10-02 00:40:41 +08:00
cb062788a7 增加再学习机制和状态反馈 2025-10-01 12:59:41 +08:00
acfd179435 改进 2025-09-30 22:27:34 +08:00
d1b606782f 新建单元集界面改进 2025-09-30 00:38:48 +08:00
784cf41003 新建单元集界面 2025-09-30 00:13:54 +08:00
0f342c96ee 更新版本号 2025-09-29 23:55:44 +08:00
3022ada51a 更新准备界面 2025-09-29 23:53:07 +08:00
9dada59c56 加入时间戳修正配置 2025-09-29 23:30:51 +08:00
8683693cd1 将数据文件移至单独的 Nucleons 仓库维护 2025-09-22 12:34:01 +08:00
144d7cb83c 修补 2025-09-15 12:37:07 +08:00
44573624aa 问题修复 2025-09-14 23:29:01 +08:00
1b33bb8618 兼容旧版 Textual 2025-09-14 23:27:59 +08:00
d83025d818 兼容旧版 Textual 2025-09-14 23:26:49 +08:00
f7e93cf05f 兼容旧版 Textual 2025-09-14 23:25:06 +08:00
e30cefeb44 兼容旧版 Textual 2025-09-14 23:23:13 +08:00
7dc963d491 兼容旧版 Textual 2025-09-14 23:22:03 +08:00
3640d8a799 算法修复与实验性扩展支持 2025-09-14 23:19:24 +08:00
1ea34ab87a 更新版本号 2025-09-14 02:08:16 +08:00
2bedc686a5 修复空文件夹 2025-09-14 02:07:53 +08:00
19d0e32b6f 更新用户界面, 修复总复习 2025-09-14 01:54:33 +08:00
50a5b9b108 添加开源许可证 2025-09-11 00:19:34 +08:00
5a096e4b4f 更新版本号 2025-09-11 00:05:18 +08:00
65491117d3 问题修复与更新 2025-09-11 00:04:02 +08:00
c82eedde82 问题修复与更新 2025-09-11 00:03:20 +08:00
697e3b2b8f 更新配置文件 2025-09-10 23:51:28 +08:00
11eff7da43 更新说明 2025-09-10 23:29:53 +08:00
c93bcdd489 删除无用文件 2025-09-10 23:27:34 +08:00
bb99b0a0b7 增加软件管理实用程序与音频缓存机制修复 2025-09-10 23:27:02 +08:00
6293b69ef0 改进 2025-09-08 13:59:09 +08:00
afb7252f71 若干改进 2025-09-08 13:44:14 +08:00
5e96fc8138 更新文件树 2025-08-30 22:03:34 +08:00
e64d1711d0 更新 README 2025-08-30 00:06:16 +08:00
beeb2fd318 改进 🍰 2025-08-28 13:08:37 +08:00
fc58f61dfe 新增源文件 2025-08-23 23:02:30 +08:00
e11e7e781b 添加更多源文件 2025-08-23 13:44:03 +08:00
39459a0f6e 更新版本号 2025-08-21 13:30:59 +08:00
cccf7189e3 自动播放机制 2025-08-21 13:30:30 +08:00
2c51f2cea3 改动 2025-08-21 06:28:22 +08:00
2ad014fcd8 更新文件树 2025-08-16 07:33:42 +08:00
4ad289d02d 改进部署组件 2025-08-14 11:16:43 +08:00
28ccfdd227 删除运行时文件 2025-08-14 11:13:11 +08:00
f83d5c934d 增加若干元数据 2025-08-14 11:12:33 +08:00
4f9eb3b7d1 重命名文件 2025-08-12 19:10:14 +08:00
c44a38f3c8 更新自述文件 2025-08-09 22:44:02 +08:00
f760e7f0fa 预缓存实用程序改动 2025-08-09 08:41:40 +08:00
30eb45e1cb 更新自述文件 2025-08-06 07:52:42 +08:00
2a30f136cb 更新自述文件 2025-08-06 07:52:15 +08:00
051c4847b2 更新自述文件 2025-08-06 07:43:36 +08:00
0873caa5fc 若干善后改进 2025-08-06 07:42:43 +08:00
6d3d2e665c 实装自动评分系统 2025-08-06 06:46:30 +08:00
edf2f0868a 改进 2025-08-06 06:30:41 +08:00
d5ef5e84d0 规范代码 2025-08-06 06:11:30 +08:00
dd74dddf00 规范代码 2025-08-06 06:11:22 +08:00
2cf2cdb33f 更新版本号 2025-08-06 05:31:05 +08:00
385a86eb2c 更新版本号 2025-08-06 05:30:30 +08:00
46d992b408 组件可用性更新 2025-08-06 05:27:38 +08:00
b77f3f2abe 布局更新 2025-08-06 04:57:28 +08:00
6c28bd461b 修复 2025-08-05 20:29:20 +08:00
44f75bca90 更新谜题生成器 2025-08-05 19:40:36 +08:00
961f0ba785 改进 2025-08-05 14:13:11 +08:00
c9185fbd0a 逻辑改进 2025-08-04 01:24:35 +08:00
4beb42f615 重构布局系统并改进界面 2025-08-03 03:57:21 +08:00
445e15646b 改进 2025-08-02 17:48:42 +08:00
4d6d1f1b60 中途改进 2025-07-30 23:10:43 +08:00
722ae6f72c 完善 webshare 2025-07-30 16:16:50 +08:00
88650b0856 完善 webshare 2025-07-30 16:15:36 +08:00
620473863a 完善 webshare 2025-07-30 16:13:03 +08:00
9d8cb17cc6 完善项目安装配置 & 中途改进 2025-07-30 16:07:31 +08:00
123f7919b0 实现音频预缓存实用程序 2025-07-30 01:55:46 +08:00
2eeb60c75b 更新版本号 2025-07-30 01:34:06 +08:00
2c870377a6 修复 ElectronUnion 对象初始化问题 2025-07-30 01:32:31 +08:00
36562323b7 使用 NucleonUnion & ElectronUnion 取代繁琐的 AtomicFiles 2025-07-30 01:11:03 +08:00
b91176241b 小改动 2025-07-28 16:48:46 +08:00
edfeb0b40b README.md 2025-07-27 21:54:50 +08:00
9d658ea646 README.md 2025-07-27 21:54:27 +08:00
5d3c354d8f 中途改进 2025-07-27 21:39:34 +08:00
d27d353374 更新自述文件, 更新 gitignore 2025-07-25 23:57:58 +08:00
3988b55f1f v0.2.5 修复初次激活相关问题 2025-07-25 23:50:07 +08:00
58 changed files with 1074 additions and 2390 deletions

216
.gitignore vendored
View File

@@ -1 +1,215 @@
.vscode
# Project specific additions
.devflag
.vscode/
.directory
__pycache__/
.idea/
cache/
nucleon/test.toml
electron/test.toml
*.egg-info/
build/
dist/
# Project specific directories
config/
data/cache/
data/electrion/
data/nucleon/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly used for packaging.
#poetry.lock
#poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
.idea/
# Audio cache and temporary files
*.mp3
*.wav
*.ogg
*.tmp
# LLM cache files
*.cache
*.jsonl
# Log files
*.log
logs/
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Windows
Thumbs.db
ehthumbs.db
Desktop.ini
# Linux
*~
# VSCode
.vscode/
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
# Temporary files
*.tmp
*.temp

View File

@@ -1,57 +0,0 @@
# 潜进 (HeurAMS) - 开放源代码实验型辅助记忆程序
> 形人而我无形,**则我专而敌分**
## 概述
"潜进" (HeurAMS, 中文含义: 启发式辅助记忆软件) 是一款为古诗词设计的记忆辅助软件, 集成记忆拟合算法、自然语音技术与生成式人工智能, 提供科学的记忆训练解决方案
## 核心特性
### 科学记忆拟合算法
- 采用经实证的 SM-2 间隔重复算法
- 动态优化每首诗词的记忆间隔时间表
- 实时跟踪记忆曲线,最大化长期记忆保留率与稳定性
### 技术集成与优化
- 逐字解析:支持点击查看每个字的详细释义
- 语法分析:接入生成式人工智能, 支持古文结构**交互式**解析
- 自然语音:集成微软神经网络文本转语音 (TTS) 技术
### 现代用户界面
- 响应式 Textual 框架构建的跨平台 TUI 界面
- 支持触屏/鼠标/键盘多操作模式
- 简洁直观的复习流程设计
## 技术架构
``` mermaid
graph TD
subgraph 后端
A[SM-2算法] --> B[间隔预测引擎]
B --> C[个性化记忆模型]
end
subgraph 用户界面
D[诗词展示模块] --> E[交互复习界面]
E --> F[进度追踪面板]
end
subgraph 外部服务
G[词义解析]
H[语法分析]
I[语音合成]
end
C --> D
F -->|用户数据| C
D --> G
D --> H
D --> I
```
## 系统要求
- 平台支持Windows / macOS / Linux / Android(需要 Termux) (终端或浏览器)
- 网络连接:可预缓存语音文件, 需联网使用大模型服务功能

Binary file not shown.

View File

@@ -1,33 +0,0 @@
import time
import pathlib
import toml
class ConfigFile():
def __init__(self, path):
self.path = pathlib.Path(path)
if self.path.exists() == 0:
self.path.touch()
self.data = dict()
with open(self.path, 'r') as f:
self.data = toml.load(f)
def modify(self, key, value):
self.data[key] = value
self.save()
def save(self, path=""):
if path == "":
path = self.path
with open(path, 'w') as f:
toml.dump(self.data, f)
def get(self, key, default = None):
return self.data.get(key, default)
def get_daystamp() -> int:
config = ConfigFile("config.toml")
time_override = config.get("time_override", -1)
if time_override is not None and time_override != -1:
print(f"TIME OVERRIDEED TO {time_override}")
return int(time_override)
return int(time.time() // (24 * 3600))

View File

@@ -1,8 +0,0 @@
# [调试] 将更改保存到文件
save = 1
# [调试] 覆写时间
time_override = 10
# 对于每个项目的新记忆核子数量
tasked_number = 12
# 竖屏适配
mobile_mode = 1

245
main.py
View File

@@ -1,245 +0,0 @@
from textual.app import App, ComposeResult
from textual.widgets import Header, Footer, ListView, ListItem, Label, Static, Button
from textual.containers import Container, Horizontal
from textual.screen import Screen
import pathlib
import threading
import edge_tts as tts
from playsound import playsound
from textual.logging import TextualHandler
import particles as pt
from reactor import Reactor
import auxiliary as aux
ver = '0.2.4'
config = aux.ConfigFile("config.toml")
class MemScreen(Screen):
BINDINGS = [
("d", "toggle_dark", "改变色调"),
("q", "pop_screen", "返回主菜单"),
("v", "play_voice", "朗读"),
("0", "press('q0')", None),
("1", "press('q1')", None),
("2", "press('q2')", None),
("3", "press('q3')", None),
("4", "press('q4')", None),
("5", "press('q5')", None),
("[", "press('q5')", None),
("]", "press('q4')", None),
(";", "press('q3')", None),
("'", "press('q2')", None),
(".", "press('q1')", None),
("/", "press('q0')", None),
]
btn = dict()
def __init__(
self,
nucleon_file: pt.AtomicFile,
electron_file: pt.AtomicFile,
tasked_num
):
super().__init__(name=None, id=None, classes=None)
self.reactor = Reactor(nucleon_file, electron_file, tasked_num)
self.stage = 1
self.stage += self.reactor.set_round_templated(self.stage)
#print(self.reactor.procession)
self.reactor.forward()
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="main_container"):
yield Label(self.reactor.round_title, id="round_title")
yield Label("记住了吗?", id="question")
yield Static(self.reactor.current_atom[1].content, id="sentence")
yield Static("", id="feedback") # 用于显示反馈
yield Label(self._get_progress_text(), id="progress")
with Container(id="button_container"):
self.btn['5'] = Button("完美回想", variant="success", id="q5", classes="choice")
self.btn['4'] = Button("犹豫后正确", variant="success", id="q4", classes="choice")
self.btn['3'] = Button("困难地正确", variant="warning", id="q3", classes="choice")
self.btn['2'] = Button("错误但熟悉", variant="warning", id="q2", classes="choice")
self.btn['1'] = Button("错误且不熟", variant="error", id="q1", classes="choice")
self.btn['0'] = Button("完全空白", variant="error", id="q0", classes="choice")
yield Horizontal(self.btn['5'], self.btn['4'])
yield Horizontal(self.btn['3'], self.btn['2'])
yield Horizontal(self.btn['1'], self.btn['0'])
yield Footer()
def _get_progress_text(self):
return f"{len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
def on_mount(self):
# 首次挂载时调用
self._update_ui()
def _update_ui(self):
self.query_one("#round_title", Label).update(self.reactor.round_title)
self.query_one("#sentence", Static).update(self.reactor.current_atom[1].content)
self.query_one("#progress", Label).update(self._get_progress_text())
self.query_one("#feedback", Static).update("") # 清除任何之前的反馈消息
def _show_finished_screen(self, message):
self.query_one("#question", Label).update(message)
self.query_one("#sentence", Static).update("已经完成记忆任务")
self.query_one("#round_title").display = False
self.query_one("#progress").display = False
for i in range(6):
self.query_one(f"#q{i}", Button).display = False
def on_button_pressed(self, event):
feedback_label = self.query_one("#feedback", Static)
if type(event) == str:
btnid = event
else:
btnid = event.button.id
btnid = str(btnid)
quality = int(btnid.replace('q', ''))
assessment = self.reactor.report(self.reactor.current_atom, quality)
if assessment == 1:
# 需要复习
feedback_label.update(f"评分为 {quality}, 已经加入至复习, 请重复记忆")
else:
ret = self.reactor.forward(1)
if ret == -1:
if self.reactor.round_set == 0:
if self.stage == 4:
# NOTE #
if config.get("save"):
self.reactor.save()
self._show_finished_screen("今日目标已完成")
else:
self.reactor.set_round_templated(self.stage)
self.reactor.forward(1)
self._update_ui()
self.stage += 1
return
#feedback_label.update("") # 清除反馈消息
self._update_ui()
def action_press(self, btnid):
self.on_button_pressed(btnid)
def action_play_voice(self):
def play():
cache_dir = pathlib.Path(f"./cache/voice/")
cache_dir.mkdir(parents = True, exist_ok = True)
cache = cache_dir / f"{self.reactor.current_atom[1].content}.wav"
if not cache.exists():
communicate = tts.Communicate(self.reactor.current_atom[1].content, "zh-CN-YunjianNeural")
communicate.save_sync(f"./cache/voice/{self.reactor.current_atom[1].content}.wav")
playsound(str(cache))
threading.Thread(target=play).start()
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
"""返回到上一个屏幕"""
self.app.pop_screen()
class PreparationScreen(Screen):
BINDINGS = [
("q", "go_back", "返回"),
("escape", "quit_app", "退出")
]
def __init__(self, nucleon_file: pt.AtomicFile, electron_file: pt.AtomicFile) -> None:
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="learning_screen_container"):
yield Label(f"记忆项目: [b]{self.nucleon_file.name}[/b]\n")
yield Label(f"核子文件对象: ./nucleon/[b]{self.nucleon_file.name}[/b].toml")
yield Label(f"电子文件对象: ./electron/[b]{self.electron_file.name}[/b].toml")
yield Label(f"核子数量:{self.nucleon_file.get_len()}")
yield Button("开始记忆", id="start_memorizing_button", variant="primary", classes="start-button")
yield Static(f"\n全文如下:\n")
yield Static(self.nucleon_file.get_full_content(), classes="full")
yield Footer()
def action_go_back(self):
self.app.pop_screen()
def action_quit_app(self):
self.app.exit()
def on_button_pressed(self, event: Button.Pressed) -> None:
pass
if event.button.id == "start_memorizing_button":
#init_file(Path(self.atom_file).name)
newscr = MemScreen(self.nucleon_file, self.electron_file, config.get("tasked_number", 8))
self.app.push_screen(
newscr
)
#if event.button.id == "edit_metadata_button":
# init_file(Path(self.atom_file).name)
# os.system("reset;nano ./data/" + str(Path(self.atom_file).name.replace(".txt", "_atoms.json")))
class FileSelectorScreen(Screen):
global ver
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Container(
Label(f'欢迎使用 "潜进" 辅助记忆软件, 版本 {ver}', classes="title-label"),
Label("选择要学习的文件:", classes="title-label"),
ListView(id="file-list", classes="file-list-view")
)
yield Footer()
def on_mount(self) -> None:
file_list_widget = self.query_one("#file-list", ListView)
nucleon_path = pathlib.Path("./nucleon")
nucleon_files = sorted([f.name for f in nucleon_path.iterdir() if f.suffix == ".toml"])
if nucleon_files:
for filename in nucleon_files:
file_list_widget.append(ListItem(Label(filename)))
else:
file_list_widget.append(ListItem(Static("在 ./nucleon/ 中未找到任何核子文件. 请放置文件后重启应用.")))
file_list_widget.disabled = True
def on_list_view_selected(self, event: ListView.Selected) -> None:
if not isinstance(event.item, ListItem):
self.notify("无法选择此项。", severity="error")
return
selected_label = event.item.query_one(Label)
if "未找到任何 .toml 文件" in str(selected_label.renderable):
self.notify("请先在 `./atoms/` 目录中放置 .toml 文件。", severity="warning")
return
selected_filename = str(selected_label.renderable)
nucleon_file = pt.AtomicFile(pathlib.Path("./nucleon") / selected_filename, "nucleon")
electron_file_path = pathlib.Path("./electron") / selected_filename
if electron_file_path.exists():
pass
else:
electron_file_path.touch()
electron_file = pt.AtomicFile(pathlib.Path("./electron") / selected_filename, "electron")
# self.notify(f"已选择: {selected_filename}", timeout=2)
self.app.push_screen(PreparationScreen(nucleon_file, electron_file))
def action_quit_app(self) -> None:
self.app.exit()
class AppLauncher(App):
CSS_PATH = "styles.tcss"
TITLE = '潜进 - 辅助记忆程序'
BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")]
SCREENS = {
"file_selection_screen": FileSelectorScreen,
}
def on_mount(self) -> None:
self.action_toggle_dark()
self.push_screen("file_selection_screen")
if __name__ == "__main__":
app = AppLauncher()
app.run()

View File

@@ -1,139 +0,0 @@
["臣密言:臣以险衅, 夙遭闵凶."]
note = []
translation = "臣子李密陈言:我因命运不好,小时候遭遇到了不幸"
keyword_note = {"险衅"="凶险祸患(这里指命运不好)", "夙"="早时,这里指年幼的时候", "闵"="通'悯',指可忧患的事", "凶"="不幸,指丧父"}
["生孩六月, 慈父见背;行年四岁, 舅夺母志."]
note = []
translation = "刚出生六个月,我慈爱的父亲就不幸去世了。经过了四年,舅父逼母亲改嫁"
keyword_note = {"见背"="死的委婉说法", "行年"="经历的年岁", "母志"="母亲守节之志(改嫁的委婉说法)"}
["祖母刘愍臣孤弱, 躬亲抚养."]
note = []
translation = "我的祖母刘氏,怜悯我从小丧父,便亲自对我加以抚养"
keyword_note = {"愍"="怜悯", "躬亲"="亲身"}
["臣少多疾病, 九岁不行, 零丁孤苦, 至于成立."]
note = []
translation = "臣小的时候经常生病,九岁时还不会行走。孤独无靠,一直到成人自立"
keyword_note = {"成立"="成人自立"}
["既无伯叔, 终鲜兄弟, 门衰祚薄, 晚有儿息."]
note = []
translation = "既没有叔叔伯伯,又没什么兄弟,门庭衰微而福分浅薄,很晚才有儿子"
keyword_note = {"鲜"="少,这里指'无'", "祚薄"="福分浅薄", "儿息"="亲生子女"}
["外无期功强近之亲, 内无应门五尺之僮, 茕茕孑立, 形影相吊."]
note = []
translation = "在外面没有比较亲近的亲戚,在家里又没有照应门户的童仆。生活孤单没有依靠,每天只有自己的身体和影子相互安慰"
keyword_note = {"期功"="指关系较近的亲属", "茕茕孑立"="孤单无依靠的样子", "吊"="安慰"}
["而刘夙婴疾病, 常在床蓐, 臣侍汤药, 未曾废离."]
note = []
translation = "但祖母又早被疾病缠绕,常年卧床不起,我侍奉她吃饭喝药,从来就没有停止侍奉而离开她"
keyword_note = {"婴"="被...缠绕", "蓐"="通'褥',床垫", "废"="停止服侍", "离"="离开"}
["逮奉圣朝, 沐浴清化."]
note = []
translation = "到了晋朝建立,我蒙受着清明的政治教化"
keyword_note = {"逮"="及,到", "奉"="承奉", "圣朝"="指当时的晋朝", "沐浴清化"="蒙受清平教化"}
["前太守臣逵察臣孝廉;后刺史臣荣举臣秀才."]
note = []
translation = "前任太守逵,考察后推举臣下为孝廉,后任刺史荣又推举臣下为优秀人才"
keyword_note = {"察"="考察和推举", "孝廉"="孝顺,品性纯洁", "举"="推举", "秀才"="优秀人才"}
["臣以供养无主, 辞不赴命."]
note = []
translation = "臣下因为供奉赡养祖母的事无人承担,辞谢不接受任命"
keyword_note = {"无主"="无人承担"}
["诏书特下, 拜臣郎中, 寻蒙国恩, 除臣洗马."]
note = []
translation = "朝廷又特地下了诏书,任命我为郎中,不久又蒙受国家恩命,任命我为太子洗马"
keyword_note = {"拜"="授予官职", "郎中"="尚书省的属官", "寻"="不久", "除"="拜官受职", "洗马"="太子的属官"}
["猥以微贱, 当侍东宫, 非臣陨首所能上报."]
note = []
translation = "像我这样出身微贱地位卑下的人,担当侍奉太子的职务,这实在不是我杀身捐躯所能报答朝廷的"
keyword_note = {"猥"="谦词", "微贱"="卑微低贱", "东宫"="太子居处", "陨首"="杀身"}
["臣具以表闻, 辞不就职."]
note = []
translation = "我将以上苦衷上表报告,加以推辞不去就职"
keyword_note = {"具"="详细", "闻"="使...知道"}
["诏书切峻, 责臣逋慢;郡县逼迫, 催臣上道;州司临门, 急于星火."]
note = []
translation = "但是诏书急切严峻,责备我逃避命令,有意拖延,态度傲慢。郡县长官催促我立刻上路;州官登门督促,比流星坠落还要急迫"
keyword_note = {"切峻"="急切而严厉", "逋慢"="逃避怠慢", "星火"="流星的光,喻急迫"}
["臣欲奉诏奔驰, 则刘病日笃, 欲苟顺私情, 则告诉不许."]
note = []
translation = "我很想遵从皇上的旨意赴京就职,但祖母刘氏的病却一天比一天重;想要姑且顺从自己的私情,但报告申诉不被允许"
keyword_note = {"日笃"="病情日益加重", "苟"="姑且", "告诉"="报告申诉"}
["臣之进退, 实为狼狈."]
note = []
translation = "我是进退两难,十分狼狈"
keyword_note = {"狼狈"="进退两难的样子"}
["伏惟圣朝以孝治天下, 凡在故老, 犹蒙矜育, 况臣孤苦, 特为尤甚."]
note = []
translation = "我俯伏思量晋朝是用孝道来治理天下的,凡是年老而德高的旧臣,尚且还受到怜悯养育,何况我的孤苦程度更为严重呢"
keyword_note = {"伏惟"="下对上的敬辞", "故老"="年老德高的旧臣", "矜育"="怜悯养育"}
["且臣少仕伪朝, 历职郎署, 本图宦达, 不矜名节."]
note = []
translation = "况且我年轻的时候曾经做过蜀汉的官,担任过郎官职务,本来就希望做官显达,并不顾惜名声节操"
keyword_note = {"伪朝"="对前朝的蔑称", "历职"="连续任职", "郎署"="尚书郎的官衙", "宦达"="官场上显达"}
["今臣亡国贱俘, 至微至陋, 过蒙拔擢, 宠命优渥, 岂敢盘桓, 有所希冀!"]
note = []
translation = "现在我是一个低贱的亡国俘虏,十分卑微浅陋,受到过分提拔,恩宠优厚,怎敢犹豫不决而有非分的企求呢"
keyword_note = {"拔擢"="提拔", "优渥"="优厚", "盘桓"="徘徊不前", "希冀"="非分的企求"}
["但以刘日薄西山, 气息奄奄, 人命危浅, 朝不虑夕."]
note = []
translation = "只是因为祖母刘氏寿命即将终了,气息微弱,生命垂危,早上不能想到晚上怎样"
keyword_note = {"日薄西山"="太阳接近西山,喻人寿命将终", "危浅"="生命垂危"}
["臣无祖母, 无以至今日, 祖母无臣, 无以终余年."]
note = []
translation = "臣下我如果没有祖母,就没有今天的样子;祖母如果没有我的照料,也无法度过她的余生"
keyword_note = {"终余年"="度过余生"}
["母孙二人, 更相为命, 是以区区不能废远."]
note = []
translation = "我们祖孙二人,互相依靠而维持生命,因此我的内心不愿废止奉养,远离祖母"
keyword_note = {"更相"="相互", "区区"="自己的私情", "废远"="废止奉养而远离"}
["臣密今年四十有四, 祖母今年九十有六, 是臣尽节于陛下之日长, 报养刘之日短."]
note = []
translation = "臣下我现在的年龄四十四岁了,祖母现在的年龄九十六岁了,臣下我在陛下面前尽忠尽节的日子还长着呢,而在祖母刘氏面前尽孝尽心的日子已经不多了"
keyword_note = {"有"="又", "尽节"="尽忠节"}
["乌鸟私情, 愿乞终养."]
note = []
translation = "我怀着乌鸦反哺的私情,乞求能够准许我完成对祖母养老送终的心愿"
keyword_note = {"乌鸟私情"="乌鸦反哺之情,喻孝心", "终养"="养老至终"}
["臣之辛苦, 非独蜀之人士及二州牧伯所见明知, 皇天后土, 实所共鉴."]
note = []
translation = "我的辛酸苦楚,并不仅仅被蜀地的百姓及益州、梁州的长官所亲眼目睹、内心明白,连天地神明也都看得清清楚楚"
keyword_note = {"辛苦"="辛酸苦楚", "牧伯"="州郡长官", "皇天后土"="天地神明", "鉴"="明察"}
["愿陛下矜悯愚诚, 听臣微志, 庶刘侥幸, 保卒余年."]
note = []
translation = "希望陛下能怜悯我愚昧诚心,请允许我完成臣下一点小小的心愿,使祖母刘氏能够侥幸地保全她的余生"
keyword_note = {"矜悯"="怜悯", "听"="准许", "庶"="或许(表希望)", "卒余年"="终老"}
["臣生当陨首, 死当结草."]
note = []
translation = "我活着应当杀身报效朝廷,死了也要结草衔环来报答陛下的恩情"
keyword_note = {"陨首"="掉脑袋,指献出生命", "结草"="死后报恩的典故"}
["臣不胜犬马怖惧之情, 谨拜表以闻."]
note = []
translation = "臣下我怀着牛马一样不胜恐惧的心情,恭敬地呈上此表来使陛下知道这件事"
keyword_note = {"不胜"="禁不住", "犬马怖惧"="臣子谦卑的自比", "闻"="使...知道"}

View File

@@ -1,170 +0,0 @@
import pathlib
import toml
import time
import auxiliary as aux
class Electron():
"""电子: 记忆分析元数据及算法"""
algorithm = "SM-2" # 暂时使用 SM-2 算法进行记忆拟合, 考虑 SM-15 替代
"""
content = "" # 内容
efactor = 2.5 # 易度系数, 越大越简单, 最大为5
real_rept = 0 # (实际)重复次数
rept = 0 # (有效)重复次数
interval = 0 # 最佳间隔
last_date = 0 # 上一次复习的时间戳
next_date = 0 # 将要复习的时间戳
is_activated = 0 # 激活状态
# *NOTE: 此处"时间戳"是以天为单位的整数, 即 UNIX 时间戳除以一天的秒数取整
last_modify = 0 # 最后修改时间戳(此处是UNIX时间戳)
"""
def __init__(self, content: str, data: dict):
self.content = content
self.efactor = data.get('efactor', 2.5)
self.real_rept = data.get('real_rept', 0)
self.rept = data.get('rept', 0)
self.interval = data.get('interval', 0)
self.last_date = data.get('last_date', 0)
self.next_date = data.get('next_date', 0)
self.is_activated = data.get('is_activated', 0)
self.last_modify = time.time()
def activate(self):
self.is_activated = 1
def modify(self, var: str, value):
setattr(self, var, value)
self.last_modify = time.time()
def export_data(self):
return {
'efactor': self.efactor,
'real_rept': self.real_rept,
'rept': self.rept,
'interval': self.interval,
'last_date': self.last_date,
'next_date': self.next_date,
'is_activated': self.is_activated
}
def revisor(self, quality):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
Args:
quality (int): 记忆保留率量化参数
"""
if quality == -1:
return -1
self.efactor = self.efactor + (0.1 - (5 - quality) * (0.08 + (5 - quality) * 0.02))
self.efactor = max(1.3, self.efactor)
if quality < 3:
# 若保留率低于 3重置重复次数
self.rept = 0
self.interval = 0 # 设为0以便下面重新计算 I(1)
else:
self.rept += 1
self.real_rept += 1
if self.rept == 0: # 刚被重置或初次激活后复习
self.interval = 1 # I(1)
elif self.rept == 1:
self.interval = 6 # I(2) 经验公式
else:
self.interval = round(self.interval * self.efactor)
self.last_date = aux.get_daystamp()
self.next_date = aux.get_daystamp() + self.interval
def __str__(self):
return (f"记忆单元预览 \n"
f"内容: '{self.content}' \n"
f"易度系数: {self.efactor:.2f} \n"
f"已经重复的次数: {self.rept} \n"
f"下次间隔: {self.interval}\n"
f"下次复习日期时间戳: {self.next_date}")
def __eq__(self, other):
if self.content == other.content:
return 1
return 0
def __hash__(self):
return hash(self.content)
@staticmethod
def placeholder():
return Electron("电子对象样例内容", {})
@staticmethod
def import_from_file(path: pathlib.Path):
name = path.name.replace(path.suffix, "")
with open(path, 'r') as f:
all = toml.load(f)
lst = list()
for i in all.keys():
lst.append(Electron(i, all[i]))
return (name, lst)
@staticmethod
def save_to_file(electron_dictized, path: pathlib.Path):
with open(path, 'w') as f:
toml.dump(electron_dictized, f)
class Nucleon():
"""核子: 材料元数据"""
def __init__(self, content: str, data: dict):
self.metadata = data
self.content = content
@staticmethod
def import_from_file(path: pathlib.Path):
name = path.name.replace(path.suffix, "")
with open(path, 'r') as f:
all = toml.load(f)
lst = list()
for i in all.keys():
lst.append(Nucleon(i, all[i]))
return (name, lst)
@staticmethod
def save_to_file(nucleon_dictized, path: pathlib.Path):
with open(path, 'w') as f:
toml.dump(nucleon_dictized, f)
@staticmethod
def placeholder():
return Nucleon("核子对象样例内容", {})
class AtomicFile():
def __init__(self, path, type_="unknown"):
self.path = path
self.type_ = type_
if type_ == "nucleon":
self.name, self.datalist = Nucleon.import_from_file(pathlib.Path(path))
if type_ == "electron":
self.name, self.datalist = Electron.import_from_file(pathlib.Path(path))
def save(self):
dictobj = {i.content: i.export_data() for i in self.datalist}
print(dictobj)
if self.type_ == "nucleon":
Nucleon.save_to_file(dictobj, self.path)
if self.type_ == "electron":
Electron.save_to_file(dictobj, self.path)
def get_full_content(self):
if self.type_ == "nucleon":
text = ""
for i in self.datalist:
text += i.content
return text
return ""
def get_len(self):
return len(self.datalist)
class Atom():
@staticmethod
def placeholder():
return (Electron.placeholder(), Nucleon.placeholder())

View File

@@ -1,5 +0,0 @@
"""Android Termux 无法编译 playsound 库的替代方案,
需要安装 playaudio, 并重命名此文件至 playsound.py"""
import os
def playsound(path):
os.system(f"play-audio '{path}'")

11
pyproject.toml Normal file
View File

@@ -0,0 +1,11 @@
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "heurams"
version = "0.4.0"
description = "Heuristic Assisted Memory Scheduler"
[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -1,113 +0,0 @@
import typing
import particles as pt
import pathlib
import auxiliary as aux
class Parser():
"""轻量级版本文件解析器, 用于文件管理器的记忆状态解析"""
class Reactor():
"""反应堆对象, 用于全面解析文件, 并处理和分配一次文件记忆流程的资源与策略"""
def __init__(self, nucleon_file: pt.AtomicFile, electron_file: pt.AtomicFile, tasked_num):
# 导入原子对象
self.reported = set()
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.tasked_num = tasked_num
self.atoms_new = list()
self.atoms_review = list()
counter = self.tasked_num
self.electron_dict = {elect.content: elect for elect in electron_file.datalist}
def electron_dict_get_fallback(key) -> pt.Electron:
value = self.electron_dict.get(key)
# 如果值不存在,则设置默认值
if value is None:
value = pt.Electron(key, {}) # 获取默认值
self.electron_dict[key] = value # 将默认值存入字典
value = self.electron_dict[key]
return value # 返回获取的值(可能是默认值)
for nucleon in nucleon_file.datalist:
atom = (electron_dict_get_fallback(nucleon.content), nucleon)
if atom[0].is_activated == 0:
if counter > 0:
atom[0].is_activated = 1
self.atoms_new.append(atom)
counter -= 1
else:
if atom[0].next_date <= aux.get_daystamp():
atom[0].last_date = aux.get_daystamp()
self.atoms_review.append(atom)
# 设置运行时
self.index: int
self.procession: list
self.failed: list
self.round_title: str
self.reported: set
self.current_atom: typing.Tuple[pt.Electron, pt.Nucleon]
self.round_set = 0
self.current_atom = pt.Atom.placeholder()
def set_round(self, title, procession):
self.round_set = 1
self.round_title = title
self.procession = procession
self.failed = list()
self.index = -1
def set_round_templated(self, stage):
titles = {
1: "复习模式",
2: "新记忆模式",
3: "总复习模式"
}
processions = {
1: self.atoms_review,
2: self.atoms_new,
3: list(set(self.atoms_new + self.atoms_review))
}
ret = 1
if stage == 1 and len(processions[1]) == 0:
stage = 2
ret = 2
self.set_round(title=titles[stage], procession=processions[stage])
return ret
def forward(self, step = 1):
if self.index + step >= len(self.procession):
if len(self.failed) > 0:
self.procession = self.failed
self.index = -1
self.forward(step)
if "- 额外复习" not in self.round_title:
self.round_title += " - 额外复习"
self.failed = list()
return 1 # 自动重定向到 failed
else:
self.round_set = 0
return -1 # 此轮已完成
self.index += step
self.current_atom = self.procession[self.index]
return 0
def save(self):
print("Progress saved")
# self.nucleon_file.save()
temp = list()
for i in self.electron_dict.keys():
temp.append(self.electron_dict[i])
self.electron_file.datalist = temp
self.electron_file.save()
def report(self, atom, quality):
if atom[0] not in self.reported:
atom[0].revisor(quality)
self.reported.add(atom[0])
if quality <= 3:
self.failed.append(atom)
return 1
else:
return 0

View File

@@ -1,3 +0,0 @@
from webshare import server
server = server.Server("python3 main.py", title="辅助记忆程序", host="0.0.0.0")
server.serve()

24
src/heurams/__main__.py Normal file
View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
from textual.app import App
import screens
import os
class AppLauncher(App):
CSS_PATH = "styles.css"
TITLE = "潜进 - 辅助记忆调度器"
BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")]
SCREENS = {
"dashboard": screens.DashboardScreen,
}
def on_mount(self) -> None:
self.push_screen("dashboard")
if __name__ == "__main__":
script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir)
os.makedirs("electron", exist_ok=True)
os.makedirs("nucleon", exist_ok=True)
os.makedirs("cache/voice", exist_ok=True)
app = AppLauncher()
app.run()

31
src/heurams/context.py Normal file
View File

@@ -0,0 +1,31 @@
"""
全局上下文管理模块
"""
from contextvars import ContextVar
from typing import Optional
from heurams.services.config import ConfigFile
config_var: ContextVar[ConfigFile] = ContextVar('config_var', default=ConfigFile("")) # 配置文件
runtime_var: ContextVar = ContextVar('runtime_var', default=None) # 运行时共享数据
class ConfigContext:
"""
功能完备的上下文管理器
用于临时切换配置的作用域, 支持嵌套使用
Example:
>>> with ConfigContext(test_config):
... get_daystamp() # 使用 test_config
>>> get_daystamp() # 恢复原配置
"""
def __init__(self, config_provider: ConfigFile):
self.config_provider = config_provider
self._token = None
def __enter__(self):
self._token = config_var.set(self.config_provider)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
config_var.reset(self._token) # type: ignore

View File

@@ -0,0 +1,140 @@
class MemScreen(Screen):
BINDINGS = [
("d", "toggle_dark", "改变色调"),
("q", "pop_screen", "返回主菜单"),
("v", "play_voice", "朗读"),
# ("p", "precache_current", "预缓存当前单元集"), # 新增预缓存快捷键
]
if config.get("quick_pass"):
BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
btn = dict()
def __init__(
self,
nucleon_file: pt.NucleonUnion,
electron_file: pt.ElectronUnion,
tasked_num,
):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
self.stage = 1
self.stage += self.reactor.set_round_templated(self.stage)
first_forward = self.reactor.forward()
print(first_forward)
if first_forward == -1:
self.stage = 3
self.reactor.set_round_templated(3)
print(self.reactor.forward())
#self._forward_judge(first_forward)
self.compo = next(self.reactor.current_appar)
self.feedback_state = 0 # 默认状态
self.feedback_state_map = {
0: "",
255: "回答有误, 请重试. 或者重新学习此单元",
}
def compose(self) -> ComposeResult:
if type(self.compo).__name__ == "Recognition":
self.action_play_voice()
yield Header(show_clock=True)
with Center():
yield Static(
f"当前进度: {len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
)
yield Label(self.feedback_state_map[self.feedback_state])
yield from self.compo.compose()
if self.feedback_state == 255:
yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def on_mount(self):
pass
def on_button_pressed(self, event):
try:
if event.button.id == "re-recognize":
return
except:
pass
ret = self.compo.handler(event, "button")
self._forward_judge(ret)
def _forward_judge(self, ret):
self.feedback_state = 0
if ret == -1:
return
if ret == 0:
try:
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
except StopIteration:
nxt = self.reactor.forward(1)
try:
self.compo = next(self.reactor.current_appar)
except:
pass
if nxt == -1:
if self.reactor.round_set == 0:
if self.stage == 4:
if config.get("save"):
self.reactor.save()
self.compo = compo.Finished(
self, None, pt.Atom.placeholder()
)
self.refresh_ui()
else:
self.reactor.set_round_templated(self.stage)
self.reactor.forward(1)
self.stage += 1
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
return
return
else:
self.refresh_ui()
return
if ret >= 1:
if ret == 2:
self.feedback_state = 255 # 表示错误
else:
self.feedback_state = 0
self.refresh_ui()
return
def refresh_ui(self):
self.call_later(self.recompose)
print(type(self.compo).__name__)
def action_play_voice(self):
def play():
cache_dir = pathlib.Path(f"./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
if not cache.exists():
import edge_tts as tts
communicate = tts.Communicate(
self.reactor.current_atom[1].content.replace("/", ""),
"zh-CN-XiaoxiaoNeural",
)
communicate.save_sync(
f"./cache/voice/{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
)
playsound(str(cache))
threading.Thread(target=play).start()
def action_precache_current(self):
"""预缓存当前单元集的音频"""
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_quick_pass(self):
self.reactor.report(self.reactor.current_atom, 5)
self._forward_judge(0)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()

View File

@@ -0,0 +1,185 @@
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕"""
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
def __init__(self, nucleon_file = None):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.is_precaching = False
self.current_file = ""
self.current_item = ""
self.progress = 0
self.total = 0
self.processed = 0
self.precache_worker = None
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="precache_container"):
yield Label("[b]音频预缓存[/b]", classes="title-label")
if self.nucleon_file:
yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info")
yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info")
else:
yield Static("目标: 所有单元集", classes="target-info")
yield Static(id="status", classes="status-info")
yield Static(id="current_item", classes="current-item")
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
with Horizontal(classes="button-group"):
if not self.is_precaching:
yield Button("开始预缓存", id="start_precache", variant="primary")
else:
yield Button("取消预缓存", id="cancel_precache", variant="error")
yield Button("清空缓存", id="clear_cache", variant="warning")
yield Button("返回", id="go_back", variant="default")
yield Footer()
def on_mount(self):
"""挂载时初始化状态"""
self.update_status("就绪", "等待开始...")
def update_status(self, status, current_item="", progress=None):
"""更新状态显示"""
status_widget = self.query_one("#status", Static)
item_widget = self.query_one("#current_item", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
status_widget.update(f"状态: {status}")
item_widget.update(f"当前项目: {current_item}" if current_item else "")
if progress is not None:
progress_bar.progress = progress
progress_bar.advance(0) # 刷新显示
def precache_single_text(self, text: str):
"""预缓存单个文本的音频"""
cache_dir = pathlib.Path("./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(text)}.wav"
if not cache.exists():
try:
import edge_tts as tts
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache))
return True
except Exception as e:
print(f"预缓存失败 '{text}': {e}")
return False
return True
def precache_file(self, nucleon_union: pt.NucleonUnion):
"""预缓存单个文件的所有内容"""
self.current_file = nucleon_union.name
total_items = len(nucleon_union.nucleons)
for idx, nucleon in enumerate(nucleon_union.nucleons):
# 检查是否被取消
worker = get_current_worker()
if worker and worker.is_cancelled:
return False
text = nucleon['content'].replace('/', '')
self.current_item = text[:50] + "..." if len(text) > 50 else text
self.processed += 1
# 更新进度
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
self.update_status(
f"处理中: {nucleon_union.name} ({idx+1}/{total_items})",
self.current_item,
progress
)
# 预缓存音频
success = self.precache_single_text(text)
if not success:
self.update_status("错误", f"处理失败: {self.current_item}")
time.sleep(1) # 短暂暂停以便用户看到错误信息
return True
def precache_all_files(self):
"""预缓存所有文件"""
nucleon_path = pathlib.Path("./nucleon")
nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"]
# 计算总项目数
self.total = 0
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
self.total += len(nu.nucleons)
except:
continue
self.processed = 0
self.is_precaching = True
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
if not self.precache_file(nu):
break # 用户取消
except Exception as e:
print(f"处理文件失败 {file}: {e}")
continue
self.is_precaching = False
self.update_status("完成", "所有音频文件已预缓存", 100)
def precache_single_file(self):
"""预缓存单个文件"""
if not self.nucleon_file:
return
self.total = len(self.nucleon_file.nucleons)
self.processed = 0
self.is_precaching = True
success = self.precache_file(self.nucleon_file)
self.is_precaching = False
if success:
self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100)
else:
self.update_status("已取消", "预缓存操作被用户取消", 0)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "start_precache" and not self.is_precaching:
# 开始预缓存
if self.nucleon_file:
self.precache_worker = self.run_worker(self.precache_single_file, thread=True)
else:
self.precache_worker = self.run_worker(self.precache_all_files, thread=True)
elif event.button.id == "cancel_precache" and self.is_precaching:
# 取消预缓存
if self.precache_worker:
self.precache_worker.cancel()
self.is_precaching = False
self.update_status("已取消", "预缓存操作被用户取消", 0)
elif event.button.id == "clear_cache":
# 清空缓存
try:
shutil.rmtree("./cache/voice", ignore_errors=True)
self.update_status("已清空", "音频缓存已清空", 0)
except Exception as e:
self.update_status("错误", f"清空缓存失败: {e}")
elif event.button.id == "go_back":
self.action_go_back()
def action_go_back(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.pop_screen()
def action_quit_app(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.exit()

View File

@@ -0,0 +1,278 @@
#!/usr/bin/env python3
import pathlib
import toml
import time
import heurams.services.timer as timer
from typing import List
class Electron:
"""电子: 记忆分析元数据及算法"""
algorithm = "SM-2" # 暂时使用 SM-2 算法进行记忆拟合, 考虑 SM-15 替代
def __init__(self, content: str, metadata: dict):
self.content = content
self.metadata = metadata
if metadata == {}:
# print("NULL")
self._default_init()
def _default_init(self):
defaults = {
'efactor': 2.5, # 易度系数, 越大越简单, 最大为5
'real_rept': 0, # (实际)重复次数
'rept': 0, # (有效)重复次数
'interval': 0, # 最佳间隔
'last_date': 0, # 上一次复习的时间戳
'next_date': 0, # 将要复习的时间戳
'is_activated': 0, # 激活状态
# *NOTE: 此处"时间戳"是以天为单位的整数, 即 UNIX 时间戳除以一天的秒数取整
'last_modify': time.time() # 最后修改时间戳(此处是UNIX时间戳)
}
self.metadata = defaults
def activate(self):
self.metadata['is_activated'] = 1
self.metadata['last_modify'] = time.time()
def modify(self, var: str, value):
if var in self.metadata:
self.metadata[var] = value
self.metadata['last_modify'] = time.time()
else:
print(f"警告: '{var}' 非已知元数据字段")
def revisor(self, quality: int = 5, is_new_activation: bool = False):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
Args:
quality (int): 记忆保留率量化参数
"""
print(f"REVISOR: {quality}, {is_new_activation}")
if quality == -1:
return -1
self.metadata['efactor'] = self.metadata['efactor'] + (
0.1 - (5 - quality) * (0.08 + (5 - quality) * 0.02)
)
self.metadata['efactor'] = max(1.3, self.metadata['efactor'])
if quality < 3:
# 若保留率低于 3重置重复次数
self.metadata['rept'] = 0
self.metadata['interval'] = 0 # 设为0以便下面重新计算 I(1)
else:
self.metadata['rept'] += 1
self.metadata['real_rept'] += 1
if is_new_activation: # 初次激活
self.metadata['rept'] = 0
self.metadata['efactor'] = 2.5
if self.metadata['rept'] == 0: # 刚被重置或初次激活后复习
self.metadata['interval'] = 1 # I(1)
elif self.metadata['rept'] == 1:
self.metadata['interval'] = 6 # I(2) 经验公式
else:
self.metadata['interval'] = round(
self.metadata['interval'] * self.metadata['efactor']
)
self.metadata['last_date'] = timer.get_daystamp()
self.metadata['next_date'] = timer.get_daystamp() + self.metadata['interval']
self.metadata['last_modify'] = time.time()
def __str__(self):
return (
f"记忆单元预览 \n"
f"内容: '{self.content}' \n"
f"易度系数: {self.metadata['efactor']:.2f} \n"
f"已经重复的次数: {self.metadata['rept']} \n"
f"下次间隔: {self.metadata['interval']}\n"
f"下次复习日期时间戳: {self.metadata['next_date']}"
)
def __eq__(self, other):
if self.content == other.content:
return True
return False
def __hash__(self):
return hash(self.content)
def __getitem__(self, key):
if key == "content":
return self.content
if key in self.metadata:
return self.metadata[key]
else:
raise KeyError(f"Key '{key}' not found in metadata.")
def __setitem__(self, key, value):
if key == "content":
raise AttributeError("content 应为只读")
self.metadata[key] = value
self.metadata['last_modify'] = time.time()
def __iter__(self):
yield from self.metadata.keys()
def __len__(self):
return len(self.metadata)
@staticmethod
def placeholder():
return Electron("电子对象样例内容", {})
class Nucleon:
"""核子: 材料元数据"""
def __init__(self, content: str, data: dict):
self.metadata = data
self.content = content
def __getitem__(self, key):
if key == "content":
return self.content
if key in self.metadata:
return self.metadata[key]
else:
raise KeyError(f"Key '{key}' not found in metadata.")
def __iter__(self):
yield from self.metadata.keys()
def __len__(self):
return len(self.metadata)
def __hash__(self):
return hash(self.content)
@staticmethod
def placeholder():
return Nucleon("核子对象样例内容", {})
class NucleonUnion():
"""
替代原有 NucleonFile 类, 支持复杂逻辑
Attributes:
path (Path): 对应于 NucleonUnion 实例的文件路径。
name (str): 核联对象的显示名称,从文件名中派生。
nucleons (list): 内部核子对象的列表。
nucleons_dict (dict): 内部核子对象的字典,以核子内容作为键。
keydata (dict): 核子对象字典键名的翻译。
testdata (dict): 记忆测试项目的元数据。
Parameters:
path (Path): 包含核子数据的文件路径。
"""
def __init__(self, path: pathlib.Path):
self.path = path
self.name = path.name.replace(path.suffix, "")
with open(path, 'r') as f:
all = toml.load(f)
lst = list()
for i in all.keys():
if "attr" in i:
continue
if "data" in i:
continue
lst.append(Nucleon(i, all[i]))
self.keydata = all["keydata"]
self.testdata = all["testdata"]
self.nucleons: List[Nucleon] = lst
self.nucleons_dict = {i.content: i for i in lst}
def __len__(self):
return len(self.nucleons)
def linked_electron_union(self):
if (self.path.parent / '..' / 'electron' / self.path.name).exists():
return ElectronUnion(self.path.parent / '..' / 'electron' / self.path.name)
else:
return 0
def save(self):
with open(self.path, 'w') as f:
tmp = {i.content: i.metadata for i in self.nucleons}
toml.dump(tmp, f)
class ElectronUnion:
"""取代原有 ElectronFile 类, 以支持复杂逻辑"""
def __init__(self, path):
self.path = path
print(path)
self.name = path.name.replace(path.suffix, "")
with open(path, 'r') as f:
all = toml.load(f)
lst = list()
for i in all.keys():
if i != "total":
lst.append(Electron(i, all[i]))
self.total = all.get("total", {"last_date": 0})
self.electrons = lst
self.electrons_dict = {i.content: i for i in lst}
def sync(self):
"""同步 electrons_dict 中新增对到 electrons 中, 仅用于缺省初始化不存在映射时调用"""
self.electrons = self.electrons_dict.values()
def __len__(self):
return len(self.electrons)
def linked_nucleon_union(self):
return NucleonUnion(self.path.parent / '..' / 'nucleon' / self.path.name)
def save(self):
# print(1)
self.total["last_date"] = timer.get_daystamp()
with open(self.path, 'w') as f:
tmp = {i.content: i.metadata for i in self.electrons}
tmp["total"] = self.total
# print(tmp)
toml.dump(tmp, f)
class Atom:
@staticmethod
def placeholder():
return (Electron.placeholder(), Nucleon.placeholder(), {})
@staticmethod
def advanced_placeholder():
return (
Electron("两只黄鹤鸣翠柳", {}),
Nucleon(
"两只黄鹤鸣翠柳",
{
"note": [],
"translation": "臣子李密陈言:我因命运不好,小时候遭遇到了不幸",
"keyword_note": {
"险衅": "凶险祸患(这里指命运不好)",
"": "早时,这里指年幼的时候",
"": "'',指可忧患的事",
"": "不幸,指丧父"
}
}
),
{
"keydata": {
"note": "笔记",
"keyword_note": "关键词翻译",
"translation": "语句翻译"
},
"testdata": {
"additional_inf": ["translation", "note", "keyword_note"],
"fill_blank_test": ["translation"],
"draw_card_test": ["keyword_note"]
},
"is_new_activation": 0
}
)

View File

View File

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
import pathlib
import toml
import time
import heurams.services.timer as timer
class Electron:
"""电子: 记忆分析元数据及算法"""
algorithm = "SM-2" # 暂时使用 SM-2 算法进行记忆拟合, 考虑 SM-15 替代
def __init__(self, content: str, metadata: dict):
self.content = content
self.metadata = metadata
if metadata == {}:
# print("NULL")
self._default_init()
def _default_init(self):
defaults = {
'efactor': 2.5, # 易度系数, 越大越简单, 最大为5
'real_rept': 0, # (实际)重复次数
'rept': 0, # (有效)重复次数
'interval': 0, # 最佳间隔
'last_date': 0, # 上一次复习的时间戳
'next_date': 0, # 将要复习的时间戳
'is_activated': 0, # 激活状态
# *NOTE: 此处"时间戳"是以天为单位的整数, 即 UNIX 时间戳除以一天的秒数取整
'last_modify': time.time() # 最后修改时间戳(此处是UNIX时间戳)
}
self.metadata = defaults
def activate(self):
self.metadata['is_activated'] = 1
self.metadata['last_modify'] = time.time()
def modify(self, var: str, value):
if var in self.metadata:
self.metadata[var] = value
self.metadata['last_modify'] = time.time()
else:
print(f"警告: '{var}' 非已知元数据字段")
def revisor(self, quality: int = 5, is_new_activation: bool = False):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
Args:
quality (int): 记忆保留率量化参数
"""
print(f"REVISOR: {quality}, {is_new_activation}")
if quality == -1:
return -1
self.metadata['efactor'] = self.metadata['efactor'] + (
0.1 - (5 - quality) * (0.08 + (5 - quality) * 0.02)
)
self.metadata['efactor'] = max(1.3, self.metadata['efactor'])
if quality < 3:
# 若保留率低于 3重置重复次数
self.metadata['rept'] = 0
self.metadata['interval'] = 0 # 设为0以便下面重新计算 I(1)
else:
self.metadata['rept'] += 1
self.metadata['real_rept'] += 1
if is_new_activation: # 初次激活
self.metadata['rept'] = 0
self.metadata['efactor'] = 2.5
if self.metadata['rept'] == 0: # 刚被重置或初次激活后复习
self.metadata['interval'] = 1 # I(1)
elif self.metadata['rept'] == 1:
self.metadata['interval'] = 6 # I(2) 经验公式
else:
self.metadata['interval'] = round(
self.metadata['interval'] * self.metadata['efactor']
)
self.metadata['last_date'] = timer.get_daystamp()
self.metadata['next_date'] = timer.get_daystamp() + self.metadata['interval']
self.metadata['last_modify'] = time.time()
def __str__(self):
return (
f"记忆单元预览 \n"
f"内容: '{self.content}' \n"
f"易度系数: {self.metadata['efactor']:.2f} \n"
f"已经重复的次数: {self.metadata['rept']} \n"
f"下次间隔: {self.metadata['interval']}\n"
f"下次复习日期时间戳: {self.metadata['next_date']}"
)
def __eq__(self, other):
if self.content == other.content:
return True
return False
def __hash__(self):
return hash(self.content)
def __getitem__(self, key):
if key == "content":
return self.content
if key in self.metadata:
return self.metadata[key]
else:
raise KeyError(f"Key '{key}' not found in metadata.")
def __setitem__(self, key, value):
if key == "content":
raise AttributeError("content 应为只读")
self.metadata[key] = value
self.metadata['last_modify'] = time.time()
def __iter__(self):
yield from self.metadata.keys()
def __len__(self):
return len(self.metadata)
@staticmethod
def placeholder():
return Electron("电子对象样例内容", {})

View File

View File

View File

View File

View File

View File

View File

View File

@@ -0,0 +1 @@
# 音频服务

View File

@@ -0,0 +1 @@
# 缓存服务

View File

@@ -0,0 +1,35 @@
# 配置文件服务
import pathlib
import toml
import typing
class ConfigFile:
def __init__(self, path: str):
self.path = pathlib.Path(path)
if not self.path.exists():
self.path.touch()
self.data = dict()
self._load()
def _load(self):
"""从文件加载配置数据"""
with open(self.path, 'r') as f:
try:
self.data = toml.load(f)
except toml.TomlDecodeError:
self.data = {}
def modify(self, key: str, value: typing.Any):
"""修改配置值并保存"""
self.data[key] = value
self.save()
def save(self, path: typing.Union[str, pathlib.Path] = ""):
"""保存配置到文件"""
save_path = pathlib.Path(path) if path else self.path
with open(save_path, 'w') as f:
toml.dump(self.data, f)
def get(self, key: str, default: typing.Any = None) -> typing.Any:
"""获取配置值,如果不存在返回默认值"""
return self.data.get(key, default)

View File

@@ -0,0 +1,5 @@
# 哈希服务
import hashlib
def get_md5(text):
return hashlib.md5(text.encode('utf-8')).hexdigest()

View File

@@ -0,0 +1,20 @@
# 时间服务
from heurams.context import config_var
import time
def get_daystamp() -> int:
"""获取当前日戳(以天为单位的整数时间戳)"""
time_override = config_var.get().get("daystamp_override", -1)
if time_override != -1:
return int(time_override)
return int((time.time() + config_var.get().get("timezone_offset")) // (24 * 3600))
def get_timestamp() -> float:
"""获取 UNIX 时间戳"""
# 搞这个类的原因是要支持可复现操作
time_override = config_var.get().get("timestamp_override", -1)
if time_override != -1:
return float(time_override)
return time.time()

View File

@@ -0,0 +1 @@
# 文本转语音服务

View File

@@ -0,0 +1 @@
# 版本控制集成服务

View File

@@ -1,55 +0,0 @@
Screen {
align: center middle;
}
#main_container {
align: center middle;
width: 80%;
height: auto;
border: thick $primary-lighten-2;
padding: 2;
}
#sentence {
content-align: center middle;
width: 100%;
height: 5;
margin-bottom: 2;
text-style: bold;
}
#progress {
width: 100%;
content-align: center middle;
margin-bottom: 1;
color: $text-muted;
}
Button {
align-horizontal: center;
width: 50%;
margin: 0 0;
}
.choice {
align-horizontal: center;
width: 50%;
margin: 0 0;
height: auto;
}
/* no_margin.tcss */
#button_container {
align-horizontal: center;
height: 9;
}
/* 选中 #button_container 下所有的 Horizontal 子元素 */
#button_container > Horizontal {
margin-top: 0;
margin-bottom: 0;
align-horizontal: center;
width: 40%;
}

View File

@@ -1,325 +0,0 @@
"""
An encoding / decoding format suitable for serializing data structures to binary.
This is based on https://en.wikipedia.org/wiki/Bencode with some extensions.
The following data types may be encoded:
- None
- int
- bool
- bytes
- str
- list
- tuple
- dict
"""
from __future__ import annotations
from typing import Any, Callable
class DecodeError(Exception):
"""A problem decoding data."""
def dump(data: object) -> bytes:
"""Encodes a data structure in to bytes.
Args:
data: Data structure
Returns:
A byte string encoding the data.
"""
def encode_none(_datum: None) -> bytes:
"""
Encodes a None value.
Args:
datum: Always None.
Returns:
None encoded.
"""
return b"N"
def encode_bool(datum: bool) -> bytes:
"""
Encode a boolean value.
Args:
datum: The boolean value to encode.
Returns:
The encoded bytes.
"""
return b"T" if datum else b"F"
def encode_int(datum: int) -> bytes:
"""
Encode an integer value.
Args:
datum: The integer value to encode.
Returns:
The encoded bytes.
"""
return b"i%ie" % datum
def encode_bytes(datum: bytes) -> bytes:
"""
Encode a bytes value.
Args:
datum: The bytes value to encode.
Returns:
The encoded bytes.
"""
return b"%i:%s" % (len(datum), datum)
def encode_string(datum: str) -> bytes:
"""
Encode a string value.
Args:
datum: The string value to encode.
Returns:
The encoded bytes.
"""
encoded_data = datum.encode("utf-8")
return b"s%i:%s" % (len(encoded_data), encoded_data)
def encode_list(datum: list) -> bytes:
"""
Encode a list value.
Args:
datum: The list value to encode.
Returns:
The encoded bytes.
"""
return b"l%se" % b"".join(encode(element) for element in datum)
def encode_tuple(datum: tuple) -> bytes:
"""
Encode a tuple value.
Args:
datum: The tuple value to encode.
Returns:
The encoded bytes.
"""
return b"t%se" % b"".join(encode(element) for element in datum)
def encode_dict(datum: dict) -> bytes:
"""
Encode a dictionary value.
Args:
datum: The dictionary value to encode.
Returns:
The encoded bytes.
"""
return b"d%se" % b"".join(
b"%s%s" % (encode(key), encode(value)) for key, value in datum.items()
)
ENCODERS: dict[type, Callable[[Any], Any]] = {
type(None): encode_none,
bool: encode_bool,
int: encode_int,
bytes: encode_bytes,
str: encode_string,
list: encode_list,
tuple: encode_tuple,
dict: encode_dict,
}
def encode(datum: object) -> bytes:
"""Recursively encode data.
Args:
datum: Data suitable for encoding.
Raises:
TypeError: If `datum` is not one of the supported types.
Returns:
Encoded data bytes.
"""
try:
decoder = ENCODERS[type(datum)]
except KeyError:
raise TypeError("Can't encode {datum!r}") from None
return decoder(datum)
return encode(data)
def load(encoded: bytes) -> object:
"""Load an encoded data structure from bytes.
Args:
encoded: Encoded data in bytes.
Raises:
DecodeError: If an error was encountered decoding the string.
Returns:
Decoded data.
"""
if not isinstance(encoded, bytes):
raise TypeError("must be bytes")
max_position = len(encoded)
position = 0
def get_byte() -> bytes:
"""Get an encoded byte and advance position.
Raises:
DecodeError: If the end of the data was reached
Returns:
A bytes object with a single byte.
"""
nonlocal position
if position >= max_position:
raise DecodeError("More data expected")
character = encoded[position : position + 1]
position += 1
return character
def peek_byte() -> bytes:
"""Get the byte at the current position, but don't advance position.
Returns:
A bytes object with a single byte.
"""
return encoded[position : position + 1]
def get_bytes(size: int) -> bytes:
"""Get a number of bytes of encode data.
Args:
size: Number of bytes to retrieve.
Raises:
DecodeError: If there aren't enough bytes.
Returns:
A bytes object.
"""
nonlocal position
bytes_data = encoded[position : position + size]
if len(bytes_data) != size:
raise DecodeError(b"Missing bytes in {bytes_data!r}")
position += size
return bytes_data
def decode_int() -> int:
"""Decode an int from the encoded data.
Returns:
An integer.
"""
int_bytes = b""
while (byte := get_byte()) != b"e":
int_bytes += byte
return int(int_bytes)
def decode_bytes(size_bytes: bytes) -> bytes:
"""Decode a bytes string from the encoded data.
Returns:
A bytes object.
"""
while (byte := get_byte()) != b":":
size_bytes += byte
bytes_string = get_bytes(int(size_bytes))
return bytes_string
def decode_string() -> str:
"""Decode a (utf-8 encoded) string from the encoded data.
Returns:
A string.
"""
size_bytes = b""
while (byte := get_byte()) != b":":
size_bytes += byte
bytes_string = get_bytes(int(size_bytes))
decoded_string = bytes_string.decode("utf-8", errors="replace")
return decoded_string
def decode_list() -> list[object]:
"""Decode a list.
Returns:
A list of data.
"""
elements: list[object] = []
add_element = elements.append
while peek_byte() != b"e":
add_element(decode())
get_byte()
return elements
def decode_tuple() -> tuple[object, ...]:
"""Decode a tuple.
Returns:
A tuple of decoded data.
"""
elements: list[object] = []
add_element = elements.append
while peek_byte() != b"e":
add_element(decode())
get_byte()
return tuple(elements)
def decode_dict() -> dict[object, object]:
"""Decode a dict.
Returns:
A dict of decoded data.
"""
elements: dict[object, object] = {}
add_element = elements.__setitem__
while peek_byte() != b"e":
add_element(decode(), decode())
get_byte()
return elements
DECODERS = {
b"i": decode_int,
b"s": decode_string,
b"l": decode_list,
b"t": decode_tuple,
b"d": decode_dict,
b"T": lambda: True,
b"F": lambda: False,
b"N": lambda: None,
}
def decode() -> object:
"""Recursively decode data.
Returns:
Decoded data.
"""
decoder = DECODERS.get(initial := get_byte(), None)
if decoder is None:
return decode_bytes(initial)
return decoder()
return decode()

View File

@@ -1,349 +0,0 @@
from __future__ import annotations
from pathlib import Path
import asyncio
import io
import json
import os
from typing import Awaitable, Callable, Literal
from asyncio.subprocess import Process
import logging
from importlib.metadata import version
import uuid
from webshare.download_manager import DownloadManager
from webshare._binary_encode import load as binary_load
log = logging.getLogger("textual-serve")
class AppService:
"""Creates and manages a single Textual app subprocess.
When a user connects to the websocket in their browser, a new AppService
instance is created to manage the corresponding Textual app process.
"""
def __init__(
self,
command: str,
*,
write_bytes: Callable[[bytes], Awaitable[None]],
write_str: Callable[[str], Awaitable[None]],
close: Callable[[], Awaitable[None]],
download_manager: DownloadManager,
debug: bool = False,
) -> None:
self.app_service_id: str = uuid.uuid4().hex
"""The unique ID of this running app service."""
self.command = command
"""The command to launch the Textual app subprocess."""
self.remote_write_bytes = write_bytes
"""Write bytes to the client browser websocket."""
self.remote_write_str = write_str
"""Write string to the client browser websocket."""
self.remote_close = close
"""Close the client browser websocket."""
self.debug = debug
"""Enable/disable debug mode."""
self._process: Process | None = None
self._task: asyncio.Task[None] | None = None
self._stdin: asyncio.StreamWriter | None = None
self._exit_event = asyncio.Event()
self._download_manager = download_manager
@property
def stdin(self) -> asyncio.StreamWriter:
"""The processes standard input stream."""
assert self._stdin is not None
return self._stdin
def _build_environment(self, width: int = 80, height: int = 24) -> dict[str, str]:
"""Build an environment dict for the App subprocess.
Args:
width: Initial width.
height: Initial height.
Returns:
A environment dict.
"""
environment = dict(os.environ.copy())
environment["TEXTUAL_DRIVER"] = "textual.drivers.web_driver:WebDriver"
environment["TEXTUAL_FPS"] = "60"
environment["TEXTUAL_COLOR_SYSTEM"] = "truecolor"
environment["TERM_PROGRAM"] = "textual"
environment["TERM_PROGRAM_VERSION"] = version("textual-serve")
environment["COLUMNS"] = str(width)
environment["ROWS"] = str(height)
if self.debug:
environment["TEXTUAL"] = "debug,devtools"
environment["TEXTUAL_LOG"] = "textual.log"
return environment
async def _open_app_process(self, width: int = 80, height: int = 24) -> Process:
"""Open a process to run the app.
Args:
width: Width of the terminal.
height: height of the terminal.
"""
environment = self._build_environment(width=width, height=height)
self._process = process = await asyncio.create_subprocess_shell(
self.command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=environment,
)
assert process.stdin is not None
self._stdin = process.stdin
return process
@classmethod
def encode_packet(cls, packet_type: Literal[b"D", b"M"], payload: bytes) -> bytes:
"""Encode a packet.
Args:
packet_type: The packet type (b"D" for data or b"M" for meta)
payload: The payload.
Returns:
Data as bytes.
"""
return b"%s%s%s" % (packet_type, len(payload).to_bytes(4, "big"), payload)
async def send_bytes(self, data: bytes) -> bool:
"""Send bytes to process.
Args:
data: Data to send.
Returns:
True if the data was sent, otherwise False.
"""
stdin = self.stdin
try:
stdin.write(self.encode_packet(b"D", data))
except RuntimeError:
return False
try:
await stdin.drain()
except Exception:
return False
return True
async def send_meta(self, data: dict[str, str | None | int | bool]) -> bool:
"""Send meta information to process.
Args:
data: Meta dict to send.
Returns:
True if the data was sent, otherwise False.
"""
stdin = self.stdin
data_bytes = json.dumps(data).encode("utf-8")
try:
stdin.write(self.encode_packet(b"M", data_bytes))
except RuntimeError:
return False
try:
await stdin.drain()
except Exception:
return False
return True
async def set_terminal_size(self, width: int, height: int) -> None:
"""Tell the process about the new terminal size.
Args:
width: Width of terminal in cells.
height: Height of terminal in cells.
"""
await self.send_meta(
{
"type": "resize",
"width": width,
"height": height,
}
)
async def blur(self) -> None:
"""Send an (app) blur to the process."""
await self.send_meta({"type": "blur"})
async def focus(self) -> None:
"""Send an (app) focus to the process."""
await self.send_meta({"type": "focus"})
async def start(self, width: int, height: int) -> None:
await self._open_app_process(width, height)
self._task = asyncio.create_task(self.run())
async def stop(self) -> None:
"""Stop the process and wait for it to complete."""
if self._task is not None:
await self._download_manager.cancel_app_downloads(
app_service_id=self.app_service_id
)
await self.send_meta({"type": "quit"})
await self._task
self._task = None
async def run(self) -> None:
"""Run the Textual app process.
!!! note
Do not call this manually, use `start`.
"""
META = b"M"
DATA = b"D"
PACKED = b"P"
assert self._process is not None
process = self._process
stdout = process.stdout
stderr = process.stderr
assert stdout is not None
assert stderr is not None
stderr_data = io.BytesIO()
async def read_stderr() -> None:
"""Task to read stderr."""
try:
while True:
data = await stderr.read(1024 * 4)
if not data:
break
stderr_data.write(data)
except asyncio.CancelledError:
pass
stderr_task = asyncio.create_task(read_stderr())
try:
ready = False
# Wait for prelude text, so we know it is a Textual app
for _ in range(10):
if not (line := await stdout.readline()):
break
if line == b"__GANGLION__\n":
ready = True
break
if not ready:
log.error("Application failed to start")
if error_text := stderr_data.getvalue():
import sys
sys.stdout.write(error_text.decode("utf-8", "replace"))
readexactly = stdout.readexactly
int_from_bytes = int.from_bytes
while True:
type_bytes = await readexactly(1)
size_bytes = await readexactly(4)
size = int_from_bytes(size_bytes, "big")
payload = await readexactly(size)
if type_bytes == DATA:
await self.on_data(payload)
elif type_bytes == META:
await self.on_meta(payload)
elif type_bytes == PACKED:
await self.on_packed(payload)
except asyncio.IncompleteReadError:
pass
except ConnectionResetError:
pass
except asyncio.CancelledError:
pass
finally:
stderr_task.cancel()
await stderr_task
if error_text := stderr_data.getvalue():
import sys
sys.stdout.write(error_text.decode("utf-8", "replace"))
async def on_data(self, payload: bytes) -> None:
"""Called when there is data.
Args:
payload: Data received from process.
"""
await self.remote_write_bytes(payload)
async def on_meta(self, data: bytes) -> None:
"""Called when there is a meta packet sent from the running app process.
Args:
data: Encoded meta data.
"""
meta_data: dict[str, object] = json.loads(data)
meta_type = meta_data["type"]
if meta_type == "exit":
await self.remote_close()
elif meta_type == "open_url":
payload = json.dumps(
[
"open_url",
{
"url": meta_data["url"],
"new_tab": meta_data["new_tab"],
},
]
)
await self.remote_write_str(payload)
elif meta_type == "deliver_file_start":
log.debug("deliver_file_start, %s", meta_data)
try:
# Record this delivery key as available for download.
delivery_key = str(meta_data["key"])
await self._download_manager.create_download(
app_service=self,
delivery_key=delivery_key,
file_name=Path(meta_data["path"]).name,
open_method=meta_data["open_method"],
mime_type=meta_data["mime_type"],
encoding=meta_data["encoding"],
name=meta_data.get("name", None),
)
except KeyError:
log.error("Missing key in `deliver_file_start` meta packet")
return
else:
# Tell the browser front-end about the new delivery key,
# so that it may hit the "/download/{key}" endpoint
# to start the download.
json_string = json.dumps(["deliver_file_start", delivery_key])
await self.remote_write_str(json_string)
else:
log.warning(
f"Unknown meta type: {meta_type!r}. You may need to update `textual-serve`."
)
async def on_packed(self, payload: bytes) -> None:
"""Called when there is a packed packet sent from the running app process.
Args:
payload: Encoded packed data.
"""
unpacked = binary_load(payload)
if unpacked[0] == "deliver_chunk":
# If we receive a chunk, hand it to the download manager to
# handle distribution to the browser.
_, delivery_key, chunk = unpacked
await self._download_manager.chunk_received(delivery_key, chunk)

View File

@@ -1,197 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
import logging
from typing import AsyncGenerator, TYPE_CHECKING
if TYPE_CHECKING:
from webshare.app_service import AppService
log = logging.getLogger("textual-serve")
DOWNLOAD_TIMEOUT = 4
DOWNLOAD_CHUNK_SIZE = 1024 * 64 # 64 KB
@dataclass
class Download:
app_service: "AppService"
"""The app service that the download is associated with."""
delivery_key: str
"""Key which identifies the download."""
file_name: str
"""The name of the file to download. This will be used to set
the Content-Disposition filename."""
open_method: str
"""The method to open the file with. "browser" or "download"."""
mime_type: str
"""The mime type of the content."""
encoding: str | None = None
"""The encoding of the content.
Will be None if the content is binary.
"""
name: str | None = None
"""Optional name set bt the client."""
incoming_chunks: asyncio.Queue[bytes | None] = field(default_factory=asyncio.Queue)
"""A queue of incoming chunks for the download.
Chunks are sent from the app service to the download handler
via this queue."""
class DownloadManager:
"""Class which manages downloads for the server.
Serves as the link between the web server and app processes during downloads.
A single server has a single download manager, which manages all downloads for all
running app processes.
"""
def __init__(self) -> None:
self._active_downloads: dict[str, Download] = {}
"""A dictionary of active downloads.
When a delivery key is received in a meta packet, it is added to this set.
When the user hits the "/download/{key}" endpoint, we ensure the key is in
this set and start the download by requesting chunks from the app process.
When the download is complete, the app process sends a "deliver_file_end"
meta packet, and we remove the key from this set.
"""
async def create_download(
self,
*,
app_service: "AppService",
delivery_key: str,
file_name: str,
open_method: str,
mime_type: str,
encoding: str | None = None,
name: str | None = None,
) -> None:
"""Prepare for a new download.
Args:
app_service: The app service to start the download for.
delivery_key: The delivery key to start the download for.
file_name: The name of the file to download.
open_method: The method to open the file with.
mime_type: The mime type of the content.
encoding: The encoding of the content or None if the content is binary.
"""
self._active_downloads[delivery_key] = Download(
app_service,
delivery_key,
file_name,
open_method,
mime_type,
encoding,
name=name,
)
async def download(self, delivery_key: str) -> AsyncGenerator[bytes, None]:
"""Download a file from the given app service.
Args:
delivery_key: The delivery key to download.
"""
app_service = await self._get_app_service(delivery_key)
download = self._active_downloads[delivery_key]
incoming_chunks = download.incoming_chunks
while True:
# Request a chunk from the app service.
send_result = await app_service.send_meta(
{
"type": "deliver_chunk_request",
"key": delivery_key,
"size": DOWNLOAD_CHUNK_SIZE,
"name": download.name,
}
)
if not send_result:
log.warning(
"Download {delivery_key!r} failed to request chunk from app service"
)
del self._active_downloads[delivery_key]
break
try:
chunk = await asyncio.wait_for(incoming_chunks.get(), DOWNLOAD_TIMEOUT)
except asyncio.TimeoutError:
log.warning(
"Download %r failed to receive chunk from app service within %r seconds",
delivery_key,
DOWNLOAD_TIMEOUT,
)
chunk = None
if not chunk:
# Empty chunk - the app process has finished sending the file
# or the download has been cancelled.
incoming_chunks.task_done()
del self._active_downloads[delivery_key]
break
else:
incoming_chunks.task_done()
yield chunk
async def chunk_received(self, delivery_key: str, chunk: bytes | str) -> None:
"""Handle a chunk received from the app service for a download.
Args:
delivery_key: The delivery key that the chunk was received for.
chunk: The chunk that was received.
"""
download = self._active_downloads.get(delivery_key)
if not download:
# The download may have been cancelled - e.g. the websocket
# was closed before the download could complete.
log.debug("Chunk received for cancelled download %r", delivery_key)
return
if isinstance(chunk, str):
chunk = chunk.encode(download.encoding or "utf-8")
await download.incoming_chunks.put(chunk)
async def _get_app_service(self, delivery_key: str) -> "AppService":
"""Get the app service that the given delivery key is linked to.
Args:
delivery_key: The delivery key to get the app service for.
"""
for key in self._active_downloads.keys():
if key == delivery_key:
return self._active_downloads[key].app_service
else:
raise ValueError(f"No active download for delivery key {delivery_key!r}")
async def get_download_metadata(self, delivery_key: str) -> Download:
"""Get the metadata for a download.
Args:
delivery_key: The delivery key to get the metadata for.
"""
return self._active_downloads[delivery_key]
async def cancel_app_downloads(self, app_service_id: str) -> None:
"""Cancel all downloads for the given app service.
Args:
app_service_id: The app service ID to cancel downloads for.
"""
for download in self._active_downloads.values():
if download.app_service.app_service_id == app_service_id:
await download.incoming_chunks.put(None)

View File

@@ -1,350 +0,0 @@
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
import signal
import sys
from typing import Any
import aiohttp_jinja2
from aiohttp import web
from aiohttp import WSMsgType
from aiohttp.web_runner import GracefulExit
import jinja2
from importlib.metadata import version
from rich.console import Console
from rich.logging import RichHandler
from rich.highlighter import RegexHighlighter
from webshare.download_manager import DownloadManager
from .app_service import AppService
log = logging.getLogger("textual-serve")
LOGO = r"""[bold magenta]___ ____ _ _ ___ _ _ ____ _ ____ ____ ____ _ _ ____
| |___ \/ | | | |__| | __ [__ |___ |__/ | | |___
| |___ _/\_ | |__| | | |___ ___] |___ | \ \/ |___ [not bold]VVVVV
""".replace("VVVVV", f"v{version('textual-serve')}")
WINDOWS = sys.platform == "WINDOWS"
class LogHighlighter(RegexHighlighter):
base_style = "repr."
highlights = [
r"(?P<number>(?<!\w)\-?[0-9]+\.?[0-9]*(e[-+]?\d+?)?\b|0x[0-9a-fA-F]*)",
r"(?P<path>\[.*?\])",
r"(?<![\\\w])(?P<str>b?'''.*?(?<!\\)'''|b?'.*?(?<!\\)'|b?\"\"\".*?(?<!\\)\"\"\"|b?\".*?(?<!\\)\")",
]
def to_int(value: str, default: int) -> int:
"""Convert to an integer, or return a default if that's not possible.
Args:
number: A string possibly containing a decimal.
default: Default value if value can't be decoded.
Returns:
Integer.
"""
try:
return int(value)
except ValueError:
return default
class Server:
"""Serve a Textual app."""
def __init__(
self,
command: str,
host: str = "localhost",
port: int = 8000,
title: str | None = None,
public_url: str | None = None,
statics_path: str | os.PathLike = "./static",
templates_path: str | os.PathLike = "./templates",
):
"""
Args:
app_factory: A callable that returns a new App instance.
host: Host of web application.
port: Port for server.
statics_path: Path to statics folder. May be absolute or relative to server.py.
templates_path" Path to templates folder. May be absolute or relative to server.py.
"""
self.command = command
self.host = host
self.port = port
self.title = title or command
self.debug = False
if public_url is None:
if self.port == 80:
self.public_url = f"http://{self.host}"
elif self.port == 443:
self.public_url = f"https://{self.host}"
else:
self.public_url = f"http://{self.host}:{self.port}"
else:
self.public_url = public_url
base_path = (Path(__file__) / "../").resolve().absolute()
self.statics_path = base_path / statics_path
self.templates_path = base_path / templates_path
self.console = Console()
self.download_manager = DownloadManager()
def initialize_logging(self) -> None:
"""Initialize logging.
May be overridden in a subclass.
"""
FORMAT = "%(message)s"
logging.basicConfig(
level="DEBUG" if self.debug else "INFO",
format=FORMAT,
datefmt="[%X]",
handlers=[
RichHandler(
show_path=False,
show_time=False,
rich_tracebacks=True,
tracebacks_show_locals=True,
highlighter=LogHighlighter(),
console=self.console,
)
],
)
def request_exit(self) -> None:
"""Gracefully exit the app."""
raise GracefulExit()
async def _make_app(self) -> web.Application:
"""Make the aiohttp web.Application.
Returns:
New aiohttp web application.
"""
app = web.Application()
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(self.templates_path))
ROUTES = [
web.get("/", self.handle_index, name="index"),
web.get("/ws", self.handle_websocket, name="websocket"),
web.get("/download/{key}", self.handle_download, name="download"),
web.static("/static", self.statics_path, show_index=True, name="static"),
]
app.add_routes(ROUTES)
app.on_startup.append(self.on_startup)
app.on_shutdown.append(self.on_shutdown)
return app
async def handle_download(self, request: web.Request) -> web.StreamResponse:
"""Handle a download request."""
key = request.match_info["key"]
try:
download_meta = await self.download_manager.get_download_metadata(key)
except KeyError:
raise web.HTTPNotFound(text=f"Download with key {key!r} not found")
response = web.StreamResponse()
mime_type = download_meta.mime_type
content_type = mime_type
if download_meta.encoding:
content_type += f"; charset={download_meta.encoding}"
response.headers["Content-Type"] = content_type
disposition = (
"inline" if download_meta.open_method == "browser" else "attachment"
)
response.headers["Content-Disposition"] = (
f"{disposition}; filename={download_meta.file_name}"
)
await response.prepare(request)
async for chunk in self.download_manager.download(key):
await response.write(chunk)
await response.write_eof()
return response
async def on_shutdown(self, app: web.Application) -> None:
"""Called on shutdown.
Args:
app: App instance.
"""
async def on_startup(self, app: web.Application) -> None:
"""Called on startup.
Args:
app: App instance.
"""
self.console.print(LOGO, highlight=False)
self.console.print(f"Serving {self.command!r} on {self.public_url}")
self.console.print("\n[cyan]Press Ctrl+C to quit")
def serve(self, debug: bool = False) -> None:
"""Serve the Textual application.
This will run a local webserver until it is closed with Ctrl+C
"""
self.debug = debug
self.initialize_logging()
loop = asyncio.get_event_loop()
try:
loop.add_signal_handler(signal.SIGINT, self.request_exit)
loop.add_signal_handler(signal.SIGTERM, self.request_exit)
except NotImplementedError:
pass
if self.debug:
log.info("Running in debug mode. You may use textual dev tools.")
web.run_app(
self._make_app(),
host=self.host,
port=self.port,
handle_signals=False,
loop=loop,
print=lambda *args: None,
)
@aiohttp_jinja2.template("app_index.html")
async def handle_index(self, request: web.Request) -> dict[str, Any]:
"""Serves the HTML for an app.
Args:
request: Request object.
Returns:
Template data.
"""
router = request.app.router
font_size = to_int(request.query.get("fontsize", "16"), 16)
def get_url(route: str, **args) -> str:
"""Get a URL from the aiohttp router."""
path = router[route].url_for(**args)
return f"{self.public_url}{path}"
def get_websocket_url(route: str, **args) -> str:
"""Get a URL with a websocket prefix."""
url = get_url(route, **args)
if self.public_url.startswith("https"):
return "wss:" + url.split(":", 1)[1]
else:
return "ws:" + url.split(":", 1)[1]
context = {
"font_size": font_size,
"app_websocket_url": get_websocket_url("websocket"),
}
context["config"] = {
"static": {
"url": get_url("static", filename="/").rstrip("/") + "/",
},
}
context["application"] = {
"name": self.title,
}
return context
async def _process_messages(
self, websocket: web.WebSocketResponse, app_service: AppService
) -> None:
"""Process messages from the client browser websocket.
Args:
websocket: Websocket instance.
app_service: App service.
"""
TEXT = WSMsgType.TEXT
async for message in websocket:
if message.type != TEXT:
continue
envelope = message.json()
assert isinstance(envelope, list)
type_ = envelope[0]
if type_ == "stdin":
data = envelope[1]
await app_service.send_bytes(data.encode("utf-8"))
elif type_ == "resize":
data = envelope[1]
await app_service.set_terminal_size(data["width"], data["height"])
elif type_ == "ping":
data = envelope[1]
await websocket.send_json(["pong", data])
elif type_ == "blur":
await app_service.blur()
elif type_ == "focus":
await app_service.focus()
async def handle_websocket(self, request: web.Request) -> web.WebSocketResponse:
"""Handle the websocket that drives the remote process.
This is called when the browser connects to the websocket.
Args:
request: Request object.
Returns:
Websocket response.
"""
websocket = web.WebSocketResponse(heartbeat=15)
width = to_int(request.query.get("width", "80"), 80)
height = to_int(request.query.get("height", "24"), 24)
app_service: AppService | None = None
try:
await websocket.prepare(request)
app_service = AppService(
self.command,
write_bytes=websocket.send_bytes,
write_str=websocket.send_str,
close=websocket.close,
download_manager=self.download_manager,
debug=self.debug,
)
await app_service.start(width, height)
try:
await self._process_messages(websocket, app_service)
finally:
await app_service.stop()
except asyncio.CancelledError:
await websocket.close()
except Exception as error:
log.exception(error)
finally:
if app_service is not None:
await app_service.stop()
return websocket

View File

@@ -1,191 +0,0 @@
/**
* Copyright (c) 2014 The xterm.js authors. All rights reserved.
* Copyright (c) 2012-2013, Christopher Jeffrey (MIT License)
* https://github.com/chjj/term.js
* @license MIT
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*
* Originally forked from (with the author's permission):
* Fabrice Bellard's javascript vt100 for jslinux:
* http://bellard.org/jslinux/
* Copyright (c) 2011 Fabrice Bellard
* The original design remains. The terminal itself
* has been extended to include xterm CSI codes, among
* other features.
*/
/**
* Default styles for xterm.js
*/
.xterm {
cursor: text;
position: relative;
user-select: none;
-ms-user-select: none;
-webkit-user-select: none;
}
.xterm.focus,
.xterm:focus {
outline: none;
}
.xterm .xterm-helpers {
position: absolute;
top: 0;
/**
* The z-index of the helpers must be higher than the canvases in order for
* IMEs to appear on top.
*/
z-index: 5;
}
.xterm .xterm-helper-textarea {
padding: 0;
border: 0;
margin: 0;
/* Move textarea out of the screen to the far left, so that the cursor is not visible */
position: absolute;
opacity: 0;
left: -9999em;
top: 0;
width: 0;
height: 0;
z-index: -5;
/** Prevent wrapping so the IME appears against the textarea at the correct position */
white-space: nowrap;
overflow: hidden;
resize: none;
}
.xterm .composition-view {
/* TODO: Composition position got messed up somewhere */
background: #000;
color: #FFF;
display: none;
position: absolute;
white-space: nowrap;
z-index: 1;
}
.xterm .composition-view.active {
display: block;
}
.xterm .xterm-viewport {
/* On OS X this is required in order for the scroll bar to appear fully opaque */
background-color: #000;
overflow-y: scroll;
cursor: default;
position: absolute;
right: 0;
left: 0;
top: 0;
bottom: 0;
}
.xterm .xterm-screen {
position: relative;
}
.xterm .xterm-screen canvas {
position: absolute;
left: 0;
top: 0;
}
.xterm .xterm-scroll-area {
visibility: hidden;
}
.xterm-char-measure-element {
display: inline-block;
visibility: hidden;
position: absolute;
top: 0;
left: -9999em;
line-height: normal;
}
.xterm.enable-mouse-events {
/* When mouse events are enabled (eg. tmux), revert to the standard pointer cursor */
cursor: default;
}
.xterm.xterm-cursor-pointer,
.xterm .xterm-cursor-pointer {
cursor: pointer;
}
.xterm.column-select.focus {
/* Column selection mode */
cursor: crosshair;
}
.xterm .xterm-accessibility,
.xterm .xterm-message {
position: absolute;
left: 0;
top: 0;
bottom: 0;
z-index: 10;
color: transparent;
}
.xterm .live-region {
position: absolute;
left: -9999px;
width: 1px;
height: 1px;
overflow: hidden;
}
.xterm-dim {
opacity: 0.5;
}
.xterm-underline-1 { text-decoration: underline; }
.xterm-underline-2 { text-decoration: double underline; }
.xterm-underline-3 { text-decoration: wavy underline; }
.xterm-underline-4 { text-decoration: dotted underline; }
.xterm-underline-5 { text-decoration: dashed underline; }
.xterm-strikethrough {
text-decoration: line-through;
}
.xterm-screen .xterm-decoration-container .xterm-decoration {
z-index: 6;
position: absolute;
}
.xterm-decoration-overview-ruler {
z-index: 7;
position: absolute;
top: 0;
right: 0;
pointer-events: none;
}
.xterm-decoration-top {
z-index: 2;
position: relative;
}

Binary file not shown.

Before

Width:  |  Height:  |  Size: 58 KiB

File diff suppressed because one or more lines are too long

View File

@@ -1,148 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<link rel="stylesheet" href="{{ config.static.url }}css/xterm.css" />
<link rel="stylesheet" href="https://fonts.googleapis.com/css?family=Roboto%20Mono"/>
<script src="{{ config.static.url }}js/textual.js"></script>
<style>
body {
background: #000000;
}
.dialog-container {
font-family: "Roboto Mono", menlo, monospace;
position: absolute;
width: 100vw;
height: 100vh;
display: flex;
align-items: center;
justify-content: center;
z-index: 10;
}
.shade {
font-family: "Roboto Mono", menlo, monospace;
position: absolute;
width: 100%;
height: 100%;
}
.intro {
font-family: "Roboto Mono", menlo, monospace;
width: 640px;
height: 240px;
z-index: 20;
color: rgba(255, 255, 255, 0.95);
background-color: #000000;
display: flex;
align-items: center;
text-align: center;
justify-content: center;
}
body.-first-byte .intro-dialog,
body.-first-byte .intro-dialog .shade {
font-family: "Roboto Mono", menlo, monospace;
opacity: 0;
transition: opacity 0.3s ease-out;
display: none;
}
body .textual-terminal {
font-family: "Roboto Mono", menlo, monospace;
opacity: 0;
transition: opacity 0.3s ease-out;
}
body.-first-byte .textual-terminal {
font-family: "Roboto Mono", menlo, monospace;
opacity: 1;
transition: opacity 0.3s ease-out;
}
body Button {
font-family: "Roboto Mono", menlo, monospace;
background-color: #000000;
color: rgba(255, 255, 255, 0.95);
display: block;
}
.closed-dialog {
font-family: "Roboto Mono", menlo, monospace;
opacity: 0;
display: none;
}
body.-closed .closed-dialog {
font-family: "Roboto Mono", menlo, monospace;
opacity: 1;
display: flex;
}
#start {
font-family: "Roboto Mono", menlo, monospace;
display: none;
}
#start.-delay {
font-family: "Roboto Mono", menlo, monospace;
display: flex;
}
footer {
position: fixed;
bottom: 0;
width: 100%;
text-align: center;
color: rgba(255, 255, 255, 0.5);
font-size: 12px;
padding: 10px 0;
background-color: #000000;
z-index: 5;
}
</style>
<script>
function getStartUrl() {
const url = new URL(window.location.href);
const params = new URLSearchParams(url.search);
params.delete("delay");
return url.pathname + "?" + params.toString();
}
async function refresh() {
const ping_url = document.body.dataset.pingurl;
if (ping_url) {
await fetch(ping_url, {
method: "GET",
mode: "no-cors",
});
}
window.location.href = getStartUrl();
}
</script>
</head>
<body data-pingurl="{{ ping_url }}">
<div class="dialog-container intro-dialog">
<div class="shade"></div>
<div class="intro">
<div>正加载命令行应用实例...<br><br>
确保使用现代浏览器并启用 JavaScript<br>
终端模拟器基于 XTerm.js<br>应用程序框架: Textual <br>
&copy; Wang Zhiyu 2024-2025, 保留此实例的所有权</div>
<button type="button" onClick="refresh()" id="start">启动新实例</button>
</div>
</div>
<div class="dialog-container closed-dialog">
<div class="shade"></div>
<div class="intro">
<div class="message">实例程序已终止</div>
<button type="button" onClick="refresh()">重新打开实例</button>
</div>
</div>
<div
id="terminal"
class="textual-terminal"
data-session-websocket-url="{{ app_websocket_url }}"
data-font-size="{{ font_size }}"
></div>
</body>
</html>