feat: 自动音频播放与改进设计

This commit is contained in:
2025-12-21 05:32:58 +08:00
parent e57cea7219
commit f5e0417292
12 changed files with 98 additions and 144 deletions

View File

@@ -14,6 +14,11 @@ scheduled_num = 8
# UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒 # UTC 时间戳修正 仅用于 UNIX 日时间戳的生成修正, 单位为秒
timezone_offset = +28800 # 中国标准时间 (UTC+8) timezone_offset = +28800 # 中国标准时间 (UTC+8)
[interface]
[interface.memorizor]
autovoice = true # 自动语音播放, 仅限于 recognition 组件
[puzzles] # 谜题默认配置 [puzzles] # 谜题默认配置
[puzzles.mcq] [puzzles.mcq]
@@ -34,6 +39,9 @@ audio = "playsound" # 可选项: playsound(通用), termux(仅用于支持 Andro
tts = "edgetts" # 可选项: edgetts tts = "edgetts" # 可选项: edgetts
llm = "openai" # 可选项: openai llm = "openai" # 可选项: openai
[providers.tts.edgetts] # EdgeTTS 设置
voice = "zh-CN-XiaoxiaoNeural" # 可选项: zh-CN-YunjianNeural (男声), zh-CN-XiaoxiaoNeural (女声)
[providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置 [providers.llm.openai] # 与 OpenAI 相容的语言模型接口服务设置
url = "" url = ""
key = "" key = ""

View File

@@ -46,14 +46,14 @@ class MemScreen(Screen):
) -> None: ) -> None:
super().__init__(name, id, classes) super().__init__(name, id, classes)
self.atoms = atoms self.atoms = atoms
for i in self.atoms:
i.do_eval()
self.phaser = Phaser(atoms) self.phaser = Phaser(atoms)
# logger.debug(self.phaser.state) # logger.debug(self.phaser.state)
self.procession: Procession = self.phaser.current_procession() # type: ignore self.procession: Procession = self.phaser.current_procession() # type: ignore
self.atom: pt.Atom = self.procession.current_atom self.atom: pt.Atom = self.procession.current_atom
# logger.debug(self.phaser.state) # logger.debug(self.phaser.state)
# self.procession.forward(1) # self.procession.forward(1)
for i in atoms:
i.do_eval()
def on_mount(self): def on_mount(self):
self.load_puzzle() self.load_puzzle()
@@ -144,8 +144,21 @@ class MemScreen(Screen):
self.atom.lock(1) self.atom.lock(1)
def action_play_voice(self): def action_play_voice(self):
self.run_worker(self.play_voice, exclusive=True, thread=True)
def play_voice(self):
"""朗读当前内容""" """朗读当前内容"""
pass from heurams.services.audio_service import play_by_path
from pathlib import Path
from heurams.services.hasher import get_md5
path = Path(config_var.get()['paths']["cache_dir"])
path = path / f"{get_md5(self.atom.registry['nucleon'].metadata["formation"]["tts_text"])}.wav"
if path.exists():
play_by_path(path)
else:
from heurams.services.tts_service import convertor
convertor(self.atom.registry['nucleon'].metadata["formation"]["tts_text"], path)
play_by_path(path)
def action_toggle_dark(self): def action_toggle_dark(self):
self.app.action_toggle_dark() self.app.action_toggle_dark()

View File

@@ -39,7 +39,9 @@ class PrecachingScreen(Screen):
self.desc = desc self.desc = desc
for i in nucleons: for i in nucleons:
i: pt.Nucleon i: pt.Nucleon
i.do_eval() atom = pt.Atom()
atom.link("nucleon", i)
atom.do_eval()
# print("完成 EVAL") # print("完成 EVAL")
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
@@ -95,11 +97,9 @@ class PrecachingScreen(Screen):
cache_dir.mkdir(parents=True, exist_ok=True) cache_dir.mkdir(parents=True, exist_ok=True)
cache_file = cache_dir / f"{hasher.get_md5(text)}.wav" cache_file = cache_dir / f"{hasher.get_md5(text)}.wav"
if not cache_file.exists(): if not cache_file.exists():
try: # TODO: 调用模块消除tts耦合 try:
import edge_tts as tts from heurams.services.tts_service import convertor
convertor(text, cache_file)
communicate = tts.Communicate(text, "zh-CN-XiaoxiaoNeural")
communicate.save_sync(str(cache_file))
return 1 return 1
except Exception as e: except Exception as e:
print(f"预缓存失败 '{text}': {e}") print(f"预缓存失败 '{text}': {e}")
@@ -178,7 +178,9 @@ class PrecachingScreen(Screen):
self.total = len(nu) self.total = len(nu)
for i in nu: for i in nu:
i: pt.Nucleon i: pt.Nucleon
i.do_eval() atom = pt.Atom()
atom.link("nucleon", i)
atom.do_eval()
return self.precache_by_list(nu) return self.precache_by_list(nu)
def on_button_pressed(self, event: Button.Pressed) -> None: def on_button_pressed(self, event: Button.Pressed) -> None:

