Compare commits

..

2 Commits

Author SHA1 Message Date
huangzhenwei2
708925ab41 refine the code to different part 2026-01-12 11:39:27 +08:00
huangzhenwei2
92ad32bb8e refine ota 2025-12-30 16:40:01 +08:00
11 changed files with 2796 additions and 466 deletions

79
S99archery Normal file
View File

@@ -0,0 +1,79 @@
#!/bin/sh
# /etc/init.d/S99archery
# 系统启动时处理致命错误恢复(仅处理无法启动的情况)
# 注意:应用的启动由系统自动启动机制处理(通过 auto_start.txt
# 功能:
# 1. 处理致命错误(无法启动)- 恢复 main.py
# 2. 如果重启次数超过阈值,恢复 main.py 并重启系统
APP_DIR="/maixapp/apps/t11"
MAIN_PY="$APP_DIR/main.py"
PENDING_FILE="$APP_DIR/ota_pending.json"
BACKUP_BASE="$APP_DIR/backups"
# 进入应用目录
cd "$APP_DIR" || exit 0
# 检查 pending 文件,如果存在且超过重启次数,恢复 main.py处理致命错误
if [ -f "$PENDING_FILE" ]; then
echo "[S99] 检测到 ota_pending.json检查重启计数..."
# 尝试从JSON中提取重启计数使用grep简单提取
RESTART_COUNT=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"restart_count":[0-9]*' | grep -o '[0-9]*' || echo "0")
MAX_RESTARTS=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"max_restarts":[0-9]*' | grep -o '[0-9]*' || echo "3")
if [ -n "$RESTART_COUNT" ] && [ "$RESTART_COUNT" -ge "$MAX_RESTARTS" ]; then
echo "[S99] 检测到重启次数 ($RESTART_COUNT) 超过阈值 ($MAX_RESTARTS),恢复 main.py..."
# 尝试从JSON中提取备份目录
BACKUP_DIR=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"backup_dir":"[^"]*"' | grep -o '/[^"]*' || echo "")
if [ -n "$BACKUP_DIR" ] && [ -f "$BACKUP_DIR/main.py" ]; then
# 使用指定的备份目录
echo "[S99] 从备份目录恢复: $BACKUP_DIR/main.py"
cp "$BACKUP_DIR/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
else
# 查找最新的备份目录
LATEST_BACKUP=$(ls -dt "$BACKUP_BASE"/backup_* 2>/dev/null | head -1)
if [ -n "$LATEST_BACKUP" ] && [ -f "$LATEST_BACKUP/main.py" ]; then
echo "[S99] 从最新备份恢复: $LATEST_BACKUP/main.py"
cp "$LATEST_BACKUP/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
else
# 如果没有备份目录,尝试使用 main.py.bak
if [ -f "$APP_DIR/main.py.bak" ]; then
echo "[S99] 从 main.py.bak 恢复"
cp "$APP_DIR/main.py.bak" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
fi
fi
fi
# 恢复后重置重启计数,避免循环恢复
# 注意:不在这里删除 pending 文件,让 main.py 在心跳成功后删除
# 但是重置重启计数,以便恢复后的版本可以重新开始计数
python3 -c "
import json, os
try:
pending_path = '$PENDING_FILE'
if os.path.exists(pending_path):
with open(pending_path, 'r', encoding='utf-8') as f:
d = json.load(f)
d['restart_count'] = 0 # 重置重启计数
with open(pending_path, 'w', encoding='utf-8') as f:
json.dump(d, f)
print('[S99] 已重置重启计数为 0')
except Exception as e:
print(f'[S99] 重置重启计数失败: {e}')
" 2>/dev/null || echo "[S99] 无法重置重启计数可能需要Python支持"
echo "[S99] 已恢复 main.py重启系统..."
echo "[S99] 注意pending 文件将在心跳成功后由 main.py 删除"
sleep 2
reboot
exit 0
fi
fi
# 不启动应用,让系统自动启动机制处理
# 这个脚本只负责处理致命错误恢复
exit 0

