Files
archery/main.py
2025-12-30 09:21:58 +08:00

1836 lines
68 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光射击系统主程序(视觉测距版)
功能目标检测、激光校准、4G TCP 通信、OTA 升级、单目测距、INA226 电量监测
平台MaixPy (Sipeed MAIX)
作者ZZH
最后更新2025-11-21
"""
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import hmac
import ujson
import hashlib
import requests
import socket
import re
import binascii
try:
import hashlib
except:
hashlib = None
# import config
# ==================== Locks ====================
class _Mutex:
"""
基于 _thread.allocate_lock() 的互斥锁封装:
- 支持 with
- 支持 try_acquire若固件不支持非阻塞 acquire 参数,则退化为阻塞 acquire
"""
def __init__(self):
self._lk = _thread.allocate_lock()
def acquire(self, blocking=True):
try:
return self._lk.acquire(blocking)
except TypeError:
self._lk.acquire()
return True
def try_acquire(self):
return self.acquire(False)
def release(self):
self._lk.release()
def __enter__(self):
self.acquire(True)
return self
def __exit__(self, exc_type, exc, tb):
self.release()
return False
class ATClient:
"""
单读者 AT/URC 客户端:唯一读取 uart4g避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。
- send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect
- pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload已按长度裁剪
- pop_http_event() : 获取 +MHTTPURC 事件header/content
"""
def __init__(self, uart_obj):
self.uart = uart_obj
self._cmd_lock = _Mutex()
self._q_lock = _Mutex()
self._rx = b""
self._tcp_payloads = []
self._http_events = []
# 当前命令等待状态(仅允许单命令 in-flight
self._waiting = False
self._expect = b"OK"
self._resp = b""
self._running = False
def start(self):
if self._running:
return
self._running = True
_thread.start_new_thread(self._reader_loop, ())
def stop(self):
self._running = False
def flush(self):
"""清空内部缓存与队列(用于 OTA/异常恢复)"""
with self._q_lock:
self._rx = b""
self._tcp_payloads.clear()
self._http_events.clear()
self._resp = b""
def pop_tcp_payload(self):
with self._q_lock:
if self._tcp_payloads:
return self._tcp_payloads.pop(0)
return None
def pop_http_event(self):
with self._q_lock:
if self._http_events:
return self._http_events.pop(0)
return None
def _push_tcp_payload(self, payload: bytes):
# 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock
# 这里不要再次 acquire锁不可重入会死锁
self._tcp_payloads.append(payload)
def _push_http_event(self, ev):
# 同上:避免在 _reader_loop 持锁期间二次 acquire
self._http_events.append(ev)
def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000):
"""
发送 AT 命令并等待 expect子串匹配
注意expect=">" 用于等待 prompt。
"""
expect_b = expect.encode() if isinstance(expect, str) else expect
with self._cmd_lock:
# 初始化等待
self._waiting = True
self._expect = expect_b
self._resp = b""
# 发送
if cmd:
# 注意:这里不要再用 uart4g_lock否则外层已经持锁时会死锁
# 写入由 _cmd_lock 串行化即可。
self.uart.write((cmd + "\r\n").encode())
t0 = time.ticks_ms()
while time.ticks_ms() - t0 < timeout_ms:
if (not self._waiting) or (self._expect in self._resp):
self._waiting = False
break
time.sleep_ms(5)
# 超时也返回已收集内容(便于诊断)
self._waiting = False
try:
return self._resp.decode(errors="ignore")
except:
return str(self._resp)
def _parse_mipurc_rtcp(self):
"""
解析:+MIPURC: "rtcp",<link_id>,<len>,<payload...>
之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。
"""
prefix = b'+MIPURC: "rtcp",'
i = self._rx.find(prefix)
if i < 0:
return False
# 丢掉前置噪声
if i > 0:
self._rx = self._rx[i:]
i = 0
j = len(prefix)
# 解析 link_id
k = j
while k < len(self._rx) and 48 <= self._rx[k] <= 57:
k += 1
if k == j or k >= len(self._rx):
return False
if self._rx[k:k+1] != b",":
self._rx = self._rx[1:]
return True
try:
link_id = int(self._rx[j:k].decode())
except:
self._rx = self._rx[1:]
return True
# 解析 len
j2 = k + 1
k2 = j2
while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57:
k2 += 1
if k2 == j2 or k2 >= len(self._rx):
return False
if self._rx[k2:k2+1] != b",":
self._rx = self._rx[1:]
return True
try:
n = int(self._rx[j2:k2].decode())
except:
self._rx = self._rx[1:]
return True
payload_start = k2 + 1
payload_end = payload_start + n
if len(self._rx) < payload_end:
return False # payload 未收齐
payload = self._rx[payload_start:payload_end]
# 把 link_id 一起带上,便于上层过滤(如果需要)
self._push_tcp_payload((link_id, payload))
self._rx = self._rx[payload_end:]
return True
def _parse_mhttpurc_header(self):
tag = b'+MHTTPURC: "header",'
i = self._rx.find(tag)
if i < 0:
return False
if i > 0:
self._rx = self._rx[i:]
i = 0
# header: +MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text...>
j = len(tag)
comma_count = 0
k = j
while k < len(self._rx) and comma_count < 3:
if self._rx[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 3:
return False
prefix = self._rx[:k]
m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
self._rx = self._rx[1:]
return True
urc_id = int(m.group(1))
code = int(m.group(2))
hdr_len = int(m.group(3))
text_start = k
text_end = text_start + hdr_len
if len(self._rx) < text_end:
return False
hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore")
self._push_http_event(("header", urc_id, code, hdr_text))
self._rx = self._rx[text_end:]
return True
def _parse_mhttpurc_content(self):
tag = b'+MHTTPURC: "content",'
i = self._rx.find(tag)
if i < 0:
return False
if i > 0:
self._rx = self._rx[i:]
i = 0
# content: +MHTTPURC: "content",<id>,<total>,<sum>,<cur>,<payload...>
j = len(tag)
comma_count = 0
k = j
while k < len(self._rx) and comma_count < 4:
if self._rx[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 4:
return False
prefix = self._rx[:k]
m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
self._rx = self._rx[1:]
return True
urc_id = int(m.group(1))
total_len = int(m.group(2))
sum_len = int(m.group(3))
cur_len = int(m.group(4))
payload_start = k
payload_end = payload_start + cur_len
if len(self._rx) < payload_end:
return False
payload = self._rx[payload_start:payload_end]
self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload))
self._rx = self._rx[payload_end:]
return True
def _reader_loop(self):
while self._running:
d = self.uart.read(4096)
if not d:
time.sleep_ms(2)
continue
with self._q_lock:
# 统一累积到内部 buffer用于 URC 解析)
self._rx += d
# 命令等待期间,把原始字节流复制到响应缓冲(不影响 URC 解析)
if self._waiting:
self._resp += d
# 解析 URC尽可能多地从 _rx 中剥离完整 URC避免丢包
while True:
progressed = (
self._parse_mipurc_rtcp()
or self._parse_mhttpurc_header()
or self._parse_mhttpurc_content()
)
if not progressed:
break
# 防止 _rx 因为"非 URC 文本/回显"无限增长:保留尾部即可
# 关键修复:如果 buffer 中有 HTTP URC 标签(说明 OTA 在进行),完全禁用截断
# 避免在 OTA 下载时截断 buffer 导致数据丢失(之前 16KB 限制太小,导致数据被截断)
has_http_urc = (b'+MHTTPURC: "content"' in self._rx or
b'+MHTTPURC: "header"' in self._rx)
if has_http_urc:
# OTA 下载中:完全禁用 buffer 截断,避免数据丢失
# 通常 OTA 文件不会超过几百 KB即使不截断也不会导致内存问题
pass # 不截断
else:
# 非 OTA 状态:使用较小的 buffer 限制16KB
if len(self._rx) > 16384:
self._rx = self._rx[-4096:]
# ==================== 全局配置 ====================
# OTA 升级地址与本地路径
# url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
local_filename = "/maixapp/apps/t11/main_tmp.py"
app_version = '1.0.0'
# OTA 下发参数(由后端指令写入)
OTA_URL = None
OTA_MODE = None # "4g" / "wifi" / None
def is_wifi_connected():
"""尽量判断当前是否有 Wi-Fi有则走 Wi-Fi OTA否则走 4G OTA"""
# 优先用 MaixPy network如果可用
try:
wlan = network.WLAN(network.TYPE_WIFI)
if wlan.isconnected():
return True
except:
pass
# 兜底:看系统 wlan0 有没有 IP你系统可能没有 wlan0则返回 False
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
return bool(ip)
except:
return False
# 设备认证信息(运行时动态加载)
DEVICE_ID = None
PASSWORD = None
# 服务器连接参数
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒)
# 激光校准配置
CONFIG_FILE = "/root/laser_config.json"
DEFAULT_POINT = (640, 480) # 默认激光中心点(图像中心)
laser_point = DEFAULT_POINT
# HTTP 上报接口
URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"
# UART 设备初始化
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信
distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块
# 单读者 ATClient唯一读取 uart4g
at_client = ATClient(uart4g)
at_client.start()
# 引脚功能映射
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
pinmap.set_pin_function("A29", "UART2_RX")
pinmap.set_pin_function("A28", "UART2_TX")
pinmap.set_pin_function("P18", "I2C1_SCL")
pinmap.set_pin_function("P21", "I2C1_SDA")
# pinmap.set_pin_function("A15", "I2C5_SCL")
# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的
# ADC 触发阈值(用于检测扳机/激光触发)
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# 显示参数:激光十字线样式
color = image.Color(255, 100, 0)
thickness = 1
length = 2
# 全局状态变量
laser_calibration_active = False # 是否正在后台校准激光
laser_calibration_result = None # 校准结果坐标 (x, y)
laser_calibration_lock = _Mutex() # 互斥锁,防止多线程冲突
# 硬件对象初始化
laser_x, laser_y = laser_point
adc_obj = adc.ADC(0, adc.RES_BIT_12)
bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线
# bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的
# INA226 电流/电压监测芯片寄存器地址
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# 激光控制指令(自定义协议)
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
# 相机标定参数(用于距离估算)
# FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素)
FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素)
REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
# # TCP 连接状态
tcp_connected = False
high_send_queue = [] # 高优先级发送队列:射箭事件等
normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等
queue_lock = _Mutex() # 互斥锁,保护队列
uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并发)
update_thread_started = False # 防止 OTA 更新线程重复启动
ota_in_progress = False # OTA(4G HTTP URC) 期间暂停 tcp_main 读取 uart4g避免吞掉 +MHTTPURC
# ==================== 工具函数 ====================
def download_file(url, filename):
"""从指定 URL 下载文件并保存为 UTF-8 编码文本"""
try:
print(f"正在从 {url} 下载文件...")
response = requests.get(url)
response.raise_for_status()
response.encoding = 'utf-8'
with open(filename, 'w', encoding='utf-8') as file:
file.write(response.text)
return f"下载成功!文件已保存为: {filename}"
except requests.exceptions.RequestException as e:
return f"下载失败!网络请求错误: {e}"
except OSError as e:
return f"下载失败!文件写入错误: {e}"
except Exception as e:
return f"下载失败!发生未知错误: {e}"
def is_server_reachable(host, port=80, timeout=5):
"""检查目标主机端口是否可达(用于 OTA 前网络检测)"""
try:
addr_info = socket.getaddrinfo(host, port)[0]
s = socket.socket(addr_info[0], addr_info[1], addr_info[2])
s.settimeout(timeout)
s.connect(addr_info[-1])
s.close()
return True
except Exception as e:
print(f"[NET] 无法连接 {host}:{port} - {e}")
return False
def apply_ota_and_reboot(ota_url=None):
# TODO: remove this return after test
# return True
"""
OTA 文件下载成功后:备份原 main.py -> 替换 main_tmp.py -> 重启设备
"""
import shutil
main_py = "/maixapp/apps/t11/main.py"
main_tmp = "/maixapp/apps/t11/main_tmp.py"
main_bak = "/maixapp/apps/t11/main.py.bak"
ota_pending = "/maixapp/apps/t11/ota_pending.json"
try:
# 1. 检查下载的文件是否存在
if not os.path.exists(main_tmp):
print(f"[OTA] 错误:{main_tmp} 不存在")
return False
# 2. 备份原 main.py如果存在
if os.path.exists(main_py):
try:
shutil.copy2(main_py, main_bak)
print(f"[OTA] 已备份 {main_py} -> {main_bak}")
except Exception as e:
print(f"[OTA] 备份失败: {e}")
# 备份失败也继续(可能没有原文件)
# 3. 替换main_tmp.py -> main.py
try:
shutil.copy2(main_tmp, main_py)
print(f"[OTA] 已替换 {main_tmp} -> {main_py}")
# 确保写入磁盘
try:
os.sync() # 如果系统支持
except:
pass
time.sleep_ms(500) # 额外等待确保写入完成
except Exception as e:
print(f"[OTA] 替换失败: {e}")
return False
# 3.5 写入 pending用于重启后确认成功并上报
try:
pending_obj = {
"ts": int(time.time()) if hasattr(time, "time") else 0,
"url": ota_url or "",
"tmp": main_tmp,
"main": main_py,
"bak": main_bak,
}
with open(ota_pending, "w", encoding="utf-8") as f:
json.dump(pending_obj, f)
try:
os.sync()
except:
pass
except Exception as e:
print(f"[OTA] 写入 ota_pending 失败: {e}")
# 4. 通知服务器(可选,但重启前发一次)
safe_enqueue({"result": "ota_applied_rebooting"}, 2)
time.sleep_ms(1000) # 给一点时间让消息发出
# 5. 重启设备
print("[OTA] 准备重启设备...")
os.system("reboot") # MaixPy 通常是这个命令
return True
except Exception as e:
print(f"[OTA] apply_ota_and_reboot 异常: {e}")
return False
def direct_ota_download(ota_url):
"""
直接执行 OTA 下载(假设已有网络)
用于 cmd=7 / 或 wifi 模式
"""
global update_thread_started
try:
if not ota_url:
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
return
from urllib.parse import urlparse
parsed_url = urlparse(ota_url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2)
return
print(f"[OTA] 开始下载: {ota_url}")
# from ota import download_file
result_msg = download_file(ota_url, local_filename)
print(f"[OTA] {result_msg}")
# 检查是否下载成功(包含"成功"或"下载成功"关键字)
if "成功" in result_msg or "下载成功" in result_msg:
# 下载成功:备份+替换+重启
if apply_ota_and_reboot(ota_url):
return # 会重启,不会执行到 finally
else:
safe_enqueue({"result": result_msg}, 2)
except Exception as e:
error_msg = f"OTA 异常: {str(e)}"
print(error_msg)
safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
finally:
update_thread_started = False
def handle_wifi_and_update(ssid, password, ota_url):
"""在子线程中执行 Wi-Fi 连接 + OTA 更新流程"""
global update_thread_started
try:
ip, error = connect_wifi(ssid, password)
if error:
safe_enqueue({"result": "wifi_failed", "error": error}, 2)
return
safe_enqueue({"result": "wifi_connected", "ip": ip}, 2)
# 下载
if not ota_url:
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
return
from urllib.parse import urlparse
parsed_url = urlparse(ota_url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
err_msg = f"网络不通:无法连接 {host}:{port}"
safe_enqueue({"result": err_msg}, 2)
return
print(f"[NET] 已确认可访问 {host}:{port},开始下载...")
result = download_file(ota_url, local_filename)
print(result)
# 检查是否下载成功(包含"成功"或"下载成功"关键字)
if "成功" in result or "下载成功" in result:
# 下载成功:备份+替换+重启
if apply_ota_and_reboot(ota_url):
return # 会重启,不会执行到 finally
else:
safe_enqueue({"result": result}, 2)
finally:
update_thread_started = False
print("[UPDATE] 更新线程执行完毕,即将退出。")
def connect_wifi(ssid, password):
"""
连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录,
以便设备重启后自动连接。
"""
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
try:
# 生成 wpa_supplicant 配置
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return None, "Failed to generate wpa config"
# 写入运行时配置
with open(conf_path, "w") as f:
f.write("ctrl_interface=/var/run/wpa_supplicant\n")
f.write("update_config=1\n\n")
f.write(net_conf)
# 持久化保存 SSID/PASS关键
with open(ssid_file, "w") as f:
f.write(ssid.strip())
with open(pass_file, "w") as f:
f.write(password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP
for _ in range(20):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
return ip, None
time.sleep(1)
return None, "Timeout: No IP obtained"
except Exception as e:
return None, f"Exception: {str(e)}"
def read_device_id():
"""从 /device_key 文件读取设备唯一 ID失败则使用默认值"""
try:
with open("/device_key", "r") as f:
device_id = f.read().strip()
if device_id:
print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
return device_id
except Exception as e:
print(f"[ERROR] 无法读取 /device_key: {e}")
return "DEFAULT_DEVICE_ID"
def safe_enqueue(data_dict, msg_type=2, high=False):
"""线程安全地将消息加入 TCP 发送队列(支持优先级)"""
global queue_lock, high_send_queue, normal_send_queue
item = (msg_type, data_dict)
with queue_lock:
if high:
high_send_queue.append(item)
else:
normal_send_queue.append(item)
def at(cmd, wait="OK", timeout=2000):
"""向 4G 模块发送 AT 指令并等待响应"""
# 统一由 ATClient 负责读 uart4g避免多线程抢读
return at_client.send(cmd, wait, timeout)
def make_packet(msg_type: int, body_dict: dict) -> bytes:
"""打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文"""
body = json.dumps(body_dict).encode()
body_len = len(body)
checksum = body_len + msg_type
header = struct.pack(">III", body_len, msg_type, checksum)
return header + body
def parse_packet(data: bytes):
"""解析 TCP 数据包,返回 (类型, 正文字典)"""
if len(data) < 12:
return None, None
body_len, msg_type, checksum = struct.unpack(">III", data[:12])
body = data[12:12 + body_len]
try:
return msg_type, json.loads(body.decode())
except:
return msg_type, {"raw": body.decode(errors="ignore")}
def tcp_send_raw(data: bytes, max_retries=2) -> bool:
global tcp_connected
if not tcp_connected:
return False
with uart4g_lock:
for _ in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 2000):
time.sleep_ms(50)
continue
# 关键:确保把 data 全部写出去
total = 0
while total < len(data):
n = uart4g.write(data[total:])
if not n or n < 0:
time.sleep_ms(1)
continue
total += n
# 关键:再发结束符(不算进 payload
uart4g.write(b"\x1A")
# 等发送完成确认(不同固件可能是 SEND OK / OK / +MIPSEND
r = at("", "OK", 8000)
if ("SEND OK" in r) or ("OK" in r) or ("+MIPSEND" in r):
return True
time.sleep_ms(50)
return False
def generate_token(device_id):
"""生成用于 HTTP 接口鉴权的 TokenHMAC-SHA256"""
SALT = "shootMessageFire"
SALT2 = "shoot"
return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
def send_http_cmd(cmd_str, timeout_ms=3000):
"""发送 HTTP 相关 AT 指令(调试用)"""
print("[HTTP AT] =>", cmd_str)
return at(cmd_str, "OK", timeout_ms)
def read_http_response(timeout_ms=5000):
"""读取并打印 HTTP 响应(用于调试)"""
# 仅保留占位UART 读取由 ATClient 独占;如需调试,请从 ATClient 的 http_events 中取。
time.sleep_ms(timeout_ms)
def upload_shoot_event(json_data):
"""通过 4G 模块上报射击事件到 HTTP 接口(备用通道)"""
token = generate_token(DEVICE_ID)
if not send_http_cmd(f'AT+MHTTPCREATE="{URL}"'):
return False
instance_id = 0
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"')
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"')
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {DEVICE_ID}"')
json_str = ujson.dumps(json_data)
if not send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'):
return False
if send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{API_PATH}"'):
read_http_response()
return True
return False
def load_laser_point():
"""从配置文件加载激光中心点,失败则使用默认值"""
global laser_point
try:
if "laser_config.json" in os.listdir("/root"):
with open(CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
laser_point = (int(data[0]), int(data[1]))
print(f"[INFO] 加载激光点: {laser_point}")
else:
raise ValueError
else:
laser_point = DEFAULT_POINT
except:
laser_point = DEFAULT_POINT
def save_laser_point(point):
"""保存激光中心点到配置文件"""
global laser_point
try:
with open(CONFIG_FILE, "w") as f:
json.dump([point[0], point[1]], f)
laser_point = point
except:
pass
def turn_on_laser():
"""发送指令开启激光,并读取回包(部分模块支持)"""
distance_serial.write(LASER_ON_CMD)
time.sleep_ms(10)
resp = distance_serial.read(20)
if resp:
if resp == LASER_ON_CMD:
print("✅ 激光指令已确认")
else:
print("🔇 无回包(正常或模块不支持)")
return resp
def flash_laser(duration_ms=1000):
"""闪一下激光(用于射箭反馈)"""
try:
distance_serial.write(LASER_ON_CMD)
time.sleep_ms(duration_ms)
distance_serial.write(LASER_OFF_CMD)
except Exception as e:
print(f"闪激光失败: {e}")
def find_red_laser(frame, threshold=150):
"""在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
w, h = frame.width(), frame.height()
img_bytes = frame.to_bytes()
max_sum = 0
best_pos = None
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
max_sum = rgb_sum
best_pos = (x, y)
return best_pos
def calibrate_laser_position():
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
global laser_x, laser_y
time.sleep_ms(80)
cam = camera.Camera(640, 480)
frame = cam.read()
pos = find_red_laser(frame)
if pos:
laser_x, laser_y = pos
save_laser_point(pos)
return pos
return None
# ==================== 电源管理INA226 ====================
def write_register(reg, value):
data = [(value >> 8) & 0xFF, value & 0xFF]
bus.writeto_mem(INA226_ADDR, reg, bytes(data))
def read_register(reg):
data = bus.readfrom_mem(INA226_ADDR, reg, 2)
return (data[0] << 8) | data[1]
def init_ina226():
"""初始化 INA226 芯片:配置模式 + 校准值"""
write_register(REG_CONFIGURATION, 0x4527)
write_register(REG_CALIBRATION, CALIBRATION_VALUE)
def get_bus_voltage():
"""读取总线电压单位V"""
raw = read_register(REG_BUS_VOLTAGE)
return raw * 1.25 / 1000
def voltage_to_percent(voltage):
"""根据电压估算电池百分比(查表插值)"""
points = [
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
(3.65, 5), (3.60, 0)
]
if voltage >= points[0][0]:
return 100
if voltage <= points[-1][0]:
return 0
for i in range(len(points) - 1):
v1, p1 = points[i]
v2, p2 = points[i + 1]
if voltage >= v2:
ratio = (voltage - v1) / (v2 - v1)
percent = p1 + (p2 - p1) * ratio
return max(0, min(100, int(round(percent))))
return 0
# ==================== 靶心检测与距离计算 ====================
def detect_circle(frame):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)"""
global REAL_RADIUS_CM
img_cv = image.image2cv(frame, False, False)
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edged = cv2.Canny(blurred, 50, 150)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
best_center = best_radius = best_radius1 = method = None
# 方法1基于轮廓拟合椭圆清晰靶心
for cnt in contours:
area = cv2.contourArea(cnt)
perimeter = cv2.arcLength(cnt, True)
if perimeter < 100 or area < 100:
continue
circularity = 4 * np.pi * area / (perimeter ** 2)
if circularity > 0.75 and len(cnt) >= 5:
center, axes, angle = cv2.fitEllipse(cnt)
radius = (axes[0] + axes[1]) / 4
best_center = (int(center[0]), int(center[1]))
best_radius = int(radius)
best_radius1 = best_radius
REAL_RADIUS_CM = 15
method = "清晰"
break
# 方法2基于 HSV 黄色掩码(模糊靶心)
if not best_center:
hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
s = np.clip(s * 2, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 182])
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) > 50:
(x, y), radius = cv2.minEnclosingCircle(largest)
best_center = (int(x), int(y))
best_radius = int(radius)
best_radius1 = best_radius
REAL_RADIUS_CM = 15
method = "模糊"
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1
def estimate_distance(pixel_radius):
"""根据像素半径估算实际距离(单位:米)"""
if not pixel_radius:
return 0.0
return (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / pixel_radius / 100.0
def compute_laser_position(circle_center, laser_point, radius, method):
"""计算激光相对于靶心的偏移量(单位:厘米)"""
if not all([circle_center, radius, method]):
return None, None
cx, cy = circle_center
lx, ly = laser_point
# 根据检测方法动态调整靶心物理半径(简化模型)
circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
dx = lx - cx
dy = ly - cy
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
# ==================== TCP 通信线程 ====================
def connect_server():
"""通过 4G 模块建立 TCP 连接"""
global tcp_connected
if tcp_connected:
return True
print("连接到服务器...")
with uart4g_lock:
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
if "+MIPOPEN: 0,0" in res:
tcp_connected = True
return True
return False
def tcp_main():
"""TCP 主通信循环:登录、心跳、处理指令、发送数据"""
global tcp_connected, high_send_queue, normal_send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock, update_thread_started
send_hartbeat_fail_count = 0
while not app.need_exit():
if not connect_server():
time.sleep_ms(5000)
continue
# 发送登录包
login_data = {"deviceId": DEVICE_ID, "password": PASSWORD, "version": app_version}
if not tcp_send_raw(make_packet(1, login_data)):
tcp_connected = False
time.sleep_ms(2000)
continue
print("➡️ 登录包已发送,等待确认...")
logged_in = False
last_heartbeat_ack_time = time.ticks_ms()
last_heartbeat_send_time = time.ticks_ms()
while True:
# 接收数据唯一来源ATClient 解析后的 TCP payload 队列)
item = at_client.pop_tcp_payload()
if item:
# item 可能是 (link_id, payload) 或直接 payload兼容旧队列格式
if isinstance(item, tuple) and len(item) == 2:
link_id, payload = item
else:
link_id, payload = 0, item
# 登录阶段加一条轻量 debug确认 ACK 是否进入队列
if not logged_in:
try:
print(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
except:
pass
msg_type, body = parse_packet(payload)
# 处理登录响应
if not logged_in and msg_type == 1:
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 登录成功")
# 若存在 ota_pending.json说明上次 OTA 已应用并重启;
# 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending确保只上报一次。
try:
pending_path = "/maixapp/apps/t11/ota_pending.json"
if os.path.exists(pending_path):
try:
with open(pending_path, "r", encoding="utf-8") as f:
pending_obj = json.load(f)
except:
pending_obj = {}
safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
try:
os.remove(pending_path)
except:
pass
except Exception as e:
print(f"[OTA] ota_ok 上报失败: {e}")
else:
# 登录失败,跳出重连
break
# 处理心跳 ACK
elif logged_in and msg_type == 4:
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 收到心跳确认")
# 处理业务指令
elif logged_in and isinstance(body, dict):
# 重要:每个包都要重新解析 inner_cmd避免上一次的 cmd “粘住”导致反复执行
inner_cmd = None
data_obj = body.get("data")
if isinstance(data_obj, dict):
inner_cmd = data_obj.get("cmd")
if inner_cmd == 2: # 开启激光并校准
# 幂等:正在校准则不重复触发(服务器可能重发 cmd=2
if not laser_calibration_active:
turn_on_laser()
time.sleep_ms(100)
laser_calibration_active = True
safe_enqueue({"result": "calibrating"}, 2)
elif inner_cmd == 3: # 关闭激光
distance_serial.write(LASER_OFF_CMD)
laser_calibration_active = False
safe_enqueue({"result": "laser_off"}, 2)
elif inner_cmd == 4: # 上报电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
safe_enqueue(battery_data, 2)
print(f"🔋 电量上报: {battery_percent}%")
elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置及4g
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
ssid = inner_data.get("ssid")
password = inner_data.get("password")
ota_url = inner_data.get("url")
mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/""
if not ota_url:
print("ota missing_url")
safe_enqueue({"result": "missing_url"}, 2)
continue
# 自动判断mode 非法/为空时,优先 Wi-Fi如果已连否则 4G
if mode not in ("4g", "wifi"):
print("ota missing mode")
mode = "wifi" if is_wifi_connected() else "4g"
if update_thread_started:
safe_enqueue({"result": "update_already_started"}, 2)
continue
update_thread_started = True
if mode == "4g":
_thread.start_new_thread(direct_ota_download_via_4g, (ota_url,))
else:
# wifi 模式:需要 ssid/password
if not ssid or not password:
update_thread_started = False
safe_enqueue({"result": "missing_ssid_or_password"}, 2)
else:
_thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url))
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
safe_enqueue({"result": "current_ip", "ip": ip}, 2)
elif inner_cmd == 7:
# global update_thread_started
if update_thread_started:
safe_enqueue({"result": "update_already_started"}, 2)
continue
# 实时检查是否有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
_thread.start_new_thread(direct_ota_download, ())
else:
# 非指令包(或未携带 cmd不做任何动作
pass
else:
time.sleep_ms(5)
# 发送队列中的业务数据
if logged_in and (high_send_queue or normal_send_queue):
# 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁
msg_type = None
data_dict = None
if queue_lock.try_acquire():
try:
if high_send_queue:
msg_type, data_dict = high_send_queue.pop(0)
elif normal_send_queue:
msg_type, data_dict = normal_send_queue.pop(0)
finally:
queue_lock.release()
if msg_type is not None and data_dict is not None:
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
tcp_connected = False
break
# 发送激光校准结果
if logged_in and laser_calibration_result is not None:
x = y = None
with laser_calibration_lock:
if laser_calibration_result is not None:
x, y = laser_calibration_result
laser_calibration_result = None
if x is not None and y is not None:
safe_enqueue({"result": "ok", "x": x, "y": y}, 2)
# 定期发送心跳
current_time = time.ticks_ms()
if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
vol_val = get_bus_voltage()
if not tcp_send_raw(make_packet(4, {"vol":vol_val, "vol_per":voltage_to_percent(vol_val)})):
print("💔 心跳发送失败")
send_hartbeat_fail_count += 1
if send_hartbeat_fail_count >= 3:
send_hartbeat_fail_count = 0
print("连续3次发送心跳失败重连")
break
else:
continue
else:
send_hartbeat_fail_count = 0
last_heartbeat_send_time = current_time
print("💓 心跳已发送")
# 心跳超时重连
if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟
print("⏰ 十分钟无心跳ACK重连")
break
time.sleep_ms(50)
tcp_connected = False
print("🔌 连接异常2秒后重连...")
time.sleep_ms(2000)
def laser_calibration_worker():
"""后台线程:持续检测是否需要执行激光校准"""
global laser_calibration_active, laser_calibration_result, laser_calibration_lock
while True:
if laser_calibration_active:
# 关键:不要在每次尝试里反复 new Camera会导致 MMF 反复初始化刷屏)
cam = None
try:
cam = camera.Camera(640, 480)
start = time.ticks_ms()
timeout_ms = 8000 # 8 秒内找不到红点就退出一次,避免一直占用资源
while laser_calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms:
frame = cam.read()
pos = find_red_laser(frame)
if pos:
with laser_calibration_lock:
laser_calibration_result = pos
laser_calibration_active = False
save_laser_point(pos)
print(f"✅ 后台校准成功: {pos}")
break
time.sleep_ms(60)
except Exception as e:
# 出错时也不要死循环刷屏
print(f"[LASER] calibration error: {e}")
time.sleep_ms(200)
finally:
try:
# 释放摄像头资源MaixPy 通常靠 GC但显式 del 更稳)
if cam is not None:
del cam
except:
pass
# 如果超时仍未成功,稍微休息一下再允许下一次 cmd=2 触发
if laser_calibration_active:
time.sleep_ms(300)
else:
time.sleep_ms(50)
def download_file_via_4g(url, filename,
total_timeout_ms=30000,
retries=3,
debug=False):
"""
ML307R HTTP 下载URC content 分片模式)
- 重试empty/incomplete/AT错误都会重试
- 超时total_timeout_ms
- 校验Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5
- 日志默认干净debug=True 才打印 URC 进度
"""
from urllib.parse import urlparse
parsed = urlparse(url)
host = parsed.hostname
path = parsed.path or "/"
base_url = f"http://{host}" # 你已验证 HTTP 可 200如需 https 需另配 SSL
def _log(*args):
if debug:
print(*args)
def _merge_ranges(ranges_iter):
"""合并重叠/相邻区间,返回 merged(list[(s,e)])(半开区间)"""
rs = sorted(ranges_iter)
merged = []
for s, e in rs:
if e <= s:
continue
if merged and s <= merged[-1][1]:
merged[-1] = (merged[-1][0], max(merged[-1][1], e))
else:
merged.append((s, e))
return merged
def _compute_gaps(total_len, got_ranges):
"""根据已填充区间计算缺口(半开区间)"""
if not total_len or total_len <= 0:
return [(0, 0)]
merged = _merge_ranges(got_ranges)
gaps = []
prev = 0
for s, e in merged:
if s > prev:
gaps.append((prev, s))
prev = max(prev, e)
if prev < total_len:
gaps.append((prev, total_len))
return gaps, merged
def _extract_content_range(hdr_text: str):
"""
Content-Range: bytes <start>-<end>/<total>
返回 (start, end, total);解析失败返回 (None,None,None)
"""
m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE)
if not m:
return None, None, None
try:
return int(m.group(1)), int(m.group(2)), int(m.group(3))
except:
return None, None, None
def _get_ip():
r = at("AT+CGPADDR=1", "OK", 3000)
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
return m.group(1) if m else ""
def _clear_http_events():
# 清空旧的 HTTP URC 事件,避免串台
while at_client.pop_http_event() is not None:
pass
# 旧版基于直接 uart4g.read 的解析已迁移到 ATClient单读者保留函数占位避免大改动
def _parse_httpid(raw: bytes):
m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw)
return int(m.group(1)) if m else None
# _try_parse_header/_try_parse_one_content 已由 ATClient 在 reader 线程中解析并推送事件
# _try_parse_one_content 已由 ATClient 解析
def _extract_hdr_fields(hdr_text: str):
# Content-Length
mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
clen = int(mlen.group(1)) if mlen else None
# Content-Md5 (base64)
mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
md5_b64 = mmd5.group(1).strip() if mmd5 else None
return clen, md5_b64
def _md5_base64(data: bytes) -> str:
if hashlib is None:
return ""
digest = hashlib.md5(data).digest()
# base64: 24 chars with ==
return binascii.b2a_base64(digest).decode().strip()
def _one_attempt(range_start=None, range_end=None,
body_buf=None, got_ranges=None,
total_len=None,
expect_md5_b64=None):
# 0) PDP确保有 IP避免把 OK 当成功)
ip = _get_ip()
if not ip or ip == "0.0.0.0":
at("AT+MIPCALL=1,1", "OK", 15000)
for _ in range(10):
ip = _get_ip()
if ip and ip != "0.0.0.0":
break
time.sleep(1)
if not ip or ip == "0.0.0.0":
return False, "PDP not ready (no_ip)", body_buf, got_ranges, total_len, expect_md5_b64
# 1) 清理旧实例 + 清空旧 HTTP 事件
for i in range(0, 6):
at(f"AT+MHTTPDEL={i}", "OK", 1500)
_clear_http_events()
# 2) 创建实例(用 at() 等待返回)
create_resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)
httpid = _parse_httpid(create_resp.encode())
if httpid is None:
return False, "MHTTPCREATE failed (no httpid)", body_buf, got_ranges, total_len, expect_md5_b64
# 2.5) Range 补洞按缺口请求指定字节段HTTP Range 右端是 inclusive
if range_start is not None and range_end is not None:
# 每次请求使用新 httpid避免 header 累积/污染
at(f'AT+MHTTPCFG="header",{httpid},"Range: bytes={int(range_start)}-{int(range_end)}"', "OK", 3000)
# 3) 发 GETHTTP URC 由 ATClient 解析并入队)
req_resp = at(f'AT+MHTTPREQUEST={httpid},1,0,"{path}"', "OK", 15000)
if "ERROR" in req_resp or "CME ERROR" in req_resp:
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
return False, f"MHTTPREQUEST failed: {req_resp}", body_buf, got_ranges, total_len, expect_md5_b64
# 4) 从 ATClient 的 http_events 队列收 header/content
urc_id = None
status_code = None
expect_len = None
# 若是 Range 响应206需要把响应内的偏移映射到“全文件”偏移
offset_base = 0
# got_ranges 记录“真实写入 body_buf 的半开区间”
if got_ranges is None:
got_ranges = set()
filled_new_bytes = 0
last_sum = 0
no_progress_count = 0 # 连续没有进展的次数
last_print_ms = time.ticks_ms()
last_print_sum = 0
t0 = time.ticks_ms()
while time.ticks_ms() - t0 < total_timeout_ms:
ev = at_client.pop_http_event()
if not ev:
# 如果 sum 已经达到 total_len但仍有 gaps等待更长时间有些分片可能延迟到达
if total_len and last_sum >= total_len:
gaps_now, merged_now = _compute_gaps(total_len, got_ranges)
if gaps_now and not (len(gaps_now) == 1 and gaps_now[0] == (0, 0)):
time.sleep_ms(50)
else:
time.sleep_ms(5)
else:
time.sleep_ms(5)
no_progress_count += 1
# 如果长时间没有新事件,且 sum 已经达到 total_len认为接收完成可能有丢包
if no_progress_count > 100 and total_len and last_sum >= total_len:
break
continue
no_progress_count = 0 # 有事件,重置计数器
if ev[0] == "header":
_, hid, code, hdr_text = ev
if urc_id is None:
urc_id = hid
if hid != urc_id:
continue
status_code = code
expect_len, md5_b64 = _extract_hdr_fields(hdr_text)
# 只在“首次全量 header”里保留 Content-Md5Range 响应通常不带该字段
if md5_b64:
expect_md5_b64 = md5_b64
cr_s, cr_e, cr_total = _extract_content_range(hdr_text)
if cr_s is not None and cr_total is not None:
# 206 Partial Content
offset_base = cr_s
# Content-Range end 是 inclusive总长度以 total 为准
if total_len is None:
total_len = cr_total
elif total_len != cr_total:
_log(f"[WARN] total_len changed {total_len}->{cr_total}")
total_len = cr_total
if body_buf is None and total_len:
body_buf = bytearray(total_len)
_log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}")
continue
if ev[0] == "content":
_, cid, _total, _sum, _cur, payload = ev
if urc_id is None:
urc_id = cid
if cid != urc_id:
continue
# 全量 200这里的 _total 就是全文件长度Range 206_total 可能只是“本次响应体长度”
if body_buf is None:
# 如果 header 没解析出 Content-Range总长度用 content 的 _total
if total_len is None:
total_len = _total
if total_len:
body_buf = bytearray(total_len)
if body_buf is None or total_len is None:
continue
rel_start = _sum - _cur
rel_end = _sum
abs_start = offset_base + rel_start
abs_end = offset_base + rel_end
if abs_start < 0 or abs_start >= total_len:
continue
if abs_end < abs_start:
continue
if abs_end > total_len:
abs_end = total_len
expected_span = abs_end - abs_start
actual_len = min(len(payload), expected_span)
if actual_len <= 0:
continue
# 写入并记录“实际写入区间”,用于 gap 计算
body_buf[abs_start:abs_start + actual_len] = payload[:actual_len]
got_ranges.add((abs_start, abs_start + actual_len))
filled_new_bytes += actual_len
# 记录最大的 sum 值,用于判断是否所有数据都已发送
if _sum > last_sum:
last_sum = _sum
# debug 输出节流:每 ~8000 字节或 >=500ms 输出一次,避免 print 导致 UART 丢包
if debug:
now = time.ticks_ms()
if (time.ticks_diff(now, last_print_ms) >= 500) or (_sum - last_print_sum >= 8000) or (rel_end == _total):
_log(f"[URC] {abs_start}:{abs_start+actual_len} sum={_sum}/{_total} base={offset_base} +{filled_new_bytes}")
last_print_ms = now
last_print_sum = _sum
# 若是全量请求offset_base=0 且 total_len==_total尽早结束
if offset_base == 0 and total_len == _total:
# 不要用 filled_new_bytes 判断是否完整(可能有重叠)
pass
# 5) 清理实例
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
if body_buf is None:
return False, "empty_body", body_buf, got_ranges, total_len, expect_md5_b64
if total_len is None:
return False, "no_total_len", body_buf, got_ranges, total_len, expect_md5_b64
# 返回“本次尝试是否有实质进展”Range 补洞时,哪怕不完整也算成功推进
if filled_new_bytes <= 0:
return False, "no_progress", body_buf, got_ranges, total_len, expect_md5_b64
return True, f"PARTIAL ok +{filled_new_bytes} ip={ip} code={status_code}", body_buf, got_ranges, total_len, expect_md5_b64
global ota_in_progress
ota_in_progress = True
with uart4g_lock:
try:
# -------- Phase 1: 全量 GET允许不完整后面用 Range 补洞)--------
body_buf = None
got_ranges = set()
total_len = None
expect_md5_b64 = None
last_err = "unknown"
for attempt in range(1, retries + 1):
ok, msg, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
)
last_err = msg
if not ok:
_log(f"[RETRY] full attempt={attempt} failed={msg}")
time.sleep_ms(200)
continue
gaps, merged = _compute_gaps(total_len, got_ranges)
filled_total = sum(e - s for s, e in merged)
if gaps and gaps[0] == (0, 0):
gaps = []
if not gaps:
break
_log(f"[GAPS] after full attempt={attempt} filled={filled_total}/{total_len} gaps={gaps[:3]}")
time.sleep_ms(150)
if body_buf is None or total_len is None:
return False, f"FAILED: {last_err}"
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
# -------- Phase 2: Range 补洞 --------
# 限制单次 Range 的最大长度(太大也会触发同样的 UART 压力)
MAX_RANGE_BYTES = 8192
RANGE_RETRIES_EACH = 2
MAX_HOLE_ROUNDS = 10
round_i = 0
while gaps and round_i < MAX_HOLE_ROUNDS:
round_i += 1
# 优先补最大的洞(通常只丢中间一两段)
gaps = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True)
_log(f"[RANGE] round={round_i} gaps={gaps[:3]}")
progress_any = False
# 每轮最多补前 5 个洞,避免无限循环
for (gs, ge) in gaps[:5]:
cur = gs
while cur < ge:
sub_end = min(ge, cur + MAX_RANGE_BYTES)
# HTTP Range end is inclusive
rs = cur
re_incl = sub_end - 1
before_gaps, before_merged = _compute_gaps(total_len, got_ranges)
before_filled = sum(e - s for s, e in before_merged)
sub_ok = False
sub_err = "unknown"
for k in range(1, RANGE_RETRIES_EACH + 1):
ok2, msg2, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
range_start=rs, range_end=re_incl,
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
)
sub_err = msg2
if ok2:
sub_ok = True
break
_log(f"[RETRY] range {rs}-{re_incl} try={k} failed={msg2}")
time.sleep_ms(120)
after_gaps, after_merged = _compute_gaps(total_len, got_ranges)
after_filled = sum(e - s for s, e in after_merged)
if after_filled > before_filled:
progress_any = True
if not sub_ok:
_log(f"[WARN] range {rs}-{re_incl} failed={sub_err}")
# 小歇一下,给读线程喘息
time.sleep_ms(80)
cur = sub_end
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
filled_total = sum(e - s for s, e in merged)
if not gaps:
break
if not progress_any:
# 本轮没有推进,退出避免死循环
_log(f"[RANGE] no progress in round={round_i}, stop. filled={filled_total}/{total_len}")
break
_log(f"[RANGE] round={round_i} filled={filled_total}/{total_len} gaps={gaps[:3]}")
# 完整性检查
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
filled_total = sum(e - s for s, e in merged)
if gaps:
return False, f"incomplete_body got={filled_total} expected={total_len} missing={total_len - filled_total} gaps={gaps[:5]}"
data = bytes(body_buf)
# 校验Content-Md5base64若有
if expect_md5_b64 and hashlib is not None:
md5_b64 = _md5_base64(data)
if md5_b64 != expect_md5_b64:
return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}"
# 写文件(原样 bytes
with open(filename, "wb") as f:
f.write(data)
return True, f"OK size={len(data)} ip={_get_ip()} md5={expect_md5_b64 or ''}"
finally:
ota_in_progress = False
def direct_ota_download_via_4g(ota_url):
"""通过 4G 模块下载 OTA不需要 Wi-Fi"""
global update_thread_started
try:
if not ota_url:
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
return
print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}")
success, msg = download_file_via_4g(ota_url, local_filename, debug=True)
print(f"[OTA-4G] {msg}")
if success and "OK" in msg:
# 下载成功:备份+替换+重启
if apply_ota_and_reboot(ota_url):
return # 会重启,不会执行到 finally
else:
safe_enqueue({"result": msg}, 2)
except Exception as e:
error_msg = f"OTA-4G 异常: {str(e)}"
print(error_msg)
safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
finally:
update_thread_started = False
# ==================== 主程序入口 ====================
def cmd_str():
global DEVICE_ID, PASSWORD
DEVICE_ID = read_device_id()
PASSWORD = DEVICE_ID + "."
# 创建照片存储目录
photo_dir = "/root/phot"
if photo_dir not in os.listdir("/root"):
try:
os.mkdir(photo_dir)
except:
pass
# 初始化硬件
init_ina226()
load_laser_point()
disp = display.Display()
cam = camera.Camera(640, 480)
# 启动通信与校准线程
# from tcp_handler import tcp_main
_thread.start_new_thread(tcp_main, ())
_thread.start_new_thread(laser_calibration_worker, ())
print("系统准备完成...")
last_adc_trigger = 0
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报
while not app.need_exit():
current_time = time.ticks_ms()
# print("压力传感器数值: ", adc_obj.read())
adc_val = adc_obj.read()
# if adc_val > 2400:
# print(f"adc: {adc_val}")
if adc_val > ADC_TRIGGER_THRESHOLD:
diff_ms = current_time-last_adc_trigger
if diff_ms<3000:
continue
last_adc_trigger = current_time
time.sleep_ms(60) # 防抖
frame = cam.read()
x, y = laser_point
# 绘制激光十字线
frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness)
frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness)
frame.draw_circle(int(x), int(y), 1, color, thickness)
# 检测靶心
result_img, center, radius, method, best_radius1 = detect_circle(frame)
disp.show(result_img)
# 计算偏移与距离
dx, dy = compute_laser_position(center, (x, y), radius, method)
distance_m = estimate_distance(best_radius1)
# 读取电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
# 保存图像(带标注)
try:
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
result_img.save(filename, quality=70)
except Exception as e:
print(f"❌ 保存失败: {e}")
# 构造上报数据
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100), # 距离(厘米)
"m": method,
"adc": adc_val
}
report_data = {"cmd": 1, "data": inner_data}
# 射箭事件高优先级入队,由 tcp_main 统一发送
safe_enqueue(report_data, msg_type=2, high=True)
print("📤 射箭事件已加入发送队列")
# 闪一下激光(射箭反馈)
# TODO: remove after test done
flash_laser(1000) # 闪300ms可以根据需要调整时长
time.sleep_ms(100)
else:
disp.show(cam.read())
time.sleep_ms(50)
def dump_system_info():
cmds = [
"uname -a",
"cat /etc/os-release 2>/dev/null",
"cat /proc/version 2>/dev/null",
"cat /proc/1/comm 2>/dev/null",
"ps 2>/dev/null | head -n 5",
"ls -l /sbin/init 2>/dev/null",
"ls -l /etc/init.d 2>/dev/null | head -n 10",
"which systemctl 2>/dev/null",
"which rc-service 2>/dev/null",
"which busybox 2>/dev/null && busybox | head -n 1",
"ls /dev/watchdog* 2>/dev/null",
]
for c in cmds:
try:
out = os.popen(c).read()
print("\n$ " + c + "\n" + (out.strip() or "<empty>"))
except Exception as e:
print("\n$ " + c + "\n<error> " + str(e))
if __name__ == "__main__":
# dump_system_info()
# try:
# import threading
# print("threading module:", threading)
# print("has Lock:", hasattr(threading, "Lock"))
# if hasattr(threading, "Lock"):
# print("has lock")
# finally:
# pass
import config
print("env: ", config.get_env())
cmd_str()