View File

@@ -61,7 +61,6 @@ class MCQPuzzle(BasePuzzleWidget):
self.puzzle.refresh() self.puzzle.refresh()
def compose(self): def compose(self):
self.atom.registry["nucleon"].do_eval()
setting: Setting = self.atom.registry["nucleon"].metadata["orbital"]["puzzles"][ setting: Setting = self.atom.registry["nucleon"].metadata["orbital"]["puzzles"][
self.alia self.alia
] ]

View File

@@ -49,6 +49,10 @@ class Recognition(BasePuzzleWidget):
self.alia = alia self.alia = alia
def compose(self): def compose(self):
from heurams.context import config_var
autovoice = config_var.get()['interface']['memorizor']['autovoice']
if autovoice:
self.screen.action_play_voice() # type: ignore
cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia] cfg: RecognitionConfig = self.atom.registry["orbital"]["puzzles"][self.alia]
delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"] delim = self.atom.registry["nucleon"].metadata["formation"]["delimiter"]
replace_dict = { replace_dict = {

View File

@@ -2,8 +2,9 @@
SM-15 接口兼容实现, 基于 SM-15 算法的逆向工程 SM-15 接口兼容实现, 基于 SM-15 算法的逆向工程
全局状态保存在文件中, 项目状态通过 algodata 字典传递 全局状态保存在文件中, 项目状态通过 algodata 字典传递
基于: https://github.com/kazuaki/sm.js 基于: https://github.com/slaypni/sm.js
原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida (MIT 许可证) 原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida
MIT 许可证
""" """
import datetime import datetime

View File

@@ -1,5 +1,5 @@
""" """
基于: https://github.com/kazuaki/sm.js 基于: https://github.com/slaypni/sm.js
原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida 原始 CoffeeScript 代码: (c) 2014 Kazuaki Tanida
MIT 许可证 MIT 许可证

View File

@@ -62,7 +62,6 @@ class Atom:
"orbital_fmt": "toml", "orbital_fmt": "toml",
"runtime": {"locked": False, "min_rate": 0x3F3F3F3F, "newact": False}, "runtime": {"locked": False, "min_rate": 0x3F3F3F3F, "newact": False},
} }
self.do_eval()
logger.debug("Atom 初始化完成") logger.debug("Atom 初始化完成")
def link(self, key, value): def link(self, key, value):
@@ -70,7 +69,6 @@ class Atom:
if key in self.registry.keys(): if key in self.registry.keys():
self.registry[key] = value self.registry[key] = value
logger.debug("'%s' 已链接, 触发 do_eval", key) logger.debug("'%s' 已链接, 触发 do_eval", key)
self.do_eval()
if key == "electron": if key == "electron":
if self.registry["electron"].is_activated() == 0: if self.registry["electron"].is_activated() == 0:
self.registry["runtime"]["newact"] = True self.registry["runtime"]["newact"] = True
@@ -78,6 +76,56 @@ class Atom:
logger.error("尝试链接不受支持的键: '%s'", key) logger.error("尝试链接不受支持的键: '%s'", key)
raise ValueError("不受支持的原子元数据链接操作") raise ValueError("不受支持的原子元数据链接操作")
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("EVAL 开始")
# eval 环境设置
def eval_with_env(s: str):
default = config_var.get()["puzzles"]
payload = self.registry['nucleon'].payload
metadata = self.registry['nucleon'].metadata
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
try:
traverse(self.registry['nucleon'].payload, eval_with_env)
traverse(self.registry['nucleon'].metadata, eval_with_env)
traverse(self.registry['orbital'], eval_with_env)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning(ret)
logger.debug("EVAL 完成")
def minimize(self, rating): def minimize(self, rating):
"""效果等同于 self.registry['runtime']['min_rate'] = min(rating, self.registry['runtime']['min_rate']) """效果等同于 self.registry['runtime']['min_rate'] = min(rating, self.registry['runtime']['min_rate'])
@@ -114,80 +162,6 @@ class Atom:
else: else:
logger.debug("禁止总评分") logger.debug("禁止总评分")
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Atom.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
# 初始化默认值
nucleon = self.registry["nucleon"]
default = {}
metadata = {}
try:
default = config_var.get()["puzzles"]
metadata = nucleon.metadata
except Exception:
# 如果无法获取配置或元数据, 使用空字典
logger.debug("无法获取配置或元数据, 使用空字典")
pass
try:
eval_value = eval(s)
if isinstance(eval_value, (list, dict)):
ret = eval_value
else:
ret = str(eval_value)
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
# 如果 nucleon 存在且有 do_eval 方法, 调用它
nucleon = self.registry["nucleon"]
if nucleon is not None and hasattr(nucleon, "do_eval"):
nucleon.do_eval()
logger.debug("已调用 nucleon.do_eval")
# 如果 electron 存在且其 algodata 包含 eval 字符串, 遍历它
electron = self.registry["electron"]
if electron is not None and hasattr(electron, "algodata"):
traverse(electron.algodata, eval_with_env)
logger.debug("已处理 electron algodata eval")
# 如果 orbital 存在且是字典, 遍历它
orbital = self.registry["orbital"]
if orbital is not None and isinstance(orbital, dict):
traverse(orbital, eval_with_env)
logger.debug("orbital eval 完成")
logger.debug("Atom.do_eval 完成")
def persist(self, key): def persist(self, key):
logger.debug("Atom.persist: key='%s'", key) logger.debug("Atom.persist: key='%s'", key)
path: pathlib.Path | None = self.registry[key + "_path"] path: pathlib.Path | None = self.registry[key + "_path"]

