refine ota

This commit is contained in:
huangzhenwei2
2025-12-30 16:40:01 +08:00
parent 669d032f96
commit 92ad32bb8e

467
main.py
View File

@@ -1460,472 +1460,6 @@ def laser_calibration_worker():
else: else:
time.sleep_ms(50) time.sleep_ms(50)
def download_file_via_4g_legacy(url, filename,
total_timeout_ms=30000,
retries=3,
debug=False):
"""
ML307R HTTP 下载URC content 分片模式)
- 重试empty/incomplete/AT错误都会重试
- 超时total_timeout_ms
- 校验Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5
- 日志默认干净debug=True 才打印 URC 进度
"""
from urllib.parse import urlparse
parsed = urlparse(url)
host = parsed.hostname
path = parsed.path or "/"
base_url = f"http://{host}" # 你已验证 HTTP 可 200如需 https 需另配 SSL
def _log(*args):
if debug:
print(*args)
def _merge_ranges(ranges_iter):
"""合并重叠/相邻区间,返回 merged(list[(s,e)])(半开区间)"""
rs = sorted(ranges_iter)
merged = []
for s, e in rs:
if e <= s:
continue
if merged and s <= merged[-1][1]:
merged[-1] = (merged[-1][0], max(merged[-1][1], e))
else:
merged.append((s, e))
return merged
def _compute_gaps(total_len, got_ranges):
"""根据已填充区间计算缺口(半开区间)"""
if not total_len or total_len <= 0:
return [(0, 0)]
merged = _merge_ranges(got_ranges)
gaps = []
prev = 0
for s, e in merged:
if s > prev:
gaps.append((prev, s))
prev = max(prev, e)
if prev < total_len:
gaps.append((prev, total_len))
return gaps, merged
def _extract_content_range(hdr_text: str):
"""
Content-Range: bytes <start>-<end>/<total>
返回 (start, end, total);解析失败返回 (None,None,None)
"""
m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE)
if not m:
return None, None, None
try:
return int(m.group(1)), int(m.group(2)), int(m.group(3))
except:
return None, None, None
def _get_ip():
r = at("AT+CGPADDR=1", "OK", 3000)
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
return m.group(1) if m else ""
def _clear_http_events():
# 清空旧的 HTTP URC 事件,避免串台
while at_client.pop_http_event() is not None:
pass
# 旧版基于直接 uart4g.read 的解析已迁移到 ATClient单读者保留函数占位避免大改动
def _parse_httpid(raw: bytes):
m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw)
return int(m.group(1)) if m else None
# _try_parse_header/_try_parse_one_content 已由 ATClient 在 reader 线程中解析并推送事件
# _try_parse_one_content 已由 ATClient 解析
def _extract_hdr_fields(hdr_text: str):
# Content-Length
mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
clen = int(mlen.group(1)) if mlen else None
# Content-Md5 (base64)
mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
md5_b64 = mmd5.group(1).strip() if mmd5 else None
return clen, md5_b64
def _md5_base64(data: bytes) -> str:
if hashlib is None:
return ""
digest = hashlib.md5(data).digest()
# base64: 24 chars with ==
return binascii.b2a_base64(digest).decode().strip()
def _one_attempt(range_start=None, range_end=None,
body_buf=None, got_ranges=None,
total_len=None,
expect_md5_b64=None):
# 0) PDP确保有 IP避免把 OK 当成功)
ip = _get_ip()
if not ip or ip == "0.0.0.0":
at("AT+MIPCALL=1,1", "OK", 15000)
for _ in range(10):
ip = _get_ip()
if ip and ip != "0.0.0.0":
break
time.sleep(1)
if not ip or ip == "0.0.0.0":
return False, "PDP not ready (no_ip)", body_buf, got_ranges, total_len, expect_md5_b64
# 1) 清理旧实例 + 清空旧 HTTP 事件
for i in range(0, 6):
at(f"AT+MHTTPDEL={i}", "OK", 1500)
_clear_http_events()
# 2) 创建实例(用 at() 等待返回)
create_resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)
httpid = _parse_httpid(create_resp.encode())
if httpid is None:
return False, "MHTTPCREATE failed (no httpid)", body_buf, got_ranges, total_len, expect_md5_b64
# 2.5) Range 补洞按缺口请求指定字节段HTTP Range 右端是 inclusive
if range_start is not None and range_end is not None:
# 每次请求使用新 httpid避免 header 累积/污染
at(f'AT+MHTTPCFG="header",{httpid},"Range: bytes={int(range_start)}-{int(range_end)}"', "OK", 3000)
# 3) 发 GETHTTP URC 由 ATClient 解析并入队)
req_resp = at(f'AT+MHTTPREQUEST={httpid},1,0,"{path}"', "OK", 15000)
if "ERROR" in req_resp or "CME ERROR" in req_resp:
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
return False, f"MHTTPREQUEST failed: {req_resp}", body_buf, got_ranges, total_len, expect_md5_b64
# 4) 从 ATClient 的 http_events 队列收 header/content
urc_id = None
status_code = None
expect_len = None
# 若是 Range 响应206需要把响应内的偏移映射到“全文件”偏移
offset_base = 0
# got_ranges 记录“真实写入 body_buf 的半开区间”
if got_ranges is None:
got_ranges = set()
filled_new_bytes = 0
# last_sum/resp_total 用于判断“本次 HTTP 响应体”是否接收完成(尤其是 Range 场景)
last_sum = 0
resp_total = None
no_progress_count = 0 # 连续没有进展的次数
last_print_ms = time.ticks_ms()
last_print_sum = 0
# Range 补洞不需要等太久,避免卡死;全量下载用总超时
attempt_timeout_ms = total_timeout_ms
if range_start is not None and range_end is not None:
attempt_timeout_ms = min(total_timeout_ms, 8000)
t0 = time.ticks_ms()
while time.ticks_ms() - t0 < attempt_timeout_ms:
ev = at_client.pop_http_event()
if not ev:
# 如果 sum 已经达到 total_len但仍有 gaps等待更长时间有些分片可能延迟到达
# 对 Rangelast_sum 只会到 resp_total比如 686/774不能拿 total_len(59776) 比
if resp_total and last_sum >= resp_total:
# 本次响应体应该收齐了,继续等一小会儿(防止最后一个 URC 延迟),然后退出循环
time.sleep_ms(30)
no_progress_count += 1
if no_progress_count > 30:
break
continue
# 全量模式:如果模块宣称 sum 已经达到 total_len但仍有 gaps稍微多等
if (range_start is None and range_end is None) and total_len and last_sum >= total_len:
gaps_now, merged_now = _compute_gaps(total_len, got_ranges)
if gaps_now and not (len(gaps_now) == 1 and gaps_now[0] == (0, 0)):
time.sleep_ms(50)
else:
time.sleep_ms(5)
else:
time.sleep_ms(5)
no_progress_count += 1
# Range如果长时间没有事件也结束让上层重试
if range_start is not None and range_end is not None and no_progress_count > 200:
break
# 全量:如果长时间没有新事件,且 sum 已经达到 total_len认为接收完成可能有丢包
if no_progress_count > 100 and total_len and last_sum >= total_len:
break
continue
no_progress_count = 0 # 有事件,重置计数器
if ev[0] == "header":
_, hid, code, hdr_text = ev
if urc_id is None:
urc_id = hid
if hid != urc_id:
continue
status_code = code
expect_len, md5_b64 = _extract_hdr_fields(hdr_text)
# 只在“首次全量 header”里保留 Content-Md5Range 响应通常不带该字段
if md5_b64:
expect_md5_b64 = md5_b64
cr_s, cr_e, cr_total = _extract_content_range(hdr_text)
if cr_s is not None and cr_total is not None:
# 206 Partial Content
offset_base = cr_s
# Content-Range end 是 inclusive总长度以 total 为准
if total_len is None:
total_len = cr_total
elif total_len != cr_total:
_log(f"[WARN] total_len changed {total_len}->{cr_total}")
total_len = cr_total
if body_buf is None and total_len:
body_buf = bytearray(total_len)
# 对 Range 响应:优先使用 Content-Length 作为本次响应体长度
if expect_len is not None:
resp_total = expect_len
_log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}")
continue
if ev[0] == "content":
_, cid, _total, _sum, _cur, payload = ev
if urc_id is None:
urc_id = cid
if cid != urc_id:
continue
# 全量 200这里的 _total 就是全文件长度Range 206_total 可能只是“本次响应体长度”
if body_buf is None:
# 如果 header 没解析出 Content-Range总长度用 content 的 _total
if total_len is None:
total_len = _total
if total_len:
body_buf = bytearray(total_len)
if body_buf is None or total_len is None:
continue
# 若 header 没给 Content-Length就用 content 的 _total 作为本次响应体长度Range 场景下通常是这次 body 的长度)
if resp_total is None:
resp_total = _total
rel_start = _sum - _cur
rel_end = _sum
abs_start = offset_base + rel_start
abs_end = offset_base + rel_end
if abs_start < 0 or abs_start >= total_len:
continue
if abs_end < abs_start:
continue
if abs_end > total_len:
abs_end = total_len
expected_span = abs_end - abs_start
actual_len = min(len(payload), expected_span)
if actual_len <= 0:
continue
# 写入并记录“实际写入区间”,用于 gap 计算
body_buf[abs_start:abs_start + actual_len] = payload[:actual_len]
got_ranges.add((abs_start, abs_start + actual_len))
filled_new_bytes += actual_len
# 记录最大的 sum 值,用于判断是否所有数据都已发送
if _sum > last_sum:
last_sum = _sum
# debug 输出节流:每 ~8000 字节或 >=500ms 输出一次,避免 print 导致 UART 丢包
if debug:
now = time.ticks_ms()
if (time.ticks_diff(now, last_print_ms) >= 500) or (_sum - last_print_sum >= 8000) or (rel_end == _total):
_log(f"[URC] {abs_start}:{abs_start+actual_len} sum={_sum}/{_total} base={offset_base} +{filled_new_bytes}")
last_print_ms = now
last_print_sum = _sum
# 若是全量请求offset_base=0 且 total_len==_total尽早结束
if offset_base == 0 and total_len == _total:
# 不要用 filled_new_bytes 判断是否完整(可能有重叠)
pass
# Range本次响应体已收齐退出交给上层判断是否补上了缺口
if resp_total is not None and last_sum >= resp_total:
# 给一点时间让可能的尾部事件入队,然后退出
time.sleep_ms(10)
break
# 5) 清理实例
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
if body_buf is None:
return False, "empty_body", body_buf, got_ranges, total_len, expect_md5_b64
if total_len is None:
return False, "no_total_len", body_buf, got_ranges, total_len, expect_md5_b64
# 返回“本次尝试是否有实质进展”Range 补洞时,哪怕不完整也算成功推进
if filled_new_bytes <= 0:
return False, "no_progress", body_buf, got_ranges, total_len, expect_md5_b64
return True, f"PARTIAL ok +{filled_new_bytes} ip={ip} code={status_code}", body_buf, got_ranges, total_len, expect_md5_b64
global ota_in_progress
try:
ota_in_progress = int(ota_in_progress) + 1
except:
ota_in_progress = 1
with uart4g_lock:
try:
# -------- Phase 1: 全量 GET允许不完整后面用 Range 补洞)--------
body_buf = None
got_ranges = set()
total_len = None
expect_md5_b64 = None
last_err = "unknown"
for attempt in range(1, retries + 1):
ok, msg, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
)
last_err = msg
if not ok:
_log(f"[RETRY] full attempt={attempt} failed={msg}")
time.sleep_ms(200)
continue
gaps, merged = _compute_gaps(total_len, got_ranges)
filled_total = sum(e - s for s, e in merged)
if gaps and gaps[0] == (0, 0):
gaps = []
if not gaps:
break
_log(f"[GAPS] after full attempt={attempt} filled={filled_total}/{total_len} gaps={gaps[:3]}")
time.sleep_ms(150)
if body_buf is None or total_len is None:
return False, f"FAILED: {last_err}"
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
# -------- Phase 2: Range 补洞 --------
# 说明:
# - “全量 GET 多次重试 + 合并已收到分片”我们已经在 Phase1 做了got_ranges/body_buf 会跨 attempt 累积)。
# - 仍存在 gaps 说明:这些字节段在全量阶段始终没收到,需要靠 Range 反复补洞。
#
# 策略:
# - Range 分块更小(更稳),失败时继续“二分缩小”到 MIN_RANGE_BYTES
# - 不要因为某一轮 no_progress 就立刻退出UART 偶发丢 URC需要多轮撞上一次成功
MAX_RANGE_BYTES = 1024
MIN_RANGE_BYTES = 128
RANGE_RETRIES_EACH = 8
MAX_HOLE_ROUNDS = 50
NO_PROGRESS_ROUNDS_LIMIT = 8
round_i = 0
no_progress_rounds = 0
while gaps and round_i < MAX_HOLE_ROUNDS:
round_i += 1
# 优先补最大的洞(通常只丢中间一两段)
gaps = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True)
_log(f"[RANGE] round={round_i} gaps={gaps[:3]}")
progress_any = False
# 每轮最多补前 5 个洞,避免无限循环
for (gs, ge) in gaps[:5]:
cur = gs
chunk = MAX_RANGE_BYTES
while cur < ge:
sub_end = min(ge, cur + chunk)
# HTTP Range end is inclusive
rs = cur
re_incl = sub_end - 1
before_gaps, before_merged = _compute_gaps(total_len, got_ranges)
before_filled = sum(e - s for s, e in before_merged)
sub_ok = False
sub_err = "unknown"
for k in range(1, RANGE_RETRIES_EACH + 1):
ok2, msg2, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
range_start=rs, range_end=re_incl,
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
)
sub_err = msg2
if ok2:
sub_ok = True
break
_log(f"[RETRY] range {rs}-{re_incl} try={k} failed={msg2}")
time.sleep_ms(150)
after_gaps, after_merged = _compute_gaps(total_len, got_ranges)
after_filled = sum(e - s for s, e in after_merged)
if after_filled > before_filled:
progress_any = True
# 成功推进:恢复到较大 chunk加快补洞
chunk = MAX_RANGE_BYTES
cur = sub_end
else:
# 没推进:缩小 chunk继续在同一位置重试不要前进 cur
if chunk > MIN_RANGE_BYTES:
chunk = max(MIN_RANGE_BYTES, chunk // 2)
_log(f"[RANGE] shrink chunk -> {chunk} at pos={cur}")
else:
# 已经很小还不行:本轮先放弃这个位置,留给下一轮再撞
if not sub_ok:
_log(f"[WARN] range {rs}-{re_incl} failed={sub_err}")
break
# 小歇一下,给读线程喘息
time.sleep_ms(120)
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
filled_total = sum(e - s for s, e in merged)
if not gaps:
break
if not progress_any:
no_progress_rounds += 1
_log(f"[RANGE] no progress in round={round_i} ({no_progress_rounds}/{NO_PROGRESS_ROUNDS_LIMIT}) filled={filled_total}/{total_len}")
# 多轮无进展才退出(避免偶发“只 header 无 content URC”导致过早退出
if no_progress_rounds >= NO_PROGRESS_ROUNDS_LIMIT:
break
# 退避等待一下再继续下一轮
time.sleep_ms(500)
continue
else:
no_progress_rounds = 0
_log(f"[RANGE] round={round_i} filled={filled_total}/{total_len} gaps={gaps[:3]}")
# 完整性检查
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
filled_total = sum(e - s for s, e in merged)
if gaps:
return False, f"incomplete_body got={filled_total} expected={total_len} missing={total_len - filled_total} gaps={gaps[:5]}"
data = bytes(body_buf)
# 校验Content-Md5base64若有
if expect_md5_b64 and hashlib is not None:
md5_b64 = _md5_base64(data)
if md5_b64 != expect_md5_b64:
return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}"
# 写文件(原样 bytes
with open(filename, "wb") as f:
f.write(data)
return True, f"OK size={len(data)} ip={_get_ip()} md5={expect_md5_b64 or ''}"
finally:
try:
ota_in_progress = max(0, int(ota_in_progress) - 1)
except:
ota_in_progress = 0
def download_file_via_4g(url, filename, def download_file_via_4g(url, filename,
total_timeout_ms=600000, total_timeout_ms=600000,
retries=3, retries=3,
@@ -2036,6 +1570,7 @@ def download_file_via_4g(url, filename,
def _create_httpid(full_reset=False): def _create_httpid(full_reset=False):
_clear_http_events() _clear_http_events()
at_client.flush()
if full_reset: if full_reset:
_hard_reset_http() _hard_reset_http()
resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)