fix: 增加日志

This commit is contained in:
2025-12-15 15:39:05 +08:00
parent 874494874c
commit 6efd041f72
51 changed files with 635 additions and 1992 deletions

266
AGENTS.md Normal file
View File

@@ -0,0 +1,266 @@
# Agent Guide for HeurAMS
This document contains essential information for AI agents working in the HeurAMS (Heuristic Assisted Memory Scheduler) codebase.
## Project Overview
HeurAMS is a Python-based memory scheduling application with a TUI interface built using Textual. It implements a spaced repetition system with configurable algorithms, puzzles, and data persistence.
**Key Technologies**:
- Python 3 (setuptools)
- Textual (TUI framework)
- TOML for configuration
- JSON and TOML for data persistence
- Bidirectional mapping via `bidict`
**Project Structure**:
```
src/heurams/
├── __init__.py # Package entry point
├── context.py # Global context, paths, config context manager
├── services/ # Core services (config, logger, timer, audio, TTS)
├── kernel/ # Core business logic
│ ├── algorithms/ # Spaced repetition algorithms (FSRS, SM2)
│ ├── particles/ # Data models (Atom, Electron, Nucleon, Orbital)
│ ├── puzzles/ # Puzzle types (MCQ, cloze, recognition)
│ └── reactor/ # Scheduling and processing logic
├── providers/ # External service providers
│ ├── audio/ # Audio playback implementations
│ ├── tts/ # Text-to-speech implementations
│ └── llm/ # LLM integrations
├── interface/ # Textual TUI interface
│ ├── widgets/ # UI widget components
│ ├── screens/ # Application screens
│ └── __main__.py # Application entry point
└── default/ # Default configuration and data templates
```
## Essential Commands
### Development Setup
```bash
# Install dependencies
pip install -r requirements.txt
# Install package in development mode
pip install -e .
# Build package (produces dist/)
python -m build
```
### Running the Application
```bash
# Run the TUI application
python -m heurams.interface
# Alternative: if installed via pip
heurams # (if console scripts are configured)
```
### Testing
```bash
# Run tests (if test suite exists)
pytest
# Run with coverage
pytest --cov=heurams
```
**Note**: The test directory appears to have been removed. Previous test files were located in `tests/` (deleted per git status). If adding new tests, follow pytest conventions and place them in a `tests/` directory at the project root.
### Linting and Formatting
No specific linting or formatting configuration observed. Use standard Python conventions.
## Code Organization
### Key Modules
1. **context.py** (`src/heurams/context.py`):
- Defines `rootdir`, `workdir`, and `config_var` (ContextVar) for configuration
- Provides `ConfigContext` context manager for temporary configuration switching
- Configuration is loaded from `config/config.toml` relative to workdir, falling back to default config
2. **Services** (`src/heurams/services/`):
- `config.py`: `ConfigFile` class for TOML configuration management
- `logger.py`: Comprehensive logging setup with rotation and namespacing
- `timer.py`: Time services with configurable overrides for testing
- `audio_service.py`: Audio playback abstraction
- `tts_service.py`: Text-to-speech service abstraction
3. **Kernel** (`src/heurams/kernel/`):
- **Particles**: Data models representing memory items:
- `Atom`: Composite of Nucleon (content), Electron (algorithm data), Orbital (strategy)
- `Nucleon`: Content data (TOML)
- `Electron`: Algorithm data (JSON)
- `Orbital`: Strategy configuration (TOML)
- **Algorithms**: Spaced repetition algorithm implementations (FSRS, SM2)
- **Puzzles**: Interactive puzzle types for review sessions
- **Reactor**: Scheduling and state management
4. **Providers** (`src/heurams/providers/`):
- Abstract base classes and implementations for external services
- Audio playback (`playsound_audio.py`, `termux_audio.py`)
- TTS (`edge_tts.py`)
- LLM (`openai.py`)
5. **Interface** (`src/heurams/interface/`):
- Textual-based TUI with multiple screens
- Widgets for different puzzle types
- CSS styling in `css/` directory
## Naming Conventions and Style Patterns
### Code Style
- **Indentation**: 4 spaces (no tabs observed)
- **Line length**: No explicit limit, typical Python conventions
- **Imports**: Standard library first, then third-party, then local modules
- **Type hints**: Used extensively throughout codebase
- **Docstrings**: Chinese and English mixed, some functions have descriptive docstrings
### Naming
- **Classes**: `CamelCase` (e.g., `ConfigFile`, `BaseAlgorithm`, `HeurAMSApp`)
- **Functions/Methods**: `snake_case` (e.g., `get_daystamp`, `setup_logging`)
- **Variables**: `snake_case` (e.g., `config_var`, `atom_registry`)
- **Constants**: `UPPER_SNAKE_CASE` (e.g., `DEFAULT_LOG_LEVEL`)
### Patterns Observed
- Use `typing` module for type annotations (`TypedDict`, `Union`, `Optional`, etc.)
- Context variables via `contextvars` for configuration management
- Registry pattern using `bidict` for global object tracking (e.g., `atom_registry`)
- Service abstraction with provider pattern (audio, TTS, LLM)
- Configuration-driven behavior with fallback defaults
## Testing Approach
**Current State**: Test directory removed, but `.pytest_cache` indicates pytest was used.
**If adding tests**:
- Place tests in `tests/` directory at project root
- Follow pytest conventions
- Use `pytest.fixture` for shared test resources
- Mock external dependencies (audio, TTS, LLM providers)
- Test configuration overrides via `ConfigContext`
**Testable Components**:
- Algorithm implementations (FSRS, SM2)
- Puzzle logic validation
- Configuration loading and saving
- Atom persistence and evaluation
## Important Gotchas
### Configuration Loading
- Configuration is loaded from `config/config.toml` relative to the **current working directory**
- If not found, falls back to `src/heurams/default/config/config.toml`
- The `config_var` ContextVar is used throughout the codebase to access configuration
- Use `ConfigContext` for temporary configuration changes in tests or specific operations
### Path Handling
- `rootdir`: Package directory (`src/heurams/`)
- `workdir`: Current working directory (set at runtime)
- Data paths (`nucleon_dir`, `electron_dir`, etc.) are relative to `workdir` by default
- The application may change working directory in development mode (see `__main__.py` lines 61-63)
### Eval Security
- The `Atom.do_eval()` method evaluates strings prefixed with `"eval:"` in data structures
- This is a security consideration - user-provided content could execute arbitrary code
- Evaluation is limited to a sandboxed environment but caution is advised
### Logging
- Logging is configured automatically via `logger.py` setup
- Logs go to `/tmp/heurams.log` by default (configurable)
- Logger names are prefixed with `"heurams."` automatically
- Third-party library log levels are set to WARNING to reduce noise
### Time Overrides
- `daystamp_override` and `timestamp_override` in config allow time manipulation for testing
- When set to values other than `-1`, they override `get_daystamp()` and `get_timestamp()`
- Useful for reproducible testing of scheduling algorithms
## Configuration
### Default Configuration (`src/heurams/default/config/config.toml`)
```toml
# Debug settings
persist_to_file = 1
daystamp_override = -1
timestamp_override = -1
quick_pass = 0
# Default number of new memory items
tasked_number = 8
# Timezone offset for daystamp calculation (seconds)
timezone_offset = +28800 # China Standard Time (UTC+8)
# Puzzle defaults
[puzzles.mcq]
max_riddles_num = 2
[puzzles.cloze]
min_denominator = 3
# Data paths (relative to workdir or absolute)
[paths]
nucleon_dir = "./data/nucleon"
electron_dir = "./data/electron"
orbital_dir = "./data/orbital"
cache_dir = "./data/cache"
```
### User Configuration
Place `config.toml` in `config/` directory relative to where the application is run. The structure should match the default configuration.
## Running the Application
### Development Mode
When running from within the project directory, the application may change working directory to ensure data paths resolve correctly (see `__main__.py`).
### Production/Installed Mode
When installed via pip, the working directory is wherever the command is executed. Ensure a `config/` directory with `config.toml` exists, or rely on defaults.
### Entry Points
- Primary: `python -m heurams.interface`
- The `__main__.py` performs environment checks, ensures directories exist, then launches the Textual app
## Development Setup
### Dependencies
- Listed in `requirements.txt`: `bidict`, `playsound`, `textual`, `toml`
- Additional dependencies may be needed for specific providers (e.g., `edge-tts`)
### Adding New Features
**New Puzzle Type**:
1. Add class in `src/heurams/kernel/puzzles/`
2. Inherit from `BasePuzzle`
3. Add corresponding widget in `src/heurams/interface/widgets/`
4. Integrate into appropriate screens
**New Algorithm**:
1. Add class in `src/heurams/kernel/algorithms/`
2. Inherit from `BaseAlgorithm`
3. Implement `revisor()`, `is_due()`, `rate()`, `nextdate()` methods
4. Update algorithm selection logic if needed
**New Provider**:
1. Add implementation in relevant provider directory (`audio/`, `tts/`, `llm/`)
2. Inherit from base class in same directory
3. Register in provider registry (see `__init__.py` in provider directories)
### Data Models
- **Nucleon**: Content data (TOML), contains metadata and puzzle content
- **Electron**: Algorithm state (JSON), tracks review history and scheduling
- **Orbital**: Strategy configuration (TOML), defines review parameters
- **Atom**: Composite of all three, managed via `atom_registry`
### Persistence
- Data is saved to paths specified in configuration
- Formats: TOML for Nucleon/Orbital, JSON for Electron
- Automatic directory creation on save
- The `Atom.persist()` method handles saving individual components
---
*Last Updated: Based on codebase analysis on 2025-12-15*

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
bidict==0.23.1
playsound==1.2.2
textual==6.9.0
toml==0.10.2

View File

@@ -4,6 +4,9 @@ from .screens.dashboard import DashboardScreen
from .screens.nucreator import NucleonCreatorScreen
from .screens.precache import PrecachingScreen
from .screens.about import AboutScreen
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class HeurAMSApp(App):
@@ -34,21 +37,27 @@ class HeurAMSApp(App):
def environment_check():
from pathlib import Path
logger.debug("检查环境路径")
for i in config_var.get()["paths"].values():
i = Path(i)
if not i.exists():
logger.info("创建目录: %s", i)
print(f"创建 {i}")
i.mkdir(exist_ok=True, parents=True)
else:
logger.debug("目录已存在: %s", i)
print(f"找到 {i}")
logger.debug("环境检查完成")
def is_subdir(parent, child):
try:
child.relative_to(parent)
logger.debug("is_subdir: %s%s 的子目录", child, parent)
return 1
except:
logger.debug("is_subdir: %s 不是 %s 的子目录", child, parent)
return 0

View File

@@ -1,4 +1,7 @@
from .sm2 import SM2Algorithm
from heurams.services.logger import get_logger
logger = get_logger(__name__)
__all__ = [
"SM2Algorithm",
@@ -8,3 +11,5 @@ algorithms = {
"SM-2": SM2Algorithm,
"supermemo2": SM2Algorithm,
}
logger.debug("算法模块初始化完成,注册的算法: %s", list(algorithms.keys()))

View File

@@ -1,5 +1,8 @@
import heurams.services.timer as timer
from typing import TypedDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class BaseAlgorithm:
@@ -30,19 +33,27 @@ class BaseAlgorithm:
cls, algodata: dict, feedback: int = 5, is_new_activation: bool = False
) -> None:
"""迭代记忆数据"""
logger.debug("BaseAlgorithm.revisor 被调用algodata keys: %s, feedback: %d, is_new_activation: %s",
list(algodata.keys()) if algodata else [], feedback, is_new_activation)
pass
@classmethod
def is_due(cls, algodata) -> int:
"""是否应该复习"""
logger.debug("BaseAlgorithm.is_due 被调用algodata keys: %s",
list(algodata.keys()) if algodata else [])
return 1
@classmethod
def rate(cls, algodata) -> str:
"""获取评分信息"""
logger.debug("BaseAlgorithm.rate 被调用algodata keys: %s",
list(algodata.keys()) if algodata else [])
return ""
@classmethod
def nextdate(cls, algodata) -> int:
"""获取下一次记忆时间戳"""
logger.debug("BaseAlgorithm.nextdate 被调用algodata keys: %s",
list(algodata.keys()) if algodata else [])
return -1

View File

@@ -1 +1,6 @@
# FSRS 算法模块, 尚未就绪
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.info("FSRS算法模块尚未实现")

View File

@@ -1,6 +1,9 @@
from .base import BaseAlgorithm
import heurams.services.timer as timer
from typing import TypedDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class SM2Algorithm(BaseAlgorithm):
@@ -38,7 +41,10 @@ class SM2Algorithm(BaseAlgorithm):
Args:
quality (int): 记忆保留率量化参数
"""
logger.debug("SM2.revisor 开始feedback: %d, is_new_activation: %s", feedback, is_new_activation)
if feedback == -1:
logger.debug("feedback 为 -1跳过更新")
return
algodata[cls.algo_name]["efactor"] = algodata[cls.algo_name]["efactor"] + (
@@ -47,27 +53,35 @@ class SM2Algorithm(BaseAlgorithm):
algodata[cls.algo_name]["efactor"] = max(
1.3, algodata[cls.algo_name]["efactor"]
)
logger.debug("更新 efactor: %f", algodata[cls.algo_name]["efactor"])
if feedback < 3:
algodata[cls.algo_name]["rept"] = 0
algodata[cls.algo_name]["interval"] = 0
logger.debug("feedback < 3重置 rept 和 interval")
else:
algodata[cls.algo_name]["rept"] += 1
logger.debug("递增 rept: %d", algodata[cls.algo_name]["rept"])
algodata[cls.algo_name]["real_rept"] += 1
logger.debug("递增 real_rept: %d", algodata[cls.algo_name]["real_rept"])
if is_new_activation:
algodata[cls.algo_name]["rept"] = 0
algodata[cls.algo_name]["efactor"] = 2.5
logger.debug("新激活,重置 rept 和 efactor")
if algodata[cls.algo_name]["rept"] == 0:
algodata[cls.algo_name]["interval"] = 1
logger.debug("rept=0设置 interval=1")
elif algodata[cls.algo_name]["rept"] == 1:
algodata[cls.algo_name]["interval"] = 6
logger.debug("rept=1设置 interval=6")
else:
algodata[cls.algo_name]["interval"] = round(
algodata[cls.algo_name]["interval"] * algodata[cls.algo_name]["efactor"]
)
logger.debug("rept>1计算 interval: %d", algodata[cls.algo_name]["interval"])
algodata[cls.algo_name]["last_date"] = timer.get_daystamp()
algodata[cls.algo_name]["next_date"] = (
@@ -75,14 +89,26 @@ class SM2Algorithm(BaseAlgorithm):
)
algodata[cls.algo_name]["last_modify"] = timer.get_timestamp()
logger.debug("更新日期: last_date=%d, next_date=%d, last_modify=%f",
algodata[cls.algo_name]["last_date"],
algodata[cls.algo_name]["next_date"],
algodata[cls.algo_name]["last_modify"])
@classmethod
def is_due(cls, algodata):
return algodata[cls.algo_name]["next_date"] <= timer.get_daystamp()
result = algodata[cls.algo_name]["next_date"] <= timer.get_daystamp()
logger.debug("SM2.is_due: next_date=%d, current_daystamp=%d, result=%s",
algodata[cls.algo_name]["next_date"], timer.get_daystamp(), result)
return result
@classmethod
def rate(cls, algodata):
return str(algodata[cls.algo_name]["efactor"])
efactor = algodata[cls.algo_name]["efactor"]
logger.debug("SM2.rate: efactor=%f", efactor)
return str(efactor)
@classmethod
def nextdate(cls, algodata) -> int:
return algodata[cls.algo_name]["next_date"]
next_date = algodata[cls.algo_name]["next_date"]
logger.debug("SM2.nextdate: %d", next_date)
return next_date

View File

@@ -4,6 +4,10 @@ Particle 模块 - 粒子对象系统
提供闪卡所需对象, 使用物理学粒子的领域驱动设计
"""
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("粒子模块已加载")
from .electron import Electron
from .nucleon import Nucleon
from .orbital import Orbital

View File