View File

@@ -49,54 +49,6 @@ class Nucleon:
def __hash__(self): def __hash__(self):
return hash(self.ident) return hash(self.ident)
def do_eval(self):
"""
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Nucleon.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
try:
nucleon = self
eval_value = eval(s)
if isinstance(eval_value, (int, float)):
ret = str(eval_value)
else:
ret = eval_value
logger.debug(
"eval 执行成功: '%s' -> '%s'",
s,
str(ret)[:50] + "..." if len(ret) > 50 else ret,
)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
if isinstance(data, dict):
for key, value in data.items():
data[key] = traverse(value, modifier)
return data
elif isinstance(data, list):
for i, item in enumerate(data):
data[i] = traverse(item, modifier)
return data
elif isinstance(data, tuple):
return tuple(traverse(item, modifier) for item in data)
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
traverse(self.payload, eval_with_env)
traverse(self.metadata, eval_with_env)
logger.debug("Nucleon.do_eval 完成")
@staticmethod @staticmethod
def placeholder(): def placeholder():
"""生成一个占位原子核""" """生成一个占位原子核"""

View File

@@ -3,6 +3,7 @@ import pathlib
import edge_tts import edge_tts
from heurams.services.logger import get_logger from heurams.services.logger import get_logger
from heurams.context import config_var
from .base import BaseTTS from .base import BaseTTS
@@ -18,7 +19,7 @@ class EdgeTTS(BaseTTS):
try: try:
communicate = edge_tts.Communicate( communicate = edge_tts.Communicate(
text, text,
"zh-CN-YunjianNeural", config_var.get()['providers']['tts']['edgetts']["voice"],
) )
logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频") logger.debug("EdgeTTS 通信对象创建成功, 正在保存音频")
communicate.save_sync(str(path)) communicate.save_sync(str(path))

View File

@@ -9,5 +9,5 @@ logger = get_logger(__name__)
play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path
logger.debug( logger.debug(
"音频服务初始化完成, 使用 provider: %s", config_var.get()["services"]["audio"] "音频服务初始化完成, 使用 Provider: %s", config_var.get()["services"]["audio"]
) )

View File

@@ -2,12 +2,12 @@
from typing import Callable from typing import Callable
from heurams.context import config_var from heurams.context import config_var
from heurams.providers.tts import TTSs from heurams.providers.tts import providers as prov
from heurams.services.logger import get_logger from heurams.services.logger import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
convert: Callable = TTSs[config_var.get().get("tts_provider")] convertor: Callable = prov[config_var.get()["services"]["tts"]].convert
logger.debug( logger.debug(
"TTS服务初始化完成, 使用 provider: %s", config_var.get().get("tts_provider") "TTS服务初始化完成, 使用 provider: %s", config_var.get()["services"]["tts"]
) )