Compare commits

...

2 Commits

Author SHA1 Message Date
huangzhenwei2
85a5ff9ff0 conflict merge 2025-12-28 16:30:11 +08:00
huangzhenwei2
e712e11ea0 ota update 2025-12-28 16:22:41 +08:00

747
main.py
View File

@@ -7,29 +7,88 @@
作者ZZH
最后更新2025-11-21
"""
import _thread
import hashlib
import hmac
import json
import os
import re
import socket
import struct
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import cv2
import numpy as np
import requests
import ujson
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import hmac
import ujson
import hashlib
import requests
import socket
import re
import binascii
try:
import hashlib
except:
hashlib = None
# import config
# ==================== Locks ====================
class _Mutex:
"""
基于 _thread.allocate_lock() 的互斥锁封装:
- 支持 with
- 支持 try_acquire若固件不支持非阻塞 acquire 参数,则退化为阻塞 acquire
"""
def __init__(self):
self._lk = _thread.allocate_lock()
def acquire(self, blocking=True):
try:
return self._lk.acquire(blocking)
except TypeError:
self._lk.acquire()
return True
def try_acquire(self):
return self.acquire(False)
def release(self):
self._lk.release()
def __enter__(self):
self.acquire(True)
return self
def __exit__(self, exc_type, exc, tb):
self.release()
return False
# ==================== 全局配置 ====================
# OTA 升级地址与本地路径
url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
local_filename = "/maixapp/apps/t11/main.py"
# url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
local_filename = "/maixapp/apps/t11/main_tmp.py"
app_version = '1.0.0'
# OTA 下发参数(由后端指令写入)
OTA_URL = None
OTA_MODE = None # "4g" / "wifi" / None
def is_wifi_connected():
"""尽量判断当前是否有 Wi-Fi有则走 Wi-Fi OTA否则走 4G OTA"""
# 优先用 MaixPy network如果可用
try:
wlan = network.WLAN(network.TYPE_WIFI)
if wlan.isconnected():
return True
except:
pass
# 兜底:看系统 wlan0 有没有 IP你系统可能没有 wlan0则返回 False
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
return bool(ip)
except:
return False
# 设备认证信息(运行时动态加载)
DEVICE_ID = None
@@ -74,7 +133,7 @@ length = 2
# 全局状态变量
laser_calibration_active = False # 是否正在后台校准激光
laser_calibration_result = None # 校准结果坐标 (x, y)
laser_calibration_lock = False # 简易互斥锁,防止多线程冲突
laser_calibration_lock = _Mutex() # 互斥锁,防止多线程冲突
# 硬件对象初始化
laser_x, laser_y = laser_point
@@ -102,9 +161,10 @@ REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
tcp_connected = False
high_send_queue = [] # 高优先级发送队列:射箭事件等
normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等
queue_lock = False # 简易互斥锁,保护队列
uart4g_lock = False # 简易互斥锁,保护 4G 串口 AT 发送流程(防并发)
queue_lock = _Mutex() # 互斥锁,保护队列
uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并发)
update_thread_started = False # 防止 OTA 更新线程重复启动
ota_in_progress = False # OTA(4G HTTP URC) 期间暂停 tcp_main 读取 uart4g避免吞掉 +MHTTPURC
# ==================== 工具函数 ====================
@@ -140,16 +200,94 @@ def is_server_reachable(host, port=80, timeout=5):
print(f"[NET] 无法连接 {host}:{port} - {e}")
return False
def direct_ota_download():
def apply_ota_and_reboot(ota_url=None):
"""
OTA 文件下载成功后:备份原 main.py -> 替换 main_tmp.py -> 重启设备
"""
import shutil
main_py = "/maixapp/apps/t11/main.py"
main_tmp = "/maixapp/apps/t11/main_tmp.py"
main_bak = "/maixapp/apps/t11/main.py.bak"
ota_pending = "/maixapp/apps/t11/ota_pending.json"
try:
# 1. 检查下载的文件是否存在
if not os.path.exists(main_tmp):
print(f"[OTA] 错误:{main_tmp} 不存在")
return False
# 2. 备份原 main.py如果存在
if os.path.exists(main_py):
try:
shutil.copy2(main_py, main_bak)
print(f"[OTA] 已备份 {main_py} -> {main_bak}")
except Exception as e:
print(f"[OTA] 备份失败: {e}")
# 备份失败也继续(可能没有原文件)
# 3. 替换main_tmp.py -> main.py
try:
shutil.copy2(main_tmp, main_py)
print(f"[OTA] 已替换 {main_tmp} -> {main_py}")
# 确保写入磁盘
try:
os.sync() # 如果系统支持
except:
pass
time.sleep_ms(500) # 额外等待确保写入完成
except Exception as e:
print(f"[OTA] 替换失败: {e}")
return False
# 3.5 写入 pending用于重启后确认成功并上报
try:
pending_obj = {
"ts": int(time.time()) if hasattr(time, "time") else 0,
"url": ota_url or "",
"tmp": main_tmp,
"main": main_py,
"bak": main_bak,
}
with open(ota_pending, "w", encoding="utf-8") as f:
json.dump(pending_obj, f)
try:
os.sync()
except:
pass
except Exception as e:
print(f"[OTA] 写入 ota_pending 失败: {e}")
# 4. 通知服务器(可选,但重启前发一次)
safe_enqueue({"result": "ota_applied_rebooting"}, 2)
time.sleep_ms(1000) # 给一点时间让消息发出
# 5. 重启设备
print("[OTA] 准备重启设备...")
os.system("reboot") # MaixPy 通常是这个命令
return True
except Exception as e:
print(f"[OTA] apply_ota_and_reboot 异常: {e}")
return False
def direct_ota_download(ota_url):
"""
直接执行 OTA 下载(假设已有网络)
用于 cmd=7 触发
用于 cmd=7 / 或 wifi 模式
"""
global update_thread_started
try:
# 再次确认网络可达(可选但推荐)
if not ota_url:
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
return
from urllib.parse import urlparse
parsed_url = urlparse(url)
parsed_url = urlparse(ota_url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
@@ -157,9 +295,16 @@ def direct_ota_download():
safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2)
return
print(f"[OTA] 开始直接下载固件...")
result_msg = download_file(url, local_filename)
print(f"[OTA] 开始下载: {ota_url}")
result_msg = download_file(ota_url, local_filename)
print(f"[OTA] {result_msg}")
# 检查是否下载成功(包含"成功"或"下载成功"关键字)
if "成功" in result_msg or "下载成功" in result_msg:
# 下载成功:备份+替换+重启
if apply_ota_and_reboot(ota_url):
return # 会重启,不会执行到 finally
else:
safe_enqueue({"result": result_msg}, 2)
except Exception as e:
@@ -167,11 +312,11 @@ def direct_ota_download():
print(error_msg)
safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
finally:
update_thread_started = False # 允许下次 OTA
update_thread_started = False
def handle_wifi_and_update(ssid, password):
def handle_wifi_and_update(ssid, password, ota_url):
"""在子线程中执行 Wi-Fi 连接 + OTA 更新流程"""
global update_thread_started
try:
ip, error = connect_wifi(ssid, password)
if error:
@@ -179,9 +324,13 @@ def handle_wifi_and_update(ssid, password):
return
safe_enqueue({"result": "wifi_connected", "ip": ip}, 2)
# 解析 OTA 地址并测试连通性
# 下载
if not ota_url:
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
return
from urllib.parse import urlparse
parsed_url = urlparse(url)
parsed_url = urlparse(ota_url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
@@ -191,8 +340,15 @@ def handle_wifi_and_update(ssid, password):
return
print(f"[NET] 已确认可访问 {host}:{port},开始下载...")
result = download_file(url, local_filename)
result = download_file(ota_url, local_filename)
print(result)
# 检查是否下载成功(包含"成功"或"下载成功"关键字)
if "成功" in result or "下载成功" in result:
# 下载成功:备份+替换+重启
if apply_ota_and_reboot(ota_url):
return # 会重启,不会执行到 finally
else:
safe_enqueue({"result": result}, 2)
finally:
@@ -259,27 +415,22 @@ def read_device_id():
def safe_enqueue(data_dict, msg_type=2, high=False):
"""线程安全地将消息加入 TCP 发送队列(支持优先级)"""
global queue_lock, high_send_queue, normal_send_queue
while queue_lock:
time.sleep_ms(1)
queue_lock = True
item = (msg_type, data_dict)
with queue_lock:
if high:
high_send_queue.append(item)
else:
normal_send_queue.append(item)
queue_lock = False
def _uart4g_lock_acquire():
global uart4g_lock
while uart4g_lock:
time.sleep_ms(1)
uart4g_lock = True
# 兼容旧调用:建议改为 `with uart4g_lock:`
uart4g_lock.acquire(True)
def _uart4g_lock_release():
global uart4g_lock
uart4g_lock = False
# 兼容旧调用:建议改为 `with uart4g_lock:`
uart4g_lock.release()
def at(cmd, wait="OK", timeout=2000):
@@ -323,8 +474,7 @@ def tcp_send_raw(data: bytes, max_retries=2) -> bool:
global tcp_connected
if not tcp_connected:
return False
_uart4g_lock_acquire()
try:
with uart4g_lock:
for attempt in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 1500):
@@ -345,8 +495,6 @@ def tcp_send_raw(data: bytes, max_retries=2) -> bool:
time.sleep_ms(100)
return False
finally:
_uart4g_lock_release()
def generate_token(device_id):
@@ -444,7 +592,7 @@ def find_red_laser(frame, threshold=150):
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx + 1], img_bytes[idx + 2]
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
@@ -594,19 +742,6 @@ def compute_laser_position(circle_center, laser_point, radius, method):
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
def compute_laser_position_v2(circle_center, laser_point):
if circle_center is None:
return 200, 200
cx, cy = circle_center
lx, ly = 320, 220
dx = lx - cx
dy = ly - cy
r = 22.16 * 5
target_x = dx / r * 100
target_y = dy / r * 100
print(f"lx{lx} ly: {ly} cx: {cx} cy: {cy} dx: {dx} dy: {dy} result_x: {target_x} result_y: {-target_y}")
return (target_x, -target_y)
# ==================== TCP 通信线程 ====================
def connect_server():
@@ -615,12 +750,9 @@ def connect_server():
if tcp_connected:
return True
print("连接到服务器...")
_uart4g_lock_acquire()
try:
with uart4g_lock:
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
finally:
_uart4g_lock_release()
if "+MIPOPEN: 0,0" in res:
tcp_connected = True
return True
@@ -651,6 +783,13 @@ def tcp_main():
while True:
# 接收数据
# OTA(4G HTTP) 会从 uart4g 吐出大量 +MHTTPURC 数据;
# tcp_main 若在此 read会把 URC 吃掉,导致 OTA empty_body/incomplete_body。
# 同时 uart4g_lock 置 True 时也不要抢读。
if ota_in_progress:
time.sleep_ms(20)
continue
data = uart4g.read()
if data:
rx_buf += data
@@ -669,6 +808,23 @@ def tcp_main():
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 登录成功")
# 若存在 ota_pending.json说明上次 OTA 已应用并重启;
# 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending确保只上报一次。
try:
pending_path = "/maixapp/apps/t11/ota_pending.json"
if os.path.exists(pending_path):
try:
with open(pending_path, "r", encoding="utf-8") as f:
pending_obj = json.load(f)
except:
pending_obj = {}
safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
try:
os.remove(pending_path)
except:
pass
except Exception as e:
print(f"[OTA] ota_ok 上报失败: {e}")
else:
break
@@ -696,23 +852,44 @@ def tcp_main():
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
safe_enqueue(battery_data, 2)
print(f"🔋 电量上报: {battery_percent}%")
elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置)
elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置及4g
inner_data = body["data"].get("data", {})
ssid = inner_data.get("ssid")
password = inner_data.get("password")
ota_url = inner_data.get("url")
mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/""
if not ota_url:
print("ota missing_url")
safe_enqueue({"result": "missing_url"}, 2)
rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包
continue
# 自动判断mode 非法/为空时,优先 Wi-Fi如果已连否则 4G
if mode not in ("4g", "wifi"):
print("ota missing mode")
mode = "wifi" if is_wifi_connected() else "4g"
if update_thread_started:
safe_enqueue({"result": "update_already_started"}, 2)
rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包
continue
update_thread_started = True
if mode == "4g":
_thread.start_new_thread(direct_ota_download_via_4g, (ota_url,))
else:
# wifi 模式:需要 ssid/password
if not ssid or not password:
update_thread_started = False
safe_enqueue({"result": "missing_ssid_or_password"}, 2)
else:
# global update_thread_started
if not update_thread_started:
update_thread_started = True
_thread.start_new_thread(handle_wifi_and_update, (ssid, password))
else:
safe_enqueue({"result": "update_already_started"}, 2)
_thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url))
elif inner_cmd == 6:
try:
ip = os.popen(
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
@@ -721,18 +898,17 @@ def tcp_main():
# global update_thread_started
if update_thread_started:
safe_enqueue({"result": "update_already_started"}, 2)
rx_buf = rx_buf[match.end():] # 关键:先消费掉这个包
continue
# 实时检查是否有 IP
try:
ip = os.popen(
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"},
MSG_TYPE_STATUS)
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
@@ -746,29 +922,34 @@ def tcp_main():
break
# 发送队列中的业务数据
if logged_in and (high_send_queue or normal_send_queue) and (not queue_lock):
if logged_in and (high_send_queue or normal_send_queue):
# 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁
while queue_lock:
time.sleep_ms(1)
queue_lock = True
msg_type = None
data_dict = None
if queue_lock.try_acquire():
try:
if high_send_queue:
msg_type, data_dict = high_send_queue.pop(0)
else:
elif normal_send_queue:
msg_type, data_dict = normal_send_queue.pop(0)
queue_lock = False
finally:
queue_lock.release()
if msg_type is not None and data_dict is not None:
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
tcp_connected = False
break
# 发送激光校准结果
if logged_in and not laser_calibration_lock and laser_calibration_result is not None:
laser_calibration_lock = True
if logged_in and laser_calibration_result is not None:
x = y = None
with laser_calibration_lock:
if laser_calibration_result is not None:
x, y = laser_calibration_result
safe_enqueue({"result": "ok", "x": x, "y": y}, 2)
laser_calibration_result = None
laser_calibration_lock = False
if x is not None and y is not None:
safe_enqueue({"result": "ok", "x": x, "y": y}, 2)
# 定期发送心跳
current_time = time.ticks_ms()
@@ -788,7 +969,7 @@ def tcp_main():
print("💓 心跳已发送")
# 心跳超时重连
if logged_in and current_time - last_heartbeat_ack_time > 1000 * 60 * 10: # 十分钟
if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟
print("⏰ 十分钟无心跳ACK重连")
break
@@ -806,12 +987,9 @@ def laser_calibration_worker():
if laser_calibration_active:
result = calibrate_laser_position()
if result and len(result) == 2:
while laser_calibration_lock:
time.sleep_ms(1)
laser_calibration_lock = True
with laser_calibration_lock:
laser_calibration_result = result
laser_calibration_active = False
laser_calibration_lock = False
print(f"✅ 后台校准成功: {result}")
else:
time.sleep_ms(80)
@@ -819,6 +997,342 @@ def laser_calibration_worker():
time.sleep_ms(50)
def download_file_via_4g(url, filename,
total_timeout_ms=30000,
retries=3,
debug=False):
"""
ML307R HTTP 下载URC content 分片模式)
- 重试empty/incomplete/AT错误都会重试
- 超时total_timeout_ms
- 校验Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5
- 日志默认干净debug=True 才打印 URC 进度
"""
from urllib.parse import urlparse
parsed = urlparse(url)
host = parsed.hostname
path = parsed.path or "/"
base_url = f"http://{host}" # 你已验证 HTTP 可 200如需 https 需另配 SSL
def _log(*args):
if debug:
print(*args)
def _get_ip():
r = at("AT+CGPADDR=1", "OK", 3000)
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
return m.group(1) if m else ""
def _drain_uart(max_ms=300):
"""清空串口残留,防止旧 URC 干扰"""
t0 = time.ticks_ms()
while time.ticks_ms() - t0 < max_ms:
d = uart4g.read(4096)
if not d:
time.sleep_ms(10)
continue
def _read_more(buf: bytes, ms=50) -> bytes:
t0 = time.ticks_ms()
while time.ticks_ms() - t0 < ms:
d = uart4g.read(4096)
if d:
buf += d
time.sleep_ms(1)
return buf
def _parse_httpid(raw: bytes):
m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw)
return int(m.group(1)) if m else None
def _try_parse_header(buf: bytes):
"""
+MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text...>
返回 (urc_id, status_code, header_text, rest_buf) 或 None
"""
tag = b'+MHTTPURC: "header",'
i = buf.find(tag)
if i < 0:
return None
if i > 0:
buf = buf[i:]
i = 0
# header 在 hdr_text 里包含 \r\n我们用 hdr_len 精确截取
j = i + len(tag)
comma_count = 0
k = j
while k < len(buf) and comma_count < 3: # 3 个逗号到 hdr_len 后的逗号
if buf[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 3:
return None
header_prefix = buf[i:k]
m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', header_prefix)
if not m:
return ("drop", buf[1:])
urc_id = int(m.group(1))
code = int(m.group(2))
hdr_len = int(m.group(3))
text_start = k
text_end = text_start + hdr_len
if len(buf) < text_end:
return None
hdr_text = buf[text_start:text_end].decode("utf-8", "ignore")
rest = buf[text_end:]
return ("ok", urc_id, code, hdr_text, rest)
def _try_parse_one_content(buf: bytes):
"""
+MHTTPURC: "content",<id>,<total_len>,<sum_len>,<cur_len>,<payload...>
返回 ("ok", urc_id, total_len, sum_len, cur_len, payload_bytes, rest_buf) 或 None
"""
tag = b'+MHTTPURC: "content",'
i = buf.find(tag)
if i < 0:
return None
if i > 0:
buf = buf[i:]
i = 0
j = i + len(tag)
comma_count = 0
k = j
while k < len(buf) and comma_count < 4: # payload 前只有 4 个逗号
if buf[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 4:
return None
prefix = buf[i:k]
m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
return ("drop", buf[1:])
urc_id = int(m.group(1))
total_len = int(m.group(2))
sum_len = int(m.group(3))
cur_len = int(m.group(4))
payload_start = k
payload_end = payload_start + cur_len
if len(buf) < payload_end:
return None
payload = buf[payload_start:payload_end]
rest = buf[payload_end:]
return ("ok", urc_id, total_len, sum_len, cur_len, payload, rest)
def _extract_hdr_fields(hdr_text: str):
# Content-Length
mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
clen = int(mlen.group(1)) if mlen else None
# Content-Md5 (base64)
mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
md5_b64 = mmd5.group(1).strip() if mmd5 else None
return clen, md5_b64
def _md5_base64(data: bytes) -> str:
if hashlib is None:
return ""
digest = hashlib.md5(data).digest()
# base64: 24 chars with ==
return binascii.b2a_base64(digest).decode().strip()
def _one_attempt():
# 0) PDP确保有 IP避免把 OK 当成功)
ip = _get_ip()
if not ip or ip == "0.0.0.0":
at("AT+MIPCALL=1,1", "OK", 15000)
for _ in range(10):
ip = _get_ip()
if ip and ip != "0.0.0.0":
break
time.sleep(1)
if not ip or ip == "0.0.0.0":
return False, "PDP not ready (no_ip)"
# 1) 清理旧实例 + 清空串口残留
for i in range(0, 6):
at(f"AT+MHTTPDEL={i}", "OK", 1500)
_drain_uart(300)
# 2) 创建实例httpid 可能延迟吐出来)
uart4g.write((f'AT+MHTTPCREATE="{base_url}"\r\n').encode())
raw = b""
raw = _read_more(raw, 300)
raw = _read_more(raw, 2000)
httpid = _parse_httpid(raw)
if httpid is None:
return False, "MHTTPCREATE failed (no httpid)"
# 3) 发 GET不用 at()(避免 at 吃掉 URC
_drain_uart(100)
uart4g.write((f'AT+MHTTPREQUEST={httpid},1,0,"{path}"\r\n').encode())
# 4) 收 URC先拿 headercode/length/md5再收 content 分片
buf = b""
urc_id = None
status_code = None
total_len = None
expect_len = None
expect_md5_b64 = None
body_buf = None
got_ranges = set()
filled_bytes = 0
t0 = time.ticks_ms()
last_sum = 0
while time.ticks_ms() - t0 < total_timeout_ms:
buf = _read_more(buf, 50)
# 4.1 尝试解析 header可能先来
while True:
ph = _try_parse_header(buf)
if ph is None:
break
if ph[0] == "drop":
buf = ph[1]
continue
_, hid, code, hdr_text, rest = ph
buf = rest
if urc_id is None:
urc_id = hid
status_code = code
expect_len, expect_md5_b64 = _extract_hdr_fields(hdr_text)
_log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}")
# 4.2 解析尽可能多的 content
while True:
pc = _try_parse_one_content(buf)
if pc is None:
break
if pc[0] == "drop":
buf = pc[1]
continue
_, cid, _total, _sum, _cur, payload, rest = pc
buf = rest
if urc_id is None:
urc_id = cid
if cid != urc_id:
continue
if total_len is None:
total_len = _total
body_buf = bytearray(total_len)
# 定位写入
start = _sum - _cur
end = _sum
if start < 0 or end > total_len:
continue
key = (start, end)
if key not in got_ranges:
got_ranges.add(key)
body_buf[start:end] = payload
filled_bytes += (end - start)
if _sum > last_sum:
last_sum = _sum
if debug:
_log(f"[URC] {start}:{end} sum={_sum}/{total_len} filled={filled_bytes}")
# 完整条件:填满 total_len
if filled_bytes == total_len:
break
if total_len is not None and filled_bytes == total_len:
break
# 5) 清理实例
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
if body_buf is None:
return False, "empty_body"
if total_len is None:
return False, "no_total_len"
if filled_bytes != total_len:
return False, f"incomplete_body got={filled_bytes} expected={total_len}"
data = bytes(body_buf)
# 6) 校验Content-Length
if expect_len is not None and len(data) != expect_len:
return False, f"length_mismatch got={len(data)} expected={expect_len}"
# 7) 校验Content-Md5base64
if expect_md5_b64 and hashlib is not None:
md5_b64 = _md5_base64(data)
if md5_b64 != expect_md5_b64:
return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}"
# 8) 写文件(原样 bytes
with open(filename, "wb") as f:
f.write(data)
return True, f"OK size={len(data)} ip={ip} code={status_code}"
global ota_in_progress
ota_in_progress = True
with uart4g_lock:
try:
last_err = "unknown"
for attempt in range(1, retries + 1):
ok, msg = _one_attempt()
if ok:
return True, msg
last_err = msg
# 重试前等待 + 清 UART
_log(f"[RETRY] attempt={attempt} failed={msg}")
_drain_uart(500)
time.sleep_ms(300)
return False, f"FAILED after {retries} retries: {last_err}"
finally:
ota_in_progress = False
def direct_ota_download_via_4g(ota_url):
"""通过 4G 模块下载 OTA不需要 Wi-Fi"""
global update_thread_started
try:
if not ota_url:
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
return
print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}")
success, msg = download_file_via_4g(ota_url, local_filename)
print(f"[OTA-4G] {msg}")
if success and "OK" in msg:
# 下载成功:备份+替换+重启
if apply_ota_and_reboot(ota_url):
return # 会重启,不会执行到 finally
else:
safe_enqueue({"result": msg}, 2)
except Exception as e:
error_msg = f"OTA-4G 异常: {str(e)}"
print(error_msg)
safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
finally:
update_thread_started = False
# ==================== 主程序入口 ====================
def cmd_str():
@@ -853,10 +1367,9 @@ def cmd_str():
while not app.need_exit():
current_time = time.ticks_ms()
# print("压力传感器数值: ", adc_obj.read())
adc_val = adc_obj.read()
if adc_val > ADC_TRIGGER_THRESHOLD:
diff_ms = current_time - last_adc_trigger
if diff_ms < 3000:
if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
diff_ms = current_time-last_adc_trigger
if diff_ms<3000:
continue
last_adc_trigger = current_time
time.sleep_ms(60) # 防抖
@@ -873,7 +1386,7 @@ def cmd_str():
disp.show(result_img)
# 计算偏移与距离
dx, dy = compute_laser_position_v2(center, (x, y))
dx, dy = compute_laser_position(center, (x, y), radius, method)
distance_m = estimate_distance(best_radius1)
# 读取电量
@@ -881,20 +1394,20 @@ def cmd_str():
battery_percent = voltage_to_percent(voltage)
# 保存图像(带标注)
# try:
# jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
# filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
# result_img.save(filename, quality=70)
# except Exception as e:
# print(f"❌ 保存失败: {e}")
try:
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
result_img.save(filename, quality=70)
except Exception as e:
print(f"❌ 保存失败: {e}")
# 构造上报数据
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100), # 距离(厘米)
"m": method,
"adc": adc_val,
"m": method
}
report_data = {"cmd": 1, "data": inner_data}
# 射箭事件高优先级入队,由 tcp_main 统一发送
@@ -906,6 +1419,36 @@ def cmd_str():
disp.show(cam.read())
time.sleep_ms(50)
def dump_system_info():
cmds = [
"uname -a",
"cat /etc/os-release 2>/dev/null",
"cat /proc/version 2>/dev/null",
"cat /proc/1/comm 2>/dev/null",
"ps 2>/dev/null | head -n 5",
"ls -l /sbin/init 2>/dev/null",
"ls -l /etc/init.d 2>/dev/null | head -n 10",
"which systemctl 2>/dev/null",
"which rc-service 2>/dev/null",
"which busybox 2>/dev/null && busybox | head -n 1",
"ls /dev/watchdog* 2>/dev/null",
]
for c in cmds:
try:
out = os.popen(c).read()
print("\n$ " + c + "\n" + (out.strip() or "<empty>"))
except Exception as e:
print("\n$ " + c + "\n<error> " + str(e))
if __name__ == "__main__":
# dump_system_info()
try:
import threading
print("threading module:", threading)
print("has Lock:", hasattr(threading, "Lock"))
if hasattr(threading, "Lock"):
print("has lock")
finally:
pass
cmd_str()