From e712e11ea09d28a736e02b9da83e4b5c754a4dcb Mon Sep 17 00:00:00 2001 From: huangzhenwei2 <10934114+huangzhenwei2@user.noreply.gitee.com> Date: Sun, 28 Dec 2025 16:22:41 +0800 Subject: [PATCH] ota update --- main.py | 803 +++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 673 insertions(+), 130 deletions(-) diff --git a/main.py b/main.py index 03804a6..583d63e 100644 --- a/main.py +++ b/main.py @@ -7,29 +7,88 @@ 作者:ZZH 最后更新:2025-11-21 """ -import _thread -import hashlib -import hmac -import json -import os -import re -import socket -import struct - +from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err import cv2 import numpy as np -import requests -import ujson -from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err +import json +import struct +import re from maix.peripheral import adc +import _thread +import os +import hmac +import ujson +import hashlib +import requests +import socket +import re +import binascii + +try: + import hashlib +except: + hashlib = None # import config +# ==================== Locks ==================== + +class _Mutex: + """ + 基于 _thread.allocate_lock() 的互斥锁封装: + - 支持 with + - 支持 try_acquire(若固件不支持非阻塞 acquire 参数,则退化为阻塞 acquire) + """ + def __init__(self): + self._lk = _thread.allocate_lock() + + def acquire(self, blocking=True): + try: + return self._lk.acquire(blocking) + except TypeError: + self._lk.acquire() + return True + + def try_acquire(self): + return self.acquire(False) + + def release(self): + self._lk.release() + + def __enter__(self): + self.acquire(True) + return self + + def __exit__(self, exc_type, exc, tb): + self.release() + return False + # ==================== 全局配置 ==================== # OTA 升级地址与本地路径 -url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py" -local_filename = "/maixapp/apps/t11/main.py" +# url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py" +local_filename = "/maixapp/apps/t11/main_tmp.py" +app_version = '1.0.0' +# OTA 下发参数(由后端指令写入) +OTA_URL = None +OTA_MODE = None # "4g" / "wifi" / None + +def is_wifi_connected(): + """尽量判断当前是否有 Wi-Fi(有则走 Wi-Fi OTA,否则走 4G OTA)""" + # 优先用 MaixPy network(如果可用) + try: + wlan = network.WLAN(network.TYPE_WIFI) + if wlan.isconnected(): + return True + except: + pass + + # 兜底:看系统 wlan0 有没有 IP(你系统可能没有 wlan0,则返回 False) + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + return bool(ip) + except: + return False # 设备认证信息(运行时动态加载) DEVICE_ID = None @@ -38,7 +97,7 @@ PASSWORD = None # 服务器连接参数 SERVER_IP = "www.shelingxingqiu.com" SERVER_PORT = 50005 -HEARTBEAT_INTERVAL = 5 # 心跳间隔(秒) +HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒) # 激光校准配置 CONFIG_FILE = "/root/laser_config.json" @@ -50,7 +109,7 @@ URL = "http://ws.shelingxingqiu.com" API_PATH = "/home/shoot/device_fire/arrow/fire" # UART 设备初始化 -uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 +uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块 # 引脚功能映射 @@ -72,14 +131,14 @@ thickness = 1 length = 2 # 全局状态变量 -laser_calibration_active = False # 是否正在后台校准激光 -laser_calibration_result = None # 校准结果坐标 (x, y) -laser_calibration_lock = False # 简易互斥锁,防止多线程冲突 +laser_calibration_active = False # 是否正在后台校准激光 +laser_calibration_result = None # 校准结果坐标 (x, y) +laser_calibration_lock = _Mutex() # 互斥锁,防止多线程冲突 # 硬件对象初始化 laser_x, laser_y = laser_point adc_obj = adc.ADC(0, adc.RES_BIT_12) -bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线 +bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线 # bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的 # INA226 电流/电压监测芯片寄存器地址 INA226_ADDR = 0x40 @@ -95,16 +154,17 @@ LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0x # 相机标定参数(用于距离估算) # FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素) -FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素) -REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) +FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素) +REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) # TCP 连接状态 tcp_connected = False -high_send_queue = [] # 高优先级发送队列:射箭事件等 -normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 -queue_lock = False # 简易互斥锁,保护队列 -uart4g_lock = False # 简易互斥锁,保护 4G 串口 AT 发送流程(防并发) +high_send_queue = [] # 高优先级发送队列:射箭事件等 +normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 +queue_lock = _Mutex() # 互斥锁,保护队列 +uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并发) update_thread_started = False # 防止 OTA 更新线程重复启动 +ota_in_progress = False # OTA(4G HTTP URC) 期间暂停 tcp_main 读取 uart4g,避免吞掉 +MHTTPURC # ==================== 工具函数 ==================== @@ -140,16 +200,94 @@ def is_server_reachable(host, port=80, timeout=5): print(f"[NET] 无法连接 {host}:{port} - {e}") return False -def direct_ota_download(): +def apply_ota_and_reboot(ota_url=None): + """ + OTA 文件下载成功后:备份原 main.py -> 替换 main_tmp.py -> 重启设备 + """ + import shutil + + main_py = "/maixapp/apps/t11/main.py" + main_tmp = "/maixapp/apps/t11/main_tmp.py" + main_bak = "/maixapp/apps/t11/main.py.bak" + ota_pending = "/maixapp/apps/t11/ota_pending.json" + + try: + # 1. 检查下载的文件是否存在 + if not os.path.exists(main_tmp): + print(f"[OTA] 错误:{main_tmp} 不存在") + return False + + # 2. 备份原 main.py(如果存在) + if os.path.exists(main_py): + try: + shutil.copy2(main_py, main_bak) + print(f"[OTA] 已备份 {main_py} -> {main_bak}") + except Exception as e: + print(f"[OTA] 备份失败: {e}") + # 备份失败也继续(可能没有原文件) + + # 3. 替换:main_tmp.py -> main.py + try: + shutil.copy2(main_tmp, main_py) + print(f"[OTA] 已替换 {main_tmp} -> {main_py}") + + # 确保写入磁盘 + try: + os.sync() # 如果系统支持 + except: + pass + time.sleep_ms(500) # 额外等待确保写入完成 + + except Exception as e: + print(f"[OTA] 替换失败: {e}") + return False + + # 3.5 写入 pending(用于重启后确认成功并上报) + try: + pending_obj = { + "ts": int(time.time()) if hasattr(time, "time") else 0, + "url": ota_url or "", + "tmp": main_tmp, + "main": main_py, + "bak": main_bak, + } + with open(ota_pending, "w", encoding="utf-8") as f: + json.dump(pending_obj, f) + try: + os.sync() + except: + pass + except Exception as e: + print(f"[OTA] 写入 ota_pending 失败: {e}") + + # 4. 通知服务器(可选,但重启前发一次) + safe_enqueue({"result": "ota_applied_rebooting"}, 2) + time.sleep_ms(1000) # 给一点时间让消息发出 + + # 5. 重启设备 + print("[OTA] 准备重启设备...") + os.system("reboot") # MaixPy 通常是这个命令 + + return True + + except Exception as e: + print(f"[OTA] apply_ota_and_reboot 异常: {e}") + return False + + +def direct_ota_download(ota_url): """ 直接执行 OTA 下载(假设已有网络) - 用于 cmd=7 触发 + 用于 cmd=7 / 或 wifi 模式 """ global update_thread_started try: - # 再次确认网络可达(可选但推荐) + if not ota_url: + safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) + return + from urllib.parse import urlparse - parsed_url = urlparse(url) + parsed_url = urlparse(ota_url) host = parsed_url.hostname port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) @@ -157,21 +295,28 @@ def direct_ota_download(): safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2) return - print(f"[OTA] 开始直接下载固件...") - result_msg = download_file(url, local_filename) + print(f"[OTA] 开始下载: {ota_url}") + result_msg = download_file(ota_url, local_filename) print(f"[OTA] {result_msg}") - safe_enqueue({"result": result_msg}, 2) + + # 检查是否下载成功(包含"成功"或"下载成功"关键字) + if "成功" in result_msg or "下载成功" in result_msg: + # 下载成功:备份+替换+重启 + if apply_ota_and_reboot(ota_url): + return # 会重启,不会执行到 finally + else: + safe_enqueue({"result": result_msg}, 2) except Exception as e: error_msg = f"OTA 异常: {str(e)}" print(error_msg) safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) finally: - update_thread_started = False # 允许下次 OTA + update_thread_started = False - -def handle_wifi_and_update(ssid, password): +def handle_wifi_and_update(ssid, password, ota_url): """在子线程中执行 Wi-Fi 连接 + OTA 更新流程""" + global update_thread_started try: ip, error = connect_wifi(ssid, password) if error: @@ -179,9 +324,13 @@ def handle_wifi_and_update(ssid, password): return safe_enqueue({"result": "wifi_connected", "ip": ip}, 2) - # 解析 OTA 地址并测试连通性 + # 下载 + if not ota_url: + safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) + return + from urllib.parse import urlparse - parsed_url = urlparse(url) + parsed_url = urlparse(ota_url) host = parsed_url.hostname port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) @@ -191,9 +340,16 @@ def handle_wifi_and_update(ssid, password): return print(f"[NET] 已确认可访问 {host}:{port},开始下载...") - result = download_file(url, local_filename) + result = download_file(ota_url, local_filename) print(result) - safe_enqueue({"result": result}, 2) + + # 检查是否下载成功(包含"成功"或"下载成功"关键字) + if "成功" in result or "下载成功" in result: + # 下载成功:备份+替换+重启 + if apply_ota_and_reboot(ota_url): + return # 会重启,不会执行到 finally + else: + safe_enqueue({"result": result}, 2) finally: update_thread_started = False @@ -259,27 +415,22 @@ def read_device_id(): def safe_enqueue(data_dict, msg_type=2, high=False): """线程安全地将消息加入 TCP 发送队列(支持优先级)""" global queue_lock, high_send_queue, normal_send_queue - while queue_lock: - time.sleep_ms(1) - queue_lock = True item = (msg_type, data_dict) - if high: - high_send_queue.append(item) - else: - normal_send_queue.append(item) - queue_lock = False + with queue_lock: + if high: + high_send_queue.append(item) + else: + normal_send_queue.append(item) def _uart4g_lock_acquire(): - global uart4g_lock - while uart4g_lock: - time.sleep_ms(1) - uart4g_lock = True + # 兼容旧调用:建议改为 `with uart4g_lock:` + uart4g_lock.acquire(True) def _uart4g_lock_release(): - global uart4g_lock - uart4g_lock = False + # 兼容旧调用:建议改为 `with uart4g_lock:` + uart4g_lock.release() def at(cmd, wait="OK", timeout=2000): @@ -323,8 +474,7 @@ def tcp_send_raw(data: bytes, max_retries=2) -> bool: global tcp_connected if not tcp_connected: return False - _uart4g_lock_acquire() - try: + with uart4g_lock: for attempt in range(max_retries): cmd = f'AT+MIPSEND=0,{len(data)}' if ">" not in at(cmd, ">", 1500): @@ -345,8 +495,6 @@ def tcp_send_raw(data: bytes, max_retries=2) -> bool: time.sleep_ms(100) return False - finally: - _uart4g_lock_release() def generate_token(device_id): @@ -444,7 +592,7 @@ def find_red_laser(frame, threshold=150): for y in range(0, h, 2): for x in range(0, w, 2): idx = (y * w + x) * 3 - r, g, b = img_bytes[idx], img_bytes[idx + 1], img_bytes[idx + 2] + r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] if r > threshold and r > g * 2 and r > b * 2: rgb_sum = r + g + b if rgb_sum > max_sum: @@ -591,19 +739,6 @@ def compute_laser_position(circle_center, laser_point, radius, method): return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) -def compute_laser_position_v2(circle_center, laser_point): - if circle_center is None: - return 200, 200 - cx, cy = circle_center - lx, ly = 320, 220 - dx = lx - cx - dy = ly - cy - r = 22.16 * 5 - target_x = dx / r * 100 - target_y = dy / r * 100 - print(f"lx:{lx} ly: {ly} cx: {cx} cy: {cy} dx: {dx} dy: {dy} result_x: {target_x} result_y: {-target_y}") - return (target_x, -target_y) - # ==================== TCP 通信线程 ==================== def connect_server(): @@ -612,12 +747,9 @@ def connect_server(): if tcp_connected: return True print("连接到服务器...") - _uart4g_lock_acquire() - try: + with uart4g_lock: at("AT+MIPCLOSE=0", "OK", 1000) res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000) - finally: - _uart4g_lock_release() if "+MIPOPEN: 0,0" in res: tcp_connected = True return True @@ -648,6 +780,13 @@ def tcp_main(): while True: # 接收数据 + # OTA(4G HTTP) 会从 uart4g 吐出大量 +MHTTPURC 数据; + # tcp_main 若在此 read,会把 URC 吃掉,导致 OTA empty_body/incomplete_body。 + # 同时 uart4g_lock 置 True 时也不要抢读。 + if ota_in_progress: + time.sleep_ms(20) + continue + data = uart4g.read() if data: rx_buf += data @@ -666,6 +805,23 @@ def tcp_main(): logged_in = True last_heartbeat_ack_time = time.ticks_ms() print("✅ 登录成功") + # 若存在 ota_pending.json,说明上次 OTA 已应用并重启; + # 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending,确保只上报一次。 + try: + pending_path = "/maixapp/apps/t11/ota_pending.json" + if os.path.exists(pending_path): + try: + with open(pending_path, "r", encoding="utf-8") as f: + pending_obj = json.load(f) + except: + pending_obj = {} + safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) + try: + os.remove(pending_path) + except: + pass + except Exception as e: + print(f"[OTA] ota_ok 上报失败: {e}") else: break @@ -693,23 +849,44 @@ def tcp_main(): battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} safe_enqueue(battery_data, 2) print(f"🔋 电量上报: {battery_percent}%") - elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置) + elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置,及4g) inner_data = body["data"].get("data", {}) + ssid = inner_data.get("ssid") password = inner_data.get("password") - if not ssid or not password: - safe_enqueue({"result": "missing_ssid_or_password"}, 2) + ota_url = inner_data.get("url") + mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/"" + + if not ota_url: + print("ota missing_url") + safe_enqueue({"result": "missing_url"}, 2) + rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 + continue + + # 自动判断:mode 非法/为空时,优先 Wi-Fi(如果已连),否则 4G + if mode not in ("4g", "wifi"): + print("ota missing mode") + mode = "wifi" if is_wifi_connected() else "4g" + + if update_thread_started: + safe_enqueue({"result": "update_already_started"}, 2) + rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 + continue + + update_thread_started = True + + if mode == "4g": + _thread.start_new_thread(direct_ota_download_via_4g, (ota_url,)) else: - # global update_thread_started - if not update_thread_started: - update_thread_started = True - _thread.start_new_thread(handle_wifi_and_update, (ssid, password)) + # wifi 模式:需要 ssid/password + if not ssid or not password: + update_thread_started = False + safe_enqueue({"result": "missing_ssid_or_password"}, 2) else: - safe_enqueue({"result": "update_already_started"}, 2) + _thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url)) elif inner_cmd == 6: try: - ip = os.popen( - "ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() ip = ip if ip else "no_ip" except: ip = "error_getting_ip" @@ -718,18 +895,17 @@ def tcp_main(): # global update_thread_started if update_thread_started: safe_enqueue({"result": "update_already_started"}, 2) + rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 continue # 实时检查是否有 IP try: - ip = os.popen( - "ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() except: ip = None if not ip: - safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, - MSG_TYPE_STATUS) + safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) else: # 启动纯下载线程 update_thread_started = True @@ -743,29 +919,34 @@ def tcp_main(): break # 发送队列中的业务数据 - if logged_in and (high_send_queue or normal_send_queue) and (not queue_lock): + if logged_in and (high_send_queue or normal_send_queue): # 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁 - while queue_lock: - time.sleep_ms(1) - queue_lock = True - if high_send_queue: - msg_type, data_dict = high_send_queue.pop(0) - else: - msg_type, data_dict = normal_send_queue.pop(0) - queue_lock = False + msg_type = None + data_dict = None + if queue_lock.try_acquire(): + try: + if high_send_queue: + msg_type, data_dict = high_send_queue.pop(0) + elif normal_send_queue: + msg_type, data_dict = normal_send_queue.pop(0) + finally: + queue_lock.release() - pkt = make_packet(msg_type, data_dict) - if not tcp_send_raw(pkt): - tcp_connected = False - break + if msg_type is not None and data_dict is not None: + pkt = make_packet(msg_type, data_dict) + if not tcp_send_raw(pkt): + tcp_connected = False + break # 发送激光校准结果 - if logged_in and not laser_calibration_lock and laser_calibration_result is not None: - laser_calibration_lock = True - x, y = laser_calibration_result - safe_enqueue({"result": "ok", "x": x, "y": y}, 2) - laser_calibration_result = None - laser_calibration_lock = False + if logged_in and laser_calibration_result is not None: + x = y = None + with laser_calibration_lock: + if laser_calibration_result is not None: + x, y = laser_calibration_result + laser_calibration_result = None + if x is not None and y is not None: + safe_enqueue({"result": "ok", "x": x, "y": y}, 2) # 定期发送心跳 current_time = time.ticks_ms() @@ -785,7 +966,7 @@ def tcp_main(): print("💓 心跳已发送") # 心跳超时重连 - if logged_in and current_time - last_heartbeat_ack_time > 1000 * 60 * 10: # 十分钟 + if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟 print("⏰ 十分钟无心跳ACK,重连") break @@ -803,12 +984,9 @@ def laser_calibration_worker(): if laser_calibration_active: result = calibrate_laser_position() if result and len(result) == 2: - while laser_calibration_lock: - time.sleep_ms(1) - laser_calibration_lock = True - laser_calibration_result = result - laser_calibration_active = False - laser_calibration_lock = False + with laser_calibration_lock: + laser_calibration_result = result + laser_calibration_active = False print(f"✅ 后台校准成功: {result}") else: time.sleep_ms(80) @@ -816,6 +994,342 @@ def laser_calibration_worker(): time.sleep_ms(50) + + +def download_file_via_4g(url, filename, + total_timeout_ms=30000, + retries=3, + debug=False): + """ + ML307R HTTP 下载(URC content 分片模式) + - 重试:empty/incomplete/AT错误都会重试 + - 超时:total_timeout_ms + - 校验:Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5 + - 日志:默认干净;debug=True 才打印 URC 进度 + """ + from urllib.parse import urlparse + parsed = urlparse(url) + host = parsed.hostname + path = parsed.path or "/" + base_url = f"http://{host}" # 你已验证 HTTP 可 200;如需 https 需另配 SSL + + def _log(*args): + if debug: + print(*args) + + def _get_ip(): + r = at("AT+CGPADDR=1", "OK", 3000) + m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) + return m.group(1) if m else "" + + def _drain_uart(max_ms=300): + """清空串口残留,防止旧 URC 干扰""" + t0 = time.ticks_ms() + while time.ticks_ms() - t0 < max_ms: + d = uart4g.read(4096) + if not d: + time.sleep_ms(10) + continue + + def _read_more(buf: bytes, ms=50) -> bytes: + t0 = time.ticks_ms() + while time.ticks_ms() - t0 < ms: + d = uart4g.read(4096) + if d: + buf += d + time.sleep_ms(1) + return buf + + def _parse_httpid(raw: bytes): + m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw) + return int(m.group(1)) if m else None + + def _try_parse_header(buf: bytes): + """ + +MHTTPURC: "header",,,, + 返回 (urc_id, status_code, header_text, rest_buf) 或 None + """ + tag = b'+MHTTPURC: "header",' + i = buf.find(tag) + if i < 0: + return None + if i > 0: + buf = buf[i:] + i = 0 + + # header 在 hdr_text 里包含 \r\n;我们用 hdr_len 精确截取 + j = i + len(tag) + comma_count = 0 + k = j + while k < len(buf) and comma_count < 3: # 3 个逗号到 hdr_len 后的逗号 + if buf[k:k+1] == b",": + comma_count += 1 + k += 1 + if comma_count < 3: + return None + + header_prefix = buf[i:k] + m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', header_prefix) + if not m: + return ("drop", buf[1:]) + + urc_id = int(m.group(1)) + code = int(m.group(2)) + hdr_len = int(m.group(3)) + + text_start = k + text_end = text_start + hdr_len + if len(buf) < text_end: + return None + + hdr_text = buf[text_start:text_end].decode("utf-8", "ignore") + rest = buf[text_end:] + return ("ok", urc_id, code, hdr_text, rest) + + def _try_parse_one_content(buf: bytes): + """ + +MHTTPURC: "content",,,,, + 返回 ("ok", urc_id, total_len, sum_len, cur_len, payload_bytes, rest_buf) 或 None + """ + tag = b'+MHTTPURC: "content",' + i = buf.find(tag) + if i < 0: + return None + if i > 0: + buf = buf[i:] + i = 0 + + j = i + len(tag) + comma_count = 0 + k = j + while k < len(buf) and comma_count < 4: # payload 前只有 4 个逗号 + if buf[k:k+1] == b",": + comma_count += 1 + k += 1 + if comma_count < 4: + return None + + prefix = buf[i:k] + m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix) + if not m: + return ("drop", buf[1:]) + + urc_id = int(m.group(1)) + total_len = int(m.group(2)) + sum_len = int(m.group(3)) + cur_len = int(m.group(4)) + + payload_start = k + payload_end = payload_start + cur_len + if len(buf) < payload_end: + return None + + payload = buf[payload_start:payload_end] + rest = buf[payload_end:] + return ("ok", urc_id, total_len, sum_len, cur_len, payload, rest) + + def _extract_hdr_fields(hdr_text: str): + # Content-Length + mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE) + clen = int(mlen.group(1)) if mlen else None + + # Content-Md5 (base64) + mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE) + md5_b64 = mmd5.group(1).strip() if mmd5 else None + + return clen, md5_b64 + + def _md5_base64(data: bytes) -> str: + if hashlib is None: + return "" + digest = hashlib.md5(data).digest() + # base64: 24 chars with == + return binascii.b2a_base64(digest).decode().strip() + + def _one_attempt(): + # 0) PDP:确保有 IP(避免把 OK 当成功) + ip = _get_ip() + if not ip or ip == "0.0.0.0": + at("AT+MIPCALL=1,1", "OK", 15000) + for _ in range(10): + ip = _get_ip() + if ip and ip != "0.0.0.0": + break + time.sleep(1) + if not ip or ip == "0.0.0.0": + return False, "PDP not ready (no_ip)" + + # 1) 清理旧实例 + 清空串口残留 + for i in range(0, 6): + at(f"AT+MHTTPDEL={i}", "OK", 1500) + _drain_uart(300) + + # 2) 创建实例(httpid 可能延迟吐出来) + uart4g.write((f'AT+MHTTPCREATE="{base_url}"\r\n').encode()) + raw = b"" + raw = _read_more(raw, 300) + raw = _read_more(raw, 2000) + httpid = _parse_httpid(raw) + if httpid is None: + return False, "MHTTPCREATE failed (no httpid)" + + # 3) 发 GET:不用 at()(避免 at 吃掉 URC) + _drain_uart(100) + uart4g.write((f'AT+MHTTPREQUEST={httpid},1,0,"{path}"\r\n').encode()) + + # 4) 收 URC:先拿 header(code/length/md5),再收 content 分片 + buf = b"" + urc_id = None + status_code = None + total_len = None + expect_len = None + expect_md5_b64 = None + + body_buf = None + got_ranges = set() + filled_bytes = 0 + + t0 = time.ticks_ms() + last_sum = 0 + + while time.ticks_ms() - t0 < total_timeout_ms: + buf = _read_more(buf, 50) + + # 4.1 尝试解析 header(可能先来) + while True: + ph = _try_parse_header(buf) + if ph is None: + break + if ph[0] == "drop": + buf = ph[1] + continue + _, hid, code, hdr_text, rest = ph + buf = rest + if urc_id is None: + urc_id = hid + status_code = code + expect_len, expect_md5_b64 = _extract_hdr_fields(hdr_text) + _log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}") + + # 4.2 解析尽可能多的 content + while True: + pc = _try_parse_one_content(buf) + if pc is None: + break + if pc[0] == "drop": + buf = pc[1] + continue + _, cid, _total, _sum, _cur, payload, rest = pc + buf = rest + + if urc_id is None: + urc_id = cid + if cid != urc_id: + continue + + if total_len is None: + total_len = _total + body_buf = bytearray(total_len) + + # 定位写入 + start = _sum - _cur + end = _sum + if start < 0 or end > total_len: + continue + + key = (start, end) + if key not in got_ranges: + got_ranges.add(key) + body_buf[start:end] = payload + filled_bytes += (end - start) + + if _sum > last_sum: + last_sum = _sum + + if debug: + _log(f"[URC] {start}:{end} sum={_sum}/{total_len} filled={filled_bytes}") + + # 完整条件:填满 total_len + if filled_bytes == total_len: + break + + if total_len is not None and filled_bytes == total_len: + break + + # 5) 清理实例 + at(f"AT+MHTTPDEL={httpid}", "OK", 3000) + + if body_buf is None: + return False, "empty_body" + if total_len is None: + return False, "no_total_len" + if filled_bytes != total_len: + return False, f"incomplete_body got={filled_bytes} expected={total_len}" + + data = bytes(body_buf) + + # 6) 校验:Content-Length + if expect_len is not None and len(data) != expect_len: + return False, f"length_mismatch got={len(data)} expected={expect_len}" + + # 7) 校验:Content-Md5(base64) + if expect_md5_b64 and hashlib is not None: + md5_b64 = _md5_base64(data) + if md5_b64 != expect_md5_b64: + return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}" + + # 8) 写文件(原样 bytes) + with open(filename, "wb") as f: + f.write(data) + + return True, f"OK size={len(data)} ip={ip} code={status_code}" + + global ota_in_progress + ota_in_progress = True + with uart4g_lock: + try: + last_err = "unknown" + for attempt in range(1, retries + 1): + ok, msg = _one_attempt() + if ok: + return True, msg + last_err = msg + # 重试前等待 + 清 UART + _log(f"[RETRY] attempt={attempt} failed={msg}") + _drain_uart(500) + time.sleep_ms(300) + return False, f"FAILED after {retries} retries: {last_err}" + finally: + ota_in_progress = False + + +def direct_ota_download_via_4g(ota_url): + """通过 4G 模块下载 OTA(不需要 Wi-Fi)""" + global update_thread_started + try: + if not ota_url: + safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) + return + + print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") + success, msg = download_file_via_4g(ota_url, local_filename) + print(f"[OTA-4G] {msg}") + + if success and "OK" in msg: + # 下载成功:备份+替换+重启 + if apply_ota_and_reboot(ota_url): + return # 会重启,不会执行到 finally + else: + safe_enqueue({"result": msg}, 2) + + except Exception as e: + error_msg = f"OTA-4G 异常: {str(e)}" + print(error_msg) + safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) + finally: + update_thread_started = False + + # ==================== 主程序入口 ==================== def cmd_str(): @@ -850,10 +1364,9 @@ def cmd_str(): while not app.need_exit(): current_time = time.ticks_ms() # print("压力传感器数值: ", adc_obj.read()) - adc_val = adc_obj.read() - if adc_val > ADC_TRIGGER_THRESHOLD: - diff_ms = current_time - last_adc_trigger - if diff_ms < 3000: + if adc_obj.read() > ADC_TRIGGER_THRESHOLD: + diff_ms = current_time-last_adc_trigger + if diff_ms<3000: continue last_adc_trigger = current_time time.sleep_ms(60) # 防抖 @@ -870,7 +1383,7 @@ def cmd_str(): disp.show(result_img) # 计算偏移与距离 - dx, dy = compute_laser_position_v2(center, (x, y)) + dx, dy = compute_laser_position(center, (x, y), radius, method) distance_m = estimate_distance(best_radius1) # 读取电量 @@ -878,20 +1391,20 @@ def cmd_str(): battery_percent = voltage_to_percent(voltage) # 保存图像(带标注) - # try: - # jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')]) - # filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg" - # result_img.save(filename, quality=70) - # except Exception as e: - # print(f"❌ 保存失败: {e}") + try: + jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')]) + filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg" + result_img.save(filename, quality=70) + except Exception as e: + print(f"❌ 保存失败: {e}") + # 构造上报数据 inner_data = { "x": float(dx) if dx is not None else 200.0, "y": float(dy) if dy is not None else 200.0, "r": 90.0, "d": round((distance_m or 0.0) * 100), # 距离(厘米) - "m": method, - "adc": adc_val, + "m": method } report_data = {"cmd": 1, "data": inner_data} # 射箭事件高优先级入队,由 tcp_main 统一发送 @@ -903,6 +1416,36 @@ def cmd_str(): disp.show(cam.read()) time.sleep_ms(50) +def dump_system_info(): + cmds = [ + "uname -a", + "cat /etc/os-release 2>/dev/null", + "cat /proc/version 2>/dev/null", + "cat /proc/1/comm 2>/dev/null", + "ps 2>/dev/null | head -n 5", + "ls -l /sbin/init 2>/dev/null", + "ls -l /etc/init.d 2>/dev/null | head -n 10", + "which systemctl 2>/dev/null", + "which rc-service 2>/dev/null", + "which busybox 2>/dev/null && busybox | head -n 1", + "ls /dev/watchdog* 2>/dev/null", + ] + for c in cmds: + try: + out = os.popen(c).read() + print("\n$ " + c + "\n" + (out.strip() or "")) + except Exception as e: + print("\n$ " + c + "\n " + str(e)) if __name__ == "__main__": - cmd_str() + # dump_system_info() + try: + import threading + print("threading module:", threading) + print("has Lock:", hasattr(threading, "Lock")) + if hasattr(threading, "Lock"): + print("has lock") + finally: + pass + + cmd_str() \ No newline at end of file