Files
archery/main.py
gcw_4spBpAfv 28fb62e5d6 v1.2.1
2026-01-23 11:28:40 +08:00

467 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光射击系统主程序(视觉测距版)
功能目标检测、激光校准、4G TCP 通信、OTA 升级、单目测距、INA226 电量监测
平台MaixPy (Sipeed MAIX)
作者ZZH
最后更新2025-11-21
重构版本:使用模块化设计
"""
from maix import camera, display, image, app, time, uart, pinmap, i2c
from maix.peripheral import adc
import _thread
import os
import json
# 导入新模块
import config
from version import VERSION
# from logger import init_logging, get_logger, stop_logging
from logger_manager import logger_manager
from time_sync import sync_system_time_from_4g
from power import init_ina226, get_bus_voltage, voltage_to_percent
from laser_manager import laser_manager
from vision import detect_circle_v3, estimate_distance, save_shot_image, save_calibration_image
from network import network_manager
from ota_manager import ota_manager
from hardware import hardware_manager
from camera_manager import camera_manager
def laser_calibration_worker():
"""后台线程:持续检测是否需要执行激光校准"""
from laser_manager import laser_manager
from ota_manager import ota_manager
logger = logger_manager.logger
if logger:
logger.info("[LASER] 激光校准线程启动")
while True:
try:
try:
if ota_manager.ota_in_progress:
time.sleep_ms(200)
continue
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] OTA检查异常: {e}")
time.sleep_ms(200)
continue
if laser_manager.calibration_active:
# 调用校准方法,所有逻辑都在 LaserManager 中
result = laser_manager.calibrate_laser_position(timeout_ms=8000, check_sharpness=True)
# 如果超时仍未成功,稍微休息一下
if laser_manager.calibration_active:
time.sleep_ms(300)
else:
time.sleep_ms(50)
except Exception as e:
# 线程顶层异常捕获,防止线程静默退出
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] 校准线程异常: {e}")
import traceback
logger.error(traceback.format_exc())
else:
print(f"[LASER] 校准线程异常: {e}")
import traceback
traceback.print_exc()
time.sleep_ms(1000) # 等待1秒后继续
def cmd_str():
"""主程序入口"""
# ==================== 第一阶段:硬件初始化 ====================
# 按照 main104.py 的顺序,先完成所有硬件初始化
# 1. 引脚功能映射
for pin, func in config.PIN_MAPPINGS.items():
try:
pinmap.set_pin_function(pin, func)
except:
pass
# 2. 初始化硬件对象UART、I2C、ADC
hardware_manager.init_uart4g()
hardware_manager.init_distance_serial()
hardware_manager.init_bus()
hardware_manager.init_adc()
hardware_manager.init_at_client()
# 3. 初始化 INA226 电量监测芯片
init_ina226()
# 4. 初始化显示和相机
camera_manager.init_camera(640, 480)
camera_manager.init_display()
# ==================== 第二阶段:软件初始化 ====================
# 1. 初始化日志系统
import logging
logger_manager.init_logging(log_level=logging.DEBUG)
logger = logger_manager.logger
# 2. 从4G模块同步系统时间需要 at_client 已初始化)
sync_system_time_from_4g()
# 3. 启动时检查:是否需要恢复备份
pending_path = f"{config.APP_DIR}/ota_pending.json"
if os.path.exists(pending_path):
try:
with open(pending_path, 'r', encoding='utf-8') as f:
pending_obj = json.load(f)
restart_count = pending_obj.get('restart_count', 0)
max_restarts = pending_obj.get('max_restarts', 3)
backup_dir = pending_obj.get('backup_dir')
if logger:
logger.info(f"检测到 ota_pending.json重启计数: {restart_count}/{max_restarts}")
if restart_count >= max_restarts:
if logger:
logger.error(f"[STARTUP] 重启次数 ({restart_count}) 超过阈值 ({max_restarts}),执行恢复...")
if backup_dir and os.path.exists(backup_dir):
if ota_manager.restore_from_backup(backup_dir):
if logger:
logger.info(f"[STARTUP] 已从指定备份恢复: {backup_dir}")
else:
if logger:
logger.info(f"[STARTUP] 指定备份恢复失败,尝试恢复最新备份...")
ota_manager.restore_from_backup(None)
else:
if ota_manager.restore_from_backup(None):
if logger:
logger.info(f"[STARTUP] 已从最新备份恢复")
else:
if logger:
logger.error(f"[STARTUP] 恢复备份失败")
try:
os.remove(pending_path)
if logger:
logger.info(f"[STARTUP] 已删除 ota_pending.json")
except Exception as e:
if logger:
logger.error(f"[STARTUP] 删除 pending 文件失败: {e}")
if logger:
logger.info(f"[STARTUP] 恢复完成,准备重启系统...")
time.sleep_ms(2000)
os.system("reboot")
return
else:
pending_obj['restart_count'] = restart_count + 1
try:
with open(pending_path, 'w', encoding='utf-8') as f:
json.dump(pending_obj, f)
if logger:
logger.info(f"[STARTUP] 已更新重启计数: {pending_obj['restart_count']}")
except Exception as e:
if logger:
logger.error(f"[STARTUP] 更新重启计数失败: {e}")
except Exception as e:
if logger:
logger.error(f"[STARTUP] 检查 pending 文件时出错: {e}")
try:
if logger:
logger.info(f"[STARTUP] pending 文件可能损坏,尝试恢复备份...")
if ota_manager.restore_from_backup(None):
os.remove(pending_path)
if logger:
logger.info(f"[STARTUP] 已恢复备份并删除损坏的 pending 文件")
time.sleep_ms(2000)
os.system("reboot")
return
except:
pass
# 4. 初始化设备IDnetwork_manager 内部会自动设置 device_id 和 password
network_manager.read_device_id()
# 5. 创建照片存储目录(如果启用图像保存)
if config.SAVE_IMAGE_ENABLED:
photo_dir = config.PHOTO_DIR
if photo_dir not in os.listdir("/root"):
try:
os.mkdir(photo_dir)
except:
pass
# 6. 启动通信与校准线程
_thread.start_new_thread(network_manager.tcp_main, ())
if not config.HARDCODE_LASER_POINT:
_thread.start_new_thread(laser_calibration_worker, ())
# 7. 加载激光点配置
laser_manager.load_laser_point()
if logger:
logger.info("系统准备完成...")
last_adc_trigger = 0
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报
while not app.need_exit():
try:
current_time = time.ticks_ms()
# OTA 期间暂停相机预览
try:
if ota_manager.ota_in_progress:
time.sleep_ms(250)
continue
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] OTA检查异常: {e}")
time.sleep_ms(250)
continue
# 读取ADC值扳机检测
try:
if network_manager.manual_trigger_flag:
network_manager.clear_manual_trigger()
adc_val = config.ADC_TRIGGER_THRESHOLD + 1
if logger:
logger.info("[TEST] TCP命令触发射箭")
else:
adc_val = hardware_manager.adc_obj.read()
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] ADC读取异常: {e}")
time.sleep_ms(100)
continue
if adc_val > config.ADC_TRIGGER_THRESHOLD:
diff_ms = current_time - last_adc_trigger
if diff_ms < 3000:
continue
last_adc_trigger = current_time
try:
frame = camera_manager.read_frame()
laser_point_method = None # 记录激光点选择方法
if config.HARDCODE_LASER_POINT:
# 硬编码模式:使用硬编码值
laser_point = laser_manager.laser_point
laser_point_method = "hardcode"
elif laser_manager.has_calibrated_point():
# 假如校准过,并且有保存值,使用校准值
laser_point = laser_manager.laser_point
laser_point_method = "calibrated"
logger_manager.logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}")
elif distance_m and distance_m > 0:
# 动态计算模式:根据距离计算激光点
# 先检测靶心以获取距离(用于计算激光点)
# 第一次检测不使用激光点,仅用于获取距离
result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3(frame, None)
# 计算距离
distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None
laser_point = laser_manager.calculate_laser_point_from_distance(distance_m)
laser_point_method = "dynamic"
if laser_point is None:
logger = logger_manager.logger
if logger:
logger.warning("[MAIN] 激光点未初始化,跳过本次检测")
time.sleep_ms(100)
continue
x, y = laser_point
# 检测靶心
result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point)
camera_manager.show(result_img)
# 计算偏移与距离(如果检测到靶心)
if center and radius:
dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
distance_m = estimate_distance(best_radius1)
else:
# 未检测到靶心
dx, dy = None, None
distance_m = None
if logger:
logger.warning("[MAIN] 未检测到靶心,但会保存图像")
# 快速激光测距激光一闪而过约500-600ms
laser_distance_m = None
laser_signal_quality = 0
# try:
# result = laser_manager.quick_measure_distance()
# if isinstance(result, tuple) and len(result) == 2:
# laser_distance_m, laser_signal_quality = result
# else:
# # 向后兼容:如果返回的是单个值
# laser_distance_m = result if isinstance(result, (int, float)) else 0.0
# laser_signal_quality = 0
# if logger:
# if laser_distance_m > 0:
# logger.info(f"[MAIN] 激光测距成功: {laser_distance_m:.3f} m, 信号质量: {laser_signal_quality}")
# else:
# logger.warning("[MAIN] 激光测距失败或返回0")
# except Exception as e:
# if logger:
# logger.error(f"[MAIN] 激光测距异常: {e}")
# 读取电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
# 生成射箭ID
from shot_id_generator import shot_id_generator
shot_id = shot_id_generator.generate_id() # 不需要使用device_id
if logger:
logger.info(f"[MAIN] 射箭ID: {shot_id}")
# 保存图像(无论是否检测到靶心都保存)
# save_shot_image 函数会确保绘制激光十字线和检测标注(如果有)
# 如果未检测到靶心,文件名会包含 "no_target" 标识
save_shot_image(
result_img,
center,
radius,
method,
ellipse_params,
(x, y),
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None
)
# 构造上报数据
inner_data = {
"shot_id": shot_id, # 射箭ID用于关联图片和服务端日志
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100), # 视觉测距值(厘米)
"d_laser": round((laser_distance_m or 0.0) * 100), # 激光测距值(厘米)
"d_laser_quality": laser_signal_quality, # 激光测距信号质量
"m": method if method else "no_target",
"adc": adc_val,
# 新增字段:激光点选择方法
"laser_method": laser_point_method, # 激光点选择方法hardcode/calibrated/dynamic/default
# 激光点坐标(像素)
"target_x": float(x), # 激光点 X 坐标(像素)
"target_y": float(y), # 激光点 Y 坐标(像素)
}
# 添加椭圆参数(如果存在)
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
inner_data["ellipse_major_axis"] = float(max(width, height)) # 长轴(像素)
inner_data["ellipse_minor_axis"] = float(min(width, height)) # 短轴(像素)
inner_data["ellipse_angle"] = float(angle) # 椭圆角度(度)
inner_data["ellipse_center_x"] = float(ell_center[0]) # 椭圆中心 X 坐标(像素)
inner_data["ellipse_center_y"] = float(ell_center[1]) # 椭圆中心 Y 坐标(像素)
else:
inner_data["ellipse_major_axis"] = None
inner_data["ellipse_minor_axis"] = None
inner_data["ellipse_angle"] = None
inner_data["ellipse_center_x"] = None
inner_data["ellipse_center_y"] = None
report_data = {"cmd": 1, "data": inner_data}
network_manager.safe_enqueue(report_data, msg_type=2, high=True)
if logger:
if center and radius:
logger.info(f"射箭事件已加入发送队列已检测到靶心ID: {shot_id}")
else:
logger.info(f"射箭事件已加入发送队列未检测到靶心已保存图像ID: {shot_id}")
# 闪一下激光(射箭反馈)
laser_manager.flash_laser(1000)
time.sleep_ms(100)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 图像处理异常: {e}")
import traceback
logger.error(traceback.format_exc())
time.sleep_ms(100)
continue
else:
try:
camera_manager.show(camera_manager.read_frame())
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 显示异常: {e}")
time.sleep_ms(50)
except Exception as e:
# 主循环的顶层异常捕获,防止程序静默退出
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 主循环异常: {e}")
import traceback
logger.error(traceback.format_exc())
else:
print(f"[MAIN] 主循环异常: {e}")
import traceback
traceback.print_exc()
time.sleep_ms(1000) # 等待1秒后继续
# 主程序入口
if __name__ == "__main__":
try:
cmd_str()
# 用于调用测试函数
# test()
# 用于测试图片清晰度
# 方式1: 测试单张图片
# test_sharpness("/root/phot/image.bmp")
# 方式2: 测试目录下所有图片
# test_sharpness("/root/phot")
# 方式3: 使用默认路径config.PHOTO_DIR
# test_sharpness("/root/phot/")
# 用于测试激光点清晰度
# 方式1: 测试单张图片
# test_laser_point_sharpness("/root/phot/image.bmp")
# 方式2: 测试目录下所有图片
# test_laser_point_sharpness("/root/phot")
# 方式3: 使用默认路径config.PHOTO_DIR
# test_laser_point_sharpness("/root/phot/")
except KeyboardInterrupt:
logger = logger_manager.logger
if logger:
logger.info("[MAIN] 收到中断信号,程序退出")
logger_manager.stop_logging()
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 程序异常退出: {e}")
import traceback
logger.error(traceback.format_exc())
else:
print(f"[MAIN] 程序异常退出: {e}")
import traceback
traceback.print_exc()
logger_manager.stop_logging()
raise # 重新抛出异常,让系统知道程序异常退出