#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 激光射击系统主程序(视觉测距版) 功能:目标检测、激光校准、4G TCP 通信、OTA 升级、单目测距、INA226 电量监测 平台:MaixPy (Sipeed MAIX) 作者:ZZH 最后更新:2025-11-21 """ from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err import cv2 import numpy as np import json import struct import re from maix.peripheral import adc import _thread import os import hmac import ujson import hashlib import requests import socket import re import binascii try: import hashlib except: hashlib = None # import config # ==================== Locks ==================== class _Mutex: """ 基于 _thread.allocate_lock() 的互斥锁封装: - 支持 with - 支持 try_acquire(若固件不支持非阻塞 acquire 参数,则退化为阻塞 acquire) """ def __init__(self): self._lk = _thread.allocate_lock() def acquire(self, blocking=True): try: return self._lk.acquire(blocking) except TypeError: self._lk.acquire() return True def try_acquire(self): return self.acquire(False) def release(self): self._lk.release() def __enter__(self): self.acquire(True) return self def __exit__(self, exc_type, exc, tb): self.release() return False class ATClient: """ 单读者 AT/URC 客户端:唯一读取 uart4g,避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。 - send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect - pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload(已按长度裁剪) - pop_http_event() : 获取 +MHTTPURC 事件(header/content) """ def __init__(self, uart_obj): self.uart = uart_obj self._cmd_lock = _Mutex() self._q_lock = _Mutex() self._rx = b"" self._tcp_payloads = [] self._http_events = [] # 当前命令等待状态(仅允许单命令 in-flight) self._waiting = False self._expect = b"OK" self._resp = b"" self._running = False def start(self): if self._running: return self._running = True _thread.start_new_thread(self._reader_loop, ()) def stop(self): self._running = False def flush(self): """清空内部缓存与队列(用于 OTA/异常恢复)""" with self._q_lock: self._rx = b"" self._tcp_payloads.clear() self._http_events.clear() self._resp = b"" def pop_tcp_payload(self): with self._q_lock: if self._tcp_payloads: return self._tcp_payloads.pop(0) return None def pop_http_event(self): with self._q_lock: if self._http_events: return self._http_events.pop(0) return None def _push_tcp_payload(self, payload: bytes): # 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock, # 这里不要再次 acquire(锁不可重入,会死锁)。 self._tcp_payloads.append(payload) def _push_http_event(self, ev): # 同上:避免在 _reader_loop 持锁期间二次 acquire self._http_events.append(ev) def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000): """ 发送 AT 命令并等待 expect(子串匹配)。 注意:expect=">" 用于等待 prompt。 """ expect_b = expect.encode() if isinstance(expect, str) else expect with self._cmd_lock: # 初始化等待 self._waiting = True self._expect = expect_b self._resp = b"" # 发送 if cmd: # 注意:这里不要再用 uart4g_lock(否则外层已经持锁时会死锁)。 # 写入由 _cmd_lock 串行化即可。 self.uart.write((cmd + "\r\n").encode()) t0 = time.ticks_ms() while time.ticks_ms() - t0 < timeout_ms: if (not self._waiting) or (self._expect in self._resp): self._waiting = False break time.sleep_ms(5) # 超时也返回已收集内容(便于诊断) self._waiting = False try: return self._resp.decode(errors="ignore") except: return str(self._resp) def _find_urc_tag(self, tag: bytes): """ 只在“真正的 URC 边界”查找 tag,避免误命中 HTTP payload 内容。 规则:tag 必须出现在 buffer 开头,或紧跟在 b"\\r\\n" 后面。 """ try: i = 0 rx = self._rx while True: j = rx.find(tag, i) if j < 0: return -1 if j == 0: return 0 if j >= 2 and rx[j - 2:j] == b"\r\n": return j i = j + 1 except: return -1 def _parse_mipurc_rtcp(self): """ 解析:+MIPURC: "rtcp",,, 之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。 """ prefix = b'+MIPURC: "rtcp",' i = self._find_urc_tag(prefix) if i < 0: return False # 丢掉前置噪声 if i > 0: self._rx = self._rx[i:] i = 0 j = len(prefix) # 解析 link_id k = j while k < len(self._rx) and 48 <= self._rx[k] <= 57: k += 1 if k == j or k >= len(self._rx): return False if self._rx[k:k+1] != b",": self._rx = self._rx[1:] return True try: link_id = int(self._rx[j:k].decode()) except: self._rx = self._rx[1:] return True # 解析 len j2 = k + 1 k2 = j2 while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57: k2 += 1 if k2 == j2 or k2 >= len(self._rx): return False if self._rx[k2:k2+1] != b",": self._rx = self._rx[1:] return True try: n = int(self._rx[j2:k2].decode()) except: self._rx = self._rx[1:] return True payload_start = k2 + 1 payload_end = payload_start + n if len(self._rx) < payload_end: return False # payload 未收齐 payload = self._rx[payload_start:payload_end] # 把 link_id 一起带上,便于上层过滤(如果需要) self._push_tcp_payload((link_id, payload)) self._rx = self._rx[payload_end:] return True def _parse_mhttpurc_header(self): tag = b'+MHTTPURC: "header",' i = self._find_urc_tag(tag) if i < 0: return False if i > 0: self._rx = self._rx[i:] i = 0 # header: +MHTTPURC: "header",,,, j = len(tag) comma_count = 0 k = j while k < len(self._rx) and comma_count < 3: if self._rx[k:k+1] == b",": comma_count += 1 k += 1 if comma_count < 3: return False prefix = self._rx[:k] m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix) if not m: self._rx = self._rx[1:] return True urc_id = int(m.group(1)) code = int(m.group(2)) hdr_len = int(m.group(3)) text_start = k text_end = text_start + hdr_len if len(self._rx) < text_end: return False hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore") self._push_http_event(("header", urc_id, code, hdr_text)) self._rx = self._rx[text_end:] return True def _parse_mhttpurc_content(self): tag = b'+MHTTPURC: "content",' i = self._find_urc_tag(tag) if i < 0: return False if i > 0: self._rx = self._rx[i:] i = 0 # content: +MHTTPURC: "content",,,,, j = len(tag) comma_count = 0 k = j while k < len(self._rx) and comma_count < 4: if self._rx[k:k+1] == b",": comma_count += 1 k += 1 if comma_count < 4: return False prefix = self._rx[:k] m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix) if not m: self._rx = self._rx[1:] return True urc_id = int(m.group(1)) total_len = int(m.group(2)) sum_len = int(m.group(3)) cur_len = int(m.group(4)) payload_start = k payload_end = payload_start + cur_len if len(self._rx) < payload_end: return False payload = self._rx[payload_start:payload_end] self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload)) self._rx = self._rx[payload_end:] return True def _reader_loop(self): while self._running: # 关键:UART 驱动偶发 read failed,必须兜住,否则线程挂了 OTA/TCP 都会卡死 try: d = self.uart.read(4096) # 8192 在一些驱动上更容易触发 read failed except Exception as e: try: print("[ATClient] uart read failed:", e) except: pass time.sleep_ms(50) continue if not d: time.sleep_ms(1) continue with self._q_lock: self._rx += d if self._waiting: self._resp += d while True: progressed = ( self._parse_mipurc_rtcp() or self._parse_mhttpurc_header() or self._parse_mhttpurc_content() ) if not progressed: break try: ota_flag = int(globals().get("ota_in_progress", 0)) > 0 except: ota_flag = False has_http_hint = (b"+MHTTP" in self._rx) or (b"+MHTTPURC" in self._rx) if ota_flag or has_http_hint: if len(self._rx) > 512 * 1024: self._rx = self._rx[-256 * 1024:] else: if len(self._rx) > 16384: self._rx = self._rx[-4096:] # ==================== 全局配置 ==================== # OTA 升级地址与本地路径 # url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py" local_filename = "/maixapp/apps/t11/main_tmp.py" app_version = '1.0.2' # OTA 下发参数(由后端指令写入) OTA_URL = None OTA_MODE = None # "4g" / "wifi" / None def is_wifi_connected(): """尽量判断当前是否有 Wi-Fi(有则走 Wi-Fi OTA,否则走 4G OTA)""" # 优先用 MaixPy network(如果可用) try: wlan = network.WLAN(network.TYPE_WIFI) if wlan.isconnected(): return True except: pass # 兜底:看系统 wlan0 有没有 IP(你系统可能没有 wlan0,则返回 False) try: ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() return bool(ip) except: return False # 设备认证信息(运行时动态加载) DEVICE_ID = None PASSWORD = None # 服务器连接参数 SERVER_IP = "www.shelingxingqiu.com" SERVER_PORT = 50005 HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒) # 激光校准配置 CONFIG_FILE = "/root/laser_config.json" DEFAULT_POINT = (640, 480) # 默认激光中心点(图像中心) laser_point = DEFAULT_POINT # HTTP 上报接口 URL = "http://ws.shelingxingqiu.com" API_PATH = "/home/shoot/device_fire/arrow/fire" # UART 设备初始化 uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块 # 单读者 ATClient(唯一读取 uart4g) at_client = ATClient(uart4g) at_client.start() # 引脚功能映射 pinmap.set_pin_function("A18", "UART1_RX") pinmap.set_pin_function("A19", "UART1_TX") pinmap.set_pin_function("A29", "UART2_RX") pinmap.set_pin_function("A28", "UART2_TX") pinmap.set_pin_function("P18", "I2C1_SCL") pinmap.set_pin_function("P21", "I2C1_SDA") # pinmap.set_pin_function("A15", "I2C5_SCL") # pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的 # ADC 触发阈值(用于检测扳机/激光触发) ADC_TRIGGER_THRESHOLD = 3000 ADC_LASER_THRESHOLD = 3000 # 显示参数:激光十字线样式 color = image.Color(255, 100, 0) thickness = 1 length = 2 # 全局状态变量 laser_calibration_active = False # 是否正在后台校准激光 laser_calibration_result = None # 校准结果坐标 (x, y) laser_calibration_lock = _Mutex() # 互斥锁,防止多线程冲突 # 硬件对象初始化 laser_x, laser_y = laser_point adc_obj = adc.ADC(0, adc.RES_BIT_12) bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线 # bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的 # INA226 电流/电压监测芯片寄存器地址 INA226_ADDR = 0x40 REG_CONFIGURATION = 0x00 REG_BUS_VOLTAGE = 0x02 REG_CALIBRATION = 0x05 CALIBRATION_VALUE = 0x1400 # 激光控制指令(自定义协议) MODULE_ADDR = 0x00 LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1]) LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) # 相机标定参数(用于距离估算) # FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素) FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素) REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) # # TCP 连接状态 tcp_connected = False high_send_queue = [] # 高优先级发送队列:射箭事件等 normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 queue_lock = _Mutex() # 互斥锁,保护队列 uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并发) update_thread_started = False # 防止 OTA 更新线程重复启动 # OTA(4G HTTP URC) 期间“暂停 TCP 活动/让读线程用大 buffer”: # 用计数器而不是 bool,避免“外层 OTA 还没结束,内层 downloader finally 又把它置 False”。 ota_in_progress = 0 # ==================== 工具函数 ==================== def download_file(url, filename): """从指定 URL 下载文件并保存为 UTF-8 编码文本""" try: print(f"正在从 {url} 下载文件...") response = requests.get(url) response.raise_for_status() response.encoding = 'utf-8' with open(filename, 'w', encoding='utf-8') as file: file.write(response.text) return f"下载成功!文件已保存为: {filename}" except requests.exceptions.RequestException as e: return f"下载失败!网络请求错误: {e}" except OSError as e: return f"下载失败!文件写入错误: {e}" except Exception as e: return f"下载失败!发生未知错误: {e}" def _download_file_system_bytes(url, filename, timeout_s=25): """ 走“系统网络栈”的下载(RNDIS/ECM/eth0 等),用 TCP 自带重传,适合 OTA 大文件。 注意:这里写 bytes,不做编码假设。 """ try: print(f"[NET] system download: {url} -> {filename}") r = requests.get(url, timeout=timeout_s) r.raise_for_status() data = r.content with open(filename, "wb") as f: f.write(data) return True, f"OK size={len(data)}" except Exception as e: return False, f"system_download_failed: {e}" def _has_default_route(): try: out = os.popen("ip route 2>/dev/null").read() for line in out.splitlines(): if line.strip().startswith("default "): return True return False except: return False def _get_if_ipv4(ifname: str): try: out = os.popen(f"ip -4 addr show {ifname} 2>/dev/null").read() m = re.search(r"\binet\s+(\d+\.\d+\.\d+\.\d+)/(\d+)", out) if not m: return None return m.group(1) except: return None def _ping_once(ip: str, ifname=None, timeout_s=1): # busybox ping 可能不支持 -W;做两套尝试 if ifname: cmd1 = f"ping -I {ifname} -c 1 -W {timeout_s} {ip} >/dev/null 2>&1" cmd2 = f"ping -I {ifname} -c 1 {ip} >/dev/null 2>&1" else: cmd1 = f"ping -c 1 -W {timeout_s} {ip} >/dev/null 2>&1" cmd2 = f"ping -c 1 {ip} >/dev/null 2>&1" rc = os.system(cmd1) if rc == 0: return True rc = os.system(cmd2) return rc == 0 def ensure_ml307r_dialup_and_route(prefer_if=("usb0", "usb1"), metric=200, debug=True): """ 用 ML307R 的 RNDIS/ECM 方式把“系统网络”拉起来: - AT+MDIALUP=1,1(拨号) - 为 usb0/usb1 添加 default route(不改 IP,尽量不影响现有管理网段) 目标:让 requests.get() 能直接下载 OTA,避免 UART URC 丢包。 """ def _dlog(*a): if debug: print("[DIALUP]", *a) # 1) 拨号(重复执行也安全) try: with uart4g_lock: at('AT+MDIALUPCFG="mode"', "OK", 2000) r = at("AT+MDIALUP=1,1", "OK", 20000) _dlog("MDIALUP resp:", r) except Exception as e: _dlog("MDIALUP exception:", e) # 2) 如果已经有 default route,直接返回 if _has_default_route(): _dlog("default route already exists") return True # 3) 尝试在 usb0/usb1 上猜测网关并加默认路由 for ifname in prefer_if: ip = _get_if_ipv4(ifname) if not ip: continue parts = ip.split(".") if len(parts) != 4: continue base = ".".join(parts[:3]) last = int(parts[3]) # 常见网关候选:.2 / .254 / .1(跳过自己) candidates = [] if last != 2: candidates.append(f"{base}.2") if last != 254: candidates.append(f"{base}.254") if last != 1: candidates.append(f"{base}.1") for gw in candidates: if gw == ip: continue # ping 一下让 ARP/neigh 建立(不一定通,但很多系统会因此学到邻居) _ping_once(gw, ifname=ifname, timeout_s=1) # 直接尝试加默认路由(若已存在会失败但无害) os.system(f"ip route add default via {gw} dev {ifname} metric {metric} 2>/dev/null") if _has_default_route(): _dlog("default route set:", gw, "dev", ifname) _dlog(os.popen("ip route 2>/dev/null").read().strip()) return True _dlog("failed to set default route. ip route:", os.popen("ip route 2>/dev/null").read().strip()) return False def is_server_reachable(host, port=80, timeout=5): """检查目标主机端口是否可达(用于 OTA 前网络检测)""" try: addr_info = socket.getaddrinfo(host, port)[0] s = socket.socket(addr_info[0], addr_info[1], addr_info[2]) s.settimeout(timeout) s.connect(addr_info[-1]) s.close() return True except Exception as e: print(f"[NET] 无法连接 {host}:{port} - {e}") return False def apply_ota_and_reboot(ota_url=None): # TODO: remove this return after test # return True """ OTA 文件下载成功后:备份原 main.py -> 替换 main_tmp.py -> 重启设备 """ import shutil main_py = "/maixapp/apps/t11/main.py" main_tmp = "/maixapp/apps/t11/main_tmp.py" main_bak = "/maixapp/apps/t11/main.py.bak" ota_pending = "/maixapp/apps/t11/ota_pending.json" try: # 1. 检查下载的文件是否存在 if not os.path.exists(main_tmp): print(f"[OTA] 错误:{main_tmp} 不存在") return False # 2. 备份原 main.py(如果存在) if os.path.exists(main_py): try: shutil.copy2(main_py, main_bak) print(f"[OTA] 已备份 {main_py} -> {main_bak}") except Exception as e: print(f"[OTA] 备份失败: {e}") # 备份失败也继续(可能没有原文件) # 3. 替换:main_tmp.py -> main.py try: shutil.copy2(main_tmp, main_py) print(f"[OTA] 已替换 {main_tmp} -> {main_py}") # 确保写入磁盘 try: os.sync() # 如果系统支持 except: pass time.sleep_ms(500) # 额外等待确保写入完成 except Exception as e: print(f"[OTA] 替换失败: {e}") return False # 3.5 写入 pending(用于重启后确认成功并上报) try: pending_obj = { "ts": int(time.time()) if hasattr(time, "time") else 0, "url": ota_url or "", "tmp": main_tmp, "main": main_py, "bak": main_bak, } with open(ota_pending, "w", encoding="utf-8") as f: json.dump(pending_obj, f) try: os.sync() except: pass except Exception as e: print(f"[OTA] 写入 ota_pending 失败: {e}") # 4. 通知服务器(可选,但重启前发一次) safe_enqueue({"result": "ota_applied_rebooting"}, 2) time.sleep_ms(1000) # 给一点时间让消息发出 # 5. 重启设备 print("[OTA] 准备重启设备...") os.system("reboot") # MaixPy 通常是这个命令 return True except Exception as e: print(f"[OTA] apply_ota_and_reboot 异常: {e}") return False def direct_ota_download(ota_url): """ 直接执行 OTA 下载(假设已有网络) 用于 cmd=7 / 或 wifi 模式 """ global update_thread_started try: if not ota_url: safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) return from urllib.parse import urlparse parsed_url = urlparse(ota_url) host = parsed_url.hostname port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) if not is_server_reachable(host, port, timeout=8): safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2) return print(f"[OTA] 开始下载: {ota_url}") # from ota import download_file result_msg = download_file(ota_url, local_filename) print(f"[OTA] {result_msg}") # 检查是否下载成功(包含"成功"或"下载成功"关键字) if "成功" in result_msg or "下载成功" in result_msg: # 下载成功:备份+替换+重启 if apply_ota_and_reboot(ota_url): return # 会重启,不会执行到 finally else: safe_enqueue({"result": result_msg}, 2) except Exception as e: error_msg = f"OTA 异常: {str(e)}" print(error_msg) safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) finally: update_thread_started = False def handle_wifi_and_update(ssid, password, ota_url): """在子线程中执行 Wi-Fi 连接 + OTA 更新流程""" global update_thread_started try: ip, error = connect_wifi(ssid, password) if error: safe_enqueue({"result": "wifi_failed", "error": error}, 2) return safe_enqueue({"result": "wifi_connected", "ip": ip}, 2) # 下载 if not ota_url: safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) return from urllib.parse import urlparse parsed_url = urlparse(ota_url) host = parsed_url.hostname port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) if not is_server_reachable(host, port, timeout=8): err_msg = f"网络不通:无法连接 {host}:{port}" safe_enqueue({"result": err_msg}, 2) return print(f"[NET] 已确认可访问 {host}:{port},开始下载...") result = download_file(ota_url, local_filename) print(result) # 检查是否下载成功(包含"成功"或"下载成功"关键字) if "成功" in result or "下载成功" in result: # 下载成功:备份+替换+重启 if apply_ota_and_reboot(ota_url): return # 会重启,不会执行到 finally else: safe_enqueue({"result": result}, 2) finally: update_thread_started = False print("[UPDATE] 更新线程执行完毕,即将退出。") def connect_wifi(ssid, password): """ 连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录, 以便设备重启后自动连接。 """ conf_path = "/etc/wpa_supplicant.conf" ssid_file = "/boot/wifi.ssid" pass_file = "/boot/wifi.pass" try: # 生成 wpa_supplicant 配置 net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() if "network={" not in net_conf: return None, "Failed to generate wpa config" # 写入运行时配置 with open(conf_path, "w") as f: f.write("ctrl_interface=/var/run/wpa_supplicant\n") f.write("update_config=1\n\n") f.write(net_conf) # 持久化保存 SSID/PASS(关键!) with open(ssid_file, "w") as f: f.write(ssid.strip()) with open(pass_file, "w") as f: f.write(password.strip()) # 重启 Wi-Fi 服务 os.system("/etc/init.d/S30wifi restart") # 等待获取 IP for _ in range(20): ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() if ip: return ip, None time.sleep(1) return None, "Timeout: No IP obtained" except Exception as e: return None, f"Exception: {str(e)}" def read_device_id(): """从 /device_key 文件读取设备唯一 ID,失败则使用默认值""" try: with open("/device_key", "r") as f: device_id = f.read().strip() if device_id: print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}") return device_id except Exception as e: print(f"[ERROR] 无法读取 /device_key: {e}") return "DEFAULT_DEVICE_ID" def safe_enqueue(data_dict, msg_type=2, high=False): """线程安全地将消息加入 TCP 发送队列(支持优先级)""" global queue_lock, high_send_queue, normal_send_queue item = (msg_type, data_dict) with queue_lock: if high: high_send_queue.append(item) else: normal_send_queue.append(item) def at(cmd, wait="OK", timeout=2000): """向 4G 模块发送 AT 指令并等待响应""" # 统一由 ATClient 负责读 uart4g,避免多线程抢读 return at_client.send(cmd, wait, timeout) def make_packet(msg_type: int, body_dict: dict) -> bytes: """打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文""" body = json.dumps(body_dict).encode() body_len = len(body) checksum = body_len + msg_type header = struct.pack(">III", body_len, msg_type, checksum) return header + body def parse_packet(data: bytes): """解析 TCP 数据包,返回 (类型, 正文字典)""" if len(data) < 12: return None, None body_len, msg_type, checksum = struct.unpack(">III", data[:12]) body = data[12:12 + body_len] try: return msg_type, json.loads(body.decode()) except: return msg_type, {"raw": body.decode(errors="ignore")} def tcp_send_raw(data: bytes, max_retries=2) -> bool: global tcp_connected if not tcp_connected: return False with uart4g_lock: for _ in range(max_retries): cmd = f'AT+MIPSEND=0,{len(data)}' if ">" not in at(cmd, ">", 2000): time.sleep_ms(50) continue # 关键:确保把 data 全部写出去 total = 0 while total < len(data): n = uart4g.write(data[total:]) if not n or n < 0: time.sleep_ms(1) continue total += n # 关键:再发结束符(不算进 payload) uart4g.write(b"\x1A") # 等发送完成确认(不同固件可能是 SEND OK / OK / +MIPSEND) r = at("", "OK", 8000) if ("SEND OK" in r) or ("OK" in r) or ("+MIPSEND" in r): return True time.sleep_ms(50) return False def generate_token(device_id): """生成用于 HTTP 接口鉴权的 Token(HMAC-SHA256)""" SALT = "shootMessageFire" SALT2 = "shoot" return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() def send_http_cmd(cmd_str, timeout_ms=3000): """发送 HTTP 相关 AT 指令(调试用)""" print("[HTTP AT] =>", cmd_str) return at(cmd_str, "OK", timeout_ms) def read_http_response(timeout_ms=5000): """读取并打印 HTTP 响应(用于调试)""" # 仅保留占位:UART 读取由 ATClient 独占;如需调试,请从 ATClient 的 http_events 中取。 time.sleep_ms(timeout_ms) def upload_shoot_event(json_data): """通过 4G 模块上报射击事件到 HTTP 接口(备用通道)""" token = generate_token(DEVICE_ID) if not send_http_cmd(f'AT+MHTTPCREATE="{URL}"'): return False instance_id = 0 send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"') send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"') send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {DEVICE_ID}"') json_str = ujson.dumps(json_data) if not send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'): return False if send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{API_PATH}"'): read_http_response() return True return False def load_laser_point(): """从配置文件加载激光中心点,失败则使用默认值""" global laser_point try: if "laser_config.json" in os.listdir("/root"): with open(CONFIG_FILE, "r") as f: data = json.load(f) if isinstance(data, list) and len(data) == 2: laser_point = (int(data[0]), int(data[1])) print(f"[INFO] 加载激光点: {laser_point}") else: raise ValueError else: laser_point = DEFAULT_POINT except: laser_point = DEFAULT_POINT def save_laser_point(point): """保存激光中心点到配置文件""" global laser_point try: with open(CONFIG_FILE, "w") as f: json.dump([point[0], point[1]], f) laser_point = point except: pass def turn_on_laser(): """发送指令开启激光,并读取回包(部分模块支持)""" distance_serial.write(LASER_ON_CMD) time.sleep_ms(10) resp = distance_serial.read(20) if resp: if resp == LASER_ON_CMD: print("✅ 激光指令已确认") else: print("🔇 无回包(正常或模块不支持)") return resp def flash_laser(duration_ms=1000): """闪一下激光(用于射箭反馈)""" try: distance_serial.write(LASER_ON_CMD) time.sleep_ms(duration_ms) distance_serial.write(LASER_OFF_CMD) except Exception as e: print(f"闪激光失败: {e}") def find_red_laser(frame, threshold=150): """在图像中查找最亮的红色激光点(基于 RGB 阈值)""" w, h = frame.width(), frame.height() img_bytes = frame.to_bytes() max_sum = 0 best_pos = None for y in range(0, h, 2): for x in range(0, w, 2): idx = (y * w + x) * 3 r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] if r > threshold and r > g * 2 and r > b * 2: rgb_sum = r + g + b if rgb_sum > max_sum: max_sum = rgb_sum best_pos = (x, y) return best_pos def calibrate_laser_position(): """执行一次激光校准:拍照 → 找红点 → 保存坐标""" global laser_x, laser_y time.sleep_ms(80) cam = camera.Camera(640, 480) frame = cam.read() pos = find_red_laser(frame) if pos: laser_x, laser_y = pos save_laser_point(pos) return pos return None # ==================== 电源管理(INA226) ==================== def write_register(reg, value): data = [(value >> 8) & 0xFF, value & 0xFF] bus.writeto_mem(INA226_ADDR, reg, bytes(data)) def read_register(reg): data = bus.readfrom_mem(INA226_ADDR, reg, 2) return (data[0] << 8) | data[1] def init_ina226(): """初始化 INA226 芯片:配置模式 + 校准值""" write_register(REG_CONFIGURATION, 0x4527) write_register(REG_CALIBRATION, CALIBRATION_VALUE) def get_bus_voltage(): """读取总线电压(单位:V)""" raw = read_register(REG_BUS_VOLTAGE) return raw * 1.25 / 1000 def voltage_to_percent(voltage): """根据电压估算电池百分比(查表插值)""" points = [ (4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65), (3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15), (3.65, 5), (3.60, 0) ] if voltage >= points[0][0]: return 100 if voltage <= points[-1][0]: return 0 for i in range(len(points) - 1): v1, p1 = points[i] v2, p2 = points[i + 1] if voltage >= v2: ratio = (voltage - v1) / (v2 - v1) percent = p1 + (p2 - p1) * ratio return max(0, min(100, int(round(percent)))) return 0 # ==================== 靶心检测与距离计算 ==================== def detect_circle(frame): """检测图像中的靶心(优先清晰轮廓,其次黄色区域)""" global REAL_RADIUS_CM img_cv = image.image2cv(frame, False, False) gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY) blurred = cv2.GaussianBlur(gray, (5, 5), 0) edged = cv2.Canny(blurred, 50, 150) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel) contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) best_center = best_radius = best_radius1 = method = None # 方法1:基于轮廓拟合椭圆(清晰靶心) for cnt in contours: area = cv2.contourArea(cnt) perimeter = cv2.arcLength(cnt, True) if perimeter < 100 or area < 100: continue circularity = 4 * np.pi * area / (perimeter ** 2) if circularity > 0.75 and len(cnt) >= 5: center, axes, angle = cv2.fitEllipse(cnt) radius = (axes[0] + axes[1]) / 4 best_center = (int(center[0]), int(center[1])) best_radius = int(radius) best_radius1 = best_radius REAL_RADIUS_CM = 15 method = "清晰" break # 方法2:基于 HSV 黄色掩码(模糊靶心) if not best_center: hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV) h, s, v = cv2.split(hsv) s = np.clip(s * 2, 0, 255).astype(np.uint8) hsv = cv2.merge((h, s, v)) lower_yellow = np.array([7, 80, 0]) upper_yellow = np.array([32, 255, 182]) mask = cv2.inRange(hsv, lower_yellow, upper_yellow) kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel) mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel) contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if contours: largest = max(contours, key=cv2.contourArea) if cv2.contourArea(largest) > 50: (x, y), radius = cv2.minEnclosingCircle(largest) best_center = (int(x), int(y)) best_radius = int(radius) best_radius1 = best_radius REAL_RADIUS_CM = 15 method = "模糊" result_img = image.cv2image(img_cv, False, False) return result_img, best_center, best_radius, method, best_radius1 def estimate_distance(pixel_radius): """根据像素半径估算实际距离(单位:米)""" if not pixel_radius: return 0.0 return (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / pixel_radius / 100.0 def compute_laser_position(circle_center, laser_point, radius, method): """计算激光相对于靶心的偏移量(单位:厘米)""" if not all([circle_center, radius, method]): return None, None cx, cy = circle_center lx, ly = laser_point # 根据检测方法动态调整靶心物理半径(简化模型) circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0 dx = lx - cx dy = ly - cy return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) # ==================== TCP 通信线程 ==================== def connect_server(): """通过 4G 模块建立 TCP 连接""" global tcp_connected if tcp_connected: return True print("连接到服务器...") with uart4g_lock: at("AT+MIPCLOSE=0", "OK", 1000) res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000) if "+MIPOPEN: 0,0" in res: tcp_connected = True return True return False raw_line_data = [] def tcp_main(): """TCP 主通信循环:登录、心跳、处理指令、发送数据""" global tcp_connected, high_send_queue, normal_send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock, update_thread_started send_hartbeat_fail_count = 0 while not app.need_exit(): # OTA 期间不要 connect/登录/心跳/发送,避免与 HTTP URC 下载抢 uart4g_lock 导致心跳超时被服务器断开 try: if ota_in_progress: time.sleep_ms(200) continue except: pass if not connect_server(): time.sleep_ms(5000) continue # 发送登录包 login_data = {"deviceId": DEVICE_ID, "password": PASSWORD, "version": app_version} if not tcp_send_raw(make_packet(1, login_data)): tcp_connected = False time.sleep_ms(2000) continue print("➡️ 登录包已发送,等待确认...") logged_in = False last_heartbeat_ack_time = time.ticks_ms() last_heartbeat_send_time = time.ticks_ms() while True: # OTA 期间暂停 TCP 活动(不发心跳、不发业务),让下载独占 4G 串口 try: if ota_in_progress: time.sleep_ms(200) continue except: pass # 接收数据(唯一来源:ATClient 解析后的 TCP payload 队列) item = at_client.pop_tcp_payload() if item: # item 可能是 (link_id, payload) 或直接 payload(兼容旧队列格式) if isinstance(item, tuple) and len(item) == 2: link_id, payload = item else: link_id, payload = 0, item # 登录阶段加一条轻量 debug,确认 ACK 是否进入队列 if not logged_in: try: print(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}") except: pass msg_type, body = parse_packet(payload) # 处理登录响应 if not logged_in and msg_type == 1: if body and body.get("cmd") == 1 and body.get("data") == "登录成功": logged_in = True last_heartbeat_ack_time = time.ticks_ms() print("✅ 登录成功") # 若存在 ota_pending.json,说明上次 OTA 已应用并重启; # 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending,确保只上报一次。 try: pending_path = "/maixapp/apps/t11/ota_pending.json" if os.path.exists(pending_path): try: with open(pending_path, "r", encoding="utf-8") as f: pending_obj = json.load(f) except: pending_obj = {} safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) try: os.remove(pending_path) except: pass except Exception as e: print(f"[OTA] ota_ok 上报失败: {e}") else: # 登录失败,跳出重连 break # 处理心跳 ACK elif logged_in and msg_type == 4: last_heartbeat_ack_time = time.ticks_ms() print("✅ 收到心跳确认") elif logged_in and msg_type == 40: if isinstance(body, dict): t = body.get('t', 0) v = body.get('v') raw_line_data.append(body) if len(raw_line_data) >= int(t): print(f"下载完成") stock_array = list(map(lambda x: x.get('d'), raw_line_data)) with open(local_filename, 'w', encoding='utf-8') as file: file.write("\n".join(stock_array)) apply_ota_and_reboot() else: safe_enqueue({'data':{'l': len(raw_line_data), 'v': v}, 'cmd': 41}) print(f"已下载{len(raw_line_data)} 全部:{t} 版本:{v}") # 处理业务指令 elif logged_in and isinstance(body, dict): # 重要:每个包都要重新解析 inner_cmd,避免上一次的 cmd “粘住”导致反复执行 inner_cmd = None data_obj = body.get("data") if isinstance(data_obj, dict): inner_cmd = data_obj.get("cmd") if inner_cmd == 2: # 开启激光并校准 # 幂等:正在校准则不重复触发(服务器可能重发 cmd=2) if not laser_calibration_active: turn_on_laser() time.sleep_ms(100) laser_calibration_active = True safe_enqueue({"result": "calibrating"}, 2) elif inner_cmd == 3: # 关闭激光 distance_serial.write(LASER_OFF_CMD) laser_calibration_active = False safe_enqueue({"result": "laser_off"}, 2) elif inner_cmd == 4: # 上报电量 voltage = get_bus_voltage() battery_percent = voltage_to_percent(voltage) battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} safe_enqueue(battery_data, 2) print(f"🔋 电量上报: {battery_percent}%") elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置,及4g) inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {} ssid = inner_data.get("ssid") password = inner_data.get("password") ota_url = inner_data.get("url") mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/"" if not ota_url: print("ota missing_url") safe_enqueue({"result": "missing_url"}, 2) continue # 自动判断:mode 非法/为空时,优先 Wi-Fi(如果已连),否则 4G if mode not in ("4g", "wifi"): print("ota missing mode") mode = "wifi" if is_wifi_connected() else "4g" if update_thread_started: safe_enqueue({"result": "update_already_started"}, 2) continue update_thread_started = True if mode == "4g": _thread.start_new_thread(direct_ota_download_via_4g, (ota_url,)) else: # wifi 模式:需要 ssid/password if not ssid or not password: update_thread_started = False safe_enqueue({"result": "missing_ssid_or_password"}, 2) else: _thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url)) elif inner_cmd == 6: try: ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() ip = ip if ip else "no_ip" except: ip = "error_getting_ip" safe_enqueue({"result": "current_ip", "ip": ip}, 2) elif inner_cmd == 7: # global update_thread_started if update_thread_started: safe_enqueue({"result": "update_already_started"}, 2) continue # 实时检查是否有 IP try: ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() except: ip = None if not ip: safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) else: # 启动纯下载线程 update_thread_started = True _thread.start_new_thread(direct_ota_download, ()) else: # 非指令包(或未携带 cmd),不做任何动作 pass else: time.sleep_ms(5) # 发送队列中的业务数据 if logged_in and (high_send_queue or normal_send_queue): # 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁 msg_type = None data_dict = None if queue_lock.try_acquire(): try: if high_send_queue: msg_type, data_dict = high_send_queue.pop(0) elif normal_send_queue: msg_type, data_dict = normal_send_queue.pop(0) finally: queue_lock.release() if msg_type is not None and data_dict is not None: pkt = make_packet(msg_type, data_dict) if not tcp_send_raw(pkt): tcp_connected = False break # 发送激光校准结果 if logged_in and laser_calibration_result is not None: x = y = None with laser_calibration_lock: if laser_calibration_result is not None: x, y = laser_calibration_result laser_calibration_result = None if x is not None and y is not None: safe_enqueue({"result": "ok", "x": x, "y": y}, 2) # 定期发送心跳 current_time = time.ticks_ms() if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000: vol_val = get_bus_voltage() if not tcp_send_raw(make_packet(4, {"vol":vol_val, "vol_per":voltage_to_percent(vol_val)})): print("💔 心跳发送失败") send_hartbeat_fail_count += 1 if send_hartbeat_fail_count >= 3: send_hartbeat_fail_count = 0 print("连续3次发送心跳失败,重连") break else: continue else: send_hartbeat_fail_count = 0 last_heartbeat_send_time = current_time print("💓 心跳已发送") # 心跳超时重连 if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟 print("⏰ 十分钟无心跳ACK,重连") break time.sleep_ms(50) tcp_connected = False print("🔌 连接异常,2秒后重连...") time.sleep_ms(2000) def laser_calibration_worker(): """后台线程:持续检测是否需要执行激光校准""" global laser_calibration_active, laser_calibration_result, laser_calibration_lock while True: # OTA 期间尽量省电:暂停后台校准(会占用 Camera) try: if int(globals().get("ota_in_progress", 0)) > 0: time.sleep_ms(200) continue except: pass if laser_calibration_active: # 关键:不要在每次尝试里反复 new Camera(会导致 MMF 反复初始化刷屏) cam = None try: cam = camera.Camera(640, 480) start = time.ticks_ms() timeout_ms = 8000 # 8 秒内找不到红点就退出一次,避免一直占用资源 while laser_calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms: frame = cam.read() pos = find_red_laser(frame) if pos: with laser_calibration_lock: laser_calibration_result = pos laser_calibration_active = False save_laser_point(pos) print(f"✅ 后台校准成功: {pos}") break time.sleep_ms(60) except Exception as e: # 出错时也不要死循环刷屏 print(f"[LASER] calibration error: {e}") time.sleep_ms(200) finally: try: # 释放摄像头资源(MaixPy 通常靠 GC,但显式 del 更稳) if cam is not None: del cam except: pass # 如果超时仍未成功,稍微休息一下再允许下一次 cmd=2 触发 if laser_calibration_active: time.sleep_ms(300) else: time.sleep_ms(50) def download_file_via_4g_legacy(url, filename, total_timeout_ms=30000, retries=3, debug=False): """ ML307R HTTP 下载(URC content 分片模式) - 重试:empty/incomplete/AT错误都会重试 - 超时:total_timeout_ms - 校验:Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5 - 日志:默认干净;debug=True 才打印 URC 进度 """ from urllib.parse import urlparse parsed = urlparse(url) host = parsed.hostname path = parsed.path or "/" base_url = f"http://{host}" # 你已验证 HTTP 可 200;如需 https 需另配 SSL def _log(*args): if debug: print(*args) def _merge_ranges(ranges_iter): """合并重叠/相邻区间,返回 merged(list[(s,e)])(半开区间)""" rs = sorted(ranges_iter) merged = [] for s, e in rs: if e <= s: continue if merged and s <= merged[-1][1]: merged[-1] = (merged[-1][0], max(merged[-1][1], e)) else: merged.append((s, e)) return merged def _compute_gaps(total_len, got_ranges): """根据已填充区间计算缺口(半开区间)""" if not total_len or total_len <= 0: return [(0, 0)] merged = _merge_ranges(got_ranges) gaps = [] prev = 0 for s, e in merged: if s > prev: gaps.append((prev, s)) prev = max(prev, e) if prev < total_len: gaps.append((prev, total_len)) return gaps, merged def _extract_content_range(hdr_text: str): """ Content-Range: bytes -/ 返回 (start, end, total);解析失败返回 (None,None,None) """ m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE) if not m: return None, None, None try: return int(m.group(1)), int(m.group(2)), int(m.group(3)) except: return None, None, None def _get_ip(): r = at("AT+CGPADDR=1", "OK", 3000) m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) return m.group(1) if m else "" def _clear_http_events(): # 清空旧的 HTTP URC 事件,避免串台 while at_client.pop_http_event() is not None: pass # 旧版基于直接 uart4g.read 的解析已迁移到 ATClient(单读者),保留函数占位避免大改动 def _parse_httpid(raw: bytes): m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw) return int(m.group(1)) if m else None # _try_parse_header/_try_parse_one_content 已由 ATClient 在 reader 线程中解析并推送事件 # _try_parse_one_content 已由 ATClient 解析 def _extract_hdr_fields(hdr_text: str): # Content-Length mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE) clen = int(mlen.group(1)) if mlen else None # Content-Md5 (base64) mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE) md5_b64 = mmd5.group(1).strip() if mmd5 else None return clen, md5_b64 def _md5_base64(data: bytes) -> str: if hashlib is None: return "" digest = hashlib.md5(data).digest() # base64: 24 chars with == return binascii.b2a_base64(digest).decode().strip() def _one_attempt(range_start=None, range_end=None, body_buf=None, got_ranges=None, total_len=None, expect_md5_b64=None): # 0) PDP:确保有 IP(避免把 OK 当成功) ip = _get_ip() if not ip or ip == "0.0.0.0": at("AT+MIPCALL=1,1", "OK", 15000) for _ in range(10): ip = _get_ip() if ip and ip != "0.0.0.0": break time.sleep(1) if not ip or ip == "0.0.0.0": return False, "PDP not ready (no_ip)", body_buf, got_ranges, total_len, expect_md5_b64 # 1) 清理旧实例 + 清空旧 HTTP 事件 for i in range(0, 6): at(f"AT+MHTTPDEL={i}", "OK", 1500) _clear_http_events() # 2) 创建实例(用 at() 等待返回) create_resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) httpid = _parse_httpid(create_resp.encode()) if httpid is None: return False, "MHTTPCREATE failed (no httpid)", body_buf, got_ranges, total_len, expect_md5_b64 # 2.5) Range 补洞:按缺口请求指定字节段(HTTP Range 右端是 inclusive) if range_start is not None and range_end is not None: # 每次请求使用新 httpid,避免 header 累积/污染 at(f'AT+MHTTPCFG="header",{httpid},"Range: bytes={int(range_start)}-{int(range_end)}"', "OK", 3000) # 3) 发 GET(HTTP URC 由 ATClient 解析并入队) req_resp = at(f'AT+MHTTPREQUEST={httpid},1,0,"{path}"', "OK", 15000) if "ERROR" in req_resp or "CME ERROR" in req_resp: at(f"AT+MHTTPDEL={httpid}", "OK", 3000) return False, f"MHTTPREQUEST failed: {req_resp}", body_buf, got_ranges, total_len, expect_md5_b64 # 4) 从 ATClient 的 http_events 队列收 header/content urc_id = None status_code = None expect_len = None # 若是 Range 响应(206),需要把响应内的偏移映射到“全文件”偏移 offset_base = 0 # got_ranges 记录“真实写入 body_buf 的半开区间” if got_ranges is None: got_ranges = set() filled_new_bytes = 0 # last_sum/resp_total 用于判断“本次 HTTP 响应体”是否接收完成(尤其是 Range 场景) last_sum = 0 resp_total = None no_progress_count = 0 # 连续没有进展的次数 last_print_ms = time.ticks_ms() last_print_sum = 0 # Range 补洞不需要等太久,避免卡死;全量下载用总超时 attempt_timeout_ms = total_timeout_ms if range_start is not None and range_end is not None: attempt_timeout_ms = min(total_timeout_ms, 8000) t0 = time.ticks_ms() while time.ticks_ms() - t0 < attempt_timeout_ms: ev = at_client.pop_http_event() if not ev: # 如果 sum 已经达到 total_len,但仍有 gaps,等待更长时间(有些分片可能延迟到达) # 对 Range:last_sum 只会到 resp_total(比如 686/774),不能拿 total_len(59776) 比 if resp_total and last_sum >= resp_total: # 本次响应体应该收齐了,继续等一小会儿(防止最后一个 URC 延迟),然后退出循环 time.sleep_ms(30) no_progress_count += 1 if no_progress_count > 30: break continue # 全量模式:如果模块宣称 sum 已经达到 total_len,但仍有 gaps,稍微多等 if (range_start is None and range_end is None) and total_len and last_sum >= total_len: gaps_now, merged_now = _compute_gaps(total_len, got_ranges) if gaps_now and not (len(gaps_now) == 1 and gaps_now[0] == (0, 0)): time.sleep_ms(50) else: time.sleep_ms(5) else: time.sleep_ms(5) no_progress_count += 1 # Range:如果长时间没有事件也结束(让上层重试) if range_start is not None and range_end is not None and no_progress_count > 200: break # 全量:如果长时间没有新事件,且 sum 已经达到 total_len,认为接收完成(可能有丢包) if no_progress_count > 100 and total_len and last_sum >= total_len: break continue no_progress_count = 0 # 有事件,重置计数器 if ev[0] == "header": _, hid, code, hdr_text = ev if urc_id is None: urc_id = hid if hid != urc_id: continue status_code = code expect_len, md5_b64 = _extract_hdr_fields(hdr_text) # 只在“首次全量 header”里保留 Content-Md5;Range 响应通常不带该字段 if md5_b64: expect_md5_b64 = md5_b64 cr_s, cr_e, cr_total = _extract_content_range(hdr_text) if cr_s is not None and cr_total is not None: # 206 Partial Content offset_base = cr_s # Content-Range end 是 inclusive;总长度以 total 为准 if total_len is None: total_len = cr_total elif total_len != cr_total: _log(f"[WARN] total_len changed {total_len}->{cr_total}") total_len = cr_total if body_buf is None and total_len: body_buf = bytearray(total_len) # 对 Range 响应:优先使用 Content-Length 作为本次响应体长度 if expect_len is not None: resp_total = expect_len _log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}") continue if ev[0] == "content": _, cid, _total, _sum, _cur, payload = ev if urc_id is None: urc_id = cid if cid != urc_id: continue # 全量 200:这里的 _total 就是全文件长度;Range 206:_total 可能只是“本次响应体长度” if body_buf is None: # 如果 header 没解析出 Content-Range,总长度用 content 的 _total if total_len is None: total_len = _total if total_len: body_buf = bytearray(total_len) if body_buf is None or total_len is None: continue # 若 header 没给 Content-Length,就用 content 的 _total 作为本次响应体长度(Range 场景下通常是这次 body 的长度) if resp_total is None: resp_total = _total rel_start = _sum - _cur rel_end = _sum abs_start = offset_base + rel_start abs_end = offset_base + rel_end if abs_start < 0 or abs_start >= total_len: continue if abs_end < abs_start: continue if abs_end > total_len: abs_end = total_len expected_span = abs_end - abs_start actual_len = min(len(payload), expected_span) if actual_len <= 0: continue # 写入并记录“实际写入区间”,用于 gap 计算 body_buf[abs_start:abs_start + actual_len] = payload[:actual_len] got_ranges.add((abs_start, abs_start + actual_len)) filled_new_bytes += actual_len # 记录最大的 sum 值,用于判断是否所有数据都已发送 if _sum > last_sum: last_sum = _sum # debug 输出节流:每 ~8000 字节或 >=500ms 输出一次,避免 print 导致 UART 丢包 if debug: now = time.ticks_ms() if (time.ticks_diff(now, last_print_ms) >= 500) or (_sum - last_print_sum >= 8000) or (rel_end == _total): _log(f"[URC] {abs_start}:{abs_start+actual_len} sum={_sum}/{_total} base={offset_base} +{filled_new_bytes}") last_print_ms = now last_print_sum = _sum # 若是全量请求(offset_base=0 且 total_len==_total),尽早结束 if offset_base == 0 and total_len == _total: # 不要用 filled_new_bytes 判断是否完整(可能有重叠) pass # Range:本次响应体已收齐,退出,交给上层判断是否补上了缺口 if resp_total is not None and last_sum >= resp_total: # 给一点时间让可能的尾部事件入队,然后退出 time.sleep_ms(10) break # 5) 清理实例 at(f"AT+MHTTPDEL={httpid}", "OK", 3000) if body_buf is None: return False, "empty_body", body_buf, got_ranges, total_len, expect_md5_b64 if total_len is None: return False, "no_total_len", body_buf, got_ranges, total_len, expect_md5_b64 # 返回“本次尝试是否有实质进展”:Range 补洞时,哪怕不完整也算成功推进 if filled_new_bytes <= 0: return False, "no_progress", body_buf, got_ranges, total_len, expect_md5_b64 return True, f"PARTIAL ok +{filled_new_bytes} ip={ip} code={status_code}", body_buf, got_ranges, total_len, expect_md5_b64 global ota_in_progress try: ota_in_progress = int(ota_in_progress) + 1 except: ota_in_progress = 1 with uart4g_lock: try: # -------- Phase 1: 全量 GET(允许不完整,后面用 Range 补洞)-------- body_buf = None got_ranges = set() total_len = None expect_md5_b64 = None last_err = "unknown" for attempt in range(1, retries + 1): ok, msg, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt( body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64 ) last_err = msg if not ok: _log(f"[RETRY] full attempt={attempt} failed={msg}") time.sleep_ms(200) continue gaps, merged = _compute_gaps(total_len, got_ranges) filled_total = sum(e - s for s, e in merged) if gaps and gaps[0] == (0, 0): gaps = [] if not gaps: break _log(f"[GAPS] after full attempt={attempt} filled={filled_total}/{total_len} gaps={gaps[:3]}") time.sleep_ms(150) if body_buf is None or total_len is None: return False, f"FAILED: {last_err}" gaps, merged = _compute_gaps(total_len, got_ranges) if gaps and gaps[0] == (0, 0): gaps = [] # -------- Phase 2: Range 补洞 -------- # 说明: # - “全量 GET 多次重试 + 合并已收到分片”我们已经在 Phase1 做了(got_ranges/body_buf 会跨 attempt 累积)。 # - 仍存在 gaps 说明:这些字节段在全量阶段始终没收到,需要靠 Range 反复补洞。 # # 策略: # - Range 分块更小(更稳),失败时继续“二分缩小”到 MIN_RANGE_BYTES; # - 不要因为某一轮 no_progress 就立刻退出(UART 偶发丢 URC,需要多轮撞上一次成功)。 MAX_RANGE_BYTES = 1024 MIN_RANGE_BYTES = 128 RANGE_RETRIES_EACH = 8 MAX_HOLE_ROUNDS = 50 NO_PROGRESS_ROUNDS_LIMIT = 8 round_i = 0 no_progress_rounds = 0 while gaps and round_i < MAX_HOLE_ROUNDS: round_i += 1 # 优先补最大的洞(通常只丢中间一两段) gaps = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True) _log(f"[RANGE] round={round_i} gaps={gaps[:3]}") progress_any = False # 每轮最多补前 5 个洞,避免无限循环 for (gs, ge) in gaps[:5]: cur = gs chunk = MAX_RANGE_BYTES while cur < ge: sub_end = min(ge, cur + chunk) # HTTP Range end is inclusive rs = cur re_incl = sub_end - 1 before_gaps, before_merged = _compute_gaps(total_len, got_ranges) before_filled = sum(e - s for s, e in before_merged) sub_ok = False sub_err = "unknown" for k in range(1, RANGE_RETRIES_EACH + 1): ok2, msg2, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt( range_start=rs, range_end=re_incl, body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64 ) sub_err = msg2 if ok2: sub_ok = True break _log(f"[RETRY] range {rs}-{re_incl} try={k} failed={msg2}") time.sleep_ms(150) after_gaps, after_merged = _compute_gaps(total_len, got_ranges) after_filled = sum(e - s for s, e in after_merged) if after_filled > before_filled: progress_any = True # 成功推进:恢复到较大 chunk,加快补洞 chunk = MAX_RANGE_BYTES cur = sub_end else: # 没推进:缩小 chunk,继续在同一位置重试;不要前进 cur if chunk > MIN_RANGE_BYTES: chunk = max(MIN_RANGE_BYTES, chunk // 2) _log(f"[RANGE] shrink chunk -> {chunk} at pos={cur}") else: # 已经很小还不行:本轮先放弃这个位置,留给下一轮再撞 if not sub_ok: _log(f"[WARN] range {rs}-{re_incl} failed={sub_err}") break # 小歇一下,给读线程喘息 time.sleep_ms(120) gaps, merged = _compute_gaps(total_len, got_ranges) if gaps and gaps[0] == (0, 0): gaps = [] filled_total = sum(e - s for s, e in merged) if not gaps: break if not progress_any: no_progress_rounds += 1 _log(f"[RANGE] no progress in round={round_i} ({no_progress_rounds}/{NO_PROGRESS_ROUNDS_LIMIT}) filled={filled_total}/{total_len}") # 多轮无进展才退出(避免偶发“只 header 无 content URC”导致过早退出) if no_progress_rounds >= NO_PROGRESS_ROUNDS_LIMIT: break # 退避等待一下再继续下一轮 time.sleep_ms(500) continue else: no_progress_rounds = 0 _log(f"[RANGE] round={round_i} filled={filled_total}/{total_len} gaps={gaps[:3]}") # 完整性检查 gaps, merged = _compute_gaps(total_len, got_ranges) if gaps and gaps[0] == (0, 0): gaps = [] filled_total = sum(e - s for s, e in merged) if gaps: return False, f"incomplete_body got={filled_total} expected={total_len} missing={total_len - filled_total} gaps={gaps[:5]}" data = bytes(body_buf) # 校验:Content-Md5(base64)(若有) if expect_md5_b64 and hashlib is not None: md5_b64 = _md5_base64(data) if md5_b64 != expect_md5_b64: return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}" # 写文件(原样 bytes) with open(filename, "wb") as f: f.write(data) return True, f"OK size={len(data)} ip={_get_ip()} md5={expect_md5_b64 or ''}" finally: try: ota_in_progress = max(0, int(ota_in_progress) - 1) except: ota_in_progress = 0 def download_file_via_4g(url, filename, total_timeout_ms=600000, retries=3, debug=True): """ ML307R HTTP 下载(更稳的“固定小块 Range 顺序下载”): - 只依赖 +MHTTPURC:"header"/"content"(不依赖 MHTTPREAD/cached) - 每次只请求一个小块 Range(默认 1024B),失败就重试同一块,必要时缩小块大小 - 每个 chunk 都重新 MHTTPCREATE/MHTTPREQUEST,避免卡在“206 header 但不吐 content”的坏状态 """ from urllib.parse import urlparse # 小块策略(可按现场再调) # - chunk 越小越稳(URC 压力更小),代价是请求次数更多 CHUNK_MAX = 10240 CHUNK_MIN = 128 CHUNK_RETRIES = 12 FRAG_SIZE = 1024 # 0-1024 FRAG_DELAY = 10 # 0-2000 ms t_func0 = time.ticks_ms() parsed = urlparse(url) host = parsed.hostname path = parsed.path or "/" if not host: return False, "bad_url (no host)" # 很多 ML307R 的 MHTTP 对 https 不稳定;对已知域名做降级 if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"): base_url = "http://static.shelingxingqiu.com" else: base_url = f"http://{host}" def _log(*a): if debug: print(*a) def _pwr_log(prefix=""): """debug 用:输出电压/电量,用于判断是否掉压导致 4G/USB 异常""" if not debug: return try: v = get_bus_voltage() p = voltage_to_percent(v) print(f"[PWR]{prefix} v={v:.3f}V p={p}%") except Exception as e: try: print(f"[PWR]{prefix} read_failed: {e}") except: pass def _clear_http_events(): while at_client.pop_http_event() is not None: pass def _parse_httpid(resp: str): m = re.search(r"\+MHTTPCREATE:\s*(\d+)", resp) return int(m.group(1)) if m else None def _get_ip(): r = at("AT+CGPADDR=1", "OK", 3000) m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) return m.group(1) if m else "" def _ensure_pdp(): ip = _get_ip() if ip and ip != "0.0.0.0": return True, ip at("AT+MIPCALL=1,1", "OK", 15000) for _ in range(10): ip = _get_ip() if ip and ip != "0.0.0.0": return True, ip time.sleep(1) return False, ip def _extract_hdr_fields(hdr_text: str): mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE) clen = int(mlen.group(1)) if mlen else None mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE) md5_b64 = mmd5.group(1).strip() if mmd5 else None return clen, md5_b64 def _extract_content_range(hdr_text: str): m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE) if not m: return None, None, None try: return int(m.group(1)), int(m.group(2)), int(m.group(3)) except: return None, None, None def _hard_reset_http(): """ 模块进入“坏状态”时的保守清场: - 清空 ATClient 的事件队列,避免串台 - 删除 0..5 的 httpid(常见固件槽位范围),尽量把内部 HTTP 状态机拉回干净 注意:很慢,所以只在连续异常时调用。 """ _clear_http_events() for i in range(0, 6): try: at(f"AT+MHTTPDEL={i}", "OK", 1200) except: pass _clear_http_events() def _create_httpid(full_reset=False): _clear_http_events() if full_reset: _hard_reset_http() resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) hid = _parse_httpid(resp) return hid, resp def _fetch_range_into_buf(start, want_len, out_buf, full_reset=False): """ 请求 Range [start, start+want_len) ,写入 out_buf(bytearray,长度=want_len) 返回 (ok, msg, total_len, md5_b64, got_len) """ end_incl = start + want_len - 1 hid, cresp = _create_httpid(full_reset=full_reset) if hid is None: return False, f"MHTTPCREATE failed: {cresp}", None, None, 0 # 降低 URC 压力(分片/延迟) at(f'AT+MHTTPCFG="fragment",{hid},{FRAG_SIZE},{FRAG_DELAY}', "OK", 1500) # 设置 Range header(inclusive) at(f'AT+MHTTPCFG="header",{hid},"Range: bytes={start}-{end_incl}"', "OK", 3000) req = at(f'AT+MHTTPREQUEST={hid},1,0,"{path}"', "OK", 15000) if "ERROR" in req or "CME ERROR" in req: at(f"AT+MHTTPDEL={hid}", "OK", 2000) return False, f"MHTTPREQUEST failed: {req}", None, None, 0 # 等 header + content # 注意:部分 ML307R 固件会把 header 分成多条 +MHTTPURC:"header" 分片吐出来, # 其中有的分片只有 Content-Length,有的只有 Content-Range。 # 因此这里需要做“累积解析”,否则会出现 resp_total=None -> no_header_or_total。 hdr_text = None hdr_accum = "" code = None resp_total = None total_len = None md5_b64 = None got_ranges = set() last_sum = 0 t0 = time.ticks_ms() # Range 场景不宜等待太久,卡住就换 hid 重来 timeout_ms = 9000 logged_hdr = False while time.ticks_ms() - t0 < timeout_ms: ev = at_client.pop_http_event() if not ev: time.sleep_ms(5) continue if ev[0] == "header": _, ehid, ecode, ehdr = ev if ehid != hid: continue code = ecode hdr_text = ehdr # 累积 header 文本并从累积内容里提取字段(避免 split header 丢字段) if ehdr: hdr_accum = (hdr_accum + "\n" + ehdr) if hdr_accum else ehdr resp_total_tmp, md5_tmp = _extract_hdr_fields(hdr_accum) if md5_tmp: md5_b64 = md5_tmp cr_s, cr_e, cr_total = _extract_content_range(hdr_accum) if cr_total is not None: total_len = cr_total # 有些 header 没有 Content-Length,但有 Content-Range(206),可由 range 计算出本次 body 长度 if resp_total_tmp is not None: resp_total = resp_total_tmp elif resp_total is None and (cr_s is not None) and (cr_e is not None) and (cr_e >= cr_s): resp_total = (cr_e - cr_s + 1) # 206 才是 Range 正常响应;部分服务器可能忽略 Range 返回 200 # 节流:每个 hid 只打一次 header(否则你会看到连续 3-4 条 [HDR],且很多 cr=None) if (not logged_hdr) and (resp_total is not None or total_len is not None): _log(f"[HDR] id={hid} code={code} clen={resp_total} cr={cr_s}-{cr_e}/{cr_total}") logged_hdr = True continue if ev[0] == "content": _, ehid, _total, _sum, _cur, payload = ev if ehid != hid: continue if resp_total is None: resp_total = _total if resp_total is None or resp_total <= 0: continue start_rel = _sum - _cur end_rel = _sum if start_rel < 0 or start_rel >= resp_total: continue if end_rel > resp_total: end_rel = resp_total actual_len = min(len(payload), end_rel - start_rel) if actual_len <= 0: continue out_buf[start_rel:start_rel + actual_len] = payload[:actual_len] got_ranges.add((start_rel, start_rel + actual_len)) if _sum > last_sum: last_sum = _sum # 进度日志节流 if debug and (last_sum >= resp_total or (last_sum % 512 == 0)): _log(f"[CHUNK] {start}+{last_sum}/{resp_total}") # 收齐就退出 if last_sum >= resp_total: break # 清理实例(快路径:只删当前 hid) try: at(f"AT+MHTTPDEL={hid}", "OK", 2000) except: pass if resp_total is None: return False, "no_header_or_total", total_len, md5_b64, 0 # 计算实际填充长度 merged = sorted(got_ranges) filled = 0 prev = 0 for s, e in merged: if e <= s: continue if s > prev: # 有洞 pass prev = max(prev, e) # 重新合并算 filled merged2 = [] for s, e in merged: if not merged2 or s > merged2[-1][1]: merged2.append((s, e)) else: merged2[-1] = (merged2[-1][0], max(merged2[-1][1], e)) filled = sum(e - s for s, e in merged2) if filled < resp_total: return False, f"incomplete_chunk got={filled} expected={resp_total} code={code}", total_len, md5_b64, filled got_len = resp_total # 如果服务器忽略 Range 返回 200,resp_total 可能是整文件,这里允许 want_len 不匹配 return True, "OK", total_len, md5_b64, got_len global ota_in_progress try: ota_in_progress = int(ota_in_progress) + 1 except: ota_in_progress = 1 with uart4g_lock: try: ok_pdp, ip = _ensure_pdp() if not ok_pdp: return False, f"PDP not ready (ip={ip})" # 先清空旧事件,避免串台 _clear_http_events() # 为了支持随机写入,先创建空文件 try: with open(filename, "wb") as f: f.write(b"") except Exception as e: return False, f"open_file_failed: {e}" total_len = None expect_md5_b64 = None offset = 0 chunk = CHUNK_MAX t_start = time.ticks_ms() last_progress_ms = t_start STALL_TIMEOUT_MS = 60000 # 60s 没有任何 offset 推进则判定卡死 last_pwr_ms = t_start _pwr_log(prefix=" ota_start") bad_http_state = 0 # 连续“疑似模块 HTTP 坏状态”的计数,达到阈值才做 full reset while True: now = time.ticks_ms() # debug:每 5 秒打印一次电压/电量 + 进度 if debug and time.ticks_diff(now, last_pwr_ms) >= 5000: last_pwr_ms = now _pwr_log(prefix=f" off={offset}/{total_len or '?'}") # 超时保护:整体(很宽,避免“慢但在推进”的情况误判失败) if time.ticks_diff(now, t_start) > total_timeout_ms: return False, f"timeout overall after {total_timeout_ms}ms offset={offset} total={total_len}" # 超时保护:无进展(关键) if time.ticks_diff(now, last_progress_ms) > STALL_TIMEOUT_MS: return False, f"timeout stalled {STALL_TIMEOUT_MS}ms offset={offset} total={total_len}" if total_len is not None and offset >= total_len: break want = chunk if total_len is not None: remain = total_len - offset if remain <= 0: break if want > remain: want = remain # 本 chunk 的 buffer(长度=want) buf = bytearray(want) success = False last_err = "unknown" md5_seen = None got_len = 0 for k in range(1, CHUNK_RETRIES + 1): # 只有在连续坏状态时才 full reset,否则只删当前 hid(更快) do_full_reset = (bad_http_state >= 2) ok, msg, tlen, md5_b64, got = _fetch_range_into_buf(offset, want, buf, full_reset=do_full_reset) last_err = msg if tlen is not None and total_len is None: total_len = tlen if md5_b64 and not expect_md5_b64: expect_md5_b64 = md5_b64 if ok: success = True got_len = got bad_http_state = 0 break # 判定是否属于“模块 HTTP 坏状态”(header-only/no header/request err 等) try: if ("no_header_or_total" in msg) or ("MHTTPREQUEST failed" in msg) or ("MHTTPCREATE failed" in msg): bad_http_state += 1 else: bad_http_state = max(0, bad_http_state - 1) except: pass # 失败:缩小 chunk,提高成功率 if chunk > CHUNK_MIN: chunk = max(CHUNK_MIN, chunk // 2) want = min(chunk, want) buf = bytearray(want) _log(f"[RETRY] off={offset} want={want} try={k} err={msg}") _pwr_log(prefix=f" retry{k} off={offset}") time.sleep_ms(120) if not success: return False, f"chunk_failed off={offset} want={want} err={last_err} total={total_len}" # 写入文件(注意:got_len 可能 > want(服务器忽略 Range 返回 200)) # 只写入当前请求的 want 字节(buf),避免越界 try: with open(filename, "r+b") as f: f.seek(offset) f.write(bytes(buf)) except Exception as e: return False, f"write_failed off={offset}: {e}" offset += len(buf) last_progress_ms = time.ticks_ms() # 成功推进后恢复 chunk chunk = CHUNK_MAX if debug: _log(f"[OK] offset={offset}/{total_len or '?'}") # 可选:如果有 Content-Md5 且 hashlib 可用,做校验(Range 响应未必会提供 md5) if expect_md5_b64 and hashlib is not None: try: with open(filename, "rb") as f: data = f.read() digest = hashlib.md5(data).digest() got_b64 = binascii.b2a_base64(digest).decode().strip() if got_b64 != expect_md5_b64: return False, f"md5_mismatch got={got_b64} expected={expect_md5_b64}" except Exception as e: return False, f"md5_check_failed: {e}" t_cost = time.ticks_diff(time.ticks_ms(), t_func0) return True, f"OK size={offset} ip={ip} cost_ms={t_cost}" finally: try: ota_in_progress = max(0, int(ota_in_progress) - 1) except: ota_in_progress = 0 def direct_ota_download_via_4g(ota_url): """通过 4G 模块下载 OTA(不需要 Wi-Fi)""" global update_thread_started, ota_in_progress, tcp_connected try: t_ota0 = time.ticks_ms() if not ota_url: safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) return # OTA 全程暂停 TCP(避免心跳/重连抢占 uart4g_lock,导致 server 断链 + HTTP URC 更容易丢) try: ota_in_progress = int(ota_in_progress) + 1 except: ota_in_progress = 1 # 主动断开 AT TCP,减少 +MIPURC 噪声干扰 HTTP URC 下载 tcp_connected = False try: with uart4g_lock: at("AT+MIPCLOSE=0", "OK", 1500) except: pass print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") # 重要说明: # - AT+MDIALUP / RNDIS 是“USB 主机拨号上网”模式,在不少 ML307R 固件上会占用/切换内部网络栈, # 从而导致 AT+MIPOPEN / +MIPURC 这套 TCP 连接无法工作(你会看到一直“连接到服务器...”)。 # - 这个设备当前 4G 是走 UART + AT Socket(MIPOPEN),并没有把 4G 变成系统网卡(如 ppp0)。 # 因此这里不再自动拨号/改路由;只有当系统本来就有 default route(例如 eth0 已联网)时,才尝试走 requests 下载。 ok_sys = False msg_sys = "" try: if _has_default_route(): # 1) 先试原始 URL(可能是 https) ok_sys, msg_sys = _download_file_system_bytes(ota_url, local_filename, timeout_s=30) if (not ok_sys) and isinstance(ota_url, str) and ota_url.startswith("https://static.shelingxingqiu.com/"): # 2) 部分系统 SSL 不完整:对固定域名降级到 http 再试一次 http_url = "http://" + ota_url[len("https://"):] ok_sys, msg_sys = _download_file_system_bytes(http_url, local_filename, timeout_s=30) else: ok_sys = False msg_sys = "no_default_route (system network not available)" except Exception as e: ok_sys = False msg_sys = f"system_download_exception: {e}" if ok_sys: print(f"[OTA-4G] system {msg_sys}") if apply_ota_and_reboot(ota_url): return print(f"[OTA-4G] system failed: {msg_sys} -> fallback to URC download") # debug:进入 URC 下载前打印一次电压/电量(防止下载太快看不到 [PWR] 周期日志) try: v = get_bus_voltage() p = voltage_to_percent(v) print(f"[OTA-4G][PWR] before_urc v={v:.3f}V p={p}%") except Exception as e: print(f"[OTA-4G][PWR] before_urc read_failed: {e}") t_dl0 = time.ticks_ms() success, msg = download_file_via_4g(ota_url, local_filename, debug=True) t_dl_cost = time.ticks_diff(time.ticks_ms(), t_dl0) print(f"[OTA-4G] {msg}") print(f"[OTA-4G] download_cost_ms={t_dl_cost}") if success and "OK" in msg: if apply_ota_and_reboot(ota_url): return else: safe_enqueue({"result": msg_sys or msg}, 2) except Exception as e: error_msg = f"OTA-4G 异常: {str(e)}" print(error_msg) safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) finally: # 总耗时(注意:若成功并 reboot,这行可能来不及打印) try: t_cost = time.ticks_diff(time.ticks_ms(), t_ota0) print(f"[OTA-4G] total_cost_ms={t_cost}") except: pass update_thread_started = False # 对应上面的 ota_in_progress +1 try: ota_in_progress = max(0, int(ota_in_progress) - 1) except: ota_in_progress = 0 # ==================== 主程序入口 ==================== def cmd_str(): global DEVICE_ID, PASSWORD DEVICE_ID = read_device_id() PASSWORD = DEVICE_ID + "." # 创建照片存储目录 photo_dir = "/root/phot" if photo_dir not in os.listdir("/root"): try: os.mkdir(photo_dir) except: pass # 初始化硬件 init_ina226() load_laser_point() disp = display.Display() cam = camera.Camera(640, 480) # 启动通信与校准线程 # from tcp_handler import tcp_main _thread.start_new_thread(tcp_main, ()) _thread.start_new_thread(laser_calibration_worker, ()) print("系统准备完成...") last_adc_trigger = 0 # 主循环:检测扳机触发 → 拍照 → 分析 → 上报 while not app.need_exit(): current_time = time.ticks_ms() # OTA 期间尽量省电:暂停相机预览/拍照/分析,只保留最低频率的循环 try: if int(globals().get("ota_in_progress", 0)) > 0: time.sleep_ms(250) continue except: pass # print("压力传感器数值: ", adc_obj.read()) adc_val = adc_obj.read() # if adc_val > 2400: # print(f"adc: {adc_val}") if adc_val > ADC_TRIGGER_THRESHOLD: diff_ms = current_time-last_adc_trigger if diff_ms<3000: continue last_adc_trigger = current_time time.sleep_ms(60) # 防抖 frame = cam.read() x, y = laser_point # 绘制激光十字线 frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness) frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness) frame.draw_circle(int(x), int(y), 1, color, thickness) # 检测靶心 result_img, center, radius, method, best_radius1 = detect_circle(frame) disp.show(result_img) # 计算偏移与距离 dx, dy = compute_laser_position(center, (x, y), radius, method) distance_m = estimate_distance(best_radius1) # 读取电量 voltage = get_bus_voltage() battery_percent = voltage_to_percent(voltage) # 保存图像(带标注) try: jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')]) filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg" result_img.save(filename, quality=70) except Exception as e: print(f"❌ 保存失败: {e}") # 构造上报数据 inner_data = { "x": float(dx) if dx is not None else 200.0, "y": float(dy) if dy is not None else 200.0, "r": 90.0, "d": round((distance_m or 0.0) * 100), # 距离(厘米) "m": method, "adc": adc_val } report_data = {"cmd": 1, "data": inner_data} # 射箭事件高优先级入队,由 tcp_main 统一发送 safe_enqueue(report_data, msg_type=2, high=True) print("📤 射箭事件已加入发送队列") # 闪一下激光(射箭反馈) # TODO: remove after test done flash_laser(1000) # 闪300ms,可以根据需要调整时长 time.sleep_ms(100) else: disp.show(cam.read()) time.sleep_ms(50) def dump_system_info(): cmds = [ "uname -a", "cat /etc/os-release 2>/dev/null", "cat /proc/version 2>/dev/null", "cat /proc/1/comm 2>/dev/null", "ps 2>/dev/null | head -n 5", "ls -l /sbin/init 2>/dev/null", "ls -l /etc/init.d 2>/dev/null | head -n 10", "which systemctl 2>/dev/null", "which rc-service 2>/dev/null", "which busybox 2>/dev/null && busybox | head -n 1", "ls /dev/watchdog* 2>/dev/null", ] for c in cmds: try: out = os.popen(c).read() print("\n$ " + c + "\n" + (out.strip() or "")) except Exception as e: print("\n$ " + c + "\n " + str(e)) if __name__ == "__main__": # dump_system_info() # try: # import threading # print("threading module:", threading) # print("has Lock:", hasattr(threading, "Lock")) # if hasattr(threading, "Lock"): # print("has lock") # finally: # pass # import config # print("env: ", config.get_env()) cmd_str()