Files
archery/triangle_target.py

593 lines
22 KiB
Python
Raw Permalink Normal View History

2026-04-17 18:30:50 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
靶纸四角等腰直角三角形检测单应性落点PnP 估距
test/aruco_deteck.py 抽出供主流程 shoot_manager 使用
"""
import json
import os
from itertools import combinations
import cv2
import numpy as np
def _log(msg):
try:
from logger_manager import logger_manager
if logger_manager.logger:
logger_manager.logger.info(msg)
except Exception:
pass
def load_camera_from_xml(path):
    """Read an OpenCV FileStorage XML calibration file.

    Returns (camera_matrix, dist_coeffs), or (None, None) when the file is
    missing or unreadable. A missing/empty distortion node falls back to a
    5x1 zero vector so downstream cv2 calls still work.
    """
    if not path or not os.path.isfile(path):
        _log(f"[TRI] 标定文件不存在: {path}")
        return None, None
    try:
        fs = cv2.FileStorage(path, cv2.FILE_STORAGE_READ)
        try:
            K = fs.getNode("camera_matrix").mat()
            dist = fs.getNode("distortion_coefficients").mat()
        finally:
            # Release the FileStorage even if a node read raises, instead of
            # leaking the handle until garbage collection.
            fs.release()
        if K is None or K.size == 0:
            return None, None
        if dist is None or dist.size == 0:
            dist = np.zeros((5, 1), dtype=np.float64)
        return K, dist
    except Exception as e:
        _log(f"[TRI] 读取标定失败: {e}")
        return None, None
def load_triangle_positions(path):
    """Load triangle_positions.json; return {int_marker_id: [x, y, z]} or None if missing."""
    if not path or not os.path.isfile(path):
        _log(f"[TRI] 三角形位置文件不存在: {path}")
        return None
    with open(path, "r", encoding="utf-8") as fh:
        parsed = json.load(fh)
    # JSON object keys are strings; marker ids are used as ints everywhere else.
    return {int(marker_id): coords for marker_id, coords in parsed.items()}
def homography_calibration(marker_centers, marker_ids, marker_positions, impact_point_pixel):
    """Map an impact pixel onto target-plane coordinates via a homography.

    marker_centers: pixel centers matching marker_ids order.
    marker_positions: {id: [x, y, z]} target-plane positions.
    Returns (ok, target_x, target_y, H); (False, None, None, None) when a
    marker id has no known position or the homography cannot be estimated.
    """
    failure = (False, None, None, None)
    if any(marker_positions.get(mid) is None for mid in marker_ids):
        return failure
    plane_pts = [[marker_positions[mid][0], marker_positions[mid][1]] for mid in marker_ids]
    H, _mask = cv2.findHomography(
        np.array(marker_centers, dtype=np.float32),
        np.array(plane_pts, dtype=np.float32),
        cv2.RANSAC,
        ransacReprojThreshold=1.0,
    )
    if H is None:
        return failure
    mapped = cv2.perspectiveTransform(
        np.array([[impact_point_pixel]], dtype=np.float32), H
    )
    return True, float(mapped[0][0][0]), float(mapped[0][0][1]), H
def complete_fourth_point(detected_ids, detected_centers, marker_positions):
    """Infer the missing 4th corner marker from the 3 detected ones.

    Fits an affine map (target plane -> image) from the known pairs and
    projects the missing marker's plane position back into pixel space.

    Returns (complete_centers, [0, 1, 2, 3]) with centers ordered by marker
    id, or None on failure (the caller treats None as "completion failed").
    """
    target_order = [0, 1, 2, 3]
    # Guards: the original code raised KeyError when marker_positions lacked
    # an id 0-3, when a detected id fell outside 0-3, or when nothing was
    # actually missing; fail soft instead, matching the documented contract.
    if any(mid not in marker_positions for mid in target_order):
        return None
    if any(mid not in target_order for mid in detected_ids):
        return None
    missing = set(target_order) - set(detected_ids)
    if len(missing) != 1:
        return None
    missing_id = missing.pop()
    target_coords = {mid: marker_positions[mid][:2] for mid in target_order}
    known_src = []
    known_dst = []
    for mid, pt in zip(detected_ids, detected_centers):
        known_src.append(pt)
        known_dst.append(target_coords[mid])
    # Affine from plane coords to pixels (inverse direction on purpose, so we
    # can push the missing plane point forward into the image).
    M_inv, _ = cv2.estimateAffine2D(
        np.array(known_dst, dtype=np.float32),
        np.array(known_src, dtype=np.float32),
    )
    if M_inv is None:
        return None
    missing_target = target_coords[missing_id]
    # Homogeneous multiply: 2x3 affine @ [x, y, 1] -> pixel position.
    missing_src_h = M_inv @ np.array([missing_target[0], missing_target[1], 1.0])
    missing_src = missing_src_h[:2]
    complete_centers = []
    for mid in target_order:
        if mid == missing_id:
            complete_centers.append(missing_src)
        else:
            idx = detected_ids.index(mid)
            complete_centers.append(detected_centers[idx])
    return complete_centers, target_order
def pnp_distance_meters(marker_ids, marker_centers_px, marker_positions, K, dist):
    """Distance from the target-plane origin (0,0,0) to the camera center.

    Object points are in cm, so ||tvec|| is in cm; the result is returned in
    meters. Returns None when solvePnP fails.
    """
    object_rows = [
        [
            float(marker_positions[mid][0]),
            float(marker_positions[mid][1]),
            float(marker_positions[mid][2]),
        ]
        for mid in marker_ids
    ]
    ok, _rvec, tvec = cv2.solvePnP(
        np.array(object_rows, dtype=np.float64),
        np.array(marker_centers_px, dtype=np.float64),
        K,
        dist,
        flags=cv2.SOLVEPNP_ITERATIVE,
    )
    if not ok:
        return None
    # cm -> m
    return float(np.linalg.norm(tvec.reshape(-1))) / 100.0
def detect_triangle_markers(
    gray_image,
    orig_gray=None,
    size_range=(8, 500),
    max_interior_gray=None,
    dark_pixel_gray=None,
    min_dark_ratio=None,
    verbose=True,
):
    """Detect the four dark isosceles right-triangle corner marks on the target.

    gray_image: grayscale image used for thresholding (may be CLAHE-enhanced).
    orig_gray: unmodified grayscale image used for darkness/contrast checks
        (None disables the color check entirely).
    size_range: (min, max) accepted average leg length, in pixels.
    max_interior_gray / dark_pixel_gray / min_dark_ratio: darkness thresholds;
        when None they are read from config.py (with hard-coded fallbacks).
    verbose: emit diagnostic log lines via _log.

    Returns a list of up to 4 dicts {"id", "center", "corners"}, where "id" is
    the corner id (0=top-left, 1=bottom-left, 2=bottom-right, 3=top-right) and
    "center" is the right-angle vertex in pixel coordinates.
    """
    # Read tunable parameters (fallback defaults mirror config.py).
    try:
        import config as _cfg
        early_exit = int(getattr(_cfg, "TRIANGLE_EARLY_EXIT_CANDIDATES", 4))
        block_sizes = tuple(getattr(_cfg, "TRIANGLE_ADAPTIVE_BLOCK_SIZES", (11, 21, 35)))
        max_combo_n = int(getattr(_cfg, "TRIANGLE_MAX_FILTERED_FOR_COMBO", 10))
        if max_interior_gray is None:
            max_interior_gray = int(getattr(_cfg, "TRIANGLE_MAX_INTERIOR_GRAY", 130))
        if dark_pixel_gray is None:
            dark_pixel_gray = int(getattr(_cfg, "TRIANGLE_DARK_PIXEL_GRAY", 130))
        if min_dark_ratio is None:
            min_dark_ratio = float(getattr(_cfg, "TRIANGLE_MIN_DARK_RATIO", 0.30))
        min_contrast_diff = int(getattr(_cfg, "TRIANGLE_MIN_CONTRAST_DIFF", 15))
    except Exception:
        early_exit = 4
        block_sizes = (11, 21, 35)
        max_combo_n = 10
        if max_interior_gray is None:
            max_interior_gray = 130
        if dark_pixel_gray is None:
            dark_pixel_gray = 130
        if min_dark_ratio is None:
            min_dark_ratio = 0.30
        min_contrast_diff = 15
    min_leg, max_leg = size_range
    # Minimum contour area: a right triangle with the smallest legs, with a
    # 10x slack factor to tolerate approximation loss.
    min_area = 0.5 * (min_leg ** 2) * 0.1
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))

    def _check_shape(approx):
        # Validate a 3-vertex polygon as an isosceles right triangle.
        # Returns (right_angle_vertex, avg_leg, pts) or None on rejection.
        pts = approx.reshape(3, 2).astype(np.float32)
        sides = [
            np.linalg.norm(pts[1] - pts[0]),
            np.linalg.norm(pts[2] - pts[1]),
            np.linalg.norm(pts[0] - pts[2]),
        ]
        order = sorted(range(3), key=lambda i: sides[i])
        leg1, leg2, hyp = sides[order[0]], sides[order[1]], sides[order[2]]
        avg_leg = (leg1 + leg2) / 2
        if not (min_leg <= avg_leg <= max_leg):
            return None
        # Legs must match each other within 20%.
        if abs(leg1 - leg2) / (avg_leg + 1e-6) > 0.20:
            return None
        # Hypotenuse must be ~sqrt(2) * leg, within 20%.
        if abs(hyp - avg_leg * np.sqrt(2)) / (avg_leg * np.sqrt(2) + 1e-6) > 0.20:
            return None
        # The vertex not touching the hypotenuse is the right-angle vertex;
        # verify the angle there is ~90 deg (|cos| <= 0.20).
        edge_verts = [(0, 1), (1, 2), (2, 0)]
        hv0, hv1 = edge_verts[order[2]]
        right_v = 3 - hv0 - hv1
        right_pt = pts[right_v]
        v0 = pts[hv0] - right_pt
        v1_vec = pts[hv1] - right_pt
        cos_a = np.dot(v0, v1_vec) / (
            np.linalg.norm(v0) * np.linalg.norm(v1_vec) + 1e-6
        )
        if abs(cos_a) > 0.20:
            return None
        return right_pt, avg_leg, pts

    def _color_ok(approx):
        # Accept the triangle if its interior is dark in absolute terms OR
        # clearly darker than the surrounding background ring.
        if orig_gray is None:
            return True
        mask = np.zeros(orig_gray.shape[:2], dtype=np.uint8)
        cv2.fillPoly(mask, [approx], 255)
        # Erode the mask so anti-aliased edge pixels don't skew the mean;
        # kernel radius scales with image size, capped at 5.
        erode_k = max(1, int(min(orig_gray.shape[:2]) * 0.002))
        erode_k = min(erode_k, 5)
        k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 1, 2 * erode_k + 1))
        mask_in = cv2.erode(mask, k, iterations=1)
        if cv2.countNonZero(mask_in) < 20:
            mask_in = mask
        mean_val = cv2.mean(orig_gray, mask=mask_in)[0]
        ys, xs = np.where(mask_in > 0)
        if len(xs) == 0:
            return False
        interior = orig_gray[ys, xs]
        dark_ratio = float(np.mean(interior <= dark_pixel_gray))
        # Condition 1: absolute threshold — interior is dark enough.
        abs_ok = (mean_val <= max_interior_gray) and (dark_ratio >= min_dark_ratio)
        # Condition 2: relative contrast — interior is clearly darker than
        # the dilated ring around the triangle.
        contrast_ok = False
        if min_contrast_diff > 0:
            try:
                dilate_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 3, 2 * erode_k + 3))
                mask_dilated = cv2.dilate(mask, dilate_k, iterations=2)
                mask_border = cv2.subtract(mask_dilated, mask)
                border_nz = cv2.countNonZero(mask_border)
                if border_nz > 20:
                    mean_surround = cv2.mean(orig_gray, mask=mask_border)[0]
                    contrast_ok = (mean_surround - mean_val) >= min_contrast_diff
            except Exception:
                pass
        return abs_ok or contrast_ok

    def _extract_candidates(binary_img):
        # Collect triangle candidates from one binary image.
        contours, _ = cv2.findContours(binary_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        found = []
        # ---- diagnostic counters ----
        _n_area_skip = 0
        _n_3vert = 0
        _n_shape_ok = 0
        _n_color_ok = 0
        _dbg_fail_shape = []  # first few shape-rejection reasons
        _dbg_fail_color = []  # first few color-rejection details
        for cnt in contours:
            area = cv2.contourArea(cnt)
            if area < min_area:
                _n_area_skip += 1
                continue
            peri = cv2.arcLength(cnt, True)
            # Coarser polygon approximation for larger contours.
            eps = 0.05 * peri if peri > 60 else 0.03 * peri
            approx = cv2.approxPolyDP(cnt, eps, True)
            if len(approx) != 3:
                continue
            _n_3vert += 1
            shape = _check_shape(approx)
            if shape is None:
                if len(_dbg_fail_shape) < 3:
                    pts3 = approx.reshape(3, 2).astype(np.float32)
                    sides = sorted([np.linalg.norm(pts3[1]-pts3[0]),
                                    np.linalg.norm(pts3[2]-pts3[1]),
                                    np.linalg.norm(pts3[0]-pts3[2])])
                    avg_l = (sides[0]+sides[1])/2
                    reason = f"avg_leg={avg_l:.1f} range=[{min_leg},{max_leg}] legs={sides[0]:.1f},{sides[1]:.1f} hyp={sides[2]:.1f} exp_hyp={avg_l*1.414:.1f}"
                    _dbg_fail_shape.append(reason)
                continue
            _n_shape_ok += 1
            if not _color_ok(approx):
                if len(_dbg_fail_color) < 3 and orig_gray is not None:
                    mask = np.zeros(orig_gray.shape[:2], dtype=np.uint8)
                    cv2.fillPoly(mask, [approx], 255)
                    mean_v = cv2.mean(orig_gray, mask=mask)[0]
                    ys, xs = np.where(mask > 0)
                    if len(xs) > 0:
                        dr = float(np.mean(orig_gray[ys, xs] <= dark_pixel_gray))
                    else:
                        dr = 0
                    _dbg_fail_color.append(f"mean={mean_v:.1f}(<={max_interior_gray}?) dark_r={dr:.2f}(>={min_dark_ratio}?)")
                continue
            _n_color_ok += 1
            right_pt, avg_leg, pts = shape
            center_px = np.mean(pts, axis=0).tolist()
            # 10px-grid key deduplicates the same triangle re-found under a
            # different threshold.
            dedup_key = f"{int(center_px[0] // 10)},{int(center_px[1] // 10)}"
            found.append({
                "center_px": center_px,
                "right_pt": right_pt.tolist(),
                "corners": pts.tolist(),
                "avg_leg": avg_leg,
                "dedup_key": dedup_key,
            })
        if verbose:
            _log(f"[TRI] _extract: total={len(contours)} area_skip={_n_area_skip} "
                 f"3vert={_n_3vert} shape_ok={_n_shape_ok} color_ok={_n_color_ok}")
            if _dbg_fail_shape:
                _log(f"[TRI] shape失败原因(前3): {'; '.join(_dbg_fail_shape)}")
            if _dbg_fail_color:
                _log(f"[TRI] color失败原因(前3): {'; '.join(_dbg_fail_color)}")
        return found

    all_candidates = []
    seen_keys = set()
    # Early exit requires candidates spread over several image quadrants, not
    # just a high count, so clustered false positives don't stop the search.
    h0, w0 = gray_image.shape[:2]
    cx0, cy0 = w0 / 2.0, h0 / 2.0
    seen_quadrants = set()
    # 4 candidates suffice for 4-corner detection, 3 for completion; +1 margin.
    _EARLY_EXIT = max(3, early_exit)

    def _add_from_binary(b):
        # Morphological close to seal small gaps, then add deduplicated
        # candidates and track which image quadrant each lands in.
        b = cv2.morphologyEx(b, cv2.MORPH_CLOSE, kernel)
        for c in _extract_candidates(b):
            if c["dedup_key"] not in seen_keys:
                seen_keys.add(c["dedup_key"])
                all_candidates.append(c)
                # Quadrant bookkeeping relative to the image center.
                tx, ty = c["center_px"]
                if tx < cx0 and ty < cy0:
                    q = 0
                elif tx < cx0:
                    q = 1
                elif ty >= cy0:
                    q = 2
                else:
                    q = 3
                seen_quadrants.add(q)

    def _should_early_exit():
        # Enough candidates AND at least 3 quadrants covered — "looks like
        # four corners", so further thresholding attempts can stop.
        return (len(all_candidates) >= _EARLY_EXIT) and (len(seen_quadrants) >= 3)

    # 1. Fastest path: global Otsu (no per-pixel neighborhood work, ~10ms).
    _, b_otsu = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # ---- temporary debug: dump the Otsu binary image for manual inspection ----
    try:
        import config as _dbg_cfg
        if getattr(_dbg_cfg, 'TRIANGLE_SAVE_DEBUG_IMAGE', False):
            _dbg_path = getattr(_dbg_cfg, 'PHOTO_DIR', '/root/phot') + '/tri_otsu_debug.jpg'
            cv2.imwrite(_dbg_path, b_otsu)
            _log(f"[TRI] DEBUG: Otsu 二值图已保存到 {_dbg_path}")
    except Exception:
        pass
    _add_from_binary(b_otsu)
    # 2. Adaptive thresholds (~100ms each) only while Otsu was insufficient.
    for block_size in block_sizes:
        if _should_early_exit():
            break
        if block_size is None:
            continue
        b = cv2.adaptiveThreshold(
            gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, block_size, 4
        )
        _add_from_binary(b)
    if verbose:
        _log(f"[TRI] 候选三角形共 {len(all_candidates)} 个(预过滤前)")
    if len(all_candidates) < 2:
        return []
    # Pre-filter: drop candidates whose leg length is far from the median
    # (the four real corner marks share the same scale).
    all_legs = [c["avg_leg"] for c in all_candidates]
    med_leg = float(np.median(all_legs))
    filtered = []
    for c in all_candidates:
        leg = c["avg_leg"]
        if med_leg > 1e-6 and not (0.40 * med_leg <= leg <= 2.0 * med_leg):
            continue
        filtered.append(c)
    if len(filtered) < 2:
        return []
    # Too many candidates make the 4-of-N combination search slow: keep the
    # max_combo_n most plausible ones (leg length closest to the median).
    if max_combo_n > 0 and len(filtered) > max_combo_n:
        med_leg = float(np.median([c["avg_leg"] for c in filtered]))
        filtered = sorted(filtered, key=lambda c: abs(c["avg_leg"] - med_leg))[:max_combo_n]

    def _order_quad(pts_4):
        # Order 4 point indices as (top-left, bottom-left, bottom-right, top-right).
        by_y = sorted(range(4), key=lambda i: pts_4[i][1])
        top_pair = sorted(by_y[:2], key=lambda i: pts_4[i][0])
        bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0])
        return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1]

    def _score_quad(cands_4):
        # Lower score = more rectangle-like quad (balanced diagonals and
        # opposite sides) with uniform triangle sizes.
        pts = [np.array(c["center_px"]) for c in cands_4]
        legs = [c["avg_leg"] for c in cands_4]
        tl, bl, br, tr = _order_quad(pts)
        diag1 = np.linalg.norm(pts[tl] - pts[br])
        diag2 = np.linalg.norm(pts[bl] - pts[tr])
        diag_ratio = max(diag1, diag2) / (min(diag1, diag2) + 1e-6)
        s_top = np.linalg.norm(pts[tl] - pts[tr])
        s_bot = np.linalg.norm(pts[bl] - pts[br])
        s_left = np.linalg.norm(pts[tl] - pts[bl])
        s_right = np.linalg.norm(pts[tr] - pts[br])
        h_ratio = max(s_top, s_bot) / (min(s_top, s_bot) + 1e-6)
        v_ratio = max(s_left, s_right) / (min(s_left, s_right) + 1e-6)
        med_l = float(np.median(legs))
        leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs)
        score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0
        return score, (tl, bl, br, tr)

    assigned = None
    if len(filtered) >= 4:
        # Exhaustive search over all 4-candidate combinations for the best quad.
        best_score = float("inf")
        best_combo = None
        best_order = None
        for combo in combinations(range(len(filtered)), 4):
            cands = [filtered[i] for i in combo]
            score, order = _score_quad(cands)
            if score < best_score:
                best_score = score
                best_combo = combo
                best_order = order
        if verbose:
            _log(f"[TRI] 最优四边形: score={best_score:.3f}")
        # Only accept the quad when it is reasonably rectangle-like.
        if best_score < 3.0:
            cands = [filtered[i] for i in best_combo]
            tl, bl, br, tr = best_order
            assigned = {
                0: cands[tl],
                1: cands[bl],
                2: cands[br],
                3: cands[tr],
            }
    if assigned is None:
        # Fallback: assign ids by quadrant around the candidates' centroid,
        # keeping the largest triangle per quadrant.
        cx = np.mean([c["center_px"][0] for c in filtered])
        cy = np.mean([c["center_px"][1] for c in filtered])
        quadrant_map = {}
        for c in filtered:
            tx, ty = c["center_px"]
            if tx < cx and ty < cy:
                q = 0
            elif tx < cx:
                q = 1
            elif ty >= cy:
                q = 2
            else:
                q = 3
            if q not in quadrant_map or c["avg_leg"] > quadrant_map[q]["avg_leg"]:
                quadrant_map[q] = c
        assigned = quadrant_map
    result = []
    for tid in sorted(assigned.keys()):
        c = assigned[tid]
        result.append({
            "id": tid,
            "center": c["right_pt"],
            "corners": c["corners"],
        })
    return result
def try_triangle_scoring(
    img_rgb,
    laser_xy,
    marker_positions,
    camera_matrix,
    dist_coeffs,
    size_range=(8, 500),
):
    """
    Try triangle-marker homography scoring + PnP distance estimation.

    img_rgb: RGB image; laser_xy is in the same pixel coordinate system.
    marker_positions: {id: [x, y, z]} target-plane positions (cm).
    Returns a dict with keys:
        ok, dx_cm, dy_cm, distance_m, offset_method, distance_method
    and, on success, also "markers" and "homography" for the caller's overlay.
    """
    out = {
        "ok": False,
        "dx_cm": None,
        "dy_cm": None,
        "distance_m": None,
        "offset_method": None,
        "distance_method": None,
    }
    if marker_positions is None or camera_matrix is None or dist_coeffs is None:
        return out
    h_orig, w_orig = img_rgb.shape[:2]
    # Downscale for speed: on embedded CPUs processing time scales with pixel
    # area. Detected coordinates are multiplied by inv_scale afterwards so the
    # homography/PnP run at the resolution the camera matrix was calibrated at.
    MAX_DETECT_DIM = 640
    long_side = max(h_orig, w_orig)
    if long_side > MAX_DETECT_DIM:
        det_scale = MAX_DETECT_DIM / long_side
        det_w = int(w_orig * det_scale)
        det_h = int(h_orig * det_scale)
        img_det = cv2.resize(img_rgb, (det_w, det_h), interpolation=cv2.INTER_LINEAR)
        inv_scale = 1.0 / det_scale
        # Scale the acceptable leg range to the detection resolution.
        size_range_det = (max(4, int(size_range[0] * det_scale)),
                          max(8, int(size_range[1] * det_scale)))
    else:
        img_det = img_rgb
        inv_scale = 1.0
        size_range_det = size_range
    gray = cv2.cvtColor(img_det, cv2.COLOR_RGB2GRAY)
    # Fast path: run directly on the plain grayscale image (Otsu first, cheap).
    # Under even lighting this usually finds >=3 triangles, skipping CLAHE.
    tri_markers = detect_triangle_markers(
        gray, orig_gray=gray, size_range=size_range_det, verbose=True
    )
    if len(tri_markers) < 3:
        # Slow fallback: retry after CLAHE contrast enhancement (helps under
        # uneven lighting / locally dark areas). Disabled by default for
        # speed; controlled by config.TRIANGLE_ENABLE_CLAHE_FALLBACK.
        try:
            import config as _cfg
            enable_clahe = bool(getattr(_cfg, "TRIANGLE_ENABLE_CLAHE_FALLBACK", False))
        except Exception:
            enable_clahe = False
        if enable_clahe:
            _log(f"[TRI] 快速路径不足{len(tri_markers)}启用CLAHE增强")
            clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
            gray_clahe = clahe.apply(gray)
            tri_markers = detect_triangle_markers(
                gray_clahe, orig_gray=gray, size_range=size_range_det, verbose=True
            )
        else:
            _log(f"[TRI] 快速路径不足{len(tri_markers)}跳过CLAHE兜底已关闭")
    if len(tri_markers) < 3:
        _log(f"[TRI] 三角形不足3个: {len(tri_markers)}")
        return out
    # Map downscaled coords back to original resolution (K was calibrated there).
    if inv_scale != 1.0:
        for m in tri_markers:
            m["center"] = [m["center"][0] * inv_scale, m["center"][1] * inv_scale]
            m["corners"] = [[c[0] * inv_scale, c[1] * inv_scale] for c in m["corners"]]
    # Clamp the impact point inside the image bounds.
    lx = float(np.clip(laser_xy[0], 0, w_orig - 1))
    ly = float(np.clip(laser_xy[1], 0, h_orig - 1))
    if len(tri_markers) == 4:
        tri_sorted = sorted(tri_markers, key=lambda m: m["id"])
        marker_ids = [m["id"] for m in tri_sorted]
        marker_centers = [[float(m["center"][0]), float(m["center"][1])] for m in tri_sorted]
        offset_tag = "triangle_homography"
    else:
        # Only 3 markers found: reconstruct the 4th via affine completion.
        marker_ids_list = [m["id"] for m in tri_markers]
        marker_centers_orig = [[float(m["center"][0]), float(m["center"][1])] for m in tri_markers]
        comp = complete_fourth_point(marker_ids_list, marker_centers_orig, marker_positions)
        if comp is None:
            _log("[TRI] 3点补全第4点失败")
            return out
        marker_centers, marker_ids = comp
        marker_centers = [[float(c[0]), float(c[1])] for c in marker_centers]
        offset_tag = "triangle_homography_3pt"
    ok_h, tx, ty, _H = homography_calibration(
        marker_centers, marker_ids, marker_positions, [lx, ly]
    )
    if not ok_h:
        _log("[TRI] 单应性失败")
        return out
    # Same sign convention as laser_manager.compute_laser_position: (x_cm, -y_cm_target)
    out["dx_cm"] = tx
    out["dy_cm"] = -ty
    out["ok"] = True
    out["offset_method"] = offset_tag
    out["markers"] = tri_markers  # for the caller's overlay drawing
    out["homography"] = _H  # lets the caller back-project the bull's-eye pixel
    dist_m = pnp_distance_meters(marker_ids, marker_centers, marker_positions, camera_matrix, dist_coeffs)
    # Sanity-gate the PnP distance to a plausible shooting range.
    if dist_m is not None and 0.3 < dist_m < 50.0:
        out["distance_m"] = dist_m
        out["distance_method"] = "pnp_triangle"
        _log(f"[TRI] PnP 距离={dist_m:.2f}m, 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")
    else:
        out["distance_m"] = None
        out["distance_method"] = None
        _log(f"[TRI] PnP 距离无效,回退黄心估距; 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")
    return out