You've already forked HeurAMS-legacy
115 lines
4.4 KiB
Python
115 lines
4.4 KiB
Python
import unittest
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
|
|
from heurams.kernel.reactor.phaser import Phaser
|
|
from heurams.kernel.reactor.states import PhaserState, ProcessionState
|
|
from heurams.kernel.particles.atom import Atom
|
|
from heurams.kernel.particles.electron import Electron
|
|
|
|
|
|
class TestPhaser(unittest.TestCase):
|
|
"""测试 Phaser 类"""
|
|
|
|
def setUp(self):
|
|
# 创建模拟的 Atom 对象
|
|
self.atom_new = Mock(spec=Atom)
|
|
self.atom_new.registry = {"electron": Mock(spec=Electron)}
|
|
self.atom_new.registry["electron"].is_activated.return_value = False
|
|
|
|
self.atom_old = Mock(spec=Atom)
|
|
self.atom_old.registry = {"electron": Mock(spec=Electron)}
|
|
self.atom_old.registry["electron"].is_activated.return_value = True
|
|
|
|
# 模拟 Procession 类以避免复杂依赖
|
|
self.procession_patcher = patch("heurams.kernel.reactor.phaser.Procession")
|
|
self.mock_procession_class = self.procession_patcher.start()
|
|
|
|
def tearDown(self):
|
|
self.procession_patcher.stop()
|
|
|
|
def test_init_with_mixed_atoms(self):
|
|
"""测试混合新旧原子的初始化"""
|
|
atoms = [self.atom_old, self.atom_new, self.atom_old]
|
|
phaser = Phaser(atoms)
|
|
|
|
# 应该创建两个 Procession: 一个用于旧原子, 一个用于新原子, 以及一个总体复习
|
|
self.assertEqual(self.mock_procession_class.call_count, 3)
|
|
|
|
# 检查调用参数
|
|
calls = self.mock_procession_class.call_args_list
|
|
# 第一个调用应该是旧原子的初始复习
|
|
self.assertEqual(calls[0][0][0], [self.atom_old, self.atom_old])
|
|
self.assertEqual(calls[0][0][1], PhaserState.QUICK_REVIEW)
|
|
# 第二个调用应该是新原子的识别阶段
|
|
self.assertEqual(calls[1][0][0], [self.atom_new])
|
|
self.assertEqual(calls[1][0][1], PhaserState.RECOGNITION)
|
|
# 第三个调用应该是所有原子的总体复习
|
|
self.assertEqual(calls[2][0][0], atoms)
|
|
self.assertEqual(calls[2][0][1], PhaserState.FINAL_REVIEW)
|
|
|
|
def test_init_only_old_atoms(self):
|
|
"""测试只有旧原子"""
|
|
atoms = [self.atom_old, self.atom_old]
|
|
phaser = Phaser(atoms)
|
|
|
|
# 应该创建两个 Procession: 一个初始复习, 一个总体复习
|
|
self.assertEqual(self.mock_procession_class.call_count, 2)
|
|
calls = self.mock_procession_class.call_args_list
|
|
self.assertEqual(calls[0][0][0], atoms)
|
|
self.assertEqual(calls[0][0][1], PhaserState.QUICK_REVIEW)
|
|
self.assertEqual(calls[1][0][0], atoms)
|
|
self.assertEqual(calls[1][0][1], PhaserState.FINAL_REVIEW)
|
|
|
|
def test_init_only_new_atoms(self):
|
|
"""测试只有新原子"""
|
|
atoms = [self.atom_new, self.atom_new]
|
|
phaser = Phaser(atoms)
|
|
|
|
self.assertEqual(self.mock_procession_class.call_count, 2)
|
|
calls = self.mock_procession_class.call_args_list
|
|
self.assertEqual(calls[0][0][0], atoms)
|
|
self.assertEqual(calls[0][0][1], PhaserState.RECOGNITION)
|
|
self.assertEqual(calls[1][0][0], atoms)
|
|
self.assertEqual(calls[1][0][1], PhaserState.FINAL_REVIEW)
|
|
|
|
def test_current_procession_finds_unfinished(self):
|
|
"""测试 current_procession 找到未完成的 Procession"""
|
|
# 创建模拟 Procession 实例
|
|
mock_proc1 = Mock()
|
|
mock_proc1.state = ProcessionState.FINISHED
|
|
mock_proc2 = Mock()
|
|
mock_proc2.state = ProcessionState.RUNNING
|
|
mock_proc2.phase = PhaserState.QUICK_REVIEW
|
|
|
|
phaser = Phaser([])
|
|
phaser.processions = [mock_proc1, mock_proc2]
|
|
|
|
result = phaser.current_procession()
|
|
self.assertEqual(result, mock_proc2)
|
|
self.assertEqual(phaser.state, PhaserState.QUICK_REVIEW)
|
|
|
|
def test_current_procession_all_finished(self):
|
|
"""测试所有 Procession 都完成"""
|
|
mock_proc = Mock()
|
|
mock_proc.state = ProcessionState.FINISHED
|
|
|
|
phaser = Phaser([])
|
|
phaser.processions = [mock_proc]
|
|
|
|
result = phaser.current_procession()
|
|
self.assertEqual(result, 0)
|
|
self.assertEqual(phaser.state, PhaserState.FINISHED)
|
|
|
|
def test_current_procession_empty(self):
|
|
"""测试没有 Procession"""
|
|
phaser = Phaser([])
|
|
phaser.processions = []
|
|
|
|
result = phaser.current_procession()
|
|
self.assertEqual(result, 0)
|
|
self.assertEqual(phaser.state, PhaserState.FINISHED)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|