diff --git a/main.py b/main.py index 729b974..3480388 100644 --- a/main.py +++ b/main.py @@ -156,13 +156,33 @@ class ATClient: except: return str(self._resp) + def _find_urc_tag(self, tag: bytes): + """ + 只在“真正的 URC 边界”查找 tag,避免误命中 HTTP payload 内容。 + 规则:tag 必须出现在 buffer 开头,或紧跟在 b"\\r\\n" 后面。 + """ + try: + i = 0 + rx = self._rx + while True: + j = rx.find(tag, i) + if j < 0: + return -1 + if j == 0: + return 0 + if j >= 2 and rx[j - 2:j] == b"\r\n": + return j + i = j + 1 + except: + return -1 + def _parse_mipurc_rtcp(self): """ 解析:+MIPURC: "rtcp",,, 之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。 """ prefix = b'+MIPURC: "rtcp",' - i = self._rx.find(prefix) + i = self._find_urc_tag(prefix) if i < 0: return False # 丢掉前置噪声 @@ -215,7 +235,7 @@ class ATClient: def _parse_mhttpurc_header(self): tag = b'+MHTTPURC: "header",' - i = self._rx.find(tag) + i = self._find_urc_tag(tag) if i < 0: return False if i > 0: @@ -254,7 +274,7 @@ class ATClient: def _parse_mhttpurc_content(self): tag = b'+MHTTPURC: "content",' - i = self._rx.find(tag) + i = self._find_urc_tag(tag) if i < 0: return False if i > 0: @@ -294,19 +314,26 @@ class ATClient: def _reader_loop(self): while self._running: - d = self.uart.read(4096) + # 关键:UART 驱动偶发 read failed,必须兜住,否则线程挂了 OTA/TCP 都会卡死 + try: + d = self.uart.read(4096) # 8192 在一些驱动上更容易触发 read failed + except Exception as e: + try: + print("[ATClient] uart read failed:", e) + except: + pass + time.sleep_ms(50) + continue + if not d: - time.sleep_ms(2) + time.sleep_ms(1) continue with self._q_lock: - # 统一累积到内部 buffer(用于 URC 解析) self._rx += d - # 命令等待期间,把原始字节流复制到响应缓冲(不影响 URC 解析) if self._waiting: self._resp += d - # 解析 URC:尽可能多地从 _rx 中剥离完整 URC,避免丢包 while True: progressed = ( self._parse_mipurc_rtcp() @@ -316,17 +343,16 @@ class ATClient: if not progressed: break - # 防止 _rx 因为"非 URC 文本/回显"无限增长:保留尾部即可 - # 关键修复:如果 buffer 中有 HTTP URC 标签(说明 OTA 在进行),完全禁用截断 - # 避免在 OTA 下载时截断 buffer 导致数据丢失(之前 16KB 限制太小,导致数据被截断) - has_http_urc = (b'+MHTTPURC: "content"' in self._rx or - b'+MHTTPURC: "header"' in self._rx) - if has_http_urc: - # OTA 下载中:完全禁用 buffer 截断,避免数据丢失 - # 通常 OTA 文件不会超过几百 KB,即使不截断也不会导致内存问题 - pass # 不截断 + try: + ota_flag = int(globals().get("ota_in_progress", 0)) > 0 + except: + ota_flag = False + + has_http_hint = (b"+MHTTP" in self._rx) or (b"+MHTTPURC" in self._rx) + if ota_flag or has_http_hint: + if len(self._rx) > 512 * 1024: + self._rx = self._rx[-256 * 1024:] else: - # 非 OTA 状态:使用较小的 buffer 限制(16KB) if len(self._rx) > 16384: self._rx = self._rx[-4096:] @@ -435,7 +461,9 @@ normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等 queue_lock = _Mutex() # 互斥锁,保护队列 uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并发) update_thread_started = False # 防止 OTA 更新线程重复启动 -ota_in_progress = False # OTA(4G HTTP URC) 期间暂停 tcp_main 读取 uart4g,避免吞掉 +MHTTPURC +# OTA(4G HTTP URC) 期间“暂停 TCP 活动/让读线程用大 buffer”: +# 用计数器而不是 bool,避免“外层 OTA 还没结束,内层 downloader finally 又把它置 False”。 +ota_in_progress = 0 # ==================== 工具函数 ==================== @@ -457,6 +485,121 @@ def download_file(url, filename): return f"下载失败!发生未知错误: {e}" +def _download_file_system_bytes(url, filename, timeout_s=25): + """ + 走“系统网络栈”的下载(RNDIS/ECM/eth0 等),用 TCP 自带重传,适合 OTA 大文件。 + 注意:这里写 bytes,不做编码假设。 + """ + try: + print(f"[NET] system download: {url} -> {filename}") + r = requests.get(url, timeout=timeout_s) + r.raise_for_status() + data = r.content + with open(filename, "wb") as f: + f.write(data) + return True, f"OK size={len(data)}" + except Exception as e: + return False, f"system_download_failed: {e}" + + +def _has_default_route(): + try: + out = os.popen("ip route 2>/dev/null").read() + for line in out.splitlines(): + if line.strip().startswith("default "): + return True + return False + except: + return False + + +def _get_if_ipv4(ifname: str): + try: + out = os.popen(f"ip -4 addr show {ifname} 2>/dev/null").read() + m = re.search(r"\binet\s+(\d+\.\d+\.\d+\.\d+)/(\d+)", out) + if not m: + return None + return m.group(1) + except: + return None + + +def _ping_once(ip: str, ifname=None, timeout_s=1): + # busybox ping 可能不支持 -W;做两套尝试 + if ifname: + cmd1 = f"ping -I {ifname} -c 1 -W {timeout_s} {ip} >/dev/null 2>&1" + cmd2 = f"ping -I {ifname} -c 1 {ip} >/dev/null 2>&1" + else: + cmd1 = f"ping -c 1 -W {timeout_s} {ip} >/dev/null 2>&1" + cmd2 = f"ping -c 1 {ip} >/dev/null 2>&1" + rc = os.system(cmd1) + if rc == 0: + return True + rc = os.system(cmd2) + return rc == 0 + + +def ensure_ml307r_dialup_and_route(prefer_if=("usb0", "usb1"), metric=200, debug=True): + """ + 用 ML307R 的 RNDIS/ECM 方式把“系统网络”拉起来: + - AT+MDIALUP=1,1(拨号) + - 为 usb0/usb1 添加 default route(不改 IP,尽量不影响现有管理网段) + 目标:让 requests.get() 能直接下载 OTA,避免 UART URC 丢包。 + """ + def _dlog(*a): + if debug: + print("[DIALUP]", *a) + + # 1) 拨号(重复执行也安全) + try: + with uart4g_lock: + at('AT+MDIALUPCFG="mode"', "OK", 2000) + r = at("AT+MDIALUP=1,1", "OK", 20000) + _dlog("MDIALUP resp:", r) + except Exception as e: + _dlog("MDIALUP exception:", e) + + # 2) 如果已经有 default route,直接返回 + if _has_default_route(): + _dlog("default route already exists") + return True + + # 3) 尝试在 usb0/usb1 上猜测网关并加默认路由 + for ifname in prefer_if: + ip = _get_if_ipv4(ifname) + if not ip: + continue + + parts = ip.split(".") + if len(parts) != 4: + continue + base = ".".join(parts[:3]) + last = int(parts[3]) + + # 常见网关候选:.2 / .254 / .1(跳过自己) + candidates = [] + if last != 2: + candidates.append(f"{base}.2") + if last != 254: + candidates.append(f"{base}.254") + if last != 1: + candidates.append(f"{base}.1") + + for gw in candidates: + if gw == ip: + continue + # ping 一下让 ARP/neigh 建立(不一定通,但很多系统会因此学到邻居) + _ping_once(gw, ifname=ifname, timeout_s=1) + # 直接尝试加默认路由(若已存在会失败但无害) + os.system(f"ip route add default via {gw} dev {ifname} metric {metric} 2>/dev/null") + if _has_default_route(): + _dlog("default route set:", gw, "dev", ifname) + _dlog(os.popen("ip route 2>/dev/null").read().strip()) + return True + + _dlog("failed to set default route. ip route:", os.popen("ip route 2>/dev/null").read().strip()) + return False + def is_server_reachable(host, port=80, timeout=5): """检查目标主机端口是否可达(用于 OTA 前网络检测)""" try: @@ -1023,6 +1166,14 @@ def tcp_main(): global tcp_connected, high_send_queue, normal_send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock, update_thread_started send_hartbeat_fail_count = 0 while not app.need_exit(): + # OTA 期间不要 connect/登录/心跳/发送,避免与 HTTP URC 下载抢 uart4g_lock 导致心跳超时被服务器断开 + try: + if ota_in_progress: + time.sleep_ms(200) + continue + except: + pass + if not connect_server(): time.sleep_ms(5000) continue @@ -1039,6 +1190,14 @@ def tcp_main(): last_heartbeat_ack_time = time.ticks_ms() last_heartbeat_send_time = time.ticks_ms() while True: + # OTA 期间暂停 TCP 活动(不发心跳、不发业务),让下载独占 4G 串口 + try: + if ota_in_progress: + time.sleep_ms(200) + continue + except: + pass + # 接收数据(唯一来源:ATClient 解析后的 TCP payload 队列) item = at_client.pop_tcp_payload() if item: @@ -1257,6 +1416,14 @@ def laser_calibration_worker(): """后台线程:持续检测是否需要执行激光校准""" global laser_calibration_active, laser_calibration_result, laser_calibration_lock while True: + # OTA 期间尽量省电:暂停后台校准(会占用 Camera) + try: + if int(globals().get("ota_in_progress", 0)) > 0: + time.sleep_ms(200) + continue + except: + pass + if laser_calibration_active: # 关键:不要在每次尝试里反复 new Camera(会导致 MMF 反复初始化刷屏) cam = None @@ -1296,7 +1463,7 @@ def laser_calibration_worker(): -def download_file_via_4g(url, filename, +def download_file_via_4g_legacy(url, filename, total_timeout_ms=30000, retries=3, debug=False): @@ -1444,17 +1611,34 @@ def download_file_via_4g(url, filename, if got_ranges is None: got_ranges = set() filled_new_bytes = 0 + # last_sum/resp_total 用于判断“本次 HTTP 响应体”是否接收完成(尤其是 Range 场景) last_sum = 0 + resp_total = None no_progress_count = 0 # 连续没有进展的次数 last_print_ms = time.ticks_ms() last_print_sum = 0 + # Range 补洞不需要等太久,避免卡死;全量下载用总超时 + attempt_timeout_ms = total_timeout_ms + if range_start is not None and range_end is not None: + attempt_timeout_ms = min(total_timeout_ms, 8000) + t0 = time.ticks_ms() - while time.ticks_ms() - t0 < total_timeout_ms: + while time.ticks_ms() - t0 < attempt_timeout_ms: ev = at_client.pop_http_event() if not ev: # 如果 sum 已经达到 total_len,但仍有 gaps,等待更长时间(有些分片可能延迟到达) - if total_len and last_sum >= total_len: + # 对 Range:last_sum 只会到 resp_total(比如 686/774),不能拿 total_len(59776) 比 + if resp_total and last_sum >= resp_total: + # 本次响应体应该收齐了,继续等一小会儿(防止最后一个 URC 延迟),然后退出循环 + time.sleep_ms(30) + no_progress_count += 1 + if no_progress_count > 30: + break + continue + + # 全量模式:如果模块宣称 sum 已经达到 total_len,但仍有 gaps,稍微多等 + if (range_start is None and range_end is None) and total_len and last_sum >= total_len: gaps_now, merged_now = _compute_gaps(total_len, got_ranges) if gaps_now and not (len(gaps_now) == 1 and gaps_now[0] == (0, 0)): time.sleep_ms(50) @@ -1463,7 +1647,10 @@ def download_file_via_4g(url, filename, else: time.sleep_ms(5) no_progress_count += 1 - # 如果长时间没有新事件,且 sum 已经达到 total_len,认为接收完成(可能有丢包) + # Range:如果长时间没有事件也结束(让上层重试) + if range_start is not None and range_end is not None and no_progress_count > 200: + break + # 全量:如果长时间没有新事件,且 sum 已经达到 total_len,认为接收完成(可能有丢包) if no_progress_count > 100 and total_len and last_sum >= total_len: break continue @@ -1495,6 +1682,10 @@ def download_file_via_4g(url, filename, if body_buf is None and total_len: body_buf = bytearray(total_len) + # 对 Range 响应:优先使用 Content-Length 作为本次响应体长度 + if expect_len is not None: + resp_total = expect_len + _log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}") continue @@ -1515,6 +1706,10 @@ def download_file_via_4g(url, filename, if body_buf is None or total_len is None: continue + # 若 header 没给 Content-Length,就用 content 的 _total 作为本次响应体长度(Range 场景下通常是这次 body 的长度) + if resp_total is None: + resp_total = _total + rel_start = _sum - _cur rel_end = _sum abs_start = offset_base + rel_start @@ -1553,6 +1748,12 @@ def download_file_via_4g(url, filename, # 不要用 filled_new_bytes 判断是否完整(可能有重叠) pass + # Range:本次响应体已收齐,退出,交给上层判断是否补上了缺口 + if resp_total is not None and last_sum >= resp_total: + # 给一点时间让可能的尾部事件入队,然后退出 + time.sleep_ms(10) + break + # 5) 清理实例 at(f"AT+MHTTPDEL={httpid}", "OK", 3000) @@ -1567,7 +1768,10 @@ def download_file_via_4g(url, filename, return True, f"PARTIAL ok +{filled_new_bytes} ip={ip} code={status_code}", body_buf, got_ranges, total_len, expect_md5_b64 global ota_in_progress - ota_in_progress = True + try: + ota_in_progress = int(ota_in_progress) + 1 + except: + ota_in_progress = 1 with uart4g_lock: try: # -------- Phase 1: 全量 GET(允许不完整,后面用 Range 补洞)-------- @@ -1604,12 +1808,21 @@ def download_file_via_4g(url, filename, gaps = [] # -------- Phase 2: Range 补洞 -------- - # 限制单次 Range 的最大长度(太大也会触发同样的 UART 压力) - MAX_RANGE_BYTES = 8192 - RANGE_RETRIES_EACH = 2 - MAX_HOLE_ROUNDS = 10 + # 说明: + # - “全量 GET 多次重试 + 合并已收到分片”我们已经在 Phase1 做了(got_ranges/body_buf 会跨 attempt 累积)。 + # - 仍存在 gaps 说明:这些字节段在全量阶段始终没收到,需要靠 Range 反复补洞。 + # + # 策略: + # - Range 分块更小(更稳),失败时继续“二分缩小”到 MIN_RANGE_BYTES; + # - 不要因为某一轮 no_progress 就立刻退出(UART 偶发丢 URC,需要多轮撞上一次成功)。 + MAX_RANGE_BYTES = 1024 + MIN_RANGE_BYTES = 128 + RANGE_RETRIES_EACH = 8 + MAX_HOLE_ROUNDS = 50 + NO_PROGRESS_ROUNDS_LIMIT = 8 round_i = 0 + no_progress_rounds = 0 while gaps and round_i < MAX_HOLE_ROUNDS: round_i += 1 # 优先补最大的洞(通常只丢中间一两段) @@ -1620,8 +1833,9 @@ def download_file_via_4g(url, filename, # 每轮最多补前 5 个洞,避免无限循环 for (gs, ge) in gaps[:5]: cur = gs + chunk = MAX_RANGE_BYTES while cur < ge: - sub_end = min(ge, cur + MAX_RANGE_BYTES) + sub_end = min(ge, cur + chunk) # HTTP Range end is inclusive rs = cur re_incl = sub_end - 1 @@ -1641,19 +1855,28 @@ def download_file_via_4g(url, filename, sub_ok = True break _log(f"[RETRY] range {rs}-{re_incl} try={k} failed={msg2}") - time.sleep_ms(120) + time.sleep_ms(150) after_gaps, after_merged = _compute_gaps(total_len, got_ranges) after_filled = sum(e - s for s, e in after_merged) if after_filled > before_filled: progress_any = True - - if not sub_ok: - _log(f"[WARN] range {rs}-{re_incl} failed={sub_err}") + # 成功推进:恢复到较大 chunk,加快补洞 + chunk = MAX_RANGE_BYTES + cur = sub_end + else: + # 没推进:缩小 chunk,继续在同一位置重试;不要前进 cur + if chunk > MIN_RANGE_BYTES: + chunk = max(MIN_RANGE_BYTES, chunk // 2) + _log(f"[RANGE] shrink chunk -> {chunk} at pos={cur}") + else: + # 已经很小还不行:本轮先放弃这个位置,留给下一轮再撞 + if not sub_ok: + _log(f"[WARN] range {rs}-{re_incl} failed={sub_err}") + break # 小歇一下,给读线程喘息 - time.sleep_ms(80) - cur = sub_end + time.sleep_ms(120) gaps, merged = _compute_gaps(total_len, got_ranges) if gaps and gaps[0] == (0, 0): @@ -1663,9 +1886,16 @@ def download_file_via_4g(url, filename, if not gaps: break if not progress_any: - # 本轮没有推进,退出避免死循环 - _log(f"[RANGE] no progress in round={round_i}, stop. filled={filled_total}/{total_len}") - break + no_progress_rounds += 1 + _log(f"[RANGE] no progress in round={round_i} ({no_progress_rounds}/{NO_PROGRESS_ROUNDS_LIMIT}) filled={filled_total}/{total_len}") + # 多轮无进展才退出(避免偶发“只 header 无 content URC”导致过早退出) + if no_progress_rounds >= NO_PROGRESS_ROUNDS_LIMIT: + break + # 退避等待一下再继续下一轮 + time.sleep_ms(500) + continue + else: + no_progress_rounds = 0 _log(f"[RANGE] round={round_i} filled={filled_total}/{total_len} gaps={gaps[:3]}") # 完整性检查 @@ -1690,34 +1920,493 @@ def download_file_via_4g(url, filename, return True, f"OK size={len(data)} ip={_get_ip()} md5={expect_md5_b64 or ''}" finally: - ota_in_progress = False + try: + ota_in_progress = max(0, int(ota_in_progress) - 1) + except: + ota_in_progress = 0 + + +def download_file_via_4g(url, filename, + total_timeout_ms=600000, + retries=3, + debug=True): + """ + ML307R HTTP 下载(更稳的“固定小块 Range 顺序下载”): + - 只依赖 +MHTTPURC:"header"/"content"(不依赖 MHTTPREAD/cached) + - 每次只请求一个小块 Range(默认 1024B),失败就重试同一块,必要时缩小块大小 + - 每个 chunk 都重新 MHTTPCREATE/MHTTPREQUEST,避免卡在“206 header 但不吐 content”的坏状态 + """ + from urllib.parse import urlparse + + # 小块策略(可按现场再调) + # - chunk 越小越稳(URC 压力更小),代价是请求次数更多 + CHUNK_MAX = 10240 + CHUNK_MIN = 128 + CHUNK_RETRIES = 12 + FRAG_SIZE = 1024 # 0-1024 + FRAG_DELAY = 10 # 0-2000 ms + + t_func0 = time.ticks_ms() + + parsed = urlparse(url) + host = parsed.hostname + path = parsed.path or "/" + if not host: + return False, "bad_url (no host)" + + # 很多 ML307R 的 MHTTP 对 https 不稳定;对已知域名做降级 + if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"): + base_url = "http://static.shelingxingqiu.com" + else: + base_url = f"http://{host}" + + def _log(*a): + if debug: + print(*a) + + def _pwr_log(prefix=""): + """debug 用:输出电压/电量,用于判断是否掉压导致 4G/USB 异常""" + if not debug: + return + try: + v = get_bus_voltage() + p = voltage_to_percent(v) + print(f"[PWR]{prefix} v={v:.3f}V p={p}%") + except Exception as e: + try: + print(f"[PWR]{prefix} read_failed: {e}") + except: + pass + + def _clear_http_events(): + while at_client.pop_http_event() is not None: + pass + + def _parse_httpid(resp: str): + m = re.search(r"\+MHTTPCREATE:\s*(\d+)", resp) + return int(m.group(1)) if m else None + + def _get_ip(): + r = at("AT+CGPADDR=1", "OK", 3000) + m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) + return m.group(1) if m else "" + + def _ensure_pdp(): + ip = _get_ip() + if ip and ip != "0.0.0.0": + return True, ip + at("AT+MIPCALL=1,1", "OK", 15000) + for _ in range(10): + ip = _get_ip() + if ip and ip != "0.0.0.0": + return True, ip + time.sleep(1) + return False, ip + + def _extract_hdr_fields(hdr_text: str): + mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE) + clen = int(mlen.group(1)) if mlen else None + mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE) + md5_b64 = mmd5.group(1).strip() if mmd5 else None + return clen, md5_b64 + + def _extract_content_range(hdr_text: str): + m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE) + if not m: + return None, None, None + try: + return int(m.group(1)), int(m.group(2)), int(m.group(3)) + except: + return None, None, None + + def _hard_reset_http(): + """ + 模块进入“坏状态”时的保守清场: + - 清空 ATClient 的事件队列,避免串台 + - 删除 0..5 的 httpid(常见固件槽位范围),尽量把内部 HTTP 状态机拉回干净 + 注意:很慢,所以只在连续异常时调用。 + """ + _clear_http_events() + for i in range(0, 6): + try: + at(f"AT+MHTTPDEL={i}", "OK", 1200) + except: + pass + _clear_http_events() + + def _create_httpid(full_reset=False): + _clear_http_events() + if full_reset: + _hard_reset_http() + resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) + hid = _parse_httpid(resp) + return hid, resp + + def _fetch_range_into_buf(start, want_len, out_buf, full_reset=False): + """ + 请求 Range [start, start+want_len) ,写入 out_buf(bytearray,长度=want_len) + 返回 (ok, msg, total_len, md5_b64, got_len) + """ + end_incl = start + want_len - 1 + hid, cresp = _create_httpid(full_reset=full_reset) + if hid is None: + return False, f"MHTTPCREATE failed: {cresp}", None, None, 0 + + # 降低 URC 压力(分片/延迟) + at(f'AT+MHTTPCFG="fragment",{hid},{FRAG_SIZE},{FRAG_DELAY}', "OK", 1500) + # 设置 Range header(inclusive) + at(f'AT+MHTTPCFG="header",{hid},"Range: bytes={start}-{end_incl}"', "OK", 3000) + + req = at(f'AT+MHTTPREQUEST={hid},1,0,"{path}"', "OK", 15000) + if "ERROR" in req or "CME ERROR" in req: + at(f"AT+MHTTPDEL={hid}", "OK", 2000) + return False, f"MHTTPREQUEST failed: {req}", None, None, 0 + + # 等 header + content + # 注意:部分 ML307R 固件会把 header 分成多条 +MHTTPURC:"header" 分片吐出来, + # 其中有的分片只有 Content-Length,有的只有 Content-Range。 + # 因此这里需要做“累积解析”,否则会出现 resp_total=None -> no_header_or_total。 + hdr_text = None + hdr_accum = "" + code = None + resp_total = None + total_len = None + md5_b64 = None + + got_ranges = set() + last_sum = 0 + t0 = time.ticks_ms() + # Range 场景不宜等待太久,卡住就换 hid 重来 + timeout_ms = 9000 + logged_hdr = False + + while time.ticks_ms() - t0 < timeout_ms: + ev = at_client.pop_http_event() + if not ev: + time.sleep_ms(5) + continue + + if ev[0] == "header": + _, ehid, ecode, ehdr = ev + if ehid != hid: + continue + code = ecode + hdr_text = ehdr + # 累积 header 文本并从累积内容里提取字段(避免 split header 丢字段) + if ehdr: + hdr_accum = (hdr_accum + "\n" + ehdr) if hdr_accum else ehdr + + resp_total_tmp, md5_tmp = _extract_hdr_fields(hdr_accum) + if md5_tmp: + md5_b64 = md5_tmp + cr_s, cr_e, cr_total = _extract_content_range(hdr_accum) + if cr_total is not None: + total_len = cr_total + # 有些 header 没有 Content-Length,但有 Content-Range(206),可由 range 计算出本次 body 长度 + if resp_total_tmp is not None: + resp_total = resp_total_tmp + elif resp_total is None and (cr_s is not None) and (cr_e is not None) and (cr_e >= cr_s): + resp_total = (cr_e - cr_s + 1) + # 206 才是 Range 正常响应;部分服务器可能忽略 Range 返回 200 + # 节流:每个 hid 只打一次 header(否则你会看到连续 3-4 条 [HDR],且很多 cr=None) + if (not logged_hdr) and (resp_total is not None or total_len is not None): + _log(f"[HDR] id={hid} code={code} clen={resp_total} cr={cr_s}-{cr_e}/{cr_total}") + logged_hdr = True + continue + + if ev[0] == "content": + _, ehid, _total, _sum, _cur, payload = ev + if ehid != hid: + continue + if resp_total is None: + resp_total = _total + if resp_total is None or resp_total <= 0: + continue + start_rel = _sum - _cur + end_rel = _sum + if start_rel < 0 or start_rel >= resp_total: + continue + if end_rel > resp_total: + end_rel = resp_total + actual_len = min(len(payload), end_rel - start_rel) + if actual_len <= 0: + continue + out_buf[start_rel:start_rel + actual_len] = payload[:actual_len] + got_ranges.add((start_rel, start_rel + actual_len)) + if _sum > last_sum: + last_sum = _sum + # 进度日志节流 + if debug and (last_sum >= resp_total or (last_sum % 512 == 0)): + _log(f"[CHUNK] {start}+{last_sum}/{resp_total}") + + # 收齐就退出 + if last_sum >= resp_total: + break + + # 清理实例(快路径:只删当前 hid) + try: + at(f"AT+MHTTPDEL={hid}", "OK", 2000) + except: + pass + + if resp_total is None: + return False, "no_header_or_total", total_len, md5_b64, 0 + + # 计算实际填充长度 + merged = sorted(got_ranges) + filled = 0 + prev = 0 + for s, e in merged: + if e <= s: + continue + if s > prev: + # 有洞 + pass + prev = max(prev, e) + # 重新合并算 filled + merged2 = [] + for s, e in merged: + if not merged2 or s > merged2[-1][1]: + merged2.append((s, e)) + else: + merged2[-1] = (merged2[-1][0], max(merged2[-1][1], e)) + filled = sum(e - s for s, e in merged2) + + if filled < resp_total: + return False, f"incomplete_chunk got={filled} expected={resp_total} code={code}", total_len, md5_b64, filled + + got_len = resp_total + # 如果服务器忽略 Range 返回 200,resp_total 可能是整文件,这里允许 want_len 不匹配 + return True, "OK", total_len, md5_b64, got_len + + global ota_in_progress + try: + ota_in_progress = int(ota_in_progress) + 1 + except: + ota_in_progress = 1 + with uart4g_lock: + try: + ok_pdp, ip = _ensure_pdp() + if not ok_pdp: + return False, f"PDP not ready (ip={ip})" + + # 先清空旧事件,避免串台 + _clear_http_events() + + # 为了支持随机写入,先创建空文件 + try: + with open(filename, "wb") as f: + f.write(b"") + except Exception as e: + return False, f"open_file_failed: {e}" + + total_len = None + expect_md5_b64 = None + + offset = 0 + chunk = CHUNK_MAX + t_start = time.ticks_ms() + last_progress_ms = t_start + STALL_TIMEOUT_MS = 60000 # 60s 没有任何 offset 推进则判定卡死 + last_pwr_ms = t_start + _pwr_log(prefix=" ota_start") + bad_http_state = 0 # 连续“疑似模块 HTTP 坏状态”的计数,达到阈值才做 full reset + + while True: + now = time.ticks_ms() + # debug:每 5 秒打印一次电压/电量 + 进度 + if debug and time.ticks_diff(now, last_pwr_ms) >= 5000: + last_pwr_ms = now + _pwr_log(prefix=f" off={offset}/{total_len or '?'}") + # 超时保护:整体(很宽,避免“慢但在推进”的情况误判失败) + if time.ticks_diff(now, t_start) > total_timeout_ms: + return False, f"timeout overall after {total_timeout_ms}ms offset={offset} total={total_len}" + + # 超时保护:无进展(关键) + if time.ticks_diff(now, last_progress_ms) > STALL_TIMEOUT_MS: + return False, f"timeout stalled {STALL_TIMEOUT_MS}ms offset={offset} total={total_len}" + + if total_len is not None and offset >= total_len: + break + + want = chunk + if total_len is not None: + remain = total_len - offset + if remain <= 0: + break + if want > remain: + want = remain + + # 本 chunk 的 buffer(长度=want) + buf = bytearray(want) + + success = False + last_err = "unknown" + md5_seen = None + got_len = 0 + for k in range(1, CHUNK_RETRIES + 1): + # 只有在连续坏状态时才 full reset,否则只删当前 hid(更快) + do_full_reset = (bad_http_state >= 2) + ok, msg, tlen, md5_b64, got = _fetch_range_into_buf(offset, want, buf, full_reset=do_full_reset) + last_err = msg + if tlen is not None and total_len is None: + total_len = tlen + if md5_b64 and not expect_md5_b64: + expect_md5_b64 = md5_b64 + if ok: + success = True + got_len = got + bad_http_state = 0 + break + + # 判定是否属于“模块 HTTP 坏状态”(header-only/no header/request err 等) + try: + if ("no_header_or_total" in msg) or ("MHTTPREQUEST failed" in msg) or ("MHTTPCREATE failed" in msg): + bad_http_state += 1 + else: + bad_http_state = max(0, bad_http_state - 1) + except: + pass + + # 失败:缩小 chunk,提高成功率 + if chunk > CHUNK_MIN: + chunk = max(CHUNK_MIN, chunk // 2) + want = min(chunk, want) + buf = bytearray(want) + _log(f"[RETRY] off={offset} want={want} try={k} err={msg}") + _pwr_log(prefix=f" retry{k} off={offset}") + time.sleep_ms(120) + + if not success: + return False, f"chunk_failed off={offset} want={want} err={last_err} total={total_len}" + + # 写入文件(注意:got_len 可能 > want(服务器忽略 Range 返回 200)) + # 只写入当前请求的 want 字节(buf),避免越界 + try: + with open(filename, "r+b") as f: + f.seek(offset) + f.write(bytes(buf)) + except Exception as e: + return False, f"write_failed off={offset}: {e}" + + offset += len(buf) + last_progress_ms = time.ticks_ms() + # 成功推进后恢复 chunk + chunk = CHUNK_MAX + if debug: + _log(f"[OK] offset={offset}/{total_len or '?'}") + + # 可选:如果有 Content-Md5 且 hashlib 可用,做校验(Range 响应未必会提供 md5) + if expect_md5_b64 and hashlib is not None: + try: + with open(filename, "rb") as f: + data = f.read() + digest = hashlib.md5(data).digest() + got_b64 = binascii.b2a_base64(digest).decode().strip() + if got_b64 != expect_md5_b64: + return False, f"md5_mismatch got={got_b64} expected={expect_md5_b64}" + except Exception as e: + return False, f"md5_check_failed: {e}" + + t_cost = time.ticks_diff(time.ticks_ms(), t_func0) + return True, f"OK size={offset} ip={ip} cost_ms={t_cost}" + finally: + try: + ota_in_progress = max(0, int(ota_in_progress) - 1) + except: + ota_in_progress = 0 def direct_ota_download_via_4g(ota_url): """通过 4G 模块下载 OTA(不需要 Wi-Fi)""" - global update_thread_started + global update_thread_started, ota_in_progress, tcp_connected try: + t_ota0 = time.ticks_ms() if not ota_url: safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) return + # OTA 全程暂停 TCP(避免心跳/重连抢占 uart4g_lock,导致 server 断链 + HTTP URC 更容易丢) + try: + ota_in_progress = int(ota_in_progress) + 1 + except: + ota_in_progress = 1 + + # 主动断开 AT TCP,减少 +MIPURC 噪声干扰 HTTP URC 下载 + tcp_connected = False + try: + with uart4g_lock: + at("AT+MIPCLOSE=0", "OK", 1500) + except: + pass + print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") - success, msg = download_file_via_4g(ota_url, local_filename, debug=True) - print(f"[OTA-4G] {msg}") - - if success and "OK" in msg: - # 下载成功:备份+替换+重启 + # 重要说明: + # - AT+MDIALUP / RNDIS 是“USB 主机拨号上网”模式,在不少 ML307R 固件上会占用/切换内部网络栈, + # 从而导致 AT+MIPOPEN / +MIPURC 这套 TCP 连接无法工作(你会看到一直“连接到服务器...”)。 + # - 这个设备当前 4G 是走 UART + AT Socket(MIPOPEN),并没有把 4G 变成系统网卡(如 ppp0)。 + # 因此这里不再自动拨号/改路由;只有当系统本来就有 default route(例如 eth0 已联网)时,才尝试走 requests 下载。 + + ok_sys = False + msg_sys = "" + try: + if _has_default_route(): + # 1) 先试原始 URL(可能是 https) + ok_sys, msg_sys = _download_file_system_bytes(ota_url, local_filename, timeout_s=30) + if (not ok_sys) and isinstance(ota_url, str) and ota_url.startswith("https://static.shelingxingqiu.com/"): + # 2) 部分系统 SSL 不完整:对固定域名降级到 http 再试一次 + http_url = "http://" + ota_url[len("https://"):] + ok_sys, msg_sys = _download_file_system_bytes(http_url, local_filename, timeout_s=30) + else: + ok_sys = False + msg_sys = "no_default_route (system network not available)" + except Exception as e: + ok_sys = False + msg_sys = f"system_download_exception: {e}" + + if ok_sys: + print(f"[OTA-4G] system {msg_sys}") if apply_ota_and_reboot(ota_url): - return # 会重启,不会执行到 finally + return + + print(f"[OTA-4G] system failed: {msg_sys} -> fallback to URC download") + # debug:进入 URC 下载前打印一次电压/电量(防止下载太快看不到 [PWR] 周期日志) + try: + v = get_bus_voltage() + p = voltage_to_percent(v) + print(f"[OTA-4G][PWR] before_urc v={v:.3f}V p={p}%") + except Exception as e: + print(f"[OTA-4G][PWR] before_urc read_failed: {e}") + + t_dl0 = time.ticks_ms() + success, msg = download_file_via_4g(ota_url, local_filename, debug=True) + t_dl_cost = time.ticks_diff(time.ticks_ms(), t_dl0) + print(f"[OTA-4G] {msg}") + print(f"[OTA-4G] download_cost_ms={t_dl_cost}") + + if success and "OK" in msg: + if apply_ota_and_reboot(ota_url): + return else: - safe_enqueue({"result": msg}, 2) + safe_enqueue({"result": msg_sys or msg}, 2) except Exception as e: error_msg = f"OTA-4G 异常: {str(e)}" print(error_msg) safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) finally: + # 总耗时(注意:若成功并 reboot,这行可能来不及打印) + try: + t_cost = time.ticks_diff(time.ticks_ms(), t_ota0) + print(f"[OTA-4G] total_cost_ms={t_cost}") + except: + pass update_thread_started = False + # 对应上面的 ota_in_progress +1 + try: + ota_in_progress = max(0, int(ota_in_progress) - 1) + except: + ota_in_progress = 0 # ==================== 主程序入口 ==================== @@ -1754,6 +2443,14 @@ def cmd_str(): # 主循环:检测扳机触发 → 拍照 → 分析 → 上报 while not app.need_exit(): current_time = time.ticks_ms() + # OTA 期间尽量省电:暂停相机预览/拍照/分析,只保留最低频率的循环 + try: + if int(globals().get("ota_in_progress", 0)) > 0: + time.sleep_ms(250) + continue + except: + pass + # print("压力传感器数值: ", adc_obj.read()) adc_val = adc_obj.read() # if adc_val > 2400: