48 Commits

Author SHA1 Message Date
aa99aa7686 上下文管理器与进一步移植 2025-10-13 00:28:15 +08:00
6dac9d5a7c 初次移植 2025-10-12 00:01:52 +08:00
f19fcac6c4 准备重构 2025-10-11 22:05:25 +08:00
ebeb62f72d 性能改进 2025-10-07 23:30:16 +08:00
d71d8eb1ec 更新 2025-10-07 23:23:17 +08:00
4ac12479cc 修改 2025-10-07 03:00:42 +08:00
7fc53e4113 算法与交互改进, 以及同步原型 2025-10-07 02:49:26 +08:00
06c62e284d 更新版本号 2025-10-03 13:41:41 +08:00
d5d31eb5fe 改进 2025-10-02 16:49:27 +08:00
156f558a45 重构预缓存实用程序, 并保留分离式旧版本 2025-10-02 14:55:24 +08:00
e96832a60c 若干改进 2025-10-02 14:40:40 +08:00
e1a5de74ad 更新自述文件 2025-10-02 00:59:02 +08:00
90339be30e 更新自述文件 2025-10-02 00:58:30 +08:00
4da80d26a3 添加规划计算器实用程序, 并添加 shebang 和一些优化 2025-10-02 00:40:41 +08:00
cb062788a7 增加再学习机制和状态反馈 2025-10-01 12:59:41 +08:00
acfd179435 改进 2025-09-30 22:27:34 +08:00
d1b606782f 新建单元集界面改进 2025-09-30 00:38:48 +08:00
784cf41003 新建单元集界面 2025-09-30 00:13:54 +08:00
0f342c96ee 更新版本号 2025-09-29 23:55:44 +08:00
3022ada51a 更新准备界面 2025-09-29 23:53:07 +08:00
9dada59c56 加入时间戳修正配置 2025-09-29 23:30:51 +08:00
8683693cd1 将数据文件移至单独的 Nucleons 仓库维护 2025-09-22 12:34:01 +08:00
144d7cb83c 修补 2025-09-15 12:37:07 +08:00
44573624aa 问题修复 2025-09-14 23:29:01 +08:00
1b33bb8618 兼容旧版 Textual 2025-09-14 23:27:59 +08:00
d83025d818 兼容旧版 Textual 2025-09-14 23:26:49 +08:00
f7e93cf05f 兼容旧版 Textual 2025-09-14 23:25:06 +08:00
e30cefeb44 兼容旧版 Textual 2025-09-14 23:23:13 +08:00
7dc963d491 兼容旧版 Textual 2025-09-14 23:22:03 +08:00
3640d8a799 算法修复与实验性扩展支持 2025-09-14 23:19:24 +08:00
1ea34ab87a 更新版本号 2025-09-14 02:08:16 +08:00
2bedc686a5 修复空文件夹 2025-09-14 02:07:53 +08:00
19d0e32b6f 更新用户界面, 修复总复习 2025-09-14 01:54:33 +08:00
50a5b9b108 添加开源许可证 2025-09-11 00:19:34 +08:00
5a096e4b4f 更新版本号 2025-09-11 00:05:18 +08:00
65491117d3 问题修复与更新 2025-09-11 00:04:02 +08:00
c82eedde82 问题修复与更新 2025-09-11 00:03:20 +08:00
697e3b2b8f 更新配置文件 2025-09-10 23:51:28 +08:00
11eff7da43 更新说明 2025-09-10 23:29:53 +08:00
c93bcdd489 删除无用文件 2025-09-10 23:27:34 +08:00
bb99b0a0b7 增加软件管理实用程序与音频缓存机制修复 2025-09-10 23:27:02 +08:00
6293b69ef0 改进 2025-09-08 13:59:09 +08:00
afb7252f71 若干改进 2025-09-08 13:44:14 +08:00
5e96fc8138 更新文件树 2025-08-30 22:03:34 +08:00
e64d1711d0 更新 README 2025-08-30 00:06:16 +08:00
beeb2fd318 改进 🍰 2025-08-28 13:08:37 +08:00
fc58f61dfe 新增源文件 2025-08-23 23:02:30 +08:00
e11e7e781b 添加更多源文件 2025-08-23 13:44:03 +08:00
107 changed files with 789 additions and 2987 deletions

217
.gitignore vendored
View File

@@ -1,6 +1,215 @@
.vscode
# Project specific additions
.devflag
.vscode/
.directory
__pycache__/
scripts/
.idea
cache
.idea/
cache/
nucleon/test.toml
electron/test.toml
*.egg-info/
build/
dist/
# Project specific directories
config/
data/cache/
data/electrion/
data/nucleon/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly used for packaging.
#poetry.lock
#poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
.idea/
# Audio cache and temporary files
*.mp3
*.wav
*.ogg
*.tmp
# LLM cache files
*.cache
*.jsonl
# Log files
*.log
logs/
# OS generated files
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Windows
Thumbs.db
ehthumbs.db
Desktop.ini
# Linux
*~
# VSCode
.vscode/
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
*.code-workspace
# Temporary files
*.tmp
*.temp

View File

@@ -1,2 +0,0 @@
def playsound(p):
print(p)

View File

@@ -1,7 +0,0 @@
# 贡献指南
## 使用 Nuitka 静态编译
运行
```bash
nuitka --clang --jobs=6 --standalone --onefile main.py
```

View File

@@ -1,71 +0,0 @@
# 潜进 (HeurAMS) - 启发式辅助记忆程序
> 形人而我无形,**则我专而敌分**
## 概述
"潜进" (HeurAMS) 是为习题册, 古诗词, 及其他问答/记忆/理解型知识设计的辅助记忆软件, 提供动态规划的优化记忆方案
## 技术集成与特性
### 间隔迭代算法
> 许多出版物都广泛讨论了不同重复间隔对学习效果的影响。特别是,间隔效应被认为是一种普遍现象。间隔效应是指,如果重复的间隔是分散/稀疏的,而不是集中重复,那么学习任务的表现会更好。因此,有观点提出,学习中使用的最佳重复间隔是**最长的、但不会导致遗忘的间隔**。
- 采用经实证的 SM-2 间隔迭代算法, 此算法亦用作 Anki 闪卡记忆软件的默认闪卡调度器
> 计划: 将添加 FSRS 算法 (Anki 的新可选闪卡调度器) 与一种 SM-15 变体算法作为后续替代
> 参考 https://github.com/slaypni/SM-15
> 使用 SM-15 的变体:
> SM-2 后续算法并非完全开放, 故使用一种基于 SM-15 描述实现的变体算法
- 动态规划每个记忆单元的记忆间隔时间表
- 动态跟踪记忆反馈数据,优化长期记忆保留率与稳定性
### 学习进程优化
- 逐字解析:支持逐字详细释义解析
- 语法分析:接入生成式人工智能, 支持古文结构交互式解析
- 自然语音:集成微软神经网络文本转语音 (TTS) 技术
### 现代用户界面
- 响应式 Textual 框架构建的跨平台 TUI 界面
- 支持触屏/鼠标/键盘多操作模式
- 简洁直观的复习流程设计
## 屏幕截图
> 单击图片以放大
<img src="./readme_src/img1.png" alt="img1" style="zoom: 33%;" />
<img src="./readme_src/img2.png" alt="img2" style="zoom:33%;" />
<img src="./readme_src/img3.png" alt="img3" style="zoom:33%;" />
<img src="./readme_src/img4.png" alt="img4" style="zoom:33%;" />
## 技术架构
> 有关技术与实现的细节, 请参阅 CONTRIBUTING.md
> 提交拉取请求以参与到此开放源代码项目
``` mermaid
graph TD
subgraph 后端
A[SM-2 算法] --> B[间隔迭代算法]
B --> C[迭代记忆参数]
end
subgraph 用户界面
D[展示模块] --> E[用户界面]
E --> F[进度追踪面板]
end
subgraph 外部服务
G[LLM]
H[TTS]
end
C --> D
F -->|用户数据| C
D --> G
D --> H
```
## 系统要求
- 平台支持Windows / macOS / Linux / Android (需要 Termux 或 Linux) (终端或浏览器)
- 网络连接:可预缓存语音文件, 需联网使用大模型服务功能

View File

@@ -1,297 +0,0 @@
from textual.app import App, ComposeResult
from textual.events import Event
from textual.widgets import (
Collapsible,
Header,
Footer,
Markdown,
ListView,
ListItem,
Label,
Static,
Button,
)
from textual.containers import Container, Horizontal, Center
from textual.screen import Screen
from textual.widget import Widget
import uuid
from typing import Tuple, Dict
import particles as pt
import puzzles as pz
import re
import random
import copy
class Composition:
def __init__(
self,
screen: Screen,
reactor,
atom: Tuple[pt.Electron, pt.Nucleon, Dict] = pt.Atom.placeholder(),
):
self.screen = screen
self.atom = atom
from reactor import Reactor
self.reactor: Reactor = reactor
self.reg = dict()
def regid(self, id_):
self.reg[id_] = id_ + str(uuid.uuid4())
return self.reg[id_]
def getid(self, id_):
if id_ not in self.reg.keys():
return "None"
return self.reg[id_]
def recid(self, id_):
return id_[:-36]
def compose(self):
yield Label("示例标签", id="testlabel")
yield Button("示例按钮", id="testbtn")
def handler(self, event, type_):
return 1
class Finished(Composition):
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
super().__init__(screen, reactor, atom)
def compose(self):
yield Label("本次记忆进程结束", id=self.regid("msg"))
class Placeholder(Composition):
def __init__(self, screen: Screen):
self.screen = screen
def compose(self):
yield Label("示例标签", id="testlabel")
yield Button("示例按钮", id="testbtn", classes="choice")
def handler(self, event, type_):
self.screen.query_one("#testlabel", Label).update("hi")
class Recognition(Composition):
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
super().__init__(screen, reactor, atom)
def compose(self):
with Center():
yield Static(f"[dim]{self.atom[1]['translation']}[/]")
yield Label(f"")
s = str(self.atom[1]["content"])
replace_dict = {
", ": ",",
". ": ".",
"; ": ";",
": ": ":",
"/,": ",",
"./": ".",
"/;": ";",
";/": ";",
":/": ":",
}
for old, new in replace_dict.items():
s = s.replace(old, new)
result = re.split(r"(?<=[,;:|])", s.replace("/", " "))
for i in result:
with Center():
yield Label(
f"[b][b]{i.replace('/', ' ')}[/][/]",
id=self.regid("sentence" + str(hash(i))),
)
for i in self.atom[2]["testdata"]["additional_inf"]:
if self.atom[1][i]:
if isinstance(self.atom[1][i], list):
for j in self.atom[1][i]:
yield Markdown(f"### {self.atom[2]['keydata'][i]}: {j}")
continue
if isinstance(self.atom[1][i], Dict):
t = ""
for j, k in self.atom[1][i].items(): # type: ignore
# 弱智的 Pylance 类型推导
t += f"> **{j}**: {k} \n"
yield Markdown(t, id=self.regid("tran"))
with Center():
yield Button("我已知晓", id=self.regid("ok"))
def handler(self, event, type_):
if type_ == "button":
if event.button.id == self.getid("ok"):
self.reactor.report(self.atom, 5)
return 0
return -1
class BasicEvaluation(Composition):
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
super().__init__(screen, reactor, atom)
def compose(self):
yield Label(self.atom[1]["content"], id="sentence")
with Container(id="button_container"):
btn = {}
btn["5"] = Button(
"完美回想", variant="success", id=self.regid("feedback5"), classes="choice"
)
btn["4"] = Button(
"犹豫后正确", variant="success", id=self.regid("feedback4"), classes="choice"
)
btn["3"] = Button(
"困难地正确", variant="warning", id=self.regid("feedback3"), classes="choice"
)
btn["2"] = Button(
"错误但熟悉", variant="warning", id=self.regid("feedback2"), classes="choice"
)
btn["1"] = Button(
"错误且不熟", variant="error", id=self.regid("feedback1"), classes="choice"
)
btn["0"] = Button(
"完全空白", variant="error", id=self.regid("feedback0"), classes="choice"
)
yield Horizontal(btn["5"], btn["4"])
yield Horizontal(btn["3"], btn["2"])
yield Horizontal(btn["1"], btn["0"])
def handler(self, event, type_):
if "feedback" in event.button.id:
assess = int(self.recid(event.button.id)[8:9])
ret = self.reactor.report(self.atom, assess)
return ret
class FillBlank(Composition):
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
super().__init__(screen, reactor, atom)
self.inputlist = []
self.hashtable = {}
self._work()
def _work(self):
self.puzzle = pz.BlankPuzzle(self.atom[1]["content"], 4)
self.puzzle.refresh()
self.ans = copy.copy(self.puzzle.answer)
random.shuffle(self.ans)
def compose(self):
yield Label(self.puzzle.wording, id=self.regid("sentence"))
yield Label(f"当前输入: {self.inputlist}", id=self.regid("inputpreview"))
for i in self.ans:
self.hashtable[str(hash(i))] = i
yield Button(i, id=self.regid(f"select{hash(i)}"))
yield Button("退格", id=self.regid(f"delete"))
def handler(self, event, type_):
# TODO: 改动:在线错误纠正
if type_ == "button":
if self.recid(event.button.id) == "delete":
if len(self.inputlist) > 0:
self.inputlist.pop()
else:
return 1
else:
self.inputlist.append(self.hashtable[self.recid(event.button.id)[6:]])
if len(self.inputlist) < len(self.puzzle.answer):
return 1
else:
if self.inputlist == self.puzzle.answer:
self.reactor.report(self.atom, 4)
return 0
else:
self.inputlist = []
self.reactor.report(self.atom, 2)
return 1
class DrawCard(Composition):
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
super().__init__(screen, reactor, atom)
self.inputlist = []
self.hashtable = {}
self._work()
def _work(self):
self.puzzle = pz.SelectionPuzzle(self.atom[1]["keyword_note"], [], 2, "选择正确词义: ") # type: ignore
self.puzzle.refresh()
def compose(self):
yield Label(self.atom[1].content.replace("/",""), id=self.regid("sentence"))
yield Label(self.puzzle.wording[len(self.inputlist)], id=self.regid("puzzle"))
yield Label(f"当前输入: {self.inputlist}", id=self.regid("inputpreview"))
for i in self.puzzle.options[len(self.inputlist)]:
self.hashtable[str(hash(i))] = i
yield Button(i, id=self.regid(f"select{hash(i)}"))
yield Button("退格", id=self.regid(f"delete"))
def handler(self, event, type_):
if type_ == "button":
if self.recid(event.button.id) == "delete":
if len(self.inputlist) > 0:
self.inputlist.pop()
else:
return 1
else:
self.inputlist.append(self.hashtable[self.recid(event.button.id)[6:]])
if len(self.inputlist) < len(self.puzzle.answer):
return 1
else:
if self.inputlist == self.puzzle.answer:
self.reactor.report(self.atom, 4)
return 0
else:
self.inputlist = []
self.reactor.report(self.atom, 2)
return 1
registry = {
"sample": Composition,
"recognition": Recognition,
"fill_blank_test": FillBlank,
"draw_card_test": DrawCard,
"basic_evaluation": BasicEvaluation,
}
class TestScreen(Screen):
def __init__(self):
super().__init__(name=None, id=None, classes=None)
self.comp = Recognition(self, None, pt.Atom.advanced_placeholder())
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield from self.comp.compose()
yield Footer()
def on_mount(self) -> None:
pass
def on_button_pressed(self, event: Event) -> None:
self.comp.handler(event, "button")
def action_quit_app(self) -> None:
self.app.exit()
class AppLauncher(App):
CSS_PATH = "styles.css"
TITLE = "测试布局"
BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")]
SCREENS = {
"testscreen": TestScreen,
}
def on_mount(self) -> None:
self.action_toggle_dark()
self.push_screen("testscreen")
if __name__ == "__main__":
app = AppLauncher()
app.run()

