#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Target-paper corner markers (isosceles right triangles): detection,
homography-based impact-point mapping, and PnP distance estimation.

Extracted from test/aruco_deteck.py for use by the main shoot_manager flow.
"""
import json
import os
from itertools import combinations

import cv2
import numpy as np


def _log(msg):
    """Best-effort info logging through the project logger; never raises."""
    try:
        from logger_manager import logger_manager
        if logger_manager.logger:
            logger_manager.logger.info(msg)
    except Exception:
        # Logging must never break the detection pipeline.
        pass


def load_camera_from_xml(path):
    """Read an OpenCV FileStorage XML calibration file.

    Args:
        path: path to the XML file produced by OpenCV calibration.

    Returns:
        (camera_matrix, dist_coeffs) on success, (None, None) on any failure.
        Missing/empty distortion coefficients default to a 5x1 zero vector.
    """
    if not path or not os.path.isfile(path):
        _log(f"[TRI] 标定文件不存在: {path}")
        return None, None
    try:
        fs = cv2.FileStorage(path, cv2.FILE_STORAGE_READ)
        K = fs.getNode("camera_matrix").mat()
        dist = fs.getNode("distortion_coefficients").mat()
        fs.release()
        if K is None or K.size == 0:
            return None, None
        if dist is None or dist.size == 0:
            dist = np.zeros((5, 1), dtype=np.float64)
        return K, dist
    except Exception as e:
        _log(f"[TRI] 读取标定失败: {e}")
        return None, None


def load_triangle_positions(path):
    """Load triangle_positions.json.

    Returns:
        dict[int, [x, y, z]] mapping marker id to target-plane position (cm),
        or None when the file is missing or unparseable.

    FIX: parse/IO errors are now caught and logged instead of propagating, so
    a corrupt JSON file degrades gracefully exactly like a missing file
    (consistent with load_camera_from_xml's error handling).
    """
    if not path or not os.path.isfile(path):
        _log(f"[TRI] 三角形位置文件不存在: {path}")
        return None
    try:
        with open(path, "r", encoding="utf-8") as f:
            raw = json.load(f)
        return {int(k): v for k, v in raw.items()}
    except Exception as e:
        _log(f"[TRI] 三角形位置文件解析失败: {e}")
        return None


def homography_calibration(marker_centers, marker_ids, marker_positions, impact_point_pixel):
    """Map an impact point from pixel space onto the target plane.

    A homography is estimated from the marker pixel centers to their known
    target-plane coordinates, then applied to the impact pixel.

    Args:
        marker_centers: list of [x, y] pixel centers, same order as marker_ids.
        marker_ids: marker ids (keys into marker_positions).
        marker_positions: dict[id] -> [x, y, z] target-plane position (cm).
        impact_point_pixel: [x, y] pixel of the impact/laser point.

    Returns:
        (ok, target_x, target_y, H); on failure (False, None, None, None).
    """
    target_points = []
    for mid in marker_ids:
        pos = marker_positions.get(mid)
        if pos is None:
            return False, None, None, None
        target_points.append([pos[0], pos[1]])
    # FIX: cv2.findHomography raises with fewer than 4 correspondences;
    # guard so the documented failure tuple is returned instead.
    if len(marker_centers) < 4 or len(target_points) < 4:
        return False, None, None, None
    src_pts = np.array(marker_centers, dtype=np.float32)
    dst_pts = np.array(target_points, dtype=np.float32)
    H, _ = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, ransacReprojThreshold=1.0)
    if H is None:
        return False, None, None, None
    pt = np.array([[impact_point_pixel]], dtype=np.float32)
    transformed = cv2.perspectiveTransform(pt, H)
    target_x = float(transformed[0][0][0])
    target_y = float(transformed[0][0][1])
    return True, target_x, target_y, H


def complete_fourth_point(detected_ids, detected_centers, marker_positions):
    """Infer the pixel position of the one missing corner from the other 3.

    An affine transform from target-plane coordinates to pixel coordinates is
    estimated from the 3 known correspondences and applied to the missing
    marker's target position.

    Args:
        detected_ids: ids of the detected markers (expected: 3 of {0,1,2,3}).
        detected_centers: pixel centers matching detected_ids.
        marker_positions: dict[id] -> [x, y, z] target positions.

    Returns:
        (complete_centers, [0, 1, 2, 3]) with centers ordered by id,
        or None on failure.

    FIX: previously a KeyError was raised when marker_positions lacked one of
    ids 0-3, and set.pop() raised when no id was actually missing; both cases
    now return None like the other failure paths.
    """
    target_order = [0, 1, 2, 3]
    if any(mid not in marker_positions for mid in target_order):
        return None
    target_coords = {mid: marker_positions[mid][:2] for mid in target_order}
    all_ids = set(target_coords.keys())
    missing = all_ids - set(detected_ids)
    if len(missing) != 1:
        return None
    missing_id = missing.pop()
    known_src = []
    known_dst = []
    for mid, pt in zip(detected_ids, detected_centers):
        if mid not in target_coords:
            return None
        known_src.append(pt)
        known_dst.append(target_coords[mid])
    # Estimate target-plane -> pixel affine (hence the _inv naming).
    M_inv, _ = cv2.estimateAffine2D(
        np.array(known_dst, dtype=np.float32),
        np.array(known_src, dtype=np.float32),
    )
    if M_inv is None:
        return None
    missing_target = target_coords[missing_id]
    missing_src_h = M_inv @ np.array([missing_target[0], missing_target[1], 1.0])
    missing_src = missing_src_h[:2]
    complete_centers = []
    for mid in target_order:
        if mid == missing_id:
            complete_centers.append(missing_src)
        else:
            idx = detected_ids.index(mid)
            complete_centers.append(detected_centers[idx])
    return complete_centers, target_order


def pnp_distance_meters(marker_ids, marker_centers_px, marker_positions, K, dist):
    """Distance from the target-plane origin (0,0,0) to the camera center.

    Solves PnP with the marker correspondences; ||tvec|| is in the object
    units (cm here), so the result is converted to meters.

    Returns:
        Distance in meters, or None if solvePnP fails.
    """
    obj = []
    for mid in marker_ids:
        p = marker_positions[mid]
        obj.append([float(p[0]), float(p[1]), float(p[2])])
    obj_pts = np.array(obj, dtype=np.float64)
    img_pts = np.array(marker_centers_px, dtype=np.float64)
    ok, rvec, tvec = cv2.solvePnP(
        obj_pts, img_pts, K, dist, flags=cv2.SOLVEPNP_ITERATIVE
    )
    if not ok:
        return None
    tvec = tvec.reshape(-1)
    dist_cm = float(np.linalg.norm(tvec))
    return dist_cm / 100.0


def detect_triangle_markers(
    gray_image,
    orig_gray=None,
    size_range=(8, 500),
    max_interior_gray=None,
    dark_pixel_gray=None,
    min_dark_ratio=None,
    verbose=True,
):
    """Detect the 4 dark isosceles-right-triangle corner markers.

    Thresholds gray_image (Otsu first, adaptive fallback), extracts
    triangular contours, filters by shape/darkness/contrast, then picks the
    best 4-candidate quadrilateral (or one candidate per quadrant as a
    fallback).

    Args:
        gray_image: grayscale image to threshold (possibly CLAHE-enhanced).
        orig_gray: original grayscale for darkness checks; None skips them.
        size_range: (min, max) acceptable average leg length in pixels.
        max_interior_gray / dark_pixel_gray / min_dark_ratio: darkness
            thresholds; None means "take from config (or built-in default)".
        verbose: emit diagnostic logs.

    Returns:
        list of {"id": 0..3 (TL, BL, BR, TR), "center": right-angle vertex
        [x, y], "corners": 3x[x, y]}; possibly fewer than 4 entries.
    """
    # Tunable parameters; defaults mirror config.py.
    try:
        import config as _cfg
        early_exit = int(getattr(_cfg, "TRIANGLE_EARLY_EXIT_CANDIDATES", 4))
        block_sizes = tuple(getattr(_cfg, "TRIANGLE_ADAPTIVE_BLOCK_SIZES", (11, 21, 35)))
        max_combo_n = int(getattr(_cfg, "TRIANGLE_MAX_FILTERED_FOR_COMBO", 10))
        if max_interior_gray is None:
            max_interior_gray = int(getattr(_cfg, "TRIANGLE_MAX_INTERIOR_GRAY", 130))
        if dark_pixel_gray is None:
            dark_pixel_gray = int(getattr(_cfg, "TRIANGLE_DARK_PIXEL_GRAY", 130))
        if min_dark_ratio is None:
            min_dark_ratio = float(getattr(_cfg, "TRIANGLE_MIN_DARK_RATIO", 0.30))
        min_contrast_diff = int(getattr(_cfg, "TRIANGLE_MIN_CONTRAST_DIFF", 15))
    except Exception:
        early_exit = 4
        block_sizes = (11, 21, 35)
        max_combo_n = 10
        if max_interior_gray is None:
            max_interior_gray = 130
        if dark_pixel_gray is None:
            dark_pixel_gray = 130
        if min_dark_ratio is None:
            min_dark_ratio = 0.30
        min_contrast_diff = 15

    min_leg, max_leg = size_range
    # Half-square area of the smallest leg, relaxed by 10x to be permissive.
    min_area = 0.5 * (min_leg ** 2) * 0.1
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))

    def _check_shape(approx):
        """Validate an isosceles right triangle; return (right_pt, avg_leg, pts) or None."""
        pts = approx.reshape(3, 2).astype(np.float32)
        sides = [
            np.linalg.norm(pts[1] - pts[0]),
            np.linalg.norm(pts[2] - pts[1]),
            np.linalg.norm(pts[0] - pts[2]),
        ]
        order = sorted(range(3), key=lambda i: sides[i])
        leg1, leg2, hyp = sides[order[0]], sides[order[1]], sides[order[2]]
        avg_leg = (leg1 + leg2) / 2
        if not (min_leg <= avg_leg <= max_leg):
            return None
        # Legs within 20% of each other; hypotenuse within 20% of leg*sqrt(2).
        if abs(leg1 - leg2) / (avg_leg + 1e-6) > 0.20:
            return None
        if abs(hyp - avg_leg * np.sqrt(2)) / (avg_leg * np.sqrt(2) + 1e-6) > 0.20:
            return None
        # The vertex opposite the longest side is the right-angle vertex.
        edge_verts = [(0, 1), (1, 2), (2, 0)]
        hv0, hv1 = edge_verts[order[2]]
        right_v = 3 - hv0 - hv1
        right_pt = pts[right_v]
        v0 = pts[hv0] - right_pt
        v1_vec = pts[hv1] - right_pt
        cos_a = np.dot(v0, v1_vec) / (
            np.linalg.norm(v0) * np.linalg.norm(v1_vec) + 1e-6
        )
        # |cos| <= 0.2 means the angle at right_pt is within ~11.5° of 90°.
        if abs(cos_a) > 0.20:
            return None
        return right_pt, avg_leg, pts

    def _color_ok(approx):
        """Check the triangle interior is dark (absolute OR relative to surroundings)."""
        if orig_gray is None:
            return True
        mask = np.zeros(orig_gray.shape[:2], dtype=np.uint8)
        cv2.fillPoly(mask, [approx], 255)
        # Erode the mask so edge anti-aliasing doesn't bias the interior mean.
        erode_k = max(1, int(min(orig_gray.shape[:2]) * 0.002))
        erode_k = min(erode_k, 5)
        k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 1, 2 * erode_k + 1))
        mask_in = cv2.erode(mask, k, iterations=1)
        if cv2.countNonZero(mask_in) < 20:
            mask_in = mask
        mean_val = cv2.mean(orig_gray, mask=mask_in)[0]
        ys, xs = np.where(mask_in > 0)
        if len(xs) == 0:
            return False
        interior = orig_gray[ys, xs]
        dark_ratio = float(np.mean(interior <= dark_pixel_gray))
        # Condition 1: absolute darkness of the interior.
        abs_ok = (mean_val <= max_interior_gray) and (dark_ratio >= min_dark_ratio)
        # Condition 2: interior clearly darker than the surrounding ring.
        contrast_ok = False
        if min_contrast_diff > 0:
            try:
                dilate_k = cv2.getStructuringElement(
                    cv2.MORPH_ELLIPSE, (2 * erode_k + 3, 2 * erode_k + 3))
                mask_dilated = cv2.dilate(mask, dilate_k, iterations=2)
                mask_border = cv2.subtract(mask_dilated, mask)
                border_nz = cv2.countNonZero(mask_border)
                if border_nz > 20:
                    mean_surround = cv2.mean(orig_gray, mask=mask_border)[0]
                    contrast_ok = (mean_surround - mean_val) >= min_contrast_diff
            except Exception:
                pass
        return abs_ok or contrast_ok

    def _extract_candidates(binary_img):
        """Extract triangle candidates from one binary image, with diagnostics."""
        contours, _ = cv2.findContours(binary_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        found = []
        # ---- diagnostic counters ----
        _n_area_skip = 0
        _n_3vert = 0
        _n_shape_ok = 0
        _n_color_ok = 0
        _dbg_fail_shape = []  # first few shape-failure reasons
        _dbg_fail_color = []  # first few color-failure details
        for cnt in contours:
            area = cv2.contourArea(cnt)
            if area < min_area:
                _n_area_skip += 1
                continue
            peri = cv2.arcLength(cnt, True)
            # Larger contours tolerate a coarser polygon approximation.
            eps = 0.05 * peri if peri > 60 else 0.03 * peri
            approx = cv2.approxPolyDP(cnt, eps, True)
            if len(approx) != 3:
                continue
            _n_3vert += 1
            shape = _check_shape(approx)
            if shape is None:
                if len(_dbg_fail_shape) < 3:
                    pts3 = approx.reshape(3, 2).astype(np.float32)
                    sides = sorted([np.linalg.norm(pts3[1] - pts3[0]),
                                    np.linalg.norm(pts3[2] - pts3[1]),
                                    np.linalg.norm(pts3[0] - pts3[2])])
                    avg_l = (sides[0] + sides[1]) / 2
                    reason = (f"avg_leg={avg_l:.1f} range=[{min_leg},{max_leg}] "
                              f"legs={sides[0]:.1f},{sides[1]:.1f} hyp={sides[2]:.1f} "
                              f"exp_hyp={avg_l*1.414:.1f}")
                    _dbg_fail_shape.append(reason)
                continue
            _n_shape_ok += 1
            if not _color_ok(approx):
                if len(_dbg_fail_color) < 3 and orig_gray is not None:
                    mask = np.zeros(orig_gray.shape[:2], dtype=np.uint8)
                    cv2.fillPoly(mask, [approx], 255)
                    mean_v = cv2.mean(orig_gray, mask=mask)[0]
                    ys, xs = np.where(mask > 0)
                    if len(xs) > 0:
                        dr = float(np.mean(orig_gray[ys, xs] <= dark_pixel_gray))
                    else:
                        dr = 0
                    _dbg_fail_color.append(
                        f"mean={mean_v:.1f}(<={max_interior_gray}?) "
                        f"dark_r={dr:.2f}(>={min_dark_ratio}?)")
                continue
            _n_color_ok += 1
            right_pt, avg_leg, pts = shape
            center_px = np.mean(pts, axis=0).tolist()
            # 10px grid key for de-duplicating the same triangle across thresholds.
            dedup_key = f"{int(center_px[0] // 10)},{int(center_px[1] // 10)}"
            found.append({
                "center_px": center_px,
                "right_pt": right_pt.tolist(),
                "corners": pts.tolist(),
                "avg_leg": avg_leg,
                "dedup_key": dedup_key,
            })
        if verbose:
            _log(f"[TRI] _extract: total={len(contours)} area_skip={_n_area_skip} "
                 f"3vert={_n_3vert} shape_ok={_n_shape_ok} color_ok={_n_color_ok}")
            if _dbg_fail_shape:
                _log(f"[TRI] shape失败原因(前3): {'; '.join(_dbg_fail_shape)}")
            if _dbg_fail_color:
                _log(f"[TRI] color失败原因(前3): {'; '.join(_dbg_fail_color)}")
        return found

    all_candidates = []
    seen_keys = set()
    # Early-exit needs both enough candidates AND spatial spread (several
    # quadrants covered), so clustered false positives don't stop the search.
    h0, w0 = gray_image.shape[:2]
    cx0, cy0 = w0 / 2.0, h0 / 2.0
    seen_quadrants = set()
    # 4 candidates suffice for 4-corner detection; 3 allow completion, +1 margin.
    _EARLY_EXIT = max(3, early_exit)

    def _add_from_binary(b):
        """Close small gaps, extract candidates, de-duplicate, track quadrants."""
        b = cv2.morphologyEx(b, cv2.MORPH_CLOSE, kernel)
        for c in _extract_candidates(b):
            if c["dedup_key"] not in seen_keys:
                seen_keys.add(c["dedup_key"])
                all_candidates.append(c)
                # Quadrant relative to the image center.
                tx, ty = c["center_px"]
                if tx < cx0 and ty < cy0:
                    q = 0
                elif tx < cx0:
                    q = 1
                elif ty >= cy0:
                    q = 2
                else:
                    q = 3
                seen_quadrants.add(q)

    def _should_early_exit():
        # Require >=3 quadrants covered plus the count threshold before
        # declaring "looks like four corners" and skipping further passes.
        return (len(all_candidates) >= _EARLY_EXIT) and (len(seen_quadrants) >= 3)

    # 1. Fastest pass: global Otsu (no per-pixel neighborhood work, ~10ms).
    _, b_otsu = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    # ---- optional debug: dump the Otsu binary image for manual inspection ----
    try:
        import config as _dbg_cfg
        if getattr(_dbg_cfg, 'TRIANGLE_SAVE_DEBUG_IMAGE', False):
            _dbg_path = getattr(_dbg_cfg, 'PHOTO_DIR', '/root/phot') + '/tri_otsu_debug.jpg'
            cv2.imwrite(_dbg_path, b_otsu)
            _log(f"[TRI] DEBUG: Otsu 二值图已保存到 {_dbg_path}")
    except Exception:
        pass
    _add_from_binary(b_otsu)

    # 2. Adaptive thresholds only when Otsu wasn't enough (~100ms each).
    for block_size in block_sizes:
        if _should_early_exit():
            break
        if block_size is None:
            continue
        b = cv2.adaptiveThreshold(
            gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV, block_size, 4
        )
        _add_from_binary(b)

    if verbose:
        _log(f"[TRI] 候选三角形共 {len(all_candidates)} 个(预过滤前)")
    if len(all_candidates) < 2:
        return []

    # Size consistency: keep candidates whose leg is within [0.4x, 2x] of the median.
    all_legs = [c["avg_leg"] for c in all_candidates]
    med_leg = float(np.median(all_legs))
    filtered = []
    for c in all_candidates:
        leg = c["avg_leg"]
        if med_leg > 1e-6 and not (0.40 * med_leg <= leg <= 2.0 * med_leg):
            continue
        filtered.append(c)
    if len(filtered) < 2:
        return []

    # Too many candidates slows the 4-combination search: keep the max_combo_n
    # whose leg length is closest to the median (four corners share a scale).
    if max_combo_n > 0 and len(filtered) > max_combo_n:
        med_leg = float(np.median([c["avg_leg"] for c in filtered]))
        filtered = sorted(filtered, key=lambda c: abs(c["avg_leg"] - med_leg))[:max_combo_n]

    def _order_quad(pts_4):
        """Return indices (tl, bl, br, tr) of 4 points ordered by position."""
        by_y = sorted(range(4), key=lambda i: pts_4[i][1])
        top_pair = sorted(by_y[:2], key=lambda i: pts_4[i][0])
        bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0])
        return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1]

    def _score_quad(cands_4):
        """Score 4 candidates as a quadrilateral; lower = more rectangle-like."""
        pts = [np.array(c["center_px"]) for c in cands_4]
        legs = [c["avg_leg"] for c in cands_4]
        tl, bl, br, tr = _order_quad(pts)
        diag1 = np.linalg.norm(pts[tl] - pts[br])
        diag2 = np.linalg.norm(pts[bl] - pts[tr])
        diag_ratio = max(diag1, diag2) / (min(diag1, diag2) + 1e-6)
        s_top = np.linalg.norm(pts[tl] - pts[tr])
        s_bot = np.linalg.norm(pts[bl] - pts[br])
        s_left = np.linalg.norm(pts[tl] - pts[bl])
        s_right = np.linalg.norm(pts[tr] - pts[br])
        h_ratio = max(s_top, s_bot) / (min(s_top, s_bot) + 1e-6)
        v_ratio = max(s_left, s_right) / (min(s_left, s_right) + 1e-6)
        med_l = float(np.median(legs))
        leg_dev = max(abs(lg - med_l) / (med_l + 1e-6) for lg in legs)
        score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0
        return score, (tl, bl, br, tr)

    assigned = None
    if len(filtered) >= 4:
        best_score = float("inf")
        best_combo = None
        best_order = None
        for combo in combinations(range(len(filtered)), 4):
            cands = [filtered[i] for i in combo]
            score, order = _score_quad(cands)
            if score < best_score:
                best_score = score
                best_combo = combo
                best_order = order
        if verbose:
            _log(f"[TRI] 最优四边形: score={best_score:.3f}")
        if best_score < 3.0:
            cands = [filtered[i] for i in best_combo]
            tl, bl, br, tr = best_order
            assigned = {
                0: cands[tl],
                1: cands[bl],
                2: cands[br],
                3: cands[tr],
            }

    if assigned is None:
        # Fallback: split candidates by their own centroid into quadrants and
        # keep the largest triangle per quadrant.
        cx = np.mean([c["center_px"][0] for c in filtered])
        cy = np.mean([c["center_px"][1] for c in filtered])
        quadrant_map = {}
        for c in filtered:
            tx, ty = c["center_px"]
            if tx < cx and ty < cy:
                q = 0
            elif tx < cx:
                q = 1
            elif ty >= cy:
                q = 2
            else:
                q = 3
            if q not in quadrant_map or c["avg_leg"] > quadrant_map[q]["avg_leg"]:
                quadrant_map[q] = c
        assigned = quadrant_map

    result = []
    for tid in sorted(assigned.keys()):
        c = assigned[tid]
        result.append({
            "id": tid,
            # The right-angle vertex is the precise reference point.
            "center": c["right_pt"],
            "corners": c["corners"],
        })
    return result


def try_triangle_scoring(
    img_rgb,
    laser_xy,
    marker_positions,
    camera_matrix,
    dist_coeffs,
    size_range=(8, 500),
):
    """Attempt triangle-marker homography scoring plus PnP distance.

    Args:
        img_rgb: RGB image in the same pixel space as laser_xy.
        laser_xy: (x, y) impact/laser pixel.
        marker_positions: dict[id] -> [x, y, z] target positions (cm).
        camera_matrix, dist_coeffs: intrinsics at the ORIGINAL resolution.
        size_range: acceptable average triangle leg range in pixels.

    Returns:
        dict with keys ok, dx_cm, dy_cm, distance_m, offset_method,
        distance_method (plus "markers"/"homography" on success).
    """
    out = {
        "ok": False,
        "dx_cm": None,
        "dy_cm": None,
        "distance_m": None,
        "distance_method": None,
        "offset_method": None,
    }
    if marker_positions is None or camera_matrix is None or dist_coeffs is None:
        return out
    h_orig, w_orig = img_rgb.shape[:2]

    # Downscale for speed: on embedded CPUs processing time scales with area,
    # so capping the long side at MAX_DETECT_DIM px gives a large speedup.
    # (FIX: comment previously claimed 320px while the code uses 640.)
    # Detected pixel coords are multiplied by inv_scale afterwards so the
    # homography/PnP run at the resolution the camera matrix was calibrated at.
    MAX_DETECT_DIM = 640
    long_side = max(h_orig, w_orig)
    if long_side > MAX_DETECT_DIM:
        det_scale = MAX_DETECT_DIM / long_side
        det_w = int(w_orig * det_scale)
        det_h = int(h_orig * det_scale)
        img_det = cv2.resize(img_rgb, (det_w, det_h), interpolation=cv2.INTER_LINEAR)
        inv_scale = 1.0 / det_scale
        size_range_det = (max(4, int(size_range[0] * det_scale)),
                          max(8, int(size_range[1] * det_scale)))
    else:
        img_det = img_rgb
        inv_scale = 1.0
        size_range_det = size_range

    gray = cv2.cvtColor(img_det, cv2.COLOR_RGB2GRAY)
    # Fast path: plain grayscale (Otsu inside is nearly free). Under even
    # lighting this usually yields >=3 triangles and CLAHE is skipped entirely.
    tri_markers = detect_triangle_markers(
        gray, orig_gray=gray, size_range=size_range_det, verbose=True
    )
    if len(tri_markers) < 3:
        # Slow fallback: retry after CLAHE contrast enhancement (helps with
        # uneven lighting / locally dark areas). Off by default for speed;
        # controlled by config.TRIANGLE_ENABLE_CLAHE_FALLBACK.
        try:
            import config as _cfg
            enable_clahe = bool(getattr(_cfg, "TRIANGLE_ENABLE_CLAHE_FALLBACK", False))
        except Exception:
            enable_clahe = False
        if enable_clahe:
            _log(f"[TRI] 快速路径不足{len(tri_markers)}个,启用CLAHE增强")
            clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
            gray_clahe = clahe.apply(gray)
            tri_markers = detect_triangle_markers(
                gray_clahe, orig_gray=gray, size_range=size_range_det, verbose=True
            )
        else:
            _log(f"[TRI] 快速路径不足{len(tri_markers)}个,跳过CLAHE兜底(已关闭)")
    if len(tri_markers) < 3:
        _log(f"[TRI] 三角形不足3个: {len(tri_markers)}")
        return out

    # Restore detection coordinates to the original resolution (the camera
    # matrix is calibrated at original resolution).
    if inv_scale != 1.0:
        for m in tri_markers:
            m["center"] = [m["center"][0] * inv_scale, m["center"][1] * inv_scale]
            m["corners"] = [[c[0] * inv_scale, c[1] * inv_scale] for c in m["corners"]]

    lx = float(np.clip(laser_xy[0], 0, w_orig - 1))
    ly = float(np.clip(laser_xy[1], 0, h_orig - 1))

    if len(tri_markers) == 4:
        tri_sorted = sorted(tri_markers, key=lambda m: m["id"])
        marker_ids = [m["id"] for m in tri_sorted]
        marker_centers = [[float(m["center"][0]), float(m["center"][1])] for m in tri_sorted]
        offset_tag = "triangle_homography"
    else:
        # Only 3 corners found: synthesize the 4th from the affine fit.
        marker_ids_list = [m["id"] for m in tri_markers]
        marker_centers_orig = [[float(m["center"][0]), float(m["center"][1])]
                               for m in tri_markers]
        comp = complete_fourth_point(marker_ids_list, marker_centers_orig, marker_positions)
        if comp is None:
            _log("[TRI] 3点补全第4点失败")
            return out
        marker_centers, marker_ids = comp
        marker_centers = [[float(c[0]), float(c[1])] for c in marker_centers]
        offset_tag = "triangle_homography_3pt"

    ok_h, tx, ty, _H = homography_calibration(
        marker_centers, marker_ids, marker_positions, [lx, ly]
    )
    if not ok_h:
        _log("[TRI] 单应性失败")
        return out

    # Matches laser_manager.compute_laser_position's convention: (x_cm, -y_cm_target).
    out["dx_cm"] = tx
    out["dy_cm"] = -ty
    out["ok"] = True
    out["offset_method"] = offset_tag
    out["markers"] = tri_markers  # for annotation drawing upstream
    out["homography"] = _H  # for back-projecting the bullseye pixel upstream

    dist_m = pnp_distance_meters(marker_ids, marker_centers, marker_positions,
                                 camera_matrix, dist_coeffs)
    # Sanity window: 0.3m - 50m; outside it fall back to the yellow-center estimate.
    if dist_m is not None and 0.3 < dist_m < 50.0:
        out["distance_m"] = dist_m
        out["distance_method"] = "pnp_triangle"
        _log(f"[TRI] PnP 距离={dist_m:.2f}m, 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")
    else:
        out["distance_m"] = None
        out["distance_method"] = None
        _log(f"[TRI] PnP 距离无效,回退黄心估距; 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")
    return out