#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 视觉检测模块 提供靶心检测、距离估算、图像保存等功能 """ import cv2 import numpy as np import os import math import threading import queue from maix import image import config from logger_manager import logger_manager # 存图队列 + worker _save_queue = queue.Queue(maxsize=16) _save_worker_started = False _save_worker_lock = threading.Lock() def check_laser_point_sharpness(frame, laser_point=None, roi_size=30, threshold=100.0, ellipse_params=None): """ 检测激光点本身的清晰度(不是整个靶子) Args: frame: 图像帧对象 laser_point: 激光点坐标 (x, y),如果为None则自动查找 roi_size: ROI区域大小(像素),默认30x30 threshold: 清晰度阈值 ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内 Returns: (is_sharp, sharpness_score, laser_pos): (是否清晰, 清晰度分数, 激光点坐标) """ try: # 1. 如果没有提供激光点,先查找 if laser_point is None: from laser_manager import laser_manager laser_point = laser_manager.find_red_laser(frame, ellipse_params=ellipse_params) if laser_point is None: logger_manager.logger.debug(f"未找到激光点") return False, 0.0, None x, y = laser_point # 2. 转换为 OpenCV 格式 img_cv = image.image2cv(frame, False, False) h, w = img_cv.shape[:2] # 3. 提取 ROI 区域(激光点周围) roi_half = roi_size // 2 x_min = max(0, int(x) - roi_half) x_max = min(w, int(x) + roi_half) y_min = max(0, int(y) - roi_half) y_max = min(h, int(y) + roi_half) roi = img_cv[y_min:y_max, x_min:x_max] if roi.size == 0: return False, 0.0, laser_point # 4. 转换为灰度图(用于清晰度检测) gray_roi = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY) # 5. 方法1:检测点的扩散程度(能量集中度) # 计算中心区域的能量集中度 center_x, center_y = roi.shape[1] // 2, roi.shape[0] // 2 center_radius = min(5, roi.shape[0] // 4) # 中心区域半径 # 创建中心区域的掩码 y_coords, x_coords = np.ogrid[:roi.shape[0], :roi.shape[1]] center_mask = (x_coords - center_x)**2 + (y_coords - center_y)**2 <= center_radius**2 # 计算中心区域和周围区域的亮度 center_brightness = gray_roi[center_mask].mean() outer_mask = ~center_mask outer_brightness = gray_roi[outer_mask].mean() if np.any(outer_mask) else 0 # 对比度(清晰的点对比度高) contrast = abs(center_brightness - outer_brightness) # 6. 方法2:检测点的边缘锐度(使用拉普拉斯) laplacian = cv2.Laplacian(gray_roi, cv2.CV_64F) edge_sharpness = abs(laplacian).var() # 7. 方法3:检测点的能量集中度(方差) # 清晰的点:能量集中在中心,方差小 # 模糊的点:能量分散,方差大 # 但我们需要的是:清晰的点中心亮度高,周围低,所以梯度大 sobel_x = cv2.Sobel(gray_roi, cv2.CV_64F, 1, 0, ksize=3) sobel_y = cv2.Sobel(gray_roi, cv2.CV_64F, 0, 1, ksize=3) gradient = np.sqrt(sobel_x**2 + sobel_y**2) gradient_sharpness = gradient.var() # 8. 组合多个指标 # 对比度权重0.3,边缘锐度权重0.4,梯度权重0.3 sharpness_score = (contrast * 0.3 + edge_sharpness * 0.4 + gradient_sharpness * 0.3) is_sharp = sharpness_score >= threshold logger = logger_manager.logger if logger: logger.debug(f"[VISION] 激光点清晰度: 位置=({x}, {y}), 对比度={contrast:.2f}, 边缘={edge_sharpness:.2f}, 梯度={gradient_sharpness:.2f}, 综合={sharpness_score:.2f}, 是否清晰={is_sharp}") return is_sharp, sharpness_score, laser_point except Exception as e: logger = logger_manager.logger if logger: logger.error(f"[VISION] 激光点清晰度检测失败: {e}") import traceback logger.error(traceback.format_exc()) return False, 0.0, laser_point def check_image_sharpness(frame, threshold=100.0, save_debug_images=False): """ 检查图像清晰度(针对圆形靶子优化,基于圆形边缘检测) 检测靶心的圆形边缘,计算边缘区域的梯度清晰度 Args: frame: 图像帧对象 threshold: 清晰度阈值,低于此值认为图像模糊(默认100.0) 可以根据实际情况调整: - 清晰图像通常 > 200 - 模糊图像通常 < 100 - 中等清晰度 100-200 save_debug_images: 是否保存调试图像(原始图和边缘图),默认False Returns: (is_sharp, sharpness_score): (是否清晰, 清晰度分数) """ try: logger_manager.logger.debug(f"begin") # 转换为 OpenCV 格式 img_cv = image.image2cv(frame, False, False) logger_manager.logger.debug(f"after image2cv") # 转换为 HSV 颜色空间 hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) h, s, v = cv2.split(hsv) logger_manager.logger.debug(f"after HSV conversion") # 检测黄色区域(靶心) # 调整饱和度策略:稍微增强,不要过度 s_enhanced = np.clip(s * 1.1, 0, 255).astype(np.uint8) hsv_enhanced = cv2.merge((h, s_enhanced, v)) # HSV 阈值范围(与 detect_circle_v3 保持一致) lower_yellow = np.array([7, 80, 0]) upper_yellow = np.array([32, 255, 255]) mask_yellow = cv2.inRange(hsv_enhanced, lower_yellow, upper_yellow) # 形态学操作,填充小孔洞 kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) logger_manager.logger.debug(f"after yellow mask detection") # 计算边缘区域:扩展黄色区域,然后减去原始区域,得到边缘区域 mask_dilated = cv2.dilate(mask_yellow, kernel, iterations=2) mask_edge = cv2.subtract(mask_dilated, mask_yellow) # 边缘区域 # 计算边缘区域的像素数量 edge_pixel_count = np.sum(mask_edge > 0) logger_manager.logger.debug(f"edge pixel count: {edge_pixel_count}") # 如果检测不到边缘区域,使用全局梯度作为后备方案 if edge_pixel_count < 100: logger_manager.logger.debug(f"edge region too small, using global gradient") # 使用 V 通道计算全局梯度 sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3) sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3) gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2) sharpness_score = gradient.var() logger_manager.logger.debug(f"global gradient variance: {sharpness_score:.2f}") else: # 在边缘区域计算梯度清晰度 # 使用 V(亮度)通道计算梯度,因为边缘在亮度上通常很明显 sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3) sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3) gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2) # 只在边缘区域计算清晰度 edge_gradient = gradient[mask_edge > 0] if len(edge_gradient) > 0: # 计算边缘梯度的方差(清晰图像的边缘梯度变化大) sharpness_score = edge_gradient.var() # 也可以使用均值作为补充指标(清晰图像的边缘梯度均值也较大) gradient_mean = edge_gradient.mean() logger_manager.logger.debug(f"edge gradient: mean={gradient_mean:.2f}, var={sharpness_score:.2f}, pixels={len(edge_gradient)}") else: # 如果边缘区域没有有效梯度,使用全局梯度 sharpness_score = gradient.var() logger_manager.logger.debug(f"no edge gradient, using global: {sharpness_score:.2f}") # 保存调试图像(如果启用) if save_debug_images: try: debug_dir = config.PHOTO_DIR if debug_dir not in os.listdir("/root"): try: os.mkdir(debug_dir) except: pass # 生成文件名 try: all_images = [f for f in os.listdir(debug_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] img_count = len(all_images) except: img_count = 0 # 保存原始图像 img_orig = image.cv2image(img_cv, False, False) orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp" img_orig.save(orig_filename) # # 保存边缘检测结果(可视化) # # 创建可视化图像:原始图像 + 黄色区域 + 边缘区域 # debug_img = img_cv.copy() # # 在黄色区域绘制绿色 # debug_img[mask_yellow > 0] = [0, 255, 0] # RGB格式,绿色 # # 在边缘区域绘制红色 # debug_img[mask_edge > 0] = [255, 0, 0] # RGB格式,红色 # debug_img_maix = image.cv2image(debug_img, False, False) # debug_filename = f"{debug_dir}/sharpness_debug_edge_{img_count:04d}.bmp" # debug_img_maix.save(debug_filename) # logger = logger_manager.logger # if logger: # logger.info(f"[VISION] 保存调试图像: {orig_filename}, {debug_filename}") except Exception as e: logger = logger_manager.logger if logger: logger.warning(f"[VISION] 保存调试图像失败: {e}") import traceback logger.error(traceback.format_exc()) is_sharp = sharpness_score >= threshold logger = logger_manager.logger if logger: logger.debug(f"[VISION] 清晰度检测: 分数={sharpness_score:.2f}, 边缘像素数={edge_pixel_count}, 是否清晰={is_sharp}, 阈值={threshold}") return is_sharp, sharpness_score except Exception as e: logger = logger_manager.logger if logger: logger.error(f"[VISION] 清晰度检测失败: {e}") import traceback logger.error(traceback.format_exc()) # 出错时返回 False,避免使用模糊图像 return False, 0.0 def save_calibration_image(frame, laser_pos, photo_dir=None): """ 保存激光校准图像(带标注) 在找到的激光点位置绘制圆圈,便于检查算法是否正确 Args: frame: 原始图像帧 laser_pos: 找到的激光点坐标 (x, y) photo_dir: 照片存储目录,如果为None则使用 config.PHOTO_DIR Returns: str: 保存的文件路径,如果保存失败则返回 None """ # 检查是否启用图像保存 if not config.SAVE_IMAGE_ENABLED: return None if photo_dir is None: photo_dir = config.PHOTO_DIR try: # 确保照片目录存在 try: if photo_dir not in os.listdir("/root"): os.mkdir(photo_dir) except: pass # 生成文件名 try: all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] img_count = len(all_images) except: img_count = 0 x, y = laser_pos filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.bmp" logger = logger_manager.logger if logger: logger.info(f"保存校准图像: {filename}, 激光点: ({x}, {y})") # 转换图像为 OpenCV 格式以便绘制 img_cv = image.image2cv(frame, False, False) # 绘制激光点圆圈(用绿色圆圈标出找到的激光点) cv2.circle(img_cv, (int(x), int(y)), 10, (0, 255, 0), 2) # 外圈:绿色,半径10 cv2.circle(img_cv, (int(x), int(y)), 5, (0, 255, 0), 2) # 中圈:绿色,半径5 cv2.circle(img_cv, (int(x), int(y)), 2, (0, 255, 0), -1) # 中心点:绿色实心 # 可选:绘制十字线帮助定位 cv2.line(img_cv, (int(x - 20), int(y)), (int(x + 20), int(y)), (0, 255, 0), 1) # 水平线 cv2.line(img_cv, (int(x), int(y - 20)), (int(x), int(y + 20)), (0, 255, 0), 1) # 垂直线 # 转换回 MaixPy 图像格式并保存 result_img = image.cv2image(img_cv, False, False) result_img.save(filename) if logger: logger.debug(f"校准图像已保存: {filename}") return filename except Exception as e: logger = logger_manager.logger if logger: logger.error(f"保存校准图像失败: {e}") import traceback logger.error(traceback.format_exc()) return None def detect_circle_v3(frame, laser_point=None): """检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本 增加红色圆圈检测,验证黄色圆圈是否为真正的靶心 如果提供 laser_point,会选择最接近激光点的目标 Args: frame: 图像帧 laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择 Returns: (result_img, best_center, best_radius, method, best_radius1, ellipse_params) """ img_cv = image.image2cv(frame, False, False) best_center = best_radius = best_radius1 = method = None ellipse_params = None # HSV 黄色掩码检测(模糊靶心) hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) h, s, v = cv2.split(hsv) # 调整饱和度策略:稍微增强,不要过度 s = np.clip(s * 1.1, 0, 255).astype(np.uint8) hsv = cv2.merge((h, s, v)) # 放宽 HSV 阈值范围(针对模糊图像的关键调整) lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色 upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满 mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow) # 调整形态学操作 kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) # 存储所有有效的黄色-红色组合 valid_targets = [] if contours_yellow: for cnt_yellow in contours_yellow: area = cv2.contourArea(cnt_yellow) perimeter = cv2.arcLength(cnt_yellow, True) # 计算圆度 if perimeter > 0: circularity = (4 * np.pi * area) / (perimeter * perimeter) else: circularity = 0 logger = logger_manager.logger if area > 50 and circularity > 0.7: if logger: logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}") # 尝试拟合椭圆 yellow_center = None yellow_radius = None yellow_ellipse = None if len(cnt_yellow) >= 5: (x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow) yellow_ellipse = ((x, y), (width, height), angle) axes_minor = min(width, height) radius = axes_minor / 2 yellow_center = (int(x), int(y)) yellow_radius = int(radius) else: (x, y), radius = cv2.minEnclosingCircle(cnt_yellow) yellow_center = (int(x), int(y)) yellow_radius = int(radius) yellow_ellipse = None # 如果检测到黄色圆圈,再检测红色圆圈进行验证 if yellow_center and yellow_radius: # HSV 红色掩码检测(红色在HSV中跨越0度,需要两个范围) # 红色范围1: 0-10度(接近0度的红色) lower_red1 = np.array([0, 80, 0]) upper_red1 = np.array([10, 255, 255]) mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1) # 红色范围2: 170-180度(接近180度的红色) lower_red2 = np.array([170, 80, 0]) upper_red2 = np.array([180, 255, 255]) mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2) # 合并两个红色掩码 mask_red = cv2.bitwise_or(mask_red1, mask_red2) # 形态学操作 kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red) contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) found_valid_red = False if contours_red: # 找到所有符合条件的红色圆圈 for cnt_red in contours_red: area_red = cv2.contourArea(cnt_red) perimeter_red = cv2.arcLength(cnt_red, True) if perimeter_red > 0: circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red) else: circularity_red = 0 # 红色圆圈也应该有一定的圆度 if area_red > 50 and circularity_red > 0.6: # 计算红色圆圈的中心和半径 if len(cnt_red) >= 5: (x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red) radius_red = min(w_red, h_red) / 2 red_center = (int(x_red), int(y_red)) red_radius = int(radius_red) else: (x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red) red_center = (int(x_red), int(y_red)) red_radius = int(radius_red) # 计算黄色和红色圆心的距离 if red_center: dx = yellow_center[0] - red_center[0] dy = yellow_center[1] - red_center[1] distance = np.sqrt(dx*dx + dy*dy) # 圆心距离阈值:应该小于黄色半径的某个倍数(比如1.5倍) max_distance = yellow_radius * 1.5 # 红色圆圈应该比黄色圆圈大(外圈) if distance < max_distance and red_radius > yellow_radius * 0.8: found_valid_red = True logger = logger_manager.logger if logger: logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}") # 记录这个有效目标 valid_targets.append({ 'center': yellow_center, 'radius': yellow_radius, 'ellipse': yellow_ellipse, 'area': area }) break if not found_valid_red: logger = logger_manager.logger if logger: logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别") # 从所有有效目标中选择最佳目标 if valid_targets: if laser_point: # 如果有激光点,选择最接近激光点的目标 best_target = None min_distance = float('inf') for target in valid_targets: dx = target['center'][0] - laser_point[0] dy = target['center'][1] - laser_point[1] distance = np.sqrt(dx*dx + dy*dy) if distance < min_distance: min_distance = distance best_target = target if best_target: best_center = best_target['center'] best_radius = best_target['radius'] ellipse_params = best_target['ellipse'] method = "v3_ellipse_red_validated_laser_selected" best_radius1 = best_radius * 5 else: # 如果没有激光点,选择面积最大的目标 best_target = max(valid_targets, key=lambda t: t['area']) best_center = best_target['center'] best_radius = best_target['radius'] ellipse_params = best_target['ellipse'] method = "v3_ellipse_red_validated" best_radius1 = best_radius * 5 result_img = image.cv2image(img_cv, False, False) return result_img, best_center, best_radius, method, best_radius1, ellipse_params def estimate_distance(pixel_radius): """根据像素半径估算实际距离(单位:米)""" if not pixel_radius: return 0.0 return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0 def estimate_pixel(physical_distance_cm, target_distance_m): """ 根据物理距离和目标距离计算对应的像素偏移 Args: physical_distance_cm: 物理世界中的距离(厘米),例如激光与摄像头的距离 target_distance_m: 目标距离(米),例如到靶心的距离 Returns: float: 对应的像素偏移 """ if not target_distance_m or target_distance_m <= 0: return 0.0 # 公式:像素偏移 = (物理距离_米) * 焦距_像素 / 目标距离_米 return (physical_distance_cm / 100.0) * config.FOCAL_LENGTH_PIX / target_distance_m def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params, laser_point, distance_m, shot_id=None, photo_dir=None): """ 内部实现:在 img_cv (numpy HWC RGB) 上绘制标注并保存。 由 save_shot_image(同步)和存图 worker(异步)调用。 """ if not config.SAVE_IMAGE_ENABLED: return None if photo_dir is None: photo_dir = config.PHOTO_DIR try: try: if photo_dir not in os.listdir("/root"): os.mkdir(photo_dir) except Exception: pass x, y = laser_point if shot_id: if center is None or radius is None: filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp" else: method_str = method or "unknown" filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp" else: try: all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] img_count = len(all_images) except Exception: img_count = 0 if center is None or radius is None: method_str = "no_target" distance_str = "000" else: method_str = method or "unknown" distance_str = str(round((distance_m or 0.0) * 100)) filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp" logger = logger_manager.logger if logger: if shot_id: logger.info(f"[VISION] 保存射箭图像,ID: {shot_id}, 文件名: {filename}") if center and radius: logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}") if ellipse_params: (ec, (ew, eh), ea) = ellipse_params logger.info(f"椭圆 -> 中心: ({ec[0]:.1f}, {ec[1]:.1f}), 长轴: {max(ew, eh):.1f}, 短轴: {min(ew, eh):.1f}, 角度: {ea:.1f}°") else: logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y}))") laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) cross_thickness = int(max(getattr(config, "LASER_THICKNESS", 1), 1)) cross_length = int(max(getattr(config, "LASER_LENGTH", 10), 10)) cv2.line(img_cv, (int(x - cross_length), int(y)), (int(x + cross_length), int(y)), laser_color, cross_thickness) cv2.line(img_cv, (int(x), int(y - cross_length)), (int(x), int(y + cross_length)), laser_color, cross_thickness) cv2.circle(img_cv, (int(x), int(y)), 1, laser_color, cross_thickness) ring_thickness = 1 cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, ring_thickness) cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, ring_thickness) cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1) if center and radius: cx, cy = center if ellipse_params: (ell_center, (width, height), angle) = ellipse_params cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1]) cv2.ellipse(img_cv, (cx_ell, cy_ell), (int(width / 2), int(height / 2)), angle, 0, 360, (0, 255, 0), 2) cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1) minor_length = min(width, height) / 2 minor_angle = angle + 90 if width >= height else angle minor_angle_rad = math.radians(minor_angle) dx_minor = minor_length * math.cos(minor_angle_rad) dy_minor = minor_length * math.sin(minor_angle_rad) pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor)) pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor)) cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2) else: cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2) cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1) cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1) out = image.cv2image(img_cv, False, False) out.save(filename) if logger: if center and radius: logger.debug(f"图像已保存(含靶心标注): {filename}") else: logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}") # 清理旧图片:如果目录下图片超过100张,删除最老的 try: image_files = [] for f in os.listdir(photo_dir): if f.endswith(('.bmp', '.jpg', '.jpeg')): filepath = os.path.join(photo_dir, f) try: mtime = os.path.getmtime(filepath) image_files.append((mtime, filepath, f)) except Exception: pass from config import MAX_IMAGES if len(image_files) > MAX_IMAGES: image_files.sort(key=lambda x: x[0]) to_delete = len(image_files) - MAX_IMAGES deleted_count = 0 for _, filepath, fname in image_files[:to_delete]: try: os.remove(filepath) deleted_count += 1 if logger: logger.debug(f"[VISION] 删除旧图片: {fname}") except Exception as e: if logger: logger.warning(f"[VISION] 删除旧图片失败 {fname}: {e}") if logger and deleted_count > 0: logger.info(f"[VISION] 已清理 {deleted_count} 张旧图片,当前剩余 {MAX_IMAGES} 张") except Exception as e: if logger: logger.warning(f"[VISION] 清理旧图片时出错(可忽略): {e}") return filename except Exception as e: logger = logger_manager.logger if logger: logger.error(f"保存图像失败: {e}") import traceback logger.error(traceback.format_exc()) return None def _save_worker_loop(): """存图 worker:从队列取任务并调用 _save_shot_image_impl。""" while True: try: item = _save_queue.get() if item is None: break _save_shot_image_impl(*item) except Exception as e: logger = logger_manager.logger if logger: logger.error(f"[VISION] 存图 worker 异常: {e}") import traceback logger.error(traceback.format_exc()) finally: try: _save_queue.task_done() except Exception: pass def start_save_shot_worker(): """启动存图 worker 线程(应在程序初始化时调用一次)。""" global _save_worker_started with _save_worker_lock: if _save_worker_started: return _save_worker_started = True t = threading.Thread(target=_save_worker_loop, daemon=True) t.start() logger = logger_manager.logger if logger: logger.info("[VISION] 存图 worker 线程已启动") def enqueue_save_shot(result_img, center, radius, method, ellipse_params, laser_point, distance_m, shot_id=None, photo_dir=None): """ 将存图任务放入队列,由 worker 异步保存。主线程传入 result_img 的复制,不阻塞。 """ if not config.SAVE_IMAGE_ENABLED: return if photo_dir is None: photo_dir = config.PHOTO_DIR try: img_cv = image.image2cv(result_img, False, False) img_copy = np.copy(img_cv) except Exception as e: logger = logger_manager.logger if logger: logger.error(f"[VISION] enqueue_save_shot 复制图像失败: {e}") return task = (img_copy, center, radius, method, ellipse_params, laser_point, distance_m, shot_id, photo_dir) try: _save_queue.put_nowait(task) except queue.Full: logger = logger_manager.logger if logger: logger.warning("[VISION] 存图队列已满,跳过本次保存") def save_shot_image(result_img, center, radius, method, ellipse_params, laser_point, distance_m, shot_id=None, photo_dir=None): """ 保存射击图像(带标注)。同步调用,会阻塞。 主流程建议使用 enqueue_save_shot;此处保留供校准、测试等场景使用。 """ if not config.SAVE_IMAGE_ENABLED: return None if photo_dir is None: photo_dir = config.PHOTO_DIR try: img_cv = image.image2cv(result_img, False, False) return _save_shot_image_impl(img_cv, center, radius, method, ellipse_params, laser_point, distance_m, shot_id, photo_dir) except Exception as e: logger = logger_manager.logger if logger: logger.error(f"[VISION] save_shot_image 转换图像失败: {e}") return None