Compare commits
51 Commits
v0.3.1b
...
refactor/i
Author | SHA1 | Date | |
---|---|---|---|
aa99aa7686 | |||
6dac9d5a7c | |||
f19fcac6c4 | |||
ebeb62f72d | |||
d71d8eb1ec | |||
4ac12479cc | |||
7fc53e4113 | |||
06c62e284d | |||
d5d31eb5fe | |||
156f558a45 | |||
e96832a60c | |||
e1a5de74ad | |||
90339be30e | |||
4da80d26a3 | |||
cb062788a7 | |||
acfd179435 | |||
d1b606782f | |||
784cf41003 | |||
0f342c96ee | |||
3022ada51a | |||
9dada59c56 | |||
8683693cd1 | |||
144d7cb83c | |||
44573624aa | |||
1b33bb8618 | |||
d83025d818 | |||
f7e93cf05f | |||
e30cefeb44 | |||
7dc963d491 | |||
3640d8a799 | |||
1ea34ab87a | |||
2bedc686a5 | |||
19d0e32b6f | |||
50a5b9b108 | |||
5a096e4b4f | |||
65491117d3 | |||
c82eedde82 | |||
697e3b2b8f | |||
11eff7da43 | |||
c93bcdd489 | |||
bb99b0a0b7 | |||
6293b69ef0 | |||
afb7252f71 | |||
5e96fc8138 | |||
e64d1711d0 | |||
beeb2fd318 | |||
fc58f61dfe | |||
e11e7e781b | |||
39459a0f6e | |||
cccf7189e3 | |||
2c51f2cea3 |
217
.gitignore
vendored
217
.gitignore
vendored
@@ -1,6 +1,215 @@
|
||||
.vscode
|
||||
# Project specific additions
|
||||
.devflag
|
||||
.vscode/
|
||||
.directory
|
||||
__pycache__/
|
||||
scripts/
|
||||
.idea
|
||||
cache
|
||||
.idea/
|
||||
cache/
|
||||
nucleon/test.toml
|
||||
electron/test.toml
|
||||
*.egg-info/
|
||||
build/
|
||||
dist/
|
||||
|
||||
# Project specific directories
|
||||
config/
|
||||
data/cache/
|
||||
data/electrion/
|
||||
data/nucleon/
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly used for packaging.
|
||||
#poetry.lock
|
||||
#poetry.toml
|
||||
|
||||
# pdm
|
||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||
#pdm.lock
|
||||
#pdm.toml
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
.idea/
|
||||
|
||||
# Audio cache and temporary files
|
||||
*.mp3
|
||||
*.wav
|
||||
*.ogg
|
||||
*.tmp
|
||||
|
||||
# LLM cache files
|
||||
*.cache
|
||||
*.jsonl
|
||||
|
||||
# Log files
|
||||
*.log
|
||||
logs/
|
||||
|
||||
# OS generated files
|
||||
.DS_Store
|
||||
.DS_Store?
|
||||
._*
|
||||
.Spotlight-V100
|
||||
.Trashes
|
||||
ehthumbs.db
|
||||
Thumbs.db
|
||||
|
||||
# Windows
|
||||
Thumbs.db
|
||||
ehthumbs.db
|
||||
Desktop.ini
|
||||
|
||||
# Linux
|
||||
*~
|
||||
|
||||
# VSCode
|
||||
.vscode/
|
||||
!.vscode/settings.json
|
||||
!.vscode/tasks.json
|
||||
!.vscode/launch.json
|
||||
!.vscode/extensions.json
|
||||
*.code-workspace
|
||||
|
||||
# Temporary files
|
||||
*.tmp
|
||||
*.temp
|
@@ -1,7 +0,0 @@
|
||||
# 贡献指南
|
||||
## 使用 Nuitka 静态编译
|
||||
运行
|
||||
|
||||
```bash
|
||||
nuitka --clang --jobs=6 --standalone --onefile main.py
|
||||
```
|
71
README.md
71
README.md
@@ -1,71 +0,0 @@
|
||||
# 潜进 (HeurAMS) - 启发式辅助记忆程序
|
||||
> 形人而我无形,**则我专而敌分**
|
||||
|
||||
## 概述
|
||||
|
||||
"潜进" (HeurAMS) 是为习题册, 古诗词, 及其他问答/记忆/理解型知识设计的辅助记忆软件, 提供动态规划的优化记忆方案
|
||||
|
||||
## 技术集成与特性
|
||||
|
||||
### 间隔迭代算法
|
||||
> 许多出版物都广泛讨论了不同重复间隔对学习效果的影响。特别是,间隔效应被认为是一种普遍现象。间隔效应是指,如果重复的间隔是分散/稀疏的,而不是集中重复,那么学习任务的表现会更好。因此,有观点提出,学习中使用的最佳重复间隔是**最长的、但不会导致遗忘的间隔**。
|
||||
- 采用经实证的 SM-2 间隔迭代算法, 此算法亦用作 Anki 闪卡记忆软件的默认闪卡调度器
|
||||
> 计划: 将添加 FSRS 算法 (Anki 的新可选闪卡调度器) 与一种 SM-15 变体算法作为后续替代
|
||||
> 参考 https://github.com/slaypni/SM-15
|
||||
> 使用 SM-15 的变体:
|
||||
> SM-2 后续算法并非完全开放, 故使用一种基于 SM-15 描述实现的变体算法
|
||||
- 动态规划每个记忆单元的记忆间隔时间表
|
||||
- 动态跟踪记忆反馈数据,优化长期记忆保留率与稳定性
|
||||
|
||||
### 学习进程优化
|
||||
- 逐字解析:支持逐字详细释义解析
|
||||
- 语法分析:接入生成式人工智能, 支持古文结构交互式解析
|
||||
- 自然语音:集成微软神经网络文本转语音 (TTS) 技术
|
||||
|
||||
### 现代用户界面
|
||||
|
||||
- 响应式 Textual 框架构建的跨平台 TUI 界面
|
||||
- 支持触屏/鼠标/键盘多操作模式
|
||||
- 简洁直观的复习流程设计
|
||||
|
||||
## 屏幕截图
|
||||
|
||||
> 单击图片以放大
|
||||
|
||||
<img src="./readme_src/img1.png" alt="img1" style="zoom: 33%;" />
|
||||
<img src="./readme_src/img2.png" alt="img2" style="zoom:33%;" />
|
||||
<img src="./readme_src/img3.png" alt="img3" style="zoom:33%;" />
|
||||
<img src="./readme_src/img4.png" alt="img4" style="zoom:33%;" />
|
||||
|
||||
## 技术架构
|
||||
|
||||
> 有关技术与实现的细节, 请参阅 CONTRIBUTING.md
|
||||
> 提交拉取请求以参与到此开放源代码项目
|
||||
|
||||
``` mermaid
|
||||
graph TD
|
||||
subgraph 后端
|
||||
A[SM-2 算法] --> B[间隔迭代算法]
|
||||
B --> C[迭代记忆参数]
|
||||
end
|
||||
|
||||
subgraph 用户界面
|
||||
D[展示模块] --> E[用户界面]
|
||||
E --> F[进度追踪面板]
|
||||
end
|
||||
|
||||
subgraph 外部服务
|
||||
G[LLM]
|
||||
H[TTS]
|
||||
end
|
||||
|
||||
C --> D
|
||||
F -->|用户数据| C
|
||||
D --> G
|
||||
D --> H
|
||||
```
|
||||
|
||||
## 系统要求
|
||||
|
||||
- 平台支持:Windows / macOS / Linux / Android (需要 Termux 或 Linux) (终端或浏览器)
|
||||
- 网络连接:可预缓存语音文件, 需联网使用大模型服务功能
|
297
compositions.py
297
compositions.py
@@ -1,297 +0,0 @@
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.events import Event
|
||||
from textual.widgets import (
|
||||
Collapsible,
|
||||
Header,
|
||||
Footer,
|
||||
Markdown,
|
||||
ListView,
|
||||
ListItem,
|
||||
Label,
|
||||
Static,
|
||||
Button,
|
||||
)
|
||||
from textual.containers import Container, Horizontal, Center
|
||||
from textual.screen import Screen
|
||||
from textual.widget import Widget
|
||||
import uuid
|
||||
from typing import Tuple, Dict
|
||||
import particles as pt
|
||||
import puzzles as pz
|
||||
import re
|
||||
import random
|
||||
import copy
|
||||
|
||||
|
||||
class Composition:
|
||||
def __init__(
|
||||
self,
|
||||
screen: Screen,
|
||||
reactor,
|
||||
atom: Tuple[pt.Electron, pt.Nucleon, Dict] = pt.Atom.placeholder(),
|
||||
):
|
||||
self.screen = screen
|
||||
self.atom = atom
|
||||
from reactor import Reactor
|
||||
|
||||
self.reactor: Reactor = reactor
|
||||
self.reg = dict()
|
||||
|
||||
def regid(self, id_):
|
||||
self.reg[id_] = id_ + str(uuid.uuid4())
|
||||
return self.reg[id_]
|
||||
|
||||
def getid(self, id_):
|
||||
if id_ not in self.reg.keys():
|
||||
return "None"
|
||||
return self.reg[id_]
|
||||
|
||||
def recid(self, id_):
|
||||
return id_[:-36]
|
||||
|
||||
def compose(self):
|
||||
yield Label("示例标签", id="testlabel")
|
||||
yield Button("示例按钮", id="testbtn")
|
||||
|
||||
def handler(self, event, type_):
|
||||
return 1
|
||||
|
||||
|
||||
class Finished(Composition):
|
||||
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
|
||||
super().__init__(screen, reactor, atom)
|
||||
|
||||
def compose(self):
|
||||
yield Label("本次记忆进程结束", id=self.regid("msg"))
|
||||
|
||||
|
||||
class Placeholder(Composition):
|
||||
def __init__(self, screen: Screen):
|
||||
self.screen = screen
|
||||
|
||||
def compose(self):
|
||||
yield Label("示例标签", id="testlabel")
|
||||
yield Button("示例按钮", id="testbtn", classes="choice")
|
||||
|
||||
def handler(self, event, type_):
|
||||
self.screen.query_one("#testlabel", Label).update("hi")
|
||||
|
||||
|
||||
class Recognition(Composition):
|
||||
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
|
||||
super().__init__(screen, reactor, atom)
|
||||
|
||||
def compose(self):
|
||||
with Center():
|
||||
yield Static(f"[dim]{self.atom[1]['translation']}[/]")
|
||||
yield Label(f"")
|
||||
s = str(self.atom[1]["content"])
|
||||
replace_dict = {
|
||||
", ": ",",
|
||||
". ": ".",
|
||||
"; ": ";",
|
||||
": ": ":",
|
||||
"/,": ",",
|
||||
"./": ".",
|
||||
"/;": ";",
|
||||
";/": ";",
|
||||
":/": ":",
|
||||
}
|
||||
for old, new in replace_dict.items():
|
||||
s = s.replace(old, new)
|
||||
result = re.split(r"(?<=[,;:|])", s.replace("/", " "))
|
||||
for i in result:
|
||||
with Center():
|
||||
yield Label(
|
||||
f"[b][b]{i.replace('/', ' ')}[/][/]",
|
||||
id=self.regid("sentence" + str(hash(i))),
|
||||
)
|
||||
for i in self.atom[2]["testdata"]["additional_inf"]:
|
||||
if self.atom[1][i]:
|
||||
if isinstance(self.atom[1][i], list):
|
||||
for j in self.atom[1][i]:
|
||||
yield Markdown(f"### {self.atom[2]['keydata'][i]}: {j}")
|
||||
continue
|
||||
if isinstance(self.atom[1][i], Dict):
|
||||
t = ""
|
||||
for j, k in self.atom[1][i].items(): # type: ignore
|
||||
# 弱智的 Pylance 类型推导
|
||||
t += f"> **{j}**: {k} \n"
|
||||
yield Markdown(t, id=self.regid("tran"))
|
||||
with Center():
|
||||
yield Button("我已知晓", id=self.regid("ok"))
|
||||
|
||||
def handler(self, event, type_):
|
||||
if type_ == "button":
|
||||
if event.button.id == self.getid("ok"):
|
||||
self.reactor.report(self.atom, 5)
|
||||
return 0
|
||||
return -1
|
||||
|
||||
|
||||
class BasicEvaluation(Composition):
|
||||
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
|
||||
super().__init__(screen, reactor, atom)
|
||||
|
||||
def compose(self):
|
||||
yield Label(self.atom[1]["content"], id="sentence")
|
||||
with Container(id="button_container"):
|
||||
btn = {}
|
||||
btn["5"] = Button(
|
||||
"完美回想", variant="success", id=self.regid("feedback5"), classes="choice"
|
||||
)
|
||||
btn["4"] = Button(
|
||||
"犹豫后正确", variant="success", id=self.regid("feedback4"), classes="choice"
|
||||
)
|
||||
btn["3"] = Button(
|
||||
"困难地正确", variant="warning", id=self.regid("feedback3"), classes="choice"
|
||||
)
|
||||
btn["2"] = Button(
|
||||
"错误但熟悉", variant="warning", id=self.regid("feedback2"), classes="choice"
|
||||
)
|
||||
btn["1"] = Button(
|
||||
"错误且不熟", variant="error", id=self.regid("feedback1"), classes="choice"
|
||||
)
|
||||
btn["0"] = Button(
|
||||
"完全空白", variant="error", id=self.regid("feedback0"), classes="choice"
|
||||
)
|
||||
yield Horizontal(btn["5"], btn["4"])
|
||||
yield Horizontal(btn["3"], btn["2"])
|
||||
yield Horizontal(btn["1"], btn["0"])
|
||||
|
||||
def handler(self, event, type_):
|
||||
if "feedback" in event.button.id:
|
||||
assess = int(self.recid(event.button.id)[8:9])
|
||||
ret = self.reactor.report(self.atom, assess)
|
||||
return ret
|
||||
|
||||
|
||||
class FillBlank(Composition):
|
||||
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
|
||||
super().__init__(screen, reactor, atom)
|
||||
self.inputlist = []
|
||||
self.hashtable = {}
|
||||
self._work()
|
||||
|
||||
def _work(self):
|
||||
self.puzzle = pz.BlankPuzzle(self.atom[1]["content"], 4)
|
||||
self.puzzle.refresh()
|
||||
self.ans = copy.copy(self.puzzle.answer)
|
||||
random.shuffle(self.ans)
|
||||
|
||||
def compose(self):
|
||||
yield Label(self.puzzle.wording, id=self.regid("sentence"))
|
||||
yield Label(f"当前输入: {self.inputlist}", id=self.regid("inputpreview"))
|
||||
for i in self.ans:
|
||||
self.hashtable[str(hash(i))] = i
|
||||
yield Button(i, id=self.regid(f"select{hash(i)}"))
|
||||
yield Button("退格", id=self.regid(f"delete"))
|
||||
|
||||
def handler(self, event, type_):
|
||||
# TODO: 改动:在线错误纠正
|
||||
if type_ == "button":
|
||||
if self.recid(event.button.id) == "delete":
|
||||
if len(self.inputlist) > 0:
|
||||
self.inputlist.pop()
|
||||
else:
|
||||
return 1
|
||||
else:
|
||||
self.inputlist.append(self.hashtable[self.recid(event.button.id)[6:]])
|
||||
if len(self.inputlist) < len(self.puzzle.answer):
|
||||
return 1
|
||||
else:
|
||||
if self.inputlist == self.puzzle.answer:
|
||||
self.reactor.report(self.atom, 4)
|
||||
return 0
|
||||
else:
|
||||
self.inputlist = []
|
||||
self.reactor.report(self.atom, 2)
|
||||
return 1
|
||||
|
||||
|
||||
class DrawCard(Composition):
|
||||
def __init__(self, screen: Screen, reactor, atom: Tuple[pt.Electron, pt.Nucleon, Dict]):
|
||||
super().__init__(screen, reactor, atom)
|
||||
self.inputlist = []
|
||||
self.hashtable = {}
|
||||
self._work()
|
||||
|
||||
def _work(self):
|
||||
self.puzzle = pz.SelectionPuzzle(self.atom[1]["keyword_note"], [], 2, "选择正确词义: ") # type: ignore
|
||||
self.puzzle.refresh()
|
||||
|
||||
def compose(self):
|
||||
yield Label(self.atom[1].content.replace("/",""), id=self.regid("sentence"))
|
||||
yield Label(self.puzzle.wording[len(self.inputlist)], id=self.regid("puzzle"))
|
||||
yield Label(f"当前输入: {self.inputlist}", id=self.regid("inputpreview"))
|
||||
for i in self.puzzle.options[len(self.inputlist)]:
|
||||
self.hashtable[str(hash(i))] = i
|
||||
yield Button(i, id=self.regid(f"select{hash(i)}"))
|
||||
yield Button("退格", id=self.regid(f"delete"))
|
||||
|
||||
def handler(self, event, type_):
|
||||
if type_ == "button":
|
||||
if self.recid(event.button.id) == "delete":
|
||||
if len(self.inputlist) > 0:
|
||||
self.inputlist.pop()
|
||||
else:
|
||||
return 1
|
||||
else:
|
||||
self.inputlist.append(self.hashtable[self.recid(event.button.id)[6:]])
|
||||
if len(self.inputlist) < len(self.puzzle.answer):
|
||||
return 1
|
||||
else:
|
||||
if self.inputlist == self.puzzle.answer:
|
||||
self.reactor.report(self.atom, 4)
|
||||
return 0
|
||||
else:
|
||||
self.inputlist = []
|
||||
self.reactor.report(self.atom, 2)
|
||||
return 1
|
||||
|
||||
|
||||
registry = {
|
||||
"sample": Composition,
|
||||
"recognition": Recognition,
|
||||
"fill_blank_test": FillBlank,
|
||||
"draw_card_test": DrawCard,
|
||||
"basic_evaluation": BasicEvaluation,
|
||||
}
|
||||
|
||||
|
||||
class TestScreen(Screen):
|
||||
def __init__(self):
|
||||
super().__init__(name=None, id=None, classes=None)
|
||||
self.comp = Recognition(self, None, pt.Atom.advanced_placeholder())
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
yield from self.comp.compose()
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
pass
|
||||
|
||||
def on_button_pressed(self, event: Event) -> None:
|
||||
self.comp.handler(event, "button")
|
||||
|
||||
def action_quit_app(self) -> None:
|
||||
self.app.exit()
|
||||
|
||||
|
||||
class AppLauncher(App):
|
||||
CSS_PATH = "styles.css"
|
||||
TITLE = "测试布局"
|
||||
BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")]
|
||||
SCREENS = {
|
||||
"testscreen": TestScreen,
|
||||
}
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.action_toggle_dark()
|
||||
self.push_screen("testscreen")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = AppLauncher()
|
||||
app.run()
|
14
config.toml
14
config.toml
@@ -1,14 +0,0 @@
|
||||
# [调试] 将更改保存到文件
|
||||
save = 1
|
||||
|
||||
# [调试] 覆写时间
|
||||
time_override = -1
|
||||
|
||||
# [调试] 一键通过
|
||||
quick_pass = 0
|
||||
|
||||
# 对于每个项目的新记忆核子数量
|
||||
tasked_number = 8
|
||||
|
||||
# 竖屏适配 (未完成)
|
||||
mobile_mode = 1
|
40
install.bat
40
install.bat
@@ -1,40 +0,0 @@
|
||||
@echo off
|
||||
echo "HeurAMS 环境安装脚本"
|
||||
echo "正在检测系统中是否安装 Python 3.x..."
|
||||
|
||||
rem 检查 Python 3 是否存在
|
||||
where python >nul 2>nul
|
||||
if %errorlevel% neq 0 (
|
||||
echo "错误: 未检测到 Python. 请确保 Python 已添加到系统 PATH 中,然后再次运行此脚本。"
|
||||
exit /b 1
|
||||
)
|
||||
|
||||
rem 检查 Python 版本是否为 3.x
|
||||
for /f "tokens=*" %%i in ('python -c "import sys; print(f'{sys.version_info.major}')"') do set PYTHON_MAJOR_VERSION=%%i
|
||||
if "%PYTHON_MAJOR_VERSION%"=="3" (
|
||||
for /f "tokens=*" %%i in ('python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')"') do set PYTHON_VERSION=%%i
|
||||
echo "检测到 Python 3 已安装, 版本为: %PYTHON_VERSION%"
|
||||
) else (
|
||||
echo "错误: 未检测到 Python 3. 请先安装 Python 3, 然后再次运行此脚本."
|
||||
exit /b 1
|
||||
)
|
||||
|
||||
echo "---"
|
||||
echo "正在安装 requirements.txt 中的依赖..."
|
||||
|
||||
rem 检查 requirements.txt 文件是否存在
|
||||
if exist "requirements.txt" (
|
||||
python -m pip install -r requirements.txt
|
||||
if %errorlevel% equ 0 (
|
||||
echo "依赖安装成功."
|
||||
) else (
|
||||
echo "错误: 依赖安装失败. 请检查 requirements.txt 文件或网络连接."
|
||||
exit /b 1
|
||||
)
|
||||
) else (
|
||||
echo "警告: 未找到 requirements.txt 文件. 跳过依赖安装."
|
||||
)
|
||||
|
||||
echo "---"
|
||||
echo "HeurAMS 的环境依赖已安装"
|
||||
pause
|
28
install.sh
28
install.sh
@@ -1,28 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
echo "HeurAMS 环境安装脚本"
|
||||
echo "正在检测系统中是否安装 Python 3.x..."
|
||||
if command -v python3 &>/dev/null; then
|
||||
PYTHON_VERSION=$(python3 -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
|
||||
echo "检测到 Python 3 已安装, 版本为: ${PYTHON_VERSION}"
|
||||
else
|
||||
echo "错误: 未检测到 Python 3. 请先安装 Python 3, 然后再次运行此脚本. "
|
||||
exit 1 # 退出脚本, 因为 Python 3 是必需的
|
||||
fi
|
||||
|
||||
echo "---"
|
||||
echo "正在安装 requirements.txt 中的依赖..."
|
||||
if [ -f "requirements.txt" ]; then
|
||||
python3 -m pip install -r requirements.txt
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "依赖安装成功. "
|
||||
else
|
||||
echo "错误: 依赖安装失败. 请检查 requirements.txt 文件或网络连接. "
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
echo "警告: 未找到 requirements.txt 文件. 跳过依赖安装. "
|
||||
fi
|
||||
|
||||
echo "---"
|
||||
echo "HeurAMS 的环境依赖已安装"
|
266
main.py
266
main.py
@@ -1,266 +0,0 @@
|
||||
from textual.app import App, ComposeResult
|
||||
from textual.widgets import (
|
||||
Header,
|
||||
Footer,
|
||||
ListView,
|
||||
ProgressBar,
|
||||
DirectoryTree,
|
||||
ListItem,
|
||||
Label,
|
||||
Static,
|
||||
Button,
|
||||
)
|
||||
from textual.containers import Container, Horizontal, Center
|
||||
from textual.screen import Screen
|
||||
import pathlib
|
||||
import threading
|
||||
import edge_tts as tts
|
||||
from playsound import playsound
|
||||
import particles as pt
|
||||
from reactor import Reactor, Apparatus
|
||||
import auxiliary as aux
|
||||
import compositions as compo
|
||||
import builtins
|
||||
|
||||
# Hook python 的 open() 函数, 使用 utf-8 (兼容 Windows 万年 GBK)
|
||||
|
||||
_original_open = builtins.open
|
||||
|
||||
def _open(*args, **kwargs):
|
||||
if "encoding" not in kwargs:
|
||||
kwargs["encoding"] = "utf-8"
|
||||
return _original_open(*args, **kwargs)
|
||||
|
||||
builtins.open = _open
|
||||
|
||||
ver = "0.3.0"
|
||||
|
||||
config = aux.ConfigFile("config.toml")
|
||||
|
||||
|
||||
class MemScreen(Screen):
|
||||
BINDINGS = [
|
||||
("d", "toggle_dark", "改变色调"),
|
||||
("q", "pop_screen", "返回主菜单"),
|
||||
("v", "play_voice", "朗读"),
|
||||
]
|
||||
if config.get("quick_pass"):
|
||||
BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
|
||||
btn = dict()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
nucleon_file: pt.NucleonUnion,
|
||||
electron_file: pt.ElectronUnion,
|
||||
tasked_num,
|
||||
):
|
||||
super().__init__(name=None, id=None, classes=None)
|
||||
self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
|
||||
self.stage = 1
|
||||
self.stage += self.reactor.set_round_templated(self.stage)
|
||||
self.reactor.forward()
|
||||
self.compo = next(self.reactor.current_appar)
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
with Center():
|
||||
yield Static(
|
||||
f"{len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
|
||||
)
|
||||
yield from self.compo.compose()
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self):
|
||||
pass
|
||||
|
||||
def on_button_pressed(self, event):
|
||||
ret = self.compo.handler(event, "button")
|
||||
self._forward_judge(ret)
|
||||
|
||||
def _forward_judge(self, ret):
|
||||
if ret == -1:
|
||||
return
|
||||
if ret == 0:
|
||||
try:
|
||||
self.compo = next(self.reactor.current_appar)
|
||||
self.refresh_ui()
|
||||
except StopIteration:
|
||||
nxt = self.reactor.forward(1)
|
||||
try:
|
||||
self.compo = next(self.reactor.current_appar)
|
||||
except:
|
||||
pass
|
||||
if nxt == -1:
|
||||
if self.reactor.round_set == 0:
|
||||
if self.stage == 4:
|
||||
if config.get("save"):
|
||||
self.reactor.save()
|
||||
self.compo = compo.Finished(
|
||||
self, None, pt.Atom.placeholder()
|
||||
)
|
||||
self.refresh_ui()
|
||||
else:
|
||||
self.reactor.set_round_templated(self.stage)
|
||||
self.reactor.forward(1)
|
||||
self.stage += 1
|
||||
self.compo = next(self.reactor.current_appar)
|
||||
self.refresh_ui()
|
||||
return
|
||||
return
|
||||
else:
|
||||
self.refresh_ui()
|
||||
return
|
||||
if ret == 1:
|
||||
self.refresh_ui()
|
||||
return
|
||||
|
||||
def refresh_ui(self):
|
||||
self.call_later(self.recompose)
|
||||
|
||||
def action_play_voice(self):
|
||||
def play():
|
||||
cache_dir = pathlib.Path(f"./cache/voice/")
|
||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
cache = cache_dir / f"{self.reactor.current_atom[1].content.replace('/','')}.wav"
|
||||
if not cache.exists():
|
||||
communicate = tts.Communicate(
|
||||
self.reactor.current_atom[1].content.replace("/", ""),
|
||||
"zh-CN-YunjianNeural",
|
||||
)
|
||||
communicate.save_sync(
|
||||
f"./cache/voice/{self.reactor.current_atom[1].content.replace('/','')}.wav"
|
||||
)
|
||||
playsound(str(cache))
|
||||
|
||||
threading.Thread(target=play).start()
|
||||
|
||||
def action_quick_pass(self):
|
||||
self.reactor.report(self.reactor.current_atom, 5)
|
||||
self._forward_judge(0)
|
||||
def action_toggle_dark(self):
|
||||
self.app.action_toggle_dark()
|
||||
|
||||
def action_pop_screen(self):
|
||||
self.app.pop_screen()
|
||||
|
||||
|
||||
class PreparationScreen(Screen):
|
||||
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
|
||||
|
||||
def __init__(
|
||||
self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion
|
||||
) -> None:
|
||||
super().__init__(name=None, id=None, classes=None)
|
||||
self.nucleon_file = nucleon_file
|
||||
self.electron_file = electron_file
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
with Container(id="learning_screen_container"):
|
||||
yield Label(f"记忆项目: [b]{self.nucleon_file.name}[/b]\n")
|
||||
yield Label(
|
||||
f"核子文件对象: ./nucleon/[b]{self.nucleon_file.name}[/b].toml"
|
||||
)
|
||||
yield Label(
|
||||
f"电子文件对象: ./electron/[b]{self.electron_file.name}[/b].toml"
|
||||
)
|
||||
yield Label(f"核子数量:{len(self.nucleon_file)}")
|
||||
yield Button(
|
||||
"开始记忆",
|
||||
id="start_memorizing_button",
|
||||
variant="primary",
|
||||
classes="start-button",
|
||||
)
|
||||
yield Static(f"\n全文如下:\n")
|
||||
yield Static(self._get_full_content().replace("/", ""), classes="full")
|
||||
yield Footer()
|
||||
|
||||
def _get_full_content(self):
|
||||
content = ""
|
||||
for i in self.nucleon_file.nucleons:
|
||||
content += i["content"]
|
||||
return content
|
||||
|
||||
def action_go_back(self):
|
||||
self.app.pop_screen()
|
||||
|
||||
def action_quit_app(self):
|
||||
self.app.exit()
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
if event.button.id == "start_memorizing_button":
|
||||
newscr = MemScreen(
|
||||
self.nucleon_file, self.electron_file, config.get("tasked_number", 8)
|
||||
)
|
||||
self.app.push_screen(newscr)
|
||||
|
||||
|
||||
class FileSelectorScreen(Screen):
|
||||
global ver
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
yield Container(
|
||||
Label(f'欢迎使用 "潜进" 辅助记忆软件, 版本 {ver}', classes="title-label"),
|
||||
Label("选择要学习的文件:", classes="title-label"),
|
||||
ListView(id="file-list", classes="file-list-view"),
|
||||
)
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self) -> None:
|
||||
file_list_widget = self.query_one("#file-list", ListView)
|
||||
nucleon_path = pathlib.Path("./nucleon")
|
||||
nucleon_files = sorted(
|
||||
[f.name for f in nucleon_path.iterdir() if f.suffix == ".toml"]
|
||||
)
|
||||
|
||||
if nucleon_files:
|
||||
for filename in nucleon_files:
|
||||
file_list_widget.append(ListItem(Label(filename)))
|
||||
else:
|
||||
file_list_widget.append(
|
||||
ListItem(Static("在 ./nucleon/ 中未找到任何核子文件. 请放置文件后重启应用."))
|
||||
)
|
||||
file_list_widget.disabled = True
|
||||
|
||||
def on_list_view_selected(self, event: ListView.Selected) -> None:
|
||||
if not isinstance(event.item, ListItem):
|
||||
return
|
||||
|
||||
selected_label = event.item.query_one(Label)
|
||||
if "未找到任何 .toml 文件" in str(selected_label.renderable):
|
||||
return
|
||||
|
||||
selected_filename = str(selected_label.renderable)
|
||||
nucleon_file = pt.NucleonUnion(
|
||||
pathlib.Path("./nucleon") / selected_filename
|
||||
)
|
||||
electron_file_path = pathlib.Path("./electron") / selected_filename
|
||||
if electron_file_path.exists():
|
||||
pass
|
||||
else:
|
||||
electron_file_path.touch()
|
||||
electron_file = pt.ElectronUnion(
|
||||
pathlib.Path("./electron") / selected_filename
|
||||
)
|
||||
self.app.push_screen(PreparationScreen(nucleon_file, electron_file))
|
||||
|
||||
def action_quit_app(self) -> None:
|
||||
self.app.exit()
|
||||
|
||||
|
||||
class AppLauncher(App):
|
||||
CSS_PATH = "styles.css"
|
||||
TITLE = "潜进 - 辅助记忆程序"
|
||||
BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")]
|
||||
SCREENS = {
|
||||
"file_selection_screen": FileSelectorScreen,
|
||||
}
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.push_screen("file_selection_screen")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = AppLauncher()
|
||||
app.run()
|
@@ -1,22 +0,0 @@
|
||||
# 文件头, 按部就班复制即可
|
||||
["keydata"]
|
||||
note = "笔记"
|
||||
keyword_note = "关键词翻译"
|
||||
translation = "语句翻译"
|
||||
["testdata"]
|
||||
additional_inf = ["translation","keyword_note", "note"]
|
||||
fill_blank_test = {"from"=["content"], "hint"=["translation"]}
|
||||
|
||||
|
||||
#可重复的单元
|
||||
draw_card_test = {"from"=["keyword_note"]}
|
||||
[CONTENT]
|
||||
note = []
|
||||
translation = "TRANSLATION"
|
||||
keyword_note = {"KN_KEY": "KN_VALUE"}
|
||||
|
||||
#这是一个示例:(不要求附加在生成文本中)
|
||||
["臣/密/言: /臣/以/险衅/, 夙/遭/闵凶./"]
|
||||
note = []
|
||||
translation = "臣子李密陈言: 我因命运不好, 小时候遭遇到了不幸"
|
||||
keyword_note = {"险衅"="凶险祸患(这里指命运不好)", "夙"="早时, 这里指年幼的时候", "闵"="通'悯', 指可忧患的事", "凶"="不幸, 指丧父"}
|
154
nucleon/陈情表.toml
154
nucleon/陈情表.toml
@@ -1,154 +0,0 @@
|
||||
# 散列表的键翻译
|
||||
["keydata"]
|
||||
note = "笔记"
|
||||
keyword_note = "关键词翻译"
|
||||
translation = "语句翻译"
|
||||
|
||||
# 测试项目元数据
|
||||
["testdata"]
|
||||
# 记忆时显示的额外信息
|
||||
additional_inf = ["translation","keyword_note", "note"]
|
||||
# 填空测试, content 指代键名
|
||||
fill_blank_test = {"from"=["content"], "hint"=["translation"]}
|
||||
# 选择题测试
|
||||
draw_card_test = {"from"=["keyword_note"]}
|
||||
|
||||
["臣/密/言: /臣/以/险衅/, 夙/遭/闵凶./"]
|
||||
note = []
|
||||
translation = "臣子李密陈言: 我因命运不好, 小时候遭遇到了不幸"
|
||||
keyword_note = {"险衅"="凶险祸患(这里指命运不好)", "夙"="早时, 这里指年幼的时候", "闵"="通'悯', 指可忧患的事", "凶"="不幸, 指丧父"}
|
||||
|
||||
["生孩/六月/, 慈父/见背/; /行年/四岁/, 舅/夺/母志./"]
|
||||
note = []
|
||||
translation = "刚出生六个月, 我慈爱的父亲就不幸去世了。经过了四年, 舅父逼母亲改嫁"
|
||||
keyword_note = {"见背"="死的委婉说法", "行年"="经历的年岁", "母志"="母亲守节之志(改嫁的委婉说法)"}
|
||||
|
||||
["祖母/刘/愍/臣/孤弱/, 躬亲/抚养./"]
|
||||
note = []
|
||||
translation = "我的祖母刘氏, 怜悯我从小丧父, 便亲自对我加以抚养"
|
||||
keyword_note = {"愍"="怜悯", "躬亲"="亲身"}
|
||||
|
||||
["臣/少/多/疾病/, 九岁/不行/, 零丁/孤苦/, 至于/成立./"]
|
||||
note = []
|
||||
translation = "臣小的时候经常生病, 九岁时还不会行走。孤独无靠, 一直到成人自立"
|
||||
keyword_note = {"成立"="成人自立"}
|
||||
|
||||
["既/无/伯叔/, 终/鲜/兄弟/, 门/衰/祚/薄/, 晚/有/儿息./"]
|
||||
note = []
|
||||
translation = "既没有叔叔伯伯, 又没什么兄弟, 门庭衰微而福分浅薄, 很晚才有儿子"
|
||||
keyword_note = {"鲜"="少, 这里指'无'", "祚薄"="福分浅薄", "儿息"="亲生子女"}
|
||||
|
||||
["外/无/期功/强近/之亲/, 内/无/应门/五尺/之僮/, 茕茕/孑立/, 形影/相吊./"]
|
||||
note = []
|
||||
translation = "在外面没有比较亲近的亲戚, 在家里又没有照应门户的童仆。生活孤单没有依靠, 每天只有自己的身体和影子相互安慰"
|
||||
keyword_note = {"期功"="指关系较近的亲属", "茕茕孑立"="孤单无依靠的样子", "吊"="安慰"}
|
||||
|
||||
["而/刘/夙/婴/疾病/, 常/在/床蓐/, 臣/侍/汤药/, 未曾/废离./"]
|
||||
note = []
|
||||
translation = "但祖母又早被疾病缠绕, 常年卧床不起, 我侍奉她吃饭喝药, 从来就没有停止侍奉而离开她"
|
||||
keyword_note = {"婴"="被...缠绕", "蓐"="通'褥', 床垫", "废"="停止服侍", "离"="离开"}
|
||||
|
||||
["逮/奉/圣朝/, 沐浴/清化./"]
|
||||
note = []
|
||||
translation = "到了晋朝建立, 我蒙受着清明的政治教化"
|
||||
keyword_note = {"逮"="及, 到", "奉"="承奉", "圣朝"="指当时的晋朝", "沐浴清化"="蒙受清平教化"}
|
||||
|
||||
["前/太守/臣/逵/察/臣/孝廉/; /后/刺史/臣/荣/举/臣/秀才./"]
|
||||
note = []
|
||||
translation = "前任太守逵, 考察后推举臣下为孝廉, 后任刺史荣又推举臣下为优秀人才"
|
||||
keyword_note = {"察"="考察和推举", "孝廉"="孝顺, 品性纯洁", "举"="推举", "秀才"="优秀人才"}
|
||||
|
||||
["臣/以/供养/无主/, 辞/不赴命./"]
|
||||
note = []
|
||||
translation = "臣下因为供奉赡养祖母的事无人承担, 辞谢不接受任命"
|
||||
keyword_note = {"无主"="无人承担"}
|
||||
|
||||
["诏书/特下/, 拜/臣/郎中/, 寻/蒙/国恩/, 除/臣/洗马./"]
|
||||
note = []
|
||||
translation = "朝廷又特地下了诏书, 任命我为郎中, 不久又蒙受国家恩命, 任命我为太子洗马"
|
||||
keyword_note = {"拜"="授予官职", "郎中"="尚书省的属官", "寻"="不久", "除"="拜官受职", "洗马"="太子的属官"}
|
||||
|
||||
["猥/以/微贱/, 当/侍/东宫/, 非/臣/陨首/所能/上报./"]
|
||||
note = []
|
||||
translation = "像我这样出身微贱地位卑下的人, 担当侍奉太子的职务, 这实在不是我杀身捐躯所能报答朝廷的"
|
||||
keyword_note = {"猥"="谦词", "微贱"="卑微低贱", "东宫"="太子居处", "陨首"="杀身"}
|
||||
|
||||
["臣/具/以表/闻/, 辞/不就职./"]
|
||||
note = []
|
||||
translation = "我将以上苦衷上表报告, 加以推辞不去就职"
|
||||
keyword_note = {"具"="详细", "闻"="使...知道"}
|
||||
|
||||
["诏书/切峻/, 责/臣/逋慢/; /郡县/逼迫/, 催/臣/上道/; /州司/临门/, 急于/星火./"]
|
||||
note = []
|
||||
translation = "但是诏书急切严峻, 责备我逃避命令, 有意拖延, 态度傲慢。郡县长官催促我立刻上路; 州官登门督促, 比流星坠落还要急迫"
|
||||
keyword_note = {"切峻"="急切而严厉", "逋慢"="逃避怠慢", "星火"="流星的光, 喻急迫"}
|
||||
|
||||
["臣/欲/奉诏/奔驰/, 则/刘/病/日笃/, 欲/苟/顺/私情/, 则/告诉/不许./"]
|
||||
note = []
|
||||
translation = "我很想遵从皇上的旨意赴京就职, 但祖母刘氏的病却一天比一天重; 想要姑且顺从自己的私情, 但报告申诉不被允许"
|
||||
keyword_note = {"日笃"="病情日益加重", "苟"="姑且", "告诉"="报告申诉"}
|
||||
|
||||
["臣/之/进退/, 实为/狼狈./"]
|
||||
note = []
|
||||
translation = "我是进退两难, 十分狼狈"
|
||||
keyword_note = {"狼狈"="进退两难的样子"}
|
||||
|
||||
["伏惟/圣朝/以/孝/治/天下/, 凡/在/故老/, 犹/蒙/矜育/, 况/臣/孤苦/, 特为/尤甚./"]
|
||||
note = []
|
||||
translation = "我俯伏思量晋朝是用孝道来治理天下的, 凡是年老而德高的旧臣, 尚且还受到怜悯养育, 何况我的孤苦程度更为严重呢"
|
||||
keyword_note = {"伏惟"="下对上的敬辞", "故老"="年老德高的旧臣", "矜育"="怜悯养育"}
|
||||
|
||||
["且/臣/少/仕/伪朝/, 历职/郎署/, 本图/宦达/, 不矜/名节./"]
|
||||
note = []
|
||||
translation = "况且我年轻的时候曾经做过蜀汉的官, 担任过郎官职务, 本来就希望做官显达, 并不顾惜名声节操"
|
||||
keyword_note = {"伪朝"="对前朝的蔑称", "历职"="连续任职", "郎署"="尚书郎的官衙", "宦达"="官场上显达"}
|
||||
|
||||
["今/臣/亡国/贱俘/, 至微/至陋/, 过/蒙/拔擢/, 宠命/优渥/, 岂敢/盘桓/, 有所/希冀!/"]
|
||||
note = []
|
||||
translation = "现在我是一个低贱的亡国俘虏, 十分卑微浅陋, 受到过分提拔, 恩宠优厚, 怎敢犹豫不决而有非分的企求呢"
|
||||
keyword_note = {"拔擢"="提拔", "优渥"="优厚", "盘桓"="徘徊不前", "希冀"="非分的企求"}
|
||||
|
||||
["但/以/刘/日薄/西山/, 气息/奄奄/, 人命/危浅/, 朝不/虑夕./"]
|
||||
note = []
|
||||
translation = "只是因为祖母刘氏寿命即将终了, 气息微弱, 生命垂危, 早上不能想到晚上怎样"
|
||||
keyword_note = {"日薄西山"="太阳接近西山, 喻人寿命将终", "危浅"="生命垂危"}
|
||||
|
||||
["臣/无/祖母/, 无以/至今日/, 祖母/无/臣/, 无以/终余年./"]
|
||||
note = []
|
||||
translation = "臣下我如果没有祖母, 就没有今天的样子; 祖母如果没有我的照料, 也无法度过她的余生"
|
||||
keyword_note = {"终余年"="度过余生"}
|
||||
|
||||
["母孙/二人/, 更相/为命/, 是以/区区/不能/废远./"]
|
||||
note = []
|
||||
translation = "我们祖孙二人, 互相依靠而维持生命, 因此我的内心不愿废止奉养, 远离祖母"
|
||||
keyword_note = {"更相"="相互", "区区"="自己的私情", "废远"="废止奉养而远离"}
|
||||
|
||||
["臣/密/今年/四十/有四/, 祖母/今年/九十/有六/, 是/臣/尽节/于/陛下/之日/长/, 报养/刘/之日/短./"]
|
||||
note = []
|
||||
translation = "臣下我现在的年龄四十四岁了, 祖母现在的年龄九十六岁了, 臣下我在陛下面前尽忠尽节的日子还长着呢, 而在祖母刘氏面前尽孝尽心的日子已经不多了"
|
||||
keyword_note = {"有"="又", "尽节"="尽忠节"}
|
||||
|
||||
["乌鸟/私情/, 愿/乞/终养./"]
|
||||
note = []
|
||||
translation = "我怀着乌鸦反哺的私情, 乞求能够准许我完成对祖母养老送终的心愿"
|
||||
keyword_note = {"乌鸟私情"="乌鸦反哺之情, 喻孝心", "终养"="养老至终"}
|
||||
|
||||
["臣/之/辛苦/, 非独/蜀之/人士/及/二州/牧伯/所见/明知/, 皇天/后土/, 实所/共鉴./"]
|
||||
note = []
|
||||
translation = "我的辛酸苦楚, 并不仅仅被蜀地的百姓及益州、梁州的长官所亲眼目睹、内心明白, 连天地神明也都看得清清楚楚"
|
||||
keyword_note = {"辛苦"="辛酸苦楚", "牧伯"="州郡长官", "皇天后土"="天地神明", "鉴"="明察"}
|
||||
|
||||
["愿/陛下/矜悯/愚诚/, 听/臣/微志/, 庶/刘/侥幸/, 保/卒/余年./"]
|
||||
note = []
|
||||
translation = "希望陛下能怜悯我愚昧诚心, 请允许我完成臣下一点小小的心愿, 使祖母刘氏能够侥幸地保全她的余生"
|
||||
keyword_note = {"矜悯"="怜悯", "听"="准许", "庶"="或许(表希望)", "卒余年"="终老"}
|
||||
|
||||
["臣/生/当/陨首/, 死/当/结草./"]
|
||||
note = []
|
||||
translation = "我活着应当杀身报效朝廷, 死了也要结草衔环来报答陛下的恩情"
|
||||
keyword_note = {"陨首"="掉脑袋, 指献出生命", "结草"="死后报恩的典故"}
|
||||
|
||||
["臣/不胜/犬马/怖惧/之情/, 谨/拜表/以闻./"]
|
||||
note = []
|
||||
translation = "臣下我怀着牛马一样不胜恐惧的心情, 恭敬地呈上此表来使陛下知道这件事"
|
||||
keyword_note = {"不胜"="禁不住", "犬马怖惧"="臣子谦卑的自比", "闻"="使...知道"}
|
52
precache.py
52
precache.py
@@ -1,52 +0,0 @@
|
||||
# 音频预缓存实用程序, 独立于主程序之外, 但依赖 particles 组件
|
||||
import particles as pt
|
||||
import edge_tts as tts
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
|
||||
|
||||
def precache(text: str):
|
||||
"""预缓存单个文本的音频"""
|
||||
cache_dir = Path("./cache/voice/")
|
||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
cache = cache_dir / f"{text}.wav"
|
||||
if not cache.exists():
|
||||
communicate = tts.Communicate(text, "zh-CN-YunjianNeural")
|
||||
communicate.save_sync(f"./cache/voice/{text}.wav")
|
||||
|
||||
|
||||
def proc_file(path: Path):
|
||||
"""处理单个文件"""
|
||||
nu = pt.NucleonUnion(path)
|
||||
c = 0
|
||||
for i in nu.nucleons:
|
||||
c += 1
|
||||
print(f"预缓存 [{nu.name}] ({c}/{len(nu)}): {i['content'].replace('/', '')}")
|
||||
precache(i['content'].replace('/', ''))
|
||||
|
||||
|
||||
def walk(path_str: str):
|
||||
"""遍历目录处理所有文件"""
|
||||
path = Path(path_str)
|
||||
print(f"正在遍历目录: {path}")
|
||||
|
||||
for item in path.iterdir():
|
||||
if item.is_file() and item.suffix == ".toml":
|
||||
print(f"正预缓存文件: {item.name}")
|
||||
proc_file(item)
|
||||
elif item.is_dir():
|
||||
print(f"进入目录: {item.name}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("音频预缓存实用程序")
|
||||
print("A: 全部缓存")
|
||||
print("C: 清空缓存")
|
||||
|
||||
choice = input("输入选项 $ ").upper()
|
||||
|
||||
if choice == "A":
|
||||
walk("./nucleon")
|
||||
elif choice == "C":
|
||||
shutil.rmtree("./cache/voice", ignore_errors=True)
|
||||
print("缓存已清空")
|
121
puzzles.py
121
puzzles.py
@@ -1,121 +0,0 @@
|
||||
import random
|
||||
|
||||
|
||||
class Puzzle:
|
||||
pass
|
||||
|
||||
|
||||
class BlankPuzzle(Puzzle):
|
||||
"""填空题谜题生成器
|
||||
|
||||
Args:
|
||||
text: 原始字符串(需要 "/" 分割句子, 末尾应有 "/")
|
||||
min_denominator: 最小概率倒数(如占所有可生成填空数的 1/7 中的 7, 若期望值小于 1, 则取 1)
|
||||
"""
|
||||
|
||||
def __init__(self, text: str, min_denominator: int):
|
||||
self.text = text
|
||||
self.min_denominator = min_denominator
|
||||
self.wording = "填空题 - 尚未刷新谜题"
|
||||
self.answer = ["填空题 - 尚未刷新谜题"]
|
||||
|
||||
def refresh(self): # 刷新谜题
|
||||
placeholder = "___SLASH___"
|
||||
tmp_text = self.text.replace("/", placeholder)
|
||||
words = tmp_text.split(placeholder)
|
||||
if not words:
|
||||
return
|
||||
words = [word for word in words if word]
|
||||
num_blanks = min(max(1, len(words) // self.min_denominator), len(words))
|
||||
indices_to_blank = random.sample(range(len(words)), num_blanks)
|
||||
indices_to_blank.sort()
|
||||
blanked_words = list(words)
|
||||
answer = list()
|
||||
for index in indices_to_blank:
|
||||
blanked_words[index] = "__" * len(words[index])
|
||||
answer.append(words[index])
|
||||
self.answer = answer
|
||||
self.wording = "".join(blanked_words)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.wording}\n{str(self.answer)}"
|
||||
|
||||
|
||||
class SelectionPuzzle(Puzzle):
|
||||
"""选择题谜题生成器
|
||||
|
||||
Args:
|
||||
mapping: 正确选项映射 {问题: 答案}
|
||||
jammer: 干扰项列表
|
||||
max_riddles_num: 最大生成谜题数 (默认2个)
|
||||
prefix: 问题前缀
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
mapping: dict,
|
||||
jammer: list,
|
||||
max_riddles_num: int = 2,
|
||||
prefix: str = ""
|
||||
):
|
||||
self.prefix = prefix
|
||||
self.mapping = mapping
|
||||
self.jammer = list(set(jammer + list(mapping.values())))
|
||||
while len(self.jammer) < 4:
|
||||
self.jammer.append(" ")
|
||||
self.max_riddles_num = max(1, min(max_riddles_num, 5))
|
||||
self.wording = "选择题 - 尚未刷新谜题"
|
||||
self.answer = ["选择题 - 尚未刷新谜题"]
|
||||
self.options = []
|
||||
|
||||
def refresh(self):
|
||||
"""刷新谜题,根据题目数量生成适当数量的谜题"""
|
||||
if not self.mapping:
|
||||
self.wording = "无可用题目"
|
||||
self.answer = ["无答案"]
|
||||
self.options = []
|
||||
return
|
||||
|
||||
num_questions = min(self.max_riddles_num, len(self.mapping))
|
||||
questions = random.sample(list(self.mapping.items()), num_questions)
|
||||
puzzles = []
|
||||
answers = []
|
||||
all_options = []
|
||||
|
||||
for question, correct_answer in questions:
|
||||
options = [correct_answer]
|
||||
available_jammers = [
|
||||
j for j in self.jammer if j != correct_answer
|
||||
]
|
||||
if len(available_jammers) >= 3:
|
||||
selected_jammers = random.sample(available_jammers, 3)
|
||||
else:
|
||||
selected_jammers = random.choices(available_jammers, k=3)
|
||||
options.extend(selected_jammers)
|
||||
random.shuffle(options)
|
||||
puzzles.append(question)
|
||||
answers.append(correct_answer)
|
||||
all_options.append(options)
|
||||
|
||||
question_texts = []
|
||||
for i, puzzle in enumerate(puzzles):
|
||||
question_texts.append(f"{self.prefix}:\n {i+1}. {puzzle}")
|
||||
|
||||
self.wording = question_texts
|
||||
self.answer = answers
|
||||
self.options = all_options
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.wording}\n正确答案: {', '.join(self.answer)}"
|
||||
|
||||
if __name__ == "__main__":
|
||||
puz = SelectionPuzzle(
|
||||
{"1+1": "2", "1+2": "3", "1+3": "4"},
|
||||
["2", "5", "0"],
|
||||
3,
|
||||
'求值: '
|
||||
)
|
||||
puz.refresh()
|
||||
print(puz.wording)
|
||||
print(puz.answer)
|
||||
print(puz.options)
|
11
pyproject.toml
Normal file
11
pyproject.toml
Normal file
@@ -0,0 +1,11 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=45", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "heurams"
|
||||
version = "0.4.0"
|
||||
description = "Heuristic Assisted Memory Scheduler"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
150
reactor.py
150
reactor.py
@@ -1,150 +0,0 @@
|
||||
import typing
|
||||
import particles as pt
|
||||
import pathlib
|
||||
import auxiliary as aux
|
||||
import compositions as comps
|
||||
import random
|
||||
#from pprint import pprint as print # debug
|
||||
class Apparatus():
|
||||
"""反应器对象, 决策一个原子的不同记忆方式, 并反馈到布局"""
|
||||
def __init__(self, screen, reactor, atom):
|
||||
self.electron: pt.Electron = atom[0]
|
||||
self.nucleon: pt.Nucleon = atom[1]
|
||||
self.positron: dict = atom[2]
|
||||
self.testdata = self.positron["testdata"]
|
||||
self.procession: typing.List[comps.Composition] = list()
|
||||
if self.positron["is_new_activation"] == 1:
|
||||
self.positron["is_new_activation"] = 0
|
||||
self.procession.append(comps.registry["recognition"](screen, reactor, atom))
|
||||
return
|
||||
for i in self.positron["testdata"].keys():
|
||||
if i == "additional_inf":
|
||||
continue
|
||||
self.procession.append(comps.registry[i](screen, reactor, atom))
|
||||
# self.procession.reverse()
|
||||
random.shuffle(self.procession)
|
||||
|
||||
def iterator(self):
|
||||
yield from self.procession
|
||||
|
||||
|
||||
class Reactor():
|
||||
"""反应堆对象, 处理和分配一次文件记忆流程的资源与策略"""
|
||||
def __init__(self, nucleon_file: pt.NucleonUnion, electron_file: pt.ElectronUnion, screen, tasked_num):
|
||||
# 导入原子对象
|
||||
self.nucleon_file = nucleon_file
|
||||
self.electron_file = electron_file
|
||||
self.tasked_num = tasked_num
|
||||
self.atoms_new = list()
|
||||
self.atoms_review = list()
|
||||
counter = self.tasked_num
|
||||
self.screen = screen
|
||||
self.electron_dict = electron_file.electrons_dict
|
||||
self.quality_dict = {}
|
||||
def electron_dict_get_fallback(key) -> pt.Electron:
|
||||
value = self.electron_dict.get(key)
|
||||
# 如果值不存在,则设置默认值
|
||||
if value is None:
|
||||
value = pt.Electron(key, {}) # 获取默认值
|
||||
self.electron_dict[key] = value # 将默认值存入字典
|
||||
electron_file.sync()
|
||||
return value # 返回获取的值(可能是默认值)
|
||||
|
||||
for nucleon in nucleon_file.nucleons:
|
||||
# atom = (Electron, Nucleon, Positron) 即 (记忆元数据, 内容元数据, 运行时数据)
|
||||
atom = (electron_dict_get_fallback(nucleon.content), nucleon, {}) # 使用 "Positron" 代称 atom[2]
|
||||
atom[2]["testdata"] = nucleon_file.testdata
|
||||
atom[2]["keydata"] = nucleon_file.keydata
|
||||
if atom[0]["is_activated"] == 0:
|
||||
if counter > 0:
|
||||
atom[2]["is_new_activation"] = 1
|
||||
atom[0]["is_activated"] = 1
|
||||
self.atoms_new.append(atom)
|
||||
counter -= 1
|
||||
else:
|
||||
atom[2]["is_new_activation"] = 0
|
||||
if int(atom[0]["next_date"]) <= aux.get_daystamp():
|
||||
atom[0]["last_date"] = aux.get_daystamp()
|
||||
self.atoms_review.append(atom)
|
||||
# 设置运行时
|
||||
self.index: int
|
||||
self.procession: list
|
||||
self.failed: list
|
||||
self.round_title: str
|
||||
self.current_atom: typing.Tuple[pt.Electron, pt.Nucleon, dict]
|
||||
self.round_set = 0
|
||||
self.current_atom = pt.Atom.placeholder()
|
||||
#print(self.atoms_new)
|
||||
|
||||
def set_round(self, title, procession):
|
||||
self.round_set = 1
|
||||
self.round_title = title
|
||||
self.procession = procession
|
||||
self.failed = list()
|
||||
self.index = -1
|
||||
|
||||
def set_round_templated(self, stage):
|
||||
titles = {
|
||||
1: "复习模式",
|
||||
2: "新记忆模式",
|
||||
3: "总复习模式"
|
||||
}
|
||||
processions = {
|
||||
1: self.atoms_review,
|
||||
2: self.atoms_new,
|
||||
3: (self.atoms_new + self.atoms_review)
|
||||
}
|
||||
ret = 1
|
||||
if stage == 1 and len(processions[1]) == 0:
|
||||
stage = 2
|
||||
ret = 2
|
||||
self.set_round(title=titles[stage], procession=processions[stage])
|
||||
return ret
|
||||
|
||||
def forward(self, step = 1):
|
||||
"""
|
||||
返回值规则:
|
||||
1: 重定向至 failed
|
||||
-1: 此轮已完成
|
||||
0: 下一个记忆单元
|
||||
"""
|
||||
if self.index + step >= len(self.procession):
|
||||
if len(self.failed) > 0:
|
||||
self.procession = self.failed
|
||||
self.index = -1
|
||||
self.forward(step)
|
||||
if "- 额外复习" not in self.round_title:
|
||||
self.round_title += " - 额外复习"
|
||||
self.failed = list()
|
||||
return 1 # 自动重定向到 failed
|
||||
else:
|
||||
self.round_set = 0
|
||||
return -1 # 此轮已完成
|
||||
self.index += step
|
||||
self.current_atom = self.procession[self.index]
|
||||
self.current_appar = Apparatus(self.screen, self, self.current_atom).iterator()
|
||||
return 0
|
||||
|
||||
def save(self):
|
||||
self._deploy_report()
|
||||
print("Progress saved")
|
||||
# self.nucleon_file.save()
|
||||
self.electron_file.save()
|
||||
|
||||
def _deploy_report(self):
|
||||
"部署所有 _report"
|
||||
for e, q in self.quality_dict.items():
|
||||
if q == -1:
|
||||
e.revisor(5, True)
|
||||
continue
|
||||
e.revisor(q)
|
||||
def report(self, atom, quality):
|
||||
"向反应器和最低质量记录汇报"
|
||||
if atom in self.atoms_new:
|
||||
self.quality_dict[atom[0]] = -1
|
||||
print(self.quality_dict)
|
||||
return
|
||||
self.quality_dict[atom[0]] = min(quality, self.quality_dict.get(atom[0], 5))
|
||||
if quality <= 3:
|
||||
self.failed.append(atom)
|
||||
print(self.quality_dict)
|
Binary file not shown.
Before Width: | Height: | Size: 120 KiB |
Binary file not shown.
Before Width: | Height: | Size: 559 KiB |
Binary file not shown.
Before Width: | Height: | Size: 177 KiB |
Binary file not shown.
Before Width: | Height: | Size: 161 KiB |
@@ -1,8 +0,0 @@
|
||||
aiohttp==3.12.13
|
||||
aiohttp_jinja2==1.6
|
||||
edge_tts==7.0.2
|
||||
Jinja2==3.1.6
|
||||
playsound==1.2.2
|
||||
rich==14.1.0
|
||||
textual==5.0.1
|
||||
toml==0.10.2
|
3
serve.py
3
serve.py
@@ -1,3 +0,0 @@
|
||||
from webshare import server
|
||||
server = server.Server("python3 main.py", title="辅助记忆程序", host="0.0.0.0")
|
||||
server.serve()
|
24
src/heurams/__main__.py
Normal file
24
src/heurams/__main__.py
Normal file
@@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python3
|
||||
from textual.app import App
|
||||
import screens
|
||||
import os
|
||||
|
||||
class AppLauncher(App):
|
||||
CSS_PATH = "styles.css"
|
||||
TITLE = "潜进 - 辅助记忆调度器"
|
||||
BINDINGS = [("escape", "quit", "退出"), ("d", "toggle_dark", "改变色调")]
|
||||
SCREENS = {
|
||||
"dashboard": screens.DashboardScreen,
|
||||
}
|
||||
|
||||
def on_mount(self) -> None:
|
||||
self.push_screen("dashboard")
|
||||
|
||||
if __name__ == "__main__":
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
os.chdir(script_dir)
|
||||
os.makedirs("electron", exist_ok=True)
|
||||
os.makedirs("nucleon", exist_ok=True)
|
||||
os.makedirs("cache/voice", exist_ok=True)
|
||||
app = AppLauncher()
|
||||
app.run()
|
31
src/heurams/context.py
Normal file
31
src/heurams/context.py
Normal file
@@ -0,0 +1,31 @@
|
||||
"""
|
||||
全局上下文管理模块
|
||||
"""
|
||||
from contextvars import ContextVar
|
||||
from typing import Optional
|
||||
from heurams.services.config import ConfigFile
|
||||
|
||||
config_var: ContextVar[ConfigFile] = ContextVar('config_var', default=ConfigFile("")) # 配置文件
|
||||
|
||||
runtime_var: ContextVar = ContextVar('runtime_var', default=None) # 运行时共享数据
|
||||
|
||||
class ConfigContext:
|
||||
"""
|
||||
功能完备的上下文管理器
|
||||
用于临时切换配置的作用域, 支持嵌套使用
|
||||
Example:
|
||||
>>> with ConfigContext(test_config):
|
||||
... get_daystamp() # 使用 test_config
|
||||
>>> get_daystamp() # 恢复原配置
|
||||
"""
|
||||
|
||||
def __init__(self, config_provider: ConfigFile):
|
||||
self.config_provider = config_provider
|
||||
self._token = None
|
||||
|
||||
def __enter__(self):
|
||||
self._token = config_var.set(self.config_provider)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
config_var.reset(self._token) # type: ignore
|
140
src/heurams/interface/screens/memory.py
Normal file
140
src/heurams/interface/screens/memory.py
Normal file
@@ -0,0 +1,140 @@
|
||||
|
||||
class MemScreen(Screen):
|
||||
BINDINGS = [
|
||||
("d", "toggle_dark", "改变色调"),
|
||||
("q", "pop_screen", "返回主菜单"),
|
||||
("v", "play_voice", "朗读"),
|
||||
# ("p", "precache_current", "预缓存当前单元集"), # 新增预缓存快捷键
|
||||
]
|
||||
if config.get("quick_pass"):
|
||||
BINDINGS.append(("k", "quick_pass", "快速通过[调试]"))
|
||||
btn = dict()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
nucleon_file: pt.NucleonUnion,
|
||||
electron_file: pt.ElectronUnion,
|
||||
tasked_num,
|
||||
):
|
||||
super().__init__(name=None, id=None, classes=None)
|
||||
self.nucleon_file = nucleon_file
|
||||
self.electron_file = electron_file
|
||||
self.reactor = Reactor(nucleon_file, electron_file, self, tasked_num)
|
||||
self.stage = 1
|
||||
self.stage += self.reactor.set_round_templated(self.stage)
|
||||
first_forward = self.reactor.forward()
|
||||
print(first_forward)
|
||||
if first_forward == -1:
|
||||
self.stage = 3
|
||||
self.reactor.set_round_templated(3)
|
||||
print(self.reactor.forward())
|
||||
#self._forward_judge(first_forward)
|
||||
self.compo = next(self.reactor.current_appar)
|
||||
self.feedback_state = 0 # 默认状态
|
||||
self.feedback_state_map = {
|
||||
0: "",
|
||||
255: "回答有误, 请重试. 或者重新学习此单元",
|
||||
}
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
if type(self.compo).__name__ == "Recognition":
|
||||
self.action_play_voice()
|
||||
yield Header(show_clock=True)
|
||||
with Center():
|
||||
yield Static(
|
||||
f"当前进度: {len(self.reactor.procession) - self.reactor.index}/{len(self.reactor.procession)}"
|
||||
)
|
||||
yield Label(self.feedback_state_map[self.feedback_state])
|
||||
yield from self.compo.compose()
|
||||
if self.feedback_state == 255:
|
||||
yield Button("重新学习此单元", id="re-recognize", variant="warning")
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self):
|
||||
pass
|
||||
|
||||
def on_button_pressed(self, event):
|
||||
try:
|
||||
if event.button.id == "re-recognize":
|
||||
return
|
||||
except:
|
||||
pass
|
||||
ret = self.compo.handler(event, "button")
|
||||
self._forward_judge(ret)
|
||||
def _forward_judge(self, ret):
|
||||
self.feedback_state = 0
|
||||
if ret == -1:
|
||||
return
|
||||
if ret == 0:
|
||||
try:
|
||||
self.compo = next(self.reactor.current_appar)
|
||||
self.refresh_ui()
|
||||
except StopIteration:
|
||||
nxt = self.reactor.forward(1)
|
||||
try:
|
||||
self.compo = next(self.reactor.current_appar)
|
||||
except:
|
||||
pass
|
||||
if nxt == -1:
|
||||
if self.reactor.round_set == 0:
|
||||
if self.stage == 4:
|
||||
if config.get("save"):
|
||||
self.reactor.save()
|
||||
self.compo = compo.Finished(
|
||||
self, None, pt.Atom.placeholder()
|
||||
)
|
||||
self.refresh_ui()
|
||||
else:
|
||||
self.reactor.set_round_templated(self.stage)
|
||||
self.reactor.forward(1)
|
||||
self.stage += 1
|
||||
self.compo = next(self.reactor.current_appar)
|
||||
self.refresh_ui()
|
||||
return
|
||||
return
|
||||
else:
|
||||
self.refresh_ui()
|
||||
return
|
||||
if ret >= 1:
|
||||
if ret == 2:
|
||||
self.feedback_state = 255 # 表示错误
|
||||
else:
|
||||
self.feedback_state = 0
|
||||
self.refresh_ui()
|
||||
return
|
||||
|
||||
def refresh_ui(self):
|
||||
self.call_later(self.recompose)
|
||||
print(type(self.compo).__name__)
|
||||
|
||||
def action_play_voice(self):
|
||||
def play():
|
||||
cache_dir = pathlib.Path(f"./cache/voice/")
|
||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
cache = cache_dir / f"{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
|
||||
if not cache.exists():
|
||||
import edge_tts as tts
|
||||
communicate = tts.Communicate(
|
||||
self.reactor.current_atom[1].content.replace("/", ""),
|
||||
"zh-CN-XiaoxiaoNeural",
|
||||
)
|
||||
communicate.save_sync(
|
||||
f"./cache/voice/{aux.get_md5(self.reactor.current_atom[1].content.replace('/',''))}.wav"
|
||||
)
|
||||
playsound(str(cache))
|
||||
|
||||
threading.Thread(target=play).start()
|
||||
|
||||
def action_precache_current(self):
|
||||
"""预缓存当前单元集的音频"""
|
||||
precache_screen = PrecachingScreen(self.nucleon_file)
|
||||
self.app.push_screen(precache_screen)
|
||||
|
||||
def action_quick_pass(self):
|
||||
self.reactor.report(self.reactor.current_atom, 5)
|
||||
self._forward_judge(0)
|
||||
def action_toggle_dark(self):
|
||||
self.app.action_toggle_dark()
|
||||
|
||||
def action_pop_screen(self):
|
||||
self.app.pop_screen()
|
185
src/heurams/interface/screens/precache.py
Normal file
185
src/heurams/interface/screens/precache.py
Normal file
@@ -0,0 +1,185 @@
|
||||
|
||||
class PrecachingScreen(Screen):
|
||||
"""预缓存音频文件屏幕"""
|
||||
BINDINGS = [("q", "go_back", "返回"), ("escape", "quit_app", "退出")]
|
||||
|
||||
def __init__(self, nucleon_file = None):
|
||||
super().__init__(name=None, id=None, classes=None)
|
||||
self.nucleon_file = nucleon_file
|
||||
self.is_precaching = False
|
||||
self.current_file = ""
|
||||
self.current_item = ""
|
||||
self.progress = 0
|
||||
self.total = 0
|
||||
self.processed = 0
|
||||
self.precache_worker = None
|
||||
|
||||
def compose(self) -> ComposeResult:
|
||||
yield Header(show_clock=True)
|
||||
with Container(id="precache_container"):
|
||||
yield Label("[b]音频预缓存[/b]", classes="title-label")
|
||||
|
||||
if self.nucleon_file:
|
||||
yield Static(f"目标单元集: [b]{self.nucleon_file.name}[/b]", classes="target-info")
|
||||
yield Static(f"单元数量: {len(self.nucleon_file.nucleons)}", classes="target-info")
|
||||
else:
|
||||
yield Static("目标: 所有单元集", classes="target-info")
|
||||
|
||||
yield Static(id="status", classes="status-info")
|
||||
yield Static(id="current_item", classes="current-item")
|
||||
yield ProgressBar(total=100, show_eta=False, id="progress_bar")
|
||||
|
||||
with Horizontal(classes="button-group"):
|
||||
if not self.is_precaching:
|
||||
yield Button("开始预缓存", id="start_precache", variant="primary")
|
||||
else:
|
||||
yield Button("取消预缓存", id="cancel_precache", variant="error")
|
||||
yield Button("清空缓存", id="clear_cache", variant="warning")
|
||||
yield Button("返回", id="go_back", variant="default")
|
||||
yield Footer()
|
||||
|
||||
def on_mount(self):
|
||||
"""挂载时初始化状态"""
|
||||
self.update_status("就绪", "等待开始...")
|
||||
|
||||
def update_status(self, status, current_item="", progress=None):
|
||||
"""更新状态显示"""
|
||||
status_widget = self.query_one("#status", Static)
|
||||
item_widget = self.query_one("#current_item", Static)
|
||||
progress_bar = self.query_one("#progress_bar", ProgressBar)
|
||||
|
||||
status_widget.update(f"状态: {status}")
|
||||
item_widget.update(f"当前项目: {current_item}" if current_item else "")
|
||||
|
||||
if progress is not None:
|
||||
progress_bar.progress = progress
|
||||
progress_bar.advance(0) # 刷新显示
|
||||
|
||||
def precache_single_text(self, text: str):
|
||||
"""预缓存单个文本的音频"""
|
||||
cache_dir = pathlib.Path("./cache/voice/")
|
||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
cache = cache_dir / f"{aux.get_md5(text)}.wav"
|
||||
if not cache.exists():
|
||||
try:
|
||||
import edge_tts as tts
|
||||
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
|
||||
communicate.save_sync(str(cache))
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"预缓存失败 '{text}': {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
def precache_file(self, nucleon_union: pt.NucleonUnion):
|
||||
"""预缓存单个文件的所有内容"""
|
||||
self.current_file = nucleon_union.name
|
||||
total_items = len(nucleon_union.nucleons)
|
||||
|
||||
for idx, nucleon in enumerate(nucleon_union.nucleons):
|
||||
# 检查是否被取消
|
||||
worker = get_current_worker()
|
||||
if worker and worker.is_cancelled:
|
||||
return False
|
||||
|
||||
text = nucleon['content'].replace('/', '')
|
||||
self.current_item = text[:50] + "..." if len(text) > 50 else text
|
||||
self.processed += 1
|
||||
|
||||
# 更新进度
|
||||
progress = int((self.processed / self.total) * 100) if self.total > 0 else 0
|
||||
self.update_status(
|
||||
f"处理中: {nucleon_union.name} ({idx+1}/{total_items})",
|
||||
self.current_item,
|
||||
progress
|
||||
)
|
||||
|
||||
# 预缓存音频
|
||||
success = self.precache_single_text(text)
|
||||
if not success:
|
||||
self.update_status("错误", f"处理失败: {self.current_item}")
|
||||
time.sleep(1) # 短暂暂停以便用户看到错误信息
|
||||
|
||||
return True
|
||||
|
||||
def precache_all_files(self):
|
||||
"""预缓存所有文件"""
|
||||
nucleon_path = pathlib.Path("./nucleon")
|
||||
nucleon_files = [f for f in nucleon_path.iterdir() if f.suffix == ".toml"]
|
||||
|
||||
# 计算总项目数
|
||||
self.total = 0
|
||||
for file in nucleon_files:
|
||||
try:
|
||||
nu = pt.NucleonUnion(file)
|
||||
self.total += len(nu.nucleons)
|
||||
except:
|
||||
continue
|
||||
|
||||
self.processed = 0
|
||||
self.is_precaching = True
|
||||
|
||||
for file in nucleon_files:
|
||||
try:
|
||||
nu = pt.NucleonUnion(file)
|
||||
if not self.precache_file(nu):
|
||||
break # 用户取消
|
||||
except Exception as e:
|
||||
print(f"处理文件失败 {file}: {e}")
|
||||
continue
|
||||
|
||||
self.is_precaching = False
|
||||
self.update_status("完成", "所有音频文件已预缓存", 100)
|
||||
|
||||
def precache_single_file(self):
|
||||
"""预缓存单个文件"""
|
||||
if not self.nucleon_file:
|
||||
return
|
||||
|
||||
self.total = len(self.nucleon_file.nucleons)
|
||||
self.processed = 0
|
||||
self.is_precaching = True
|
||||
|
||||
success = self.precache_file(self.nucleon_file)
|
||||
|
||||
self.is_precaching = False
|
||||
if success:
|
||||
self.update_status("完成", f"'{self.nucleon_file.name}' 音频文件已预缓存", 100)
|
||||
else:
|
||||
self.update_status("已取消", "预缓存操作被用户取消", 0)
|
||||
|
||||
def on_button_pressed(self, event: Button.Pressed) -> None:
|
||||
if event.button.id == "start_precache" and not self.is_precaching:
|
||||
# 开始预缓存
|
||||
if self.nucleon_file:
|
||||
self.precache_worker = self.run_worker(self.precache_single_file, thread=True)
|
||||
else:
|
||||
self.precache_worker = self.run_worker(self.precache_all_files, thread=True)
|
||||
|
||||
elif event.button.id == "cancel_precache" and self.is_precaching:
|
||||
# 取消预缓存
|
||||
if self.precache_worker:
|
||||
self.precache_worker.cancel()
|
||||
self.is_precaching = False
|
||||
self.update_status("已取消", "预缓存操作被用户取消", 0)
|
||||
|
||||
elif event.button.id == "clear_cache":
|
||||
# 清空缓存
|
||||
try:
|
||||
shutil.rmtree("./cache/voice", ignore_errors=True)
|
||||
self.update_status("已清空", "音频缓存已清空", 0)
|
||||
except Exception as e:
|
||||
self.update_status("错误", f"清空缓存失败: {e}")
|
||||
|
||||
elif event.button.id == "go_back":
|
||||
self.action_go_back()
|
||||
|
||||
def action_go_back(self):
|
||||
if self.is_precaching and self.precache_worker:
|
||||
self.precache_worker.cancel()
|
||||
self.app.pop_screen()
|
||||
|
||||
def action_quit_app(self):
|
||||
if self.is_precaching and self.precache_worker:
|
||||
self.precache_worker.cancel()
|
||||
self.app.exit()
|
@@ -1,8 +1,9 @@
|
||||
#!/usr/bin/env python3
|
||||
import pathlib
|
||||
import toml
|
||||
import time
|
||||
import auxiliary as aux
|
||||
|
||||
import heurams.services.timer as timer
|
||||
from typing import List
|
||||
|
||||
class Electron:
|
||||
"""电子: 记忆分析元数据及算法"""
|
||||
@@ -79,8 +80,8 @@ class Electron:
|
||||
self.metadata['interval'] * self.metadata['efactor']
|
||||
)
|
||||
|
||||
self.metadata['last_date'] = aux.get_daystamp()
|
||||
self.metadata['next_date'] = aux.get_daystamp() + self.metadata['interval']
|
||||
self.metadata['last_date'] = timer.get_daystamp()
|
||||
self.metadata['next_date'] = timer.get_daystamp() + self.metadata['interval']
|
||||
self.metadata['last_modify'] = time.time()
|
||||
|
||||
def __str__(self):
|
||||
@@ -155,7 +156,7 @@ class Nucleon:
|
||||
return Nucleon("核子对象样例内容", {})
|
||||
|
||||
|
||||
class NucleonUnion:
|
||||
class NucleonUnion():
|
||||
"""
|
||||
替代原有 NucleonFile 类, 支持复杂逻辑
|
||||
|
||||
@@ -171,23 +172,31 @@ class NucleonUnion:
|
||||
path (Path): 包含核子数据的文件路径。
|
||||
"""
|
||||
|
||||
def __init__(self, path):
|
||||
def __init__(self, path: pathlib.Path):
|
||||
self.path = path
|
||||
self.name = path.name.replace(path.suffix, "")
|
||||
with open(path, 'r') as f:
|
||||
all = toml.load(f)
|
||||
lst = list()
|
||||
for i in all.keys():
|
||||
if "attr" in i:
|
||||
continue
|
||||
if "data" in i:
|
||||
continue
|
||||
lst.append(Nucleon(i, all[i]))
|
||||
self.keydata = all["keydata"]
|
||||
self.testdata = all["testdata"]
|
||||
self.nucleons = lst
|
||||
self.nucleons: List[Nucleon] = lst
|
||||
self.nucleons_dict = {i.content: i for i in lst}
|
||||
|
||||
def __len__(self):
|
||||
return len(self.nucleons)
|
||||
|
||||
def linked_electron_union(self):
|
||||
if (self.path.parent / '..' / 'electron' / self.path.name).exists():
|
||||
return ElectronUnion(self.path.parent / '..' / 'electron' / self.path.name)
|
||||
else:
|
||||
return 0
|
||||
|
||||
def save(self):
|
||||
with open(self.path, 'w') as f:
|
||||
@@ -197,58 +206,40 @@ class NucleonUnion:
|
||||
|
||||
class ElectronUnion:
|
||||
"""取代原有 ElectronFile 类, 以支持复杂逻辑"""
|
||||
|
||||
def __init__(self, path):
|
||||
self.path = path
|
||||
print(path)
|
||||
self.name = path.name.replace(path.suffix, "")
|
||||
with open(path, 'r') as f:
|
||||
all = toml.load(f)
|
||||
lst = list()
|
||||
for i in all.keys():
|
||||
lst.append(Electron(i, all[i]))
|
||||
if i != "total":
|
||||
lst.append(Electron(i, all[i]))
|
||||
self.total = all.get("total", {"last_date": 0})
|
||||
self.electrons = lst
|
||||
self.electrons_dict = {i.content: i for i in lst}
|
||||
|
||||
def sync(self):
|
||||
"""同步 electrons_dict 中新增对到 electrons 中"""
|
||||
"""同步 electrons_dict 中新增对到 electrons 中, 仅用于缺省初始化不存在映射时调用"""
|
||||
self.electrons = self.electrons_dict.values()
|
||||
|
||||
def __len__(self):
|
||||
return len(self.electrons)
|
||||
|
||||
def linked_nucleon_union(self):
|
||||
return NucleonUnion(self.path.parent / '..' / 'nucleon' / self.path.name)
|
||||
|
||||
def save(self):
|
||||
# print(1)
|
||||
self.total["last_date"] = timer.get_daystamp()
|
||||
with open(self.path, 'w') as f:
|
||||
tmp = {i.content: i.metadata for i in self.electrons}
|
||||
tmp["total"] = self.total
|
||||
# print(tmp)
|
||||
toml.dump(tmp, f)
|
||||
|
||||
|
||||
"""
|
||||
class AtomicFile():
|
||||
def __init__(self, path, type_="unknown"):
|
||||
self.path = path
|
||||
self.type_ = type_
|
||||
if type_ == "nucleon":
|
||||
self.name, self.datalist = Nucleon.import_from_file(pathlib.Path(path))
|
||||
if type_ == "electron":
|
||||
self.name, self.datalist = Electron.import_from_file(pathlib.Path(path))
|
||||
def save(self):
|
||||
dictobj = {i.content: i.export_data() for i in self.datalist}
|
||||
print(dictobj)
|
||||
if self.type_ == "nucleon":
|
||||
Nucleon.save_to_file(dictobj, self.path)
|
||||
if self.type_ == "electron":
|
||||
Electron.save_to_file(dictobj, self.path)
|
||||
def get_full_content(self):
|
||||
if self.type_ == "nucleon":
|
||||
text = ""
|
||||
for i in self.datalist:
|
||||
text += i.content
|
||||
return text
|
||||
return ""
|
||||
def get_len(self):
|
||||
return len(self.datalist)
|
||||
"""
|
||||
|
||||
|
||||
class Atom:
|
||||
@staticmethod
|
||||
def placeholder():
|
126
src/heurams/kernel/particles/electron.py
Normal file
126
src/heurams/kernel/particles/electron.py
Normal file
@@ -0,0 +1,126 @@
|
||||
#!/usr/bin/env python3
|
||||
import pathlib
|
||||
import toml
|
||||
import time
|
||||
import heurams.services.timer as timer
|
||||
|
||||
class Electron:
|
||||
"""电子: 记忆分析元数据及算法"""
|
||||
algorithm = "SM-2" # 暂时使用 SM-2 算法进行记忆拟合, 考虑 SM-15 替代
|
||||
|
||||
def __init__(self, content: str, metadata: dict):
|
||||
self.content = content
|
||||
self.metadata = metadata
|
||||
if metadata == {}:
|
||||
# print("NULL")
|
||||
self._default_init()
|
||||
|
||||
def _default_init(self):
|
||||
defaults = {
|
||||
'efactor': 2.5, # 易度系数, 越大越简单, 最大为5
|
||||
'real_rept': 0, # (实际)重复次数
|
||||
'rept': 0, # (有效)重复次数
|
||||
'interval': 0, # 最佳间隔
|
||||
'last_date': 0, # 上一次复习的时间戳
|
||||
'next_date': 0, # 将要复习的时间戳
|
||||
'is_activated': 0, # 激活状态
|
||||
# *NOTE: 此处"时间戳"是以天为单位的整数, 即 UNIX 时间戳除以一天的秒数取整
|
||||
'last_modify': time.time() # 最后修改时间戳(此处是UNIX时间戳)
|
||||
}
|
||||
self.metadata = defaults
|
||||
|
||||
def activate(self):
|
||||
self.metadata['is_activated'] = 1
|
||||
self.metadata['last_modify'] = time.time()
|
||||
|
||||
def modify(self, var: str, value):
|
||||
if var in self.metadata:
|
||||
self.metadata[var] = value
|
||||
self.metadata['last_modify'] = time.time()
|
||||
else:
|
||||
print(f"警告: '{var}' 非已知元数据字段")
|
||||
|
||||
def revisor(self, quality: int = 5, is_new_activation: bool = False):
|
||||
"""SM-2 算法迭代决策机制实现
|
||||
根据 quality(0 ~ 5) 进行参数迭代最佳间隔
|
||||
quality 由主程序评估
|
||||
|
||||
Args:
|
||||
quality (int): 记忆保留率量化参数
|
||||
"""
|
||||
print(f"REVISOR: {quality}, {is_new_activation}")
|
||||
if quality == -1:
|
||||
return -1
|
||||
|
||||
self.metadata['efactor'] = self.metadata['efactor'] + (
|
||||
0.1 - (5 - quality) * (0.08 + (5 - quality) * 0.02)
|
||||
)
|
||||
self.metadata['efactor'] = max(1.3, self.metadata['efactor'])
|
||||
|
||||
if quality < 3:
|
||||
# 若保留率低于 3,重置重复次数
|
||||
self.metadata['rept'] = 0
|
||||
self.metadata['interval'] = 0 # 设为0,以便下面重新计算 I(1)
|
||||
else:
|
||||
self.metadata['rept'] += 1
|
||||
|
||||
self.metadata['real_rept'] += 1
|
||||
|
||||
if is_new_activation: # 初次激活
|
||||
self.metadata['rept'] = 0
|
||||
self.metadata['efactor'] = 2.5
|
||||
|
||||
if self.metadata['rept'] == 0: # 刚被重置或初次激活后复习
|
||||
self.metadata['interval'] = 1 # I(1)
|
||||
elif self.metadata['rept'] == 1:
|
||||
self.metadata['interval'] = 6 # I(2) 经验公式
|
||||
else:
|
||||
self.metadata['interval'] = round(
|
||||
self.metadata['interval'] * self.metadata['efactor']
|
||||
)
|
||||
|
||||
self.metadata['last_date'] = timer.get_daystamp()
|
||||
self.metadata['next_date'] = timer.get_daystamp() + self.metadata['interval']
|
||||
self.metadata['last_modify'] = time.time()
|
||||
|
||||
def __str__(self):
|
||||
return (
|
||||
f"记忆单元预览 \n"
|
||||
f"内容: '{self.content}' \n"
|
||||
f"易度系数: {self.metadata['efactor']:.2f} \n"
|
||||
f"已经重复的次数: {self.metadata['rept']} \n"
|
||||
f"下次间隔: {self.metadata['interval']} 天 \n"
|
||||
f"下次复习日期时间戳: {self.metadata['next_date']}"
|
||||
)
|
||||
|
||||
def __eq__(self, other):
|
||||
if self.content == other.content:
|
||||
return True
|
||||
return False
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.content)
|
||||
|
||||
def __getitem__(self, key):
|
||||
if key == "content":
|
||||
return self.content
|
||||
if key in self.metadata:
|
||||
return self.metadata[key]
|
||||
else:
|
||||
raise KeyError(f"Key '{key}' not found in metadata.")
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key == "content":
|
||||
raise AttributeError("content 应为只读")
|
||||
self.metadata[key] = value
|
||||
self.metadata['last_modify'] = time.time()
|
||||
|
||||
def __iter__(self):
|
||||
yield from self.metadata.keys()
|
||||
|
||||
def __len__(self):
|
||||
return len(self.metadata)
|
||||
|
||||
@staticmethod
|
||||
def placeholder():
|
||||
return Electron("电子对象样例内容", {})
|
1
src/heurams/services/audio_service.py
Normal file
1
src/heurams/services/audio_service.py
Normal file
@@ -0,0 +1 @@
|
||||
# 音频服务
|
1
src/heurams/services/cache.py
Normal file
1
src/heurams/services/cache.py
Normal file
@@ -0,0 +1 @@
|
||||
# 缓存服务
|
@@ -1,4 +1,4 @@
|
||||
import time
|
||||
# 配置文件服务
|
||||
import pathlib
|
||||
import toml
|
||||
import typing
|
||||
@@ -33,14 +33,3 @@ class ConfigFile:
|
||||
def get(self, key: str, default: typing.Any = None) -> typing.Any:
|
||||
"""获取配置值,如果不存在返回默认值"""
|
||||
return self.data.get(key, default)
|
||||
|
||||
|
||||
def get_daystamp() -> int:
|
||||
"""获取当前日戳(以天为单位的整数时间戳)"""
|
||||
config = ConfigFile("config.toml")
|
||||
time_override = config.get("time_override", -1)
|
||||
|
||||
if time_override != -1:
|
||||
return int(time_override)
|
||||
|
||||
return int(time.time() // (24 * 3600))
|
5
src/heurams/services/hasher.py
Normal file
5
src/heurams/services/hasher.py
Normal file
@@ -0,0 +1,5 @@
|
||||
# 哈希服务
|
||||
import hashlib
|
||||
|
||||
def get_md5(text):
|
||||
return hashlib.md5(text.encode('utf-8')).hexdigest()
|
20
src/heurams/services/timer.py
Normal file
20
src/heurams/services/timer.py
Normal file
@@ -0,0 +1,20 @@
|
||||
# 时间服务
|
||||
from heurams.context import config_var
|
||||
import time
|
||||
|
||||
def get_daystamp() -> int:
|
||||
"""获取当前日戳(以天为单位的整数时间戳)"""
|
||||
time_override = config_var.get().get("daystamp_override", -1)
|
||||
if time_override != -1:
|
||||
return int(time_override)
|
||||
|
||||
return int((time.time() + config_var.get().get("timezone_offset")) // (24 * 3600))
|
||||
|
||||
def get_timestamp() -> float:
|
||||
"""获取 UNIX 时间戳"""
|
||||
# 搞这个类的原因是要支持可复现操作
|
||||
time_override = config_var.get().get("timestamp_override", -1)
|
||||
if time_override != -1:
|
||||
return float(time_override)
|
||||
|
||||
return time.time()
|
1
src/heurams/services/tts_service.py
Normal file
1
src/heurams/services/tts_service.py
Normal file
@@ -0,0 +1 @@
|
||||
# 文本转语音服务
|
1
src/heurams/services/version.py
Normal file
1
src/heurams/services/version.py
Normal file
@@ -0,0 +1 @@
|
||||
# 版本控制集成服务
|
62
styles.css
62
styles.css
@@ -1,62 +0,0 @@
|
||||
Screen {
|
||||
align: center bottom;
|
||||
|
||||
}
|
||||
|
||||
#main_container {
|
||||
align: center middle;
|
||||
width: 95%;
|
||||
height: auto;
|
||||
border: thick $primary-lighten-2;
|
||||
padding: 2;
|
||||
}
|
||||
|
||||
#vice_container {
|
||||
align: center middle;
|
||||
width: 95%;
|
||||
height: auto;
|
||||
padding: 2;
|
||||
}
|
||||
|
||||
#sentence {
|
||||
content-align: center middle;
|
||||
width: 100%;
|
||||
height: 5;
|
||||
margin-bottom: 2;
|
||||
text-style: bold;
|
||||
}
|
||||
|
||||
#progress {
|
||||
width: 100%;
|
||||
content-align: center middle;
|
||||
margin-bottom: 1;
|
||||
color: $text-muted;
|
||||
}
|
||||
|
||||
|
||||
|
||||
Button {
|
||||
align-horizontal: center;
|
||||
width: 50%;
|
||||
margin: 0 0;
|
||||
}
|
||||
.choice {
|
||||
align-horizontal: center;
|
||||
width: 50%;
|
||||
margin: 0 0;
|
||||
height: auto;
|
||||
}
|
||||
/* no_margin.tcss */
|
||||
|
||||
#button_container {
|
||||
align-horizontal: center;
|
||||
height: 9;
|
||||
}
|
||||
|
||||
/* 选中 #button_container 下所有的 Horizontal 子元素 */
|
||||
#button_container > Horizontal {
|
||||
margin-top: 0;
|
||||
margin-bottom: 0;
|
||||
align-horizontal: center;
|
||||
width: 40%;
|
||||
}
|
@@ -1,45 +0,0 @@
|
||||
import random
|
||||
|
||||
class BlankPuzzle():
|
||||
"""填空题谜题生成器
|
||||
|
||||
Args:
|
||||
text: 原始字符串(需要 "/" 分割句子, 末尾应有 "/")
|
||||
min_denominator: 最小概率倒数(如占所有可生成填空数的 1/7 中的 7, 若期望值小于 1, 则取 1)
|
||||
"""
|
||||
def __init__(self, text, min_denominator):
|
||||
self.text = text
|
||||
self.min_denominator = min_denominator
|
||||
self.wording = "填空题 - 尚未刷新谜题"
|
||||
self.answer = ["填空题 - 尚未刷新谜题"]
|
||||
|
||||
def refresh(self): # 刷新谜题
|
||||
placeholder = "___SLASH___"
|
||||
tmp_text = self.text.replace("/", placeholder)
|
||||
words = tmp_text.split(placeholder)
|
||||
if not words:
|
||||
return ""
|
||||
words = [word for word in words if word]
|
||||
num_blanks = min(max(1, len(words) // self.min_denominator), len(words))
|
||||
indices_to_blank = random.sample(range(len(words)), num_blanks)
|
||||
indices_to_blank.sort()
|
||||
blanked_words = list(words)
|
||||
answer = list()
|
||||
for index in indices_to_blank:
|
||||
blanked_words[index] = "__" * len(words[index])
|
||||
answer.append(words[index])
|
||||
result = []
|
||||
for word in blanked_words:
|
||||
result.append(word)
|
||||
self.answer = answer
|
||||
self.wording = "".join(result)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.wording}\n{str(self.answer)}"
|
||||
|
||||
# demo
|
||||
text = """我联合国人民/同兹/决心/: /欲免/后世/再遭/今代人类/两度/身历/惨不堪言/之战祸/.../"""
|
||||
riddle = BlankPuzzle(text, 3)
|
||||
print(riddle)
|
||||
riddle.refresh()
|
||||
print(riddle)
|
@@ -1,45 +0,0 @@
|
||||
import random
|
||||
|
||||
class SelectionPuzzle():
|
||||
"""选择题谜题生成器
|
||||
|
||||
Args:
|
||||
text: 原始字符串(需要 "/" 分割句子, 末尾应有 "/")
|
||||
min_denominator: 最小概率倒数(如占所有可生成填空数的 1/7 中的 7, 若期望值小于 1, 则取 1)
|
||||
"""
|
||||
def __init__(self, prefix_text, origin_dict, min_denominator):
|
||||
self.text = text
|
||||
self.min_denominator = min_denominator
|
||||
self.wording = "填空题 - 尚未刷新谜题"
|
||||
self.answer = ["填空题 - 尚未刷新谜题"]
|
||||
|
||||
def refresh(self): # 刷新谜题
|
||||
placeholder = "___SLASH___"
|
||||
tmp_text = self.text.replace("/", placeholder)
|
||||
words = tmp_text.split(placeholder)
|
||||
if not words:
|
||||
return ""
|
||||
words = [word for word in words if word]
|
||||
num_blanks = min(max(1, len(words) // self.min_denominator), len(words))
|
||||
indices_to_blank = random.sample(range(len(words)), num_blanks)
|
||||
indices_to_blank.sort()
|
||||
blanked_words = list(words)
|
||||
answer = list()
|
||||
for index in indices_to_blank:
|
||||
blanked_words[index] = "__" * len(words[index])
|
||||
answer.append(words[index])
|
||||
result = []
|
||||
for word in blanked_words:
|
||||
result.append(word)
|
||||
self.answer = answer
|
||||
self.wording = "".join(result)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.wording}\n{str(self.answer)}"
|
||||
|
||||
# demo
|
||||
text = """我联合国人民/同兹/决心/: /欲免/后世/再遭/今代人类/两度/身历/惨不堪言/之战祸/.../"""
|
||||
riddle = BlankPuzzle(text, 3)
|
||||
print(riddle)
|
||||
riddle.refresh()
|
||||
print(riddle)
|
@@ -1,325 +0,0 @@
|
||||
"""
|
||||
An encoding / decoding format suitable for serializing data structures to binary.
|
||||
|
||||
This is based on https://en.wikipedia.org/wiki/Bencode with some extensions.
|
||||
|
||||
The following data types may be encoded:
|
||||
|
||||
- None
|
||||
- int
|
||||
- bool
|
||||
- bytes
|
||||
- str
|
||||
- list
|
||||
- tuple
|
||||
- dict
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Callable
|
||||
|
||||
|
||||
class DecodeError(Exception):
|
||||
"""A problem decoding data."""
|
||||
|
||||
|
||||
def dump(data: object) -> bytes:
|
||||
"""Encodes a data structure in to bytes.
|
||||
|
||||
Args:
|
||||
data: Data structure
|
||||
|
||||
Returns:
|
||||
A byte string encoding the data.
|
||||
"""
|
||||
|
||||
def encode_none(_datum: None) -> bytes:
|
||||
"""
|
||||
Encodes a None value.
|
||||
|
||||
Args:
|
||||
datum: Always None.
|
||||
|
||||
Returns:
|
||||
None encoded.
|
||||
"""
|
||||
return b"N"
|
||||
|
||||
def encode_bool(datum: bool) -> bytes:
|
||||
"""
|
||||
Encode a boolean value.
|
||||
|
||||
Args:
|
||||
datum: The boolean value to encode.
|
||||
|
||||
Returns:
|
||||
The encoded bytes.
|
||||
"""
|
||||
return b"T" if datum else b"F"
|
||||
|
||||
def encode_int(datum: int) -> bytes:
|
||||
"""
|
||||
Encode an integer value.
|
||||
|
||||
Args:
|
||||
datum: The integer value to encode.
|
||||
|
||||
Returns:
|
||||
The encoded bytes.
|
||||
"""
|
||||
return b"i%ie" % datum
|
||||
|
||||
def encode_bytes(datum: bytes) -> bytes:
|
||||
"""
|
||||
Encode a bytes value.
|
||||
|
||||
Args:
|
||||
datum: The bytes value to encode.
|
||||
|
||||
Returns:
|
||||
The encoded bytes.
|
||||
"""
|
||||
return b"%i:%s" % (len(datum), datum)
|
||||
|
||||
def encode_string(datum: str) -> bytes:
|
||||
"""
|
||||
Encode a string value.
|
||||
|
||||
Args:
|
||||
datum: The string value to encode.
|
||||
|
||||
Returns:
|
||||
The encoded bytes.
|
||||
"""
|
||||
encoded_data = datum.encode("utf-8")
|
||||
return b"s%i:%s" % (len(encoded_data), encoded_data)
|
||||
|
||||
def encode_list(datum: list) -> bytes:
|
||||
"""
|
||||
Encode a list value.
|
||||
|
||||
Args:
|
||||
datum: The list value to encode.
|
||||
|
||||
Returns:
|
||||
The encoded bytes.
|
||||
"""
|
||||
return b"l%se" % b"".join(encode(element) for element in datum)
|
||||
|
||||
def encode_tuple(datum: tuple) -> bytes:
|
||||
"""
|
||||
Encode a tuple value.
|
||||
|
||||
Args:
|
||||
datum: The tuple value to encode.
|
||||
|
||||
Returns:
|
||||
The encoded bytes.
|
||||
"""
|
||||
return b"t%se" % b"".join(encode(element) for element in datum)
|
||||
|
||||
def encode_dict(datum: dict) -> bytes:
|
||||
"""
|
||||
Encode a dictionary value.
|
||||
|
||||
Args:
|
||||
datum: The dictionary value to encode.
|
||||
|
||||
Returns:
|
||||
The encoded bytes.
|
||||
"""
|
||||
return b"d%se" % b"".join(
|
||||
b"%s%s" % (encode(key), encode(value)) for key, value in datum.items()
|
||||
)
|
||||
|
||||
ENCODERS: dict[type, Callable[[Any], Any]] = {
|
||||
type(None): encode_none,
|
||||
bool: encode_bool,
|
||||
int: encode_int,
|
||||
bytes: encode_bytes,
|
||||
str: encode_string,
|
||||
list: encode_list,
|
||||
tuple: encode_tuple,
|
||||
dict: encode_dict,
|
||||
}
|
||||
|
||||
def encode(datum: object) -> bytes:
|
||||
"""Recursively encode data.
|
||||
|
||||
Args:
|
||||
datum: Data suitable for encoding.
|
||||
|
||||
Raises:
|
||||
TypeError: If `datum` is not one of the supported types.
|
||||
|
||||
Returns:
|
||||
Encoded data bytes.
|
||||
"""
|
||||
try:
|
||||
decoder = ENCODERS[type(datum)]
|
||||
except KeyError:
|
||||
raise TypeError("Can't encode {datum!r}") from None
|
||||
return decoder(datum)
|
||||
|
||||
return encode(data)
|
||||
|
||||
|
||||
def load(encoded: bytes) -> object:
|
||||
"""Load an encoded data structure from bytes.
|
||||
|
||||
Args:
|
||||
encoded: Encoded data in bytes.
|
||||
|
||||
Raises:
|
||||
DecodeError: If an error was encountered decoding the string.
|
||||
|
||||
Returns:
|
||||
Decoded data.
|
||||
"""
|
||||
if not isinstance(encoded, bytes):
|
||||
raise TypeError("must be bytes")
|
||||
max_position = len(encoded)
|
||||
position = 0
|
||||
|
||||
def get_byte() -> bytes:
|
||||
"""Get an encoded byte and advance position.
|
||||
|
||||
Raises:
|
||||
DecodeError: If the end of the data was reached
|
||||
|
||||
Returns:
|
||||
A bytes object with a single byte.
|
||||
"""
|
||||
nonlocal position
|
||||
if position >= max_position:
|
||||
raise DecodeError("More data expected")
|
||||
character = encoded[position : position + 1]
|
||||
position += 1
|
||||
return character
|
||||
|
||||
def peek_byte() -> bytes:
|
||||
"""Get the byte at the current position, but don't advance position.
|
||||
|
||||
Returns:
|
||||
A bytes object with a single byte.
|
||||
"""
|
||||
return encoded[position : position + 1]
|
||||
|
||||
def get_bytes(size: int) -> bytes:
|
||||
"""Get a number of bytes of encode data.
|
||||
|
||||
Args:
|
||||
size: Number of bytes to retrieve.
|
||||
|
||||
Raises:
|
||||
DecodeError: If there aren't enough bytes.
|
||||
|
||||
Returns:
|
||||
A bytes object.
|
||||
"""
|
||||
nonlocal position
|
||||
bytes_data = encoded[position : position + size]
|
||||
if len(bytes_data) != size:
|
||||
raise DecodeError(b"Missing bytes in {bytes_data!r}")
|
||||
position += size
|
||||
return bytes_data
|
||||
|
||||
def decode_int() -> int:
|
||||
"""Decode an int from the encoded data.
|
||||
|
||||
Returns:
|
||||
An integer.
|
||||
"""
|
||||
int_bytes = b""
|
||||
while (byte := get_byte()) != b"e":
|
||||
int_bytes += byte
|
||||
return int(int_bytes)
|
||||
|
||||
def decode_bytes(size_bytes: bytes) -> bytes:
|
||||
"""Decode a bytes string from the encoded data.
|
||||
|
||||
Returns:
|
||||
A bytes object.
|
||||
"""
|
||||
while (byte := get_byte()) != b":":
|
||||
size_bytes += byte
|
||||
bytes_string = get_bytes(int(size_bytes))
|
||||
return bytes_string
|
||||
|
||||
def decode_string() -> str:
|
||||
"""Decode a (utf-8 encoded) string from the encoded data.
|
||||
|
||||
Returns:
|
||||
A string.
|
||||
"""
|
||||
size_bytes = b""
|
||||
while (byte := get_byte()) != b":":
|
||||
size_bytes += byte
|
||||
bytes_string = get_bytes(int(size_bytes))
|
||||
decoded_string = bytes_string.decode("utf-8", errors="replace")
|
||||
return decoded_string
|
||||
|
||||
def decode_list() -> list[object]:
|
||||
"""Decode a list.
|
||||
|
||||
Returns:
|
||||
A list of data.
|
||||
"""
|
||||
elements: list[object] = []
|
||||
add_element = elements.append
|
||||
while peek_byte() != b"e":
|
||||
add_element(decode())
|
||||
get_byte()
|
||||
return elements
|
||||
|
||||
def decode_tuple() -> tuple[object, ...]:
|
||||
"""Decode a tuple.
|
||||
|
||||
Returns:
|
||||
A tuple of decoded data.
|
||||
"""
|
||||
elements: list[object] = []
|
||||
add_element = elements.append
|
||||
while peek_byte() != b"e":
|
||||
add_element(decode())
|
||||
get_byte()
|
||||
return tuple(elements)
|
||||
|
||||
def decode_dict() -> dict[object, object]:
|
||||
"""Decode a dict.
|
||||
|
||||
Returns:
|
||||
A dict of decoded data.
|
||||
"""
|
||||
elements: dict[object, object] = {}
|
||||
add_element = elements.__setitem__
|
||||
while peek_byte() != b"e":
|
||||
add_element(decode(), decode())
|
||||
get_byte()
|
||||
return elements
|
||||
|
||||
DECODERS = {
|
||||
b"i": decode_int,
|
||||
b"s": decode_string,
|
||||
b"l": decode_list,
|
||||
b"t": decode_tuple,
|
||||
b"d": decode_dict,
|
||||
b"T": lambda: True,
|
||||
b"F": lambda: False,
|
||||
b"N": lambda: None,
|
||||
}
|
||||
|
||||
def decode() -> object:
|
||||
"""Recursively decode data.
|
||||
|
||||
Returns:
|
||||
Decoded data.
|
||||
"""
|
||||
decoder = DECODERS.get(initial := get_byte(), None)
|
||||
if decoder is None:
|
||||
return decode_bytes(initial)
|
||||
return decoder()
|
||||
|
||||
return decode()
|
@@ -1,349 +0,0 @@
|
||||
from __future__ import annotations
|
||||
from pathlib import Path
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
from typing import Awaitable, Callable, Literal
|
||||
from asyncio.subprocess import Process
|
||||
import logging
|
||||
|
||||
from importlib.metadata import version
|
||||
import uuid
|
||||
|
||||
from webshare.download_manager import DownloadManager
|
||||
from webshare._binary_encode import load as binary_load
|
||||
|
||||
log = logging.getLogger("textual-serve")
|
||||
|
||||
|
||||
class AppService:
|
||||
"""Creates and manages a single Textual app subprocess.
|
||||
|
||||
When a user connects to the websocket in their browser, a new AppService
|
||||
instance is created to manage the corresponding Textual app process.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
command: str,
|
||||
*,
|
||||
write_bytes: Callable[[bytes], Awaitable[None]],
|
||||
write_str: Callable[[str], Awaitable[None]],
|
||||
close: Callable[[], Awaitable[None]],
|
||||
download_manager: DownloadManager,
|
||||
debug: bool = False,
|
||||
) -> None:
|
||||
self.app_service_id: str = uuid.uuid4().hex
|
||||
"""The unique ID of this running app service."""
|
||||
self.command = command
|
||||
"""The command to launch the Textual app subprocess."""
|
||||
self.remote_write_bytes = write_bytes
|
||||
"""Write bytes to the client browser websocket."""
|
||||
self.remote_write_str = write_str
|
||||
"""Write string to the client browser websocket."""
|
||||
self.remote_close = close
|
||||
"""Close the client browser websocket."""
|
||||
self.debug = debug
|
||||
"""Enable/disable debug mode."""
|
||||
|
||||
self._process: Process | None = None
|
||||
self._task: asyncio.Task[None] | None = None
|
||||
self._stdin: asyncio.StreamWriter | None = None
|
||||
self._exit_event = asyncio.Event()
|
||||
self._download_manager = download_manager
|
||||
|
||||
@property
|
||||
def stdin(self) -> asyncio.StreamWriter:
|
||||
"""The processes standard input stream."""
|
||||
assert self._stdin is not None
|
||||
return self._stdin
|
||||
|
||||
def _build_environment(self, width: int = 80, height: int = 24) -> dict[str, str]:
|
||||
"""Build an environment dict for the App subprocess.
|
||||
|
||||
Args:
|
||||
width: Initial width.
|
||||
height: Initial height.
|
||||
|
||||
Returns:
|
||||
A environment dict.
|
||||
"""
|
||||
environment = dict(os.environ.copy())
|
||||
environment["TEXTUAL_DRIVER"] = "textual.drivers.web_driver:WebDriver"
|
||||
environment["TEXTUAL_FPS"] = "60"
|
||||
environment["TEXTUAL_COLOR_SYSTEM"] = "truecolor"
|
||||
environment["TERM_PROGRAM"] = "textual"
|
||||
environment["TERM_PROGRAM_VERSION"] = version("textual-serve")
|
||||
environment["COLUMNS"] = str(width)
|
||||
environment["ROWS"] = str(height)
|
||||
if self.debug:
|
||||
environment["TEXTUAL"] = "debug,devtools"
|
||||
environment["TEXTUAL_LOG"] = "textual.log"
|
||||
return environment
|
||||
|
||||
async def _open_app_process(self, width: int = 80, height: int = 24) -> Process:
|
||||
"""Open a process to run the app.
|
||||
|
||||
Args:
|
||||
width: Width of the terminal.
|
||||
height: height of the terminal.
|
||||
"""
|
||||
environment = self._build_environment(width=width, height=height)
|
||||
self._process = process = await asyncio.create_subprocess_shell(
|
||||
self.command,
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=environment,
|
||||
)
|
||||
assert process.stdin is not None
|
||||
self._stdin = process.stdin
|
||||
|
||||
return process
|
||||
|
||||
@classmethod
|
||||
def encode_packet(cls, packet_type: Literal[b"D", b"M"], payload: bytes) -> bytes:
|
||||
"""Encode a packet.
|
||||
|
||||
Args:
|
||||
packet_type: The packet type (b"D" for data or b"M" for meta)
|
||||
payload: The payload.
|
||||
|
||||
Returns:
|
||||
Data as bytes.
|
||||
"""
|
||||
return b"%s%s%s" % (packet_type, len(payload).to_bytes(4, "big"), payload)
|
||||
|
||||
async def send_bytes(self, data: bytes) -> bool:
|
||||
"""Send bytes to process.
|
||||
|
||||
Args:
|
||||
data: Data to send.
|
||||
|
||||
Returns:
|
||||
True if the data was sent, otherwise False.
|
||||
"""
|
||||
stdin = self.stdin
|
||||
try:
|
||||
stdin.write(self.encode_packet(b"D", data))
|
||||
except RuntimeError:
|
||||
return False
|
||||
try:
|
||||
await stdin.drain()
|
||||
except Exception:
|
||||
return False
|
||||
return True
|
||||
|
||||
async def send_meta(self, data: dict[str, str | None | int | bool]) -> bool:
|
||||
"""Send meta information to process.
|
||||
|
||||
Args:
|
||||
data: Meta dict to send.
|
||||
|
||||
Returns:
|
||||
True if the data was sent, otherwise False.
|
||||
"""
|
||||
stdin = self.stdin
|
||||
data_bytes = json.dumps(data).encode("utf-8")
|
||||
try:
|
||||
stdin.write(self.encode_packet(b"M", data_bytes))
|
||||
except RuntimeError:
|
||||
return False
|
||||
try:
|
||||
await stdin.drain()
|
||||
except Exception:
|
||||
return False
|
||||
return True
|
||||
|
||||
async def set_terminal_size(self, width: int, height: int) -> None:
|
||||
"""Tell the process about the new terminal size.
|
||||
|
||||
Args:
|
||||
width: Width of terminal in cells.
|
||||
height: Height of terminal in cells.
|
||||
"""
|
||||
await self.send_meta(
|
||||
{
|
||||
"type": "resize",
|
||||
"width": width,
|
||||
"height": height,
|
||||
}
|
||||
)
|
||||
|
||||
async def blur(self) -> None:
|
||||
"""Send an (app) blur to the process."""
|
||||
await self.send_meta({"type": "blur"})
|
||||
|
||||
async def focus(self) -> None:
|
||||
"""Send an (app) focus to the process."""
|
||||
await self.send_meta({"type": "focus"})
|
||||
|
||||
async def start(self, width: int, height: int) -> None:
|
||||
await self._open_app_process(width, height)
|
||||
self._task = asyncio.create_task(self.run())
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the process and wait for it to complete."""
|
||||
if self._task is not None:
|
||||
await self._download_manager.cancel_app_downloads(
|
||||
app_service_id=self.app_service_id
|
||||
)
|
||||
|
||||
await self.send_meta({"type": "quit"})
|
||||
await self._task
|
||||
self._task = None
|
||||
|
||||
async def run(self) -> None:
|
||||
"""Run the Textual app process.
|
||||
|
||||
!!! note
|
||||
|
||||
Do not call this manually, use `start`.
|
||||
|
||||
"""
|
||||
META = b"M"
|
||||
DATA = b"D"
|
||||
PACKED = b"P"
|
||||
|
||||
assert self._process is not None
|
||||
process = self._process
|
||||
|
||||
stdout = process.stdout
|
||||
stderr = process.stderr
|
||||
assert stdout is not None
|
||||
assert stderr is not None
|
||||
|
||||
stderr_data = io.BytesIO()
|
||||
|
||||
async def read_stderr() -> None:
|
||||
"""Task to read stderr."""
|
||||
try:
|
||||
while True:
|
||||
data = await stderr.read(1024 * 4)
|
||||
if not data:
|
||||
break
|
||||
stderr_data.write(data)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
stderr_task = asyncio.create_task(read_stderr())
|
||||
|
||||
try:
|
||||
ready = False
|
||||
# Wait for prelude text, so we know it is a Textual app
|
||||
for _ in range(10):
|
||||
if not (line := await stdout.readline()):
|
||||
break
|
||||
if line == b"__GANGLION__\n":
|
||||
ready = True
|
||||
break
|
||||
|
||||
if not ready:
|
||||
log.error("Application failed to start")
|
||||
if error_text := stderr_data.getvalue():
|
||||
import sys
|
||||
|
||||
sys.stdout.write(error_text.decode("utf-8", "replace"))
|
||||
|
||||
readexactly = stdout.readexactly
|
||||
int_from_bytes = int.from_bytes
|
||||
while True:
|
||||
type_bytes = await readexactly(1)
|
||||
size_bytes = await readexactly(4)
|
||||
size = int_from_bytes(size_bytes, "big")
|
||||
payload = await readexactly(size)
|
||||
if type_bytes == DATA:
|
||||
await self.on_data(payload)
|
||||
elif type_bytes == META:
|
||||
await self.on_meta(payload)
|
||||
elif type_bytes == PACKED:
|
||||
await self.on_packed(payload)
|
||||
|
||||
except asyncio.IncompleteReadError:
|
||||
pass
|
||||
except ConnectionResetError:
|
||||
pass
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
finally:
|
||||
stderr_task.cancel()
|
||||
await stderr_task
|
||||
|
||||
if error_text := stderr_data.getvalue():
|
||||
import sys
|
||||
|
||||
sys.stdout.write(error_text.decode("utf-8", "replace"))
|
||||
|
||||
async def on_data(self, payload: bytes) -> None:
|
||||
"""Called when there is data.
|
||||
|
||||
Args:
|
||||
payload: Data received from process.
|
||||
"""
|
||||
await self.remote_write_bytes(payload)
|
||||
|
||||
async def on_meta(self, data: bytes) -> None:
|
||||
"""Called when there is a meta packet sent from the running app process.
|
||||
|
||||
Args:
|
||||
data: Encoded meta data.
|
||||
"""
|
||||
meta_data: dict[str, object] = json.loads(data)
|
||||
meta_type = meta_data["type"]
|
||||
|
||||
if meta_type == "exit":
|
||||
await self.remote_close()
|
||||
elif meta_type == "open_url":
|
||||
payload = json.dumps(
|
||||
[
|
||||
"open_url",
|
||||
{
|
||||
"url": meta_data["url"],
|
||||
"new_tab": meta_data["new_tab"],
|
||||
},
|
||||
]
|
||||
)
|
||||
await self.remote_write_str(payload)
|
||||
elif meta_type == "deliver_file_start":
|
||||
log.debug("deliver_file_start, %s", meta_data)
|
||||
try:
|
||||
# Record this delivery key as available for download.
|
||||
delivery_key = str(meta_data["key"])
|
||||
await self._download_manager.create_download(
|
||||
app_service=self,
|
||||
delivery_key=delivery_key,
|
||||
file_name=Path(meta_data["path"]).name,
|
||||
open_method=meta_data["open_method"],
|
||||
mime_type=meta_data["mime_type"],
|
||||
encoding=meta_data["encoding"],
|
||||
name=meta_data.get("name", None),
|
||||
)
|
||||
except KeyError:
|
||||
log.error("Missing key in `deliver_file_start` meta packet")
|
||||
return
|
||||
else:
|
||||
# Tell the browser front-end about the new delivery key,
|
||||
# so that it may hit the "/download/{key}" endpoint
|
||||
# to start the download.
|
||||
json_string = json.dumps(["deliver_file_start", delivery_key])
|
||||
await self.remote_write_str(json_string)
|
||||
else:
|
||||
log.warning(
|
||||
f"Unknown meta type: {meta_type!r}. You may need to update `textual-serve`."
|
||||
)
|
||||
|
||||
async def on_packed(self, payload: bytes) -> None:
|
||||
"""Called when there is a packed packet sent from the running app process.
|
||||
|
||||
Args:
|
||||
payload: Encoded packed data.
|
||||
"""
|
||||
unpacked = binary_load(payload)
|
||||
if unpacked[0] == "deliver_chunk":
|
||||
# If we receive a chunk, hand it to the download manager to
|
||||
# handle distribution to the browser.
|
||||
_, delivery_key, chunk = unpacked
|
||||
await self._download_manager.chunk_received(delivery_key, chunk)
|
@@ -1,197 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass, field
|
||||
import logging
|
||||
from typing import AsyncGenerator, TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from webshare.app_service import AppService
|
||||
|
||||
log = logging.getLogger("textual-serve")
|
||||
|
||||
DOWNLOAD_TIMEOUT = 4
|
||||
DOWNLOAD_CHUNK_SIZE = 1024 * 64 # 64 KB
|
||||
|
||||
|
||||
@dataclass
|
||||
class Download:
|
||||
app_service: "AppService"
|
||||
"""The app service that the download is associated with."""
|
||||
|
||||
delivery_key: str
|
||||
"""Key which identifies the download."""
|
||||
|
||||
file_name: str
|
||||
"""The name of the file to download. This will be used to set
|
||||
the Content-Disposition filename."""
|
||||
|
||||
open_method: str
|
||||
"""The method to open the file with. "browser" or "download"."""
|
||||
|
||||
mime_type: str
|
||||
"""The mime type of the content."""
|
||||
|
||||
encoding: str | None = None
|
||||
"""The encoding of the content.
|
||||
Will be None if the content is binary.
|
||||
"""
|
||||
|
||||
name: str | None = None
|
||||
"""Optional name set bt the client."""
|
||||
|
||||
incoming_chunks: asyncio.Queue[bytes | None] = field(default_factory=asyncio.Queue)
|
||||
"""A queue of incoming chunks for the download.
|
||||
Chunks are sent from the app service to the download handler
|
||||
via this queue."""
|
||||
|
||||
|
||||
class DownloadManager:
|
||||
"""Class which manages downloads for the server.
|
||||
|
||||
Serves as the link between the web server and app processes during downloads.
|
||||
|
||||
A single server has a single download manager, which manages all downloads for all
|
||||
running app processes.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._active_downloads: dict[str, Download] = {}
|
||||
"""A dictionary of active downloads.
|
||||
|
||||
When a delivery key is received in a meta packet, it is added to this set.
|
||||
When the user hits the "/download/{key}" endpoint, we ensure the key is in
|
||||
this set and start the download by requesting chunks from the app process.
|
||||
|
||||
When the download is complete, the app process sends a "deliver_file_end"
|
||||
meta packet, and we remove the key from this set.
|
||||
"""
|
||||
|
||||
async def create_download(
|
||||
self,
|
||||
*,
|
||||
app_service: "AppService",
|
||||
delivery_key: str,
|
||||
file_name: str,
|
||||
open_method: str,
|
||||
mime_type: str,
|
||||
encoding: str | None = None,
|
||||
name: str | None = None,
|
||||
) -> None:
|
||||
"""Prepare for a new download.
|
||||
|
||||
Args:
|
||||
app_service: The app service to start the download for.
|
||||
delivery_key: The delivery key to start the download for.
|
||||
file_name: The name of the file to download.
|
||||
open_method: The method to open the file with.
|
||||
mime_type: The mime type of the content.
|
||||
encoding: The encoding of the content or None if the content is binary.
|
||||
"""
|
||||
self._active_downloads[delivery_key] = Download(
|
||||
app_service,
|
||||
delivery_key,
|
||||
file_name,
|
||||
open_method,
|
||||
mime_type,
|
||||
encoding,
|
||||
name=name,
|
||||
)
|
||||
|
||||
async def download(self, delivery_key: str) -> AsyncGenerator[bytes, None]:
|
||||
"""Download a file from the given app service.
|
||||
|
||||
Args:
|
||||
delivery_key: The delivery key to download.
|
||||
"""
|
||||
|
||||
app_service = await self._get_app_service(delivery_key)
|
||||
download = self._active_downloads[delivery_key]
|
||||
incoming_chunks = download.incoming_chunks
|
||||
|
||||
while True:
|
||||
# Request a chunk from the app service.
|
||||
send_result = await app_service.send_meta(
|
||||
{
|
||||
"type": "deliver_chunk_request",
|
||||
"key": delivery_key,
|
||||
"size": DOWNLOAD_CHUNK_SIZE,
|
||||
"name": download.name,
|
||||
}
|
||||
)
|
||||
|
||||
if not send_result:
|
||||
log.warning(
|
||||
"Download {delivery_key!r} failed to request chunk from app service"
|
||||
)
|
||||
del self._active_downloads[delivery_key]
|
||||
break
|
||||
|
||||
try:
|
||||
chunk = await asyncio.wait_for(incoming_chunks.get(), DOWNLOAD_TIMEOUT)
|
||||
except asyncio.TimeoutError:
|
||||
log.warning(
|
||||
"Download %r failed to receive chunk from app service within %r seconds",
|
||||
delivery_key,
|
||||
DOWNLOAD_TIMEOUT,
|
||||
)
|
||||
chunk = None
|
||||
|
||||
if not chunk:
|
||||
# Empty chunk - the app process has finished sending the file
|
||||
# or the download has been cancelled.
|
||||
incoming_chunks.task_done()
|
||||
del self._active_downloads[delivery_key]
|
||||
break
|
||||
else:
|
||||
incoming_chunks.task_done()
|
||||
yield chunk
|
||||
|
||||
async def chunk_received(self, delivery_key: str, chunk: bytes | str) -> None:
|
||||
"""Handle a chunk received from the app service for a download.
|
||||
|
||||
Args:
|
||||
delivery_key: The delivery key that the chunk was received for.
|
||||
chunk: The chunk that was received.
|
||||
"""
|
||||
|
||||
download = self._active_downloads.get(delivery_key)
|
||||
if not download:
|
||||
# The download may have been cancelled - e.g. the websocket
|
||||
# was closed before the download could complete.
|
||||
log.debug("Chunk received for cancelled download %r", delivery_key)
|
||||
return
|
||||
|
||||
if isinstance(chunk, str):
|
||||
chunk = chunk.encode(download.encoding or "utf-8")
|
||||
await download.incoming_chunks.put(chunk)
|
||||
|
||||
async def _get_app_service(self, delivery_key: str) -> "AppService":
|
||||
"""Get the app service that the given delivery key is linked to.
|
||||
|
||||
Args:
|
||||
delivery_key: The delivery key to get the app service for.
|
||||
"""
|
||||
for key in self._active_downloads.keys():
|
||||
if key == delivery_key:
|
||||
return self._active_downloads[key].app_service
|
||||
else:
|
||||
raise ValueError(f"No active download for delivery key {delivery_key!r}")
|
||||
|
||||
async def get_download_metadata(self, delivery_key: str) -> Download:
|
||||
"""Get the metadata for a download.
|
||||
|
||||
Args:
|
||||
delivery_key: The delivery key to get the metadata for.
|
||||
"""
|
||||
return self._active_downloads[delivery_key]
|
||||
|
||||
async def cancel_app_downloads(self, app_service_id: str) -> None:
|
||||
"""Cancel all downloads for the given app service.
|
||||
|
||||
Args:
|
||||
app_service_id: The app service ID to cancel downloads for.
|
||||
"""
|
||||
for download in self._active_downloads.values():
|
||||
if download.app_service.app_service_id == app_service_id:
|
||||
await download.incoming_chunks.put(None)
|
@@ -1,350 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
import signal
|
||||
import sys
|
||||
|
||||
from typing import Any
|
||||
|
||||
import aiohttp_jinja2
|
||||
from aiohttp import web
|
||||
from aiohttp import WSMsgType
|
||||
from aiohttp.web_runner import GracefulExit
|
||||
import jinja2
|
||||
|
||||
from importlib.metadata import version
|
||||
|
||||
from rich.console import Console
|
||||
from rich.logging import RichHandler
|
||||
from rich.highlighter import RegexHighlighter
|
||||
|
||||
from webshare.download_manager import DownloadManager
|
||||
|
||||
from .app_service import AppService
|
||||
|
||||
log = logging.getLogger("textual-serve")
|
||||
|
||||
LOGO = r"""[bold magenta]___ ____ _ _ ___ _ _ ____ _ ____ ____ ____ _ _ ____
|
||||
| |___ \/ | | | |__| | __ [__ |___ |__/ | | |___
|
||||
| |___ _/\_ | |__| | | |___ ___] |___ | \ \/ |___ [not bold]VVVVV
|
||||
""".replace("VVVVV", f"v{version('textual-serve')}")
|
||||
|
||||
|
||||
WINDOWS = sys.platform == "WINDOWS"
|
||||
|
||||
|
||||
class LogHighlighter(RegexHighlighter):
|
||||
base_style = "repr."
|
||||
highlights = [
|
||||
r"(?P<number>(?<!\w)\-?[0-9]+\.?[0-9]*(e[-+]?\d+?)?\b|0x[0-9a-fA-F]*)",
|
||||
r"(?P<path>\[.*?\])",
|
||||
r"(?<![\\\w])(?P<str>b?'''.*?(?<!\\)'''|b?'.*?(?<!\\)'|b?\"\"\".*?(?<!\\)\"\"\"|b?\".*?(?<!\\)\")",
|
||||
]
|
||||
|
||||
|
||||
def to_int(value: str, default: int) -> int:
|
||||
"""Convert to an integer, or return a default if that's not possible.
|
||||
|
||||
Args:
|
||||
number: A string possibly containing a decimal.
|
||||
default: Default value if value can't be decoded.
|
||||
|
||||
Returns:
|
||||
Integer.
|
||||
"""
|
||||
try:
|
||||
return int(value)
|
||||
except ValueError:
|
||||
return default
|
||||
|
||||
|
||||
class Server:
|
||||
"""Serve a Textual app."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
command: str,
|
||||
host: str = "localhost",
|
||||
port: int = 8000,
|
||||
title: str | None = None,
|
||||
public_url: str | None = None,
|
||||
statics_path: str | os.PathLike = "./static",
|
||||
templates_path: str | os.PathLike = "./templates",
|
||||
):
|
||||
"""
|
||||
|
||||
Args:
|
||||
app_factory: A callable that returns a new App instance.
|
||||
host: Host of web application.
|
||||
port: Port for server.
|
||||
statics_path: Path to statics folder. May be absolute or relative to server.py.
|
||||
templates_path" Path to templates folder. May be absolute or relative to server.py.
|
||||
"""
|
||||
self.command = command
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.title = title or command
|
||||
self.debug = False
|
||||
|
||||
if public_url is None:
|
||||
if self.port == 80:
|
||||
self.public_url = f"http://{self.host}"
|
||||
elif self.port == 443:
|
||||
self.public_url = f"https://{self.host}"
|
||||
else:
|
||||
self.public_url = f"http://{self.host}:{self.port}"
|
||||
else:
|
||||
self.public_url = public_url
|
||||
|
||||
base_path = (Path(__file__) / "../").resolve().absolute()
|
||||
self.statics_path = base_path / statics_path
|
||||
self.templates_path = base_path / templates_path
|
||||
self.console = Console()
|
||||
self.download_manager = DownloadManager()
|
||||
|
||||
def initialize_logging(self) -> None:
|
||||
"""Initialize logging.
|
||||
|
||||
May be overridden in a subclass.
|
||||
"""
|
||||
FORMAT = "%(message)s"
|
||||
logging.basicConfig(
|
||||
level="DEBUG" if self.debug else "INFO",
|
||||
format=FORMAT,
|
||||
datefmt="[%X]",
|
||||
handlers=[
|
||||
RichHandler(
|
||||
show_path=False,
|
||||
show_time=False,
|
||||
rich_tracebacks=True,
|
||||
tracebacks_show_locals=True,
|
||||
highlighter=LogHighlighter(),
|
||||
console=self.console,
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
def request_exit(self) -> None:
|
||||
"""Gracefully exit the app."""
|
||||
raise GracefulExit()
|
||||
|
||||
async def _make_app(self) -> web.Application:
|
||||
"""Make the aiohttp web.Application.
|
||||
|
||||
Returns:
|
||||
New aiohttp web application.
|
||||
"""
|
||||
app = web.Application()
|
||||
|
||||
aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(self.templates_path))
|
||||
|
||||
ROUTES = [
|
||||
web.get("/", self.handle_index, name="index"),
|
||||
web.get("/ws", self.handle_websocket, name="websocket"),
|
||||
web.get("/download/{key}", self.handle_download, name="download"),
|
||||
web.static("/static", self.statics_path, show_index=True, name="static"),
|
||||
]
|
||||
app.add_routes(ROUTES)
|
||||
|
||||
app.on_startup.append(self.on_startup)
|
||||
app.on_shutdown.append(self.on_shutdown)
|
||||
return app
|
||||
|
||||
async def handle_download(self, request: web.Request) -> web.StreamResponse:
|
||||
"""Handle a download request."""
|
||||
key = request.match_info["key"]
|
||||
|
||||
try:
|
||||
download_meta = await self.download_manager.get_download_metadata(key)
|
||||
except KeyError:
|
||||
raise web.HTTPNotFound(text=f"Download with key {key!r} not found")
|
||||
|
||||
response = web.StreamResponse()
|
||||
mime_type = download_meta.mime_type
|
||||
|
||||
content_type = mime_type
|
||||
if download_meta.encoding:
|
||||
content_type += f"; charset={download_meta.encoding}"
|
||||
|
||||
response.headers["Content-Type"] = content_type
|
||||
disposition = (
|
||||
"inline" if download_meta.open_method == "browser" else "attachment"
|
||||
)
|
||||
response.headers["Content-Disposition"] = (
|
||||
f"{disposition}; filename={download_meta.file_name}"
|
||||
)
|
||||
|
||||
await response.prepare(request)
|
||||
|
||||
async for chunk in self.download_manager.download(key):
|
||||
await response.write(chunk)
|
||||
|
||||
await response.write_eof()
|
||||
return response
|
||||
|
||||
async def on_shutdown(self, app: web.Application) -> None:
|
||||
"""Called on shutdown.
|
||||
|
||||
Args:
|
||||
app: App instance.
|
||||
"""
|
||||
|
||||
async def on_startup(self, app: web.Application) -> None:
|
||||
"""Called on startup.
|
||||
|
||||
Args:
|
||||
app: App instance.
|
||||
"""
|
||||
|
||||
self.console.print(LOGO, highlight=False)
|
||||
self.console.print(f"Serving {self.command!r} on {self.public_url}")
|
||||
self.console.print("\n[cyan]Press Ctrl+C to quit")
|
||||
|
||||
def serve(self, debug: bool = False) -> None:
|
||||
"""Serve the Textual application.
|
||||
|
||||
This will run a local webserver until it is closed with Ctrl+C
|
||||
|
||||
"""
|
||||
self.debug = debug
|
||||
self.initialize_logging()
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
loop.add_signal_handler(signal.SIGINT, self.request_exit)
|
||||
loop.add_signal_handler(signal.SIGTERM, self.request_exit)
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
if self.debug:
|
||||
log.info("Running in debug mode. You may use textual dev tools.")
|
||||
|
||||
web.run_app(
|
||||
self._make_app(),
|
||||
host=self.host,
|
||||
port=self.port,
|
||||
handle_signals=False,
|
||||
loop=loop,
|
||||
print=lambda *args: None,
|
||||
)
|
||||
|
||||
@aiohttp_jinja2.template("app_index.html")
|
||||
async def handle_index(self, request: web.Request) -> dict[str, Any]:
|
||||
"""Serves the HTML for an app.
|
||||
|
||||
Args:
|
||||
request: Request object.
|
||||
|
||||
Returns:
|
||||
Template data.
|
||||
"""
|
||||
router = request.app.router
|
||||
font_size = to_int(request.query.get("fontsize", "16"), 16)
|
||||
|
||||
def get_url(route: str, **args) -> str:
|
||||
"""Get a URL from the aiohttp router."""
|
||||
path = router[route].url_for(**args)
|
||||
return f"{self.public_url}{path}"
|
||||
|
||||
def get_websocket_url(route: str, **args) -> str:
|
||||
"""Get a URL with a websocket prefix."""
|
||||
url = get_url(route, **args)
|
||||
|
||||
if self.public_url.startswith("https"):
|
||||
return "wss:" + url.split(":", 1)[1]
|
||||
else:
|
||||
return "ws:" + url.split(":", 1)[1]
|
||||
|
||||
context = {
|
||||
"font_size": font_size,
|
||||
"app_websocket_url": get_websocket_url("websocket"),
|
||||
}
|
||||
context["config"] = {
|
||||
"static": {
|
||||
"url": get_url("static", filename="/").rstrip("/") + "/",
|
||||
},
|
||||
}
|
||||
context["application"] = {
|
||||
"name": self.title,
|
||||
}
|
||||
return context
|
||||
|
||||
async def _process_messages(
|
||||
self, websocket: web.WebSocketResponse, app_service: AppService
|
||||
) -> None:
|
||||
"""Process messages from the client browser websocket.
|
||||
|
||||
Args:
|
||||
websocket: Websocket instance.
|
||||
app_service: App service.
|
||||
"""
|
||||
TEXT = WSMsgType.TEXT
|
||||
|
||||
async for message in websocket:
|
||||
if message.type != TEXT:
|
||||
continue
|
||||
envelope = message.json()
|
||||
assert isinstance(envelope, list)
|
||||
type_ = envelope[0]
|
||||
if type_ == "stdin":
|
||||
data = envelope[1]
|
||||
await app_service.send_bytes(data.encode("utf-8"))
|
||||
elif type_ == "resize":
|
||||
data = envelope[1]
|
||||
await app_service.set_terminal_size(data["width"], data["height"])
|
||||
elif type_ == "ping":
|
||||
data = envelope[1]
|
||||
await websocket.send_json(["pong", data])
|
||||
elif type_ == "blur":
|
||||
await app_service.blur()
|
||||
elif type_ == "focus":
|
||||
await app_service.focus()
|
||||
|
||||
async def handle_websocket(self, request: web.Request) -> web.WebSocketResponse:
|
||||
"""Handle the websocket that drives the remote process.
|
||||
|
||||
This is called when the browser connects to the websocket.
|
||||
|
||||
Args:
|
||||
request: Request object.
|
||||
|
||||
Returns:
|
||||
Websocket response.
|
||||
"""
|
||||
websocket = web.WebSocketResponse(heartbeat=15)
|
||||
|
||||
width = to_int(request.query.get("width", "80"), 80)
|
||||
height = to_int(request.query.get("height", "24"), 24)
|
||||
|
||||
app_service: AppService | None = None
|
||||
try:
|
||||
await websocket.prepare(request)
|
||||
app_service = AppService(
|
||||
self.command,
|
||||
write_bytes=websocket.send_bytes,
|
||||
write_str=websocket.send_str,
|
||||
close=websocket.close,
|
||||
download_manager=self.download_manager,
|
||||
debug=self.debug,
|
||||
)
|
||||
await app_service.start(width, height)
|
||||
try:
|
||||
await self._process_messages(websocket, app_service)
|
||||
finally:
|
||||
await app_service.stop()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
await websocket.close()
|
||||
|
||||
except Exception as error:
|
||||
log.exception(error)
|
||||
|
||||
finally:
|
||||
if app_service is not None:
|
||||
await app_service.stop()
|
||||
|
||||
return websocket
|
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user