refine the code to different part

This commit is contained in:
huangzhenwei2
2026-01-12 11:39:27 +08:00
parent 92ad32bb8e
commit 708925ab41
10 changed files with 2795 additions and 0 deletions

79
S99archery Normal file
View File

@@ -0,0 +1,79 @@
#!/bin/sh
# /etc/init.d/S99archery
# 系统启动时处理致命错误恢复(仅处理无法启动的情况)
# 注意:应用的启动由系统自动启动机制处理(通过 auto_start.txt
# 功能:
# 1. 处理致命错误(无法启动)- 恢复 main.py
# 2. 如果重启次数超过阈值,恢复 main.py 并重启系统
APP_DIR="/maixapp/apps/t11"
MAIN_PY="$APP_DIR/main.py"
PENDING_FILE="$APP_DIR/ota_pending.json"
BACKUP_BASE="$APP_DIR/backups"
# 进入应用目录
cd "$APP_DIR" || exit 0
# 检查 pending 文件,如果存在且超过重启次数,恢复 main.py处理致命错误
if [ -f "$PENDING_FILE" ]; then
echo "[S99] 检测到 ota_pending.json检查重启计数..."
# 尝试从JSON中提取重启计数使用grep简单提取
RESTART_COUNT=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"restart_count":[0-9]*' | grep -o '[0-9]*' || echo "0")
MAX_RESTARTS=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"max_restarts":[0-9]*' | grep -o '[0-9]*' || echo "3")
if [ -n "$RESTART_COUNT" ] && [ "$RESTART_COUNT" -ge "$MAX_RESTARTS" ]; then
echo "[S99] 检测到重启次数 ($RESTART_COUNT) 超过阈值 ($MAX_RESTARTS),恢复 main.py..."
# 尝试从JSON中提取备份目录
BACKUP_DIR=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"backup_dir":"[^"]*"' | grep -o '/[^"]*' || echo "")
if [ -n "$BACKUP_DIR" ] && [ -f "$BACKUP_DIR/main.py" ]; then
# 使用指定的备份目录
echo "[S99] 从备份目录恢复: $BACKUP_DIR/main.py"
cp "$BACKUP_DIR/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
else
# 查找最新的备份目录
LATEST_BACKUP=$(ls -dt "$BACKUP_BASE"/backup_* 2>/dev/null | head -1)
if [ -n "$LATEST_BACKUP" ] && [ -f "$LATEST_BACKUP/main.py" ]; then
echo "[S99] 从最新备份恢复: $LATEST_BACKUP/main.py"
cp "$LATEST_BACKUP/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
else
# 如果没有备份目录,尝试使用 main.py.bak
if [ -f "$APP_DIR/main.py.bak" ]; then
echo "[S99] 从 main.py.bak 恢复"
cp "$APP_DIR/main.py.bak" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
fi
fi
fi
# 恢复后重置重启计数,避免循环恢复
# 注意:不在这里删除 pending 文件,让 main.py 在心跳成功后删除
# 但是重置重启计数,以便恢复后的版本可以重新开始计数
python3 -c "
import json, os
try:
pending_path = '$PENDING_FILE'
if os.path.exists(pending_path):
with open(pending_path, 'r', encoding='utf-8') as f:
d = json.load(f)
d['restart_count'] = 0 # 重置重启计数
with open(pending_path, 'w', encoding='utf-8') as f:
json.dump(d, f)
print('[S99] 已重置重启计数为 0')
except Exception as e:
print(f'[S99] 重置重启计数失败: {e}')
" 2>/dev/null || echo "[S99] 无法重置重启计数可能需要Python支持"
echo "[S99] 已恢复 main.py重启系统..."
echo "[S99] 注意pending 文件将在心跳成功后由 main.py 删除"
sleep 2
reboot
exit 0
fi
fi
# 不启动应用,让系统自动启动机制处理
# 这个脚本只负责处理致命错误恢复
exit 0

307
at_client.py Normal file
View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AT客户端模块
负责4G模块的AT命令通信和URC解析
"""
import _thread
from maix import time
import re
import threading
class ATClient:
"""
单读者 AT/URC 客户端:唯一读取 uart4g避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。
- send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect
- pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload已按长度裁剪
- pop_http_event() : 获取 +MHTTPURC 事件header/content
"""
def __init__(self, uart_obj):
self.uart = uart_obj
self._cmd_lock = threading.Lock()
self._q_lock = threading.Lock()
self._rx = b""
self._tcp_payloads = []
self._http_events = []
# 当前命令等待状态(仅允许单命令 in-flight
self._waiting = False
self._expect = b"OK"
self._resp = b""
self._running = False
def start(self):
if self._running:
return
self._running = True
_thread.start_new_thread(self._reader_loop, ())
def stop(self):
self._running = False
def flush(self):
"""清空内部缓存与队列(用于 OTA/异常恢复)"""
with self._q_lock:
self._rx = b""
self._tcp_payloads.clear()
self._http_events.clear()
self._resp = b""
def pop_tcp_payload(self):
with self._q_lock:
if self._tcp_payloads:
return self._tcp_payloads.pop(0)
return None
def pop_http_event(self):
with self._q_lock:
if self._http_events:
return self._http_events.pop(0)
return None
def _push_tcp_payload(self, payload: bytes):
# 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock
# 这里不要再次 acquire锁不可重入会死锁
self._tcp_payloads.append(payload)
def _push_http_event(self, ev):
# 同上:避免在 _reader_loop 持锁期间二次 acquire
self._http_events.append(ev)
def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000):
"""
发送 AT 命令并等待 expect子串匹配
注意expect=">" 用于等待 prompt。
"""
expect_b = expect.encode() if isinstance(expect, str) else expect
with self._cmd_lock:
# 初始化等待
self._waiting = True
self._expect = expect_b
self._resp = b""
# 发送
if cmd:
# 注意:这里不要再用 uart4g_lock否则外层已经持锁时会死锁
# 写入由 _cmd_lock 串行化即可。
self.uart.write((cmd + "\r\n").encode())
t0 = time.ticks_ms()
while time.ticks_ms() - t0 < timeout_ms:
if (not self._waiting) or (self._expect in self._resp):
self._waiting = False
break
time.sleep_ms(5)
# 超时也返回已收集内容(便于诊断)
self._waiting = False
try:
return self._resp.decode(errors="ignore")
except:
return str(self._resp)
def _find_urc_tag(self, tag: bytes):
"""
只在"真正的 URC 边界"查找 tag避免误命中 HTTP payload 内容。
规则tag 必须出现在 buffer 开头,或紧跟在 b"\\r\\n" 后面。
"""
try:
i = 0
rx = self._rx
while True:
j = rx.find(tag, i)
if j < 0:
return -1
if j == 0:
return 0
if j >= 2 and rx[j - 2:j] == b"\r\n":
return j
i = j + 1
except:
return -1
def _parse_mipurc_rtcp(self):
"""
解析:+MIPURC: "rtcp",<link_id>,<len>,<payload...>
之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。
"""
prefix = b'+MIPURC: "rtcp",'
i = self._find_urc_tag(prefix)
if i < 0:
return False
# 丢掉前置噪声
if i > 0:
self._rx = self._rx[i:]
i = 0
j = len(prefix)
# 解析 link_id
k = j
while k < len(self._rx) and 48 <= self._rx[k] <= 57:
k += 1
if k == j or k >= len(self._rx):
return False
if self._rx[k:k+1] != b",":
self._rx = self._rx[1:]
return True
try:
link_id = int(self._rx[j:k].decode())
except:
self._rx = self._rx[1:]
return True
# 解析 len
j2 = k + 1
k2 = j2
while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57:
k2 += 1
if k2 == j2 or k2 >= len(self._rx):
return False
if self._rx[k2:k2+1] != b",":
self._rx = self._rx[1:]
return True
try:
n = int(self._rx[j2:k2].decode())
except:
self._rx = self._rx[1:]
return True
payload_start = k2 + 1
payload_end = payload_start + n
if len(self._rx) < payload_end:
return False # payload 未收齐
payload = self._rx[payload_start:payload_end]
# 把 link_id 一起带上,便于上层过滤(如果需要)
self._push_tcp_payload((link_id, payload))
self._rx = self._rx[payload_end:]
return True
def _parse_mhttpurc_header(self):
tag = b'+MHTTPURC: "header",'
i = self._find_urc_tag(tag)
if i < 0:
return False
if i > 0:
self._rx = self._rx[i:]
i = 0
# header: +MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text...>
j = len(tag)
comma_count = 0
k = j
while k < len(self._rx) and comma_count < 3:
if self._rx[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 3:
return False
prefix = self._rx[:k]
m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
self._rx = self._rx[1:]
return True
urc_id = int(m.group(1))
code = int(m.group(2))
hdr_len = int(m.group(3))
text_start = k
text_end = text_start + hdr_len
if len(self._rx) < text_end:
return False
hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore")
self._push_http_event(("header", urc_id, code, hdr_text))
self._rx = self._rx[text_end:]
return True
def _parse_mhttpurc_content(self):
tag = b'+MHTTPURC: "content",'
i = self._find_urc_tag(tag)
if i < 0:
return False
if i > 0:
self._rx = self._rx[i:]
i = 0
# content: +MHTTPURC: "content",<id>,<total>,<sum>,<cur>,<payload...>
j = len(tag)
comma_count = 0
k = j
while k < len(self._rx) and comma_count < 4:
if self._rx[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 4:
return False
prefix = self._rx[:k]
m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
self._rx = self._rx[1:]
return True
urc_id = int(m.group(1))
total_len = int(m.group(2))
sum_len = int(m.group(3))
cur_len = int(m.group(4))
payload_start = k
payload_end = payload_start + cur_len
if len(self._rx) < payload_end:
return False
payload = self._rx[payload_start:payload_end]
self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload))
self._rx = self._rx[payload_end:]
return True
def _reader_loop(self):
while self._running:
# 关键UART 驱动偶发 read failed必须兜住否则线程挂了 OTA/TCP 都会卡死
try:
d = self.uart.read(4096) # 8192 在一些驱动上更容易触发 read failed
except Exception as e:
try:
print("[ATClient] uart read failed:", e)
except:
pass
time.sleep_ms(50)
continue
if not d:
time.sleep_ms(1)
continue
with self._q_lock:
self._rx += d
if self._waiting:
self._resp += d
while True:
progressed = (
self._parse_mipurc_rtcp()
or self._parse_mhttpurc_header()
or self._parse_mhttpurc_content()
)
if not progressed:
break
# 使用 ota_manager 访问 ota_in_progress
try:
from ota_manager import ota_manager
ota_flag = ota_manager.ota_in_progress
except:
ota_flag = False
has_http_hint = (b"+MHTTP" in self._rx) or (b"+MHTTPURC" in self._rx)
if ota_flag or has_http_hint:
if len(self._rx) > 512 * 1024:
self._rx = self._rx[-256 * 1024:]
else:
if len(self._rx) > 16384:
self._rx = self._rx[-4096:]

81
config.py Normal file
View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
系统配置常量
这些值在程序运行期间基本不变,或只在配置时改变
"""
from version import VERSION
# ==================== 应用配置 ====================
APP_VERSION = VERSION
APP_DIR = "/maixapp/apps/t11"
LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py"
# ==================== 服务器配置 ====================
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒)
# ==================== HTTP配置 ====================
HTTP_URL = "http://ws.shelingxingqiu.com"
HTTP_API_PATH = "/home/shoot/device_fire/arrow/fire"
# ==================== 文件路径配置 ====================
CONFIG_FILE = "/root/laser_config.json"
LOG_FILE = "/maixapp/apps/t11/app.log"
BACKUP_BASE = "/maixapp/apps/t11/backups"
# ==================== 硬件配置 ====================
# UART配置
UART4G_DEVICE = "/dev/ttyS2"
UART4G_BAUDRATE = 115200
DISTANCE_SERIAL_DEVICE = "/dev/ttyS1"
DISTANCE_SERIAL_BAUDRATE = 9600
# I2C配置
I2C_BUS_NUM = 1
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CURRENT = 0x04 # 电流寄存器
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# ADC配置
ADC_CHANNEL = 0
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# ==================== 激光配置 ====================
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
DEFAULT_LASER_POINT = (640, 480) # 默认激光中心点
# ==================== 视觉检测配置 ====================
FOCAL_LENGTH_PIX = 2250.0 # 焦距(像素)
REAL_RADIUS_CM = 20 # 靶心实际半径(厘米)
# ==================== 显示配置 ====================
LASER_COLOR = (255, 100, 0) # RGB颜色
LASER_THICKNESS = 1
LASER_LENGTH = 2
# ==================== 图像保存配置 ====================
SAVE_IMAGE_ENABLED = True # 是否保存图像True=保存False=不保存)
PHOTO_DIR = "/root/phot" # 照片存储目录
# ==================== OTA配置 ====================
MAX_BACKUPS = 5
LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB
LOG_BACKUP_COUNT = 5
# ==================== 引脚映射配置 ====================
PIN_MAPPINGS = {
"A18": "UART1_RX",
"A19": "UART1_TX",
"A29": "UART2_RX",
"A28": "UART2_TX",
"P18": "I2C1_SCL",
"P21": "I2C1_SDA",
}

