AiraPulsar/lib/agency.py
2025-03-10 03:10:22 +08:00

98 lines
3.2 KiB
Python

"""
filename: agency.py
description: WebSocket network agency for AiraPulsar components.
This module serves as a universal client module for Moons,
facilitating communication between Pulsar/Comet and Moons.
"""
import websocket
import json
from loguru import logger
import threading
import time
# 配置日志
logger.add("agent_logs.log", rotation="1 MB", level="INFO", format="{time} - {level} - {message}")
class Agent(object):
"""
handler: function which processes args
receiver: sendto
emitter: sendfrom
content: original string data
data: formatted json data -> dict
"""
wsapp = None
is_connected = 0
def __init__(self, name, addr, handler):
logger.add("agent_logs.log", rotation="1 MB", level="INFO", format="{time} - {level} - {message}")
logger.info("初始化代理对象")
logger.info(f"卫星地址: {addr}")
logger.info(f"代理对象代号: {name}")
self.name = name
self.addr = addr
self.handler = handler
self.wsapp = websocket.WebSocketApp(
f"{addr}",
on_open=self.__on_open,
on_message=self.__on_message,
on_close=self.__on_close
)
def __dumper(self, receiver: str, content: dict):
msg = {
"id": self.name,
"receiver": receiver,
"content": content
}
return msg
def __loader(self, received_data: str):
try:
data = json.loads(received_data)
content = data["content"]
emitter = data["emitter"]
return {"content": content, "emitter": emitter}
except json.JSONDecodeError:
logger.error("接收到的消息不是有效的 JSON 格式")
return {"content": None, "emitter": None}
def __on_open(self, wsapp):
start_time = time.time()
self.send(receiver="auth", data="auth")
self.is_connected = True
end_time = time.time()
delta = end_time - start_time
logger.info(f"成功建立连接, 单向延迟 {round(delta*10000)/10}ms")
def __on_message(self, wsapp, message):
loaded_data = self.__loader(message)
if loaded_data["emitter"] is not None and loaded_data["content"] is not None:
self.handler(emitter=loaded_data["emitter"],
content=loaded_data["content"])
def __on_close(self, wsapp):
self.is_connected = False
logger.info("WebSocket 连接已关闭")
def send(self, receiver: str, data):
if isinstance(data, dict):
self.wsapp.send(
json.dumps(self.__dumper(receiver, data))
)
elif isinstance(data, str):
self.wsapp.send(
json.dumps(self.__dumper(receiver, data))
)
else:
logger.warning("数据类型不支持")
def run(self):
self.wsapp.run_forever()
def runasthread(self):
thread = threading.Thread(target=self.wsapp.run_forever)
thread.start()
logger.info("等待连接成功")
while self.is_connected == 0:
time.sleep(0.1)
logger.info("连接成功, 取消阻塞")