Files
archery/vision.py
gcw_4spBpAfv 8ce8831315 v1.2.2
2026-01-24 11:05:03 +08:00

752 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视觉检测模块
提供靶心检测、距离估算、图像保存等功能
"""
import cv2
import numpy as np
import os
import math
import threading
import queue
from maix import image
import config
from logger_manager import logger_manager
# 存图队列 + worker
_save_queue = queue.Queue(maxsize=16)
_save_worker_started = False
_save_worker_lock = threading.Lock()
def check_laser_point_sharpness(frame, laser_point=None, roi_size=30, threshold=100.0, ellipse_params=None):
"""
检测激光点本身的清晰度(不是整个靶子)
Args:
frame: 图像帧对象
laser_point: 激光点坐标 (x, y)如果为None则自动查找
roi_size: ROI区域大小像素默认30x30
threshold: 清晰度阈值
ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内
Returns:
(is_sharp, sharpness_score, laser_pos): (是否清晰, 清晰度分数, 激光点坐标)
"""
try:
# 1. 如果没有提供激光点,先查找
if laser_point is None:
from laser_manager import laser_manager
laser_point = laser_manager.find_red_laser(frame, ellipse_params=ellipse_params)
if laser_point is None:
logger_manager.logger.debug(f"未找到激光点")
return False, 0.0, None
x, y = laser_point
# 2. 转换为 OpenCV 格式
img_cv = image.image2cv(frame, False, False)
h, w = img_cv.shape[:2]
# 3. 提取 ROI 区域(激光点周围)
roi_half = roi_size // 2
x_min = max(0, int(x) - roi_half)
x_max = min(w, int(x) + roi_half)
y_min = max(0, int(y) - roi_half)
y_max = min(h, int(y) + roi_half)
roi = img_cv[y_min:y_max, x_min:x_max]
if roi.size == 0:
return False, 0.0, laser_point
# 4. 转换为灰度图(用于清晰度检测)
gray_roi = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY)
# 5. 方法1检测点的扩散程度能量集中度
# 计算中心区域的能量集中度
center_x, center_y = roi.shape[1] // 2, roi.shape[0] // 2
center_radius = min(5, roi.shape[0] // 4) # 中心区域半径
# 创建中心区域的掩码
y_coords, x_coords = np.ogrid[:roi.shape[0], :roi.shape[1]]
center_mask = (x_coords - center_x)**2 + (y_coords - center_y)**2 <= center_radius**2
# 计算中心区域和周围区域的亮度
center_brightness = gray_roi[center_mask].mean()
outer_mask = ~center_mask
outer_brightness = gray_roi[outer_mask].mean() if np.any(outer_mask) else 0
# 对比度(清晰的点对比度高)
contrast = abs(center_brightness - outer_brightness)
# 6. 方法2检测点的边缘锐度使用拉普拉斯
laplacian = cv2.Laplacian(gray_roi, cv2.CV_64F)
edge_sharpness = abs(laplacian).var()
# 7. 方法3检测点的能量集中度方差
# 清晰的点:能量集中在中心,方差小
# 模糊的点:能量分散,方差大
# 但我们需要的是:清晰的点中心亮度高,周围低,所以梯度大
sobel_x = cv2.Sobel(gray_roi, cv2.CV_64F, 1, 0, ksize=3)
sobel_y = cv2.Sobel(gray_roi, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_x**2 + sobel_y**2)
gradient_sharpness = gradient.var()
# 8. 组合多个指标
# 对比度权重0.3边缘锐度权重0.4梯度权重0.3
sharpness_score = (contrast * 0.3 + edge_sharpness * 0.4 + gradient_sharpness * 0.3)
is_sharp = sharpness_score >= threshold
logger = logger_manager.logger
if logger:
logger.debug(f"[VISION] 激光点清晰度: 位置=({x}, {y}), 对比度={contrast:.2f}, 边缘={edge_sharpness:.2f}, 梯度={gradient_sharpness:.2f}, 综合={sharpness_score:.2f}, 是否清晰={is_sharp}")
return is_sharp, sharpness_score, laser_point
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 激光点清晰度检测失败: {e}")
import traceback
logger.error(traceback.format_exc())
return False, 0.0, laser_point
def check_image_sharpness(frame, threshold=100.0, save_debug_images=False):
"""
检查图像清晰度(针对圆形靶子优化,基于圆形边缘检测)
检测靶心的圆形边缘,计算边缘区域的梯度清晰度
Args:
frame: 图像帧对象
threshold: 清晰度阈值低于此值认为图像模糊默认100.0
可以根据实际情况调整:
- 清晰图像通常 > 200
- 模糊图像通常 < 100
- 中等清晰度 100-200
save_debug_images: 是否保存调试图像原始图和边缘图默认False
Returns:
(is_sharp, sharpness_score): (是否清晰, 清晰度分数)
"""
try:
logger_manager.logger.debug(f"begin")
# 转换为 OpenCV 格式
img_cv = image.image2cv(frame, False, False)
logger_manager.logger.debug(f"after image2cv")
# 转换为 HSV 颜色空间
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
logger_manager.logger.debug(f"after HSV conversion")
# 检测黄色区域(靶心)
# 调整饱和度策略:稍微增强,不要过度
s_enhanced = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv_enhanced = cv2.merge((h, s_enhanced, v))
# HSV 阈值范围(与 detect_circle_v3 保持一致)
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 255])
mask_yellow = cv2.inRange(hsv_enhanced, lower_yellow, upper_yellow)
# 形态学操作,填充小孔洞
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
logger_manager.logger.debug(f"after yellow mask detection")
# 计算边缘区域:扩展黄色区域,然后减去原始区域,得到边缘区域
mask_dilated = cv2.dilate(mask_yellow, kernel, iterations=2)
mask_edge = cv2.subtract(mask_dilated, mask_yellow) # 边缘区域
# 计算边缘区域的像素数量
edge_pixel_count = np.sum(mask_edge > 0)
logger_manager.logger.debug(f"edge pixel count: {edge_pixel_count}")
# 如果检测不到边缘区域,使用全局梯度作为后备方案
if edge_pixel_count < 100:
logger_manager.logger.debug(f"edge region too small, using global gradient")
# 使用 V 通道计算全局梯度
sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3)
sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2)
sharpness_score = gradient.var()
logger_manager.logger.debug(f"global gradient variance: {sharpness_score:.2f}")
else:
# 在边缘区域计算梯度清晰度
# 使用 V亮度通道计算梯度因为边缘在亮度上通常很明显
sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3)
sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2)
# 只在边缘区域计算清晰度
edge_gradient = gradient[mask_edge > 0]
if len(edge_gradient) > 0:
# 计算边缘梯度的方差(清晰图像的边缘梯度变化大)
sharpness_score = edge_gradient.var()
# 也可以使用均值作为补充指标(清晰图像的边缘梯度均值也较大)
gradient_mean = edge_gradient.mean()
logger_manager.logger.debug(f"edge gradient: mean={gradient_mean:.2f}, var={sharpness_score:.2f}, pixels={len(edge_gradient)}")
else:
# 如果边缘区域没有有效梯度,使用全局梯度
sharpness_score = gradient.var()
logger_manager.logger.debug(f"no edge gradient, using global: {sharpness_score:.2f}")
# 保存调试图像(如果启用)
if save_debug_images:
try:
debug_dir = config.PHOTO_DIR
if debug_dir not in os.listdir("/root"):
try:
os.mkdir(debug_dir)
except:
pass
# 生成文件名
try:
all_images = [f for f in os.listdir(debug_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
img_count = 0
# 保存原始图像
img_orig = image.cv2image(img_cv, False, False)
orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp"
img_orig.save(orig_filename)
# # 保存边缘检测结果(可视化)
# # 创建可视化图像:原始图像 + 黄色区域 + 边缘区域
# debug_img = img_cv.copy()
# # 在黄色区域绘制绿色
# debug_img[mask_yellow > 0] = [0, 255, 0] # RGB格式绿色
# # 在边缘区域绘制红色
# debug_img[mask_edge > 0] = [255, 0, 0] # RGB格式红色
# debug_img_maix = image.cv2image(debug_img, False, False)
# debug_filename = f"{debug_dir}/sharpness_debug_edge_{img_count:04d}.bmp"
# debug_img_maix.save(debug_filename)
# logger = logger_manager.logger
# if logger:
# logger.info(f"[VISION] 保存调试图像: {orig_filename}, {debug_filename}")
except Exception as e:
logger = logger_manager.logger
if logger:
logger.warning(f"[VISION] 保存调试图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
is_sharp = sharpness_score >= threshold
logger = logger_manager.logger
if logger:
logger.debug(f"[VISION] 清晰度检测: 分数={sharpness_score:.2f}, 边缘像素数={edge_pixel_count}, 是否清晰={is_sharp}, 阈值={threshold}")
return is_sharp, sharpness_score
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 清晰度检测失败: {e}")
import traceback
logger.error(traceback.format_exc())
# 出错时返回 False避免使用模糊图像
return False, 0.0
def save_calibration_image(frame, laser_pos, photo_dir=None):
"""
保存激光校准图像(带标注)
在找到的激光点位置绘制圆圈,便于检查算法是否正确
Args:
frame: 原始图像帧
laser_pos: 找到的激光点坐标 (x, y)
photo_dir: 照片存储目录如果为None则使用 config.PHOTO_DIR
Returns:
str: 保存的文件路径,如果保存失败则返回 None
"""
# 检查是否启用图像保存
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
# 确保照片目录存在
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except:
pass
# 生成文件名
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
img_count = 0
x, y = laser_pos
filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
logger.info(f"保存校准图像: {filename}, 激光点: ({x}, {y})")
# 转换图像为 OpenCV 格式以便绘制
img_cv = image.image2cv(frame, False, False)
# 绘制激光点圆圈(用绿色圆圈标出找到的激光点)
cv2.circle(img_cv, (int(x), int(y)), 10, (0, 255, 0), 2) # 外圈绿色半径10
cv2.circle(img_cv, (int(x), int(y)), 5, (0, 255, 0), 2) # 中圈绿色半径5
cv2.circle(img_cv, (int(x), int(y)), 2, (0, 255, 0), -1) # 中心点:绿色实心
# 可选:绘制十字线帮助定位
cv2.line(img_cv,
(int(x - 20), int(y)),
(int(x + 20), int(y)),
(0, 255, 0), 1) # 水平线
cv2.line(img_cv,
(int(x), int(y - 20)),
(int(x), int(y + 20)),
(0, 255, 0), 1) # 垂直线
# 转换回 MaixPy 图像格式并保存
result_img = image.cv2image(img_cv, False, False)
result_img.save(filename)
if logger:
logger.debug(f"校准图像已保存: {filename}")
return filename
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"保存校准图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def detect_circle_v3(frame, laser_point=None):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
如果提供 laser_point会选择最接近激光点的目标
Args:
frame: 图像帧
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
Returns:
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
"""
img_cv = image.image2cv(frame, False, False)
best_center = best_radius = best_radius1 = method = None
ellipse_params = None
# HSV 黄色掩码检测(模糊靶心)
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
# 调整饱和度策略:稍微增强,不要过度
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 调整形态学操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 存储所有有效的黄色-红色组合
valid_targets = []
if contours_yellow:
for cnt_yellow in contours_yellow:
area = cv2.contourArea(cnt_yellow)
perimeter = cv2.arcLength(cnt_yellow, True)
# 计算圆度
if perimeter > 0:
circularity = (4 * np.pi * area) / (perimeter * perimeter)
else:
circularity = 0
logger = logger_manager.logger
if area > 50 and circularity > 0.7:
if logger:
logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}")
# 尝试拟合椭圆
yellow_center = None
yellow_radius = None
yellow_ellipse = None
if len(cnt_yellow) >= 5:
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
yellow_ellipse = ((x, y), (width, height), angle)
axes_minor = min(width, height)
radius = axes_minor / 2
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
else:
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
yellow_ellipse = None
# 如果检测到黄色圆圈,再检测红色圆圈进行验证
if yellow_center and yellow_radius:
# HSV 红色掩码检测红色在HSV中跨越0度需要两个范围
# 红色范围1: 0-10度接近0度的红色
lower_red1 = np.array([0, 80, 0])
upper_red1 = np.array([10, 255, 255])
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
# 红色范围2: 170-180度接近180度的红色
lower_red2 = np.array([170, 80, 0])
upper_red2 = np.array([180, 255, 255])
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
# 合并两个红色掩码
mask_red = cv2.bitwise_or(mask_red1, mask_red2)
# 形态学操作
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
found_valid_red = False
if contours_red:
# 找到所有符合条件的红色圆圈
for cnt_red in contours_red:
area_red = cv2.contourArea(cnt_red)
perimeter_red = cv2.arcLength(cnt_red, True)
if perimeter_red > 0:
circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red)
else:
circularity_red = 0
# 红色圆圈也应该有一定的圆度
if area_red > 50 and circularity_red > 0.6:
# 计算红色圆圈的中心和半径
if len(cnt_red) >= 5:
(x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red)
radius_red = min(w_red, h_red) / 2
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
else:
(x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red)
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
# 计算黄色和红色圆心的距离
if red_center:
dx = yellow_center[0] - red_center[0]
dy = yellow_center[1] - red_center[1]
distance = np.sqrt(dx*dx + dy*dy)
# 圆心距离阈值应该小于黄色半径的某个倍数比如1.5倍)
max_distance = yellow_radius * 1.5
# 红色圆圈应该比黄色圆圈大(外圈)
if distance < max_distance and red_radius > yellow_radius * 0.8:
found_valid_red = True
logger = logger_manager.logger
if logger:
logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}")
# 记录这个有效目标
valid_targets.append({
'center': yellow_center,
'radius': yellow_radius,
'ellipse': yellow_ellipse,
'area': area
})
break
if not found_valid_red:
logger = logger_manager.logger
if logger:
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
# 从所有有效目标中选择最佳目标
if valid_targets:
if laser_point:
# 如果有激光点,选择最接近激光点的目标
best_target = None
min_distance = float('inf')
for target in valid_targets:
dx = target['center'][0] - laser_point[0]
dy = target['center'][1] - laser_point[1]
distance = np.sqrt(dx*dx + dy*dy)
if distance < min_distance:
min_distance = distance
best_target = target
if best_target:
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated_laser_selected"
best_radius1 = best_radius * 5
else:
# 如果没有激光点,选择面积最大的目标
best_target = max(valid_targets, key=lambda t: t['area'])
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated"
best_radius1 = best_radius * 5
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
def estimate_distance(pixel_radius):
"""根据像素半径估算实际距离(单位:米)"""
if not pixel_radius:
return 0.0
return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0
def estimate_pixel(physical_distance_cm, target_distance_m):
"""
根据物理距离和目标距离计算对应的像素偏移
Args:
physical_distance_cm: 物理世界中的距离(厘米),例如激光与摄像头的距离
target_distance_m: 目标距离(米),例如到靶心的距离
Returns:
float: 对应的像素偏移
"""
if not target_distance_m or target_distance_m <= 0:
return 0.0
# 公式:像素偏移 = (物理距离_米) * 焦距_像素 / 目标距离_米
return (physical_distance_cm / 100.0) * config.FOCAL_LENGTH_PIX / target_distance_m
def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
内部实现:在 img_cv (numpy HWC RGB) 上绘制标注并保存。
由 save_shot_image同步和存图 worker异步调用。
"""
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except Exception:
pass
x, y = laser_point
if shot_id:
if center is None or radius is None:
filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp"
else:
method_str = method or "unknown"
filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp"
else:
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except Exception:
img_count = 0
if center is None or radius is None:
method_str = "no_target"
distance_str = "000"
else:
method_str = method or "unknown"
distance_str = str(round((distance_m or 0.0) * 100))
filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
if shot_id:
logger.info(f"[VISION] 保存射箭图像ID: {shot_id}, 文件名: {filename}")
if center and radius:
logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
if ellipse_params:
(ec, (ew, eh), ea) = ellipse_params
logger.info(f"椭圆 -> 中心: ({ec[0]:.1f}, {ec[1]:.1f}), 长轴: {max(ew, eh):.1f}, 短轴: {min(ew, eh):.1f}, 角度: {ea:.1f}°")
else:
logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y})")
laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
cross_thickness = int(max(getattr(config, "LASER_THICKNESS", 1), 1))
cross_length = int(max(getattr(config, "LASER_LENGTH", 10), 10))
cv2.line(img_cv, (int(x - cross_length), int(y)), (int(x + cross_length), int(y)), laser_color, cross_thickness)
cv2.line(img_cv, (int(x), int(y - cross_length)), (int(x), int(y + cross_length)), laser_color, cross_thickness)
cv2.circle(img_cv, (int(x), int(y)), 1, laser_color, cross_thickness)
ring_thickness = 1
cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, ring_thickness)
cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, ring_thickness)
cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1)
if center and radius:
cx, cy = center
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
cv2.ellipse(img_cv, (cx_ell, cy_ell), (int(width / 2), int(height / 2)), angle, 0, 360, (0, 255, 0), 2)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
minor_length = min(width, height) / 2
minor_angle = angle + 90 if width >= height else angle
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2)
else:
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1)
out = image.cv2image(img_cv, False, False)
out.save(filename)
if logger:
if center and radius:
logger.debug(f"图像已保存(含靶心标注): {filename}")
else:
logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}")
# 清理旧图片如果目录下图片超过100张删除最老的
try:
image_files = []
for f in os.listdir(photo_dir):
if f.endswith(('.bmp', '.jpg', '.jpeg')):
filepath = os.path.join(photo_dir, f)
try:
mtime = os.path.getmtime(filepath)
image_files.append((mtime, filepath, f))
except Exception:
pass
from config import MAX_IMAGES
if len(image_files) > MAX_IMAGES:
image_files.sort(key=lambda x: x[0])
to_delete = len(image_files) - MAX_IMAGES
deleted_count = 0
for _, filepath, fname in image_files[:to_delete]:
try:
os.remove(filepath)
deleted_count += 1
if logger:
logger.debug(f"[VISION] 删除旧图片: {fname}")
except Exception as e:
if logger:
logger.warning(f"[VISION] 删除旧图片失败 {fname}: {e}")
if logger and deleted_count > 0:
logger.info(f"[VISION] 已清理 {deleted_count} 张旧图片,当前剩余 {MAX_IMAGES}")
except Exception as e:
if logger:
logger.warning(f"[VISION] 清理旧图片时出错(可忽略): {e}")
return filename
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"保存图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def _save_worker_loop():
"""存图 worker从队列取任务并调用 _save_shot_image_impl。"""
while True:
try:
item = _save_queue.get()
if item is None:
break
_save_shot_image_impl(*item)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 存图 worker 异常: {e}")
import traceback
logger.error(traceback.format_exc())
finally:
try:
_save_queue.task_done()
except Exception:
pass
def start_save_shot_worker():
"""启动存图 worker 线程(应在程序初始化时调用一次)。"""
global _save_worker_started
with _save_worker_lock:
if _save_worker_started:
return
_save_worker_started = True
t = threading.Thread(target=_save_worker_loop, daemon=True)
t.start()
logger = logger_manager.logger
if logger:
logger.info("[VISION] 存图 worker 线程已启动")
def enqueue_save_shot(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
将存图任务放入队列,由 worker 异步保存。主线程传入 result_img 的复制,不阻塞。
"""
if not config.SAVE_IMAGE_ENABLED:
return
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(result_img, False, False)
img_copy = np.copy(img_cv)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] enqueue_save_shot 复制图像失败: {e}")
return
task = (img_copy, center, radius, method, ellipse_params, laser_point, distance_m, shot_id, photo_dir)
try:
_save_queue.put_nowait(task)
except queue.Full:
logger = logger_manager.logger
if logger:
logger.warning("[VISION] 存图队列已满,跳过本次保存")
def save_shot_image(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
保存射击图像(带标注)。同步调用,会阻塞。
主流程建议使用 enqueue_save_shot此处保留供校准、测试等场景使用。
"""
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(result_img, False, False)
return _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id, photo_dir)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] save_shot_image 转换图像失败: {e}")
return None