Compare commits

..

5 Commits

Author SHA1 Message Date
b552d20a46 fix:测距 2025-12-28 16:19:00 +08:00
21cec260b8 fix: 修改电量不固定 2025-12-26 15:12:47 +08:00
5a98bf2e85 pref: 计算环数代码 2025-12-26 14:04:43 +08:00
huangzhenwei2
f11b31c09c update hearbeat 2025-12-26 11:47:33 +08:00
0b18ec353c temp: 2025-12-25 16:08:42 +08:00
2 changed files with 146 additions and 81 deletions

View File

@@ -1,6 +1,6 @@
id: t11
name: t11
version: 1.0.2
version: 1.0.3
author: t11
icon: ''
desc: t11

223
main.py
View File

@@ -7,20 +7,23 @@
作者ZZH
最后更新2025-11-21
"""
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import _thread
import hashlib
import hmac
import json
import os
import re
import socket
import struct
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import hmac
import ujson
import hashlib
import requests
import socket
import ujson
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
from maix.peripheral import adc
# import config
# ==================== 全局配置 ====================
@@ -35,7 +38,7 @@ PASSWORD = None
# 服务器连接参数
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 2 # 心跳间隔(秒)
HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒)
# 激光校准配置
CONFIG_FILE = "/root/laser_config.json"
@@ -47,7 +50,7 @@ URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"
# UART 设备初始化
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信
distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块
# 引脚功能映射
@@ -69,14 +72,14 @@ thickness = 1
length = 2
# 全局状态变量
laser_calibration_active = False # 是否正在后台校准激光
laser_calibration_result = None # 校准结果坐标 (x, y)
laser_calibration_lock = False # 简易互斥锁,防止多线程冲突
laser_calibration_active = False # 是否正在后台校准激光
laser_calibration_result = None # 校准结果坐标 (x, y)
laser_calibration_lock = False # 简易互斥锁,防止多线程冲突
# 硬件对象初始化
laser_x, laser_y = laser_point
adc_obj = adc.ADC(0, adc.RES_BIT_12)
bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线
bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线
# bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的
# INA226 电流/电压监测芯片寄存器地址
INA226_ADDR = 0x40
@@ -92,13 +95,15 @@ LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0x
# 相机标定参数(用于距离估算)
# FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素)
FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素)
REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
FOCAL_LENGTH_PIX = 2200.0 # 焦距(像素)
REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
# TCP 连接状态
tcp_connected = False
send_queue = []
queue_lock = False
high_send_queue = [] # 高优先级发送队列:射箭事件等
normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等
queue_lock = False # 简易互斥锁,保护队列
uart4g_lock = False # 简易互斥锁,保护 4G 串口 AT 发送流程(防并发)
update_thread_started = False # 防止 OTA 更新线程重复启动
@@ -164,6 +169,7 @@ def direct_ota_download():
finally:
update_thread_started = False # 允许下次 OTA
def handle_wifi_and_update(ssid, password):
"""在子线程中执行 Wi-Fi 连接 + OTA 更新流程"""
try:
@@ -250,16 +256,32 @@ def read_device_id():
return "DEFAULT_DEVICE_ID"
def safe_enqueue(data_dict, msg_type=2):
"""线程安全地将消息加入 TCP 发送队列"""
global queue_lock, send_queue
def safe_enqueue(data_dict, msg_type=2, high=False):
"""线程安全地将消息加入 TCP 发送队列(支持优先级)"""
global queue_lock, high_send_queue, normal_send_queue
while queue_lock:
time.sleep_ms(1)
queue_lock = True
send_queue.append((msg_type, data_dict))
item = (msg_type, data_dict)
if high:
high_send_queue.append(item)
else:
normal_send_queue.append(item)
queue_lock = False
def _uart4g_lock_acquire():
global uart4g_lock
while uart4g_lock:
time.sleep_ms(1)
uart4g_lock = True
def _uart4g_lock_release():
global uart4g_lock
uart4g_lock = False
def at(cmd, wait="OK", timeout=2000):
"""向 4G 模块发送 AT 指令并等待响应"""
if cmd:
@@ -301,27 +323,30 @@ def tcp_send_raw(data: bytes, max_retries=2) -> bool:
global tcp_connected
if not tcp_connected:
return False
for attempt in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 1500):
time.sleep_ms(100)
continue
time.sleep_ms(10)
full = data + b"\x1A" # AT 指令结束符
try:
sent = uart4g.write(full)
if sent != len(full):
_uart4g_lock_acquire()
try:
for attempt in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 1500):
time.sleep_ms(100)
continue
except:
continue
if "OK" in at("", "OK", 1000):
return True
time.sleep_ms(100)
time.sleep_ms(10)
full = data + b"\x1A" # AT 指令结束符
try:
sent = uart4g.write(full)
if sent != len(full):
continue
except:
continue
return False
if "OK" in at("", "OK", 1000):
return True
time.sleep_ms(100)
return False
finally:
_uart4g_lock_release()
def generate_token(device_id):
@@ -419,7 +444,7 @@ def find_red_laser(frame, threshold=150):
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
r, g, b = img_bytes[idx], img_bytes[idx + 1], img_bytes[idx + 2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
@@ -511,7 +536,10 @@ def detect_circle(frame):
circularity = 4 * np.pi * area / (perimeter ** 2)
if circularity > 0.75 and len(cnt) >= 5:
center, axes, angle = cv2.fitEllipse(cnt)
radius = (axes[0] + axes[1]) / 4
radius = axes[0]
if axes[1] < radius:
radius = axes[1]
radius /= 2
best_center = (int(center[0]), int(center[1]))
best_radius = int(radius)
best_radius1 = best_radius
@@ -566,6 +594,19 @@ def compute_laser_position(circle_center, laser_point, radius, method):
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
def compute_laser_position_v2(circle_center, laser_point):
if circle_center is None:
return 200, 200
cx, cy = circle_center
lx, ly = 320, 220
dx = lx - cx
dy = ly - cy
r = 22.16 * 5
target_x = dx / r * 100
target_y = dy / r * 100
print(f"lx{lx} ly: {ly} cx: {cx} cy: {cy} dx: {dx} dy: {dy} result_x: {target_x} result_y: {-target_y}")
return (target_x, -target_y)
# ==================== TCP 通信线程 ====================
def connect_server():
@@ -574,8 +615,12 @@ def connect_server():
if tcp_connected:
return True
print("连接到服务器...")
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
_uart4g_lock_acquire()
try:
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
finally:
_uart4g_lock_release()
if "+MIPOPEN: 0,0" in res:
tcp_connected = True
return True
@@ -584,7 +629,8 @@ def connect_server():
def tcp_main():
"""TCP 主通信循环:登录、心跳、处理指令、发送数据"""
global tcp_connected, send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock,update_thread_started
global tcp_connected, high_send_queue, normal_send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock, update_thread_started
send_hartbeat_fail_count = 0
while not app.need_exit():
if not connect_server():
time.sleep_ms(5000)
@@ -629,7 +675,7 @@ def tcp_main():
# 处理心跳 ACK
elif logged_in and msg_type == 4:
last_heartbeat_ack_time = time.ticks_ms()
# print("✅ 收到心跳确认")
print("✅ 收到心跳确认")
# 处理业务指令
elif logged_in and isinstance(body, dict):
@@ -665,7 +711,8 @@ def tcp_main():
safe_enqueue({"result": "update_already_started"}, 2)
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = os.popen(
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
@@ -678,12 +725,14 @@ def tcp_main():
# 实时检查是否有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = os.popen(
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"},
MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
@@ -697,17 +746,22 @@ def tcp_main():
break
# 发送队列中的业务数据
if logged_in and not queue_lock and send_queue:
if logged_in and (high_send_queue or normal_send_queue) and (not queue_lock):
# 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁
while queue_lock:
time.sleep_ms(1)
queue_lock = True
if send_queue:
msg_type, data_dict = send_queue.pop(0)
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
tcp_connected = False
queue_lock = False
break
if high_send_queue:
msg_type, data_dict = high_send_queue.pop(0)
else:
msg_type, data_dict = normal_send_queue.pop(0)
queue_lock = False
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
tcp_connected = False
break
# 发送激光校准结果
if logged_in and not laser_calibration_lock and laser_calibration_result is not None:
laser_calibration_lock = True
@@ -721,13 +775,21 @@ def tcp_main():
if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
if not tcp_send_raw(make_packet(4, {"t": int(time.time())})):
print("💔 心跳发送失败")
break
last_heartbeat_send_time = current_time
# print("💓 心跳已发送")
send_hartbeat_fail_count += 1
if send_hartbeat_fail_count >= 3:
send_hartbeat_fail_count = 0
print("连续3次发送心跳失败重连")
break
else:
continue
else:
send_hartbeat_fail_count = 0
last_heartbeat_send_time = current_time
print("💓 心跳已发送")
# 心跳超时重连
if logged_in and current_time - last_heartbeat_ack_time > 6000:
print("6秒无心跳ACK重连")
if logged_in and current_time - last_heartbeat_ack_time > 1000 * 60 * 10: # 十分钟
print("十分钟无心跳ACK重连")
break
time.sleep_ms(50)
@@ -761,6 +823,7 @@ def laser_calibration_worker():
def cmd_str():
global DEVICE_ID, PASSWORD
# print("env: ", config.get_env())
DEVICE_ID = read_device_id()
PASSWORD = DEVICE_ID + "."
@@ -774,7 +837,7 @@ def cmd_str():
# 初始化硬件
init_ina226()
load_laser_point()
# load_laser_point()
disp = display.Display()
cam = camera.Camera(640, 480)
@@ -790,9 +853,10 @@ def cmd_str():
while not app.need_exit():
current_time = time.ticks_ms()
# print("压力传感器数值: ", adc_obj.read())
if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
diff_ms = current_time-last_adc_trigger
if diff_ms<3000:
adc_val = adc_obj.read()
if adc_val > ADC_TRIGGER_THRESHOLD:
diff_ms = current_time - last_adc_trigger
if diff_ms < 3000:
continue
last_adc_trigger = current_time
time.sleep_ms(60) # 防抖
@@ -809,7 +873,7 @@ def cmd_str():
disp.show(result_img)
# 计算偏移与距离
dx, dy = compute_laser_position(center, (x, y), radius, method)
dx, dy = compute_laser_position_v2(center, (x, y))
distance_m = estimate_distance(best_radius1)
# 读取电量
@@ -817,23 +881,24 @@ def cmd_str():
battery_percent = voltage_to_percent(voltage)
# 保存图像(带标注)
try:
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
result_img.save(filename, quality=70)
except Exception as e:
print(f"❌ 保存失败: {e}")
# try:
# jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
# filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
# result_img.save(filename, quality=70)
# except Exception as e:
# print(f"❌ 保存失败: {e}")
# 构造上报数据
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100), # 距离(厘米)
"m": method
"m": method,
"adc": adc_val,
}
report_data = {"cmd": 1, "data": inner_data}
safe_enqueue(report_data)
# 射箭事件高优先级入队,由 tcp_main 统一发送
safe_enqueue(report_data, msg_type=2, high=True)
print("📤 射箭事件已加入发送队列")
time.sleep_ms(100)