307
at_client.py Normal file
View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AT客户端模块
负责4G模块的AT命令通信和URC解析
"""
import _thread
from maix import time
import re
import threading
class ATClient:
"""
单读者 AT/URC 客户端:唯一读取 uart4g避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。
- send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect
- pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload已按长度裁剪
- pop_http_event() : 获取 +MHTTPURC 事件header/content
"""
def __init__(self, uart_obj):
self.uart = uart_obj
self._cmd_lock = threading.Lock()
self._q_lock = threading.Lock()
self._rx = b""
self._tcp_payloads = []
self._http_events = []
# 当前命令等待状态(仅允许单命令 in-flight
self._waiting = False
self._expect = b"OK"
self._resp = b""
self._running = False
def start(self):
if self._running:
return
self._running = True
_thread.start_new_thread(self._reader_loop, ())
def stop(self):
self._running = False
def flush(self):
"""清空内部缓存与队列(用于 OTA/异常恢复)"""
with self._q_lock:
self._rx = b""
self._tcp_payloads.clear()
self._http_events.clear()
self._resp = b""
def pop_tcp_payload(self):
with self._q_lock:
if self._tcp_payloads:
return self._tcp_payloads.pop(0)
return None
def pop_http_event(self):
with self._q_lock:
if self._http_events:
return self._http_events.pop(0)
return None
def _push_tcp_payload(self, payload: bytes):
# 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock
# 这里不要再次 acquire锁不可重入会死锁
self._tcp_payloads.append(payload)
def _push_http_event(self, ev):
# 同上:避免在 _reader_loop 持锁期间二次 acquire
self._http_events.append(ev)
def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000):
"""
发送 AT 命令并等待 expect子串匹配
注意expect=">" 用于等待 prompt。
"""
expect_b = expect.encode() if isinstance(expect, str) else expect
with self._cmd_lock:
# 初始化等待
self._waiting = True
self._expect = expect_b
self._resp = b""
# 发送
if cmd:
# 注意:这里不要再用 uart4g_lock否则外层已经持锁时会死锁
# 写入由 _cmd_lock 串行化即可。
self.uart.write((cmd + "\r\n").encode())
t0 = time.ticks_ms()
while time.ticks_ms() - t0 < timeout_ms:
if (not self._waiting) or (self._expect in self._resp):
self._waiting = False
break
time.sleep_ms(5)
# 超时也返回已收集内容(便于诊断)
self._waiting = False
try:
return self._resp.decode(errors="ignore")
except:
return str(self._resp)
def _find_urc_tag(self, tag: bytes):
"""
只在"真正的 URC 边界"查找 tag避免误命中 HTTP payload 内容。
规则tag 必须出现在 buffer 开头,或紧跟在 b"\\r\\n" 后面。
"""
try:
i = 0
rx = self._rx
while True:
j = rx.find(tag, i)
if j < 0:
return -1
if j == 0:
return 0
if j >= 2 and rx[j - 2:j] == b"\r\n":
return j
i = j + 1
except:
return -1
def _parse_mipurc_rtcp(self):
"""
解析:+MIPURC: "rtcp",<link_id>,<len>,<payload...>
之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。
"""
prefix = b'+MIPURC: "rtcp",'
i = self._find_urc_tag(prefix)
if i < 0:
return False
# 丢掉前置噪声
if i > 0:
self._rx = self._rx[i:]
i = 0
j = len(prefix)
# 解析 link_id
k = j
while k < len(self._rx) and 48 <= self._rx[k] <= 57:
k += 1
if k == j or k >= len(self._rx):
return False
if self._rx[k:k+1] != b",":
self._rx = self._rx[1:]
return True
try:
link_id = int(self._rx[j:k].decode())
except:
self._rx = self._rx[1:]
return True
# 解析 len
j2 = k + 1
k2 = j2
while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57:
k2 += 1
if k2 == j2 or k2 >= len(self._rx):
return False
if self._rx[k2:k2+1] != b",":
self._rx = self._rx[1:]
return True
try:
n = int(self._rx[j2:k2].decode())
except:
self._rx = self._rx[1:]
return True
payload_start = k2 + 1
payload_end = payload_start + n
if len(self._rx) < payload_end:
return False # payload 未收齐
payload = self._rx[payload_start:payload_end]
# 把 link_id 一起带上,便于上层过滤(如果需要)
self._push_tcp_payload((link_id, payload))
self._rx = self._rx[payload_end:]
return True
def _parse_mhttpurc_header(self):
tag = b'+MHTTPURC: "header",'
i = self._find_urc_tag(tag)
if i < 0:
return False
if i > 0:
self._rx = self._rx[i:]
i = 0
# header: +MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text...>
j = len(tag)
comma_count = 0
k = j
while k < len(self._rx) and comma_count < 3:
if self._rx[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 3:
return False
prefix = self._rx[:k]
m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
self._rx = self._rx[1:]
return True
urc_id = int(m.group(1))
code = int(m.group(2))
hdr_len = int(m.group(3))
text_start = k
text_end = text_start + hdr_len
if len(self._rx) < text_end:
return False
hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore")
self._push_http_event(("header", urc_id, code, hdr_text))
self._rx = self._rx[text_end:]
return True
def _parse_mhttpurc_content(self):
tag = b'+MHTTPURC: "content",'
i = self._find_urc_tag(tag)
if i < 0:
return False
if i > 0:
self._rx = self._rx[i:]
i = 0
# content: +MHTTPURC: "content",<id>,<total>,<sum>,<cur>,<payload...>
j = len(tag)
comma_count = 0
k = j
while k < len(self._rx) and comma_count < 4:
if self._rx[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 4:
return False
prefix = self._rx[:k]
m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
self._rx = self._rx[1:]
return True
urc_id = int(m.group(1))
total_len = int(m.group(2))
sum_len = int(m.group(3))
cur_len = int(m.group(4))
payload_start = k
payload_end = payload_start + cur_len
if len(self._rx) < payload_end:
return False
payload = self._rx[payload_start:payload_end]
self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload))
self._rx = self._rx[payload_end:]
return True
def _reader_loop(self):
while self._running:
# 关键UART 驱动偶发 read failed必须兜住否则线程挂了 OTA/TCP 都会卡死
try:
d = self.uart.read(4096) # 8192 在一些驱动上更容易触发 read failed
except Exception as e:
try:
print("[ATClient] uart read failed:", e)
except:
pass
time.sleep_ms(50)
continue
if not d:
time.sleep_ms(1)
continue
with self._q_lock:
self._rx += d
if self._waiting:
self._resp += d
while True:
progressed = (
self._parse_mipurc_rtcp()
or self._parse_mhttpurc_header()
or self._parse_mhttpurc_content()
)
if not progressed:
break
# 使用 ota_manager 访问 ota_in_progress
try:
from ota_manager import ota_manager
ota_flag = ota_manager.ota_in_progress
except:
ota_flag = False
has_http_hint = (b"+MHTTP" in self._rx) or (b"+MHTTPURC" in self._rx)
if ota_flag or has_http_hint:
if len(self._rx) > 512 * 1024:
self._rx = self._rx[-256 * 1024:]
else:
if len(self._rx) > 16384:
self._rx = self._rx[-4096:]

81
config.py Normal file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
系统配置常量
这些值在程序运行期间基本不变,或只在配置时改变
"""
from version import VERSION
# ==================== 应用配置 ====================
APP_VERSION = VERSION
APP_DIR = "/maixapp/apps/t11"
LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py"
# ==================== 服务器配置 ====================
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒)
# ==================== HTTP配置 ====================
HTTP_URL = "http://ws.shelingxingqiu.com"
HTTP_API_PATH = "/home/shoot/device_fire/arrow/fire"
# ==================== 文件路径配置 ====================
CONFIG_FILE = "/root/laser_config.json"
LOG_FILE = "/maixapp/apps/t11/app.log"
BACKUP_BASE = "/maixapp/apps/t11/backups"
# ==================== 硬件配置 ====================
# UART配置
UART4G_DEVICE = "/dev/ttyS2"
UART4G_BAUDRATE = 115200
DISTANCE_SERIAL_DEVICE = "/dev/ttyS1"
DISTANCE_SERIAL_BAUDRATE = 9600
# I2C配置
I2C_BUS_NUM = 1
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CURRENT = 0x04 # 电流寄存器
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# ADC配置
ADC_CHANNEL = 0
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# ==================== 激光配置 ====================
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
DEFAULT_LASER_POINT = (640, 480) # 默认激光中心点
# ==================== 视觉检测配置 ====================
FOCAL_LENGTH_PIX = 2250.0 # 焦距(像素)
REAL_RADIUS_CM = 20 # 靶心实际半径(厘米)
# ==================== 显示配置 ====================
LASER_COLOR = (255, 100, 0) # RGB颜色
LASER_THICKNESS = 1
LASER_LENGTH = 2
# ==================== 图像保存配置 ====================
SAVE_IMAGE_ENABLED = True # 是否保存图像True=保存False=不保存)
PHOTO_DIR = "/root/phot" # 照片存储目录
# ==================== OTA配置 ====================
MAX_BACKUPS = 5
LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB
LOG_BACKUP_COUNT = 5
# ==================== 引脚映射配置 ====================
PIN_MAPPINGS = {
"A18": "UART1_RX",
"A19": "UART1_TX",
"A29": "UART2_RX",
"A28": "UART2_TX",
"P18": "I2C1_SCL",
"P21": "I2C1_SDA",
}