View File

@@ -1,14 +0,0 @@
# [调试] 将更改保存到文件
save = 1
# [调试] 覆写时间
time_override = -1
# [调试] 一键通过
quick_pass = 0
# 对于每个项目的新记忆核子数量
tasked_number = 8
# 竖屏适配 (未完成)
mobile_mode = 1

View File

@@ -1,40 +0,0 @@
@echo off
echo "HeurAMS 环境安装脚本"
echo "正在检测系统中是否安装 Python 3.x..."
rem 检查 Python 3 是否存在
where python >nul 2>nul
if %errorlevel% neq 0 (
echo "错误: 未检测到 Python. 请确保 Python 已添加到系统 PATH 中,然后再次运行此脚本。"
exit /b 1
)
rem 检查 Python 版本是否为 3.x
for /f "tokens=*" %%i in ('python -c "import sys; print(f'{sys.version_info.major}')"') do set PYTHON_MAJOR_VERSION=%%i
if "%PYTHON_MAJOR_VERSION%"=="3" (
for /f "tokens=*" %%i in ('python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')"') do set PYTHON_VERSION=%%i
echo "检测到 Python 3 已安装, 版本为: %PYTHON_VERSION%"
) else (
echo "错误: 未检测到 Python 3. 请先安装 Python 3, 然后再次运行此脚本."
exit /b 1
)
echo "---"
echo "正在安装 requirements.txt 中的依赖..."
rem 检查 requirements.txt 文件是否存在
if exist "requirements.txt" (
python -m pip install -r requirements.txt
if %errorlevel% equ 0 (
echo "依赖安装成功."
) else (
echo "错误: 依赖安装失败. 请检查 requirements.txt 文件或网络连接."
exit /b 1
)
) else (
echo "警告: 未找到 requirements.txt 文件. 跳过依赖安装."
)
echo "---"
echo "HeurAMS 的环境依赖已安装"
pause

View File

