diff --git a/app.yaml b/app.yaml index d15c02d..90cac47 100644 --- a/app.yaml +++ b/app.yaml @@ -1,8 +1,20 @@ id: t11 name: t11 -version: 1.0.3 +version: 1.1.1 author: t11 icon: '' desc: t11 files: + - app.yaml + - at_client.py + - config.py + - hardware.py + - laser_manager.py + - logger_manager.py - main.py + - network.py + - ota_manager.py + - power.py + - time_sync.py + - version.py + - vision.py diff --git a/config.py b/config.py index b62fe63..d47781e 100644 --- a/config.py +++ b/config.py @@ -50,6 +50,8 @@ ADC_LASER_THRESHOLD = 3000 MODULE_ADDR = 0x00 LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1]) LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) +DISTANCE_QUERY_CMD = bytes([0xAA, MODULE_ADDR, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21]) # 激光测距查询命令 +DISTANCE_RESPONSE_LEN = 13 # 激光测距响应数据长度(字节) DEFAULT_LASER_POINT = (640, 480) # 默认激光中心点 # ==================== 视觉检测配置 ==================== diff --git a/laser_manager.py b/laser_manager.py index 0555de5..229b303 100644 --- a/laser_manager.py +++ b/laser_manager.py @@ -6,6 +6,7 @@ """ import json import os +import binascii from maix import time, camera import threading import config @@ -31,7 +32,7 @@ class LaserManager: self._calibration_result = None self._calibration_lock = threading.Lock() self._laser_point = None - + self._laser_turned_on = False self._initialized = True # ==================== 状态访问(只读属性)==================== @@ -107,20 +108,23 @@ class LaserManager: if logger: logger.info(f"[LASER] 写入字节数: {written}") - time.sleep_ms(50) # 增加等待时间,让模块有时间响应 - + return None + + # TODO: 暂时去掉这个等待 # 读取回包 - resp = hardware_manager.distance_serial.read(20) - if resp: - if logger: - logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") - if resp == config.LASER_ON_CMD: - if logger: - logger.info("✅ 激光开启指令已确认") - else: - if logger: - logger.warning("🔇 无回包(可能正常或模块不支持回包)") - return resp + # print("before read:", time.ticks_ms()) + # resp = hardware_manager.distance_serial.read(len=20,timeout=10) + # print("after read:", time.ticks_ms()) + # if resp: + # if logger: + # logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") + # if resp == config.LASER_ON_CMD: + # if logger: + # logger.info("✅ 激光开启指令已确认") + # else: + # if logger: + # logger.warning("🔇 无回包(可能正常或模块不支持回包)") + # return resp def turn_off_laser(self): """发送指令关闭激光""" @@ -147,17 +151,18 @@ class LaserManager: if logger: logger.info(f"[LASER] 写入字节数: {written}") - time.sleep_ms(50) # 增加等待时间 # 读取回包 - resp = hardware_manager.distance_serial.read(20) - if resp: - if logger: - logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") - else: - if logger: - logger.warning("🔇 无回包") - return resp + # resp = hardware_manager.distance_serial.read(20) + # if resp: + # if logger: + # logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") + # else: + # if logger: + # logger.warning("🔇 无回包") + # return resp + # 不用读回包 + return None def flash_laser(self, duration_ms=1000): """闪一下激光(用于射箭反馈)""" @@ -223,6 +228,134 @@ class LaserManager: result = self._calibration_result self._calibration_result = None return result + + def parse_bcd_distance(self, bcd_bytes: bytes) -> float: + """将 4 字节 BCD 码转换为距离(米)""" + if len(bcd_bytes) != 4: + return 0.0 + try: + hex_string = binascii.hexlify(bcd_bytes).decode() + distance_int = int(hex_string) + return distance_int / 1000.0 + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[LASER] BCD 解析失败: {e}") + return 0.0 + + def read_distance_from_laser_sensor(self) -> float: + """发送测距指令并返回距离(米)""" + from hardware import hardware_manager + logger = logger_manager.logger + + if hardware_manager.distance_serial is None: + if logger: + logger.error("[LASER] distance_serial 未初始化") + return 0.0 + + try: + # 清空缓冲区 + try: + hardware_manager.distance_serial.read(-1) + except: + pass + # 打开激光 + + self.turn_on_laser() + self._laser_turned_on = True + time.sleep_ms(40) # 需要一定时间让激光稳定 + # 发送测距查询命令 + hardware_manager.distance_serial.write(config.DISTANCE_QUERY_CMD) + # time.sleep_ms(500) # 测试结果:这里的等待没有用! + self.turn_off_laser() + self._laser_turned_on = False + + # 这里的等待才是有效的!大概350ms能读到数据 + # 循环读取响应,最多等待500ms + start_time = time.ticks_ms() + max_wait_ms = 500 + response = None + + while True: + # 检查是否超时 + elapsed_ms = time.ticks_diff(start_time,time.ticks_ms()) + print("elapsed_ms:", elapsed_ms) + if elapsed_ms >= max_wait_ms: + if logger: + logger.warning(f"[LASER] 读取超时 ({elapsed_ms}ms),未收到完整响应") + return 0.0 + + # 尝试读取数据 + response = hardware_manager.distance_serial.read(config.DISTANCE_RESPONSE_LEN) + + # 如果读到完整数据,立即返回 + if response and len(response) == config.DISTANCE_RESPONSE_LEN: + elapsed_ms = time.ticks_diff(start_time,time.ticks_ms()) + if logger: + logger.debug(f"[LASER] 收到响应 ({elapsed_ms}ms)") + break + + # 如果还没超时,短暂等待后继续尝试 + time.sleep_ms(10) # 每次循环等待10ms,避免CPU占用过高 + + # 验证响应格式 + if response and len(response) == config.DISTANCE_RESPONSE_LEN: + if response[3] != 0x20: + if response[0] == 0xEE: + err_code = (response[7] << 8) | response[8] + if logger: + logger.warning(f"[LASER] 模块错误代码: {hex(err_code)}") + return 0.0 + + # 解析BCD码距离 + bcd_bytes = response[6:10] + distance_value_m = self.parse_bcd_distance(bcd_bytes) + signal_quality = (response[10] << 8) | response[11] + + if logger: + logger.debug(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}") + return distance_value_m + + if logger: + logger.warning(f"[LASER] 无效响应: {response.hex() if response else 'None'}") + return 0.0 + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[LASER] 读取激光测距失败: {e}") + return 0.0 + + def quick_measure_distance(self) -> float: + """ + 快速激光测距:打开激光 → 测距 → 关闭激光 + 激光开启时间最小化(约500-600ms),尽量不让用户觉察到 + 返回距离(米),失败返回0.0 + """ + logger = logger_manager.logger + self._laser_turned_on = False + + try: + + + # 等待激光稳定(最小延迟) + # time.sleep_ms(50) + + # 读取距离 + distance_m = self.read_distance_from_laser_sensor() + + return distance_m + except Exception as e: + if logger: + logger.error(f"[LASER] 快速测距异常: {e}") + return 0.0 + finally: + # 确保激光关闭 + if self._laser_turned_on: + try: + self.turn_off_laser() + except Exception as e: + if logger: + logger.error(f"[LASER] 关闭激光失败: {e}") # 创建全局单例实例 diff --git a/main.py b/main.py index 8371e35..5d3ad71 100644 --- a/main.py +++ b/main.py @@ -6,2078 +6,419 @@ 平台:MaixPy (Sipeed MAIX) 作者:ZZH 最后更新:2025-11-21 + +重构版本:使用模块化设计 """ -from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err -import cv2 -import numpy as np -import json -import struct -import re +from maix import camera, display, image, app, time, uart, pinmap, i2c from maix.peripheral import adc import _thread import os -import hmac -import ujson -import hashlib -import requests -import socket -import re -import binascii - -try: - import hashlib -except: - hashlib = None - -# import config - -# ==================== Locks ==================== - -class _Mutex: - """ - 基于 _thread.allocate_lock() 的互斥锁封装: - - 支持 with - - 支持 try_acquire(若固件不支持非阻塞 acquire 参数,则退化为阻塞 acquire) - """ - def __init__(self): - self._lk = _thread.allocate_lock() - - def acquire(self, blocking=True): - try: - return self._lk.acquire(blocking) - except TypeError: - self._lk.acquire() - return True - - def try_acquire(self): - return self.acquire(False) - - def release(self): - self._lk.release() - - def __enter__(self): - self.acquire(True) - return self - - def __exit__(self, exc_type, exc, tb): - self.release() - return False - - -class ATClient: - """ - 单读者 AT/URC 客户端:唯一读取 uart4g,避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。 - - send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect - - pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload(已按长度裁剪) - - pop_http_event() : 获取 +MHTTPURC 事件(header/content) - """ - def __init__(self, uart_obj): - self.uart = uart_obj - self._cmd_lock = _Mutex() - self._q_lock = _Mutex() - self._rx = b"" - self._tcp_payloads = [] - self._http_events = [] - - # 当前命令等待状态(仅允许单命令 in-flight) - self._waiting = False - self._expect = b"OK" - self._resp = b"" - - self._running = False - - def start(self): - if self._running: - return - self._running = True - _thread.start_new_thread(self._reader_loop, ()) - - def stop(self): - self._running = False - - def flush(self): - """清空内部缓存与队列(用于 OTA/异常恢复)""" - with self._q_lock: - self._rx = b"" - self._tcp_payloads.clear() - self._http_events.clear() - self._resp = b"" - - def pop_tcp_payload(self): - with self._q_lock: - if self._tcp_payloads: - return self._tcp_payloads.pop(0) - return None - - def pop_http_event(self): - with self._q_lock: - if self._http_events: - return self._http_events.pop(0) - return None - - def _push_tcp_payload(self, payload: bytes): - # 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock, - # 这里不要再次 acquire(锁不可重入,会死锁)。 - self._tcp_payloads.append(payload) - - def _push_http_event(self, ev): - # 同上:避免在 _reader_loop 持锁期间二次 acquire - self._http_events.append(ev) - - def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000): - """ - 发送 AT 命令并等待 expect(子串匹配)。 - 注意:expect=">" 用于等待 prompt。 - """ - expect_b = expect.encode() if isinstance(expect, str) else expect - with self._cmd_lock: - # 初始化等待 - self._waiting = True - self._expect = expect_b - self._resp = b"" - - # 发送 - if cmd: - # 注意:这里不要再用 uart4g_lock(否则外层已经持锁时会死锁)。 - # 写入由 _cmd_lock 串行化即可。 - self.uart.write((cmd + "\r\n").encode()) - - t0 = time.ticks_ms() - while time.ticks_ms() - t0 < timeout_ms: - if (not self._waiting) or (self._expect in self._resp): - self._waiting = False - break - time.sleep_ms(5) - - # 超时也返回已收集内容(便于诊断) - self._waiting = False - try: - return self._resp.decode(errors="ignore") - except: - return str(self._resp) - - def _find_urc_tag(self, tag: bytes): - """ - 只在“真正的 URC 边界”查找 tag,避免误命中 HTTP payload 内容。 - 规则:tag 必须出现在 buffer 开头,或紧跟在 b"\\r\\n" 后面。 - """ - try: - i = 0 - rx = self._rx - while True: - j = rx.find(tag, i) - if j < 0: - return -1 - if j == 0: - return 0 - if j >= 2 and rx[j - 2:j] == b"\r\n": - return j - i = j + 1 - except: - return -1 - - def _parse_mipurc_rtcp(self): - """ - 解析:+MIPURC: "rtcp",,, - 之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。 - """ - prefix = b'+MIPURC: "rtcp",' - i = self._find_urc_tag(prefix) - if i < 0: - return False - # 丢掉前置噪声 - if i > 0: - self._rx = self._rx[i:] - i = 0 - - j = len(prefix) - # 解析 link_id - k = j - while k < len(self._rx) and 48 <= self._rx[k] <= 57: - k += 1 - if k == j or k >= len(self._rx): - return False - if self._rx[k:k+1] != b",": - self._rx = self._rx[1:] - return True - try: - link_id = int(self._rx[j:k].decode()) - except: - self._rx = self._rx[1:] - return True - - # 解析 len - j2 = k + 1 - k2 = j2 - while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57: - k2 += 1 - if k2 == j2 or k2 >= len(self._rx): - return False - if self._rx[k2:k2+1] != b",": - self._rx = self._rx[1:] - return True - try: - n = int(self._rx[j2:k2].decode()) - except: - self._rx = self._rx[1:] - return True - - payload_start = k2 + 1 - payload_end = payload_start + n - if len(self._rx) < payload_end: - return False # payload 未收齐 - - payload = self._rx[payload_start:payload_end] - # 把 link_id 一起带上,便于上层过滤(如果需要) - self._push_tcp_payload((link_id, payload)) - self._rx = self._rx[payload_end:] - return True - - def _parse_mhttpurc_header(self): - tag = b'+MHTTPURC: "header",' - i = self._find_urc_tag(tag) - if i < 0: - return False - if i > 0: - self._rx = self._rx[i:] - i = 0 - - # header: +MHTTPURC: "header",,,, - j = len(tag) - comma_count = 0 - k = j - while k < len(self._rx) and comma_count < 3: - if self._rx[k:k+1] == b",": - comma_count += 1 - k += 1 - if comma_count < 3: - return False - - prefix = self._rx[:k] - m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix) - if not m: - self._rx = self._rx[1:] - return True - urc_id = int(m.group(1)) - code = int(m.group(2)) - hdr_len = int(m.group(3)) - - text_start = k - text_end = text_start + hdr_len - if len(self._rx) < text_end: - return False - - hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore") - self._push_http_event(("header", urc_id, code, hdr_text)) - self._rx = self._rx[text_end:] - return True - - def _parse_mhttpurc_content(self): - tag = b'+MHTTPURC: "content",' - i = self._find_urc_tag(tag) - if i < 0: - return False - if i > 0: - self._rx = self._rx[i:] - i = 0 - - # content: +MHTTPURC: "content",,,,, - j = len(tag) - comma_count = 0 - k = j - while k < len(self._rx) and comma_count < 4: - if self._rx[k:k+1] == b",": - comma_count += 1 - k += 1 - if comma_count < 4: - return False - - prefix = self._rx[:k] - m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix) - if not m: - self._rx = self._rx[1:] - return True - urc_id = int(m.group(1)) - total_len = int(m.group(2)) - sum_len = int(m.group(3)) - cur_len = int(m.group(4)) - - payload_start = k - payload_end = payload_start + cur_len - if len(self._rx) < payload_end: - return False - - payload = self._rx[payload_start:payload_end] - self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload)) - self._rx = self._rx[payload_end:] - return True - - def _reader_loop(self): - while self._running: - # 关键:UART 驱动偶发 read failed,必须兜住,否则线程挂了 OTA/TCP 都会卡死 - try: - d = self.uart.read(4096) # 8192 在一些驱动上更容易触发 read failed - except Exception as e: - try: - print("[ATClient] uart read failed:", e) - except: - pass - time.sleep_ms(50) - continue - - if not d: - time.sleep_ms(1) - continue - - with self._q_lock: - self._rx += d - if self._waiting: - self._resp += d - - while True: - progressed = ( - self._parse_mipurc_rtcp() - or self._parse_mhttpurc_header() - or self._parse_mhttpurc_content() - ) - if not progressed: - break - - try: - ota_flag = int(globals().get("ota_in_progress", 0)) > 0 - except: - ota_flag = False - - has_http_hint = (b"+MHTTP" in self._rx) or (b"+MHTTPURC" in self._rx) - if ota_flag or has_http_hint: - if len(self._rx) > 512 * 1024: - self._rx = self._rx[-256 * 1024:] - else: - if len(self._rx) > 16384: - self._rx = self._rx[-4096:] - -# ==================== 全局配置 ==================== - -# OTA 升级地址与本地路径 -# url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py" -local_filename = "/maixapp/apps/t11/main_tmp.py" -app_version = '1.0.2' -# OTA 下发参数(由后端指令写入) -OTA_URL = None -OTA_MODE = None # "4g" / "wifi" / None - -def is_wifi_connected(): - """尽量判断当前是否有 Wi-Fi(有则走 Wi-Fi OTA,否则走 4G OTA)""" - # 优先用 MaixPy network(如果可用) - try: - wlan = network.WLAN(network.TYPE_WIFI) - if wlan.isconnected(): - return True - except: - pass - - # 兜底:看系统 wlan0 有没有 IP(你系统可能没有 wlan0,则返回 False) - try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - return bool(ip) - except: - return False - -# 设备认证信息(运行时动态加载) -DEVICE_ID = None -PASSWORD = None - -# 服务器连接参数 -SERVER_IP = "www.shelingxingqiu.com" -SERVER_PORT = 50005 -HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒) - -# 激光校准配置 -CONFIG_FILE = "/root/laser_config.json" -DEFAULT_POINT = (640, 480) # 默认激光中心点(图像中心) -laser_point = DEFAULT_POINT - -# HTTP 上报接口 -URL = "http://ws.shelingxingqiu.com" -API_PATH = "/home/shoot/device_fire/arrow/fire" - -# UART 设备初始化 -uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 -distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块 - -# 单读者 ATClient(唯一读取 uart4g) -at_client = ATClient(uart4g) -at_client.start() - -# 引脚功能映射 -pinmap.set_pin_function("A18", "UART1_RX") -pinmap.set_pin_function("A19", "UART1_TX") -pinmap.set_pin_function("A29", "UART2_RX") -pinmap.set_pin_function("A28", "UART2_TX") -pinmap.set_pin_function("P18", "I2C1_SCL") -pinmap.set_pin_function("P21", "I2C1_SDA") -# pinmap.set_pin_function("A15", "I2C5_SCL") -# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的 -# ADC 触发阈值(用于检测扳机/激光触发) -ADC_TRIGGER_THRESHOLD = 3000 -ADC_LASER_THRESHOLD = 3000 - -# 显示参数:激光十字线样式 -color = image.Color(255, 100, 0) -thickness = 1 -length = 2 - -# 全局状态变量 -laser_calibration_active = False # 是否正在后台校准激光 -laser_calibration_result = None # 校准结果坐标 (x, y) -laser_calibration_lock = _Mutex() # 互斥锁,防止多线程冲突 - -# 硬件对象初始化 -laser_x, laser_y = laser_point -adc_obj = adc.ADC(0, adc.RES_BIT_12) -bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线 -# bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的 -# INA226 电流/电压监测芯片寄存器地址 -INA226_ADDR = 0x40 -REG_CONFIGURATION = 0x00 -REG_BUS_VOLTAGE = 0x02 -REG_CALIBRATION = 0x05 -CALIBRATION_VALUE = 0x1400 - -# 激光控制指令(自定义协议) -MODULE_ADDR = 0x00 -LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1]) -LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) - -# 相机标定参数(用于距离估算) -# FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素) -FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素) -REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) - -# # TCP 连接状态 -tcp_connected = False -high_send_queue = [] # 高优先级发送队列:射箭事件等 -normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 -queue_lock = _Mutex() # 互斥锁,保护队列 -uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并发) -update_thread_started = False # 防止 OTA 更新线程重复启动 -# OTA(4G HTTP URC) 期间“暂停 TCP 活动/让读线程用大 buffer”: -# 用计数器而不是 bool,避免“外层 OTA 还没结束,内层 downloader finally 又把它置 False”。 -ota_in_progress = 0 - -# ==================== 工具函数 ==================== - -def download_file(url, filename): - """从指定 URL 下载文件并保存为 UTF-8 编码文本""" - try: - print(f"正在从 {url} 下载文件...") - response = requests.get(url) - response.raise_for_status() - response.encoding = 'utf-8' - with open(filename, 'w', encoding='utf-8') as file: - file.write(response.text) - return f"下载成功!文件已保存为: {filename}" - except requests.exceptions.RequestException as e: - return f"下载失败!网络请求错误: {e}" - except OSError as e: - return f"下载失败!文件写入错误: {e}" - except Exception as e: - return f"下载失败!发生未知错误: {e}" - - -def _download_file_system_bytes(url, filename, timeout_s=25): - """ - 走“系统网络栈”的下载(RNDIS/ECM/eth0 等),用 TCP 自带重传,适合 OTA 大文件。 - 注意:这里写 bytes,不做编码假设。 - """ - try: - print(f"[NET] system download: {url} -> {filename}") - r = requests.get(url, timeout=timeout_s) - r.raise_for_status() - data = r.content - with open(filename, "wb") as f: - f.write(data) - return True, f"OK size={len(data)}" - except Exception as e: - return False, f"system_download_failed: {e}" - - -def _has_default_route(): - try: - out = os.popen("ip route 2>/dev/null").read() - for line in out.splitlines(): - if line.strip().startswith("default "): - return True - return False - except: - return False - - -def _get_if_ipv4(ifname: str): - try: - out = os.popen(f"ip -4 addr show {ifname} 2>/dev/null").read() - m = re.search(r"\binet\s+(\d+\.\d+\.\d+\.\d+)/(\d+)", out) - if not m: - return None - return m.group(1) - except: - return None - - -def _ping_once(ip: str, ifname=None, timeout_s=1): - # busybox ping 可能不支持 -W;做两套尝试 - if ifname: - cmd1 = f"ping -I {ifname} -c 1 -W {timeout_s} {ip} >/dev/null 2>&1" - cmd2 = f"ping -I {ifname} -c 1 {ip} >/dev/null 2>&1" - else: - cmd1 = f"ping -c 1 -W {timeout_s} {ip} >/dev/null 2>&1" - cmd2 = f"ping -c 1 {ip} >/dev/null 2>&1" - rc = os.system(cmd1) - if rc == 0: - return True - rc = os.system(cmd2) - return rc == 0 - - -def ensure_ml307r_dialup_and_route(prefer_if=("usb0", "usb1"), metric=200, debug=True): - """ - 用 ML307R 的 RNDIS/ECM 方式把“系统网络”拉起来: - - AT+MDIALUP=1,1(拨号) - - 为 usb0/usb1 添加 default route(不改 IP,尽量不影响现有管理网段) - 目标:让 requests.get() 能直接下载 OTA,避免 UART URC 丢包。 - """ - def _dlog(*a): - if debug: - print("[DIALUP]", *a) - - # 1) 拨号(重复执行也安全) - try: - with uart4g_lock: - at('AT+MDIALUPCFG="mode"', "OK", 2000) - r = at("AT+MDIALUP=1,1", "OK", 20000) - _dlog("MDIALUP resp:", r) - except Exception as e: - _dlog("MDIALUP exception:", e) - - # 2) 如果已经有 default route,直接返回 - if _has_default_route(): - _dlog("default route already exists") - return True - - # 3) 尝试在 usb0/usb1 上猜测网关并加默认路由 - for ifname in prefer_if: - ip = _get_if_ipv4(ifname) - if not ip: - continue - - parts = ip.split(".") - if len(parts) != 4: - continue - base = ".".join(parts[:3]) - last = int(parts[3]) - - # 常见网关候选:.2 / .254 / .1(跳过自己) - candidates = [] - if last != 2: - candidates.append(f"{base}.2") - if last != 254: - candidates.append(f"{base}.254") - if last != 1: - candidates.append(f"{base}.1") - - for gw in candidates: - if gw == ip: - continue - # ping 一下让 ARP/neigh 建立(不一定通,但很多系统会因此学到邻居) - _ping_once(gw, ifname=ifname, timeout_s=1) - # 直接尝试加默认路由(若已存在会失败但无害) - os.system(f"ip route add default via {gw} dev {ifname} metric {metric} 2>/dev/null") - if _has_default_route(): - _dlog("default route set:", gw, "dev", ifname) - _dlog(os.popen("ip route 2>/dev/null").read().strip()) - return True - - _dlog("failed to set default route. ip route:", os.popen("ip route 2>/dev/null").read().strip()) - return False - -def is_server_reachable(host, port=80, timeout=5): - """检查目标主机端口是否可达(用于 OTA 前网络检测)""" - try: - addr_info = socket.getaddrinfo(host, port)[0] - s = socket.socket(addr_info[0], addr_info[1], addr_info[2]) - s.settimeout(timeout) - s.connect(addr_info[-1]) - s.close() - return True - except Exception as e: - print(f"[NET] 无法连接 {host}:{port} - {e}") - return False - -def apply_ota_and_reboot(ota_url=None): - # TODO: remove this return after test - # return True - - """ - OTA 文件下载成功后:备份原 main.py -> 替换 main_tmp.py -> 重启设备 - """ - import shutil - - main_py = "/maixapp/apps/t11/main.py" - main_tmp = "/maixapp/apps/t11/main_tmp.py" - main_bak = "/maixapp/apps/t11/main.py.bak" - ota_pending = "/maixapp/apps/t11/ota_pending.json" - - try: - # 1. 检查下载的文件是否存在 - if not os.path.exists(main_tmp): - print(f"[OTA] 错误:{main_tmp} 不存在") - return False - - # 2. 备份原 main.py(如果存在) - if os.path.exists(main_py): - try: - shutil.copy2(main_py, main_bak) - print(f"[OTA] 已备份 {main_py} -> {main_bak}") - except Exception as e: - print(f"[OTA] 备份失败: {e}") - # 备份失败也继续(可能没有原文件) - - # 3. 替换:main_tmp.py -> main.py - try: - shutil.copy2(main_tmp, main_py) - print(f"[OTA] 已替换 {main_tmp} -> {main_py}") - - # 确保写入磁盘 - try: - os.sync() # 如果系统支持 - except: - pass - time.sleep_ms(500) # 额外等待确保写入完成 - - except Exception as e: - print(f"[OTA] 替换失败: {e}") - return False - - # 3.5 写入 pending(用于重启后确认成功并上报) - try: - pending_obj = { - "ts": int(time.time()) if hasattr(time, "time") else 0, - "url": ota_url or "", - "tmp": main_tmp, - "main": main_py, - "bak": main_bak, - } - with open(ota_pending, "w", encoding="utf-8") as f: - json.dump(pending_obj, f) - try: - os.sync() - except: - pass - except Exception as e: - print(f"[OTA] 写入 ota_pending 失败: {e}") - - # 4. 通知服务器(可选,但重启前发一次) - safe_enqueue({"result": "ota_applied_rebooting"}, 2) - time.sleep_ms(1000) # 给一点时间让消息发出 - - # 5. 重启设备 - print("[OTA] 准备重启设备...") - os.system("reboot") # MaixPy 通常是这个命令 - - return True - - except Exception as e: - print(f"[OTA] apply_ota_and_reboot 异常: {e}") - return False - - -def direct_ota_download(ota_url): - """ - 直接执行 OTA 下载(假设已有网络) - 用于 cmd=7 / 或 wifi 模式 - """ - global update_thread_started - try: - if not ota_url: - safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) - return - - from urllib.parse import urlparse - parsed_url = urlparse(ota_url) - host = parsed_url.hostname - port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) - - if not is_server_reachable(host, port, timeout=8): - safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2) - return - - print(f"[OTA] 开始下载: {ota_url}") - # from ota import download_file - result_msg = download_file(ota_url, local_filename) - print(f"[OTA] {result_msg}") - - # 检查是否下载成功(包含"成功"或"下载成功"关键字) - if "成功" in result_msg or "下载成功" in result_msg: - # 下载成功:备份+替换+重启 - if apply_ota_and_reboot(ota_url): - return # 会重启,不会执行到 finally - else: - safe_enqueue({"result": result_msg}, 2) - - except Exception as e: - error_msg = f"OTA 异常: {str(e)}" - print(error_msg) - safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) - finally: - update_thread_started = False - -def handle_wifi_and_update(ssid, password, ota_url): - """在子线程中执行 Wi-Fi 连接 + OTA 更新流程""" - global update_thread_started - try: - ip, error = connect_wifi(ssid, password) - if error: - safe_enqueue({"result": "wifi_failed", "error": error}, 2) - return - safe_enqueue({"result": "wifi_connected", "ip": ip}, 2) - - # 下载 - if not ota_url: - safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) - return - - from urllib.parse import urlparse - parsed_url = urlparse(ota_url) - host = parsed_url.hostname - port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) - - if not is_server_reachable(host, port, timeout=8): - err_msg = f"网络不通:无法连接 {host}:{port}" - safe_enqueue({"result": err_msg}, 2) - return - - print(f"[NET] 已确认可访问 {host}:{port},开始下载...") - result = download_file(ota_url, local_filename) - print(result) - - # 检查是否下载成功(包含"成功"或"下载成功"关键字) - if "成功" in result or "下载成功" in result: - # 下载成功:备份+替换+重启 - if apply_ota_and_reboot(ota_url): - return # 会重启,不会执行到 finally - else: - safe_enqueue({"result": result}, 2) - - finally: - update_thread_started = False - print("[UPDATE] 更新线程执行完毕,即将退出。") - - -def connect_wifi(ssid, password): - """ - 连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录, - 以便设备重启后自动连接。 - """ - conf_path = "/etc/wpa_supplicant.conf" - ssid_file = "/boot/wifi.ssid" - pass_file = "/boot/wifi.pass" - - try: - # 生成 wpa_supplicant 配置 - net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() - if "network={" not in net_conf: - return None, "Failed to generate wpa config" - - # 写入运行时配置 - with open(conf_path, "w") as f: - f.write("ctrl_interface=/var/run/wpa_supplicant\n") - f.write("update_config=1\n\n") - f.write(net_conf) - - # 持久化保存 SSID/PASS(关键!) - with open(ssid_file, "w") as f: - f.write(ssid.strip()) - with open(pass_file, "w") as f: - f.write(password.strip()) - - # 重启 Wi-Fi 服务 - os.system("/etc/init.d/S30wifi restart") - - # 等待获取 IP - for _ in range(20): - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - if ip: - return ip, None - time.sleep(1) - - return None, "Timeout: No IP obtained" - - except Exception as e: - return None, f"Exception: {str(e)}" - - -def read_device_id(): - """从 /device_key 文件读取设备唯一 ID,失败则使用默认值""" - try: - with open("/device_key", "r") as f: - device_id = f.read().strip() - if device_id: - print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}") - return device_id - except Exception as e: - print(f"[ERROR] 无法读取 /device_key: {e}") - return "DEFAULT_DEVICE_ID" - - -def safe_enqueue(data_dict, msg_type=2, high=False): - """线程安全地将消息加入 TCP 发送队列(支持优先级)""" - global queue_lock, high_send_queue, normal_send_queue - item = (msg_type, data_dict) - with queue_lock: - if high: - high_send_queue.append(item) - else: - normal_send_queue.append(item) - -def at(cmd, wait="OK", timeout=2000): - """向 4G 模块发送 AT 指令并等待响应""" - # 统一由 ATClient 负责读 uart4g,避免多线程抢读 - return at_client.send(cmd, wait, timeout) - - -def make_packet(msg_type: int, body_dict: dict) -> bytes: - """打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文""" - body = json.dumps(body_dict).encode() - body_len = len(body) - checksum = body_len + msg_type - header = struct.pack(">III", body_len, msg_type, checksum) - return header + body - - -def parse_packet(data: bytes): - """解析 TCP 数据包,返回 (类型, 正文字典)""" - if len(data) < 12: - return None, None - body_len, msg_type, checksum = struct.unpack(">III", data[:12]) - body = data[12:12 + body_len] - try: - return msg_type, json.loads(body.decode()) - except: - return msg_type, {"raw": body.decode(errors="ignore")} - - -def tcp_send_raw(data: bytes, max_retries=2) -> bool: - global tcp_connected - if not tcp_connected: - return False - - with uart4g_lock: - for _ in range(max_retries): - cmd = f'AT+MIPSEND=0,{len(data)}' - if ">" not in at(cmd, ">", 2000): - time.sleep_ms(50) - continue - - # 关键:确保把 data 全部写出去 - total = 0 - while total < len(data): - n = uart4g.write(data[total:]) - if not n or n < 0: - time.sleep_ms(1) - continue - total += n - - # 关键:再发结束符(不算进 payload) - uart4g.write(b"\x1A") - - # 等发送完成确认(不同固件可能是 SEND OK / OK / +MIPSEND) - r = at("", "OK", 8000) - if ("SEND OK" in r) or ("OK" in r) or ("+MIPSEND" in r): - return True - - time.sleep_ms(50) - - return False - - -def generate_token(device_id): - """生成用于 HTTP 接口鉴权的 Token(HMAC-SHA256)""" - SALT = "shootMessageFire" - SALT2 = "shoot" - return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() - - -def send_http_cmd(cmd_str, timeout_ms=3000): - """发送 HTTP 相关 AT 指令(调试用)""" - print("[HTTP AT] =>", cmd_str) - return at(cmd_str, "OK", timeout_ms) - - -def read_http_response(timeout_ms=5000): - """读取并打印 HTTP 响应(用于调试)""" - # 仅保留占位:UART 读取由 ATClient 独占;如需调试,请从 ATClient 的 http_events 中取。 - time.sleep_ms(timeout_ms) - - -def upload_shoot_event(json_data): - """通过 4G 模块上报射击事件到 HTTP 接口(备用通道)""" - token = generate_token(DEVICE_ID) - if not send_http_cmd(f'AT+MHTTPCREATE="{URL}"'): - return False - instance_id = 0 - send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"') - send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"') - send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {DEVICE_ID}"') - json_str = ujson.dumps(json_data) - if not send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'): - return False - if send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{API_PATH}"'): - read_http_response() - return True - return False - - -def load_laser_point(): - """从配置文件加载激光中心点,失败则使用默认值""" - global laser_point - try: - if "laser_config.json" in os.listdir("/root"): - with open(CONFIG_FILE, "r") as f: - data = json.load(f) - if isinstance(data, list) and len(data) == 2: - laser_point = (int(data[0]), int(data[1])) - print(f"[INFO] 加载激光点: {laser_point}") - else: - raise ValueError - else: - laser_point = DEFAULT_POINT - except: - laser_point = DEFAULT_POINT - - -def save_laser_point(point): - """保存激光中心点到配置文件""" - global laser_point - try: - with open(CONFIG_FILE, "w") as f: - json.dump([point[0], point[1]], f) - laser_point = point - except: - pass - - -def turn_on_laser(): - """发送指令开启激光,并读取回包(部分模块支持)""" - distance_serial.write(LASER_ON_CMD) - time.sleep_ms(10) - resp = distance_serial.read(20) - if resp: - if resp == LASER_ON_CMD: - print("✅ 激光指令已确认") - else: - print("🔇 无回包(正常或模块不支持)") - return resp - -def flash_laser(duration_ms=1000): - """闪一下激光(用于射箭反馈)""" - try: - distance_serial.write(LASER_ON_CMD) - time.sleep_ms(duration_ms) - distance_serial.write(LASER_OFF_CMD) - except Exception as e: - print(f"闪激光失败: {e}") - - -def find_red_laser(frame, threshold=150): - """在图像中查找最亮的红色激光点(基于 RGB 阈值)""" - w, h = frame.width(), frame.height() - img_bytes = frame.to_bytes() - max_sum = 0 - best_pos = None - for y in range(0, h, 2): - for x in range(0, w, 2): - idx = (y * w + x) * 3 - r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] - if r > threshold and r > g * 2 and r > b * 2: - rgb_sum = r + g + b - if rgb_sum > max_sum: - max_sum = rgb_sum - best_pos = (x, y) - return best_pos - - -def calibrate_laser_position(): - """执行一次激光校准:拍照 → 找红点 → 保存坐标""" - global laser_x, laser_y - time.sleep_ms(80) - cam = camera.Camera(640, 480) - frame = cam.read() - pos = find_red_laser(frame) - if pos: - laser_x, laser_y = pos - save_laser_point(pos) - return pos - return None - - -# ==================== 电源管理(INA226) ==================== - -def write_register(reg, value): - data = [(value >> 8) & 0xFF, value & 0xFF] - bus.writeto_mem(INA226_ADDR, reg, bytes(data)) - - -def read_register(reg): - data = bus.readfrom_mem(INA226_ADDR, reg, 2) - return (data[0] << 8) | data[1] - - -def init_ina226(): - """初始化 INA226 芯片:配置模式 + 校准值""" - write_register(REG_CONFIGURATION, 0x4527) - write_register(REG_CALIBRATION, CALIBRATION_VALUE) - - -def get_bus_voltage(): - """读取总线电压(单位:V)""" - raw = read_register(REG_BUS_VOLTAGE) - return raw * 1.25 / 1000 - - -def voltage_to_percent(voltage): - """根据电压估算电池百分比(查表插值)""" - points = [ - (4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65), - (3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15), - (3.65, 5), (3.60, 0) - ] - if voltage >= points[0][0]: - return 100 - if voltage <= points[-1][0]: - return 0 - for i in range(len(points) - 1): - v1, p1 = points[i] - v2, p2 = points[i + 1] - if voltage >= v2: - ratio = (voltage - v1) / (v2 - v1) - percent = p1 + (p2 - p1) * ratio - return max(0, min(100, int(round(percent)))) - return 0 - - -# ==================== 靶心检测与距离计算 ==================== - -def detect_circle(frame): - """检测图像中的靶心(优先清晰轮廓,其次黄色区域)""" - global REAL_RADIUS_CM - img_cv = image.image2cv(frame, False, False) - gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) - blurred = cv2.GaussianBlur(gray, (5, 5), 0) - edged = cv2.Canny(blurred, 50, 150) - kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) - ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel) - - contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) - best_center = best_radius = best_radius1 = method = None - - # 方法1:基于轮廓拟合椭圆(清晰靶心) - for cnt in contours: - area = cv2.contourArea(cnt) - perimeter = cv2.arcLength(cnt, True) - if perimeter < 100 or area < 100: - continue - circularity = 4 * np.pi * area / (perimeter ** 2) - if circularity > 0.75 and len(cnt) >= 5: - center, axes, angle = cv2.fitEllipse(cnt) - radius = (axes[0] + axes[1]) / 4 - best_center = (int(center[0]), int(center[1])) - best_radius = int(radius) - best_radius1 = best_radius - REAL_RADIUS_CM = 15 - method = "清晰" - break - - # 方法2:基于 HSV 黄色掩码(模糊靶心) - if not best_center: - hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV) - h, s, v = cv2.split(hsv) - s = np.clip(s * 2, 0, 255).astype(np.uint8) - hsv = cv2.merge((h, s, v)) - lower_yellow = np.array([7, 80, 0]) - upper_yellow = np.array([32, 255, 182]) - mask = cv2.inRange(hsv, lower_yellow, upper_yellow) - kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) - mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel) - mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel) - contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - if contours: - largest = max(contours, key=cv2.contourArea) - if cv2.contourArea(largest) > 50: - (x, y), radius = cv2.minEnclosingCircle(largest) - best_center = (int(x), int(y)) - best_radius = int(radius) - best_radius1 = best_radius - REAL_RADIUS_CM = 15 - method = "模糊" - - result_img = image.cv2image(img_cv, False, False) - return result_img, best_center, best_radius, method, best_radius1 - - -def estimate_distance(pixel_radius): - """根据像素半径估算实际距离(单位:米)""" - if not pixel_radius: - return 0.0 - return (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / pixel_radius / 100.0 - - -def compute_laser_position(circle_center, laser_point, radius, method): - """计算激光相对于靶心的偏移量(单位:厘米)""" - if not all([circle_center, radius, method]): - return None, None - cx, cy = circle_center - lx, ly = laser_point - # 根据检测方法动态调整靶心物理半径(简化模型) - circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0 - dx = lx - cx - dy = ly - cy - return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) - - -# ==================== TCP 通信线程 ==================== - -def connect_server(): - """通过 4G 模块建立 TCP 连接""" - global tcp_connected - if tcp_connected: - return True - print("连接到服务器...") - with uart4g_lock: - at("AT+MIPCLOSE=0", "OK", 1000) - res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000) - if "+MIPOPEN: 0,0" in res: - tcp_connected = True - return True - return False - -raw_line_data = [] -def tcp_main(): - """TCP 主通信循环:登录、心跳、处理指令、发送数据""" - global tcp_connected, high_send_queue, normal_send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock, update_thread_started - send_hartbeat_fail_count = 0 - while not app.need_exit(): - # OTA 期间不要 connect/登录/心跳/发送,避免与 HTTP URC 下载抢 uart4g_lock 导致心跳超时被服务器断开 - try: - if ota_in_progress: - time.sleep_ms(200) - continue - except: - pass - - if not connect_server(): - time.sleep_ms(5000) - continue - - # 发送登录包 - login_data = {"deviceId": DEVICE_ID, "password": PASSWORD, "version": app_version} - if not tcp_send_raw(make_packet(1, login_data)): - tcp_connected = False - time.sleep_ms(2000) - continue - - print("➡️ 登录包已发送,等待确认...") - logged_in = False - last_heartbeat_ack_time = time.ticks_ms() - last_heartbeat_send_time = time.ticks_ms() - while True: - # OTA 期间暂停 TCP 活动(不发心跳、不发业务),让下载独占 4G 串口 - try: - if ota_in_progress: - time.sleep_ms(200) - continue - except: - pass - - # 接收数据(唯一来源:ATClient 解析后的 TCP payload 队列) - item = at_client.pop_tcp_payload() - if item: - # item 可能是 (link_id, payload) 或直接 payload(兼容旧队列格式) - if isinstance(item, tuple) and len(item) == 2: - link_id, payload = item - else: - link_id, payload = 0, item - - # 登录阶段加一条轻量 debug,确认 ACK 是否进入队列 - if not logged_in: - try: - print(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}") - except: - pass - - msg_type, body = parse_packet(payload) - - # 处理登录响应 - if not logged_in and msg_type == 1: - if body and body.get("cmd") == 1 and body.get("data") == "登录成功": - logged_in = True - last_heartbeat_ack_time = time.ticks_ms() - print("✅ 登录成功") - # 若存在 ota_pending.json,说明上次 OTA 已应用并重启; - # 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending,确保只上报一次。 - try: - pending_path = "/maixapp/apps/t11/ota_pending.json" - if os.path.exists(pending_path): - try: - with open(pending_path, "r", encoding="utf-8") as f: - pending_obj = json.load(f) - except: - pending_obj = {} - safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) - try: - os.remove(pending_path) - except: - pass - except Exception as e: - print(f"[OTA] ota_ok 上报失败: {e}") - else: - # 登录失败,跳出重连 - break - - # 处理心跳 ACK - elif logged_in and msg_type == 4: - last_heartbeat_ack_time = time.ticks_ms() - print("✅ 收到心跳确认") - - elif logged_in and msg_type == 40: - if isinstance(body, dict): - t = body.get('t', 0) - v = body.get('v') - raw_line_data.append(body) - if len(raw_line_data) >= int(t): - print(f"下载完成") - stock_array = list(map(lambda x: x.get('d'), raw_line_data)) - with open(local_filename, 'w', encoding='utf-8') as file: - file.write("\n".join(stock_array)) - apply_ota_and_reboot() - else: - safe_enqueue({'data':{'l': len(raw_line_data), 'v': v}, 'cmd': 41}) - print(f"已下载{len(raw_line_data)} 全部:{t} 版本:{v}") - - # 处理业务指令 - elif logged_in and isinstance(body, dict): - # 重要:每个包都要重新解析 inner_cmd,避免上一次的 cmd “粘住”导致反复执行 - inner_cmd = None - data_obj = body.get("data") - if isinstance(data_obj, dict): - inner_cmd = data_obj.get("cmd") - - if inner_cmd == 2: # 开启激光并校准 - # 幂等:正在校准则不重复触发(服务器可能重发 cmd=2) - if not laser_calibration_active: - turn_on_laser() - time.sleep_ms(100) - laser_calibration_active = True - safe_enqueue({"result": "calibrating"}, 2) - elif inner_cmd == 3: # 关闭激光 - distance_serial.write(LASER_OFF_CMD) - laser_calibration_active = False - safe_enqueue({"result": "laser_off"}, 2) - elif inner_cmd == 4: # 上报电量 - voltage = get_bus_voltage() - battery_percent = voltage_to_percent(voltage) - battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} - safe_enqueue(battery_data, 2) - print(f"🔋 电量上报: {battery_percent}%") - elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置,及4g) - inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {} - - ssid = inner_data.get("ssid") - password = inner_data.get("password") - ota_url = inner_data.get("url") - mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/"" - - if not ota_url: - print("ota missing_url") - safe_enqueue({"result": "missing_url"}, 2) - continue - - # 自动判断:mode 非法/为空时,优先 Wi-Fi(如果已连),否则 4G - if mode not in ("4g", "wifi"): - print("ota missing mode") - mode = "wifi" if is_wifi_connected() else "4g" - - if update_thread_started: - safe_enqueue({"result": "update_already_started"}, 2) - continue - - update_thread_started = True - - if mode == "4g": - _thread.start_new_thread(direct_ota_download_via_4g, (ota_url,)) - else: - # wifi 模式:需要 ssid/password - if not ssid or not password: - update_thread_started = False - safe_enqueue({"result": "missing_ssid_or_password"}, 2) - else: - _thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url)) - elif inner_cmd == 6: - try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - ip = ip if ip else "no_ip" - except: - ip = "error_getting_ip" - safe_enqueue({"result": "current_ip", "ip": ip}, 2) - elif inner_cmd == 7: - # global update_thread_started - if update_thread_started: - safe_enqueue({"result": "update_already_started"}, 2) - continue - - # 实时检查是否有 IP - try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - except: - ip = None - - if not ip: - safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) - else: - # 启动纯下载线程 - update_thread_started = True - _thread.start_new_thread(direct_ota_download, ()) - else: - # 非指令包(或未携带 cmd),不做任何动作 - pass - else: - time.sleep_ms(5) - - # 发送队列中的业务数据 - if logged_in and (high_send_queue or normal_send_queue): - # 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁 - msg_type = None - data_dict = None - if queue_lock.try_acquire(): - try: - if high_send_queue: - msg_type, data_dict = high_send_queue.pop(0) - elif normal_send_queue: - msg_type, data_dict = normal_send_queue.pop(0) - finally: - queue_lock.release() - - if msg_type is not None and data_dict is not None: - pkt = make_packet(msg_type, data_dict) - if not tcp_send_raw(pkt): - tcp_connected = False - break - - # 发送激光校准结果 - if logged_in and laser_calibration_result is not None: - x = y = None - with laser_calibration_lock: - if laser_calibration_result is not None: - x, y = laser_calibration_result - laser_calibration_result = None - if x is not None and y is not None: - safe_enqueue({"result": "ok", "x": x, "y": y}, 2) - - # 定期发送心跳 - current_time = time.ticks_ms() - if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000: - vol_val = get_bus_voltage() - if not tcp_send_raw(make_packet(4, {"vol":vol_val, "vol_per":voltage_to_percent(vol_val)})): - print("💔 心跳发送失败") - send_hartbeat_fail_count += 1 - if send_hartbeat_fail_count >= 3: - send_hartbeat_fail_count = 0 - print("连续3次发送心跳失败,重连") - break - else: - continue - else: - send_hartbeat_fail_count = 0 - last_heartbeat_send_time = current_time - print("💓 心跳已发送") - - # 心跳超时重连 - if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟 - print("⏰ 十分钟无心跳ACK,重连") - break - - time.sleep_ms(50) - - tcp_connected = False - print("🔌 连接异常,2秒后重连...") - time.sleep_ms(2000) +import json + +# 导入新模块 +import config +from version import VERSION +# from logger import init_logging, get_logger, stop_logging +from logger_manager import logger_manager +from time_sync import sync_system_time_from_4g +from power import init_ina226, get_bus_voltage, voltage_to_percent +from laser_manager import laser_manager +from vision import detect_circle_v3, estimate_distance, compute_laser_position, save_shot_image +from network import network_manager +from ota_manager import ota_manager +from hardware import hardware_manager def laser_calibration_worker(): """后台线程:持续检测是否需要执行激光校准""" - global laser_calibration_active, laser_calibration_result, laser_calibration_lock + from maix import camera + from laser_manager import laser_manager + from ota_manager import ota_manager + + logger = logger_manager.logger + if logger: + logger.info("[LASER] 激光校准线程启动") + while True: - # OTA 期间尽量省电:暂停后台校准(会占用 Camera) try: - if int(globals().get("ota_in_progress", 0)) > 0: + try: + if ota_manager.ota_in_progress: + time.sleep_ms(200) + continue + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[LASER] OTA检查异常: {e}") time.sleep_ms(200) continue - except: - pass - if laser_calibration_active: - # 关键:不要在每次尝试里反复 new Camera(会导致 MMF 反复初始化刷屏) - cam = None - try: - cam = camera.Camera(640, 480) - start = time.ticks_ms() - timeout_ms = 8000 # 8 秒内找不到红点就退出一次,避免一直占用资源 - while laser_calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms: - frame = cam.read() - pos = find_red_laser(frame) - if pos: - with laser_calibration_lock: - laser_calibration_result = pos - laser_calibration_active = False - save_laser_point(pos) - print(f"✅ 后台校准成功: {pos}") - break - time.sleep_ms(60) - except Exception as e: - # 出错时也不要死循环刷屏 - print(f"[LASER] calibration error: {e}") - time.sleep_ms(200) - finally: + if laser_manager.calibration_active: + cam = None try: - # 释放摄像头资源(MaixPy 通常靠 GC,但显式 del 更稳) - if cam is not None: - del cam - except: - pass - - # 如果超时仍未成功,稍微休息一下再允许下一次 cmd=2 触发 - if laser_calibration_active: - time.sleep_ms(300) - else: - time.sleep_ms(50) - -def download_file_via_4g(url, filename, - total_timeout_ms=600000, - retries=3, - debug=True): - """ - ML307R HTTP 下载(更稳的“固定小块 Range 顺序下载”): - - 只依赖 +MHTTPURC:"header"/"content"(不依赖 MHTTPREAD/cached) - - 每次只请求一个小块 Range(默认 1024B),失败就重试同一块,必要时缩小块大小 - - 每个 chunk 都重新 MHTTPCREATE/MHTTPREQUEST,避免卡在“206 header 但不吐 content”的坏状态 - """ - from urllib.parse import urlparse - - # 小块策略(可按现场再调) - # - chunk 越小越稳(URC 压力更小),代价是请求次数更多 - CHUNK_MAX = 10240 - CHUNK_MIN = 128 - CHUNK_RETRIES = 12 - FRAG_SIZE = 1024 # 0-1024 - FRAG_DELAY = 10 # 0-2000 ms - - t_func0 = time.ticks_ms() - - parsed = urlparse(url) - host = parsed.hostname - path = parsed.path or "/" - if not host: - return False, "bad_url (no host)" - - # 很多 ML307R 的 MHTTP 对 https 不稳定;对已知域名做降级 - if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"): - base_url = "http://static.shelingxingqiu.com" - else: - base_url = f"http://{host}" - - def _log(*a): - if debug: - print(*a) - - def _pwr_log(prefix=""): - """debug 用:输出电压/电量,用于判断是否掉压导致 4G/USB 异常""" - if not debug: - return - try: - v = get_bus_voltage() - p = voltage_to_percent(v) - print(f"[PWR]{prefix} v={v:.3f}V p={p}%") - except Exception as e: - try: - print(f"[PWR]{prefix} read_failed: {e}") - except: - pass - - def _clear_http_events(): - while at_client.pop_http_event() is not None: - pass - - def _parse_httpid(resp: str): - m = re.search(r"\+MHTTPCREATE:\s*(\d+)", resp) - return int(m.group(1)) if m else None - - def _get_ip(): - r = at("AT+CGPADDR=1", "OK", 3000) - m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) - return m.group(1) if m else "" - - def _ensure_pdp(): - ip = _get_ip() - if ip and ip != "0.0.0.0": - return True, ip - at("AT+MIPCALL=1,1", "OK", 15000) - for _ in range(10): - ip = _get_ip() - if ip and ip != "0.0.0.0": - return True, ip - time.sleep(1) - return False, ip - - def _extract_hdr_fields(hdr_text: str): - mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE) - clen = int(mlen.group(1)) if mlen else None - mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE) - md5_b64 = mmd5.group(1).strip() if mmd5 else None - return clen, md5_b64 - - def _extract_content_range(hdr_text: str): - m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE) - if not m: - return None, None, None - try: - return int(m.group(1)), int(m.group(2)), int(m.group(3)) - except: - return None, None, None - - def _hard_reset_http(): - """ - 模块进入“坏状态”时的保守清场: - - 清空 ATClient 的事件队列,避免串台 - - 删除 0..5 的 httpid(常见固件槽位范围),尽量把内部 HTTP 状态机拉回干净 - 注意:很慢,所以只在连续异常时调用。 - """ - _clear_http_events() - for i in range(0, 6): - try: - at(f"AT+MHTTPDEL={i}", "OK", 1200) - except: - pass - _clear_http_events() - - def _create_httpid(full_reset=False): - _clear_http_events() - at_client.flush() - if full_reset: - _hard_reset_http() - resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) - hid = _parse_httpid(resp) - return hid, resp - - def _fetch_range_into_buf(start, want_len, out_buf, full_reset=False): - """ - 请求 Range [start, start+want_len) ,写入 out_buf(bytearray,长度=want_len) - 返回 (ok, msg, total_len, md5_b64, got_len) - """ - end_incl = start + want_len - 1 - hid, cresp = _create_httpid(full_reset=full_reset) - if hid is None: - return False, f"MHTTPCREATE failed: {cresp}", None, None, 0 - - # 降低 URC 压力(分片/延迟) - at(f'AT+MHTTPCFG="fragment",{hid},{FRAG_SIZE},{FRAG_DELAY}', "OK", 1500) - # 设置 Range header(inclusive) - at(f'AT+MHTTPCFG="header",{hid},"Range: bytes={start}-{end_incl}"', "OK", 3000) - - req = at(f'AT+MHTTPREQUEST={hid},1,0,"{path}"', "OK", 15000) - if "ERROR" in req or "CME ERROR" in req: - at(f"AT+MHTTPDEL={hid}", "OK", 2000) - return False, f"MHTTPREQUEST failed: {req}", None, None, 0 - - # 等 header + content - # 注意:部分 ML307R 固件会把 header 分成多条 +MHTTPURC:"header" 分片吐出来, - # 其中有的分片只有 Content-Length,有的只有 Content-Range。 - # 因此这里需要做“累积解析”,否则会出现 resp_total=None -> no_header_or_total。 - hdr_text = None - hdr_accum = "" - code = None - resp_total = None - total_len = None - md5_b64 = None - - got_ranges = set() - last_sum = 0 - t0 = time.ticks_ms() - # Range 场景不宜等待太久,卡住就换 hid 重来 - timeout_ms = 9000 - logged_hdr = False - - while time.ticks_ms() - t0 < timeout_ms: - ev = at_client.pop_http_event() - if not ev: - time.sleep_ms(5) - continue - - if ev[0] == "header": - _, ehid, ecode, ehdr = ev - if ehid != hid: - continue - code = ecode - hdr_text = ehdr - # 累积 header 文本并从累积内容里提取字段(避免 split header 丢字段) - if ehdr: - hdr_accum = (hdr_accum + "\n" + ehdr) if hdr_accum else ehdr - - resp_total_tmp, md5_tmp = _extract_hdr_fields(hdr_accum) - if md5_tmp: - md5_b64 = md5_tmp - cr_s, cr_e, cr_total = _extract_content_range(hdr_accum) - if cr_total is not None: - total_len = cr_total - # 有些 header 没有 Content-Length,但有 Content-Range(206),可由 range 计算出本次 body 长度 - if resp_total_tmp is not None: - resp_total = resp_total_tmp - elif resp_total is None and (cr_s is not None) and (cr_e is not None) and (cr_e >= cr_s): - resp_total = (cr_e - cr_s + 1) - # 206 才是 Range 正常响应;部分服务器可能忽略 Range 返回 200 - # 节流:每个 hid 只打一次 header(否则你会看到连续 3-4 条 [HDR],且很多 cr=None) - if (not logged_hdr) and (resp_total is not None or total_len is not None): - _log(f"[HDR] id={hid} code={code} clen={resp_total} cr={cr_s}-{cr_e}/{cr_total}") - logged_hdr = True - continue - - if ev[0] == "content": - _, ehid, _total, _sum, _cur, payload = ev - if ehid != hid: - continue - if resp_total is None: - resp_total = _total - if resp_total is None or resp_total <= 0: - continue - start_rel = _sum - _cur - end_rel = _sum - if start_rel < 0 or start_rel >= resp_total: - continue - if end_rel > resp_total: - end_rel = resp_total - actual_len = min(len(payload), end_rel - start_rel) - if actual_len <= 0: - continue - out_buf[start_rel:start_rel + actual_len] = payload[:actual_len] - got_ranges.add((start_rel, start_rel + actual_len)) - if _sum > last_sum: - last_sum = _sum - # 进度日志节流 - if debug and (last_sum >= resp_total or (last_sum % 512 == 0)): - _log(f"[CHUNK] {start}+{last_sum}/{resp_total}") - - # 收齐就退出 - if last_sum >= resp_total: - break - - # 清理实例(快路径:只删当前 hid) - try: - at(f"AT+MHTTPDEL={hid}", "OK", 2000) - except: - pass - - if resp_total is None: - return False, "no_header_or_total", total_len, md5_b64, 0 - - # 计算实际填充长度 - merged = sorted(got_ranges) - filled = 0 - prev = 0 - for s, e in merged: - if e <= s: - continue - if s > prev: - # 有洞 - pass - prev = max(prev, e) - # 重新合并算 filled - merged2 = [] - for s, e in merged: - if not merged2 or s > merged2[-1][1]: - merged2.append((s, e)) - else: - merged2[-1] = (merged2[-1][0], max(merged2[-1][1], e)) - filled = sum(e - s for s, e in merged2) - - if filled < resp_total: - return False, f"incomplete_chunk got={filled} expected={resp_total} code={code}", total_len, md5_b64, filled - - got_len = resp_total - # 如果服务器忽略 Range 返回 200,resp_total 可能是整文件,这里允许 want_len 不匹配 - return True, "OK", total_len, md5_b64, got_len - - global ota_in_progress - try: - ota_in_progress = int(ota_in_progress) + 1 - except: - ota_in_progress = 1 - with uart4g_lock: - try: - ok_pdp, ip = _ensure_pdp() - if not ok_pdp: - return False, f"PDP not ready (ip={ip})" - - # 先清空旧事件,避免串台 - _clear_http_events() - - # 为了支持随机写入,先创建空文件 - try: - with open(filename, "wb") as f: - f.write(b"") - except Exception as e: - return False, f"open_file_failed: {e}" - - total_len = None - expect_md5_b64 = None - - offset = 0 - chunk = CHUNK_MAX - t_start = time.ticks_ms() - last_progress_ms = t_start - STALL_TIMEOUT_MS = 60000 # 60s 没有任何 offset 推进则判定卡死 - last_pwr_ms = t_start - _pwr_log(prefix=" ota_start") - bad_http_state = 0 # 连续“疑似模块 HTTP 坏状态”的计数,达到阈值才做 full reset - - while True: - now = time.ticks_ms() - # debug:每 5 秒打印一次电压/电量 + 进度 - if debug and time.ticks_diff(now, last_pwr_ms) >= 5000: - last_pwr_ms = now - _pwr_log(prefix=f" off={offset}/{total_len or '?'}") - # 超时保护:整体(很宽,避免“慢但在推进”的情况误判失败) - if time.ticks_diff(now, t_start) > total_timeout_ms: - return False, f"timeout overall after {total_timeout_ms}ms offset={offset} total={total_len}" - - # 超时保护:无进展(关键) - if time.ticks_diff(now, last_progress_ms) > STALL_TIMEOUT_MS: - return False, f"timeout stalled {STALL_TIMEOUT_MS}ms offset={offset} total={total_len}" - - if total_len is not None and offset >= total_len: - break - - want = chunk - if total_len is not None: - remain = total_len - offset - if remain <= 0: - break - if want > remain: - want = remain - - # 本 chunk 的 buffer(长度=want) - buf = bytearray(want) - - success = False - last_err = "unknown" - md5_seen = None - got_len = 0 - for k in range(1, CHUNK_RETRIES + 1): - # 只有在连续坏状态时才 full reset,否则只删当前 hid(更快) - do_full_reset = (bad_http_state >= 2) - ok, msg, tlen, md5_b64, got = _fetch_range_into_buf(offset, want, buf, full_reset=do_full_reset) - last_err = msg - if tlen is not None and total_len is None: - total_len = tlen - if md5_b64 and not expect_md5_b64: - expect_md5_b64 = md5_b64 - if ok: - success = True - got_len = got - bad_http_state = 0 - break - - # 判定是否属于“模块 HTTP 坏状态”(header-only/no header/request err 等) + cam = camera.Camera(640, 480) + start = time.ticks_ms() + timeout_ms = 8000 + while laser_manager.calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms: + frame = cam.read() + pos = laser_manager.find_red_laser(frame) + if pos: + laser_manager.set_calibration_result(pos) + laser_manager.stop_calibration() + laser_manager.save_laser_point(pos) + logger = logger_manager.logger + if logger: + logger.info(f"✅ 后台校准成功: {pos}") + break + time.sleep_ms(60) + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[LASER] calibration error: {e}") + import traceback + logger.error(traceback.format_exc()) + time.sleep_ms(200) + finally: try: - if ("no_header_or_total" in msg) or ("MHTTPREQUEST failed" in msg) or ("MHTTPCREATE failed" in msg): - bad_http_state += 1 - else: - bad_http_state = max(0, bad_http_state - 1) - except: - pass + if cam is not None: + del cam + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[LASER] 释放相机资源异常: {e}") - # 失败:缩小 chunk,提高成功率 - if chunk > CHUNK_MIN: - chunk = max(CHUNK_MIN, chunk // 2) - want = min(chunk, want) - buf = bytearray(want) - _log(f"[RETRY] off={offset} want={want} try={k} err={msg}") - _pwr_log(prefix=f" retry{k} off={offset}") - time.sleep_ms(120) - - if not success: - return False, f"chunk_failed off={offset} want={want} err={last_err} total={total_len}" - - # 写入文件(注意:got_len 可能 > want(服务器忽略 Range 返回 200)) - # 只写入当前请求的 want 字节(buf),避免越界 - try: - with open(filename, "r+b") as f: - f.seek(offset) - f.write(bytes(buf)) - except Exception as e: - return False, f"write_failed off={offset}: {e}" - - offset += len(buf) - last_progress_ms = time.ticks_ms() - # 成功推进后恢复 chunk - chunk = CHUNK_MAX - if debug: - _log(f"[OK] offset={offset}/{total_len or '?'}") - - # 可选:如果有 Content-Md5 且 hashlib 可用,做校验(Range 响应未必会提供 md5) - if expect_md5_b64 and hashlib is not None: - try: - with open(filename, "rb") as f: - data = f.read() - digest = hashlib.md5(data).digest() - got_b64 = binascii.b2a_base64(digest).decode().strip() - if got_b64 != expect_md5_b64: - return False, f"md5_mismatch got={got_b64} expected={expect_md5_b64}" - except Exception as e: - return False, f"md5_check_failed: {e}" - - t_cost = time.ticks_diff(time.ticks_ms(), t_func0) - return True, f"OK size={offset} ip={ip} cost_ms={t_cost}" - finally: - try: - ota_in_progress = max(0, int(ota_in_progress) - 1) - except: - ota_in_progress = 0 - - -def direct_ota_download_via_4g(ota_url): - """通过 4G 模块下载 OTA(不需要 Wi-Fi)""" - global update_thread_started, ota_in_progress, tcp_connected - try: - t_ota0 = time.ticks_ms() - if not ota_url: - safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) - return - - # OTA 全程暂停 TCP(避免心跳/重连抢占 uart4g_lock,导致 server 断链 + HTTP URC 更容易丢) - try: - ota_in_progress = int(ota_in_progress) + 1 - except: - ota_in_progress = 1 - - # 主动断开 AT TCP,减少 +MIPURC 噪声干扰 HTTP URC 下载 - tcp_connected = False - try: - with uart4g_lock: - at("AT+MIPCLOSE=0", "OK", 1500) - except: - pass - - print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") - # 重要说明: - # - AT+MDIALUP / RNDIS 是“USB 主机拨号上网”模式,在不少 ML307R 固件上会占用/切换内部网络栈, - # 从而导致 AT+MIPOPEN / +MIPURC 这套 TCP 连接无法工作(你会看到一直“连接到服务器...”)。 - # - 这个设备当前 4G 是走 UART + AT Socket(MIPOPEN),并没有把 4G 变成系统网卡(如 ppp0)。 - # 因此这里不再自动拨号/改路由;只有当系统本来就有 default route(例如 eth0 已联网)时,才尝试走 requests 下载。 - - ok_sys = False - msg_sys = "" - try: - if _has_default_route(): - # 1) 先试原始 URL(可能是 https) - ok_sys, msg_sys = _download_file_system_bytes(ota_url, local_filename, timeout_s=30) - if (not ok_sys) and isinstance(ota_url, str) and ota_url.startswith("https://static.shelingxingqiu.com/"): - # 2) 部分系统 SSL 不完整:对固定域名降级到 http 再试一次 - http_url = "http://" + ota_url[len("https://"):] - ok_sys, msg_sys = _download_file_system_bytes(http_url, local_filename, timeout_s=30) + if laser_manager.calibration_active: + time.sleep_ms(300) else: - ok_sys = False - msg_sys = "no_default_route (system network not available)" + time.sleep_ms(50) except Exception as e: - ok_sys = False - msg_sys = f"system_download_exception: {e}" + # 线程顶层异常捕获,防止线程静默退出 + logger = logger_manager.logger + if logger: + logger.error(f"[LASER] 校准线程异常: {e}") + import traceback + logger.error(traceback.format_exc()) + else: + print(f"[LASER] 校准线程异常: {e}") + import traceback + traceback.print_exc() + time.sleep_ms(1000) # 等待1秒后继续 - if ok_sys: - print(f"[OTA-4G] system {msg_sys}") - if apply_ota_and_reboot(ota_url): - return - - print(f"[OTA-4G] system failed: {msg_sys} -> fallback to URC download") - # debug:进入 URC 下载前打印一次电压/电量(防止下载太快看不到 [PWR] 周期日志) - try: - v = get_bus_voltage() - p = voltage_to_percent(v) - print(f"[OTA-4G][PWR] before_urc v={v:.3f}V p={p}%") - except Exception as e: - print(f"[OTA-4G][PWR] before_urc read_failed: {e}") - - t_dl0 = time.ticks_ms() - success, msg = download_file_via_4g(ota_url, local_filename, debug=True) - t_dl_cost = time.ticks_diff(time.ticks_ms(), t_dl0) - print(f"[OTA-4G] {msg}") - print(f"[OTA-4G] download_cost_ms={t_dl_cost}") - - if success and "OK" in msg: - if apply_ota_and_reboot(ota_url): - return - else: - safe_enqueue({"result": msg_sys or msg}, 2) - - except Exception as e: - error_msg = f"OTA-4G 异常: {str(e)}" - print(error_msg) - safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) - finally: - # 总耗时(注意:若成功并 reboot,这行可能来不及打印) - try: - t_cost = time.ticks_diff(time.ticks_ms(), t_ota0) - print(f"[OTA-4G] total_cost_ms={t_cost}") - except: - pass - update_thread_started = False - # 对应上面的 ota_in_progress +1 - try: - ota_in_progress = max(0, int(ota_in_progress) - 1) - except: - ota_in_progress = 0 - - -# ==================== 主程序入口 ==================== def cmd_str(): - global DEVICE_ID, PASSWORD + """主程序入口""" + # ==================== 第一阶段:硬件初始化 ==================== + # 按照 main104.py 的顺序,先完成所有硬件初始化 - DEVICE_ID = read_device_id() - PASSWORD = DEVICE_ID + "." - - # 创建照片存储目录 - photo_dir = "/root/phot" - if photo_dir not in os.listdir("/root"): + # 1. 引脚功能映射 + for pin, func in config.PIN_MAPPINGS.items(): try: - os.mkdir(photo_dir) + pinmap.set_pin_function(pin, func) except: pass - - # 初始化硬件 + + # 2. 初始化硬件对象(UART、I2C、ADC) + hardware_manager.init_uart4g() + hardware_manager.init_distance_serial() + hardware_manager.init_bus() + hardware_manager.init_adc() + hardware_manager.init_at_client() + + # 3. 初始化 INA226 电量监测芯片 init_ina226() - load_laser_point() + + # 4. 加载激光点配置 + laser_manager.load_laser_point() + + # 5. 初始化显示和相机 disp = display.Display() cam = camera.Camera(640, 480) + + # ==================== 第二阶段:软件初始化 ==================== + + # 1. 初始化日志系统 + import logging + logger_manager.init_logging(log_level=logging.DEBUG) + logger = logger_manager.logger + + # 2. 从4G模块同步系统时间(需要 at_client 已初始化) + sync_system_time_from_4g() + + # 3. 启动时检查:是否需要恢复备份 + pending_path = f"{config.APP_DIR}/ota_pending.json" + if os.path.exists(pending_path): + try: + with open(pending_path, 'r', encoding='utf-8') as f: + pending_obj = json.load(f) + + restart_count = pending_obj.get('restart_count', 0) + max_restarts = pending_obj.get('max_restarts', 3) + backup_dir = pending_obj.get('backup_dir') + + if logger: + logger.info(f"检测到 ota_pending.json,重启计数: {restart_count}/{max_restarts}") + + if restart_count >= max_restarts: + if logger: + logger.error(f"[STARTUP] 重启次数 ({restart_count}) 超过阈值 ({max_restarts}),执行恢复...") + + if backup_dir and os.path.exists(backup_dir): + if ota_manager.restore_from_backup(backup_dir): + if logger: + logger.info(f"[STARTUP] 已从指定备份恢复: {backup_dir}") + else: + if logger: + logger.info(f"[STARTUP] 指定备份恢复失败,尝试恢复最新备份...") + ota_manager.restore_from_backup(None) + else: + if ota_manager.restore_from_backup(None): + if logger: + logger.info(f"[STARTUP] 已从最新备份恢复") + else: + if logger: + logger.error(f"[STARTUP] 恢复备份失败") + + try: + os.remove(pending_path) + if logger: + logger.info(f"[STARTUP] 已删除 ota_pending.json") + except Exception as e: + if logger: + logger.error(f"[STARTUP] 删除 pending 文件失败: {e}") + + if logger: + logger.info(f"[STARTUP] 恢复完成,准备重启系统...") + time.sleep_ms(2000) + os.system("reboot") + return + else: + pending_obj['restart_count'] = restart_count + 1 + try: + with open(pending_path, 'w', encoding='utf-8') as f: + json.dump(pending_obj, f) + if logger: + logger.info(f"[STARTUP] 已更新重启计数: {pending_obj['restart_count']}") + except Exception as e: + if logger: + logger.error(f"[STARTUP] 更新重启计数失败: {e}") + except Exception as e: + if logger: + logger.error(f"[STARTUP] 检查 pending 文件时出错: {e}") + try: + if logger: + logger.info(f"[STARTUP] pending 文件可能损坏,尝试恢复备份...") + if ota_manager.restore_from_backup(None): + os.remove(pending_path) + if logger: + logger.info(f"[STARTUP] 已恢复备份并删除损坏的 pending 文件") + time.sleep_ms(2000) + os.system("reboot") + return + except: + pass + + # 4. 初始化设备ID(network_manager 内部会自动设置 device_id 和 password) + network_manager.read_device_id() + + # 5. 创建照片存储目录(如果启用图像保存) + if config.SAVE_IMAGE_ENABLED: + photo_dir = config.PHOTO_DIR + if photo_dir not in os.listdir("/root"): + try: + os.mkdir(photo_dir) + except: + pass - # 启动通信与校准线程 - # from tcp_handler import tcp_main - _thread.start_new_thread(tcp_main, ()) + # 6. 启动通信与校准线程 + _thread.start_new_thread(network_manager.tcp_main, ()) _thread.start_new_thread(laser_calibration_worker, ()) - print("系统准备完成...") + if logger: + logger.info("系统准备完成...") last_adc_trigger = 0 # 主循环:检测扳机触发 → 拍照 → 分析 → 上报 while not app.need_exit(): - current_time = time.ticks_ms() - # OTA 期间尽量省电:暂停相机预览/拍照/分析,只保留最低频率的循环 try: - if int(globals().get("ota_in_progress", 0)) > 0: + current_time = time.ticks_ms() + + # OTA 期间暂停相机预览 + try: + if ota_manager.ota_in_progress: + time.sleep_ms(250) + continue + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[MAIN] OTA检查异常: {e}") time.sleep_ms(250) continue - except: - pass - - # print("压力传感器数值: ", adc_obj.read()) - adc_val = adc_obj.read() - # if adc_val > 2400: - # print(f"adc: {adc_val}") - if adc_val > ADC_TRIGGER_THRESHOLD: - diff_ms = current_time-last_adc_trigger - if diff_ms<3000: - continue - last_adc_trigger = current_time - time.sleep_ms(60) # 防抖 - frame = cam.read() - x, y = laser_point - - # 绘制激光十字线 - frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness) - frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness) - frame.draw_circle(int(x), int(y), 1, color, thickness) - - # 检测靶心 - result_img, center, radius, method, best_radius1 = detect_circle(frame) - disp.show(result_img) - - # 计算偏移与距离 - dx, dy = compute_laser_position(center, (x, y), radius, method) - distance_m = estimate_distance(best_radius1) - - # 读取电量 - voltage = get_bus_voltage() - battery_percent = voltage_to_percent(voltage) - - # 保存图像(带标注) + + # 读取ADC值(扳机检测) try: - jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')]) - filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg" - result_img.save(filename, quality=70) + if network_manager.manual_trigger_flag: + network_manager.clear_manual_trigger() + adc_val = config.ADC_TRIGGER_THRESHOLD + 1 + if logger: + logger.info("[TEST] TCP命令触发射箭") + else: + adc_val = hardware_manager.adc_obj.read() except Exception as e: - print(f"❌ 保存失败: {e}") + logger = logger_manager.logger + if logger: + logger.error(f"[MAIN] ADC读取异常: {e}") + time.sleep_ms(100) + continue + + if adc_val > config.ADC_TRIGGER_THRESHOLD: + diff_ms = current_time - last_adc_trigger + if diff_ms < 3000: + continue + last_adc_trigger = current_time + + try: + frame = cam.read() + laser_point = laser_manager.laser_point + if laser_point is None: + logger = logger_manager.logger + if logger: + logger.warning("[MAIN] 激光点未初始化,跳过本次检测") + time.sleep_ms(100) + continue + + x, y = laser_point - # 构造上报数据 - inner_data = { - "x": float(dx) if dx is not None else 200.0, - "y": float(dy) if dy is not None else 200.0, - "r": 90.0, - "d": round((distance_m or 0.0) * 100), # 距离(厘米) - "m": method, - "adc": adc_val - } - report_data = {"cmd": 1, "data": inner_data} - # 射箭事件高优先级入队,由 tcp_main 统一发送 - safe_enqueue(report_data, msg_type=2, high=True) - print("📤 射箭事件已加入发送队列") + # 绘制激光十字线 + color = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) + frame.draw_line( + int(x - config.LASER_LENGTH), int(y), + int(x + config.LASER_LENGTH), int(y), + color, config.LASER_THICKNESS + ) + frame.draw_line( + int(x), int(y - config.LASER_LENGTH), + int(x), int(y + config.LASER_LENGTH), + color, config.LASER_THICKNESS + ) + frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS) - # 闪一下激光(射箭反馈) - # TODO: remove after test done - flash_laser(1000) # 闪300ms,可以根据需要调整时长 + # 检测靶心 + result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point) + disp.show(result_img) - time.sleep_ms(100) - else: - disp.show(cam.read()) - time.sleep_ms(50) + # 计算偏移与距离(如果检测到靶心) + if center and radius: + dx, dy = compute_laser_position(center, (x, y), radius, method) + distance_m = estimate_distance(best_radius1) + else: + # 未检测到靶心 + dx, dy = None, None + distance_m = None + if logger: + logger.warning("[MAIN] 未检测到靶心,但会保存图像") + + # 快速激光测距(激光一闪而过,约500-600ms) + laser_distance_m = None + try: + laser_distance_m = laser_manager.quick_measure_distance() + if logger: + if laser_distance_m > 0: + logger.info(f"[MAIN] 激光测距成功: {laser_distance_m:.3f} m") + else: + logger.warning("[MAIN] 激光测距失败或返回0") + except Exception as e: + if logger: + logger.error(f"[MAIN] 激光测距异常: {e}") + + # 读取电量 + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + + # 保存图像(无论是否检测到靶心都保存) + # save_shot_image 函数会确保绘制激光十字线和检测标注(如果有) + # 如果未检测到靶心,文件名会包含 "no_target" 标识 + save_shot_image( + result_img, + center, + radius, + method, + ellipse_params, + (x, y), + distance_m, + photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None + ) + + # 构造上报数据 + inner_data = { + "x": float(dx) if dx is not None else 200.0, + "y": float(dy) if dy is not None else 200.0, + "r": 90.0, + "d": round((distance_m or 0.0) * 100), # 视觉测距值(厘米) + "d_laser": round((laser_distance_m or 0.0) * 100), # 激光测距值(厘米) + "m": method if method else "no_target", + "adc": adc_val + } + report_data = {"cmd": 1, "data": inner_data} + network_manager.safe_enqueue(report_data, msg_type=2, high=True) + if logger: + if center and radius: + logger.info("射箭事件已加入发送队列(已检测到靶心)") + else: + logger.info("射箭事件已加入发送队列(未检测到靶心,已保存图像)") + + # 闪一下激光(射箭反馈) + # laser_manager.flash_laser(1000) + + time.sleep_ms(100) + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[MAIN] 图像处理异常: {e}") + import traceback + logger.error(traceback.format_exc()) + time.sleep_ms(100) + continue + else: + try: + disp.show(cam.read()) + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[MAIN] 显示异常: {e}") + time.sleep_ms(50) -def dump_system_info(): - cmds = [ - "uname -a", - "cat /etc/os-release 2>/dev/null", - "cat /proc/version 2>/dev/null", - "cat /proc/1/comm 2>/dev/null", - "ps 2>/dev/null | head -n 5", - "ls -l /sbin/init 2>/dev/null", - "ls -l /etc/init.d 2>/dev/null | head -n 10", - "which systemctl 2>/dev/null", - "which rc-service 2>/dev/null", - "which busybox 2>/dev/null && busybox | head -n 1", - "ls /dev/watchdog* 2>/dev/null", - ] - for c in cmds: - try: - out = os.popen(c).read() - print("\n$ " + c + "\n" + (out.strip() or "")) except Exception as e: - print("\n$ " + c + "\n " + str(e)) + # 主循环的顶层异常捕获,防止程序静默退出 + logger = logger_manager.logger + if logger: + logger.error(f"[MAIN] 主循环异常: {e}") + import traceback + logger.error(traceback.format_exc()) + else: + print(f"[MAIN] 主循环异常: {e}") + import traceback + traceback.print_exc() + time.sleep_ms(1000) # 等待1秒后继续 + +# 主程序入口 if __name__ == "__main__": - # dump_system_info() - # try: - # import threading - # print("threading module:", threading) - # print("has Lock:", hasattr(threading, "Lock")) - # if hasattr(threading, "Lock"): - # print("has lock") - # finally: - # pass - # import config - # print("env: ", config.get_env()) - cmd_str() \ No newline at end of file + try: + cmd_str() + except KeyboardInterrupt: + logger = logger_manager.logger + if logger: + logger.info("[MAIN] 收到中断信号,程序退出") + logger_manager.stop_logging() + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[MAIN] 程序异常退出: {e}") + import traceback + logger.error(traceback.format_exc()) + else: + print(f"[MAIN] 程序异常退出: {e}") + import traceback + traceback.print_exc() + logger_manager.stop_logging() + raise # 重新抛出异常,让系统知道程序异常退出 diff --git a/vision.py b/vision.py index d601c09..585de40 100644 --- a/vision.py +++ b/vision.py @@ -9,7 +9,6 @@ import numpy as np import os import math from maix import image -import globals import config from logger_manager import logger_manager