Compare commits

..

22 Commits

Author SHA1 Message Date
gcw_4spBpAfv
8ce8831315 v1.2.2 2026-01-24 11:05:03 +08:00
gcw_4spBpAfv
28fb62e5d6 v1.2.1 2026-01-23 11:28:40 +08:00
gcw_4spBpAfv
42bfdd033c invole c++ 2026-01-22 17:55:11 +08:00
gcw_4spBpAfv
945077a453 refind logger 2026-01-20 18:40:54 +08:00
gcw_4spBpAfv
0ce140a210 v1.1.5 2026-01-20 11:25:17 +08:00
huangzhenwei2
83fe0776eb update laser cabration 2026-01-13 00:01:39 +08:00
huangzhenwei2
a0019b8b0e fix the laser point x,y 2026-01-12 20:53:23 +08:00
huangzhenwei2
2a0534ac62 update laser estismate 2026-01-12 18:53:01 +08:00
huangzhenwei2
3c45fba0f5 update distance estismate by laser, both distance value are uploaded 2026-01-12 18:06:04 +08:00
huangzhenwei2
708925ab41 refine the code to different part 2026-01-12 11:39:27 +08:00
huangzhenwei2
92ad32bb8e refine ota 2025-12-30 16:40:01 +08:00
huangzhenwei2
669d032f96 ota with 4g 2025-12-30 16:23:17 +08:00
b37c492930 feat: 4g模块按行升级 2025-12-30 15:49:16 +08:00
huangzhenwei2
46757e848f update ota and tcp msg control 2025-12-30 09:21:58 +08:00
huangzhenwei2
201de84ad0 conflict read 2025-12-28 18:41:36 +08:00
huangzhenwei2
85a5ff9ff0 conflict merge 2025-12-28 16:30:11 +08:00
huangzhenwei2
e712e11ea0 ota update 2025-12-28 16:22:41 +08:00
b552d20a46 fix:测距 2025-12-28 16:19:00 +08:00
21cec260b8 fix: 修改电量不固定 2025-12-26 15:12:47 +08:00
5a98bf2e85 pref: 计算环数代码 2025-12-26 14:04:43 +08:00
huangzhenwei2
f11b31c09c update hearbeat 2025-12-26 11:47:33 +08:00
0b18ec353c temp: 2025-12-25 16:08:42 +08:00
29 changed files with 32036 additions and 1635 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/cpp_ext/build/
/.cursor/
/dist/

79
S99archery Normal file
View File

@@ -0,0 +1,79 @@
#!/bin/sh
# /etc/init.d/S99archery
# 系统启动时处理致命错误恢复(仅处理无法启动的情况)
# 注意:应用的启动由系统自动启动机制处理(通过 auto_start.txt
# 功能:
# 1. 处理致命错误(无法启动)- 恢复 main.py
# 2. 如果重启次数超过阈值,恢复 main.py 并重启系统
APP_DIR="/maixapp/apps/t11"
MAIN_PY="$APP_DIR/main.py"
PENDING_FILE="$APP_DIR/ota_pending.json"
BACKUP_BASE="$APP_DIR/backups"
# 进入应用目录
cd "$APP_DIR" || exit 0
# 检查 pending 文件,如果存在且超过重启次数,恢复 main.py处理致命错误
if [ -f "$PENDING_FILE" ]; then
echo "[S99] 检测到 ota_pending.json检查重启计数..."
# 尝试从JSON中提取重启计数使用grep简单提取
RESTART_COUNT=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"restart_count":[0-9]*' | grep -o '[0-9]*' || echo "0")
MAX_RESTARTS=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"max_restarts":[0-9]*' | grep -o '[0-9]*' || echo "3")
if [ -n "$RESTART_COUNT" ] && [ "$RESTART_COUNT" -ge "$MAX_RESTARTS" ]; then
echo "[S99] 检测到重启次数 ($RESTART_COUNT) 超过阈值 ($MAX_RESTARTS),恢复 main.py..."
# 尝试从JSON中提取备份目录
BACKUP_DIR=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"backup_dir":"[^"]*"' | grep -o '/[^"]*' || echo "")
if [ -n "$BACKUP_DIR" ] && [ -f "$BACKUP_DIR/main.py" ]; then
# 使用指定的备份目录
echo "[S99] 从备份目录恢复: $BACKUP_DIR/main.py"
cp "$BACKUP_DIR/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
else
# 查找最新的备份目录
LATEST_BACKUP=$(ls -dt "$BACKUP_BASE"/backup_* 2>/dev/null | head -1)
if [ -n "$LATEST_BACKUP" ] && [ -f "$LATEST_BACKUP/main.py" ]; then
echo "[S99] 从最新备份恢复: $LATEST_BACKUP/main.py"
cp "$LATEST_BACKUP/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
else
# 如果没有备份目录,尝试使用 main.py.bak
if [ -f "$APP_DIR/main.py.bak" ]; then
echo "[S99] 从 main.py.bak 恢复"
cp "$APP_DIR/main.py.bak" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py"
fi
fi
fi
# 恢复后重置重启计数,避免循环恢复
# 注意:不在这里删除 pending 文件,让 main.py 在心跳成功后删除
# 但是重置重启计数,以便恢复后的版本可以重新开始计数
python3 -c "
import json, os
try:
pending_path = '$PENDING_FILE'
if os.path.exists(pending_path):
with open(pending_path, 'r', encoding='utf-8') as f:
d = json.load(f)
d['restart_count'] = 0 # 重置重启计数
with open(pending_path, 'w', encoding='utf-8') as f:
json.dump(d, f)
print('[S99] 已重置重启计数为 0')
except Exception as e:
print(f'[S99] 重置重启计数失败: {e}')
" 2>/dev/null || echo "[S99] 无法重置重启计数可能需要Python支持"
echo "[S99] 已恢复 main.py重启系统..."
echo "[S99] 注意pending 文件将在心跳成功后由 main.py 删除"
sleep 2
reboot
exit 0
fi
fi
# 不启动应用,让系统自动启动机制处理
# 这个脚本只负责处理致命错误恢复
exit 0

View File

@@ -1,8 +1,24 @@
id: t11
name: t11
version: 1.0.2
version: 1.2.1
author: t11
icon: ''
desc: t11
files:
- app.yaml
- archery_netcore.cpython-311-riscv64-linux-gnu.so
- at_client.py
- camera_manager.py
- config.py
- hardware.py
- laser_manager.py
- logger_manager.py
- main.py
- network.py
- ota_manager.py
- power.py
- shoot_manager.py
- shot_id_generator.py
- time_sync.py
- version.py
- vision.py

307
at_client.py Normal file
View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AT客户端模块
负责4G模块的AT命令通信和URC解析
"""
import _thread
from maix import time
import re
import threading
class ATClient:
"""
单读者 AT/URC 客户端:唯一读取 uart4g避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。
- send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect
- pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload已按长度裁剪
- pop_http_event() : 获取 +MHTTPURC 事件header/content
"""
def __init__(self, uart_obj):
self.uart = uart_obj
self._cmd_lock = threading.Lock()
self._q_lock = threading.Lock()
self._rx = b""
self._tcp_payloads = []
self._http_events = []
# 当前命令等待状态(仅允许单命令 in-flight
self._waiting = False
self._expect = b"OK"
self._resp = b""
self._running = False
def start(self):
if self._running:
return
self._running = True
_thread.start_new_thread(self._reader_loop, ())
def stop(self):
self._running = False
def flush(self):
"""清空内部缓存与队列(用于 OTA/异常恢复)"""
with self._q_lock:
self._rx = b""
self._tcp_payloads.clear()
self._http_events.clear()
self._resp = b""
def pop_tcp_payload(self):
with self._q_lock:
if self._tcp_payloads:
return self._tcp_payloads.pop(0)
return None
def pop_http_event(self):
with self._q_lock:
if self._http_events:
return self._http_events.pop(0)
return None
def _push_tcp_payload(self, payload: bytes):
# 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock
# 这里不要再次 acquire锁不可重入会死锁
self._tcp_payloads.append(payload)
def _push_http_event(self, ev):
# 同上:避免在 _reader_loop 持锁期间二次 acquire
self._http_events.append(ev)
def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000):
"""
发送 AT 命令并等待 expect子串匹配
注意expect=">" 用于等待 prompt。
"""
expect_b = expect.encode() if isinstance(expect, str) else expect
with self._cmd_lock:
# 初始化等待
self._waiting = True
self._expect = expect_b
self._resp = b""
# 发送
if cmd:
# 注意:这里不要再用 uart4g_lock否则外层已经持锁时会死锁
# 写入由 _cmd_lock 串行化即可。
self.uart.write((cmd + "\r\n").encode())
t0 = time.ticks_ms()
while abs(time.ticks_diff(time.ticks_ms(), t0)) < timeout_ms:
if (not self._waiting) or (self._expect in self._resp):
self._waiting = False
break
time.sleep_ms(5)
# 超时也返回已收集内容(便于诊断)
self._waiting = False
try:
return self._resp.decode(errors="ignore")
except:
return str(self._resp)
def _find_urc_tag(self, tag: bytes):
"""
只在"真正的 URC 边界"查找 tag避免误命中 HTTP payload 内容。
规则tag 必须出现在 buffer 开头,或紧跟在 b"\\r\\n" 后面。
"""
try:
i = 0
rx = self._rx
while True:
j = rx.find(tag, i)
if j < 0:
return -1
if j == 0:
return 0
if j >= 2 and rx[j - 2:j] == b"\r\n":
return j
i = j + 1
except:
return -1
def _parse_mipurc_rtcp(self):
"""
解析:+MIPURC: "rtcp",<link_id>,<len>,<payload...>
之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。
"""
prefix = b'+MIPURC: "rtcp",'
i = self._find_urc_tag(prefix)
if i < 0:
return False
# 丢掉前置噪声
if i > 0:
self._rx = self._rx[i:]
i = 0
j = len(prefix)
# 解析 link_id
k = j
while k < len(self._rx) and 48 <= self._rx[k] <= 57:
k += 1
if k == j or k >= len(self._rx):
return False
if self._rx[k:k+1] != b",":
self._rx = self._rx[1:]
return True
try:
link_id = int(self._rx[j:k].decode())
except:
self._rx = self._rx[1:]
return True
# 解析 len
j2 = k + 1
k2 = j2
while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57:
k2 += 1
if k2 == j2 or k2 >= len(self._rx):
return False
if self._rx[k2:k2+1] != b",":
self._rx = self._rx[1:]
return True
try:
n = int(self._rx[j2:k2].decode())
except:
self._rx = self._rx[1:]
return True
payload_start = k2 + 1
payload_end = payload_start + n
if len(self._rx) < payload_end:
return False # payload 未收齐
payload = self._rx[payload_start:payload_end]
# 把 link_id 一起带上,便于上层过滤(如果需要)
self._push_tcp_payload((link_id, payload))
self._rx = self._rx[payload_end:]
return True
def _parse_mhttpurc_header(self):
tag = b'+MHTTPURC: "header",'
i = self._find_urc_tag(tag)
if i < 0:
return False
if i > 0:
self._rx = self._rx[i:]
i = 0
# header: +MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text...>
j = len(tag)
comma_count = 0
k = j
while k < len(self._rx) and comma_count < 3:
if self._rx[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 3:
return False
prefix = self._rx[:k]
m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
self._rx = self._rx[1:]
return True
urc_id = int(m.group(1))
code = int(m.group(2))
hdr_len = int(m.group(3))
text_start = k
text_end = text_start + hdr_len
if len(self._rx) < text_end:
return False
hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore")
self._push_http_event(("header", urc_id, code, hdr_text))
self._rx = self._rx[text_end:]
return True
def _parse_mhttpurc_content(self):
tag = b'+MHTTPURC: "content",'
i = self._find_urc_tag(tag)
if i < 0:
return False
if i > 0:
self._rx = self._rx[i:]
i = 0
# content: +MHTTPURC: "content",<id>,<total>,<sum>,<cur>,<payload...>
j = len(tag)
comma_count = 0
k = j
while k < len(self._rx) and comma_count < 4:
if self._rx[k:k+1] == b",":
comma_count += 1
k += 1
if comma_count < 4:
return False
prefix = self._rx[:k]
m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix)
if not m:
self._rx = self._rx[1:]
return True
urc_id = int(m.group(1))
total_len = int(m.group(2))
sum_len = int(m.group(3))
cur_len = int(m.group(4))
payload_start = k
payload_end = payload_start + cur_len
if len(self._rx) < payload_end:
return False
payload = self._rx[payload_start:payload_end]
self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload))
self._rx = self._rx[payload_end:]
return True
def _reader_loop(self):
while self._running:
# 关键UART 驱动偶发 read failed必须兜住否则线程挂了 OTA/TCP 都会卡死
try:
d = self.uart.read(4096) # 8192 在一些驱动上更容易触发 read failed
except Exception as e:
try:
print("[ATClient] uart read failed:", e)
except:
pass
time.sleep_ms(50)
continue
if not d:
time.sleep_ms(1)
continue
with self._q_lock:
self._rx += d
if self._waiting:
self._resp += d
while True:
progressed = (
self._parse_mipurc_rtcp()
or self._parse_mhttpurc_header()
or self._parse_mhttpurc_content()
)
if not progressed:
break
# 使用 ota_manager 访问 ota_in_progress
try:
from ota_manager import ota_manager
ota_flag = ota_manager.ota_in_progress
except:
ota_flag = False
has_http_hint = (b"+MHTTP" in self._rx) or (b"+MHTTPURC" in self._rx)
if ota_flag or has_http_hint:
if len(self._rx) > 512 * 1024:
self._rx = self._rx[-256 * 1024:]
else:
if len(self._rx) > 16384:
self._rx = self._rx[-4096:]

137
camera_manager.py Normal file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
相机管理器模块
提供相机和显示的统一管理和线程安全访问
"""
import threading
import config
from logger_manager import logger_manager
class CameraManager:
"""相机管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(CameraManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有对象
self._camera = None
self._display = None
# 线程安全锁
self._camera_lock = threading.Lock()
self._display_lock = threading.Lock()
# 相机配置
self._camera_width = 640
self._camera_height = 480
self._initialized = True
# ==================== 初始化方法 ====================
@property
def logger(self):
"""获取 logger 对象"""
return logger_manager.logger
def init_camera(self, width=640, height=480):
"""初始化相机"""
if self._camera is not None:
return self._camera
from maix import camera
self._camera_width = width
self._camera_height = height
with self._camera_lock:
if self._camera is None:
self._camera = camera.Camera(width, height)
return self._camera
def init_display(self):
"""初始化显示"""
if self._display is not None:
return self._display
from maix import display
with self._display_lock:
if self._display is None:
self._display = display.Display()
return self._display
# ==================== 访问方法 ====================
@property
def camera(self):
"""获取相机实例(懒加载)"""
if self._camera is None:
self.init_camera()
return self._camera
@property
def display(self):
"""获取显示实例(懒加载)"""
if self._display is None:
self.init_display()
return self._display
# ==================== 业务方法 ====================
def read_frame(self):
"""
线程安全地读取一帧图像
Returns:
frame: 图像帧对象
"""
with self._camera_lock:
if self._camera is None:
self.init_camera()
return self._camera.read()
def show(self, image):
"""
线程安全地显示图像
Args:
image: 要显示的图像对象
"""
with self._display_lock:
if self._display is None:
self.init_display()
self._display.show(image)
def release(self):
"""释放相机和显示资源(如果需要)"""
with self._camera_lock:
if self._camera is not None:
# MaixPy 的 Camera 可能不需要显式释放,但可以在这里清理
self._camera = None
with self._display_lock:
if self._display is not None:
# MaixPy 的 Display 可能不需要显式释放
self._display = None
# 创建全局单例实例
camera_manager = CameraManager()

140
config.py Normal file
View File

@@ -0,0 +1,140 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
系统配置常量
这些值在程序运行期间基本不变,或只在配置时改变
"""
from version import VERSION
# ==================== 应用配置 ====================
APP_VERSION = VERSION
APP_DIR = "/maixapp/apps/t11"
LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py"
# ==================== 服务器配置 ====================
# SERVER_IP = "stcp.shelingxingqiu.com"
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒)
# ===== TCP over SSL(TLS) 配置 =====
USE_TCP_SSL = False # True=按手册走 MSSLCFG/MIPCFG 绑定 SSL
TCP_LINK_ID = 2 #
TCP_SSL_PORT = 443 # TLS 端口(不一定必须 443以服务器为准
# SSL profile
SSL_ID = 1 # ssl_id=1
SSL_AUTH_MODE = 1 # 1=单向认证验证服务器2=双向
SSL_VERIFY_MODE = 1 # 1=写入并使用 CA 证书0=不验(仅测试加密,风险大)
SSL_CERT_FILENAME = "test.cer" # 模组里证书名MSSLCERTWR / MSSLCFG="cert" 用)
SSL_CERT_PATH = "/root/test.cer" # 设备文件系统里 CA 证书路径(你自己放进去)
# MIPOPEN 末尾的参数在不同固件里含义可能不同;按你手册例子保留
MIPOPEN_TAIL = ",,0"
# ==================== 文件路径配置 ====================
CONFIG_FILE = "/root/laser_config.json"
LOG_FILE = "/maixapp/apps/t11/app.log"
BACKUP_BASE = "/maixapp/apps/t11/backups"
# ==================== 硬件配置 ====================
# WiFi模块开关True=有WiFi模块False=无WiFi模块
HAS_WIFI_MODULE = True # 根据实际硬件情况设置
# UART配置
UART4G_DEVICE = "/dev/ttyS2"
UART4G_BAUDRATE = 115200
DISTANCE_SERIAL_DEVICE = "/dev/ttyS1"
DISTANCE_SERIAL_BAUDRATE = 9600
# I2C配置根据WiFi模块开关自动选择
# 无WiFi模块I2C_BUS_NUM = 1引脚P18(I2C1_SCL), P21(I2C1_SDA)
# 有WiFi模块I2C_BUS_NUM = 5引脚A15(I2C5_SCL), A27(I2C5_SDA)
I2C_BUS_NUM = 5 if HAS_WIFI_MODULE else 1
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CURRENT = 0x04 # 电流寄存器
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# ADC配置
ADC_CHANNEL = 0
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# ==================== 激光配置 ====================
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
DISTANCE_QUERY_CMD = bytes([0xAA, MODULE_ADDR, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21]) # 激光测距查询命令
DISTANCE_RESPONSE_LEN = 13 # 激光测距响应数据长度(字节)
DEFAULT_LASER_POINT = (320, 252) # 默认激光中心点
# 硬编码激光点配置
HARDCODE_LASER_POINT = True # 是否使用硬编码的激光点True=使用硬编码值False=使用校准值)
HARDCODE_LASER_POINT_VALUE = (320, 252) # 硬编码的激光点坐标(315, 263) # # 硬编码的激光点坐标 (x, y)
# 激光点检测配置
LASER_DETECTION_THRESHOLD = 140 # 红色通道阈值默认120可调整范围建议100-150
LASER_RED_RATIO = 1.5 # 红色相对于绿色/蓝色的倍数要求默认1.5可调整范围建议1.3-2.0
LASER_SEARCH_RADIUS = 50 # 搜索半径像素从图像中心开始搜索默认20限制激光点不能偏离中心太远
LASER_MAX_DISTANCE_FROM_CENTER = 50 # 激光点距离中心的最大允许距离像素超过此距离则拒绝默认20
LASER_OVEREXPOSED_THRESHOLD = 200 # 过曝红色判断阈值默认200接近白色时的阈值
LASER_OVEREXPOSED_DIFF = 10 # 过曝红色时r 与 g/b 的最小差值默认10
LASER_REQUIRE_IN_ELLIPSE = False # 是否要求激光点必须在黄心椭圆内True=必须False=不要求)
LASER_USE_ELLIPSE_FITTING = True # 是否使用椭圆拟合方法查找激光点True=椭圆拟合更准确False=最亮点方法)
LASER_MIN_AREA = 5 # 激光点区域的最小面积像素小于此值认为是噪声默认5
LASER_DRAW_ELLIPSE = True # 是否在图像上绘制激光点的拟合椭圆True=绘制False=不绘制)
# ==================== 视觉检测配置 ====================
FOCAL_LENGTH_PIX = 2250.0 # 焦距(像素)
REAL_RADIUS_CM = 20 # 靶心实际半径(厘米)
# 图像清晰度检测配置
IMAGE_SHARPNESS_THRESHOLD = 100.0 # 清晰度阈值,低于此值认为图像模糊
# 清晰图像通常 > 200模糊图像通常 < 100
# 激光与摄像头物理位置配置
LASER_CAMERA_OFFSET_CM = 1.4 # 激光在摄像头下方的物理距离(厘米),正值表示激光在摄像头下方
IMAGE_CENTER_X = 320 # 图像中心 X 坐标
IMAGE_CENTER_Y = 240 # 图像中心 Y 坐标
# ==================== 显示配置 ====================
LASER_COLOR = (0, 255, 0) # RGB颜色
LASER_THICKNESS = 1
LASER_LENGTH = 2
# ==================== 图像保存配置 ====================
SAVE_IMAGE_ENABLED = True # 是否保存图像True=保存False=不保存)
PHOTO_DIR = "/root/phot" # 照片存储目录
MAX_IMAGES = 1000
# ==================== OTA配置 ====================
MAX_BACKUPS = 5
LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB
LOG_BACKUP_COUNT = 5
# ==================== 引脚映射配置 ====================
# 无WiFi模块的引脚映射I2C1
PIN_MAPPINGS_NO_WIFI = {
"A18": "UART1_RX",
"A19": "UART1_TX",
"A29": "UART2_RX",
"A28": "UART2_TX",
"P18": "I2C1_SCL",
"P21": "I2C1_SDA",
}
# 有WiFi模块的引脚映射I2C5
PIN_MAPPINGS_WITH_WIFI = {
"A18": "UART1_RX",
"A19": "UART1_TX",
"A29": "UART2_RX",
"A28": "UART2_TX",
"A15": "I2C5_SCL",
"A27": "I2C5_SDA",
}
# 根据WiFi模块开关选择引脚映射
PIN_MAPPINGS = PIN_MAPPINGS_WITH_WIFI if HAS_WIFI_MODULE else PIN_MAPPINGS_NO_WIFI

