Improvement

This commit is contained in:
2025-03-10 03:10:22 +08:00
commit afd1a37c0c
36 changed files with 1599 additions and 0 deletions

3
testfield/README.md Normal file
View File

@@ -0,0 +1,3 @@
# 实验与参考用代码
仅作技术分析验证与历史留存, 若要修改此项目代码, 不必处理此文件夹
若你要使用此软件, 可以克隆后直接将 testfield 文件夹删除

View File

@@ -0,0 +1,2 @@
# Sin 波形传输 [成功]
基于 websocket 的通讯功能模型测试

View File

@@ -0,0 +1,70 @@
import pygame
import sys
import threading
import math
import websocket
import json
import time
sin_remote = 0
th1 = None
th2 = None
moon_addr = "127.0.0.1"
def on_open(wsapp):
print("打开新连接")
print("准备待命")
message = {
"id": "comet",
"sendto": "auth",
"content": "none"
}
wsapp.send(json.dumps(message))
def on_message(wsapp, message):
global sin_remote
print(f"接收到消息: {message}")
if message == "-3":
th1.start()
sin_remote = float(message)
def on_close(wsapp):
print("on_close")
def inet():
global sin_remote
wsapp = websocket.WebSocketApp(f"ws://{moon_addr}:8765",
on_open=on_open,
on_message=on_message,
on_close=on_close)
wsapp.run_forever()
def grap():
pygame.init()
width, height = 628, 600
screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("TEST")
LOCAL_COLOR = (255, 255, 255, 64)
REMOTE_COLOR = [(0, 0, 255, 128), (255, 0, 0, 128)]
frequency = 0.01 # 波动频率
amplitude = 100 # 波动幅度
x = round(time.time()*1000)
while True:
x = round(time.time()*1000)
x = x % 628
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit()
#pygame.draw.rect(screen, (0, 0, 0), (x, 0, 5, height)) # 扫描线
y = height // 2 + amplitude * math.sin((x * frequency))
y_remote = height // 2 + amplitude * sin_remote
pygame.draw.rect(screen, (round(time.time()*1000+120)%256,round(time.time()*1000)%256,round(time.time()*1000+60)%256), (x, int(y_remote), 4, 4)) # 远程波形
pygame.draw.rect(screen, LOCAL_COLOR, (x, int(y), 2, 2)) # 本地波形
pygame.display.flip()
pygame.time.delay(10)
if __name__ == "__main__":
th1 = threading.Thread(target=grap)
th2 = threading.Thread(target=inet)
# 启动线程
th2.start()

View File

@@ -0,0 +1,44 @@
import json
from websocket_server import WebsocketServer
moon_addr = "127.0.0.1"
clients = dict() # id -> name
clients_rev = dict() # name -> id
clients_connection = dict() # id -> client obj
def new_client(client, server):
clients_connection[client["id"]] = client
print(f"新客户端连接: {client['id']}")
def client_left(client, server):
print(f"客户端断开连接: {client['id']}")
# 连接关闭时从集合中移除
if client in clients.values():
clients.pop(clients_rev[client['id']])
clients_rev.pop(client['id'])
def message_received(client, server, message):
print(f"处理信息: {message}")
try:
msg = json.loads(message)
clients[msg["id"]] = client['id']
clients_rev[client['id']] = msg["id"]
if msg["sendto"] == "auth":
return
try:
server.send_message(clients_connection[clients[msg["sendto"]]], msg["content"])
except KeyError:
print("无法发送至端点")
except json.JSONDecodeError:
print("无法解码 JSON 消息")
def main():
server = WebsocketServer(host=f'{moon_addr}', port=8765)
server.set_fn_new_client(new_client)
server.set_fn_client_left(client_left)
server.set_fn_message_received(message_received)
print(f'启动"卫星"中继服务器: ws://{moon_addr}:8765')
server.run_forever()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,34 @@
import websocket
import threading
import math
import json
import time
moon_addr = "127.0.0.1"
def on_open(wsapp):
print("打开新的连接")
input("回车以启动同步: ")
message = {
"id": "pulsar",
"sendto": "comet",
"content": "-3"
}
wsapp.send(json.dumps(message))
x = round(time.time()*1000)
while 1:
x = round(time.time()*1000)
x = x % 628
message["content"] = str(math.sin(x*0.01))
wsapp.send(json.dumps(message))
time.sleep(0.01)
def on_message(wsapp, message):
print("on_message:", message)
def on_close(wsapp):
print("on_close")
wsapp = websocket.WebSocketApp(f"ws://{moon_addr}:8765",
on_open=on_open,
on_message=on_message,
on_close=on_close)
wsapp.run_forever()

View File

@@ -0,0 +1,2 @@
# 总览测试
基于 bmp 的通讯功能模型测试

63
testfield/visual/main.py Normal file
View File

@@ -0,0 +1,63 @@
from PIL import Image
import pygame
import sys
import time
colorzone = dict()
def img():
# 打开 BMP 文件
bmp_image = Image.open('view.bmp')
# 获取图像的基本信息
width, height = bmp_image.size
mode = bmp_image.mode
print(f"图像宽度: {width}, 高度: {height}, 模式: {mode}")
# 读取图像数据
pixel_data = bmp_image.load() # 获取像素数据
# 遍历每个像素并打印其 RGB 值
minleft = 0x3f3f3f3f
mintop = 0x3f3f3f3f
for y in range(height):
for x in range(width):
minleft = min(minleft, x)
mintop = min(mintop, y)
r, g, b = pixel_data[x, y] # 获取每个像素的 RGB 值
if r == 255 and b == 255 and g == 255:
pass
else:
if (r,g,b) in colorzone:
colorzone[(r,g,b)].append(((x//8)*4,(y//8)*4))
else:
colorzone[(r,g,b)] = list()
colorzone[(r,g,b)].append((x//8*4,y//8*4))
#print(f"像素 ({x}, {y}): R={r}, G={g}, B={b}")
colorzone[(255,255,255)] = colorzone[(0,0,0)]
# 关闭图像
bmp_image.close()
def grap():
pygame.init()
width, height = 800, 1000
screen = pygame.display.set_mode((width, height))
pygame.display.set_caption("TEST")
global colorzone
while True:
for i in colorzone.keys():
for j in colorzone[i]:
if i != (255,255,255):
pygame.draw.rect(screen, i, (j[0], j[1], 2, 2)) # 远程波形
else:
pygame.draw.rect(screen, i, (j[0], j[1], 1, 1)) # 远程波形
pygame.display.flip()
pygame.time.delay(10)
if __name__ == "__main__":
print("START")
img()
print("LOADED")
grap()

BIN
testfield/visual/view.bmp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB

View File

@@ -0,0 +1,17 @@
# 机体: Fuselage -> fuse
# 副翼: Aileron -> aile
# 襟翼: Flap -> flap
# 发动机: Engine -> eng
# 控制器: Controller -> ctrl
# 方向舵: Rudder -> rud
# 俯仰舵: Elevator -> elev
marking:
color_config:
fuselage: (255,255,255)
aileron: ()
flap:
engine:
controller: (255,255,255)
rudder:
elevator: