改进同步

This commit is contained in:
2025-11-16 12:07:52 +08:00
parent efcea9f7b9
commit 41f05fbb97
3 changed files with 246 additions and 511 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
sync.toml
config.toml
data.toml data.toml
.directory/ .directory/
__pycache__/ __pycache__/

View File

@@ -6,6 +6,7 @@ import cmd
import toml import toml
import tempfile import tempfile
import subprocess import subprocess
import requests
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from tabulate import tabulate from tabulate import tabulate
@@ -15,6 +16,7 @@ import sm2
import pathlib import pathlib
os.chdir(pathlib.Path(__file__).resolve().parent) os.chdir(pathlib.Path(__file__).resolve().parent)
class MemoryUnit: class MemoryUnit:
def __init__(self, name: str, unit_id: Optional[str] = None): def __init__(self, name: str, unit_id: Optional[str] = None):
@@ -22,6 +24,7 @@ class MemoryUnit:
self.unit_id = unit_id or hasher.get_md5(name) self.unit_id = unit_id or hasher.get_md5(name)
self.attachments = set() # (字符串, 创建时间戳) self.attachments = set() # (字符串, 创建时间戳)
self.created_time = timer.get_daystamp() self.created_time = timer.get_daystamp()
self.lastmodify = timer.get_timestamp() # 新增最后修改时间字段
self.algodata = { self.algodata = {
sm2.SM2Algorithm.algo_name: { sm2.SM2Algorithm.algo_name: {
'efactor': 2.5, 'efactor': 2.5,
@@ -40,6 +43,7 @@ class MemoryUnit:
'name': self.name, 'name': self.name,
'attachments': list(self.attachments), 'attachments': list(self.attachments),
'created_time': self.created_time, 'created_time': self.created_time,
'lastmodify': self.lastmodify, # 新增最后修改时间
'algodata': self.algodata 'algodata': self.algodata
} }
@@ -48,10 +52,54 @@ class MemoryUnit:
unit = cls(data['name'], unit_id) unit = cls(data['name'], unit_id)
unit.attachments = set(tuple(att) for att in data['attachments']) unit.attachments = set(tuple(att) for att in data['attachments'])
unit.created_time = data['created_time'] unit.created_time = data['created_time']
unit.lastmodify = data.get('lastmodify', timer.get_timestamp()) # 兼容旧数据
unit.algodata = data['algodata'] unit.algodata = data['algodata']
return unit return unit
class WebDAVClient:
"""WebDAV 客户端"""
def __init__(self, url: str, username: str, password: str):
self.url = url.rstrip('/')
self.username = username
self.password = password
self.session = requests.Session()
if username and password:
self.session.auth = (username, password)
def download_file(self, remote_path: str) -> Optional[str]:
"""下载文件"""
try:
full_url = f"{self.url}/{remote_path.lstrip('/')}"
response = self.session.get(full_url)
response.raise_for_status()
return response.text
except requests.RequestException as e:
print(f"下载文件失败: {e}")
return None
def upload_file(self, remote_path: str, content: str) -> bool:
"""上传文件"""
try:
full_url = f"{self.url}/{remote_path.lstrip('/')}"
response = self.session.put(full_url, data=content.encode('utf-8'))
response.raise_for_status()
return True
except requests.RequestException as e:
print(f"上传文件失败: {e}")
return False
def file_exists(self, remote_path: str) -> bool:
"""检查文件是否存在"""
try:
full_url = f"{self.url}/{remote_path.lstrip('/')}"
response = self.session.head(full_url)
return response.status_code == 200
except requests.RequestException:
return False
class DynaNoteShell(cmd.Cmd): class DynaNoteShell(cmd.Cmd):
intro = f'欢迎使用 DynaNote Shell. 输入 help 或 ? 查看命令列表. \n当前 UNIX 日时间戳: {timer.get_daystamp()}\n' intro = f'欢迎使用 DynaNote Shell. 输入 help 或 ? 查看命令列表. \n当前 UNIX 日时间戳: {timer.get_daystamp()}\n'
@@ -60,9 +108,12 @@ class DynaNoteShell(cmd.Cmd):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.data_file = './data.toml' self.data_file = './data.toml'
self.sync_config_file = './sync.toml'
self.memory_units: Dict[str, MemoryUnit] = {} self.memory_units: Dict[str, MemoryUnit] = {}
self.selected_unit: Optional[MemoryUnit] = None self.selected_unit: Optional[MemoryUnit] = None
self.webdav_client: Optional[WebDAVClient] = None
self.load_data() self.load_data()
self.load_sync_config()
def load_data(self) -> None: def load_data(self) -> None:
"""从 data.toml 文件加载记忆单元""" """从 data.toml 文件加载记忆单元"""
@@ -93,6 +144,20 @@ class DynaNoteShell(cmd.Cmd):
else: else:
print("未找到数据文件. 从空数据库开始. ") print("未找到数据文件. 从空数据库开始. ")
def load_sync_config(self) -> None:
"""加载同步配置"""
if os.path.exists(self.sync_config_file):
try:
with open(self.sync_config_file, 'r', encoding='utf-8') as f:
self.sync_config = toml.load(f)
print("同步配置已加载")
except Exception as e:
print(f"加载同步配置时出错: {e}")
self.sync_config = {}
else:
print("未找到同步配置文件")
self.sync_config = {}
def save_data(self) -> None: def save_data(self) -> None:
"""将记忆单元保存到 data.toml 文件""" """将记忆单元保存到 data.toml 文件"""
try: try:
@@ -107,6 +172,178 @@ class DynaNoteShell(cmd.Cmd):
except Exception as e: except Exception as e:
print(f"保存数据时出错: {e}") print(f"保存数据时出错: {e}")
def ensure_webdav_client(self) -> bool:
"""确保 WebDAV 客户端已初始化"""
if self.webdav_client is not None:
return True
if not self.sync_config.get('webdav'):
print("未配置 WebDAV 同步")
return False
webdav_config = self.sync_config['webdav']
if not webdav_config.get('url'):
print("未配置 WebDAV 服务器地址")
return False
self.webdav_client = WebDAVClient(
url=webdav_config['url'],
username=webdav_config.get('username', ''),
password=webdav_config.get('password', '')
)
return True
def merge_units(self, local_units: Dict[str, MemoryUnit], remote_units: Dict[str, MemoryUnit],
strategy: str = "remote_merge") -> Dict[str, MemoryUnit]:
"""合并本地和远程的记忆单元"""
merged = {}
# 获取所有单元ID
all_unit_ids = set(local_units.keys()) | set(remote_units.keys())
for unit_id in all_unit_ids:
local_unit = local_units.get(unit_id)
remote_unit = remote_units.get(unit_id)
if local_unit and remote_unit:
# 冲突解决:取时间更为新的版本
if local_unit.lastmodify > remote_unit.lastmodify:
merged[unit_id] = local_unit
else:
merged[unit_id] = remote_unit
elif local_unit:
merged[unit_id] = local_unit
elif remote_unit:
merged[unit_id] = remote_unit
return merged
def do_pull(self, arg: str) -> None:
"""从云端拉取数据
用法: pull
"""
if not self.ensure_webdav_client():
return
webdav_config = self.sync_config['webdav']
remote_file = webdav_config.get('remote_file', 'data.toml')
print("正在从云端拉取数据...")
print(f"{remote_file}")
# 下载远程文件
remote_content = self.webdav_client.download_file(remote_file)
if remote_content is None:
print("拉取失败")
return
try:
# 解析远程数据
remote_data = toml.loads(remote_content)
remote_units = {}
for unit_id, unit_data in remote_data.items():
remote_units[unit_id] = MemoryUnit.from_dict(unit_id, unit_data)
# 显示预览信息
print("\n=== 同步预览 ===")
print("本地单元:", " ".join([unit.name for unit in self.memory_units.values()]))
print("云端单元:", " ".join([unit.name for unit in remote_units.values()]))
print(f"\n本地单元数: {len(self.memory_units)}, 云端单元数: {len(remote_units)}")
# 请求确认
response = input("\n确认拉取并合并数据?(y/N): ")
if response.lower() not in ['y', 'yes']:
print("拉取已取消")
return
# 获取合并策略
sync_config = self.sync_config.get('sync', {})
merge_strategy = sync_config.get('merge_strategy', 'remote_merge')
# 合并数据
merged_units = self.merge_units(self.memory_units, remote_units, merge_strategy)
# 更新本地数据
self.memory_units = merged_units
print(f"拉取完成,合并后共有 {len(self.memory_units)} 个记忆单元")
# 保存合并后的数据
self.save_data()
except Exception as e:
print(f"处理云端数据时出错: {e}")
def do_push(self, arg: str) -> None:
"""推送数据到云端
用法: push
"""
if not self.ensure_webdav_client():
return
webdav_config = self.sync_config['webdav']
remote_file = webdav_config.get('remote_file', 'data.toml')
print("正在推送数据到云端...")
# 首先拉取云端数据以进行合并
remote_units = {}
if self.webdav_client.file_exists(remote_file):
remote_content = self.webdav_client.download_file(remote_file)
if remote_content:
try:
remote_data = toml.loads(remote_content)
for unit_id, unit_data in remote_data.items():
remote_units[unit_id] = MemoryUnit.from_dict(unit_id, unit_data)
except Exception as e:
print(f"读取云端数据时出错: {e}")
# 获取合并策略
sync_config = self.sync_config.get('sync', {})
merge_strategy = sync_config.get('merge_strategy', 'remote_merge')
# 合并数据
if merge_strategy == "remote_merge":
# 与云端已有文件合并
merged_units = self.merge_units(remote_units, self.memory_units, merge_strategy)
else:
# 与本地已有文件合并(实际上就是使用本地数据覆盖)
merged_units = self.memory_units
# 显示预览信息
print("\n=== 同步预览 ===")
print("本地单元:", " ".join([unit.name for unit in self.memory_units.values()]))
print("云端单元:", " ".join([unit.name for unit in remote_units.values()]))
print(f"合并后单元:", " ".join([unit.name for unit in merged_units.values()]))
print(f"\n本地单元数: {len(self.memory_units)}, 云端单元数: {len(remote_units)}, 合并后单元数: {len(merged_units)}")
# 请求确认
response = input("\n确认推送并合并数据?(y/N): ")
if response.lower() not in ['y', 'yes']:
print("推送已取消")
return
# 准备上传数据
upload_data = {}
for unit_id, unit in merged_units.items():
upload_data[unit_id] = unit.to_dict()
# 转换为 TOML 格式
try:
upload_content = toml.dumps(upload_data)
except Exception as e:
print(f"序列化数据时出错: {e}")
return
# 上传到云端
if self.webdav_client.upload_file(remote_file, upload_content):
print(f"推送成功,上传了 {len(merged_units)} 个记忆单元")
# 更新本地数据为合并后的版本
self.memory_units = merged_units
self.save_data()
else:
print("推送失败")
def find_unit_by_prefix(self, prefix: str) -> Optional[str]: def find_unit_by_prefix(self, prefix: str) -> Optional[str]:
matches = [unit_id for unit_id in self.memory_units.keys() matches = [unit_id for unit_id in self.memory_units.keys()
if unit_id.startswith(prefix)] if unit_id.startswith(prefix)]
@@ -403,6 +640,8 @@ class DynaNoteShell(cmd.Cmd):
print(" mark <评分> - 使用评分0-5更新 SM-2") print(" mark <评分> - 使用评分0-5更新 SM-2")
print(" show - 显示选定单元详情") print(" show - 显示选定单元详情")
print(" edit - 使用 nano 编辑选定单元") print(" edit - 使用 nano 编辑选定单元")
print(" pull - 从云端拉取数据")
print(" push - 推送数据到云端")
print(" clear - 清屏") print(" clear - 清屏")
print(" help - 显示此帮助") print(" help - 显示此帮助")
print(" exit - 退出 shell") print(" exit - 退出 shell")
@@ -415,6 +654,11 @@ class DynaNoteShell(cmd.Cmd):
print(" 1 - 错误回答; 但正确答案似乎熟悉") print(" 1 - 错误回答; 但正确答案似乎熟悉")
print(" 0 - 完全遗忘") print(" 0 - 完全遗忘")
print("\n同步配置:")
print(" 编辑 sync.toml 文件配置 WebDAV 同步")
print(" 合并策略: remote_merge (与云端合并) 或 local_merge (与本地合并)")
print(" 冲突解决: 相同单元取时间更为新的版本")
def do_exit(self, arg: str) -> bool: def do_exit(self, arg: str) -> bool:
"""退出 shell """退出 shell
用法: exit 用法: exit

View File

@@ -1,511 +0,0 @@
#!/usr/bin/env python3
"""
DynaNote TUI - 使用 SM-2 算法的间隔重复闪卡系统 (Textual 版本)
"""
import os
import sys
import toml
import tempfile
import subprocess
from typing import Dict, List, Optional, Tuple
from datetime import datetime
# 导入现有模块
import hasher
import timer
import sm2
import pathlib
from textual import on
from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import (
Header, Footer, Button, Static, Input,
DataTable, Label, Select, TextArea,
TabbedContent, TabPane, ContentSwitcher
)
from textual.screen import Screen, ModalScreen
from textual.reactive import reactive
os.chdir(pathlib.Path(__file__).resolve().parent)
class MemoryUnit:
"""表示单个记忆单元(闪卡)"""
def __init__(self, name: str, unit_id: Optional[str] = None):
self.name = name
self.unit_id = unit_id or hasher.get_md5(name)
self.attachments = set() # 元组集合 (字符串, 创建时间戳)
self.created_time = timer.get_daystamp()
self.algodata = {
sm2.SM2Algorithm.algo_name: {
'efactor': 2.5,
'real_rept': 0,
'rept': 0,
'interval': 0,
'last_date': 0,
'next_date': 0,
'is_activated': 0,
'last_modify': timer.get_timestamp()
}
}
def to_dict(self) -> Dict:
"""将记忆单元转换为字典以便 TOML 序列化"""
return {
'name': self.name,
'attachments': list(self.attachments),
'created_time': self.created_time,
'algodata': self.algodata
}
@classmethod
def from_dict(cls, unit_id: str, data: Dict) -> 'MemoryUnit':
"""从字典创建记忆单元"""
unit = cls(data['name'], unit_id)
unit.attachments = set(tuple(att) for att in data['attachments'])
unit.created_time = data['created_time']
unit.algodata = data['algodata']
return unit
class NewUnitScreen(ModalScreen):
"""创建新记忆单元的模态屏幕"""
def compose(self) -> ComposeResult:
with Container():
yield Label("创建新记忆单元", classes="modal-title")
yield Input(placeholder="输入单元名称...", id="unit-name")
with Horizontal():
yield Button("创建", variant="primary", id="create-btn")
yield Button("取消", variant="default", id="cancel-btn")
@on(Button.Pressed, "#create-btn")
def create_unit(self) -> None:
name_input = self.query_one("#unit-name", Input)
name = name_input.value.strip()
if name:
self.dismiss(name)
@on(Button.Pressed, "#cancel-btn")
def cancel(self) -> None:
self.dismiss(None)
class MarkRatingScreen(ModalScreen):
"""评分记忆单元的模态屏幕"""
def __init__(self, unit_name: str):
super().__init__()
self.unit_name = unit_name
def compose(self) -> ComposeResult:
with Container():
yield Label(f"'{self.unit_name}' 评分", classes="modal-title")
yield Label("\nSM-2 评分标准:", classes="rating-title")
yield Label("5 - 完美回答", classes="rating-item")
yield Label("4 - 犹豫后正确回答", classes="rating-item")
yield Label("3 - 经过严重困难后回忆起正确答案", classes="rating-item")
yield Label("2 - 错误回答; 但记得正确答案", classes="rating-item")
yield Label("1 - 错误回答; 但正确答案似乎熟悉", classes="rating-item")
yield Label("0 - 完全遗忘", classes="rating-item")
with Horizontal():
for rating in [5, 4, 3, 2, 1, 0]:
yield Button(str(rating), id=f"rating-{rating}")
yield Button("取消", variant="default", id="cancel-btn")
@on(Button.Pressed)
def handle_rating(self, event: Button.Pressed) -> None:
if event.button.id and event.button.id.startswith("rating-"):
rating = int(event.button.id.split("-")[1])
self.dismiss(rating)
elif event.button.id == "cancel-btn":
self.dismiss(None)
class AttachmentScreen(ModalScreen):
"""添加附件的模态屏幕"""
def compose(self) -> ComposeResult:
with Container():
yield Label("添加附件", classes="modal-title")
yield TextArea(id="attachment-text", language="markdown")
with Horizontal():
yield Button("添加", variant="primary", id="add-btn")
yield Button("取消", variant="default", id="cancel-btn")
@on(Button.Pressed, "#add-btn")
def add_attachment(self) -> None:
text_area = self.query_one("#attachment-text", TextArea)
text = text_area.text.strip()
if text:
self.dismiss(text)
@on(Button.Pressed, "#cancel-btn")
def cancel(self) -> None:
self.dismiss(None)
class MainScreen(Screen):
"""主屏幕"""
selected_unit = reactive(None)
def __init__(self):
super().__init__()
self.data_file = './data.toml'
self.memory_units: Dict[str, MemoryUnit] = {}
self.load_data()
def load_data(self) -> None:
"""从 data.toml 文件加载记忆单元"""
if os.path.exists(self.data_file):
try:
with open(self.data_file, 'r', encoding='utf-8') as f:
data = toml.load(f)
# 处理嵌套数据
nested_data = dict()
for key, value in data.items():
if '.' in key:
parts = key.split('.')
current = nested_data
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
current[parts[-1]] = value
else:
nested_data[key] = value
data = nested_data
for unit_id, unit_data in data.items():
self.memory_units[unit_id] = MemoryUnit.from_dict(unit_id, unit_data)
self.notify(f"已加载 {len(self.memory_units)} 个记忆单元", severity="information")
except Exception as e:
self.notify(f"加载数据时出错: {e}", severity="error")
else:
self.notify("未找到数据文件. 从空数据库开始.", severity="warning")
def save_data(self) -> None:
"""将记忆单元保存到 data.toml 文件"""
try:
data = {}
for unit_id, unit in self.memory_units.items():
data[unit_id] = unit.to_dict()
with open(self.data_file, 'w', encoding='utf-8') as f:
toml.dump(data, f)
self.notify("数据保存成功", severity="information")
except Exception as e:
self.notify(f"保存数据时出错: {e}", severity="error")
def compose(self) -> ComposeResult:
yield Header()
with TabbedContent():
with TabPane("记忆单元列表", id="list-tab"):
with Vertical():
yield Label("记忆单元列表", classes="section-title")
with Horizontal():
yield Button("刷新", id="refresh-btn")
yield Button("新建", id="new-btn", variant="primary")
yield Button("删除", id="delete-btn", variant="error")
yield DataTable(id="units-table")
with TabPane("单元详情", id="detail-tab"):
with Vertical():
yield Label("记忆单元详情", classes="section-title")
yield Static("未选择任何单元", id="unit-info")
with Horizontal():
yield Button("评分", id="mark-btn", variant="success")
yield Button("添加附件", id="attach-btn")
yield Button("编辑", id="edit-btn")
yield Static("", id="attachments-info")
yield Footer()
def on_mount(self) -> None:
"""挂载时初始化"""
self.setup_table()
self.update_unit_info()
def on_resume(self) -> None:
"""从模态屏幕返回时刷新数据"""
self.refresh_table()
def setup_table(self) -> None:
"""设置数据表格"""
table = self.query_one("#units-table", DataTable)
# 只在表格为空时添加列头,避免重复添加
if not table.columns:
table.add_columns(
"ID", "名称", "复习次数", "EF", "下次复习时间", "附件数"
)
# 清除现有行数据
table.clear()
today = timer.get_daystamp()
for unit in self.memory_units.values():
algodata = unit.algodata[sm2.SM2Algorithm.algo_name]
table.add_row(
unit.unit_id[:8] + "...",
unit.name,
str(algodata['real_rept']),
f"{algodata['efactor']:.2f}",
str(algodata['next_date']),
str(len(unit.attachments))
)
def update_unit_info(self) -> None:
"""更新单元详情显示"""
info_widget = self.query_one("#unit-info", Static)
attachments_widget = self.query_one("#attachments-info", Static)
if self.selected_unit:
unit = self.selected_unit
algodata = unit.algodata[sm2.SM2Algorithm.algo_name]
info_text = f"""
记忆单元: {unit.name}
ID: {unit.unit_id}
创建时间: {unit.created_time}
SM-2 算法数据:
EFactor: {algodata['efactor']}
实际复习次数: {algodata['real_rept']}
当前重复次数: {algodata['rept']}
间隔: {algodata['interval']}
上次复习: {algodata['last_date']}
下次复习: {algodata['next_date']}
是否激活: {algodata['is_activated']}
"""
info_widget.update(info_text)
if unit.attachments:
attachments_text = f"\n附件 ({len(unit.attachments)}):\n"
sorted_attachments = sorted(unit.attachments, key=lambda x: float(x[1]))
for i, (text, timestamp) in enumerate(sorted_attachments, 1):
attachments_text += f" {i}. {text[:80]}{'...' if len(text) > 80 else ''} (于 {timestamp})\n"
attachments_widget.update(attachments_text)
else:
attachments_widget.update("\n无附件")
else:
info_widget.update("未选择任何单元")
attachments_widget.update("")
@on(DataTable.RowSelected, "#units-table")
def on_row_selected(self, event: DataTable.RowSelected) -> None:
"""处理表格行选择"""
if event.row_key:
unit_id_prefix = event.row_key.value
for unit_id, unit in self.memory_units.items():
if unit_id.startswith(unit_id_prefix):
self.selected_unit = unit
self.update_unit_info()
self.notify(f"已选择单元: {unit.name}", severity="information")
break
@on(Button.Pressed, "#refresh-btn")
def refresh_table(self) -> None:
"""刷新表格"""
self.load_data() # 重新加载数据
self.setup_table()
self.update_unit_info()
self.notify("表格已刷新", severity="information")
@on(Button.Pressed, "#new-btn")
async def new_unit(self) -> None:
"""创建新单元"""
name = await self.app.push_screen_wait(NewUnitScreen())
if name:
unit = MemoryUnit(name)
self.memory_units[unit.unit_id] = unit
self.selected_unit = unit
self.setup_table()
self.update_unit_info()
self.save_data()
self.notify(f"已创建新单元: {unit.name}", severity="success")
@on(Button.Pressed, "#delete-btn")
def delete_unit(self) -> None:
"""删除选中的单元"""
if self.selected_unit:
unit_name = self.selected_unit.name
unit_id = self.selected_unit.unit_id
del self.memory_units[unit_id]
self.selected_unit = None
self.setup_table()
self.update_unit_info()
self.save_data()
self.notify(f"已删除单元: {unit_name}", severity="warning")
else:
self.notify("请先选择一个单元", severity="error")
@on(Button.Pressed, "#mark-btn")
async def mark_unit(self) -> None:
"""为选中的单元评分"""
if not self.selected_unit:
self.notify("请先选择一个单元", severity="error")
return
rating = await self.app.push_screen_wait(
MarkRatingScreen(self.selected_unit.name)
)
if rating is not None:
algodata = self.selected_unit.algodata
if algodata[sm2.SM2Algorithm.algo_name]['is_activated'] == 0:
sm2.SM2Algorithm.revisor(algodata, rating, is_new_activation=1)
algodata[sm2.SM2Algorithm.algo_name]['is_activated'] = 1
else:
sm2.SM2Algorithm.revisor(algodata, rating)
self.update_unit_info()
self.save_data()
self.notify(f"已使用评分 {rating} 更新 {self.selected_unit.name} 的 SM-2 参数",
severity="success")
@on(Button.Pressed, "#attach-btn")
async def add_attachment(self) -> None:
"""为选中的单元添加附件"""
if not self.selected_unit:
self.notify("请先选择一个单元", severity="error")
return
text = await self.app.push_screen_wait(AttachmentScreen())
if text:
attachment = (text, str(timer.get_timestamp()))
self.selected_unit.attachments.add(attachment)
self.update_unit_info()
self.save_data()
self.notify(f"附件已添加到 {self.selected_unit.name}", severity="success")
@on(Button.Pressed, "#edit-btn")
def edit_unit(self) -> None:
"""编辑选中的单元"""
if not self.selected_unit:
self.notify("请先选择一个单元", severity="error")
return
# 创建包含单元数据的临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f:
temp_file = f.name
toml.dump({self.selected_unit.unit_id: self.selected_unit.to_dict()}, f)
try:
# 启动 nano 编辑器
subprocess.run(['nano', temp_file], check=True)
# 读取编辑后的文件
with open(temp_file, 'r', encoding='utf-8') as f:
edited_data = toml.load(f)
# 更新记忆单元
if self.selected_unit.unit_id in edited_data:
updated_data = edited_data[self.selected_unit.unit_id]
self.memory_units[self.selected_unit.unit_id] = MemoryUnit.from_dict(
self.selected_unit.unit_id, updated_data
)
self.selected_unit = self.memory_units[self.selected_unit.unit_id]
self.update_unit_info()
self.save_data()
self.notify("单元更新成功", severity="success")
else:
self.notify("错误: 在编辑后的文件中未找到单元 ID", severity="error")
except subprocess.CalledProcessError:
self.notify("编辑器已关闭但未保存", severity="warning")
except Exception as e:
self.notify(f"编辑单元时出错: {e}", severity="error")
finally:
# 清理临时文件
if os.path.exists(temp_file):
os.unlink(temp_file)
class DynaNoteTUI(App):
"""DynaNote TUI 应用"""
CSS = """
.modal-title {
text-align: center;
text-style: bold;
margin: 1;
}
.section-title {
text-align: center;
text-style: bold;
margin: 1;
}
.rating-title {
text-style: bold;
margin: 1 0;
}
.rating-item {
margin: 0 0 0 2;
}
#unit-info {
margin: 1;
border: solid $primary;
padding: 1;
}
#attachments-info {
margin: 1;
border: solid $accent;
padding: 1;
}
DataTable {
height: 1fr;
}
"""
BINDINGS = [
("q", "quit", "退出"),
("s", "save", "保存数据"),
("r", "refresh", "刷新"),
("n", "new", "新建单元"),
]
def on_mount(self) -> None:
"""挂载时切换到主屏幕"""
self.push_screen(MainScreen())
def action_quit(self) -> None:
"""退出应用"""
self.exit()
def action_save(self) -> None:
"""保存数据"""
if hasattr(self.screen, 'save_data'):
self.screen.save_data()
def action_refresh(self) -> None:
"""刷新"""
if hasattr(self.screen, 'refresh_table'):
self.screen.refresh_table()
def action_new(self) -> None:
"""新建单元"""
if hasattr(self.screen, 'new_unit'):
self.screen.new_unit()
def main():
"""主入口点"""
app = DynaNoteTUI()
app.run()
if __name__ == '__main__':
main()