Compare commits

..

5 Commits

Author SHA1 Message Date
b552d20a46 fix:测距 2025-12-28 16:19:00 +08:00
21cec260b8 fix: 修改电量不固定 2025-12-26 15:12:47 +08:00
5a98bf2e85 pref: 计算环数代码 2025-12-26 14:04:43 +08:00
huangzhenwei2
f11b31c09c update hearbeat 2025-12-26 11:47:33 +08:00
0b18ec353c temp: 2025-12-25 16:08:42 +08:00
2 changed files with 146 additions and 81 deletions

View File

@@ -1,6 +1,6 @@
id: t11
name: t11
version: 1.0.2
version: 1.0.3
author: t11
icon: ''
desc: t11

157
main.py
View File

@@ -7,20 +7,23 @@
作者ZZH
最后更新2025-11-21
"""
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import _thread
import hashlib
import hmac
import json
import os
import re
import socket
import struct
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import hmac
import ujson
import hashlib
import requests
import socket
import ujson
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
from maix.peripheral import adc
# import config
# ==================== 全局配置 ====================
@@ -35,7 +38,7 @@ PASSWORD = None
# 服务器连接参数
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 2 # 心跳间隔(秒)
HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒)
# 激光校准配置
CONFIG_FILE = "/root/laser_config.json"
@@ -92,13 +95,15 @@ LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0x
# 相机标定参数(用于距离估算)
# FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素)
FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素)
FOCAL_LENGTH_PIX = 2200.0 # 焦距(像素)
REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
# TCP 连接状态
tcp_connected = False
send_queue = []
queue_lock = False
high_send_queue = [] # 高优先级发送队列:射箭事件等
normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等
queue_lock = False # 简易互斥锁,保护队列
uart4g_lock = False # 简易互斥锁,保护 4G 串口 AT 发送流程(防并发)
update_thread_started = False # 防止 OTA 更新线程重复启动
@@ -164,6 +169,7 @@ def direct_ota_download():
finally:
update_thread_started = False # 允许下次 OTA
def handle_wifi_and_update(ssid, password):
"""在子线程中执行 Wi-Fi 连接 + OTA 更新流程"""
try:
@@ -250,16 +256,32 @@ def read_device_id():
return "DEFAULT_DEVICE_ID"
def safe_enqueue(data_dict, msg_type=2):
"""线程安全地将消息加入 TCP 发送队列"""
global queue_lock, send_queue
def safe_enqueue(data_dict, msg_type=2, high=False):
"""线程安全地将消息加入 TCP 发送队列(支持优先级)"""
global queue_lock, high_send_queue, normal_send_queue
while queue_lock:
time.sleep_ms(1)
queue_lock = True
send_queue.append((msg_type, data_dict))
item = (msg_type, data_dict)
if high:
high_send_queue.append(item)
else:
normal_send_queue.append(item)
queue_lock = False
def _uart4g_lock_acquire():
global uart4g_lock
while uart4g_lock:
time.sleep_ms(1)
uart4g_lock = True
def _uart4g_lock_release():
global uart4g_lock
uart4g_lock = False
def at(cmd, wait="OK", timeout=2000):
"""向 4G 模块发送 AT 指令并等待响应"""
if cmd:
@@ -301,7 +323,8 @@ def tcp_send_raw(data: bytes, max_retries=2) -> bool:
global tcp_connected
if not tcp_connected:
return False
_uart4g_lock_acquire()
try:
for attempt in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 1500):
@@ -322,6 +345,8 @@ def tcp_send_raw(data: bytes, max_retries=2) -> bool:
time.sleep_ms(100)
return False
finally:
_uart4g_lock_release()
def generate_token(device_id):
@@ -511,7 +536,10 @@ def detect_circle(frame):
circularity = 4 * np.pi * area / (perimeter ** 2)
if circularity > 0.75 and len(cnt) >= 5:
center, axes, angle = cv2.fitEllipse(cnt)
radius = (axes[0] + axes[1]) / 4
radius = axes[0]
if axes[1] < radius:
radius = axes[1]
radius /= 2
best_center = (int(center[0]), int(center[1]))
best_radius = int(radius)
best_radius1 = best_radius
@@ -566,6 +594,19 @@ def compute_laser_position(circle_center, laser_point, radius, method):
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
def compute_laser_position_v2(circle_center, laser_point):
if circle_center is None:
return 200, 200
cx, cy = circle_center
lx, ly = 320, 220
dx = lx - cx
dy = ly - cy
r = 22.16 * 5
target_x = dx / r * 100
target_y = dy / r * 100
print(f"lx{lx} ly: {ly} cx: {cx} cy: {cy} dx: {dx} dy: {dy} result_x: {target_x} result_y: {-target_y}")
return (target_x, -target_y)
# ==================== TCP 通信线程 ====================
def connect_server():
@@ -574,8 +615,12 @@ def connect_server():
if tcp_connected:
return True
print("连接到服务器...")
_uart4g_lock_acquire()
try:
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
finally:
_uart4g_lock_release()
if "+MIPOPEN: 0,0" in res:
tcp_connected = True
return True
@@ -584,7 +629,8 @@ def connect_server():
def tcp_main():
"""TCP 主通信循环:登录、心跳、处理指令、发送数据"""
global tcp_connected, send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock,update_thread_started
global tcp_connected, high_send_queue, normal_send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock, update_thread_started
send_hartbeat_fail_count = 0
while not app.need_exit():
if not connect_server():
time.sleep_ms(5000)
@@ -629,7 +675,7 @@ def tcp_main():
# 处理心跳 ACK
elif logged_in and msg_type == 4:
last_heartbeat_ack_time = time.ticks_ms()
# print("✅ 收到心跳确认")
print("✅ 收到心跳确认")
# 处理业务指令
elif logged_in and isinstance(body, dict):
@@ -665,7 +711,8 @@ def tcp_main():
safe_enqueue({"result": "update_already_started"}, 2)
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = os.popen(
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
@@ -678,12 +725,14 @@ def tcp_main():
# 实时检查是否有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = os.popen(
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"},
MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
@@ -697,16 +746,21 @@ def tcp_main():
break
# 发送队列中的业务数据
if logged_in and not queue_lock and send_queue:
if logged_in and (high_send_queue or normal_send_queue) and (not queue_lock):
# 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁
while queue_lock:
time.sleep_ms(1)
queue_lock = True
if send_queue:
msg_type, data_dict = send_queue.pop(0)
if high_send_queue:
msg_type, data_dict = high_send_queue.pop(0)
else:
msg_type, data_dict = normal_send_queue.pop(0)
queue_lock = False
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
tcp_connected = False
queue_lock = False
break
queue_lock = False
# 发送激光校准结果
if logged_in and not laser_calibration_lock and laser_calibration_result is not None:
@@ -721,13 +775,21 @@ def tcp_main():
if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
if not tcp_send_raw(make_packet(4, {"t": int(time.time())})):
print("💔 心跳发送失败")
send_hartbeat_fail_count += 1
if send_hartbeat_fail_count >= 3:
send_hartbeat_fail_count = 0
print("连续3次发送心跳失败重连")
break
else:
continue
else:
send_hartbeat_fail_count = 0
last_heartbeat_send_time = current_time
# print("💓 心跳已发送")
print("💓 心跳已发送")
# 心跳超时重连
if logged_in and current_time - last_heartbeat_ack_time > 6000:
print("6秒无心跳ACK重连")
if logged_in and current_time - last_heartbeat_ack_time > 1000 * 60 * 10: # 十分钟
print("十分钟无心跳ACK重连")
break
time.sleep_ms(50)
@@ -761,6 +823,7 @@ def laser_calibration_worker():
def cmd_str():
global DEVICE_ID, PASSWORD
# print("env: ", config.get_env())
DEVICE_ID = read_device_id()
PASSWORD = DEVICE_ID + "."
@@ -774,7 +837,7 @@ def cmd_str():
# 初始化硬件
init_ina226()
load_laser_point()
# load_laser_point()
disp = display.Display()
cam = camera.Camera(640, 480)
@@ -790,7 +853,8 @@ def cmd_str():
while not app.need_exit():
current_time = time.ticks_ms()
# print("压力传感器数值: ", adc_obj.read())
if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
adc_val = adc_obj.read()
if adc_val > ADC_TRIGGER_THRESHOLD:
diff_ms = current_time - last_adc_trigger
if diff_ms < 3000:
continue
@@ -809,7 +873,7 @@ def cmd_str():
disp.show(result_img)
# 计算偏移与距离
dx, dy = compute_laser_position(center, (x, y), radius, method)
dx, dy = compute_laser_position_v2(center, (x, y))
distance_m = estimate_distance(best_radius1)
# 读取电量
@@ -817,23 +881,24 @@ def cmd_str():
battery_percent = voltage_to_percent(voltage)
# 保存图像(带标注)
try:
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
result_img.save(filename, quality=70)
except Exception as e:
print(f"❌ 保存失败: {e}")
# try:
# jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
# filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
# result_img.save(filename, quality=70)
# except Exception as e:
# print(f"❌ 保存失败: {e}")
# 构造上报数据
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100), # 距离(厘米)
"m": method
"m": method,
"adc": adc_val,
}
report_data = {"cmd": 1, "data": inner_data}
safe_enqueue(report_data)
# 射箭事件高优先级入队,由 tcp_main 统一发送
safe_enqueue(report_data, msg_type=2, high=True)
print("📤 射箭事件已加入发送队列")
time.sleep_ms(100)