119
hardware.py Normal file
View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
硬件管理器模块
提供硬件对象的统一管理和访问
"""
import threading
import config
from at_client import ATClient
class HardwareManager:
"""硬件管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(HardwareManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有硬件对象
self._uart4g = None # 4G模块UART
self._distance_serial = None # 激光测距串口
self._bus = None # I2C总线
self._adc_obj = None # ADC对象
self._at_client = None # AT客户端
self._initialized = True
# ==================== 硬件访问(只读属性)====================
@property
def uart4g(self):
"""4G模块UART只读"""
return self._uart4g
@property
def distance_serial(self):
"""激光测距串口(只读)"""
return self._distance_serial
@property
def bus(self):
"""I2C总线只读"""
return self._bus
@property
def adc_obj(self):
"""ADC对象只读"""
return self._adc_obj
@property
def at_client(self):
"""AT客户端只读"""
return self._at_client
# ==================== 初始化方法 ====================
def init_uart4g(self, device=None, baudrate=None):
"""初始化4G模块UART"""
from maix import uart
if device is None:
device = config.UART4G_DEVICE
if baudrate is None:
baudrate = config.UART4G_BAUDRATE
self._uart4g = uart.UART(device, baudrate)
return self._uart4g
def init_distance_serial(self, device=None, baudrate=None):
"""初始化激光测距串口(激光控制)"""
from maix import uart
if device is None:
device = config.DISTANCE_SERIAL_DEVICE
if baudrate is None:
baudrate = config.DISTANCE_SERIAL_BAUDRATE
print(f"[HW] 初始化激光串口: device={device}, baudrate={baudrate}")
self._distance_serial = uart.UART(device, baudrate)
print(f"[HW] 激光串口初始化完成: {self._distance_serial}")
return self._distance_serial
def init_bus(self, bus_num=None):
"""初始化I2C总线"""
from maix import i2c
if bus_num is None:
bus_num = config.I2C_BUS_NUM
self._bus = i2c.I2C(bus_num, i2c.Mode.MASTER)
return self._bus
def init_adc(self, channel=None, res_bit=None):
"""初始化ADC"""
from maix.peripheral import adc
if channel is None:
channel = config.ADC_CHANNEL
if res_bit is None:
res_bit = adc.RES_BIT_12
self._adc_obj = adc.ADC(channel, res_bit)
return self._adc_obj
def init_at_client(self, uart_obj=None):
"""初始化AT客户端"""
if uart_obj is None:
if self._uart4g is None:
raise ValueError("uart4g must be initialized before at_client")
uart_obj = self._uart4g
self._at_client = ATClient(uart_obj)
self._at_client.start()
return self._at_client
# 创建全局单例实例
hardware_manager = HardwareManager()

260
laser_manager.py Normal file
View File

@@ -0,0 +1,260 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光管理器模块
提供激光控制、校准等功能
"""
import json
import os
from maix import time, camera
import threading
import config
from logger_manager import logger_manager
class LaserManager:
"""激光控制管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(LaserManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有状态
self._calibration_active = False
self._calibration_result = None
self._calibration_lock = threading.Lock()
self._laser_point = None
self._initialized = True
# ==================== 状态访问(只读属性)====================
@property
def calibration_active(self):
"""是否正在校准"""
return self._calibration_active
@property
def laser_point(self):
"""当前激光点"""
return self._laser_point
# ==================== 业务方法 ====================
def load_laser_point(self):
"""从配置文件加载激光中心点,失败则使用默认值"""
try:
if "laser_config.json" in os.listdir("/root"):
with open(config.CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
self._laser_point = (int(data[0]), int(data[1]))
logger = logger_manager.logger
if logger:
logger.debug(f"[INFO] 加载激光点: {self._laser_point}")
return self._laser_point
else:
raise ValueError
else:
self._laser_point = config.DEFAULT_LASER_POINT
except:
self._laser_point = config.DEFAULT_LASER_POINT
return self._laser_point
def save_laser_point(self, point):
"""保存激光中心点到配置文件"""
try:
with open(config.CONFIG_FILE, "w") as f:
json.dump([point[0], point[1]], f)
self._laser_point = point
return True
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] 保存激光点失败: {e}")
return False
def turn_on_laser(self):
"""发送指令开启激光,并读取回包(部分模块支持)"""
from hardware import hardware_manager
logger = logger_manager.logger
if hardware_manager.distance_serial is None:
if logger:
logger.error("[LASER] distance_serial 未初始化")
return None
# 打印调试信息
if logger:
logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}")
# 清空接收缓冲区
try:
hardware_manager.distance_serial.read(-1) # 清空缓冲区
except:
pass
# 发送命令
written = hardware_manager.distance_serial.write(config.LASER_ON_CMD)
if logger:
logger.info(f"[LASER] 写入字节数: {written}")
time.sleep_ms(50) # 增加等待时间,让模块有时间响应
# 读取回包
resp = hardware_manager.distance_serial.read(20)
if resp:
if logger:
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
if resp == config.LASER_ON_CMD:
if logger:
logger.info("✅ 激光开启指令已确认")
else:
if logger:
logger.warning("🔇 无回包(可能正常或模块不支持回包)")
return resp
def turn_off_laser(self):
"""发送指令关闭激光"""
from hardware import hardware_manager
logger = logger_manager.logger
if hardware_manager.distance_serial is None:
if logger:
logger.error("[LASER] distance_serial 未初始化")
return None
# 打印调试信息
if logger:
logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}")
# 清空接收缓冲区
try:
hardware_manager.distance_serial.read(-1)
except:
pass
# 发送命令
written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD)
if logger:
logger.info(f"[LASER] 写入字节数: {written}")
time.sleep_ms(50) # 增加等待时间
# 读取回包
resp = hardware_manager.distance_serial.read(20)
if resp:
if logger:
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
else:
if logger:
logger.warning("🔇 无回包")
return resp
def flash_laser(self, duration_ms=1000):
"""闪一下激光(用于射箭反馈)"""
try:
self.turn_on_laser()
time.sleep_ms(duration_ms)
self.turn_off_laser()
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"闪激光失败: {e}")
def find_red_laser(self, frame, threshold=150):
"""在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
w, h = frame.width(), frame.height()
img_bytes = frame.to_bytes()
max_sum = 0
best_pos = None
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
max_sum = rgb_sum
best_pos = (x, y)
return best_pos
def calibrate_laser_position(self):
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
time.sleep_ms(80)
cam = camera.Camera(640, 480)
frame = cam.read()
pos = self.find_red_laser(frame)
if pos:
self.save_laser_point(pos)
return pos
return None
def start_calibration(self):
"""开始校准(公共方法)"""
with self._calibration_lock:
if self._calibration_active:
return False
self._calibration_active = True
self._calibration_result = None
return True
def stop_calibration(self):
"""停止校准(公共方法)"""
with self._calibration_lock:
self._calibration_active = False
def set_calibration_result(self, result):
"""设置校准结果(内部方法)"""
with self._calibration_lock:
self._calibration_result = result
def get_calibration_result(self):
"""获取并清除校准结果(内部方法)"""
with self._calibration_lock:
result = self._calibration_result
self._calibration_result = None
return result
# 创建全局单例实例
laser_manager = LaserManager()
# ==================== 向后兼容的函数接口 ====================
def load_laser_point():
"""加载激光点(向后兼容接口)"""
return laser_manager.load_laser_point()
def save_laser_point(point):
"""保存激光点(向后兼容接口)"""
return laser_manager.save_laser_point(point)
def turn_on_laser():
"""开启激光(向后兼容接口)"""
return laser_manager.turn_on_laser()
def turn_off_laser():
"""关闭激光(向后兼容接口)"""
return laser_manager.turn_off_laser()
def flash_laser(duration_ms=1000):
"""闪激光(向后兼容接口)"""
return laser_manager.flash_laser(duration_ms)
def find_red_laser(frame, threshold=150):
"""查找红色激光点(向后兼容接口)"""
return laser_manager.find_red_laser(frame, threshold)
def calibrate_laser_position():
"""校准激光位置(向后兼容接口)"""
return laser_manager.calibrate_laser_position()

1278
ota_manager.py Normal file

File diff suppressed because it is too large Load Diff

112
power.py Normal file
View File

@@ -0,0 +1,112 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
电源管理模块INA226
提供电压、电流监测和充电状态检测
"""
import config
from logger_manager import logger_manager
def write_register(reg, value):
"""写入INA226寄存器"""
from hardware import hardware_manager
data = [(value >> 8) & 0xFF, value & 0xFF]
hardware_manager.bus.writeto_mem(config.INA226_ADDR, reg, bytes(data))
def read_register(reg):
"""读取INA226寄存器"""
from hardware import hardware_manager
data = hardware_manager.bus.readfrom_mem(config.INA226_ADDR, reg, 2)
return (data[0] << 8) | data[1]
def init_ina226():
"""初始化 INA226 芯片:配置模式 + 校准值"""
write_register(config.REG_CONFIGURATION, 0x4527)
write_register(config.REG_CALIBRATION, config.CALIBRATION_VALUE)
def get_bus_voltage():
"""读取总线电压单位V"""
raw = read_register(config.REG_BUS_VOLTAGE)
return raw * 1.25 / 1000
def get_current():
"""
读取电流单位mA
正数表示充电,负数表示放电
INA226 电流计算公式:
Current = (Current Register Value) × Current_LSB
Current_LSB = 0.001 × CALIBRATION_VALUE / 4096
"""
try:
raw = read_register(config.REG_CURRENT)
# INA226 电流寄存器是16位有符号整数
# 最高位是符号位0=正充电1=负(放电)
# 计算 Current_LSB根据 CALIBRATION_VALUE
current_lsb = 0.001 * config.CALIBRATION_VALUE / 4096 # 单位A
# 处理有符号数如果最高位为1转换为负数
if raw & 0x8000: # 最高位为1表示负数放电
signed_raw = raw - 0x10000 # 转换为有符号整数
else: # 最高位为0表示正数充电
signed_raw = raw
# 转换为毫安
current_ma = signed_raw * current_lsb * 1000
return current_ma
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[INA226] 读取电流失败: {e}")
else:
print(f"[INA226] 读取电流失败: {e}")
return 0.0
def is_charging(threshold_ma=10.0):
"""
检测是否在充电(通过电流方向判断)
Args:
threshold_ma: 电流阈值毫安超过此值认为在充电默认10mA
Returns:
True: 正在充电
False: 未充电或读取失败
"""
try:
current = get_current()
is_charge = current > threshold_ma
return is_charge
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[CHARGE] 检测充电状态失败: {e}")
else:
print(f"[CHARGE] 检测充电状态失败: {e}")
return False
def voltage_to_percent(voltage):
"""根据电压估算电池百分比(查表插值)"""
points = [
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
(3.65, 5), (3.60, 0)
]
if voltage >= points[0][0]:
return 100
if voltage <= points[-1][0]:
return 0
for i in range(len(points) - 1):
v1, p1 = points[i]
v2, p2 = points[i + 1]
if voltage >= v2:
ratio = (voltage - v1) / (v2 - v1)
percent = p1 + (p2 - p1) * ratio
return max(0, min(100, int(round(percent))))
return 0