68
cpp_ext/CMakeLists.txt Normal file
View File

@@ -0,0 +1,68 @@
cmake_minimum_required(VERSION 3.16)
project(archery_netcore CXX)
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR riscv64)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
if(NOT DEFINED PY_INCLUDE_DIR)
message(FATAL_ERROR "PY_INCLUDE_DIR not set")
endif()
if(NOT DEFINED PY_LIB)
message(FATAL_ERROR "PY_LIB not set")
endif()
if(NOT DEFINED PY_EXT_SUFFIX)
message(FATAL_ERROR "PY_EXT_SUFFIX not set")
endif()
if(NOT DEFINED MAIXCDK_PATH)
message(FATAL_ERROR "MAIXCDK_PATH not set (need components/3rd_party/pybind11)")
endif()
add_library(archery_netcore MODULE
archery_netcore.cpp
native_logger.cpp
)
target_include_directories(archery_netcore PRIVATE
"${PY_INCLUDE_DIR}"
"${MAIXCDK_PATH}/components/3rd_party/pybind11/pybind11/include"
"${MAIXCDK_PATH}/components/3rd_party/openssl/include"
"${CMAKE_CURRENT_SOURCE_DIR}/third_party" # 添加 nlohmann/json 路径
)
# 尽量减少 .so 体积并增加逆向成本
target_compile_options(archery_netcore PRIVATE
-Os
-ffunction-sections
-fdata-sections
-fvisibility=hidden
-fvisibility-inlines-hidden
)
target_link_options(archery_netcore PRIVATE
-Wl,--gc-sections
-Wl,-s
)
set_target_properties(archery_netcore PROPERTIES
PREFIX ""
SUFFIX "${PY_EXT_SUFFIX}"
)
# OpenSSL (for AES-256-GCM decrypt)
# 使用 MaixCDK 提供的 OpenSSL 库(在 so/maixcam 目录下)
set(OPENSSL_LIB_DIR "${MAIXCDK_PATH}/components/3rd_party/openssl/so/maixcam")
if(EXISTS "${OPENSSL_LIB_DIR}/libcrypto.so")
target_link_directories(archery_netcore PRIVATE "${OPENSSL_LIB_DIR}")
target_link_libraries(archery_netcore PRIVATE "${PY_LIB}" crypto ssl)
message(STATUS "Using OpenSSL from MaixCDK: ${OPENSSL_LIB_DIR}")
else()
# Fallback: 尝试 find_package 或系统库
find_package(OpenSSL QUIET)
if(OpenSSL_FOUND)
target_link_libraries(archery_netcore PRIVATE "${PY_LIB}" OpenSSL::Crypto OpenSSL::SSL)
else()
message(WARNING "OpenSSL not found in MaixCDK, trying system libraries (may fail)")
target_link_libraries(archery_netcore PRIVATE "${PY_LIB}" crypto ssl)
endif()
endif()

404
cpp_ext/archery_netcore.cpp Normal file
View File

