Files
archery/shoot_manager.py
gcw_4spBpAfv 43e7e0ba17 new shoot algo
2026-04-17 18:31:44 +08:00

393 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import threading
import config
from camera_manager import camera_manager
from laser_manager import laser_manager
from logger_manager import logger_manager
from network import network_manager
from triangle_target import load_camera_from_xml, load_triangle_positions, try_triangle_scoring
from vision import estimate_distance, detect_circle_v3, enqueue_save_shot
from maix import image, time
# 缓存相机标定与三角形位置,避免每次射箭重复读磁盘
_tri_calib_cache = None
def _get_triangle_calib():
"""返回 (K, dist, marker_positions);首次调用时从磁盘加载并缓存。"""
global _tri_calib_cache
if _tri_calib_cache is not None:
return _tri_calib_cache
calib_path = getattr(config, "CAMERA_CALIB_XML", "")
tri_json = getattr(config, "TRIANGLE_POSITIONS_JSON", "")
if not (os.path.isfile(calib_path) and os.path.isfile(tri_json)):
_tri_calib_cache = (None, None, None)
return _tri_calib_cache
K, dist = load_camera_from_xml(calib_path)
pos = load_triangle_positions(tri_json)
_tri_calib_cache = (K, dist, pos)
return _tri_calib_cache
def preload_triangle_calib():
"""
启动阶段预加载三角形标定与坐标文件,避免首次射箭触发时的读盘/解析开销。
"""
try:
_get_triangle_calib()
except Exception:
# 预加载失败不影响主流程;射箭时会再次按需尝试
pass
def analyze_shot(frame, laser_point=None):
"""
分析射箭结果算法部分可迁移到C++
:param frame: 图像帧
:param laser_point: 激光点坐标 (x, y)
:return: 包含分析结果的字典
优先级:
1. 三角形单应性USE_TRIANGLE_OFFSET=True 时)— 成功则直接返回,跳过圆形检测
2. 圆形检测(三角形不可用或识别失败时兜底)
"""
logger = logger_manager.logger
from datetime import datetime
# ── Step 1: 确定激光点 ────────────────────────────────────────────────────
laser_point_method = None
distance_m_first = None
if config.HARDCODE_LASER_POINT:
laser_point = laser_manager.laser_point
laser_point_method = "hardcode"
elif laser_manager.has_calibrated_point():
laser_point = laser_manager.laser_point
laser_point_method = "calibrated"
if logger:
logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}")
else:
# 动态模式:先做一次无激光点检测以估算距离,再推算激光点
_, _, _, _, best_radius1_temp, _ = detect_circle_v3(frame, None)
distance_m_first = estimate_distance(best_radius1_temp) if best_radius1_temp else None
if distance_m_first and distance_m_first > 0:
laser_point = laser_manager.calculate_laser_point_from_distance(distance_m_first)
laser_point_method = "dynamic"
if logger:
logger.info(f"[算法] 使用比例尺: {laser_point}")
else:
laser_point = laser_manager.laser_point
laser_point_method = "default"
if logger:
logger.info(f"[算法] 使用默认值: {laser_point}")
if laser_point is None:
return {"success": False, "reason": "laser_point_not_initialized"}
x, y = laser_point
# ── Step 2: 提前转换一次图像,两个检测线程共享(只读)────────────────────────
img_cv = image.image2cv(frame, False, False)
# ── Step 3: 检查三角形是否可用 ────────────────────────────────────────────────
use_tri = getattr(config, "USE_TRIANGLE_OFFSET", False)
K = dist_coef = pos = None
if use_tri:
K, dist_coef, pos = _get_triangle_calib()
use_tri = K is not None and dist_coef is not None and pos
def _build_circle_result(cdata):
"""从圆形检测结果构建 analyze_shot 返回值。"""
r_img, center, radius, method, best_radius1, ellipse_params = cdata
dx, dy = None, None
d_m = distance_m_first
if center and radius:
dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
d_m = estimate_distance(best_radius1) if best_radius1 else distance_m_first
return {
"success": True,
"result_img": r_img,
"center": center, "radius": radius, "method": method,
"best_radius1": best_radius1, "ellipse_params": ellipse_params,
"dx": dx, "dy": dy, "distance_m": d_m,
"laser_point": laser_point, "laser_point_method": laser_point_method,
"offset_method": "yellow_ellipse" if ellipse_params else "yellow_circle",
"distance_method": "yellow_radius",
}
if not use_tri:
# 三角形未配置,直接跑圆形检测
return _build_circle_result(
detect_circle_v3(frame, laser_point, img_cv=img_cv)
)
# ── Step 4: 三角形 + 圆形并行检测 ─────────────────────────────────────────────
# 两个线程共享只读的 img_cv互不干扰
tri_result = {}
circle_result = {}
def _run_triangle():
try:
logger.info(f"[TRI] begin {datetime.now()}")
logger.info(f"[TRI] K: {K}, dist: {dist_coef}, pos: {pos}, {datetime.now()}")
tri = try_triangle_scoring(
img_cv, (x, y), pos, K, dist_coef,
size_range=getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500)),
)
logger.info(f"[TRI] tri: {tri}, {datetime.now()}")
tri_result['data'] = tri
except Exception as e:
logger.error(f"[TRI] 三角形路径异常: {e}")
tri_result['data'] = {'ok': False}
def _run_circle():
try:
circle_result['data'] = detect_circle_v3(frame, laser_point, img_cv=img_cv)
except Exception as e:
logger.error(f"[CIRCLE] 圆形检测异常: {e}")
circle_result['data'] = (frame, None, None, None, None, None)
t_tri = threading.Thread(target=_run_triangle, daemon=True)
t_cir = threading.Thread(target=_run_circle, daemon=True)
t_tri.start()
t_cir.start()
# 最多等待三角形 TRIANGLE_TIMEOUT_MS默认 1000ms
tri_timeout_s = float(getattr(config, "TRIANGLE_TIMEOUT_MS", 1000)) / 1000.0
t_tri.join(timeout=tri_timeout_s)
if t_tri.is_alive():
# 超时:直接放弃三角形结果,回退圆心(圆心线程通常已跑完)
logger.warning(f"[TRI] timeout>{tri_timeout_s:.2f}s回退圆心算法")
t_cir.join()
return _build_circle_result(
circle_result.get('data') or (frame, None, None, None, None, None)
)
tri = tri_result.get('data', {})
if tri.get('ok'):
logger.info(f"[TRI] end {datetime.now()}")
return {
"success": True,
"result_img": frame,
"center": None, "radius": None,
"method": tri.get("offset_method") or "triangle_homography",
"best_radius1": None, "ellipse_params": None,
"dx": tri["dx_cm"], "dy": tri["dy_cm"],
"distance_m": tri.get("distance_m") or distance_m_first,
"laser_point": laser_point, "laser_point_method": laser_point_method,
"offset_method": tri.get("offset_method") or "triangle_homography",
"distance_method": tri.get("distance_method") or "pnp_triangle",
"tri_markers": tri.get("markers", []),
"tri_homography": tri.get("homography"),
}
# 三角形失败,等圆形结果(已并行跑完,几乎无额外等待)
t_cir.join()
logger.info(f"[TRI] end(fallback) {datetime.now()}")
return _build_circle_result(
circle_result.get('data') or (frame, None, None, None, None, None)
)
def process_shot(adc_val):
"""
处理射箭事件(逻辑控制部分)
:param adc_val: ADC触发值
:return: None
"""
logger = logger_manager.logger
try:
network_manager.safe_enqueue({"shoot_event": "start"}, msg_type=2, high=True)
frame = camera_manager.read_frame()
# 调用算法分析
analysis_result = analyze_shot(frame)
if not analysis_result.get("success"):
reason = analysis_result.get("reason", "unknown")
if logger:
logger.warning(f"[MAIN] 射箭分析失败: {reason}")
time.sleep_ms(100)
return
# 提取分析结果
result_img = analysis_result["result_img"]
center = analysis_result["center"]
radius = analysis_result["radius"]
method = analysis_result["method"]
ellipse_params = analysis_result["ellipse_params"]
dx = analysis_result["dx"]
dy = analysis_result["dy"]
distance_m = analysis_result["distance_m"]
laser_point = analysis_result["laser_point"]
laser_point_method = analysis_result["laser_point_method"]
offset_method = analysis_result.get("offset_method", "yellow_circle")
distance_method = analysis_result.get("distance_method", "yellow_radius")
tri_markers = analysis_result.get("tri_markers", [])
tri_homography = analysis_result.get("tri_homography")
x, y = laser_point
# 三角形路径成功时 center/radius 为空是正常的;此时用 triangle 方法名用于保存文件名与上报字段 m
if (not method) and tri_markers:
method = offset_method or "triangle_homography"
if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING:
camera_manager.show(result_img)
if dx is None and dy is None and logger:
logger.warning("[MAIN] 未检测到偏移量(三角形与圆形均失败),但会保存图像")
# 生成射箭ID
from shot_id_generator import shot_id_generator
shot_id = shot_id_generator.generate_id()
if logger:
logger.info(f"[MAIN] 射箭ID: {shot_id}")
laser_distance_m = None
laser_signal_quality = 0
# x,y 单位物理厘米compute_laser_position 与三角形单应性均输出物理 cm
# 未检测到靶心时 x/y 用 200.0(脱靶标志)
srv_x = round(float(dx), 4) if dx is not None else 200.0
srv_y = round(float(dy), 4) if dy is not None else 200.0
# 构造上报数据
inner_data = {
"shot_id": shot_id,
"x": srv_x,
"y": srv_y,
"r": 20.0, # 保留字段(服务端当前忽略,物理外环半径 cm
"d": round((distance_m or 0.0) * 100),
"d_laser": round((laser_distance_m or 0.0) * 100),
"d_laser_quality": laser_signal_quality,
"m": method if method else "no_target",
"adc": adc_val,
"laser_method": laser_point_method,
"target_x": float(x),
"target_y": float(y),
"offset_method": offset_method,
"distance_method": distance_method,
}
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
inner_data["ellipse_major_axis"] = float(max(width, height))
inner_data["ellipse_minor_axis"] = float(min(width, height))
inner_data["ellipse_angle"] = float(angle)
inner_data["ellipse_center_x"] = float(ell_center[0])
inner_data["ellipse_center_y"] = float(ell_center[1])
else:
inner_data["ellipse_major_axis"] = None
inner_data["ellipse_minor_axis"] = None
inner_data["ellipse_angle"] = None
inner_data["ellipse_center_x"] = None
inner_data["ellipse_center_y"] = None
report_data = {"cmd": 1, "data": inner_data}
network_manager.safe_enqueue(report_data, msg_type=2, high=True)
# 数据上报后再画标注,不干扰检测阶段的原始画面
if result_img is not None:
# 1. 若有三角形标记,先用 cv2 画轮廓 / 顶点 / ID再反推靶心位置
if tri_markers:
import cv2 as _cv2
import numpy as _np
_img_cv = image.image2cv(result_img, False, False)
# 三角形轮廓 + 直角顶点 + ID
for _m in tri_markers:
_corners = _np.array(_m["corners"], dtype=_np.int32)
_cv2.polylines(_img_cv, [_corners], True, (0, 255, 0), 2)
_cx, _cy = int(_m["center"][0]), int(_m["center"][1])
_cv2.circle(_img_cv, (_cx, _cy), 4, (0, 0, 255), -1)
_cv2.putText(_img_cv, f"T{_m['id']}",
(_cx - 18, _cy - 12),
_cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1)
# 靶心H_inv @ [0,0]):小红圆
_center_px = None
if tri_homography is not None:
try:
_H_inv = _np.linalg.inv(tri_homography)
_c_img = _cv2.perspectiveTransform(
_np.array([[[0.0, 0.0]]], dtype=_np.float32), _H_inv)[0][0]
_ocx, _ocy = int(_c_img[0]), int(_c_img[1])
_cv2.circle(_img_cv, (_ocx, _ocy), 5, (0, 0, 255), -1) # 实心
_cv2.circle(_img_cv, (_ocx, _ocy), 9, (0, 0, 255), 1) # 外框
_center_px = (_ocx, _ocy)
logger.info(f"[算法] 靶心: {_center_px}")
except Exception:
pass
# 叠加信息:落点-圆心距离 / 相机-靶距离等
try:
import math as _math
_lines = []
if dx is not None and dy is not None:
_r_cm = _math.hypot(float(dx), float(dy))
_lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm |r|={_r_cm:.2f}cm")
if distance_m is not None:
_lines.append(f"cam_dist={float(distance_m):.2f}m ({distance_method})")
if method:
_lines.append(f"method={method}")
if _lines:
_y0 = 22
for i, _t in enumerate(_lines):
_cv2.putText(
_img_cv,
_t,
(10, _y0 + i * 18),
_cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 0),
1,
)
except Exception:
pass
result_img = image.cv2image(_img_cv, False, False)
# 2. 激光十字线
_lc = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
result_img.draw_line(int(x - config.LASER_LENGTH), int(y),
int(x + config.LASER_LENGTH), int(y),
_lc, config.LASER_THICKNESS)
result_img.draw_line(int(x), int(y - config.LASER_LENGTH),
int(x), int(y + config.LASER_LENGTH),
_lc, config.LASER_THICKNESS)
result_img.draw_circle(int(x), int(y), 1, _lc, config.LASER_THICKNESS)
# 闪一下激光(射箭反馈)
if config.FLASH_LASER_WHILE_SHOOTING:
laser_manager.flash_laser(config.FLASH_LASER_DURATION_MS)
# 保存图像(异步队列,与 main.py 一致)
enqueue_save_shot(
result_img,
center,
radius,
method,
ellipse_params,
(x, y),
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None,
)
if logger:
if dx is not None and dy is not None:
logger.info(f"射箭事件已加入发送队列(偏移=({dx:.2f},{dy:.2f})cmID: {shot_id}")
else:
logger.info(f"射箭事件已加入发送队列未检测到偏移已保存图像ID: {shot_id}")
time.sleep_ms(100)
except Exception as e:
if logger:
logger.error(f"[MAIN] 图像处理异常: {e}")
import traceback
logger.error(traceback.format_exc())
time.sleep_ms(100)