2025-03-10 18:37:53 +08:00

163 lines
4.7 KiB
Python

import yaml
__dummy_mode__ = 1
cfg = None
if not __dummy_mode__:
import sys, os
sys.path.append("arduino_api")
sys.path.append("termux_lib")
import termux_lib as lib
import socket
import json
import os
import time
import threading
else:
import sys
sys.path.append("dummy_all")
import dummy_lib as lib
import socket
import json
import os
import time
import threading
def hwinit():
global hw
hw = dict()
hw["aile_left"] = lib.Flank("左副翼", "al")
hw["aile_right"] = lib.Flank("右副翼", "ar")
#hw["tail_vert"] = lib.Flank("垂直尾翼", "tv")
hw["tail_left"] = lib.Flank("左尾翼", "tl")
hw["tail_right"] = lib.Flank("右尾翼", "tr")
hw["engine_main"] = lib.Engine("一号引擎", "e1")
def check():
for i in hw.values():
i.selfchk()
print("操纵面与引擎自检完成, 等待远程指令")
def proc(data):
global hw
#print(data)
inf = json.loads(data.replace('\n', ';'))
print("DT"+data + "DT")
if inf["type"] == "cmd":
#print(f"执行命令")
exec(inf["cmd"])
elif inf["type"] == "syscmd":
#print(f"执行系统级命令")
os.system(inf["cmd"])
elif inf["type"] == "reset":
pass
#print(f"系统重置")
elif inf["type"] == "stop":
#print(f"退出系统")
return 1
else:
pass
#print(f"未知命令")
return 0
global isx
isx = 0
def stat():
global isx
status = dict()
status["devices"] = dict()
for i in hw.items():
status["devices"][i[0]] = i[1].status()
status["name"] = name
status["sensors"] = lib.Sensor.stat()
status["time"] = time.asctime()
isx+=1
return status
def debug_shell():
lib.Sensor.pseudo_gui()
def init_hardware():
global hwobj
hwobj = dict()
print("注册传感器")
for i in hw["surfaces"]:
hwobj[i.key] = lib.Surface(i.value, i.key)
print("注册操纵面")
for i in hw["surfaces"]:
hwobj[i.key] = lib.Surface(i.value, i.key)
print("注册引擎")
for i in hw["engines"]:
hwobj[i.key] = lib.Engine(i.value, i.key)
print("注册机炮")
for i in hw["cannons"]:
hwobj[i.key] = lib.Engine(i.value, i.key)
print("注册供能系统")
for i in hw["powersys"]:
hwobj[i.key] = lib.Engine(i.value, i.key)
print("注册机轮制动")
for i in hw["powersys"]:
hwobj[i.key] = lib.Engine(i.value, i.key)
print("注册雷达")
for i in hw["powersys"]:
hwobj[i.key] = lib.Engine(i.value, i.key)
print("注册挂架")
for i in hw[""]:
hwobj[i.key] = lib.Engine(i.value, i.key)
print("注册布尔元件")
for i in hw["bools"]:
hwobj[i.key] = lib.Surface(i.value, i.key)
def selfchk():
print("开始自检")
for i in hw:
print(f"{i}")
for i in hw["engines"]:
print(f"{j} ", end="")
if __name__ == "__main__":
name = "零号侧卫"
global hw
hw = dict()
port = 40808
print(f"Commdore 飞行控制系统 服务端")
print("正在加载配置文件")
with open("config.yaml") as cfg_file:
cfg = yaml.safe_load(cfg_file)
name = cfg["name"]
model = cfg["model"]
hw = cfg["hardware"]
print(f"型号: {model}")
print(f"名称: {name}")
print("正在初始化硬件")
print("启动 socket 网络通信")
print("启动备用 socket 网络通信")
dummy = socket.socket()
dummy.bind(("localhost", port)) # 端口监听
dummy.listen(1)
print(f"监听端口 " + str(port))
conn, address = dummy.accept()
print(f"来自 {address} 的连接已接收")
msg = json.dumps(stat()) # 状态回传
size = len(msg.encode())
conn.send(msg.encode("UTF-8").ljust(2048))
np = threading.Thread(target=debug_shell, name='Debugging')
np.start()
while True:
# 接收消息
print(lib.Sensor.data)
data: str = conn.recv(2048).decode("UTF-8")
if data == "" or data == None:
continue
print(f"命令接收: {data}")
ret = proc(data)
if ret == 1:
break
msg = json.dumps(stat()) # 状态回传
size = len(msg.encode())
conn.send(msg.encode("UTF-8").ljust(2048 - size)) # encode将字符串编码为字节数组对象
#print(msg.encode("UTF-8").decode('unicode_escape'))
#print(f"当前状态已回传")
# 关闭连接
conn.close()
dummy.close()
np.join()