Files
archery/shoot_manager.py

209 lines
7.1 KiB
Python
Raw Permalink Normal View History

2026-01-23 11:28:40 +08:00
import config
from camera_manager import camera_manager
from laser_manager import laser_manager
from logger_manager import logger_manager
from network import network_manager
from power import get_bus_voltage, voltage_to_percent
from vision import estimate_distance, detect_circle_v3, save_shot_image
from maix import camera, display, image, app, time, uart, pinmap, i2c
def analyze_shot(frame, laser_point=None):
    """
    Analyze one shot frame (algorithm part; candidate for porting to C++).

    Steps, in order:
      1. Run ``detect_circle_v3`` once without a laser point to obtain a
         radius from which the distance is estimated.
      2. Choose the laser point (hardcoded / calibrated / derived from the
         distance / default).
      3. Draw the laser crosshair onto ``frame``.
      4. Run ``detect_circle_v3`` again with the chosen laser point and
         compute the offset and distance from that second result.

    :param frame: image frame; the crosshair is drawn directly onto it
    :param laser_point: laser point coordinates (x, y).
        NOTE(review): the value passed in is never used -- every selection
        branch below reassigns it. Confirm whether callers expect an explicit
        point to take precedence.
    :return: dict with the analysis result. When no laser point is available:
        ``{"success": False, "reason": "laser_point_not_initialized"}``.
        Otherwise ``"success"`` is True and the dict carries ``result_img``,
        ``center``, ``radius``, ``method``, ``best_radius1``,
        ``ellipse_params``, ``dx``, ``dy``, ``distance_m``, ``laser_point``
        and ``laser_point_method``. ``dx``, ``dy`` and ``distance_m`` are None
        when the second detection yields no center/radius.
    """
    logger = logger_manager.logger

    # First pass: detect the target only to get a radius for the distance
    # estimate; the other returned values are not needed here.
    _, _, _, _, first_pass_radius, _ = detect_circle_v3(frame, None)
    distance_m = estimate_distance(first_pass_radius) if first_pass_radius else None

    # Choose the laser point; laser_point_method records which source was used.
    laser_point_method = None
    if config.HARDCODE_LASER_POINT:
        laser_point = laser_manager.laser_point
        laser_point_method = "hardcode"
    elif laser_manager.has_calibrated_point():
        laser_point = laser_manager.laser_point
        laser_point_method = "calibrated"
        if logger:
            logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}")
    elif distance_m and distance_m > 0:
        laser_point = laser_manager.calculate_laser_point_from_distance(distance_m)
        laser_point_method = "dynamic"
        if logger:
            logger.info(f"[算法] 使用比例尺: {laser_point}")
    else:
        laser_point = laser_manager.laser_point
        laser_point_method = "default"
        if logger:
            logger.info(f"[算法] 使用默认值: {laser_point}")

    if laser_point is None:
        return {
            "success": False,
            "reason": "laser_point_not_initialized"
        }

    x, y = laser_point

    # Draw the laser crosshair: one horizontal line, one vertical line and a
    # small circle at the laser point.
    color = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
    frame.draw_line(
        int(x - config.LASER_LENGTH), int(y),
        int(x + config.LASER_LENGTH), int(y),
        color, config.LASER_THICKNESS
    )
    frame.draw_line(
        int(x), int(y - config.LASER_LENGTH),
        int(x), int(y + config.LASER_LENGTH),
        color, config.LASER_THICKNESS
    )
    frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS)

    # Second pass: detect the target again, this time with the laser point.
    result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point)

    # Offset and distance are only meaningful when a target was found.
    if center and radius:
        dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
        # Guard exactly like the first pass: a detection may provide
        # center/radius without a usable best_radius1, and the estimator
        # must not be fed None/0.
        distance_m = estimate_distance(best_radius1) if best_radius1 else None
    else:
        dx, dy = None, None
        distance_m = None

    return {
        "success": True,
        "result_img": result_img,
        "center": center,
        "radius": radius,
        "method": method,
        "best_radius1": best_radius1,
        "ellipse_params": ellipse_params,
        "dx": dx,
        "dy": dy,
        "distance_m": distance_m,
        "laser_point": laser_point,
        "laser_point_method": laser_point_method
    }