@@ -0,0 +1,404 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h> // 支持 std::vector, std::map 等
#include <nlohmann/json.hpp>
#include <cstring>
#include <cstdint>
#include <vector>
#include <string>
#include <fstream>
#include <array>
#include <algorithm>
#include <openssl/evp.h>
#include "native_logger.hpp"
namespace py = pybind11;
using json = nlohmann::json;
namespace {
// 配置项
const std::string _cfg_server_ip = "www.shelingxingqiu.com";
const int _cfg_server_port = 50005;
// OTA AEAD format: MAGIC(7) | nonce(12) | ciphertext(N) | tag(16)
constexpr const char* kOtaMagic = "AROTAE1";
constexpr size_t kOtaMagicLen = 7;
constexpr size_t kGcmNonceLen = 12;
constexpr size_t kGcmTagLen = 16;
// 固定 32-byte AES-256-GCM key提高被直接查看的成本不是绝对安全
// 注意:需要与打包端传入的 --aead-key-hex 保持一致。
static std::array<uint8_t, 32> ota_key_bytes() {
// 简单拆分混淆key = a XOR b
static const std::array<uint8_t, 32> a = {
0x92,0x99,0x4d,0x06,0x6f,0xb6,0xa6,0x3d,0x85,0x08,0xbe,0x73,0x5e,0x73,0x4d,0x8a,
0x53,0x88,0xe6,0x99,0xfc,0x10,0x29,0xb9,0x16,0x9b,0xe7,0x0c,0x65,0x21,0x1c,0xce
};
static const std::array<uint8_t, 32> b = {
0xcf,0x60,0xa2,0xc2,0x32,0x7a,0x61,0xb0,0x4c,0x8e,0x8a,0x62,0x31,0xc7,0x82,0xff,
0xec,0xac,0xa1,0x04,0x2a,0x4d,0xaa,0xf2,0xb0,0x5b,0x39,0x2b,0xf4,0xb3,0xad,0xad
};
std::array<uint8_t, 32> k{};
for (size_t i = 0; i < k.size(); i++) k[i] = static_cast<uint8_t>(a[i] ^ b[i]);
return k;
}
}
// 定义获取配置的函数
py::dict get_config() {
py::dict config;
config["SERVER_IP"] = _cfg_server_ip;
config["SERVER_PORT"] = _cfg_server_port;
return config;
}
// 辅助函数:将 py::dict 转为 nlohmann::json
json py_dict_to_json(py::dict d) {
json j;
for (auto item : d) {
std::string key = py::str(item.first);
py::object val = py::reinterpret_borrow<py::object>(item.second);
if (py::isinstance<py::dict>(val)) {
j[key] = py_dict_to_json(py::cast<py::dict>(val));
} else if (py::isinstance<py::list>(val)) {
py::list py_list = py::cast<py::list>(val);
json arr = json::array();
for (auto elem : py_list) {
py::object elem_obj = py::reinterpret_borrow<py::object>(elem);
if (py::isinstance<py::dict>(elem_obj)) {
arr.push_back(py_dict_to_json(py::cast<py::dict>(elem_obj)));
} else if (py::isinstance<py::int_>(elem_obj)) {
arr.push_back(py::cast<int64_t>(elem_obj));
} else if (py::isinstance<py::float_>(elem_obj)) {
arr.push_back(py::cast<double>(elem_obj));
} else {
arr.push_back(py::str(elem_obj));
}
}
j[key] = arr;
} else if (py::isinstance<py::int_>(val)) {
j[key] = py::cast<int64_t>(val);
} else if (py::isinstance<py::float_>(val)) {
j[key] = py::cast<double>(val);
} else if (py::isinstance<py::bool_>(val)) {
j[key] = py::cast<bool>(val);
} else if (val.is_none()) {
j[key] = nullptr;
} else {
j[key] = py::str(val);
}
}
return j;
}
// 辅助函数:将 nlohmann::json 转为 py::dict
py::dict json_to_py_dict(const json& j) {
py::dict d;
if (j.is_object()) {
for (auto& item : j.items()) {
std::string key = item.key();
json val = item.value();
if (val.is_object()) {
d[py::str(key)] = json_to_py_dict(val);
} else if (val.is_array()) {
py::list py_list;
for (auto& elem : val) {
if (elem.is_object()) {
py_list.append(json_to_py_dict(elem));
} else if (elem.is_number_integer()) {
py_list.append(py::int_(elem.get<int64_t>()));
} else if (elem.is_number_float()) {
py_list.append(py::float_(elem.get<double>()));
} else if (elem.is_boolean()) {
py_list.append(py::bool_(elem.get<bool>()));
} else if (elem.is_null()) {
py_list.append(py::none());
} else {
py_list.append(py::str(elem.get<std::string>()));
}
}
d[py::str(key)] = py_list;
} else if (val.is_number_integer()) {
d[py::str(key)] = py::int_(val.get<int64_t>());
} else if (val.is_number_float()) {
d[py::str(key)] = py::float_(val.get<double>());
} else if (val.is_boolean()) {
d[py::str(key)] = py::bool_(val.get<bool>());
} else if (val.is_null()) {
d[py::str(key)] = py::none();
} else {
d[py::str(key)] = py::str(val.get<std::string>());
}
}
}
return d;
}
static bool read_file_all(const std::string& path, std::vector<uint8_t>& out) {
std::ifstream ifs(path, std::ios::binary);
if (!ifs) return false;
ifs.seekg(0, std::ios::end);
std::streampos size = ifs.tellg();
if (size <= 0) return false;
ifs.seekg(0, std::ios::beg);
out.resize(static_cast<size_t>(size));
if (!ifs.read(reinterpret_cast<char*>(out.data()), size)) return false;
return true;
}
static bool write_file_all(const std::string& path, const uint8_t* data, size_t len) {
std::ofstream ofs(path, std::ios::binary | std::ios::trunc);
if (!ofs) return false;
ofs.write(reinterpret_cast<const char*>(data), static_cast<std::streamsize>(len));
return static_cast<bool>(ofs);
}
static bool decrypt_ota_file_impl(const std::string& input_path, const std::string& output_zip_path) {
std::vector<uint8_t> in;
if (!read_file_all(input_path, in)) {
netcore::log_error(std::string("decrypt_ota_file: read failed: ") + input_path);
return false;
}
const size_t min_len = kOtaMagicLen + kGcmNonceLen + kGcmTagLen + 1;
if (in.size() < min_len) {
netcore::log_error("decrypt_ota_file: too short");
return false;
}
if (!std::equal(in.begin(), in.begin() + kOtaMagicLen, reinterpret_cast<const uint8_t*>(kOtaMagic))) {
netcore::log_error("decrypt_ota_file: bad magic");
return false;
}
const uint8_t* nonce = in.data() + kOtaMagicLen;
const uint8_t* ct_and_tag = in.data() + kOtaMagicLen + kGcmNonceLen;
const size_t ct_and_tag_len = in.size() - (kOtaMagicLen + kGcmNonceLen);
if (ct_and_tag_len <= kGcmTagLen) {
netcore::log_error("decrypt_ota_file: no ciphertext");
return false;
}
const size_t ciphertext_len = ct_and_tag_len - kGcmTagLen;
const uint8_t* ciphertext = ct_and_tag;
const uint8_t* tag = ct_and_tag + ciphertext_len;
std::vector<uint8_t> plain(ciphertext_len);
int out_len1 = 0;
int out_len2 = 0;
EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
if (!ctx) {
netcore::log_error("decrypt_ota_file: EVP_CIPHER_CTX_new failed");
return false;
}
bool ok = false;
auto key = ota_key_bytes();
do {
if (1 != EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), nullptr, nullptr, nullptr)) {
netcore::log_error("decrypt_ota_file: DecryptInit failed");
break;
}
if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, static_cast<int>(kGcmNonceLen), nullptr)) {
netcore::log_error("decrypt_ota_file: set ivlen failed");
break;
}
if (1 != EVP_DecryptInit_ex(ctx, nullptr, nullptr, key.data(), nonce)) {
netcore::log_error("decrypt_ota_file: set key/iv failed");
break;
}
if (1 != EVP_DecryptUpdate(ctx, plain.data(), &out_len1, ciphertext, static_cast<int>(ciphertext_len))) {
netcore::log_error("decrypt_ota_file: update failed");
break;
}
if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, static_cast<int>(kGcmTagLen), const_cast<uint8_t*>(tag))) {
netcore::log_error("decrypt_ota_file: set tag failed");
break;
}
if (1 != EVP_DecryptFinal_ex(ctx, plain.data() + out_len1, &out_len2)) {
netcore::log_error("decrypt_ota_file: final failed (auth tag mismatch?)");
break;
}
const size_t plain_len = static_cast<size_t>(out_len1 + out_len2);
if (!write_file_all(output_zip_path, plain.data(), plain_len)) {
netcore::log_error(std::string("decrypt_ota_file: write failed: ") + output_zip_path);
break;
}
ok = true;
} while (false);
EVP_CIPHER_CTX_free(ctx);
return ok;
}
// 打包 TCP 数据包
py::bytes make_packet(int msg_type, py::dict body_dict) {
netcore::log_debug(std::string("make_packet msg_type=") + std::to_string(msg_type));
// 1) 将 py::dict 转为 JSON 字符串
json j = py_dict_to_json(body_dict);
std::string body_str = j.dump();
// 2) 计算 body_len 和 checksum
uint32_t body_len = body_str.size();
uint32_t checksum = body_len + msg_type;
// 3) 打包头部(大端序)
std::vector<uint8_t> packet;
packet.reserve(12 + body_len);
// body_len (big-endian, 4 bytes)
packet.push_back((body_len >> 24) & 0xFF);
packet.push_back((body_len >> 16) & 0xFF);
packet.push_back((body_len >> 8) & 0xFF);
packet.push_back(body_len & 0xFF);
// msg_type (big-endian, 4 bytes)
packet.push_back((msg_type >> 24) & 0xFF);
packet.push_back((msg_type >> 16) & 0xFF);
packet.push_back((msg_type >> 8) & 0xFF);
packet.push_back(msg_type & 0xFF);
// checksum (big-endian, 4 bytes)
packet.push_back((checksum >> 24) & 0xFF);
packet.push_back((checksum >> 16) & 0xFF);
packet.push_back((checksum >> 8) & 0xFF);
packet.push_back(checksum & 0xFF);
// 4) 追加 body
packet.insert(packet.end(), body_str.begin(), body_str.end());
netcore::log_debug(std::string("make_packet done bytes=") + std::to_string(packet.size()));
return py::bytes(reinterpret_cast<const char*>(packet.data()), packet.size());
}
// 解析 TCP 数据包
py::tuple parse_packet(py::bytes data) {
// 1) 转换为 bytes view
py::buffer_info buf = py::buffer(data).request();
if (buf.size < 12) {
netcore::log_error(std::string("parse_packet too_short len=") + std::to_string(buf.size));
return py::make_tuple(py::none(), py::none());
}
const uint8_t* ptr = static_cast<const uint8_t*>(buf.ptr);
// 2) 解析头部(大端序)
uint32_t body_len = (ptr[0] << 24) | (ptr[1] << 16) | (ptr[2] << 8) | ptr[3];
uint32_t msg_type = (ptr[4] << 24) | (ptr[5] << 16) | (ptr[6] << 8) | ptr[7];
uint32_t checksum = (ptr[8] << 24) | (ptr[9] << 16) | (ptr[10] << 8) | ptr[11];
// 3) 校验 checksum可选你现有代码不强制校验
// if (checksum != (body_len + msg_type)) {
// return py::make_tuple(py::none(), py::none());
// }
// 4) 检查长度
uint32_t expected_len = 12 + body_len;
if (buf.size < expected_len) {
// 半包
netcore::log_warn(std::string("parse_packet incomplete got=") + std::to_string(buf.size) +
" expected=" + std::to_string(expected_len));
return py::make_tuple(py::none(), py::none());
}
// 5) 防御性检查:如果 data 比预期长,说明可能有粘包
// (只解析第一个包,忽略多余数据)
if (buf.size > expected_len) {
netcore::log_warn(std::string("parse_packet concat got=") + std::to_string(buf.size) +
" expected=" + std::to_string(expected_len) +
" body_len=" + std::to_string(body_len) +
" msg_type=" + std::to_string(msg_type));
}
// 6) 提取 body 并解析 JSON
std::string body_str(reinterpret_cast<const char*>(ptr + 12), body_len);
try {
json j = json::parse(body_str);
py::dict body_dict = json_to_py_dict(j);
return py::make_tuple(py::int_(msg_type), body_dict);
} catch (const json::parse_error& e) {
// JSON 解析失败,返回 raw兼容你现有的逻辑
netcore::log_error(std::string("parse_packet json_parse_error: ") + e.what());
py::dict raw_dict;
raw_dict["raw"] = body_str;
return py::make_tuple(py::int_(msg_type), raw_dict);
} catch (const std::exception& e) {
netcore::log_error(std::string("parse_packet json_parse_error: ") + e.what());
py::dict raw_dict;
raw_dict["raw"] = body_str;
return py::make_tuple(py::int_(msg_type), raw_dict);
}
}
PYBIND11_MODULE(archery_netcore, m) {
m.doc() = "Archery net core (native, pybind11).";
// Optional: configure native logger from Python.
// Default log file: /maixapp/apps/t11/netcore.log
m.def("set_log_file", [](const std::string& path) { netcore::set_log_file(path); }, py::arg("path"));
m.def("set_log_level", [](int level) {
if (level < 0) level = 0;
if (level > 3) level = 3;
netcore::set_log_level(static_cast<netcore::LogLevel>(level));
}, py::arg("level"));
m.def("log_test", [](const std::string& msg) {
netcore::log_info(std::string("log_test: ") + msg);
}, py::arg("msg"));
m.def("make_packet", &make_packet,
"Pack TCP packet: header (len+type+checksum) + JSON body",
py::arg("msg_type"), py::arg("body_dict"));
m.def("parse_packet", &parse_packet,
"Parse TCP packet, return (msg_type, body_dict)");
m.def("get_config", &get_config, "Get system configuration");
m.def(
"decrypt_ota_file",
[](const std::string& input_path, const std::string& output_zip_path) {
netcore::log_info(std::string("decrypt_ota_file in=") + input_path + " out=" + output_zip_path);
return decrypt_ota_file_impl(input_path, output_zip_path);
},
py::arg("input_path"),
py::arg("output_zip_path"),
"Decrypt OTA encrypted file (MAGIC|nonce|ciphertext|tag) to plaintext zip."
);
// Minimal demo: return actions for inner_cmd=41 (manual trigger + ack)
m.def("actions_for_inner_cmd", [](int inner_cmd) {
py::list actions;
if (inner_cmd == 41) {
// 1) set manual trigger flag
{
py::dict a;
a["type"] = "SET_FLAG";
py::dict args;
args["name"] = "manual_trigger_flag";
args["value"] = true;
a["args"] = args;
actions.append(a);
}
// 2) enqueue trigger_ack
{
py::dict a;
a["type"] = "ENQUEUE";
py::dict args;
args["msg_type"] = 2;
args["high"] = false;
py::dict body;
body["result"] = "trigger_ack";
args["body"] = body;
a["args"] = args;
actions.append(a);
}
}
return actions;
});
}

100
cpp_ext/native_logger.cpp Normal file
View File

@@ -0,0 +1,100 @@
#include "native_logger.hpp"
#include <cerrno>
#include <cstring>
#include <mutex>
#include <string>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
namespace netcore {
static std::mutex g_mu;
static int g_fd = -1;
static std::string g_path = "netcore.log";
static LogLevel g_level = LogLevel::kDebug; //LogLevel::kInfo;
static const char* level_name(LogLevel lvl) {
switch (lvl) {
case LogLevel::kError: return "E";
case LogLevel::kWarn: return "W";
case LogLevel::kInfo: return "I";
case LogLevel::kDebug: return "D";
default: return "?";
}
}
static void ensure_open_locked() {
if (g_path.empty()) return;
if (g_fd >= 0) return;
g_fd = ::open(g_path.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0644);
}
void set_log_file(const std::string& path) {
std::lock_guard<std::mutex> lk(g_mu);
g_path = path;
if (g_fd >= 0) {
::close(g_fd);
g_fd = -1;
}
ensure_open_locked();
}
void set_log_level(LogLevel level) {
std::lock_guard<std::mutex> lk(g_mu);
g_level = level;
}
void log(LogLevel level, const std::string& msg) {
std::lock_guard<std::mutex> lk(g_mu);
if (static_cast<int>(level) > static_cast<int>(g_level)) return;
if (g_path.empty()) return;
ensure_open_locked();
if (g_fd < 0) {
// Last resort: stderr (avoid any Python APIs)
::write(STDERR_FILENO, msg.c_str(), msg.size());
::write(STDERR_FILENO, "\n", 1);
return;
}
// Timestamp: epoch milliseconds (simple and cheap)
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// long long ms = (long long)ts.tv_sec * 1000LL + ts.tv_nsec / 1000000LL;
// 1. 将秒数转换为本地时间结构体 struct tm
struct tm *tm_info = localtime(&ts.tv_sec);
// 2. 准备一个缓冲区来存储时间字符串
char buffer[30];
// 3. 格式化秒的部分
// 格式: 年-月-日 时:分:秒
strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", tm_info);
// 4. 计算毫秒部分并追加到字符串中
// ts.tv_nsec 是纳秒,除以 1,000,000 得到毫秒
char ms_buffer[8];
snprintf(ms_buffer, sizeof(ms_buffer), ".%03ld", ts.tv_nsec / 1000000);
// Build one line to keep writes atomic-ish
char head[256];
int n = ::snprintf(head, sizeof(head), "[%s%s] [%s] ", buffer, ms_buffer, level_name(level));
if (n < 0) n = 0;
::write(g_fd, head, (size_t)n);
::write(g_fd, msg.c_str(), msg.size());
::write(g_fd, "\n", 1);
}
void log_debug(const std::string& msg) { log(LogLevel::kDebug, msg); }
void log_info (const std::string& msg) { log(LogLevel::kInfo, msg); }
void log_warn (const std::string& msg) { log(LogLevel::kWarn, msg); }
void log_error(const std::string& msg) { log(LogLevel::kError, msg); }
} // namespace netcore

28
cpp_ext/native_logger.hpp Normal file
View File

@@ -0,0 +1,28 @@
#pragma once
#include <string>
namespace netcore {
enum class LogLevel : int {
kError = 0,
kWarn = 1,
kInfo = 2,
kDebug = 3,
};
// Set log file path. If empty, logging is disabled.
void set_log_file(const std::string& path);
// Set minimum log level to write (default: kInfo).
void set_log_level(LogLevel level);
// Log helpers (thread-safe, never calls into Python).
void log(LogLevel level, const std::string& msg);
void log_debug(const std::string& msg);
void log_info(const std::string& msg);
void log_warn(const std::string& msg);
void log_error(const std::string& msg);
} // namespace netcore

24765
cpp_ext/third_party/nlohmann/json.hpp vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,23 @@
1. CPP构建命令
cd /mnt/d/code/archery/cpp_ext
rm -rf build && mkdir build && cd build
TOOLCHAIN_BIN=/mnt/d/code/MaixCDK/dl/extracted/toolchains/maixcam/host-tools/gcc/riscv64-linux-musl-x86_64/bin
PYDEV=/mnt/d/code/shooting/python3_lib_maixcam_musl_3.11.6
MAIXCDK=/mnt/d/code/MaixCDK
cmake .. -G Ninja \
-DCMAKE_C_COMPILER="${TOOLCHAIN_BIN}/riscv64-unknown-linux-musl-gcc" \
-DCMAKE_CXX_COMPILER="${TOOLCHAIN_BIN}/riscv64-unknown-linux-musl-g++" \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_C_FLAGS="-mcpu=c906fdv -march=rv64imafdcv0p7xthead -mcmodel=medany -mabi=lp64d" \
-DCMAKE_CXX_FLAGS="-mcpu=c906fdv -march=rv64imafdcv0p7xthead -mcmodel=medany -mabi=lp64d" \
-DPY_INCLUDE_DIR="${PYDEV}/include/python3.11" \
-DPY_LIB="${PYDEV}/lib/libpython3.11.so" \
-DPY_EXT_SUFFIX=".cpython-311-riscv64-linux-gnu.so" \
-DMAIXCDK_PATH="${MAIXCDK}"
ninja

View File