@@ -1,28 +0,0 @@
#!/bin/bash
echo "HeurAMS 环境安装脚本"
echo "正在检测系统中是否安装 Python 3.x..."
if command -v python3 &>/dev/null; then
PYTHON_VERSION=$(python3 -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
echo "检测到 Python 3 已安装, 版本为: ${PYTHON_VERSION}"
else
echo "错误: 未检测到 Python 3. 请先安装 Python 3, 然后再次运行此脚本. "
exit 1 # 退出脚本, 因为 Python 3 是必需的
fi
echo "---"
echo "正在安装 requirements.txt 中的依赖..."
if [ -f "requirements.txt" ]; then
python3 -m pip install -r requirements.txt
if [ $? -eq 0 ]; then
echo "依赖安装成功. "
else
echo "错误: 依赖安装失败. 请检查 requirements.txt 文件或网络连接. "
exit 1
fi
else
echo "警告: 未找到 requirements.txt 文件. 跳过依赖安装. "
fi
echo "---"
echo "HeurAMS 的环境依赖已安装"

259
main.py
View File

@@ -1,259 +0,0 @@
from textual.app import App, ComposeResult
from textual.widgets import (
Header,
Footer,
ListView,
ProgressBar,
DirectoryTree,
ListItem,
Label,
Static,
Button,
)
from textual.containers import Container, Horizontal, Center
from textual.screen import Screen
import pathlib
import threading
import edge_tts as tts
from playsound import playsound
import particles as pt
from reactor import Reactor, Apparatus
import auxiliary as aux
import compositions as compo
import builtins
ver = "0.3.1"
config = aux.ConfigFile("config.toml")
class MemScreen(Screen):
BINDINGS = [
("d", "toggle_dark", "改变色调"),
("q", "pop_screen", "返回主菜单"),
("v", "play_voice", "朗读"),
]
if config.get("quick_pass"):
BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
btn = dict()
def __init__(
self,
nucleon_file: pt.NucleonUnion,
electron_file: pt.ElectronUnion,
tasked_num,
):
super().__init__(name=None, id=None, classes=None)
self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
self.stage = 1
self.stage += self.reactor.set_round_templated(self.stage)
self.reactor.forward()
self.compo = next(self.reactor.current_appar)
def compose(self) -> ComposeResult:
if type(self.compo).__name__ == "Recognition":
self.action_play_voice()
yield Header(show_clock=True)
with Center():
yield Static(
f"{len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
)
yield from self.compo.compose()
yield Footer()
def on_mount(self):
pass
def on_button_pressed(self, event):
ret = self.compo.handler(event, "button")
self._forward_judge(ret)
def _forward_judge(self, ret):
if ret == -1:
return
if ret == 0:
try:
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
except StopIteration:
nxt = self.reactor.forward(1)
try:
self.compo = next(self.reactor.current_appar)
except:
pass
if nxt == -1:
if self.reactor.round_set == 0:
if self.stage == 4:
if config.get("save"):
self.reactor.save()
self.compo = compo.Finished(
self, None, pt.Atom.placeholder()
)
self.refresh_ui()
else:
self.reactor.set_round_templated(self.stage)
self.reactor.forward(1)
self.stage += 1
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
return
return
else:
self.refresh_ui()
return
if ret == 1:
self.refresh_ui()
return
def refresh_ui(self):
self.call_later(self.recompose)
print(type(self.compo).__name__)
def action_play_voice(self):
print("VOICE")
def play():
cache_dir = pathlib.Path(f"./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{self.reactor.current_atom[1].content.replace('/','')}.wav"
if not cache.exists():
communicate = tts.Communicate(
self.reactor.current_atom[1].content.replace("/", ""),
"zh-CN-YunjianNeural",
)
communicate.save_sync(
f"./cache/voice/{self.reactor.current_atom[1].content.replace('/','')}.wav"
)
playsound(str(cache))
threading.Thread(target=play).start()
def action_quick_pass(self):
self.reactor.report(self.reactor.current_atom, 5)
self._forward_judge(0)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()
class PreparationScreen(Screen):
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
def __init__(
self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion
) -> None:
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="learning_screen_container"):
yield Label(f"记忆项目: [b]{self.nucleon_file.name}[/b]\n")
yield Label(
f"核子文件对象: ./nucleon/[b]{self.nucleon_file.name}[/b].toml"
)
yield Label(
f"电子文件对象: ./electron/[b]{self.electron_file.name}[/b].toml"
)
yield Label(f"核子数量:{len(self.nucleon_file)}")
yield Button(
"开始记忆",
id="start_memorizing_button",
variant="primary",
classes="start-button",
)
yield Static(f"\n全文如下:\n")
yield Static(self._get_full_content().replace("/", ""), classes="full")
yield Footer()
def _get_full_content(self):
content = ""
for i in self.nucleon_file.nucleons:
content += i["content"]
return content
def action_go_back(self):
self.app.pop_screen()
def action_quit_app(self):
self.app.exit()
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "start_memorizing_button":
newscr = MemScreen(
self.nucleon_file, self.electron_file, config.get("tasked_number", 8)
)
self.app.push_screen(newscr)
class FileSelectorScreen(Screen):
global ver
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
yield Container(
Label(f'欢迎使用 "潜进" 辅助记忆软件, 版本 {ver}', classes="title-label"),
Label("选择要学习的文件:", classes="title-label"),
ListView(id="file-list", classes="file-list-view"),
)
yield Footer()
def on_mount(self) -> None:
file_list_widget = self.query_one("#file-list", ListView)
nucleon_path = pathlib.Path("./nucleon")
nucleon_files = sorted(
[f.name for f in nucleon_path.iterdir() if f.suffix == ".toml"]
)
if nucleon_files:
for filename in nucleon_files:
file_list_widget.append(ListItem(Label(filename)))
else:
file_list_widget.append(
ListItem(Static("在 ./nucleon/ 中未找到任何核子文件. 请放置文件后重启应用."))
)
file_list_widget.disabled = True
def on_list_view_selected(self, event: ListView.Selected) -> None:
if not isinstance(event.item, ListItem):
return
selected_label = event.item.query_one(Label)
if "未找到任何 .toml 文件" in str(selected_label.renderable):
return
selected_filename = str(selected_label.renderable)
nucleon_file = pt.NucleonUnion(
pathlib.Path("./nucleon") / selected_filename
)
electron_file_path = pathlib.Path("./electron") / selected_filename
if electron_file_path.exists():
pass
else:
electron_file_path.touch()
electron_file = pt.ElectronUnion(
pathlib.Path("./electron") / selected_filename
)
self.app.push_screen(PreparationScreen(nucleon_file, electron_file))
def action_quit_app(self) -> None:
self.app.exit()
class AppLauncher(App):
CSS_PATH = "styles.css"
TITLE = "潜进 - 辅助记忆程序"
BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")]
SCREENS = {
"file_selection_screen": FileSelectorScreen,
}
def on_mount(self) -> None:
self.push_screen("file_selection_screen")
if __name__ == "__main__":
app = AppLauncher()
app.run()

View File

@@ -1,154 +0,0 @@
# 散列表的键翻译
["keydata"]
note = "笔记"
keyword_note = "关键词翻译"
translation = "语句翻译"
# 测试项目元数据
["testdata"]
# 记忆时显示的额外信息
additional_inf = ["translation","keyword_note", "note"]
# 填空测试, content 指代键名
fill_blank_test = {"from"=["content"], "hint"=["translation"]}
# 选择题测试
draw_card_test = {"from"=["keyword_note"]}
["臣/密/言: /臣/以/险衅/, 夙/遭/闵凶./"]
note = []
translation = "臣子李密陈言: 我因命运不好, 小时候遭遇到了不幸"
keyword_note = {"险衅"="凶险祸患(这里指命运不好)", "夙"="早时, 这里指年幼的时候", "闵"="通'悯', 指可忧患的事", "凶"="不幸, 指丧父"}
["生孩/六月/, 慈父/见背/; /行年/四岁/, 舅/夺/母志./"]
note = []
translation = "刚出生六个月, 我慈爱的父亲就不幸去世了。经过了四年, 舅父逼母亲改嫁"
keyword_note = {"见背"="死的委婉说法", "行年"="经历的年岁", "母志"="母亲守节之志(改嫁的委婉说法)"}
["祖母/刘/愍/臣/孤弱/, 躬亲/抚养./"]
note = []
translation = "我的祖母刘氏, 怜悯我从小丧父, 便亲自对我加以抚养"
keyword_note = {"愍"="怜悯", "躬亲"="亲身"}
["臣/少/多/疾病/, 九岁/不行/, 零丁/孤苦/, 至于/成立./"]
note = []
translation = "臣小的时候经常生病, 九岁时还不会行走。孤独无靠, 一直到成人自立"
keyword_note = {"成立"="成人自立"}
["既/无/伯叔/, 终/鲜/兄弟/, 门/衰/祚/薄/, 晚/有/儿息./"]
note = []
translation = "既没有叔叔伯伯, 又没什么兄弟, 门庭衰微而福分浅薄, 很晚才有儿子"
keyword_note = {"鲜"="少, 这里指'无'", "祚薄"="福分浅薄", "儿息"="亲生子女"}
["外/无/期功/强近/之亲/, 内/无/应门/五尺/之僮/, 茕茕/孑立/, 形影/相吊./"]
note = []
translation = "在外面没有比较亲近的亲戚, 在家里又没有照应门户的童仆。生活孤单没有依靠, 每天只有自己的身体和影子相互安慰"
keyword_note = {"期功"="指关系较近的亲属", "茕茕孑立"="孤单无依靠的样子", "吊"="安慰"}
["而/刘/夙/婴/疾病/, 常/在/床蓐/, 臣/侍/汤药/, 未曾/废离./"]
note = []
translation = "但祖母又早被疾病缠绕, 常年卧床不起, 我侍奉她吃饭喝药, 从来就没有停止侍奉而离开她"
keyword_note = {"婴"="被...缠绕", "蓐"="通'褥', 床垫", "废"="停止服侍", "离"="离开"}
["逮/奉/圣朝/, 沐浴/清化./"]
note = []
translation = "到了晋朝建立, 我蒙受着清明的政治教化"
keyword_note = {"逮"="及, 到", "奉"="承奉", "圣朝"="指当时的晋朝", "沐浴清化"="蒙受清平教化"}
["前/太守/臣/逵/察/臣/孝廉/; /后/刺史/臣/荣/举/臣/秀才./"]
note = []
translation = "前任太守逵, 考察后推举臣下为孝廉, 后任刺史荣又推举臣下为优秀人才"
keyword_note = {"察"="考察和推举", "孝廉"="孝顺, 品性纯洁", "举"="推举", "秀才"="优秀人才"}
["臣/以/供养/无主/, 辞/不赴命./"]
note = []
translation = "臣下因为供奉赡养祖母的事无人承担, 辞谢不接受任命"
keyword_note = {"无主"="无人承担"}
["诏书/特下/, 拜/臣/郎中/, 寻/蒙/国恩/, 除/臣/洗马./"]
note = []
translation = "朝廷又特地下了诏书, 任命我为郎中, 不久又蒙受国家恩命, 任命我为太子洗马"
keyword_note = {"拜"="授予官职", "郎中"="尚书省的属官", "寻"="不久", "除"="拜官受职", "洗马"="太子的属官"}
["猥/以/微贱/, 当/侍/东宫/, 非/臣/陨首/所能/上报./"]
note = []
translation = "像我这样出身微贱地位卑下的人, 担当侍奉太子的职务, 这实在不是我杀身捐躯所能报答朝廷的"
keyword_note = {"猥"="谦词", "微贱"="卑微低贱", "东宫"="太子居处", "陨首"="杀身"}
["臣/具/以表/闻/, 辞/不就职./"]
note = []
translation = "我将以上苦衷上表报告, 加以推辞不去就职"
keyword_note = {"具"="详细", "闻"="使...知道"}
["诏书/切峻/, 责/臣/逋慢/; /郡县/逼迫/, 催/臣/上道/; /州司/临门/, 急于/星火./"]
note = []
translation = "但是诏书急切严峻, 责备我逃避命令, 有意拖延, 态度傲慢。郡县长官催促我立刻上路; 州官登门督促, 比流星坠落还要急迫"
keyword_note = {"切峻"="急切而严厉", "逋慢"="逃避怠慢", "星火"="流星的光, 喻急迫"}
["臣/欲/奉诏/奔驰/, 则/刘/病/日笃/, 欲/苟/顺/私情/, 则/告诉/不许./"]
note = []
translation = "我很想遵从皇上的旨意赴京就职, 但祖母刘氏的病却一天比一天重; 想要姑且顺从自己的私情, 但报告申诉不被允许"
keyword_note = {"日笃"="病情日益加重", "苟"="姑且", "告诉"="报告申诉"}
["臣/之/进退/, 实为/狼狈./"]
note = []
translation = "我是进退两难, 十分狼狈"
keyword_note = {"狼狈"="进退两难的样子"}
["伏惟/圣朝/以/孝/治/天下/, 凡/在/故老/, 犹/蒙/矜育/, 况/臣/孤苦/, 特为/尤甚./"]
note = []
translation = "我俯伏思量晋朝是用孝道来治理天下的, 凡是年老而德高的旧臣, 尚且还受到怜悯养育, 何况我的孤苦程度更为严重呢"
keyword_note = {"伏惟"="下对上的敬辞", "故老"="年老德高的旧臣", "矜育"="怜悯养育"}
["且/臣/少/仕/伪朝/, 历职/郎署/, 本图/宦达/, 不矜/名节./"]
note = []
translation = "况且我年轻的时候曾经做过蜀汉的官, 担任过郎官职务, 本来就希望做官显达, 并不顾惜名声节操"
keyword_note = {"伪朝"="对前朝的蔑称", "历职"="连续任职", "郎署"="尚书郎的官衙", "宦达"="官场上显达"}
["今/臣/亡国/贱俘/, 至微/至陋/, 过/蒙/拔擢/, 宠命/优渥/, 岂敢/盘桓/, 有所/希冀!/"]
note = []
translation = "现在我是一个低贱的亡国俘虏, 十分卑微浅陋, 受到过分提拔, 恩宠优厚, 怎敢犹豫不决而有非分的企求呢"
keyword_note = {"拔擢"="提拔", "优渥"="优厚", "盘桓"="徘徊不前", "希冀"="非分的企求"}
["但/以/刘/日薄/西山/, 气息/奄奄/, 人命/危浅/, 朝不/虑夕./"]
note = []
translation = "只是因为祖母刘氏寿命即将终了, 气息微弱, 生命垂危, 早上不能想到晚上怎样"
keyword_note = {"日薄西山"="太阳接近西山, 喻人寿命将终", "危浅"="生命垂危"}
["臣/无/祖母/, 无以/至今日/, 祖母/无/臣/, 无以/终余年./"]
note = []
translation = "臣下我如果没有祖母, 就没有今天的样子; 祖母如果没有我的照料, 也无法度过她的余生"
keyword_note = {"终余年"="度过余生"}
["母孙/二人/, 更相/为命/, 是以/区区/不能/废远./"]
note = []
translation = "我们祖孙二人, 互相依靠而维持生命, 因此我的内心不愿废止奉养, 远离祖母"
keyword_note = {"更相"="相互", "区区"="自己的私情", "废远"="废止奉养而远离"}
["臣/密/今年/四十/有四/, 祖母/今年/九十/有六/, 是/臣/尽节/于/陛下/之日/长/, 报养/刘/之日/短./"]
note = []
translation = "臣下我现在的年龄四十四岁了, 祖母现在的年龄九十六岁了, 臣下我在陛下面前尽忠尽节的日子还长着呢, 而在祖母刘氏面前尽孝尽心的日子已经不多了"
keyword_note = {"有"="又", "尽节"="尽忠节"}
["乌鸟/私情/, 愿/乞/终养./"]
note = []
translation = "我怀着乌鸦反哺的私情, 乞求能够准许我完成对祖母养老送终的心愿"
keyword_note = {"乌鸟私情"="乌鸦反哺之情, 喻孝心", "终养"="养老至终"}
["臣/之/辛苦/, 非独/蜀之/人士/及/二州/牧伯/所见/明知/, 皇天/后土/, 实所/共鉴./"]
note = []
translation = "我的辛酸苦楚, 并不仅仅被蜀地的百姓及益州、梁州的长官所亲眼目睹、内心明白, 连天地神明也都看得清清楚楚"
keyword_note = {"辛苦"="辛酸苦楚", "牧伯"="州郡长官", "皇天后土"="天地神明", "鉴"="明察"}
["愿/陛下/矜悯/愚诚/, 听/臣/微志/, 庶/刘/侥幸/, 保/卒/余年./"]
note = []
translation = "希望陛下能怜悯我愚昧诚心, 请允许我完成臣下一点小小的心愿, 使祖母刘氏能够侥幸地保全她的余生"
keyword_note = {"矜悯"="怜悯", "听"="准许", "庶"="或许(表希望)", "卒余年"="终老"}
["臣/生/当/陨首/, 死/当/结草./"]
note = []
translation = "我活着应当杀身报效朝廷, 死了也要结草衔环来报答陛下的恩情"
keyword_note = {"陨首"="掉脑袋, 指献出生命", "结草"="死后报恩的典故"}
["臣/不胜/犬马/怖惧/之情/, 谨/拜表/以闻./"]
note = []
translation = "臣下我怀着牛马一样不胜恐惧的心情, 恭敬地呈上此表来使陛下知道这件事"
keyword_note = {"不胜"="禁不住", "犬马怖惧"="臣子谦卑的自比", "闻"="使...知道"}

View File

@@ -1,52 +0,0 @@
# 音频预缓存实用程序, 独立于主程序之外, 但依赖 particles 组件
import particles as pt
import edge_tts as tts
from pathlib import Path
import shutil
def precache(text: str):
"""预缓存单个文本的音频"""
cache_dir = Path("./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{text}.wav"
if not cache.exists():
communicate = tts.Communicate(text, "zh-CN-YunjianNeural")
communicate.save_sync(f"./cache/voice/{text}.wav")
def proc_file(path: Path):
"""处理单个文件"""
nu = pt.NucleonUnion(path)
c = 0
for i in nu.nucleons:
c += 1
print(f"预缓存 [{nu.name}] ({c}/{len(nu)}): {i['content'].replace('/', '')}")
precache(i['content'].replace('/', ''))
def walk(path_str: str):
"""遍历目录处理所有文件"""
path = Path(path_str)
print(f"正在遍历目录: {path}")
for item in path.iterdir():
if item.is_file() and item.suffix == ".toml":
print(f"正预缓存文件: {item.name}")
proc_file(item)
elif item.is_dir():
print(f"进入目录: {item.name}")
if __name__ == "__main__":
print("音频预缓存实用程序")
print("A: 全部缓存")
print("C: 清空缓存")
choice = input("输入选项 $ ").upper()
if choice == "A":
walk("./nucleon")
elif choice == "C":
shutil.rmtree("./cache/voice", ignore_errors=True)
print("缓存已清空")

View File

@@ -1,121 +0,0 @@
import random
class Puzzle:
pass
class BlankPuzzle(Puzzle):
"""填空题谜题生成器
Args:
text: 原始字符串(需要 "/" 分割句子, 末尾应有 "/")
min_denominator: 最小概率倒数(如占所有可生成填空数的 1/7 中的 7, 若期望值小于 1, 则取 1)
"""
def __init__(self, text: str, min_denominator: int):
self.text = text
self.min_denominator = min_denominator
self.wording = "填空题 - 尚未刷新谜题"
self.answer = ["填空题 - 尚未刷新谜题"]
def refresh(self): # 刷新谜题
placeholder = "___SLASH___"
tmp_text = self.text.replace("/", placeholder)
words = tmp_text.split(placeholder)
if not words:
return
words = [word for word in words if word]
num_blanks = min(max(1, len(words) // self.min_denominator), len(words))
indices_to_blank = random.sample(range(len(words)), num_blanks)
indices_to_blank.sort()
blanked_words = list(words)
answer = list()
for index in indices_to_blank:
blanked_words[index] = "__" * len(words[index])
answer.append(words[index])
self.answer = answer
self.wording = "".join(blanked_words)
def __str__(self):
return f"{self.wording}\n{str(self.answer)}"
class SelectionPuzzle(Puzzle):
"""选择题谜题生成器
Args:
mapping: 正确选项映射 {问题: 答案}
jammer: 干扰项列表
max_riddles_num: 最大生成谜题数 (默认2个)
prefix: 问题前缀
"""
def __init__(
self,
mapping: dict,
jammer: list,
max_riddles_num: int = 2,
prefix: str = ""
):
self.prefix = prefix
self.mapping = mapping
self.jammer = list(set(jammer + list(mapping.values())))
while len(self.jammer) < 4:
self.jammer.append(" ")
self.max_riddles_num = max(1, min(max_riddles_num, 5))
self.wording = "选择题 - 尚未刷新谜题"
self.answer = ["选择题 - 尚未刷新谜题"]
self.options = []
def refresh(self):
"""刷新谜题,根据题目数量生成适当数量的谜题"""
if not self.mapping:
self.wording = "无可用题目"
self.answer = ["无答案"]
self.options = []
return
num_questions = min(self.max_riddles_num, len(self.mapping))
questions = random.sample(list(self.mapping.items()), num_questions)
puzzles = []
answers = []
all_options = []
for question, correct_answer in questions:
options = [correct_answer]
available_jammers = [
j for j in self.jammer if j != correct_answer
]
if len(available_jammers) >= 3:
selected_jammers = random.sample(available_jammers, 3)
else:
selected_jammers = random.choices(available_jammers, k=3)
options.extend(selected_jammers)
random.shuffle(options)
puzzles.append(question)
answers.append(correct_answer)
all_options.append(options)
question_texts = []
for i, puzzle in enumerate(puzzles):
question_texts.append(f"{self.prefix}:\n {i+1}. {puzzle}")
self.wording = question_texts
self.answer = answers
self.options = all_options
def __str__(self):
return f"{self.wording}\n正确答案: {', '.join(self.answer)}"
if __name__ == "__main__":
puz = SelectionPuzzle(
{"1+1": "2", "1+2": "3", "1+3": "4"},
["2", "5", "0"],
3,
'求值: '
)
puz.refresh()
print(puz.wording)
print(puz.answer)
print(puz.options)

11
pyproject.toml Normal file
View File

@@ -0,0 +1,11 @@
[build-system]
requires = ["setuptools>=45", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "heurams"
version = "0.4.0"
description = "Heuristic Assisted Memory Scheduler"
[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -1,150 +0,0 @@
import typing
import particles as pt
import pathlib
import auxiliary as aux
import compositions as comps
import random
#from pprint import pprint as print # debug
class Apparatus():
"""反应器对象, 决策一个原子的不同记忆方式, 并反馈到布局"""
def __init__(self, screen, reactor, atom):
self.electron: pt.Electron = atom[0]
self.nucleon: pt.Nucleon = atom[1]
self.positron: dict = atom[2]
self.testdata = self.positron["testdata"]
self.procession: typing.List[comps.Composition] = list()
if self.positron["is_new_activation"] == 1:
self.positron["is_new_activation"] = 0
self.procession.append(comps.registry["recognition"](screen, reactor, atom))
return
for i in self.positron["testdata"].keys():
if i == "additional_inf":
continue
self.procession.append(comps.registry[i](screen, reactor, atom))
# self.procession.reverse()
random.shuffle(self.procession)
def iterator(self):
yield from self.procession
class Reactor():
"""反应堆对象, 处理和分配一次文件记忆流程的资源与策略"""
def __init__(self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion, screen, tasked_num):
# 导入原子对象
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.tasked_num = tasked_num
self.atoms_new = list()
self.atoms_review = list()
counter = self.tasked_num
self.screen = screen
self.electron_dict = electron_file.electrons_dict
self.quality_dict = {}
def electron_dict_get_fallback(key) -> pt.Electron:
value = self.electron_dict.get(key)
# 如果值不存在,则设置默认值
if value is None:
value = pt.Electron(key, {}) # 获取默认值
self.electron_dict[key] = value # 将默认值存入字典
electron_file.sync()
return value # 返回获取的值(可能是默认值)
for nucleon in nucleon_file.nucleons:
# atom = (Electron, Nucleon, Positron) 即 (记忆元数据, 内容元数据, 运行时数据)
atom = (electron_dict_get_fallback(nucleon.content), nucleon, {}) # 使用 "Positron" 代称 atom[2]
atom[2]["testdata"] = nucleon_file.testdata
atom[2]["keydata"] = nucleon_file.keydata
if atom[0]["is_activated"] == 0:
if counter > 0:
atom[2]["is_new_activation"] = 1
atom[0]["is_activated"] = 1
self.atoms_new.append(atom)
counter -= 1
else:
atom[2]["is_new_activation"] = 0
if int(atom[0]["next_date"]) <= aux.get_daystamp():
atom[0]["last_date"] = aux.get_daystamp()
self.atoms_review.append(atom)
# 设置运行时
self.index: int
self.procession: list
self.failed: list
self.round_title: str
self.current_atom: typing.Tuple[pt.Electron, pt.Nucleon, dict]
self.round_set = 0
self.current_atom = pt.Atom.placeholder()
#print(self.atoms_new)
def set_round(self, title, procession):
self.round_set = 1
self.round_title = title
self.procession = procession
self.failed = list()
self.index = -1
def set_round_templated(self, stage):
titles = {
1: "复习模式",
2: "新记忆模式",
3: "总复习模式"
}
processions = {
1: self.atoms_review,
2: self.atoms_new,
3: (self.atoms_new + self.atoms_review)
}
ret = 1
if stage == 1 and len(processions[1]) == 0:
stage = 2
ret = 2
self.set_round(title=titles[stage], procession=processions[stage])
return ret
def forward(self, step = 1):
"""
返回值规则:
1: 重定向至 failed
-1: 此轮已完成
0: 下一个记忆单元
"""
if self.index + step >= len(self.procession):
if len(self.failed) > 0:
self.procession = self.failed
self.index = -1
self.forward(step)
if "- 额外复习" not in self.round_title:
self.round_title += " - 额外复习"
self.failed = list()
return 1 # 自动重定向到 failed
else:
self.round_set = 0
return -1 # 此轮已完成
self.index += step
self.current_atom = self.procession[self.index]
self.current_appar = Apparatus(self.screen, self, self.current_atom).iterator()
return 0
def save(self):
self._deploy_report()
print("Progress saved")
# self.nucleon_file.save()
self.electron_file.save()
def _deploy_report(self):
"部署所有 _report"
for e, q in self.quality_dict.items():
if q == -1:
e.revisor(5, True)
continue
e.revisor(q)
def report(self, atom, quality):
"向反应器和最低质量记录汇报"
if atom in self.atoms_new:
self.quality_dict[atom[0]] = -1
print(self.quality_dict)
return
self.quality_dict[atom[0]] = min(quality, self.quality_dict.get(atom[0], 5))
if quality <= 3:
self.failed.append(atom)
print(self.quality_dict)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 120 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 559 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 177 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 161 KiB

View File

@@ -1,8 +0,0 @@
aiohttp==3.12.13
aiohttp_jinja2==1.6
edge_tts==7.0.2
Jinja2==3.1.6
playsound==1.2.2
rich==14.1.0
textual==5.0.1
toml==0.10.2

View File

@@ -1,3 +0,0 @@
from webshare import server
server = server.Server("python3 main.py", title="辅助记忆程序", host="0.0.0.0")
server.serve()

24
src/heurams/__main__.py Normal file
View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python3
from textual.app import App
import screens
import os
class AppLauncher(App):
CSS_PATH = "styles.css"
TITLE = "潜进 - 辅助记忆调度器"
BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")]
SCREENS = {
"dashboard": screens.DashboardScreen,
}
def on_mount(self) -> None:
self.push_screen("dashboard")
if __name__ == "__main__":
script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir)
os.makedirs("electron", exist_ok=True)
os.makedirs("nucleon", exist_ok=True)
os.makedirs("cache/voice", exist_ok=True)
app = AppLauncher()
app.run()

31
src/heurams/context.py Normal file
View File

@@ -0,0 +1,31 @@
"""
全局上下文管理模块
"""
from contextvars import ContextVar
from typing import Optional
from heurams.services.config import ConfigFile
config_var: ContextVar[ConfigFile] = ContextVar('config_var', default=ConfigFile("")) # 配置文件
runtime_var: ContextVar = ContextVar('runtime_var', default=None) # 运行时共享数据
class ConfigContext:
"""
功能完备的上下文管理器
用于临时切换配置的作用域, 支持嵌套使用
Example:
>>> with ConfigContext(test_config):
... get_daystamp() # 使用 test_config
>>> get_daystamp() # 恢复原配置
"""
def __init__(self, config_provider: ConfigFile):
self.config_provider = config_provider
self._token = None
def __enter__(self):
self._token = config_var.set(self.config_provider)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
config_var.reset(self._token) # type: ignore

View File

@@ -0,0 +1,140 @@
class MemScreen(Screen):
BINDINGS = [
("d", "toggle_dark", "改变色调"),
("q", "pop_screen", "返回主菜单"),
("v", "play_voice", "朗读"),
# ("p", "precache_current", "预缓存当前单元集"), # 新增预缓存快捷键
]
if config.get("quick_pass"):
BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
btn = dict()
def __init__(
self,
nucleon_file: pt.NucleonUnion,
electron_file: pt.ElectronUnion,
tasked_num,
):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.electron_file = electron_file
self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
self.stage = 1
self.stage += self.reactor.set_round_templated(self.stage)
first_forward = self.reactor.forward()
print(first_forward)
if first_forward == -1:
self.stage = 3
self.reactor.set_round_templated(3)
print(self.reactor.forward())
#self._forward_judge(first_forward)
self.compo = next(self.reactor.current_appar)
self.feedback_state = 0 # 默认状态
self.feedback_state_map = {
0: "",
255: "回答有误, 请重试. 或者重新学习此单元",
}
def compose(self) -> ComposeResult:
if type(self.compo).__name__ == "Recognition":
self.action_play_voice()
yield Header(show_clock=True)
with Center():
yield Static(
f"当前进度: {len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
)
yield Label(self.feedback_state_map[self.feedback_state])
yield from self.compo.compose()
if self.feedback_state == 255:
yield Button("重新学习此单元", id="re-recognize", variant="warning")
yield Footer()
def on_mount(self):
pass
def on_button_pressed(self, event):
try:
if event.button.id == "re-recognize":
return
except:
pass
ret = self.compo.handler(event, "button")
self._forward_judge(ret)
def _forward_judge(self, ret):
self.feedback_state = 0
if ret == -1:
return
if ret == 0:
try:
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
except StopIteration:
nxt = self.reactor.forward(1)
try:
self.compo = next(self.reactor.current_appar)
except:
pass
if nxt == -1:
if self.reactor.round_set == 0:
if self.stage == 4:
if config.get("save"):
self.reactor.save()
self.compo = compo.Finished(
self, None, pt.Atom.placeholder()
)
self.refresh_ui()
else:
self.reactor.set_round_templated(self.stage)
self.reactor.forward(1)
self.stage += 1
self.compo = next(self.reactor.current_appar)
self.refresh_ui()
return
return
else:
self.refresh_ui()
return
if ret >= 1:
if ret == 2:
self.feedback_state = 255 # 表示错误
else:
self.feedback_state = 0
self.refresh_ui()
return
def refresh_ui(self):
self.call_later(self.recompose)
print(type(self.compo).__name__)
def action_play_voice(self):
def play():
cache_dir = pathlib.Path(f"./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
if not cache.exists():
import edge_tts as tts
communicate = tts.Communicate(
self.reactor.current_atom[1].content.replace("/", ""),
"zh-CN-XiaoxiaoNeural",
)
communicate.save_sync(
f"./cache/voice/{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
)
playsound(str(cache))
threading.Thread(target=play).start()
def action_precache_current(self):
"""预缓存当前单元集的音频"""
precache_screen = PrecachingScreen(self.nucleon_file)
self.app.push_screen(precache_screen)
def action_quick_pass(self):
self.reactor.report(self.reactor.current_atom, 5)
self._forward_judge(0)
def action_toggle_dark(self):
self.app.action_toggle_dark()
def action_pop_screen(self):
self.app.pop_screen()

View File

@@ -0,0 +1,185 @@
class PrecachingScreen(Screen):
"""预缓存音频文件屏幕"""
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
def __init__(self, nucleon_file = None):
super().__init__(name=None, id=None, classes=None)
self.nucleon_file = nucleon_file
self.is_precaching = False
self.current_file = ""
self.current_item = ""
self.progress = 0
self.total = 0
self.processed = 0
self.precache_worker = None
def compose(self) -> ComposeResult:
yield Header(show_clock=True)
with Container(id="precache_container"):
yield Label("[b]音频预缓存[/b]", classes="title-label")
if self.nucleon_file:
yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info")
yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info")
else:
yield Static("目标: 所有单元集", classes="target-info")
yield Static(id="status", classes="status-info")
yield Static(id="current_item", classes="current-item")
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
with Horizontal(classes="button-group"):
if not self.is_precaching:
yield Button("开始预缓存", id="start_precache", variant="primary")
else:
yield Button("取消预缓存", id="cancel_precache", variant="error")
yield Button("清空缓存", id="clear_cache", variant="warning")
yield Button("返回", id="go_back", variant="default")
yield Footer()
def on_mount(self):
"""挂载时初始化状态"""
self.update_status("就绪", "等待开始...")
def update_status(self, status, current_item="", progress=None):
"""更新状态显示"""
status_widget = self.query_one("#status", Static)
item_widget = self.query_one("#current_item", Static)
progress_bar = self.query_one("#progress_bar", ProgressBar)
status_widget.update(f"状态: {status}")
item_widget.update(f"当前项目: {current_item}" if current_item else "")
if progress is not None:
progress_bar.progress = progress
progress_bar.advance(0) # 刷新显示
def precache_single_text(self, text: str):
"""预缓存单个文本的音频"""
cache_dir = pathlib.Path("./cache/voice/")
cache_dir.mkdir(parents=True, exist_ok=True)
cache = cache_dir / f"{aux.get_md5(text)}.wav"
if not cache.exists():
try:
import edge_tts as tts
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache))
return True
except Exception as e:
print(f"预缓存失败 '{text}': {e}")
return False
return True
def precache_file(self, nucleon_union: pt.NucleonUnion):
"""预缓存单个文件的所有内容"""
self.current_file = nucleon_union.name
total_items = len(nucleon_union.nucleons)
for idx, nucleon in enumerate(nucleon_union.nucleons):
# 检查是否被取消
worker = get_current_worker()
if worker and worker.is_cancelled:
return False
text = nucleon['content'].replace('/', '')
self.current_item = text[:50] + "..." if len(text) > 50 else text
self.processed += 1
# 更新进度
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
self.update_status(
f"处理中: {nucleon_union.name} ({idx+1}/{total_items})",
self.current_item,
progress
)
# 预缓存音频
success = self.precache_single_text(text)
if not success:
self.update_status("错误", f"处理失败: {self.current_item}")
time.sleep(1) # 短暂暂停以便用户看到错误信息
return True
def precache_all_files(self):
"""预缓存所有文件"""
nucleon_path = pathlib.Path("./nucleon")
nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"]
# 计算总项目数
self.total = 0
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
self.total += len(nu.nucleons)
except:
continue
self.processed = 0
self.is_precaching = True
for file in nucleon_files:
try:
nu = pt.NucleonUnion(file)
if not self.precache_file(nu):
break # 用户取消
except Exception as e:
print(f"处理文件失败 {file}: {e}")
continue
self.is_precaching = False
self.update_status("完成", "所有音频文件已预缓存", 100)
def precache_single_file(self):
"""预缓存单个文件"""
if not self.nucleon_file:
return
self.total = len(self.nucleon_file.nucleons)
self.processed = 0
self.is_precaching = True
success = self.precache_file(self.nucleon_file)
self.is_precaching = False
if success:
self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100)
else:
self.update_status("已取消", "预缓存操作被用户取消", 0)
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "start_precache" and not self.is_precaching:
# 开始预缓存
if self.nucleon_file:
self.precache_worker = self.run_worker(self.precache_single_file, thread=True)
else:
self.precache_worker = self.run_worker(self.precache_all_files, thread=True)
elif event.button.id == "cancel_precache" and self.is_precaching:
# 取消预缓存
if self.precache_worker:
self.precache_worker.cancel()
self.is_precaching = False
self.update_status("已取消", "预缓存操作被用户取消", 0)
elif event.button.id == "clear_cache":
# 清空缓存
try:
shutil.rmtree("./cache/voice", ignore_errors=True)
self.update_status("已清空", "音频缓存已清空", 0)
except Exception as e:
self.update_status("错误", f"清空缓存失败: {e}")
elif event.button.id == "go_back":
self.action_go_back()
def action_go_back(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.pop_screen()
def action_quit_app(self):
if self.is_precaching and self.precache_worker:
self.precache_worker.cancel()
self.app.exit()

View File

@@ -1,8 +1,9 @@
#!/usr/bin/env python3
import pathlib
import toml
import time
import auxiliary as aux
import heurams.services.timer as timer
from typing import List
class Electron:
"""电子: 记忆分析元数据及算法"""
@@ -79,8 +80,8 @@ class Electron:
self.metadata['interval'] * self.metadata['efactor']
)
self.metadata['last_date'] = aux.get_daystamp()
self.metadata['next_date'] = aux.get_daystamp() + self.metadata['interval']
self.metadata['last_date'] = timer.get_daystamp()
self.metadata['next_date'] = timer.get_daystamp() + self.metadata['interval']
self.metadata['last_modify'] = time.time()
def __str__(self):
@@ -155,7 +156,7 @@ class Nucleon:
return Nucleon("核子对象样例内容", {})
class NucleonUnion:
class NucleonUnion():
"""
替代原有 NucleonFile , 支持复杂逻辑
@@ -171,23 +172,31 @@ class NucleonUnion:
path (Path): 包含核子数据的文件路径
"""
def __init__(self, path):
def __init__(self, path: pathlib.Path):
self.path = path
self.name = path.name.replace(path.suffix, "")
with open(path, 'r') as f:
all = toml.load(f)
lst = list()
for i in all.keys():
if "attr" in i:
continue
if "data" in i:
continue
lst.append(Nucleon(i, all[i]))
self.keydata = all["keydata"]
self.testdata = all["testdata"]
self.nucleons = lst
self.nucleons: List[Nucleon] = lst
self.nucleons_dict = {i.content: i for i in lst}
def __len__(self):
return len(self.nucleons)
def linked_electron_union(self):
if (self.path.parent / '..' / 'electron' / self.path.name).exists():
return ElectronUnion(self.path.parent / '..' / 'electron' / self.path.name)
else:
return 0
def save(self):
with open(self.path, 'w') as f:
@@ -197,58 +206,40 @@ class NucleonUnion:
class ElectronUnion:
"""取代原有 ElectronFile 类, 以支持复杂逻辑"""
def __init__(self, path):
self.path = path
print(path)
self.name = path.name.replace(path.suffix, "")
with open(path, 'r') as f:
all = toml.load(f)
lst = list()
for i in all.keys():
lst.append(Electron(i, all[i]))
if i != "total":
lst.append(Electron(i, all[i]))
self.total = all.get("total", {"last_date": 0})
self.electrons = lst
self.electrons_dict = {i.content: i for i in lst}
def sync(self):
"""同步 electrons_dict 中新增对到 electrons 中"""
"""同步 electrons_dict 中新增对到 electrons 中, 仅用于缺省初始化不存在映射时调用"""
self.electrons = self.electrons_dict.values()
def __len__(self):
return len(self.electrons)
def linked_nucleon_union(self):
return NucleonUnion(self.path.parent / '..' / 'nucleon' / self.path.name)
def save(self):
# print(1)
self.total["last_date"] = timer.get_daystamp()
with open(self.path, 'w') as f:
tmp = {i.content: i.metadata for i in self.electrons}
tmp["total"] = self.total
# print(tmp)
toml.dump(tmp, f)
"""
class AtomicFile():
def __init__(self, path, type_="unknown"):
self.path = path
self.type_ = type_
if type_ == "nucleon":
self.name, self.datalist = Nucleon.import_from_file(pathlib.Path(path))
if type_ == "electron":
self.name, self.datalist = Electron.import_from_file(pathlib.Path(path))
def save(self):
dictobj = {i.content: i.export_data() for i in self.datalist}
print(dictobj)
if self.type_ == "nucleon":
Nucleon.save_to_file(dictobj, self.path)
if self.type_ == "electron":
Electron.save_to_file(dictobj, self.path)
def get_full_content(self):
if self.type_ == "nucleon":
text = ""
for i in self.datalist:
text += i.content
return text
return ""
def get_len(self):
return len(self.datalist)
"""
class Atom:
@staticmethod
def placeholder():

View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
import pathlib
import toml
import time
import heurams.services.timer as timer
class Electron:
"""电子: 记忆分析元数据及算法"""
algorithm = "SM-2" # 暂时使用 SM-2 算法进行记忆拟合, 考虑 SM-15 替代
def __init__(self, content: str, metadata: dict):
self.content = content
self.metadata = metadata
if metadata == {}:
# print("NULL")
self._default_init()
def _default_init(self):
defaults = {
'efactor': 2.5, # 易度系数, 越大越简单, 最大为5
'real_rept': 0, # (实际)重复次数
'rept': 0, # (有效)重复次数
'interval': 0, # 最佳间隔
'last_date': 0, # 上一次复习的时间戳
'next_date': 0, # 将要复习的时间戳
'is_activated': 0, # 激活状态
# *NOTE: 此处"时间戳"是以天为单位的整数, 即 UNIX 时间戳除以一天的秒数取整
'last_modify': time.time() # 最后修改时间戳(此处是UNIX时间戳)
}
self.metadata = defaults
def activate(self):
self.metadata['is_activated'] = 1
self.metadata['last_modify'] = time.time()
def modify(self, var: str, value):
if var in self.metadata:
self.metadata[var] = value
self.metadata['last_modify'] = time.time()
else:
print(f"警告: '{var}' 非已知元数据字段")
def revisor(self, quality: int = 5, is_new_activation: bool = False):
"""SM-2 算法迭代决策机制实现
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
quality 由主程序评估
Args:
quality (int): 记忆保留率量化参数
"""
print(f"REVISOR: {quality}, {is_new_activation}")
if quality == -1:
return -1
self.metadata['efactor'] = self.metadata['efactor'] + (
0.1 - (5 - quality) * (0.08 + (5 - quality) * 0.02)
)
self.metadata['efactor'] = max(1.3, self.metadata['efactor'])
if quality < 3:
# 若保留率低于 3重置重复次数
self.metadata['rept'] = 0
self.metadata['interval'] = 0 # 设为0以便下面重新计算 I(1)
else:
self.metadata['rept'] += 1
self.metadata['real_rept'] += 1
if is_new_activation: # 初次激活
self.metadata['rept'] = 0
self.metadata['efactor'] = 2.5
if self.metadata['rept'] == 0: # 刚被重置或初次激活后复习
self.metadata['interval'] = 1 # I(1)
elif self.metadata['rept'] == 1:
self.metadata['interval'] = 6 # I(2) 经验公式
else:
self.metadata['interval'] = round(
self.metadata['interval'] * self.metadata['efactor']
)
self.metadata['last_date'] = timer.get_daystamp()
self.metadata['next_date'] = timer.get_daystamp() + self.metadata['interval']
self.metadata['last_modify'] = time.time()
def __str__(self):
return (
f"记忆单元预览 \n"
f"内容: '{self.content}' \n"
f"易度系数: {self.metadata['efactor']:.2f} \n"
f"已经重复的次数: {self.metadata['rept']} \n"
f"下次间隔: {self.metadata['interval']}\n"
f"下次复习日期时间戳: {self.metadata['next_date']}"
)
def __eq__(self, other):
if self.content == other.content:
return True
return False
def __hash__(self):
return hash(self.content)
def __getitem__(self, key):
if key == "content":
return self.content
if key in self.metadata:
return self.metadata[key]
else:
raise KeyError(f"Key '{key}' not found in metadata.")
def __setitem__(self, key, value):
if key == "content":
raise AttributeError("content 应为只读")
self.metadata[key] = value
self.metadata['last_modify'] = time.time()
def __iter__(self):
yield from self.metadata.keys()
def __len__(self):
return len(self.metadata)
@staticmethod
def placeholder():
return Electron("电子对象样例内容", {})

View File

@@ -0,0 +1 @@
# 音频服务

View File

@@ -0,0 +1 @@
# 缓存服务

View File

@@ -1,10 +1,7 @@
import time
# 配置文件服务
import pathlib
import toml
import typing
import playsound
import threading
import edge_tts as tts
class ConfigFile:
def __init__(self, path: str):
@@ -36,25 +33,3 @@ class ConfigFile:
def get(self, key: str, default: typing.Any = None) -> typing.Any:
"""获取配置值,如果不存在返回默认值"""
return self.data.get(key, default)
def action_play_voice(content):
def play():
communicate = tts.Communicate(
content,
"zh-CN-YunjianNeural",
)
communicate.save_sync(
f"./cache/voice/{content}"
)
playsound()
threading.Thread(target=play).start()
def get_daystamp() -> int:
"""获取当前日戳(以天为单位的整数时间戳)"""
config = ConfigFile("config.toml")
time_override = config.get("time_override", -1)
if time_override != -1:
return int(time_override)
return int(time.time() // (24 * 3600))

View File

@@ -0,0 +1,5 @@
# 哈希服务
import hashlib
def get_md5(text):
return hashlib.md5(text.encode('utf-8')).hexdigest()

View File

@@ -0,0 +1,20 @@
# 时间服务
from heurams.context import config_var
import time
def get_daystamp() -> int:
"""获取当前日戳(以天为单位的整数时间戳)"""
time_override = config_var.get().get("daystamp_override", -1)
if time_override != -1:
return int(time_override)
return int((time.time() + config_var.get().get("timezone_offset")) // (24 * 3600))
def get_timestamp() -> float:
"""获取 UNIX 时间戳"""
# 搞这个类的原因是要支持可复现操作
time_override = config_var.get().get("timestamp_override", -1)
if time_override != -1:
return float(time_override)
return time.time()

View File

@@ -0,0 +1 @@
# 文本转语音服务

View File

@@ -0,0 +1 @@
# 版本控制集成服务

View File

@@ -1,62 +0,0 @@
Screen {
align: center bottom;
}
#main_container {
align: center middle;
width: 95%;
height: auto;
border: thick $primary-lighten-2;
padding: 2;
}
#vice_container {
align: center middle;
width: 95%;
height: auto;
padding: 2;
}
#sentence {
content-align: center middle;
width: 100%;
height: 5;
margin-bottom: 2;
text-style: bold;
}
#progress {
width: 100%;
content-align: center middle;
margin-bottom: 1;
color: $text-muted;
}
Button {
align-horizontal: center;
width: 50%;
margin: 0 0;
}
.choice {
align-horizontal: center;
width: 50%;
margin: 0 0;
height: auto;
}
/* no_margin.tcss */
#button_container {
align-horizontal: center;
height: 9;
}
/* 选中 #button_container 下所有的 Horizontal 子元素 */
#button_container > Horizontal {
margin-top: 0;
margin-bottom: 0;
align-horizontal: center;
width: 40%;
}

