new shoot algo

This commit is contained in:
gcw_4spBpAfv
2026-04-17 18:30:50 +08:00
parent 0ee970d8bd
commit 43e7e0ba17
11 changed files with 1976 additions and 97 deletions

View File

@@ -1,11 +1,44 @@
import os
import threading
import config
from camera_manager import camera_manager
from laser_manager import laser_manager
from logger_manager import logger_manager
from network import network_manager
from power import get_bus_voltage, voltage_to_percent
from vision import estimate_distance, detect_circle_v3, save_shot_image
from maix import camera, display, image, app, time, uart, pinmap, i2c
from triangle_target import load_camera_from_xml, load_triangle_positions, try_triangle_scoring
from vision import estimate_distance, detect_circle_v3, enqueue_save_shot
from maix import image, time
# 缓存相机标定与三角形位置,避免每次射箭重复读磁盘
_tri_calib_cache = None
def _get_triangle_calib():
"""返回 (K, dist, marker_positions);首次调用时从磁盘加载并缓存。"""
global _tri_calib_cache
if _tri_calib_cache is not None:
return _tri_calib_cache
calib_path = getattr(config, "CAMERA_CALIB_XML", "")
tri_json = getattr(config, "TRIANGLE_POSITIONS_JSON", "")
if not (os.path.isfile(calib_path) and os.path.isfile(tri_json)):
_tri_calib_cache = (None, None, None)
return _tri_calib_cache
K, dist = load_camera_from_xml(calib_path)
pos = load_triangle_positions(tri_json)
_tri_calib_cache = (K, dist, pos)
return _tri_calib_cache
def preload_triangle_calib():
"""
启动阶段预加载三角形标定与坐标文件,避免首次射箭触发时的读盘/解析开销。
"""
try:
_get_triangle_calib()
except Exception:
# 预加载失败不影响主流程;射箭时会再次按需尝试
pass
def analyze_shot(frame, laser_point=None):
"""
@@ -13,18 +46,18 @@ def analyze_shot(frame, laser_point=None):
:param frame: 图像帧
:param laser_point: 激光点坐标 (x, y)
:return: 包含分析结果的字典
优先级:
1. 三角形单应性USE_TRIANGLE_OFFSET=True 时)— 成功则直接返回,跳过圆形检测
2. 圆形检测(三角形不可用或识别失败时兜底)
"""
logger = logger_manager.logger
from datetime import datetime
# 先检测靶心以获取距离(用于计算激光点)
result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3(
frame, None)
# 计算距离
distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None
# 根据距离动态计算激光点坐标
# ── Step 1: 确定激光点 ────────────────────────────────────────────────────
laser_point_method = None
distance_m_first = None
if config.HARDCODE_LASER_POINT:
laser_point = laser_manager.laser_point
laser_point_method = "hardcode"
@@ -33,65 +66,128 @@ def analyze_shot(frame, laser_point=None):
laser_point_method = "calibrated"
if logger:
logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}")
elif distance_m and distance_m > 0:
laser_point = laser_manager.calculate_laser_point_from_distance(distance_m)
laser_point_method = "dynamic"
if logger:
logger.info(f"[算法] 使用比例尺: {laser_point}")
else:
laser_point = laser_manager.laser_point
laser_point_method = "default"
if logger:
logger.info(f"[算法] 使用默认值: {laser_point}")
# 动态模式:先做一次无激光点检测以估算距离,再推算激光点
_, _, _, _, best_radius1_temp, _ = detect_circle_v3(frame, None)
distance_m_first = estimate_distance(best_radius1_temp) if best_radius1_temp else None
if distance_m_first and distance_m_first > 0:
laser_point = laser_manager.calculate_laser_point_from_distance(distance_m_first)
laser_point_method = "dynamic"
if logger:
logger.info(f"[算法] 使用比例尺: {laser_point}")
else:
laser_point = laser_manager.laser_point
laser_point_method = "default"
if logger:
logger.info(f"[算法] 使用默认值: {laser_point}")
if laser_point is None:
return {
"success": False,
"reason": "laser_point_not_initialized"
}
return {"success": False, "reason": "laser_point_not_initialized"}
x, y = laser_point
# 绘制激光十字线
color = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
frame.draw_line(
int(x - config.LASER_LENGTH), int(y),
int(x + config.LASER_LENGTH), int(y),
color, config.LASER_THICKNESS
)
frame.draw_line(
int(x), int(y - config.LASER_LENGTH),
int(x), int(y + config.LASER_LENGTH),
color, config.LASER_THICKNESS
)
frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS)
# ── Step 2: 提前转换一次图像,两个检测线程共享(只读)────────────────────────
img_cv = image.image2cv(frame, False, False)
# 重新检测靶心(使用计算出的激光点)
result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point)
# ── Step 3: 检查三角形是否可用 ────────────────────────────────────────────────
use_tri = getattr(config, "USE_TRIANGLE_OFFSET", False)
K = dist_coef = pos = None
if use_tri:
K, dist_coef, pos = _get_triangle_calib()
use_tri = K is not None and dist_coef is not None and pos
# 计算偏移与距离
if center and radius:
dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
distance_m = estimate_distance(best_radius1)
else:
def _build_circle_result(cdata):
"""从圆形检测结果构建 analyze_shot 返回值。"""
r_img, center, radius, method, best_radius1, ellipse_params = cdata
dx, dy = None, None
distance_m = None
d_m = distance_m_first
if center and radius:
dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method)
d_m = estimate_distance(best_radius1) if best_radius1 else distance_m_first
return {
"success": True,
"result_img": r_img,
"center": center, "radius": radius, "method": method,
"best_radius1": best_radius1, "ellipse_params": ellipse_params,
"dx": dx, "dy": dy, "distance_m": d_m,
"laser_point": laser_point, "laser_point_method": laser_point_method,
"offset_method": "yellow_ellipse" if ellipse_params else "yellow_circle",
"distance_method": "yellow_radius",
}
# 返回分析结果
return {
"success": True,
"result_img": result_img,
"center": center,
"radius": radius,
"method": method,
"best_radius1": best_radius1,
"ellipse_params": ellipse_params,
"dx": dx,
"dy": dy,
"distance_m": distance_m,
"laser_point": laser_point,
"laser_point_method": laser_point_method
}
if not use_tri:
# 三角形未配置,直接跑圆形检测
return _build_circle_result(
detect_circle_v3(frame, laser_point, img_cv=img_cv)
)
# ── Step 4: 三角形 + 圆形并行检测 ─────────────────────────────────────────────
# 两个线程共享只读的 img_cv互不干扰
tri_result = {}
circle_result = {}
def _run_triangle():
try:
logger.info(f"[TRI] begin {datetime.now()}")
logger.info(f"[TRI] K: {K}, dist: {dist_coef}, pos: {pos}, {datetime.now()}")
tri = try_triangle_scoring(
img_cv, (x, y), pos, K, dist_coef,
size_range=getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500)),
)
logger.info(f"[TRI] tri: {tri}, {datetime.now()}")
tri_result['data'] = tri
except Exception as e:
logger.error(f"[TRI] 三角形路径异常: {e}")
tri_result['data'] = {'ok': False}
def _run_circle():
try:
circle_result['data'] = detect_circle_v3(frame, laser_point, img_cv=img_cv)
except Exception as e:
logger.error(f"[CIRCLE] 圆形检测异常: {e}")
circle_result['data'] = (frame, None, None, None, None, None)
t_tri = threading.Thread(target=_run_triangle, daemon=True)
t_cir = threading.Thread(target=_run_circle, daemon=True)
t_tri.start()
t_cir.start()
# 最多等待三角形 TRIANGLE_TIMEOUT_MS默认 1000ms
tri_timeout_s = float(getattr(config, "TRIANGLE_TIMEOUT_MS", 1000)) / 1000.0
t_tri.join(timeout=tri_timeout_s)
if t_tri.is_alive():
# 超时:直接放弃三角形结果,回退圆心(圆心线程通常已跑完)
logger.warning(f"[TRI] timeout>{tri_timeout_s:.2f}s回退圆心算法")
t_cir.join()
return _build_circle_result(
circle_result.get('data') or (frame, None, None, None, None, None)
)
tri = tri_result.get('data', {})
if tri.get('ok'):
logger.info(f"[TRI] end {datetime.now()}")
return {
"success": True,
"result_img": frame,
"center": None, "radius": None,
"method": tri.get("offset_method") or "triangle_homography",
"best_radius1": None, "ellipse_params": None,
"dx": tri["dx_cm"], "dy": tri["dy_cm"],
"distance_m": tri.get("distance_m") or distance_m_first,
"laser_point": laser_point, "laser_point_method": laser_point_method,
"offset_method": tri.get("offset_method") or "triangle_homography",
"distance_method": tri.get("distance_method") or "pnp_triangle",
"tri_markers": tri.get("markers", []),
"tri_homography": tri.get("homography"),
}
# 三角形失败,等圆形结果(已并行跑完,几乎无额外等待)
t_cir.join()
logger.info(f"[TRI] end(fallback) {datetime.now()}")
return _build_circle_result(
circle_result.get('data') or (frame, None, None, None, None, None)
)
def process_shot(adc_val):
@@ -103,6 +199,7 @@ def process_shot(adc_val):
logger = logger_manager.logger
try:
network_manager.safe_enqueue({"shoot_event": "start"}, msg_type=2, high=True)
frame = camera_manager.read_frame()
# 调用算法分析
@@ -126,16 +223,21 @@ def process_shot(adc_val):
distance_m = analysis_result["distance_m"]
laser_point = analysis_result["laser_point"]
laser_point_method = analysis_result["laser_point_method"]
offset_method = analysis_result.get("offset_method", "yellow_circle")
distance_method = analysis_result.get("distance_method", "yellow_radius")
tri_markers = analysis_result.get("tri_markers", [])
tri_homography = analysis_result.get("tri_homography")
x, y = laser_point
camera_manager.show(result_img)
# 三角形路径成功时 center/radius 为空是正常的;此时用 triangle 方法名用于保存文件名与上报字段 m
if (not method) and tri_markers:
method = offset_method or "triangle_homography"
if not (center and radius) and logger:
logger.warning("[MAIN] 未检测到靶心,但会保存图像")
if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING:
camera_manager.show(result_img)
# 读取电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
if dx is None and dy is None and logger:
logger.warning("[MAIN] 未检测到偏移量(三角形与圆形均失败),但会保存图像")
# 生成射箭ID
from shot_id_generator import shot_id_generator
@@ -144,33 +246,30 @@ def process_shot(adc_val):
if logger:
logger.info(f"[MAIN] 射箭ID: {shot_id}")
# 保存图像
save_shot_image(
result_img,
center,
radius,
method,
ellipse_params,
(x, y),
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None
)
laser_distance_m = None
laser_signal_quality = 0
# x,y 单位物理厘米compute_laser_position 与三角形单应性均输出物理 cm
# 未检测到靶心时 x/y 用 200.0(脱靶标志)
srv_x = round(float(dx), 4) if dx is not None else 200.0
srv_y = round(float(dy), 4) if dy is not None else 200.0
# 构造上报数据
inner_data = {
"shot_id": shot_id,
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"x": srv_x,
"y": srv_y,
"r": 20.0, # 保留字段(服务端当前忽略,物理外环半径 cm
"d": round((distance_m or 0.0) * 100),
"d_laser": 0.0,
"d_laser_quality": 0,
"d_laser": round((laser_distance_m or 0.0) * 100),
"d_laser_quality": laser_signal_quality,
"m": method if method else "no_target",
"adc": adc_val,
"laser_method": laser_point_method,
"target_x": float(x),
"target_y": float(y),
"offset_method": offset_method,
"distance_method": distance_method,
}
if ellipse_params:
@@ -190,14 +289,99 @@ def process_shot(adc_val):
report_data = {"cmd": 1, "data": inner_data}
network_manager.safe_enqueue(report_data, msg_type=2, high=True)
if logger:
if center and radius:
logger.info(f"射箭事件已加入发送队列已检测到靶心ID: {shot_id}")
else:
logger.info(f"射箭事件已加入发送队列未检测到靶心已保存图像ID: {shot_id}")
# 数据上报后再画标注,不干扰检测阶段的原始画面
if result_img is not None:
# 1. 若有三角形标记,先用 cv2 画轮廓 / 顶点 / ID再反推靶心位置
if tri_markers:
import cv2 as _cv2
import numpy as _np
_img_cv = image.image2cv(result_img, False, False)
# 三角形轮廓 + 直角顶点 + ID
for _m in tri_markers:
_corners = _np.array(_m["corners"], dtype=_np.int32)
_cv2.polylines(_img_cv, [_corners], True, (0, 255, 0), 2)
_cx, _cy = int(_m["center"][0]), int(_m["center"][1])
_cv2.circle(_img_cv, (_cx, _cy), 4, (0, 0, 255), -1)
_cv2.putText(_img_cv, f"T{_m['id']}",
(_cx - 18, _cy - 12),
_cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1)
# 靶心H_inv @ [0,0]):小红圆
_center_px = None
if tri_homography is not None:
try:
_H_inv = _np.linalg.inv(tri_homography)
_c_img = _cv2.perspectiveTransform(
_np.array([[[0.0, 0.0]]], dtype=_np.float32), _H_inv)[0][0]
_ocx, _ocy = int(_c_img[0]), int(_c_img[1])
_cv2.circle(_img_cv, (_ocx, _ocy), 5, (0, 0, 255), -1) # 实心
_cv2.circle(_img_cv, (_ocx, _ocy), 9, (0, 0, 255), 1) # 外框
_center_px = (_ocx, _ocy)
logger.info(f"[算法] 靶心: {_center_px}")
except Exception:
pass
# 叠加信息:落点-圆心距离 / 相机-靶距离等
try:
import math as _math
_lines = []
if dx is not None and dy is not None:
_r_cm = _math.hypot(float(dx), float(dy))
_lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm |r|={_r_cm:.2f}cm")
if distance_m is not None:
_lines.append(f"cam_dist={float(distance_m):.2f}m ({distance_method})")
if method:
_lines.append(f"method={method}")
if _lines:
_y0 = 22
for i, _t in enumerate(_lines):
_cv2.putText(
_img_cv,
_t,
(10, _y0 + i * 18),
_cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 255, 0),
1,
)
except Exception:
pass
result_img = image.cv2image(_img_cv, False, False)
# 2. 激光十字线
_lc = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
result_img.draw_line(int(x - config.LASER_LENGTH), int(y),
int(x + config.LASER_LENGTH), int(y),
_lc, config.LASER_THICKNESS)
result_img.draw_line(int(x), int(y - config.LASER_LENGTH),
int(x), int(y + config.LASER_LENGTH),
_lc, config.LASER_THICKNESS)
result_img.draw_circle(int(x), int(y), 1, _lc, config.LASER_THICKNESS)
# 闪一下激光(射箭反馈)
laser_manager.flash_laser(1000)
if config.FLASH_LASER_WHILE_SHOOTING:
laser_manager.flash_laser(config.FLASH_LASER_DURATION_MS)
# 保存图像(异步队列,与 main.py 一致)
enqueue_save_shot(
result_img,
center,
radius,
method,
ellipse_params,
(x, y),
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None,
)
if logger:
if dx is not None and dy is not None:
logger.info(f"射箭事件已加入发送队列(偏移=({dx:.2f},{dy:.2f})cmID: {shot_id}")
else:
logger.info(f"射箭事件已加入发送队列未检测到偏移已保存图像ID: {shot_id}")
time.sleep_ms(100)
except Exception as e: