Files
archery/4g_download_manager.py
gcw_4spBpAfv 592dc6ceb1 v1.2.8
2026-02-10 17:52:55 +08:00

400 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import hashlib
import binascii
from maix import time
from power import get_bus_voltage, voltage_to_percent
from urllib.parse import urlparse
from hardware import hardware_manager
class DownloadManager4G:
"""4g下载管理器单例"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(DownloadManager4G, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有状态
self.FRAG_SIZE = 1024
self.FRAG_DELAY = 10
self._initialized = True
def _log(self, *a):
if debug:
self.logger.debug(" ".join(str(x) for x in a))
def _pwr_log(self, prefix=""):
"""debug 用:输出电压/电量"""
if not debug:
return
try:
v = get_bus_voltage()
p = voltage_to_percent(v)
self.logger.debug(f"[PWR]{prefix} v={v:.3f}V p={p}%")
except Exception as e:
try:
self.logger.debug(f"[PWR]{prefix} read_failed: {e}")
except:
pass
def _clear_http_events(self):
if hardware_manager.at_client:
while hardware_manager.at_client.pop_http_event() is not None:
pass
def _parse_httpid(self, resp: str):
m = re.search(r"\+MHTTPCREATE:\s*(\d+)", resp)
return int(m.group(1)) if m else None
def _get_ip(self, ):
r = hardware_manager.at_client.send("AT+CGPADDR=1", "OK", 3000)
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
return m.group(1) if m else ""
def _ensure_pdp(self, ):
ip = self._get_ip()
if ip and ip != "0.0.0.0":
return True, ip
hardware_manager.at_client.send("AT+MIPCALL=1,1", "OK", 15000)
for _ in range(10):
ip = self._get_ip()
if ip and ip != "0.0.0.0":
return True, ip
time.sleep(1)
return False, ip
def _extract_hdr_fields(self, hdr_text: str):
mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
clen = int(mlen.group(1)) if mlen else None
mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
md5_b64 = mmd5.group(1).strip() if mmd5 else None
return clen, md5_b64
def _extract_content_range(self, hdr_text: str):
m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE)
if not m:
return None, None, None
try:
return int(m.group(1)), int(m.group(2)), int(m.group(3))
except:
return None, None, None
def _hard_reset_http(self, ):
"""模块进入"坏状态"时的保守清场"""
self._clear_http_events()
for i in range(0, 6):
try:
hardware_manager.at_client.send(f"AT+MHTTPDEL={i}", "OK", 1200)
except:
pass
self._clear_http_events()
def _create_httpid(self, full_reset=False):
self._clear_http_events()
if hardware_manager.at_client:
hardware_manager.at_client.flush()
if full_reset:
self._hard_reset_http()
resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)
hid = self._parse_httpid(resp)
if self._is_https:
resp = hardware_manager.at_client.send(f'AT+MHTTPCFG="ssl",{hid},1,1', "OK", 2000)
if "ERROR" in resp or "CME ERROR" in resp:
self.logger.error(f"MHTTPCFG SSL failed: {resp}")
# 尝试https 降级到http
downgraded_base_url = base_url.replace("https://", "http://")
resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{downgraded_base_url}"', "OK", 8000)
hid = self._parse_httpid(resp)
return hid, resp
def _fetch_range_into_buf(self, start, want_len, out_buf, path, full_reset=False):
"""
请求 Range [start, start+want_len),写入 out_bufbytearray长度=want_len
返回 (ok, msg, total_len, md5_b64, got_len)
"""
end_incl = start + want_len - 1
hid, cresp = self._create_httpid(full_reset=full_reset)
if hid is None:
return False, f"MHTTPCREATE failed: {cresp}", None, None, 0
# 降低 URC 压力(分片/延迟)
hardware_manager.at_client.send(f'AT+MHTTPCFG="fragment",{hid},{self.FRAG_SIZE},{self.FRAG_DELAY}', "OK", 1500)
# 设置 Range headerinclusive
hardware_manager.at_client.send(f'AT+MHTTPCFG="header",{hid},"Range: bytes={start}-{end_incl}"', "OK", 3000)
req = hardware_manager.at_client.send(f'AT+MHTTPREQUEST={hid},1,0,"{path}"', "OK", 15000)
if "ERROR" in req or "CME ERROR" in req:
hardware_manager.at_client.send(f"AT+MHTTPDEL={hid}", "OK", 2000)
return False, f"MHTTPREQUEST failed: {req}", None, None, 0
# 等 header + content
hdr_text = None
hdr_accum = ""
code = None
resp_total = None
total_len = None
md5_b64 = None
got_ranges = set()
last_sum = 0
t0 = time.ticks_ms()
timeout_ms = 9000
logged_hdr = False
while time.ticks_ms() - t0 < timeout_ms:
ev = hardware_manager.at_client.pop_http_event() if hardware_manager.at_client else None
if not ev:
time.sleep_ms(5)
continue
if ev[0] == "header":
_, ehid, ecode, ehdr = ev
if ehid != hid:
continue
code = ecode
hdr_text = ehdr
if ehdr:
hdr_accum = (hdr_accum + "\n" + ehdr) if hdr_accum else ehdr
resp_total_tmp, md5_tmp = self._extract_hdr_fields(hdr_accum)
if md5_tmp:
md5_b64 = md5_tmp
cr_s, cr_e, cr_total = self._extract_content_range(hdr_accum)
if cr_total is not None:
total_len = cr_total
if resp_total_tmp is not None:
resp_total = resp_total_tmp
elif resp_total is None and (cr_s is not None) and (cr_e is not None) and (cr_e >= cr_s):
resp_total = (cr_e - cr_s + 1)
if (not logged_hdr) and (resp_total is not None or total_len is not None):
self._log(f"[HDR] id={hid} code={code} clen={resp_total} cr={cr_s}-{cr_e}/{cr_total}")
logged_hdr = True
continue
if ev[0] == "content":
_, ehid, _total, _sum, _cur, payload = ev
if ehid != hid:
continue
if resp_total is None:
resp_total = _total
if resp_total is None or resp_total <= 0:
continue
start_rel = _sum - _cur
end_rel = _sum
if start_rel < 0 or start_rel >= resp_total:
continue
if end_rel > resp_total:
end_rel = resp_total
actual_len = min(len(payload), end_rel - start_rel)
if actual_len <= 0:
continue
out_buf[start_rel:start_rel + actual_len] = payload[:actual_len]
got_ranges.add((start_rel, start_rel + actual_len))
if _sum > last_sum:
last_sum = _sum
if debug and (last_sum >= resp_total or (last_sum % 512 == 0)):
self._log(f"[CHUNK] {start}+{last_sum}/{resp_total}")
if last_sum >= resp_total:
break
# 清理实例(快路径:只删当前 hid
try:
hardware_manager.at_client.send(f"AT+MHTTPDEL={hid}", "OK", 2000)
except:
pass
if resp_total is None:
return False, "no_header_or_total", total_len, md5_b64, 0
# 计算实际填充长度
merged = sorted(got_ranges)
merged2 = []
for s, e in merged:
if not merged2 or s > merged2[-1][1]:
merged2.append((s, e))
else:
merged2[-1] = (merged2[-1][0], max(merged2[-1][1], e))
filled = sum(e - s for s, e in merged2)
if filled < resp_total:
return False, f"incomplete_chunk got={filled} expected={resp_total} code={code}", total_len, md5_b64, filled
got_len = resp_total
return True, "OK", total_len, md5_b64, got_len
def download_file_via_4g(self, url, filename,
total_timeout_ms=600000,
retries=3,
debug=False):
"""
ML307R HTTP 下载(更稳的"固定小块 Range 顺序下载"基于main109.py
- 只依赖 +MHTTPURC:"header"/"content"(不依赖 MHTTPREAD/cached
- 每次只请求一个小块 Range默认 10240B失败就重试同一块必要时缩小块大小
- 每个 chunk 都重新 MHTTPCREATE/MHTTPREQUEST避免卡在"206 header 但不吐 content"的坏状态
- 使用二进制模式下载,确保文件完整性
"""
# 小块策略与main109.py保持一致
CHUNK_MAX = 10240
CHUNK_MIN = 128
CHUNK_RETRIES = 12
t_func0 = time.ticks_ms()
parsed = urlparse(url)
host = parsed.hostname
path = parsed.path or "/"
if not host:
return False, "bad_url (no host)"
if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"):
base_url = "https://static.shelingxingqiu.com"
# TODO使用https看看是否能成功
self._is_https = True
else:
base_url = f"http://{host}"
self._is_https = False
try:
self._begin_ota()
except:
pass
from network import network_manager
with network_manager.get_uart_lock():
try:
ok_pdp, ip = self._ensure_pdp()
if not ok_pdp:
return False, f"PDP not ready (ip={ip})"
# 先清空旧事件,避免串台
self._clear_http_events()
# 为了支持随机写入,先创建空文件
try:
with open(filename, "wb") as f:
f.write(b"")
except Exception as e:
return False, f"open_file_failed: {e}"
total_len = None
expect_md5_b64 = None
offset = 0
chunk = CHUNK_MAX
t_start = time.ticks_ms()
last_progress_ms = t_start
STALL_TIMEOUT_MS = 60000
last_pwr_ms = t_start
self._pwr_log(prefix=" ota_start")
bad_http_state = 0
while True:
now = time.ticks_ms()
if debug and time.ticks_diff(now, last_pwr_ms) >= 5000:
last_pwr_ms = now
self._pwr_log(prefix=f" off={offset}/{total_len or '?'}")
if time.ticks_diff(now, t_start) > total_timeout_ms:
return False, f"timeout overall after {total_timeout_ms}ms offset={offset} total={total_len}"
if time.ticks_diff(now, last_progress_ms) > STALL_TIMEOUT_MS:
return False, f"timeout stalled {STALL_TIMEOUT_MS}ms offset={offset} total={total_len}"
if total_len is not None and offset >= total_len:
break
want = chunk
if total_len is not None:
remain = total_len - offset
if remain <= 0:
break
if want > remain:
want = remain
# 本 chunk 的 buffer长度=want
buf = bytearray(want)
success = False
last_err = "unknown"
md5_seen = None
got_len = 0
for k in range(1, CHUNK_RETRIES + 1):
do_full_reset = (bad_http_state >= 2)
ok, msg, tlen, md5_b64, got = self._fetch_range_into_buf(offset, want, buf, base_url, path, full_reset=do_full_reset)
last_err = msg
if tlen is not None and total_len is None:
total_len = tlen
if md5_b64 and not expect_md5_b64:
expect_md5_b64 = md5_b64
if ok:
success = True
got_len = got
bad_http_state = 0
break
try:
if ("no_header_or_total" in msg) or ("MHTTPREQUEST failed" in msg) or (
"MHTTPCREATE failed" in msg):
bad_http_state += 1
else:
bad_http_state = max(0, bad_http_state - 1)
except:
pass
if chunk > CHUNK_MIN:
chunk = max(CHUNK_MIN, chunk // 2)
want = min(chunk, want)
buf = bytearray(want)
self._log(f"[RETRY] off={offset} want={want} try={k} err={msg}")
self._pwr_log(prefix=f" retry{k} off={offset}")
time.sleep_ms(120)
if not success:
return False, f"chunk_failed off={offset} want={want} err={last_err} total={total_len}"
# 写入文件(二进制模式)
try:
with open(filename, "r+b") as f:
f.seek(offset)
f.write(bytes(buf))
except Exception as e:
return False, f"write_failed off={offset}: {e}"
offset += len(buf)
last_progress_ms = time.ticks_ms()
chunk = CHUNK_MAX
if debug:
self._log(f"[OK] offset={offset}/{total_len or '?'}")
# MD5 校验
if expect_md5_b64 and hashlib is not None:
try:
with open(filename, "rb") as f:
data = f.read()
digest = hashlib.md5(data).digest()
got_b64 = binascii.b2a_base64(digest).decode().strip()
if got_b64 != expect_md5_b64:
return False, f"md5_mismatch got={got_b64} expected={expect_md5_b64}"
self.logger.debug(f"[4G-DL] MD5 verified: {got_b64}")
except Exception as e:
return False, f"md5_check_failed: {e}"
t_cost = time.ticks_diff(time.ticks_ms(), t_func0)
self.logger.info(f"[4G-DL] download complete: size={offset} ip={ip} cost_ms={t_cost}")
return True, f"OK size={offset} ip={ip} cost_ms={t_cost}"
finally:
self._end_ota()