@@ -0,0 +1,13 @@
1. OTA 下载的时候,为什么使用十六进制下载,读取 URC 事件?
因为使用二进制下载的时候,经常会出现错误,并且会失败?然后最稳定传输的办法,是每次传输的时候,是分块,而且每次分块都要“删/建”http实例。推测原因是因为我们现在是直接传输文件的源代码代码中含有了一些字符串可能和 AT指令重复导致了 AT 模块在解释的时候出错。而使用 16 进制的方式,可以避免这个问题。因为十六进制直接把数据先转成了字符串,然后在设备端再把字符串转成数据,这样就不可能出现 AT的指令从而减少了麻烦。
2. OTA 下载的时候,为什么不用 AT 模块里 HTTPDLFILE 的指令?
因为在测试中发现,使用 HTTPDLFILE其实是下载到了 4G 模块内部,需要重新从模块内部转到存储卡,而且 4G 模块的存储较小,大概只有 40k所以还需要分块来下载和转存比较麻烦于是最终使用了使用读取串口事件的模式。
3. OTA 下载的时候,为什么不用 AT 模块里 HTTPREAD 的指令?
因为之前测试发现READ模式其实是需要多步
3.1. AT+MHTTPCREATE
3.2. AT+MHTTPCFG
3.3. AT+MHTTPREQUEST
3.4. AT+MHTTPREAD
它其实也是把数据下载到 4g 模块的缓存里,然后再从缓存里读取出来。所以也是比较繁琐的,还不如 HTTPDLFILE 简单。
4.
4.

41
design_doc/todo.md Normal file
View File

