"""
filename: agency.py
description: WebSocket network agency for AiraPulsar components.

This module serves as a universal client module for Moons, facilitating
communication between Pulsar/Comet and Moons.
"""
import json
import threading
import time

import websocket
from loguru import logger

# Configure logging. The file sink is registered exactly once, here at import
# time; registering it again per Agent instance would duplicate every record.
logger.add("agent_logs.log", rotation="1 MB", level="INFO", format="{time} - {level} - {message}")


class Agent(object):
    """WebSocket client that exchanges JSON envelopes with a Moon server.

    Terminology used throughout the class:

    * handler  -- callable invoked for every valid incoming message as
      ``handler(emitter=..., content=...)``.
    * receiver -- name of the peer an outgoing message is addressed to.
    * emitter  -- name of the peer an incoming message came from.
    * content  -- payload carried inside the envelope.

    Outgoing envelopes have the keys ``id``, ``receiver`` and ``content``;
    incoming envelopes are expected to carry ``content`` and ``emitter``.

    Attributes:
        name: Identifier placed in the ``id`` field of outgoing messages.
        addr: WebSocket URL passed to ``websocket.WebSocketApp``.
        handler: Callback described above.
        wsapp: The underlying ``websocket.WebSocketApp``.
        is_connected: Truthy between the open and close callbacks.
    """

    wsapp = None
    # ``False == 0`` in Python, so callers comparing against 0 keep working.
    is_connected = False

    def __init__(self, name, addr, handler):
        """Create the agent and its WebSocketApp; no connection is opened yet.

        Args:
            name: Identifier of this agent.
            addr: WebSocket URL of the server.
            handler: Callable accepting ``emitter`` and ``content`` keywords.
        """
        logger.info("初始化代理对象")
        logger.info(f"卫星地址: {addr}")
        logger.info(f"代理对象代号: {name}")
        self.name = name
        self.addr = addr
        self.handler = handler
        self.wsapp = websocket.WebSocketApp(
            f"{addr}",
            on_open=self.__on_open,
            on_message=self.__on_message,
            on_error=self.__on_error,
            on_close=self.__on_close,
        )

    def __dumper(self, receiver: str, content):
        """Build the outgoing envelope (a dict) for *content*.

        Args:
            receiver: Name of the addressee.
            content: Payload; ``send`` passes either a dict or a str.

        Returns:
            A dict with the keys ``id``, ``receiver`` and ``content``.
        """
        return {
            "id": self.name,
            "receiver": receiver,
            "content": content,
        }

    def __loader(self, received_data: str):
        """Parse an incoming envelope.

        Args:
            received_data: Raw JSON text received from the socket.

        Returns:
            ``{"content": ..., "emitter": ...}``. Both values are ``None``
            when the text is not valid JSON or the envelope lacks one of
            the two required fields.
        """
        try:
            data = json.loads(received_data)
        except json.JSONDecodeError:
            logger.error("接收到的消息不是有效的 JSON 格式")
            return {"content": None, "emitter": None}

        try:
            return {"content": data["content"], "emitter": data["emitter"]}
        except (KeyError, TypeError):
            # KeyError: a required field is missing.
            # TypeError: the JSON value is not an object (list, number, ...).
            logger.error("接收到的消息缺少 content 或 emitter 字段")
            return {"content": None, "emitter": None}

    def __on_open(self, wsapp):
        """Send the auth message and mark the agent as connected."""
        start_time = time.time()
        self.send(receiver="auth", data="auth")
        self.is_connected = True
        # The reported figure is the wall-clock duration of the local send
        # call above, in milliseconds rounded to one decimal place.
        delta = time.time() - start_time
        logger.info(f"成功建立连接, 单向延迟 {round(delta*10000)/10}ms")

    def __on_message(self, wsapp, message):
        """Decode *message* and forward it to the handler when it is valid."""
        loaded_data = self.__loader(message)
        emitter = loaded_data["emitter"]
        content = loaded_data["content"]
        if emitter is not None and content is not None:
            self.handler(emitter=emitter, content=content)

    def __on_error(self, wsapp, error):
        """Log errors reported by the WebSocketApp instead of dropping them."""
        logger.error(f"WebSocket 发生错误: {error}")

    def __on_close(self, wsapp, close_status_code=None, close_msg=None):
        """Mark the agent as disconnected.

        The two trailing parameters are optional so the callback works both
        with websocket-client releases that pass only the app object and
        with releases that also pass the close status code and message.
        """
        self.is_connected = False
        logger.info("WebSocket 连接已关闭")

    def send(self, receiver: str, data):
        """Serialize *data* into an envelope and send it to *receiver*.

        Args:
            receiver: Name of the addressee.
            data: A dict or a str. Any other type is not sent; a warning
                is logged instead.
        """
        if not isinstance(data, (dict, str)):
            logger.warning("数据类型不支持")
            return
        self.wsapp.send(json.dumps(self.__dumper(receiver, data)))

    def run(self):
        """Run the WebSocket event loop in the calling thread (blocking)."""
        self.wsapp.run_forever()

    def runasthread(self):
        """Run the event loop in a new thread and block until connected.

        Raises:
            ConnectionError: The worker thread ended before the connection
                was established. Previously this situation made the method
                poll forever.
        """
        thread = threading.Thread(target=self.wsapp.run_forever)
        thread.start()
        logger.info("等待连接成功")
        while not self.is_connected:
            if not thread.is_alive():
                # run_forever returned without the open callback having set
                # is_connected; waiting any longer cannot succeed.
                logger.error(f"无法连接到 {self.addr}")
                raise ConnectionError(f"无法连接到 {self.addr}")
            time.sleep(0.1)
        logger.info("连接成功, 取消阻塞")