diff --git a/testfield/legacy/README.md b/testfield/legacy/README.md new file mode 100644 index 0000000..0b208a1 --- /dev/null +++ b/testfield/legacy/README.md @@ -0,0 +1,6 @@ +# Commodore 飞行控制系统 [存档] +这是 AiraPulsar 的前身, commodore 飞行控制系统. +与 AiraPulsar 相比, 它只有 client 和 server 之间相互通讯, 导致 server 端需要内网穿透才能受控. +此外, commodore 使用原始的 socket 协议通讯, 粘包问题导致信息需要以 4KB/次 的速率恒定传输, 且 JSON 数据大小受到限制, 稳定性不足. +commodore 代码缺乏模块化, 难以维护 +尽管它代码风格杂乱无序, 但它还是勉强完成了飞行控制同步的基本功能, 保留此文件夹仅作留存参考用, 并无实用价值 diff --git a/testfield/legacy/art/logo.kra b/testfield/legacy/art/logo.kra new file mode 100644 index 0000000..2a047dd Binary files /dev/null and b/testfield/legacy/art/logo.kra differ diff --git a/testfield/legacy/client/client.py b/testfield/legacy/client/client.py new file mode 100644 index 0000000..d0af391 --- /dev/null +++ b/testfield/legacy/client/client.py @@ -0,0 +1,69 @@ +from os import environ +environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1' +import pygame +from pygame.locals import * +import cv2 +import client_lib as lib +import client_elements as elements +import math +import time +import threading +import logging + +uitest_enabled = 1 +global running +elements = list() + +class Element: + def __init__(self, name, desc): + self.name = name + self.desc = desc + + def action(self): + print("default element action") + print(f"name: {self.name}") + print(f"desc: {self.desc}") + + + +def data_proc(): + global running + pass + +def input_proc(): + global running + while running: + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + if event.type == pygame.MOUSEMOTION: + loggin.info("鼠标: ", event.pos[0] - mousepos[0], event.pos[1] - mousepos[1]) + mousepos = event.pos + pass + +def main(): + colors = {"main":"white", "remote":"green", "warn":"red"} + fonts = {"medium":pygame.font.Font('src/font.ttf', 20), "big":pygame.font.Font('src/font.ttf', 40), "huge":pygame.font.Font('src/font.ttf', 60), "small":pygame.font.Font('src/font.ttf', 16), "tiny":pygame.font.Font('src/font.ttf', 12)} + height = 1080 + width = int(height / 9 * 16) + pygame.init() + pygame.display.set_caption(f"Commodore 远程终端") + screen = pygame.display.set_mode((width, height), RESIZABLE) + icon = pygame.image.load("src/icon.ico").convert_alpha() + pygame.display.set_icon(icon) + global running + running = True + pygame.mouse.set_visible(False) + pygame.mouse.set_pos([width / 2, height / 2]) + mousepos = (width / 2, height / 2) + input_thr = threading.Thread(target=input_proc, name='InputProcess') + input_thr.start() + data_thr = threading.Thread(target=data_proc, name='DataProcess') + data_thr.start() + + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + if uitest_enabled == 0: + pass # load data \ No newline at end of file diff --git a/testfield/legacy/client/legacy/client_cli.py b/testfield/legacy/client/legacy/client_cli.py new file mode 100644 index 0000000..6d0a47a --- /dev/null +++ b/testfield/legacy/client/legacy/client_cli.py @@ -0,0 +1,29 @@ +import socket +import json +import logging + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +def operate(): + strs = input() + return strs + +dest = "127.0.0.1" +port = 40808 + +logging.info("启动 AZURE 客户端") +logging.info("启动远程主机网络通讯") +remote = socket.socket() +remote.connect((dest, port)) + +data: str = remote.recv(2048).decode("UTF-8") +first = json.loads(data) +devices = first["devices"] +name = first["name"] +logging.info(f"已链接至 {name}") + +while True: + msg = json.dumps(operate()) # 状态回传 + size = len(msg.encode()) + remote.send(msg.encode("UTF-8").ljust(2048 - size)) # encode将字符串编码为字节数组对象 + logging.info("命令已回传") diff --git a/testfield/legacy/client/legacy/client_gui_legacy copy 2.py b/testfield/legacy/client/legacy/client_gui_legacy copy 2.py new file mode 100644 index 0000000..4979ac0 --- /dev/null +++ b/testfield/legacy/client/legacy/client_gui_legacy copy 2.py @@ -0,0 +1,363 @@ +from os import environ +environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1' +import pygame +from pygame.locals import * +import cv2 +import client_lib as lib +import math +import time +import threading +import logging + +camera_enabled = 0 + +def camera_refresh(): + global frame_surface + if camera_enabled: + frame = camera.read()[1] + frame_rgb = cv2.resize(cv2.flip(cv2.rotate(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), cv2.ROTATE_90_CLOCKWISE), 1), (height, width)) + frame_surface = pygame.surfarray.make_surface(frame_rgb) + else: + frame_surface = pygame.Surface((width, height), pygame.SRCALPHA) + frame_surface.fill((0, 0, 0, 250)) # 将表面填充为完全透明 + +def welcome(): + for i in range(0, 120): + screen.fill("black") + text = fonts['huge'].render("Initialize Azure Network BVLOS Flighting Controlling System", True, colors['main'], None) + screen.blit(text, (width / 2 - text.get_width() / 2, height / 2 - 150)) + text = fonts['big'].render(f"Remote connection: {remote.name}", True, colors['main'], None) + screen.blit(text, (width / 2 - text.get_width() / 2, height / 2)) + pygame.display.flip() + clock.tick(60) +def cmdgen(): + global devices + cmd = "" + for i in devices.keys(): + word = "" + if remote.jdata["devices"][i]['type'] == "Flank": + word = "angle" + if remote.jdata["devices"][i]['type'] == "Engine": + word = "speed" + cmd += f"hw['{i}'].set_{word}({devices[i]})\n" + # loggin.info(cmd) + return cmd +def transmit(type, cmd): + msg = dict() + msg['type'] = type + msg['cmd'] = cmd + msg['time'] = time.asctime() + remote.jsend(msg) +def update(): + global remote_devices, sensors + remote.refresh() + types = {'Flank':'angle', 'Engine':'speed'} + for i in devices.keys(): + remote_devices[i] = remote.jdata['devices'][i][types[remote.jdata['devices'][i]['type']]] + for i in sensors.keys(): + sensors[i] = remote.jdata['sensors'][i] +def calc_pair2angle(dyna): + tan = (dyna[0][1] - dyna[1][1]) / (dyna[0][0] - dyna[1][0]) + return round(math.degrees(math.atan(tan)), 1) +def calc_langle(posx): + if (width / 2 - 30 < mousepos[0] < width / 2 + 30) and (height / 2 - 30 < mousepos[1] < height / 2 + 30): + global f_shootmode + f_shootmode = 1 + return 0 + f_shootmode = 0 + a = abs(posx - width / 2) / (width * 0.76) * lwidth # 底边 + b = vdist + angle_a = math.atan(a / b) + angle_a = math.degrees(angle_a) + if posx < width / 2: + angle_a = -angle_a + return angle_a +def calc_vangle(posy): + if (width / 2 - 30 < mousepos[0] < width / 2 + 30) and (height / 2 - 30 < mousepos[1] < height / 2 + 30): + global f_shootmode + f_shootmode = 1 + return 0 + f_shootmode = 0 + a = abs(posy - height / 2) / (height * 0.5) * vwidth # 底边 + b = vdist + angle_a = math.atan(a / b) + angle_a = math.degrees(angle_a) + if posy < height / 2: + angle_a = -angle_a + return angle_a +def calc_langle2(posx): + a = abs(posx - width / 2) / (width * 0.76) * lwidth # 底边 + b = vdist + angle_a = math.atan(a / b) + angle_a = math.degrees(angle_a) + if posx < width / 2: + angle_a = -angle_a + return angle_a +def calc_vangle2(posy): + a = abs(posy - height / 2) / (height * 0.5) * vwidth # 底边 + b = vdist + angle_a = math.atan(a / b) + angle_a = math.degrees(angle_a) + if posy < height / 2: + angle_a = -angle_a + return angle_a +def showtext(text, pos, color = 'main', fontid = "medium"): + db_text = fonts[fontid].render(text, 0 if fontid == "tiny" else 1, colors[color], None) + screen.blit(db_text, pos) +def showline(pos1, pos2, width = 2, color = 'main'): + pygame.draw.line(screen, colors[color], pos1, pos2, width) +def debug_ui(): + global sensors + showtext(f"远程方位角(Z): {sensors['zangle']}", (32, 844), fontid = "tiny") + showtext(f"远程滚动角(Y): {sensors['yangle']}", (32, 858), fontid = "tiny") + showtext(f"远程倾斜角(X): {sensors['xangle']}", (32, 872), fontid = "tiny") + showtext(f"本地滚动角(Y): {langle}", (32, 886), fontid = "tiny") + showtext(f"本地倾斜角(X): {vangle}", (32, 900), fontid = "tiny") + showtext(f"远程主机时间戳: {remote.jdata['time']}", (32, 914), fontid = "tiny") + showtext(f"航行灯状态: {'打开' if sensors['torch'] else '关闭'}", (32, 928), fontid = "tiny") + showtext(f"瞬时加速度: {sensors['acc']}", (32, 942), fontid = "tiny") + showtext(f"远程信号强度: {sensors['sign']}", (32, 956), fontid = "tiny") + showtext(f"远程 CPU 负载: {sensors['cpuload'] * 100} %", (32, 970), fontid = "tiny") + showtext(f"远程内存占用: {sensors['memload'] * 100} %", (32, 984), fontid = "tiny") + showtext(f"武装: 无", (32, 998), fontid = "tiny") + showtext(f"挂载: 无", (32, 1012), fontid = "tiny") +def warn_layer(text): + if 3 < flamenum % 15 < 12: + showtext(text, (width / 2 - fonts['huge'].render(text, True, colors['warn'], None).get_width() / 2, height / 2 - 150), color=colors['warn'], fontid="huge") +def text_layer(): + global sensors, remote_devices + showtext(f"远程主机: {remote.name}", (0, 0)) + showtext(f"虚拟视距: {vdist}cm", (0, 24)) + showtext(f"水平指示器宽度: {lwidth}cm", (0, 48)) + showtext(f"竖直指示器宽度: {round(vwidth)}cm", (0, 72)) + oriword = {0: "[N]", 90: "[E]", 180: "[S]", 270: "[W]"} + for i in range(0, 13): + showtext(f'{str(oriword[n] if (n := round(abs(calc_langle2(width * 0.12 + width * 0.76 * i / 12) + sensors["zangle"]))) in oriword.keys() else n).zfill(2)}', + (width * 0.12 + width * 0.76 * i / 12 - 12, height * 0.1 + width * 0.01)) + + for i in range(0, 9): + showtext(f'{str(round(abs(calc_vangle2(height * 0.25 + height * 0.5 * i / 8) - sensors["yangle"]))).zfill(2)}', + (width * 0.926, height * 0.25 + height * 0.49 * i / 8 - 3), fontid="small") + showtext(f'{str(round(abs(calc_vangle2(height * 0.25 + height * 0.5 * i / 8) - sensors["yangle"]))).zfill(2)}', + (width * 0.064, height * 0.25 + height * 0.49 * i / 8 - 3), fontid="small") + + showtext("右副翼 >", (width * 0.95, height * 0.81), fontid="tiny") + showtext(f"{devices['aile_right']}/{round(remote_devices['aile_right'], 1)}", (width * 0.955, height * 0.825), fontid="tiny") + showtext("右尾翼 >", (width * 0.95, height * 0.93), fontid="tiny") + showtext(f"{devices['tail_right']}/{round(remote_devices['tail_right'], 1)}", (width * 0.955, height * 0.915), fontid="tiny") + showtext("左副翼 >", (width * 0.92, height * 0.81), fontid="tiny") + showtext(f"{devices['aile_left']}/{round(remote_devices['aile_left'], 1)}", (width * 0.925, height * 0.825), fontid="tiny") + showtext("左尾翼 >", (width * 0.92, height * 0.93), fontid="tiny") + showtext(f"{devices['tail_left']}/{round(remote_devices['tail_left'], 1)}", (width * 0.925, height * 0.915), fontid="tiny") + showtext("转速", (width * 0.874, height * 0.923), fontid="tiny") + showtext(f"{str(devices['engine_main']).zfill(3)}", (width * 0.8745, height * 0.938), fontid="tiny") + showtext("空速", (width * 0.844, height * 0.923), fontid="tiny") + showtext(f"{str(sensors['speed']).zfill(3)}", (width * 0.8445, height * 0.938), fontid="tiny") + showtext("能量", (width * 0.814, height * 0.923), fontid="tiny") + showtext(f"{str(sensors['battery'] * 100).zfill(2)}%", (width * 0.8145, height * 0.938), fontid="tiny") +def netproc(): + while running: + update() + transmit("cmd", cmdgen()) + time.sleep(0.05) + +if __name__ == "__main__": + + vdist = 30 # 虚拟视距 + lwidth = 60 # 水平指示器距离 + vwidth = lwidth * 0.5 / 0.76 # 竖直指示器距离 + # 网络初始化 + remote = lib.Net(dest = "127.0.0.1", port = 40808) + # 数据初始化 + devices = dict() + vangle = None + langle = None + for i in remote.jdata['devices'].keys(): + devices[i] = 0 + remote_devices = dict() + for i in remote.jdata['devices'].keys(): + remote_devices[i] = 0 + sensors = dict() + for i in remote.jdata['sensors'].keys(): + sensors[i] = 0 + name = remote.name + # 摄像头初始化 + if camera_enabled: + camera = cv2.VideoCapture(1) + # 图形初始化 + pygame.init() + pygame.display.set_caption(f"Azure 远程终端 - 连接到 {name}") + flamenum = 0 + colors = {"main":"white", "remote":"green", "warn":"red"} + fonts = {"medium":pygame.font.Font('src/font.ttf', 20), "big":pygame.font.Font('src/font.ttf', 40), "huge":pygame.font.Font('src/font.ttf', 60), "small":pygame.font.Font('src/font.ttf', 16), "tiny":pygame.font.Font('src/font.ttf', 12)} + + height = 1080 + width = int(height / 9 * 16) + heightp = 1080 + widthp = int(height / 9 * 16) + + screen = pygame.display.set_mode((width, height), RESIZABLE) + icon = pygame.image.load("src/icon.ico").convert_alpha() + pygame.display.set_icon(icon) + clock = pygame.time.Clock() + running = True + pygame.mouse.set_visible(False) + pygame.mouse.set_pos([width / 2, height / 2]) + f_shootmode = 0 + + #welcome() + isfullscreen = 0 + camera_refresh() + mousepos = (width / 2, height / 2) + np = threading.Thread(target=netproc, name='NetworkProcessing') + np.start() + while running: + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + if event.type == pygame.MOUSEMOTION: + #loggin.info(mousepos) + #loggin.info("鼠标: ", event.pos[0] - mousepos[0], event.pos[1] - mousepos[1]) + mousepos = event.pos + if event.type == pygame.VIDEORESIZE: + height = event.size[1] + width = event.size[0] + screen = pygame.display.set_mode((width, height), RESIZABLE) + flamenum += 1 + if flamenum % 2 == 1: + camera_refresh() + screen.blit(frame_surface, (0, 0)) + # 计算 + vangle = calc_vangle(mousepos[1]) + langle = calc_langle(mousepos[0]) + # 常态指示器 + showline((width * 0.08, height * 0.75), (width * 0.08, height * 0.25), 2) # 高度指示 + showline((width * 0.92, height * 0.75), (width * 0.92, height * 0.25), 2) # 高度指示 + showline((width * 0.97, height * 0.86), (width * 0.95, height * 0.86), 2) # 翼面指示 1 + showline((width * 0.97, height * 0.90), (width * 0.95, height * 0.90), 2) # 翼面指示 2 + showline((width * 0.94, height * 0.86), (width * 0.92, height * 0.86), 2) # 翼面指示 1 + showline((width * 0.94, height * 0.90), (width * 0.92, height * 0.90), 2) # 翼面指示 2 + showline((width * 0.88, height * 0.1), (width * 0.12, height * 0.1), 2) # 方向指示 + showline((width / 2 - 20, height / 2), (width / 2 + 20, height / 2), 1 if not f_shootmode else 3, 'main' if not f_shootmode else 'warn') # 准星指示 + showline((width / 2, height / 2 + 20), (width / 2, height / 2 - 20), 1 if not f_shootmode else 3, 'main' if not f_shootmode else 'warn') # 准星指示 + + for i in range(0, 9): + showline((width * 0.92, height * 0.25 + height * 0.5 * i / 8), (width * 0.91, height * 0.25 + height * 0.5 * i / 8), 2) # 高度指示 + showline((width * 0.08, height * 0.25 + height * 0.5 * i / 8), (width * 0.09, height * 0.25 + height * 0.5 * i / 8), 2) # 高度指示 + + for i in range(0, 13): + showline((width * 0.12 + width * 0.76 * i / 12, height * 0.1), (width * 0.12 + width * 0.76 * i / 12, height * 0.1 + width * 0.01), 2) # 水平指示 + + # 鼠标复位 + # pygame.mouse.set_pos([width / 2, height / 2]) + + # 指示器 + dyna1 = ((width * 0.97, height * 0.86 - width * 0.01 * math.tan(math.radians(vangle)) + width * 0.01 * math.tan(math.radians(langle))), + (width * 0.95, height * 0.86 + width * 0.01 * math.tan(math.radians(vangle)) - width * 0.01 * math.tan(math.radians(langle)))) + dyna2 = ((width * 0.97, height * 0.90 - width * 0.01 * math.tan(math.radians(vangle)) + width * 0.01 * math.tan(math.radians(langle))), + (width * 0.95, height * 0.90 + width * 0.01 * math.tan(math.radians(vangle)) - width * 0.01 * math.tan(math.radians(langle)))) + dyna3 = ((width * 0.94, height * 0.86 - width * 0.01 * math.tan(math.radians(vangle)) - width * 0.01 * math.tan(math.radians(langle))), + (width * 0.92, height * 0.86 + width * 0.01 * math.tan(math.radians(vangle)) + width * 0.01 * math.tan(math.radians(langle)))) + dyna4 = ((width * 0.94, height * 0.90 - width * 0.01 * math.tan(math.radians(vangle)) - width * 0.01 * math.tan(math.radians(langle))), + (width * 0.92, height * 0.90 + width * 0.01 * math.tan(math.radians(vangle)) + width * 0.01 * math.tan(math.radians(langle)))) + + + showline(dyna1[0], dyna1[1], 2) # 翼面指示 1 + showline(dyna2[0], dyna2[1], 2) # 翼面指示 2 + showline(dyna3[0], dyna3[1], 2) # 翼面指示 1 + showline(dyna4[0], dyna4[1], 2) # 翼面指示 2 + + showline((width * 0.97, height * 0.86 + math.tan(math.radians(remote_devices['aile_left'])) * 0.01 * width), + (width * 0.95, height * 0.86 - math.tan(math.radians(remote_devices['aile_left'])) * 0.01 * width), 2, color="remote") # 翼面指示 1 + showline((width * 0.97, height * 0.90 + math.tan(math.radians(remote_devices['aile_right'])) * 0.01 * width), + (width * 0.95, height * 0.90 - math.tan(math.radians(remote_devices['aile_right'])) * 0.01 * width), 2, color="remote") # 翼面指示 1 + showline((width * 0.94, height * 0.86 + math.tan(math.radians(remote_devices['tail_left'])) * 0.01 * width), + (width * 0.92, height * 0.86 - math.tan(math.radians(remote_devices['tail_left'])) * 0.01 * width), 2, color="remote") # 翼面指示 1 + showline((width * 0.94, height * 0.90 + math.tan(math.radians(remote_devices['tail_right'])) * 0.01 * width), + (width * 0.92, height * 0.90 - math.tan(math.radians(remote_devices['tail_right'])) * 0.01 * width), 2, color="remote") # 翼面指示 1 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - devices['engine_main']), 20, color="main") # 转速指示 1 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - remote_devices['engine_main']), 16, color="remote") # 转速指示 2 + showline((width * 0.849, height * 0.92), (width * 0.849, height * 0.92 - sensors["speed"]), 20, color="remote") # 空速指示 1 + showline((width * 0.82, height * 0.92), (width * 0.82, height * 0.92 - 100), 20, color="main") # 能量指示 2 + showline((width * 0.82, height * 0.92), (width * 0.82, height * 0.92 - sensors["battery"] * 100), 16, color="remote") # 能量指示 1 + + devices['aile_left'] = calc_pair2angle(dyna1) + devices['aile_right'] = calc_pair2angle(dyna2) + devices['tail_left'] = calc_pair2angle(dyna3) + devices['tail_right'] = calc_pair2angle(dyna4) + + text_layer() + # 键盘事件 + keys = pygame.key.get_pressed() + if keys[pygame.K_ESCAPE]: + running = 0 + if keys[pygame.K_r]: + pygame.mouse.set_pos([width / 2, height / 2]) + if keys[pygame.K_w]: + pygame.mouse.set_pos([mousepos[0], mousepos[1] + 3]) + if keys[pygame.K_s]: + pygame.mouse.set_pos([mousepos[0], mousepos[1] - 3]) + if keys[pygame.K_a]: + pygame.mouse.set_pos([mousepos[0] - 3, mousepos[1]]) + if keys[pygame.K_d]: + pygame.mouse.set_pos([mousepos[0] + 3, mousepos[1]]) + if keys[pygame.K_LSHIFT]: + devices['engine_main'] += 1 + if keys[pygame.K_LCTRL]: + devices['engine_main'] -= 1 + if keys[pygame.K_1]: + warn_layer("失速警告") + if keys[pygame.K_2]: + warn_layer("失控") + if keys[pygame.K_3]: + warn_layer("通讯异常") + if keys[pygame.K_4]: + warn_layer("不明硬件损毁") + if keys[pygame.K_5]: + warn_layer("紧急重启") + if keys[pygame.K_6]: + warn_layer("紧急停机") + if keys[pygame.K_7]: + warn_layer("紧急关闭") + if keys[pygame.K_8]: + warn_layer("紧急过载") + if keys[pygame.K_9]: + warn_layer("操纵面失控") + if keys[pygame.K_0]: + warn_layer("引擎失控") + if keys[pygame.K_F11]: + if not isfullscreen: + isfullscreen = True + width = widthp + height = heightp + SIZE = width, height = pygame.display.list_modes()[0] + screen = pygame.display.set_mode(SIZE, FULLSCREEN) + else: + isfullscreen = False + screen = pygame.display.set_mode((width, height), RESIZABLE) + if keys[pygame.K_b]: + colors["main"] = "white" + if keys[pygame.K_n]: + colors["main"] = "blue" + if keys[pygame.K_m]: + colors["main"] = "green" + if keys[pygame.K_v]: + colors["main"] = "red" + # 文字显示 + # debug + debug_ui() + pygame.draw.circle(screen, colors['main' if not f_shootmode else 'warn'], mousepos, 2) + pygame.draw.circle(screen, colors['main'], mousepos, 20, 3) if not f_shootmode else '' + # flip() the display to put your work on screen + pygame.display.flip() + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - devices['engine_main']), 20) # 转速指示 1 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - remote_devices['engine_main']), 16, color="remote") # 转速指示 2 + showline((width * 0.849, height * 0.92), (width * 0.849, height * 0.92 - sensors['speed']), 20, color="remote") # 空速指示 2 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - devices['engine_main']), 20) # 转速指示 1 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - remote_devices['engine_main']), 16, color="remote") # 转速指示 2 + showline((width * 0.849, height * 0.92), (width * 0.849, height * 0.92 - sensors['speed']), 20, color="remote") # 空速指示 2 + + dt = clock.tick(60) / 1000 + + np.join() + pygame.quit() diff --git a/testfield/legacy/client/legacy/client_gui_legacy copy.py b/testfield/legacy/client/legacy/client_gui_legacy copy.py new file mode 100644 index 0000000..a24ee3e --- /dev/null +++ b/testfield/legacy/client/legacy/client_gui_legacy copy.py @@ -0,0 +1,191 @@ +from os import environ +environ['pg_HIDE_SUPPORT_PROMPT'] = '1' +import pygame as pg +def st(text, pos, color = 'main', fontid = "medium"): + db_text = fonts[fontid].render(text, 0 if fontid == "tiny" else 1, colors[color], None) + screen.blit(db_text, pos) +def sl(pos1, pos2, w = 2, color = 'main'): + pg.draw.line(screen, colors[color], pos1, pos2, w) +def debug_ui(): + global sensors + st(f"远程方位角(Z): {sensors['zangle']}", (32, 844), fontid = "tiny") + st(f"远程滚动角(Y): {sensors['yangle']}", (32, 858), fontid = "tiny") + st(f"远程倾斜角(X): {sensors['xangle']}", (32, 872), fontid = "tiny") + st(f"本地滚动角(Y): {langle}", (32, 886), fontid = "tiny") + st(f"本地倾斜角(X): {vangle}", (32, 900), fontid = "tiny") +def warn_layer(text): + if 3 < flamenum % 15 < 12: + st(text, (w / 2 - fonts['huge'].render(text, True, colors['warn'], None).get_w() / 2, h / 2 - 150), color=colors['warn'], fontid="huge") +def text_layer(): + global sensors, rdev + st(f"远程主机: {remote.name}", (0, 0)) + st(f"虚拟视距: {vdist}cm", (0, 24)) + st(f"水平指示器宽度: {lw}cm", (0, 48)) + st(f"竖直指示器宽度: {round(vw)}cm", (0, 72)) + oriword = {0: "[N]", 90: "[E]", 180: "[S]", 270: "[W]"} + for i in range(0, 13): + st(f'{str(oriword[n] if (n := round(abs(calc_langle2(w * 0.12 + w * 0.76 * i / 12) + sensors["zangle"]))) in oriword.keys() else n).zfill(2)}', + (w * 0.12 + w * 0.76 * i / 12 - 12, h * 0.1 + w * 0.01)) + + for i in range(0, 9): + st(f'{str(round(abs(calc_vangle2(h * 0.25 + h * 0.5 * i / 8) - sensors["yangle"]))).zfill(2)}', + (w * 0.926, h * 0.25 + h * 0.49 * i / 8 - 3), fontid="small") + st(f'{str(round(abs(calc_vangle2(h * 0.25 + h * 0.5 * i / 8) - sensors["yangle"]))).zfill(2)}', + (w * 0.064, h * 0.25 + h * 0.49 * i / 8 - 3), fontid="small") + + st("右副翼 >", (w * 0.95, h * 0.81), fontid="tiny") + st(f"{dev['aile_right']}/{round(rdev['aile_right'], 1)}", (w * 0.955, h * 0.825), fontid="tiny") + st("右尾翼 >", (w * 0.95, h * 0.93), fontid="tiny") + st(f"{dev['tail_right']}/{round(rdev['tail_right'], 1)}", (w * 0.955, h * 0.915), fontid="tiny") + st("左副翼 >", (w * 0.92, h * 0.81), fontid="tiny") + st(f"{dev['aile_left']}/{round(rdev['aile_left'], 1)}", (w * 0.925, h * 0.825), fontid="tiny") + st("左尾翼 >", (w * 0.92, h * 0.93), fontid="tiny") + st(f"{dev['tail_left']}/{round(rdev['tail_left'], 1)}", (w * 0.925, h * 0.915), fontid="tiny") + st("转速", (w * 0.874, h * 0.923), fontid="tiny") + st(f"{str(dev['engine_main']).zfill(3)}", (w * 0.8745, h * 0.938), fontid="tiny") + st("空速", (w * 0.844, h * 0.923), fontid="tiny") + st(f"{str(sensors['speed']).zfill(3)}", (w * 0.8445, h * 0.938), fontid="tiny") + st("能量", (w * 0.814, h * 0.923), fontid="tiny") + st(f"{str(sensors['battery'] * 100).zfill(2)}%", (w * 0.8145, h * 0.938), fontid="tiny") +def netproc(): + while running: + update() + transmit("cmd", cmdgen()) + time.sleep(0.05) + +if __name__ == "__main__": + + vdist = 30 # 虚拟视距 + lw = 60 # 水平指示器距离 + vw = lw * 0.5 / 0.76 # 竖直指示器距离 + pg.init() + pg.display.set_caption(f"Azure 远程终端 - 连接到 {name}") + flamenum = 0 + colors = {"main":"white", "remote":"green", "warn":"red"} + fonts = {"medium":pg.font.Font('src/font.ttf', 20), "big":pg.font.Font('src/font.ttf', 40), "huge":pg.font.Font('src/font.ttf', 60), "small":pg.font.Font('src/font.ttf', 16), "tiny":pg.font.Font('src/font.ttf', 12)} + + h = 1080 + w = int(h / 9 * 16) + hp = 1080 + wp = int(h / 9 * 16) + + screen = pg.display.set_mode((w, h), RESIZABLE) + icon = pg.image.load("src/icon.ico").convert_alpha() + pg.display.set_icon(icon) + clock = pg.time.Clock() + running = True + pg.mouse.set_visible(False) + pg.mouse.set_pos([w / 2, h / 2]) + f_shootmode = 0 + + #welcome() + isfullscreen = 0 + mousepos = (w / 2, h / 2) + np = threading.Thread(target=netproc, name='NetworkProcessing') + np.start() + while running: + for event in pg.event.get(): + if event.type == pg.QUIT: + running = False + if event.type == pg.MOUSEMOTION: + #loggin.info(mousepos) + #loggin.info("鼠标: ", event.pos[0] - mousepos[0], event.pos[1] - mousepos[1]) + mousepos = event.pos + if event.type == pg.VIDEORESIZE: + h = event.size[1] + w = event.size[0] + screen = pg.display.set_mode((w, h), RESIZABLE) + flamenum += 1 + if flamenum % 2 == 1: + camera_refresh() + screen.blit(frame_surface, (0, 0)) + # 计算 + vangle = calc_vangle(mousepos[1]) + langle = calc_langle(mousepos[0]) + # 常态指示器 + sl((w * 0.08, h * 0.75), (w * 0.08, h * 0.25), 2) # 高度指示 + sl((w * 0.92, h * 0.75), (w * 0.92, h * 0.25), 2) # 高度指示 + sl((w * 0.97, h * 0.86), (w * 0.95, h * 0.86), 2) # 翼面指示 1 + sl((w * 0.97, h * 0.90), (w * 0.95, h * 0.90), 2) # 翼面指示 2 + sl((w * 0.94, h * 0.86), (w * 0.92, h * 0.86), 2) # 翼面指示 1 + sl((w * 0.94, h * 0.90), (w * 0.92, h * 0.90), 2) # 翼面指示 2 + sl((w * 0.88, h * 0.1), (w * 0.12, h * 0.1), 2) # 方向指示 + sl((w / 2 - 20, h / 2), (w / 2 + 20, h / 2), 1 if not f_shootmode else 3, 'main' if not f_shootmode else 'warn') # 准星指示 + sl((w / 2, h / 2 + 20), (w / 2, h / 2 - 20), 1 if not f_shootmode else 3, 'main' if not f_shootmode else 'warn') # 准星指示 + + for i in range(0, 9): + sl((w * 0.92, h * 0.25 + h * 0.5 * i / 8), (w * 0.91, h * 0.25 + h * 0.5 * i / 8), 2) # 高度指示 + sl((w * 0.08, h * 0.25 + h * 0.5 * i / 8), (w * 0.09, h * 0.25 + h * 0.5 * i / 8), 2) # 高度指示 + + for i in range(0, 13): + sl((w * 0.12 + w * 0.76 * i / 12, h * 0.1), (w * 0.12 + w * 0.76 * i / 12, h * 0.1 + w * 0.01), 2) # 水平指示 + + # 鼠标复位 + # pg.mouse.set_pos([w / 2, h / 2]) + + # 指示器 + dya1 = ((w * 0.97, h * 0.86 - w * 0.01 * tan(rad(vangle)) + w * 0.01 * tan(rad(langle))), + (w * 0.95, h * 0.86 + w * 0.01 * tan(rad(vangle)) - w * 0.01 * tan(rad(langle)))) + dya2 = ((w * 0.97, h * 0.90 - w * 0.01 * tan(rad(vangle)) + w * 0.01 * tan(rad(langle))), + (w * 0.95, h * 0.90 + w * 0.01 * tan(rad(vangle)) - w * 0.01 * tan(rad(langle)))) + + + sl(dya1[0], dya1[1], 2) # 翼面指示 1 + sl(dya2[0], dya2[1], 2) # 翼面指示 2 + sl(dya3[0], dya3[1], 2) # 翼面指示 1 + sl(dya4[0], dya4[1], 2) # 翼面指示 2 + + sl((w * 0.97, h * 0.86 + tan(rad(rdev['aile_left'])) * 0.01 * w), + (w * 0.92, h * 0.90 - tan(rad(rdev['tail_right'])) * 0.01 * w), 2, color="remote") # 翼面指示 1 + sl((w * 0.88, h * 0.92), (w * 0.88, h * 0.92 - dev['engine_main']), 20, color="main") # 转速指示 1 + sl((w * 0.82, h * 0.92), (w * 0.82, h * 0.92 - sensors["battery"] * 100), 16, color="remote") # 能量指示 1 + + dev['aile_left'] = calc_pair2angle(dya1) + dev['aile_right'] = calc_pair2angle(dya2) + dev['tail_left'] = calc_pair2angle(dya3) + dev['tail_right'] = calc_pair2angle(dya4) + + text_layer() + # 键盘事件 + keys = pg.key.get_pressed() + if keys[pg.K_ESCAPE]: + running = 0 + if keys[pg.K_r]: + pg.mouse.set_pos([w / 2, h / 2]) + if keys[pg.K_w]: + pg.mouse.set_pos([mousepos[0], mousepos[1] + 3]) + if keys[pg.K_s]: + pg.mouse.set_pos([mousepos[0], mousepos[1] - 3]) + if keys[pg.K_a]: + pg.mouse.set_pos([mousepos[0] - 3, mousepos[1]]) + if keys[pg.K_d]: + pg.mouse.set_pos([mousepos[0] + 3, mousepos[1]]) + if keys[pg.K_LSHIFT]: + dev['engine_main'] += 1 + if keys[pg.K_LCTRL]: + dev['engine_main'] -= 1 + if keys[pg.K_1]: + warn_layer("失速警告") + if keys[pg.K_F11]: + if not isfullscreen: + isfullscreen = True + w = wp + h = hp + SIZE = w, h = pg.display.list_modes()[0] + screen = pg.display.set_mode(SIZE, FULLSCREEN) + else: + isfullscreen = False + screen = pg.display.set_mode((w, h), RESIZABLE) + # 文字显示 + # debug + debug_ui() + pg.draw.circle(screen, colors['main' if not f_shootmode else 'warn'], mousepos, 2) + pg.draw.circle(screen, colors['main'], mousepos, 20, 3) if not f_shootmode else '' + # flip() the display to put your work on screen + pg.display.flip() + sl((w * 0.88, h * 0.92), (w * 0.88, h * 0.92 - dev['engine_main']), 20) # 转速指示 1 + + dt = clock.tick(60) / 1000 + + np.join() + pg.quit() diff --git a/testfield/legacy/client/legacy/client_gui_legacy.py b/testfield/legacy/client/legacy/client_gui_legacy.py new file mode 100644 index 0000000..4979ac0 --- /dev/null +++ b/testfield/legacy/client/legacy/client_gui_legacy.py @@ -0,0 +1,363 @@ +from os import environ +environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1' +import pygame +from pygame.locals import * +import cv2 +import client_lib as lib +import math +import time +import threading +import logging + +camera_enabled = 0 + +def camera_refresh(): + global frame_surface + if camera_enabled: + frame = camera.read()[1] + frame_rgb = cv2.resize(cv2.flip(cv2.rotate(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), cv2.ROTATE_90_CLOCKWISE), 1), (height, width)) + frame_surface = pygame.surfarray.make_surface(frame_rgb) + else: + frame_surface = pygame.Surface((width, height), pygame.SRCALPHA) + frame_surface.fill((0, 0, 0, 250)) # 将表面填充为完全透明 + +def welcome(): + for i in range(0, 120): + screen.fill("black") + text = fonts['huge'].render("Initialize Azure Network BVLOS Flighting Controlling System", True, colors['main'], None) + screen.blit(text, (width / 2 - text.get_width() / 2, height / 2 - 150)) + text = fonts['big'].render(f"Remote connection: {remote.name}", True, colors['main'], None) + screen.blit(text, (width / 2 - text.get_width() / 2, height / 2)) + pygame.display.flip() + clock.tick(60) +def cmdgen(): + global devices + cmd = "" + for i in devices.keys(): + word = "" + if remote.jdata["devices"][i]['type'] == "Flank": + word = "angle" + if remote.jdata["devices"][i]['type'] == "Engine": + word = "speed" + cmd += f"hw['{i}'].set_{word}({devices[i]})\n" + # loggin.info(cmd) + return cmd +def transmit(type, cmd): + msg = dict() + msg['type'] = type + msg['cmd'] = cmd + msg['time'] = time.asctime() + remote.jsend(msg) +def update(): + global remote_devices, sensors + remote.refresh() + types = {'Flank':'angle', 'Engine':'speed'} + for i in devices.keys(): + remote_devices[i] = remote.jdata['devices'][i][types[remote.jdata['devices'][i]['type']]] + for i in sensors.keys(): + sensors[i] = remote.jdata['sensors'][i] +def calc_pair2angle(dyna): + tan = (dyna[0][1] - dyna[1][1]) / (dyna[0][0] - dyna[1][0]) + return round(math.degrees(math.atan(tan)), 1) +def calc_langle(posx): + if (width / 2 - 30 < mousepos[0] < width / 2 + 30) and (height / 2 - 30 < mousepos[1] < height / 2 + 30): + global f_shootmode + f_shootmode = 1 + return 0 + f_shootmode = 0 + a = abs(posx - width / 2) / (width * 0.76) * lwidth # 底边 + b = vdist + angle_a = math.atan(a / b) + angle_a = math.degrees(angle_a) + if posx < width / 2: + angle_a = -angle_a + return angle_a +def calc_vangle(posy): + if (width / 2 - 30 < mousepos[0] < width / 2 + 30) and (height / 2 - 30 < mousepos[1] < height / 2 + 30): + global f_shootmode + f_shootmode = 1 + return 0 + f_shootmode = 0 + a = abs(posy - height / 2) / (height * 0.5) * vwidth # 底边 + b = vdist + angle_a = math.atan(a / b) + angle_a = math.degrees(angle_a) + if posy < height / 2: + angle_a = -angle_a + return angle_a +def calc_langle2(posx): + a = abs(posx - width / 2) / (width * 0.76) * lwidth # 底边 + b = vdist + angle_a = math.atan(a / b) + angle_a = math.degrees(angle_a) + if posx < width / 2: + angle_a = -angle_a + return angle_a +def calc_vangle2(posy): + a = abs(posy - height / 2) / (height * 0.5) * vwidth # 底边 + b = vdist + angle_a = math.atan(a / b) + angle_a = math.degrees(angle_a) + if posy < height / 2: + angle_a = -angle_a + return angle_a +def showtext(text, pos, color = 'main', fontid = "medium"): + db_text = fonts[fontid].render(text, 0 if fontid == "tiny" else 1, colors[color], None) + screen.blit(db_text, pos) +def showline(pos1, pos2, width = 2, color = 'main'): + pygame.draw.line(screen, colors[color], pos1, pos2, width) +def debug_ui(): + global sensors + showtext(f"远程方位角(Z): {sensors['zangle']}", (32, 844), fontid = "tiny") + showtext(f"远程滚动角(Y): {sensors['yangle']}", (32, 858), fontid = "tiny") + showtext(f"远程倾斜角(X): {sensors['xangle']}", (32, 872), fontid = "tiny") + showtext(f"本地滚动角(Y): {langle}", (32, 886), fontid = "tiny") + showtext(f"本地倾斜角(X): {vangle}", (32, 900), fontid = "tiny") + showtext(f"远程主机时间戳: {remote.jdata['time']}", (32, 914), fontid = "tiny") + showtext(f"航行灯状态: {'打开' if sensors['torch'] else '关闭'}", (32, 928), fontid = "tiny") + showtext(f"瞬时加速度: {sensors['acc']}", (32, 942), fontid = "tiny") + showtext(f"远程信号强度: {sensors['sign']}", (32, 956), fontid = "tiny") + showtext(f"远程 CPU 负载: {sensors['cpuload'] * 100} %", (32, 970), fontid = "tiny") + showtext(f"远程内存占用: {sensors['memload'] * 100} %", (32, 984), fontid = "tiny") + showtext(f"武装: 无", (32, 998), fontid = "tiny") + showtext(f"挂载: 无", (32, 1012), fontid = "tiny") +def warn_layer(text): + if 3 < flamenum % 15 < 12: + showtext(text, (width / 2 - fonts['huge'].render(text, True, colors['warn'], None).get_width() / 2, height / 2 - 150), color=colors['warn'], fontid="huge") +def text_layer(): + global sensors, remote_devices + showtext(f"远程主机: {remote.name}", (0, 0)) + showtext(f"虚拟视距: {vdist}cm", (0, 24)) + showtext(f"水平指示器宽度: {lwidth}cm", (0, 48)) + showtext(f"竖直指示器宽度: {round(vwidth)}cm", (0, 72)) + oriword = {0: "[N]", 90: "[E]", 180: "[S]", 270: "[W]"} + for i in range(0, 13): + showtext(f'{str(oriword[n] if (n := round(abs(calc_langle2(width * 0.12 + width * 0.76 * i / 12) + sensors["zangle"]))) in oriword.keys() else n).zfill(2)}', + (width * 0.12 + width * 0.76 * i / 12 - 12, height * 0.1 + width * 0.01)) + + for i in range(0, 9): + showtext(f'{str(round(abs(calc_vangle2(height * 0.25 + height * 0.5 * i / 8) - sensors["yangle"]))).zfill(2)}', + (width * 0.926, height * 0.25 + height * 0.49 * i / 8 - 3), fontid="small") + showtext(f'{str(round(abs(calc_vangle2(height * 0.25 + height * 0.5 * i / 8) - sensors["yangle"]))).zfill(2)}', + (width * 0.064, height * 0.25 + height * 0.49 * i / 8 - 3), fontid="small") + + showtext("右副翼 >", (width * 0.95, height * 0.81), fontid="tiny") + showtext(f"{devices['aile_right']}/{round(remote_devices['aile_right'], 1)}", (width * 0.955, height * 0.825), fontid="tiny") + showtext("右尾翼 >", (width * 0.95, height * 0.93), fontid="tiny") + showtext(f"{devices['tail_right']}/{round(remote_devices['tail_right'], 1)}", (width * 0.955, height * 0.915), fontid="tiny") + showtext("左副翼 >", (width * 0.92, height * 0.81), fontid="tiny") + showtext(f"{devices['aile_left']}/{round(remote_devices['aile_left'], 1)}", (width * 0.925, height * 0.825), fontid="tiny") + showtext("左尾翼 >", (width * 0.92, height * 0.93), fontid="tiny") + showtext(f"{devices['tail_left']}/{round(remote_devices['tail_left'], 1)}", (width * 0.925, height * 0.915), fontid="tiny") + showtext("转速", (width * 0.874, height * 0.923), fontid="tiny") + showtext(f"{str(devices['engine_main']).zfill(3)}", (width * 0.8745, height * 0.938), fontid="tiny") + showtext("空速", (width * 0.844, height * 0.923), fontid="tiny") + showtext(f"{str(sensors['speed']).zfill(3)}", (width * 0.8445, height * 0.938), fontid="tiny") + showtext("能量", (width * 0.814, height * 0.923), fontid="tiny") + showtext(f"{str(sensors['battery'] * 100).zfill(2)}%", (width * 0.8145, height * 0.938), fontid="tiny") +def netproc(): + while running: + update() + transmit("cmd", cmdgen()) + time.sleep(0.05) + +if __name__ == "__main__": + + vdist = 30 # 虚拟视距 + lwidth = 60 # 水平指示器距离 + vwidth = lwidth * 0.5 / 0.76 # 竖直指示器距离 + # 网络初始化 + remote = lib.Net(dest = "127.0.0.1", port = 40808) + # 数据初始化 + devices = dict() + vangle = None + langle = None + for i in remote.jdata['devices'].keys(): + devices[i] = 0 + remote_devices = dict() + for i in remote.jdata['devices'].keys(): + remote_devices[i] = 0 + sensors = dict() + for i in remote.jdata['sensors'].keys(): + sensors[i] = 0 + name = remote.name + # 摄像头初始化 + if camera_enabled: + camera = cv2.VideoCapture(1) + # 图形初始化 + pygame.init() + pygame.display.set_caption(f"Azure 远程终端 - 连接到 {name}") + flamenum = 0 + colors = {"main":"white", "remote":"green", "warn":"red"} + fonts = {"medium":pygame.font.Font('src/font.ttf', 20), "big":pygame.font.Font('src/font.ttf', 40), "huge":pygame.font.Font('src/font.ttf', 60), "small":pygame.font.Font('src/font.ttf', 16), "tiny":pygame.font.Font('src/font.ttf', 12)} + + height = 1080 + width = int(height / 9 * 16) + heightp = 1080 + widthp = int(height / 9 * 16) + + screen = pygame.display.set_mode((width, height), RESIZABLE) + icon = pygame.image.load("src/icon.ico").convert_alpha() + pygame.display.set_icon(icon) + clock = pygame.time.Clock() + running = True + pygame.mouse.set_visible(False) + pygame.mouse.set_pos([width / 2, height / 2]) + f_shootmode = 0 + + #welcome() + isfullscreen = 0 + camera_refresh() + mousepos = (width / 2, height / 2) + np = threading.Thread(target=netproc, name='NetworkProcessing') + np.start() + while running: + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + if event.type == pygame.MOUSEMOTION: + #loggin.info(mousepos) + #loggin.info("鼠标: ", event.pos[0] - mousepos[0], event.pos[1] - mousepos[1]) + mousepos = event.pos + if event.type == pygame.VIDEORESIZE: + height = event.size[1] + width = event.size[0] + screen = pygame.display.set_mode((width, height), RESIZABLE) + flamenum += 1 + if flamenum % 2 == 1: + camera_refresh() + screen.blit(frame_surface, (0, 0)) + # 计算 + vangle = calc_vangle(mousepos[1]) + langle = calc_langle(mousepos[0]) + # 常态指示器 + showline((width * 0.08, height * 0.75), (width * 0.08, height * 0.25), 2) # 高度指示 + showline((width * 0.92, height * 0.75), (width * 0.92, height * 0.25), 2) # 高度指示 + showline((width * 0.97, height * 0.86), (width * 0.95, height * 0.86), 2) # 翼面指示 1 + showline((width * 0.97, height * 0.90), (width * 0.95, height * 0.90), 2) # 翼面指示 2 + showline((width * 0.94, height * 0.86), (width * 0.92, height * 0.86), 2) # 翼面指示 1 + showline((width * 0.94, height * 0.90), (width * 0.92, height * 0.90), 2) # 翼面指示 2 + showline((width * 0.88, height * 0.1), (width * 0.12, height * 0.1), 2) # 方向指示 + showline((width / 2 - 20, height / 2), (width / 2 + 20, height / 2), 1 if not f_shootmode else 3, 'main' if not f_shootmode else 'warn') # 准星指示 + showline((width / 2, height / 2 + 20), (width / 2, height / 2 - 20), 1 if not f_shootmode else 3, 'main' if not f_shootmode else 'warn') # 准星指示 + + for i in range(0, 9): + showline((width * 0.92, height * 0.25 + height * 0.5 * i / 8), (width * 0.91, height * 0.25 + height * 0.5 * i / 8), 2) # 高度指示 + showline((width * 0.08, height * 0.25 + height * 0.5 * i / 8), (width * 0.09, height * 0.25 + height * 0.5 * i / 8), 2) # 高度指示 + + for i in range(0, 13): + showline((width * 0.12 + width * 0.76 * i / 12, height * 0.1), (width * 0.12 + width * 0.76 * i / 12, height * 0.1 + width * 0.01), 2) # 水平指示 + + # 鼠标复位 + # pygame.mouse.set_pos([width / 2, height / 2]) + + # 指示器 + dyna1 = ((width * 0.97, height * 0.86 - width * 0.01 * math.tan(math.radians(vangle)) + width * 0.01 * math.tan(math.radians(langle))), + (width * 0.95, height * 0.86 + width * 0.01 * math.tan(math.radians(vangle)) - width * 0.01 * math.tan(math.radians(langle)))) + dyna2 = ((width * 0.97, height * 0.90 - width * 0.01 * math.tan(math.radians(vangle)) + width * 0.01 * math.tan(math.radians(langle))), + (width * 0.95, height * 0.90 + width * 0.01 * math.tan(math.radians(vangle)) - width * 0.01 * math.tan(math.radians(langle)))) + dyna3 = ((width * 0.94, height * 0.86 - width * 0.01 * math.tan(math.radians(vangle)) - width * 0.01 * math.tan(math.radians(langle))), + (width * 0.92, height * 0.86 + width * 0.01 * math.tan(math.radians(vangle)) + width * 0.01 * math.tan(math.radians(langle)))) + dyna4 = ((width * 0.94, height * 0.90 - width * 0.01 * math.tan(math.radians(vangle)) - width * 0.01 * math.tan(math.radians(langle))), + (width * 0.92, height * 0.90 + width * 0.01 * math.tan(math.radians(vangle)) + width * 0.01 * math.tan(math.radians(langle)))) + + + showline(dyna1[0], dyna1[1], 2) # 翼面指示 1 + showline(dyna2[0], dyna2[1], 2) # 翼面指示 2 + showline(dyna3[0], dyna3[1], 2) # 翼面指示 1 + showline(dyna4[0], dyna4[1], 2) # 翼面指示 2 + + showline((width * 0.97, height * 0.86 + math.tan(math.radians(remote_devices['aile_left'])) * 0.01 * width), + (width * 0.95, height * 0.86 - math.tan(math.radians(remote_devices['aile_left'])) * 0.01 * width), 2, color="remote") # 翼面指示 1 + showline((width * 0.97, height * 0.90 + math.tan(math.radians(remote_devices['aile_right'])) * 0.01 * width), + (width * 0.95, height * 0.90 - math.tan(math.radians(remote_devices['aile_right'])) * 0.01 * width), 2, color="remote") # 翼面指示 1 + showline((width * 0.94, height * 0.86 + math.tan(math.radians(remote_devices['tail_left'])) * 0.01 * width), + (width * 0.92, height * 0.86 - math.tan(math.radians(remote_devices['tail_left'])) * 0.01 * width), 2, color="remote") # 翼面指示 1 + showline((width * 0.94, height * 0.90 + math.tan(math.radians(remote_devices['tail_right'])) * 0.01 * width), + (width * 0.92, height * 0.90 - math.tan(math.radians(remote_devices['tail_right'])) * 0.01 * width), 2, color="remote") # 翼面指示 1 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - devices['engine_main']), 20, color="main") # 转速指示 1 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - remote_devices['engine_main']), 16, color="remote") # 转速指示 2 + showline((width * 0.849, height * 0.92), (width * 0.849, height * 0.92 - sensors["speed"]), 20, color="remote") # 空速指示 1 + showline((width * 0.82, height * 0.92), (width * 0.82, height * 0.92 - 100), 20, color="main") # 能量指示 2 + showline((width * 0.82, height * 0.92), (width * 0.82, height * 0.92 - sensors["battery"] * 100), 16, color="remote") # 能量指示 1 + + devices['aile_left'] = calc_pair2angle(dyna1) + devices['aile_right'] = calc_pair2angle(dyna2) + devices['tail_left'] = calc_pair2angle(dyna3) + devices['tail_right'] = calc_pair2angle(dyna4) + + text_layer() + # 键盘事件 + keys = pygame.key.get_pressed() + if keys[pygame.K_ESCAPE]: + running = 0 + if keys[pygame.K_r]: + pygame.mouse.set_pos([width / 2, height / 2]) + if keys[pygame.K_w]: + pygame.mouse.set_pos([mousepos[0], mousepos[1] + 3]) + if keys[pygame.K_s]: + pygame.mouse.set_pos([mousepos[0], mousepos[1] - 3]) + if keys[pygame.K_a]: + pygame.mouse.set_pos([mousepos[0] - 3, mousepos[1]]) + if keys[pygame.K_d]: + pygame.mouse.set_pos([mousepos[0] + 3, mousepos[1]]) + if keys[pygame.K_LSHIFT]: + devices['engine_main'] += 1 + if keys[pygame.K_LCTRL]: + devices['engine_main'] -= 1 + if keys[pygame.K_1]: + warn_layer("失速警告") + if keys[pygame.K_2]: + warn_layer("失控") + if keys[pygame.K_3]: + warn_layer("通讯异常") + if keys[pygame.K_4]: + warn_layer("不明硬件损毁") + if keys[pygame.K_5]: + warn_layer("紧急重启") + if keys[pygame.K_6]: + warn_layer("紧急停机") + if keys[pygame.K_7]: + warn_layer("紧急关闭") + if keys[pygame.K_8]: + warn_layer("紧急过载") + if keys[pygame.K_9]: + warn_layer("操纵面失控") + if keys[pygame.K_0]: + warn_layer("引擎失控") + if keys[pygame.K_F11]: + if not isfullscreen: + isfullscreen = True + width = widthp + height = heightp + SIZE = width, height = pygame.display.list_modes()[0] + screen = pygame.display.set_mode(SIZE, FULLSCREEN) + else: + isfullscreen = False + screen = pygame.display.set_mode((width, height), RESIZABLE) + if keys[pygame.K_b]: + colors["main"] = "white" + if keys[pygame.K_n]: + colors["main"] = "blue" + if keys[pygame.K_m]: + colors["main"] = "green" + if keys[pygame.K_v]: + colors["main"] = "red" + # 文字显示 + # debug + debug_ui() + pygame.draw.circle(screen, colors['main' if not f_shootmode else 'warn'], mousepos, 2) + pygame.draw.circle(screen, colors['main'], mousepos, 20, 3) if not f_shootmode else '' + # flip() the display to put your work on screen + pygame.display.flip() + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - devices['engine_main']), 20) # 转速指示 1 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - remote_devices['engine_main']), 16, color="remote") # 转速指示 2 + showline((width * 0.849, height * 0.92), (width * 0.849, height * 0.92 - sensors['speed']), 20, color="remote") # 空速指示 2 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - devices['engine_main']), 20) # 转速指示 1 + showline((width * 0.88, height * 0.92), (width * 0.88, height * 0.92 - remote_devices['engine_main']), 16, color="remote") # 转速指示 2 + showline((width * 0.849, height * 0.92), (width * 0.849, height * 0.92 - sensors['speed']), 20, color="remote") # 空速指示 2 + + dt = clock.tick(60) / 1000 + + np.join() + pygame.quit() diff --git a/testfield/legacy/client/legacy/client_lib.py b/testfield/legacy/client/legacy/client_lib.py new file mode 100644 index 0000000..b9fecbd --- /dev/null +++ b/testfield/legacy/client/legacy/client_lib.py @@ -0,0 +1,49 @@ +import socket +import json +import logging + +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +class Net(object): + data = None + jdata = None + name = None + isfirst = 0 + + def __init__(self, dest="127.0.0.1", port=40808): + logging.info("Start remote host communcation") + self.dest = dest + self.port = port + self.remote = socket.socket() + self.remote.connect((dest, port)) + self.data: str = self.remote.recv(2048).decode("UTF-8") + self.jdata = json.loads(self.data) + first = json.loads(self.data) + self.name = first["name"] + logging.info(f"Connected to {self.name}") + self.isfirst = 1 + + def send(self, msg): + size = len(msg.encode()) + self.remote.send(msg.encode("UTF-8").ljust(2048 - size)) + self.jdata = json.loads(self.data) + + def recv(self): + self.data: str = self.remote.recv(2048).decode("UTF-8") + self.jdata = json.loads(self.data) + return self.data + + def jsend(self, msg): + msg = json.dumps(msg) # 状态回传 + size = len(msg.encode()) + self.remote.send(msg.encode("UTF-8").ljust(2048 - size)) + self.jdata = json.loads(self.data) + print("发送",msg) + + def refresh(self): + if self.isfirst == 1: + self.isfirst = 0 + return self.jdata + self.data: str = self.remote.recv(2048).decode("UTF-8") + self.jdata = json.loads(self.data.encode("UTF-8")) + return diff --git a/testfield/legacy/client/legacy/src/font.ttf b/testfield/legacy/client/legacy/src/font.ttf new file mode 100644 index 0000000..0eae4f0 Binary files /dev/null and b/testfield/legacy/client/legacy/src/font.ttf differ diff --git a/testfield/legacy/client/legacy/src/icon.ico b/testfield/legacy/client/legacy/src/icon.ico new file mode 100644 index 0000000..900ddad Binary files /dev/null and b/testfield/legacy/client/legacy/src/icon.ico differ diff --git a/testfield/legacy/client/lib/graph.py b/testfield/legacy/client/lib/graph.py new file mode 100644 index 0000000..4e796df --- /dev/null +++ b/testfield/legacy/client/lib/graph.py @@ -0,0 +1,81 @@ +import pygame +import math + +class Line: + def __init__(self, screen, method, pos, size, maincolor, bgcolor): + self.screen = screen + self.method = method + self.start_pos = pos + self.length = size + self.color = maincolor + self.bgcolor = bgcolor + self.angle = 0 # Angle in degrees + self.end_pos = self.calculate_end_pos() + + def calculate_end_pos(self): + """Calculate the end position of the line based on the angle and length.""" + x_offset = self.length * math.cos(math.radians(self.angle)) + y_offset = self.length * math.sin(math.radians(self.angle)) + return (self.start_pos[0] + x_offset, self.start_pos[1] + y_offset) + + def rotate(self, degree): + """Rotate the line around its start position.""" + self.angle = (self.angle + degree) % 360 + self.end_pos = self.calculate_end_pos() + + def resize(self, new_length): + """Change the length of the line.""" + self.length = new_length + self.end_pos = self.calculate_end_pos() + + def recolor(self, color): + """Change the color permanently.""" + self.color = color + + def recolor_tmp(self, color, time_): + """Change the color temporarily.""" + original_color = self.color + self.color = color + pygame.time.set_timer(pygame.USEREVENT, time_ * 1000) # Set a timer for the specified time + + # Event handling for reverting color + def revert_color(event): + if event.type == pygame.USEREVENT: + self.color = original_color + pygame.time.set_timer(pygame.USEREVENT, 0) # Stop the timer + + return revert_color + + def move(self, degree, length): + """Move the line by a certain degree and length.""" + self.start_pos = (self.start_pos[0] + length * math.cos(math.radians(degree)), + self.start_pos[1] + length * math.sin(math.radians(degree))) + self.end_pos = self.calculate_end_pos() + + def show(self): + """Draw the line on the screen.""" + pygame.draw.line(self.screen, self.color, self.start_pos, self.end_pos, 2) + + def hide(self): + """Clear the line from the screen by drawing over it with the background color.""" + pygame.draw.line(self.screen, self.bgcolor, self.start_pos, self.end_pos, 2) + +# Example usage: +pygame.init() +screen = pygame.display.set_mode((800, 600)) +line = Line(screen, 'method', (400, 300), 100, (255, 0, 0), (0, 0, 0)) + +running = True +while running: + for event in pygame.event.get(): + if event.type == pygame.QUIT: + running = False + elif event.type == pygame.USEREVENT: + # Handle color revert event + line.recolor_tmp((0, 255, 0), 2) # Example of temporary recoloring + + screen.fill((0, 0, 0)) # Clear the screen + line.show() # Draw the line + pygame.display.flip() # Update the display + +pygame.quit() diff --git a/testfield/legacy/client/mininf.json b/testfield/legacy/client/mininf.json new file mode 100644 index 0000000..61eddbc --- /dev/null +++ b/testfield/legacy/client/mininf.json @@ -0,0 +1,9 @@ +{ + "vel":0, + "acc":0, + "xangle":0, + "yangle":0, + "zangle":0 + "time":0 + +} \ No newline at end of file diff --git a/testfield/legacy/client/modules/__init__.py b/testfield/legacy/client/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/testfield/legacy/client/modules/builtin/hud.py b/testfield/legacy/client/modules/builtin/hud.py new file mode 100644 index 0000000..7ed9b1c --- /dev/null +++ b/testfield/legacy/client/modules/builtin/hud.py @@ -0,0 +1,14 @@ +from os import environ +import time +environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1' +import pygame as sdl + +name = "Basic HUD Framework" + +def action(): + + +if __name__ == "__main__": + sdl.init() + sdl.display.set_caption(f"Commodore UI Element Test Window") + screen = sdl.display.set_mode((1024, 768), sdl.RESIZABLE) \ No newline at end of file diff --git a/testfield/legacy/client/src/font.ttf b/testfield/legacy/client/src/font.ttf new file mode 100644 index 0000000..0eae4f0 Binary files /dev/null and b/testfield/legacy/client/src/font.ttf differ diff --git a/testfield/legacy/client/src/icon.ico b/testfield/legacy/client/src/icon.ico new file mode 100644 index 0000000..900ddad Binary files /dev/null and b/testfield/legacy/client/src/icon.ico differ diff --git a/testfield/legacy/docs/example.json b/testfield/legacy/docs/example.json new file mode 100644 index 0000000..f68f380 --- /dev/null +++ b/testfield/legacy/docs/example.json @@ -0,0 +1,58 @@ +{ + "devices": { + "aile_left": { + "type": "Flank", + "name": "左副翼", + "angle": 0 + }, + "aile_right": { + "type": "Flank", + "name": "右副翼", + "angle": 0 + }, + "flap_left": { + "type": "Flank", + "name": "左襟翼", + "angle": 0 + }, + "flap_right": { + "type": "Flank", + "name": "右襟翼", + "angle": 0 + }, + "tail_vert": { + "type": "Flank", + "name": "垂直尾翼", + "angle": 0 + }, + "tail_left": { + "type": "Flank", + "name": "左尾翼", + "angle": 0 + }, + "tail_right": { + "type": "Flank", + "name": "右尾翼", + "angle": 0 + }, + "engine_main": { + "type": "Engine", + "name": "主引擎", + "speed": 0 + } + }, + "name": "零号侧卫", + "sensors": { + "speed": 99, + "battery": 0.8, + "xangle": 201, + "yangle": 108, + "zangle": 0, + "sign": 322, + "cpuload": 0.12, + "memload": 0.88, + "memsum": 2048, + "torch": 0, + "acc": 3 + } +} \ No newline at end of file diff --git a/testfield/legacy/docs/pos.jpg b/testfield/legacy/docs/pos.jpg new file mode 100644 index 0000000..f1f666a Binary files /dev/null and b/testfield/legacy/docs/pos.jpg differ diff --git a/testfield/legacy/hal/arduino/subbot.ino b/testfield/legacy/hal/arduino/subbot.ino new file mode 100644 index 0000000..25ba6a3 --- /dev/null +++ b/testfield/legacy/hal/arduino/subbot.ino @@ -0,0 +1,112 @@ +#include +#include +#include +#include +#include +#include + +constexpr short svo_num = 2; +constexpr short svo_dig_pin[svo_num] = {9, 10}; +Servo svo[svo_num]; // al, ar, tl, tr +short svo_angle[svo_num]; +std::map svo_name2idx = {{"al", 0}, {"ar", 1}}; + +constexpr short eng_num = 1; +constexpr short eng_dig_pin[eng_num] = {11}; +Servo eng[eng_num]; // al, ar, tl, tr +short eng_speed[eng_num]; +std::map eng_name2idx = {{"e1", 0}}; +JSONVar data; + +void svo_set(short idx, short angle){ + svo[idx].write(angle); + svo_angle[idx] = angle; + delay(15); +} + +short svo_get(short idx){ + return svo_angle[idx]; +} + +void svo_pnt_by_name(String name){ + Serial.print("{\"" + name + "\":"); + Serial.print(svo_angle[svo_name2idx[name]]); + Serial.print("}\r\n"); +} + +void svo_set_by_name(String name, short num){ + svo_set(svo_name2idx[name], num); +} + +void eng_set(short idx, short speed){ + eng[idx].writeMicroseconds(speed); + eng_speed[idx] = speed; + delay(15); +} + +short eng_get(short idx){ + return eng_speed[idx]; +} + +void eng_pnt_by_name(String name){ + Serial.print("{\"" + name + "\":"); + Serial.print(eng_speed[eng_name2idx[name]]); + Serial.print("}\r\n"); +} + +void eng_set_by_name(String name, short num){ + eng_set(eng_name2idx[name], num); +} + +void setup() { + for(short i = 0; i < svo_num; ++i){ + svo[i].attach(svo_dig_pin[i]); + } + for(short i = 0; i < svo_num; ++i){ + svo_set(i, 0); + } + for(short i = 0; i < eng_num; ++i){ + eng[i].attach(eng_dig_pin[i]); + } + for(short i = 0; i < eng_num; ++i){ + eng_set(i, 0); + } + Serial.begin(9600); + //Serial.println("\r\n"); +} +void handle(String type, String device, String option, short num){ + if(type == "svo"){ + if(option == "q"){ + svo_pnt_by_name(device); + } + if(option == "s"){ + svo_set_by_name(device, num); + } + } + if(type == "eng"){ + if(option == "q"){ + eng_pnt_by_name(device); + } + if(option == "s"){ + eng_set_by_name(device, num); + } + } +} + +void loop() { + //Serial.print("hihihi\r\n"); + //Serial.print("\r\n"); + while(Serial.available() == 0); + data = JSON.parse(Serial.readString()); + if(JSON.typeof(data) == "undefined") { + Serial.print("json_failed\r\n"); + return; + } + handle(data["type"], data["device"], data["option"], data["num"]); + /* + data["t(ype)"]: svo/eng + data["d(evice)"]: al, ar, tl, tr + data["o(ption)"]: s(et)/q(uery) + data["n(um)"]: SHORT + */ +} diff --git a/testfield/legacy/server/arduino_api/arduino_api.py b/testfield/legacy/server/arduino_api/arduino_api.py new file mode 100644 index 0000000..8f6028e --- /dev/null +++ b/testfield/legacy/server/arduino_api/arduino_api.py @@ -0,0 +1,91 @@ +import json +import serial +import serial.tools.list_ports +import time + +data_table = dict() + +class Motor(object): + angle = None + def __init__(self, code): + self.angle = 0 + self.code = code + data_table[code] = self.angle + def get(self): + self.angle = data_table[self.code] + return self.angle + def set(self, angle): + self.angle = angle + commander("svo", self.code, "s", angle) + return self.angle + +class Engine(object): + speed = None + def __init__(self, code): + self.speed = 0 + self.code = code + data_table[code] = self.speed + def get(self): + self.speed = data_table[self.code] + return self.speed + def tune(self, new_speed): + self.speed = new_speed + commander("eng", self.code, "s", new_speed) + return self.speed + +class Battery: + def get(): + power_left = 0.9 + return power_left + def stat(): + status = "Unplugged" + return status + +class Serial: + ino = serial.Serial("COM7", 9600, timeout=2) + if ino.is_open: + print("串口初始化成功") + def close(): + Serial.ino.close() + def write(data): + n = Serial.ino.write(data.encode()) + print(f"写入 {n} 字节", data.encode()) + def readln(): + data = Serial.ino.readline() + print(f"读入: {data.decode('utf-8', 'ignore')}\n") + return data.decode('utf-8', 'ignore') + +def commander(type, device, option, num = -1): + d = f""""type":"{type}", "device":"{device}", "option":"{option}", "num":{num}""" + d = "{" + d + "}" + Serial.write(d) + if option == "q": + b = Serial.readln() + if b.strip() != "": + print("BIS", b) + b = json.loads(b) + for i in b.keys(): + data_table[i] = b[i] + +def debug(): + ports_list = list(serial.tools.list_ports.comports()) + if len(ports_list) <= 0: + print("无串口设备。") + else: + print("可用的串口设备如下:") + for comport in ports_list: + print(list(comport)[0], list(comport)[1]) + #Serial.init() + #time.sleep(1) + #print("hi",a) + #Serial.readln() + Serial.close() + """/* + data["t(ype)"]: svo/eng + data["d(evice)"]: al, ar, tl, tr + data["o(ption)"]: s(et)/q(uery) + data["n(um)"]: SHORT + */""" + +if __name__ == "__main__": + debug() \ No newline at end of file diff --git a/testfield/legacy/server/config.yaml b/testfield/legacy/server/config.yaml new file mode 100644 index 0000000..1347700 --- /dev/null +++ b/testfield/legacy/server/config.yaml @@ -0,0 +1,14 @@ +# Host Config +model: "SU27" # J-11/J-16 +name: "零号侧卫" +hardware: # 硬件 + surfaces: # 控制面 + - al: "左副翼" + - ar: "右副翼" + - tv: "垂直尾翼" + - tl: "左尾翼" + - tr: "右尾翼" + engines: # 引擎 + - e1: "一号引擎" + bools: # 布尔式硬件 + - l1: "航行灯" \ No newline at end of file diff --git a/testfield/legacy/server/config/su27.yaml b/testfield/legacy/server/config/su27.yaml new file mode 100644 index 0000000..e69de29 diff --git a/testfield/legacy/server/dummy_all/dummy_api.py b/testfield/legacy/server/dummy_all/dummy_api.py new file mode 100644 index 0000000..51affb2 --- /dev/null +++ b/testfield/legacy/server/dummy_all/dummy_api.py @@ -0,0 +1,33 @@ +class Motor(object): + angle = None + def __init__(self, id): + self.angle = 0 # mark + self.id = id + def get(self): + return self.angle + def turn(self, add_angle): + self.angle += add_angle + return self.angle +class Engine(object): + speed = None + def __init__(self, id): + self.speed = 0 # Mark + self.id = id + def get(self): + return self.speed + def tune(self, new_speed): + self.speed = new_speed + return self.speed + +class Battery: + def get(): + power_left = 0.9 + return power_left + def stat(): + status = "Unplugged" + return status + +class Network: + def stat(): + delay = 0.2 # ms + return delay # or -1 (unreachable) \ No newline at end of file diff --git a/testfield/legacy/server/dummy_all/dummy_lib.py b/testfield/legacy/server/dummy_all/dummy_lib.py new file mode 100644 index 0000000..2e0cb9b --- /dev/null +++ b/testfield/legacy/server/dummy_all/dummy_lib.py @@ -0,0 +1,201 @@ +import dummy_api as ctrl +import time +import os +import threading +class Surface(object): + motor = None + name = None + curr_angle = None + id = None + type_ = "surface" + def __init__(self, name, id): + self.motor = ctrl.Motor(id) + self.curr_angle = self.motor.get() + self.name = name + self.id = id + print("初始化操纵面 {}, 硬件代号绑定为 {}".format(name, id)) + def add_angle(self, angle): + self.motor.turn(angle) + self.curr_angle = self.motor.get(); + #print("将 {} 的角度加成 {} 度".format(self.name, angle)) + def set_angle(self, angle): + angle_turn = angle - self.motor.get() + self.motor.turn(angle_turn) + self.curr_angle = self.motor.get() + #print("将 {} 的角度设置为 {} 度".format(self.name, angle)) + def get_angle(self): + self.curr_angle = self.motor.get() + #print("{} 当前角度为 {} 度".format(self.name, self.curr_angle)) + return self.curr_angle + def status(self): + return {"type":"Surface", "name":self.name, "angle":self.curr_angle} + def selfchk(self): + print("开始自检操纵面 {}".format(self.name)) + self.add_angle(30) + self.add_angle(-60) + self.set_angle(0) + +class Engine(object): + engine = None + curr_speed = None + id = None + name = None + type_ = "engine" + def __init__(self, name, id): + self.id = id + self.name = name + self.engine = ctrl.Engine(id) + self.curr_speed = self.engine.get() + print("初始化引擎 {}, 硬件代号绑定为 {}".format(name, id)) + def set_speed(self, new_speed): + self.engine.tune(new_speed) + self.curr_speed = self.engine.get() + ##print("将 {} 的转速调谐为 {} RPM".format(self.name, new_speed)) + def add_speed(self, add_speed): + self.engine.tune(self.curr_speed + add_speed) + self.curr_speed = self.engine.get() + ##print("将 {} 的转速加成 {} RPM".format(self.name, add_speed)) + def get_speed(self): + ##print("{} 当前转速为 {} RPM".format(self.name, self.curr_speed)) + self.curr_speed = self.engine.get() + return self.curr_speed + def status(self): + ##print(self.curr_speed) + return {"type":"Engine", "name":self.name, "speed":self.get_speed()} + def selfchk(self): + print("开始自检引擎 {}".format(self.name)) + self.add_speed(3) + self.add_speed(-6) + self.set_speed(0) + self.curr_speed = self.engine.get() + + +class Sensor: + data = dict() + data["speed"] = 0 + data["battery"] = 0 + data["xangle"] = 0 + data["yangle"] = 0 + data["zangle"] = 0 + data["sign"] = 0 + data["cpuload"] = 0 + data["memload"] = 0 + data["memsum"] = 0 + data["torch"] = 0 + data["acc"] = 0 + is_torch_on = 0 + + @staticmethod + def speed(): + Sensor.data["speed"] = 99 + return Sensor.data["speed"] + + @staticmethod + def battery(): + Sensor.data["battery"] = 0.8 + return Sensor.data["battery"] + + @staticmethod + def xangle(): + Sensor.data["xangle"] = 201 + return Sensor.data["xangle"] + + @staticmethod + def yangle(): + Sensor.data["yangle"] = 108 + return Sensor.data["yangle"] + + @staticmethod + def zangle(): + Sensor.data["zangle"] = 0 + return Sensor.data["zangle"] + + @staticmethod + def sign(): + Sensor.data["sign"] = 322 + return Sensor.data["sign"] + + @staticmethod + def cpuload(): + Sensor.data["cpuload"] = 0.12 + return Sensor.data["cpuload"] + + @staticmethod + def memload(): + Sensor.data["memload"] = 0.88 + return Sensor.data["memload"] + + @staticmethod + def memsum(): + Sensor.data["memsum"] = 2048 + return Sensor.data["memsum"] + + @staticmethod + def torch(): + Sensor.data["torch"] = Sensor.is_torch_on + return Sensor.data["torch"] + + @staticmethod + def torchon(): + Sensor.is_torch_on = 1 + + @staticmethod + def acc(): + Sensor.data["acc"] = 3 + return Sensor.data["acc"] + + @staticmethod + def net_delay(): + Sensor.data["net_delay"] = 300 + return Sensor.data["net_delay"] + + @staticmethod + def refresh(): + Sensor.speed() + Sensor.battery() + Sensor.xangle() + Sensor.yangle() + Sensor.zangle() + Sensor.sign() + Sensor.cpuload() + Sensor.memload() + Sensor.memsum() + Sensor.torch() + Sensor.acc() + Sensor.net_delay() + + @staticmethod + def stat(): + #Sensor.refresh() + return Sensor.data + + def pseudo_gui(): + import tkinter as tk + def update_label(key, value): + labels[key].config(text=f"{key}: {value}") + def update(): + while 1: + for key in Sensor.data.keys(): + Sensor.data[key] = sliders[key].get() + root = tk.Tk() + root.title("DEBUGGING CONSOLE") + + labels = {} + sliders = {} + + # 自动布局 + for key in Sensor.data.keys(): + # 创建标签 + label = tk.Label(root, text=f"{key}: {Sensor.data[key]}") + label.pack() + labels[key] = label + + # 创建滑块 + slider = tk.Scale(root, from_=0, to=100, orient=tk.HORIZONTAL, command=lambda value, k=key: update_label(k, value)) + slider.pack() + sliders[key] = slider + + up = threading.Thread(target=update, name='Update') + up.start() + root.mainloop() + up.join() diff --git a/testfield/legacy/server/host.py b/testfield/legacy/server/host.py new file mode 100644 index 0000000..c56b7b3 --- /dev/null +++ b/testfield/legacy/server/host.py @@ -0,0 +1,163 @@ +import yaml + +__dummy_mode__ = 1 +cfg = None +if not __dummy_mode__: + import sys, os + sys.path.append("arduino_api") + sys.path.append("termux_lib") + import termux_lib as lib + import socket + import json + import os + import time + import threading +else: + import sys + sys.path.append("dummy_all") + import dummy_lib as lib + import socket + import json + import os + import time + import threading + + def hwinit(): + global hw + hw = dict() + hw["aile_left"] = lib.Flank("左副翼", "al") + hw["aile_right"] = lib.Flank("右副翼", "ar") + #hw["tail_vert"] = lib.Flank("垂直尾翼", "tv") + hw["tail_left"] = lib.Flank("左尾翼", "tl") + hw["tail_right"] = lib.Flank("右尾翼", "tr") + hw["engine_main"] = lib.Engine("一号引擎", "e1") + + + def check(): + for i in hw.values(): + i.selfchk() + print("操纵面与引擎自检完成, 等待远程指令") + + def proc(data): + global hw + #print(data) + inf = json.loads(data.replace('\n', ';')) + print("DT"+data + "DT") + if inf["type"] == "cmd": + #print(f"执行命令") + exec(inf["cmd"]) + elif inf["type"] == "syscmd": + #print(f"执行系统级命令") + os.system(inf["cmd"]) + elif inf["type"] == "reset": + pass + #print(f"系统重置") + elif inf["type"] == "stop": + #print(f"退出系统") + return 1 + else: + pass + #print(f"未知命令") + return 0 + global isx + isx = 0 + def stat(): + global isx + status = dict() + status["devices"] = dict() + for i in hw.items(): + status["devices"][i[0]] = i[1].status() + status["name"] = name + status["sensors"] = lib.Sensor.stat() + status["time"] = time.asctime() + isx+=1 + return status + def debug_shell(): + lib.Sensor.pseudo_gui() + +def init_hardware(): + global hwobj + hwobj = dict() + print("注册传感器") + for i in hw["surfaces"]: + hwobj[i.key] = lib.Surface(i.value, i.key) + print("注册操纵面") + for i in hw["surfaces"]: + hwobj[i.key] = lib.Surface(i.value, i.key) + print("注册引擎") + for i in hw["engines"]: + hwobj[i.key] = lib.Engine(i.value, i.key) + print("注册机炮") + for i in hw["cannons"]: + hwobj[i.key] = lib.Engine(i.value, i.key) + print("注册供能系统") + for i in hw["powersys"]: + hwobj[i.key] = lib.Engine(i.value, i.key) + print("注册机轮制动") + for i in hw["powersys"]: + hwobj[i.key] = lib.Engine(i.value, i.key) + print("注册雷达") + for i in hw["powersys"]: + hwobj[i.key] = lib.Engine(i.value, i.key) + print("注册挂架") + for i in hw[""]: + hwobj[i.key] = lib.Engine(i.value, i.key) + print("注册布尔元件") + for i in hw["bools"]: + hwobj[i.key] = lib.Surface(i.value, i.key) +def selfchk(): + print("开始自检") + for i in hw: + print(f"{i}") + for i in hw["engines"]: + print(f"{j} ", end="") + + +if __name__ == "__main__": + name = "零号侧卫" + global hw + hw = dict() + port = 40808 + print(f"Commdore 飞行控制系统 服务端") + print("正在加载配置文件") + with open("config.yaml") as cfg_file: + cfg = yaml.safe_load(cfg_file) + name = cfg["name"] + model = cfg["model"] + hw = cfg["hardware"] + print(f"型号: {model}") + print(f"名称: {name}") + print("正在初始化硬件") + print("启动 socket 网络通信") + print("启动备用 socket 网络通信") + dummy = socket.socket() + dummy.bind(("localhost", port)) # 端口监听 + dummy.listen(1) + print(f"监听端口 " + str(port)) + conn, address = dummy.accept() + print(f"来自 {address} 的连接已接收") + msg = json.dumps(stat()) # 状态回传 + size = len(msg.encode()) + conn.send(msg.encode("UTF-8").ljust(2048)) + np = threading.Thread(target=debug_shell, name='Debugging') + np.start() + while True: + # 接收消息 + print(lib.Sensor.data) + data: str = conn.recv(2048).decode("UTF-8") + if data == "" or data == None: + continue + print(f"命令接收: {data}") + ret = proc(data) + if ret == 1: + break + msg = json.dumps(stat()) # 状态回传 + size = len(msg.encode()) + conn.send(msg.encode("UTF-8").ljust(2048 - size)) # encode将字符串编码为字节数组对象 + #print(msg.encode("UTF-8").decode('unicode_escape')) + #print(f"当前状态已回传") + + # 关闭连接 + conn.close() + dummy.close() + np.join() \ No newline at end of file diff --git a/testfield/legacy/server/termux_lib/termux_lib.py b/testfield/legacy/server/termux_lib/termux_lib.py new file mode 100644 index 0000000..c5745c5 --- /dev/null +++ b/testfield/legacy/server/termux_lib/termux_lib.py @@ -0,0 +1,232 @@ +import sys +import os +sys.path.append("../arduino_api") +import arduino_api as ctrl +import time +import os +import threading +import json +import subprocess +import psutil +import threading + +class Flank(object): + motor = None + name = None + curr_angle = None + code = None + type = "flank" + def __init__(self, name, code): + self.motor = ctrl.Motor(code) + self.curr_angle = self.motor.get() + self.name = name + self.code = code + print("初始化操纵面 {}, 硬件代号绑定为 {}".format(name, code)) + def set_angle(self, angle): + self.motor.set(angle) + self.curr_angle = self.motor.get() + #print("将 {} 的角度设置为 {} 度".format(self.name, angle)) + def add_angle(self, add_angle): + self.motor.set(self.curr_angle + add_angle) + self.curr_angle = self.motor.get() + def status(self): + return {"type":"Flank", "name":self.name, "angle":self.curr_angle} + def selfchk(self): + print("开始自检操纵面 {}".format(self.name)) + self.add_angle(30) + self.add_angle(-60) + self.set_angle(0) + +class Engine(object): + engine = None + curr_speed = None + code = None + name = None + type = "engine" + def __init__(self, name, code): + self.code = code + self.name = name + self.engine = ctrl.Engine(code) + self.curr_speed = self.engine.get() + print("初始化引擎 {}, 硬件代号绑定为 {}".format(name, code)) + def set_speed(self, new_speed): + self.engine.tune(new_speed) + self.curr_speed = self.engine.get() + ##print("将 {} 的转速调谐为 {} RPM".format(self.name, new_speed)) + def add_speed(self, add_speed): + self.engine.tune(self.curr_speed + add_speed) + self.curr_speed = self.engine.get() + ##print("将 {} 的转速加成 {} RPM".format(self.name, add_speed)) + def get_speed(self): + ##print("{} 当前转速为 {} RPM".format(self.name, self.curr_speed)) + self.curr_speed = self.engine.get() + return self.curr_speed + def status(self): + ##print(self.curr_speed) + return {"type":"Engine", "name":self.name, "speed":self.get_speed()} + def selfchk(self): + print("开始自检引擎 {}".format(self.name)) + self.add_speed(3) + self.add_speed(-6) + self.set_speed(0) + self.curr_speed = self.engine.get() + + +""" +@liteon-proximity: 光学接近传感器,用于检测物体的接近 +!yas537-mag: 磁力计传感器,用于测量磁场强度和方向 +@liteon-light: 光传感器,用于测量环境光强度 +@MPU6050-gyro: 陀螺仪传感器,用于测量角速度 +@MPU6050-accel: 加速度计传感器,用于测量线性加速度 +!liteon-pocket: 可能是特定用途的传感器,具体功能需参考设备文档 +*yas537-orientation: 方向传感器,结合磁力计和加速度计数据来确定设备的方向 +@Game Rotation Vector Sensor: 用于测量设备的旋转向量,常用于游戏和增强现实应用 +@GeoMag Rotation Vector Sensor: 结合地磁和加速度数据来测量设备的旋转向量 +@Gravity Sensor: 测量重力加速度,用于确定设备的姿态 +*Linear Acceleration Sensor: 测量去除重力影响后的线性加速度 +@Rotation Vector Sensor: 综合陀螺仪和加速度计数据来测量设备的旋转向量 +""" + +class Sensor: + data = { + "speed": 0, + "battery": 0, + "xangle": 0, + "yangle": 0, + "zangle": 0, + "sign": 0, + "cpuload": 0, + "memload": 0, + "memsum": 0, + "torch": 0, + "acc": 0 + } + is_torch_on = 0 + @staticmethod + def speed(): #TODO + Sensor.data["speed"] = -1 + return Sensor.data["speed"] + + @staticmethod + def battery(): # 电池剩余 + return Sensor.data["sysbattery"] + + def battery(): # TODO: 电机电池剩余 + return Sensor.data["battery"] + + @staticmethod + def xangle(): # X迎角 + return Sensor.data["xangle"] + + @staticmethod + def yangle(): + return Sensor.data["yangle"] + + @staticmethod + def zangle(): + return Sensor.data["zangle"] + + @staticmethod + def sign(): # 信号强度 + Sensor.data["sign"] = -1 + return Sensor.data["sign"] + + @staticmethod + def cpuload(): # CPU占用(百分比) + Sensor.data["cpuload"] = psutil.cpu_percent(interval=1) / 100 + return Sensor.data["cpuload"] + + @staticmethod + def memload(): # 内存占用(百分比) + Sensor.data["memload"] = psutil.virtual_memory().percent / 100 + return Sensor.data["memload"] + + @staticmethod + def memsum(): # 内存总量(MB) + return Sensor.data["memsum"] + + @staticmethod + def torch(): + return Sensor.data["torch"] + + @staticmethod + def torchon(): # 打开电筒 + Sensor.is_torch_on = not Sensor.is_torch_on + to = {0:"off", 1:"on"} + os.system(f"torch {to[Sensor.is_torch_on]} &") + + @staticmethod + def refresh(): + Sensor.speed() + Sensor.battery() + Sensor.xangle() + Sensor.yangle() + Sensor.zangle() + Sensor.sign() + Sensor.cpuload() + Sensor.memload() + Sensor.memsum() + Sensor.torch() + + @staticmethod + def stat(): + #Sensor.refresh() + return Sensor.data + + def update_battery(): + while True: + result = subprocess.run(['termux-battery-status'], capture_output=True, text=True) + data = json.loads(result.stdout) + Sensor.data["sysbattery"] = data["percentage"] + + def update_orientation(): + while True: + # TODO: 优化 + result = subprocess.run(['termux-sensor', '-s', 'yas537-orientation', '-n', '1'], capture_output=True, text=True) + data = json.loads(result.stdout) + Sensor.data["xangle"] = data["yas537-orientation"]["values"][0] + Sensor.data["yangle"] = data["yas537-orientation"]["values"][1] + Sensor.data["zangle"] = data["yas537-orientation"]["values"][2] + + def update_acceleration(): + while True: + result = subprocess.run(['termux-sensor', '-s', 'Linear Acceleration Sensor', '-n', '1'], capture_output=True, text=True) + data = json.loads(result.stdout) + Sensor.data["acc"] = data["Linear Acceleration Sensor"]["values"] + + def update_func(cmd): + while True: + exec(cmd) + time.sleep(0.1) + def init(): + Sensor.data = { + "speed": 0, + "battery": 0, + "xangle": 0, + "yangle": 0, + "zangle": 0, + "sign": 0, + "cpuload": 0, + "memload": 0, + "memsum": 0, + "torch": 0, + "acc": 0 + } + os.system("termux-torch off &") + Sensor.is_torch_on = 0 + Sensor.data["memsum"] = psutil.virtual_memory().total / (1024 * 1024) + + def deamon(): + # 启动线程 + Sensor.battery_thread = threading.Thread(target=Sensor.update_battery) + Sensor.orientation_thread = threading.Thread(target=Sensor.update_orientation) + Sensor.acceleration_thread = threading.Thread(target=Sensor.update_acceleration) + Sensor.other_thread = threading.Thread(target=Sensor.update_func, args=("""Sensor.sign()\nSensor.cpuload()\nSensor.memload()\nSensor.torch()""")) + Sensor.battery_thread.start() + Sensor.orientation_thread.start() + Sensor.acceleration_thread.start() + def stop(): + Sensor.orientation_thread.join() + Sensor.battery_thread.join() + Sensor.acceleration_thread.join() + Sensor.other_thread.join()