@@ -0,0 +1,41 @@
你现在要防的是“别人拿到设备/拿到代码包后,能伪造请求、刷接口、下发恶意 OTA、甚至劫持通信”。单靠隐藏 Python 源码只能提高门槛,真正的安全要靠协议和密钥设计。结合你仓库里实际内容,建议你重点隐藏/整改这些点(按风险排序)。
1. 必须隐藏/必须整改(高风险)
1.1 登录口令规则太弱(几乎等于明文)
你现在的登录是 password = device_id + "."(见 network.py 读取设备 ID 后直接拼出来),这意味着只要攻击者知道/猜到 device_id就能直接登录伪装设备。
相关位置:
with open("/device_key", "r") as f: device_id = f.read().strip() ... self._device_id = device_id self._password = device_id + "."
1.2 HTTP 鉴权 token 的盐值是硬编码常量(泄露后可离线伪造)
你 token 是 HMAC-SHA256((SALT+device_id), SALT2),而 SALT/SALT2 是固定字符串:"shootMessageFire" / "shoot"。这类“硬编码盐值 + 可猜/可读的 device_id”意味着攻击者只要拿到代码包/逆向 .so就能在自己电脑上批量算 token伪造 HTTP 请求。
相关位置:
SALT = "shootMessageFire"SALT2 = "shoot"return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
1.3 TLS 配置目前看起来没有做证书校验(容易被中间人攻击)
config.py 虽然 USE_TCP_SSL=True但你在 network.py 里实际把 MSSLCFG="auth" 固定成 0不验且写证书分支被 if False 禁用了。这样“看起来是 TLS”但仍可能被抓包/篡改/假服务器接入。
相关位置:
r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},0', "OK", 3000)...if False: # 写证书/校验被禁用 ...r = hardware_manager.at_client.send(f'AT+MIPCFG="ssl",{link_id},{ssl_id},1', "OK", 3000)
1.4 OTA 下发“url”如果缺少强校验就是远程代码执行入口
你 OTA 逻辑里从服务器指令拿到 url 就去下载并替换文件/重启(这是正常 OTA但安全性取决于
是否只允许白名单域名/路径
是否强制 https 并校验服务器证书
是否对 OTA 包做签名校验(最关键)
你这里能看到固定域名 static.shelingxingqiu.com 的特殊处理ota_manager.py 里还在纠结 http/https这块一定要“服务端签名 + 设备端验签”,否则隐藏源码也没用。
2. 建议隐藏(中风险,但很容易被人利用)
2.1 所有服务器地址/端口/API 路径(可被用于扫描、压测、撞库、协议逆向)
这些在 config.py 是明文:
SERVER_IP = "stcp.shelingxingqiu.com"SERVER_PORT = 50005HTTP_URL = "http://ws.shelingxingqiu.com"HTTP_API_PATH = "/home/shoot/device_fire/arrow/fire"
注意:即使你把它们藏进 .so攻击者仍可能通过抓包/观察 DNS/SNI/流量拿到域名或 IP所以“隐藏”只能降低静态分析风险不能替代鉴权/签名。
2.2 WiFi 凭证落盘位置
你会把 SSID/密码写到 /boot/wifi.ssid 和 /boot/wifi.passnetwork.py/wifi.py 都有)。拿到设备存储就能读到明文密码,这属于设备侧安全问题。
2.3 日志/调试信息泄露
你仓库里 .cursor/debug.log* 已经记录了完整的 AT+MIPOPEN ... host:port 等信息;如果这些被打进包或被用户导出,也会泄露通信细节。
你发布包里“真正会被带走的敏感内容”
你现在 app.yaml 的打包清单只包含核心运行文件(不包含 test/、backup_code/、.cursor/ 等),这一点很好:
files: - app.yaml - at_client.py - camera_manager.py - config.py - hardware.py - laser_manager.py - logger_manager.py - main.py - network.py - ota_manager.py - power.py - shot_id_generator.py - time_sync.py - version.py - vision.py
但要注意:你当前最敏感的“盐值/口令规则/HTTP header 逻辑/OTA 策略”都在这些会打包的文件里(尤其是 network.py、ota_manager.py、config.py
3. 最有效的“防黑”措施(比隐藏更关键)
把“password=device_id+.” 改为真正的 per-device secret设备出厂烧录服务端保存或服务端下发公钥设备用私钥签名
所有上报/控制命令加签名 + nonce/timestamp + 服务端防重放(别人抓到一次包也不能复用)
OTA 包必须做签名校验(设备端内置公钥,下载后验签通过才应用)
TLS 必须做证书校验/最好做 pinning至少别用 auth=0
如果你告诉我:你们服务端目前能不能改协议(例如新增签名字段、下发 challenge、做 OTA 签名),我可以按“最小改动但提升最大安全”的顺序,帮你规划一套从现状平滑升级的方案。

119
hardware.py Normal file
View File

@@ -0,0 +1,119 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
硬件管理器模块
提供硬件对象的统一管理和访问
"""
import threading
import config
from at_client import ATClient
class HardwareManager:
"""硬件管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(HardwareManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有硬件对象
self._uart4g = None # 4G模块UART
self._distance_serial = None # 激光测距串口
self._bus = None # I2C总线
self._adc_obj = None # ADC对象
self._at_client = None # AT客户端
self._initialized = True
# ==================== 硬件访问(只读属性)====================
@property
def uart4g(self):
"""4G模块UART只读"""
return self._uart4g
@property
def distance_serial(self):
"""激光测距串口(只读)"""
return self._distance_serial
@property
def bus(self):
"""I2C总线只读"""
return self._bus
@property
def adc_obj(self):
"""ADC对象只读"""
return self._adc_obj
@property
def at_client(self):
"""AT客户端只读"""
return self._at_client
# ==================== 初始化方法 ====================
def init_uart4g(self, device=None, baudrate=None):
"""初始化4G模块UART"""
from maix import uart
if device is None:
device = config.UART4G_DEVICE
if baudrate is None:
baudrate = config.UART4G_BAUDRATE
self._uart4g = uart.UART(device, baudrate)
return self._uart4g
def init_distance_serial(self, device=None, baudrate=None):
"""初始化激光测距串口(激光控制)"""
from maix import uart
if device is None:
device = config.DISTANCE_SERIAL_DEVICE
if baudrate is None:
baudrate = config.DISTANCE_SERIAL_BAUDRATE
print(f"[HW] 初始化激光串口: device={device}, baudrate={baudrate}")
self._distance_serial = uart.UART(device, baudrate)
print(f"[HW] 激光串口初始化完成: {self._distance_serial}")
return self._distance_serial
def init_bus(self, bus_num=None):
"""初始化I2C总线"""
from maix import i2c
if bus_num is None:
bus_num = config.I2C_BUS_NUM
self._bus = i2c.I2C(bus_num, i2c.Mode.MASTER)
return self._bus
def init_adc(self, channel=None, res_bit=None):
"""初始化ADC"""
from maix.peripheral import adc
if channel is None:
channel = config.ADC_CHANNEL
if res_bit is None:
res_bit = adc.RES_BIT_12
self._adc_obj = adc.ADC(channel, res_bit)
return self._adc_obj
def init_at_client(self, uart_obj=None):
"""初始化AT客户端"""
if uart_obj is None:
if self._uart4g is None:
raise ValueError("uart4g must be initialized before at_client")
uart_obj = self._uart4g
self._at_client = ATClient(uart_obj)
self._at_client.start()
return self._at_client
# 创建全局单例实例
hardware_manager = HardwareManager()

826
laser.py
View File

@@ -1,826 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光射击系统主程序(激光测距版)
功能目标检测、激光校准、4G TCP 通信、OTA 升级、M01 激光测距、INA226 电量监测
平台MaixPy (Sipeed MAIX)
作者ZZH
最后更新2025-11-21
"""
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import requests
import socket
import binascii
# ==============================
# 全局配置
# ==============================
# OTA 升级地址(建议后续改为动态下发)
url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
local_filename = "/maixapp/apps/t11/main.py"
DEVICE_ID = None
PASSWORD = None
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 2 # 心跳间隔(秒)
CONFIG_FILE = "/root/laser_config.json"
DEFAULT_POINT = (640, 480) # 图像中心点
laser_point = DEFAULT_POINT
# HTTP API当前未使用保留备用
URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"
# UART 设备初始化
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块TCP 透传)
distance_serial = uart.UART("/dev/ttyS1", 9600) # M01 激光测距模块
# 消息类型常量
MSG_TYPE_LOGIN_REQ = 1 # 登录请求
MSG_TYPE_STATUS = 2 # 状态上报
MSG_TYPE_HEARTBEAT = 4 # 心跳包
# 引脚功能映射
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
pinmap.set_pin_function("A29", "UART2_RX")
pinmap.set_pin_function("A28", "UART2_TX")
pinmap.set_pin_function("P18", "I2C1_SCL")
pinmap.set_pin_function("P21", "I2C1_SDA")
# pinmap.set_pin_function("A15", "I2C5_SCL")
# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的
# ADC 触发阈值(用于检测扳机/激光触发)
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# 显示参数
color = image.Color(255, 100, 0) # 橙色十字线
thickness = 1
length = 2
# ADC 扳机触发阈值0~4095
ADC_TRIGGER_THRESHOLD = 3000
# I2C 电源监测INA226
adc_obj = adc.ADC(0, adc.RES_BIT_12)
bus = i2c.I2C(1, i2c.Mode.MASTER)
# bus = i2c.I2C(5, i2c.Mode.MASTER)#ota升级总线
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# M01 激光模块指令
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
DISTANCE_QUERY_CMD = bytes([0xAA, MODULE_ADDR, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21])
DISTANCE_RESPONSE_LEN = 13
# TCP / 线程状态
tcp_connected = False
send_queue = []
update_thread_started = False # 防止重复 OTA
send_queue_lock = _thread.allocate_lock()
laser_calibration_data_lock = _thread.allocate_lock()
laser_calibration_active = False
laser_calibration_result = None
# ==============================
# 网络工具函数
# ==============================
def is_server_reachable(host, port=80, timeout=5):
"""检查能否连接到指定主机和端口(用于 OTA 前网络检测)"""
try:
addr_info = socket.getaddrinfo(host, port)[0]
s = socket.socket(addr_info[0], addr_info[1], addr_info[2])
s.settimeout(timeout)
s.connect(addr_info[-1])
s.close()
return True
except Exception as e:
print(f"[NET] 无法连接 {host}:{port} - {e}")
return False
def download_file(url, filename):
"""
从指定 URL 下载文件并保存为 UTF-8 文本。
注意:此操作会覆盖本地 main.py
"""
try:
print(f"[OTA] 正在从 {url} 下载文件...")
response = requests.get(url, timeout=10) # ⏱️ 防止卡死
response.raise_for_status()
response.encoding = 'utf-8'
with open(filename, 'w', encoding='utf-8') as file:
file.write(response.text)
return f"下载成功!文件已保存为: {filename}"
except requests.exceptions.RequestException as e:
return f"下载失败!网络请求错误: {e}"
except OSError as e:
return f"下载失败!文件写入错误: {e}"
except Exception as e:
return f"下载失败!发生未知错误: {e}"
def connect_wifi(ssid, password):
"""
连接 Wi-Fi 并持久化凭证到 /boot/ 目录,使设备重启后自动连接。
返回 (ip, error) 元组。
"""
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
try:
# 生成 wpa_supplicant 配置
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return None, "Failed to generate wpa config"
# 写入运行时配置
with open(conf_path, "w") as f:
f.write("ctrl_interface=/var/run/wpa_supplicant\n")
f.write("update_config=1\n\n")
f.write(net_conf)
# 持久化保存(供开机脚本读取)
with open(ssid_file, "w") as f:
f.write(ssid.strip())
with open(pass_file, "w") as f:
f.write(password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP最多 20 秒)
for _ in range(20):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
return ip, None
time.sleep(1)
return None, "Timeout: No IP obtained"
except Exception as e:
return None, f"Exception: {str(e)}"
def direct_ota_download():
"""
直接执行 OTA 下载(假设已有网络)
用于 cmd=7 触发
"""
global update_thread_started
try:
# 再次确认网络可达(可选但推荐)
from urllib.parse import urlparse
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, MSG_TYPE_STATUS)
return
print(f"[OTA] 开始直接下载固件...")
result_msg = download_file(url, local_filename)
print(f"[OTA] {result_msg}")
safe_enqueue({"result": result_msg}, MSG_TYPE_STATUS)
except Exception as e:
error_msg = f"OTA 异常: {str(e)}"
print(error_msg)
safe_enqueue({"result": "ota_failed", "reason": error_msg}, MSG_TYPE_STATUS)
finally:
update_thread_started = False # 允许下次 OTA
def handle_wifi_and_update(ssid, password):
"""
OTA 更新线程入口。
注意:必须在 finally 中重置 update_thread_started
"""
global update_thread_started
try:
ip, error = connect_wifi(ssid, password)
if error:
safe_enqueue({"result": "wifi_failed", "error": error}, MSG_TYPE_STATUS)
return
safe_enqueue({"result": "wifi_connected", "ip": ip}, MSG_TYPE_STATUS)
from urllib.parse import urlparse
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
err_msg = f"网络不通:无法连接 {host}:{port}"
safe_enqueue({"result": err_msg}, MSG_TYPE_STATUS)
return
print(f"[OTA] 已确认可访问 {host}:{port},开始下载...")
try:
cs = download_file(url, local_filename)
except Exception as e:
cs = f"下载失败: {str(e)}"
print(cs)
safe_enqueue({"result": cs}, MSG_TYPE_STATUS)
finally:
# ✅ 关键修复:允许下次 OTA
update_thread_started = False
print("[UPDATE] OTA 线程执行完毕,标志已重置。")
# ==============================
# 工具函数
# ==============================
def read_device_id():
"""从 /device_key 读取设备唯一 ID"""
try:
with open("/device_key", "r") as f:
device_id = f.read().strip()
if device_id:
print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
return device_id
else:
raise ValueError("文件为空")
except Exception as e:
print(f"[ERROR] 无法读取 /device_key: {e}")
return "DEFAULT_DEVICE_ID"
def safe_enqueue(data_dict, msg_type=MSG_TYPE_STATUS):
"""线程安全地将消息加入发送队列"""
global send_queue, send_queue_lock
with send_queue_lock:
send_queue.append((msg_type, data_dict))
def at(cmd, wait="OK", timeout=2000):
"""向 4G 模块发送 AT 指令并等待响应"""
if cmd:
uart4g.write((cmd + "\r\n").encode())
t0 = time.ticks_ms()
buf = b""
while time.ticks_ms() - t0 < timeout:
data = uart4g.read()
if data:
buf += data
if wait.encode() in buf:
return buf.decode(errors="ignore")
return buf.decode(errors="ignore")
def make_packet(msg_type: int, body_dict: dict) -> bytes:
"""构造二进制数据包:[body_len][msg_type][checksum][body]"""
body = json.dumps(body_dict, ensure_ascii=False).encode('utf-8')
body_len = len(body)
checksum = body_len + msg_type
header = struct.pack(">III", body_len, msg_type, checksum)
return header + body
def parse_packet(data: bytes):
"""解析二进制数据包"""
if len(data) < 12:
return None, None
body_len, msg_type, checksum = struct.unpack(">III", data[:12])
body = data[12:12 + body_len]
try:
# ✅ 显式指定 UTF-8 编码
return msg_type, json.loads(body.decode('utf-8'))
except Exception as e:
print(f"[ERROR] 解析包体失败: {e}")
return msg_type, {"raw": body.decode('utf-8', errors='ignore')}
def tcp_send_raw(data: bytes, max_retries=2) -> bool:
"""通过 4G 模块发送原始 TCP 数据(仅在 tcp_main 线程调用)"""
global tcp_connected
if not tcp_connected:
return False
for attempt in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 1500):
time.sleep_ms(100)
continue
time.sleep_ms(10)
full = data + b"\x1A"
try:
sent = uart4g.write(full)
if sent != len(full):
time.sleep_ms(100)
continue
except:
time.sleep_ms(100)
continue
if "OK" in at("", "OK", 1000):
return True
time.sleep_ms(100)
return False
def load_laser_point():
"""从配置文件加载激光点坐标"""
global laser_point
try:
if "laser_config.json" in os.listdir("/root"):
with open(CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
laser_point = (int(data[0]), int(data[1]))
print(f"[INFO] 加载激光点: {laser_point}")
else:
raise ValueError
else:
laser_point = DEFAULT_POINT
except:
laser_point = DEFAULT_POINT
def save_laser_point(point):
"""保存激光点坐标到文件"""
global laser_point
try:
with open(CONFIG_FILE, "w") as f:
json.dump([point[0], point[1]], f)
laser_point = point
except:
pass
def turn_on_laser():
"""发送激光开启指令"""
distance_serial.write(LASER_ON_CMD)
time.sleep_ms(10)
resp = distance_serial.read(20)
if resp:
if resp == LASER_ON_CMD:
print("✅ 激光指令已确认")
else:
print("🔇 无回包(正常或模块不支持)")
return resp
# ==============================
# M01 激光测距模块
# ==============================
def parse_bcd_distance(bcd_bytes: bytes) -> float:
"""将 4 字节 BCD 码转换为距离(米)"""
if len(bcd_bytes) != 4:
return 0.0
try:
hex_string = binascii.hexlify(bcd_bytes).decode()
distance_int = int(hex_string)
return distance_int / 1000.0
except Exception as e:
print(f"[ERROR] BCD 解析失败: {e}")
return 0.0
def read_distance_from_laser_sensor():
"""发送测距指令并返回距离(米)"""
global distance_serial
try:
distance_serial.read() # 清空缓冲区
distance_serial.write(DISTANCE_QUERY_CMD)
time.sleep_ms(500)
response = distance_serial.read(DISTANCE_RESPONSE_LEN)
if response and len(response) == DISTANCE_RESPONSE_LEN:
if response[3] != 0x20:
if response[0] == 0xEE:
err_code = (response[7] << 8) | response[8]
print(f"[LASER] 模块错误代码: {hex(err_code)}")
return 0.0
bcd_bytes = response[6:10]
distance_value_m = parse_bcd_distance(bcd_bytes)
signal_quality = (response[10] << 8) | response[11]
print(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}")
return distance_value_m
print(f"[LASER] 无效响应: {response.hex() if response else 'None'}")
return 0.0
except Exception as e:
print(f"[ERROR] 读取激光测距失败: {e}")
return 0.0
# ==============================
# 激光点校准
# ==============================
def find_red_laser(frame, threshold=150):
"""在图像中查找最亮的红色点(简单 RGB 判定)"""
w, h = frame.width(), frame.height()
img_bytes = frame.to_bytes()
max_sum = 0
best_pos = None
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
max_sum = rgb_sum
best_pos = (x, y)
return best_pos
def calibrate_laser_position():
"""拍摄一帧并识别激光点位置"""
time.sleep_ms(80)
cam = camera.Camera(640, 480)
frame = cam.read()
pos = find_red_laser(frame)
if pos:
save_laser_point(pos)
return pos
return None
# ==============================
# 电量监测INA226
# ==============================
def write_register(reg, value):
data = [(value >> 8) & 0xFF, value & 0xFF]
bus.writeto_mem(INA226_ADDR, reg, bytes(data))
def read_register(reg):
data = bus.readfrom_mem(INA226_ADDR, reg, 2)
return (data[0] << 8) | data[1]
def init_ina226():
write_register(REG_CONFIGURATION, 0x4527)
write_register(REG_CALIBRATION, CALIBRATION_VALUE)
def get_bus_voltage():
raw = read_register(REG_BUS_VOLTAGE)
return raw * 1.25 / 1000
def voltage_to_percent(voltage):
points = [
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
(3.65, 5), (3.60, 0)
]
if voltage >= points[0][0]: return 100
if voltage <= points[-1][0]: return 0
for i in range(len(points) - 1):
v1, p1 = points[i]; v2, p2 = points[i + 1]
if v2 <= voltage <= v1:
ratio = (voltage - v1) / (v2 - v1)
percent = p1 + (p2 - p1) * ratio
return max(0, min(100, int(round(percent))))
return 0
# ==============================
# 目标检测
# ==============================
def detect_circle(frame):
"""检测靶心圆(清晰/模糊两种模式)"""
img_cv = image.image2cv(frame, False, False)
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edged = cv2.Canny(blurred, 50, 150)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
best_center = best_radius = method = None
for cnt in contours:
area = cv2.contourArea(cnt)
perimeter = cv2.arcLength(cnt, True)
if perimeter < 100 or area < 100: continue
circularity = 4 * np.pi * area / (perimeter ** 2)
if circularity > 0.75 and len(cnt) >= 5:
center, axes, angle = cv2.fitEllipse(cnt)
radius = (axes[0] + axes[1]) / 4
best_center = (int(center[0]), int(center[1]))
best_radius = int(radius)
method = "清晰"
break
if not best_center:
hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
s = np.clip(s * 2, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 182])
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) > 50:
(x, y), radius = cv2.minEnclosingCircle(largest)
best_center = (int(x), int(y))
best_radius = int(radius)
method = "模糊"
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius
def compute_laser_position(circle_center, laser_point, radius, method):
"""计算激光相对于靶心的偏差(单位:厘米)"""
if not all([circle_center, radius, method]):
return None, None
cx, cy = circle_center
lx, ly = laser_point
# 根据检测模式估算实际半径(单位:像素 → 厘米)
circle_r_cm = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
dx = lx - cx
dy = ly - cy
scale = circle_r_cm / radius if radius != 0 else 1.0
return dx * scale, -dy * scale
# ==============================
# TCP 通信主线程
# ==============================
def connect_server():
"""连接服务器(通过 4G 模块 AT 指令)"""
global tcp_connected
if tcp_connected:
return True
print("正在连接服务器...")
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
if "+MIPOPEN: 0,0" in res:
tcp_connected = True
return True
return False
def tcp_main():
"""TCP 通信主循环(独立线程)"""
global tcp_connected, send_queue, laser_calibration_active, laser_calibration_result,update_thread_started
while not app.need_exit():
if not connect_server():
time.sleep_ms(5000)
continue
login_data = {"deviceId": DEVICE_ID, "password": PASSWORD}
if not tcp_send_raw(make_packet(MSG_TYPE_LOGIN_REQ, login_data)):
tcp_connected = False
time.sleep_ms(2000)
continue
print("➡️ 登录包已发送,等待确认...")
logged_in = False
last_heartbeat_ack_time = time.ticks_ms()
last_heartbeat_send_time = time.ticks_ms()
rx_buf = b""
while True:
data = uart4g.read()
if data:
rx_buf += data
while b'+MIPURC: "rtcp"' in rx_buf:
try:
match = re.search(b'\+MIPURC: "rtcp",0,(\d+),(.+)', rx_buf, re.DOTALL)
if match:
payload_len = int(match.group(1))
payload = match.group(2)[:payload_len]
msg_type, body = parse_packet(payload)
if not logged_in and msg_type == MSG_TYPE_LOGIN_REQ:
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 登录成功")
else:
break
elif logged_in and msg_type == MSG_TYPE_HEARTBEAT:
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 收到心跳确认")
elif logged_in and isinstance(body, dict):
inner_data = body.get("data", {})
if isinstance(inner_data, dict) and "cmd" in inner_data:
inner_cmd = inner_data["cmd"]
if inner_cmd == 2:
turn_on_laser()
time.sleep_ms(100)
laser_calibration_active = True
safe_enqueue({"result": "calibrating"}, MSG_TYPE_STATUS)
elif inner_cmd == 3:
distance_serial.write(LASER_OFF_CMD)
laser_calibration_active = False
safe_enqueue({"result": "laser_off"}, MSG_TYPE_STATUS)
elif inner_cmd == 4:
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
safe_enqueue(battery_data, MSG_TYPE_STATUS)
elif inner_cmd == 5:
ssid = inner_data.get("ssid")
password = inner_data.get("password")
if not ssid or not password:
safe_enqueue({"result": "missing_ssid_or_password"}, MSG_TYPE_STATUS)
else:
# global update_thread_started
if not update_thread_started:
update_thread_started = True
_thread.start_new_thread(handle_wifi_and_update, (ssid, password))
else:
safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS)
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
safe_enqueue({"result": "current_ip", "ip": ip}, MSG_TYPE_STATUS)
elif inner_cmd == 7:
# global update_thread_started
if update_thread_started:
safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS)
continue
# 实时检查是否有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
_thread.start_new_thread(direct_ota_download, ())
rx_buf = rx_buf[match.end():]
else:
break
except Exception as e:
print(f"[ERROR] 解析/处理数据包失败: {e}")
rx_buf = b""
break
# 发送队列处理
msg_type = None
if logged_in:
with send_queue_lock:
if send_queue:
msg_type, data_dict = send_queue.pop(0)
if msg_type is not None:
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
print("💔 发送失败,断开重连")
break
# 校准结果上报
if logged_in:
x = y = None
with laser_calibration_data_lock:
if laser_calibration_result is not None:
x, y = laser_calibration_result
laser_calibration_result = None
if x is not None:
safe_enqueue({"result": "ok", "x": x, "y": y}, MSG_TYPE_STATUS)
# 心跳机制
current_time = time.ticks_ms()
if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
if not tcp_send_raw(make_packet(MSG_TYPE_HEARTBEAT, {"t": int(time.time())})):
print("💔 心跳发送失败")
break
last_heartbeat_send_time = current_time
if logged_in and current_time - last_heartbeat_ack_time > 6000:
print("⏰ 6秒无心跳ACK重连")
break
time.sleep_ms(50)
tcp_connected = False
time.sleep_ms(2000)
def laser_calibration_worker():
"""后台激光校准线程"""
global laser_calibration_active, laser_calibration_result
while True:
if laser_calibration_active:
result = calibrate_laser_position()
if result and len(result) == 2:
with laser_calibration_data_lock:
laser_calibration_result = result
laser_calibration_active = False
print(f"✅ 后台校准成功: {result}")
else:
time.sleep_ms(80)
else:
time.sleep_ms(50)
# ==============================
# 主程序入口
# ==============================
def cmd_str():
global DEVICE_ID, PASSWORD
DEVICE_ID = read_device_id()
PASSWORD = DEVICE_ID + "."
photo_dir = "/root/phot"
if photo_dir not in os.listdir("/root"):
try:
os.mkdir(photo_dir)
except:
pass
init_ina226()
load_laser_point()
disp = display.Display()
cam = camera.Camera(640, 480)
_thread.start_new_thread(tcp_main, ())
_thread.start_new_thread(laser_calibration_worker, ())
print("系统准备完成...")
while not app.need_exit():
if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
time.sleep_ms(60)
frame = cam.read()
x, y = laser_point
frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness)
frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness)
frame.draw_circle(int(x), int(y), 1, color, thickness)
result_img, center, radius, method, _ = detect_circle(frame)
disp.show(result_img)
dx, dy = compute_laser_position(center, (x, y), radius, method)
distance_m = read_distance_from_laser_sensor()
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
try:
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
result_img.save(filename, quality=70)
except Exception as e:
print(f"❌ 保存照片失败: {e}")
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100),
"m": method
}
report_data = {"cmd": 1, "data": inner_data}
safe_enqueue(report_data, MSG_TYPE_STATUS)
time.sleep_ms(100)
else:
disp.show(cam.read())
time.sleep_ms(50)
if __name__ == "__main__":
cmd_str()

1137
laser_manager.py Normal file

File diff suppressed because it is too large Load Diff

212
logger_manager.py Normal file
View File

@@ -0,0 +1,212 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
日志管理器模块
提供异步日志功能(使用 QueueHandler + QueueListener
"""
import logging
from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler
import queue
import os
import config
from version import VERSION
class LoggerManager:
"""日志管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(LoggerManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有状态
self._log_queue = None
self._queue_listener = None
self._logger = None
self._initialized = True
# ==================== 状态访问(只读属性)====================
@property
def logger(self):
"""获取logger对象只读"""
return self._logger
@property
def log_queue(self):
"""获取日志队列(只读)"""
return self._log_queue
# ==================== 业务方法 ====================
def init_logging(self, log_level=logging.INFO, log_file=None, max_bytes=None, backup_count=None):
"""
初始化异步日志系统(使用 QueueHandler + QueueListener
Args:
log_level: 日志级别,默认 INFO
log_file: 日志文件路径,默认使用 config.LOG_FILE
max_bytes: 单个日志文件最大大小(字节),默认使用 config.LOG_MAX_BYTES
backup_count: 保留的备份文件数量,默认使用 config.LOG_BACKUP_COUNT
"""
if log_file is None:
log_file = config.LOG_FILE
if max_bytes is None:
max_bytes = config.LOG_MAX_BYTES
if backup_count is None:
backup_count = config.LOG_BACKUP_COUNT
try:
# 创建日志队列(无界队列)
self._log_queue = queue.Queue(-1)
# 确保日志文件所在的目录存在
log_dir = os.path.dirname(log_file)
if log_dir: # 如果日志路径包含目录
try:
os.makedirs(log_dir, exist_ok=True)
except Exception as e:
print(f"[WARN] 无法创建日志目录 {log_dir}: {e}")
# 尝试创建文件Handler带日志轮转
try:
file_handler = RotatingFileHandler(
log_file,
maxBytes=max_bytes,
backupCount=backup_count,
encoding='utf-8',
mode='a' # 追加模式,确保不覆盖
)
except Exception as e:
# 如果RotatingFileHandler不可用降级为普通FileHandler
print(f"[WARN] RotatingFileHandler不可用使用普通FileHandler: {e}")
try:
file_handler = logging.FileHandler(log_file, encoding='utf-8', mode='a')
except Exception as e2:
# 如果文件Handler创建失败只使用控制台Handler
print(f"[WARN] 无法创建文件Handler仅使用控制台输出: {e2}")
file_handler = None
# 自定义Formatter包含版本信息
class CustomFormatter(logging.Formatter):
"""自定义日志格式,包含版本信息和行号"""
def format(self, record):
record.version = VERSION
return super().format(record)
# 如果file_handler存在设置格式和级别
if file_handler is not None:
file_handler.setFormatter(CustomFormatter(
'%(asctime)s [v%(version)s] [%(levelname)s] %(filename)s:%(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
file_handler.setLevel(log_level)
# 创建控制台Handler保留原有的print输出
console_handler = logging.StreamHandler()
console_handler.setFormatter(CustomFormatter(
'[v%(version)s] [%(levelname)s] %(filename)s:%(lineno)d - %(message)s'
))
console_handler.setLevel(log_level)
# 创建QueueListener后台线程处理日志写入
# 如果file_handler为None只使用console_handler
handlers = [console_handler]
if file_handler is not None:
handlers.append(file_handler)
self._queue_listener = QueueListener(
self._log_queue,
*handlers,
respect_handler_level=True
)
self._queue_listener.start()
# 创建QueueHandler用于记录日志
queue_handler = QueueHandler(self._log_queue)
# 配置根logger
self._logger = logging.getLogger()
self._logger.addHandler(queue_handler)
self._logger.setLevel(log_level)
# 避免日志向上传播到其他logger
self._logger.propagate = False
# 添加启动标记
self._logger.info("=" * 60)
self._logger.info("程序启动 - 日志系统初始化")
self._logger.info(f"版本: {VERSION}")
self._logger.info(f"日志文件: {log_file}")
self._logger.info("=" * 60)
return True
except Exception as e:
# 如果日志初始化失败,至少保证程序能运行
print(f"[ERROR] 日志系统初始化失败: {e}")
import traceback
try:
traceback.print_exc()
except:
pass
return False
def stop_logging(self):
"""停止日志系统(程序退出时调用)"""
try:
if self._logger:
# 确保所有日志都写入
self._logger.info("程序退出,正在保存日志...")
import time as std_time
std_time.sleep(0.5) # 给一点时间让日志写入
if self._queue_listener:
self._queue_listener.stop()
if self._logger:
# 等待队列中的日志处理完成
if self._log_queue:
import time as std_time
timeout = 5
start = std_time.time()
while not self._log_queue.empty() and (std_time.time() - start) < timeout:
std_time.sleep(0.1)
print("[LOG] 日志系统已停止")
except Exception as e:
print(f"[ERROR] 停止日志系统失败: {e}")
# 创建全局单例实例
logger_manager = LoggerManager()
# ==================== 向后兼容的函数接口 ====================
def init_logging(log_level=logging.INFO, log_file=None, max_bytes=None, backup_count=None):
"""初始化日志系统(向后兼容接口)"""
return logger_manager.init_logging(log_level, log_file, max_bytes, backup_count)
def stop_logging():
"""停止日志系统(向后兼容接口)"""
return logger_manager.stop_logging()
def get_logger():
"""
获取全局logger对象向后兼容接口
如果日志系统未初始化返回None此时可以使用print作为fallback
"""
return logger_manager.logger

1237
main.py

File diff suppressed because it is too large Load Diff

1095
network.py Normal file

File diff suppressed because it is too large Load Diff

1336
ota_manager.py Normal file

File diff suppressed because it is too large Load Diff

230
package.py Normal file
View File

@@ -0,0 +1,230 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用打包脚本
根据 app.yaml 中列出的文件,打包成 zip 文件
版本号从 version.py 中读取
"""
import argparse
import os
import yaml
import zipfile
from datetime import datetime
import sys
import secrets
MAGIC = b"AROTAE1" # 7 bytes: Archery OTA Encrypted v1
GCM_NONCE_LEN = 12
GCM_TAG_LEN = 16
# 添加当前目录到路径,以便导入 version 模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def load_app_yaml(yaml_path='app.yaml'):
"""加载 app.yaml 文件"""
try:
with open(yaml_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except Exception as e:
print(f"[ERROR] 读取 {yaml_path} 失败: {e}")
return None
def check_files_exist(files, base_dir='.'):
"""检查文件是否存在"""
missing_files = []
existing_files = []
for file_path in files:
full_path = os.path.join(base_dir, file_path)
if os.path.exists(full_path):
existing_files.append(file_path)
else:
missing_files.append(file_path)
return existing_files, missing_files
def get_version_from_version_py():
"""从 version.py 读取版本号"""
try:
from version import VERSION
return VERSION
except ImportError:
print("[WARNING] 无法导入 version.py使用默认版本号 1.0.0")
return '1.0.0'
except Exception as e:
print(f"[WARNING] 读取 version.py 失败: {e},使用默认版本号 1.0.0")
return '1.0.0'
def create_zip_package(app_info, files, output_dir='.', base_dir='.'):
"""创建 zip 打包文件"""
# 生成输出文件名:{name}_v{version}_{timestamp}.zip
# 版本号从 version.py 读取,而不是从 app.yaml
app_name = app_info.get('name', 'app')
version = get_version_from_version_py() # 从 version.py 读取版本号
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
zip_filename = f"{app_name}_v{version}_{timestamp}.zip"
zip_path = os.path.join(output_dir, zip_filename)
print(f"[INFO] 开始打包: {zip_filename}")
print(f"[INFO] 包含文件数: {len(files)}")
try:
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in files:
full_path = os.path.join(base_dir, file_path)
# 使用相对路径作为 zip 内的路径
zipf.write(full_path, file_path)
print(f"{file_path}")
# 获取文件大小
file_size = os.path.getsize(zip_path)
file_size_mb = file_size / (1024 * 1024)
print(f"\n[SUCCESS] 打包完成!")
print(f" 文件名: {zip_filename}")
print(f" 文件大小: {file_size_mb:.2f} MB ({file_size:,} 字节)")
print(f" 文件路径: {os.path.abspath(zip_path)}")
return zip_path
except Exception as e:
print(f"[ERROR] 打包失败: {e}")
import traceback
traceback.print_exc()
return None
def _validate_key_hex(key_hex: str) -> bytes:
if not isinstance(key_hex, str):
raise ValueError("aead key must be hex string")
key_hex = key_hex.strip().lower()
if key_hex.startswith("0x"):
key_hex = key_hex[2:]
if len(key_hex) != 64:
raise ValueError("aead key must be 64 hex chars (32 bytes)")
try:
key = bytes.fromhex(key_hex)
except Exception as e:
raise ValueError(f"invalid hex key: {e}")
if len(key) != 32:
raise ValueError("aead key must be 32 bytes")
return key
def encrypt_zip_aead(zip_path: str, key_hex: str, out_ext: str = ".enc") -> str:
"""
Encrypt the whole zip file as one blob:
output format: MAGIC(7) | nonce(12) | ciphertext(N) | tag(16)
using AES-256-GCM (AEAD).
"""
# Lazy import: packaging-only dependency
try:
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
except Exception as e:
raise RuntimeError(
"Missing dependency: cryptography. Install with: pip install cryptography. "
f"Import error: {e}"
)
key = _validate_key_hex(key_hex)
with open(zip_path, "rb") as f:
plain = f.read()
nonce = secrets.token_bytes(GCM_NONCE_LEN)
aesgcm = AESGCM(key)
ct_and_tag = aesgcm.encrypt(nonce, plain, None) # ciphertext || tag (16 bytes)
enc_path = zip_path + out_ext if out_ext else (zip_path + ".enc")
with open(enc_path, "wb") as f:
f.write(MAGIC)
f.write(nonce)
f.write(ct_and_tag)
return enc_path
def main():
"""主函数"""
parser = argparse.ArgumentParser(description="打包 app.yaml 文件列表到 zip并可选进行 AES-256-GCM 加密输出 .enc")
parser.add_argument("--aead-key-hex", default=None, help="AES-256-GCM key (64 hex chars = 32 bytes). If set, output encrypted file.")
parser.add_argument("--keep-zip", action="store_true", help="Keep the plaintext zip when encryption is enabled.")
parser.add_argument("--out-ext", default=".enc", help="Encrypted output extension appended to zip path. Default: .enc (produces *.zip.enc)")
args = parser.parse_args()
print("=" * 60)
print("应用打包脚本")
print("=" * 60)
# 1. 加载 app.yaml
app_info = load_app_yaml('app.yaml')
if app_info is None:
return
# 从 version.py 读取版本号
version = get_version_from_version_py()
print(f"\n[INFO] 应用信息:")
print(f" ID: {app_info.get('id', 'N/A')}")
print(f" 名称: {app_info.get('name', 'N/A')}")
print(f" 版本: {version} (来自 version.py)")
print(f" 作者: {app_info.get('author', 'N/A')}")
if app_info.get('version') != version:
print(f" [注意] app.yaml 中的版本 ({app_info.get('version', 'N/A')}) 与 version.py 不一致")
# 2. 获取文件列表
files = app_info.get('files', [])
if not files:
print("[ERROR] app.yaml 中没有找到 files 列表")
return
print(f"\n[INFO] 文件列表 ({len(files)} 个文件):")
# 3. 检查文件是否存在
existing_files, missing_files = check_files_exist(files)
if missing_files:
print(f"\n[WARNING] 以下文件不存在,将被跳过:")
for f in missing_files:
print(f"{f}")
if not existing_files:
print("\n[ERROR] 没有找到任何有效文件,无法打包")
return
print(f"\n[INFO] 找到 {len(existing_files)} 个有效文件")
# 4. 创建 zip 包
zip_path = create_zip_package(app_info, existing_files)
if zip_path:
enc_path = None
if args.aead_key_hex:
try:
enc_path = encrypt_zip_aead(zip_path, args.aead_key_hex, out_ext=args.out_ext)
enc_size = os.path.getsize(enc_path)
print(f"\n[SUCCESS] AEAD加密完成: {os.path.basename(enc_path)} ({enc_size:,} bytes)")
print(f" 文件路径: {os.path.abspath(enc_path)}")
if not args.keep_zip:
try:
os.remove(zip_path)
print(f"[INFO] 已删除明文zip: {os.path.basename(zip_path)}")
except Exception as e:
print(f"[WARNING] 删除明文zip失败可忽略: {e}")
except Exception as e:
print(f"\n[ERROR] AEAD加密失败: {e}")
print("[ERROR] 保留明文zip用于排查。")
print("\n" + "=" * 60)
print("打包成功完成!")
print("=" * 60)
else:
print("\n" + "=" * 60)
print("打包失败!")
print("=" * 60)
if __name__ == "__main__":
main()

112
power.py Normal file
View File

@@ -0,0 +1,112 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
电源管理模块INA226
提供电压、电流监测和充电状态检测
"""
import config
from logger_manager import logger_manager
def write_register(reg, value):
"""写入INA226寄存器"""
from hardware import hardware_manager
data = [(value >> 8) & 0xFF, value & 0xFF]
hardware_manager.bus.writeto_mem(config.INA226_ADDR, reg, bytes(data))
def read_register(reg):
"""读取INA226寄存器"""
from hardware import hardware_manager
data = hardware_manager.bus.readfrom_mem(config.INA226_ADDR, reg, 2)
return (data[0] << 8) | data[1]
def init_ina226():
"""初始化 INA226 芯片:配置模式 + 校准值"""
write_register(config.REG_CONFIGURATION, 0x4527)
write_register(config.REG_CALIBRATION, config.CALIBRATION_VALUE)
def get_bus_voltage():
"""读取总线电压单位V"""
raw = read_register(config.REG_BUS_VOLTAGE)
return raw * 1.25 / 1000
def get_current():
"""
读取电流单位mA
正数表示充电,负数表示放电
INA226 电流计算公式:
Current = (Current Register Value) × Current_LSB
Current_LSB = 0.001 × CALIBRATION_VALUE / 4096
"""
try:
raw = read_register(config.REG_CURRENT)
# INA226 电流寄存器是16位有符号整数
# 最高位是符号位0=正充电1=负(放电)
# 计算 Current_LSB根据 CALIBRATION_VALUE
current_lsb = 0.001 * config.CALIBRATION_VALUE / 4096 # 单位A
# 处理有符号数如果最高位为1转换为负数
if raw & 0x8000: # 最高位为1表示负数放电
signed_raw = raw - 0x10000 # 转换为有符号整数
else: # 最高位为0表示正数充电
signed_raw = raw
# 转换为毫安
current_ma = signed_raw * current_lsb * 1000
return current_ma
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[INA226] 读取电流失败: {e}")
else:
print(f"[INA226] 读取电流失败: {e}")
return 0.0
def is_charging(threshold_ma=10.0):
"""
检测是否在充电(通过电流方向判断)
Args:
threshold_ma: 电流阈值毫安超过此值认为在充电默认10mA
Returns:
True: 正在充电
False: 未充电或读取失败
"""
try:
current = get_current()
is_charge = current > threshold_ma
return is_charge
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[CHARGE] 检测充电状态失败: {e}")
else:
print(f"[CHARGE] 检测充电状态失败: {e}")
return False
def voltage_to_percent(voltage):
"""根据电压估算电池百分比(查表插值)"""
points = [
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
(3.65, 5), (3.60, 0)
]
if voltage >= points[0][0]:
return 100
if voltage <= points[-1][0]:
return 0
for i in range(len(points) - 1):
v1, p1 = points[i]
v2, p2 = points[i + 1]
if voltage >= v2:
ratio = (voltage - v1) / (v2 - v1)
percent = p1 + (p2 - p1) * ratio
return max(0, min(100, int(round(percent))))
return 0

View File

@@ -47,6 +47,7 @@ def set_autostart_app(app_id):
if __name__ == "__main__":
new_autostart_app_id = "t11" # change to app_id you want to set
# new_autostart_app_id = None # remove autostart
# new_autostart_app_id = "z1222" # change to app_id you want to set
list_apps()
print("Before set autostart appid:", get_curr_autostart_app())

208
shoot_manager.py Normal file
View File

@@ -0,0 +1,208 @@
import config
from camera_manager import camera_manager
from laser_manager import laser_manager
from logger_manager import logger_manager
from network import network_manager
from power import get_bus_voltage, voltage_to_percent
from vision import estimate_distance, detect_circle_v3, save_shot_image
from maix import camera, display, image, app, time, uart, pinmap, i2c
def analyze_shot(frame, laser_point=None):
"""
分析射箭结果算法部分可迁移到C++
:param frame: 图像帧
:param laser_point: 激光点坐标 (x, y)
:return: 包含分析结果的字典
"""
logger = logger_manager.logger
# 先检测靶心以获取距离(用于计算激光点)
result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3(
frame, None)
# 计算距离
distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None
# 根据距离动态计算激光点坐标
laser_point_method = None
if config.HARDCODE_LASER_POINT:
laser_point = laser_manager.laser_point
laser_point_method = "hardcode"
elif laser_manager.has_calibrated_point():
laser_point = laser_manager.laser_point
laser_point_method = "calibrated"
if logger:
logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}")
elif distance_m and distance_m > 0:
laser_point = laser_manager.calculate_laser_point_from_distance(distance_m)
laser_point_method = "dynamic"
if logger:
logger.info(f"[算法] 使用比例尺: {laser_point}")
else:
laser_point = laser_manager.laser_point
laser_point_method = "default"
if logger:
logger.info(f"[算法] 使用默认值: {laser_point}")
if laser_point is None:
return {
"success": False,
"reason": "laser_point_not_initialized"
}
x, y = laser_point
# 绘制激光十字线
color = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
frame.draw_line(
int(x - config.LASER_LENGTH), int(y),
int(x + config.LASER_LENGTH), int(y),
color, config.LASER_THICKNESS
)
frame.draw_line(
int(x), int(y - config.LASER_LENGTH),
int(x), int(y + config.LASER_LENGTH),
color, config.LASER_THICKNESS
)
frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS)
# 重新检测靶心(使用计算出的激光点)
result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point)
# 计算偏移与距离
if center and radius:
dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
distance_m = estimate_distance(best_radius1)
else:
dx, dy = None, None
distance_m = None
# 返回分析结果
return {
"success": True,
"result_img": result_img,
"center": center,
"radius": radius,
"method": method,
"best_radius1": best_radius1,
"ellipse_params": ellipse_params,
"dx": dx,
"dy": dy,
"distance_m": distance_m,
"laser_point": laser_point,
"laser_point_method": laser_point_method
}
def process_shot(adc_val):
"""
处理射箭事件(逻辑控制部分)
:param adc_val: ADC触发值
:return: None
"""
logger = logger_manager.logger
try:
frame = camera_manager.read_frame()
# 调用算法分析
analysis_result = analyze_shot(frame)
if not analysis_result.get("success"):
reason = analysis_result.get("reason", "unknown")
if logger:
logger.warning(f"[MAIN] 射箭分析失败: {reason}")
time.sleep_ms(100)
return
# 提取分析结果
result_img = analysis_result["result_img"]
center = analysis_result["center"]
radius = analysis_result["radius"]
method = analysis_result["method"]
ellipse_params = analysis_result["ellipse_params"]
dx = analysis_result["dx"]
dy = analysis_result["dy"]
distance_m = analysis_result["distance_m"]
laser_point = analysis_result["laser_point"]
laser_point_method = analysis_result["laser_point_method"]
x, y = laser_point
camera_manager.show(result_img)
if not (center and radius) and logger:
logger.warning("[MAIN] 未检测到靶心,但会保存图像")
# 读取电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
# 生成射箭ID
from shot_id_generator import shot_id_generator
shot_id = shot_id_generator.generate_id()
if logger:
logger.info(f"[MAIN] 射箭ID: {shot_id}")
# 保存图像
save_shot_image(
result_img,
center,
radius,
method,
ellipse_params,
(x, y),
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None
)
# 构造上报数据
inner_data = {
"shot_id": shot_id,
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100),
"d_laser": 0.0,
"d_laser_quality": 0,
"m": method if method else "no_target",
"adc": adc_val,
"laser_method": laser_point_method,
"target_x": float(x),
"target_y": float(y),
}
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
inner_data["ellipse_major_axis"] = float(max(width, height))
inner_data["ellipse_minor_axis"] = float(min(width, height))
inner_data["ellipse_angle"] = float(angle)
inner_data["ellipse_center_x"] = float(ell_center[0])
inner_data["ellipse_center_y"] = float(ell_center[1])
else:
inner_data["ellipse_major_axis"] = None
inner_data["ellipse_minor_axis"] = None
inner_data["ellipse_angle"] = None
inner_data["ellipse_center_x"] = None
inner_data["ellipse_center_y"] = None
report_data = {"cmd": 1, "data": inner_data}
network_manager.safe_enqueue(report_data, msg_type=2, high=True)
if logger:
if center and radius:
logger.info(f"射箭事件已加入发送队列已检测到靶心ID: {shot_id}")
else:
logger.info(f"射箭事件已加入发送队列未检测到靶心已保存图像ID: {shot_id}")
# 闪一下激光(射箭反馈)
laser_manager.flash_laser(1000)
time.sleep_ms(100)
except Exception as e:
if logger:
logger.error(f"[MAIN] 图像处理异常: {e}")
import traceback
logger.error(traceback.format_exc())
time.sleep_ms(100)

76
shot_id_generator.py Normal file
View File

@@ -0,0 +1,76 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
射箭ID生成器
为每次射箭生成唯一ID格式{timestamp_ms}_{counter}
"""
from maix import time
import threading
class ShotIDGenerator:
"""射箭ID生成器单例"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super(ShotIDGenerator, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
self._counter = 0
self._last_timestamp_ms = 0
self._lock = threading.Lock()
self._initialized = True
def generate_id(self, device_id=None):
"""
生成唯一的射箭ID
Args:
device_id: 可选的设备ID如果提供则包含在ID中格式{device_id}_{timestamp_ms}_{counter}
如果不提供,则使用简单格式(格式:{timestamp_ms}_{counter}
Returns:
str: 唯一的射箭ID
"""
with self._lock:
current_timestamp_ms = time.ticks_ms()
# 如果时间戳相同,增加计数器;否则重置计数器
if current_timestamp_ms == self._last_timestamp_ms:
self._counter += 1
else:
self._counter = 0
self._last_timestamp_ms = current_timestamp_ms
# 生成ID
if device_id:
shot_id = f"{device_id}_{current_timestamp_ms}_{self._counter}"
else:
shot_id = f"{current_timestamp_ms}_{self._counter}"
return shot_id
def reset(self):
"""重置计数器(通常不需要调用)"""
with self._lock:
self._counter = 0
self._last_timestamp_ms = 0
# 创建全局单例实例
shot_id_generator = ShotIDGenerator()

186
time_sync.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
时间同步模块
从4G模块获取时间并同步到系统
"""
import re
import os
from datetime import datetime, timedelta
import config
# from logger_bak import get_logger
from logger_manager import logger_manager
def parse_4g_time(cclk_response, timezone_offset=8):
"""
解析 AT+CCLK? 返回的时间字符串,并转换为本地时间
Args:
cclk_response: AT+CCLK? 的响应字符串
timezone_offset: 时区偏移小时默认8中国时区 UTC+8
Returns:
datetime 对象(已转换为本地时间),如果解析失败返回 None
"""
try:
# 匹配格式: +CCLK: "YY/MM/DD,HH:MM:SS+TZ"
# 时区单位是四分之一小时quarters of an hour
match = re.search(r'\+CCLK:\s*"(\d{2})/(\d{2})/(\d{2}),(\d{2}):(\d{2}):(\d{2})([+-]\d{1,3})?"', cclk_response)
if not match:
return None
yy, mm, dd, hh, MM, ss, tz_str = match.groups()
# 年份处理26 -> 2026
year = 2000 + int(yy)
month = int(mm)
day = int(dd)
hour = int(hh)
minute = int(MM)
second = int(ss)
# 创建 UTC 时间的 datetime 对象
dt_utc = datetime(year, month, day, hour, minute, second)
# 解析时区偏移(单位:四分之一小时)
if tz_str:
try:
# 时区偏移值(四分之一小时)
tz_quarters = int(tz_str)
# 转换为小时除以4
tz_hours = tz_quarters / 4.0
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 时区偏移: {tz_str} (四分之一小时) = {tz_hours} 小时")
# 转换为本地时间
dt_local = dt_utc + timedelta(hours=tz_hours)
except ValueError:
# 如果时区解析失败,使用默认值
logger = logger_manager.logger
if logger:
logger.warning(f"[TIME] 时区解析失败: {tz_str},使用默认 UTC+{timezone_offset}")
dt_local = dt_utc + timedelta(hours=timezone_offset)
else:
# 没有时区信息,使用默认值
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 未找到时区信息,使用默认 UTC+{timezone_offset}")
dt_local = dt_utc + timedelta(hours=timezone_offset)
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] UTC时间: {dt_utc.strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"[TIME] 本地时间: {dt_local.strftime('%Y-%m-%d %H:%M:%S')}")
return dt_local
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}")
else:
print(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}")
return None
def get_time_from_4g(timezone_offset=8):
"""
通过4G模块获取当前时间已转换为本地时间
Args:
timezone_offset: 时区偏移小时默认8中国时区
Returns:
datetime 对象(本地时间),如果获取失败返回 None
"""
try:
# 发送 AT+CCLK? 命令(延迟导入避免循环依赖)
from hardware import hardware_manager
# 检查 at_client 是否已初始化
if hardware_manager.at_client is None:
logger = logger_manager.logger
if logger:
logger.warning("[TIME] ATClient 尚未初始化无法获取4G时间")
else:
print("[TIME] ATClient 尚未初始化无法获取4G时间")
return None
resp = hardware_manager.at_client.send("AT+CCLK?", "OK", 3000)
if not resp or "+CCLK:" not in resp:
logger = logger_manager.logger
if logger:
logger.warning(f"[TIME] 未获取到时间响应: {resp}")
else:
print(f"[TIME] 未获取到时间响应: {resp}")
return None
# 解析并转换时区
dt = parse_4g_time(resp, timezone_offset)
return dt
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 获取4G时间异常: {e}")
else:
print(f"[TIME] 获取4G时间异常: {e}")
return None
def sync_system_time_from_4g(timezone_offset=8):
"""
从4G模块同步时间到系统
Args:
timezone_offset: 时区偏移小时默认8中国时区
Returns:
bool: 是否成功
"""
dt = get_time_from_4g(timezone_offset)
if not dt:
return False
try:
# 转换为系统 date 命令需要的格式
time_str = dt.strftime('%Y-%m-%d %H:%M:%S')
# 设置系统时间
cmd = f'date -s "{time_str}" 2>&1'
result = os.system(cmd)
if result == 0:
logger = logger_manager.logger
if logger:
logger.info(f"[TIME] 系统时间已设置为: {time_str}")
else:
print(f"[TIME] 系统时间已设置为: {time_str}")
# 可选:同步到硬件时钟
try:
os.system('hwclock -w 2>/dev/null')
logger = logger_manager.logger
if logger:
logger.info("[TIME] 已同步到硬件时钟")
except:
pass
return True
else:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 设置系统时间失败,退出码: {result}")
else:
print(f"[TIME] 设置系统时间失败,退出码: {result}")
return False
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[TIME] 同步系统时间异常: {e}")
else:
print(f"[TIME] 同步系统时间异常: {e}")
return False

19
version.py Normal file
View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用版本号
每次 OTA 更新时,只需要更新这个文件中的版本号
"""
VERSION = '1.2.1'
# 1.2.0 开始使用C++编译成.so替换部分代码
# 1.2.1 ota使用加密包
# 1.2.2 支持wifi ota并且设定时区并使用单独线程保存图片

751
vision.py Normal file
View File

@@ -0,0 +1,751 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视觉检测模块
提供靶心检测、距离估算、图像保存等功能
"""
import cv2
import numpy as np
import os
import math
import threading
import queue
from maix import image
import config
from logger_manager import logger_manager
# 存图队列 + worker
_save_queue = queue.Queue(maxsize=16)
_save_worker_started = False
_save_worker_lock = threading.Lock()
def check_laser_point_sharpness(frame, laser_point=None, roi_size=30, threshold=100.0, ellipse_params=None):
"""
检测激光点本身的清晰度(不是整个靶子)
Args:
frame: 图像帧对象
laser_point: 激光点坐标 (x, y)如果为None则自动查找
roi_size: ROI区域大小像素默认30x30
threshold: 清晰度阈值
ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内
Returns:
(is_sharp, sharpness_score, laser_pos): (是否清晰, 清晰度分数, 激光点坐标)
"""
try:
# 1. 如果没有提供激光点,先查找
if laser_point is None:
from laser_manager import laser_manager
laser_point = laser_manager.find_red_laser(frame, ellipse_params=ellipse_params)
if laser_point is None:
logger_manager.logger.debug(f"未找到激光点")
return False, 0.0, None
x, y = laser_point
# 2. 转换为 OpenCV 格式
img_cv = image.image2cv(frame, False, False)
h, w = img_cv.shape[:2]
# 3. 提取 ROI 区域(激光点周围)
roi_half = roi_size // 2
x_min = max(0, int(x) - roi_half)
x_max = min(w, int(x) + roi_half)
y_min = max(0, int(y) - roi_half)
y_max = min(h, int(y) + roi_half)
roi = img_cv[y_min:y_max, x_min:x_max]
if roi.size == 0:
return False, 0.0, laser_point
# 4. 转换为灰度图(用于清晰度检测)
gray_roi = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY)
# 5. 方法1检测点的扩散程度能量集中度
# 计算中心区域的能量集中度
center_x, center_y = roi.shape[1] // 2, roi.shape[0] // 2
center_radius = min(5, roi.shape[0] // 4) # 中心区域半径
# 创建中心区域的掩码
y_coords, x_coords = np.ogrid[:roi.shape[0], :roi.shape[1]]
center_mask = (x_coords - center_x)**2 + (y_coords - center_y)**2 <= center_radius**2
# 计算中心区域和周围区域的亮度
center_brightness = gray_roi[center_mask].mean()
outer_mask = ~center_mask
outer_brightness = gray_roi[outer_mask].mean() if np.any(outer_mask) else 0
# 对比度(清晰的点对比度高)
contrast = abs(center_brightness - outer_brightness)
# 6. 方法2检测点的边缘锐度使用拉普拉斯
laplacian = cv2.Laplacian(gray_roi, cv2.CV_64F)
edge_sharpness = abs(laplacian).var()
# 7. 方法3检测点的能量集中度方差
# 清晰的点:能量集中在中心,方差小
# 模糊的点:能量分散,方差大
# 但我们需要的是:清晰的点中心亮度高,周围低,所以梯度大
sobel_x = cv2.Sobel(gray_roi, cv2.CV_64F, 1, 0, ksize=3)
sobel_y = cv2.Sobel(gray_roi, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_x**2 + sobel_y**2)
gradient_sharpness = gradient.var()
# 8. 组合多个指标
# 对比度权重0.3边缘锐度权重0.4梯度权重0.3
sharpness_score = (contrast * 0.3 + edge_sharpness * 0.4 + gradient_sharpness * 0.3)
is_sharp = sharpness_score >= threshold
logger = logger_manager.logger
if logger:
logger.debug(f"[VISION] 激光点清晰度: 位置=({x}, {y}), 对比度={contrast:.2f}, 边缘={edge_sharpness:.2f}, 梯度={gradient_sharpness:.2f}, 综合={sharpness_score:.2f}, 是否清晰={is_sharp}")
return is_sharp, sharpness_score, laser_point
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 激光点清晰度检测失败: {e}")
import traceback
logger.error(traceback.format_exc())
return False, 0.0, laser_point
def check_image_sharpness(frame, threshold=100.0, save_debug_images=False):
"""
检查图像清晰度(针对圆形靶子优化,基于圆形边缘检测)
检测靶心的圆形边缘,计算边缘区域的梯度清晰度
Args:
frame: 图像帧对象
threshold: 清晰度阈值低于此值认为图像模糊默认100.0
可以根据实际情况调整:
- 清晰图像通常 > 200
- 模糊图像通常 < 100
- 中等清晰度 100-200
save_debug_images: 是否保存调试图像原始图和边缘图默认False
Returns:
(is_sharp, sharpness_score): (是否清晰, 清晰度分数)
"""
try:
logger_manager.logger.debug(f"begin")
# 转换为 OpenCV 格式
img_cv = image.image2cv(frame, False, False)
logger_manager.logger.debug(f"after image2cv")
# 转换为 HSV 颜色空间
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
logger_manager.logger.debug(f"after HSV conversion")
# 检测黄色区域(靶心)
# 调整饱和度策略:稍微增强,不要过度
s_enhanced = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv_enhanced = cv2.merge((h, s_enhanced, v))
# HSV 阈值范围(与 detect_circle_v3 保持一致)
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 255])
mask_yellow = cv2.inRange(hsv_enhanced, lower_yellow, upper_yellow)
# 形态学操作,填充小孔洞
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
logger_manager.logger.debug(f"after yellow mask detection")
# 计算边缘区域:扩展黄色区域,然后减去原始区域,得到边缘区域
mask_dilated = cv2.dilate(mask_yellow, kernel, iterations=2)
mask_edge = cv2.subtract(mask_dilated, mask_yellow) # 边缘区域
# 计算边缘区域的像素数量
edge_pixel_count = np.sum(mask_edge > 0)
logger_manager.logger.debug(f"edge pixel count: {edge_pixel_count}")
# 如果检测不到边缘区域,使用全局梯度作为后备方案
if edge_pixel_count < 100:
logger_manager.logger.debug(f"edge region too small, using global gradient")
# 使用 V 通道计算全局梯度
sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3)
sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2)
sharpness_score = gradient.var()
logger_manager.logger.debug(f"global gradient variance: {sharpness_score:.2f}")
else:
# 在边缘区域计算梯度清晰度
# 使用 V亮度通道计算梯度因为边缘在亮度上通常很明显
sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3)
sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2)
# 只在边缘区域计算清晰度
edge_gradient = gradient[mask_edge > 0]
if len(edge_gradient) > 0:
# 计算边缘梯度的方差(清晰图像的边缘梯度变化大)
sharpness_score = edge_gradient.var()
# 也可以使用均值作为补充指标(清晰图像的边缘梯度均值也较大)
gradient_mean = edge_gradient.mean()
logger_manager.logger.debug(f"edge gradient: mean={gradient_mean:.2f}, var={sharpness_score:.2f}, pixels={len(edge_gradient)}")
else:
# 如果边缘区域没有有效梯度,使用全局梯度
sharpness_score = gradient.var()
logger_manager.logger.debug(f"no edge gradient, using global: {sharpness_score:.2f}")
# 保存调试图像(如果启用)
if save_debug_images:
try:
debug_dir = config.PHOTO_DIR
if debug_dir not in os.listdir("/root"):
try:
os.mkdir(debug_dir)
except:
pass
# 生成文件名
try:
all_images = [f for f in os.listdir(debug_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
img_count = 0
# 保存原始图像
img_orig = image.cv2image(img_cv, False, False)
orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp"
img_orig.save(orig_filename)
# # 保存边缘检测结果(可视化)
# # 创建可视化图像:原始图像 + 黄色区域 + 边缘区域
# debug_img = img_cv.copy()
# # 在黄色区域绘制绿色
# debug_img[mask_yellow > 0] = [0, 255, 0] # RGB格式绿色
# # 在边缘区域绘制红色
# debug_img[mask_edge > 0] = [255, 0, 0] # RGB格式红色
# debug_img_maix = image.cv2image(debug_img, False, False)
# debug_filename = f"{debug_dir}/sharpness_debug_edge_{img_count:04d}.bmp"
# debug_img_maix.save(debug_filename)
# logger = logger_manager.logger
# if logger:
# logger.info(f"[VISION] 保存调试图像: {orig_filename}, {debug_filename}")
except Exception as e:
logger = logger_manager.logger
if logger:
logger.warning(f"[VISION] 保存调试图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
is_sharp = sharpness_score >= threshold
logger = logger_manager.logger
if logger:
logger.debug(f"[VISION] 清晰度检测: 分数={sharpness_score:.2f}, 边缘像素数={edge_pixel_count}, 是否清晰={is_sharp}, 阈值={threshold}")
return is_sharp, sharpness_score
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 清晰度检测失败: {e}")
import traceback
logger.error(traceback.format_exc())
# 出错时返回 False避免使用模糊图像
return False, 0.0
def save_calibration_image(frame, laser_pos, photo_dir=None):
"""
保存激光校准图像(带标注)
在找到的激光点位置绘制圆圈,便于检查算法是否正确
Args:
frame: 原始图像帧
laser_pos: 找到的激光点坐标 (x, y)
photo_dir: 照片存储目录如果为None则使用 config.PHOTO_DIR
Returns:
str: 保存的文件路径,如果保存失败则返回 None
"""
# 检查是否启用图像保存
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
# 确保照片目录存在
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except:
pass
# 生成文件名
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
img_count = 0
x, y = laser_pos
filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
logger.info(f"保存校准图像: {filename}, 激光点: ({x}, {y})")
# 转换图像为 OpenCV 格式以便绘制
img_cv = image.image2cv(frame, False, False)
# 绘制激光点圆圈(用绿色圆圈标出找到的激光点)
cv2.circle(img_cv, (int(x), int(y)), 10, (0, 255, 0), 2) # 外圈绿色半径10
cv2.circle(img_cv, (int(x), int(y)), 5, (0, 255, 0), 2) # 中圈绿色半径5
cv2.circle(img_cv, (int(x), int(y)), 2, (0, 255, 0), -1) # 中心点:绿色实心
# 可选:绘制十字线帮助定位
cv2.line(img_cv,
(int(x - 20), int(y)),
(int(x + 20), int(y)),
(0, 255, 0), 1) # 水平线
cv2.line(img_cv,
(int(x), int(y - 20)),
(int(x), int(y + 20)),
(0, 255, 0), 1) # 垂直线
# 转换回 MaixPy 图像格式并保存
result_img = image.cv2image(img_cv, False, False)
result_img.save(filename)
if logger:
logger.debug(f"校准图像已保存: {filename}")
return filename
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"保存校准图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def detect_circle_v3(frame, laser_point=None):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
如果提供 laser_point会选择最接近激光点的目标
Args:
frame: 图像帧
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
Returns:
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
"""
img_cv = image.image2cv(frame, False, False)
best_center = best_radius = best_radius1 = method = None
ellipse_params = None
# HSV 黄色掩码检测(模糊靶心)
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
# 调整饱和度策略:稍微增强,不要过度
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 调整形态学操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 存储所有有效的黄色-红色组合
valid_targets = []
if contours_yellow:
for cnt_yellow in contours_yellow:
area = cv2.contourArea(cnt_yellow)
perimeter = cv2.arcLength(cnt_yellow, True)
# 计算圆度
if perimeter > 0:
circularity = (4 * np.pi * area) / (perimeter * perimeter)
else:
circularity = 0
logger = logger_manager.logger
if area > 50 and circularity > 0.7:
if logger:
logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}")
# 尝试拟合椭圆
yellow_center = None
yellow_radius = None
yellow_ellipse = None
if len(cnt_yellow) >= 5:
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
yellow_ellipse = ((x, y), (width, height), angle)
axes_minor = min(width, height)
radius = axes_minor / 2
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
else:
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
yellow_ellipse = None
# 如果检测到黄色圆圈,再检测红色圆圈进行验证
if yellow_center and yellow_radius:
# HSV 红色掩码检测红色在HSV中跨越0度需要两个范围
# 红色范围1: 0-10度接近0度的红色
lower_red1 = np.array([0, 80, 0])
upper_red1 = np.array([10, 255, 255])
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
# 红色范围2: 170-180度接近180度的红色
lower_red2 = np.array([170, 80, 0])
upper_red2 = np.array([180, 255, 255])
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
# 合并两个红色掩码
mask_red = cv2.bitwise_or(mask_red1, mask_red2)
# 形态学操作
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
found_valid_red = False
if contours_red:
# 找到所有符合条件的红色圆圈
for cnt_red in contours_red:
area_red = cv2.contourArea(cnt_red)
perimeter_red = cv2.arcLength(cnt_red, True)
if perimeter_red > 0:
circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red)
else:
circularity_red = 0
# 红色圆圈也应该有一定的圆度
if area_red > 50 and circularity_red > 0.6:
# 计算红色圆圈的中心和半径
if len(cnt_red) >= 5:
(x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red)
radius_red = min(w_red, h_red) / 2
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
else:
(x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red)
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
# 计算黄色和红色圆心的距离
if red_center:
dx = yellow_center[0] - red_center[0]
dy = yellow_center[1] - red_center[1]
distance = np.sqrt(dx*dx + dy*dy)
# 圆心距离阈值应该小于黄色半径的某个倍数比如1.5倍)
max_distance = yellow_radius * 1.5
# 红色圆圈应该比黄色圆圈大(外圈)
if distance < max_distance and red_radius > yellow_radius * 0.8:
found_valid_red = True
logger = logger_manager.logger
if logger:
logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}")
# 记录这个有效目标
valid_targets.append({
'center': yellow_center,
'radius': yellow_radius,
'ellipse': yellow_ellipse,
'area': area
})
break
if not found_valid_red:
logger = logger_manager.logger
if logger:
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
# 从所有有效目标中选择最佳目标
if valid_targets:
if laser_point:
# 如果有激光点,选择最接近激光点的目标
best_target = None
min_distance = float('inf')
for target in valid_targets:
dx = target['center'][0] - laser_point[0]
dy = target['center'][1] - laser_point[1]
distance = np.sqrt(dx*dx + dy*dy)
if distance < min_distance:
min_distance = distance
best_target = target
if best_target:
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated_laser_selected"
best_radius1 = best_radius * 5
else:
# 如果没有激光点,选择面积最大的目标
best_target = max(valid_targets, key=lambda t: t['area'])
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated"
best_radius1 = best_radius * 5
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
def estimate_distance(pixel_radius):
"""根据像素半径估算实际距离(单位:米)"""
if not pixel_radius:
return 0.0
return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0
def estimate_pixel(physical_distance_cm, target_distance_m):
"""
根据物理距离和目标距离计算对应的像素偏移
Args:
physical_distance_cm: 物理世界中的距离(厘米),例如激光与摄像头的距离
target_distance_m: 目标距离(米),例如到靶心的距离
Returns:
float: 对应的像素偏移
"""
if not target_distance_m or target_distance_m <= 0:
return 0.0
# 公式:像素偏移 = (物理距离_米) * 焦距_像素 / 目标距离_米
return (physical_distance_cm / 100.0) * config.FOCAL_LENGTH_PIX / target_distance_m
def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
内部实现:在 img_cv (numpy HWC RGB) 上绘制标注并保存。
由 save_shot_image同步和存图 worker异步调用。
"""
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except Exception:
pass
x, y = laser_point
if shot_id:
if center is None or radius is None:
filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp"
else:
method_str = method or "unknown"
filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp"
else:
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except Exception:
img_count = 0
if center is None or radius is None:
method_str = "no_target"
distance_str = "000"
else:
method_str = method or "unknown"
distance_str = str(round((distance_m or 0.0) * 100))
filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
if shot_id:
logger.info(f"[VISION] 保存射箭图像ID: {shot_id}, 文件名: {filename}")
if center and radius:
logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
if ellipse_params:
(ec, (ew, eh), ea) = ellipse_params
logger.info(f"椭圆 -> 中心: ({ec[0]:.1f}, {ec[1]:.1f}), 长轴: {max(ew, eh):.1f}, 短轴: {min(ew, eh):.1f}, 角度: {ea:.1f}°")
else:
logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y})")
laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
cross_thickness = int(max(getattr(config, "LASER_THICKNESS", 1), 1))
cross_length = int(max(getattr(config, "LASER_LENGTH", 10), 10))
cv2.line(img_cv, (int(x - cross_length), int(y)), (int(x + cross_length), int(y)), laser_color, cross_thickness)
cv2.line(img_cv, (int(x), int(y - cross_length)), (int(x), int(y + cross_length)), laser_color, cross_thickness)
cv2.circle(img_cv, (int(x), int(y)), 1, laser_color, cross_thickness)
ring_thickness = 1
cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, ring_thickness)
cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, ring_thickness)
cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1)
if center and radius:
cx, cy = center
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
cv2.ellipse(img_cv, (cx_ell, cy_ell), (int(width / 2), int(height / 2)), angle, 0, 360, (0, 255, 0), 2)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
minor_length = min(width, height) / 2
minor_angle = angle + 90 if width >= height else angle
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2)
else:
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1)
out = image.cv2image(img_cv, False, False)
out.save(filename)
if logger:
if center and radius:
logger.debug(f"图像已保存(含靶心标注): {filename}")
else:
logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}")
# 清理旧图片如果目录下图片超过100张删除最老的
try:
image_files = []
for f in os.listdir(photo_dir):
if f.endswith(('.bmp', '.jpg', '.jpeg')):
filepath = os.path.join(photo_dir, f)
try:
mtime = os.path.getmtime(filepath)
image_files.append((mtime, filepath, f))
except Exception:
pass
from config import MAX_IMAGES
if len(image_files) > MAX_IMAGES:
image_files.sort(key=lambda x: x[0])
to_delete = len(image_files) - MAX_IMAGES
deleted_count = 0
for _, filepath, fname in image_files[:to_delete]:
try:
os.remove(filepath)
deleted_count += 1
if logger:
logger.debug(f"[VISION] 删除旧图片: {fname}")
except Exception as e:
if logger:
logger.warning(f"[VISION] 删除旧图片失败 {fname}: {e}")
if logger and deleted_count > 0:
logger.info(f"[VISION] 已清理 {deleted_count} 张旧图片,当前剩余 {MAX_IMAGES}")
except Exception as e:
if logger:
logger.warning(f"[VISION] 清理旧图片时出错(可忽略): {e}")
return filename
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"保存图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def _save_worker_loop():
"""存图 worker从队列取任务并调用 _save_shot_image_impl。"""
while True:
try:
item = _save_queue.get()
if item is None:
break
_save_shot_image_impl(*item)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 存图 worker 异常: {e}")
import traceback
logger.error(traceback.format_exc())
finally:
try:
_save_queue.task_done()
except Exception:
pass
def start_save_shot_worker():
"""启动存图 worker 线程(应在程序初始化时调用一次)。"""
global _save_worker_started
with _save_worker_lock:
if _save_worker_started:
return
_save_worker_started = True
t = threading.Thread(target=_save_worker_loop, daemon=True)
t.start()
logger = logger_manager.logger
if logger:
logger.info("[VISION] 存图 worker 线程已启动")
def enqueue_save_shot(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
将存图任务放入队列,由 worker 异步保存。主线程传入 result_img 的复制,不阻塞。
"""
if not config.SAVE_IMAGE_ENABLED:
return
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(result_img, False, False)
img_copy = np.copy(img_cv)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] enqueue_save_shot 复制图像失败: {e}")
return
task = (img_copy, center, radius, method, ellipse_params, laser_point, distance_m, shot_id, photo_dir)
try:
_save_queue.put_nowait(task)
except queue.Full:
logger = logger_manager.logger
if logger:
logger.warning("[VISION] 存图队列已满,跳过本次保存")
def save_shot_image(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
保存射击图像(带标注)。同步调用,会阻塞。
主流程建议使用 enqueue_save_shot此处保留供校准、测试等场景使用。
"""
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(result_img, False, False)
return _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id, photo_dir)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] save_shot_image 转换图像失败: {e}")
return None