View File

@@ -1,45 +0,0 @@
import random
class BlankPuzzle():
"""填空题谜题生成器
Args:
text: 原始字符串(需要 "/" 分割句子, 末尾应有 "/")
min_denominator: 最小概率倒数(如占所有可生成填空数的 1/7 中的 7, 若期望值小于 1, 则取 1)
"""
def __init__(self, text, min_denominator):
self.text = text
self.min_denominator = min_denominator
self.wording = "填空题 - 尚未刷新谜题"
self.answer = ["填空题 - 尚未刷新谜题"]
def refresh(self): # 刷新谜题
placeholder = "___SLASH___"
tmp_text = self.text.replace("/", placeholder)
words = tmp_text.split(placeholder)
if not words:
return ""
words = [word for word in words if word]
num_blanks = min(max(1, len(words) // self.min_denominator), len(words))
indices_to_blank = random.sample(range(len(words)), num_blanks)
indices_to_blank.sort()
blanked_words = list(words)
answer = list()
for index in indices_to_blank:
blanked_words[index] = "__" * len(words[index])
answer.append(words[index])
result = []
for word in blanked_words:
result.append(word)
self.answer = answer
self.wording = "".join(result)
def __str__(self):
return f"{self.wording}\n{str(self.answer)}"
# demo
text = """我联合国人民/同兹/决心/: /欲免/后世/再遭/今代人类/两度/身历/惨不堪言/之战祸/.../"""
riddle = BlankPuzzle(text, 3)
print(riddle)
riddle.refresh()
print(riddle)

View File

@@ -1,45 +0,0 @@
import random
class SelectionPuzzle():
"""选择题谜题生成器
Args:
text: 原始字符串(需要 "/" 分割句子, 末尾应有 "/")
min_denominator: 最小概率倒数(如占所有可生成填空数的 1/7 中的 7, 若期望值小于 1, 则取 1)
"""
def __init__(self, prefix_text, origin_dict, min_denominator):
self.text = text
self.min_denominator = min_denominator
self.wording = "填空题 - 尚未刷新谜题"
self.answer = ["填空题 - 尚未刷新谜题"]
def refresh(self): # 刷新谜题
placeholder = "___SLASH___"
tmp_text = self.text.replace("/", placeholder)
words = tmp_text.split(placeholder)
if not words:
return ""
words = [word for word in words if word]
num_blanks = min(max(1, len(words) // self.min_denominator), len(words))
indices_to_blank = random.sample(range(len(words)), num_blanks)
indices_to_blank.sort()
blanked_words = list(words)
answer = list()
for index in indices_to_blank:
blanked_words[index] = "__" * len(words[index])
answer.append(words[index])
result = []
for word in blanked_words:
result.append(word)
self.answer = answer
self.wording = "".join(result)
def __str__(self):
return f"{self.wording}\n{str(self.answer)}"
# demo
text = """我联合国人民/同兹/决心/: /欲免/后世/再遭/今代人类/两度/身历/惨不堪言/之战祸/.../"""
riddle = BlankPuzzle(text, 3)
print(riddle)
riddle.refresh()
print(riddle)

View File

@@ -1,325 +0,0 @@
"""
An encoding / decoding format suitable for serializing data structures to binary.
This is based on https://en.wikipedia.org/wiki/Bencode with some extensions.
The following data types may be encoded:
- None
- int
- bool
- bytes
- str
- list
- tuple
- dict
"""
from __future__ import annotations
from typing import Any, Callable
class DecodeError(Exception):
"""A problem decoding data."""
def dump(data: object) -> bytes:
"""Encodes a data structure in to bytes.
Args:
data: Data structure
Returns:
A byte string encoding the data.
"""
def encode_none(_datum: None) -> bytes:
"""
Encodes a None value.
Args:
datum: Always None.
Returns:
None encoded.
"""
return b"N"
def encode_bool(datum: bool) -> bytes:
"""
Encode a boolean value.
Args:
datum: The boolean value to encode.
Returns:
The encoded bytes.
"""
return b"T" if datum else b"F"
def encode_int(datum: int) -> bytes:
"""
Encode an integer value.
Args:
datum: The integer value to encode.
Returns:
The encoded bytes.
"""
return b"i%ie" % datum
def encode_bytes(datum: bytes) -> bytes:
"""
Encode a bytes value.
Args:
datum: The bytes value to encode.
Returns:
The encoded bytes.
"""
return b"%i:%s" % (len(datum), datum)
def encode_string(datum: str) -> bytes:
"""
Encode a string value.
Args:
datum: The string value to encode.
Returns:
The encoded bytes.
"""
encoded_data = datum.encode("utf-8")
return b"s%i:%s" % (len(encoded_data), encoded_data)
def encode_list(datum: list) -> bytes:
"""
Encode a list value.
Args:
datum: The list value to encode.
Returns:
The encoded bytes.
"""
return b"l%se" % b"".join(encode(element) for element in datum)
def encode_tuple(datum: tuple) -> bytes:
"""
Encode a tuple value.
Args:
datum: The tuple value to encode.
Returns:
The encoded bytes.
"""
return b"t%se" % b"".join(encode(element) for element in datum)
def encode_dict(datum: dict) -> bytes:
"""
Encode a dictionary value.
Args:
datum: The dictionary value to encode.
Returns:
The encoded bytes.
"""
return b"d%se" % b"".join(
b"%s%s" % (encode(key), encode(value)) for key, value in datum.items()
)
ENCODERS: dict[type, Callable[[Any], Any]] = {
type(None): encode_none,
bool: encode_bool,
int: encode_int,
bytes: encode_bytes,
str: encode_string,
list: encode_list,
tuple: encode_tuple,
dict: encode_dict,
}
def encode(datum: object) -> bytes:
"""Recursively encode data.
Args:
datum: Data suitable for encoding.
Raises:
TypeError: If `datum` is not one of the supported types.
Returns:
Encoded data bytes.
"""
try:
decoder = ENCODERS[type(datum)]
except KeyError:
raise TypeError("Can't encode {datum!r}") from None
return decoder(datum)
return encode(data)
def load(encoded: bytes) -> object:
"""Load an encoded data structure from bytes.
Args:
encoded: Encoded data in bytes.
Raises:
DecodeError: If an error was encountered decoding the string.
Returns:
Decoded data.
"""
if not isinstance(encoded, bytes):
raise TypeError("must be bytes")
max_position = len(encoded)
position = 0
def get_byte() -> bytes:
"""Get an encoded byte and advance position.
Raises:
DecodeError: If the end of the data was reached
Returns:
A bytes object with a single byte.
"""
nonlocal position
if position >= max_position:
raise DecodeError("More data expected")
character = encoded[position : position + 1]
position += 1
return character
def peek_byte() -> bytes:
"""Get the byte at the current position, but don't advance position.
Returns:
A bytes object with a single byte.
"""
return encoded[position : position + 1]
def get_bytes(size: int) -> bytes:
"""Get a number of bytes of encode data.
Args:
size: Number of bytes to retrieve.
Raises:
DecodeError: If there aren't enough bytes.
Returns:
A bytes object.
"""
nonlocal position
bytes_data = encoded[position : position + size]
if len(bytes_data) != size:
raise DecodeError(b"Missing bytes in {bytes_data!r}")
position += size
return bytes_data
def decode_int() -> int:
"""Decode an int from the encoded data.
Returns:
An integer.
"""
int_bytes = b""
while (byte := get_byte()) != b"e":
int_bytes += byte
return int(int_bytes)
def decode_bytes(size_bytes: bytes) -> bytes:
"""Decode a bytes string from the encoded data.
Returns:
A bytes object.
"""
while (byte := get_byte()) != b":":
size_bytes += byte
bytes_string = get_bytes(int(size_bytes))
return bytes_string
def decode_string() -> str:
"""Decode a (utf-8 encoded) string from the encoded data.
Returns:
A string.
"""
size_bytes = b""
while (byte := get_byte()) != b":":
size_bytes += byte
bytes_string = get_bytes(int(size_bytes))
decoded_string = bytes_string.decode("utf-8", errors="replace")
return decoded_string
def decode_list() -> list[object]:
"""Decode a list.
Returns:
A list of data.
"""
elements: list[object] = []
add_element = elements.append
while peek_byte() != b"e":
add_element(decode())
get_byte()
return elements
def decode_tuple() -> tuple[object, ...]:
"""Decode a tuple.
Returns:
A tuple of decoded data.
"""
elements: list[object] = []
add_element = elements.append
while peek_byte() != b"e":
add_element(decode())
get_byte()
return tuple(elements)
def decode_dict() -> dict[object, object]:
"""Decode a dict.
Returns:
A dict of decoded data.
"""
elements: dict[object, object] = {}
add_element = elements.__setitem__
while peek_byte() != b"e":
add_element(decode(), decode())
get_byte()
return elements
DECODERS = {
b"i": decode_int,
b"s": decode_string,
b"l": decode_list,
b"t": decode_tuple,
b"d": decode_dict,
b"T": lambda: True,
b"F": lambda: False,
b"N": lambda: None,
}
def decode() -> object:
"""Recursively decode data.
Returns:
Decoded data.
"""
decoder = DECODERS.get(initial := get_byte(), None)
if decoder is None:
return decode_bytes(initial)
return decoder()
return decode()

View File

@@ -1,349 +0,0 @@
from __future__ import annotations
from pathlib import Path
import asyncio
import io
import json
import os
from typing import Awaitable, Callable, Literal
from asyncio.subprocess import Process
import logging
from importlib.metadata import version
import uuid
from webshare.download_manager import DownloadManager
from webshare._binary_encode import load as binary_load
log = logging.getLogger("textual-serve")
class AppService:
"""Creates and manages a single Textual app subprocess.
When a user connects to the websocket in their browser, a new AppService
instance is created to manage the corresponding Textual app process.
"""
def __init__(
self,
command: str,
*,
write_bytes: Callable[[bytes], Awaitable[None]],
write_str: Callable[[str], Awaitable[None]],
close: Callable[[], Awaitable[None]],
download_manager: DownloadManager,
debug: bool = False,
) -> None:
self.app_service_id: str = uuid.uuid4().hex
"""The unique ID of this running app service."""
self.command = command
"""The command to launch the Textual app subprocess."""
self.remote_write_bytes = write_bytes
"""Write bytes to the client browser websocket."""
self.remote_write_str = write_str
"""Write string to the client browser websocket."""
self.remote_close = close
"""Close the client browser websocket."""
self.debug = debug
"""Enable/disable debug mode."""
self._process: Process | None = None
self._task: asyncio.Task[None] | None = None
self._stdin: asyncio.StreamWriter | None = None
self._exit_event = asyncio.Event()
self._download_manager = download_manager
@property
def stdin(self) -> asyncio.StreamWriter:
"""The processes standard input stream."""
assert self._stdin is not None
return self._stdin
def _build_environment(self, width: int = 80, height: int = 24) -> dict[str, str]:
"""Build an environment dict for the App subprocess.
Args:
width: Initial width.
height: Initial height.
Returns:
A environment dict.
"""
environment = dict(os.environ.copy())
environment["TEXTUAL_DRIVER"] = "textual.drivers.web_driver:WebDriver"
environment["TEXTUAL_FPS"] = "60"
environment["TEXTUAL_COLOR_SYSTEM"] = "truecolor"
environment["TERM_PROGRAM"] = "textual"
environment["TERM_PROGRAM_VERSION"] = version("textual-serve")
environment["COLUMNS"] = str(width)
environment["ROWS"] = str(height)
if self.debug:
environment["TEXTUAL"] = "debug,devtools"
environment["TEXTUAL_LOG"] = "textual.log"
return environment
async def _open_app_process(self, width: int = 80, height: int = 24) -> Process:
"""Open a process to run the app.
Args:
width: Width of the terminal.
height: height of the terminal.
"""
environment = self._build_environment(width=width, height=height)
self._process = process = await asyncio.create_subprocess_shell(
self.command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=environment,
)
assert process.stdin is not None
self._stdin = process.stdin
return process
@classmethod
def encode_packet(cls, packet_type: Literal[b"D", b"M"], payload: bytes) -> bytes:
"""Encode a packet.
Args:
packet_type: The packet type (b"D" for data or b"M" for meta)
payload: The payload.
Returns:
Data as bytes.
"""
return b"%s%s%s" % (packet_type, len(payload).to_bytes(4, "big"), payload)
async def send_bytes(self, data: bytes) -> bool:
"""Send bytes to process.
Args:
data: Data to send.
Returns:
True if the data was sent, otherwise False.
"""
stdin = self.stdin
try:
stdin.write(self.encode_packet(b"D", data))
except RuntimeError:
return False
try:
await stdin.drain()
except Exception:
return False
return True
async def send_meta(self, data: dict[str, str | None | int | bool]) -> bool:
"""Send meta information to process.
Args:
data: Meta dict to send.
Returns:
True if the data was sent, otherwise False.
"""
stdin = self.stdin
data_bytes = json.dumps(data).encode("utf-8")
try:
stdin.write(self.encode_packet(b"M", data_bytes))
except RuntimeError:
return False
try:
await stdin.drain()
except Exception:
return False
return True
async def set_terminal_size(self, width: int, height: int) -> None:
"""Tell the process about the new terminal size.
Args:
width: Width of terminal in cells.
height: Height of terminal in cells.
"""
await self.send_meta(
{
"type": "resize",
"width": width,
"height": height,
}
)
async def blur(self) -> None:
"""Send an (app) blur to the process."""
await self.send_meta({"type": "blur"})
async def focus(self) -> None:
"""Send an (app) focus to the process."""
await self.send_meta({"type": "focus"})
async def start(self, width: int, height: int) -> None:
await self._open_app_process(width, height)
self._task = asyncio.create_task(self.run())
async def stop(self) -> None:
"""Stop the process and wait for it to complete."""
if self._task is not None:
await self._download_manager.cancel_app_downloads(
app_service_id=self.app_service_id
)
await self.send_meta({"type": "quit"})
await self._task
self._task = None
async def run(self) -> None:
"""Run the Textual app process.
!!! note
Do not call this manually, use `start`.
"""
META = b"M"
DATA = b"D"
PACKED = b"P"
assert self._process is not None
process = self._process
stdout = process.stdout
stderr = process.stderr
assert stdout is not None
assert stderr is not None
stderr_data = io.BytesIO()
async def read_stderr() -> None:
"""Task to read stderr."""
try:
while True:
data = await stderr.read(1024 * 4)
if not data:
break
stderr_data.write(data)
except asyncio.CancelledError:
pass
stderr_task = asyncio.create_task(read_stderr())
try:
ready = False
# Wait for prelude text, so we know it is a Textual app
for _ in range(10):
if not (line := await stdout.readline()):
break
if line == b"__GANGLION__\n":
ready = True
break
if not ready:
log.error("Application failed to start")
if error_text := stderr_data.getvalue():
import sys
sys.stdout.write(error_text.decode("utf-8", "replace"))
readexactly = stdout.readexactly
int_from_bytes = int.from_bytes
while True:
type_bytes = await readexactly(1)
size_bytes = await readexactly(4)
size = int_from_bytes(size_bytes, "big")
payload = await readexactly(size)
if type_bytes == DATA:
await self.on_data(payload)
elif type_bytes == META:
await self.on_meta(payload)
elif type_bytes == PACKED:
await self.on_packed(payload)
except asyncio.IncompleteReadError:
pass
except ConnectionResetError:
pass
except asyncio.CancelledError:
pass
finally:
stderr_task.cancel()
await stderr_task
if error_text := stderr_data.getvalue():
import sys
sys.stdout.write(error_text.decode("utf-8", "replace"))
async def on_data(self, payload: bytes) -> None:
"""Called when there is data.
Args:
payload: Data received from process.
"""
await self.remote_write_bytes(payload)
async def on_meta(self, data: bytes) -> None:
"""Called when there is a meta packet sent from the running app process.
Args:
data: Encoded meta data.
"""
meta_data: dict[str, object] = json.loads(data)
meta_type = meta_data["type"]
if meta_type == "exit":
await self.remote_close()
elif meta_type == "open_url":
payload = json.dumps(
[
"open_url",
{
"url": meta_data["url"],
"new_tab": meta_data["new_tab"],
},
]
)
await self.remote_write_str(payload)
elif meta_type == "deliver_file_start":
log.debug("deliver_file_start, %s", meta_data)
try:
# Record this delivery key as available for download.
delivery_key = str(meta_data["key"])
await self._download_manager.create_download(
app_service=self,
delivery_key=delivery_key,
file_name=Path(meta_data["path"]).name,
open_method=meta_data["open_method"],
mime_type=meta_data["mime_type"],
encoding=meta_data["encoding"],
name=meta_data.get("name", None),
)
except KeyError:
log.error("Missing key in `deliver_file_start` meta packet")
return
else:
# Tell the browser front-end about the new delivery key,
# so that it may hit the "/download/{key}" endpoint
# to start the download.
json_string = json.dumps(["deliver_file_start", delivery_key])
await self.remote_write_str(json_string)
else:
log.warning(
f"Unknown meta type: {meta_type!r}. You may need to update `textual-serve`."
)
async def on_packed(self, payload: bytes) -> None:
"""Called when there is a packed packet sent from the running app process.
Args:
payload: Encoded packed data.
"""
unpacked = binary_load(payload)
if unpacked[0] == "deliver_chunk":
# If we receive a chunk, hand it to the download manager to
# handle distribution to the browser.
_, delivery_key, chunk = unpacked
await self._download_manager.chunk_received(delivery_key, chunk)

View File

@@ -1,197 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
import logging
from typing import AsyncGenerator, TYPE_CHECKING
if TYPE_CHECKING:
from webshare.app_service import AppService
log = logging.getLogger("textual-serve")
DOWNLOAD_TIMEOUT = 4
DOWNLOAD_CHUNK_SIZE = 1024 * 64 # 64 KB
@dataclass
class Download:
app_service: "AppService"
"""The app service that the download is associated with."""
delivery_key: str
"""Key which identifies the download."""
file_name: str
"""The name of the file to download. This will be used to set
the Content-Disposition filename."""
open_method: str
"""The method to open the file with. "browser" or "download"."""
mime_type: str
"""The mime type of the content."""
encoding: str | None = None
"""The encoding of the content.
Will be None if the content is binary.
"""
name: str | None = None
"""Optional name set bt the client."""
incoming_chunks: asyncio.Queue[bytes | None] = field(default_factory=asyncio.Queue)
"""A queue of incoming chunks for the download.
Chunks are sent from the app service to the download handler
via this queue."""
class DownloadManager:
"""Class which manages downloads for the server.
Serves as the link between the web server and app processes during downloads.
A single server has a single download manager, which manages all downloads for all
running app processes.
"""
def __init__(self) -> None:
self._active_downloads: dict[str, Download] = {}
"""A dictionary of active downloads.
When a delivery key is received in a meta packet, it is added to this set.
When the user hits the "/download/{key}" endpoint, we ensure the key is in
this set and start the download by requesting chunks from the app process.
When the download is complete, the app process sends a "deliver_file_end"
meta packet, and we remove the key from this set.
"""
async def create_download(
self,
*,
app_service: "AppService",
delivery_key: str,
file_name: str,
open_method: str,
mime_type: str,
encoding: str | None = None,
name: str | None = None,
) -> None:
"""Prepare for a new download.
Args:
app_service: The app service to start the download for.
delivery_key: The delivery key to start the download for.
file_name: The name of the file to download.
open_method: The method to open the file with.
mime_type: The mime type of the content.
encoding: The encoding of the content or None if the content is binary.
"""
self._active_downloads[delivery_key] = Download(
app_service,
delivery_key,
file_name,
open_method,
mime_type,
encoding,
name=name,
)
async def download(self, delivery_key: str) -> AsyncGenerator[bytes, None]:
"""Download a file from the given app service.
Args:
delivery_key: The delivery key to download.
"""
app_service = await self._get_app_service(delivery_key)
download = self._active_downloads[delivery_key]
incoming_chunks = download.incoming_chunks
while True:
# Request a chunk from the app service.
send_result = await app_service.send_meta(
{
"type": "deliver_chunk_request",
"key": delivery_key,
"size": DOWNLOAD_CHUNK_SIZE,
"name": download.name,
}
)
if not send_result:
log.warning(
"Download {delivery_key!r} failed to request chunk from app service"
)
del self._active_downloads[delivery_key]
break
try:
chunk = await asyncio.wait_for(incoming_chunks.get(), DOWNLOAD_TIMEOUT)
except asyncio.TimeoutError:
log.warning(
"Download %r failed to receive chunk from app service within %r seconds",
delivery_key,
DOWNLOAD_TIMEOUT,
)
chunk = None
if not chunk:
# Empty chunk - the app process has finished sending the file
# or the download has been cancelled.
incoming_chunks.task_done()
del self._active_downloads[delivery_key]
break
else:
incoming_chunks.task_done()
yield chunk
async def chunk_received(self, delivery_key: str, chunk: bytes | str) -> None:
"""Handle a chunk received from the app service for a download.
Args:
delivery_key: The delivery key that the chunk was received for.
chunk: The chunk that was received.
"""
download = self._active_downloads.get(delivery_key)
if not download:
# The download may have been cancelled - e.g. the websocket
# was closed before the download could complete.
log.debug("Chunk received for cancelled download %r", delivery_key)
return
if isinstance(chunk, str):
chunk = chunk.encode(download.encoding or "utf-8")
await download.incoming_chunks.put(chunk)
async def _get_app_service(self, delivery_key: str) -> "AppService":
"""Get the app service that the given delivery key is linked to.
Args:
delivery_key: The delivery key to get the app service for.
"""
for key in self._active_downloads.keys():
if key == delivery_key:
return self._active_downloads[key].app_service
else:
raise ValueError(f"No active download for delivery key {delivery_key!r}")
async def get_download_metadata(self, delivery_key: str) -> Download:
"""Get the metadata for a download.
Args:
delivery_key: The delivery key to get the metadata for.
"""
return self._active_downloads[delivery_key]
async def cancel_app_downloads(self, app_service_id: str) -> None:
"""Cancel all downloads for the given app service.
Args:
app_service_id: The app service ID to cancel downloads for.
"""
for download in self._active_downloads.values():
if download.app_service.app_service_id == app_service_id:
await download.incoming_chunks.put(None)

View File

View File

@@ -1,350 +0,0 @@
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
import signal
import sys
from typing import Any
import aiohttp_jinja2
from aiohttp import web
from aiohttp import WSMsgType
from aiohttp.web_runner import GracefulExit
import jinja2
from importlib.metadata import version
from rich.console import Console
from rich.logging import RichHandler
from rich.highlighter import RegexHighlighter
from webshare.download_manager import DownloadManager
from .app_service import AppService
log = logging.getLogger("textual-serve")
LOGO = r"""[bold magenta]___ ____ _ _ ___ _ _ ____ _ ____ ____ ____ _ _ ____
| |___ \/ | | | |__| | __ [__ |___ |__/ | | |___
| |___ _/\_ | |__| | | |___ ___] |___ | \ \/ |___ [not bold]VVVVV
""".replace("VVVVV", f"v{version('textual-serve')}")
WINDOWS = sys.platform == "WINDOWS"
class LogHighlighter(RegexHighlighter):
base_style = "repr."
highlights = [
r"(?P<number>(?<!\w)\-?[0-9]+\.?[0-9]*(e[-+]?\d+?)?\b|0x[0-9a-fA-F]*)",
r"(?P<path>\[.*?\])",
r"(?<![\\\w])(?P<str>b?'''.*?(?<!\\)'''|b?'.*?(?<!\\)'|b?\"\"\".*?(?<!\\)\"\"\"|b?\".*?(?<!\\)\")",
]
def to_int(value: str, default: int) -> int:
"""Convert to an integer, or return a default if that's not possible.
Args:
number: A string possibly containing a decimal.
default: Default value if value can't be decoded.
Returns:
Integer.
"""
try:
return int(value)
except ValueError:
return default
class Server:
"""Serve a Textual app."""
def __init__(
self,
command: str,
host: str = "localhost",
port: int = 8000,
title: str | None = None,
public_url: str | None = None,
statics_path: str | os.PathLike = "./static",
templates_path: str | os.PathLike = "./templates",
):
"""
Args:
app_factory: A callable that returns a new App instance.
host: Host of web application.
port: Port for server.
statics_path: Path to statics folder. May be absolute or relative to server.py.
templates_path" Path to templates folder. May be absolute or relative to server.py.
"""
self.command = command
self.host = host
self.port = port
self.title = title or command
self.debug = False
if public_url is None:
if self.port == 80:
self.public_url = f"http://{self.host}"
elif self.port == 443:
self.public_url = f"https://{self.host}"
else:
self.public_url = f"http://{self.host}:{self.port}"
else:
self.public_url = public_url
base_path = (Path(__file__) / "../").resolve().absolute()
self.statics_path = base_path / statics_path
self.templates_path = base_path / templates_path
self.console = Console()
self.download_manager = DownloadManager()
def initialize_logging(self) -> None:
"""Initialize logging.
May be overridden in a subclass.
"""
FORMAT = "%(message)s"
logging.basicConfig(
level="DEBUG" if self.debug else "INFO",
format=FORMAT,
datefmt="[%X]",
handlers=[
RichHandler(
show_path=False,
show_time=False,
rich_tracebacks=True,
tracebacks_show_locals=True,
highlighter=LogHighlighter(),
console=self.console,
)
],
)
def request_exit(self) -> None:
"""Gracefully exit the app."""
raise GracefulExit()
async def _make_app(self) -> web.Application:
"""Make the aiohttp web.Application.
Returns:
New aiohttp web application.
"""
app = web.Application()
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(self.templates_path))
ROUTES = [
web.get("/", self.handle_index, name="index"),
web.get("/ws", self.handle_websocket, name="websocket"),
web.get("/download/{key}", self.handle_download, name="download"),
web.static("/static", self.statics_path, show_index=True, name="static"),
]
app.add_routes(ROUTES)
app.on_startup.append(self.on_startup)
app.on_shutdown.append(self.on_shutdown)
return app
async def handle_download(self, request: web.Request) -> web.StreamResponse:
"""Handle a download request."""
key = request.match_info["key"]
try:
download_meta = await self.download_manager.get_download_metadata(key)
except KeyError:
raise web.HTTPNotFound(text=f"Download with key {key!r} not found")
response = web.StreamResponse()
mime_type = download_meta.mime_type
content_type = mime_type
if download_meta.encoding:
content_type += f"; charset={download_meta.encoding}"
response.headers["Content-Type"] = content_type
disposition = (
"inline" if download_meta.open_method == "browser" else "attachment"
)
response.headers["Content-Disposition"] = (
f"{disposition}; filename={download_meta.file_name}"
)
await response.prepare(request)
async for chunk in self.download_manager.download(key):
await response.write(chunk)
await response.write_eof()
return response
async def on_shutdown(self, app: web.Application) -> None:
"""Called on shutdown.
Args:
app: App instance.
"""
async def on_startup(self, app: web.Application) -> None:
"""Called on startup.
Args:
app: App instance.
"""
self.console.print(LOGO, highlight=False)
self.console.print(f"Serving {self.command!r} on {self.public_url}")
self.console.print("\n[cyan]Press Ctrl+C to quit")
def serve(self, debug: bool = False) -> None:
"""Serve the Textual application.
This will run a local webserver until it is closed with Ctrl+C
"""
self.debug = debug
self.initialize_logging()
loop = asyncio.get_event_loop()
try:
loop.add_signal_handler(signal.SIGINT, self.request_exit)
loop.add_signal_handler(signal.SIGTERM, self.request_exit)
except NotImplementedError:
pass
if self.debug:
log.info("Running in debug mode. You may use textual dev tools.")
web.run_app(
self._make_app(),
host=self.host,
port=self.port,
handle_signals=False,
loop=loop,
print=lambda *args: None,
)
@aiohttp_jinja2.template("app_index.html")
async def handle_index(self, request: web.Request) -> dict[str, Any]:
"""Serves the HTML for an app.
Args:
request: Request object.
Returns:
Template data.
"""
router = request.app.router
font_size = to_int(request.query.get("fontsize", "16"), 16)
def get_url(route: str, **args) -> str:
"""Get a URL from the aiohttp router."""
path = router[route].url_for(**args)
return f"{self.public_url}{path}"
def get_websocket_url(route: str, **args) -> str:
"""Get a URL with a websocket prefix."""
url = get_url(route, **args)
if self.public_url.startswith("https"):
return "wss:" + url.split(":", 1)[1]
else:
return "ws:" + url.split(":", 1)[1]
context = {
"font_size": font_size,
"app_websocket_url": get_websocket_url("websocket"),
}
context["config"] = {
"static": {
"url": get_url("static", filename="/").rstrip("/") + "/",
},
}
context["application"] = {
"name": self.title,
}
return context
async def _process_messages(
self, websocket: web.WebSocketResponse, app_service: AppService
) -> None:
"""Process messages from the client browser websocket.
Args:
websocket: Websocket instance.
app_service: App service.
"""
TEXT = WSMsgType.TEXT
async for message in websocket:
if message.type != TEXT:
continue
envelope = message.json()
assert isinstance(envelope, list)
type_ = envelope[0]
if type_ == "stdin":
data = envelope[1]
await app_service.send_bytes(data.encode("utf-8"))
elif type_ == "resize":
data = envelope[1]
await app_service.set_terminal_size(data["width"], data["height"])
elif type_ == "ping":
data = envelope[1]
await websocket.send_json(["pong", data])
elif type_ == "blur":
await app_service.blur()
elif type_ == "focus":
await app_service.focus()
async def handle_websocket(self, request: web.Request) -> web.WebSocketResponse:
"""Handle the websocket that drives the remote process.
This is called when the browser connects to the websocket.
Args:
request: Request object.
Returns:
Websocket response.
"""
websocket = web.WebSocketResponse(heartbeat=15)
width = to_int(request.query.get("width", "80"), 80)
height = to_int(request.query.get("height", "24"), 24)
app_service: AppService | None = None
try:
await websocket.prepare(request)
app_service = AppService(
self.command,
write_bytes=websocket.send_bytes,
write_str=websocket.send_str,
close=websocket.close,
download_manager=self.download_manager,
debug=self.debug,
)
await app_service.start(width, height)
try:
await self._process_messages(websocket, app_service)
finally:
await app_service.stop()
except asyncio.CancelledError:
await websocket.close()
except Exception as error:
log.exception(error)
finally:
if app_service is not None:
await app_service.stop()
return websocket

Some files were not shown because too many files have changed in this diff Show More