Files
archery/main.py

522 lines
22 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光射击系统主程序视觉测距版
功能目标检测激光校准4G TCP 通信OTA 升级单目测距INA226 电量监测
平台MaixPy (Sipeed MAIX)
作者ZZH
最后更新2025-11-21
重构版本使用模块化设计
"""
from maix import camera, display, image, app, time, uart, pinmap, i2c
2025-12-28 16:22:41 +08:00
from maix.peripheral import adc
import _thread
import os
import json
2025-12-30 16:23:17 +08:00
# 导入新模块
import config
from version import VERSION
# from logger import init_logging, get_logger, stop_logging
from logger_manager import logger_manager
from time_sync import sync_system_time_from_4g
from power import init_ina226, get_bus_voltage, voltage_to_percent
from laser_manager import laser_manager
2026-01-24 11:05:03 +08:00
from vision import detect_circle_v3, estimate_distance, enqueue_save_shot, start_save_shot_worker
from network import network_manager
from ota_manager import ota_manager
from hardware import hardware_manager
2026-01-20 11:25:17 +08:00
from camera_manager import camera_manager
2025-12-30 16:23:17 +08:00
def laser_calibration_worker():
"""后台线程:持续检测是否需要执行激光校准"""
from laser_manager import laser_manager
from ota_manager import ota_manager
2025-12-28 16:22:41 +08:00
logger = logger_manager.logger
if logger:
logger.info("[LASER] 激光校准线程启动")
2025-12-28 16:22:41 +08:00
while True:
2025-12-28 16:22:41 +08:00
try:
try:
if ota_manager.ota_in_progress:
time.sleep_ms(200)
2025-12-26 11:47:33 +08:00
continue
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] OTA检查异常: {e}")
2025-12-30 16:23:17 +08:00
time.sleep_ms(200)
continue
if laser_manager.calibration_active:
2026-01-20 11:25:17 +08:00
# 调用校准方法,所有逻辑都在 LaserManager 中
result = laser_manager.calibrate_laser_position(timeout_ms=8000, check_sharpness=True)
# 如果超时仍未成功,稍微休息一下
if laser_manager.calibration_active:
time.sleep_ms(300)
2025-12-30 09:21:58 +08:00
else:
time.sleep_ms(50)
except Exception as e:
# 线程顶层异常捕获,防止线程静默退出
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] 校准线程异常: {e}")
import traceback
logger.error(traceback.format_exc())
else:
print(f"[LASER] 校准线程异常: {e}")
import traceback
traceback.print_exc()
time.sleep_ms(1000) # 等待1秒后继续
def cmd_str():
"""主程序入口"""
# ==================== 第一阶段:硬件初始化 ====================
# 按照 main104.py 的顺序,先完成所有硬件初始化
# 1. 引脚功能映射
for pin, func in config.PIN_MAPPINGS.items():
2025-12-30 16:23:17 +08:00
try:
pinmap.set_pin_function(pin, func)
2025-12-30 16:23:17 +08:00
except:
pass
# 2. 初始化硬件对象UART、I2C、ADC
hardware_manager.init_uart4g()
hardware_manager.init_distance_serial()
hardware_manager.init_bus()
hardware_manager.init_adc()
hardware_manager.init_at_client()
# 3. 初始化 INA226 电量监测芯片
init_ina226()
2026-01-20 18:40:54 +08:00
2026-01-20 18:40:54 +08:00
# 4. 初始化显示和相机
2026-01-20 11:25:17 +08:00
camera_manager.init_camera(640, 480)
camera_manager.init_display()
# ==================== 第二阶段:软件初始化 ====================
# 1. 初始化日志系统
import logging
logger_manager.init_logging(log_level=logging.DEBUG)
logger = logger_manager.logger
# 2. 从4G模块同步系统时间需要 at_client 已初始化)
sync_system_time_from_4g()
2026-01-24 11:05:03 +08:00
# 2.5. 启动存图 worker 线程(队列 + worker避免主循环阻塞
start_save_shot_worker()
# 3. 启动时检查:是否需要恢复备份
pending_path = f"{config.APP_DIR}/ota_pending.json"
if os.path.exists(pending_path):
2025-12-30 16:23:17 +08:00
try:
with open(pending_path, 'r', encoding='utf-8') as f:
pending_obj = json.load(f)
restart_count = pending_obj.get('restart_count', 0)
max_restarts = pending_obj.get('max_restarts', 3)
backup_dir = pending_obj.get('backup_dir')
if logger:
logger.info(f"检测到 ota_pending.json重启计数: {restart_count}/{max_restarts}")
if restart_count >= max_restarts:
if logger:
logger.error(f"[STARTUP] 重启次数 ({restart_count}) 超过阈值 ({max_restarts}),执行恢复...")
if backup_dir and os.path.exists(backup_dir):
if ota_manager.restore_from_backup(backup_dir):
if logger:
logger.info(f"[STARTUP] 已从指定备份恢复: {backup_dir}")
else:
if logger:
logger.info(f"[STARTUP] 指定备份恢复失败,尝试恢复最新备份...")
ota_manager.restore_from_backup(None)
else:
if ota_manager.restore_from_backup(None):
if logger:
logger.info(f"[STARTUP] 已从最新备份恢复")
else:
if logger:
logger.error(f"[STARTUP] 恢复备份失败")
try:
os.remove(pending_path)
if logger:
logger.info(f"[STARTUP] 已删除 ota_pending.json")
except Exception as e:
if logger:
logger.error(f"[STARTUP] 删除 pending 文件失败: {e}")
if logger:
logger.info(f"[STARTUP] 恢复完成,准备重启系统...")
time.sleep_ms(2000)
os.system("reboot")
return
else:
pending_obj['restart_count'] = restart_count + 1
try:
with open(pending_path, 'w', encoding='utf-8') as f:
json.dump(pending_obj, f)
if logger:
logger.info(f"[STARTUP] 已更新重启计数: {pending_obj['restart_count']}")
except Exception as e:
if logger:
logger.error(f"[STARTUP] 更新重启计数失败: {e}")
2025-12-30 16:23:17 +08:00
except Exception as e:
if logger:
logger.error(f"[STARTUP] 检查 pending 文件时出错: {e}")
2025-12-30 16:23:17 +08:00
try:
if logger:
logger.info(f"[STARTUP] pending 文件可能损坏,尝试恢复备份...")
if ota_manager.restore_from_backup(None):
os.remove(pending_path)
if logger:
logger.info(f"[STARTUP] 已恢复备份并删除损坏的 pending 文件")
time.sleep_ms(2000)
os.system("reboot")
return
2025-12-30 16:23:17 +08:00
except:
pass
# 4. 初始化设备IDnetwork_manager 内部会自动设置 device_id 和 password
network_manager.read_device_id()
# 5. 创建照片存储目录(如果启用图像保存)
if config.SAVE_IMAGE_ENABLED:
photo_dir = config.PHOTO_DIR
if photo_dir not in os.listdir("/root"):
2025-12-30 16:23:17 +08:00
try:
os.mkdir(photo_dir)
2025-12-30 16:23:17 +08:00
except:
pass
# 6. 启动通信与校准线程
_thread.start_new_thread(network_manager.tcp_main, ())
2026-01-22 17:55:11 +08:00
if not config.HARDCODE_LASER_POINT:
_thread.start_new_thread(laser_calibration_worker, ())
2025-12-30 16:23:17 +08:00
2026-01-20 18:40:54 +08:00
# 7. 加载激光点配置
laser_manager.load_laser_point()
if logger:
logger.info("系统准备完成...")
2025-12-30 16:23:17 +08:00
last_adc_trigger = 0
2026-02-07 17:09:39 +08:00
# 气压采样:减少日志频率(每 N 个点输出一条),避免 logger.debug 拖慢采样
PRESSURE_BATCH_SIZE = 100
pressure_buf = []
pressure_sum = 0
pressure_min = 4095
pressure_max = 0
pressure_t0_ms = None
def _flush_pressure_buf(reason: str):
if not config.AIR_PRESSURE_lOG:
return
nonlocal pressure_buf, pressure_sum, pressure_min, pressure_max, pressure_t0_ms, logger
if not pressure_buf:
return
t1_ms = time.ticks_ms()
n = len(pressure_buf)
avg = (pressure_sum / n) if n else 0
# 一行输出:方便后处理画曲线;同时带上统计信息便于快速看波峰
line = (
f"[气压批量] reason={reason} "
f"t0={pressure_t0_ms} t1={t1_ms} n={n} "
f"min={pressure_min} max={pressure_max} avg={avg:.1f} "
f"values={','.join(map(str, pressure_buf))}"
)
if logger:
logger.debug(line)
else:
print(line)
pressure_buf = []
pressure_sum = 0
pressure_min = 4095
pressure_max = 0
pressure_t0_ms = None
2025-12-30 16:23:17 +08:00
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报
while not app.need_exit():
2025-12-30 16:23:17 +08:00
try:
current_time = time.ticks_ms()
# OTA 期间暂停相机预览
try:
if ota_manager.ota_in_progress:
time.sleep_ms(250)
continue
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] OTA检查异常: {e}")
time.sleep_ms(250)
2025-12-30 16:23:17 +08:00
continue
# 读取ADC值扳机检测
2025-12-30 16:23:17 +08:00
try:
if network_manager.manual_trigger_flag:
network_manager.clear_manual_trigger()
adc_val = config.ADC_TRIGGER_THRESHOLD + 1
if logger:
logger.info("[TEST] TCP命令触发射箭")
else:
adc_val = hardware_manager.adc_obj.read()
2025-12-30 16:23:17 +08:00
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] ADC读取异常: {e}")
time.sleep_ms(100)
continue
2026-02-07 17:09:39 +08:00
# ====== 气压采样缓存(每次循环都记录,批量输出日志)======
if pressure_t0_ms is None:
pressure_t0_ms = current_time
pressure_buf.append(adc_val)
pressure_sum += adc_val
if adc_val < pressure_min:
pressure_min = adc_val
if adc_val > pressure_max:
pressure_max = adc_val
if len(pressure_buf) >= PRESSURE_BATCH_SIZE:
_flush_pressure_buf("batch")
# if adc_val >= 2000:
# print(f"adc :{adc_val}")
if adc_val >= config.ADC_TRIGGER_THRESHOLD:
diff_ms = current_time - last_adc_trigger
if diff_ms < 3000:
continue
last_adc_trigger = current_time
2026-02-07 17:09:39 +08:00
# 触发前先把缓存刷出来,避免波形被长耗时处理截断
_flush_pressure_buf("before_trigger")
try:
2026-01-20 11:25:17 +08:00
frame = camera_manager.read_frame()
laser_point_method = None # 记录激光点选择方法
if config.HARDCODE_LASER_POINT:
# 硬编码模式:使用硬编码值
laser_point = laser_manager.laser_point
laser_point_method = "hardcode"
elif laser_manager.has_calibrated_point():
# 假如校准过,并且有保存值,使用校准值
laser_point = laser_manager.laser_point
laser_point_method = "calibrated"
logger_manager.logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}")
elif distance_m and distance_m > 0:
# 动态计算模式:根据距离计算激光点
2026-01-23 11:28:40 +08:00
# 先检测靶心以获取距离(用于计算激光点)
# 第一次检测不使用激光点,仅用于获取距离
result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3(frame, None)
# 计算距离
distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None
2026-01-20 11:25:17 +08:00
laser_point = laser_manager.calculate_laser_point_from_distance(distance_m)
laser_point_method = "dynamic"
if laser_point is None:
logger = logger_manager.logger
if logger:
logger.warning("[MAIN] 激光点未初始化,跳过本次检测")
time.sleep_ms(100)
continue
x, y = laser_point
2026-01-23 11:28:40 +08:00
# 检测靶心
result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point)
2026-01-23 11:28:40 +08:00
2026-01-20 11:25:17 +08:00
camera_manager.show(result_img)
2025-12-30 16:23:17 +08:00
# 计算偏移与距离(如果检测到靶心)
if center and radius:
2026-01-20 11:25:17 +08:00
dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
distance_m = estimate_distance(best_radius1)
else:
# 未检测到靶心
dx, dy = None, None
distance_m = None
if logger:
logger.warning("[MAIN] 未检测到靶心,但会保存图像")
# 快速激光测距激光一闪而过约500-600ms
laser_distance_m = None
2026-01-12 20:53:23 +08:00
laser_signal_quality = 0
2026-01-20 11:25:17 +08:00
# try:
# result = laser_manager.quick_measure_distance()
# if isinstance(result, tuple) and len(result) == 2:
# laser_distance_m, laser_signal_quality = result
# else:
# # 向后兼容:如果返回的是单个值
# laser_distance_m = result if isinstance(result, (int, float)) else 0.0
# laser_signal_quality = 0
# if logger:
# if laser_distance_m > 0:
# logger.info(f"[MAIN] 激光测距成功: {laser_distance_m:.3f} m, 信号质量: {laser_signal_quality}")
# else:
# logger.warning("[MAIN] 激光测距失败或返回0")
# except Exception as e:
# if logger:
# logger.error(f"[MAIN] 激光测距异常: {e}")
# 读取电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
2026-01-20 11:25:17 +08:00
# 生成射箭ID
from shot_id_generator import shot_id_generator
shot_id = shot_id_generator.generate_id() # 不需要使用device_id
if logger:
logger.info(f"[MAIN] 射箭ID: {shot_id}")
2026-01-24 11:05:03 +08:00
# 保存图像(无论是否检测到靶心都保存):放入队列由 worker 异步保存,不阻塞主循环
enqueue_save_shot(
result_img,
center,
radius,
method,
ellipse_params,
2026-01-24 11:05:03 +08:00
(x, y),
distance_m,
2026-01-20 11:25:17 +08:00
shot_id=shot_id,
2026-01-24 11:05:03 +08:00
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None,
)
2025-12-30 16:23:17 +08:00
# 构造上报数据
inner_data = {
2026-01-20 11:25:17 +08:00
"shot_id": shot_id, # 射箭ID用于关联图片和服务端日志
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100), # 视觉测距值(厘米)
"d_laser": round((laser_distance_m or 0.0) * 100), # 激光测距值(厘米)
2026-01-12 20:53:23 +08:00
"d_laser_quality": laser_signal_quality, # 激光测距信号质量
"m": method if method else "no_target",
2026-01-20 11:25:17 +08:00
"adc": adc_val,
# 新增字段:激光点选择方法
"laser_method": laser_point_method, # 激光点选择方法hardcode/calibrated/dynamic/default
# 激光点坐标(像素)
"target_x": float(x), # 激光点 X 坐标(像素)
"target_y": float(y), # 激光点 Y 坐标(像素)
}
2026-01-20 11:25:17 +08:00
# 添加椭圆参数(如果存在)
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
inner_data["ellipse_major_axis"] = float(max(width, height)) # 长轴(像素)
inner_data["ellipse_minor_axis"] = float(min(width, height)) # 短轴(像素)
inner_data["ellipse_angle"] = float(angle) # 椭圆角度(度)
inner_data["ellipse_center_x"] = float(ell_center[0]) # 椭圆中心 X 坐标(像素)
inner_data["ellipse_center_y"] = float(ell_center[1]) # 椭圆中心 Y 坐标(像素)
else:
inner_data["ellipse_major_axis"] = None
inner_data["ellipse_minor_axis"] = None
inner_data["ellipse_angle"] = None
inner_data["ellipse_center_x"] = None
inner_data["ellipse_center_y"] = None
report_data = {"cmd": 1, "data": inner_data}
network_manager.safe_enqueue(report_data, msg_type=2, high=True)
if logger:
if center and radius:
2026-01-20 11:25:17 +08:00
logger.info(f"射箭事件已加入发送队列已检测到靶心ID: {shot_id}")
else:
2026-01-20 11:25:17 +08:00
logger.info(f"射箭事件已加入发送队列未检测到靶心已保存图像ID: {shot_id}")
2025-12-30 16:23:17 +08:00
# 闪一下激光(射箭反馈)
2026-01-20 11:25:17 +08:00
laser_manager.flash_laser(1000)
2025-12-30 16:23:17 +08:00
time.sleep_ms(100)
2025-12-30 16:23:17 +08:00
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 图像处理异常: {e}")
import traceback
logger.error(traceback.format_exc())
time.sleep_ms(100)
continue
else:
2026-02-07 17:09:39 +08:00
if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING:
try:
camera_manager.show(camera_manager.read_frame())
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 显示异常: {e}")
2026-02-09 11:24:46 +08:00
time.sleep_ms(10)
2025-12-30 16:23:17 +08:00
except Exception as e:
# 主循环的顶层异常捕获,防止程序静默退出
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 主循环异常: {e}")
import traceback
logger.error(traceback.format_exc())
else:
print(f"[MAIN] 主循环异常: {e}")
import traceback
traceback.print_exc()
2026-02-07 17:09:39 +08:00
# 异常发生时尽量把缓存刷盘,方便定位问题
try:
_flush_pressure_buf("exception")
except:
pass
time.sleep_ms(1000) # 等待1秒后继续
2025-12-30 16:23:17 +08:00
2026-01-20 11:25:17 +08:00
# 主程序入口
if __name__ == "__main__":
try:
cmd_str()
2026-01-20 11:25:17 +08:00
# 用于调用测试函数
# test()
# 用于测试图片清晰度
# 方式1: 测试单张图片
# test_sharpness("/root/phot/image.bmp")
# 方式2: 测试目录下所有图片
# test_sharpness("/root/phot")
# 方式3: 使用默认路径config.PHOTO_DIR
# test_sharpness("/root/phot/")
# 用于测试激光点清晰度
# 方式1: 测试单张图片
# test_laser_point_sharpness("/root/phot/image.bmp")
# 方式2: 测试目录下所有图片
# test_laser_point_sharpness("/root/phot")
# 方式3: 使用默认路径config.PHOTO_DIR
# test_laser_point_sharpness("/root/phot/")
except KeyboardInterrupt:
logger = logger_manager.logger
if logger:
logger.info("[MAIN] 收到中断信号,程序退出")
logger_manager.stop_logging()
2025-12-28 16:22:41 +08:00
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 程序异常退出: {e}")
import traceback
logger.error(traceback.format_exc())
else:
print(f"[MAIN] 程序异常退出: {e}")
import traceback
traceback.print_exc()
logger_manager.stop_logging()
raise # 重新抛出异常,让系统知道程序异常退出