From 21cec260b8f50365fceba5dacdcf840e4fd09011 Mon Sep 17 00:00:00 2001 From: linyimin <18316471919@139.com> Date: Fri, 26 Dec 2025 15:12:47 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E6=94=B9=E7=94=B5=E9=87=8F?= =?UTF-8?q?=E4=B8=8D=E5=9B=BA=E5=AE=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 84 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/main.py b/main.py index 5c39bfa..03804a6 100644 --- a/main.py +++ b/main.py @@ -7,20 +7,21 @@ 作者:ZZH 最后更新:2025-11-21 """ -from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err +import _thread +import hashlib +import hmac +import json +import os +import re +import socket +import struct + import cv2 import numpy as np -import json -import struct -import re -from maix.peripheral import adc -import _thread -import os -import hmac -import ujson -import hashlib import requests -import socket +import ujson +from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err +from maix.peripheral import adc # import config @@ -49,7 +50,7 @@ URL = "http://ws.shelingxingqiu.com" API_PATH = "/home/shoot/device_fire/arrow/fire" # UART 设备初始化 -uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 +uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块 # 引脚功能映射 @@ -71,14 +72,14 @@ thickness = 1 length = 2 # 全局状态变量 -laser_calibration_active = False # 是否正在后台校准激光 -laser_calibration_result = None # 校准结果坐标 (x, y) -laser_calibration_lock = False # 简易互斥锁,防止多线程冲突 +laser_calibration_active = False # 是否正在后台校准激光 +laser_calibration_result = None # 校准结果坐标 (x, y) +laser_calibration_lock = False # 简易互斥锁,防止多线程冲突 # 硬件对象初始化 laser_x, laser_y = laser_point adc_obj = adc.ADC(0, adc.RES_BIT_12) -bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线 +bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线 # bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的 # INA226 电流/电压监测芯片寄存器地址 INA226_ADDR = 0x40 @@ -94,15 +95,15 @@ LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0x # 相机标定参数(用于距离估算) # FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素) -FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素) -REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) +FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素) +REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) # TCP 连接状态 tcp_connected = False -high_send_queue = [] # 高优先级发送队列:射箭事件等 -normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 -queue_lock = False # 简易互斥锁,保护队列 -uart4g_lock = False # 简易互斥锁,保护 4G 串口 AT 发送流程(防并发) +high_send_queue = [] # 高优先级发送队列:射箭事件等 +normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 +queue_lock = False # 简易互斥锁,保护队列 +uart4g_lock = False # 简易互斥锁,保护 4G 串口 AT 发送流程(防并发) update_thread_started = False # 防止 OTA 更新线程重复启动 @@ -168,6 +169,7 @@ def direct_ota_download(): finally: update_thread_started = False # 允许下次 OTA + def handle_wifi_and_update(ssid, password): """在子线程中执行 Wi-Fi 连接 + OTA 更新流程""" try: @@ -442,7 +444,7 @@ def find_red_laser(frame, threshold=150): for y in range(0, h, 2): for x in range(0, w, 2): idx = (y * w + x) * 3 - r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] + r, g, b = img_bytes[idx], img_bytes[idx + 1], img_bytes[idx + 2] if r > threshold and r > g * 2 and r > b * 2: rgb_sum = r + g + b if rgb_sum > max_sum: @@ -588,18 +590,20 @@ def compute_laser_position(circle_center, laser_point, radius, method): dy = ly - cy return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) + def compute_laser_position_v2(circle_center, laser_point): - print(f"circle_center : {circle_center}") + if circle_center is None: + return 200, 200 cx, cy = circle_center - lx, ly = 320,230 - # lx, ly = laser_point + lx, ly = 320, 220 dx = lx - cx dy = ly - cy r = 22.16 * 5 - target_x = dx/r*100 - target_y = dy/r*100 + target_x = dx / r * 100 + target_y = dy / r * 100 print(f"lx:{lx} ly: {ly} cx: {cx} cy: {cy} dx: {dx} dy: {dy} result_x: {target_x} result_y: {-target_y}") return (target_x, -target_y) + # ==================== TCP 通信线程 ==================== def connect_server(): @@ -704,7 +708,8 @@ def tcp_main(): safe_enqueue({"result": "update_already_started"}, 2) elif inner_cmd == 6: try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = os.popen( + "ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() ip = ip if ip else "no_ip" except: ip = "error_getting_ip" @@ -717,12 +722,14 @@ def tcp_main(): # 实时检查是否有 IP try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = os.popen( + "ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() except: ip = None if not ip: - safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) + safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, + MSG_TYPE_STATUS) else: # 启动纯下载线程 update_thread_started = True @@ -778,7 +785,7 @@ def tcp_main(): print("💓 心跳已发送") # 心跳超时重连 - if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟 + if logged_in and current_time - last_heartbeat_ack_time > 1000 * 60 * 10: # 十分钟 print("⏰ 十分钟无心跳ACK,重连") break @@ -843,9 +850,10 @@ def cmd_str(): while not app.need_exit(): current_time = time.ticks_ms() # print("压力传感器数值: ", adc_obj.read()) - if adc_obj.read() > ADC_TRIGGER_THRESHOLD: - diff_ms = current_time-last_adc_trigger - if diff_ms<3000: + adc_val = adc_obj.read() + if adc_val > ADC_TRIGGER_THRESHOLD: + diff_ms = current_time - last_adc_trigger + if diff_ms < 3000: continue last_adc_trigger = current_time time.sleep_ms(60) # 防抖 @@ -876,14 +884,14 @@ def cmd_str(): # result_img.save(filename, quality=70) # except Exception as e: # print(f"❌ 保存失败: {e}") - # 构造上报数据 inner_data = { "x": float(dx) if dx is not None else 200.0, "y": float(dy) if dy is not None else 200.0, "r": 90.0, "d": round((distance_m or 0.0) * 100), # 距离(厘米) - "m": method + "m": method, + "adc": adc_val, } report_data = {"cmd": 1, "data": inner_data} # 射箭事件高优先级入队,由 tcp_main 统一发送 @@ -897,4 +905,4 @@ def cmd_str(): if __name__ == "__main__": - cmd_str() \ No newline at end of file + cmd_str()