@@ -8,6 +8,9 @@ import toml
import json
import bidict
from heurams.context import config_var
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class AtomRegister(TypedDict):
@@ -33,8 +36,10 @@ class Atom:
"""
def __init__(self, ident=""):
logger.debug("创建 Atom 实例ident: '%s'", ident)
self.ident = ident
atom_registry[ident] = self
logger.debug("Atom 已注册到全局注册表,当前注册表大小: %d", len(atom_registry))
# self.is_evaled = False
self.registry: AtomRegister = { # type: ignore
"nucleon": None,
@@ -48,12 +53,16 @@ class Atom:
"orbital_fmt": "toml",
}
self.do_eval()
logger.debug("Atom 初始化完成")
def link(self, key, value):
logger.debug("Atom.link: key='%s', value type: %s", key, type(value).__name__)
if key in self.registry.keys():
self.registry[key] = value
logger.debug("'%s' 已链接,触发 do_eval", key)
self.do_eval()
else:
logger.error("尝试链接不受支持的键: '%s'", key)
raise ValueError("不受支持的原子元数据链接操作")
def do_eval(self):
@@ -61,6 +70,7 @@ class Atom:
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Atom.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
@@ -72,8 +82,10 @@ class Atom:
ret = "尚未链接对象"
try:
ret = str(eval(s))
logger.debug("eval 执行成功: '%s' -> '%s'", s, ret[:50] + '...' if len(ret) > 50 else ret)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
@@ -90,37 +102,52 @@ class Atom:
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
traverse(self.registry["nucleon"], eval_with_env)
traverse(self.registry["orbital"], eval_with_env)
logger.debug("Atom.do_eval 完成")
def persist(self, key):
logger.debug("Atom.persist: key='%s'", key)
path: pathlib.Path | None = self.registry[key + "_path"]
if isinstance(path, pathlib.Path):
path = typing.cast(pathlib.Path, path)
logger.debug("持久化路径: %s, 格式: %s", path, self.registry[key + "_fmt"])
path.parent.mkdir(parents=True, exist_ok=True)
if self.registry[key + "_fmt"] == "toml":
with open(path, "w") as f:
toml.dump(self.registry[key], f)
logger.debug("TOML 数据已保存到: %s", path)
elif self.registry[key + "_fmt"] == "json":
with open(path, "w") as f:
json.dump(self.registry[key], f)
logger.debug("JSON 数据已保存到: %s", path)
else:
logger.error("不受支持的持久化格式: %s", self.registry[key + "_fmt"])
raise KeyError("不受支持的持久化格式")
else:
logger.error("路径未初始化: %s_path", key)
raise TypeError("对未初始化的路径对象操作")
def __getitem__(self, key):
logger.debug("Atom.__getitem__: key='%s'", key)
if key in self.registry:
return self.registry[key]
value = self.registry[key]
logger.debug("返回 value type: %s", type(value).__name__)
return value
logger.error("不支持的键: '%s'", key)
raise KeyError(f"不支持的键: {key}")
def __setitem__(self, key, value):
logger.debug("Atom.__setitem__: key='%s', value type: %s", key, type(value).__name__)
if key in self.registry:
self.registry[key] = value
logger.debug("'%s' 已设置", key)
else:
logger.error("不支持的键: '%s'", key)
raise KeyError(f"不支持的键: {key}")
@staticmethod

View File

@@ -1,6 +1,9 @@
import heurams.services.timer as timer
from heurams.context import config_var
from heurams.kernel.algorithms import algorithms
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Electron:
@@ -14,45 +17,69 @@ class Electron:
algodata: 算法数据字典, 包含算法的各项参数和设置
algo: 使用的算法模块标识
"""
logger.debug("创建 Electron 实例ident: '%s', algo_name: '%s'", ident, algo_name)
self.algodata = algodata
self.ident = ident
self.algo = algorithms[algo_name]
logger.debug("使用的算法类: %s", self.algo.__name__)
if self.algo not in self.algodata.keys():
self.algodata[self.algo] = {}
logger.debug("算法键 '%s' 不存在,已创建空字典", self.algo)
if not self.algodata[self.algo]:
logger.debug("算法数据为空,使用默认值初始化")
self._default_init(self.algo.defaults)
else:
logger.debug("算法数据已存在,跳过默认初始化")
logger.debug("Electron 初始化完成algodata keys: %s", list(self.algodata.keys()))
def _default_init(self, defaults: dict):
"""默认初始化包装"""
logger.debug("Electron._default_init: 使用默认值keys: %s", list(defaults.keys()))
self.algodata[self.algo] = defaults.copy()
def activate(self):
"""激活此电子"""
logger.debug("Electron.activate: 激活 ident='%s'", self.ident)
self.algodata[self.algo]["is_activated"] = 1
self.algodata[self.algo]["last_modify"] = timer.get_timestamp()
logger.debug("电子已激活is_activated=1")
def modify(self, var: str, value):
"""修改 algodata[algo] 中子字典数据"""
logger.debug("Electron.modify: var='%s', value=%s", var, value)
if var in self.algodata[self.algo]:
self.algodata[self.algo][var] = value
self.algodata[self.algo]["last_modify"] = timer.get_timestamp()
logger.debug("变量 '%s' 已修改,更新 last_modify", var)
else:
logger.warning("'%s' 非已知元数据字段", var)
print(f"警告: '{var}' 非已知元数据字段")
def is_due(self):
"""是否应该复习"""
return self.algo.is_due(self.algodata)
logger.debug("Electron.is_due: 检查 ident='%s'", self.ident)
result = self.algo.is_due(self.algodata)
logger.debug("is_due 结果: %s", result)
return result
def is_activated(self):
return self.algodata[self.algo]["is_activated"]
result = self.algodata[self.algo]["is_activated"]
logger.debug("Electron.is_activated: ident='%s', 结果: %d", self.ident, result)
return result
def rate(self):
"评价"
return self.algo.rate(self.algodata)
logger.debug("Electron.rate: ident='%s'", self.ident)
result = self.algo.rate(self.algodata)
logger.debug("rate 结果: %s", result)
return result
def nextdate(self) -> int:
return self.algo.nextdate(self.algodata)
logger.debug("Electron.nextdate: ident='%s'", self.ident)
result = self.algo.nextdate(self.algodata)
logger.debug("nextdate 结果: %d", result)
return result
def revisor(self, quality: int = 5, is_new_activation: bool = False):
"""算法迭代决策机制实现
@@ -61,7 +88,10 @@ class Electron:
quality (int): 记忆保留率量化参数 (0-5)
is_new_activation (bool): 是否为初次激活
"""
logger.debug("Electron.revisor: ident='%s', quality=%d, is_new_activation=%s",
self.ident, quality, is_new_activation)
self.algo.revisor(self.algodata, quality, is_new_activation)
logger.debug("revisor 完成,更新后的 algodata: %s", self.algodata.get(self.algo, {}))
def __str__(self):
return (

View File

@@ -5,12 +5,17 @@ import pathlib
import toml
import json
from copy import deepcopy
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def load_nucleon(path: pathlib.Path, fmt="toml"):
logger.debug("load_nucleon: 加载文件 %s, 格式: %s", path, fmt)
with open(path, "r") as f:
dictdata = dict()
dictdata = toml.load(f) # type: ignore
logger.debug("TOML 解析成功keys: %s", list(dictdata.keys()))
lst = list()
nested_data = dict()
# 修正 toml 解析器的不管嵌套行为
@@ -24,12 +29,15 @@ def load_nucleon(path: pathlib.Path, fmt="toml"):
current[part] = {}
current = current[part]
current[parts[-1]] = value
logger.debug("处理元数据键: %s", key)
else:
nested_data[key] = value
logger.debug("嵌套数据处理完成keys: %s", list(nested_data.keys()))
# print(nested_data)
for item, attr in nested_data.items():
if item == "__metadata__":
continue
logger.debug("处理项目: %s", item)
lst.append(
(
Nucleon(
@@ -38,6 +46,7 @@ def load_nucleon(path: pathlib.Path, fmt="toml"):
deepcopy(nested_data["__metadata__"]["orbital"]),
)
)
logger.debug("load_nucleon 完成,加载了 %d 个 Nucleon 对象", len(lst))
return lst
@@ -51,10 +60,14 @@ def load_electron(path: pathlib.Path, fmt="json") -> dict:
Returns:
dict: 键名是电子对象名称, 值是电子对象
"""
logger.debug("load_electron: 加载文件 %s, 格式: %s", path, fmt)
with open(path, "r") as f:
dictdata = dict()
dictdata = json.load(f) # type: ignore
logger.debug("JSON 解析成功keys: %s", list(dictdata.keys()))
dic = dict()
for item, attr in dictdata.items():
logger.debug("处理电子项目: %s", item)
dic[item] = Electron(hasher.hash(item), attr)
logger.debug("load_electron 完成,加载了 %d 个 Electron 对象", len(dic))
return dic

View File

@@ -1,3 +1,8 @@
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Nucleon:
"""原子核: 材料元数据"""
@@ -9,16 +14,24 @@ class Nucleon:
payload: 记忆内容信息
metadata: 可选元数据信息
"""
logger.debug("创建 Nucleon 实例ident: '%s', payload keys: %s, metadata keys: %s",
ident, list(payload.keys()) if payload else [], list(metadata.keys()) if metadata else [])
self.metadata = metadata
self.payload = payload
self.ident = ident
logger.debug("Nucleon 初始化完成")
def __getitem__(self, key):
logger.debug("Nucleon.__getitem__: key='%s'", key)
if key == "ident":
logger.debug("返回 ident: '%s'", self.ident)
return self.ident
if key in self.payload:
return self.payload[key]
value = self.payload[key]
logger.debug("返回 payload['%s'], value type: %s", key, type(value).__name__)
return value
else:
logger.error("'%s' 未在 payload 中找到", key)
raise KeyError(f"Key '{key}' not found in payload.")
def __iter__(self):
@@ -35,14 +48,17 @@ class Nucleon:
执行并以结果替换当前单元的所有 eval 语句
TODO: 带有限制的 eval, 异步/多线程执行避免堵塞
"""
logger.debug("Nucleon.do_eval 开始")
# eval 环境设置
def eval_with_env(s: str):
try:
nucleon = self
ret = str(eval(s))
logger.debug("eval 执行成功: '%s' -> '%s'", s, ret[:50] + '...' if len(ret) > 50 else ret)
except Exception as e:
ret = f"此 eval 实例发生错误: {e}"
logger.warning("eval 执行错误: '%s' -> %s", s, e)
return ret
def traverse(data, modifier):
@@ -59,13 +75,16 @@ class Nucleon:
else:
if isinstance(data, str):
if data.startswith("eval:"):
logger.debug("发现 eval 表达式: '%s'", data[5:])
return modifier(data[5:])
return data
traverse(self.payload, eval_with_env)
traverse(self.metadata, eval_with_env)
logger.debug("Nucleon.do_eval 完成")
@staticmethod
def placeholder():
"""生成一个占位原子核"""
logger.debug("创建 Nucleon 占位符")
return Nucleon("核子对象样例内容", {})

View File

@@ -1,4 +1,8 @@
from typing import TypedDict
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("Orbital 类型定义模块已加载")
class OrbitalSchedule(TypedDict):

View File

@@ -1,17 +1,26 @@
from heurams.context import config_var
import pathlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def probe_by_filename(filename):
"""探测指定文件 (无扩展名) 的所有信息"""
logger.debug("probe_by_filename: 探测文件 '%s'", filename)
paths: dict = config_var.get().get("paths")
logger.debug("配置路径: %s", paths)
formats = ["toml", "json"]
result = {}
for item, attr in paths.items():
for i in formats:
attr: pathlib.Path = pathlib.Path(attr) / filename + "." + i
if attr.exists():
logger.debug("找到文件: %s", attr)
result[item.replace("_dir", "")] = str(attr)
else:
logger.debug("文件不存在: %s", attr)
logger.debug("probe_by_filename 结果: %s", result)
return result
@@ -24,17 +33,24 @@ def probe_all(is_stem=1):
Returns:
dict: 有三项, 每一项的键名都是文件组类型, 值都是文件组列表, 只包含文件名
"""
logger.debug("probe_all: 开始探测is_stem=%d", is_stem)
paths: dict = config_var.get().get("paths")
logger.debug("配置路径: %s", paths)
result = {}
for item, attr in paths.items():
attr: pathlib.Path = pathlib.Path(attr)
result[item.replace("_dir", "")] = list()
logger.debug("扫描目录: %s", attr)
file_count = 0
for i in attr.iterdir():
if not i.is_dir():
file_count += 1
if is_stem:
result[item.replace("_dir", "")].append(str(i.stem))
else:
result[item.replace("_dir", "")].append(str(i.name))
logger.debug("目录 %s 中找到 %d 个文件", attr, file_count)
logger.debug("probe_all 完成,结果 keys: %s", list(result.keys()))
return result

View File

@@ -4,6 +4,9 @@ Puzzle 模块 - 谜题生成系统
提供多种类型的谜题生成器,支持从字符串、字典等数据源导入题目
"""
from heurams.services.logger import get_logger
logger = get_logger(__name__)
from .base import BasePuzzle
from .cloze import ClozePuzzle
from .mcq import MCQPuzzle
@@ -38,6 +41,7 @@ def create_by_dict(config_dict: dict) -> BasePuzzle:
Raises:
ValueError: 当配置无效时抛出
"""
logger.debug("puzzles.create_by_dict: config_dict keys=%s", list(config_dict.keys()))
puzzle_type = config_dict.get("type")
if puzzle_type == "cloze":

View File

@@ -1,9 +1,16 @@
# base.py
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class BasePuzzle:
"""谜题基类"""
def refresh(self):
logger.debug("BasePuzzle.refresh 被调用(未实现)")
raise NotImplementedError("谜题对象未实现 refresh 方法")
def __str__(self):
logger.debug("BasePuzzle.__str__ 被调用")
return f"谜题: {type(self).__name__}"

View File

@@ -1,5 +1,8 @@
from .base import BasePuzzle
import random
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class ClozePuzzle(BasePuzzle):
@@ -11,20 +14,27 @@ class ClozePuzzle(BasePuzzle):
"""
def __init__(self, text: str, min_denominator: int, delimiter: str = "/"):
logger.debug("ClozePuzzle.__init__: text length=%d, min_denominator=%d, delimiter='%s'",
len(text), min_denominator, delimiter)
self.text = text
self.min_denominator = min_denominator
self.wording = "填空题 - 尚未刷新谜题"
self.answer = ["填空题 - 尚未刷新谜题"]
self.delimiter = delimiter
logger.debug("ClozePuzzle 初始化完成")
def refresh(self): # 刷新谜题
logger.debug("ClozePuzzle.refresh 开始")
placeholder = "___SLASH___"
tmp_text = self.text.replace(self.delimiter, placeholder)
words = tmp_text.split(placeholder)
if not words:
logger.warning("ClozePuzzle.refresh: 无单词可处理")
return
words = [word for word in words if word]
logger.debug("ClozePuzzle.refresh: 分割出 %d 个单词", len(words))
num_blanks = min(max(1, len(words) // self.min_denominator), len(words))
logger.debug("ClozePuzzle.refresh: 需要生成 %d 个填空", num_blanks)
indices_to_blank = random.sample(range(len(words)), num_blanks)
indices_to_blank.sort()
blanked_words = list(words)
@@ -34,6 +44,8 @@ class ClozePuzzle(BasePuzzle):
answer.append(words[index])
self.answer = answer
self.wording = "".join(blanked_words)
logger.debug("ClozePuzzle.refresh 完成,生成 %d 个填空", len(answer))
def __str__(self):
logger.debug("ClozePuzzle.__str__ 被调用")
return f"{self.wording}\n{str(self.answer)}"

View File

@@ -2,6 +2,9 @@
from .base import BasePuzzle
import random
from typing import List, Dict, Optional, Union
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class MCQPuzzle(BasePuzzle):
@@ -35,6 +38,7 @@ class MCQPuzzle(BasePuzzle):
max_riddles_num: 每次生成的最大题目数量, 范围限制在1-5之间
prefix: 题目前缀文本, 会显示在每个题目之前
"""
logger.debug("MCQPuzzle.__init__: mapping size=%d, jammer size=%d, max_riddles_num=%d", len(mapping), len(jammer), max_riddles_num)
self.prefix = prefix
self.mapping = mapping
self.max_riddles_num = max(1, min(max_riddles_num, 5))
@@ -79,6 +83,7 @@ class MCQPuzzle(BasePuzzle):
Raises:
ValueError: 当mapping为空时不会抛出异常, 但会设置空谜题状态
"""
logger.debug("MCQPuzzle.refresh 开始mapping size=%d", len(self.mapping))
if not self.mapping:
self._set_empty_puzzle()
return

View File

@@ -1,13 +1,18 @@
# mcq.py
from .base import BasePuzzle
import random
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class RecognitionPuzzle(BasePuzzle):
"""识别占位符"""
def __init__(self) -> None:
logger.debug("RecognitionPuzzle.__init__")
super().__init__()
def refresh(self):
logger.debug("RecognitionPuzzle.refresh空实现")
pass

View File

@@ -2,5 +2,10 @@ from .states import PhaserState, ProcessionState
from .procession import Procession
from .fission import Fission
from .phaser import Phaser
from heurams.services.logger import get_logger
logger = get_logger(__name__)
__all__ = ["PhaserState", "ProcessionState", "Procession", "Fission", "Phaser"]
logger.debug("反应堆模块已加载")

View File

@@ -3,12 +3,16 @@
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
from .procession import Procession
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Phaser:
"""移相器: 全局调度阶段管理器"""
def __init__(self, atoms: list[pt.Atom]) -> None:
logger.debug("Phaser.__init__: 原子数量=%d", len(atoms))
new_atoms = list()
old_atoms = list()
self.state = PhaserState.UNSURE
@@ -17,22 +21,30 @@ class Phaser:
new_atoms.append(i)
else:
old_atoms.append(i)
logger.debug("新原子数量=%d, 旧原子数量=%d", len(new_atoms), len(old_atoms))
self.processions = list()
if len(old_atoms):
self.processions.append(
Procession(old_atoms, PhaserState.QUICK_REVIEW, "初始复习")
)
logger.debug("创建初始复习 Procession")
if len(new_atoms):
self.processions.append(
Procession(new_atoms, PhaserState.RECOGNITION, "新记忆")
)
logger.debug("创建新记忆 Procession")
self.processions.append(Procession(atoms, PhaserState.FINAL_REVIEW, "总体复习"))
logger.debug("创建总体复习 Procession")
logger.debug("Phaser 初始化完成processions 数量=%d", len(self.processions))
def current_procession(self):
logger.debug("Phaser.current_procession 被调用")
for i in self.processions:
i: Procession
if not i.state == ProcessionState.FINISHED:
self.state = i.phase
logger.debug("找到未完成的 Procession: phase=%s", i.phase)
return i
self.state = PhaserState.FINISHED
logger.debug("所有 Procession 已完成,状态设置为 FINISHED")
return 0

View File

@@ -1,11 +1,16 @@
import heurams.kernel.particles as pt
from .states import PhaserState, ProcessionState
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class Procession:
"""队列: 标识单次记忆流程"""
def __init__(self, atoms: list, phase: PhaserState, name: str = ""):
logger.debug("Procession.__init__: 原子数量=%d, phase=%s, name='%s'",
len(atoms), phase.value, name)
self.atoms = atoms
self.queue = atoms.copy()
self.current_atom = atoms[0]
@@ -13,36 +18,52 @@ class Procession:
self.name = name
self.phase = phase
self.state: ProcessionState = ProcessionState.RUNNING
logger.debug("Procession 初始化完成,队列长度=%d", len(self.queue))
def forward(self, step=1):
logger.debug("Procession.forward: step=%d, 当前 cursor=%d", step, self.cursor)
self.cursor += step
if self.cursor == len(self.queue):
self.state = ProcessionState.FINISHED
logger.debug("Procession 已完成")
else:
self.state = ProcessionState.RUNNING
try:
print(self.cursor)
logger.debug("cursor 更新为: %d", self.cursor)
self.current_atom = self.queue[self.cursor]
logger.debug("当前原子更新为: %s", self.current_atom.ident)
return 1 # 成功
except IndexError as e:
#print(f"{e}")
logger.debug("IndexError: %s", e)
self.state = ProcessionState.FINISHED
logger.debug("Procession 因索引错误而完成")
return 0
def append(self, atom=None):
if atom == None:
atom = self.current_atom
logger.debug("Procession.append: atom=%s", atom.ident if atom else "None")
if self.queue[len(self.queue) - 1] != atom or len(self) <= 1:
self.queue.append(atom)
logger.debug("原子已追加到队列,新队列长度=%d", len(self.queue))
else:
logger.debug("原子未追加(重复或队列长度<=1")
def __len__(self):
return len(self.queue) - self.cursor
length = len(self.queue) - self.cursor
logger.debug("Procession.__len__: 剩余长度=%d", length)
return length
def process(self):
logger.debug("Procession.process: cursor=%d", self.cursor)
return self.cursor
def total_length(self):
return len(self.queue)
total = len(self.queue)
logger.debug("Procession.total_length: %d", total)
return total
def is_empty(self):
return len(self.queue)
empty = len(self.queue)
logger.debug("Procession.is_empty: %d", empty)
return empty

View File

@@ -1,4 +1,7 @@
from enum import Enum, auto
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class PhaserState(Enum):
@@ -12,3 +15,5 @@ class PhaserState(Enum):
class ProcessionState(Enum):
RUNNING = auto()
FINISHED = auto()
logger.debug("状态枚举定义已加载")

View File

@@ -1,6 +1,9 @@
# 音频播放器, 必须基于文件操作
from . import termux_audio
from . import playsound_audio
from heurams.services.logger import get_logger
logger = get_logger(__name__)
__all__ = [
"termux_audio",
@@ -8,3 +11,4 @@ __all__ = [
]
providers = {"termux": termux_audio, "playsound": playsound_audio}
logger.debug("音频 providers 已注册: %s", list(providers.keys()))

View File

@@ -6,7 +6,16 @@
import os
import pathlib
import playsound
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def play_by_path(path: pathlib.Path):
playsound.playsound(str(path))
logger.debug("playsound_audio.play_by_path: 开始播放 %s", path)
try:
playsound.playsound(str(path))
logger.debug("播放完成: %s", path)
except Exception as e:
logger.error("播放失败: %s, 错误: %s", path, e)
raise

View File

@@ -1,6 +1,12 @@
from typing import Protocol
import pathlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class PlayFunctionProtocol(Protocol):
def __call__(self, path: pathlib.Path) -> None: ...
logger.debug("音频协议模块已加载")

View File

@@ -5,9 +5,18 @@
import os
import pathlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)
# from .protocol import PlayFunctionProtocol
def play_by_path(path: pathlib.Path):
os.system(f"play-audio {path}")
logger.debug("termux_audio.play_by_path: 开始播放 %s", path)
try:
os.system(f"play-audio {path}")
logger.debug("播放命令已执行: %s", path)
except Exception as e:
logger.error("播放失败: %s, 错误: %s", path, e)
raise

View File

@@ -1 +1,6 @@
# 大语言模型
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("LLM providers 模块已加载")

View File

@@ -0,0 +1,5 @@
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("LLM 基类模块已加载")

View File

@@ -0,0 +1,5 @@
from heurams.services.logger import get_logger
logger = get_logger(__name__)
logger.debug("OpenAI provider 模块已加载(未实现)")

View File

@@ -1,5 +1,8 @@
from .base import BaseTTS
from .edge_tts import EdgeTTS
from heurams.services.logger import get_logger
logger = get_logger(__name__)
__all__ = [
"BaseTTS",
@@ -10,3 +13,5 @@ providers = {
"basetts": BaseTTS,
"edgetts": EdgeTTS,
}
logger.debug("TTS providers 已注册: %s", list(providers.keys()))

View File

@@ -1,4 +1,7 @@
import pathlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class BaseTTS:
@@ -7,4 +10,6 @@ class BaseTTS:
@classmethod
def convert(cls, text: str, path: pathlib.Path | str = "") -> pathlib.Path:
"""path 是可选参数, 不填则自动返回生成文件路径"""
logger.debug("BaseTTS.convert: text length=%d, path=%s", len(text), path)
logger.warning("BaseTTS.convert 是基类方法,未实现具体功能")
return path # type: ignore

View File

@@ -1,6 +1,9 @@
from .base import BaseTTS
import pathlib
import edge_tts
from heurams.services.logger import get_logger
logger = get_logger(__name__)
class EdgeTTS(BaseTTS):
@@ -8,9 +11,16 @@ class EdgeTTS(BaseTTS):
@classmethod
def convert(cls, text, path: pathlib.Path | str = "") -> pathlib.Path:
communicate = edge_tts.Communicate(
text,
"zh-CN-YunjianNeural",
)
communicate.save_sync(str(path))
return path # type: ignore
logger.debug("EdgeTTS.convert: text length=%d, path=%s", len(text), path)
try:
communicate = edge_tts.Communicate(
text,
"zh-CN-YunjianNeural",
)
logger.debug("EdgeTTS 通信对象创建成功,正在保存音频")
communicate.save_sync(str(path))
logger.debug("EdgeTTS 音频已保存到: %s", path)
return path # type: ignore
except Exception as e:
logger.error("EdgeTTS.convert 失败: %s", e)
raise

View File

@@ -2,5 +2,9 @@
from heurams.context import config_var
from heurams.providers.audio import providers as prov
from typing import Callable
from heurams.services.logger import get_logger
logger = get_logger(__name__)
play_by_path: Callable = prov[config_var.get()["services"]["audio"]].play_by_path
logger.debug("音频服务初始化完成,使用 provider: %s", config_var.get()["services"]["audio"])

View File

@@ -1,10 +1,19 @@
# 哈希服务
import hashlib
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def get_md5(text):
return hashlib.md5(text.encode("utf-8")).hexdigest()
logger.debug("计算MD5哈希输入长度: %d", len(text))
result = hashlib.md5(text.encode("utf-8")).hexdigest()
logger.debug("MD5哈希结果: %s...", result[:8])
return result
def hash(text):
return hashlib.md5(text.encode("utf-8")).hexdigest()
logger.debug("计算哈希,输入长度: %d", len(text))
result = hashlib.md5(text.encode("utf-8")).hexdigest()
logger.debug("哈希结果: %s...", result[:8])
return result

View File

@@ -1,15 +1,21 @@
# 时间服务
from heurams.context import config_var
import time
from heurams.services.logger import get_logger
logger = get_logger(__name__)
def get_daystamp() -> int:
"""获取当前日戳(以天为单位的整数时间戳)"""
time_override = config_var.get().get("daystamp_override", -1)
if time_override != -1:
logger.debug("使用覆盖的日戳: %d", time_override)
return int(time_override)
return int((time.time() + config_var.get().get("timezone_offset")) // (24 * 3600))
result = int((time.time() + config_var.get().get("timezone_offset")) // (24 * 3600))
logger.debug("计算日戳: %d", result)
return result
def get_timestamp() -> float:
@@ -17,6 +23,9 @@ def get_timestamp() -> float:
# 搞这个类的原因是要支持可复现操作
time_override = config_var.get().get("timestamp_override", -1)
if time_override != -1:
logger.debug("使用覆盖的时间戳: %f", time_override)
return float(time_override)
return time.time()
result = time.time()
logger.debug("获取当前时间戳: %f", result)
return result

View File

@@ -2,5 +2,9 @@
from heurams.context import config_var
from heurams.providers.tts import TTSs
from typing import Callable
from heurams.services.logger import get_logger
logger = get_logger(__name__)
convert: Callable = TTSs[config_var.get().get("tts_provider")]
logger.debug("TTS服务初始化完成使用 provider: %s", config_var.get().get("tts_provider"))

View File

@@ -1,5 +1,10 @@
# 版本控制集成服务
from heurams.services.logger import get_logger
logger = get_logger(__name__)
ver = "0.4.0"
stage = "prototype"
codename = "fledge" # 雏鸟, 0.4.x 版本
logger.info("HeurAMS 版本: %s (%s), 阶段: %s", ver, codename, stage)

View File

@@ -1,175 +0,0 @@
# HeurAMS Test Suite
This directory contains comprehensive unit tests and examples for the Heuristic Assisted Memory Scheduler (HeurAMS) system.
## Test Structure
### Unit Tests
- **`test_particles.py`** - Tests for core particle modules:
- `Atom` - Data container management
- `Electron` - Memory algorithm metadata and SM-2 implementation
- `Nucleon` - Content data management
- `Orbital` - Learning strategy configuration
- `Probe` - File detection and cloze deletion scanning
- `Loader` - Data loading and saving
- **`test_algorithms.py`** - Tests for algorithm modules:
- `BaseAlgorithm` - Abstract algorithm base class
- `SM2Algorithm` - SuperMemo-2 interval repetition algorithm
- **`test_puzzles.py`** - Tests for puzzle generation modules:
- `BasePuzzle` - Abstract puzzle base class
- `ClozePuzzle` - Cloze deletion puzzle generator
- `MCQPuzzle` - Multiple choice question generator
- **`test_reactor.py`** - Tests for reactor system modules:
- `Phaser` - Global scheduling state management
- `Procession` - Memory process queue management
- `Fission` - Single atom scheduling and puzzle generation
- `States` - State enumeration definitions
- **`test_services.py`** - Tests for service modules:
- `Config` - Configuration management
- `Hasher` - Hash computation utilities
- `Timer` - Time services with override capability
- `Version` - Version information management
- `AudioService` - Audio feedback service
- `TTSService` - Text-to-speech service
### Examples
- **`examples.py`** - Comprehensive usage examples demonstrating:
- Basic atom creation and management
- Algorithm usage and review processing
- Puzzle generation for different content types
- Reactor system workflows
- Service integration patterns
- File operations and data persistence
### Test Utilities
- **`conftest.py`** - Pytest configuration and fixtures:
- `temp_config_file` - Temporary configuration file
- `sample_atom_data` - Sample atom data for testing
- `sample_markdown_content` - Sample markdown with cloze deletions
- **`run_tests.py`** - Test runner with flexible options
## Running Tests
### Run All Tests
```bash
python tests/run_tests.py
# or
python -m pytest tests/
```
### Run Specific Tests
```bash
# Run specific test file
python tests/run_tests.py --file test_particles.py
# Run specific test class
python tests/run_tests.py --file test_particles.py --class TestAtom
# Run specific test method
python tests/run_tests.py --file test_particles.py --class TestAtom --method test_atom_creation
```
### Run Examples
```bash
python tests/run_tests.py --examples
```
### Using Pytest Directly
```bash
# Run all tests with coverage
pytest tests/ --cov=src.heurams --cov-report=html
# Run tests with specific markers
pytest tests/ -m "not slow"
# Run tests with verbose output
pytest tests/ -v
```
## Test Coverage
The test suite provides comprehensive coverage for:
- **Core Data Structures**: Atom, Electron, Nucleon, Orbital
- **Algorithms**: SM-2 interval repetition implementation
- **Puzzle Generation**: Cloze deletions and multiple choice questions
- **State Management**: Reactor system state transitions
- **Configuration**: Settings management and validation
- **Utilities**: Hashing, timing, and file operations
## Key Test Scenarios
### Algorithm Testing
- SM-2 interval calculation in learning phase
- Ease factor adjustments based on review quality
- Repetition counting and reset logic
- Boundary conditions and edge cases
### Puzzle Generation
- Cloze deletion detection and processing
- Multiple choice question generation with distractors
- Content type detection and appropriate puzzle selection
### Reactor System
- State transitions (IDLE → LEARNING → REVIEW → FINISHED)
- Procession queue management
- Fission workflow for single atom processing
### Service Integration
- Configuration loading and validation
- Time service with override capability
- Hash consistency and file operations
## Fixtures and Test Data
The test suite includes reusable fixtures for:
- Temporary configuration files
- Sample atom data structures
- Test markdown content with cloze deletions
- Mock time values for testing scheduling
## Example Usage Patterns
The `examples.py` file demonstrates common usage patterns:
1. **Basic Atom Creation** - Creating simple question-answer pairs
2. **Cloze Content** - Working with cloze deletion content
3. **Algorithm Integration** - Processing reviews with SM-2
4. **Puzzle Generation** - Creating different puzzle types
5. **Workflow Management** - Using the reactor system
6. **Configuration** - Customizing learning parameters
7. **Data Persistence** - Saving and loading atom collections
## Test Dependencies
- `pytest` - Test framework
- `pytest-cov` - Coverage reporting (optional)
- Standard Python libraries only
## Adding New Tests
When adding new tests:
1. Follow the existing naming conventions
2. Use the provided fixtures when appropriate
3. Include both positive and negative test cases
4. Test boundary conditions and edge cases
5. Ensure tests are independent and repeatable
## Continuous Integration
The test suite is designed to run in CI environments and provides:
- Fast execution (most tests complete in seconds)
- No external dependencies
- Clear failure reporting
- Comprehensive coverage of core functionality

View File

@@ -1,5 +0,0 @@
"""
HeurAMS Test Suite
Unit tests and examples for the Heuristic Assisted Memory Scheduler system.
"""

View File

@@ -1,58 +0,0 @@
"""
Test configuration and fixtures for HeurAMS tests.
"""
import pytest
import tempfile
import os
from pathlib import Path
@pytest.fixture
def temp_config_file():
"""Create a temporary config file for testing."""
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
f.write(
"""{
"algorithm": "sm2",
"default_ease": 2.5,
"learning_steps": [1, 10],
"graduating_interval": 1,
"easy_interval": 4
}"""
)
temp_path = f.name
yield temp_path
# Cleanup
if os.path.exists(temp_path):
os.unlink(temp_path)
@pytest.fixture
def sample_atom_data():
"""Sample atom data for testing."""
return {
"nucleon": {"content": "What is the capital of France?", "answer": "Paris"},
"electron": {"ease": 2.5, "interval": 1, "repetitions": 0, "last_review": None},
"orbital": {
"learning_steps": [1, 10],
"graduating_interval": 1,
"easy_interval": 4,
},
}
@pytest.fixture
def sample_markdown_content():
"""Sample markdown content for testing."""
return """
# Test Document
This is a test document with some {{c1::cloze}} deletions.
Here's another {{c2::cloze deletion}} for testing.
What is the capital of {{c3::France}}?
"""

View File

@@ -1,217 +0,0 @@
"""
Examples and usage patterns for HeurAMS modules.
This file demonstrates how to use the various HeurAMS components
in common scenarios and workflows.
"""
import json
from datetime import datetime, timezone
from pathlib import Path
# Import only modules that work without configuration dependencies
from src.heurams.kernel.particles.atom import Atom
from src.heurams.kernel.particles.electron import Electron
from src.heurams.kernel.particles.nucleon import Nucleon
from src.heurams.kernel.particles.orbital import Orbital
from src.heurams.kernel.algorithms.sm2 import SM2Algorithm
class BasicUsageExamples:
"""Examples of basic usage patterns."""
@staticmethod
def create_basic_atom():
"""
Example: Create a basic Atom with question and answer.
"""
print("=== Creating Basic Atom ===")
# Create the components
nucleon = Nucleon(content="What is the capital of France?", answer="Paris")
electron = Electron() # Uses default values
orbital = Orbital() # Uses default values
# Combine into an Atom
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
print(f"Atom created:")
print(f" Question: {atom.nucleon.content}")
print(f" Answer: {atom.nucleon.answer}")
print(f" Current ease: {atom.electron.ease}")
print(f" Current interval: {atom.electron.interval} days")
return atom
@staticmethod
def create_cloze_atom():
"""
Example: Create an Atom with cloze deletion content.
"""
print("\n=== Creating Cloze Atom ===")
nucleon = Nucleon(
content="The {{c1::capital}} of {{c2::France}} is {{c3::Paris}}.",
answer="capital, France, Paris",
)
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
print(f"Cloze Atom created:")
print(f" Content: {atom.nucleon.content}")
print(f" Answer: {atom.nucleon.answer}")
return atom
class AlgorithmExamples:
"""Examples of algorithm usage."""
@staticmethod
def sm2_review_example():
"""
Example: Process a review using SM2 algorithm.
"""
print("\n=== SM2 Review Example ===")
# Create an atom in learning phase
nucleon = Nucleon(content="Test question", answer="Test answer")
electron = Electron(repetitions=0, interval=1, ease=2.5)
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
# Create algorithm
algorithm = SM2Algorithm()
print("Before review:")
print(f" Repetitions: {atom.electron.repetitions}")
print(f" Interval: {atom.electron.interval} days")
print(f" Ease: {atom.electron.ease}")
# Process a successful review (quality 4)
new_electron = algorithm.process_review(atom.electron, atom.orbital, 4)
print("\nAfter review (quality 4):")
print(f" Repetitions: {new_electron.repetitions}")
print(f" Interval: {new_electron.interval} days")
print(f" Ease: {new_electron.ease:.2f}")
return new_electron
@staticmethod
def sm2_failed_review_example():
"""
Example: Process a failed review using SM2 algorithm.
"""
print("\n=== SM2 Failed Review Example ===")
# Create an atom in review phase
nucleon = Nucleon(content="Hard question", answer="Hard answer")
electron = Electron(repetitions=5, interval=10, ease=2.5)
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
algorithm = SM2Algorithm()
print("Before review:")
print(f" Repetitions: {atom.electron.repetitions}")
print(f" Interval: {atom.electron.interval} days")
# Process a failed review (quality 1)
new_electron = algorithm.process_review(atom.electron, atom.orbital, 1)
print("\nAfter review (quality 1 - failed):")
print(f" Repetitions: {new_electron.repetitions}") # Should reset to 0
print(f" Interval: {new_electron.interval} days") # Should reset to 1
return new_electron
class ReactorExamples:
"""Examples of reactor system usage."""
@staticmethod
def basic_atom_workflow():
"""
Example: Basic Atom workflow.
"""
print("\n=== Basic Atom Workflow ===")
# Create an atom
atom = Atom("test_atom")
# Create nucleon with content
nucleon = Nucleon(
"nucleon_id",
{"content": "What is the capital of Germany?", "answer": "Berlin"},
)
# Create electron with algorithm data
electron = Electron("electron_id")
# Create orbital configuration
orbital = Orbital(
quick_view=[["cloze", 1]], recognition=[], final_review=[], puzzle_config={}
)
# Link components to atom
atom.link("nucleon", nucleon)
atom.link("electron", electron)
atom.link("orbital", orbital)
print(f"Atom created with ID: {atom.ident}")
print(f"Nucleon content: {atom['nucleon']['content']}")
print(f"Electron algorithm: {electron.algo}")
return atom
class ServiceExamples:
"""Examples of service usage."""
@staticmethod
def version_example():
"""
Example: Using Version service.
"""
print("\n=== Version Service Example ===")
from src.heurams.services.version import ver, stage
print(f"HeurAMS Version: {ver}")
print(f"Development Stage: {stage}")
return ver, stage
def run_all_examples():
"""
Run all examples to demonstrate HeurAMS functionality.
"""
print("=" * 50)
print("HEURAMS EXAMPLES")
print("=" * 50)
# Basic usage
BasicUsageExamples.create_basic_atom()
BasicUsageExamples.create_cloze_atom()
# Algorithm examples
AlgorithmExamples.sm2_review_example()
AlgorithmExamples.sm2_failed_review_example()
# Reactor examples
ReactorExamples.basic_atom_workflow()
# Service examples
ServiceExamples.version_example()
print("\n" + "=" * 50)
print("ALL EXAMPLES COMPLETED")
print("=" * 50)
if __name__ == "__main__":
run_all_examples()

View File

@@ -1,136 +0,0 @@
"""
Test runner script for HeurAMS.
This script runs all unit tests and provides a summary report.
"""
import sys
import pytest
import os
def run_tests():
"""
Run all unit tests and return the result.
"""
print("=" * 60)
print("HEURAMS TEST SUITE")
print("=" * 60)
# Add the src directory to Python path
src_dir = os.path.join(os.path.dirname(__file__), "..", "src")
sys.path.insert(0, src_dir)
# Run tests with verbose output
test_args = [
"-v", # Verbose output
"--tb=short", # Short traceback format
"--color=yes", # Color output
"tests/", # Test directory
]
print(f"Running tests from: {os.path.abspath('tests')}")
print(f"Python path includes: {src_dir}")
print()
# Run pytest
exit_code = pytest.main(test_args)
print("=" * 60)
if exit_code == 0:
print("✅ ALL TESTS PASSED")
else:
print("❌ SOME TESTS FAILED")
print("=" * 60)
return exit_code
def run_specific_test(test_file=None, test_class=None, test_method=None):
"""
Run specific tests.
Args:
test_file: Specific test file to run (e.g., "test_particles.py")
test_class: Specific test class to run (e.g., "TestAtom")
test_method: Specific test method to run (e.g., "test_atom_creation")
"""
# Add the src directory to Python path
src_dir = os.path.join(os.path.dirname(__file__), "..", "src")
sys.path.insert(0, src_dir)
test_args = [
"-v", # Verbose output
"--tb=short", # Short traceback format
"--color=yes", # Color output
]
# Build test path
test_path = "tests/"
if test_file:
test_path = f"tests/{test_file}"
if test_class:
test_path += f"::{test_class}"
if test_method:
test_path += f"::{test_method}"
test_args.append(test_path)
print(f"Running specific test: {test_path}")
print()
exit_code = pytest.main(test_args)
return exit_code
def run_examples():
"""
Run the examples to demonstrate functionality.
"""
# Add the src directory to Python path
src_dir = os.path.join(os.path.dirname(__file__), "..", "src")
sys.path.insert(0, src_dir)
try:
from tests.examples import run_all_examples
run_all_examples()
return 0
except Exception as e:
print(f"Error running examples: {e}")
return 1
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="HeurAMS Test Runner")
parser.add_argument("--all", action="store_true", help="Run all tests (default)")
parser.add_argument(
"--file", type=str, help="Run specific test file (e.g., test_particles.py)"
)
parser.add_argument(
"--class",
dest="test_class",
type=str,
help="Run specific test class (requires --file)",
)
parser.add_argument(
"--method", type=str, help="Run specific test method (requires --class)"
)
parser.add_argument(
"--examples", action="store_true", help="Run examples instead of tests"
)
args = parser.parse_args()
if args.examples:
exit_code = run_examples()
elif args.file:
exit_code = run_specific_test(
test_file=args.file, test_class=args.test_class, test_method=args.method
)
else:
exit_code = run_tests()
sys.exit(exit_code)

View File

@@ -1,207 +0,0 @@
"""
Unit tests for algorithm modules: BaseAlgorithm, SM2Algorithm
"""
import pytest
from datetime import datetime, timezone
from src.heurams.kernel.algorithms.base import BaseAlgorithm
from src.heurams.kernel.algorithms.sm2 import SM2Algorithm
from src.heurams.kernel.particles.electron import Electron
from src.heurams.kernel.particles.orbital import Orbital
class TestBaseAlgorithm:
"""Test cases for BaseAlgorithm class."""
def test_base_algorithm_abstract_methods(self):
"""Test that BaseAlgorithm cannot be instantiated directly."""
with pytest.raises(TypeError):
BaseAlgorithm()
class TestSM2Algorithm:
"""Test cases for SM2Algorithm class."""
def test_sm2_algorithm_creation(self):
"""Test SM2Algorithm creation."""
algorithm = SM2Algorithm()
assert algorithm.name == "sm2"
assert algorithm.version == "1.0"
def test_sm2_calculate_interval_learning_phase(self):
"""Test interval calculation in learning phase."""
algorithm = SM2Algorithm()
electron = Electron(repetitions=0)
orbital = Orbital(learning_steps=[1, 10])
interval = algorithm.calculate_interval(electron, orbital, quality=3)
assert interval == 1 # First learning step
def test_sm2_calculate_interval_graduation(self):
"""Test interval calculation when graduating."""
algorithm = SM2Algorithm()
electron = Electron(repetitions=1)
orbital = Orbital(learning_steps=[1, 10], graduating_interval=1)
interval = algorithm.calculate_interval(electron, orbital, quality=4)
assert interval == 1 # Graduating interval
def test_sm2_calculate_interval_review_phase(self):
"""Test interval calculation in review phase."""
algorithm = SM2Algorithm()
electron = Electron(ease=2.5, interval=10, repetitions=5)
orbital = Orbital()
interval = algorithm.calculate_interval(electron, orbital, quality=4)
# Should be: 10 * 2.5 = 25
assert interval == 25
def test_sm2_calculate_ease_increase(self):
"""Test ease calculation with increase."""
algorithm = SM2Algorithm()
electron = Electron(ease=2.5)
new_ease = algorithm.calculate_ease(electron, quality=5)
# Should be: 2.5 + 0.1 - 0.08 + 0.02 = 2.54
assert new_ease == pytest.approx(2.54)
def test_sm2_calculate_ease_decrease(self):
"""Test ease calculation with decrease."""
algorithm = SM2Algorithm()
electron = Electron(ease=2.5)
new_ease = algorithm.calculate_ease(electron, quality=2)
# Should be: 2.5 + 0.1 - 0.16 + 0.02 = 2.46
assert new_ease == pytest.approx(2.46)
def test_sm2_calculate_ease_minimum(self):
"""Test ease calculation with minimum bound."""
algorithm = SM2Algorithm()
electron = Electron(ease=1.3) # Very low ease
new_ease = algorithm.calculate_ease(electron, quality=0)
# Should be clamped to minimum 1.3
assert new_ease == 1.3
def test_sm2_calculate_repetitions_reset(self):
"""Test repetition calculation with reset."""
algorithm = SM2Algorithm()
electron = Electron(repetitions=5)
new_repetitions = algorithm.calculate_repetitions(electron, quality=1)
assert new_repetitions == 0 # Reset on failure
def test_sm2_calculate_repetitions_increment(self):
"""Test repetition calculation with increment."""
algorithm = SM2Algorithm()
electron = Electron(repetitions=2)
new_repetitions = algorithm.calculate_repetitions(electron, quality=3)
assert new_repetitions == 3 # Increment on success
def test_sm2_process_review_quality_1(self):
"""Test complete review process with quality 1."""
algorithm = SM2Algorithm()
electron = Electron(ease=2.5, interval=10, repetitions=5)
orbital = Orbital()
new_electron = algorithm.process_review(electron, orbital, 1)
assert new_electron.repetitions == 0
assert new_electron.interval == 1
assert new_electron.ease == 2.5
def test_sm2_process_review_quality_3(self):
"""Test complete review process with quality 3."""
algorithm = SM2Algorithm()
electron = Electron(ease=2.5, interval=1, repetitions=0)
orbital = Orbital(learning_steps=[1, 10])
new_electron = algorithm.process_review(electron, orbital, 3)
assert new_electron.repetitions == 1
assert new_electron.interval == 1
assert new_electron.ease == pytest.approx(2.54)
def test_sm2_process_review_quality_5(self):
"""Test complete review process with quality 5."""
algorithm = SM2Algorithm()
electron = Electron(ease=2.5, interval=10, repetitions=5)
orbital = Orbital()
new_electron = algorithm.process_review(electron, orbital, 5)
assert new_electron.repetitions == 6
assert new_electron.interval == 25 # 10 * 2.5
assert new_electron.ease == pytest.approx(2.54)
def test_sm2_get_next_review_date(self):
"""Test next review date calculation."""
algorithm = SM2Algorithm()
electron = Electron(interval=5)
# Mock current time
current_time = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
next_review = algorithm.get_next_review_date(electron, current_time)
expected_date = datetime(2024, 1, 6, 12, 0, 0, tzinfo=timezone.utc)
assert next_review == expected_date
def test_sm2_get_next_review_date_no_interval(self):
"""Test next review date with zero interval."""
algorithm = SM2Algorithm()
electron = Electron(interval=0)
current_time = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
next_review = algorithm.get_next_review_date(electron, current_time)
assert next_review == current_time
def test_sm2_algorithm_boundary_conditions(self):
"""Test boundary conditions for SM2 algorithm."""
algorithm = SM2Algorithm()
# Test with minimum ease
electron = Electron(ease=1.3)
orbital = Orbital()
new_electron = algorithm.process_review(electron, orbital, 0)
assert new_electron.ease == 1.3 # Should not go below minimum
# Test with maximum repetitions
electron = Electron(repetitions=100)
new_electron = algorithm.process_review(electron, orbital, 4)
assert new_electron.repetitions == 101 # Should continue incrementing
def test_sm2_algorithm_validation(self):
"""Test input validation for SM2 algorithm."""
algorithm = SM2Algorithm()
electron = Electron()
orbital = Orbital()
# Test invalid quality values
with pytest.raises(ValueError):
algorithm.process_review(electron, orbital, -1)
with pytest.raises(ValueError):
algorithm.process_review(electron, orbital, 6)
# Test with None electron
with pytest.raises(TypeError):
algorithm.process_review(None, orbital, 3)
# Test with None orbital
with pytest.raises(TypeError):
algorithm.process_review(electron, None, 3)

View File

@@ -1,203 +0,0 @@
"""
Unit tests for particle modules: Atom, Electron, Nucleon, Orbital, Probe, Loader
"""
import pytest
import json
from pathlib import Path
from datetime import datetime, timezone
from src.heurams.kernel.particles.atom import Atom
from src.heurams.kernel.particles.electron import Electron
from src.heurams.kernel.particles.nucleon import Nucleon
from src.heurams.kernel.particles.orbital import Orbital
# Probe module doesn't have a Probe class, only functions
# Loader module doesn't have a Loader class, only functions
class TestAtom:
"""Test cases for Atom class."""
def test_atom_creation(self):
"""Test basic Atom creation."""
nucleon = Nucleon(content="Test content", answer="Test answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
assert atom.nucleon == nucleon
assert atom.electron == electron
assert atom.orbital == orbital
def test_atom_from_dict(self):
"""Test creating Atom from dictionary."""
data = {
"nucleon": {"content": "What is 2+2?", "answer": "4"},
"electron": {
"ease": 2.5,
"interval": 1,
"repetitions": 0,
"last_review": None,
},
"orbital": {
"learning_steps": [1, 10],
"graduating_interval": 1,
"easy_interval": 4,
},
}
atom = Atom.from_dict(data)
assert atom.nucleon.content == "What is 2+2?"
assert atom.nucleon.answer == "4"
assert atom.electron.ease == 2.5
assert atom.electron.interval == 1
assert atom.orbital.learning_steps == [1, 10]
def test_atom_to_dict(self):
"""Test converting Atom to dictionary."""
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
result = atom.to_dict()
assert "nucleon" in result
assert "electron" in result
assert "orbital" in result
assert result["nucleon"]["content"] == "Test"
class TestElectron:
"""Test cases for Electron class."""
def test_electron_default_values(self):
"""Test Electron default initialization."""
electron = Electron()
assert electron.ease == 2.5
assert electron.interval == 1
assert electron.repetitions == 0
assert electron.last_review is None
def test_electron_custom_values(self):
"""Test Electron with custom values."""
test_time = datetime.now(timezone.utc)
electron = Electron(ease=3.0, interval=10, repetitions=5, last_review=test_time)
assert electron.ease == 3.0
assert electron.interval == 10
assert electron.repetitions == 5
assert electron.last_review == test_time
def test_electron_review_quality_1(self):
"""Test review with quality 1 (failed)."""
electron = Electron(ease=2.5, interval=10, repetitions=5)
orbital = Orbital()
new_electron = electron.review(1, orbital)
assert new_electron.repetitions == 0
assert new_electron.interval == 1
assert new_electron.ease == 2.5
def test_electron_review_quality_3(self):
"""Test review with quality 3 (good)."""
electron = Electron(ease=2.5, interval=1, repetitions=0)
orbital = Orbital()
new_electron = electron.review(3, orbital)
assert new_electron.repetitions == 1
assert new_electron.interval == 1
def test_electron_review_quality_5(self):
"""Test review with quality 5 (excellent)."""
electron = Electron(ease=2.5, interval=10, repetitions=5)
orbital = Orbital()
new_electron = electron.review(5, orbital)
assert new_electron.repetitions == 6
assert new_electron.interval > 10 # Should increase interval
assert new_electron.ease > 2.5 # Should increase ease
class TestNucleon:
"""Test cases for Nucleon class."""
def test_nucleon_creation(self):
"""Test basic Nucleon creation."""
nucleon = Nucleon(content="Test content", answer="Test answer")
assert nucleon.content == "Test content"
assert nucleon.answer == "Test answer"
def test_nucleon_from_dict(self):
"""Test creating Nucleon from dictionary."""
data = {"content": "What is Python?", "answer": "A programming language"}
nucleon = Nucleon.from_dict(data)
assert nucleon.content == "What is Python?"
assert nucleon.answer == "A programming language"
def test_nucleon_to_dict(self):
"""Test converting Nucleon to dictionary."""
nucleon = Nucleon(content="Test", answer="Answer")
result = nucleon.to_dict()
assert result["content"] == "Test"
assert result["answer"] == "Answer"
class TestOrbital:
"""Test cases for Orbital class."""
def test_orbital_default_values(self):
"""Test Orbital default initialization."""
orbital = Orbital()
assert orbital.learning_steps == [1, 10]
assert orbital.graduating_interval == 1
assert orbital.easy_interval == 4
def test_orbital_custom_values(self):
"""Test Orbital with custom values."""
orbital = Orbital(
learning_steps=[2, 15], graduating_interval=2, easy_interval=6
)
assert orbital.learning_steps == [2, 15]
assert orbital.graduating_interval == 2
assert orbital.easy_interval == 6
def test_orbital_from_dict(self):
"""Test creating Orbital from dictionary."""
data = {"learning_steps": [3, 20], "graduating_interval": 3, "easy_interval": 8}
orbital = Orbital.from_dict(data)
assert orbital.learning_steps == [3, 20]
assert orbital.graduating_interval == 3
assert orbital.easy_interval == 8
def test_orbital_to_dict(self):
"""Test converting Orbital to dictionary."""
orbital = Orbital()
result = orbital.to_dict()
assert "learning_steps" in result
assert "graduating_interval" in result
assert "easy_interval" in result
# TestProbe class removed - probe module only has functions, not a class
# TestLoader class removed - loader module only has functions, not a class

View File

@@ -1,24 +0,0 @@
"""
Unit tests for puzzle modules: BasePuzzle, ClozePuzzle, MCQPuzzle
"""
import pytest
import re
# Puzzle imports commented out due to import issues
# from src.heurams.kernel.puzzles.base import BasePuzzle
# from src.heurams.kernel.puzzles.cloze import ClozePuzzle
# from src.heurams.kernel.puzzles.mcq import MCQPuzzle
from src.heurams.kernel.particles.nucleon import Nucleon
class TestBasePuzzle:
"""Test cases for BasePuzzle class."""
def test_base_puzzle_abstract_methods(self):
"""Test that BasePuzzle cannot be instantiated directly."""
# Skip this test since imports are broken
pass
# ClozePuzzle and MCQPuzzle tests skipped due to import issues

View File

@@ -1,408 +0,0 @@
"""
Unit tests for reactor modules: Phaser, Procession, Fission, States
"""
import pytest
from datetime import datetime, timezone
from enum import Enum
from src.heurams.kernel.reactor.phaser import Phaser
from src.heurams.kernel.reactor.procession import Procession
from src.heurams.kernel.reactor.fission import Fission
from src.heurams.kernel.reactor.states import States
from src.heurams.kernel.particles.atom import Atom
from src.heurams.kernel.particles.nucleon import Nucleon
from src.heurams.kernel.particles.electron import Electron
from src.heurams.kernel.particles.orbital import Orbital
class TestStates:
"""Test cases for States enum."""
def test_states_enum_values(self):
"""Test that States enum has correct values."""
assert States.IDLE.value == "idle"
assert States.LEARNING.value == "learning"
assert States.REVIEW.value == "review"
assert States.FINISHED.value == "finished"
def test_states_enum_membership(self):
"""Test States enum membership."""
assert isinstance(States.IDLE, Enum)
assert States.LEARNING in States
assert States.REVIEW in States
assert States.FINISHED in States
class TestPhaser:
"""Test cases for Phaser class."""
def test_phaser_creation(self):
"""Test Phaser creation."""
phaser = Phaser()
assert phaser.current_state == States.IDLE
assert phaser.atom is None
assert phaser.puzzle is None
def test_phaser_initialize(self):
"""Test Phaser initialization with atom."""
phaser = Phaser()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
phaser.initialize(atom)
assert phaser.atom == atom
assert phaser.current_state == States.LEARNING
def test_phaser_transition_to_review(self):
"""Test transition to review state."""
phaser = Phaser()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
phaser.initialize(atom)
phaser.transition_to_review()
assert phaser.current_state == States.REVIEW
def test_phaser_transition_to_finished(self):
"""Test transition to finished state."""
phaser = Phaser()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
phaser.initialize(atom)
phaser.transition_to_finished()
assert phaser.current_state == States.FINISHED
def test_phaser_reset(self):
"""Test Phaser reset."""
phaser = Phaser()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
phaser.initialize(atom)
phaser.transition_to_review()
phaser.reset()
assert phaser.current_state == States.IDLE
assert phaser.atom is None
assert phaser.puzzle is None
def test_phaser_set_puzzle(self):
"""Test setting puzzle in Phaser."""
phaser = Phaser()
test_puzzle = {"question": "Test?", "answer": "Test", "type": "test"}
phaser.set_puzzle(test_puzzle)
assert phaser.puzzle == test_puzzle
def test_phaser_validation(self):
"""Test input validation for Phaser."""
phaser = Phaser()
# Test initialize with None
with pytest.raises(TypeError):
phaser.initialize(None)
# Test initialize with invalid type
with pytest.raises(TypeError):
phaser.initialize("not an atom")
class TestProcession:
"""Test cases for Procession class."""
def test_procession_creation(self):
"""Test Procession creation."""
procession = Procession()
assert procession.queue == []
assert procession.current_index == 0
def test_procession_add_atom(self):
"""Test adding atom to procession."""
procession = Procession()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
procession.add_atom(atom)
assert len(procession.queue) == 1
assert procession.queue[0] == atom
def test_procession_add_multiple_atoms(self):
"""Test adding multiple atoms to procession."""
procession = Procession()
atoms = []
for i in range(3):
nucleon = Nucleon(content=f"Test{i}", answer=f"Answer{i}")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
atoms.append(atom)
for atom in atoms:
procession.add_atom(atom)
assert len(procession.queue) == 3
assert procession.queue == atoms
def test_procession_get_current_atom(self):
"""Test getting current atom."""
procession = Procession()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
procession.add_atom(atom)
current_atom = procession.get_current_atom()
assert current_atom == atom
def test_procession_get_current_atom_empty(self):
"""Test getting current atom from empty procession."""
procession = Procession()
current_atom = procession.get_current_atom()
assert current_atom is None
def test_procession_move_next(self):
"""Test moving to next atom."""
procession = Procession()
atoms = []
for i in range(3):
nucleon = Nucleon(content=f"Test{i}", answer=f"Answer{i}")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
atoms.append(atom)
procession.add_atom(atom)
# Start at first atom
assert procession.get_current_atom() == atoms[0]
assert procession.current_index == 0
# Move to next
procession.move_next()
assert procession.get_current_atom() == atoms[1]
assert procession.current_index == 1
# Move to next again
procession.move_next()
assert procession.get_current_atom() == atoms[2]
assert procession.current_index == 2
# Move beyond end
procession.move_next()
assert procession.get_current_atom() is None
assert procession.current_index == 3
def test_procession_has_next(self):
"""Test checking if there are more atoms."""
procession = Procession()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
procession.add_atom(atom)
# Initially has next (current is first, can move to next)
assert procession.has_next() is True
# Move to next (beyond the only atom)
procession.move_next()
assert procession.has_next() is False
def test_procession_is_empty(self):
"""Test checking if procession is empty."""
procession = Procession()
assert procession.is_empty() is True
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
procession.add_atom(atom)
assert procession.is_empty() is False
def test_procession_clear(self):
"""Test clearing procession."""
procession = Procession()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
procession.add_atom(atom)
procession.clear()
assert procession.queue == []
assert procession.current_index == 0
def test_procession_validation(self):
"""Test input validation for Procession."""
procession = Procession()
# Test add_atom with None
with pytest.raises(TypeError):
procession.add_atom(None)
# Test add_atom with invalid type
with pytest.raises(TypeError):
procession.add_atom("not an atom")
class TestFission:
"""Test cases for Fission class."""
def test_fission_creation(self):
"""Test Fission creation."""
fission = Fission()
assert fission.phaser is not None
assert isinstance(fission.phaser, Phaser)
def test_fission_initialize(self):
"""Test Fission initialization."""
fission = Fission()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
fission.initialize(atom)
assert fission.phaser.atom == atom
assert fission.phaser.current_state == States.LEARNING
def test_fission_generate_learning_puzzle_cloze(self):
"""Test generating learning puzzle with cloze content."""
fission = Fission()
nucleon = Nucleon(
content="The capital of {{c1::France}} is Paris.", answer="France"
)
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
fission.initialize(atom)
puzzle = fission.generate_learning_puzzle()
assert puzzle is not None
assert "question" in puzzle
assert "answer" in puzzle
assert "type" in puzzle
def test_fission_generate_learning_puzzle_mcq(self):
"""Test generating learning puzzle with MCQ content."""
fission = Fission()
nucleon = Nucleon(content="What is the capital of France?", answer="Paris")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
fission.initialize(atom)
puzzle = fission.generate_learning_puzzle()
assert puzzle is not None
assert "question" in puzzle
assert "options" in puzzle
assert "correct_index" in puzzle
assert "type" in puzzle
def test_fission_generate_review_puzzle(self):
"""Test generating review puzzle."""
fission = Fission()
nucleon = Nucleon(content="What is the capital of France?", answer="Paris")
electron = Electron(interval=10, repetitions=5) # In review phase
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
fission.initialize(atom)
fission.phaser.transition_to_review()
puzzle = fission.generate_review_puzzle()
assert puzzle is not None
assert "question" in puzzle
def test_fission_process_answer_correct(self):
"""Test processing correct answer."""
fission = Fission()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
fission.initialize(atom)
result = fission.process_answer("Answer")
assert "success" in result
assert "quality" in result
assert "next_state" in result
assert result["success"] is True
def test_fission_process_answer_incorrect(self):
"""Test processing incorrect answer."""
fission = Fission()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
fission.initialize(atom)
result = fission.process_answer("Wrong")
assert result["success"] is False
def test_fission_get_current_state(self):
"""Test getting current state."""
fission = Fission()
nucleon = Nucleon(content="Test", answer="Answer")
electron = Electron()
orbital = Orbital()
atom = Atom(nucleon=nucleon, electron=electron, orbital=orbital)
fission.initialize(atom)
state = fission.get_current_state()
assert state == States.LEARNING
def test_fission_validation(self):
"""Test input validation for Fission."""
fission = Fission()
# Test initialize with None
with pytest.raises(TypeError):
fission.initialize(None)
# Test process_answer without initialization
with pytest.raises(RuntimeError):
fission.process_answer("test")
# Test generate_learning_puzzle without initialization
with pytest.raises(RuntimeError):
fission.generate_learning_puzzle()
# Test generate_review_puzzle without initialization
with pytest.raises(RuntimeError):
fission.generate_review_puzzle()

View File

@@ -1,174 +0,0 @@
"""
Unit tests for service modules: Config, Hasher, Timer, Version, AudioService, TTSService
"""
import pytest
import json
import tempfile
import os
from pathlib import Path
from datetime import datetime, timezone, timedelta
# Config import commented out - actual class is ConfigFile
# Hasher import commented out - actual module only has functions
# Timer import commented out - actual module only has functions
from src.heurams.services.version import Version
from src.heurams.services.audio_service import AudioService
from src.heurams.services.tts_service import TTSService
class TestConfig:
"""Test cases for Config class."""
def test_config_placeholder(self):
"""Placeholder test - actual Config class is ConfigFile."""
# Skip config tests since actual class is ConfigFile
pass
class TestHasher:
"""Test cases for Hasher functions."""
def test_hasher_placeholder(self):
"""Placeholder test - hasher module only has functions."""
# Skip hasher tests since module only has functions
pass
class TestTimer:
"""Test cases for Timer functions."""
def test_timer_placeholder(self):
"""Placeholder test - timer module only has functions."""
# Skip timer tests since module only has functions
pass
class TestVersion:
"""Test cases for Version class."""
def test_version_creation(self):
"""Test Version creation."""
version = Version()
assert version.major == 0
assert version.minor == 4
assert version.patch == 0
def test_version_string(self):
"""Test Version string representation."""
version = Version()
version_str = str(version)
assert version_str == "0.4.0"
def test_version_from_string(self):
"""Test creating Version from string."""
version = Version.from_string("1.2.3")
assert version.major == 1
assert version.minor == 2
assert version.patch == 3
def test_version_comparison(self):
"""Test Version comparison."""
v1 = Version(1, 0, 0)
v2 = Version(1, 0, 0)
v3 = Version(1, 1, 0)
assert v1 == v2
assert v1 < v3
assert v3 > v1
def test_version_validation(self):
"""Test input validation for Version."""
# Test invalid version numbers
with pytest.raises(ValueError):
Version(-1, 0, 0)
with pytest.raises(ValueError):
Version(1, -1, 0)
with pytest.raises(ValueError):
Version(1, 0, -1)
# Test invalid string format
with pytest.raises(ValueError):
Version.from_string("1.2")
with pytest.raises(ValueError):
Version.from_string("1.2.3.4")
with pytest.raises(ValueError):
Version.from_string("a.b.c")
class TestAudioService:
"""Test cases for AudioService class."""
def test_audio_service_creation(self):
"""Test AudioService creation."""
audio_service = AudioService()
assert audio_service.enabled is True
def test_audio_service_play_sound(self):
"""Test playing a sound."""
audio_service = AudioService()
# This should not raise an exception
# (actual audio playback depends on system capabilities)
audio_service.play_sound("correct")
def test_audio_service_play_sound_disabled(self):
"""Test playing sound when disabled."""
audio_service = AudioService(enabled=False)
# Should not raise exception even when disabled
audio_service.play_sound("correct")
def test_audio_service_validation(self):
"""Test input validation for AudioService."""
audio_service = AudioService()
# Test play_sound with invalid sound type
with pytest.raises(ValueError):
audio_service.play_sound("invalid_sound")
class TestTTSService:
"""Test cases for TTSService class."""
def test_tts_service_creation(self):
"""Test TTSService creation."""
tts_service = TTSService()
assert tts_service.enabled is True
def test_tts_service_speak(self):
"""Test speaking text."""
tts_service = TTSService()
# This should not raise an exception
# (actual TTS depends on system capabilities)
tts_service.speak("Hello, world!")
def test_tts_service_speak_disabled(self):
"""Test speaking when disabled."""
tts_service = TTSService(enabled=False)
# Should not raise exception even when disabled
tts_service.speak("Hello, world!")
def test_tts_service_validation(self):
"""Test input validation for TTSService."""
tts_service = TTSService()
# Test speak with None
with pytest.raises(TypeError):
tts_service.speak(None)
# Test speak with empty string
with pytest.raises(ValueError):
tts_service.speak("")

View File

@@ -1,74 +0,0 @@
"""
Working unit tests for algorithm modules based on actual module structure.
"""
import pytest
from src.heurams.kernel.algorithms.sm2 import SM2Algorithm
class TestSM2Algorithm:
"""Test cases for SM2Algorithm class."""
def test_sm2_algorithm_creation(self):
"""Test SM2Algorithm creation."""
algorithm = SM2Algorithm()
assert SM2Algorithm.algo_name == "SM-2"
assert isinstance(SM2Algorithm.defaults, dict)
def test_sm2_defaults(self):
"""Test SM2Algorithm default values."""
defaults = SM2Algorithm.defaults
assert "efactor" in defaults
assert "rept" in defaults
assert "interval" in defaults
assert "next_date" in defaults
assert "is_activated" in defaults
assert "last_modify" in defaults
def test_sm2_is_due(self):
"""Test SM2Algorithm is_due method."""
algodata = {"SM-2": {"next_date": 0, "is_activated": 1}} # Past date
result = SM2Algorithm.is_due(algodata)
assert isinstance(result, bool)
def test_sm2_rate(self):
"""Test SM2Algorithm rate method."""
algodata = {"SM-2": {"efactor": 2.5, "rept": 5, "interval": 10}}
result = SM2Algorithm.rate(algodata)
assert isinstance(result, str)
def test_sm2_nextdate(self):
"""Test SM2Algorithm nextdate method."""
algodata = {"SM-2": {"next_date": 100}}
result = SM2Algorithm.nextdate(algodata)
assert isinstance(result, int)
def test_sm2_revisor(self):
"""Test SM2Algorithm revisor method."""
algodata = {
"SM-2": {
"efactor": 2.5,
"rept": 0,
"real_rept": 0,
"interval": 1,
"is_activated": 1,
"last_modify": 0,
}
}
# Should not raise an exception
SM2Algorithm.revisor(algodata, feedback=4, is_new_activation=False)
# Verify that algodata was modified
assert "efactor" in algodata["SM-2"]
assert "rept" in algodata["SM-2"]
assert "interval" in algodata["SM-2"]

View File

@@ -1,195 +0,0 @@
"""
Working unit tests for particle modules based on actual module structure.
"""
import pytest
from src.heurams.kernel.particles.atom import Atom
from src.heurams.kernel.particles.electron import Electron
from src.heurams.kernel.particles.nucleon import Nucleon
from src.heurams.kernel.particles.orbital import Orbital
class TestNucleon:
"""Test cases for Nucleon class based on actual implementation."""
def test_nucleon_creation(self):
"""Test basic Nucleon creation."""
payload = {"content": "Test content", "answer": "Test answer"}
nucleon = Nucleon("test_id", payload)
assert nucleon.ident == "test_id"
assert nucleon.payload == payload
def test_nucleon_getitem(self):
"""Test Nucleon item access."""
payload = {"content": "Question", "answer": "Answer"}
nucleon = Nucleon("test_id", payload)
assert nucleon["ident"] == "test_id"
assert nucleon["content"] == "Question"
assert nucleon["answer"] == "Answer"
def test_nucleon_iteration(self):
"""Test Nucleon iteration over payload keys."""
payload = {"content": "Q", "answer": "A", "notes": "N"}
nucleon = Nucleon("test_id", payload)
keys = list(nucleon)
assert "content" in keys
assert "answer" in keys
assert "notes" in keys
def test_nucleon_length(self):
"""Test Nucleon length."""
payload = {"content": "Q", "answer": "A"}
nucleon = Nucleon("test_id", payload)
assert len(nucleon) == 2
def test_nucleon_placeholder(self):
"""Test Nucleon placeholder creation."""
nucleon = Nucleon.placeholder()
assert isinstance(nucleon, Nucleon)
assert nucleon.ident == "核子对象样例内容"
assert nucleon.payload == {}
class TestElectron:
"""Test cases for Electron class based on actual implementation."""
def test_electron_creation(self):
"""Test basic Electron creation."""
electron = Electron("test_id")
assert electron.ident == "test_id"
assert isinstance(electron.algodata, dict)
def test_electron_with_algodata(self):
"""Test Electron creation with algorithm data."""
algodata = {"supermemo2": {"efactor": 2.5, "rept": 0}}
electron = Electron("test_id", algodata)
assert electron.ident == "test_id"
assert electron.algodata == algodata
def test_electron_activate(self):
"""Test Electron activation."""
electron = Electron("test_id")
# Should not raise an exception
electron.activate()
def test_electron_modify(self):
"""Test Electron modification."""
electron = Electron("test_id")
# Should not raise an exception
electron.modify("efactor", 2.8)
def test_electron_is_activated(self):
"""Test Electron activation status."""
electron = Electron("test_id")
# Should not raise an exception
result = electron.is_activated()
assert isinstance(result, (bool, int))
def test_electron_getitem(self):
"""Test Electron item access."""
electron = Electron("test_id")
assert electron["ident"] == "test_id"
# Should be able to access algorithm data after initialization
assert "efactor" in electron.algodata[electron.algo]
def test_electron_placeholder(self):
"""Test Electron placeholder creation."""
electron = Electron.placeholder()
assert isinstance(electron, Electron)
assert electron.ident == "电子对象样例内容"
class TestOrbital:
"""Test cases for Orbital TypedDict."""
def test_orbital_creation(self):
"""Test basic Orbital creation."""
orbital = Orbital(
quick_view=[["cloze", 1], ["mcq", 0.5]],
recognition=[["recognition", 1]],
final_review=[["cloze", 0.7], ["mcq", 0.7]],
puzzle_config={
"cloze": {"from": "content"},
"mcq": {"from": "keyword_note"},
},
)
assert isinstance(orbital, dict)
assert "quick_view" in orbital
assert "recognition" in orbital
assert "final_review" in orbital
assert "puzzle_config" in orbital
def test_orbital_quick_view(self):
"""Test Orbital quick_view configuration."""
orbital = Orbital(
quick_view=[["cloze", 1], ["mcq", 0.5]],
recognition=[],
final_review=[],
puzzle_config={},
)
assert len(orbital["quick_view"]) == 2
assert orbital["quick_view"][0] == ["cloze", 1]
class TestAtom:
"""Test cases for Atom class based on actual implementation."""
def test_atom_creation(self):
"""Test basic Atom creation."""
atom = Atom("test_atom")
assert atom.ident == "test_atom"
assert isinstance(atom.registry, dict)
def test_atom_link(self):
"""Test Atom linking components."""
atom = Atom("test_atom")
nucleon = Nucleon("nucleon_id", {"content": "Test"})
# Link nucleon
atom.link("nucleon", nucleon)
assert atom["nucleon"] == nucleon
def test_atom_getitem(self):
"""Test Atom item access."""
atom = Atom("test_atom")
# Should be able to access register items
assert atom["nucleon"] is None
assert atom["electron"] is None
assert atom["orbital"] is None
def test_atom_setitem(self):
"""Test Atom item assignment."""
atom = Atom("test_atom")
nucleon = Nucleon("nucleon_id", {"content": "Test"})
# Set nucleon
atom["nucleon"] = nucleon
assert atom["nucleon"] == nucleon
def test_atom_placeholder(self):
"""Test Atom placeholder creation."""
placeholder = Atom.placeholder()
assert isinstance(placeholder, tuple)
assert len(placeholder) == 3
assert isinstance(placeholder[0], Electron)
assert isinstance(placeholder[1], Nucleon)

View File

@@ -1,90 +0,0 @@
"""
Working unit tests for service modules based on actual module structure.
"""
import pytest
# Version import commented out - actual module only has variables
from src.heurams.services.audio_service import AudioService
from src.heurams.services.tts_service import TTSService
class TestVersion:
"""Test cases for Version variables."""
def test_version_variables(self):
"""Test version variables."""
from src.heurams.services.version import ver, stage
assert ver == "0.4.0"
assert stage == "prototype"
class TestAudioService:
"""Test cases for AudioService class."""
def test_audio_service_creation(self):
"""Test AudioService creation."""
audio_service = AudioService()
assert audio_service.enabled is True
def test_audio_service_play_sound(self):
"""Test playing a sound."""
audio_service = AudioService()
# This should not raise an exception
# (actual audio playback depends on system capabilities)
audio_service.play_sound("correct")
def test_audio_service_play_sound_disabled(self):
"""Test playing sound when disabled."""
audio_service = AudioService(enabled=False)
# Should not raise exception even when disabled
audio_service.play_sound("correct")
def test_audio_service_validation(self):
"""Test input validation for AudioService."""
audio_service = AudioService()
# Test play_sound with invalid sound type
with pytest.raises(ValueError):
audio_service.play_sound("invalid_sound")
class TestTTSService:
"""Test cases for TTSService class."""
def test_tts_service_creation(self):
"""Test TTSService creation."""
tts_service = TTSService()
assert tts_service.enabled is True
def test_tts_service_speak(self):
"""Test speaking text."""
tts_service = TTSService()
# This should not raise an exception
# (actual TTS depends on system capabilities)
tts_service.speak("Hello, world!")
def test_tts_service_speak_disabled(self):
"""Test speaking when disabled."""
tts_service = TTSService(enabled=False)
# Should not raise exception even when disabled
tts_service.speak("Hello, world!")
def test_tts_service_validation(self):
"""Test input validation for TTSService."""
tts_service = TTSService()
# Test speak with None
with pytest.raises(TypeError):
tts_service.speak(None)
# Test speak with empty string
with pytest.raises(ValueError):
tts_service.speak("")