514 lines
18 KiB
Python
514 lines
18 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
靶纸四角等腰直角三角形:检测、单应性落点、PnP 估距。
|
||
从 test/aruco_deteck.py 抽出,供主流程 shoot_manager 使用。
|
||
"""
|
||
import json
|
||
import os
|
||
from itertools import combinations
|
||
|
||
import cv2
|
||
import numpy as np
|
||
|
||
|
||
def _log(msg):
|
||
try:
|
||
from logger_manager import logger_manager
|
||
if logger_manager.logger:
|
||
logger_manager.logger.info(msg)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def load_camera_from_xml(path):
|
||
"""读取 OpenCV FileStorage XML,返回 (camera_matrix, dist_coeffs) 或 (None, None)。"""
|
||
if not path or not os.path.isfile(path):
|
||
_log(f"[TRI] 标定文件不存在: {path}")
|
||
return None, None
|
||
try:
|
||
fs = cv2.FileStorage(path, cv2.FILE_STORAGE_READ)
|
||
K = fs.getNode("camera_matrix").mat()
|
||
dist = fs.getNode("distortion_coefficients").mat()
|
||
fs.release()
|
||
if K is None or K.size == 0:
|
||
return None, None
|
||
if dist is None or dist.size == 0:
|
||
dist = np.zeros((5, 1), dtype=np.float64)
|
||
return K, dist
|
||
except Exception as e:
|
||
_log(f"[TRI] 读取标定失败: {e}")
|
||
return None, None
|
||
|
||
|
||
def load_triangle_positions(path):
|
||
"""加载 triangle_positions.json,返回 dict[int, [x,y,z]]。"""
|
||
if not path or not os.path.isfile(path):
|
||
_log(f"[TRI] 三角形位置文件不存在: {path}")
|
||
return None
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
raw = json.load(f)
|
||
return {int(k): v for k, v in raw.items()}
|
||
|
||
|
||
def homography_calibration(marker_centers, marker_ids, marker_positions, impact_point_pixel):
|
||
target_points = []
|
||
for mid in marker_ids:
|
||
pos = marker_positions.get(mid)
|
||
if pos is None:
|
||
return False, None, None, None
|
||
target_points.append([pos[0], pos[1]])
|
||
|
||
src_pts = np.array(marker_centers, dtype=np.float32)
|
||
dst_pts = np.array(target_points, dtype=np.float32)
|
||
H, _ = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, ransacReprojThreshold=1.0)
|
||
if H is None:
|
||
return False, None, None, None
|
||
|
||
pt = np.array([[impact_point_pixel]], dtype=np.float32)
|
||
transformed = cv2.perspectiveTransform(pt, H)
|
||
target_x = float(transformed[0][0][0])
|
||
target_y = float(transformed[0][0][1])
|
||
return True, target_x, target_y, H
|
||
|
||
|
||
def complete_fourth_point(detected_ids, detected_centers, marker_positions):
|
||
target_order = [0, 1, 2, 3]
|
||
target_coords = {mid: marker_positions[mid][:2] for mid in target_order}
|
||
all_ids = set(target_coords.keys())
|
||
missing_id = (all_ids - set(detected_ids)).pop()
|
||
|
||
known_src = []
|
||
known_dst = []
|
||
for mid, pt in zip(detected_ids, detected_centers):
|
||
known_src.append(pt)
|
||
known_dst.append(target_coords[mid])
|
||
|
||
M_inv, _ = cv2.estimateAffine2D(
|
||
np.array(known_dst, dtype=np.float32),
|
||
np.array(known_src, dtype=np.float32),
|
||
)
|
||
if M_inv is None:
|
||
return None
|
||
|
||
missing_target = target_coords[missing_id]
|
||
missing_src_h = M_inv @ np.array([missing_target[0], missing_target[1], 1.0])
|
||
missing_src = missing_src_h[:2]
|
||
|
||
complete_centers = []
|
||
for mid in target_order:
|
||
if mid == missing_id:
|
||
complete_centers.append(missing_src)
|
||
else:
|
||
idx = detected_ids.index(mid)
|
||
complete_centers.append(detected_centers[idx])
|
||
|
||
return complete_centers, target_order
|
||
|
||
|
||
def pnp_distance_meters(marker_ids, marker_centers_px, marker_positions, K, dist):
|
||
"""
|
||
靶面原点 (0,0,0) 到相机光心的距离:||tvec||,object 单位为 cm 时 tvec 为 cm,返回米。
|
||
"""
|
||
obj = []
|
||
for mid in marker_ids:
|
||
p = marker_positions[mid]
|
||
obj.append([float(p[0]), float(p[1]), float(p[2])])
|
||
obj_pts = np.array(obj, dtype=np.float64)
|
||
img_pts = np.array(marker_centers_px, dtype=np.float64)
|
||
|
||
ok, rvec, tvec = cv2.solvePnP(
|
||
obj_pts, img_pts, K, dist, flags=cv2.SOLVEPNP_ITERATIVE
|
||
)
|
||
if not ok:
|
||
return None
|
||
tvec = tvec.reshape(-1)
|
||
dist_cm = float(np.linalg.norm(tvec))
|
||
return dist_cm / 100.0
|
||
|
||
|
||
def detect_triangle_markers(
|
||
gray_image,
|
||
orig_gray=None,
|
||
size_range=(8, 500),
|
||
max_interior_gray=90,
|
||
dark_pixel_gray=80,
|
||
min_dark_ratio=0.70,
|
||
verbose=True,
|
||
):
|
||
# 读取可调参数(缺省值与 config.py 保持一致)
|
||
try:
|
||
import config as _cfg
|
||
early_exit = int(getattr(_cfg, "TRIANGLE_EARLY_EXIT_CANDIDATES", 4))
|
||
block_sizes = tuple(getattr(_cfg, "TRIANGLE_ADAPTIVE_BLOCK_SIZES", (11, 21, 35)))
|
||
max_combo_n = int(getattr(_cfg, "TRIANGLE_MAX_FILTERED_FOR_COMBO", 10))
|
||
except Exception:
|
||
early_exit = 4
|
||
block_sizes = (11, 21, 35)
|
||
max_combo_n = 10
|
||
|
||
min_leg, max_leg = size_range
|
||
min_area = 0.5 * (min_leg ** 2) * 0.1
|
||
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
|
||
|
||
def _check_shape(approx):
|
||
pts = approx.reshape(3, 2).astype(np.float32)
|
||
sides = [
|
||
np.linalg.norm(pts[1] - pts[0]),
|
||
np.linalg.norm(pts[2] - pts[1]),
|
||
np.linalg.norm(pts[0] - pts[2]),
|
||
]
|
||
order = sorted(range(3), key=lambda i: sides[i])
|
||
leg1, leg2, hyp = sides[order[0]], sides[order[1]], sides[order[2]]
|
||
avg_leg = (leg1 + leg2) / 2
|
||
|
||
if not (min_leg <= avg_leg <= max_leg):
|
||
return None
|
||
if abs(leg1 - leg2) / (avg_leg + 1e-6) > 0.20:
|
||
return None
|
||
if abs(hyp - avg_leg * np.sqrt(2)) / (avg_leg * np.sqrt(2) + 1e-6) > 0.20:
|
||
return None
|
||
|
||
edge_verts = [(0, 1), (1, 2), (2, 0)]
|
||
hv0, hv1 = edge_verts[order[2]]
|
||
right_v = 3 - hv0 - hv1
|
||
right_pt = pts[right_v]
|
||
|
||
v0 = pts[hv0] - right_pt
|
||
v1_vec = pts[hv1] - right_pt
|
||
cos_a = np.dot(v0, v1_vec) / (
|
||
np.linalg.norm(v0) * np.linalg.norm(v1_vec) + 1e-6
|
||
)
|
||
if abs(cos_a) > 0.20:
|
||
return None
|
||
|
||
return right_pt, avg_leg, pts
|
||
|
||
def _color_ok(approx):
|
||
if orig_gray is None:
|
||
return True
|
||
mask = np.zeros(orig_gray.shape[:2], dtype=np.uint8)
|
||
cv2.fillPoly(mask, [approx], 255)
|
||
erode_k = max(1, int(min(orig_gray.shape[:2]) * 0.002))
|
||
erode_k = min(erode_k, 5)
|
||
k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 1, 2 * erode_k + 1))
|
||
mask_in = cv2.erode(mask, k, iterations=1)
|
||
if cv2.countNonZero(mask_in) < 20:
|
||
mask_in = mask
|
||
|
||
mean_val = cv2.mean(orig_gray, mask=mask_in)[0]
|
||
ys, xs = np.where(mask_in > 0)
|
||
if len(xs) == 0:
|
||
return False
|
||
interior = orig_gray[ys, xs]
|
||
dark_ratio = float(np.mean(interior <= dark_pixel_gray))
|
||
return (mean_val <= max_interior_gray) and (dark_ratio >= min_dark_ratio)
|
||
|
||
def _extract_candidates(binary_img):
|
||
contours, _ = cv2.findContours(binary_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||
found = []
|
||
for cnt in contours:
|
||
if cv2.contourArea(cnt) < min_area:
|
||
continue
|
||
peri = cv2.arcLength(cnt, True)
|
||
approx = cv2.approxPolyDP(cnt, 0.05 * peri, True)
|
||
if len(approx) != 3:
|
||
continue
|
||
shape = _check_shape(approx)
|
||
if shape is None:
|
||
continue
|
||
if not _color_ok(approx):
|
||
continue
|
||
right_pt, avg_leg, pts = shape
|
||
center_px = np.mean(pts, axis=0).tolist()
|
||
dedup_key = f"{int(center_px[0] // 10)},{int(center_px[1] // 10)}"
|
||
found.append({
|
||
"center_px": center_px,
|
||
"right_pt": right_pt.tolist(),
|
||
"corners": pts.tolist(),
|
||
"avg_leg": avg_leg,
|
||
"dedup_key": dedup_key,
|
||
})
|
||
return found
|
||
|
||
all_candidates = []
|
||
seen_keys = set()
|
||
# 早退条件:不仅要数量够,还要候选分布足够分散(覆盖多个象限),避免误检集中导致提前退出
|
||
h0, w0 = gray_image.shape[:2]
|
||
cx0, cy0 = w0 / 2.0, h0 / 2.0
|
||
seen_quadrants = set()
|
||
# 4 个候选就够 4 角检测;3 个够 3 点补全,加 1 裕量
|
||
_EARLY_EXIT = max(3, early_exit)
|
||
|
||
def _add_from_binary(b):
|
||
b = cv2.morphologyEx(b, cv2.MORPH_CLOSE, kernel)
|
||
for c in _extract_candidates(b):
|
||
if c["dedup_key"] not in seen_keys:
|
||
seen_keys.add(c["dedup_key"])
|
||
all_candidates.append(c)
|
||
# 象限统计:按图像中心划分
|
||
tx, ty = c["center_px"]
|
||
if tx < cx0 and ty < cy0:
|
||
q = 0
|
||
elif tx < cx0:
|
||
q = 1
|
||
elif ty >= cy0:
|
||
q = 2
|
||
else:
|
||
q = 3
|
||
seen_quadrants.add(q)
|
||
|
||
def _should_early_exit():
|
||
# 至少覆盖 3 个象限 + 数量达到阈值,才认为“足够像四角”可停止更多尝试
|
||
return (len(all_candidates) >= _EARLY_EXIT) and (len(seen_quadrants) >= 3)
|
||
|
||
# 1. 最快:全局 Otsu(无需逐像素邻域计算,~10ms)
|
||
_, b_otsu = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
|
||
_add_from_binary(b_otsu)
|
||
|
||
# 2. 只在 Otsu 不够时才跑自适应阈值(每次 ~100ms,尽早退出)
|
||
for block_size in block_sizes:
|
||
if _should_early_exit():
|
||
break
|
||
if block_size is None:
|
||
continue
|
||
b = cv2.adaptiveThreshold(
|
||
gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, block_size, 4
|
||
)
|
||
_add_from_binary(b)
|
||
|
||
if verbose:
|
||
_log(f"[TRI] 候选三角形共 {len(all_candidates)} 个(预过滤前)")
|
||
|
||
if len(all_candidates) < 2:
|
||
return []
|
||
|
||
all_legs = [c["avg_leg"] for c in all_candidates]
|
||
med_leg = float(np.median(all_legs))
|
||
filtered = []
|
||
for c in all_candidates:
|
||
leg = c["avg_leg"]
|
||
if med_leg > 1e-6 and not (0.40 * med_leg <= leg <= 2.0 * med_leg):
|
||
continue
|
||
filtered.append(c)
|
||
|
||
if len(filtered) < 2:
|
||
return []
|
||
|
||
# 候选过多时,四点组合枚举会变慢:截断到更可能的 max_combo_n 个候选
|
||
if max_combo_n > 0 and len(filtered) > max_combo_n:
|
||
# 以 avg_leg 接近中位数优先(更符合四角同尺度)
|
||
med_leg = float(np.median([c["avg_leg"] for c in filtered]))
|
||
filtered = sorted(filtered, key=lambda c: abs(c["avg_leg"] - med_leg))[:max_combo_n]
|
||
|
||
def _order_quad(pts_4):
|
||
by_y = sorted(range(4), key=lambda i: pts_4[i][1])
|
||
top_pair = sorted(by_y[:2], key=lambda i: pts_4[i][0])
|
||
bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0])
|
||
return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1]
|
||
|
||
def _score_quad(cands_4):
|
||
pts = [np.array(c["center_px"]) for c in cands_4]
|
||
legs = [c["avg_leg"] for c in cands_4]
|
||
tl, bl, br, tr = _order_quad(pts)
|
||
|
||
diag1 = np.linalg.norm(pts[tl] - pts[br])
|
||
diag2 = np.linalg.norm(pts[bl] - pts[tr])
|
||
diag_ratio = max(diag1, diag2) / (min(diag1, diag2) + 1e-6)
|
||
|
||
s_top = np.linalg.norm(pts[tl] - pts[tr])
|
||
s_bot = np.linalg.norm(pts[bl] - pts[br])
|
||
s_left = np.linalg.norm(pts[tl] - pts[bl])
|
||
s_right = np.linalg.norm(pts[tr] - pts[br])
|
||
h_ratio = max(s_top, s_bot) / (min(s_top, s_bot) + 1e-6)
|
||
v_ratio = max(s_left, s_right) / (min(s_left, s_right) + 1e-6)
|
||
|
||
med_l = float(np.median(legs))
|
||
leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs)
|
||
|
||
score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0
|
||
return score, (tl, bl, br, tr)
|
||
|
||
assigned = None
|
||
if len(filtered) >= 4:
|
||
best_score = float("inf")
|
||
best_combo = None
|
||
best_order = None
|
||
|
||
for combo in combinations(range(len(filtered)), 4):
|
||
cands = [filtered[i] for i in combo]
|
||
score, order = _score_quad(cands)
|
||
if score < best_score:
|
||
best_score = score
|
||
best_combo = combo
|
||
best_order = order
|
||
|
||
if verbose:
|
||
_log(f"[TRI] 最优四边形: score={best_score:.3f}")
|
||
|
||
if best_score < 3.0:
|
||
cands = [filtered[i] for i in best_combo]
|
||
tl, bl, br, tr = best_order
|
||
assigned = {
|
||
0: cands[tl],
|
||
1: cands[bl],
|
||
2: cands[br],
|
||
3: cands[tr],
|
||
}
|
||
|
||
if assigned is None:
|
||
cx = np.mean([c["center_px"][0] for c in filtered])
|
||
cy = np.mean([c["center_px"][1] for c in filtered])
|
||
quadrant_map = {}
|
||
for c in filtered:
|
||
tx, ty = c["center_px"]
|
||
if tx < cx and ty < cy:
|
||
q = 0
|
||
elif tx < cx:
|
||
q = 1
|
||
elif ty >= cy:
|
||
q = 2
|
||
else:
|
||
q = 3
|
||
if q not in quadrant_map or c["avg_leg"] > quadrant_map[q]["avg_leg"]:
|
||
quadrant_map[q] = c
|
||
assigned = quadrant_map
|
||
|
||
result = []
|
||
for tid in sorted(assigned.keys()):
|
||
c = assigned[tid]
|
||
result.append({
|
||
"id": tid,
|
||
"center": c["right_pt"],
|
||
"corners": c["corners"],
|
||
})
|
||
return result
|
||
|
||
|
||
def try_triangle_scoring(
|
||
img_rgb,
|
||
laser_xy,
|
||
marker_positions,
|
||
camera_matrix,
|
||
dist_coeffs,
|
||
size_range=(8, 500),
|
||
):
|
||
"""
|
||
尝试三角形单应性 + PnP 估距。
|
||
img_rgb: RGB,与 laser_xy 同一像素坐标系。
|
||
返回 dict:
|
||
ok, dx_cm, dy_cm, distance_m, offset_method, distance_method
|
||
"""
|
||
out = {
|
||
"ok": False,
|
||
"dx_cm": None,
|
||
"dy_cm": None,
|
||
"distance_m": None,
|
||
"offset_method": None,
|
||
"distance_method": None,
|
||
}
|
||
if marker_positions is None or camera_matrix is None or dist_coeffs is None:
|
||
return out
|
||
|
||
h_orig, w_orig = img_rgb.shape[:2]
|
||
|
||
# 缩图加速:嵌入式 CPU 上图像处理耗时与面积成正比,缩到最长边 320px 可获得 ~4× 加速
|
||
# 检测完后把像素坐标乘以 inv_scale 还原到原始分辨率,再送入单应性/PnP(与 K 标定分辨率一致)
|
||
MAX_DETECT_DIM = 320
|
||
long_side = max(h_orig, w_orig)
|
||
if long_side > MAX_DETECT_DIM:
|
||
det_scale = MAX_DETECT_DIM / long_side
|
||
det_w = int(w_orig * det_scale)
|
||
det_h = int(h_orig * det_scale)
|
||
img_det = cv2.resize(img_rgb, (det_w, det_h), interpolation=cv2.INTER_LINEAR)
|
||
inv_scale = 1.0 / det_scale
|
||
size_range_det = (max(4, int(size_range[0] * det_scale)),
|
||
max(8, int(size_range[1] * det_scale)))
|
||
else:
|
||
img_det = img_rgb
|
||
inv_scale = 1.0
|
||
size_range_det = size_range
|
||
|
||
gray = cv2.cvtColor(img_det, cv2.COLOR_RGB2GRAY)
|
||
|
||
# 快速路径:直接在原始灰度图上跑(内部先走 Otsu,几乎不耗时)
|
||
# 光照均匀时通常在这一步就找到 ≥3 个三角形,完全跳过 CLAHE
|
||
tri_markers = detect_triangle_markers(
|
||
gray, orig_gray=gray, size_range=size_range_det, verbose=True
|
||
)
|
||
|
||
if len(tri_markers) < 3:
|
||
# 慢速兜底:CLAHE 增强对比度后再试(光线不均 / 局部过暗时有效)
|
||
# 默认关闭以优先速度;由 config.TRIANGLE_ENABLE_CLAHE_FALLBACK 控制。
|
||
try:
|
||
import config as _cfg
|
||
enable_clahe = bool(getattr(_cfg, "TRIANGLE_ENABLE_CLAHE_FALLBACK", False))
|
||
except Exception:
|
||
enable_clahe = False
|
||
|
||
if enable_clahe:
|
||
_log(f"[TRI] 快速路径不足{len(tri_markers)}个,启用CLAHE增强")
|
||
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
|
||
gray_clahe = clahe.apply(gray)
|
||
tri_markers = detect_triangle_markers(
|
||
gray_clahe, orig_gray=gray, size_range=size_range_det, verbose=True
|
||
)
|
||
else:
|
||
_log(f"[TRI] 快速路径不足{len(tri_markers)}个,跳过CLAHE兜底(已关闭)")
|
||
|
||
if len(tri_markers) < 3:
|
||
_log(f"[TRI] 三角形不足3个: {len(tri_markers)}")
|
||
return out
|
||
|
||
# 将缩图坐标还原为原始分辨率(K 矩阵在原始分辨率下标定)
|
||
if inv_scale != 1.0:
|
||
for m in tri_markers:
|
||
m["center"] = [m["center"][0] * inv_scale, m["center"][1] * inv_scale]
|
||
m["corners"] = [[c[0] * inv_scale, c[1] * inv_scale] for c in m["corners"]]
|
||
|
||
lx = float(np.clip(laser_xy[0], 0, w_orig - 1))
|
||
ly = float(np.clip(laser_xy[1], 0, h_orig - 1))
|
||
|
||
if len(tri_markers) == 4:
|
||
tri_sorted = sorted(tri_markers, key=lambda m: m["id"])
|
||
marker_ids = [m["id"] for m in tri_sorted]
|
||
marker_centers = [[float(m["center"][0]), float(m["center"][1])] for m in tri_sorted]
|
||
offset_tag = "triangle_homography"
|
||
else:
|
||
marker_ids_list = [m["id"] for m in tri_markers]
|
||
marker_centers_orig = [[float(m["center"][0]), float(m["center"][1])] for m in tri_markers]
|
||
comp = complete_fourth_point(marker_ids_list, marker_centers_orig, marker_positions)
|
||
if comp is None:
|
||
_log("[TRI] 3点补全第4点失败")
|
||
return out
|
||
marker_centers, marker_ids = comp
|
||
marker_centers = [[float(c[0]), float(c[1])] for c in marker_centers]
|
||
offset_tag = "triangle_homography_3pt"
|
||
|
||
ok_h, tx, ty, _H = homography_calibration(
|
||
marker_centers, marker_ids, marker_positions, [lx, ly]
|
||
)
|
||
if not ok_h:
|
||
_log("[TRI] 单应性失败")
|
||
return out
|
||
|
||
# 与 laser_manager.compute_laser_position 现网约定一致:(x_cm, -y_cm_target)
|
||
out["dx_cm"] = tx
|
||
out["dy_cm"] = -ty
|
||
out["ok"] = True
|
||
out["offset_method"] = offset_tag
|
||
out["markers"] = tri_markers # 供上层绘制标注用
|
||
out["homography"] = _H # 供上层反推靶心像素位置用
|
||
|
||
dist_m = pnp_distance_meters(marker_ids, marker_centers, marker_positions, camera_matrix, dist_coeffs)
|
||
if dist_m is not None and 0.3 < dist_m < 50.0:
|
||
out["distance_m"] = dist_m
|
||
out["distance_method"] = "pnp_triangle"
|
||
_log(f"[TRI] PnP 距离={dist_m:.2f}m, 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")
|
||
else:
|
||
out["distance_m"] = None
|
||
out["distance_method"] = None
|
||
_log(f"[TRI] PnP 距离无效,回退黄心估距; 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")
|
||
|
||
return out
|