425 lines
17 KiB
Python
425 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
激光射击系统主程序(视觉测距版)
|
||
功能:目标检测、激光校准、4G TCP 通信、OTA 升级、单目测距、INA226 电量监测
|
||
平台:MaixPy (Sipeed MAIX)
|
||
作者:ZZH
|
||
最后更新:2025-11-21
|
||
|
||
重构版本:使用模块化设计
|
||
"""
|
||
from maix import camera, display, image, app, time, uart, pinmap, i2c
|
||
from maix.peripheral import adc
|
||
import _thread
|
||
import os
|
||
import json
|
||
|
||
# 导入新模块
|
||
import config
|
||
from version import VERSION
|
||
# from logger import init_logging, get_logger, stop_logging
|
||
from logger_manager import logger_manager
|
||
from time_sync import sync_system_time_from_4g
|
||
from power import init_ina226, get_bus_voltage, voltage_to_percent
|
||
from laser_manager import laser_manager
|
||
from vision import detect_circle_v3, estimate_distance, compute_laser_position, save_shot_image
|
||
from network import network_manager
|
||
from ota_manager import ota_manager
|
||
from hardware import hardware_manager
|
||
|
||
|
||
def laser_calibration_worker():
|
||
"""后台线程:持续检测是否需要执行激光校准"""
|
||
from maix import camera
|
||
from laser_manager import laser_manager
|
||
from ota_manager import ota_manager
|
||
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.info("[LASER] 激光校准线程启动")
|
||
|
||
while True:
|
||
try:
|
||
try:
|
||
if ota_manager.ota_in_progress:
|
||
time.sleep_ms(200)
|
||
continue
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[LASER] OTA检查异常: {e}")
|
||
time.sleep_ms(200)
|
||
continue
|
||
|
||
if laser_manager.calibration_active:
|
||
cam = None
|
||
try:
|
||
cam = camera.Camera(640, 480)
|
||
start = time.ticks_ms()
|
||
timeout_ms = 8000
|
||
while laser_manager.calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms:
|
||
frame = cam.read()
|
||
pos = laser_manager.find_red_laser(frame)
|
||
if pos:
|
||
laser_manager.set_calibration_result(pos)
|
||
laser_manager.stop_calibration()
|
||
laser_manager.save_laser_point(pos)
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.info(f"✅ 后台校准成功: {pos}")
|
||
break
|
||
time.sleep_ms(60)
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[LASER] calibration error: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
time.sleep_ms(200)
|
||
finally:
|
||
try:
|
||
if cam is not None:
|
||
del cam
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[LASER] 释放相机资源异常: {e}")
|
||
|
||
if laser_manager.calibration_active:
|
||
time.sleep_ms(300)
|
||
else:
|
||
time.sleep_ms(50)
|
||
except Exception as e:
|
||
# 线程顶层异常捕获,防止线程静默退出
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[LASER] 校准线程异常: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
else:
|
||
print(f"[LASER] 校准线程异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
time.sleep_ms(1000) # 等待1秒后继续
|
||
|
||
|
||
def cmd_str():
|
||
"""主程序入口"""
|
||
# ==================== 第一阶段:硬件初始化 ====================
|
||
# 按照 main104.py 的顺序,先完成所有硬件初始化
|
||
|
||
# 1. 引脚功能映射
|
||
for pin, func in config.PIN_MAPPINGS.items():
|
||
try:
|
||
pinmap.set_pin_function(pin, func)
|
||
except:
|
||
pass
|
||
|
||
# 2. 初始化硬件对象(UART、I2C、ADC)
|
||
hardware_manager.init_uart4g()
|
||
hardware_manager.init_distance_serial()
|
||
hardware_manager.init_bus()
|
||
hardware_manager.init_adc()
|
||
hardware_manager.init_at_client()
|
||
|
||
# 3. 初始化 INA226 电量监测芯片
|
||
init_ina226()
|
||
|
||
# 4. 加载激光点配置
|
||
laser_manager.load_laser_point()
|
||
|
||
# 5. 初始化显示和相机
|
||
disp = display.Display()
|
||
cam = camera.Camera(640, 480)
|
||
|
||
# ==================== 第二阶段:软件初始化 ====================
|
||
|
||
# 1. 初始化日志系统
|
||
import logging
|
||
logger_manager.init_logging(log_level=logging.DEBUG)
|
||
logger = logger_manager.logger
|
||
|
||
# 2. 从4G模块同步系统时间(需要 at_client 已初始化)
|
||
sync_system_time_from_4g()
|
||
|
||
# 3. 启动时检查:是否需要恢复备份
|
||
pending_path = f"{config.APP_DIR}/ota_pending.json"
|
||
if os.path.exists(pending_path):
|
||
try:
|
||
with open(pending_path, 'r', encoding='utf-8') as f:
|
||
pending_obj = json.load(f)
|
||
|
||
restart_count = pending_obj.get('restart_count', 0)
|
||
max_restarts = pending_obj.get('max_restarts', 3)
|
||
backup_dir = pending_obj.get('backup_dir')
|
||
|
||
if logger:
|
||
logger.info(f"检测到 ota_pending.json,重启计数: {restart_count}/{max_restarts}")
|
||
|
||
if restart_count >= max_restarts:
|
||
if logger:
|
||
logger.error(f"[STARTUP] 重启次数 ({restart_count}) 超过阈值 ({max_restarts}),执行恢复...")
|
||
|
||
if backup_dir and os.path.exists(backup_dir):
|
||
if ota_manager.restore_from_backup(backup_dir):
|
||
if logger:
|
||
logger.info(f"[STARTUP] 已从指定备份恢复: {backup_dir}")
|
||
else:
|
||
if logger:
|
||
logger.info(f"[STARTUP] 指定备份恢复失败,尝试恢复最新备份...")
|
||
ota_manager.restore_from_backup(None)
|
||
else:
|
||
if ota_manager.restore_from_backup(None):
|
||
if logger:
|
||
logger.info(f"[STARTUP] 已从最新备份恢复")
|
||
else:
|
||
if logger:
|
||
logger.error(f"[STARTUP] 恢复备份失败")
|
||
|
||
try:
|
||
os.remove(pending_path)
|
||
if logger:
|
||
logger.info(f"[STARTUP] 已删除 ota_pending.json")
|
||
except Exception as e:
|
||
if logger:
|
||
logger.error(f"[STARTUP] 删除 pending 文件失败: {e}")
|
||
|
||
if logger:
|
||
logger.info(f"[STARTUP] 恢复完成,准备重启系统...")
|
||
time.sleep_ms(2000)
|
||
os.system("reboot")
|
||
return
|
||
else:
|
||
pending_obj['restart_count'] = restart_count + 1
|
||
try:
|
||
with open(pending_path, 'w', encoding='utf-8') as f:
|
||
json.dump(pending_obj, f)
|
||
if logger:
|
||
logger.info(f"[STARTUP] 已更新重启计数: {pending_obj['restart_count']}")
|
||
except Exception as e:
|
||
if logger:
|
||
logger.error(f"[STARTUP] 更新重启计数失败: {e}")
|
||
except Exception as e:
|
||
if logger:
|
||
logger.error(f"[STARTUP] 检查 pending 文件时出错: {e}")
|
||
try:
|
||
if logger:
|
||
logger.info(f"[STARTUP] pending 文件可能损坏,尝试恢复备份...")
|
||
if ota_manager.restore_from_backup(None):
|
||
os.remove(pending_path)
|
||
if logger:
|
||
logger.info(f"[STARTUP] 已恢复备份并删除损坏的 pending 文件")
|
||
time.sleep_ms(2000)
|
||
os.system("reboot")
|
||
return
|
||
except:
|
||
pass
|
||
|
||
# 4. 初始化设备ID(network_manager 内部会自动设置 device_id 和 password)
|
||
network_manager.read_device_id()
|
||
|
||
# 5. 创建照片存储目录(如果启用图像保存)
|
||
if config.SAVE_IMAGE_ENABLED:
|
||
photo_dir = config.PHOTO_DIR
|
||
if photo_dir not in os.listdir("/root"):
|
||
try:
|
||
os.mkdir(photo_dir)
|
||
except:
|
||
pass
|
||
|
||
# 6. 启动通信与校准线程
|
||
_thread.start_new_thread(network_manager.tcp_main, ())
|
||
_thread.start_new_thread(laser_calibration_worker, ())
|
||
|
||
if logger:
|
||
logger.info("系统准备完成...")
|
||
|
||
last_adc_trigger = 0
|
||
|
||
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报
|
||
while not app.need_exit():
|
||
try:
|
||
current_time = time.ticks_ms()
|
||
|
||
# OTA 期间暂停相机预览
|
||
try:
|
||
if ota_manager.ota_in_progress:
|
||
time.sleep_ms(250)
|
||
continue
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[MAIN] OTA检查异常: {e}")
|
||
time.sleep_ms(250)
|
||
continue
|
||
|
||
# 读取ADC值(扳机检测)
|
||
try:
|
||
if network_manager.manual_trigger_flag:
|
||
network_manager.clear_manual_trigger()
|
||
adc_val = config.ADC_TRIGGER_THRESHOLD + 1
|
||
if logger:
|
||
logger.info("[TEST] TCP命令触发射箭")
|
||
else:
|
||
adc_val = hardware_manager.adc_obj.read()
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[MAIN] ADC读取异常: {e}")
|
||
time.sleep_ms(100)
|
||
continue
|
||
|
||
if adc_val > config.ADC_TRIGGER_THRESHOLD:
|
||
diff_ms = current_time - last_adc_trigger
|
||
if diff_ms < 3000:
|
||
continue
|
||
last_adc_trigger = current_time
|
||
|
||
try:
|
||
frame = cam.read()
|
||
laser_point = laser_manager.laser_point
|
||
if laser_point is None:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.warning("[MAIN] 激光点未初始化,跳过本次检测")
|
||
time.sleep_ms(100)
|
||
continue
|
||
|
||
x, y = laser_point
|
||
|
||
# 绘制激光十字线
|
||
color = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
|
||
frame.draw_line(
|
||
int(x - config.LASER_LENGTH), int(y),
|
||
int(x + config.LASER_LENGTH), int(y),
|
||
color, config.LASER_THICKNESS
|
||
)
|
||
frame.draw_line(
|
||
int(x), int(y - config.LASER_LENGTH),
|
||
int(x), int(y + config.LASER_LENGTH),
|
||
color, config.LASER_THICKNESS
|
||
)
|
||
frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS)
|
||
|
||
# 检测靶心
|
||
result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point)
|
||
disp.show(result_img)
|
||
|
||
# 计算偏移与距离(如果检测到靶心)
|
||
if center and radius:
|
||
dx, dy = compute_laser_position(center, (x, y), radius, method)
|
||
distance_m = estimate_distance(best_radius1)
|
||
else:
|
||
# 未检测到靶心
|
||
dx, dy = None, None
|
||
distance_m = None
|
||
if logger:
|
||
logger.warning("[MAIN] 未检测到靶心,但会保存图像")
|
||
|
||
# 快速激光测距(激光一闪而过,约500-600ms)
|
||
laser_distance_m = None
|
||
try:
|
||
laser_distance_m = laser_manager.quick_measure_distance()
|
||
if logger:
|
||
if laser_distance_m > 0:
|
||
logger.info(f"[MAIN] 激光测距成功: {laser_distance_m:.3f} m")
|
||
else:
|
||
logger.warning("[MAIN] 激光测距失败或返回0")
|
||
except Exception as e:
|
||
if logger:
|
||
logger.error(f"[MAIN] 激光测距异常: {e}")
|
||
|
||
# 读取电量
|
||
voltage = get_bus_voltage()
|
||
battery_percent = voltage_to_percent(voltage)
|
||
|
||
# 保存图像(无论是否检测到靶心都保存)
|
||
# save_shot_image 函数会确保绘制激光十字线和检测标注(如果有)
|
||
# 如果未检测到靶心,文件名会包含 "no_target" 标识
|
||
save_shot_image(
|
||
result_img,
|
||
center,
|
||
radius,
|
||
method,
|
||
ellipse_params,
|
||
(x, y),
|
||
distance_m,
|
||
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None
|
||
)
|
||
|
||
# 构造上报数据
|
||
inner_data = {
|
||
"x": float(dx) if dx is not None else 200.0,
|
||
"y": float(dy) if dy is not None else 200.0,
|
||
"r": 90.0,
|
||
"d": round((distance_m or 0.0) * 100), # 视觉测距值(厘米)
|
||
"d_laser": round((laser_distance_m or 0.0) * 100), # 激光测距值(厘米)
|
||
"m": method if method else "no_target",
|
||
"adc": adc_val
|
||
}
|
||
report_data = {"cmd": 1, "data": inner_data}
|
||
network_manager.safe_enqueue(report_data, msg_type=2, high=True)
|
||
if logger:
|
||
if center and radius:
|
||
logger.info("射箭事件已加入发送队列(已检测到靶心)")
|
||
else:
|
||
logger.info("射箭事件已加入发送队列(未检测到靶心,已保存图像)")
|
||
|
||
# 闪一下激光(射箭反馈)
|
||
# laser_manager.flash_laser(1000)
|
||
|
||
time.sleep_ms(100)
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[MAIN] 图像处理异常: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
time.sleep_ms(100)
|
||
continue
|
||
else:
|
||
try:
|
||
disp.show(cam.read())
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[MAIN] 显示异常: {e}")
|
||
time.sleep_ms(50)
|
||
|
||
except Exception as e:
|
||
# 主循环的顶层异常捕获,防止程序静默退出
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[MAIN] 主循环异常: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
else:
|
||
print(f"[MAIN] 主循环异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
time.sleep_ms(1000) # 等待1秒后继续
|
||
|
||
|
||
# 主程序入口
|
||
if __name__ == "__main__":
|
||
try:
|
||
cmd_str()
|
||
except KeyboardInterrupt:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.info("[MAIN] 收到中断信号,程序退出")
|
||
logger_manager.stop_logging()
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[MAIN] 程序异常退出: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
else:
|
||
print(f"[MAIN] 程序异常退出: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
logger_manager.stop_logging()
|
||
raise # 重新抛出异常,让系统知道程序异常退出
|