Files
DynaNote/dynanote.py
2025-11-16 12:07:52 +08:00

726 lines
26 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import sys
import cmd
import toml
import tempfile
import subprocess
import requests
from typing import Dict, List, Optional, Tuple
from tabulate import tabulate
import hasher
import timer
import sm2
import pathlib
os.chdir(pathlib.Path(__file__).resolve().parent)
class MemoryUnit:
def __init__(self, name: str, unit_id: Optional[str] = None):
self.name = name
self.unit_id = unit_id or hasher.get_md5(name)
self.attachments = set() # (字符串, 创建时间戳)
self.created_time = timer.get_daystamp()
self.lastmodify = timer.get_timestamp() # 新增最后修改时间字段
self.algodata = {
sm2.SM2Algorithm.algo_name: {
'efactor': 2.5,
'real_rept': 0,
'rept': 0,
'interval': 0,
'last_date': 0,
'next_date': 0,
'is_activated': 0,
'last_modify': timer.get_timestamp()
}
}
def to_dict(self) -> Dict:
return {
'name': self.name,
'attachments': list(self.attachments),
'created_time': self.created_time,
'lastmodify': self.lastmodify, # 新增最后修改时间
'algodata': self.algodata
}
@classmethod
def from_dict(cls, unit_id: str, data: Dict) -> 'MemoryUnit':
unit = cls(data['name'], unit_id)
unit.attachments = set(tuple(att) for att in data['attachments'])
unit.created_time = data['created_time']
unit.lastmodify = data.get('lastmodify', timer.get_timestamp()) # 兼容旧数据
unit.algodata = data['algodata']
return unit
class WebDAVClient:
"""WebDAV 客户端"""
def __init__(self, url: str, username: str, password: str):
self.url = url.rstrip('/')
self.username = username
self.password = password
self.session = requests.Session()
if username and password:
self.session.auth = (username, password)
def download_file(self, remote_path: str) -> Optional[str]:
"""下载文件"""
try:
full_url = f"{self.url}/{remote_path.lstrip('/')}"
response = self.session.get(full_url)
response.raise_for_status()
return response.text
except requests.RequestException as e:
print(f"下载文件失败: {e}")
return None
def upload_file(self, remote_path: str, content: str) -> bool:
"""上传文件"""
try:
full_url = f"{self.url}/{remote_path.lstrip('/')}"
response = self.session.put(full_url, data=content.encode('utf-8'))
response.raise_for_status()
return True
except requests.RequestException as e:
print(f"上传文件失败: {e}")
return False
def file_exists(self, remote_path: str) -> bool:
"""检查文件是否存在"""
try:
full_url = f"{self.url}/{remote_path.lstrip('/')}"
response = self.session.head(full_url)
return response.status_code == 200
except requests.RequestException:
return False
class DynaNoteShell(cmd.Cmd):
intro = f'欢迎使用 DynaNote Shell. 输入 help 或 ? 查看命令列表. \n当前 UNIX 日时间戳: {timer.get_daystamp()}\n'
prompt = '(dynanote) $ '
def __init__(self):
super().__init__()
self.data_file = './data.toml'
self.sync_config_file = './sync.toml'
self.memory_units: Dict[str, MemoryUnit] = {}
self.selected_unit: Optional[MemoryUnit] = None
self.webdav_client: Optional[WebDAVClient] = None
self.load_data()
self.load_sync_config()
def load_data(self) -> None:
"""从 data.toml 文件加载记忆单元"""
if os.path.exists(self.data_file):
try:
with open(self.data_file, 'r', encoding='utf-8') as f:
data = toml.load(f)
nested_data = dict()
for key, value in data.items():
if '.' in key:
parts = key.split('.')
current = nested_data
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
current[parts[-1]] = value
else:
nested_data[key] = value
data = nested_data
#print(data)
for unit_id, unit_data in data.items():
self.memory_units[unit_id] = MemoryUnit.from_dict(unit_id, unit_data)
print(f"已加载 {len(self.memory_units)} 个记忆单元")
except Exception as e:
print(f"加载数据时出错: {e}")
else:
print("未找到数据文件. 从空数据库开始. ")
def load_sync_config(self) -> None:
"""加载同步配置"""
if os.path.exists(self.sync_config_file):
try:
with open(self.sync_config_file, 'r', encoding='utf-8') as f:
self.sync_config = toml.load(f)
print("同步配置已加载")
except Exception as e:
print(f"加载同步配置时出错: {e}")
self.sync_config = {}
else:
print("未找到同步配置文件")
self.sync_config = {}
def save_data(self) -> None:
"""将记忆单元保存到 data.toml 文件"""
try:
data = {}
for unit_id, unit in self.memory_units.items():
data[unit_id] = unit.to_dict()
with open(self.data_file, 'w', encoding='utf-8') as f:
toml.dump(data, f)
print("数据保存成功")
except Exception as e:
print(f"保存数据时出错: {e}")
def ensure_webdav_client(self) -> bool:
"""确保 WebDAV 客户端已初始化"""
if self.webdav_client is not None:
return True
if not self.sync_config.get('webdav'):
print("未配置 WebDAV 同步")
return False
webdav_config = self.sync_config['webdav']
if not webdav_config.get('url'):
print("未配置 WebDAV 服务器地址")
return False
self.webdav_client = WebDAVClient(
url=webdav_config['url'],
username=webdav_config.get('username', ''),
password=webdav_config.get('password', '')
)
return True
def merge_units(self, local_units: Dict[str, MemoryUnit], remote_units: Dict[str, MemoryUnit],
strategy: str = "remote_merge") -> Dict[str, MemoryUnit]:
"""合并本地和远程的记忆单元"""
merged = {}
# 获取所有单元ID
all_unit_ids = set(local_units.keys()) | set(remote_units.keys())
for unit_id in all_unit_ids:
local_unit = local_units.get(unit_id)
remote_unit = remote_units.get(unit_id)
if local_unit and remote_unit:
# 冲突解决:取时间更为新的版本
if local_unit.lastmodify > remote_unit.lastmodify:
merged[unit_id] = local_unit
else:
merged[unit_id] = remote_unit
elif local_unit:
merged[unit_id] = local_unit
elif remote_unit:
merged[unit_id] = remote_unit
return merged
def do_pull(self, arg: str) -> None:
"""从云端拉取数据
用法: pull
"""
if not self.ensure_webdav_client():
return
webdav_config = self.sync_config['webdav']
remote_file = webdav_config.get('remote_file', 'data.toml')
print("正在从云端拉取数据...")
print(f"{remote_file}")
# 下载远程文件
remote_content = self.webdav_client.download_file(remote_file)
if remote_content is None:
print("拉取失败")
return
try:
# 解析远程数据
remote_data = toml.loads(remote_content)
remote_units = {}
for unit_id, unit_data in remote_data.items():
remote_units[unit_id] = MemoryUnit.from_dict(unit_id, unit_data)
# 显示预览信息
print("\n=== 同步预览 ===")
print("本地单元:", " ".join([unit.name for unit in self.memory_units.values()]))
print("云端单元:", " ".join([unit.name for unit in remote_units.values()]))
print(f"\n本地单元数: {len(self.memory_units)}, 云端单元数: {len(remote_units)}")
# 请求确认
response = input("\n确认拉取并合并数据?(y/N): ")
if response.lower() not in ['y', 'yes']:
print("拉取已取消")
return
# 获取合并策略
sync_config = self.sync_config.get('sync', {})
merge_strategy = sync_config.get('merge_strategy', 'remote_merge')
# 合并数据
merged_units = self.merge_units(self.memory_units, remote_units, merge_strategy)
# 更新本地数据
self.memory_units = merged_units
print(f"拉取完成,合并后共有 {len(self.memory_units)} 个记忆单元")
# 保存合并后的数据
self.save_data()
except Exception as e:
print(f"处理云端数据时出错: {e}")
def do_push(self, arg: str) -> None:
"""推送数据到云端
用法: push
"""
if not self.ensure_webdav_client():
return
webdav_config = self.sync_config['webdav']
remote_file = webdav_config.get('remote_file', 'data.toml')
print("正在推送数据到云端...")
# 首先拉取云端数据以进行合并
remote_units = {}
if self.webdav_client.file_exists(remote_file):
remote_content = self.webdav_client.download_file(remote_file)
if remote_content:
try:
remote_data = toml.loads(remote_content)
for unit_id, unit_data in remote_data.items():
remote_units[unit_id] = MemoryUnit.from_dict(unit_id, unit_data)
except Exception as e:
print(f"读取云端数据时出错: {e}")
# 获取合并策略
sync_config = self.sync_config.get('sync', {})
merge_strategy = sync_config.get('merge_strategy', 'remote_merge')
# 合并数据
if merge_strategy == "remote_merge":
# 与云端已有文件合并
merged_units = self.merge_units(remote_units, self.memory_units, merge_strategy)
else:
# 与本地已有文件合并(实际上就是使用本地数据覆盖)
merged_units = self.memory_units
# 显示预览信息
print("\n=== 同步预览 ===")
print("本地单元:", " ".join([unit.name for unit in self.memory_units.values()]))
print("云端单元:", " ".join([unit.name for unit in remote_units.values()]))
print(f"合并后单元:", " ".join([unit.name for unit in merged_units.values()]))
print(f"\n本地单元数: {len(self.memory_units)}, 云端单元数: {len(remote_units)}, 合并后单元数: {len(merged_units)}")
# 请求确认
response = input("\n确认推送并合并数据?(y/N): ")
if response.lower() not in ['y', 'yes']:
print("推送已取消")
return
# 准备上传数据
upload_data = {}
for unit_id, unit in merged_units.items():
upload_data[unit_id] = unit.to_dict()
# 转换为 TOML 格式
try:
upload_content = toml.dumps(upload_data)
except Exception as e:
print(f"序列化数据时出错: {e}")
return
# 上传到云端
if self.webdav_client.upload_file(remote_file, upload_content):
print(f"推送成功,上传了 {len(merged_units)} 个记忆单元")
# 更新本地数据为合并后的版本
self.memory_units = merged_units
self.save_data()
else:
print("推送失败")
def find_unit_by_prefix(self, prefix: str) -> Optional[str]:
matches = [unit_id for unit_id in self.memory_units.keys()
if unit_id.startswith(prefix)]
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
print(f"多个单元匹配前缀 '{prefix}': {', '.join(matches)}")
return None
else:
# 尝试通过名称查找
name_matches = [unit_id for unit_id, unit in self.memory_units.items()
if unit.name == prefix]
if len(name_matches) == 1:
return name_matches[0]
elif len(name_matches) > 1:
print(f"多个单元具有名称 '{prefix}'")
return None
return None
def do_list(self, arg: str) -> None:
"""列出记忆单元
用法: list [all|整数]
- list all: 显示所有记忆单元
- list 5: 显示复习次数<=5且今天到期的单元
- list: 同 'list 5'
"""
if not arg:
print("默认列出显示复习次数 <= 5, 且今天到期的单元")
arg = "5"
if arg.lower() == "all" or arg.lower() == "-a":
units_to_show = list(self.memory_units.values())
else:
try:
max_reviews = int(arg)
today = timer.get_daystamp()
units_to_show = [
unit for unit in self.memory_units.values()
if unit.algodata[sm2.SM2Algorithm.algo_name]['real_rept'] <= max_reviews
and unit.algodata[sm2.SM2Algorithm.algo_name]['next_date'] <= today
]
except ValueError:
print("无效参数. 使用 'all' 或整数. ")
return
if not units_to_show:
print("没有符合条件的记忆单元")
return
# 准备 tabulate 数据
table_data = []
for unit in units_to_show:
algodata = unit.algodata[sm2.SM2Algorithm.algo_name]
table_data.append([
unit.unit_id[:8] + "...",
unit.name,
algodata['real_rept'],
algodata['efactor'],
algodata['next_date'],
len(unit.attachments)
])
headers = ["ID", "名称", "复习次数", "EF", "下次复习时间", "附件"]
print(tabulate(table_data, headers=headers))
def do_select(self, arg: str) -> None:
"""通过 ID、前缀或名称选择记忆单元
用法: select <单元ID|前缀|名称>
"""
if not arg:
print("请提供单元 ID、前缀或名称")
return
unit_id = self.find_unit_by_prefix(arg)
if unit_id:
self.selected_unit = self.memory_units[unit_id]
print(f"已选择单元: {self.selected_unit.name} ({unit_id[:8]}...)")
self.prompt = f"({unit_id[:6]}..) $ "
else:
print(f"未找到唯一匹配 '{arg}' 的单元")
def do_attach(self, arg: str) -> None:
"""将字符串附加到选定的记忆单元
用法: attach <字符串>
"""
if not self.selected_unit:
print("未选择单元. 请先使用 'select'. ")
return
if not arg:
print("请提供要附加的字符串")
return
attachment = (arg, str(timer.get_timestamp()))
self.selected_unit.attachments.add(attachment)
print(f"附件已添加到 {self.selected_unit.name}")
def do_new(self, arg: str) -> None:
"""创建新的记忆单元
用法: new <名称>
"""
if not arg:
print("请提供新单元的名称")
return
unit = MemoryUnit(arg)
self.memory_units[unit.unit_id] = unit
self.selected_unit = unit
print(f"已创建新单元: {unit.name} ({unit.unit_id[:8]}...)")
def do_del(self, arg: str) -> None:
"""删除记忆单元
用法: del <单元ID|前缀|名称>
"""
if not arg:
print("请提供单元 ID、前缀或名称")
return
unit_id = self.find_unit_by_prefix(arg)
if unit_id:
unit_name = self.memory_units[unit_id].name
del self.memory_units[unit_id]
# 如果删除的是当前选中的单元,则清除选择
if self.selected_unit and self.selected_unit.unit_id == unit_id:
self.selected_unit = None
print(f"已删除单元: {unit_name}")
else:
print(f"未找到唯一匹配 '{arg}' 的单元")
def do_mark(self, arg: str) -> None:
"""使用评分0-5更新 SM-2 算法
用法: mark <评分>
"""
if not self.selected_unit:
print("未选择单元. 请先使用 'select'. ")
return
try:
rating = int(arg)
if rating < 0 or rating > 5:
print("评分必须在 0 到 5 之间")
return
except ValueError:
print("请提供有效的整数评分0-5")
return
# 显示评分标准以确认
print("\nSM-2 评分标准:")
print(" 5 - 完美回答")
print(" 4 - 犹豫后正确回答")
print(" 3 - 经过严重困难后回忆起正确答案")
print(" 2 - 错误回答; 但记得正确答案")
print(" 1 - 错误回答; 但正确答案似乎熟悉")
print(" 0 - 完全遗忘")
if self.selected_unit.algodata["SM-2"]["is_activated"] == 0:
print("此次为初次激活, 可以使用 mark 5")
# 请求确认
response = input(f"\n确认对 '{self.selected_unit.name}' 评分 {rating}(y/N): ")
if response.lower() not in ['y', 'yes']:
print("评分已取消")
return
if self.selected_unit.algodata["SM-2"]["is_activated"] == 0:
sm2.SM2Algorithm.revisor(self.selected_unit.algodata, rating, is_new_activation=1)
self.selected_unit.algodata["SM-2"]["is_activated"] = 1
else:
sm2.SM2Algorithm.revisor(self.selected_unit.algodata, rating)
print(f"已使用评分 {rating} 更新 {self.selected_unit.name} 的 SM-2 参数")
def do_show(self, arg: str) -> None:
"""显示选定记忆单元的详细信息
用法: show
"""
if not self.selected_unit:
print("未选择单元. 请先使用 'select'. ")
return
unit = self.selected_unit
algodata = unit.algodata[sm2.SM2Algorithm.algo_name]
print(f"\n记忆单元: {unit.name}")
print(f"ID: {unit.unit_id}")
print(f"创建时间: {unit.created_time}")
print(f"\nSM-2 算法数据:")
print(f" EFactor: {algodata['efactor']}")
print(f" 实际复习次数: {algodata['real_rept']}")
print(f" 当前重复次数: {algodata['rept']}")
print(f" 间隔: {algodata['interval']}")
print(f" 上次复习: {algodata['last_date']}")
print(f" 下次复习: {algodata['next_date']}")
print(f" 是否激活: {algodata['is_activated']}")
if unit.attachments:
print(f"\n附件 ({len(unit.attachments)}):")
# 按时间戳从最老到最新排序
sorted_attachments = sorted(unit.attachments, key=lambda x: float(x[1]))
for i, (text, timestamp) in enumerate(sorted_attachments, 1):
print(f" {i}. {text[:80]}{'...' if len(text) > 80 else ''} (于 {timestamp})")
else:
print("\n无附件")
def do_edit(self, arg: str) -> None:
"""使用 nano 编辑器编辑选定的记忆单元
用法: edit
"""
if not self.selected_unit:
print("未选择单元. 请先使用 'select'. ")
return
# 创建包含单元数据的临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f:
temp_file = f.name
toml.dump({self.selected_unit.unit_id: self.selected_unit.to_dict()}, f)
try:
# 启动 nano 编辑器
subprocess.run(['nano', temp_file], check=True)
# 读取编辑后的文件
with open(temp_file, 'r', encoding='utf-8') as f:
edited_data = toml.load(f)
# 更新记忆单元
if self.selected_unit.unit_id in edited_data:
updated_data = edited_data[self.selected_unit.unit_id]
self.memory_units[self.selected_unit.unit_id] = MemoryUnit.from_dict(
self.selected_unit.unit_id, updated_data
)
self.selected_unit = self.memory_units[self.selected_unit.unit_id]
print("单元更新成功")
else:
print("错误: 在编辑后的文件中未找到单元 ID")
except subprocess.CalledProcessError:
print("编辑器已关闭但未保存")
except Exception as e:
print(f"编辑单元时出错: {e}")
finally:
# 清理临时文件
if os.path.exists(temp_file):
os.unlink(temp_file)
def do_reset(self, arg: str) -> None:
"""重置选定单元的记忆数据
用法: reset
"""
if not self.selected_unit:
print("未选择单元. 请先使用 'select'. ")
return
# 确认重置操作
response = input(f"确认重置单元 '{self.selected_unit.name}' 的记忆数据?(y/N): ")
if response.lower() not in ['y', 'yes']:
print("重置已取消")
return
# 重置记忆数据
unit = self.selected_unit
unit.algodata = {
sm2.SM2Algorithm.algo_name: {
'efactor': 2.5,
'real_rept': 0,
'rept': 0,
'interval': 0,
'last_date': 0,
'next_date': 0,
'is_activated': 0,
'last_modify': timer.get_timestamp()
}
}
print(f"已重置单元 '{unit.name}' 的记忆数据")
def do_clear(self, arg: str) -> None:
"""清屏
用法: clear
"""
os.system('clear')
print(f"当前 UNIX 日时间戳: {timer.get_daystamp()}")
def do_help(self, arg: str) -> None:
"""显示帮助和 SM-2 评分标准
用法: help
"""
print("\nDynaNote Shell 命令:")
print(" list [all|整数] - 列出记忆单元")
print(" select <id|前缀|名称> - 选择记忆单元")
print(" attach <字符串> - 将字符串附加到选定单元")
print(" new <名称> - 创建新记忆单元")
print(" del <id|前缀|名称> - 删除记忆单元")
print(" mark <评分> - 使用评分0-5更新 SM-2")
print(" show - 显示选定单元详情")
print(" edit - 使用 nano 编辑选定单元")
print(" pull - 从云端拉取数据")
print(" push - 推送数据到云端")
print(" clear - 清屏")
print(" help - 显示此帮助")
print(" exit - 退出 shell")
print("\nSM-2 评分标准:")
print(" 5 - 完美回答")
print(" 4 - 犹豫后正确回答")
print(" 3 - 经过严重困难后回忆起正确答案")
print(" 2 - 错误回答; 但记得正确答案")
print(" 1 - 错误回答; 但正确答案似乎熟悉")
print(" 0 - 完全遗忘")
print("\n同步配置:")
print(" 编辑 sync.toml 文件配置 WebDAV 同步")
print(" 合并策略: remote_merge (与云端合并) 或 local_merge (与本地合并)")
print(" 冲突解决: 相同单元取时间更为新的版本")
def do_exit(self, arg: str) -> bool:
"""退出 shell
用法: exit
"""
self.save_data()
return True
def do_quit(self, arg: str) -> bool:
"""退出 shellexit 的别名)
用法: quit
"""
return self.do_exit(arg)
def do_time(self, arg: str) -> bool:
print(f"UNIX 日时间戳: {timer.get_daystamp()}")
def do_dev(self, arg: str) -> bool:
os.system(f"nano {__file__}")
def do_save(self, arg: str) -> bool:
self.save_data()
# 别名
def do_sel(self, arg: str) -> None:
"""select 命令的别名"""
self.do_select(arg)
def do_add(self, arg: str) -> None:
"""attach 命令的别名"""
self.do_attach(arg)
def do_nano(self, arg: str) -> None:
"""edit 命令的别名"""
self.do_edit(arg)
def do_ls(self, arg: str) -> None:
"""list 命令的别名"""
self.do_list(arg)
def do_la(self, arg: str) -> None:
"""list -a 命令的别名"""
self.do_list("all")
def do_sh(self, arg: str) -> None:
"""show 命令的别名"""
self.do_show(arg)
def do_rm(self, arg: str) -> None:
"""del 命令的别名"""
self.do_del(arg)
def main():
"""主入口点"""
try:
DynaNoteShell().cmdloop()
except KeyboardInterrupt:
print("\n用户中断")
except Exception as e:
print(f"错误: {e}")
if __name__ == '__main__':
main()