186
time_sync.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
时间同步模块
从4G模块获取时间并同步到系统
"""
import re
import os
from datetime import datetime, timedelta
import config
# from logger_bak import get_logger
from logger_manager import logger_manager
def parse_4g_time(cclk_response, timezone_offset=8):
"""
解析 AT+CCLK? 返回的时间字符串,并转换为本地时间
Args:
cclk_response: AT+CCLK? 的响应字符串
timezone_offset: 时区偏移小时默认8中国时区 UTC+8
Returns:
datetime 对象(已转换为本地时间),如果解析失败返回 None
"""
try:
# 匹配格式: +CCLK: "YY/MM/DD,HH:MM:SS+TZ"
# 时区单位是四分之一小时quarters of an hour
match = re.search(r'\+CCLK:\s*"(\d{2})/(\d{2})/(\d{2}),(\d{2}):(\d{2}):(\d{2})([+-]\d{1,3})?"', cclk_response)
if not match:
return None
yy, mm, dd, hh, MM, ss, tz_str = match.groups()
# 年份处理26 -> 2026
year = 2000 + int(yy)
month = int(mm)
day = int(dd)
hour = int(hh)
minute = int(MM)
second = int(ss)
# 创建 UTC 时间的 datetime 对象
dt_utc = datetime(year, month, day, hour, minute, second)
# 解析时区偏移(单位:四分之一小时)
if tz_str:
try:
# 时区偏移值(四分之一小时)
tz_quarters = int(tz_str)
# 转换为小时除以4
tz_hours = tz_quarters / 4.0
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 时区偏移: {tz_str} (四分之一小时) = {tz_hours} 小时")
# 转换为本地时间
dt_local = dt_utc + timedelta(hours=tz_hours)
except ValueError:
# 如果时区解析失败,使用默认值
logger = logger_manager.logger
if logger:
logger.warning(f"[TIME] 时区解析失败: {tz_str},使用默认 UTC+{timezone_offset}")
dt_local = dt_utc + timedelta(hours=timezone_offset)
else:
# 没有时区信息,使用默认值
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 未找到时区信息,使用默认 UTC+{timezone_offset}")
dt_local = dt_utc + timedelta(hours=timezone_offset)
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] UTC时间: {dt_utc.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"[TIME] 本地时间: {dt_local.strftime('%Y-%m-%d %H:%M:%S')}")
return dt_local
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}")
else:
print(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}")
return None
def get_time_from_4g(timezone_offset=8):
"""
通过4G模块获取当前时间已转换为本地时间
Args:
timezone_offset: 时区偏移小时默认8中国时区
Returns:
datetime 对象(本地时间),如果获取失败返回 None
"""
try:
# 发送 AT+CCLK? 命令(延迟导入避免循环依赖)
from hardware import hardware_manager
# 检查 at_client 是否已初始化
if hardware_manager.at_client is None:
logger = logger_manager.logger
if logger:
logger.warning("[TIME] ATClient 尚未初始化无法获取4G时间")
else:
print("[TIME] ATClient 尚未初始化无法获取4G时间")
return None
resp = hardware_manager.at_client.send("AT+CCLK?", "OK", 3000)
if not resp or "+CCLK:" not in resp:
logger = logger_manager.logger
if logger:
logger.warning(f"[TIME] 未获取到时间响应: {resp}")
else:
print(f"[TIME] 未获取到时间响应: {resp}")
return None
# 解析并转换时区
dt = parse_4g_time(resp, timezone_offset)
return dt
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 获取4G时间异常: {e}")
else:
print(f"[TIME] 获取4G时间异常: {e}")
return None
def sync_system_time_from_4g(timezone_offset=8):
"""
从4G模块同步时间到系统
Args:
timezone_offset: 时区偏移小时默认8中国时区
Returns:
bool: 是否成功
"""
dt = get_time_from_4g(timezone_offset)
if not dt:
return False
try:
# 转换为系统 date 命令需要的格式
time_str = dt.strftime('%Y-%m-%d %H:%M:%S')
# 设置系统时间
cmd = f'date -s "{time_str}" 2>&1'
result = os.system(cmd)
if result == 0:
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 系统时间已设置为: {time_str}")
else:
print(f"[TIME] 系统时间已设置为: {time_str}")
# 可选:同步到硬件时钟
try:
os.system('hwclock -w 2>/dev/null')
logger = logger_manager.logger
if logger:
logger.info("[TIME] 已同步到硬件时钟")
except:
pass
return True
else:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 设置系统时间失败,退出码: {result}")
else:
print(f"[TIME] 设置系统时间失败,退出码: {result}")
return False
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 同步系统时间异常: {e}")
else:
print(f"[TIME] 同步系统时间异常: {e}")
return False

13
version.py Normal file
View File

@@ -0,0 +1,13 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用版本号
每次 OTA 更新时,只需要更新这个文件中的版本号
"""
VERSION = '1.1.1'