def process_shot(adc_val):
    """
    Handle one shot event (control-flow part).

    Reads a frame from the camera, runs ``analyze_shot`` on it, displays the
    result image, saves it through ``save_shot_image``, queues a report with
    ``network_manager.safe_enqueue`` and finally flashes the laser. Any
    exception raised on the way is logged (when a logger is available) and
    swallowed, so this function does not propagate errors to its caller.

    :param adc_val: ADC value that triggered the shot; sent in the report as "adc"
    :return: None
    """
    log = logger_manager.logger
    try:
        shot = analyze_shot(camera_manager.read_frame())

        # Analysis could not run (e.g. no laser point): log, pause, give up.
        if not shot.get("success"):
            why = shot.get("reason", "unknown")
            if log:
                log.warning(f"[MAIN] 射箭分析失败: {why}")
            time.sleep_ms(100)
            return

        # Pull out the fields of the analysis result that are used below.
        annotated = shot["result_img"]
        target_center = shot["center"]
        target_radius = shot["radius"]
        detect_method = shot["method"]
        ellipse = shot["ellipse_params"]
        offset_x = shot["dx"]
        offset_y = shot["dy"]
        distance = shot["distance_m"]
        aim_point = shot["laser_point"]
        laser_method = shot["laser_point_method"]
        aim_x, aim_y = aim_point

        camera_manager.show(annotated)

        target_found = bool(target_center and target_radius)
        if log and not target_found:
            log.warning("[MAIN] 未检测到靶心,但会保存图像")

        # Read the battery level.
        # NOTE(review): battery_percent is computed but not used anywhere in
        # this function -- confirm whether it should be part of the report.
        battery_percent = voltage_to_percent(get_bus_voltage())

        # Generate the shot ID.
        # NOTE(review): imported here rather than at module level, presumably
        # on purpose (e.g. import order) -- confirm before moving it.
        from shot_id_generator import shot_id_generator
        shot_id = shot_id_generator.generate_id()
        if log:
            log.info(f"[MAIN] 射箭ID: {shot_id}")

        # Save the image; photo_dir is None when image saving is disabled.
        photo_dir = config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None
        save_shot_image(
            annotated,
            target_center,
            target_radius,
            detect_method,
            ellipse,
            (aim_x, aim_y),
            distance,
            shot_id=shot_id,
            photo_dir=photo_dir
        )

        # Build the report payload. 200.0 is the fallback sent for x/y when
        # no offset was computed; "d" is distance_m * 100, rounded.
        payload = {
            "shot_id": shot_id,
            "x": 200.0 if offset_x is None else float(offset_x),
            "y": 200.0 if offset_y is None else float(offset_y),
            "r": 90.0,
            "d": round((distance or 0.0) * 100),
            "d_laser": 0.0,
            "d_laser_quality": 0,
            "m": detect_method if detect_method else "no_target",
            "adc": adc_val,
            "laser_method": laser_method,
            "target_x": float(aim_x),
            "target_y": float(aim_y),
        }

        # Ellipse fields are always present in the payload; they are None
        # when no ellipse parameters were returned.
        ellipse_keys = (
            "ellipse_major_axis",
            "ellipse_minor_axis",
            "ellipse_angle",
            "ellipse_center_x",
            "ellipse_center_y",
        )
        if ellipse:
            ell_center, (axis_a, axis_b), tilt = ellipse
            ellipse_values = (
                float(max(axis_a, axis_b)),
                float(min(axis_a, axis_b)),
                float(tilt),
                float(ell_center[0]),
                float(ell_center[1]),
            )
        else:
            ellipse_values = (None,) * len(ellipse_keys)
        payload.update(zip(ellipse_keys, ellipse_values))

        network_manager.safe_enqueue({"cmd": 1, "data": payload}, msg_type=2, high=True)

        if log:
            queued_msg = (
                f"射箭事件已加入发送队列已检测到靶心ID: {shot_id}"
                if target_found
                else f"射箭事件已加入发送队列未检测到靶心已保存图像ID: {shot_id}"
            )
            log.info(queued_msg)

        # Flash the laser once as shot feedback, then pause briefly.
        laser_manager.flash_laser(1000)
        time.sleep_ms(100)
    except Exception as exc:
        if log:
            log.error(f"[MAIN] 图像处理异常: {exc}")
            import traceback
            log.error(traceback.format_exc())
        time.sleep_ms(100)