#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 激光射击系统主程序(视觉测距版) 功能:目标检测、激光校准、4G TCP 通信、OTA 升级、单目测距、INA226 电量监测 平台:MaixPy (Sipeed MAIX) 作者:ZZH 最后更新:2025-11-21 """ from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err import cv2 import numpy as np import json import struct import re from maix.peripheral import adc import _thread import os import hmac import ujson import hashlib import requests import socket import re import binascii try: import hashlib except: hashlib = None # import config # ==================== Locks ==================== class _Mutex: """ 基于 _thread.allocate_lock() 的互斥锁封装: - 支持 with - 支持 try_acquire(若固件不支持非阻塞 acquire 参数,则退化为阻塞 acquire) """ def __init__(self): self._lk = _thread.allocate_lock() def acquire(self, blocking=True): try: return self._lk.acquire(blocking) except TypeError: self._lk.acquire() return True def try_acquire(self): return self.acquire(False) def release(self): self._lk.release() def __enter__(self): self.acquire(True) return self def __exit__(self, exc_type, exc, tb): self.release() return False # ==================== 全局配置 ==================== # OTA 升级地址与本地路径 # url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py" local_filename = "/maixapp/apps/t11/main_tmp.py" app_version = '1.0.0' # OTA 下发参数(由后端指令写入) OTA_URL = None OTA_MODE = None # "4g" / "wifi" / None def is_wifi_connected(): """尽量判断当前是否有 Wi-Fi(有则走 Wi-Fi OTA,否则走 4G OTA)""" # 优先用 MaixPy network(如果可用) try: wlan = network.WLAN(network.TYPE_WIFI) if wlan.isconnected(): return True except: pass # 兜底:看系统 wlan0 有没有 IP(你系统可能没有 wlan0,则返回 False) try: ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() return bool(ip) except: return False # 设备认证信息(运行时动态加载) DEVICE_ID = None PASSWORD = None # 服务器连接参数 SERVER_IP = "www.shelingxingqiu.com" SERVER_PORT = 50005 HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒) # 激光校准配置 CONFIG_FILE = "/root/laser_config.json" DEFAULT_POINT = (640, 480) # 默认激光中心点(图像中心) laser_point = DEFAULT_POINT # HTTP 上报接口 URL = "http://ws.shelingxingqiu.com" API_PATH = "/home/shoot/device_fire/arrow/fire" # UART 设备初始化 uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块 # 引脚功能映射 pinmap.set_pin_function("A18", "UART1_RX") pinmap.set_pin_function("A19", "UART1_TX") pinmap.set_pin_function("A29", "UART2_RX") pinmap.set_pin_function("A28", "UART2_TX") pinmap.set_pin_function("P18", "I2C1_SCL") pinmap.set_pin_function("P21", "I2C1_SDA") # pinmap.set_pin_function("A15", "I2C5_SCL") # pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的 # ADC 触发阈值(用于检测扳机/激光触发) ADC_TRIGGER_THRESHOLD = 3000 ADC_LASER_THRESHOLD = 3000 # 显示参数:激光十字线样式 color = image.Color(255, 100, 0) thickness = 1 length = 2 # 全局状态变量 laser_calibration_active = False # 是否正在后台校准激光 laser_calibration_result = None # 校准结果坐标 (x, y) laser_calibration_lock = _Mutex() # 互斥锁,防止多线程冲突 # 硬件对象初始化 laser_x, laser_y = laser_point adc_obj = adc.ADC(0, adc.RES_BIT_12) bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线 # bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的 # INA226 电流/电压监测芯片寄存器地址 INA226_ADDR = 0x40 REG_CONFIGURATION = 0x00 REG_BUS_VOLTAGE = 0x02 REG_CALIBRATION = 0x05 CALIBRATION_VALUE = 0x1400 # 激光控制指令(自定义协议) MODULE_ADDR = 0x00 LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1]) LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) # 相机标定参数(用于距离估算) # FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素) FOCAL_LENGTH_PIX = 2200.0 # 焦距(像素) REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) # TCP 连接状态 tcp_connected = False high_send_queue = [] # 高优先级发送队列:射箭事件等 normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 queue_lock = _Mutex() # 互斥锁,保护队列 uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并发) update_thread_started = False # 防止 OTA 更新线程重复启动 ota_in_progress = False # OTA(4G HTTP URC) 期间暂停 tcp_main 读取 uart4g,避免吞掉 +MHTTPURC # ==================== 工具函数 ==================== def download_file(url, filename): """从指定 URL 下载文件并保存为 UTF-8 编码文本""" try: print(f"正在从 {url} 下载文件...") response = requests.get(url) response.raise_for_status() response.encoding = 'utf-8' with open(filename, 'w', encoding='utf-8') as file: file.write(response.text) return f"下载成功!文件已保存为: {filename}" except requests.exceptions.RequestException as e: return f"下载失败!网络请求错误: {e}" except OSError as e: return f"下载失败!文件写入错误: {e}" except Exception as e: return f"下载失败!发生未知错误: {e}" def is_server_reachable(host, port=80, timeout=5): """检查目标主机端口是否可达(用于 OTA 前网络检测)""" try: addr_info = socket.getaddrinfo(host, port)[0] s = socket.socket(addr_info[0], addr_info[1], addr_info[2]) s.settimeout(timeout) s.connect(addr_info[-1]) s.close() return True except Exception as e: print(f"[NET] 无法连接 {host}:{port} - {e}") return False def apply_ota_and_reboot(ota_url=None): """ OTA 文件下载成功后:备份原 main.py -> 替换 main_tmp.py -> 重启设备 """ import shutil main_py = "/maixapp/apps/t11/main.py" main_tmp = "/maixapp/apps/t11/main_tmp.py" main_bak = "/maixapp/apps/t11/main.py.bak" ota_pending = "/maixapp/apps/t11/ota_pending.json" try: # 1. 检查下载的文件是否存在 if not os.path.exists(main_tmp): print(f"[OTA] 错误:{main_tmp} 不存在") return False # 2. 备份原 main.py(如果存在) if os.path.exists(main_py): try: shutil.copy2(main_py, main_bak) print(f"[OTA] 已备份 {main_py} -> {main_bak}") except Exception as e: print(f"[OTA] 备份失败: {e}") # 备份失败也继续(可能没有原文件) # 3. 替换:main_tmp.py -> main.py try: shutil.copy2(main_tmp, main_py) print(f"[OTA] 已替换 {main_tmp} -> {main_py}") # 确保写入磁盘 try: os.sync() # 如果系统支持 except: pass time.sleep_ms(500) # 额外等待确保写入完成 except Exception as e: print(f"[OTA] 替换失败: {e}") return False # 3.5 写入 pending(用于重启后确认成功并上报) try: pending_obj = { "ts": int(time.time()) if hasattr(time, "time") else 0, "url": ota_url or "", "tmp": main_tmp, "main": main_py, "bak": main_bak, } with open(ota_pending, "w", encoding="utf-8") as f: json.dump(pending_obj, f) try: os.sync() except: pass except Exception as e: print(f"[OTA] 写入 ota_pending 失败: {e}") # 4. 通知服务器(可选,但重启前发一次) safe_enqueue({"result": "ota_applied_rebooting"}, 2) time.sleep_ms(1000) # 给一点时间让消息发出 # 5. 重启设备 print("[OTA] 准备重启设备...") os.system("reboot") # MaixPy 通常是这个命令 return True except Exception as e: print(f"[OTA] apply_ota_and_reboot 异常: {e}") return False def direct_ota_download(ota_url): """ 直接执行 OTA 下载(假设已有网络) 用于 cmd=7 / 或 wifi 模式 """ global update_thread_started try: if not ota_url: safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) return from urllib.parse import urlparse parsed_url = urlparse(ota_url) host = parsed_url.hostname port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) if not is_server_reachable(host, port, timeout=8): safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2) return print(f"[OTA] 开始下载: {ota_url}") result_msg = download_file(ota_url, local_filename) print(f"[OTA] {result_msg}") # 检查是否下载成功(包含"成功"或"下载成功"关键字) if "成功" in result_msg or "下载成功" in result_msg: # 下载成功:备份+替换+重启 if apply_ota_and_reboot(ota_url): return # 会重启,不会执行到 finally else: safe_enqueue({"result": result_msg}, 2) except Exception as e: error_msg = f"OTA 异常: {str(e)}" print(error_msg) safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) finally: update_thread_started = False def handle_wifi_and_update(ssid, password, ota_url): """在子线程中执行 Wi-Fi 连接 + OTA 更新流程""" global update_thread_started try: ip, error = connect_wifi(ssid, password) if error: safe_enqueue({"result": "wifi_failed", "error": error}, 2) return safe_enqueue({"result": "wifi_connected", "ip": ip}, 2) # 下载 if not ota_url: safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) return from urllib.parse import urlparse parsed_url = urlparse(ota_url) host = parsed_url.hostname port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) if not is_server_reachable(host, port, timeout=8): err_msg = f"网络不通:无法连接 {host}:{port}" safe_enqueue({"result": err_msg}, 2) return print(f"[NET] 已确认可访问 {host}:{port},开始下载...") result = download_file(ota_url, local_filename) print(result) # 检查是否下载成功(包含"成功"或"下载成功"关键字) if "成功" in result or "下载成功" in result: # 下载成功:备份+替换+重启 if apply_ota_and_reboot(ota_url): return # 会重启,不会执行到 finally else: safe_enqueue({"result": result}, 2) finally: update_thread_started = False print("[UPDATE] 更新线程执行完毕,即将退出。") def connect_wifi(ssid, password): """ 连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录, 以便设备重启后自动连接。 """ conf_path = "/etc/wpa_supplicant.conf" ssid_file = "/boot/wifi.ssid" pass_file = "/boot/wifi.pass" try: # 生成 wpa_supplicant 配置 net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() if "network={" not in net_conf: return None, "Failed to generate wpa config" # 写入运行时配置 with open(conf_path, "w") as f: f.write("ctrl_interface=/var/run/wpa_supplicant\n") f.write("update_config=1\n\n") f.write(net_conf) # 持久化保存 SSID/PASS(关键!) with open(ssid_file, "w") as f: f.write(ssid.strip()) with open(pass_file, "w") as f: f.write(password.strip()) # 重启 Wi-Fi 服务 os.system("/etc/init.d/S30wifi restart") # 等待获取 IP for _ in range(20): ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() if ip: return ip, None time.sleep(1) return None, "Timeout: No IP obtained" except Exception as e: return None, f"Exception: {str(e)}" def read_device_id(): """从 /device_key 文件读取设备唯一 ID,失败则使用默认值""" try: with open("/device_key", "r") as f: device_id = f.read().strip() if device_id: print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}") return device_id except Exception as e: print(f"[ERROR] 无法读取 /device_key: {e}") return "DEFAULT_DEVICE_ID" def safe_enqueue(data_dict, msg_type=2, high=False): """线程安全地将消息加入 TCP 发送队列(支持优先级)""" global queue_lock, high_send_queue, normal_send_queue item = (msg_type, data_dict) with queue_lock: if high: high_send_queue.append(item) else: normal_send_queue.append(item) def _uart4g_lock_acquire(): # 兼容旧调用:建议改为 `with uart4g_lock:` uart4g_lock.acquire(True) def _uart4g_lock_release(): # 兼容旧调用:建议改为 `with uart4g_lock:` uart4g_lock.release() def at(cmd, wait="OK", timeout=2000): """向 4G 模块发送 AT 指令并等待响应""" if cmd: uart4g.write((cmd + "\r\n").encode()) t0 = time.ticks_ms() buf = b"" while time.ticks_ms() - t0 < timeout: data = uart4g.read() if data: buf += data if wait.encode() in buf: return buf.decode(errors="ignore") return buf.decode(errors="ignore") def make_packet(msg_type: int, body_dict: dict) -> bytes: """打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文""" body = json.dumps(body_dict).encode() body_len = len(body) checksum = body_len + msg_type header = struct.pack(">III", body_len, msg_type, checksum) return header + body def parse_packet(data: bytes): """解析 TCP 数据包,返回 (类型, 正文字典)""" if len(data) < 12: return None, None body_len, msg_type, checksum = struct.unpack(">III", data[:12]) body = data[12:12 + body_len] try: return msg_type, json.loads(body.decode()) except: return msg_type, {"raw": body.decode(errors="ignore")} def tcp_send_raw(data: bytes, max_retries=2) -> bool: """通过 4G 模块的 AT 指令发送原始 TCP 数据包""" global tcp_connected if not tcp_connected: return False with uart4g_lock: for attempt in range(max_retries): cmd = f'AT+MIPSEND=0,{len(data)}' if ">" not in at(cmd, ">", 1500): time.sleep_ms(100) continue time.sleep_ms(10) full = data + b"\x1A" # AT 指令结束符 try: sent = uart4g.write(full) if sent != len(full): continue except: continue if "OK" in at("", "OK", 1000): return True time.sleep_ms(100) return False def generate_token(device_id): """生成用于 HTTP 接口鉴权的 Token(HMAC-SHA256)""" SALT = "shootMessageFire" SALT2 = "shoot" return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() def send_http_cmd(cmd_str, timeout_ms=3000): """发送 HTTP 相关 AT 指令(调试用)""" print("[HTTP AT] =>", cmd_str) return at(cmd_str, "OK", timeout_ms) def read_http_response(timeout_ms=5000): """读取并打印 HTTP 响应(用于调试)""" start = time.ticks_ms() while time.ticks_ms() - start < timeout_ms: data = uart4g.read(128) if data: try: print("📡 HTTP 响应:", data.decode("utf-8", "ignore").strip()) except: print("📡 响应(raw):", data) time.sleep_ms(100) def upload_shoot_event(json_data): """通过 4G 模块上报射击事件到 HTTP 接口(备用通道)""" token = generate_token(DEVICE_ID) if not send_http_cmd(f'AT+MHTTPCREATE="{URL}"'): return False instance_id = 0 send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"') send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"') send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {DEVICE_ID}"') json_str = ujson.dumps(json_data) if not send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'): return False if send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{API_PATH}"'): read_http_response() return True return False def load_laser_point(): """从配置文件加载激光中心点,失败则使用默认值""" global laser_point try: if "laser_config.json" in os.listdir("/root"): with open(CONFIG_FILE, "r") as f: data = json.load(f) if isinstance(data, list) and len(data) == 2: laser_point = (int(data[0]), int(data[1])) print(f"[INFO] 加载激光点: {laser_point}") else: raise ValueError else: laser_point = DEFAULT_POINT except: laser_point = DEFAULT_POINT def save_laser_point(point): """保存激光中心点到配置文件""" global laser_point try: with open(CONFIG_FILE, "w") as f: json.dump([point[0], point[1]], f) laser_point = point except: pass def turn_on_laser(): """发送指令开启激光,并读取回包(部分模块支持)""" distance_serial.write(LASER_ON_CMD) time.sleep_ms(10) resp = distance_serial.read(20) if resp: if resp == LASER_ON_CMD: print("✅ 激光指令已确认") else: print("🔇 无回包(正常或模块不支持)") return resp def find_red_laser(frame, threshold=150): """在图像中查找最亮的红色激光点(基于 RGB 阈值)""" w, h = frame.width(), frame.height() img_bytes = frame.to_bytes() max_sum = 0 best_pos = None for y in range(0, h, 2): for x in range(0, w, 2): idx = (y * w + x) * 3 r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] if r > threshold and r > g * 2 and r > b * 2: rgb_sum = r + g + b if rgb_sum > max_sum: max_sum = rgb_sum best_pos = (x, y) return best_pos def calibrate_laser_position(): """执行一次激光校准:拍照 → 找红点 → 保存坐标""" global laser_x, laser_y time.sleep_ms(80) cam = camera.Camera(640, 480) frame = cam.read() pos = find_red_laser(frame) if pos: laser_x, laser_y = pos save_laser_point(pos) return pos return None # ==================== 电源管理(INA226) ==================== def write_register(reg, value): data = [(value >> 8) & 0xFF, value & 0xFF] bus.writeto_mem(INA226_ADDR, reg, bytes(data)) def read_register(reg): data = bus.readfrom_mem(INA226_ADDR, reg, 2) return (data[0] << 8) | data[1] def init_ina226(): """初始化 INA226 芯片:配置模式 + 校准值""" write_register(REG_CONFIGURATION, 0x4527) write_register(REG_CALIBRATION, CALIBRATION_VALUE) def get_bus_voltage(): """读取总线电压(单位:V)""" raw = read_register(REG_BUS_VOLTAGE) return raw * 1.25 / 1000 def voltage_to_percent(voltage): """根据电压估算电池百分比(查表插值)""" points = [ (4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65), (3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15), (3.65, 5), (3.60, 0) ] if voltage >= points[0][0]: return 100 if voltage <= points[-1][0]: return 0 for i in range(len(points) - 1): v1, p1 = points[i] v2, p2 = points[i + 1] if voltage >= v2: ratio = (voltage - v1) / (v2 - v1) percent = p1 + (p2 - p1) * ratio return max(0, min(100, int(round(percent)))) return 0 # ==================== 靶心检测与距离计算 ==================== def detect_circle(frame): """检测图像中的靶心(优先清晰轮廓,其次黄色区域)""" global REAL_RADIUS_CM img_cv = image.image2cv(frame, False, False) gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) blurred = cv2.GaussianBlur(gray, (5, 5), 0) edged = cv2.Canny(blurred, 50, 150) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel) contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) best_center = best_radius = best_radius1 = method = None # 方法1:基于轮廓拟合椭圆(清晰靶心) for cnt in contours: area = cv2.contourArea(cnt) perimeter = cv2.arcLength(cnt, True) if perimeter < 100 or area < 100: continue circularity = 4 * np.pi * area / (perimeter ** 2) if circularity > 0.75 and len(cnt) >= 5: center, axes, angle = cv2.fitEllipse(cnt) radius = axes[0] if axes[1] < radius: radius = axes[1] radius /= 2 best_center = (int(center[0]), int(center[1])) best_radius = int(radius) best_radius1 = best_radius REAL_RADIUS_CM = 15 method = "清晰" break # 方法2:基于 HSV 黄色掩码(模糊靶心) if not best_center: hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV) h, s, v = cv2.split(hsv) s = np.clip(s * 2, 0, 255).astype(np.uint8) hsv = cv2.merge((h, s, v)) lower_yellow = np.array([7, 80, 0]) upper_yellow = np.array([32, 255, 182]) mask = cv2.inRange(hsv, lower_yellow, upper_yellow) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel) mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel) contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: largest = max(contours, key=cv2.contourArea) if cv2.contourArea(largest) > 50: (x, y), radius = cv2.minEnclosingCircle(largest) best_center = (int(x), int(y)) best_radius = int(radius) best_radius1 = best_radius REAL_RADIUS_CM = 15 method = "模糊" result_img = image.cv2image(img_cv, False, False) return result_img, best_center, best_radius, method, best_radius1 def estimate_distance(pixel_radius): """根据像素半径估算实际距离(单位:米)""" if not pixel_radius: return 0.0 return (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / pixel_radius / 100.0 def compute_laser_position(circle_center, laser_point, radius, method): """计算激光相对于靶心的偏移量(单位:厘米)""" if not all([circle_center, radius, method]): return None, None cx, cy = circle_center lx, ly = laser_point # 根据检测方法动态调整靶心物理半径(简化模型) circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0 dx = lx - cx dy = ly - cy return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) # ==================== TCP 通信线程 ==================== def connect_server(): """通过 4G 模块建立 TCP 连接""" global tcp_connected if tcp_connected: return True print("连接到服务器...") with uart4g_lock: at("AT+MIPCLOSE=0", "OK", 1000) res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000) if "+MIPOPEN: 0,0" in res: tcp_connected = True return True return False def tcp_main(): """TCP 主通信循环:登录、心跳、处理指令、发送数据""" global tcp_connected, high_send_queue, normal_send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock, update_thread_started send_hartbeat_fail_count = 0 while not app.need_exit(): if not connect_server(): time.sleep_ms(5000) continue # 发送登录包 login_data = {"deviceId": DEVICE_ID, "password": PASSWORD} if not tcp_send_raw(make_packet(1, login_data)): tcp_connected = False time.sleep_ms(2000) continue print("➡️ 登录包已发送,等待确认...") logged_in = False last_heartbeat_ack_time = time.ticks_ms() last_heartbeat_send_time = time.ticks_ms() rx_buf = b"" while True: # 接收数据 # OTA(4G HTTP) 会从 uart4g 吐出大量 +MHTTPURC 数据; # tcp_main 若在此 read,会把 URC 吃掉,导致 OTA empty_body/incomplete_body。 # 同时 uart4g_lock 置 True 时也不要抢读。 if ota_in_progress: time.sleep_ms(20) continue data = uart4g.read() if data: rx_buf += data # 解析 +MIPURC 消息 while b'+MIPURC: "rtcp"' in rx_buf: try: match = re.search(b'\+MIPURC: "rtcp",0,(\d+),(.+)', rx_buf, re.DOTALL) if match: payload_len = int(match.group(1)) payload = match.group(2)[:payload_len] msg_type, body = parse_packet(payload) # 处理登录响应 if not logged_in and msg_type == 1: if body and body.get("cmd") == 1 and body.get("data") == "登录成功": logged_in = True last_heartbeat_ack_time = time.ticks_ms() print("✅ 登录成功") # 若存在 ota_pending.json,说明上次 OTA 已应用并重启; # 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending,确保只上报一次。 try: pending_path = "/maixapp/apps/t11/ota_pending.json" if os.path.exists(pending_path): try: with open(pending_path, "r", encoding="utf-8") as f: pending_obj = json.load(f) except: pending_obj = {} safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) try: os.remove(pending_path) except: pass except Exception as e: print(f"[OTA] ota_ok 上报失败: {e}") else: break # 处理心跳 ACK elif logged_in and msg_type == 4: last_heartbeat_ack_time = time.ticks_ms() print("✅ 收到心跳确认") # 处理业务指令 elif logged_in and isinstance(body, dict): if isinstance(body.get("data"), dict) and "cmd" in body["data"]: inner_cmd = body["data"]["cmd"] if inner_cmd == 2: # 开启激光并校准 turn_on_laser() time.sleep_ms(100) laser_calibration_active = True safe_enqueue({"result": "calibrating"}, 2) elif inner_cmd == 3: # 关闭激光 distance_serial.write(LASER_OFF_CMD) laser_calibration_active = False safe_enqueue({"result": "laser_off"}, 2) elif inner_cmd == 4: # 上报电量 voltage = get_bus_voltage() battery_percent = voltage_to_percent(voltage) battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} safe_enqueue(battery_data, 2) print(f"🔋 电量上报: {battery_percent}%") elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置,及4g) inner_data = body["data"].get("data", {}) ssid = inner_data.get("ssid") password = inner_data.get("password") ota_url = inner_data.get("url") mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/"" if not ota_url: print("ota missing_url") safe_enqueue({"result": "missing_url"}, 2) rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 continue # 自动判断:mode 非法/为空时,优先 Wi-Fi(如果已连),否则 4G if mode not in ("4g", "wifi"): print("ota missing mode") mode = "wifi" if is_wifi_connected() else "4g" if update_thread_started: safe_enqueue({"result": "update_already_started"}, 2) rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 continue update_thread_started = True if mode == "4g": _thread.start_new_thread(direct_ota_download_via_4g, (ota_url,)) else: # wifi 模式:需要 ssid/password if not ssid or not password: update_thread_started = False safe_enqueue({"result": "missing_ssid_or_password"}, 2) else: _thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url)) elif inner_cmd == 6: try: ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() ip = ip if ip else "no_ip" except: ip = "error_getting_ip" safe_enqueue({"result": "current_ip", "ip": ip}, 2) elif inner_cmd == 7: # global update_thread_started if update_thread_started: safe_enqueue({"result": "update_already_started"}, 2) rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 continue # 实时检查是否有 IP try: ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() except: ip = None if not ip: safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) else: # 启动纯下载线程 update_thread_started = True _thread.start_new_thread(direct_ota_download, ()) rx_buf = rx_buf[match.end():] else: break except: rx_buf = b"" break # 发送队列中的业务数据 if logged_in and (high_send_queue or normal_send_queue): # 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁 msg_type = None data_dict = None if queue_lock.try_acquire(): try: if high_send_queue: msg_type, data_dict = high_send_queue.pop(0) elif normal_send_queue: msg_type, data_dict = normal_send_queue.pop(0) finally: queue_lock.release() if msg_type is not None and data_dict is not None: pkt = make_packet(msg_type, data_dict) if not tcp_send_raw(pkt): tcp_connected = False break # 发送激光校准结果 if logged_in and laser_calibration_result is not None: x = y = None with laser_calibration_lock: if laser_calibration_result is not None: x, y = laser_calibration_result laser_calibration_result = None if x is not None and y is not None: safe_enqueue({"result": "ok", "x": x, "y": y}, 2) # 定期发送心跳 current_time = time.ticks_ms() if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000: if not tcp_send_raw(make_packet(4, {"t": int(time.time())})): print("💔 心跳发送失败") send_hartbeat_fail_count += 1 if send_hartbeat_fail_count >= 3: send_hartbeat_fail_count = 0 print("连续3次发送心跳失败,重连") break else: continue else: send_hartbeat_fail_count = 0 last_heartbeat_send_time = current_time print("💓 心跳已发送") # 心跳超时重连 if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟 print("⏰ 十分钟无心跳ACK,重连") break time.sleep_ms(50) tcp_connected = False print("🔌 连接异常,2秒后重连...") time.sleep_ms(2000) def laser_calibration_worker(): """后台线程:持续检测是否需要执行激光校准""" global laser_calibration_active, laser_calibration_result, laser_calibration_lock while True: if laser_calibration_active: result = calibrate_laser_position() if result and len(result) == 2: with laser_calibration_lock: laser_calibration_result = result laser_calibration_active = False print(f"✅ 后台校准成功: {result}") else: time.sleep_ms(80) else: time.sleep_ms(50) def download_file_via_4g(url, filename, total_timeout_ms=30000, retries=3, debug=False): """ ML307R HTTP 下载(URC content 分片模式) - 重试:empty/incomplete/AT错误都会重试 - 超时:total_timeout_ms - 校验:Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5 - 日志:默认干净;debug=True 才打印 URC 进度 """ from urllib.parse import urlparse parsed = urlparse(url) host = parsed.hostname path = parsed.path or "/" base_url = f"http://{host}" # 你已验证 HTTP 可 200;如需 https 需另配 SSL def _log(*args): if debug: print(*args) def _get_ip(): r = at("AT+CGPADDR=1", "OK", 3000) m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) return m.group(1) if m else "" def _drain_uart(max_ms=300): """清空串口残留,防止旧 URC 干扰""" t0 = time.ticks_ms() while time.ticks_ms() - t0 < max_ms: d = uart4g.read(4096) if not d: time.sleep_ms(10) continue def _read_more(buf: bytes, ms=50) -> bytes: t0 = time.ticks_ms() while time.ticks_ms() - t0 < ms: d = uart4g.read(4096) if d: buf += d time.sleep_ms(1) return buf def _parse_httpid(raw: bytes): m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw) return int(m.group(1)) if m else None def _try_parse_header(buf: bytes): """ +MHTTPURC: "header",,,, 返回 (urc_id, status_code, header_text, rest_buf) 或 None """ tag = b'+MHTTPURC: "header",' i = buf.find(tag) if i < 0: return None if i > 0: buf = buf[i:] i = 0 # header 在 hdr_text 里包含 \r\n;我们用 hdr_len 精确截取 j = i + len(tag) comma_count = 0 k = j while k < len(buf) and comma_count < 3: # 3 个逗号到 hdr_len 后的逗号 if buf[k:k+1] == b",": comma_count += 1 k += 1 if comma_count < 3: return None header_prefix = buf[i:k] m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', header_prefix) if not m: return ("drop", buf[1:]) urc_id = int(m.group(1)) code = int(m.group(2)) hdr_len = int(m.group(3)) text_start = k text_end = text_start + hdr_len if len(buf) < text_end: return None hdr_text = buf[text_start:text_end].decode("utf-8", "ignore") rest = buf[text_end:] return ("ok", urc_id, code, hdr_text, rest) def _try_parse_one_content(buf: bytes): """ +MHTTPURC: "content",,,,, 返回 ("ok", urc_id, total_len, sum_len, cur_len, payload_bytes, rest_buf) 或 None """ tag = b'+MHTTPURC: "content",' i = buf.find(tag) if i < 0: return None if i > 0: buf = buf[i:] i = 0 j = i + len(tag) comma_count = 0 k = j while k < len(buf) and comma_count < 4: # payload 前只有 4 个逗号 if buf[k:k+1] == b",": comma_count += 1 k += 1 if comma_count < 4: return None prefix = buf[i:k] m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix) if not m: return ("drop", buf[1:]) urc_id = int(m.group(1)) total_len = int(m.group(2)) sum_len = int(m.group(3)) cur_len = int(m.group(4)) payload_start = k payload_end = payload_start + cur_len if len(buf) < payload_end: return None payload = buf[payload_start:payload_end] rest = buf[payload_end:] return ("ok", urc_id, total_len, sum_len, cur_len, payload, rest) def _extract_hdr_fields(hdr_text: str): # Content-Length mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE) clen = int(mlen.group(1)) if mlen else None # Content-Md5 (base64) mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE) md5_b64 = mmd5.group(1).strip() if mmd5 else None return clen, md5_b64 def _md5_base64(data: bytes) -> str: if hashlib is None: return "" digest = hashlib.md5(data).digest() # base64: 24 chars with == return binascii.b2a_base64(digest).decode().strip() def _one_attempt(): # 0) PDP:确保有 IP(避免把 OK 当成功) ip = _get_ip() if not ip or ip == "0.0.0.0": at("AT+MIPCALL=1,1", "OK", 15000) for _ in range(10): ip = _get_ip() if ip and ip != "0.0.0.0": break time.sleep(1) if not ip or ip == "0.0.0.0": return False, "PDP not ready (no_ip)" # 1) 清理旧实例 + 清空串口残留 for i in range(0, 6): at(f"AT+MHTTPDEL={i}", "OK", 1500) _drain_uart(300) # 2) 创建实例(httpid 可能延迟吐出来) uart4g.write((f'AT+MHTTPCREATE="{base_url}"\r\n').encode()) raw = b"" raw = _read_more(raw, 300) raw = _read_more(raw, 2000) httpid = _parse_httpid(raw) if httpid is None: return False, "MHTTPCREATE failed (no httpid)" # 3) 发 GET:不用 at()(避免 at 吃掉 URC) _drain_uart(100) uart4g.write((f'AT+MHTTPREQUEST={httpid},1,0,"{path}"\r\n').encode()) # 4) 收 URC:先拿 header(code/length/md5),再收 content 分片 buf = b"" urc_id = None status_code = None total_len = None expect_len = None expect_md5_b64 = None body_buf = None got_ranges = set() filled_bytes = 0 t0 = time.ticks_ms() last_sum = 0 while time.ticks_ms() - t0 < total_timeout_ms: buf = _read_more(buf, 50) # 4.1 尝试解析 header(可能先来) while True: ph = _try_parse_header(buf) if ph is None: break if ph[0] == "drop": buf = ph[1] continue _, hid, code, hdr_text, rest = ph buf = rest if urc_id is None: urc_id = hid status_code = code expect_len, expect_md5_b64 = _extract_hdr_fields(hdr_text) _log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}") # 4.2 解析尽可能多的 content while True: pc = _try_parse_one_content(buf) if pc is None: break if pc[0] == "drop": buf = pc[1] continue _, cid, _total, _sum, _cur, payload, rest = pc buf = rest if urc_id is None: urc_id = cid if cid != urc_id: continue if total_len is None: total_len = _total body_buf = bytearray(total_len) # 定位写入 start = _sum - _cur end = _sum if start < 0 or end > total_len: continue key = (start, end) if key not in got_ranges: got_ranges.add(key) body_buf[start:end] = payload filled_bytes += (end - start) if _sum > last_sum: last_sum = _sum if debug: _log(f"[URC] {start}:{end} sum={_sum}/{total_len} filled={filled_bytes}") # 完整条件:填满 total_len if filled_bytes == total_len: break if total_len is not None and filled_bytes == total_len: break # 5) 清理实例 at(f"AT+MHTTPDEL={httpid}", "OK", 3000) if body_buf is None: return False, "empty_body" if total_len is None: return False, "no_total_len" if filled_bytes != total_len: return False, f"incomplete_body got={filled_bytes} expected={total_len}" data = bytes(body_buf) # 6) 校验:Content-Length if expect_len is not None and len(data) != expect_len: return False, f"length_mismatch got={len(data)} expected={expect_len}" # 7) 校验:Content-Md5(base64) if expect_md5_b64 and hashlib is not None: md5_b64 = _md5_base64(data) if md5_b64 != expect_md5_b64: return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}" # 8) 写文件(原样 bytes) with open(filename, "wb") as f: f.write(data) return True, f"OK size={len(data)} ip={ip} code={status_code}" global ota_in_progress ota_in_progress = True with uart4g_lock: try: last_err = "unknown" for attempt in range(1, retries + 1): ok, msg = _one_attempt() if ok: return True, msg last_err = msg # 重试前等待 + 清 UART _log(f"[RETRY] attempt={attempt} failed={msg}") _drain_uart(500) time.sleep_ms(300) return False, f"FAILED after {retries} retries: {last_err}" finally: ota_in_progress = False def direct_ota_download_via_4g(ota_url): """通过 4G 模块下载 OTA(不需要 Wi-Fi)""" global update_thread_started try: if not ota_url: safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) return print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") success, msg = download_file_via_4g(ota_url, local_filename) print(f"[OTA-4G] {msg}") if success and "OK" in msg: # 下载成功:备份+替换+重启 if apply_ota_and_reboot(ota_url): return # 会重启,不会执行到 finally else: safe_enqueue({"result": msg}, 2) except Exception as e: error_msg = f"OTA-4G 异常: {str(e)}" print(error_msg) safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) finally: update_thread_started = False # ==================== 主程序入口 ==================== def cmd_str(): global DEVICE_ID, PASSWORD # print("env: ", config.get_env()) DEVICE_ID = read_device_id() PASSWORD = DEVICE_ID + "." # 创建照片存储目录 photo_dir = "/root/phot" if photo_dir not in os.listdir("/root"): try: os.mkdir(photo_dir) except: pass # 初始化硬件 init_ina226() # load_laser_point() disp = display.Display() cam = camera.Camera(640, 480) # 启动通信与校准线程 _thread.start_new_thread(tcp_main, ()) _thread.start_new_thread(laser_calibration_worker, ()) print("系统准备完成...") last_adc_trigger = 0 # 主循环:检测扳机触发 → 拍照 → 分析 → 上报 while not app.need_exit(): current_time = time.ticks_ms() # print("压力传感器数值: ", adc_obj.read()) if adc_obj.read() > ADC_TRIGGER_THRESHOLD: diff_ms = current_time-last_adc_trigger if diff_ms<3000: continue last_adc_trigger = current_time time.sleep_ms(60) # 防抖 frame = cam.read() x, y = laser_point # 绘制激光十字线 frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness) frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness) frame.draw_circle(int(x), int(y), 1, color, thickness) # 检测靶心 result_img, center, radius, method, best_radius1 = detect_circle(frame) disp.show(result_img) # 计算偏移与距离 dx, dy = compute_laser_position(center, (x, y), radius, method) distance_m = estimate_distance(best_radius1) # 读取电量 voltage = get_bus_voltage() battery_percent = voltage_to_percent(voltage) # 保存图像(带标注) try: jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')]) filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg" result_img.save(filename, quality=70) except Exception as e: print(f"❌ 保存失败: {e}") # 构造上报数据 inner_data = { "x": float(dx) if dx is not None else 200.0, "y": float(dy) if dy is not None else 200.0, "r": 90.0, "d": round((distance_m or 0.0) * 100), # 距离(厘米) "m": method } report_data = {"cmd": 1, "data": inner_data} # 射箭事件高优先级入队,由 tcp_main 统一发送 safe_enqueue(report_data, msg_type=2, high=True) print("📤 射箭事件已加入发送队列") time.sleep_ms(100) else: disp.show(cam.read()) time.sleep_ms(50) def dump_system_info(): cmds = [ "uname -a", "cat /etc/os-release 2>/dev/null", "cat /proc/version 2>/dev/null", "cat /proc/1/comm 2>/dev/null", "ps 2>/dev/null | head -n 5", "ls -l /sbin/init 2>/dev/null", "ls -l /etc/init.d 2>/dev/null | head -n 10", "which systemctl 2>/dev/null", "which rc-service 2>/dev/null", "which busybox 2>/dev/null && busybox | head -n 1", "ls /dev/watchdog* 2>/dev/null", ] for c in cmds: try: out = os.popen(c).read() print("\n$ " + c + "\n" + (out.strip() or "")) except Exception as e: print("\n$ " + c + "\n " + str(e)) if __name__ == "__main__": # dump_system_info() try: import threading print("threading module:", threading) print("has Lock:", hasattr(threading, "Lock")) if hasattr(threading, "Lock"): print("has lock") finally: pass cmd_str()