360
vision.py Normal file
View File

@@ -0,0 +1,360 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视觉检测模块
提供靶心检测、距离估算、图像保存等功能
"""
import cv2
import numpy as np
import os
import math
from maix import image
import globals
import config
from logger_manager import logger_manager
def detect_circle_v3(frame, laser_point=None):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
如果提供 laser_point会选择最接近激光点的目标
Args:
frame: 图像帧
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
Returns:
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
"""
img_cv = image.image2cv(frame, False, False)
best_center = best_radius = best_radius1 = method = None
ellipse_params = None
# HSV 黄色掩码检测(模糊靶心)
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
# 调整饱和度策略:稍微增强,不要过度
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 调整形态学操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 存储所有有效的黄色-红色组合
valid_targets = []
if contours_yellow:
for cnt_yellow in contours_yellow:
area = cv2.contourArea(cnt_yellow)
perimeter = cv2.arcLength(cnt_yellow, True)
# 计算圆度
if perimeter > 0:
circularity = (4 * np.pi * area) / (perimeter * perimeter)
else:
circularity = 0
logger = logger_manager.logger
if area > 50 and circularity > 0.7:
if logger:
logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}")
# 尝试拟合椭圆
yellow_center = None
yellow_radius = None
yellow_ellipse = None
if len(cnt_yellow) >= 5:
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
yellow_ellipse = ((x, y), (width, height), angle)
axes_minor = min(width, height)
radius = axes_minor / 2
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
else:
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
yellow_ellipse = None
# 如果检测到黄色圆圈,再检测红色圆圈进行验证
if yellow_center and yellow_radius:
# HSV 红色掩码检测红色在HSV中跨越0度需要两个范围
# 红色范围1: 0-10度接近0度的红色
lower_red1 = np.array([0, 80, 0])
upper_red1 = np.array([10, 255, 255])
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
# 红色范围2: 170-180度接近180度的红色
lower_red2 = np.array([170, 80, 0])
upper_red2 = np.array([180, 255, 255])
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
# 合并两个红色掩码
mask_red = cv2.bitwise_or(mask_red1, mask_red2)
# 形态学操作
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
found_valid_red = False
if contours_red:
# 找到所有符合条件的红色圆圈
for cnt_red in contours_red:
area_red = cv2.contourArea(cnt_red)
perimeter_red = cv2.arcLength(cnt_red, True)
if perimeter_red > 0:
circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red)
else:
circularity_red = 0
# 红色圆圈也应该有一定的圆度
if area_red > 50 and circularity_red > 0.6:
# 计算红色圆圈的中心和半径
if len(cnt_red) >= 5:
(x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red)
radius_red = min(w_red, h_red) / 2
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
else:
(x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red)
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
# 计算黄色和红色圆心的距离
if red_center:
dx = yellow_center[0] - red_center[0]
dy = yellow_center[1] - red_center[1]
distance = np.sqrt(dx*dx + dy*dy)
# 圆心距离阈值应该小于黄色半径的某个倍数比如1.5倍)
max_distance = yellow_radius * 1.5
# 红色圆圈应该比黄色圆圈大(外圈)
if distance < max_distance and red_radius > yellow_radius * 0.8:
found_valid_red = True
logger = logger_manager.logger
if logger:
logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}")
# 记录这个有效目标
valid_targets.append({
'center': yellow_center,
'radius': yellow_radius,
'ellipse': yellow_ellipse,
'area': area
})
break
if not found_valid_red:
logger = logger_manager.logger
if logger:
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
# 从所有有效目标中选择最佳目标
if valid_targets:
if laser_point:
# 如果有激光点,选择最接近激光点的目标
best_target = None
min_distance = float('inf')
for target in valid_targets:
dx = target['center'][0] - laser_point[0]
dy = target['center'][1] - laser_point[1]
distance = np.sqrt(dx*dx + dy*dy)
if distance < min_distance:
min_distance = distance
best_target = target
if best_target:
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated_laser_selected"
best_radius1 = best_radius * 5
else:
# 如果没有激光点,选择面积最大的目标
best_target = max(valid_targets, key=lambda t: t['area'])
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated"
best_radius1 = best_radius * 5
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
def estimate_distance(pixel_radius):
"""根据像素半径估算实际距离(单位:米)"""
if not pixel_radius:
return 0.0
return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0
def compute_laser_position(circle_center, laser_point, radius, method):
"""计算激光相对于靶心的偏移量(单位:厘米)"""
if not all([circle_center, radius, method]):
return None, None
cx, cy = circle_center
lx, ly = laser_point
# 根据检测方法动态调整靶心物理半径(简化模型)
circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
dx = lx - cx
dy = ly - cy
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
def save_shot_image(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, photo_dir=None):
"""
保存射击图像(带标注)
即使没有检测到靶心也会保存图像,文件名会标注 "no_target"
确保保存的图像总是包含激光十字线
Args:
result_img: 处理后的图像对象(可能已经包含激光十字线或检测标注)
center: 靶心中心坐标 (x, y),可能为 None未检测到靶心
radius: 靶心半径,可能为 None未检测到靶心
method: 检测方法,可能为 None未检测到靶心
ellipse_params: 椭圆参数 ((center, (width, height), angle)) 或 None
laser_point: 激光点坐标 (x, y)
distance_m: 距离(米),可能为 None未检测到靶心
photo_dir: 照片存储目录如果为None则使用 config.PHOTO_DIR
Returns:
str: 保存的文件路径,如果保存失败或未启用则返回 None
"""
# 检查是否启用图像保存
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
# 确保照片目录存在
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except:
pass
# 生成文件名
# 统计所有图片文件(包括 .bmp 和 .jpg
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
img_count = 0
x, y = laser_point
# 如果未检测到靶心,在文件名中标注
if center is None or radius is None:
method_str = "no_target"
distance_str = "000"
else:
method_str = method or "unknown"
distance_str = str(round((distance_m or 0.0) * 100))
filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
if center and radius:
logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
logger.info(f"椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°")
else:
logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y})")
# 转换图像为 OpenCV 格式以便绘制
img_cv = image.image2cv(result_img, False, False)
# 确保激光十字线被绘制使用OpenCV在图像上绘制确保可见性
laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
thickness = max(config.LASER_THICKNESS, 2) # 至少2像素宽确保可见
length = max(config.LASER_LENGTH, 10) # 至少10像素长
# 绘制激光十字线(水平线)
cv2.line(img_cv,
(int(x - length), int(y)),
(int(x + length), int(y)),
laser_color, thickness)
# 绘制激光十字线(垂直线)
cv2.line(img_cv,
(int(x), int(y - length)),
(int(x), int(y + length)),
laser_color, thickness)
# 绘制激光点
cv2.circle(img_cv, (int(x), int(y)), max(thickness, 3), laser_color, -1)
# 如果检测到靶心,绘制靶心标注
if center and radius:
cx, cy = center
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
# 绘制椭圆
cv2.ellipse(img_cv,
(cx_ell, cy_ell),
(int(width/2), int(height/2)),
angle,
0, 360,
(0, 255, 0),
2)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
# 绘制短轴
minor_length = min(width, height) / 2
minor_angle = angle + 90 if width >= height else angle
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2)
else:
# 绘制圆形靶心
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
# 如果检测到靶心,绘制从激光点到靶心的连线(可选,用于可视化偏移)
cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1)
# 转换回 MaixPy 图像格式并保存
result_img = image.cv2image(img_cv, False, False)
result_img.save(filename)
if logger:
if center and radius:
logger.debug(f"图像已保存(含靶心标注): {filename}")
else:
logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}")
return filename
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"保存图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None