119
hardware.py Normal file
View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
硬件管理器模块
提供硬件对象的统一管理和访问
"""
import threading
import config
from at_client import ATClient
class HardwareManager:
"""硬件管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(HardwareManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有硬件对象
self._uart4g = None # 4G模块UART
self._distance_serial = None # 激光测距串口
self._bus = None # I2C总线
self._adc_obj = None # ADC对象
self._at_client = None # AT客户端
self._initialized = True
# ==================== 硬件访问(只读属性)====================
@property
def uart4g(self):
"""4G模块UART只读"""
return self._uart4g
@property
def distance_serial(self):
"""激光测距串口(只读)"""
return self._distance_serial
@property
def bus(self):
"""I2C总线只读"""
return self._bus
@property
def adc_obj(self):
"""ADC对象只读"""
return self._adc_obj
@property
def at_client(self):
"""AT客户端只读"""
return self._at_client
# ==================== 初始化方法 ====================
def init_uart4g(self, device=None, baudrate=None):
"""初始化4G模块UART"""
from maix import uart
if device is None:
device = config.UART4G_DEVICE
if baudrate is None:
baudrate = config.UART4G_BAUDRATE
self._uart4g = uart.UART(device, baudrate)
return self._uart4g
def init_distance_serial(self, device=None, baudrate=None):
"""初始化激光测距串口(激光控制)"""
from maix import uart
if device is None:
device = config.DISTANCE_SERIAL_DEVICE
if baudrate is None:
baudrate = config.DISTANCE_SERIAL_BAUDRATE
print(f"[HW] 初始化激光串口: device={device}, baudrate={baudrate}")
self._distance_serial = uart.UART(device, baudrate)
print(f"[HW] 激光串口初始化完成: {self._distance_serial}")
return self._distance_serial
def init_bus(self, bus_num=None):
"""初始化I2C总线"""
from maix import i2c
if bus_num is None:
bus_num = config.I2C_BUS_NUM
self._bus = i2c.I2C(bus_num, i2c.Mode.MASTER)
return self._bus
def init_adc(self, channel=None, res_bit=None):
"""初始化ADC"""
from maix.peripheral import adc
if channel is None:
channel = config.ADC_CHANNEL
if res_bit is None:
res_bit = adc.RES_BIT_12
self._adc_obj = adc.ADC(channel, res_bit)
return self._adc_obj
def init_at_client(self, uart_obj=None):
"""初始化AT客户端"""
if uart_obj is None:
if self._uart4g is None:
raise ValueError("uart4g must be initialized before at_client")
uart_obj = self._uart4g
self._at_client = ATClient(uart_obj)
self._at_client.start()
return self._at_client
# 创建全局单例实例
hardware_manager = HardwareManager()

260
laser_manager.py Normal file
View File

@@ -0,0 +1,260 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光管理器模块
提供激光控制、校准等功能
"""
import json
import os
from maix import time, camera
import threading
import config
from logger_manager import logger_manager
class LaserManager:
"""激光控制管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(LaserManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有状态
self._calibration_active = False
self._calibration_result = None
self._calibration_lock = threading.Lock()
self._laser_point = None
self._initialized = True
# ==================== 状态访问(只读属性)====================
@property
def calibration_active(self):
"""是否正在校准"""
return self._calibration_active
@property
def laser_point(self):
"""当前激光点"""
return self._laser_point
# ==================== 业务方法 ====================
def load_laser_point(self):
"""从配置文件加载激光中心点,失败则使用默认值"""
try:
if "laser_config.json" in os.listdir("/root"):
with open(config.CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
self._laser_point = (int(data[0]), int(data[1]))
logger = logger_manager.logger
if logger:
logger.debug(f"[INFO] 加载激光点: {self._laser_point}")
return self._laser_point
else:
raise ValueError
else:
self._laser_point = config.DEFAULT_LASER_POINT
except:
self._laser_point = config.DEFAULT_LASER_POINT
return self._laser_point
def save_laser_point(self, point):
"""保存激光中心点到配置文件"""
try:
with open(config.CONFIG_FILE, "w") as f:
json.dump([point[0], point[1]], f)
self._laser_point = point
return True
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] 保存激光点失败: {e}")
return False
def turn_on_laser(self):
"""发送指令开启激光,并读取回包(部分模块支持)"""
from hardware import hardware_manager
logger = logger_manager.logger
if hardware_manager.distance_serial is None:
if logger:
logger.error("[LASER] distance_serial 未初始化")
return None
# 打印调试信息
if logger:
logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}")
# 清空接收缓冲区
try:
hardware_manager.distance_serial.read(-1) # 清空缓冲区
except:
pass
# 发送命令
written = hardware_manager.distance_serial.write(config.LASER_ON_CMD)
if logger:
logger.info(f"[LASER] 写入字节数: {written}")
time.sleep_ms(50) # 增加等待时间,让模块有时间响应
# 读取回包
resp = hardware_manager.distance_serial.read(20)
if resp:
if logger:
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
if resp == config.LASER_ON_CMD:
if logger:
logger.info("✅ 激光开启指令已确认")
else:
if logger:
logger.warning("🔇 无回包(可能正常或模块不支持回包)")
return resp
def turn_off_laser(self):
"""发送指令关闭激光"""
from hardware import hardware_manager
logger = logger_manager.logger
if hardware_manager.distance_serial is None:
if logger:
logger.error("[LASER] distance_serial 未初始化")
return None
# 打印调试信息
if logger:
logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}")
# 清空接收缓冲区
try:
hardware_manager.distance_serial.read(-1)
except:
pass
# 发送命令
written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD)
if logger:
logger.info(f"[LASER] 写入字节数: {written}")
time.sleep_ms(50) # 增加等待时间
# 读取回包
resp = hardware_manager.distance_serial.read(20)
if resp:
if logger:
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
else:
if logger:
logger.warning("🔇 无回包")
return resp
def flash_laser(self, duration_ms=1000):
"""闪一下激光(用于射箭反馈)"""
try:
self.turn_on_laser()
time.sleep_ms(duration_ms)
self.turn_off_laser()
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"闪激光失败: {e}")
def find_red_laser(self, frame, threshold=150):
"""在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
w, h = frame.width(), frame.height()
img_bytes = frame.to_bytes()
max_sum = 0
best_pos = None
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
max_sum = rgb_sum
best_pos = (x, y)
return best_pos
def calibrate_laser_position(self):
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
time.sleep_ms(80)
cam = camera.Camera(640, 480)
frame = cam.read()
pos = self.find_red_laser(frame)
if pos:
self.save_laser_point(pos)
return pos
return None
def start_calibration(self):
"""开始校准(公共方法)"""
with self._calibration_lock:
if self._calibration_active:
return False
self._calibration_active = True
self._calibration_result = None
return True
def stop_calibration(self):
"""停止校准(公共方法)"""
with self._calibration_lock:
self._calibration_active = False
def set_calibration_result(self, result):
"""设置校准结果(内部方法)"""
with self._calibration_lock:
self._calibration_result = result
def get_calibration_result(self):
"""获取并清除校准结果(内部方法)"""
with self._calibration_lock:
result = self._calibration_result
self._calibration_result = None
return result
# 创建全局单例实例
laser_manager = LaserManager()
# ==================== 向后兼容的函数接口 ====================
def load_laser_point():
"""加载激光点(向后兼容接口)"""
return laser_manager.load_laser_point()
def save_laser_point(point):
"""保存激光点(向后兼容接口)"""
return laser_manager.save_laser_point(point)
def turn_on_laser():
"""开启激光(向后兼容接口)"""
return laser_manager.turn_on_laser()
def turn_off_laser():
"""关闭激光(向后兼容接口)"""
return laser_manager.turn_off_laser()
def flash_laser(duration_ms=1000):
"""闪激光(向后兼容接口)"""
return laser_manager.flash_laser(duration_ms)
def find_red_laser(frame, threshold=150):
"""查找红色激光点(向后兼容接口)"""
return laser_manager.find_red_laser(frame, threshold)
def calibrate_laser_position():
"""校准激光位置(向后兼容接口)"""
return laser_manager.calibrate_laser_position()

467
main.py
View File

@@ -1460,472 +1460,6 @@ def laser_calibration_worker():
else:
time.sleep_ms(50)
def download_file_via_4g_legacy(url, filename,
total_timeout_ms=30000,
retries=3,
debug=False):
"""
ML307R HTTP 下载URC content 分片模式)
- 重试empty/incomplete/AT错误都会重试
- 超时total_timeout_ms
- 校验Content-Length 必须填满;如有 Content-Md5 且 hashlib 可用则校验 MD5
- 日志默认干净debug=True 才打印 URC 进度
"""
from urllib.parse import urlparse
parsed = urlparse(url)
host = parsed.hostname
path = parsed.path or "/"
base_url = f"http://{host}" # 你已验证 HTTP 可 200如需 https 需另配 SSL
def _log(*args):
if debug:
print(*args)
def _merge_ranges(ranges_iter):
"""合并重叠/相邻区间,返回 merged(list[(s,e)])(半开区间)"""
rs = sorted(ranges_iter)
merged = []
for s, e in rs:
if e <= s:
continue
if merged and s <= merged[-1][1]:
merged[-1] = (merged[-1][0], max(merged[-1][1], e))
else:
merged.append((s, e))
return merged
def _compute_gaps(total_len, got_ranges):
"""根据已填充区间计算缺口(半开区间)"""
if not total_len or total_len <= 0:
return [(0, 0)]
merged = _merge_ranges(got_ranges)
gaps = []
prev = 0
for s, e in merged:
if s > prev:
gaps.append((prev, s))
prev = max(prev, e)
if prev < total_len:
gaps.append((prev, total_len))
return gaps, merged
def _extract_content_range(hdr_text: str):
"""
Content-Range: bytes <start>-<end>/<total>
返回 (start, end, total);解析失败返回 (None,None,None)
"""
m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE)
if not m:
return None, None, None
try:
return int(m.group(1)), int(m.group(2)), int(m.group(3))
except:
return None, None, None
def _get_ip():
r = at("AT+CGPADDR=1", "OK", 3000)
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
return m.group(1) if m else ""
def _clear_http_events():
# 清空旧的 HTTP URC 事件,避免串台
while at_client.pop_http_event() is not None:
pass
# 旧版基于直接 uart4g.read 的解析已迁移到 ATClient单读者保留函数占位避免大改动
def _parse_httpid(raw: bytes):
m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw)
return int(m.group(1)) if m else None
# _try_parse_header/_try_parse_one_content 已由 ATClient 在 reader 线程中解析并推送事件
# _try_parse_one_content 已由 ATClient 解析
def _extract_hdr_fields(hdr_text: str):
# Content-Length
mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
clen = int(mlen.group(1)) if mlen else None
# Content-Md5 (base64)
mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
md5_b64 = mmd5.group(1).strip() if mmd5 else None
return clen, md5_b64
def _md5_base64(data: bytes) -> str:
if hashlib is None:
return ""
digest = hashlib.md5(data).digest()
# base64: 24 chars with ==
return binascii.b2a_base64(digest).decode().strip()
def _one_attempt(range_start=None, range_end=None,
body_buf=None, got_ranges=None,
total_len=None,
expect_md5_b64=None):
# 0) PDP确保有 IP避免把 OK 当成功)
ip = _get_ip()
if not ip or ip == "0.0.0.0":
at("AT+MIPCALL=1,1", "OK", 15000)
for _ in range(10):
ip = _get_ip()
if ip and ip != "0.0.0.0":
break
time.sleep(1)
if not ip or ip == "0.0.0.0":
return False, "PDP not ready (no_ip)", body_buf, got_ranges, total_len, expect_md5_b64
# 1) 清理旧实例 + 清空旧 HTTP 事件
for i in range(0, 6):
at(f"AT+MHTTPDEL={i}", "OK", 1500)
_clear_http_events()
# 2) 创建实例(用 at() 等待返回)
create_resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)
httpid = _parse_httpid(create_resp.encode())
if httpid is None:
return False, "MHTTPCREATE failed (no httpid)", body_buf, got_ranges, total_len, expect_md5_b64
# 2.5) Range 补洞按缺口请求指定字节段HTTP Range 右端是 inclusive
if range_start is not None and range_end is not None:
# 每次请求使用新 httpid避免 header 累积/污染
at(f'AT+MHTTPCFG="header",{httpid},"Range: bytes={int(range_start)}-{int(range_end)}"', "OK", 3000)
# 3) 发 GETHTTP URC 由 ATClient 解析并入队)
req_resp = at(f'AT+MHTTPREQUEST={httpid},1,0,"{path}"', "OK", 15000)
if "ERROR" in req_resp or "CME ERROR" in req_resp:
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
return False, f"MHTTPREQUEST failed: {req_resp}", body_buf, got_ranges, total_len, expect_md5_b64
# 4) 从 ATClient 的 http_events 队列收 header/content
urc_id = None
status_code = None
expect_len = None
# 若是 Range 响应206需要把响应内的偏移映射到“全文件”偏移
offset_base = 0
# got_ranges 记录“真实写入 body_buf 的半开区间”
if got_ranges is None:
got_ranges = set()
filled_new_bytes = 0
# last_sum/resp_total 用于判断“本次 HTTP 响应体”是否接收完成(尤其是 Range 场景)
last_sum = 0
resp_total = None
no_progress_count = 0 # 连续没有进展的次数
last_print_ms = time.ticks_ms()
last_print_sum = 0
# Range 补洞不需要等太久,避免卡死;全量下载用总超时
attempt_timeout_ms = total_timeout_ms
if range_start is not None and range_end is not None:
attempt_timeout_ms = min(total_timeout_ms, 8000)
t0 = time.ticks_ms()
while time.ticks_ms() - t0 < attempt_timeout_ms:
ev = at_client.pop_http_event()
if not ev:
# 如果 sum 已经达到 total_len但仍有 gaps等待更长时间有些分片可能延迟到达
# 对 Rangelast_sum 只会到 resp_total比如 686/774不能拿 total_len(59776) 比
if resp_total and last_sum >= resp_total:
# 本次响应体应该收齐了,继续等一小会儿(防止最后一个 URC 延迟),然后退出循环
time.sleep_ms(30)
no_progress_count += 1
if no_progress_count > 30:
break
continue
# 全量模式:如果模块宣称 sum 已经达到 total_len但仍有 gaps稍微多等
if (range_start is None and range_end is None) and total_len and last_sum >= total_len:
gaps_now, merged_now = _compute_gaps(total_len, got_ranges)
if gaps_now and not (len(gaps_now) == 1 and gaps_now[0] == (0, 0)):
time.sleep_ms(50)
else:
time.sleep_ms(5)
else:
time.sleep_ms(5)
no_progress_count += 1
# Range如果长时间没有事件也结束让上层重试
if range_start is not None and range_end is not None and no_progress_count > 200:
break
# 全量:如果长时间没有新事件,且 sum 已经达到 total_len认为接收完成可能有丢包
if no_progress_count > 100 and total_len and last_sum >= total_len:
break
continue
no_progress_count = 0 # 有事件,重置计数器
if ev[0] == "header":
_, hid, code, hdr_text = ev
if urc_id is None:
urc_id = hid
if hid != urc_id:
continue
status_code = code
expect_len, md5_b64 = _extract_hdr_fields(hdr_text)
# 只在“首次全量 header”里保留 Content-Md5Range 响应通常不带该字段
if md5_b64:
expect_md5_b64 = md5_b64
cr_s, cr_e, cr_total = _extract_content_range(hdr_text)
if cr_s is not None and cr_total is not None:
# 206 Partial Content
offset_base = cr_s
# Content-Range end 是 inclusive总长度以 total 为准
if total_len is None:
total_len = cr_total
elif total_len != cr_total:
_log(f"[WARN] total_len changed {total_len}->{cr_total}")
total_len = cr_total
if body_buf is None and total_len:
body_buf = bytearray(total_len)
# 对 Range 响应:优先使用 Content-Length 作为本次响应体长度
if expect_len is not None:
resp_total = expect_len
_log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}")
continue
if ev[0] == "content":
_, cid, _total, _sum, _cur, payload = ev
if urc_id is None:
urc_id = cid
if cid != urc_id:
continue
# 全量 200这里的 _total 就是全文件长度Range 206_total 可能只是“本次响应体长度”
if body_buf is None:
# 如果 header 没解析出 Content-Range总长度用 content 的 _total
if total_len is None:
total_len = _total
if total_len:
body_buf = bytearray(total_len)
if body_buf is None or total_len is None:
continue
# 若 header 没给 Content-Length就用 content 的 _total 作为本次响应体长度Range 场景下通常是这次 body 的长度)
if resp_total is None:
resp_total = _total
rel_start = _sum - _cur
rel_end = _sum
abs_start = offset_base + rel_start
abs_end = offset_base + rel_end
if abs_start < 0 or abs_start >= total_len:
continue
if abs_end < abs_start:
continue
if abs_end > total_len:
abs_end = total_len
expected_span = abs_end - abs_start
actual_len = min(len(payload), expected_span)
if actual_len <= 0:
continue
# 写入并记录“实际写入区间”,用于 gap 计算
body_buf[abs_start:abs_start + actual_len] = payload[:actual_len]
got_ranges.add((abs_start, abs_start + actual_len))
filled_new_bytes += actual_len
# 记录最大的 sum 值,用于判断是否所有数据都已发送
if _sum > last_sum:
last_sum = _sum
# debug 输出节流:每 ~8000 字节或 >=500ms 输出一次,避免 print 导致 UART 丢包
if debug:
now = time.ticks_ms()
if (time.ticks_diff(now, last_print_ms) >= 500) or (_sum - last_print_sum >= 8000) or (rel_end == _total):
_log(f"[URC] {abs_start}:{abs_start+actual_len} sum={_sum}/{_total} base={offset_base} +{filled_new_bytes}")
last_print_ms = now
last_print_sum = _sum
# 若是全量请求offset_base=0 且 total_len==_total尽早结束
if offset_base == 0 and total_len == _total:
# 不要用 filled_new_bytes 判断是否完整(可能有重叠)
pass
# Range本次响应体已收齐退出交给上层判断是否补上了缺口
if resp_total is not None and last_sum >= resp_total:
# 给一点时间让可能的尾部事件入队,然后退出
time.sleep_ms(10)
break
# 5) 清理实例
at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
if body_buf is None:
return False, "empty_body", body_buf, got_ranges, total_len, expect_md5_b64
if total_len is None:
return False, "no_total_len", body_buf, got_ranges, total_len, expect_md5_b64
# 返回“本次尝试是否有实质进展”Range 补洞时,哪怕不完整也算成功推进
if filled_new_bytes <= 0:
return False, "no_progress", body_buf, got_ranges, total_len, expect_md5_b64
return True, f"PARTIAL ok +{filled_new_bytes} ip={ip} code={status_code}", body_buf, got_ranges, total_len, expect_md5_b64
global ota_in_progress
try:
ota_in_progress = int(ota_in_progress) + 1
except:
ota_in_progress = 1
with uart4g_lock:
try:
# -------- Phase 1: 全量 GET允许不完整后面用 Range 补洞)--------
body_buf = None
got_ranges = set()
total_len = None
expect_md5_b64 = None
last_err = "unknown"
for attempt in range(1, retries + 1):
ok, msg, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
)
last_err = msg
if not ok:
_log(f"[RETRY] full attempt={attempt} failed={msg}")
time.sleep_ms(200)
continue
gaps, merged = _compute_gaps(total_len, got_ranges)
filled_total = sum(e - s for s, e in merged)
if gaps and gaps[0] == (0, 0):
gaps = []
if not gaps:
break
_log(f"[GAPS] after full attempt={attempt} filled={filled_total}/{total_len} gaps={gaps[:3]}")
time.sleep_ms(150)
if body_buf is None or total_len is None:
return False, f"FAILED: {last_err}"
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
# -------- Phase 2: Range 补洞 --------
# 说明:
# - “全量 GET 多次重试 + 合并已收到分片”我们已经在 Phase1 做了got_ranges/body_buf 会跨 attempt 累积)。
# - 仍存在 gaps 说明:这些字节段在全量阶段始终没收到,需要靠 Range 反复补洞。
#
# 策略:
# - Range 分块更小(更稳),失败时继续“二分缩小”到 MIN_RANGE_BYTES
# - 不要因为某一轮 no_progress 就立刻退出UART 偶发丢 URC需要多轮撞上一次成功
MAX_RANGE_BYTES = 1024
MIN_RANGE_BYTES = 128
RANGE_RETRIES_EACH = 8
MAX_HOLE_ROUNDS = 50
NO_PROGRESS_ROUNDS_LIMIT = 8
round_i = 0
no_progress_rounds = 0
while gaps and round_i < MAX_HOLE_ROUNDS:
round_i += 1
# 优先补最大的洞(通常只丢中间一两段)
gaps = sorted(gaps, key=lambda g: g[1] - g[0], reverse=True)
_log(f"[RANGE] round={round_i} gaps={gaps[:3]}")
progress_any = False
# 每轮最多补前 5 个洞,避免无限循环
for (gs, ge) in gaps[:5]:
cur = gs
chunk = MAX_RANGE_BYTES
while cur < ge:
sub_end = min(ge, cur + chunk)
# HTTP Range end is inclusive
rs = cur
re_incl = sub_end - 1
before_gaps, before_merged = _compute_gaps(total_len, got_ranges)
before_filled = sum(e - s for s, e in before_merged)
sub_ok = False
sub_err = "unknown"
for k in range(1, RANGE_RETRIES_EACH + 1):
ok2, msg2, body_buf, got_ranges, total_len, expect_md5_b64 = _one_attempt(
range_start=rs, range_end=re_incl,
body_buf=body_buf, got_ranges=got_ranges, total_len=total_len, expect_md5_b64=expect_md5_b64
)
sub_err = msg2
if ok2:
sub_ok = True
break
_log(f"[RETRY] range {rs}-{re_incl} try={k} failed={msg2}")
time.sleep_ms(150)
after_gaps, after_merged = _compute_gaps(total_len, got_ranges)
after_filled = sum(e - s for s, e in after_merged)
if after_filled > before_filled:
progress_any = True
# 成功推进:恢复到较大 chunk加快补洞
chunk = MAX_RANGE_BYTES
cur = sub_end
else:
# 没推进:缩小 chunk继续在同一位置重试不要前进 cur
if chunk > MIN_RANGE_BYTES:
chunk = max(MIN_RANGE_BYTES, chunk // 2)
_log(f"[RANGE] shrink chunk -> {chunk} at pos={cur}")
else:
# 已经很小还不行:本轮先放弃这个位置,留给下一轮再撞
if not sub_ok:
_log(f"[WARN] range {rs}-{re_incl} failed={sub_err}")
break
# 小歇一下,给读线程喘息
time.sleep_ms(120)
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
filled_total = sum(e - s for s, e in merged)
if not gaps:
break
if not progress_any:
no_progress_rounds += 1
_log(f"[RANGE] no progress in round={round_i} ({no_progress_rounds}/{NO_PROGRESS_ROUNDS_LIMIT}) filled={filled_total}/{total_len}")
# 多轮无进展才退出(避免偶发“只 header 无 content URC”导致过早退出
if no_progress_rounds >= NO_PROGRESS_ROUNDS_LIMIT:
break
# 退避等待一下再继续下一轮
time.sleep_ms(500)
continue
else:
no_progress_rounds = 0
_log(f"[RANGE] round={round_i} filled={filled_total}/{total_len} gaps={gaps[:3]}")
# 完整性检查
gaps, merged = _compute_gaps(total_len, got_ranges)
if gaps and gaps[0] == (0, 0):
gaps = []
filled_total = sum(e - s for s, e in merged)
if gaps:
return False, f"incomplete_body got={filled_total} expected={total_len} missing={total_len - filled_total} gaps={gaps[:5]}"
data = bytes(body_buf)
# 校验Content-Md5base64若有
if expect_md5_b64 and hashlib is not None:
md5_b64 = _md5_base64(data)
if md5_b64 != expect_md5_b64:
return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}"
# 写文件(原样 bytes
with open(filename, "wb") as f:
f.write(data)
return True, f"OK size={len(data)} ip={_get_ip()} md5={expect_md5_b64 or ''}"
finally:
try:
ota_in_progress = max(0, int(ota_in_progress) - 1)
except:
ota_in_progress = 0
def download_file_via_4g(url, filename,
total_timeout_ms=600000,
retries=3,
@@ -2036,6 +1570,7 @@ def download_file_via_4g(url, filename,
def _create_httpid(full_reset=False):
_clear_http_events()
at_client.flush()
if full_reset:
_hard_reset_http()
resp = at(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)

1278
ota_manager.py Normal file

File diff suppressed because it is too large Load Diff

112
power.py Normal file
View File

@@ -0,0 +1,112 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
电源管理模块INA226
提供电压、电流监测和充电状态检测
"""
import config
from logger_manager import logger_manager
def write_register(reg, value):
"""写入INA226寄存器"""
from hardware import hardware_manager
data = [(value >> 8) & 0xFF, value & 0xFF]
hardware_manager.bus.writeto_mem(config.INA226_ADDR, reg, bytes(data))
def read_register(reg):
"""读取INA226寄存器"""
from hardware import hardware_manager
data = hardware_manager.bus.readfrom_mem(config.INA226_ADDR, reg, 2)
return (data[0] << 8) | data[1]
def init_ina226():
"""初始化 INA226 芯片:配置模式 + 校准值"""
write_register(config.REG_CONFIGURATION, 0x4527)
write_register(config.REG_CALIBRATION, config.CALIBRATION_VALUE)
def get_bus_voltage():
"""读取总线电压单位V"""
raw = read_register(config.REG_BUS_VOLTAGE)
return raw * 1.25 / 1000
def get_current():
"""
读取电流单位mA
正数表示充电,负数表示放电
INA226 电流计算公式:
Current = (Current Register Value) × Current_LSB
Current_LSB = 0.001 × CALIBRATION_VALUE / 4096
"""
try:
raw = read_register(config.REG_CURRENT)
# INA226 电流寄存器是16位有符号整数
# 最高位是符号位0=正充电1=负(放电)
# 计算 Current_LSB根据 CALIBRATION_VALUE
current_lsb = 0.001 * config.CALIBRATION_VALUE / 4096 # 单位A
# 处理有符号数如果最高位为1转换为负数
if raw & 0x8000: # 最高位为1表示负数放电
signed_raw = raw - 0x10000 # 转换为有符号整数
else: # 最高位为0表示正数充电
signed_raw = raw
# 转换为毫安
current_ma = signed_raw * current_lsb * 1000
return current_ma
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[INA226] 读取电流失败: {e}")
else:
print(f"[INA226] 读取电流失败: {e}")
return 0.0
def is_charging(threshold_ma=10.0):
"""
检测是否在充电(通过电流方向判断)
Args:
threshold_ma: 电流阈值毫安超过此值认为在充电默认10mA
Returns:
True: 正在充电
False: 未充电或读取失败
"""
try:
current = get_current()
is_charge = current > threshold_ma
return is_charge
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[CHARGE] 检测充电状态失败: {e}")
else:
print(f"[CHARGE] 检测充电状态失败: {e}")
return False
def voltage_to_percent(voltage):
"""根据电压估算电池百分比(查表插值)"""
points = [
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
(3.65, 5), (3.60, 0)
]
if voltage >= points[0][0]:
return 100
if voltage <= points[-1][0]:
return 0
for i in range(len(points) - 1):
v1, p1 = points[i]
v2, p2 = points[i + 1]
if voltage >= v2:
ratio = (voltage - v1) / (v2 - v1)
percent = p1 + (p2 - p1) * ratio
return max(0, min(100, int(round(percent))))
return 0

