diff --git a/main.py b/main.py index 9228697..4cb9503 100644 --- a/main.py +++ b/main.py @@ -63,6 +63,273 @@ class _Mutex: self.release() return False + +class ATClient: + """ + 单读者 AT/URC 客户端:唯一读取 uart4g,避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。 + - send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect + - pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload(已按长度裁剪) + - pop_http_event() : 获取 +MHTTPURC 事件(header/content) + """ + def __init__(self, uart_obj): + self.uart = uart_obj + self._cmd_lock = _Mutex() + self._q_lock = _Mutex() + self._rx = b"" + self._tcp_payloads = [] + self._http_events = [] + + # 当前命令等待状态(仅允许单命令 in-flight) + self._waiting = False + self._expect = b"OK" + self._resp = b"" + + self._running = False + + def start(self): + if self._running: + return + self._running = True + _thread.start_new_thread(self._reader_loop, ()) + + def stop(self): + self._running = False + + def flush(self): + """清空内部缓存与队列(用于 OTA/异常恢复)""" + with self._q_lock: + self._rx = b"" + self._tcp_payloads.clear() + self._http_events.clear() + self._resp = b"" + + def pop_tcp_payload(self): + with self._q_lock: + if self._tcp_payloads: + return self._tcp_payloads.pop(0) + return None + + def pop_http_event(self): + with self._q_lock: + if self._http_events: + return self._http_events.pop(0) + return None + + def _push_tcp_payload(self, payload: bytes): + # 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock, + # 这里不要再次 acquire(锁不可重入,会死锁)。 + self._tcp_payloads.append(payload) + + def _push_http_event(self, ev): + # 同上:避免在 _reader_loop 持锁期间二次 acquire + self._http_events.append(ev) + + def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000): + """ + 发送 AT 命令并等待 expect(子串匹配)。 + 注意:expect=">" 用于等待 prompt。 + """ + expect_b = expect.encode() if isinstance(expect, str) else expect + with self._cmd_lock: + # 初始化等待 + self._waiting = True + self._expect = expect_b + self._resp = b"" + + # 发送 + if cmd: + # 注意:这里不要再用 uart4g_lock(否则外层已经持锁时会死锁)。 + # 写入由 _cmd_lock 串行化即可。 + self.uart.write((cmd + "\r\n").encode()) + + t0 = time.ticks_ms() + while time.ticks_ms() - t0 < timeout_ms: + if (not self._waiting) or (self._expect in self._resp): + self._waiting = False + break + time.sleep_ms(5) + + # 超时也返回已收集内容(便于诊断) + self._waiting = False + try: + return self._resp.decode(errors="ignore") + except: + return str(self._resp) + + def _parse_mipurc_rtcp(self): + """ + 解析:+MIPURC: "rtcp",,, + 之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。 + """ + prefix = b'+MIPURC: "rtcp",' + i = self._rx.find(prefix) + if i < 0: + return False + # 丢掉前置噪声 + if i > 0: + self._rx = self._rx[i:] + i = 0 + + j = len(prefix) + # 解析 link_id + k = j + while k < len(self._rx) and 48 <= self._rx[k] <= 57: + k += 1 + if k == j or k >= len(self._rx): + return False + if self._rx[k:k+1] != b",": + self._rx = self._rx[1:] + return True + try: + link_id = int(self._rx[j:k].decode()) + except: + self._rx = self._rx[1:] + return True + + # 解析 len + j2 = k + 1 + k2 = j2 + while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57: + k2 += 1 + if k2 == j2 or k2 >= len(self._rx): + return False + if self._rx[k2:k2+1] != b",": + self._rx = self._rx[1:] + return True + try: + n = int(self._rx[j2:k2].decode()) + except: + self._rx = self._rx[1:] + return True + + payload_start = k2 + 1 + payload_end = payload_start + n + if len(self._rx) < payload_end: + return False # payload 未收齐 + + payload = self._rx[payload_start:payload_end] + # 把 link_id 一起带上,便于上层过滤(如果需要) + self._push_tcp_payload((link_id, payload)) + self._rx = self._rx[payload_end:] + return True + + def _parse_mhttpurc_header(self): + tag = b'+MHTTPURC: "header",' + i = self._rx.find(tag) + if i < 0: + return False + if i > 0: + self._rx = self._rx[i:] + i = 0 + + # header: +MHTTPURC: "header",,,, + j = len(tag) + comma_count = 0 + k = j + while k < len(self._rx) and comma_count < 3: + if self._rx[k:k+1] == b",": + comma_count += 1 + k += 1 + if comma_count < 3: + return False + + prefix = self._rx[:k] + m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix) + if not m: + self._rx = self._rx[1:] + return True + urc_id = int(m.group(1)) + code = int(m.group(2)) + hdr_len = int(m.group(3)) + + text_start = k + text_end = text_start + hdr_len + if len(self._rx) < text_end: + return False + + hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore") + self._push_http_event(("header", urc_id, code, hdr_text)) + self._rx = self._rx[text_end:] + return True + + def _parse_mhttpurc_content(self): + tag = b'+MHTTPURC: "content",' + i = self._rx.find(tag) + if i < 0: + return False + if i > 0: + self._rx = self._rx[i:] + i = 0 + + # content: +MHTTPURC: "content",,,,, + j = len(tag) + comma_count = 0 + k = j + while k < len(self._rx) and comma_count < 4: + if self._rx[k:k+1] == b",": + comma_count += 1 + k += 1 + if comma_count < 4: + return False + + prefix = self._rx[:k] + m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix) + if not m: + self._rx = self._rx[1:] + return True + urc_id = int(m.group(1)) + total_len = int(m.group(2)) + sum_len = int(m.group(3)) + cur_len = int(m.group(4)) + + payload_start = k + payload_end = payload_start + cur_len + if len(self._rx) < payload_end: + return False + + payload = self._rx[payload_start:payload_end] + self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload)) + self._rx = self._rx[payload_end:] + return True + + def _reader_loop(self): + while self._running: + d = self.uart.read(4096) + if not d: + time.sleep_ms(2) + continue + + with self._q_lock: + # 统一累积到内部 buffer(用于 URC 解析) + self._rx += d + # 命令等待期间,把原始字节流复制到响应缓冲(不影响 URC 解析) + if self._waiting: + self._resp += d + + # 解析 URC:尽可能多地从 _rx 中剥离完整 URC,避免丢包 + while True: + progressed = ( + self._parse_mipurc_rtcp() + or self._parse_mhttpurc_header() + or self._parse_mhttpurc_content() + ) + if not progressed: + break + + # 防止 _rx 因为"非 URC 文本/回显"无限增长:保留尾部即可 + # 关键修复:如果 buffer 中有 HTTP URC 标签(说明 OTA 在进行),完全禁用截断 + # 避免在 OTA 下载时截断 buffer 导致数据丢失(之前 16KB 限制太小,导致数据被截断) + has_http_urc = (b'+MHTTPURC: "content"' in self._rx or + b'+MHTTPURC: "header"' in self._rx) + if has_http_urc: + # OTA 下载中:完全禁用 buffer 截断,避免数据丢失 + # 通常 OTA 文件不会超过几百 KB,即使不截断也不会导致内存问题 + pass # 不截断 + else: + # 非 OTA 状态:使用较小的 buffer 限制(16KB) + if len(self._rx) > 16384: + self._rx = self._rx[-4096:] + # ==================== 全局配置 ==================== # OTA 升级地址与本地路径 @@ -97,7 +364,7 @@ PASSWORD = None # 服务器连接参数 SERVER_IP = "www.shelingxingqiu.com" SERVER_PORT = 50005 -HEARTBEAT_INTERVAL = 6 # 心跳间隔(秒) +HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒) # 激光校准配置 CONFIG_FILE = "/root/laser_config.json" @@ -112,6 +379,10 @@ API_PATH = "/home/shoot/device_fire/arrow/fire" uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信 distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块 +# 单读者 ATClient(唯一读取 uart4g) +at_client = ATClient(uart4g) +at_client.start() + # 引脚功能映射 pinmap.set_pin_function("A18", "UART1_RX") pinmap.set_pin_function("A19", "UART1_TX") @@ -157,7 +428,7 @@ LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0x FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素) REAL_RADIUS_CM = 15 # 靶心实际半径(厘米) -# TCP 连接状态 +# # TCP 连接状态 tcp_connected = False high_send_queue = [] # 高优先级发送队列:射箭事件等 normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 @@ -166,7 +437,6 @@ uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并 update_thread_started = False # 防止 OTA 更新线程重复启动 ota_in_progress = False # OTA(4G HTTP URC) 期间暂停 tcp_main 读取 uart4g,避免吞掉 +MHTTPURC - # ==================== 工具函数 ==================== def download_file(url, filename): @@ -201,6 +471,9 @@ def is_server_reachable(host, port=80, timeout=5): return False def apply_ota_and_reboot(ota_url=None): + # TODO: remove this return after test + # return True + """ OTA 文件下载成功后:备份原 main.py -> 替换 main_tmp.py -> 重启设备 """ @@ -296,6 +569,7 @@ def direct_ota_download(ota_url): return print(f"[OTA] 开始下载: {ota_url}") + # from ota import download_file result_msg = download_file(ota_url, local_filename) print(f"[OTA] {result_msg}") @@ -422,30 +696,10 @@ def safe_enqueue(data_dict, msg_type=2, high=False): else: normal_send_queue.append(item) - -def _uart4g_lock_acquire(): - # 兼容旧调用:建议改为 `with uart4g_lock:` - uart4g_lock.acquire(True) - - -def _uart4g_lock_release(): - # 兼容旧调用:建议改为 `with uart4g_lock:` - uart4g_lock.release() - - def at(cmd, wait="OK", timeout=2000): """向 4G 模块发送 AT 指令并等待响应""" - if cmd: - uart4g.write((cmd + "\r\n").encode()) - t0 = time.ticks_ms() - buf = b"" - while time.ticks_ms() - t0 < timeout: - data = uart4g.read() - if data: - buf += data - if wait.encode() in buf: - return buf.decode(errors="ignore") - return buf.decode(errors="ignore") + # 统一由 ATClient 负责读 uart4g,避免多线程抢读 + return at_client.send(cmd, wait, timeout) def make_packet(msg_type: int, body_dict: dict) -> bytes: @@ -494,7 +748,7 @@ def tcp_send_raw(data: bytes, max_retries=2) -> bool: uart4g.write(b"\x1A") # 等发送完成确认(不同固件可能是 SEND OK / OK / +MIPSEND) - r = at("", "OK", 5000) + r = at("", "OK", 8000) if ("SEND OK" in r) or ("OK" in r) or ("+MIPSEND" in r): return True @@ -518,15 +772,8 @@ def send_http_cmd(cmd_str, timeout_ms=3000): def read_http_response(timeout_ms=5000): """读取并打印 HTTP 响应(用于调试)""" - start = time.ticks_ms() - while time.ticks_ms() - start < timeout_ms: - data = uart4g.read(128) - if data: - try: - print("📡 HTTP 响应:", data.decode("utf-8", "ignore").strip()) - except: - print("📡 响应(raw):", data) - time.sleep_ms(100) + # 仅保留占位:UART 读取由 ATClient 独占;如需调试,请从 ATClient 的 http_events 中取。 + time.sleep_ms(timeout_ms) def upload_shoot_event(json_data): @@ -588,6 +835,15 @@ def turn_on_laser(): print("🔇 无回包(正常或模块不支持)") return resp +def flash_laser(duration_ms=1000): + """闪一下激光(用于射箭反馈)""" + try: + distance_serial.write(LASER_ON_CMD) + time.sleep_ms(duration_ms) + distance_serial.write(LASER_OFF_CMD) + except Exception as e: + print(f"闪激光失败: {e}") + def find_red_laser(frame, threshold=150): """在图像中查找最亮的红色激光点(基于 RGB 阈值)""" @@ -772,7 +1028,7 @@ def tcp_main(): continue # 发送登录包 - login_data = {"deviceId": DEVICE_ID, "password": PASSWORD} + login_data = {"deviceId": DEVICE_ID, "password": PASSWORD, "version": app_version} if not tcp_send_raw(make_packet(1, login_data)): tcp_connected = False time.sleep_ms(2000) @@ -782,154 +1038,146 @@ def tcp_main(): logged_in = False last_heartbeat_ack_time = time.ticks_ms() last_heartbeat_send_time = time.ticks_ms() - rx_buf = b"" - while True: - # 接收数据 - # OTA(4G HTTP) 会从 uart4g 吐出大量 +MHTTPURC 数据; - # tcp_main 若在此 read,会把 URC 吃掉,导致 OTA empty_body/incomplete_body。 - # 同时 uart4g_lock 置 True 时也不要抢读。 - if ota_in_progress: - time.sleep_ms(20) - continue + # 接收数据(唯一来源:ATClient 解析后的 TCP payload 队列) + item = at_client.pop_tcp_payload() + if item: + # item 可能是 (link_id, payload) 或直接 payload(兼容旧队列格式) + if isinstance(item, tuple) and len(item) == 2: + link_id, payload = item + else: + link_id, payload = 0, item - if not uart4g_lock.try_acquire(): - time.sleep_ms(1) - continue - try: - data = uart4g.read() - finally: - uart4g_lock.release() - if data: - rx_buf += data - # 解析 +MIPURC 消息 - while b'+MIPURC: "rtcp"' in rx_buf: + # 登录阶段加一条轻量 debug,确认 ACK 是否进入队列 + if not logged_in: try: - match = re.search(b'\+MIPURC: "rtcp",0,(\d+),(.+)', rx_buf, re.DOTALL) - if match: - payload_len = int(match.group(1)) - payload = match.group(2)[:payload_len] - msg_type, body = parse_packet(payload) - - # 处理登录响应 - if not logged_in and msg_type == 1: - if body and body.get("cmd") == 1 and body.get("data") == "登录成功": - logged_in = True - last_heartbeat_ack_time = time.ticks_ms() - print("✅ 登录成功") - # 若存在 ota_pending.json,说明上次 OTA 已应用并重启; - # 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending,确保只上报一次。 - try: - pending_path = "/maixapp/apps/t11/ota_pending.json" - if os.path.exists(pending_path): - try: - with open(pending_path, "r", encoding="utf-8") as f: - pending_obj = json.load(f) - except: - pending_obj = {} - safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) - try: - os.remove(pending_path) - except: - pass - except Exception as e: - print(f"[OTA] ota_ok 上报失败: {e}") - else: - break - - # 处理心跳 ACK - elif logged_in and msg_type == 4: - last_heartbeat_ack_time = time.ticks_ms() - print("✅ 收到心跳确认") - - # 处理业务指令 - elif logged_in and isinstance(body, dict): - if isinstance(body.get("data"), dict) and "cmd" in body["data"]: - inner_cmd = body["data"]["cmd"] - if inner_cmd == 2: # 开启激光并校准 - turn_on_laser() - time.sleep_ms(100) - laser_calibration_active = True - safe_enqueue({"result": "calibrating"}, 2) - elif inner_cmd == 3: # 关闭激光 - distance_serial.write(LASER_OFF_CMD) - laser_calibration_active = False - safe_enqueue({"result": "laser_off"}, 2) - elif inner_cmd == 4: # 上报电量 - voltage = get_bus_voltage() - battery_percent = voltage_to_percent(voltage) - battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} - safe_enqueue(battery_data, 2) - print(f"🔋 电量上报: {battery_percent}%") - elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置,及4g) - inner_data = body["data"].get("data", {}) - - ssid = inner_data.get("ssid") - password = inner_data.get("password") - ota_url = inner_data.get("url") - mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/"" - - if not ota_url: - print("ota missing_url") - safe_enqueue({"result": "missing_url"}, 2) - rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 - continue - - # 自动判断:mode 非法/为空时,优先 Wi-Fi(如果已连),否则 4G - if mode not in ("4g", "wifi"): - print("ota missing mode") - mode = "wifi" if is_wifi_connected() else "4g" - - if update_thread_started: - safe_enqueue({"result": "update_already_started"}, 2) - rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 - continue - - update_thread_started = True - - if mode == "4g": - _thread.start_new_thread(direct_ota_download_via_4g, (ota_url,)) - else: - # wifi 模式:需要 ssid/password - if not ssid or not password: - update_thread_started = False - safe_enqueue({"result": "missing_ssid_or_password"}, 2) - else: - _thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url)) - elif inner_cmd == 6: - try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - ip = ip if ip else "no_ip" - except: - ip = "error_getting_ip" - safe_enqueue({"result": "current_ip", "ip": ip}, 2) - elif inner_cmd == 7: - # global update_thread_started - if update_thread_started: - safe_enqueue({"result": "update_already_started"}, 2) - rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包 - continue - - # 实时检查是否有 IP - try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - except: - ip = None - - if not ip: - safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) - else: - # 启动纯下载线程 - update_thread_started = True - _thread.start_new_thread(direct_ota_download, ()) - - rx_buf = rx_buf[match.end():] - else: - break + print(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}") except: - rx_buf = b"" + pass + + msg_type, body = parse_packet(payload) + + # 处理登录响应 + if not logged_in and msg_type == 1: + if body and body.get("cmd") == 1 and body.get("data") == "登录成功": + logged_in = True + last_heartbeat_ack_time = time.ticks_ms() + print("✅ 登录成功") + # 若存在 ota_pending.json,说明上次 OTA 已应用并重启; + # 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending,确保只上报一次。 + try: + pending_path = "/maixapp/apps/t11/ota_pending.json" + if os.path.exists(pending_path): + try: + with open(pending_path, "r", encoding="utf-8") as f: + pending_obj = json.load(f) + except: + pending_obj = {} + safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) + try: + os.remove(pending_path) + except: + pass + except Exception as e: + print(f"[OTA] ota_ok 上报失败: {e}") + else: + # 登录失败,跳出重连 break + # 处理心跳 ACK + elif logged_in and msg_type == 4: + last_heartbeat_ack_time = time.ticks_ms() + print("✅ 收到心跳确认") + + # 处理业务指令 + elif logged_in and isinstance(body, dict): + # 重要:每个包都要重新解析 inner_cmd,避免上一次的 cmd “粘住”导致反复执行 + inner_cmd = None + data_obj = body.get("data") + if isinstance(data_obj, dict): + inner_cmd = data_obj.get("cmd") + + if inner_cmd == 2: # 开启激光并校准 + # 幂等:正在校准则不重复触发(服务器可能重发 cmd=2) + if not laser_calibration_active: + turn_on_laser() + time.sleep_ms(100) + laser_calibration_active = True + safe_enqueue({"result": "calibrating"}, 2) + elif inner_cmd == 3: # 关闭激光 + distance_serial.write(LASER_OFF_CMD) + laser_calibration_active = False + safe_enqueue({"result": "laser_off"}, 2) + elif inner_cmd == 4: # 上报电量 + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} + safe_enqueue(battery_data, 2) + print(f"🔋 电量上报: {battery_percent}%") + elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置,及4g) + inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {} + + ssid = inner_data.get("ssid") + password = inner_data.get("password") + ota_url = inner_data.get("url") + mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/"" + + if not ota_url: + print("ota missing_url") + safe_enqueue({"result": "missing_url"}, 2) + continue + + # 自动判断:mode 非法/为空时,优先 Wi-Fi(如果已连),否则 4G + if mode not in ("4g", "wifi"): + print("ota missing mode") + mode = "wifi" if is_wifi_connected() else "4g" + + if update_thread_started: + safe_enqueue({"result": "update_already_started"}, 2) + continue + + update_thread_started = True + + if mode == "4g": + _thread.start_new_thread(direct_ota_download_via_4g, (ota_url,)) + else: + # wifi 模式:需要 ssid/password + if not ssid or not password: + update_thread_started = False + safe_enqueue({"result": "missing_ssid_or_password"}, 2) + else: + _thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url)) + elif inner_cmd == 6: + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = ip if ip else "no_ip" + except: + ip = "error_getting_ip" + safe_enqueue({"result": "current_ip", "ip": ip}, 2) + elif inner_cmd == 7: + # global update_thread_started + if update_thread_started: + safe_enqueue({"result": "update_already_started"}, 2) + continue + + # 实时检查是否有 IP + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + except: + ip = None + + if not ip: + safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS) + else: + # 启动纯下载线程 + update_thread_started = True + _thread.start_new_thread(direct_ota_download, ()) + else: + # 非指令包(或未携带 cmd),不做任何动作 + pass + else: + time.sleep_ms(5) + # 发送队列中的业务数据 if logged_in and (high_send_queue or normal_send_queue): # 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁 @@ -963,7 +1211,8 @@ def tcp_main(): # 定期发送心跳 current_time = time.ticks_ms() if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000: - if not tcp_send_raw(make_packet(4, {"t": int(time.time())})): + vol_val = get_bus_voltage() + if not tcp_send_raw(make_packet(4, {"vol":vol_val, "vol_per":voltage_to_percent(vol_val)})): print("💔 心跳发送失败") send_hartbeat_fail_count += 1 if send_hartbeat_fail_count >= 3: @@ -994,14 +1243,38 @@ def laser_calibration_worker(): global laser_calibration_active, laser_calibration_result, laser_calibration_lock while True: if laser_calibration_active: - result = calibrate_laser_position() - if result and len(result) == 2: - with laser_calibration_lock: - laser_calibration_result = result - laser_calibration_active = False - print(f"✅ 后台校准成功: {result}") - else: - time.sleep_ms(80) + # 关键:不要在每次尝试里反复 new Camera(会导致 MMF 反复初始化刷屏) + cam = None + try: + cam = camera.Camera(640, 480) + start = time.ticks_ms() + timeout_ms = 8000 # 8 秒内找不到红点就退出一次,避免一直占用资源 + while laser_calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms: + frame = cam.read() + pos = find_red_laser(frame) + if pos: + with laser_calibration_lock: + laser_calibration_result = pos + laser_calibration_active = False + save_laser_point(pos) + print(f"✅ 后台校准成功: {pos}") + break + time.sleep_ms(60) + except Exception as e: + # 出错时也不要死循环刷屏 + print(f"[LASER] calibration error: {e}") + time.sleep_ms(200) + finally: + try: + # 释放摄像头资源(MaixPy 通常靠 GC,但显式 del 更稳) + if cam is not None: + del cam + except: + pass + + # 如果超时仍未成功,稍微休息一下再允许下一次 cmd=2 触发 + if laser_calibration_active: + time.sleep_ms(300) else: time.sleep_ms(50) @@ -1029,116 +1302,66 @@ def download_file_via_4g(url, filename, if debug: print(*args) + def _merge_ranges(ranges_iter): + """合并重叠/相邻区间,返回 merged(list[(s,e)])(半开区间)""" + rs = sorted(ranges_iter) + merged = [] + for s, e in rs: + if e <= s: + continue + if merged and s <= merged[-1][1]: + merged[-1] = (merged[-1][0], max(merged[-1][1], e)) + else: + merged.append((s, e)) + return merged + + def _compute_gaps(total_len, got_ranges): + """根据已填充区间计算缺口(半开区间)""" + if not total_len or total_len <= 0: + return [(0, 0)] + merged = _merge_ranges(got_ranges) + gaps = [] + prev = 0 + for s, e in merged: + if s > prev: + gaps.append((prev, s)) + prev = max(prev, e) + if prev < total_len: + gaps.append((prev, total_len)) + return gaps, merged + + def _extract_content_range(hdr_text: str): + """ + Content-Range: bytes -/ + 返回 (start, end, total);解析失败返回 (None,None,None) + """ + m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE) + if not m: + return None, None, None + try: + return int(m.group(1)), int(m.group(2)), int(m.group(3)) + except: + return None, None, None + def _get_ip(): r = at("AT+CGPADDR=1", "OK", 3000) m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) return m.group(1) if m else "" - def _drain_uart(max_ms=300): - """清空串口残留,防止旧 URC 干扰""" - t0 = time.ticks_ms() - while time.ticks_ms() - t0 < max_ms: - d = uart4g.read(4096) - if not d: - time.sleep_ms(10) - continue + def _clear_http_events(): + # 清空旧的 HTTP URC 事件,避免串台 + while at_client.pop_http_event() is not None: + pass - def _read_more(buf: bytes, ms=50) -> bytes: - t0 = time.ticks_ms() - while time.ticks_ms() - t0 < ms: - d = uart4g.read(4096) - if d: - buf += d - time.sleep_ms(1) - return buf + # 旧版基于直接 uart4g.read 的解析已迁移到 ATClient(单读者),保留函数占位避免大改动 def _parse_httpid(raw: bytes): m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw) return int(m.group(1)) if m else None - def _try_parse_header(buf: bytes): - """ - +MHTTPURC: "header",,,, - 返回 (urc_id, status_code, header_text, rest_buf) 或 None - """ - tag = b'+MHTTPURC: "header",' - i = buf.find(tag) - if i < 0: - return None - if i > 0: - buf = buf[i:] - i = 0 + # _try_parse_header/_try_parse_one_content 已由 ATClient 在 reader 线程中解析并推送事件 - # header 在 hdr_text 里包含 \r\n;我们用 hdr_len 精确截取 - j = i + len(tag) - comma_count = 0 - k = j - while k < len(buf) and comma_count < 3: # 3 个逗号到 hdr_len 后的逗号 - if buf[k:k+1] == b",": - comma_count += 1 - k += 1 - if comma_count < 3: - return None - - header_prefix = buf[i:k] - m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', header_prefix) - if not m: - return ("drop", buf[1:]) - - urc_id = int(m.group(1)) - code = int(m.group(2)) - hdr_len = int(m.group(3)) - - text_start = k - text_end = text_start + hdr_len - if len(buf) < text_end: - return None - - hdr_text = buf[text_start:text_end].decode("utf-8", "ignore") - rest = buf[text_end:] - return ("ok", urc_id, code, hdr_text, rest) - - def _try_parse_one_content(buf: bytes): - """ - +MHTTPURC: "content",,,,, - 返回 ("ok", urc_id, total_len, sum_len, cur_len, payload_bytes, rest_buf) 或 None - """ - tag = b'+MHTTPURC: "content",' - i = buf.find(tag) - if i < 0: - return None - if i > 0: - buf = buf[i:] - i = 0 - - j = i + len(tag) - comma_count = 0 - k = j - while k < len(buf) and comma_count < 4: # payload 前只有 4 个逗号 - if buf[k:k+1] == b",": - comma_count += 1 - k += 1 - if comma_count < 4: - return None - - prefix = buf[i:k] - m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix) - if not m: - return ("drop", buf[1:]) - - urc_id = int(m.group(1)) - total_len = int(m.group(2)) - sum_len = int(m.group(3)) - cur_len = int(m.group(4)) - - payload_start = k - payload_end = payload_start + cur_len - if len(buf) < payload_end: - return None - - payload = buf[payload_start:payload_end] - rest = buf[payload_end:] - return ("ok", urc_id, total_len, sum_len, cur_len, payload, rest) + # _try_parse_one_content 已由 ATClient 解析 def _extract_hdr_fields(hdr_text: str): # Content-Length @@ -1158,7 +1381,10 @@ def download_file_via_4g(url, filename, # base64: 24 chars with == return binascii.b2a_base64(digest).decode().strip() - def _one_attempt(): + def _one_attempt(range_start=None, range_end=None, + body_buf=None, got_ranges=None, + total_len=None, + expect_md5_b64=None): # 0) PDP:确保有 IP(避免把 OK 当成功) ip = _get_ip() if not ip or ip == "0.0.0.0": @@ -1169,148 +1395,285 @@ def download_file_via_4g(url, filename, break time.sleep(1) if not ip or ip == "0.0.0.0": - return False, "PDP not ready (no_ip)" + return False, "PDP not ready (no_ip)", body_buf, got_ranges, total_len, expect_md5_b64 - # 1) 清理旧实例 + 清空串口残留 + # 1) 清理旧实例 + 清空旧 HTTP 事件 for i in range(0, 6): at(f"AT+MHTTPDEL={i}", "OK", 1500) - _drain_uart(300) + _clear_http_events() - # 2) 创建实例(httpid 可能延迟吐出来) - uart4g.write((f'AT+MHTTPCREATE="{base_url}"\r\n').encode()) - raw = b"" - raw = _read_more(raw, 300) - raw = _read_more(raw, 2000) - httpid = _parse_httpid(raw) + # 2) 创建实例(用 at() 等待返回) + create_resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) + httpid = _parse_httpid(create_resp.encode()) if httpid is None: - return False, "MHTTPCREATE failed (no httpid)" + return False, "MHTTPCREATE failed (no httpid)", body_buf, got_ranges, total_len, expect_md5_b64 - # 3) 发 GET:不用 at()(避免 at 吃掉 URC) - _drain_uart(100) - uart4g.write((f'AT+MHTTPREQUEST={httpid},1,0,"{path}"\r\n').encode()) + # 2.5) Range 补洞:按缺口请求指定字节段(HTTP Range 右端是 inclusive) + if range_start is not None and range_end is not None: + # 每次请求使用新 httpid,避免 header 累积/污染 + at(f'AT+MHTTPCFG="header",{httpid},"Range: bytes={int(range_start)}-{int(range_end)}"', "OK", 3000) - # 4) 收 URC:先拿 header(code/length/md5),再收 content 分片 - buf = b"" + # 3) 发 GET(HTTP URC 由 ATClient 解析并入队) + req_resp = at(f'AT+MHTTPREQUEST={httpid},1,0,"{path}"', "OK", 15000) + if "ERROR" in req_resp or "CME ERROR" in req_resp: + at(f"AT+MHTTPDEL={httpid}", "OK", 3000) + return False, f"MHTTPREQUEST failed: {req_resp}", body_buf, got_ranges, total_len, expect_md5_b64 + + # 4) 从 ATClient 的 http_events 队列收 header/content urc_id = None status_code = None - total_len = None expect_len = None - expect_md5_b64 = None - - body_buf = None - got_ranges = set() - filled_bytes = 0 + # 若是 Range 响应(206),需要把响应内的偏移映射到“全文件”偏移 + offset_base = 0 + # got_ranges 记录“真实写入 body_buf 的半开区间” + if got_ranges is None: + got_ranges = set() + filled_new_bytes = 0 + last_sum = 0 + no_progress_count = 0 # 连续没有进展的次数 + last_print_ms = time.ticks_ms() + last_print_sum = 0 t0 = time.ticks_ms() - last_sum = 0 - while time.ticks_ms() - t0 < total_timeout_ms: - buf = _read_more(buf, 50) - - # 4.1 尝试解析 header(可能先来) - while True: - ph = _try_parse_header(buf) - if ph is None: + ev = at_client.pop_http_event() + if not ev: + # 如果 sum 已经达到 total_len,但仍有 gaps,等待更长时间(有些分片可能延迟到达) + if total_len and last_sum >= total_len: + gaps_now, merged_now = _compute_gaps(total_len, got_ranges) + if gaps_now and not (len(gaps_now) == 1 and gaps_now[0] == (0, 0)): + time.sleep_ms(50) + else: + time.sleep_ms(5) + else: + time.sleep_ms(5) + no_progress_count += 1 + # 如果长时间没有新事件,且 sum 已经达到 total_len,认为接收完成(可能有丢包) + if no_progress_count > 100 and total_len and last_sum >= total_len: break - if ph[0] == "drop": - buf = ph[1] - continue - _, hid, code, hdr_text, rest = ph - buf = rest + continue + + no_progress_count = 0 # 有事件,重置计数器 + + if ev[0] == "header": + _, hid, code, hdr_text = ev if urc_id is None: urc_id = hid - status_code = code - expect_len, expect_md5_b64 = _extract_hdr_fields(hdr_text) - _log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}") - - # 4.2 解析尽可能多的 content - while True: - pc = _try_parse_one_content(buf) - if pc is None: - break - if pc[0] == "drop": - buf = pc[1] + if hid != urc_id: continue - _, cid, _total, _sum, _cur, payload, rest = pc - buf = rest + status_code = code + expect_len, md5_b64 = _extract_hdr_fields(hdr_text) + # 只在“首次全量 header”里保留 Content-Md5;Range 响应通常不带该字段 + if md5_b64: + expect_md5_b64 = md5_b64 + cr_s, cr_e, cr_total = _extract_content_range(hdr_text) + if cr_s is not None and cr_total is not None: + # 206 Partial Content + offset_base = cr_s + # Content-Range end 是 inclusive;总长度以 total 为准 + if total_len is None: + total_len = cr_total + elif total_len != cr_total: + _log(f"[WARN] total_len changed {total_len}->{cr_total}") + total_len = cr_total + if body_buf is None and total_len: + body_buf = bytearray(total_len) + + _log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}") + continue + + if ev[0] == "content": + _, cid, _total, _sum, _cur, payload = ev if urc_id is None: urc_id = cid if cid != urc_id: continue - if total_len is None: - total_len = _total - body_buf = bytearray(total_len) - - # 定位写入 - start = _sum - _cur - end = _sum - if start < 0 or end > total_len: + # 全量 200:这里的 _total 就是全文件长度;Range 206:_total 可能只是“本次响应体长度” + if body_buf is None: + # 如果 header 没解析出 Content-Range,总长度用 content 的 _total + if total_len is None: + total_len = _total + if total_len: + body_buf = bytearray(total_len) + if body_buf is None or total_len is None: continue - key = (start, end) - if key not in got_ranges: - got_ranges.add(key) - body_buf[start:end] = payload - filled_bytes += (end - start) + rel_start = _sum - _cur + rel_end = _sum + abs_start = offset_base + rel_start + abs_end = offset_base + rel_end + if abs_start < 0 or abs_start >= total_len: + continue + if abs_end < abs_start: + continue + if abs_end > total_len: + abs_end = total_len + expected_span = abs_end - abs_start + actual_len = min(len(payload), expected_span) + if actual_len <= 0: + continue + + # 写入并记录“实际写入区间”,用于 gap 计算 + body_buf[abs_start:abs_start + actual_len] = payload[:actual_len] + got_ranges.add((abs_start, abs_start + actual_len)) + filled_new_bytes += actual_len + + # 记录最大的 sum 值,用于判断是否所有数据都已发送 if _sum > last_sum: last_sum = _sum + # debug 输出节流:每 ~8000 字节或 >=500ms 输出一次,避免 print 导致 UART 丢包 if debug: - _log(f"[URC] {start}:{end} sum={_sum}/{total_len} filled={filled_bytes}") + now = time.ticks_ms() + if (time.ticks_diff(now, last_print_ms) >= 500) or (_sum - last_print_sum >= 8000) or (rel_end == _total): + _log(f"[URC] {abs_start}:{abs_start+actual_len} sum={_sum}/{_total} base={offset_base} +{filled_new_bytes}") + last_print_ms = now + last_print_sum = _sum - # 完整条件:填满 total_len - if filled_bytes == total_len: - break - - if total_len is not None and filled_bytes == total_len: - break + # 若是全量请求(offset_base=0 且 total_len==_total),尽早结束 + if offset_base == 0 and total_len == _total: + # 不要用 filled_new_bytes 判断是否完整(可能有重叠) + pass # 5) 清理实例 at(f"AT+MHTTPDEL={httpid}", "OK", 3000) if body_buf is None: - return False, "empty_body" + return False, "empty_body", body_buf, got_ranges, total_len, expect_md5_b64 if total_len is None: - return False, "no_total_len" - if filled_bytes != total_len: - return False, f"incomplete_body got={filled_bytes} expected={total_len}" + return False, "no_total_len", body_buf, got_ranges, total_len, expect_md5_b64 - data = bytes(body_buf) - - # 6) 校验:Content-Length - if expect_len is not None and len(data) != expect_len: - return False, f"length_mismatch got={len(data)} expected={expect_len}" - - # 7) 校验:Content-Md5(base64) - if expect_md5_b64 and hashlib is not None: - md5_b64 = _md5_base64(data) - if md5_b64 != expect_md5_b64: - return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}" - - # 8) 写文件(原样 bytes) - with open(filename, "wb") as f: - f.write(data) - - return True, f"OK size={len(data)} ip={ip} code={status_code}" + # 返回“本次尝试是否有实质进展”:Range 补洞时,哪怕不完整也算成功推进 + if filled_new_bytes <= 0: + return False, "no_progress", body_buf, got_ranges, total_len, expect_md5_b64 + return True, f"PARTIAL ok +{filled_new_bytes} ip={ip} code={status_code}", body_buf, got_ranges, total_len, expect_md5_b64 global ota_in_progress ota_in_progress = True with uart4g_lock: try: + # -------- Phase 1: 全量 GET(允许不完整,后面用 Range 补洞)-------- + body_buf = None + got_ranges = set() + total_len = None + expect_md5_b64 = None + last_err = "unknown" for attempt in range(1, retries + 1): - ok, msg = _one_attempt() - if ok: - return True, msg + ok, msg, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt( + body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64 + ) last_err = msg - # 重试前等待 + 清 UART - _log(f"[RETRY] attempt={attempt} failed={msg}") - _drain_uart(500) - time.sleep_ms(300) - return False, f"FAILED after {retries} retries: {last_err}" + if not ok: + _log(f"[RETRY] full attempt={attempt} failed={msg}") + time.sleep_ms(200) + continue + + gaps, merged = _compute_gaps(total_len, got_ranges) + filled_total = sum(e - s for s, e in merged) + if gaps and gaps[0] == (0, 0): + gaps = [] + if not gaps: + break + _log(f"[GAPS] after full attempt={attempt} filled={filled_total}/{total_len} gaps={gaps[:3]}") + time.sleep_ms(150) + + if body_buf is None or total_len is None: + return False, f"FAILED: {last_err}" + + gaps, merged = _compute_gaps(total_len, got_ranges) + if gaps and gaps[0] == (0, 0): + gaps = [] + + # -------- Phase 2: Range 补洞 -------- + # 限制单次 Range 的最大长度(太大也会触发同样的 UART 压力) + MAX_RANGE_BYTES = 8192 + RANGE_RETRIES_EACH = 2 + MAX_HOLE_ROUNDS = 10 + + round_i = 0 + while gaps and round_i < MAX_HOLE_ROUNDS: + round_i += 1 + # 优先补最大的洞(通常只丢中间一两段) + gaps = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True) + _log(f"[RANGE] round={round_i} gaps={gaps[:3]}") + + progress_any = False + # 每轮最多补前 5 个洞,避免无限循环 + for (gs, ge) in gaps[:5]: + cur = gs + while cur < ge: + sub_end = min(ge, cur + MAX_RANGE_BYTES) + # HTTP Range end is inclusive + rs = cur + re_incl = sub_end - 1 + + before_gaps, before_merged = _compute_gaps(total_len, got_ranges) + before_filled = sum(e - s for s, e in before_merged) + + sub_ok = False + sub_err = "unknown" + for k in range(1, RANGE_RETRIES_EACH + 1): + ok2, msg2, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt( + range_start=rs, range_end=re_incl, + body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64 + ) + sub_err = msg2 + if ok2: + sub_ok = True + break + _log(f"[RETRY] range {rs}-{re_incl} try={k} failed={msg2}") + time.sleep_ms(120) + + after_gaps, after_merged = _compute_gaps(total_len, got_ranges) + after_filled = sum(e - s for s, e in after_merged) + if after_filled > before_filled: + progress_any = True + + if not sub_ok: + _log(f"[WARN] range {rs}-{re_incl} failed={sub_err}") + + # 小歇一下,给读线程喘息 + time.sleep_ms(80) + cur = sub_end + + gaps, merged = _compute_gaps(total_len, got_ranges) + if gaps and gaps[0] == (0, 0): + gaps = [] + + filled_total = sum(e - s for s, e in merged) + if not gaps: + break + if not progress_any: + # 本轮没有推进,退出避免死循环 + _log(f"[RANGE] no progress in round={round_i}, stop. filled={filled_total}/{total_len}") + break + _log(f"[RANGE] round={round_i} filled={filled_total}/{total_len} gaps={gaps[:3]}") + + # 完整性检查 + gaps, merged = _compute_gaps(total_len, got_ranges) + if gaps and gaps[0] == (0, 0): + gaps = [] + filled_total = sum(e - s for s, e in merged) + if gaps: + return False, f"incomplete_body got={filled_total} expected={total_len} missing={total_len - filled_total} gaps={gaps[:5]}" + + data = bytes(body_buf) + + # 校验:Content-Md5(base64)(若有) + if expect_md5_b64 and hashlib is not None: + md5_b64 = _md5_base64(data) + if md5_b64 != expect_md5_b64: + return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}" + + # 写文件(原样 bytes) + with open(filename, "wb") as f: + f.write(data) + + return True, f"OK size={len(data)} ip={_get_ip()} md5={expect_md5_b64 or ''}" finally: ota_in_progress = False @@ -1324,7 +1687,7 @@ def direct_ota_download_via_4g(ota_url): return print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") - success, msg = download_file_via_4g(ota_url, local_filename) + success, msg = download_file_via_4g(ota_url, local_filename, debug=True) print(f"[OTA-4G] {msg}") if success and "OK" in msg: @@ -1346,7 +1709,7 @@ def direct_ota_download_via_4g(ota_url): def cmd_str(): global DEVICE_ID, PASSWORD - # print("env: ", config.get_env()) + DEVICE_ID = read_device_id() PASSWORD = DEVICE_ID + "." @@ -1365,6 +1728,7 @@ def cmd_str(): cam = camera.Camera(640, 480) # 启动通信与校准线程 + # from tcp_handler import tcp_main _thread.start_new_thread(tcp_main, ()) _thread.start_new_thread(laser_calibration_worker, ()) @@ -1376,7 +1740,10 @@ def cmd_str(): while not app.need_exit(): current_time = time.ticks_ms() # print("压力传感器数值: ", adc_obj.read()) - if adc_obj.read() > ADC_TRIGGER_THRESHOLD: + adc_val = adc_obj.read() + # if adc_val > 2400: + # print(f"adc: {adc_val}") + if adc_val > ADC_TRIGGER_THRESHOLD: diff_ms = current_time-last_adc_trigger if diff_ms<3000: continue @@ -1416,13 +1783,18 @@ def cmd_str(): "y": float(dy) if dy is not None else 200.0, "r": 90.0, "d": round((distance_m or 0.0) * 100), # 距离(厘米) - "m": method + "m": method, + "adc": adc_val } report_data = {"cmd": 1, "data": inner_data} # 射箭事件高优先级入队,由 tcp_main 统一发送 safe_enqueue(report_data, msg_type=2, high=True) print("📤 射箭事件已加入发送队列") + # 闪一下激光(射箭反馈) + # TODO: remove after test done + flash_laser(1000) # 闪300ms,可以根据需要调整时长 + time.sleep_ms(100) else: disp.show(cam.read()) @@ -1451,13 +1823,14 @@ def dump_system_info(): if __name__ == "__main__": # dump_system_info() - try: - import threading - print("threading module:", threading) - print("has Lock:", hasattr(threading, "Lock")) - if hasattr(threading, "Lock"): - print("has lock") - finally: - pass - + # try: + # import threading + # print("threading module:", threading) + # print("has Lock:", hasattr(threading, "Lock")) + # if hasattr(threading, "Lock"): + # print("has lock") + # finally: + # pass + import config + print("env: ", config.get_env()) cmd_str() \ No newline at end of file