Compare commits
2 Commits
669d032f96
...
708925ab41
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
708925ab41 | ||
|
|
92ad32bb8e |
79
S99archery
Normal file
79
S99archery
Normal file
@@ -0,0 +1,79 @@
|
||||
#!/bin/sh
|
||||
# /etc/init.d/S99archery
|
||||
# 系统启动时处理致命错误恢复(仅处理无法启动的情况)
|
||||
# 注意:应用的启动由系统自动启动机制处理(通过 auto_start.txt)
|
||||
# 功能:
|
||||
# 1. 处理致命错误(无法启动)- 恢复 main.py
|
||||
# 2. 如果重启次数超过阈值,恢复 main.py 并重启系统
|
||||
|
||||
APP_DIR="/maixapp/apps/t11"
|
||||
MAIN_PY="$APP_DIR/main.py"
|
||||
PENDING_FILE="$APP_DIR/ota_pending.json"
|
||||
BACKUP_BASE="$APP_DIR/backups"
|
||||
|
||||
# 进入应用目录
|
||||
cd "$APP_DIR" || exit 0
|
||||
|
||||
# 检查 pending 文件,如果存在且超过重启次数,恢复 main.py(处理致命错误)
|
||||
if [ -f "$PENDING_FILE" ]; then
|
||||
echo "[S99] 检测到 ota_pending.json,检查重启计数..."
|
||||
|
||||
# 尝试从JSON中提取重启计数(使用grep简单提取)
|
||||
RESTART_COUNT=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"restart_count":[0-9]*' | grep -o '[0-9]*' || echo "0")
|
||||
MAX_RESTARTS=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"max_restarts":[0-9]*' | grep -o '[0-9]*' || echo "3")
|
||||
|
||||
if [ -n "$RESTART_COUNT" ] && [ "$RESTART_COUNT" -ge "$MAX_RESTARTS" ]; then
|
||||
echo "[S99] 检测到重启次数 ($RESTART_COUNT) 超过阈值 ($MAX_RESTARTS),恢复 main.py..."
|
||||
|
||||
# 尝试从JSON中提取备份目录
|
||||
BACKUP_DIR=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"backup_dir":"[^"]*"' | grep -o '/[^"]*' || echo "")
|
||||
|
||||
if [ -n "$BACKUP_DIR" ] && [ -f "$BACKUP_DIR/main.py" ]; then
|
||||
# 使用指定的备份目录
|
||||
echo "[S99] 从备份目录恢复: $BACKUP_DIR/main.py"
|
||||
cp "$BACKUP_DIR/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
|
||||
else
|
||||
# 查找最新的备份目录
|
||||
LATEST_BACKUP=$(ls -dt "$BACKUP_BASE"/backup_* 2>/dev/null | head -1)
|
||||
if [ -n "$LATEST_BACKUP" ] && [ -f "$LATEST_BACKUP/main.py" ]; then
|
||||
echo "[S99] 从最新备份恢复: $LATEST_BACKUP/main.py"
|
||||
cp "$LATEST_BACKUP/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
|
||||
else
|
||||
# 如果没有备份目录,尝试使用 main.py.bak
|
||||
if [ -f "$APP_DIR/main.py.bak" ]; then
|
||||
echo "[S99] 从 main.py.bak 恢复"
|
||||
cp "$APP_DIR/main.py.bak" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
# 恢复后重置重启计数,避免循环恢复
|
||||
# 注意:不在这里删除 pending 文件,让 main.py 在心跳成功后删除
|
||||
# 但是重置重启计数,以便恢复后的版本可以重新开始计数
|
||||
python3 -c "
|
||||
import json, os
|
||||
try:
|
||||
pending_path = '$PENDING_FILE'
|
||||
if os.path.exists(pending_path):
|
||||
with open(pending_path, 'r', encoding='utf-8') as f:
|
||||
d = json.load(f)
|
||||
d['restart_count'] = 0 # 重置重启计数
|
||||
with open(pending_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(d, f)
|
||||
print('[S99] 已重置重启计数为 0')
|
||||
except Exception as e:
|
||||
print(f'[S99] 重置重启计数失败: {e}')
|
||||
" 2>/dev/null || echo "[S99] 无法重置重启计数(可能需要Python支持)"
|
||||
|
||||
echo "[S99] 已恢复 main.py,重启系统..."
|
||||
echo "[S99] 注意:pending 文件将在心跳成功后由 main.py 删除"
|
||||
sleep 2
|
||||
reboot
|
||||
exit 0
|
||||
fi
|
||||
fi
|
||||
|
||||
# 不启动应用,让系统自动启动机制处理
|
||||
# 这个脚本只负责处理致命错误恢复
|
||||
exit 0
|
||||
|
||||
307
at_client.py
Normal file
307
at_client.py
Normal file
@@ -0,0 +1,307 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
AT客户端模块
|
||||
负责4G模块的AT命令通信和URC解析
|
||||
"""
|
||||
import _thread
|
||||
from maix import time
|
||||
import re
|
||||
import threading
|
||||
|
||||
class ATClient:
|
||||
"""
|
||||
单读者 AT/URC 客户端:唯一读取 uart4g,避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。
|
||||
- send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect
|
||||
- pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload(已按长度裁剪)
|
||||
- pop_http_event() : 获取 +MHTTPURC 事件(header/content)
|
||||
"""
|
||||
def __init__(self, uart_obj):
|
||||
self.uart = uart_obj
|
||||
self._cmd_lock = threading.Lock()
|
||||
self._q_lock = threading.Lock()
|
||||
self._rx = b""
|
||||
self._tcp_payloads = []
|
||||
self._http_events = []
|
||||
|
||||
# 当前命令等待状态(仅允许单命令 in-flight)
|
||||
self._waiting = False
|
||||
self._expect = b"OK"
|
||||
self._resp = b""
|
||||
|
||||
self._running = False
|
||||
|
||||
def start(self):
|
||||
if self._running:
|
||||
return
|
||||
self._running = True
|
||||
_thread.start_new_thread(self._reader_loop, ())
|
||||
|
||||
def stop(self):
|
||||
self._running = False
|
||||
|
||||
def flush(self):
|
||||
"""清空内部缓存与队列(用于 OTA/异常恢复)"""
|
||||
with self._q_lock:
|
||||
self._rx = b""
|
||||
self._tcp_payloads.clear()
|
||||
self._http_events.clear()
|
||||
self._resp = b""
|
||||
|
||||
def pop_tcp_payload(self):
|
||||
with self._q_lock:
|
||||
if self._tcp_payloads:
|
||||
return self._tcp_payloads.pop(0)
|
||||
return None
|
||||
|
||||
def pop_http_event(self):
|
||||
with self._q_lock:
|
||||
if self._http_events:
|
||||
return self._http_events.pop(0)
|
||||
return None
|
||||
|
||||
def _push_tcp_payload(self, payload: bytes):
|
||||
# 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock,
|
||||
# 这里不要再次 acquire(锁不可重入,会死锁)。
|
||||
self._tcp_payloads.append(payload)
|
||||
|
||||
def _push_http_event(self, ev):
|
||||
# 同上:避免在 _reader_loop 持锁期间二次 acquire
|
||||
self._http_events.append(ev)
|
||||
|
||||
def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000):
|
||||
"""
|
||||
发送 AT 命令并等待 expect(子串匹配)。
|
||||
注意:expect=">" 用于等待 prompt。
|
||||
"""
|
||||
expect_b = expect.encode() if isinstance(expect, str) else expect
|
||||
with self._cmd_lock:
|
||||
# 初始化等待
|
||||
self._waiting = True
|
||||
self._expect = expect_b
|
||||
self._resp = b""
|
||||
|
||||
# 发送
|
||||
if cmd:
|
||||
# 注意:这里不要再用 uart4g_lock(否则外层已经持锁时会死锁)。
|
||||
# 写入由 _cmd_lock 串行化即可。
|
||||
self.uart.write((cmd + "\r\n").encode())
|
||||
|
||||
t0 = time.ticks_ms()
|
||||
while time.ticks_ms() - t0 < timeout_ms:
|
||||
if (not self._waiting) or (self._expect in self._resp):
|
||||
self._waiting = False
|
||||
break
|
||||
time.sleep_ms(5)
|
||||
|
||||
# 超时也返回已收集内容(便于诊断)
|
||||
self._waiting = False
|
||||
try:
|
||||
return self._resp.decode(errors="ignore")
|
||||
except:
|
||||
return str(self._resp)
|
||||
|
||||
def _find_urc_tag(self, tag: bytes):
|
||||
"""
|
||||
只在"真正的 URC 边界"查找 tag,避免误命中 HTTP payload 内容。
|
||||
规则:tag 必须出现在 buffer 开头,或紧跟在 b"\\r\\n" 后面。
|
||||
"""
|
||||
try:
|
||||
i = 0
|
||||
rx = self._rx
|
||||
while True:
|
||||
j = rx.find(tag, i)
|
||||
if j < 0:
|
||||
return -1
|
||||
if j == 0:
|
||||
return 0
|
||||
if j >= 2 and rx[j - 2:j] == b"\r\n":
|
||||
return j
|
||||
i = j + 1
|
||||
except:
|
||||
return -1
|
||||
|
||||
def _parse_mipurc_rtcp(self):
|
||||
"""
|
||||
解析:+MIPURC: "rtcp",<link_id>,<len>,<payload...>
|
||||
之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。
|
||||
"""
|
||||
prefix = b'+MIPURC: "rtcp",'
|
||||
i = self._find_urc_tag(prefix)
|
||||
if i < 0:
|
||||
return False
|
||||
# 丢掉前置噪声
|
||||
if i > 0:
|
||||
self._rx = self._rx[i:]
|
||||
i = 0
|
||||
|
||||
j = len(prefix)
|
||||
# 解析 link_id
|
||||
k = j
|
||||
while k < len(self._rx) and 48 <= self._rx[k] <= 57:
|
||||
k += 1
|
||||
if k == j or k >= len(self._rx):
|
||||
return False
|
||||
if self._rx[k:k+1] != b",":
|
||||
self._rx = self._rx[1:]
|
||||
return True
|
||||
try:
|
||||
link_id = int(self._rx[j:k].decode())
|
||||
except:
|
||||
self._rx = self._rx[1:]
|
||||
return True
|
||||
|
||||
# 解析 len
|
||||
j2 = k + 1
|
||||
k2 = j2
|
||||
while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57:
|
||||
k2 += 1
|
||||
if k2 == j2 or k2 >= len(self._rx):
|
||||
return False
|
||||
if self._rx[k2:k2+1] != b",":
|
||||
self._rx = self._rx[1:]
|
||||
return True
|
||||
try:
|
||||
n = int(self._rx[j2:k2].decode())
|
||||
except:
|
||||
self._rx = self._rx[1:]
|
||||
return True
|
||||
|
||||
payload_start = k2 + 1
|
||||
payload_end = payload_start + n
|
||||
if len(self._rx) < payload_end:
|
||||
return False # payload 未收齐
|
||||
|
||||
payload = self._rx[payload_start:payload_end]
|
||||
# 把 link_id 一起带上,便于上层过滤(如果需要)
|
||||
self._push_tcp_payload((link_id, payload))
|
||||
self._rx = self._rx[payload_end:]
|
||||
return True
|
||||
|
||||
def _parse_mhttpurc_header(self):
|
||||
tag = b'+MHTTPURC: "header",'
|
||||
i = self._find_urc_tag(tag)
|
||||
if i < 0:
|
||||
return False
|
||||
if i > 0:
|
||||
self._rx = self._rx[i:]
|
||||
i = 0
|
||||
|
||||
# header: +MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text...>
|
||||
j = len(tag)
|
||||
comma_count = 0
|
||||
k = j
|
||||
while k < len(self._rx) and comma_count < 3:
|
||||
if self._rx[k:k+1] == b",":
|
||||
comma_count += 1
|
||||
k += 1
|
||||
if comma_count < 3:
|
||||
return False
|
||||
|
||||
prefix = self._rx[:k]
|
||||
m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
|
||||
if not m:
|
||||
self._rx = self._rx[1:]
|
||||
return True
|
||||
urc_id = int(m.group(1))
|
||||
code = int(m.group(2))
|
||||
hdr_len = int(m.group(3))
|
||||
|
||||
text_start = k
|
||||
text_end = text_start + hdr_len
|
||||
if len(self._rx) < text_end:
|
||||
return False
|
||||
|
||||
hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore")
|
||||
self._push_http_event(("header", urc_id, code, hdr_text))
|
||||
self._rx = self._rx[text_end:]
|
||||
return True
|
||||
|
||||
def _parse_mhttpurc_content(self):
|
||||
tag = b'+MHTTPURC: "content",'
|
||||
i = self._find_urc_tag(tag)
|
||||
if i < 0:
|
||||
return False
|
||||
if i > 0:
|
||||
self._rx = self._rx[i:]
|
||||
i = 0
|
||||
|
||||
# content: +MHTTPURC: "content",<id>,<total>,<sum>,<cur>,<payload...>
|
||||
j = len(tag)
|
||||
comma_count = 0
|
||||
k = j
|
||||
while k < len(self._rx) and comma_count < 4:
|
||||
if self._rx[k:k+1] == b",":
|
||||
comma_count += 1
|
||||
k += 1
|
||||
if comma_count < 4:
|
||||
return False
|
||||
|
||||
prefix = self._rx[:k]
|
||||
m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
|
||||
if not m:
|
||||
self._rx = self._rx[1:]
|
||||
return True
|
||||
urc_id = int(m.group(1))
|
||||
total_len = int(m.group(2))
|
||||
sum_len = int(m.group(3))
|
||||
cur_len = int(m.group(4))
|
||||
|
||||
payload_start = k
|
||||
payload_end = payload_start + cur_len
|
||||
if len(self._rx) < payload_end:
|
||||
return False
|
||||
|
||||
payload = self._rx[payload_start:payload_end]
|
||||
self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload))
|
||||
self._rx = self._rx[payload_end:]
|
||||
return True
|
||||
|
||||
def _reader_loop(self):
|
||||
while self._running:
|
||||
# 关键:UART 驱动偶发 read failed,必须兜住,否则线程挂了 OTA/TCP 都会卡死
|
||||
try:
|
||||
d = self.uart.read(4096) # 8192 在一些驱动上更容易触发 read failed
|
||||
except Exception as e:
|
||||
try:
|
||||
print("[ATClient] uart read failed:", e)
|
||||
except:
|
||||
pass
|
||||
time.sleep_ms(50)
|
||||
continue
|
||||
|
||||
if not d:
|
||||
time.sleep_ms(1)
|
||||
continue
|
||||
|
||||
with self._q_lock:
|
||||
self._rx += d
|
||||
if self._waiting:
|
||||
self._resp += d
|
||||
|
||||
while True:
|
||||
progressed = (
|
||||
self._parse_mipurc_rtcp()
|
||||
or self._parse_mhttpurc_header()
|
||||
or self._parse_mhttpurc_content()
|
||||
)
|
||||
if not progressed:
|
||||
break
|
||||
|
||||
# 使用 ota_manager 访问 ota_in_progress
|
||||
try:
|
||||
from ota_manager import ota_manager
|
||||
ota_flag = ota_manager.ota_in_progress
|
||||
except:
|
||||
ota_flag = False
|
||||
|
||||
has_http_hint = (b"+MHTTP" in self._rx) or (b"+MHTTPURC" in self._rx)
|
||||
if ota_flag or has_http_hint:
|
||||
if len(self._rx) > 512 * 1024:
|
||||
self._rx = self._rx[-256 * 1024:]
|
||||
else:
|
||||
if len(self._rx) > 16384:
|
||||
self._rx = self._rx[-4096:]
|
||||
|
||||
|
||||
|
||||
81
config.py
Normal file
81
config.py
Normal file
@@ -0,0 +1,81 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
系统配置常量
|
||||
这些值在程序运行期间基本不变,或只在配置时改变
|
||||
"""
|
||||
from version import VERSION
|
||||
|
||||
# ==================== 应用配置 ====================
|
||||
APP_VERSION = VERSION
|
||||
APP_DIR = "/maixapp/apps/t11"
|
||||
LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py"
|
||||
|
||||
# ==================== 服务器配置 ====================
|
||||
SERVER_IP = "www.shelingxingqiu.com"
|
||||
SERVER_PORT = 50005
|
||||
HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒)
|
||||
|
||||
# ==================== HTTP配置 ====================
|
||||
HTTP_URL = "http://ws.shelingxingqiu.com"
|
||||
HTTP_API_PATH = "/home/shoot/device_fire/arrow/fire"
|
||||
|
||||
# ==================== 文件路径配置 ====================
|
||||
CONFIG_FILE = "/root/laser_config.json"
|
||||
LOG_FILE = "/maixapp/apps/t11/app.log"
|
||||
BACKUP_BASE = "/maixapp/apps/t11/backups"
|
||||
|
||||
# ==================== 硬件配置 ====================
|
||||
# UART配置
|
||||
UART4G_DEVICE = "/dev/ttyS2"
|
||||
UART4G_BAUDRATE = 115200
|
||||
DISTANCE_SERIAL_DEVICE = "/dev/ttyS1"
|
||||
DISTANCE_SERIAL_BAUDRATE = 9600
|
||||
|
||||
# I2C配置
|
||||
I2C_BUS_NUM = 1
|
||||
INA226_ADDR = 0x40
|
||||
REG_CONFIGURATION = 0x00
|
||||
REG_BUS_VOLTAGE = 0x02
|
||||
REG_CURRENT = 0x04 # 电流寄存器
|
||||
REG_CALIBRATION = 0x05
|
||||
CALIBRATION_VALUE = 0x1400
|
||||
|
||||
# ADC配置
|
||||
ADC_CHANNEL = 0
|
||||
ADC_TRIGGER_THRESHOLD = 3000
|
||||
ADC_LASER_THRESHOLD = 3000
|
||||
|
||||
# ==================== 激光配置 ====================
|
||||
MODULE_ADDR = 0x00
|
||||
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
|
||||
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
|
||||
DEFAULT_LASER_POINT = (640, 480) # 默认激光中心点
|
||||
|
||||
# ==================== 视觉检测配置 ====================
|
||||
FOCAL_LENGTH_PIX = 2250.0 # 焦距(像素)
|
||||
REAL_RADIUS_CM = 20 # 靶心实际半径(厘米)
|
||||
|
||||
# ==================== 显示配置 ====================
|
||||
LASER_COLOR = (255, 100, 0) # RGB颜色
|
||||
LASER_THICKNESS = 1
|
||||
LASER_LENGTH = 2
|
||||
|
||||
# ==================== 图像保存配置 ====================
|
||||
SAVE_IMAGE_ENABLED = True # 是否保存图像(True=保存,False=不保存)
|
||||
PHOTO_DIR = "/root/phot" # 照片存储目录
|
||||
|
||||
# ==================== OTA配置 ====================
|
||||
MAX_BACKUPS = 5
|
||||
LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB
|
||||
LOG_BACKUP_COUNT = 5
|
||||
|
||||
# ==================== 引脚映射配置 ====================
|
||||
PIN_MAPPINGS = {
|
||||
"A18": "UART1_RX",
|
||||
"A19": "UART1_TX",
|
||||
"A29": "UART2_RX",
|
||||
"A28": "UART2_TX",
|
||||
"P18": "I2C1_SCL",
|
||||
"P21": "I2C1_SDA",
|
||||
}
|
||||
119
hardware.py
Normal file
119
hardware.py
Normal file
@@ -0,0 +1,119 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
硬件管理器模块
|
||||
提供硬件对象的统一管理和访问
|
||||
"""
|
||||
import threading
|
||||
import config
|
||||
from at_client import ATClient
|
||||
|
||||
|
||||
class HardwareManager:
|
||||
"""硬件管理器(单例)"""
|
||||
_instance = None
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(HardwareManager, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# 私有硬件对象
|
||||
self._uart4g = None # 4G模块UART
|
||||
self._distance_serial = None # 激光测距串口
|
||||
self._bus = None # I2C总线
|
||||
self._adc_obj = None # ADC对象
|
||||
self._at_client = None # AT客户端
|
||||
|
||||
self._initialized = True
|
||||
|
||||
# ==================== 硬件访问(只读属性)====================
|
||||
|
||||
@property
|
||||
def uart4g(self):
|
||||
"""4G模块UART(只读)"""
|
||||
return self._uart4g
|
||||
|
||||
@property
|
||||
def distance_serial(self):
|
||||
"""激光测距串口(只读)"""
|
||||
return self._distance_serial
|
||||
|
||||
@property
|
||||
def bus(self):
|
||||
"""I2C总线(只读)"""
|
||||
return self._bus
|
||||
|
||||
@property
|
||||
def adc_obj(self):
|
||||
"""ADC对象(只读)"""
|
||||
return self._adc_obj
|
||||
|
||||
@property
|
||||
def at_client(self):
|
||||
"""AT客户端(只读)"""
|
||||
return self._at_client
|
||||
|
||||
# ==================== 初始化方法 ====================
|
||||
|
||||
def init_uart4g(self, device=None, baudrate=None):
|
||||
"""初始化4G模块UART"""
|
||||
from maix import uart
|
||||
if device is None:
|
||||
device = config.UART4G_DEVICE
|
||||
if baudrate is None:
|
||||
baudrate = config.UART4G_BAUDRATE
|
||||
self._uart4g = uart.UART(device, baudrate)
|
||||
return self._uart4g
|
||||
|
||||
def init_distance_serial(self, device=None, baudrate=None):
|
||||
"""初始化激光测距串口(激光控制)"""
|
||||
from maix import uart
|
||||
if device is None:
|
||||
device = config.DISTANCE_SERIAL_DEVICE
|
||||
if baudrate is None:
|
||||
baudrate = config.DISTANCE_SERIAL_BAUDRATE
|
||||
|
||||
print(f"[HW] 初始化激光串口: device={device}, baudrate={baudrate}")
|
||||
self._distance_serial = uart.UART(device, baudrate)
|
||||
print(f"[HW] 激光串口初始化完成: {self._distance_serial}")
|
||||
return self._distance_serial
|
||||
|
||||
def init_bus(self, bus_num=None):
|
||||
"""初始化I2C总线"""
|
||||
from maix import i2c
|
||||
if bus_num is None:
|
||||
bus_num = config.I2C_BUS_NUM
|
||||
self._bus = i2c.I2C(bus_num, i2c.Mode.MASTER)
|
||||
return self._bus
|
||||
|
||||
def init_adc(self, channel=None, res_bit=None):
|
||||
"""初始化ADC"""
|
||||
from maix.peripheral import adc
|
||||
if channel is None:
|
||||
channel = config.ADC_CHANNEL
|
||||
if res_bit is None:
|
||||
res_bit = adc.RES_BIT_12
|
||||
self._adc_obj = adc.ADC(channel, res_bit)
|
||||
return self._adc_obj
|
||||
|
||||
def init_at_client(self, uart_obj=None):
|
||||
"""初始化AT客户端"""
|
||||
if uart_obj is None:
|
||||
if self._uart4g is None:
|
||||
raise ValueError("uart4g must be initialized before at_client")
|
||||
uart_obj = self._uart4g
|
||||
self._at_client = ATClient(uart_obj)
|
||||
self._at_client.start()
|
||||
return self._at_client
|
||||
|
||||
|
||||
# 创建全局单例实例
|
||||
hardware_manager = HardwareManager()
|
||||
|
||||
|
||||
260
laser_manager.py
Normal file
260
laser_manager.py
Normal file
@@ -0,0 +1,260 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
激光管理器模块
|
||||
提供激光控制、校准等功能
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
from maix import time, camera
|
||||
import threading
|
||||
import config
|
||||
from logger_manager import logger_manager
|
||||
|
||||
|
||||
class LaserManager:
|
||||
"""激光控制管理器(单例)"""
|
||||
_instance = None
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = super(LaserManager, cls).__new__(cls)
|
||||
cls._instance._initialized = False
|
||||
return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
# 私有状态
|
||||
self._calibration_active = False
|
||||
self._calibration_result = None
|
||||
self._calibration_lock = threading.Lock()
|
||||
self._laser_point = None
|
||||
|
||||
self._initialized = True
|
||||
|
||||
# ==================== 状态访问(只读属性)====================
|
||||
|
||||
@property
|
||||
def calibration_active(self):
|
||||
"""是否正在校准"""
|
||||
return self._calibration_active
|
||||
|
||||
@property
|
||||
def laser_point(self):
|
||||
"""当前激光点"""
|
||||
return self._laser_point
|
||||
|
||||
# ==================== 业务方法 ====================
|
||||
|
||||
def load_laser_point(self):
|
||||
"""从配置文件加载激光中心点,失败则使用默认值"""
|
||||
try:
|
||||
if "laser_config.json" in os.listdir("/root"):
|
||||
with open(config.CONFIG_FILE, "r") as f:
|
||||
data = json.load(f)
|
||||
if isinstance(data, list) and len(data) == 2:
|
||||
self._laser_point = (int(data[0]), int(data[1]))
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.debug(f"[INFO] 加载激光点: {self._laser_point}")
|
||||
return self._laser_point
|
||||
else:
|
||||
raise ValueError
|
||||
else:
|
||||
self._laser_point = config.DEFAULT_LASER_POINT
|
||||
except:
|
||||
self._laser_point = config.DEFAULT_LASER_POINT
|
||||
|
||||
return self._laser_point
|
||||
|
||||
def save_laser_point(self, point):
|
||||
"""保存激光中心点到配置文件"""
|
||||
try:
|
||||
with open(config.CONFIG_FILE, "w") as f:
|
||||
json.dump([point[0], point[1]], f)
|
||||
self._laser_point = point
|
||||
return True
|
||||
except Exception as e:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"[LASER] 保存激光点失败: {e}")
|
||||
return False
|
||||
|
||||
def turn_on_laser(self):
|
||||
"""发送指令开启激光,并读取回包(部分模块支持)"""
|
||||
from hardware import hardware_manager
|
||||
logger = logger_manager.logger
|
||||
|
||||
if hardware_manager.distance_serial is None:
|
||||
if logger:
|
||||
logger.error("[LASER] distance_serial 未初始化")
|
||||
return None
|
||||
|
||||
# 打印调试信息
|
||||
if logger:
|
||||
logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}")
|
||||
|
||||
# 清空接收缓冲区
|
||||
try:
|
||||
hardware_manager.distance_serial.read(-1) # 清空缓冲区
|
||||
except:
|
||||
pass
|
||||
|
||||
# 发送命令
|
||||
written = hardware_manager.distance_serial.write(config.LASER_ON_CMD)
|
||||
if logger:
|
||||
logger.info(f"[LASER] 写入字节数: {written}")
|
||||
|
||||
time.sleep_ms(50) # 增加等待时间,让模块有时间响应
|
||||
|
||||
# 读取回包
|
||||
resp = hardware_manager.distance_serial.read(20)
|
||||
if resp:
|
||||
if logger:
|
||||
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
|
||||
if resp == config.LASER_ON_CMD:
|
||||
if logger:
|
||||
logger.info("✅ 激光开启指令已确认")
|
||||
else:
|
||||
if logger:
|
||||
logger.warning("🔇 无回包(可能正常或模块不支持回包)")
|
||||
return resp
|
||||
|
||||
def turn_off_laser(self):
|
||||
"""发送指令关闭激光"""
|
||||
from hardware import hardware_manager
|
||||
logger = logger_manager.logger
|
||||
|
||||
if hardware_manager.distance_serial is None:
|
||||
if logger:
|
||||
logger.error("[LASER] distance_serial 未初始化")
|
||||
return None
|
||||
|
||||
# 打印调试信息
|
||||
if logger:
|
||||
logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}")
|
||||
|
||||
# 清空接收缓冲区
|
||||
try:
|
||||
hardware_manager.distance_serial.read(-1)
|
||||
except:
|
||||
pass
|
||||
|
||||
# 发送命令
|
||||
written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD)
|
||||
if logger:
|
||||
logger.info(f"[LASER] 写入字节数: {written}")
|
||||
|
||||
time.sleep_ms(50) # 增加等待时间
|
||||
|
||||
# 读取回包
|
||||
resp = hardware_manager.distance_serial.read(20)
|
||||
if resp:
|
||||
if logger:
|
||||
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
|
||||
else:
|
||||
if logger:
|
||||
logger.warning("🔇 无回包")
|
||||
return resp
|
||||
|
||||
def flash_laser(self, duration_ms=1000):
|
||||
"""闪一下激光(用于射箭反馈)"""
|
||||
try:
|
||||
self.turn_on_laser()
|
||||
time.sleep_ms(duration_ms)
|
||||
self.turn_off_laser()
|
||||
except Exception as e:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"闪激光失败: {e}")
|
||||
|
||||
def find_red_laser(self, frame, threshold=150):
|
||||
"""在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
|
||||
w, h = frame.width(), frame.height()
|
||||
img_bytes = frame.to_bytes()
|
||||
max_sum = 0
|
||||
best_pos = None
|
||||
for y in range(0, h, 2):
|
||||
for x in range(0, w, 2):
|
||||
idx = (y * w + x) * 3
|
||||
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
|
||||
if r > threshold and r > g * 2 and r > b * 2:
|
||||
rgb_sum = r + g + b
|
||||
if rgb_sum > max_sum:
|
||||
max_sum = rgb_sum
|
||||
best_pos = (x, y)
|
||||
return best_pos
|
||||
|
||||
def calibrate_laser_position(self):
|
||||
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
|
||||
time.sleep_ms(80)
|
||||
cam = camera.Camera(640, 480)
|
||||
frame = cam.read()
|
||||
pos = self.find_red_laser(frame)
|
||||
if pos:
|
||||
self.save_laser_point(pos)
|
||||
return pos
|
||||
return None
|
||||
|
||||
def start_calibration(self):
|
||||
"""开始校准(公共方法)"""
|
||||
with self._calibration_lock:
|
||||
if self._calibration_active:
|
||||
return False
|
||||
self._calibration_active = True
|
||||
self._calibration_result = None
|
||||
return True
|
||||
|
||||
def stop_calibration(self):
|
||||
"""停止校准(公共方法)"""
|
||||
with self._calibration_lock:
|
||||
self._calibration_active = False
|
||||
|
||||
def set_calibration_result(self, result):
|
||||
"""设置校准结果(内部方法)"""
|
||||
with self._calibration_lock:
|
||||
self._calibration_result = result
|
||||
|
||||
def get_calibration_result(self):
|
||||
"""获取并清除校准结果(内部方法)"""
|
||||
with self._calibration_lock:
|
||||
result = self._calibration_result
|
||||
self._calibration_result = None
|
||||
return result
|
||||
|
||||
|
||||
# 创建全局单例实例
|
||||
laser_manager = LaserManager()
|
||||
|
||||
# ==================== 向后兼容的函数接口 ====================
|
||||
|
||||
def load_laser_point():
|
||||
"""加载激光点(向后兼容接口)"""
|
||||
return laser_manager.load_laser_point()
|
||||
|
||||
def save_laser_point(point):
|
||||
"""保存激光点(向后兼容接口)"""
|
||||
return laser_manager.save_laser_point(point)
|
||||
|
||||
def turn_on_laser():
|
||||
"""开启激光(向后兼容接口)"""
|
||||
return laser_manager.turn_on_laser()
|
||||
|
||||
def turn_off_laser():
|
||||
"""关闭激光(向后兼容接口)"""
|
||||
return laser_manager.turn_off_laser()
|
||||
|
||||
def flash_laser(duration_ms=1000):
|
||||
"""闪激光(向后兼容接口)"""
|
||||
return laser_manager.flash_laser(duration_ms)
|
||||
|
||||
def find_red_laser(frame, threshold=150):
|
||||
"""查找红色激光点(向后兼容接口)"""
|
||||
return laser_manager.find_red_laser(frame, threshold)
|
||||
|
||||
def calibrate_laser_position():
|
||||
"""校准激光位置(向后兼容接口)"""
|
||||
return laser_manager.calibrate_laser_position()
|
||||
|
||||
467
main.py
467
main.py
@@ -1460,472 +1460,6 @@ def laser_calibration_worker():
|
||||
else:
|
||||
time.sleep_ms(50)
|
||||
|
||||
|
||||
|
||||
|
||||
def download_file_via_4g_legacy(url, filename,
|
||||
total_timeout_ms=30000,
|
||||
retries=3,
|
||||
debug=False):
|
||||
"""
|
||||
ML307R HTTP 下载(URC content 分片模式)
|
||||
- 重试:empty/incomplete/AT错误都会重试
|
||||
- 超时:total_timeout_ms
|
||||
- 校验:Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5
|
||||
- 日志:默认干净;debug=True 才打印 URC 进度
|
||||
"""
|
||||
from urllib.parse import urlparse
|
||||
parsed = urlparse(url)
|
||||
host = parsed.hostname
|
||||
path = parsed.path or "/"
|
||||
base_url = f"http://{host}" # 你已验证 HTTP 可 200;如需 https 需另配 SSL
|
||||
|
||||
def _log(*args):
|
||||
if debug:
|
||||
print(*args)
|
||||
|
||||
def _merge_ranges(ranges_iter):
|
||||
"""合并重叠/相邻区间,返回 merged(list[(s,e)])(半开区间)"""
|
||||
rs = sorted(ranges_iter)
|
||||
merged = []
|
||||
for s, e in rs:
|
||||
if e <= s:
|
||||
continue
|
||||
if merged and s <= merged[-1][1]:
|
||||
merged[-1] = (merged[-1][0], max(merged[-1][1], e))
|
||||
else:
|
||||
merged.append((s, e))
|
||||
return merged
|
||||
|
||||
def _compute_gaps(total_len, got_ranges):
|
||||
"""根据已填充区间计算缺口(半开区间)"""
|
||||
if not total_len or total_len <= 0:
|
||||
return [(0, 0)]
|
||||
merged = _merge_ranges(got_ranges)
|
||||
gaps = []
|
||||
prev = 0
|
||||
for s, e in merged:
|
||||
if s > prev:
|
||||
gaps.append((prev, s))
|
||||
prev = max(prev, e)
|
||||
if prev < total_len:
|
||||
gaps.append((prev, total_len))
|
||||
return gaps, merged
|
||||
|
||||
def _extract_content_range(hdr_text: str):
|
||||
"""
|
||||
Content-Range: bytes <start>-<end>/<total>
|
||||
返回 (start, end, total);解析失败返回 (None,None,None)
|
||||
"""
|
||||
m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE)
|
||||
if not m:
|
||||
return None, None, None
|
||||
try:
|
||||
return int(m.group(1)), int(m.group(2)), int(m.group(3))
|
||||
except:
|
||||
return None, None, None
|
||||
|
||||
def _get_ip():
|
||||
r = at("AT+CGPADDR=1", "OK", 3000)
|
||||
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
|
||||
return m.group(1) if m else ""
|
||||
|
||||
def _clear_http_events():
|
||||
# 清空旧的 HTTP URC 事件,避免串台
|
||||
while at_client.pop_http_event() is not None:
|
||||
pass
|
||||
|
||||
# 旧版基于直接 uart4g.read 的解析已迁移到 ATClient(单读者),保留函数占位避免大改动
|
||||
|
||||
def _parse_httpid(raw: bytes):
|
||||
m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw)
|
||||
return int(m.group(1)) if m else None
|
||||
|
||||
# _try_parse_header/_try_parse_one_content 已由 ATClient 在 reader 线程中解析并推送事件
|
||||
|
||||
# _try_parse_one_content 已由 ATClient 解析
|
||||
|
||||
def _extract_hdr_fields(hdr_text: str):
|
||||
# Content-Length
|
||||
mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
|
||||
clen = int(mlen.group(1)) if mlen else None
|
||||
|
||||
# Content-Md5 (base64)
|
||||
mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
|
||||
md5_b64 = mmd5.group(1).strip() if mmd5 else None
|
||||
|
||||
return clen, md5_b64
|
||||
|
||||
def _md5_base64(data: bytes) -> str:
|
||||
if hashlib is None:
|
||||
return ""
|
||||
digest = hashlib.md5(data).digest()
|
||||
# base64: 24 chars with ==
|
||||
return binascii.b2a_base64(digest).decode().strip()
|
||||
|
||||
def _one_attempt(range_start=None, range_end=None,
|
||||
body_buf=None, got_ranges=None,
|
||||
total_len=None,
|
||||
expect_md5_b64=None):
|
||||
# 0) PDP:确保有 IP(避免把 OK 当成功)
|
||||
ip = _get_ip()
|
||||
if not ip or ip == "0.0.0.0":
|
||||
at("AT+MIPCALL=1,1", "OK", 15000)
|
||||
for _ in range(10):
|
||||
ip = _get_ip()
|
||||
if ip and ip != "0.0.0.0":
|
||||
break
|
||||
time.sleep(1)
|
||||
if not ip or ip == "0.0.0.0":
|
||||
return False, "PDP not ready (no_ip)", body_buf, got_ranges, total_len, expect_md5_b64
|
||||
|
||||
# 1) 清理旧实例 + 清空旧 HTTP 事件
|
||||
for i in range(0, 6):
|
||||
at(f"AT+MHTTPDEL={i}", "OK", 1500)
|
||||
_clear_http_events()
|
||||
|
||||
# 2) 创建实例(用 at() 等待返回)
|
||||
create_resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)
|
||||
httpid = _parse_httpid(create_resp.encode())
|
||||
if httpid is None:
|
||||
return False, "MHTTPCREATE failed (no httpid)", body_buf, got_ranges, total_len, expect_md5_b64
|
||||
|
||||
# 2.5) Range 补洞:按缺口请求指定字节段(HTTP Range 右端是 inclusive)
|
||||
if range_start is not None and range_end is not None:
|
||||
# 每次请求使用新 httpid,避免 header 累积/污染
|
||||
at(f'AT+MHTTPCFG="header",{httpid},"Range: bytes={int(range_start)}-{int(range_end)}"', "OK", 3000)
|
||||
|
||||
# 3) 发 GET(HTTP URC 由 ATClient 解析并入队)
|
||||
req_resp = at(f'AT+MHTTPREQUEST={httpid},1,0,"{path}"', "OK", 15000)
|
||||
if "ERROR" in req_resp or "CME ERROR" in req_resp:
|
||||
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
|
||||
return False, f"MHTTPREQUEST failed: {req_resp}", body_buf, got_ranges, total_len, expect_md5_b64
|
||||
|
||||
# 4) 从 ATClient 的 http_events 队列收 header/content
|
||||
urc_id = None
|
||||
status_code = None
|
||||
expect_len = None
|
||||
# 若是 Range 响应(206),需要把响应内的偏移映射到“全文件”偏移
|
||||
offset_base = 0
|
||||
# got_ranges 记录“真实写入 body_buf 的半开区间”
|
||||
if got_ranges is None:
|
||||
got_ranges = set()
|
||||
filled_new_bytes = 0
|
||||
# last_sum/resp_total 用于判断“本次 HTTP 响应体”是否接收完成(尤其是 Range 场景)
|
||||
last_sum = 0
|
||||
resp_total = None
|
||||
no_progress_count = 0 # 连续没有进展的次数
|
||||
last_print_ms = time.ticks_ms()
|
||||
last_print_sum = 0
|
||||
|
||||
# Range 补洞不需要等太久,避免卡死;全量下载用总超时
|
||||
attempt_timeout_ms = total_timeout_ms
|
||||
if range_start is not None and range_end is not None:
|
||||
attempt_timeout_ms = min(total_timeout_ms, 8000)
|
||||
|
||||
t0 = time.ticks_ms()
|
||||
while time.ticks_ms() - t0 < attempt_timeout_ms:
|
||||
ev = at_client.pop_http_event()
|
||||
if not ev:
|
||||
# 如果 sum 已经达到 total_len,但仍有 gaps,等待更长时间(有些分片可能延迟到达)
|
||||
# 对 Range:last_sum 只会到 resp_total(比如 686/774),不能拿 total_len(59776) 比
|
||||
if resp_total and last_sum >= resp_total:
|
||||
# 本次响应体应该收齐了,继续等一小会儿(防止最后一个 URC 延迟),然后退出循环
|
||||
time.sleep_ms(30)
|
||||
no_progress_count += 1
|
||||
if no_progress_count > 30:
|
||||
break
|
||||
continue
|
||||
|
||||
# 全量模式:如果模块宣称 sum 已经达到 total_len,但仍有 gaps,稍微多等
|
||||
if (range_start is None and range_end is None) and total_len and last_sum >= total_len:
|
||||
gaps_now, merged_now = _compute_gaps(total_len, got_ranges)
|
||||
if gaps_now and not (len(gaps_now) == 1 and gaps_now[0] == (0, 0)):
|
||||
time.sleep_ms(50)
|
||||
else:
|
||||
time.sleep_ms(5)
|
||||
else:
|
||||
time.sleep_ms(5)
|
||||
no_progress_count += 1
|
||||
# Range:如果长时间没有事件也结束(让上层重试)
|
||||
if range_start is not None and range_end is not None and no_progress_count > 200:
|
||||
break
|
||||
# 全量:如果长时间没有新事件,且 sum 已经达到 total_len,认为接收完成(可能有丢包)
|
||||
if no_progress_count > 100 and total_len and last_sum >= total_len:
|
||||
break
|
||||
continue
|
||||
|
||||
no_progress_count = 0 # 有事件,重置计数器
|
||||
|
||||
if ev[0] == "header":
|
||||
_, hid, code, hdr_text = ev
|
||||
if urc_id is None:
|
||||
urc_id = hid
|
||||
if hid != urc_id:
|
||||
continue
|
||||
status_code = code
|
||||
expect_len, md5_b64 = _extract_hdr_fields(hdr_text)
|
||||
# 只在“首次全量 header”里保留 Content-Md5;Range 响应通常不带该字段
|
||||
if md5_b64:
|
||||
expect_md5_b64 = md5_b64
|
||||
|
||||
cr_s, cr_e, cr_total = _extract_content_range(hdr_text)
|
||||
if cr_s is not None and cr_total is not None:
|
||||
# 206 Partial Content
|
||||
offset_base = cr_s
|
||||
# Content-Range end 是 inclusive;总长度以 total 为准
|
||||
if total_len is None:
|
||||
total_len = cr_total
|
||||
elif total_len != cr_total:
|
||||
_log(f"[WARN] total_len changed {total_len}->{cr_total}")
|
||||
total_len = cr_total
|
||||
if body_buf is None and total_len:
|
||||
body_buf = bytearray(total_len)
|
||||
|
||||
# 对 Range 响应:优先使用 Content-Length 作为本次响应体长度
|
||||
if expect_len is not None:
|
||||
resp_total = expect_len
|
||||
|
||||
_log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}")
|
||||
continue
|
||||
|
||||
if ev[0] == "content":
|
||||
_, cid, _total, _sum, _cur, payload = ev
|
||||
if urc_id is None:
|
||||
urc_id = cid
|
||||
if cid != urc_id:
|
||||
continue
|
||||
|
||||
# 全量 200:这里的 _total 就是全文件长度;Range 206:_total 可能只是“本次响应体长度”
|
||||
if body_buf is None:
|
||||
# 如果 header 没解析出 Content-Range,总长度用 content 的 _total
|
||||
if total_len is None:
|
||||
total_len = _total
|
||||
if total_len:
|
||||
body_buf = bytearray(total_len)
|
||||
if body_buf is None or total_len is None:
|
||||
continue
|
||||
|
||||
# 若 header 没给 Content-Length,就用 content 的 _total 作为本次响应体长度(Range 场景下通常是这次 body 的长度)
|
||||
if resp_total is None:
|
||||
resp_total = _total
|
||||
|
||||
rel_start = _sum - _cur
|
||||
rel_end = _sum
|
||||
abs_start = offset_base + rel_start
|
||||
abs_end = offset_base + rel_end
|
||||
if abs_start < 0 or abs_start >= total_len:
|
||||
continue
|
||||
if abs_end < abs_start:
|
||||
continue
|
||||
if abs_end > total_len:
|
||||
abs_end = total_len
|
||||
|
||||
expected_span = abs_end - abs_start
|
||||
actual_len = min(len(payload), expected_span)
|
||||
if actual_len <= 0:
|
||||
continue
|
||||
|
||||
# 写入并记录“实际写入区间”,用于 gap 计算
|
||||
body_buf[abs_start:abs_start + actual_len] = payload[:actual_len]
|
||||
got_ranges.add((abs_start, abs_start + actual_len))
|
||||
filled_new_bytes += actual_len
|
||||
|
||||
# 记录最大的 sum 值,用于判断是否所有数据都已发送
|
||||
if _sum > last_sum:
|
||||
last_sum = _sum
|
||||
|
||||
# debug 输出节流:每 ~8000 字节或 >=500ms 输出一次,避免 print 导致 UART 丢包
|
||||
if debug:
|
||||
now = time.ticks_ms()
|
||||
if (time.ticks_diff(now, last_print_ms) >= 500) or (_sum - last_print_sum >= 8000) or (rel_end == _total):
|
||||
_log(f"[URC] {abs_start}:{abs_start+actual_len} sum={_sum}/{_total} base={offset_base} +{filled_new_bytes}")
|
||||
last_print_ms = now
|
||||
last_print_sum = _sum
|
||||
|
||||
# 若是全量请求(offset_base=0 且 total_len==_total),尽早结束
|
||||
if offset_base == 0 and total_len == _total:
|
||||
# 不要用 filled_new_bytes 判断是否完整(可能有重叠)
|
||||
pass
|
||||
|
||||
# Range:本次响应体已收齐,退出,交给上层判断是否补上了缺口
|
||||
if resp_total is not None and last_sum >= resp_total:
|
||||
# 给一点时间让可能的尾部事件入队,然后退出
|
||||
time.sleep_ms(10)
|
||||
break
|
||||
|
||||
# 5) 清理实例
|
||||
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
|
||||
|
||||
if body_buf is None:
|
||||
return False, "empty_body", body_buf, got_ranges, total_len, expect_md5_b64
|
||||
if total_len is None:
|
||||
return False, "no_total_len", body_buf, got_ranges, total_len, expect_md5_b64
|
||||
|
||||
# 返回“本次尝试是否有实质进展”:Range 补洞时,哪怕不完整也算成功推进
|
||||
if filled_new_bytes <= 0:
|
||||
return False, "no_progress", body_buf, got_ranges, total_len, expect_md5_b64
|
||||
return True, f"PARTIAL ok +{filled_new_bytes} ip={ip} code={status_code}", body_buf, got_ranges, total_len, expect_md5_b64
|
||||
|
||||
global ota_in_progress
|
||||
try:
|
||||
ota_in_progress = int(ota_in_progress) + 1
|
||||
except:
|
||||
ota_in_progress = 1
|
||||
with uart4g_lock:
|
||||
try:
|
||||
# -------- Phase 1: 全量 GET(允许不完整,后面用 Range 补洞)--------
|
||||
body_buf = None
|
||||
got_ranges = set()
|
||||
total_len = None
|
||||
expect_md5_b64 = None
|
||||
|
||||
last_err = "unknown"
|
||||
for attempt in range(1, retries + 1):
|
||||
ok, msg, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
|
||||
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
|
||||
)
|
||||
last_err = msg
|
||||
if not ok:
|
||||
_log(f"[RETRY] full attempt={attempt} failed={msg}")
|
||||
time.sleep_ms(200)
|
||||
continue
|
||||
|
||||
gaps, merged = _compute_gaps(total_len, got_ranges)
|
||||
filled_total = sum(e - s for s, e in merged)
|
||||
if gaps and gaps[0] == (0, 0):
|
||||
gaps = []
|
||||
if not gaps:
|
||||
break
|
||||
_log(f"[GAPS] after full attempt={attempt} filled={filled_total}/{total_len} gaps={gaps[:3]}")
|
||||
time.sleep_ms(150)
|
||||
|
||||
if body_buf is None or total_len is None:
|
||||
return False, f"FAILED: {last_err}"
|
||||
|
||||
gaps, merged = _compute_gaps(total_len, got_ranges)
|
||||
if gaps and gaps[0] == (0, 0):
|
||||
gaps = []
|
||||
|
||||
# -------- Phase 2: Range 补洞 --------
|
||||
# 说明:
|
||||
# - “全量 GET 多次重试 + 合并已收到分片”我们已经在 Phase1 做了(got_ranges/body_buf 会跨 attempt 累积)。
|
||||
# - 仍存在 gaps 说明:这些字节段在全量阶段始终没收到,需要靠 Range 反复补洞。
|
||||
#
|
||||
# 策略:
|
||||
# - Range 分块更小(更稳),失败时继续“二分缩小”到 MIN_RANGE_BYTES;
|
||||
# - 不要因为某一轮 no_progress 就立刻退出(UART 偶发丢 URC,需要多轮撞上一次成功)。
|
||||
MAX_RANGE_BYTES = 1024
|
||||
MIN_RANGE_BYTES = 128
|
||||
RANGE_RETRIES_EACH = 8
|
||||
MAX_HOLE_ROUNDS = 50
|
||||
NO_PROGRESS_ROUNDS_LIMIT = 8
|
||||
|
||||
round_i = 0
|
||||
no_progress_rounds = 0
|
||||
while gaps and round_i < MAX_HOLE_ROUNDS:
|
||||
round_i += 1
|
||||
# 优先补最大的洞(通常只丢中间一两段)
|
||||
gaps = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True)
|
||||
_log(f"[RANGE] round={round_i} gaps={gaps[:3]}")
|
||||
|
||||
progress_any = False
|
||||
# 每轮最多补前 5 个洞,避免无限循环
|
||||
for (gs, ge) in gaps[:5]:
|
||||
cur = gs
|
||||
chunk = MAX_RANGE_BYTES
|
||||
while cur < ge:
|
||||
sub_end = min(ge, cur + chunk)
|
||||
# HTTP Range end is inclusive
|
||||
rs = cur
|
||||
re_incl = sub_end - 1
|
||||
|
||||
before_gaps, before_merged = _compute_gaps(total_len, got_ranges)
|
||||
before_filled = sum(e - s for s, e in before_merged)
|
||||
|
||||
sub_ok = False
|
||||
sub_err = "unknown"
|
||||
for k in range(1, RANGE_RETRIES_EACH + 1):
|
||||
ok2, msg2, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
|
||||
range_start=rs, range_end=re_incl,
|
||||
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
|
||||
)
|
||||
sub_err = msg2
|
||||
if ok2:
|
||||
sub_ok = True
|
||||
break
|
||||
_log(f"[RETRY] range {rs}-{re_incl} try={k} failed={msg2}")
|
||||
time.sleep_ms(150)
|
||||
|
||||
after_gaps, after_merged = _compute_gaps(total_len, got_ranges)
|
||||
after_filled = sum(e - s for s, e in after_merged)
|
||||
if after_filled > before_filled:
|
||||
progress_any = True
|
||||
# 成功推进:恢复到较大 chunk,加快补洞
|
||||
chunk = MAX_RANGE_BYTES
|
||||
cur = sub_end
|
||||
else:
|
||||
# 没推进:缩小 chunk,继续在同一位置重试;不要前进 cur
|
||||
if chunk > MIN_RANGE_BYTES:
|
||||
chunk = max(MIN_RANGE_BYTES, chunk // 2)
|
||||
_log(f"[RANGE] shrink chunk -> {chunk} at pos={cur}")
|
||||
else:
|
||||
# 已经很小还不行:本轮先放弃这个位置,留给下一轮再撞
|
||||
if not sub_ok:
|
||||
_log(f"[WARN] range {rs}-{re_incl} failed={sub_err}")
|
||||
break
|
||||
|
||||
# 小歇一下,给读线程喘息
|
||||
time.sleep_ms(120)
|
||||
|
||||
gaps, merged = _compute_gaps(total_len, got_ranges)
|
||||
if gaps and gaps[0] == (0, 0):
|
||||
gaps = []
|
||||
|
||||
filled_total = sum(e - s for s, e in merged)
|
||||
if not gaps:
|
||||
break
|
||||
if not progress_any:
|
||||
no_progress_rounds += 1
|
||||
_log(f"[RANGE] no progress in round={round_i} ({no_progress_rounds}/{NO_PROGRESS_ROUNDS_LIMIT}) filled={filled_total}/{total_len}")
|
||||
# 多轮无进展才退出(避免偶发“只 header 无 content URC”导致过早退出)
|
||||
if no_progress_rounds >= NO_PROGRESS_ROUNDS_LIMIT:
|
||||
break
|
||||
# 退避等待一下再继续下一轮
|
||||
time.sleep_ms(500)
|
||||
continue
|
||||
else:
|
||||
no_progress_rounds = 0
|
||||
_log(f"[RANGE] round={round_i} filled={filled_total}/{total_len} gaps={gaps[:3]}")
|
||||
|
||||
# 完整性检查
|
||||
gaps, merged = _compute_gaps(total_len, got_ranges)
|
||||
if gaps and gaps[0] == (0, 0):
|
||||
gaps = []
|
||||
filled_total = sum(e - s for s, e in merged)
|
||||
if gaps:
|
||||
return False, f"incomplete_body got={filled_total} expected={total_len} missing={total_len - filled_total} gaps={gaps[:5]}"
|
||||
|
||||
data = bytes(body_buf)
|
||||
|
||||
# 校验:Content-Md5(base64)(若有)
|
||||
if expect_md5_b64 and hashlib is not None:
|
||||
md5_b64 = _md5_base64(data)
|
||||
if md5_b64 != expect_md5_b64:
|
||||
return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}"
|
||||
|
||||
# 写文件(原样 bytes)
|
||||
with open(filename, "wb") as f:
|
||||
f.write(data)
|
||||
|
||||
return True, f"OK size={len(data)} ip={_get_ip()} md5={expect_md5_b64 or ''}"
|
||||
finally:
|
||||
try:
|
||||
ota_in_progress = max(0, int(ota_in_progress) - 1)
|
||||
except:
|
||||
ota_in_progress = 0
|
||||
|
||||
|
||||
def download_file_via_4g(url, filename,
|
||||
total_timeout_ms=600000,
|
||||
retries=3,
|
||||
@@ -2036,6 +1570,7 @@ def download_file_via_4g(url, filename,
|
||||
|
||||
def _create_httpid(full_reset=False):
|
||||
_clear_http_events()
|
||||
at_client.flush()
|
||||
if full_reset:
|
||||
_hard_reset_http()
|
||||
resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)
|
||||
|
||||
1278
ota_manager.py
Normal file
1278
ota_manager.py
Normal file
File diff suppressed because it is too large
Load Diff
112
power.py
Normal file
112
power.py
Normal file
@@ -0,0 +1,112 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
电源管理模块(INA226)
|
||||
提供电压、电流监测和充电状态检测
|
||||
"""
|
||||
import config
|
||||
from logger_manager import logger_manager
|
||||
|
||||
|
||||
def write_register(reg, value):
|
||||
"""写入INA226寄存器"""
|
||||
from hardware import hardware_manager
|
||||
data = [(value >> 8) & 0xFF, value & 0xFF]
|
||||
hardware_manager.bus.writeto_mem(config.INA226_ADDR, reg, bytes(data))
|
||||
|
||||
|
||||
def read_register(reg):
|
||||
"""读取INA226寄存器"""
|
||||
from hardware import hardware_manager
|
||||
data = hardware_manager.bus.readfrom_mem(config.INA226_ADDR, reg, 2)
|
||||
return (data[0] << 8) | data[1]
|
||||
|
||||
|
||||
def init_ina226():
|
||||
"""初始化 INA226 芯片:配置模式 + 校准值"""
|
||||
write_register(config.REG_CONFIGURATION, 0x4527)
|
||||
write_register(config.REG_CALIBRATION, config.CALIBRATION_VALUE)
|
||||
|
||||
|
||||
def get_bus_voltage():
|
||||
"""读取总线电压(单位:V)"""
|
||||
raw = read_register(config.REG_BUS_VOLTAGE)
|
||||
return raw * 1.25 / 1000
|
||||
|
||||
|
||||
def get_current():
|
||||
"""
|
||||
读取电流(单位:mA)
|
||||
正数表示充电,负数表示放电
|
||||
|
||||
INA226 电流计算公式:
|
||||
Current = (Current Register Value) × Current_LSB
|
||||
Current_LSB = 0.001 × CALIBRATION_VALUE / 4096
|
||||
"""
|
||||
try:
|
||||
raw = read_register(config.REG_CURRENT)
|
||||
# INA226 电流寄存器是16位有符号整数
|
||||
# 最高位是符号位:0=正(充电),1=负(放电)
|
||||
# 计算 Current_LSB(根据 CALIBRATION_VALUE)
|
||||
current_lsb = 0.001 * config.CALIBRATION_VALUE / 4096 # 单位:A
|
||||
# 处理有符号数:如果最高位为1,转换为负数
|
||||
if raw & 0x8000: # 最高位为1,表示负数(放电)
|
||||
signed_raw = raw - 0x10000 # 转换为有符号整数
|
||||
else: # 最高位为0,表示正数(充电)
|
||||
signed_raw = raw
|
||||
# 转换为毫安
|
||||
current_ma = signed_raw * current_lsb * 1000
|
||||
return current_ma
|
||||
except Exception as e:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"[INA226] 读取电流失败: {e}")
|
||||
else:
|
||||
print(f"[INA226] 读取电流失败: {e}")
|
||||
return 0.0
|
||||
|
||||
|
||||
def is_charging(threshold_ma=10.0):
|
||||
"""
|
||||
检测是否在充电(通过电流方向判断)
|
||||
|
||||
Args:
|
||||
threshold_ma: 电流阈值(毫安),超过此值认为在充电,默认10mA
|
||||
|
||||
Returns:
|
||||
True: 正在充电
|
||||
False: 未充电或读取失败
|
||||
"""
|
||||
try:
|
||||
current = get_current()
|
||||
is_charge = current > threshold_ma
|
||||
return is_charge
|
||||
except Exception as e:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"[CHARGE] 检测充电状态失败: {e}")
|
||||
else:
|
||||
print(f"[CHARGE] 检测充电状态失败: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def voltage_to_percent(voltage):
|
||||
"""根据电压估算电池百分比(查表插值)"""
|
||||
points = [
|
||||
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
|
||||
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
|
||||
(3.65, 5), (3.60, 0)
|
||||
]
|
||||
if voltage >= points[0][0]:
|
||||
return 100
|
||||
if voltage <= points[-1][0]:
|
||||
return 0
|
||||
for i in range(len(points) - 1):
|
||||
v1, p1 = points[i]
|
||||
v2, p2 = points[i + 1]
|
||||
if voltage >= v2:
|
||||
ratio = (voltage - v1) / (v2 - v1)
|
||||
percent = p1 + (p2 - p1) * ratio
|
||||
return max(0, min(100, int(round(percent))))
|
||||
return 0
|
||||
|
||||
186
time_sync.py
Normal file
186
time_sync.py
Normal file
@@ -0,0 +1,186 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
时间同步模块
|
||||
从4G模块获取时间并同步到系统
|
||||
"""
|
||||
import re
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
import config
|
||||
# from logger_bak import get_logger
|
||||
from logger_manager import logger_manager
|
||||
|
||||
|
||||
def parse_4g_time(cclk_response, timezone_offset=8):
|
||||
"""
|
||||
解析 AT+CCLK? 返回的时间字符串,并转换为本地时间
|
||||
|
||||
Args:
|
||||
cclk_response: AT+CCLK? 的响应字符串
|
||||
timezone_offset: 时区偏移(小时),默认8(中国时区 UTC+8)
|
||||
|
||||
Returns:
|
||||
datetime 对象(已转换为本地时间),如果解析失败返回 None
|
||||
"""
|
||||
try:
|
||||
# 匹配格式: +CCLK: "YY/MM/DD,HH:MM:SS+TZ"
|
||||
# 时区单位是四分之一小时(quarters of an hour)
|
||||
match = re.search(r'\+CCLK:\s*"(\d{2})/(\d{2})/(\d{2}),(\d{2}):(\d{2}):(\d{2})([+-]\d{1,3})?"', cclk_response)
|
||||
if not match:
|
||||
return None
|
||||
|
||||
yy, mm, dd, hh, MM, ss, tz_str = match.groups()
|
||||
|
||||
# 年份处理:26 -> 2026
|
||||
year = 2000 + int(yy)
|
||||
month = int(mm)
|
||||
day = int(dd)
|
||||
hour = int(hh)
|
||||
minute = int(MM)
|
||||
second = int(ss)
|
||||
|
||||
# 创建 UTC 时间的 datetime 对象
|
||||
dt_utc = datetime(year, month, day, hour, minute, second)
|
||||
|
||||
# 解析时区偏移(单位:四分之一小时)
|
||||
if tz_str:
|
||||
try:
|
||||
# 时区偏移值(四分之一小时)
|
||||
tz_quarters = int(tz_str)
|
||||
|
||||
# 转换为小时(除以4)
|
||||
tz_hours = tz_quarters / 4.0
|
||||
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.info(f"[TIME] 时区偏移: {tz_str} (四分之一小时) = {tz_hours} 小时")
|
||||
|
||||
# 转换为本地时间
|
||||
dt_local = dt_utc + timedelta(hours=tz_hours)
|
||||
except ValueError:
|
||||
# 如果时区解析失败,使用默认值
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.warning(f"[TIME] 时区解析失败: {tz_str},使用默认 UTC+{timezone_offset}")
|
||||
dt_local = dt_utc + timedelta(hours=timezone_offset)
|
||||
else:
|
||||
# 没有时区信息,使用默认值
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.info(f"[TIME] 未找到时区信息,使用默认 UTC+{timezone_offset}")
|
||||
dt_local = dt_utc + timedelta(hours=timezone_offset)
|
||||
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.info(f"[TIME] UTC时间: {dt_utc.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
logger.info(f"[TIME] 本地时间: {dt_local.strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
|
||||
return dt_local
|
||||
except Exception as e:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}")
|
||||
else:
|
||||
print(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}")
|
||||
return None
|
||||
|
||||
|
||||
def get_time_from_4g(timezone_offset=8):
|
||||
"""
|
||||
通过4G模块获取当前时间(已转换为本地时间)
|
||||
|
||||
Args:
|
||||
timezone_offset: 时区偏移(小时),默认8(中国时区)
|
||||
|
||||
Returns:
|
||||
datetime 对象(本地时间),如果获取失败返回 None
|
||||
"""
|
||||
try:
|
||||
# 发送 AT+CCLK? 命令(延迟导入避免循环依赖)
|
||||
from hardware import hardware_manager
|
||||
# 检查 at_client 是否已初始化
|
||||
if hardware_manager.at_client is None:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.warning("[TIME] ATClient 尚未初始化,无法获取4G时间")
|
||||
else:
|
||||
print("[TIME] ATClient 尚未初始化,无法获取4G时间")
|
||||
return None
|
||||
resp = hardware_manager.at_client.send("AT+CCLK?", "OK", 3000)
|
||||
|
||||
if not resp or "+CCLK:" not in resp:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.warning(f"[TIME] 未获取到时间响应: {resp}")
|
||||
else:
|
||||
print(f"[TIME] 未获取到时间响应: {resp}")
|
||||
return None
|
||||
|
||||
# 解析并转换时区
|
||||
dt = parse_4g_time(resp, timezone_offset)
|
||||
return dt
|
||||
except Exception as e:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"[TIME] 获取4G时间异常: {e}")
|
||||
else:
|
||||
print(f"[TIME] 获取4G时间异常: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def sync_system_time_from_4g(timezone_offset=8):
|
||||
"""
|
||||
从4G模块同步时间到系统
|
||||
|
||||
Args:
|
||||
timezone_offset: 时区偏移(小时),默认8(中国时区)
|
||||
|
||||
Returns:
|
||||
bool: 是否成功
|
||||
"""
|
||||
dt = get_time_from_4g(timezone_offset)
|
||||
if not dt:
|
||||
return False
|
||||
|
||||
try:
|
||||
# 转换为系统 date 命令需要的格式
|
||||
time_str = dt.strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# 设置系统时间
|
||||
cmd = f'date -s "{time_str}" 2>&1'
|
||||
result = os.system(cmd)
|
||||
|
||||
if result == 0:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.info(f"[TIME] 系统时间已设置为: {time_str}")
|
||||
else:
|
||||
print(f"[TIME] 系统时间已设置为: {time_str}")
|
||||
|
||||
# 可选:同步到硬件时钟
|
||||
try:
|
||||
os.system('hwclock -w 2>/dev/null')
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.info("[TIME] 已同步到硬件时钟")
|
||||
except:
|
||||
pass
|
||||
|
||||
return True
|
||||
else:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"[TIME] 设置系统时间失败,退出码: {result}")
|
||||
else:
|
||||
print(f"[TIME] 设置系统时间失败,退出码: {result}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"[TIME] 同步系统时间异常: {e}")
|
||||
else:
|
||||
print(f"[TIME] 同步系统时间异常: {e}")
|
||||
return False
|
||||
|
||||
13
version.py
Normal file
13
version.py
Normal file
@@ -0,0 +1,13 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
应用版本号
|
||||
每次 OTA 更新时,只需要更新这个文件中的版本号
|
||||
"""
|
||||
VERSION = '1.1.1'
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
360
vision.py
Normal file
360
vision.py
Normal file
@@ -0,0 +1,360 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
视觉检测模块
|
||||
提供靶心检测、距离估算、图像保存等功能
|
||||
"""
|
||||
import cv2
|
||||
import numpy as np
|
||||
import os
|
||||
import math
|
||||
from maix import image
|
||||
import globals
|
||||
import config
|
||||
from logger_manager import logger_manager
|
||||
|
||||
|
||||
def detect_circle_v3(frame, laser_point=None):
|
||||
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
|
||||
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
|
||||
如果提供 laser_point,会选择最接近激光点的目标
|
||||
|
||||
Args:
|
||||
frame: 图像帧
|
||||
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
|
||||
|
||||
Returns:
|
||||
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
|
||||
"""
|
||||
img_cv = image.image2cv(frame, False, False)
|
||||
|
||||
best_center = best_radius = best_radius1 = method = None
|
||||
ellipse_params = None
|
||||
|
||||
# HSV 黄色掩码检测(模糊靶心)
|
||||
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
|
||||
h, s, v = cv2.split(hsv)
|
||||
|
||||
# 调整饱和度策略:稍微增强,不要过度
|
||||
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
|
||||
|
||||
hsv = cv2.merge((h, s, v))
|
||||
|
||||
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
|
||||
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
|
||||
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
|
||||
|
||||
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
|
||||
|
||||
# 调整形态学操作
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
|
||||
|
||||
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
# 存储所有有效的黄色-红色组合
|
||||
valid_targets = []
|
||||
|
||||
if contours_yellow:
|
||||
for cnt_yellow in contours_yellow:
|
||||
area = cv2.contourArea(cnt_yellow)
|
||||
perimeter = cv2.arcLength(cnt_yellow, True)
|
||||
|
||||
# 计算圆度
|
||||
if perimeter > 0:
|
||||
circularity = (4 * np.pi * area) / (perimeter * perimeter)
|
||||
else:
|
||||
circularity = 0
|
||||
|
||||
logger = logger_manager.logger
|
||||
if area > 50 and circularity > 0.7:
|
||||
if logger:
|
||||
logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}")
|
||||
# 尝试拟合椭圆
|
||||
yellow_center = None
|
||||
yellow_radius = None
|
||||
yellow_ellipse = None
|
||||
|
||||
if len(cnt_yellow) >= 5:
|
||||
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
|
||||
yellow_ellipse = ((x, y), (width, height), angle)
|
||||
axes_minor = min(width, height)
|
||||
radius = axes_minor / 2
|
||||
yellow_center = (int(x), int(y))
|
||||
yellow_radius = int(radius)
|
||||
else:
|
||||
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
|
||||
yellow_center = (int(x), int(y))
|
||||
yellow_radius = int(radius)
|
||||
yellow_ellipse = None
|
||||
|
||||
# 如果检测到黄色圆圈,再检测红色圆圈进行验证
|
||||
if yellow_center and yellow_radius:
|
||||
# HSV 红色掩码检测(红色在HSV中跨越0度,需要两个范围)
|
||||
# 红色范围1: 0-10度(接近0度的红色)
|
||||
lower_red1 = np.array([0, 80, 0])
|
||||
upper_red1 = np.array([10, 255, 255])
|
||||
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
|
||||
|
||||
# 红色范围2: 170-180度(接近180度的红色)
|
||||
lower_red2 = np.array([170, 80, 0])
|
||||
upper_red2 = np.array([180, 255, 255])
|
||||
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
|
||||
|
||||
# 合并两个红色掩码
|
||||
mask_red = cv2.bitwise_or(mask_red1, mask_red2)
|
||||
|
||||
# 形态学操作
|
||||
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
|
||||
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
|
||||
|
||||
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
found_valid_red = False
|
||||
|
||||
if contours_red:
|
||||
# 找到所有符合条件的红色圆圈
|
||||
for cnt_red in contours_red:
|
||||
area_red = cv2.contourArea(cnt_red)
|
||||
perimeter_red = cv2.arcLength(cnt_red, True)
|
||||
|
||||
if perimeter_red > 0:
|
||||
circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red)
|
||||
else:
|
||||
circularity_red = 0
|
||||
|
||||
# 红色圆圈也应该有一定的圆度
|
||||
if area_red > 50 and circularity_red > 0.6:
|
||||
# 计算红色圆圈的中心和半径
|
||||
if len(cnt_red) >= 5:
|
||||
(x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red)
|
||||
radius_red = min(w_red, h_red) / 2
|
||||
red_center = (int(x_red), int(y_red))
|
||||
red_radius = int(radius_red)
|
||||
else:
|
||||
(x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red)
|
||||
red_center = (int(x_red), int(y_red))
|
||||
red_radius = int(radius_red)
|
||||
|
||||
# 计算黄色和红色圆心的距离
|
||||
if red_center:
|
||||
dx = yellow_center[0] - red_center[0]
|
||||
dy = yellow_center[1] - red_center[1]
|
||||
distance = np.sqrt(dx*dx + dy*dy)
|
||||
|
||||
# 圆心距离阈值:应该小于黄色半径的某个倍数(比如1.5倍)
|
||||
max_distance = yellow_radius * 1.5
|
||||
|
||||
# 红色圆圈应该比黄色圆圈大(外圈)
|
||||
if distance < max_distance and red_radius > yellow_radius * 0.8:
|
||||
found_valid_red = True
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}")
|
||||
|
||||
# 记录这个有效目标
|
||||
valid_targets.append({
|
||||
'center': yellow_center,
|
||||
'radius': yellow_radius,
|
||||
'ellipse': yellow_ellipse,
|
||||
'area': area
|
||||
})
|
||||
break
|
||||
|
||||
if not found_valid_red:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
|
||||
|
||||
# 从所有有效目标中选择最佳目标
|
||||
if valid_targets:
|
||||
if laser_point:
|
||||
# 如果有激光点,选择最接近激光点的目标
|
||||
best_target = None
|
||||
min_distance = float('inf')
|
||||
for target in valid_targets:
|
||||
dx = target['center'][0] - laser_point[0]
|
||||
dy = target['center'][1] - laser_point[1]
|
||||
distance = np.sqrt(dx*dx + dy*dy)
|
||||
if distance < min_distance:
|
||||
min_distance = distance
|
||||
best_target = target
|
||||
if best_target:
|
||||
best_center = best_target['center']
|
||||
best_radius = best_target['radius']
|
||||
ellipse_params = best_target['ellipse']
|
||||
method = "v3_ellipse_red_validated_laser_selected"
|
||||
best_radius1 = best_radius * 5
|
||||
else:
|
||||
# 如果没有激光点,选择面积最大的目标
|
||||
best_target = max(valid_targets, key=lambda t: t['area'])
|
||||
best_center = best_target['center']
|
||||
best_radius = best_target['radius']
|
||||
ellipse_params = best_target['ellipse']
|
||||
method = "v3_ellipse_red_validated"
|
||||
best_radius1 = best_radius * 5
|
||||
|
||||
result_img = image.cv2image(img_cv, False, False)
|
||||
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
|
||||
|
||||
|
||||
def estimate_distance(pixel_radius):
|
||||
"""根据像素半径估算实际距离(单位:米)"""
|
||||
if not pixel_radius:
|
||||
return 0.0
|
||||
return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0
|
||||
|
||||
|
||||
def compute_laser_position(circle_center, laser_point, radius, method):
|
||||
"""计算激光相对于靶心的偏移量(单位:厘米)"""
|
||||
if not all([circle_center, radius, method]):
|
||||
return None, None
|
||||
cx, cy = circle_center
|
||||
lx, ly = laser_point
|
||||
# 根据检测方法动态调整靶心物理半径(简化模型)
|
||||
circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
|
||||
dx = lx - cx
|
||||
dy = ly - cy
|
||||
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
|
||||
|
||||
|
||||
def save_shot_image(result_img, center, radius, method, ellipse_params,
|
||||
laser_point, distance_m, photo_dir=None):
|
||||
"""
|
||||
保存射击图像(带标注)
|
||||
即使没有检测到靶心也会保存图像,文件名会标注 "no_target"
|
||||
确保保存的图像总是包含激光十字线
|
||||
|
||||
Args:
|
||||
result_img: 处理后的图像对象(可能已经包含激光十字线或检测标注)
|
||||
center: 靶心中心坐标 (x, y),可能为 None(未检测到靶心)
|
||||
radius: 靶心半径,可能为 None(未检测到靶心)
|
||||
method: 检测方法,可能为 None(未检测到靶心)
|
||||
ellipse_params: 椭圆参数 ((center, (width, height), angle)) 或 None
|
||||
laser_point: 激光点坐标 (x, y)
|
||||
distance_m: 距离(米),可能为 None(未检测到靶心)
|
||||
photo_dir: 照片存储目录,如果为None则使用 config.PHOTO_DIR
|
||||
|
||||
Returns:
|
||||
str: 保存的文件路径,如果保存失败或未启用则返回 None
|
||||
"""
|
||||
# 检查是否启用图像保存
|
||||
if not config.SAVE_IMAGE_ENABLED:
|
||||
return None
|
||||
|
||||
if photo_dir is None:
|
||||
photo_dir = config.PHOTO_DIR
|
||||
|
||||
try:
|
||||
# 确保照片目录存在
|
||||
try:
|
||||
if photo_dir not in os.listdir("/root"):
|
||||
os.mkdir(photo_dir)
|
||||
except:
|
||||
pass
|
||||
|
||||
# 生成文件名
|
||||
# 统计所有图片文件(包括 .bmp 和 .jpg)
|
||||
try:
|
||||
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
|
||||
img_count = len(all_images)
|
||||
except:
|
||||
img_count = 0
|
||||
|
||||
x, y = laser_point
|
||||
|
||||
# 如果未检测到靶心,在文件名中标注
|
||||
if center is None or radius is None:
|
||||
method_str = "no_target"
|
||||
distance_str = "000"
|
||||
else:
|
||||
method_str = method or "unknown"
|
||||
distance_str = str(round((distance_m or 0.0) * 100))
|
||||
|
||||
filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp"
|
||||
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
if center and radius:
|
||||
logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
|
||||
if ellipse_params:
|
||||
(ell_center, (width, height), angle) = ellipse_params
|
||||
logger.info(f"椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°")
|
||||
else:
|
||||
logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y}))")
|
||||
|
||||
# 转换图像为 OpenCV 格式以便绘制
|
||||
img_cv = image.image2cv(result_img, False, False)
|
||||
|
||||
# 确保激光十字线被绘制(使用OpenCV在图像上绘制,确保可见性)
|
||||
laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
|
||||
thickness = max(config.LASER_THICKNESS, 2) # 至少2像素宽,确保可见
|
||||
length = max(config.LASER_LENGTH, 10) # 至少10像素长
|
||||
|
||||
# 绘制激光十字线(水平线)
|
||||
cv2.line(img_cv,
|
||||
(int(x - length), int(y)),
|
||||
(int(x + length), int(y)),
|
||||
laser_color, thickness)
|
||||
# 绘制激光十字线(垂直线)
|
||||
cv2.line(img_cv,
|
||||
(int(x), int(y - length)),
|
||||
(int(x), int(y + length)),
|
||||
laser_color, thickness)
|
||||
# 绘制激光点
|
||||
cv2.circle(img_cv, (int(x), int(y)), max(thickness, 3), laser_color, -1)
|
||||
|
||||
# 如果检测到靶心,绘制靶心标注
|
||||
if center and radius:
|
||||
cx, cy = center
|
||||
|
||||
if ellipse_params:
|
||||
(ell_center, (width, height), angle) = ellipse_params
|
||||
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
|
||||
|
||||
# 绘制椭圆
|
||||
cv2.ellipse(img_cv,
|
||||
(cx_ell, cy_ell),
|
||||
(int(width/2), int(height/2)),
|
||||
angle,
|
||||
0, 360,
|
||||
(0, 255, 0),
|
||||
2)
|
||||
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
|
||||
|
||||
# 绘制短轴
|
||||
minor_length = min(width, height) / 2
|
||||
minor_angle = angle + 90 if width >= height else angle
|
||||
minor_angle_rad = math.radians(minor_angle)
|
||||
dx_minor = minor_length * math.cos(minor_angle_rad)
|
||||
dy_minor = minor_length * math.sin(minor_angle_rad)
|
||||
pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
|
||||
pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
|
||||
cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2)
|
||||
else:
|
||||
# 绘制圆形靶心
|
||||
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
|
||||
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
|
||||
|
||||
# 如果检测到靶心,绘制从激光点到靶心的连线(可选,用于可视化偏移)
|
||||
cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1)
|
||||
|
||||
# 转换回 MaixPy 图像格式并保存
|
||||
result_img = image.cv2image(img_cv, False, False)
|
||||
result_img.save(filename)
|
||||
|
||||
if logger:
|
||||
if center and radius:
|
||||
logger.debug(f"图像已保存(含靶心标注): {filename}")
|
||||
else:
|
||||
logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}")
|
||||
|
||||
return filename
|
||||
except Exception as e:
|
||||
logger = logger_manager.logger
|
||||
if logger:
|
||||
logger.error(f"保存图像失败: {e}")
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user