186
time_sync.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
时间同步模块
从4G模块获取时间并同步到系统
"""
import re
import os
from datetime import datetime, timedelta
import config
# from logger_bak import get_logger
from logger_manager import logger_manager
def parse_4g_time(cclk_response, timezone_offset=8):
"""
解析 AT+CCLK? 返回的时间字符串,并转换为本地时间
Args:
cclk_response: AT+CCLK? 的响应字符串
timezone_offset: 时区偏移小时默认8中国时区 UTC+8
Returns:
datetime 对象(已转换为本地时间),如果解析失败返回 None
"""
try:
# 匹配格式: +CCLK: "YY/MM/DD,HH:MM:SS+TZ"
# 时区单位是四分之一小时quarters of an hour
match = re.search(r'\+CCLK:\s*"(\d{2})/(\d{2})/(\d{2}),(\d{2}):(\d{2}):(\d{2})([+-]\d{1,3})?"', cclk_response)
if not match:
return None
yy, mm, dd, hh, MM, ss, tz_str = match.groups()
# 年份处理26 -> 2026
year = 2000 + int(yy)
month = int(mm)
day = int(dd)
hour = int(hh)
minute = int(MM)
second = int(ss)
# 创建 UTC 时间的 datetime 对象
dt_utc = datetime(year, month, day, hour, minute, second)
# 解析时区偏移(单位:四分之一小时)
if tz_str:
try:
# 时区偏移值(四分之一小时)
tz_quarters = int(tz_str)
# 转换为小时除以4
tz_hours = tz_quarters / 4.0
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 时区偏移: {tz_str} (四分之一小时) = {tz_hours} 小时")
# 转换为本地时间
dt_local = dt_utc + timedelta(hours=tz_hours)
except ValueError:
# 如果时区解析失败,使用默认值
logger = logger_manager.logger
if logger:
logger.warning(f"[TIME] 时区解析失败: {tz_str},使用默认 UTC+{timezone_offset}")
dt_local = dt_utc + timedelta(hours=timezone_offset)
else:
# 没有时区信息,使用默认值
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 未找到时区信息,使用默认 UTC+{timezone_offset}")
dt_local = dt_utc + timedelta(hours=timezone_offset)
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] UTC时间: {dt_utc.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"[TIME] 本地时间: {dt_local.strftime('%Y-%m-%d %H:%M:%S')}")
return dt_local
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}")
else:
print(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}")
return None
def get_time_from_4g(timezone_offset=8):
"""
通过4G模块获取当前时间已转换为本地时间
Args:
timezone_offset: 时区偏移小时默认8中国时区
Returns:
datetime 对象(本地时间),如果获取失败返回 None
"""
try:
# 发送 AT+CCLK? 命令(延迟导入避免循环依赖)
from hardware import hardware_manager
# 检查 at_client 是否已初始化
if hardware_manager.at_client is None:
logger = logger_manager.logger
if logger:
logger.warning("[TIME] ATClient 尚未初始化无法获取4G时间")
else:
print("[TIME] ATClient 尚未初始化无法获取4G时间")
return None
resp = hardware_manager.at_client.send("AT+CCLK?", "OK", 3000)
if not resp or "+CCLK:" not in resp:
logger = logger_manager.logger
if logger:
logger.warning(f"[TIME] 未获取到时间响应: {resp}")
else:
print(f"[TIME] 未获取到时间响应: {resp}")
return None
# 解析并转换时区
dt = parse_4g_time(resp, timezone_offset)
return dt
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 获取4G时间异常: {e}")
else:
print(f"[TIME] 获取4G时间异常: {e}")
return None
def sync_system_time_from_4g(timezone_offset=8):
"""
从4G模块同步时间到系统
Args:
timezone_offset: 时区偏移小时默认8中国时区
Returns:
bool: 是否成功
"""
dt = get_time_from_4g(timezone_offset)
if not dt:
return False
try:
# 转换为系统 date 命令需要的格式
time_str = dt.strftime('%Y-%m-%d %H:%M:%S')
# 设置系统时间
cmd = f'date -s "{time_str}" 2>&1'
result = os.system(cmd)
if result == 0:
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 系统时间已设置为: {time_str}")
else:
print(f"[TIME] 系统时间已设置为: {time_str}")
# 可选:同步到硬件时钟
try:
os.system('hwclock -w 2>/dev/null')
logger = logger_manager.logger
if logger:
logger.info("[TIME] 已同步到硬件时钟")
except:
pass
return True
else:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 设置系统时间失败,退出码: {result}")
else:
print(f"[TIME] 设置系统时间失败,退出码: {result}")
return False
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 同步系统时间异常: {e}")
else:
print(f"[TIME] 同步系统时间异常: {e}")
return False

13
version.py Normal file
View File

@@ -0,0 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用版本号
每次 OTA 更新时,只需要更新这个文件中的版本号
"""
VERSION = '1.1.1'

