import re import hashlib import binascii from maix import time from power import get_bus_voltage, voltage_to_percent from urllib.parse import urlparse from hardware import hardware_manager class DownloadManager4G: """4g下载管理器(单例)""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(DownloadManager4G, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return # 私有状态 self.FRAG_SIZE = 1024 self.FRAG_DELAY = 10 self._initialized = True def _log(self, *a): if debug: self.logger.debug(" ".join(str(x) for x in a)) def _pwr_log(self, prefix=""): """debug 用:输出电压/电量""" if not debug: return try: v = get_bus_voltage() p = voltage_to_percent(v) self.logger.debug(f"[PWR]{prefix} v={v:.3f}V p={p}%") except Exception as e: try: self.logger.debug(f"[PWR]{prefix} read_failed: {e}") except: pass def _clear_http_events(self): if hardware_manager.at_client: while hardware_manager.at_client.pop_http_event() is not None: pass def _parse_httpid(self, resp: str): m = re.search(r"\+MHTTPCREATE:\s*(\d+)", resp) return int(m.group(1)) if m else None def _get_ip(self, ): r = hardware_manager.at_client.send("AT+CGPADDR=1", "OK", 3000) m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) return m.group(1) if m else "" def _ensure_pdp(self, ): ip = self._get_ip() if ip and ip != "0.0.0.0": return True, ip hardware_manager.at_client.send("AT+MIPCALL=1,1", "OK", 15000) for _ in range(10): ip = self._get_ip() if ip and ip != "0.0.0.0": return True, ip time.sleep(1) return False, ip def _extract_hdr_fields(self, hdr_text: str): mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE) clen = int(mlen.group(1)) if mlen else None mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE) md5_b64 = mmd5.group(1).strip() if mmd5 else None return clen, md5_b64 def _extract_content_range(self, hdr_text: str): m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE) if not m: return None, None, None try: return int(m.group(1)), int(m.group(2)), int(m.group(3)) except: return None, None, None def _hard_reset_http(self, ): """模块进入"坏状态"时的保守清场""" self._clear_http_events() for i in range(0, 6): try: hardware_manager.at_client.send(f"AT+MHTTPDEL={i}", "OK", 1200) except: pass self._clear_http_events() def _create_httpid(self, full_reset=False): self._clear_http_events() if hardware_manager.at_client: hardware_manager.at_client.flush() if full_reset: self._hard_reset_http() resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) hid = self._parse_httpid(resp) if self._is_https: resp = hardware_manager.at_client.send(f'AT+MHTTPCFG="ssl",{hid},1,1', "OK", 2000) if "ERROR" in resp or "CME ERROR" in resp: self.logger.error(f"MHTTPCFG SSL failed: {resp}") # 尝试https 降级到http downgraded_base_url = base_url.replace("https://", "http://") resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{downgraded_base_url}"', "OK", 8000) hid = self._parse_httpid(resp) return hid, resp def _fetch_range_into_buf(self, start, want_len, out_buf, path, full_reset=False): """ 请求 Range [start, start+want_len),写入 out_buf(bytearray,长度=want_len) 返回 (ok, msg, total_len, md5_b64, got_len) """ end_incl = start + want_len - 1 hid, cresp = self._create_httpid(full_reset=full_reset) if hid is None: return False, f"MHTTPCREATE failed: {cresp}", None, None, 0 # 降低 URC 压力(分片/延迟) hardware_manager.at_client.send(f'AT+MHTTPCFG="fragment",{hid},{self.FRAG_SIZE},{self.FRAG_DELAY}', "OK", 1500) # 设置 Range header(inclusive) hardware_manager.at_client.send(f'AT+MHTTPCFG="header",{hid},"Range: bytes={start}-{end_incl}"', "OK", 3000) req = hardware_manager.at_client.send(f'AT+MHTTPREQUEST={hid},1,0,"{path}"', "OK", 15000) if "ERROR" in req or "CME ERROR" in req: hardware_manager.at_client.send(f"AT+MHTTPDEL={hid}", "OK", 2000) return False, f"MHTTPREQUEST failed: {req}", None, None, 0 # 等 header + content hdr_text = None hdr_accum = "" code = None resp_total = None total_len = None md5_b64 = None got_ranges = set() last_sum = 0 t0 = time.ticks_ms() timeout_ms = 9000 logged_hdr = False while time.ticks_ms() - t0 < timeout_ms: ev = hardware_manager.at_client.pop_http_event() if hardware_manager.at_client else None if not ev: time.sleep_ms(5) continue if ev[0] == "header": _, ehid, ecode, ehdr = ev if ehid != hid: continue code = ecode hdr_text = ehdr if ehdr: hdr_accum = (hdr_accum + "\n" + ehdr) if hdr_accum else ehdr resp_total_tmp, md5_tmp = self._extract_hdr_fields(hdr_accum) if md5_tmp: md5_b64 = md5_tmp cr_s, cr_e, cr_total = self._extract_content_range(hdr_accum) if cr_total is not None: total_len = cr_total if resp_total_tmp is not None: resp_total = resp_total_tmp elif resp_total is None and (cr_s is not None) and (cr_e is not None) and (cr_e >= cr_s): resp_total = (cr_e - cr_s + 1) if (not logged_hdr) and (resp_total is not None or total_len is not None): self._log(f"[HDR] id={hid} code={code} clen={resp_total} cr={cr_s}-{cr_e}/{cr_total}") logged_hdr = True continue if ev[0] == "content": _, ehid, _total, _sum, _cur, payload = ev if ehid != hid: continue if resp_total is None: resp_total = _total if resp_total is None or resp_total <= 0: continue start_rel = _sum - _cur end_rel = _sum if start_rel < 0 or start_rel >= resp_total: continue if end_rel > resp_total: end_rel = resp_total actual_len = min(len(payload), end_rel - start_rel) if actual_len <= 0: continue out_buf[start_rel:start_rel + actual_len] = payload[:actual_len] got_ranges.add((start_rel, start_rel + actual_len)) if _sum > last_sum: last_sum = _sum if debug and (last_sum >= resp_total or (last_sum % 512 == 0)): self._log(f"[CHUNK] {start}+{last_sum}/{resp_total}") if last_sum >= resp_total: break # 清理实例(快路径:只删当前 hid) try: hardware_manager.at_client.send(f"AT+MHTTPDEL={hid}", "OK", 2000) except: pass if resp_total is None: return False, "no_header_or_total", total_len, md5_b64, 0 # 计算实际填充长度 merged = sorted(got_ranges) merged2 = [] for s, e in merged: if not merged2 or s > merged2[-1][1]: merged2.append((s, e)) else: merged2[-1] = (merged2[-1][0], max(merged2[-1][1], e)) filled = sum(e - s for s, e in merged2) if filled < resp_total: return False, f"incomplete_chunk got={filled} expected={resp_total} code={code}", total_len, md5_b64, filled got_len = resp_total return True, "OK", total_len, md5_b64, got_len def download_file_via_4g(self, url, filename, total_timeout_ms=600000, retries=3, debug=False): """ ML307R HTTP 下载(更稳的"固定小块 Range 顺序下载",基于main109.py): - 只依赖 +MHTTPURC:"header"/"content"(不依赖 MHTTPREAD/cached) - 每次只请求一个小块 Range(默认 10240B),失败就重试同一块,必要时缩小块大小 - 每个 chunk 都重新 MHTTPCREATE/MHTTPREQUEST,避免卡在"206 header 但不吐 content"的坏状态 - 使用二进制模式下载,确保文件完整性 """ # 小块策略(与main109.py保持一致) CHUNK_MAX = 10240 CHUNK_MIN = 128 CHUNK_RETRIES = 12 t_func0 = time.ticks_ms() parsed = urlparse(url) host = parsed.hostname path = parsed.path or "/" if not host: return False, "bad_url (no host)" if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"): base_url = "https://static.shelingxingqiu.com" # TODO:使用https,看看是否能成功 self._is_https = True else: base_url = f"http://{host}" self._is_https = False try: self._begin_ota() except: pass from network import network_manager with network_manager.get_uart_lock(): try: ok_pdp, ip = self._ensure_pdp() if not ok_pdp: return False, f"PDP not ready (ip={ip})" # 先清空旧事件,避免串台 self._clear_http_events() # 为了支持随机写入,先创建空文件 try: with open(filename, "wb") as f: f.write(b"") except Exception as e: return False, f"open_file_failed: {e}" total_len = None expect_md5_b64 = None offset = 0 chunk = CHUNK_MAX t_start = time.ticks_ms() last_progress_ms = t_start STALL_TIMEOUT_MS = 60000 last_pwr_ms = t_start self._pwr_log(prefix=" ota_start") bad_http_state = 0 while True: now = time.ticks_ms() if debug and time.ticks_diff(now, last_pwr_ms) >= 5000: last_pwr_ms = now self._pwr_log(prefix=f" off={offset}/{total_len or '?'}") if time.ticks_diff(now, t_start) > total_timeout_ms: return False, f"timeout overall after {total_timeout_ms}ms offset={offset} total={total_len}" if time.ticks_diff(now, last_progress_ms) > STALL_TIMEOUT_MS: return False, f"timeout stalled {STALL_TIMEOUT_MS}ms offset={offset} total={total_len}" if total_len is not None and offset >= total_len: break want = chunk if total_len is not None: remain = total_len - offset if remain <= 0: break if want > remain: want = remain # 本 chunk 的 buffer(长度=want) buf = bytearray(want) success = False last_err = "unknown" md5_seen = None got_len = 0 for k in range(1, CHUNK_RETRIES + 1): do_full_reset = (bad_http_state >= 2) ok, msg, tlen, md5_b64, got = self._fetch_range_into_buf(offset, want, buf, base_url, path, full_reset=do_full_reset) last_err = msg if tlen is not None and total_len is None: total_len = tlen if md5_b64 and not expect_md5_b64: expect_md5_b64 = md5_b64 if ok: success = True got_len = got bad_http_state = 0 break try: if ("no_header_or_total" in msg) or ("MHTTPREQUEST failed" in msg) or ( "MHTTPCREATE failed" in msg): bad_http_state += 1 else: bad_http_state = max(0, bad_http_state - 1) except: pass if chunk > CHUNK_MIN: chunk = max(CHUNK_MIN, chunk // 2) want = min(chunk, want) buf = bytearray(want) self._log(f"[RETRY] off={offset} want={want} try={k} err={msg}") self._pwr_log(prefix=f" retry{k} off={offset}") time.sleep_ms(120) if not success: return False, f"chunk_failed off={offset} want={want} err={last_err} total={total_len}" # 写入文件(二进制模式) try: with open(filename, "r+b") as f: f.seek(offset) f.write(bytes(buf)) except Exception as e: return False, f"write_failed off={offset}: {e}" offset += len(buf) last_progress_ms = time.ticks_ms() chunk = CHUNK_MAX if debug: self._log(f"[OK] offset={offset}/{total_len or '?'}") # MD5 校验 if expect_md5_b64 and hashlib is not None: try: with open(filename, "rb") as f: data = f.read() digest = hashlib.md5(data).digest() got_b64 = binascii.b2a_base64(digest).decode().strip() if got_b64 != expect_md5_b64: return False, f"md5_mismatch got={got_b64} expected={expect_md5_b64}" self.logger.debug(f"[4G-DL] MD5 verified: {got_b64}") except Exception as e: return False, f"md5_check_failed: {e}" t_cost = time.ticks_diff(time.ticks_ms(), t_func0) self.logger.info(f"[4G-DL] download complete: size={offset} ip={ip} cost_ms={t_cost}") return True, f"OK size={offset} ip={ip} cost_ms={t_cost}" finally: self._end_ota()