1851 lines
68 KiB
Python
1851 lines
68 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
激光射击系统主程序(视觉测距版)
|
||
功能:目标检测、激光校准、4G TCP 通信、OTA 升级、单目测距、INA226 电量监测
|
||
平台:MaixPy (Sipeed MAIX)
|
||
作者:ZZH
|
||
最后更新:2025-11-21
|
||
"""
|
||
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
|
||
import cv2
|
||
import numpy as np
|
||
import json
|
||
import struct
|
||
import re
|
||
from maix.peripheral import adc
|
||
import _thread
|
||
import os
|
||
import hmac
|
||
import ujson
|
||
import hashlib
|
||
import requests
|
||
import socket
|
||
import re
|
||
import binascii
|
||
|
||
try:
|
||
import hashlib
|
||
except:
|
||
hashlib = None
|
||
|
||
# import config
|
||
|
||
# ==================== Locks ====================
|
||
|
||
class _Mutex:
|
||
"""
|
||
基于 _thread.allocate_lock() 的互斥锁封装:
|
||
- 支持 with
|
||
- 支持 try_acquire(若固件不支持非阻塞 acquire 参数,则退化为阻塞 acquire)
|
||
"""
|
||
def __init__(self):
|
||
self._lk = _thread.allocate_lock()
|
||
|
||
def acquire(self, blocking=True):
|
||
try:
|
||
return self._lk.acquire(blocking)
|
||
except TypeError:
|
||
self._lk.acquire()
|
||
return True
|
||
|
||
def try_acquire(self):
|
||
return self.acquire(False)
|
||
|
||
def release(self):
|
||
self._lk.release()
|
||
|
||
def __enter__(self):
|
||
self.acquire(True)
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc, tb):
|
||
self.release()
|
||
return False
|
||
|
||
|
||
class ATClient:
|
||
"""
|
||
单读者 AT/URC 客户端:唯一读取 uart4g,避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。
|
||
- send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect
|
||
- pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload(已按长度裁剪)
|
||
- pop_http_event() : 获取 +MHTTPURC 事件(header/content)
|
||
"""
|
||
def __init__(self, uart_obj):
|
||
self.uart = uart_obj
|
||
self._cmd_lock = _Mutex()
|
||
self._q_lock = _Mutex()
|
||
self._rx = b""
|
||
self._tcp_payloads = []
|
||
self._http_events = []
|
||
|
||
# 当前命令等待状态(仅允许单命令 in-flight)
|
||
self._waiting = False
|
||
self._expect = b"OK"
|
||
self._resp = b""
|
||
|
||
self._running = False
|
||
|
||
def start(self):
|
||
if self._running:
|
||
return
|
||
self._running = True
|
||
_thread.start_new_thread(self._reader_loop, ())
|
||
|
||
def stop(self):
|
||
self._running = False
|
||
|
||
def flush(self):
|
||
"""清空内部缓存与队列(用于 OTA/异常恢复)"""
|
||
with self._q_lock:
|
||
self._rx = b""
|
||
self._tcp_payloads.clear()
|
||
self._http_events.clear()
|
||
self._resp = b""
|
||
|
||
def pop_tcp_payload(self):
|
||
with self._q_lock:
|
||
if self._tcp_payloads:
|
||
return self._tcp_payloads.pop(0)
|
||
return None
|
||
|
||
def pop_http_event(self):
|
||
with self._q_lock:
|
||
if self._http_events:
|
||
return self._http_events.pop(0)
|
||
return None
|
||
|
||
def _push_tcp_payload(self, payload: bytes):
|
||
# 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock,
|
||
# 这里不要再次 acquire(锁不可重入,会死锁)。
|
||
self._tcp_payloads.append(payload)
|
||
|
||
def _push_http_event(self, ev):
|
||
# 同上:避免在 _reader_loop 持锁期间二次 acquire
|
||
self._http_events.append(ev)
|
||
|
||
def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000):
|
||
"""
|
||
发送 AT 命令并等待 expect(子串匹配)。
|
||
注意:expect=">" 用于等待 prompt。
|
||
"""
|
||
expect_b = expect.encode() if isinstance(expect, str) else expect
|
||
with self._cmd_lock:
|
||
# 初始化等待
|
||
self._waiting = True
|
||
self._expect = expect_b
|
||
self._resp = b""
|
||
|
||
# 发送
|
||
if cmd:
|
||
# 注意:这里不要再用 uart4g_lock(否则外层已经持锁时会死锁)。
|
||
# 写入由 _cmd_lock 串行化即可。
|
||
self.uart.write((cmd + "\r\n").encode())
|
||
|
||
t0 = time.ticks_ms()
|
||
while time.ticks_ms() - t0 < timeout_ms:
|
||
if (not self._waiting) or (self._expect in self._resp):
|
||
self._waiting = False
|
||
break
|
||
time.sleep_ms(5)
|
||
|
||
# 超时也返回已收集内容(便于诊断)
|
||
self._waiting = False
|
||
try:
|
||
return self._resp.decode(errors="ignore")
|
||
except:
|
||
return str(self._resp)
|
||
|
||
def _parse_mipurc_rtcp(self):
|
||
"""
|
||
解析:+MIPURC: "rtcp",<link_id>,<len>,<payload...>
|
||
之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。
|
||
"""
|
||
prefix = b'+MIPURC: "rtcp",'
|
||
i = self._rx.find(prefix)
|
||
if i < 0:
|
||
return False
|
||
# 丢掉前置噪声
|
||
if i > 0:
|
||
self._rx = self._rx[i:]
|
||
i = 0
|
||
|
||
j = len(prefix)
|
||
# 解析 link_id
|
||
k = j
|
||
while k < len(self._rx) and 48 <= self._rx[k] <= 57:
|
||
k += 1
|
||
if k == j or k >= len(self._rx):
|
||
return False
|
||
if self._rx[k:k+1] != b",":
|
||
self._rx = self._rx[1:]
|
||
return True
|
||
try:
|
||
link_id = int(self._rx[j:k].decode())
|
||
except:
|
||
self._rx = self._rx[1:]
|
||
return True
|
||
|
||
# 解析 len
|
||
j2 = k + 1
|
||
k2 = j2
|
||
while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57:
|
||
k2 += 1
|
||
if k2 == j2 or k2 >= len(self._rx):
|
||
return False
|
||
if self._rx[k2:k2+1] != b",":
|
||
self._rx = self._rx[1:]
|
||
return True
|
||
try:
|
||
n = int(self._rx[j2:k2].decode())
|
||
except:
|
||
self._rx = self._rx[1:]
|
||
return True
|
||
|
||
payload_start = k2 + 1
|
||
payload_end = payload_start + n
|
||
if len(self._rx) < payload_end:
|
||
return False # payload 未收齐
|
||
|
||
payload = self._rx[payload_start:payload_end]
|
||
# 把 link_id 一起带上,便于上层过滤(如果需要)
|
||
self._push_tcp_payload((link_id, payload))
|
||
self._rx = self._rx[payload_end:]
|
||
return True
|
||
|
||
def _parse_mhttpurc_header(self):
|
||
tag = b'+MHTTPURC: "header",'
|
||
i = self._rx.find(tag)
|
||
if i < 0:
|
||
return False
|
||
if i > 0:
|
||
self._rx = self._rx[i:]
|
||
i = 0
|
||
|
||
# header: +MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text...>
|
||
j = len(tag)
|
||
comma_count = 0
|
||
k = j
|
||
while k < len(self._rx) and comma_count < 3:
|
||
if self._rx[k:k+1] == b",":
|
||
comma_count += 1
|
||
k += 1
|
||
if comma_count < 3:
|
||
return False
|
||
|
||
prefix = self._rx[:k]
|
||
m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
|
||
if not m:
|
||
self._rx = self._rx[1:]
|
||
return True
|
||
urc_id = int(m.group(1))
|
||
code = int(m.group(2))
|
||
hdr_len = int(m.group(3))
|
||
|
||
text_start = k
|
||
text_end = text_start + hdr_len
|
||
if len(self._rx) < text_end:
|
||
return False
|
||
|
||
hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore")
|
||
self._push_http_event(("header", urc_id, code, hdr_text))
|
||
self._rx = self._rx[text_end:]
|
||
return True
|
||
|
||
def _parse_mhttpurc_content(self):
|
||
tag = b'+MHTTPURC: "content",'
|
||
i = self._rx.find(tag)
|
||
if i < 0:
|
||
return False
|
||
if i > 0:
|
||
self._rx = self._rx[i:]
|
||
i = 0
|
||
|
||
# content: +MHTTPURC: "content",<id>,<total>,<sum>,<cur>,<payload...>
|
||
j = len(tag)
|
||
comma_count = 0
|
||
k = j
|
||
while k < len(self._rx) and comma_count < 4:
|
||
if self._rx[k:k+1] == b",":
|
||
comma_count += 1
|
||
k += 1
|
||
if comma_count < 4:
|
||
return False
|
||
|
||
prefix = self._rx[:k]
|
||
m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
|
||
if not m:
|
||
self._rx = self._rx[1:]
|
||
return True
|
||
urc_id = int(m.group(1))
|
||
total_len = int(m.group(2))
|
||
sum_len = int(m.group(3))
|
||
cur_len = int(m.group(4))
|
||
|
||
payload_start = k
|
||
payload_end = payload_start + cur_len
|
||
if len(self._rx) < payload_end:
|
||
return False
|
||
|
||
payload = self._rx[payload_start:payload_end]
|
||
self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload))
|
||
self._rx = self._rx[payload_end:]
|
||
return True
|
||
|
||
def _reader_loop(self):
|
||
while self._running:
|
||
d = self.uart.read(4096)
|
||
if not d:
|
||
time.sleep_ms(2)
|
||
continue
|
||
|
||
with self._q_lock:
|
||
# 统一累积到内部 buffer(用于 URC 解析)
|
||
self._rx += d
|
||
# 命令等待期间,把原始字节流复制到响应缓冲(不影响 URC 解析)
|
||
if self._waiting:
|
||
self._resp += d
|
||
|
||
# 解析 URC:尽可能多地从 _rx 中剥离完整 URC,避免丢包
|
||
while True:
|
||
progressed = (
|
||
self._parse_mipurc_rtcp()
|
||
or self._parse_mhttpurc_header()
|
||
or self._parse_mhttpurc_content()
|
||
)
|
||
if not progressed:
|
||
break
|
||
|
||
# 防止 _rx 因为"非 URC 文本/回显"无限增长:保留尾部即可
|
||
# 关键修复:如果 buffer 中有 HTTP URC 标签(说明 OTA 在进行),完全禁用截断
|
||
# 避免在 OTA 下载时截断 buffer 导致数据丢失(之前 16KB 限制太小,导致数据被截断)
|
||
has_http_urc = (b'+MHTTPURC: "content"' in self._rx or
|
||
b'+MHTTPURC: "header"' in self._rx)
|
||
if has_http_urc:
|
||
# OTA 下载中:完全禁用 buffer 截断,避免数据丢失
|
||
# 通常 OTA 文件不会超过几百 KB,即使不截断也不会导致内存问题
|
||
pass # 不截断
|
||
else:
|
||
# 非 OTA 状态:使用较小的 buffer 限制(16KB)
|
||
if len(self._rx) > 16384:
|
||
self._rx = self._rx[-4096:]
|
||
|
||
# ==================== 全局配置 ====================
|
||
|
||
# OTA 升级地址与本地路径
|
||
# url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
|
||
local_filename = "/maixapp/apps/t11/main_tmp.py"
|
||
app_version = '1.0.2'
|
||
# OTA 下发参数(由后端指令写入)
|
||
OTA_URL = None
|
||
OTA_MODE = None # "4g" / "wifi" / None
|
||
|
||
def is_wifi_connected():
|
||
"""尽量判断当前是否有 Wi-Fi(有则走 Wi-Fi OTA,否则走 4G OTA)"""
|
||
# 优先用 MaixPy network(如果可用)
|
||
try:
|
||
wlan = network.WLAN(network.TYPE_WIFI)
|
||
if wlan.isconnected():
|
||
return True
|
||
except:
|
||
pass
|
||
|
||
# 兜底:看系统 wlan0 有没有 IP(你系统可能没有 wlan0,则返回 False)
|
||
try:
|
||
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
|
||
return bool(ip)
|
||
except:
|
||
return False
|
||
|
||
# 设备认证信息(运行时动态加载)
|
||
DEVICE_ID = None
|
||
PASSWORD = None
|
||
|
||
# 服务器连接参数
|
||
SERVER_IP = "www.shelingxingqiu.com"
|
||
SERVER_PORT = 50005
|
||
HEARTBEAT_INTERVAL = 60 # 心跳间隔(秒)
|
||
|
||
# 激光校准配置
|
||
CONFIG_FILE = "/root/laser_config.json"
|
||
DEFAULT_POINT = (640, 480) # 默认激光中心点(图像中心)
|
||
laser_point = DEFAULT_POINT
|
||
|
||
# HTTP 上报接口
|
||
URL = "http://ws.shelingxingqiu.com"
|
||
API_PATH = "/home/shoot/device_fire/arrow/fire"
|
||
|
||
# UART 设备初始化
|
||
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信
|
||
distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块
|
||
|
||
# 单读者 ATClient(唯一读取 uart4g)
|
||
at_client = ATClient(uart4g)
|
||
at_client.start()
|
||
|
||
# 引脚功能映射
|
||
pinmap.set_pin_function("A18", "UART1_RX")
|
||
pinmap.set_pin_function("A19", "UART1_TX")
|
||
pinmap.set_pin_function("A29", "UART2_RX")
|
||
pinmap.set_pin_function("A28", "UART2_TX")
|
||
pinmap.set_pin_function("P18", "I2C1_SCL")
|
||
pinmap.set_pin_function("P21", "I2C1_SDA")
|
||
# pinmap.set_pin_function("A15", "I2C5_SCL")
|
||
# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的
|
||
# ADC 触发阈值(用于检测扳机/激光触发)
|
||
ADC_TRIGGER_THRESHOLD = 3000
|
||
ADC_LASER_THRESHOLD = 3000
|
||
|
||
# 显示参数:激光十字线样式
|
||
color = image.Color(255, 100, 0)
|
||
thickness = 1
|
||
length = 2
|
||
|
||
# 全局状态变量
|
||
laser_calibration_active = False # 是否正在后台校准激光
|
||
laser_calibration_result = None # 校准结果坐标 (x, y)
|
||
laser_calibration_lock = _Mutex() # 互斥锁,防止多线程冲突
|
||
|
||
# 硬件对象初始化
|
||
laser_x, laser_y = laser_point
|
||
adc_obj = adc.ADC(0, adc.RES_BIT_12)
|
||
bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线
|
||
# bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的
|
||
# INA226 电流/电压监测芯片寄存器地址
|
||
INA226_ADDR = 0x40
|
||
REG_CONFIGURATION = 0x00
|
||
REG_BUS_VOLTAGE = 0x02
|
||
REG_CALIBRATION = 0x05
|
||
CALIBRATION_VALUE = 0x1400
|
||
|
||
# 激光控制指令(自定义协议)
|
||
MODULE_ADDR = 0x00
|
||
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
|
||
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
|
||
|
||
# 相机标定参数(用于距离估算)
|
||
# FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素)
|
||
FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素)
|
||
REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
|
||
|
||
# # TCP 连接状态
|
||
tcp_connected = False
|
||
high_send_queue = [] # 高优先级发送队列:射箭事件等
|
||
normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等
|
||
queue_lock = _Mutex() # 互斥锁,保护队列
|
||
uart4g_lock = _Mutex() # 互斥锁,保护 4G 串口 AT 发送流程(防并发)
|
||
update_thread_started = False # 防止 OTA 更新线程重复启动
|
||
ota_in_progress = False # OTA(4G HTTP URC) 期间暂停 tcp_main 读取 uart4g,避免吞掉 +MHTTPURC
|
||
|
||
# ==================== 工具函数 ====================
|
||
|
||
def download_file(url, filename):
|
||
"""从指定 URL 下载文件并保存为 UTF-8 编码文本"""
|
||
try:
|
||
print(f"正在从 {url} 下载文件...")
|
||
response = requests.get(url)
|
||
response.raise_for_status()
|
||
response.encoding = 'utf-8'
|
||
with open(filename, 'w', encoding='utf-8') as file:
|
||
file.write(response.text)
|
||
return f"下载成功!文件已保存为: {filename}"
|
||
except requests.exceptions.RequestException as e:
|
||
return f"下载失败!网络请求错误: {e}"
|
||
except OSError as e:
|
||
return f"下载失败!文件写入错误: {e}"
|
||
except Exception as e:
|
||
return f"下载失败!发生未知错误: {e}"
|
||
|
||
|
||
def is_server_reachable(host, port=80, timeout=5):
|
||
"""检查目标主机端口是否可达(用于 OTA 前网络检测)"""
|
||
try:
|
||
addr_info = socket.getaddrinfo(host, port)[0]
|
||
s = socket.socket(addr_info[0], addr_info[1], addr_info[2])
|
||
s.settimeout(timeout)
|
||
s.connect(addr_info[-1])
|
||
s.close()
|
||
return True
|
||
except Exception as e:
|
||
print(f"[NET] 无法连接 {host}:{port} - {e}")
|
||
return False
|
||
|
||
def apply_ota_and_reboot(ota_url=None):
|
||
# TODO: remove this return after test
|
||
# return True
|
||
|
||
"""
|
||
OTA 文件下载成功后:备份原 main.py -> 替换 main_tmp.py -> 重启设备
|
||
"""
|
||
import shutil
|
||
|
||
main_py = "/maixapp/apps/t11/main.py"
|
||
main_tmp = "/maixapp/apps/t11/main_tmp.py"
|
||
main_bak = "/maixapp/apps/t11/main.py.bak"
|
||
ota_pending = "/maixapp/apps/t11/ota_pending.json"
|
||
|
||
try:
|
||
# 1. 检查下载的文件是否存在
|
||
if not os.path.exists(main_tmp):
|
||
print(f"[OTA] 错误:{main_tmp} 不存在")
|
||
return False
|
||
|
||
# 2. 备份原 main.py(如果存在)
|
||
if os.path.exists(main_py):
|
||
try:
|
||
shutil.copy2(main_py, main_bak)
|
||
print(f"[OTA] 已备份 {main_py} -> {main_bak}")
|
||
except Exception as e:
|
||
print(f"[OTA] 备份失败: {e}")
|
||
# 备份失败也继续(可能没有原文件)
|
||
|
||
# 3. 替换:main_tmp.py -> main.py
|
||
try:
|
||
shutil.copy2(main_tmp, main_py)
|
||
print(f"[OTA] 已替换 {main_tmp} -> {main_py}")
|
||
|
||
# 确保写入磁盘
|
||
try:
|
||
os.sync() # 如果系统支持
|
||
except:
|
||
pass
|
||
time.sleep_ms(500) # 额外等待确保写入完成
|
||
|
||
except Exception as e:
|
||
print(f"[OTA] 替换失败: {e}")
|
||
return False
|
||
|
||
# 3.5 写入 pending(用于重启后确认成功并上报)
|
||
try:
|
||
pending_obj = {
|
||
"ts": int(time.time()) if hasattr(time, "time") else 0,
|
||
"url": ota_url or "",
|
||
"tmp": main_tmp,
|
||
"main": main_py,
|
||
"bak": main_bak,
|
||
}
|
||
with open(ota_pending, "w", encoding="utf-8") as f:
|
||
json.dump(pending_obj, f)
|
||
try:
|
||
os.sync()
|
||
except:
|
||
pass
|
||
except Exception as e:
|
||
print(f"[OTA] 写入 ota_pending 失败: {e}")
|
||
|
||
# 4. 通知服务器(可选,但重启前发一次)
|
||
safe_enqueue({"result": "ota_applied_rebooting"}, 2)
|
||
time.sleep_ms(1000) # 给一点时间让消息发出
|
||
|
||
# 5. 重启设备
|
||
print("[OTA] 准备重启设备...")
|
||
os.system("reboot") # MaixPy 通常是这个命令
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"[OTA] apply_ota_and_reboot 异常: {e}")
|
||
return False
|
||
|
||
|
||
def direct_ota_download(ota_url):
|
||
"""
|
||
直接执行 OTA 下载(假设已有网络)
|
||
用于 cmd=7 / 或 wifi 模式
|
||
"""
|
||
global update_thread_started
|
||
try:
|
||
if not ota_url:
|
||
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
|
||
return
|
||
|
||
from urllib.parse import urlparse
|
||
parsed_url = urlparse(ota_url)
|
||
host = parsed_url.hostname
|
||
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
|
||
|
||
if not is_server_reachable(host, port, timeout=8):
|
||
safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2)
|
||
return
|
||
|
||
print(f"[OTA] 开始下载: {ota_url}")
|
||
# from ota import download_file
|
||
result_msg = download_file(ota_url, local_filename)
|
||
print(f"[OTA] {result_msg}")
|
||
|
||
# 检查是否下载成功(包含"成功"或"下载成功"关键字)
|
||
if "成功" in result_msg or "下载成功" in result_msg:
|
||
# 下载成功:备份+替换+重启
|
||
if apply_ota_and_reboot(ota_url):
|
||
return # 会重启,不会执行到 finally
|
||
else:
|
||
safe_enqueue({"result": result_msg}, 2)
|
||
|
||
except Exception as e:
|
||
error_msg = f"OTA 异常: {str(e)}"
|
||
print(error_msg)
|
||
safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
|
||
finally:
|
||
update_thread_started = False
|
||
|
||
def handle_wifi_and_update(ssid, password, ota_url):
|
||
"""在子线程中执行 Wi-Fi 连接 + OTA 更新流程"""
|
||
global update_thread_started
|
||
try:
|
||
ip, error = connect_wifi(ssid, password)
|
||
if error:
|
||
safe_enqueue({"result": "wifi_failed", "error": error}, 2)
|
||
return
|
||
safe_enqueue({"result": "wifi_connected", "ip": ip}, 2)
|
||
|
||
# 下载
|
||
if not ota_url:
|
||
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
|
||
return
|
||
|
||
from urllib.parse import urlparse
|
||
parsed_url = urlparse(ota_url)
|
||
host = parsed_url.hostname
|
||
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
|
||
|
||
if not is_server_reachable(host, port, timeout=8):
|
||
err_msg = f"网络不通:无法连接 {host}:{port}"
|
||
safe_enqueue({"result": err_msg}, 2)
|
||
return
|
||
|
||
print(f"[NET] 已确认可访问 {host}:{port},开始下载...")
|
||
result = download_file(ota_url, local_filename)
|
||
print(result)
|
||
|
||
# 检查是否下载成功(包含"成功"或"下载成功"关键字)
|
||
if "成功" in result or "下载成功" in result:
|
||
# 下载成功:备份+替换+重启
|
||
if apply_ota_and_reboot(ota_url):
|
||
return # 会重启,不会执行到 finally
|
||
else:
|
||
safe_enqueue({"result": result}, 2)
|
||
|
||
finally:
|
||
update_thread_started = False
|
||
print("[UPDATE] 更新线程执行完毕,即将退出。")
|
||
|
||
|
||
def connect_wifi(ssid, password):
|
||
"""
|
||
连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录,
|
||
以便设备重启后自动连接。
|
||
"""
|
||
conf_path = "/etc/wpa_supplicant.conf"
|
||
ssid_file = "/boot/wifi.ssid"
|
||
pass_file = "/boot/wifi.pass"
|
||
|
||
try:
|
||
# 生成 wpa_supplicant 配置
|
||
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
|
||
if "network={" not in net_conf:
|
||
return None, "Failed to generate wpa config"
|
||
|
||
# 写入运行时配置
|
||
with open(conf_path, "w") as f:
|
||
f.write("ctrl_interface=/var/run/wpa_supplicant\n")
|
||
f.write("update_config=1\n\n")
|
||
f.write(net_conf)
|
||
|
||
# 持久化保存 SSID/PASS(关键!)
|
||
with open(ssid_file, "w") as f:
|
||
f.write(ssid.strip())
|
||
with open(pass_file, "w") as f:
|
||
f.write(password.strip())
|
||
|
||
# 重启 Wi-Fi 服务
|
||
os.system("/etc/init.d/S30wifi restart")
|
||
|
||
# 等待获取 IP
|
||
for _ in range(20):
|
||
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
|
||
if ip:
|
||
return ip, None
|
||
time.sleep(1)
|
||
|
||
return None, "Timeout: No IP obtained"
|
||
|
||
except Exception as e:
|
||
return None, f"Exception: {str(e)}"
|
||
|
||
|
||
def read_device_id():
|
||
"""从 /device_key 文件读取设备唯一 ID,失败则使用默认值"""
|
||
try:
|
||
with open("/device_key", "r") as f:
|
||
device_id = f.read().strip()
|
||
if device_id:
|
||
print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
|
||
return device_id
|
||
except Exception as e:
|
||
print(f"[ERROR] 无法读取 /device_key: {e}")
|
||
return "DEFAULT_DEVICE_ID"
|
||
|
||
|
||
def safe_enqueue(data_dict, msg_type=2, high=False):
|
||
"""线程安全地将消息加入 TCP 发送队列(支持优先级)"""
|
||
global queue_lock, high_send_queue, normal_send_queue
|
||
item = (msg_type, data_dict)
|
||
with queue_lock:
|
||
if high:
|
||
high_send_queue.append(item)
|
||
else:
|
||
normal_send_queue.append(item)
|
||
|
||
def at(cmd, wait="OK", timeout=2000):
|
||
"""向 4G 模块发送 AT 指令并等待响应"""
|
||
# 统一由 ATClient 负责读 uart4g,避免多线程抢读
|
||
return at_client.send(cmd, wait, timeout)
|
||
|
||
|
||
def make_packet(msg_type: int, body_dict: dict) -> bytes:
|
||
"""打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文"""
|
||
body = json.dumps(body_dict).encode()
|
||
body_len = len(body)
|
||
checksum = body_len + msg_type
|
||
header = struct.pack(">III", body_len, msg_type, checksum)
|
||
return header + body
|
||
|
||
|
||
def parse_packet(data: bytes):
|
||
"""解析 TCP 数据包,返回 (类型, 正文字典)"""
|
||
if len(data) < 12:
|
||
return None, None
|
||
body_len, msg_type, checksum = struct.unpack(">III", data[:12])
|
||
body = data[12:12 + body_len]
|
||
try:
|
||
return msg_type, json.loads(body.decode())
|
||
except:
|
||
return msg_type, {"raw": body.decode(errors="ignore")}
|
||
|
||
|
||
def tcp_send_raw(data: bytes, max_retries=2) -> bool:
|
||
global tcp_connected
|
||
if not tcp_connected:
|
||
return False
|
||
|
||
with uart4g_lock:
|
||
for _ in range(max_retries):
|
||
cmd = f'AT+MIPSEND=0,{len(data)}'
|
||
if ">" not in at(cmd, ">", 2000):
|
||
time.sleep_ms(50)
|
||
continue
|
||
|
||
# 关键:确保把 data 全部写出去
|
||
total = 0
|
||
while total < len(data):
|
||
n = uart4g.write(data[total:])
|
||
if not n or n < 0:
|
||
time.sleep_ms(1)
|
||
continue
|
||
total += n
|
||
|
||
# 关键:再发结束符(不算进 payload)
|
||
uart4g.write(b"\x1A")
|
||
|
||
# 等发送完成确认(不同固件可能是 SEND OK / OK / +MIPSEND)
|
||
r = at("", "OK", 8000)
|
||
if ("SEND OK" in r) or ("OK" in r) or ("+MIPSEND" in r):
|
||
return True
|
||
|
||
time.sleep_ms(50)
|
||
|
||
return False
|
||
|
||
|
||
def generate_token(device_id):
|
||
"""生成用于 HTTP 接口鉴权的 Token(HMAC-SHA256)"""
|
||
SALT = "shootMessageFire"
|
||
SALT2 = "shoot"
|
||
return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
|
||
|
||
|
||
def send_http_cmd(cmd_str, timeout_ms=3000):
|
||
"""发送 HTTP 相关 AT 指令(调试用)"""
|
||
print("[HTTP AT] =>", cmd_str)
|
||
return at(cmd_str, "OK", timeout_ms)
|
||
|
||
|
||
def read_http_response(timeout_ms=5000):
|
||
"""读取并打印 HTTP 响应(用于调试)"""
|
||
# 仅保留占位:UART 读取由 ATClient 独占;如需调试,请从 ATClient 的 http_events 中取。
|
||
time.sleep_ms(timeout_ms)
|
||
|
||
|
||
def upload_shoot_event(json_data):
|
||
"""通过 4G 模块上报射击事件到 HTTP 接口(备用通道)"""
|
||
token = generate_token(DEVICE_ID)
|
||
if not send_http_cmd(f'AT+MHTTPCREATE="{URL}"'):
|
||
return False
|
||
instance_id = 0
|
||
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"')
|
||
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"')
|
||
send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {DEVICE_ID}"')
|
||
json_str = ujson.dumps(json_data)
|
||
if not send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'):
|
||
return False
|
||
if send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{API_PATH}"'):
|
||
read_http_response()
|
||
return True
|
||
return False
|
||
|
||
|
||
def load_laser_point():
|
||
"""从配置文件加载激光中心点,失败则使用默认值"""
|
||
global laser_point
|
||
try:
|
||
if "laser_config.json" in os.listdir("/root"):
|
||
with open(CONFIG_FILE, "r") as f:
|
||
data = json.load(f)
|
||
if isinstance(data, list) and len(data) == 2:
|
||
laser_point = (int(data[0]), int(data[1]))
|
||
print(f"[INFO] 加载激光点: {laser_point}")
|
||
else:
|
||
raise ValueError
|
||
else:
|
||
laser_point = DEFAULT_POINT
|
||
except:
|
||
laser_point = DEFAULT_POINT
|
||
|
||
|
||
def save_laser_point(point):
|
||
"""保存激光中心点到配置文件"""
|
||
global laser_point
|
||
try:
|
||
with open(CONFIG_FILE, "w") as f:
|
||
json.dump([point[0], point[1]], f)
|
||
laser_point = point
|
||
except:
|
||
pass
|
||
|
||
|
||
def turn_on_laser():
|
||
"""发送指令开启激光,并读取回包(部分模块支持)"""
|
||
distance_serial.write(LASER_ON_CMD)
|
||
time.sleep_ms(10)
|
||
resp = distance_serial.read(20)
|
||
if resp:
|
||
if resp == LASER_ON_CMD:
|
||
print("✅ 激光指令已确认")
|
||
else:
|
||
print("🔇 无回包(正常或模块不支持)")
|
||
return resp
|
||
|
||
def flash_laser(duration_ms=1000):
|
||
"""闪一下激光(用于射箭反馈)"""
|
||
try:
|
||
distance_serial.write(LASER_ON_CMD)
|
||
time.sleep_ms(duration_ms)
|
||
distance_serial.write(LASER_OFF_CMD)
|
||
except Exception as e:
|
||
print(f"闪激光失败: {e}")
|
||
|
||
|
||
def find_red_laser(frame, threshold=150):
|
||
"""在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
|
||
w, h = frame.width(), frame.height()
|
||
img_bytes = frame.to_bytes()
|
||
max_sum = 0
|
||
best_pos = None
|
||
for y in range(0, h, 2):
|
||
for x in range(0, w, 2):
|
||
idx = (y * w + x) * 3
|
||
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
|
||
if r > threshold and r > g * 2 and r > b * 2:
|
||
rgb_sum = r + g + b
|
||
if rgb_sum > max_sum:
|
||
max_sum = rgb_sum
|
||
best_pos = (x, y)
|
||
return best_pos
|
||
|
||
|
||
def calibrate_laser_position():
|
||
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
|
||
global laser_x, laser_y
|
||
time.sleep_ms(80)
|
||
cam = camera.Camera(640, 480)
|
||
frame = cam.read()
|
||
pos = find_red_laser(frame)
|
||
if pos:
|
||
laser_x, laser_y = pos
|
||
save_laser_point(pos)
|
||
return pos
|
||
return None
|
||
|
||
|
||
# ==================== 电源管理(INA226) ====================
|
||
|
||
def write_register(reg, value):
|
||
data = [(value >> 8) & 0xFF, value & 0xFF]
|
||
bus.writeto_mem(INA226_ADDR, reg, bytes(data))
|
||
|
||
|
||
def read_register(reg):
|
||
data = bus.readfrom_mem(INA226_ADDR, reg, 2)
|
||
return (data[0] << 8) | data[1]
|
||
|
||
|
||
def init_ina226():
|
||
"""初始化 INA226 芯片:配置模式 + 校准值"""
|
||
write_register(REG_CONFIGURATION, 0x4527)
|
||
write_register(REG_CALIBRATION, CALIBRATION_VALUE)
|
||
|
||
|
||
def get_bus_voltage():
|
||
"""读取总线电压(单位:V)"""
|
||
raw = read_register(REG_BUS_VOLTAGE)
|
||
return raw * 1.25 / 1000
|
||
|
||
|
||
def voltage_to_percent(voltage):
|
||
"""根据电压估算电池百分比(查表插值)"""
|
||
points = [
|
||
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
|
||
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
|
||
(3.65, 5), (3.60, 0)
|
||
]
|
||
if voltage >= points[0][0]:
|
||
return 100
|
||
if voltage <= points[-1][0]:
|
||
return 0
|
||
for i in range(len(points) - 1):
|
||
v1, p1 = points[i]
|
||
v2, p2 = points[i + 1]
|
||
if voltage >= v2:
|
||
ratio = (voltage - v1) / (v2 - v1)
|
||
percent = p1 + (p2 - p1) * ratio
|
||
return max(0, min(100, int(round(percent))))
|
||
return 0
|
||
|
||
|
||
# ==================== 靶心检测与距离计算 ====================
|
||
|
||
def detect_circle(frame):
|
||
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)"""
|
||
global REAL_RADIUS_CM
|
||
img_cv = image.image2cv(frame, False, False)
|
||
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
|
||
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
|
||
edged = cv2.Canny(blurred, 50, 150)
|
||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||
ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
|
||
|
||
contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
|
||
best_center = best_radius = best_radius1 = method = None
|
||
|
||
# 方法1:基于轮廓拟合椭圆(清晰靶心)
|
||
for cnt in contours:
|
||
area = cv2.contourArea(cnt)
|
||
perimeter = cv2.arcLength(cnt, True)
|
||
if perimeter < 100 or area < 100:
|
||
continue
|
||
circularity = 4 * np.pi * area / (perimeter ** 2)
|
||
if circularity > 0.75 and len(cnt) >= 5:
|
||
center, axes, angle = cv2.fitEllipse(cnt)
|
||
radius = (axes[0] + axes[1]) / 4
|
||
best_center = (int(center[0]), int(center[1]))
|
||
best_radius = int(radius)
|
||
best_radius1 = best_radius
|
||
REAL_RADIUS_CM = 15
|
||
method = "清晰"
|
||
break
|
||
|
||
# 方法2:基于 HSV 黄色掩码(模糊靶心)
|
||
if not best_center:
|
||
hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV)
|
||
h, s, v = cv2.split(hsv)
|
||
s = np.clip(s * 2, 0, 255).astype(np.uint8)
|
||
hsv = cv2.merge((h, s, v))
|
||
lower_yellow = np.array([7, 80, 0])
|
||
upper_yellow = np.array([32, 255, 182])
|
||
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
|
||
mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
|
||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||
if contours:
|
||
largest = max(contours, key=cv2.contourArea)
|
||
if cv2.contourArea(largest) > 50:
|
||
(x, y), radius = cv2.minEnclosingCircle(largest)
|
||
best_center = (int(x), int(y))
|
||
best_radius = int(radius)
|
||
best_radius1 = best_radius
|
||
REAL_RADIUS_CM = 15
|
||
method = "模糊"
|
||
|
||
result_img = image.cv2image(img_cv, False, False)
|
||
return result_img, best_center, best_radius, method, best_radius1
|
||
|
||
|
||
def estimate_distance(pixel_radius):
|
||
"""根据像素半径估算实际距离(单位:米)"""
|
||
if not pixel_radius:
|
||
return 0.0
|
||
return (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / pixel_radius / 100.0
|
||
|
||
|
||
def compute_laser_position(circle_center, laser_point, radius, method):
|
||
"""计算激光相对于靶心的偏移量(单位:厘米)"""
|
||
if not all([circle_center, radius, method]):
|
||
return None, None
|
||
cx, cy = circle_center
|
||
lx, ly = laser_point
|
||
# 根据检测方法动态调整靶心物理半径(简化模型)
|
||
circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
|
||
dx = lx - cx
|
||
dy = ly - cy
|
||
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
|
||
|
||
|
||
# ==================== TCP 通信线程 ====================
|
||
|
||
def connect_server():
|
||
"""通过 4G 模块建立 TCP 连接"""
|
||
global tcp_connected
|
||
if tcp_connected:
|
||
return True
|
||
print("连接到服务器...")
|
||
with uart4g_lock:
|
||
at("AT+MIPCLOSE=0", "OK", 1000)
|
||
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
|
||
if "+MIPOPEN: 0,0" in res:
|
||
tcp_connected = True
|
||
return True
|
||
return False
|
||
|
||
raw_line_data = []
|
||
def tcp_main():
|
||
"""TCP 主通信循环:登录、心跳、处理指令、发送数据"""
|
||
global tcp_connected, high_send_queue, normal_send_queue, queue_lock, laser_calibration_active, laser_calibration_result, laser_calibration_lock, update_thread_started
|
||
send_hartbeat_fail_count = 0
|
||
while not app.need_exit():
|
||
if not connect_server():
|
||
time.sleep_ms(5000)
|
||
continue
|
||
|
||
# 发送登录包
|
||
login_data = {"deviceId": DEVICE_ID, "password": PASSWORD, "version": app_version}
|
||
if not tcp_send_raw(make_packet(1, login_data)):
|
||
tcp_connected = False
|
||
time.sleep_ms(2000)
|
||
continue
|
||
|
||
print("➡️ 登录包已发送,等待确认...")
|
||
logged_in = False
|
||
last_heartbeat_ack_time = time.ticks_ms()
|
||
last_heartbeat_send_time = time.ticks_ms()
|
||
while True:
|
||
# 接收数据(唯一来源:ATClient 解析后的 TCP payload 队列)
|
||
item = at_client.pop_tcp_payload()
|
||
if item:
|
||
# item 可能是 (link_id, payload) 或直接 payload(兼容旧队列格式)
|
||
if isinstance(item, tuple) and len(item) == 2:
|
||
link_id, payload = item
|
||
else:
|
||
link_id, payload = 0, item
|
||
|
||
# 登录阶段加一条轻量 debug,确认 ACK 是否进入队列
|
||
if not logged_in:
|
||
try:
|
||
print(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
|
||
except:
|
||
pass
|
||
|
||
msg_type, body = parse_packet(payload)
|
||
|
||
# 处理登录响应
|
||
if not logged_in and msg_type == 1:
|
||
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
|
||
logged_in = True
|
||
last_heartbeat_ack_time = time.ticks_ms()
|
||
print("✅ 登录成功")
|
||
# 若存在 ota_pending.json,说明上次 OTA 已应用并重启;
|
||
# 这里以“能成功登录服务器”为 OTA 成功判据:上报 ota_ok 并删除 pending,确保只上报一次。
|
||
try:
|
||
pending_path = "/maixapp/apps/t11/ota_pending.json"
|
||
if os.path.exists(pending_path):
|
||
try:
|
||
with open(pending_path, "r", encoding="utf-8") as f:
|
||
pending_obj = json.load(f)
|
||
except:
|
||
pending_obj = {}
|
||
safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
|
||
try:
|
||
os.remove(pending_path)
|
||
except:
|
||
pass
|
||
except Exception as e:
|
||
print(f"[OTA] ota_ok 上报失败: {e}")
|
||
else:
|
||
# 登录失败,跳出重连
|
||
break
|
||
|
||
# 处理心跳 ACK
|
||
elif logged_in and msg_type == 4:
|
||
last_heartbeat_ack_time = time.ticks_ms()
|
||
print("✅ 收到心跳确认")
|
||
|
||
elif logged_in and msg_type == 40:
|
||
if isinstance(body, dict):
|
||
t = body.get('t', 0)
|
||
v = body.get('v')
|
||
raw_line_data.append(body)
|
||
if len(raw_line_data) >= int(t):
|
||
print(f"下载完成")
|
||
stock_array = list(map(lambda x: x.get('d'), raw_line_data))
|
||
with open(local_filename, 'w', encoding='utf-8') as file:
|
||
file.write("\n".join(stock_array))
|
||
apply_ota_and_reboot()
|
||
else:
|
||
safe_enqueue({'data':{'l': len(raw_line_data), 'v': v}, 'cmd': 41})
|
||
print(f"已下载{len(raw_line_data)} 全部:{t} 版本:{v}")
|
||
|
||
# 处理业务指令
|
||
elif logged_in and isinstance(body, dict):
|
||
# 重要:每个包都要重新解析 inner_cmd,避免上一次的 cmd “粘住”导致反复执行
|
||
inner_cmd = None
|
||
data_obj = body.get("data")
|
||
if isinstance(data_obj, dict):
|
||
inner_cmd = data_obj.get("cmd")
|
||
|
||
if inner_cmd == 2: # 开启激光并校准
|
||
# 幂等:正在校准则不重复触发(服务器可能重发 cmd=2)
|
||
if not laser_calibration_active:
|
||
turn_on_laser()
|
||
time.sleep_ms(100)
|
||
laser_calibration_active = True
|
||
safe_enqueue({"result": "calibrating"}, 2)
|
||
elif inner_cmd == 3: # 关闭激光
|
||
distance_serial.write(LASER_OFF_CMD)
|
||
laser_calibration_active = False
|
||
safe_enqueue({"result": "laser_off"}, 2)
|
||
elif inner_cmd == 4: # 上报电量
|
||
voltage = get_bus_voltage()
|
||
battery_percent = voltage_to_percent(voltage)
|
||
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
|
||
safe_enqueue(battery_data, 2)
|
||
print(f"🔋 电量上报: {battery_percent}%")
|
||
elif inner_cmd == 5: # OTA 升级(含 Wi-Fi 配置,及4g)
|
||
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
|
||
|
||
ssid = inner_data.get("ssid")
|
||
password = inner_data.get("password")
|
||
ota_url = inner_data.get("url")
|
||
mode = (inner_data.get("mode") or "").strip().lower() # "4g"/"wifi"/""
|
||
|
||
if not ota_url:
|
||
print("ota missing_url")
|
||
safe_enqueue({"result": "missing_url"}, 2)
|
||
continue
|
||
|
||
# 自动判断:mode 非法/为空时,优先 Wi-Fi(如果已连),否则 4G
|
||
if mode not in ("4g", "wifi"):
|
||
print("ota missing mode")
|
||
mode = "wifi" if is_wifi_connected() else "4g"
|
||
|
||
if update_thread_started:
|
||
safe_enqueue({"result": "update_already_started"}, 2)
|
||
continue
|
||
|
||
update_thread_started = True
|
||
|
||
if mode == "4g":
|
||
_thread.start_new_thread(direct_ota_download_via_4g, (ota_url,))
|
||
else:
|
||
# wifi 模式:需要 ssid/password
|
||
if not ssid or not password:
|
||
update_thread_started = False
|
||
safe_enqueue({"result": "missing_ssid_or_password"}, 2)
|
||
else:
|
||
_thread.start_new_thread(handle_wifi_and_update, (ssid, password, ota_url))
|
||
elif inner_cmd == 6:
|
||
try:
|
||
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
|
||
ip = ip if ip else "no_ip"
|
||
except:
|
||
ip = "error_getting_ip"
|
||
safe_enqueue({"result": "current_ip", "ip": ip}, 2)
|
||
elif inner_cmd == 7:
|
||
# global update_thread_started
|
||
if update_thread_started:
|
||
safe_enqueue({"result": "update_already_started"}, 2)
|
||
continue
|
||
|
||
# 实时检查是否有 IP
|
||
try:
|
||
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
|
||
except:
|
||
ip = None
|
||
|
||
if not ip:
|
||
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
|
||
else:
|
||
# 启动纯下载线程
|
||
update_thread_started = True
|
||
_thread.start_new_thread(direct_ota_download, ())
|
||
else:
|
||
# 非指令包(或未携带 cmd),不做任何动作
|
||
pass
|
||
else:
|
||
time.sleep_ms(5)
|
||
|
||
# 发送队列中的业务数据
|
||
if logged_in and (high_send_queue or normal_send_queue):
|
||
# 只在锁内取出一个待发包,发送放到锁外,避免长时间占用队列锁
|
||
msg_type = None
|
||
data_dict = None
|
||
if queue_lock.try_acquire():
|
||
try:
|
||
if high_send_queue:
|
||
msg_type, data_dict = high_send_queue.pop(0)
|
||
elif normal_send_queue:
|
||
msg_type, data_dict = normal_send_queue.pop(0)
|
||
finally:
|
||
queue_lock.release()
|
||
|
||
if msg_type is not None and data_dict is not None:
|
||
pkt = make_packet(msg_type, data_dict)
|
||
if not tcp_send_raw(pkt):
|
||
tcp_connected = False
|
||
break
|
||
|
||
# 发送激光校准结果
|
||
if logged_in and laser_calibration_result is not None:
|
||
x = y = None
|
||
with laser_calibration_lock:
|
||
if laser_calibration_result is not None:
|
||
x, y = laser_calibration_result
|
||
laser_calibration_result = None
|
||
if x is not None and y is not None:
|
||
safe_enqueue({"result": "ok", "x": x, "y": y}, 2)
|
||
|
||
# 定期发送心跳
|
||
current_time = time.ticks_ms()
|
||
if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
|
||
vol_val = get_bus_voltage()
|
||
if not tcp_send_raw(make_packet(4, {"vol":vol_val, "vol_per":voltage_to_percent(vol_val)})):
|
||
print("💔 心跳发送失败")
|
||
send_hartbeat_fail_count += 1
|
||
if send_hartbeat_fail_count >= 3:
|
||
send_hartbeat_fail_count = 0
|
||
print("连续3次发送心跳失败,重连")
|
||
break
|
||
else:
|
||
continue
|
||
else:
|
||
send_hartbeat_fail_count = 0
|
||
last_heartbeat_send_time = current_time
|
||
print("💓 心跳已发送")
|
||
|
||
# 心跳超时重连
|
||
if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟
|
||
print("⏰ 十分钟无心跳ACK,重连")
|
||
break
|
||
|
||
time.sleep_ms(50)
|
||
|
||
tcp_connected = False
|
||
print("🔌 连接异常,2秒后重连...")
|
||
time.sleep_ms(2000)
|
||
|
||
|
||
def laser_calibration_worker():
|
||
"""后台线程:持续检测是否需要执行激光校准"""
|
||
global laser_calibration_active, laser_calibration_result, laser_calibration_lock
|
||
while True:
|
||
if laser_calibration_active:
|
||
# 关键:不要在每次尝试里反复 new Camera(会导致 MMF 反复初始化刷屏)
|
||
cam = None
|
||
try:
|
||
cam = camera.Camera(640, 480)
|
||
start = time.ticks_ms()
|
||
timeout_ms = 8000 # 8 秒内找不到红点就退出一次,避免一直占用资源
|
||
while laser_calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms:
|
||
frame = cam.read()
|
||
pos = find_red_laser(frame)
|
||
if pos:
|
||
with laser_calibration_lock:
|
||
laser_calibration_result = pos
|
||
laser_calibration_active = False
|
||
save_laser_point(pos)
|
||
print(f"✅ 后台校准成功: {pos}")
|
||
break
|
||
time.sleep_ms(60)
|
||
except Exception as e:
|
||
# 出错时也不要死循环刷屏
|
||
print(f"[LASER] calibration error: {e}")
|
||
time.sleep_ms(200)
|
||
finally:
|
||
try:
|
||
# 释放摄像头资源(MaixPy 通常靠 GC,但显式 del 更稳)
|
||
if cam is not None:
|
||
del cam
|
||
except:
|
||
pass
|
||
|
||
# 如果超时仍未成功,稍微休息一下再允许下一次 cmd=2 触发
|
||
if laser_calibration_active:
|
||
time.sleep_ms(300)
|
||
else:
|
||
time.sleep_ms(50)
|
||
|
||
|
||
|
||
|
||
def download_file_via_4g(url, filename,
|
||
total_timeout_ms=30000,
|
||
retries=3,
|
||
debug=False):
|
||
"""
|
||
ML307R HTTP 下载(URC content 分片模式)
|
||
- 重试:empty/incomplete/AT错误都会重试
|
||
- 超时:total_timeout_ms
|
||
- 校验:Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5
|
||
- 日志:默认干净;debug=True 才打印 URC 进度
|
||
"""
|
||
from urllib.parse import urlparse
|
||
parsed = urlparse(url)
|
||
host = parsed.hostname
|
||
path = parsed.path or "/"
|
||
base_url = f"http://{host}" # 你已验证 HTTP 可 200;如需 https 需另配 SSL
|
||
|
||
def _log(*args):
|
||
if debug:
|
||
print(*args)
|
||
|
||
def _merge_ranges(ranges_iter):
|
||
"""合并重叠/相邻区间,返回 merged(list[(s,e)])(半开区间)"""
|
||
rs = sorted(ranges_iter)
|
||
merged = []
|
||
for s, e in rs:
|
||
if e <= s:
|
||
continue
|
||
if merged and s <= merged[-1][1]:
|
||
merged[-1] = (merged[-1][0], max(merged[-1][1], e))
|
||
else:
|
||
merged.append((s, e))
|
||
return merged
|
||
|
||
def _compute_gaps(total_len, got_ranges):
|
||
"""根据已填充区间计算缺口(半开区间)"""
|
||
if not total_len or total_len <= 0:
|
||
return [(0, 0)]
|
||
merged = _merge_ranges(got_ranges)
|
||
gaps = []
|
||
prev = 0
|
||
for s, e in merged:
|
||
if s > prev:
|
||
gaps.append((prev, s))
|
||
prev = max(prev, e)
|
||
if prev < total_len:
|
||
gaps.append((prev, total_len))
|
||
return gaps, merged
|
||
|
||
def _extract_content_range(hdr_text: str):
|
||
"""
|
||
Content-Range: bytes <start>-<end>/<total>
|
||
返回 (start, end, total);解析失败返回 (None,None,None)
|
||
"""
|
||
m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE)
|
||
if not m:
|
||
return None, None, None
|
||
try:
|
||
return int(m.group(1)), int(m.group(2)), int(m.group(3))
|
||
except:
|
||
return None, None, None
|
||
|
||
def _get_ip():
|
||
r = at("AT+CGPADDR=1", "OK", 3000)
|
||
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
|
||
return m.group(1) if m else ""
|
||
|
||
def _clear_http_events():
|
||
# 清空旧的 HTTP URC 事件,避免串台
|
||
while at_client.pop_http_event() is not None:
|
||
pass
|
||
|
||
# 旧版基于直接 uart4g.read 的解析已迁移到 ATClient(单读者),保留函数占位避免大改动
|
||
|
||
def _parse_httpid(raw: bytes):
|
||
m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw)
|
||
return int(m.group(1)) if m else None
|
||
|
||
# _try_parse_header/_try_parse_one_content 已由 ATClient 在 reader 线程中解析并推送事件
|
||
|
||
# _try_parse_one_content 已由 ATClient 解析
|
||
|
||
def _extract_hdr_fields(hdr_text: str):
|
||
# Content-Length
|
||
mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
|
||
clen = int(mlen.group(1)) if mlen else None
|
||
|
||
# Content-Md5 (base64)
|
||
mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
|
||
md5_b64 = mmd5.group(1).strip() if mmd5 else None
|
||
|
||
return clen, md5_b64
|
||
|
||
def _md5_base64(data: bytes) -> str:
|
||
if hashlib is None:
|
||
return ""
|
||
digest = hashlib.md5(data).digest()
|
||
# base64: 24 chars with ==
|
||
return binascii.b2a_base64(digest).decode().strip()
|
||
|
||
def _one_attempt(range_start=None, range_end=None,
|
||
body_buf=None, got_ranges=None,
|
||
total_len=None,
|
||
expect_md5_b64=None):
|
||
# 0) PDP:确保有 IP(避免把 OK 当成功)
|
||
ip = _get_ip()
|
||
if not ip or ip == "0.0.0.0":
|
||
at("AT+MIPCALL=1,1", "OK", 15000)
|
||
for _ in range(10):
|
||
ip = _get_ip()
|
||
if ip and ip != "0.0.0.0":
|
||
break
|
||
time.sleep(1)
|
||
if not ip or ip == "0.0.0.0":
|
||
return False, "PDP not ready (no_ip)", body_buf, got_ranges, total_len, expect_md5_b64
|
||
|
||
# 1) 清理旧实例 + 清空旧 HTTP 事件
|
||
for i in range(0, 6):
|
||
at(f"AT+MHTTPDEL={i}", "OK", 1500)
|
||
_clear_http_events()
|
||
|
||
# 2) 创建实例(用 at() 等待返回)
|
||
create_resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)
|
||
httpid = _parse_httpid(create_resp.encode())
|
||
if httpid is None:
|
||
return False, "MHTTPCREATE failed (no httpid)", body_buf, got_ranges, total_len, expect_md5_b64
|
||
|
||
# 2.5) Range 补洞:按缺口请求指定字节段(HTTP Range 右端是 inclusive)
|
||
if range_start is not None and range_end is not None:
|
||
# 每次请求使用新 httpid,避免 header 累积/污染
|
||
at(f'AT+MHTTPCFG="header",{httpid},"Range: bytes={int(range_start)}-{int(range_end)}"', "OK", 3000)
|
||
|
||
# 3) 发 GET(HTTP URC 由 ATClient 解析并入队)
|
||
req_resp = at(f'AT+MHTTPREQUEST={httpid},1,0,"{path}"', "OK", 15000)
|
||
if "ERROR" in req_resp or "CME ERROR" in req_resp:
|
||
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
|
||
return False, f"MHTTPREQUEST failed: {req_resp}", body_buf, got_ranges, total_len, expect_md5_b64
|
||
|
||
# 4) 从 ATClient 的 http_events 队列收 header/content
|
||
urc_id = None
|
||
status_code = None
|
||
expect_len = None
|
||
# 若是 Range 响应(206),需要把响应内的偏移映射到“全文件”偏移
|
||
offset_base = 0
|
||
# got_ranges 记录“真实写入 body_buf 的半开区间”
|
||
if got_ranges is None:
|
||
got_ranges = set()
|
||
filled_new_bytes = 0
|
||
last_sum = 0
|
||
no_progress_count = 0 # 连续没有进展的次数
|
||
last_print_ms = time.ticks_ms()
|
||
last_print_sum = 0
|
||
|
||
t0 = time.ticks_ms()
|
||
while time.ticks_ms() - t0 < total_timeout_ms:
|
||
ev = at_client.pop_http_event()
|
||
if not ev:
|
||
# 如果 sum 已经达到 total_len,但仍有 gaps,等待更长时间(有些分片可能延迟到达)
|
||
if total_len and last_sum >= total_len:
|
||
gaps_now, merged_now = _compute_gaps(total_len, got_ranges)
|
||
if gaps_now and not (len(gaps_now) == 1 and gaps_now[0] == (0, 0)):
|
||
time.sleep_ms(50)
|
||
else:
|
||
time.sleep_ms(5)
|
||
else:
|
||
time.sleep_ms(5)
|
||
no_progress_count += 1
|
||
# 如果长时间没有新事件,且 sum 已经达到 total_len,认为接收完成(可能有丢包)
|
||
if no_progress_count > 100 and total_len and last_sum >= total_len:
|
||
break
|
||
continue
|
||
|
||
no_progress_count = 0 # 有事件,重置计数器
|
||
|
||
if ev[0] == "header":
|
||
_, hid, code, hdr_text = ev
|
||
if urc_id is None:
|
||
urc_id = hid
|
||
if hid != urc_id:
|
||
continue
|
||
status_code = code
|
||
expect_len, md5_b64 = _extract_hdr_fields(hdr_text)
|
||
# 只在“首次全量 header”里保留 Content-Md5;Range 响应通常不带该字段
|
||
if md5_b64:
|
||
expect_md5_b64 = md5_b64
|
||
|
||
cr_s, cr_e, cr_total = _extract_content_range(hdr_text)
|
||
if cr_s is not None and cr_total is not None:
|
||
# 206 Partial Content
|
||
offset_base = cr_s
|
||
# Content-Range end 是 inclusive;总长度以 total 为准
|
||
if total_len is None:
|
||
total_len = cr_total
|
||
elif total_len != cr_total:
|
||
_log(f"[WARN] total_len changed {total_len}->{cr_total}")
|
||
total_len = cr_total
|
||
if body_buf is None and total_len:
|
||
body_buf = bytearray(total_len)
|
||
|
||
_log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}")
|
||
continue
|
||
|
||
if ev[0] == "content":
|
||
_, cid, _total, _sum, _cur, payload = ev
|
||
if urc_id is None:
|
||
urc_id = cid
|
||
if cid != urc_id:
|
||
continue
|
||
|
||
# 全量 200:这里的 _total 就是全文件长度;Range 206:_total 可能只是“本次响应体长度”
|
||
if body_buf is None:
|
||
# 如果 header 没解析出 Content-Range,总长度用 content 的 _total
|
||
if total_len is None:
|
||
total_len = _total
|
||
if total_len:
|
||
body_buf = bytearray(total_len)
|
||
if body_buf is None or total_len is None:
|
||
continue
|
||
|
||
rel_start = _sum - _cur
|
||
rel_end = _sum
|
||
abs_start = offset_base + rel_start
|
||
abs_end = offset_base + rel_end
|
||
if abs_start < 0 or abs_start >= total_len:
|
||
continue
|
||
if abs_end < abs_start:
|
||
continue
|
||
if abs_end > total_len:
|
||
abs_end = total_len
|
||
|
||
expected_span = abs_end - abs_start
|
||
actual_len = min(len(payload), expected_span)
|
||
if actual_len <= 0:
|
||
continue
|
||
|
||
# 写入并记录“实际写入区间”,用于 gap 计算
|
||
body_buf[abs_start:abs_start + actual_len] = payload[:actual_len]
|
||
got_ranges.add((abs_start, abs_start + actual_len))
|
||
filled_new_bytes += actual_len
|
||
|
||
# 记录最大的 sum 值,用于判断是否所有数据都已发送
|
||
if _sum > last_sum:
|
||
last_sum = _sum
|
||
|
||
# debug 输出节流:每 ~8000 字节或 >=500ms 输出一次,避免 print 导致 UART 丢包
|
||
if debug:
|
||
now = time.ticks_ms()
|
||
if (time.ticks_diff(now, last_print_ms) >= 500) or (_sum - last_print_sum >= 8000) or (rel_end == _total):
|
||
_log(f"[URC] {abs_start}:{abs_start+actual_len} sum={_sum}/{_total} base={offset_base} +{filled_new_bytes}")
|
||
last_print_ms = now
|
||
last_print_sum = _sum
|
||
|
||
# 若是全量请求(offset_base=0 且 total_len==_total),尽早结束
|
||
if offset_base == 0 and total_len == _total:
|
||
# 不要用 filled_new_bytes 判断是否完整(可能有重叠)
|
||
pass
|
||
|
||
# 5) 清理实例
|
||
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
|
||
|
||
if body_buf is None:
|
||
return False, "empty_body", body_buf, got_ranges, total_len, expect_md5_b64
|
||
if total_len is None:
|
||
return False, "no_total_len", body_buf, got_ranges, total_len, expect_md5_b64
|
||
|
||
# 返回“本次尝试是否有实质进展”:Range 补洞时,哪怕不完整也算成功推进
|
||
if filled_new_bytes <= 0:
|
||
return False, "no_progress", body_buf, got_ranges, total_len, expect_md5_b64
|
||
return True, f"PARTIAL ok +{filled_new_bytes} ip={ip} code={status_code}", body_buf, got_ranges, total_len, expect_md5_b64
|
||
|
||
global ota_in_progress
|
||
ota_in_progress = True
|
||
with uart4g_lock:
|
||
try:
|
||
# -------- Phase 1: 全量 GET(允许不完整,后面用 Range 补洞)--------
|
||
body_buf = None
|
||
got_ranges = set()
|
||
total_len = None
|
||
expect_md5_b64 = None
|
||
|
||
last_err = "unknown"
|
||
for attempt in range(1, retries + 1):
|
||
ok, msg, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
|
||
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
|
||
)
|
||
last_err = msg
|
||
if not ok:
|
||
_log(f"[RETRY] full attempt={attempt} failed={msg}")
|
||
time.sleep_ms(200)
|
||
continue
|
||
|
||
gaps, merged = _compute_gaps(total_len, got_ranges)
|
||
filled_total = sum(e - s for s, e in merged)
|
||
if gaps and gaps[0] == (0, 0):
|
||
gaps = []
|
||
if not gaps:
|
||
break
|
||
_log(f"[GAPS] after full attempt={attempt} filled={filled_total}/{total_len} gaps={gaps[:3]}")
|
||
time.sleep_ms(150)
|
||
|
||
if body_buf is None or total_len is None:
|
||
return False, f"FAILED: {last_err}"
|
||
|
||
gaps, merged = _compute_gaps(total_len, got_ranges)
|
||
if gaps and gaps[0] == (0, 0):
|
||
gaps = []
|
||
|
||
# -------- Phase 2: Range 补洞 --------
|
||
# 限制单次 Range 的最大长度(太大也会触发同样的 UART 压力)
|
||
MAX_RANGE_BYTES = 8192
|
||
RANGE_RETRIES_EACH = 2
|
||
MAX_HOLE_ROUNDS = 10
|
||
|
||
round_i = 0
|
||
while gaps and round_i < MAX_HOLE_ROUNDS:
|
||
round_i += 1
|
||
# 优先补最大的洞(通常只丢中间一两段)
|
||
gaps = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True)
|
||
_log(f"[RANGE] round={round_i} gaps={gaps[:3]}")
|
||
|
||
progress_any = False
|
||
# 每轮最多补前 5 个洞,避免无限循环
|
||
for (gs, ge) in gaps[:5]:
|
||
cur = gs
|
||
while cur < ge:
|
||
sub_end = min(ge, cur + MAX_RANGE_BYTES)
|
||
# HTTP Range end is inclusive
|
||
rs = cur
|
||
re_incl = sub_end - 1
|
||
|
||
before_gaps, before_merged = _compute_gaps(total_len, got_ranges)
|
||
before_filled = sum(e - s for s, e in before_merged)
|
||
|
||
sub_ok = False
|
||
sub_err = "unknown"
|
||
for k in range(1, RANGE_RETRIES_EACH + 1):
|
||
ok2, msg2, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
|
||
range_start=rs, range_end=re_incl,
|
||
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
|
||
)
|
||
sub_err = msg2
|
||
if ok2:
|
||
sub_ok = True
|
||
break
|
||
_log(f"[RETRY] range {rs}-{re_incl} try={k} failed={msg2}")
|
||
time.sleep_ms(120)
|
||
|
||
after_gaps, after_merged = _compute_gaps(total_len, got_ranges)
|
||
after_filled = sum(e - s for s, e in after_merged)
|
||
if after_filled > before_filled:
|
||
progress_any = True
|
||
|
||
if not sub_ok:
|
||
_log(f"[WARN] range {rs}-{re_incl} failed={sub_err}")
|
||
|
||
# 小歇一下,给读线程喘息
|
||
time.sleep_ms(80)
|
||
cur = sub_end
|
||
|
||
gaps, merged = _compute_gaps(total_len, got_ranges)
|
||
if gaps and gaps[0] == (0, 0):
|
||
gaps = []
|
||
|
||
filled_total = sum(e - s for s, e in merged)
|
||
if not gaps:
|
||
break
|
||
if not progress_any:
|
||
# 本轮没有推进,退出避免死循环
|
||
_log(f"[RANGE] no progress in round={round_i}, stop. filled={filled_total}/{total_len}")
|
||
break
|
||
_log(f"[RANGE] round={round_i} filled={filled_total}/{total_len} gaps={gaps[:3]}")
|
||
|
||
# 完整性检查
|
||
gaps, merged = _compute_gaps(total_len, got_ranges)
|
||
if gaps and gaps[0] == (0, 0):
|
||
gaps = []
|
||
filled_total = sum(e - s for s, e in merged)
|
||
if gaps:
|
||
return False, f"incomplete_body got={filled_total} expected={total_len} missing={total_len - filled_total} gaps={gaps[:5]}"
|
||
|
||
data = bytes(body_buf)
|
||
|
||
# 校验:Content-Md5(base64)(若有)
|
||
if expect_md5_b64 and hashlib is not None:
|
||
md5_b64 = _md5_base64(data)
|
||
if md5_b64 != expect_md5_b64:
|
||
return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}"
|
||
|
||
# 写文件(原样 bytes)
|
||
with open(filename, "wb") as f:
|
||
f.write(data)
|
||
|
||
return True, f"OK size={len(data)} ip={_get_ip()} md5={expect_md5_b64 or ''}"
|
||
finally:
|
||
ota_in_progress = False
|
||
|
||
|
||
def direct_ota_download_via_4g(ota_url):
|
||
"""通过 4G 模块下载 OTA(不需要 Wi-Fi)"""
|
||
global update_thread_started
|
||
try:
|
||
if not ota_url:
|
||
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
|
||
return
|
||
|
||
print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}")
|
||
success, msg = download_file_via_4g(ota_url, local_filename, debug=True)
|
||
print(f"[OTA-4G] {msg}")
|
||
|
||
if success and "OK" in msg:
|
||
# 下载成功:备份+替换+重启
|
||
if apply_ota_and_reboot(ota_url):
|
||
return # 会重启,不会执行到 finally
|
||
else:
|
||
safe_enqueue({"result": msg}, 2)
|
||
|
||
except Exception as e:
|
||
error_msg = f"OTA-4G 异常: {str(e)}"
|
||
print(error_msg)
|
||
safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
|
||
finally:
|
||
update_thread_started = False
|
||
|
||
|
||
# ==================== 主程序入口 ====================
|
||
|
||
def cmd_str():
|
||
global DEVICE_ID, PASSWORD
|
||
|
||
DEVICE_ID = read_device_id()
|
||
PASSWORD = DEVICE_ID + "."
|
||
|
||
# 创建照片存储目录
|
||
photo_dir = "/root/phot"
|
||
if photo_dir not in os.listdir("/root"):
|
||
try:
|
||
os.mkdir(photo_dir)
|
||
except:
|
||
pass
|
||
|
||
# 初始化硬件
|
||
init_ina226()
|
||
load_laser_point()
|
||
disp = display.Display()
|
||
cam = camera.Camera(640, 480)
|
||
|
||
# 启动通信与校准线程
|
||
# from tcp_handler import tcp_main
|
||
_thread.start_new_thread(tcp_main, ())
|
||
_thread.start_new_thread(laser_calibration_worker, ())
|
||
|
||
print("系统准备完成...")
|
||
|
||
last_adc_trigger = 0
|
||
|
||
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报
|
||
while not app.need_exit():
|
||
current_time = time.ticks_ms()
|
||
# print("压力传感器数值: ", adc_obj.read())
|
||
adc_val = adc_obj.read()
|
||
# if adc_val > 2400:
|
||
# print(f"adc: {adc_val}")
|
||
if adc_val > ADC_TRIGGER_THRESHOLD:
|
||
diff_ms = current_time-last_adc_trigger
|
||
if diff_ms<3000:
|
||
continue
|
||
last_adc_trigger = current_time
|
||
time.sleep_ms(60) # 防抖
|
||
frame = cam.read()
|
||
x, y = laser_point
|
||
|
||
# 绘制激光十字线
|
||
frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness)
|
||
frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness)
|
||
frame.draw_circle(int(x), int(y), 1, color, thickness)
|
||
|
||
# 检测靶心
|
||
result_img, center, radius, method, best_radius1 = detect_circle(frame)
|
||
disp.show(result_img)
|
||
|
||
# 计算偏移与距离
|
||
dx, dy = compute_laser_position(center, (x, y), radius, method)
|
||
distance_m = estimate_distance(best_radius1)
|
||
|
||
# 读取电量
|
||
voltage = get_bus_voltage()
|
||
battery_percent = voltage_to_percent(voltage)
|
||
|
||
# 保存图像(带标注)
|
||
try:
|
||
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
|
||
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
|
||
result_img.save(filename, quality=70)
|
||
except Exception as e:
|
||
print(f"❌ 保存失败: {e}")
|
||
|
||
# 构造上报数据
|
||
inner_data = {
|
||
"x": float(dx) if dx is not None else 200.0,
|
||
"y": float(dy) if dy is not None else 200.0,
|
||
"r": 90.0,
|
||
"d": round((distance_m or 0.0) * 100), # 距离(厘米)
|
||
"m": method,
|
||
"adc": adc_val
|
||
}
|
||
report_data = {"cmd": 1, "data": inner_data}
|
||
# 射箭事件高优先级入队,由 tcp_main 统一发送
|
||
safe_enqueue(report_data, msg_type=2, high=True)
|
||
print("📤 射箭事件已加入发送队列")
|
||
|
||
# 闪一下激光(射箭反馈)
|
||
# TODO: remove after test done
|
||
flash_laser(1000) # 闪300ms,可以根据需要调整时长
|
||
|
||
time.sleep_ms(100)
|
||
else:
|
||
disp.show(cam.read())
|
||
time.sleep_ms(50)
|
||
|
||
def dump_system_info():
|
||
cmds = [
|
||
"uname -a",
|
||
"cat /etc/os-release 2>/dev/null",
|
||
"cat /proc/version 2>/dev/null",
|
||
"cat /proc/1/comm 2>/dev/null",
|
||
"ps 2>/dev/null | head -n 5",
|
||
"ls -l /sbin/init 2>/dev/null",
|
||
"ls -l /etc/init.d 2>/dev/null | head -n 10",
|
||
"which systemctl 2>/dev/null",
|
||
"which rc-service 2>/dev/null",
|
||
"which busybox 2>/dev/null && busybox | head -n 1",
|
||
"ls /dev/watchdog* 2>/dev/null",
|
||
]
|
||
for c in cmds:
|
||
try:
|
||
out = os.popen(c).read()
|
||
print("\n$ " + c + "\n" + (out.strip() or "<empty>"))
|
||
except Exception as e:
|
||
print("\n$ " + c + "\n<error> " + str(e))
|
||
|
||
if __name__ == "__main__":
|
||
# dump_system_info()
|
||
# try:
|
||
# import threading
|
||
# print("threading module:", threading)
|
||
# print("has Lock:", hasattr(threading, "Lock"))
|
||
# if hasattr(threading, "Lock"):
|
||
# print("has lock")
|
||
# finally:
|
||
# pass
|
||
# import config
|
||
# print("env: ", config.get_env())
|
||
cmd_str() |