360
vision.py Normal file
View File

@@ -0,0 +1,360 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视觉检测模块
提供靶心检测、距离估算、图像保存等功能
"""
import cv2
import numpy as np
import os
import math
from maix import image
import globals
import config
from logger_manager import logger_manager
def detect_circle_v3(frame, laser_point=None):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
如果提供 laser_point会选择最接近激光点的目标
Args:
frame: 图像帧
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
Returns:
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
"""
img_cv = image.image2cv(frame, False, False)
best_center = best_radius = best_radius1 = method = None
ellipse_params = None
# HSV 黄色掩码检测(模糊靶心)
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
# 调整饱和度策略:稍微增强,不要过度
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 调整形态学操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 存储所有有效的黄色-红色组合
valid_targets = []
if contours_yellow:
for cnt_yellow in contours_yellow:
area = cv2.contourArea(cnt_yellow)
perimeter = cv2.arcLength(cnt_yellow, True)
# 计算圆度
if perimeter > 0:
circularity = (4 * np.pi * area) / (perimeter * perimeter)
else:
circularity = 0
logger = logger_manager.logger
if area > 50 and circularity > 0.7:
if logger:
logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}")
# 尝试拟合椭圆
yellow_center = None
yellow_radius = None
yellow_ellipse = None
if len(cnt_yellow) >= 5:
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
yellow_ellipse = ((x, y), (width, height), angle)
axes_minor = min(width, height)
radius = axes_minor / 2
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
else:
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
yellow_ellipse = None
# 如果检测到黄色圆圈,再检测红色圆圈进行验证
if yellow_center and yellow_radius:
# HSV 红色掩码检测红色在HSV中跨越0度需要两个范围
# 红色范围1: 0-10度接近0度的红色
lower_red1 = np.array([0, 80, 0])
upper_red1 = np.array([10, 255, 255])
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
# 红色范围2: 170-180度接近180度的红色
lower_red2 = np.array([170, 80, 0])
upper_red2 = np.array([180, 255, 255])
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
# 合并两个红色掩码
mask_red = cv2.bitwise_or(mask_red1, mask_red2)
# 形态学操作
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
found_valid_red = False
if contours_red:
# 找到所有符合条件的红色圆圈
for cnt_red in contours_red:
area_red = cv2.contourArea(cnt_red)
perimeter_red = cv2.arcLength(cnt_red, True)
if perimeter_red > 0:
circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red)
else:
circularity_red = 0
# 红色圆圈也应该有一定的圆度
if area_red > 50 and circularity_red > 0.6:
# 计算红色圆圈的中心和半径
if len(cnt_red) >= 5:
(x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red)
radius_red = min(w_red, h_red) / 2
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
else:
(x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red)
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
# 计算黄色和红色圆心的距离
if red_center:
dx = yellow_center[0] - red_center[0]
dy = yellow_center[1] - red_center[1]
distance = np.sqrt(dx*dx + dy*dy)
# 圆心距离阈值应该小于黄色半径的某个倍数比如1.5倍)
max_distance = yellow_radius * 1.5
# 红色圆圈应该比黄色圆圈大(外圈)
if distance < max_distance and red_radius > yellow_radius * 0.8:
found_valid_red = True
logger = logger_manager.logger
if logger:
logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}")
# 记录这个有效目标
valid_targets.append({
'center': yellow_center,
'radius': yellow_radius,
'ellipse': yellow_ellipse,
'area': area
})
break
if not found_valid_red:
logger = logger_manager.logger
if logger:
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
# 从所有有效目标中选择最佳目标
if valid_targets:
if laser_point:
# 如果有激光点,选择最接近激光点的目标
best_target = None
min_distance = float('inf')
for target in valid_targets:
dx = target['center'][0] - laser_point[0]
dy = target['center'][1] - laser_point[1]
distance = np.sqrt(dx*dx + dy*dy)
if distance < min_distance:
min_distance = distance
best_target = target
if best_target:
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated_laser_selected"
best_radius1 = best_radius * 5
else:
# 如果没有激光点,选择面积最大的目标
best_target = max(valid_targets, key=lambda t: t['area'])
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated"
best_radius1 = best_radius * 5
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
def estimate_distance(pixel_radius):
"""根据像素半径估算实际距离(单位:米)"""
if not pixel_radius:
return 0.0
return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0
def compute_laser_position(circle_center, laser_point, radius, method):
"""计算激光相对于靶心的偏移量(单位:厘米)"""
if not all([circle_center, radius, method]):
return None, None
cx, cy = circle_center
lx, ly = laser_point
# 根据检测方法动态调整靶心物理半径(简化模型)
circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
dx = lx - cx
dy = ly - cy
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
def save_shot_image(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, photo_dir=None):
"""
保存射击图像(带标注)
即使没有检测到靶心也会保存图像,文件名会标注 "no_target"
确保保存的图像总是包含激光十字线
Args:
result_img: 处理后的图像对象(可能已经包含激光十字线或检测标注)
center: 靶心中心坐标 (x, y),可能为 None未检测到靶心
radius: 靶心半径,可能为 None未检测到靶心
method: 检测方法,可能为 None未检测到靶心
ellipse_params: 椭圆参数 ((center, (width, height), angle)) 或 None
laser_point: 激光点坐标 (x, y)
distance_m: 距离(米),可能为 None未检测到靶心
photo_dir: 照片存储目录如果为None则使用 config.PHOTO_DIR
Returns:
str: 保存的文件路径,如果保存失败或未启用则返回 None
"""
# 检查是否启用图像保存
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
# 确保照片目录存在
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except:
pass
# 生成文件名
# 统计所有图片文件(包括 .bmp 和 .jpg
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
img_count = 0
x, y = laser_point
# 如果未检测到靶心,在文件名中标注
if center is None or radius is None:
method_str = "no_target"
distance_str = "000"
else:
method_str = method or "unknown"
distance_str = str(round((distance_m or 0.0) * 100))
filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
if center and radius:
logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
logger.info(f"椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°")
else:
logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y})")
# 转换图像为 OpenCV 格式以便绘制
img_cv = image.image2cv(result_img, False, False)
# 确保激光十字线被绘制使用OpenCV在图像上绘制确保可见性
laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
thickness = max(config.LASER_THICKNESS, 2) # 至少2像素宽确保可见
length = max(config.LASER_LENGTH, 10) # 至少10像素长
# 绘制激光十字线(水平线)
cv2.line(img_cv,
(int(x - length), int(y)),
(int(x + length), int(y)),
laser_color, thickness)
# 绘制激光十字线(垂直线)
cv2.line(img_cv,
(int(x), int(y - length)),
(int(x), int(y + length)),
laser_color, thickness)
# 绘制激光点
cv2.circle(img_cv, (int(x), int(y)), max(thickness, 3), laser_color, -1)
# 如果检测到靶心,绘制靶心标注
if center and radius:
cx, cy = center
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
# 绘制椭圆
cv2.ellipse(img_cv,
(cx_ell, cy_ell),
(int(width/2), int(height/2)),
angle,
0, 360,
(0, 255, 0),
2)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
# 绘制短轴
minor_length = min(width, height) / 2
minor_angle = angle + 90 if width >= height else angle
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2)
else:
# 绘制圆形靶心
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
# 如果检测到靶心,绘制从激光点到靶心的连线(可选,用于可视化偏移)
cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1)
# 转换回 MaixPy 图像格式并保存
result_img = image.cv2image(img_cv, False, False)
result_img.save(filename)
if logger:
if center and radius:
logger.debug(f"图像已保存(含靶心标注): {filename}")
else:
logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}")
return filename
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"保存图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None