From 0ce140a210e9ae2d28ad24d9b495fcd1bb6a2ef2 Mon Sep 17 00:00:00 2001 From: gcw_4spBpAfv Date: Tue, 20 Jan 2026 11:25:17 +0800 Subject: [PATCH] v1.1.5 --- app.yaml | 4 +- camera_manager.py | 138 ++++++ config.py | 54 ++- laser_manager.py | 741 +++++++++++++++++++++++++++-- logger_manager.py | 212 +++++++++ main.py | 190 +++++--- network.py | 1077 ++++++++++++++++++++++++++++++++++++++++++ ota_manager.py | 18 +- shot_id_generator.py | 76 +++ version.py | 6 +- vision.py | 353 ++++++++++++-- 11 files changed, 2713 insertions(+), 156 deletions(-) create mode 100644 camera_manager.py create mode 100644 logger_manager.py create mode 100644 network.py create mode 100644 shot_id_generator.py diff --git a/app.yaml b/app.yaml index 90cac47..923e646 100644 --- a/app.yaml +++ b/app.yaml @@ -1,12 +1,13 @@ id: t11 name: t11 -version: 1.1.1 +version: 1.1.4 author: t11 icon: '' desc: t11 files: - app.yaml - at_client.py + - camera_manager.py - config.py - hardware.py - laser_manager.py @@ -15,6 +16,7 @@ files: - network.py - ota_manager.py - power.py + - shot_id_generator.py - time_sync.py - version.py - vision.py diff --git a/camera_manager.py b/camera_manager.py new file mode 100644 index 0000000..7b77782 --- /dev/null +++ b/camera_manager.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +相机管理器模块 +提供相机和显示的统一管理和线程安全访问 +""" +import threading +import config +from logger_manager import logger_manager + + +class CameraManager: + """相机管理器(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(CameraManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + # 私有对象 + self._camera = None + self._display = None + + # 线程安全锁 + self._camera_lock = threading.Lock() + self._display_lock = threading.Lock() + + # 相机配置 + self._camera_width = 640 + self._camera_height = 480 + + self._initialized = True + + # ==================== 初始化方法 ==================== + + def init_camera(self, width=640, height=480): + """初始化相机""" + if self._camera is not None: + return self._camera + + from maix import camera + + self._camera_width = width + self._camera_height = height + + with self._camera_lock: + if self._camera is None: + self._camera = camera.Camera(width, height) + logger = logger_manager.logger + if logger: + logger.info(f"[CAMERA] 相机初始化完成: {width}x{height}") + + return self._camera + + def init_display(self): + """初始化显示""" + if self._display is not None: + return self._display + + from maix import display + + with self._display_lock: + if self._display is None: + self._display = display.Display() + logger = logger_manager.logger + if logger: + logger.info("[CAMERA] 显示初始化完成") + + return self._display + + # ==================== 访问方法 ==================== + + @property + def camera(self): + """获取相机实例(懒加载)""" + if self._camera is None: + self.init_camera() + return self._camera + + @property + def display(self): + """获取显示实例(懒加载)""" + if self._display is None: + self.init_display() + return self._display + + # ==================== 业务方法 ==================== + + def read_frame(self): + """ + 线程安全地读取一帧图像 + + Returns: + frame: 图像帧对象 + """ + with self._camera_lock: + if self._camera is None: + self.init_camera() + return self._camera.read() + + def show(self, image): + """ + 线程安全地显示图像 + + Args: + image: 要显示的图像对象 + """ + with self._display_lock: + if self._display is None: + self.init_display() + self._display.show(image) + + def release(self): + """释放相机和显示资源(如果需要)""" + with self._camera_lock: + if self._camera is not None: + # MaixPy 的 Camera 可能不需要显式释放,但可以在这里清理 + self._camera = None + + with self._display_lock: + if self._display is not None: + # MaixPy 的 Display 可能不需要显式释放 + self._display = None + + +# 创建全局单例实例 +camera_manager = CameraManager() + + + + + diff --git a/config.py b/config.py index d47781e..f96782c 100644 --- a/config.py +++ b/config.py @@ -26,14 +26,19 @@ LOG_FILE = "/maixapp/apps/t11/app.log" BACKUP_BASE = "/maixapp/apps/t11/backups" # ==================== 硬件配置 ==================== +# WiFi模块开关(True=有WiFi模块,False=无WiFi模块) +HAS_WIFI_MODULE = True # 根据实际硬件情况设置 + # UART配置 UART4G_DEVICE = "/dev/ttyS2" UART4G_BAUDRATE = 115200 DISTANCE_SERIAL_DEVICE = "/dev/ttyS1" DISTANCE_SERIAL_BAUDRATE = 9600 -# I2C配置 -I2C_BUS_NUM = 1 +# I2C配置(根据WiFi模块开关自动选择) +# 无WiFi模块:I2C_BUS_NUM = 1,引脚:P18(I2C1_SCL), P21(I2C1_SDA) +# 有WiFi模块:I2C_BUS_NUM = 5,引脚:A15(I2C5_SCL), A27(I2C5_SDA) +I2C_BUS_NUM = 5 if HAS_WIFI_MODULE else 1 INA226_ADDR = 0x40 REG_CONFIGURATION = 0x00 REG_BUS_VOLTAGE = 0x02 @@ -52,14 +57,39 @@ LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) DISTANCE_QUERY_CMD = bytes([0xAA, MODULE_ADDR, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21]) # 激光测距查询命令 DISTANCE_RESPONSE_LEN = 13 # 激光测距响应数据长度(字节) -DEFAULT_LASER_POINT = (640, 480) # 默认激光中心点 +DEFAULT_LASER_POINT = (320, 252) # 默认激光中心点 + +# 硬编码激光点配置 +HARDCODE_LASER_POINT = True # 是否使用硬编码的激光点(True=使用硬编码值,False=使用校准值) +HARDCODE_LASER_POINT_VALUE = (320, 252) # 硬编码的激光点坐标(315, 263) # # 硬编码的激光点坐标 (x, y) + +# 激光点检测配置 +LASER_DETECTION_THRESHOLD = 140 # 红色通道阈值(默认120,可调整,范围建议:100-150) +LASER_RED_RATIO = 1.5 # 红色相对于绿色/蓝色的倍数要求(默认1.5,可调整,范围建议:1.3-2.0) +LASER_SEARCH_RADIUS = 50 # 搜索半径(像素),从图像中心开始搜索(默认20,限制激光点不能偏离中心太远) +LASER_MAX_DISTANCE_FROM_CENTER = 50 # 激光点距离中心的最大允许距离(像素),超过此距离则拒绝(默认20) +LASER_OVEREXPOSED_THRESHOLD = 200 # 过曝红色判断阈值(默认200,接近白色时的阈值) +LASER_OVEREXPOSED_DIFF = 10 # 过曝红色时,r 与 g/b 的最小差值(默认10) +LASER_REQUIRE_IN_ELLIPSE = False # 是否要求激光点必须在黄心椭圆内(True=必须,False=不要求) +LASER_USE_ELLIPSE_FITTING = True # 是否使用椭圆拟合方法查找激光点(True=椭圆拟合更准确,False=最亮点方法) +LASER_MIN_AREA = 5 # 激光点区域的最小面积(像素),小于此值认为是噪声(默认5) +LASER_DRAW_ELLIPSE = True # 是否在图像上绘制激光点的拟合椭圆(True=绘制,False=不绘制) # ==================== 视觉检测配置 ==================== FOCAL_LENGTH_PIX = 2250.0 # 焦距(像素) REAL_RADIUS_CM = 20 # 靶心实际半径(厘米) +# 图像清晰度检测配置 +IMAGE_SHARPNESS_THRESHOLD = 100.0 # 清晰度阈值,低于此值认为图像模糊 + # 清晰图像通常 > 200,模糊图像通常 < 100 + +# 激光与摄像头物理位置配置 +LASER_CAMERA_OFFSET_CM = 1.4 # 激光在摄像头下方的物理距离(厘米),正值表示激光在摄像头下方 +IMAGE_CENTER_X = 320 # 图像中心 X 坐标 +IMAGE_CENTER_Y = 240 # 图像中心 Y 坐标 + # ==================== 显示配置 ==================== -LASER_COLOR = (255, 100, 0) # RGB颜色 +LASER_COLOR = (0, 255, 0) # RGB颜色 LASER_THICKNESS = 1 LASER_LENGTH = 2 @@ -73,7 +103,8 @@ LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB LOG_BACKUP_COUNT = 5 # ==================== 引脚映射配置 ==================== -PIN_MAPPINGS = { +# 无WiFi模块的引脚映射(I2C1) +PIN_MAPPINGS_NO_WIFI = { "A18": "UART1_RX", "A19": "UART1_TX", "A29": "UART2_RX", @@ -81,3 +112,16 @@ PIN_MAPPINGS = { "P18": "I2C1_SCL", "P21": "I2C1_SDA", } + +# 有WiFi模块的引脚映射(I2C5) +PIN_MAPPINGS_WITH_WIFI = { + "A18": "UART1_RX", + "A19": "UART1_TX", + "A29": "UART2_RX", + "A28": "UART2_TX", + "A15": "I2C5_SCL", + "A27": "I2C5_SDA", +} + +# 根据WiFi模块开关选择引脚映射 +PIN_MAPPINGS = PIN_MAPPINGS_WITH_WIFI if HAS_WIFI_MODULE else PIN_MAPPINGS_NO_WIFI diff --git a/laser_manager.py b/laser_manager.py index 66d5065..bf35459 100644 --- a/laser_manager.py +++ b/laser_manager.py @@ -11,6 +11,7 @@ from maix import time, camera import threading import config from logger_manager import logger_manager +import vision class LaserManager: @@ -33,6 +34,7 @@ class LaserManager: self._calibration_lock = threading.Lock() self._laser_point = None self._laser_turned_on = False + self._last_frame_with_ellipse = None # 保存绘制了椭圆的图像(用于调试/显示) self._initialized = True # ==================== 状态访问(只读属性)==================== @@ -44,13 +46,35 @@ class LaserManager: @property def laser_point(self): - """当前激光点""" + """当前激光点(如果启用硬编码,则返回硬编码值)""" + if config.HARDCODE_LASER_POINT: + return config.HARDCODE_LASER_POINT_VALUE return self._laser_point + def get_last_frame_with_ellipse(self): + """ + 获取最后一次查找激光点时绘制了椭圆的图像(如果启用椭圆绘制) + + Returns: + MaixPy 图像对象,如果没有则返回 None + """ + return self._last_frame_with_ellipse + # ==================== 业务方法 ==================== def load_laser_point(self): - """从配置文件加载激光中心点,失败则使用默认值""" + """从配置文件加载激光中心点,失败则使用默认值 + 如果启用硬编码模式,则直接使用硬编码值 + """ + if config.HARDCODE_LASER_POINT: + # 硬编码模式:直接使用硬编码值 + self._laser_point = config.HARDCODE_LASER_POINT_VALUE + logger = logger_manager.logger + if logger: + logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}") + return self._laser_point + + # 正常模式:从配置文件加载 try: if "laser_config.json" in os.listdir("/root"): with open(config.CONFIG_FILE, "r") as f: @@ -71,7 +95,18 @@ class LaserManager: return self._laser_point def save_laser_point(self, point): - """保存激光中心点到配置文件""" + """保存激光中心点到配置文件 + 如果启用硬编码模式,则不保存(直接返回 True) + """ + if config.HARDCODE_LASER_POINT: + # 硬编码模式:不保存到文件,但更新内存中的值(虽然不会被使用) + self._laser_point = point + logger = logger_manager.logger + if logger: + logger.info(f"[LASER] 硬编码模式已启用,跳过保存激光点: {point}") + return True + + # 正常模式:保存到配置文件 try: with open(config.CONFIG_FILE, "w") as f: json.dump([point[0], point[1]], f) @@ -112,9 +147,7 @@ class LaserManager: # TODO: 暂时去掉这个等待 # 读取回包 - print("before read:", time.ticks_ms()) resp = hardware_manager.distance_serial.read(len=20,timeout=10) - print("after read:", time.ticks_ms()) if resp: if logger: logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") @@ -256,19 +289,383 @@ class LaserManager: # best_pos = (x, y) # print("best_pos:", best_pos) # return best_pos - def find_red_laser(self, frame, threshold=150, search_radius=150): + def _is_point_in_ellipse(self, point, ellipse_params): """ - 在图像中心附近查找最亮的红色激光点(基于 RGB 阈值) - 使用两阶段搜索:先粗搜索找到候选区域,再精细搜索找到最亮点 + 判断点是否在椭圆内 + + Args: + point: 点坐标 (x, y) + ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle) + + Returns: + bool: 如果点在椭圆内返回 True,否则返回 False + """ + if ellipse_params is None: + return True # 如果没有椭圆参数,不进行限制 + + import math + + (cx, cy), (width, height), angle = ellipse_params + px, py = point + + # 椭圆半长轴和半短轴 + a = width / 2.0 # 半长轴 + b = height / 2.0 # 半短轴 + + # 将点坐标平移到椭圆中心 + dx = px - cx + dy = py - cy + + # 旋转坐标系,使椭圆的长轴与x轴对齐 + # angle 是度,需要转换为弧度 + angle_rad = math.radians(angle) + cos_a = math.cos(angle_rad) + sin_a = math.sin(angle_rad) + + # 旋转后的坐标 + x_rot = dx * cos_a + dy * sin_a + y_rot = -dx * sin_a + dy * cos_a + + # 检查点是否在椭圆内:((x_rot/a)^2 + (y_rot/b)^2) <= 1 + ellipse_value = (x_rot / a) ** 2 + (y_rot / b) ** 2 + + return ellipse_value <= 1.0 + + def find_red_laser_with_ellipse(self, frame, threshold=None, search_radius=None, ellipse_params=None): + """ + 使用椭圆拟合方法查找激光点中心(更准确) + 先找到所有红色像素,然后拟合椭圆找到中心 Args: frame: 图像帧 - threshold: 红色通道阈值(默认150) - search_radius: 搜索半径(像素),从图像中心开始搜索(默认150) + threshold: 红色通道阈值(如果为None,使用config.LASER_DETECTION_THRESHOLD) + search_radius: 搜索半径(像素),从图像中心开始搜索(如果为None,使用config.LASER_SEARCH_RADIUS) + ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内 Returns: (x, y) 坐标,如果未找到则返回 None + 注意:如果启用 LASER_DRAW_ELLIPSE,会在原始 frame 上绘制椭圆(会修改输入图像) """ + import cv2 + import numpy as np + from maix import image + logger_manager.logger.debug(f"find_red_laser_with_ellipse start: {time.ticks_ms()}") + # 使用配置项 + if threshold is None: + threshold = config.LASER_DETECTION_THRESHOLD + if search_radius is None: + search_radius = config.LASER_SEARCH_RADIUS + + red_ratio = config.LASER_RED_RATIO + overexposed_threshold = config.LASER_OVEREXPOSED_THRESHOLD + overexposed_diff = config.LASER_OVEREXPOSED_DIFF + + logger = logger_manager.logger + w, h = frame.width(), frame.height() + center_x, center_y = w // 2, h // 2 + + # 转换为 OpenCV 格式 + img_cv = image.image2cv(frame, False, False) + + # 只在中心区域搜索 + x_min = max(0, center_x - search_radius) + x_max = min(w, center_x + search_radius) + y_min = max(0, center_y - search_radius) + y_max = min(h, center_y + search_radius) + + # 提取ROI区域(只处理搜索区域,而不是整个图像) + roi = img_cv[y_min:y_max, x_min:x_max] + if roi.size == 0: + if logger: + logger.debug("[LASER] ROI区域为空") + return None + + # 分离RGB通道(向量化操作,比循环快得多) + r_channel = roi[:, :, 0].astype(np.int32) # 转换为int32避免溢出 + g_channel = roi[:, :, 1].astype(np.int32) + b_channel = roi[:, :, 2].astype(np.int32) + + # 情况1:正常红色判断(向量化) + # r > threshold and r > g * red_ratio and r > b * red_ratio + mask_red = (r_channel > threshold) & \ + (r_channel > (g_channel * red_ratio)) & \ + (r_channel > (b_channel * red_ratio)) + + # 情况2:过曝的红色判断(向量化) + # r > overexposed_threshold and g > overexposed_threshold and b > overexposed_threshold + # and r >= g and r >= b and (r - g) > overexposed_diff and (r - b) > overexposed_diff + mask_overexposed = (r_channel > overexposed_threshold) & \ + (g_channel > overexposed_threshold) & \ + (b_channel > overexposed_threshold) & \ + (r_channel >= g_channel) & \ + (r_channel >= b_channel) & \ + ((r_channel - g_channel) > overexposed_diff) & \ + ((r_channel - b_channel) > overexposed_diff) + + # 合并两种情况的掩码 + mask_combined = mask_red | mask_overexposed + + # 转换为uint8格式 + mask_roi = mask_combined.astype(np.uint8) * 255 + + logger_manager.logger.debug(f"ellipse fitting start: {time.ticks_ms()}") + # 查找轮廓(只在搜索区域内) + contours, _ = cv2.findContours(mask_roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + logger_manager.logger.debug(f"ellipse fitting end: {time.ticks_ms()}") + if not contours: + if logger: + logger.debug("[LASER] 未找到红色像素区域") + return None + logger_manager.logger.debug(f"ellipse filtering start: {time.ticks_ms()}") + # 找到最大的轮廓(应该是激光点) + largest_contour = max(contours, key=cv2.contourArea) + + # 检查轮廓面积(太小可能是噪声) + area = cv2.contourArea(largest_contour) + min_area = config.LASER_MIN_AREA + if area < min_area: + if logger: + logger.debug(f"[LASER] 红色区域太小(面积={area:.1f}),可能是噪声(最小={min_area})") + return None + + # 使用椭圆拟合找到中心 + laser_center = None + outer_ellipse_params = None # 外层红色椭圆参数 + inner_ellipse_params = None # 内层亮度椭圆参数 + + if len(largest_contour) >= 5: + # 椭圆拟合需要至少5个点 + # 注意:需要将轮廓坐标转换回全图坐标 + contour_global = largest_contour.copy() + for i in range(len(contour_global)): + contour_global[i][0][0] += x_min + contour_global[i][0][1] += y_min + + try: + # 第一步:拟合外层红色椭圆 + (x_outer, y_outer), (width_outer, height_outer), angle_outer = cv2.fitEllipse(contour_global) + outer_ellipse_params = ((x_outer, y_outer), (width_outer, height_outer), angle_outer) + + if logger: + logger.debug(f"[LASER] 外层红色椭圆拟合成功: 中心=({x_outer:.1f}, {y_outer:.1f}), 尺寸=({width_outer:.1f}, {height_outer:.1f}), 角度={angle_outer:.1f}°, 面积={area:.1f}") + + # 第二步:在外层椭圆区域内,找亮度最高的像素 + # 创建外层椭圆的掩码 + outer_ellipse_mask = np.zeros((h, w), dtype=np.uint8) + cv2.ellipse(outer_ellipse_mask, + (int(x_outer), int(y_outer)), + (int(width_outer/2), int(height_outer/2)), + angle_outer, + 0, 360, + 255, -1) # 填充椭圆区域 + + # 在外层椭圆区域内,计算每个像素的亮度(RGB总和) + brightness = (img_cv[:, :, 0].astype(np.int32) + + img_cv[:, :, 1].astype(np.int32) + + img_cv[:, :, 2].astype(np.int32)) + + # 只考虑外层椭圆区域内的像素 + brightness_masked = np.where(outer_ellipse_mask > 0, brightness, 0) + + # 找到亮度阈值(使用区域内亮度的较高百分位,比如80%) + brightness_values = brightness_masked[brightness_masked > 0] + if len(brightness_values) > 0: + brightness_threshold = np.percentile(brightness_values, 90) # 取90%分位数 + + # 创建亮度掩码(只保留高亮度像素) + brightness_mask = (brightness_masked >= brightness_threshold).astype(np.uint8) * 255 + + # 查找亮度区域的轮廓 + brightness_contours, _ = cv2.findContours(brightness_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + # 在 brightness_contours 处理部分,添加 else 分支处理 brightness_area < 1 的情况 + if brightness_contours: + # 找到最大的亮度轮廓 + largest_brightness_contour = max(brightness_contours, key=cv2.contourArea) + brightness_area = cv2.contourArea(largest_brightness_contour) + + if brightness_area >= 3 and len(largest_brightness_contour) >= 5: + # 第三步:拟合内层亮度椭圆 + try: + (x_inner, y_inner), (width_inner, height_inner), angle_inner = cv2.fitEllipse(largest_brightness_contour) + inner_ellipse_params = ((x_inner, y_inner), (width_inner, height_inner), angle_inner) + laser_center = (float(x_inner), float(y_inner)) + logger.debug(f"[LASER] 内层亮度椭圆拟合成功: 中心=({x_inner:.1f}, {y_inner:.1f}), 尺寸=({width_inner:.1f}, {height_inner:.1f}), 角度={angle_inner:.1f}°, 面积={brightness_area:.1f}") + except Exception as e: + # 内层椭圆拟合失败,使用质心 + M = cv2.moments(largest_brightness_contour) + if M["m00"] != 0: + cx = M["m10"] / M["m00"] + cy = M["m01"] / M["m00"] + laser_center = (float(cx), float(cy)) + logger.debug(f"[LASER] 内层亮度椭圆拟合失败,使用质心: {laser_center}, 错误: {e}") + else: + # 质心计算失败,使用外层椭圆中心 + laser_center = (float(x_outer), float(y_outer)) + if logger: + logger.debug(f"[LASER] 内层区域质心计算失败,使用外层椭圆中心: {laser_center}") + elif brightness_area >= 1: + # 面积太小,使用质心 + M = cv2.moments(largest_brightness_contour) + if M["m00"] != 0: + cx = M["m10"] / M["m00"] + cy = M["m01"] / M["m00"] + laser_center = (float(cx), float(cy)) + logger.debug(f"[LASER] 内层区域质心计算成功: {laser_center}") + else: + # 质心计算失败,使用外层椭圆中心 + laser_center = (float(x_outer), float(y_outer)) + if logger: + logger.debug(f"[LASER] 内层区域质心计算失败,使用外层椭圆中心: {laser_center}") + else: + # brightness_area < 1,面积太小,直接使用外层椭圆中心 + laser_center = (float(x_outer), float(y_outer)) + if logger: + logger.debug(f"[LASER] 内层亮度区域面积太小({brightness_area:.1f}),使用外层椭圆中心: {laser_center}") + else: + # 没有找到亮度轮廓,使用外层椭圆中心 + laser_center = (float(x_outer), float(y_outer)) + logger.debug(f"[LASER] 外层椭圆区域内无有效亮度值,使用外层椭圆中心: {laser_center}") + else: + # 没有亮度值,使用外层椭圆中心 + laser_center = (float(x_outer), float(y_outer)) + if logger: + logger.debug(f"[LASER] 外层椭圆区域内无有效亮度值,使用外层椭圆中心: {laser_center}") + + # 如果启用绘制椭圆,在图像上绘制 + if config.LASER_DRAW_ELLIPSE: + import math + # 绘制外层红色椭圆(绿色) + cx_outer, cy_outer = int(x_outer), int(y_outer) + cv2.ellipse(img_cv, + (cx_outer, cy_outer), + (int(width_outer/2), int(height_outer/2)), + angle_outer, + 0, 360, + (0, 255, 0), # 绿色 (RGB格式) + 2) + + # 如果找到内层椭圆,绘制内层亮度椭圆(黄色)和中心点(红色) + if inner_ellipse_params is not None: + (x_inner, y_inner), (width_inner, height_inner), angle_inner = inner_ellipse_params + cx_inner, cy_inner = int(x_inner), int(y_inner) + # 绘制内层椭圆(黄色) + cv2.ellipse(img_cv, + (cx_inner, cy_inner), + (int(width_inner/2), int(height_inner/2)), + angle_inner, + 0, 360, + (255, 255, 0), # 黄色 (RGB格式) + 2) + # 绘制内层椭圆中心点(红色,较大) + cv2.circle(img_cv, (cx_inner, cy_inner), 5, (255, 0, 0), -1) + else: + # 只绘制外层椭圆中心点(红色) + cv2.circle(img_cv, (cx_outer, cy_outer), 3, (255, 0, 0), -1) + + + # 将绘制后的图像转换回 MaixPy 格式并保存到实例变量 + from maix import image + self._last_frame_with_ellipse = image.cv2image(img_cv, False, False) + + if logger: + if inner_ellipse_params: + logger.debug(f"[LASER] 已绘制双层椭圆: 外层(绿色)中心=({cx_outer}, {cy_outer}), 内层(黄色)中心=({cx_inner}, {cy_inner})") + else: + logger.debug(f"[LASER] 已绘制外层椭圆: 中心=({cx_outer}, {cy_outer})") + except Exception as e: + laser_ellipse_params = None + # 椭圆拟合失败,使用质心 + M = cv2.moments(contour_global) + if M["m00"] != 0: + cx = M["m10"] / M["m00"] + cy = M["m01"] / M["m00"] + laser_center = (float(cx), float(cy)) + logger.debug(f"[LASER] 椭圆拟合失败,使用质心: {laser_center}, 错误: {e}") + logger_manager.logger.debug(f"ellipse filtering start: {time.ticks_ms()}") + else: + # 点太少,使用质心 + contour_global = largest_contour.copy() + for i in range(len(contour_global)): + contour_global[i][0][0] += x_min + contour_global[i][0][1] += y_min + + M = cv2.moments(contour_global) + if M["m00"] != 0: + cx = M["m10"] / M["m00"] + cy = M["m01"] / M["m00"] + laser_center = (float(cx), float(cy)) + if logger: + logger.debug(f"[LASER] 点太少({len(largest_contour)}个),使用质心: {laser_center}") + + if laser_center is None: + # 清除之前保存的椭圆图像 + self._last_frame_with_ellipse = None + return None + + # 检查距离中心是否太远 + final_x, final_y = laser_center + dx_final = final_x - center_x + dy_final = final_y - center_y + distance_from_center_final = (dx_final * dx_final + dy_final * dy_final) ** 0.5 + + max_distance = config.LASER_MAX_DISTANCE_FROM_CENTER + if distance_from_center_final > max_distance: + if logger: + logger.warning(f"[LASER] 激光点距离中心太远: 位置={laser_center}, " + f"距离中心={distance_from_center_final:.1f}像素, " + f"最大允许距离={max_distance}像素") + return None + + # 检查是否在黄心椭圆范围内(如果启用) + if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: + if not self._is_point_in_ellipse(laser_center, ellipse_params): + if logger: + (ell_center, (ell_width, ell_height), ell_angle) = ellipse_params + logger.warning(f"[LASER] 激光点不在黄心椭圆内: 位置={laser_center}, " + f"椭圆中心={ell_center}, 椭圆尺寸=({ell_width:.1f}, {ell_height:.1f})") + return None + + if logger: + ellipse_info = "" + if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: + ellipse_info = f", 椭圆内检查: 通过" + elif not config.LASER_REQUIRE_IN_ELLIPSE: + ellipse_info = f", 椭圆检查: 已禁用" + logger.debug(f"[LASER] 找到激光点(椭圆拟合): 位置={laser_center}, " + f"距离中心={distance_from_center_final:.1f}像素{ellipse_info}") + if config.LASER_DRAW_ELLIPSE and self._last_frame_with_ellipse is not None: + logger.debug(f"[LASER] 已保存绘制了椭圆的图像,可通过 get_last_frame_with_ellipse() 获取") + + return laser_center + + def _find_red_laser_brightest(self, frame, threshold=None, search_radius=None, ellipse_params=None): + """ + 在图像中心附近查找最亮的红色激光点(基于 RGB 阈值) + 使用两阶段搜索:先粗搜索找到候选区域,再精细搜索找到最亮点 + 如果启用 LASER_REQUIRE_IN_ELLIPSE,只有激光点落在黄心椭圆范围内才算有效 + + Args: + frame: 图像帧 + threshold: 红色通道阈值(如果为None,使用config.LASER_DETECTION_THRESHOLD) + search_radius: 搜索半径(像素),从图像中心开始搜索(如果为None,使用config.LASER_SEARCH_RADIUS) + ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内 + 如果 config.LASER_REQUIRE_IN_ELLIPSE 为 False,则忽略此参数 + + Returns: + (x, y) 坐标,如果未找到或不在椭圆内(如果启用检查)则返回 None + """ + # 使用配置项,如果参数未提供则使用默认配置 + if threshold is None: + threshold = config.LASER_DETECTION_THRESHOLD + if search_radius is None: + search_radius = config.LASER_SEARCH_RADIUS + + red_ratio = config.LASER_RED_RATIO + overexposed_threshold = config.LASER_OVEREXPOSED_THRESHOLD + overexposed_diff = config.LASER_OVEREXPOSED_DIFF + + logger = logger_manager.logger w, h = frame.width(), frame.height() center_x, center_y = w // 2, h // 2 @@ -282,6 +679,11 @@ class LaserManager: max_score = 0 candidate_pos = None + # 用于调试:记录最接近但未满足条件的点 + best_near_red = None + best_near_red_score = 0 + best_near_red_rgb = None + # 第一阶段:粗搜索(每2像素采样),找到候选点 for y in range(y_min, y_max, 2): for x in range(x_min, x_max, 2): @@ -292,13 +694,13 @@ class LaserManager: is_red = False is_overexposed_red = False - # 情况1:正常红色(r 明显大于 g 和 b) - if r > threshold and r > g * 2 and r > b * 2: + # 情况1:正常红色(使用配置的倍数要求) + if r > threshold and r > g * red_ratio and r > b * red_ratio: is_red = True # 情况2:过曝的红色(发白,r, g, b 都接近255,但 r 仍然最大) - elif r > 200 and g > 200 and b > 200: # 接近白色 - if r >= g and r >= b and (r - g) > 10 and (r - b) > 10: # r 仍然明显最大 + elif r > overexposed_threshold and g > overexposed_threshold and b > overexposed_threshold: + if r >= g and r >= b and (r - g) > overexposed_diff and (r - b) > overexposed_diff: is_overexposed_red = True if is_red or is_overexposed_red: @@ -315,9 +717,28 @@ class LaserManager: if score > max_score: max_score = score candidate_pos = (x, y) + else: + # 记录最接近但未满足条件的点(用于调试) + if r > threshold * 0.8: # 至少接近阈值 + rgb_sum = r + g + b + # 计算接近度分数 + ratio_score = min(r / (g + 1), r / (b + 1)) # 避免除零 + near_score = rgb_sum * ratio_score + if near_score > best_near_red_score: + best_near_red_score = near_score + best_near_red = (x, y) + best_near_red_rgb = (r, g, b) - # 如果没有找到候选点,直接返回 + # 如果没有找到候选点,输出调试信息 if candidate_pos is None: + if logger: + if best_near_red: + logger.debug(f"[LASER] 未找到激光点,最接近的点: 位置={best_near_red}, RGB={best_near_red_rgb}, " + f"阈值={threshold}, 倍数要求={red_ratio}, r/g={best_near_red_rgb[0]/(best_near_red_rgb[1]+1):.2f}, " + f"r/b={best_near_red_rgb[0]/(best_near_red_rgb[2]+1):.2f}") + else: + logger.debug(f"[LASER] 未找到激光点,搜索区域: ({x_min}, {y_min}) 到 ({x_max}, {y_max}), " + f"阈值={threshold}, 倍数要求={red_ratio}") return None # 第二阶段:在候选点周围进行精细搜索(1像素间隔) @@ -332,6 +753,7 @@ class LaserManager: max_brightness = 0 best_pos = candidate_pos + best_rgb = None # 精细搜索:1像素间隔,只考虑亮度(RGB总和) for y in range(y_min_fine, y_max_fine, 1): @@ -339,14 +761,14 @@ class LaserManager: idx = (y * w + x) * 3 r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] - # 判断是否为红色或过曝的红色 + # 判断是否为红色或过曝的红色(使用配置的倍数要求) is_red = False is_overexposed_red = False - if r > threshold and r > g * 2 and r > b * 2: + if r > threshold and r > g * red_ratio and r > b * red_ratio: is_red = True - elif r > 200 and g > 200 and b > 200: - if r >= g and r >= b and (r - g) > 10 and (r - b) > 10: + elif r > overexposed_threshold and g > overexposed_threshold and b > overexposed_threshold: + if r >= g and r >= b and (r - g) > overexposed_diff and (r - b) > overexposed_diff: is_overexposed_red = True if is_red or is_overexposed_red: @@ -355,18 +777,204 @@ class LaserManager: if rgb_sum > max_brightness: max_brightness = rgb_sum best_pos = (x, y) + best_rgb = (r, g, b) + + # 检查找到的激光点是否满足条件 + if best_pos: + final_x, final_y = best_pos + dx_final = final_x - center_x + dy_final = final_y - center_y + distance_from_center_final = (dx_final * dx_final + dy_final * dy_final) ** 0.5 + + # 检查1:距离中心是否太远 + max_distance = config.LASER_MAX_DISTANCE_FROM_CENTER + if distance_from_center_final > max_distance: + # 距离中心太远,拒绝这个结果 + if logger: + logger.warning(f"[LASER] 找到的激光点距离中心太远: 位置={best_pos}, " + f"距离中心={distance_from_center_final:.1f}像素, " + f"最大允许距离={max_distance}像素, 拒绝此结果") + return None + + # 检查2:是否在黄心椭圆范围内(仅在启用时检查) + if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: + if not self._is_point_in_ellipse(best_pos, ellipse_params): + # 不在椭圆内,拒绝这个结果 + if logger: + (ell_center, (ell_width, ell_height), ell_angle) = ellipse_params + logger.warning(f"[LASER] 找到的激光点不在黄心椭圆内: 位置={best_pos}, " + f"椭圆中心={ell_center}, 椭圆尺寸=({ell_width:.1f}, {ell_height:.1f}), " + f"椭圆角度={ell_angle:.1f}°, 拒绝此结果") + return None + + # 输出成功找到激光点的日志 + if logger: + ellipse_info = "" + if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: + ellipse_info = f", 椭圆内检查: 通过" + elif not config.LASER_REQUIRE_IN_ELLIPSE: + ellipse_info = f", 椭圆检查: 已禁用" + logger.debug(f"[LASER] 找到激光点: 位置={best_pos}, RGB={best_rgb}, " + f"亮度={max_brightness}, 距离中心={distance_from_center_final:.1f}像素{ellipse_info}, " + f"阈值={threshold}, 倍数要求={red_ratio}") return best_pos - def calibrate_laser_position(self): - """执行一次激光校准:拍照 → 找红点 → 保存坐标""" - time.sleep_ms(80) - cam = camera.Camera(640, 480) - frame = cam.read() - pos = self.find_red_laser(frame) - if pos: - self.save_laser_point(pos) - return pos + def find_red_laser(self, frame, threshold=None, search_radius=None, ellipse_params=None): + """ + 查找激光点(支持两种方法:椭圆拟合或最亮点) + 根据 config.LASER_USE_ELLIPSE_FITTING 配置选择使用哪种方法 + + Args: + frame: 图像帧 + threshold: 红色通道阈值(如果为None,使用config.LASER_DETECTION_THRESHOLD) + search_radius: 搜索半径(像素),从图像中心开始搜索(如果为None,使用config.LASER_SEARCH_RADIUS) + ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内 + + Returns: + (x, y) 坐标,如果未找到则返回 None + """ + if config.LASER_USE_ELLIPSE_FITTING: + return self.find_red_laser_with_ellipse(frame, threshold, search_radius, ellipse_params) + else: + # 使用原来的最亮点方法 + return self._find_red_laser_brightest(frame, threshold, search_radius, ellipse_params) + + def calibrate_laser_position(self, timeout_ms=8000, check_sharpness=True): + """ + 执行激光校准:循环拍照 → 检测靶心 → 检查激光点清晰度 → 找红点 → 保存坐标 + 只有检测到靶心时才读取激光点 + + Args: + timeout_ms: 超时时间(毫秒),默认8000ms + check_sharpness: 是否检查激光点清晰度,默认True + + Returns: + (x, y) 坐标,如果超时或失败则返回 None + """ + from camera_manager import camera_manager + # from vision import check_laser_point_sharpness, save_calibration_image, detect_circle_v3 + import vision + from maix import time + + logger = logger_manager.logger + start = time.ticks_ms() + + # 注意:使用 abs(time.ticks_diff(start, time.ticks_ms())) 避免负数问题 + while self._calibration_active and abs(time.ticks_diff(start, time.ticks_ms())) < timeout_ms: + try: + # 使用全局 camera_manager,线程安全读取 + frame = camera_manager.read_frame() + + # 先检测靶心(仅在需要椭圆检查时) + ellipse_params_temp = None + center_temp = None + radius_temp = None + if config.LASER_REQUIRE_IN_ELLIPSE: + result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = vision.detect_circle_v3(frame, None) + + # 只有检测到靶心时才继续处理激光点 + if center_temp is None or radius_temp is None: + if logger: + logger.debug(f"[LASER] 未检测到靶心,跳过") + time.sleep_ms(60) + continue + + # 检测到靶心,继续处理激光点 + # 检查激光点清晰度(可选) + sharpness_score = None # 初始化清晰度分数 + if check_sharpness: + try: + # 使用 check_laser_point_sharpness 检测激光点清晰度 + # 该函数会自动查找激光点并检测其清晰度 + # 仅在启用椭圆检查时传入椭圆参数 + is_sharp, sharpness_score, laser_pos = vision.check_laser_point_sharpness( + frame, + laser_point=None, # 自动查找激光点 + roi_size=30, + threshold=config.IMAGE_SHARPNESS_THRESHOLD, + ellipse_params=ellipse_params_temp if config.LASER_REQUIRE_IN_ELLIPSE else None + ) + + if laser_pos is None: + # 未找到激光点 + if logger: + logger.debug(f"[LASER] 未找到激光点,跳过") + time.sleep_ms(60) + continue + + if not is_sharp: + # 激光点模糊 + if logger: + logger.debug(f"[LASER] 激光点模糊(清晰度: {sharpness_score:.2f}),跳过") + time.sleep_ms(60) + continue + + # 激光点清晰,使用找到的激光点位置 + pos = laser_pos + + except Exception as e: + if logger: + logger.warning(f"[LASER] 激光点清晰度检测失败: {e},继续处理") + # 检测失败时,回退到原来的方法:直接查找激光点 + # 仅在启用椭圆检查时传入椭圆参数 + pos = self.find_red_laser(frame, ellipse_params=ellipse_params_temp if config.LASER_REQUIRE_IN_ELLIPSE else None) + if pos is None: + time.sleep_ms(60) + continue + else: + # 不检查清晰度,直接查找激光点 + # 仅在启用椭圆检查时传入椭圆参数 + pos = self.find_red_laser(frame, ellipse_params=ellipse_params_temp if config.LASER_REQUIRE_IN_ELLIPSE else None) + if pos is None: + time.sleep_ms(60) + continue + + # 找到清晰的激光点,保存校准图像 + if pos: + # 保存校准图像(带标注) + try: + # 如果使用椭圆拟合且启用了椭圆绘制,使用绘制了椭圆的图像 + frame_to_save = frame + if config.LASER_USE_ELLIPSE_FITTING and config.LASER_DRAW_ELLIPSE: + frame_with_ellipse = self.get_last_frame_with_ellipse() + if frame_with_ellipse is not None: + frame_to_save = frame_with_ellipse + + vision.save_calibration_image(frame_to_save, pos) + except Exception as e: + if logger: + logger.error(f"[LASER] 保存校准图像失败: {e}") + + # 设置结果、停止校准、保存坐标 + self.set_calibration_result(pos) + self.stop_calibration() + self.save_laser_point(pos) + + if logger: + if sharpness_score is not None: + logger.info(f"✅ 校准成功: {pos} (清晰度: {sharpness_score:.2f}, 靶心: {center_temp}, 半径: {radius_temp})") + else: + logger.info(f"✅ 校准成功: {pos} (靶心: {center_temp}, 半径: {radius_temp})") + return pos + + # 未找到激光点,继续循环 + time.sleep_ms(60) + + except Exception as e: + if logger: + logger.error(f"[LASER] 校准过程异常: {e}") + import traceback + logger.error(traceback.format_exc()) + time.sleep_ms(200) + + # 超时或校准被停止 + if logger: + if self._calibration_active: + logger.warning(f"[LASER] 校准超时({timeout_ms}ms)") + else: + logger.info("[LASER] 校准已停止") + return None def start_calibration(self): @@ -492,6 +1100,83 @@ class LaserManager: if logger: logger.error(f"[LASER] 读取激光测距失败: {e}") return (0.0, 0) + + def calculate_laser_point_from_distance(self, distance_m): + """ + 根据目标距离动态计算激光点在图像中的坐标 + 激光在摄像头下方,所以需要将图像中心的 y 值加上偏移 + + Args: + distance_m: 目标距离(米),例如到靶心的距离 + + Returns: + (x, y): 激光点在图像中的坐标 + """ + # from vision import estimate_pixel + + # 图像中心坐标 + center_x = config.IMAGE_CENTER_X + center_y = config.IMAGE_CENTER_Y + + # 计算激光在摄像头下方的像素偏移(y 方向) + # 激光在摄像头下方,所以 y 值要增加(向下为正) + pixel_offset_y = estimate_pixel(config.LASER_CAMERA_OFFSET_CM, distance_m) + + # 激光点坐标:x 保持中心,y 加上偏移, + laser_x = center_x + laser_y = center_y + int(pixel_offset_y) + + logger = logger_manager.logger + if logger: + logger.debug(f"[LASER] 根据距离 {distance_m:.2f}m 计算激光点: ({laser_x}, {laser_y}), 像素偏移: {pixel_offset_y:.2f}") + + return (laser_x, laser_y) + + def has_calibrated_point(self): + """检查是否真正校准过(配置文件存在且不是默认值)""" + if config.HARDCODE_LASER_POINT: + return False # 硬编码模式下不算校准 + + # 检查配置文件是否存在 + if "laser_config.json" not in os.listdir("/root"): + return False + + # 检查当前值是否是默认值 + if self._laser_point == config.DEFAULT_LASER_POINT: + return False + + return self._laser_point is not None + + def compute_laser_position(self, circle_center, laser_point, radius, method): + """计算激光相对于靶心的偏移量(单位:厘米) + + Args: + circle_center: 靶心中心坐标 (x, y) + laser_point: 激光点坐标 (x, y) + radius: 靶心半径(像素) + method: 检测方法("模糊" 或其他) + + Returns: + (dx, dy): 激光相对于靶心的偏移量(厘米),如果输入无效则返回 (None, None) + """ + if not all([circle_center, radius, method]): + return None, None + + cx, cy = circle_center + lx, ly = laser_point + # r = 22.16 * 5 + r = radius * 5 + logger_manager.logger.debug(f"compute_laser_position: circle_center: {circle_center} laser_point: {laser_point} radius: {radius} method: {method} r: {r}") + target_x = (lx-cx)/r*100 + target_y = (ly-cy)/r*100 + logger_manager.logger.info(f"lx:{lx} ly: {ly} cx: {cx} cy: {cy} result_x: {target_x} result_y: {-target_y} real_r_x: {lx-cx} real_r_y: {-1*(ly-cy)}") + return (target_x, -target_y) + + # # 根据检测方法动态调整靶心物理半径(简化模型) + # circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0 + # dx = lx - cx + # dy = ly - cy + # return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) def quick_measure_distance(self): """ diff --git a/logger_manager.py b/logger_manager.py new file mode 100644 index 0000000..59ec3e4 --- /dev/null +++ b/logger_manager.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +日志管理器模块 +提供异步日志功能(使用 QueueHandler + QueueListener) +""" +import logging +from logging.handlers import QueueHandler, QueueListener, RotatingFileHandler +import queue +import os +import config +from version import VERSION + + +class LoggerManager: + """日志管理器(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(LoggerManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + # 私有状态 + self._log_queue = None + self._queue_listener = None + self._logger = None + + self._initialized = True + + # ==================== 状态访问(只读属性)==================== + + @property + def logger(self): + """获取logger对象(只读)""" + return self._logger + + @property + def log_queue(self): + """获取日志队列(只读)""" + return self._log_queue + + # ==================== 业务方法 ==================== + + def init_logging(self, log_level=logging.INFO, log_file=None, max_bytes=None, backup_count=None): + """ + 初始化异步日志系统(使用 QueueHandler + QueueListener) + + Args: + log_level: 日志级别,默认 INFO + log_file: 日志文件路径,默认使用 config.LOG_FILE + max_bytes: 单个日志文件最大大小(字节),默认使用 config.LOG_MAX_BYTES + backup_count: 保留的备份文件数量,默认使用 config.LOG_BACKUP_COUNT + """ + if log_file is None: + log_file = config.LOG_FILE + if max_bytes is None: + max_bytes = config.LOG_MAX_BYTES + if backup_count is None: + backup_count = config.LOG_BACKUP_COUNT + + try: + # 创建日志队列(无界队列) + self._log_queue = queue.Queue(-1) + + # 确保日志文件所在的目录存在 + log_dir = os.path.dirname(log_file) + if log_dir: # 如果日志路径包含目录 + try: + os.makedirs(log_dir, exist_ok=True) + except Exception as e: + print(f"[WARN] 无法创建日志目录 {log_dir}: {e}") + + # 尝试创建文件Handler(带日志轮转) + try: + file_handler = RotatingFileHandler( + log_file, + maxBytes=max_bytes, + backupCount=backup_count, + encoding='utf-8', + mode='a' # 追加模式,确保不覆盖 + ) + except Exception as e: + # 如果RotatingFileHandler不可用,降级为普通FileHandler + print(f"[WARN] RotatingFileHandler不可用,使用普通FileHandler: {e}") + try: + file_handler = logging.FileHandler(log_file, encoding='utf-8', mode='a') + except Exception as e2: + # 如果文件Handler创建失败,只使用控制台Handler + print(f"[WARN] 无法创建文件Handler,仅使用控制台输出: {e2}") + file_handler = None + + # 自定义Formatter,包含版本信息 + class CustomFormatter(logging.Formatter): + """自定义日志格式,包含版本信息和行号""" + def format(self, record): + record.version = VERSION + return super().format(record) + + # 如果file_handler存在,设置格式和级别 + if file_handler is not None: + file_handler.setFormatter(CustomFormatter( + '%(asctime)s [v%(version)s] [%(levelname)s] %(filename)s:%(lineno)d - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + )) + file_handler.setLevel(log_level) + + # 创建控制台Handler(保留原有的print输出) + console_handler = logging.StreamHandler() + console_handler.setFormatter(CustomFormatter( + '[v%(version)s] [%(levelname)s] %(filename)s:%(lineno)d - %(message)s' + )) + console_handler.setLevel(log_level) + + # 创建QueueListener(后台线程处理日志写入) + # 如果file_handler为None,只使用console_handler + handlers = [console_handler] + if file_handler is not None: + handlers.append(file_handler) + + self._queue_listener = QueueListener( + self._log_queue, + *handlers, + respect_handler_level=True + ) + self._queue_listener.start() + + # 创建QueueHandler(用于记录日志) + queue_handler = QueueHandler(self._log_queue) + + # 配置根logger + self._logger = logging.getLogger() + self._logger.addHandler(queue_handler) + self._logger.setLevel(log_level) + + # 避免日志向上传播到其他logger + self._logger.propagate = False + + # 添加启动标记 + self._logger.info("=" * 60) + self._logger.info("程序启动 - 日志系统初始化") + self._logger.info(f"版本: {VERSION}") + self._logger.info(f"日志文件: {log_file}") + self._logger.info("=" * 60) + + return True + except Exception as e: + # 如果日志初始化失败,至少保证程序能运行 + print(f"[ERROR] 日志系统初始化失败: {e}") + import traceback + try: + traceback.print_exc() + except: + pass + return False + + def stop_logging(self): + """停止日志系统(程序退出时调用)""" + try: + if self._logger: + # 确保所有日志都写入 + self._logger.info("程序退出,正在保存日志...") + import time as std_time + std_time.sleep(0.5) # 给一点时间让日志写入 + + if self._queue_listener: + self._queue_listener.stop() + + if self._logger: + # 等待队列中的日志处理完成 + if self._log_queue: + import time as std_time + timeout = 5 + start = std_time.time() + while not self._log_queue.empty() and (std_time.time() - start) < timeout: + std_time.sleep(0.1) + print("[LOG] 日志系统已停止") + except Exception as e: + print(f"[ERROR] 停止日志系统失败: {e}") + + +# 创建全局单例实例 +logger_manager = LoggerManager() + +# ==================== 向后兼容的函数接口 ==================== + +def init_logging(log_level=logging.INFO, log_file=None, max_bytes=None, backup_count=None): + """初始化日志系统(向后兼容接口)""" + return logger_manager.init_logging(log_level, log_file, max_bytes, backup_count) + +def stop_logging(): + """停止日志系统(向后兼容接口)""" + return logger_manager.stop_logging() + +def get_logger(): + """ + 获取全局logger对象(向后兼容接口) + 如果日志系统未初始化,返回None(此时可以使用print作为fallback) + """ + return logger_manager.logger + + + + + + + diff --git a/main.py b/main.py index 9fc06d5..a8f4e27 100644 --- a/main.py +++ b/main.py @@ -23,10 +23,11 @@ from logger_manager import logger_manager from time_sync import sync_system_time_from_4g from power import init_ina226, get_bus_voltage, voltage_to_percent from laser_manager import laser_manager -from vision import detect_circle_v3, estimate_distance, compute_laser_position, save_shot_image, save_calibration_image +from vision import detect_circle_v3, estimate_distance, save_shot_image, save_calibration_image from network import network_manager from ota_manager import ota_manager from hardware import hardware_manager +from camera_manager import camera_manager # def laser_calibration_worker(): @@ -104,10 +105,8 @@ from hardware import hardware_manager # time.sleep_ms(1000) # 等待1秒后继续 def laser_calibration_worker(): """后台线程:持续检测是否需要执行激光校准""" - from maix import camera from laser_manager import laser_manager from ota_manager import ota_manager - from vision import save_calibration_image # 添加导入 logger = logger_manager.logger if logger: @@ -127,47 +126,10 @@ def laser_calibration_worker(): continue if laser_manager.calibration_active: - cam = None - try: - cam = camera.Camera(640, 480) - start = time.ticks_ms() - timeout_ms = 8000 - while laser_manager.calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms: - frame = cam.read() - pos = laser_manager.find_red_laser(frame) - if pos: - # 保存校准图像(带标注) - try: - save_calibration_image(frame, pos) - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[LASER] 保存校准图像失败: {e}") - - laser_manager.set_calibration_result(pos) - laser_manager.stop_calibration() - laser_manager.save_laser_point(pos) - logger = logger_manager.logger - if logger: - logger.info(f"✅ 后台校准成功: {pos}") - break - time.sleep_ms(60) - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[LASER] calibration error: {e}") - import traceback - logger.error(traceback.format_exc()) - time.sleep_ms(200) - finally: - try: - if cam is not None: - del cam - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[LASER] 释放相机资源异常: {e}") - + # 调用校准方法,所有逻辑都在 LaserManager 中 + result = laser_manager.calibrate_laser_position(timeout_ms=8000, check_sharpness=True) + + # 如果超时仍未成功,稍微休息一下 if laser_manager.calibration_active: time.sleep_ms(300) else: @@ -211,8 +173,8 @@ def cmd_str(): laser_manager.load_laser_point() # 5. 初始化显示和相机 - disp = display.Display() - cam = camera.Camera(640, 480) + camera_manager.init_camera(640, 480) + camera_manager.init_display() # ==================== 第二阶段:软件初始化 ==================== @@ -358,8 +320,37 @@ def cmd_str(): last_adc_trigger = current_time try: - frame = cam.read() - laser_point = laser_manager.laser_point + frame = camera_manager.read_frame() + + # 先检测靶心以获取距离(用于计算激光点) + # 第一次检测不使用激光点,仅用于获取距离 + result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3(frame, None) + + # 计算距离 + distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None + + # 根据距离动态计算激光点坐标(如果未检测到靶心,使用硬编码值或默认值) + laser_point_method = None # 记录激光点选择方法 + if config.HARDCODE_LASER_POINT: + # 硬编码模式:使用硬编码值 + laser_point = laser_manager.laser_point + laser_point_method = "hardcode" + elif laser_manager.has_calibrated_point(): + # 假如校准过,并且有保存值,使用校准值 + laser_point = laser_manager.laser_point + laser_point_method = "calibrated" + logger_manager.logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}") + elif distance_m and distance_m > 0: + # 动态计算模式:根据距离计算激光点 + laser_point = laser_manager.calculate_laser_point_from_distance(distance_m) + laser_point_method = "dynamic" + logger_manager.logger.info(f"[算法] 使用比例尺: {laser_point}") + else: + # 未检测到靶心且未启用硬编码:使用默认激光点或从配置文件加载 + laser_point = laser_manager.laser_point + laser_point_method = "default" + logger_manager.logger.info(f"[算法] 使用默认值: {laser_point}") + if laser_point is None: logger = logger_manager.logger if logger: @@ -383,13 +374,13 @@ def cmd_str(): ) frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS) - # 检测靶心 + # 重新检测靶心(使用计算出的激光点) result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point) - disp.show(result_img) + camera_manager.show(result_img) # 计算偏移与距离(如果检测到靶心) if center and radius: - dx, dy = compute_laser_position(center, (x, y), radius, method) + dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method) distance_m = estimate_distance(best_radius1) else: # 未检测到靶心 @@ -401,27 +392,34 @@ def cmd_str(): # 快速激光测距(激光一闪而过,约500-600ms) laser_distance_m = None laser_signal_quality = 0 - try: - result = laser_manager.quick_measure_distance() - if isinstance(result, tuple) and len(result) == 2: - laser_distance_m, laser_signal_quality = result - else: - # 向后兼容:如果返回的是单个值 - laser_distance_m = result if isinstance(result, (int, float)) else 0.0 - laser_signal_quality = 0 - if logger: - if laser_distance_m > 0: - logger.info(f"[MAIN] 激光测距成功: {laser_distance_m:.3f} m, 信号质量: {laser_signal_quality}") - else: - logger.warning("[MAIN] 激光测距失败或返回0") - except Exception as e: - if logger: - logger.error(f"[MAIN] 激光测距异常: {e}") + # try: + # result = laser_manager.quick_measure_distance() + # if isinstance(result, tuple) and len(result) == 2: + # laser_distance_m, laser_signal_quality = result + # else: + # # 向后兼容:如果返回的是单个值 + # laser_distance_m = result if isinstance(result, (int, float)) else 0.0 + # laser_signal_quality = 0 + # if logger: + # if laser_distance_m > 0: + # logger.info(f"[MAIN] 激光测距成功: {laser_distance_m:.3f} m, 信号质量: {laser_signal_quality}") + # else: + # logger.warning("[MAIN] 激光测距失败或返回0") + # except Exception as e: + # if logger: + # logger.error(f"[MAIN] 激光测距异常: {e}") # 读取电量 voltage = get_bus_voltage() battery_percent = voltage_to_percent(voltage) + # 生成射箭ID + from shot_id_generator import shot_id_generator + shot_id = shot_id_generator.generate_id() # 不需要使用device_id + + if logger: + logger.info(f"[MAIN] 射箭ID: {shot_id}") + # 保存图像(无论是否检测到靶心都保存) # save_shot_image 函数会确保绘制激光十字线和检测标注(如果有) # 如果未检测到靶心,文件名会包含 "no_target" 标识 @@ -433,11 +431,13 @@ def cmd_str(): ellipse_params, (x, y), distance_m, + shot_id=shot_id, photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None ) # 构造上报数据 inner_data = { + "shot_id": shot_id, # 射箭ID,用于关联图片和服务端日志 "x": float(dx) if dx is not None else 200.0, "y": float(dy) if dy is not None else 200.0, "r": 90.0, @@ -445,18 +445,39 @@ def cmd_str(): "d_laser": round((laser_distance_m or 0.0) * 100), # 激光测距值(厘米) "d_laser_quality": laser_signal_quality, # 激光测距信号质量 "m": method if method else "no_target", - "adc": adc_val + "adc": adc_val, + # 新增字段:激光点选择方法 + "laser_method": laser_point_method, # 激光点选择方法:hardcode/calibrated/dynamic/default + # 激光点坐标(像素) + "target_x": float(x), # 激光点 X 坐标(像素) + "target_y": float(y), # 激光点 Y 坐标(像素) } + + # 添加椭圆参数(如果存在) + if ellipse_params: + (ell_center, (width, height), angle) = ellipse_params + inner_data["ellipse_major_axis"] = float(max(width, height)) # 长轴(像素) + inner_data["ellipse_minor_axis"] = float(min(width, height)) # 短轴(像素) + inner_data["ellipse_angle"] = float(angle) # 椭圆角度(度) + inner_data["ellipse_center_x"] = float(ell_center[0]) # 椭圆中心 X 坐标(像素) + inner_data["ellipse_center_y"] = float(ell_center[1]) # 椭圆中心 Y 坐标(像素) + else: + inner_data["ellipse_major_axis"] = None + inner_data["ellipse_minor_axis"] = None + inner_data["ellipse_angle"] = None + inner_data["ellipse_center_x"] = None + inner_data["ellipse_center_y"] = None + report_data = {"cmd": 1, "data": inner_data} network_manager.safe_enqueue(report_data, msg_type=2, high=True) if logger: if center and radius: - logger.info("射箭事件已加入发送队列(已检测到靶心)") + logger.info(f"射箭事件已加入发送队列(已检测到靶心),ID: {shot_id}") else: - logger.info("射箭事件已加入发送队列(未检测到靶心,已保存图像)") + logger.info(f"射箭事件已加入发送队列(未检测到靶心,已保存图像),ID: {shot_id}") # 闪一下激光(射箭反馈) - # laser_manager.flash_laser(1000) + laser_manager.flash_laser(1000) time.sleep_ms(100) except Exception as e: @@ -469,7 +490,7 @@ def cmd_str(): continue else: try: - disp.show(cam.read()) + camera_manager.show(camera_manager.read_frame()) except Exception as e: logger = logger_manager.logger if logger: @@ -490,10 +511,35 @@ def cmd_str(): time.sleep_ms(1000) # 等待1秒后继续 + + + # 主程序入口 if __name__ == "__main__": try: cmd_str() + + # 用于调用测试函数 + # test() + + # 用于测试图片清晰度 + # 方式1: 测试单张图片 + # test_sharpness("/root/phot/image.bmp") + + # 方式2: 测试目录下所有图片 + # test_sharpness("/root/phot") + + # 方式3: 使用默认路径(config.PHOTO_DIR) + # test_sharpness("/root/phot/") + + # 用于测试激光点清晰度 + # 方式1: 测试单张图片 + # test_laser_point_sharpness("/root/phot/image.bmp") + # 方式2: 测试目录下所有图片 + # test_laser_point_sharpness("/root/phot") + # 方式3: 使用默认路径(config.PHOTO_DIR) + # test_laser_point_sharpness("/root/phot/") + except KeyboardInterrupt: logger = logger_manager.logger if logger: diff --git a/network.py b/network.py new file mode 100644 index 0000000..42f1722 --- /dev/null +++ b/network.py @@ -0,0 +1,1077 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +网络通信模块 +提供TCP通信、数据包打包/解析、队列管理等功能 +""" +import json +from math import e +import struct +from maix import time +import hmac +import hashlib +import ujson +import os +import threading +import socket +import config +from hardware import hardware_manager +from power import get_bus_voltage, voltage_to_percent +# from laser import laser_manager +# from ota import ota_manager +from logger_manager import logger_manager + + +class NetworkManager: + """网络通信管理器(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(NetworkManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + # 私有状态 + self._tcp_connected = False + self._high_send_queue = [] + self._normal_send_queue = [] + self._queue_lock = threading.Lock() + self._uart4g_lock = threading.Lock() + self._device_id = None + self._password = None + self._raw_line_data = [] + self._manual_trigger_flag = False + + # WiFi 相关状态 + self._network_type = None # "wifi" 或 "4g" 或 None + self._wifi_connected = False + self._wifi_ip = None + self._wifi_socket = None + self._wifi_socket_lock = threading.Lock() + self._prefer_wifi = True # 是否优先使用WiFi + self._wifi_recv_buffer = b"" # WiFi接收缓冲区 + + self._initialized = True + + # ==================== 状态访问(只读属性)==================== + + @property + def tcp_connected(self): + """TCP连接状态""" + return self._tcp_connected + + @property + def device_id(self): + """设备ID""" + return self._device_id + + @property + def password(self): + """密码""" + return self._password + + @property + def has_pending_messages(self): + """是否有待发送消息""" + with self._queue_lock: + return len(self._high_send_queue) > 0 or len(self._normal_send_queue) > 0 + + @property + def manual_trigger_flag(self): + """手动触发标志""" + return self._manual_trigger_flag + + @property + def network_type(self): + """当前使用的网络类型("wifi" 或 "4g")""" + return self._network_type + + @property + def wifi_connected(self): + """WiFi是否已连接""" + return self._wifi_connected + + @property + def wifi_ip(self): + """WiFi IP地址""" + return self._wifi_ip + + # ==================== 内部状态管理方法 ==================== + + + def set_manual_trigger(self, value=True): + """设置手动触发标志(公共方法)""" + self._manual_trigger_flag = value + + def clear_manual_trigger(self): + """清除手动触发标志(公共方法)""" + self._manual_trigger_flag = False + + def _set_tcp_connected(self, connected): + """设置TCP连接状态(内部方法)""" + self._tcp_connected = connected + + def _set_device_info(self, device_id, password): + """设置设备信息(内部方法)""" + self._device_id = device_id + self._password = password + + def _enqueue(self, item, high=False): + """线程安全地加入队列(内部方法)""" + with self._queue_lock: + if high: + self._high_send_queue.append(item) + else: + self._normal_send_queue.append(item) + + def _dequeue(self): + """线程安全地从队列取出(内部方法)""" + with self._queue_lock: + if self._high_send_queue: + return self._high_send_queue.pop(0) + elif self._normal_send_queue: + return self._normal_send_queue.pop(0) + return None + + def _set_raw_line_data(self, data): + """设置原始行数据(内部方法)""" + self._raw_line_data = data + + def _get_raw_line_data(self): + """获取原始行数据(内部方法)""" + return self._raw_line_data + + def get_uart_lock(self): + """获取UART锁(用于with语句)""" + return self._uart4g_lock + + def get_queue_lock(self): + """获取队列锁(用于with语句)""" + return self._queue_lock + + # ==================== 业务方法 ==================== + + def read_device_id(self): + """从 /device_key 文件读取设备唯一 ID,失败则使用默认值""" + try: + with open("/device_key", "r") as f: + device_id = f.read().strip() + if device_id: + logger = logger_manager.logger + if logger: + logger.debug(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}") + # 设置内部状态 + self._device_id = device_id + self._password = device_id + "." + return device_id + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[ERROR] 无法读取 /device_key: {e}") + + # 使用默认值 + default_id = "DEFAULT_DEVICE_ID" + self._device_id = default_id + self._password = default_id + "." + return default_id + + # ==================== WiFi 管理方法 ==================== + + def is_wifi_connected(self): + """检查WiFi是否已连接""" + # 优先用 MaixPy network(如果可用) + try: + from maix import network + wlan = network.WLAN(network.TYPE_WIFI) + if wlan.isconnected(): + self._wifi_connected = True + return True + except: + pass + + # 兜底:看系统 wlan0 有没有 IP + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + if ip: + self._wifi_connected = True + self._wifi_ip = ip + return True + except: + pass + + self._wifi_connected = False + return False + + def connect_wifi(self, ssid, password): + """ + 连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录 + + Returns: + (ip, error): IP地址和错误信息(成功时error为None) + """ + conf_path = "/etc/wpa_supplicant.conf" + ssid_file = "/boot/wifi.ssid" + pass_file = "/boot/wifi.pass" + + try: + # 生成 wpa_supplicant 配置 + net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() + if "network={" not in net_conf: + return None, "Failed to generate wpa config" + + # 写入运行时配置 + with open(conf_path, "w") as f: + f.write("ctrl_interface=/var/run/wpa_supplicant\n") + f.write("update_config=1\n\n") + f.write(net_conf) + + # 持久化保存 SSID/PASS + with open(ssid_file, "w") as f: + f.write(ssid.strip()) + with open(pass_file, "w") as f: + f.write(password.strip()) + + # 重启 Wi-Fi 服务 + os.system("/etc/init.d/S30wifi restart") + + # 等待获取 IP + import time as std_time + for _ in range(20): + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + if ip: + self._wifi_connected = True + self._wifi_ip = ip + logger = logger_manager.logger + if logger: + logger.info(f"[WIFI] 已连接,IP: {ip}") + return ip, None + std_time.sleep(1) + + return None, "Timeout: No IP obtained" + + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[WIFI] 连接失败: {e}") + return None, f"Exception: {str(e)}" + + def is_server_reachable(self, host, port=80, timeout=5): + """检查目标主机端口是否可达(用于网络检测)""" + try: + addr_info = socket.getaddrinfo(host, port)[0] + s = socket.socket(addr_info[0], addr_info[1], addr_info[2]) + s.settimeout(timeout) + s.connect(addr_info[-1]) + s.close() + return True + except Exception as e: + logger = logger_manager.logger + if logger: + logger.warning(f"[NET] 无法连接 {host}:{port} - {e}") + return False + + # ==================== 网络选择策略 ==================== + + def select_network(self, prefer_wifi=None): + """ + 自动选择网络(WiFi优先) + + Args: + prefer_wifi: 是否优先使用WiFi(None表示使用默认策略) + + Returns: + "wifi" 或 "4g" 或 None(无可用网络) + """ + if prefer_wifi is None: + prefer_wifi = self._prefer_wifi + + logger = logger_manager.logger + + # 策略1:如果指定优先WiFi,且WiFi可用,使用WiFi + if prefer_wifi and self.is_wifi_connected(): + # 检查WiFi是否能连接到服务器 + if self.is_server_reachable(config.SERVER_IP, config.SERVER_PORT, timeout=3): + self._network_type = "wifi" + if logger: + logger.info(f"[NET] 选择WiFi网络,IP: {self._wifi_ip}") + return "wifi" + else: + if logger: + logger.warning("[NET] WiFi已连接但无法访问服务器,尝试4G") + + # 策略2:如果WiFi可用,使用WiFi + if self.is_wifi_connected(): + if self.is_server_reachable(config.SERVER_IP, config.SERVER_PORT, timeout=3): + self._network_type = "wifi" + if logger: + logger.info(f"[NET] 选择WiFi网络,IP: {self._wifi_ip}") + return "wifi" + + # 策略3:回退到4G + if logger: + logger.info("[NET] WiFi不可用或无法连接服务器,使用4G网络") + self._network_type = "4g" + return "4g" + + + def safe_enqueue(self, data_dict, msg_type=2, high=False): + """线程安全地将消息加入队列(公共方法)""" + self._enqueue((msg_type, data_dict), high) + + def make_packet(self, msg_type: int, body_dict: dict) -> bytes: + """打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文""" + body = json.dumps(body_dict).encode() + body_len = len(body) + checksum = body_len + msg_type + header = struct.pack(">III", body_len, msg_type, checksum) + return header + body + + def parse_packet(self, data: bytes): + """解析 TCP 数据包,返回 (类型, 正文字典)""" + if len(data) < 12: + return None, None + body_len, msg_type, checksum = struct.unpack(">III", data[:12]) + body = data[12:12 + body_len] + try: + return msg_type, json.loads(body.decode()) + except: + return msg_type, {"raw": body.decode(errors="ignore")} + + def connect_server(self): + """ + 连接到服务器(自动选择WiFi或4G) + + Returns: + bool: 是否连接成功 + """ + if self._tcp_connected: + # 检查当前连接是否仍然有效 + if self._network_type == "wifi": + return self._check_wifi_connection() + elif self._network_type == "4g": + return True # 4G连接状态由AT命令维护 + return False + + # 自动选择网络 + network_type = self.select_network() + if not network_type: + return False + + logger = logger_manager.logger + if logger: + logger.info(f"连接到服务器(使用{network_type.upper()})...") + + # 根据网络类型建立TCP连接 + if network_type == "wifi": + return self._connect_tcp_via_wifi() + elif network_type == "4g": + return self._connect_tcp_via_4g() + return False + + def _connect_tcp_via_wifi(self): + """通过WiFi建立TCP连接""" + try: + # 创建TCP socket + self._wifi_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._wifi_socket.settimeout(5.0) # 5秒超时 + + # 连接到服务器 + addr_info = socket.getaddrinfo(config.SERVER_IP, config.SERVER_PORT, + socket.AF_INET, socket.SOCK_STREAM)[0] + self._wifi_socket.connect(addr_info[-1]) + + # 设置非阻塞模式(用于接收数据) + self._wifi_socket.setblocking(False) + + self._tcp_connected = True + logger = logger_manager.logger + if logger: + logger.info("[WIFI-TCP] TCP连接已建立") + return True + + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[WIFI-TCP] 连接失败: {e}") + + if self._wifi_socket: + try: + self._wifi_socket.close() + except: + pass + self._wifi_socket = None + + return False + + def _connect_tcp_via_4g(self): + """通过4G模块建立TCP连接""" + with self.get_uart_lock(): + hardware_manager.at_client.send("AT+MIPCLOSE=0", "OK", 1000) + res = hardware_manager.at_client.send(f'AT+MIPOPEN=0,"TCP","{config.SERVER_IP}",{config.SERVER_PORT}', "+MIPOPEN", 8000) + if "+MIPOPEN: 0,0" in res: + self._tcp_connected = True + return True + else: + logger = logger_manager.logger + logger.error(f"[4G-TCP] 连接失败: {res}") + return False + + def _check_wifi_connection(self): + """检查WiFi TCP连接是否仍然有效""" + if not self._wifi_socket: + return False + try: + # 尝试发送0字节来检测连接状态 + self._wifi_socket.send(b"", socket.MSG_DONTWAIT) + return True + except: + # socket已断开 + try: + self._wifi_socket.close() + except: + pass + self._wifi_socket = None + self._tcp_connected = False + return False + + def disconnect_server(self): + """断开TCP连接""" + if self._tcp_connected: + logger = logger_manager.logger + if logger: + logger.info("与服务器断开链接") + + if self._network_type == "wifi": + self._disconnect_tcp_via_wifi() + elif self._network_type == "4g": + self._disconnect_tcp_via_4g() + + self._tcp_connected = False + self._network_type = None + + def _disconnect_tcp_via_wifi(self): + """断开WiFi TCP连接""" + with self._wifi_socket_lock: + if self._wifi_socket: + try: + self._wifi_socket.close() + except: + pass + self._wifi_socket = None + + def _disconnect_tcp_via_4g(self): + """断开4G TCP连接""" + with self.get_uart_lock(): + hardware_manager.at_client.send("AT+MIPCLOSE=0", "OK", 1000) + + def tcp_send_raw(self, data: bytes, max_retries=2) -> bool: + """ + 统一的TCP发送接口(自动选择WiFi或4G) + + Args: + data: 要发送的数据 + max_retries: 最大重试次数 + + Returns: + bool: 是否发送成功 + """ + if not self._tcp_connected: + return False + + # 根据网络类型选择发送方式 + if self._network_type == "wifi": + return self._tcp_send_raw_via_wifi(data, max_retries) + elif self._network_type == "4g": + return self._tcp_send_raw_via_4g(data, max_retries) + else: + logger = logger_manager.logger + if logger: + logger.error("[NET] 未选择网络类型,无法发送数据") + return False + + def _tcp_send_raw_via_wifi(self, data: bytes, max_retries=2) -> bool: + """通过WiFi socket发送TCP数据""" + if not self._wifi_socket: + return False + + with self._wifi_socket_lock: + for attempt in range(max_retries): + try: + # 标准socket发送 + total_sent = 0 + while total_sent < len(data): + sent = self._wifi_socket.send(data[total_sent:]) + if sent == 0: + # socket连接已断开 + logger = logger_manager.logger + if logger: + logger.warning(f"[WIFI-TCP] 发送失败,socket已断开(尝试 {attempt+1}/{max_retries})") + break + total_sent += sent + + if total_sent == len(data): + return True + + # 发送不完整,重试 + time.sleep_ms(50) + + except OSError as e: + logger = logger_manager.logger + if logger: + logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt+1}/{max_retries})") + time.sleep_ms(50) + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt+1}/{max_retries})") + time.sleep_ms(50) + + return False + + def _tcp_send_raw_via_4g(self, data: bytes, max_retries=2) -> bool: + """通过4G模块发送TCP数据""" + with self.get_uart_lock(): + for _ in range(max_retries): + cmd = f'AT+MIPSEND=0,{len(data)}' + if ">" not in hardware_manager.at_client.send(cmd, ">", 2000): + time.sleep_ms(50) + continue + + total = 0 + while total < len(data): + n = hardware_manager.uart4g.write(data[total:]) + if not n or n < 0: + time.sleep_ms(1) + continue + total += n + + hardware_manager.uart4g.write(b"\x1A") + + r = hardware_manager.at_client.send("", "OK", 8000) + if ("SEND OK" in r) or ("OK" in r) or ("+MIPSEND" in r): + return True + + time.sleep_ms(50) + + return False + + def receive_tcp_data_via_wifi(self, timeout_ms=100): + """ + 通过WiFi接收TCP数据 + + Args: + timeout_ms: 超时时间(毫秒) + + Returns: + bytes: 接收到的数据,如果没有数据则返回 b"" + """ + if not self._wifi_socket: + return b"" + + try: + # 设置接收超时 + self._wifi_socket.settimeout(timeout_ms / 1000.0) + + # 尝试接收数据 + data = self._wifi_socket.recv(4096) # 每次最多接收4KB + return data + + except socket.timeout: + # 超时是正常的,表示没有数据 + return b"" + except OSError as e: + # socket错误(连接断开等) + logger = logger_manager.logger + if logger: + logger.warning(f"[WIFI-TCP] 接收数据失败: {e}") + + # 关闭socket + try: + self._wifi_socket.close() + except: + pass + self._wifi_socket = None + self._tcp_connected = False + + return b"" + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[WIFI-TCP] 接收数据异常: {e}") + return b"" + + + def generate_token(self, device_id): + """生成用于 HTTP 接口鉴权的 Token(HMAC-SHA256)""" + SALT = "shootMessageFire" + SALT2 = "shoot" + return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() + + + def send_http_cmd(self, cmd_str, timeout_ms=3000): + """发送 HTTP 相关 AT 指令(调试用)""" + logger = logger_manager.logger + if logger: + logger.debug(f"[HTTP AT] => {cmd_str}") + return hardware_manager.at_client.send(cmd_str, "OK", timeout_ms) + + + def upload_shoot_event(self,json_data): + """通过 4G 模块上报射击事件到 HTTP 接口(备用通道)""" + token = self.generate_token(self.device_id) + if not self.send_http_cmd(f'AT+MHTTPCREATE="{config.HTTP_URL}"'): + return False + instance_id = 0 + self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"') + self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"') + self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {self.device_id}"') + json_str = ujson.dumps(json_data) + if not self.send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'): + return False + if self.send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{config.HTTP_API_PATH}"'): + time.sleep_ms(5000) + return True + return False + + + def tcp_main(self): + """TCP 主通信循环:登录、心跳、处理指令、发送数据""" + import _thread + from maix import camera + + logger = logger_manager.logger + if logger: + logger.info("[NET] TCP主线程启动") + + send_hartbeat_fail_count = 0 + last_charging_check = 0 + CHARGING_CHECK_INTERVAL = 5000 # 5秒检查一次充电状态 + + while True: + try: + # 检查充电状态(每5秒检查一次) + current_time = time.ticks_ms() + if current_time - last_charging_check > CHARGING_CHECK_INTERVAL: + last_charging_check = current_time + + # OTA 期间不要 connect/登录/心跳/发送 + try: + from ota_manager import ota_manager + if ota_manager.ota_in_progress: + time.sleep_ms(200) + continue + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[NET] OTA检查异常: {e}") + time.sleep_ms(200) + continue + + if not self.connect_server(): + time.sleep_ms(5000) + continue + + # 发送登录包 + vol_val = get_bus_voltage() + login_data = { + "deviceId": self.device_id, + "password": self.password, + "version": config.APP_VERSION, + "vol": vol_val, + "vol_per": voltage_to_percent(vol_val) + } + if not self.tcp_send_raw(self.make_packet(1, login_data)): + self._tcp_connected = False + time.sleep_ms(2000) + continue + + if logger: + logger.info("➡️ 登录包已发送,等待确认...") + logged_in = False + pending_cleared = False + last_heartbeat_ack_time = time.ticks_ms() + last_heartbeat_send_time = time.ticks_ms() + + while True: + # OTA 期间暂停 TCP 活动 + try: + from ota_manager import ota_manager + if ota_manager.ota_in_progress: + time.sleep_ms(200) + continue + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[NET] OTA检查异常: {e}") + time.sleep_ms(200) + continue + + # 接收数据(根据网络类型选择接收方式) + item = None + if self._network_type == "wifi": + # WiFi接收数据 + data = self.receive_tcp_data_via_wifi(timeout_ms=50) + if data: + # 将数据添加到缓冲区 + self._wifi_recv_buffer += data + + # 尝试从缓冲区解析完整的数据包 + while len(self._wifi_recv_buffer) >= 12: # 至少需要12字节的头部 + # 解析头部 + try: + body_len, msg_type, checksum = struct.unpack(">III", self._wifi_recv_buffer[:12]) + total_len = 12 + body_len + + if len(self._wifi_recv_buffer) >= total_len: + # 有完整的数据包 + payload = self._wifi_recv_buffer[:total_len] + self._wifi_recv_buffer = self._wifi_recv_buffer[total_len:] + item = (0, payload) # link_id=0 for WiFi + break + else: + # 数据包不完整,等待更多数据 + break + except: + # 解析失败,清空缓冲区 + self._wifi_recv_buffer = b"" + break + elif self._network_type == "4g": + # 4G接收数据 + item = hardware_manager.at_client.pop_tcp_payload() + + if item: + if isinstance(item, tuple) and len(item) == 2: + link_id, payload = item + else: + link_id, payload = 0, item + + if not logged_in: + try: + if logger: + logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}") + except: + pass + + msg_type, body = self.parse_packet(payload) + + # 处理登录响应 + if not logged_in and msg_type == 1: + if body and body.get("cmd") == 1 and body.get("data") == "登录成功": + logged_in = True + last_heartbeat_ack_time = time.ticks_ms() + if logger: + logger.info("登录成功") + + # 检查 ota_pending.json + try: + pending_path = f"{config.APP_DIR}/ota_pending.json" + if os.path.exists(pending_path): + try: + with open(pending_path, "r", encoding="utf-8") as f: + pending_obj = json.load(f) + except: + pending_obj = {} + self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) + if logger: + logger.info("[OTA] 已上报 ota_ok,等待心跳确认后删除 pending") + except Exception as e: + if logger: + logger.error(f"[OTA] ota_ok 上报失败: {e}") + else: + break + + # 处理心跳 ACK + elif logged_in and msg_type == 4: + last_heartbeat_ack_time = time.ticks_ms() + if logger: + logger.debug("✅ 收到心跳确认") + + # 处理命令40(分片下载) + elif logged_in and msg_type == 40: + if isinstance(body, dict): + t = body.get('t', 0) + v = body.get('v') + # 如果是第一个分片,清空之前的缓存 + if len(self._raw_line_data) == 0 or (len(self._raw_line_data) > 0 and self._raw_line_data[0].get('v') != v): + self._raw_line_data.clear() + # 或者更简单:每次收到命令40时,如果版本号不同,清空缓存 + if len(self._raw_line_data) > 0: + first_v = self._raw_line_data[0].get('v') + if first_v and first_v != v: + self._raw_line_data.clear() + self._raw_line_data.append(body) + if len(self._raw_line_data) >= int(t): + if logger: + logger.info(f"下载完成") + from ota_manager import ota_manager + stock_array = list(map(lambda x: x.get('d'), self._raw_line_data)) + local_filename = config.LOCAL_FILENAME + with open(local_filename, 'w', encoding='utf-8') as file: + file.write("\n".join(stock_array)) + ota_manager.apply_ota_and_reboot(None, local_filename) + else: + self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41}) + if logger: + logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}") + + # 处理业务指令 + elif logged_in and isinstance(body, dict): + inner_cmd = None + data_obj = body.get("data") + if isinstance(data_obj, dict): + inner_cmd = data_obj.get("cmd") + + if inner_cmd == 2: # 开启激光并校准 + from laser_manager import laser_manager + if not laser_manager.calibration_active: + laser_manager.turn_on_laser() + time.sleep_ms(100) + laser_manager.start_calibration() + self.safe_enqueue({"result": "calibrating"}, 2) + elif inner_cmd == 3: # 关闭激光 + from laser_manager import laser_manager + laser_manager.turn_off_laser() + laser_manager.stop_calibration() + self.safe_enqueue({"result": "laser_off"}, 2) + elif inner_cmd == 4: # 上报电量 + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} + self.safe_enqueue(battery_data, 2) + if logger: + logger.info(f"电量上报: {battery_percent}%") + elif inner_cmd == 5: # OTA 升级 + inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {} + ssid = inner_data.get("ssid") + password = inner_data.get("password") + ota_url = inner_data.get("url") + mode = (inner_data.get("mode") or "").strip().lower() + + if not ota_url: + if logger: + logger.error("ota missing_url") + self.safe_enqueue({"result": "missing_url"}, 2) + continue + + from ota_manager import ota_manager + if ota_manager.update_thread_started: + self.safe_enqueue({"result": "update_already_started"}, 2) + continue + + # 自动判断模式:如果没有明确指定,根据WiFi连接状态和凭证决定 + if mode not in ("4g", "wifi"): + if logger: + logger.info("ota missing mode, auto-detecting...") + # 只有同时满足:WiFi已连接 且 提供了WiFi凭证,才使用WiFi + if self.is_wifi_connected() and ssid and password: + mode = "wifi" + if logger: + logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)") + else: + mode = "4g" + if logger: + logger.info("ota auto-selected: 4g (WiFi not available or no credentials)") + + if mode == "4g": + ota_manager._set_ota_url(ota_url) # 记录 OTA URL,供命令7使用 + ota_manager._start_update_thread() + _thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,)) + else: # mode == "wifi" + if not ssid or not password: + if logger: + logger.error("ota wifi mode requires ssid and password") + self.safe_enqueue({"result": "missing_ssid_or_password"}, 2) + else: + ota_manager._start_update_thread() + _thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url)) + elif inner_cmd == 6: + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = ip if ip else "no_ip" + except: + ip = "error_getting_ip" + self.safe_enqueue({"result": "current_ip", "ip": ip}, 2) + elif inner_cmd == 7: + from ota_manager import ota_manager + if ota_manager.update_thread_started: + self.safe_enqueue({"result": "update_already_started"}, 2) + continue + + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + except: + ip = None + + if not ip: + self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2) + else: + # 注意:direct_ota_download 需要 ota_url 参数 + # 如果 ota_manager.ota_url 为 None,需要从其他地方获取 + ota_url_to_use = ota_manager.ota_url + if not ota_url_to_use: + if logger: + logger.error("[OTA] cmd=7 但 OTA_URL 未设置") + self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2) + else: + ota_manager._start_update_thread() + _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,)) + elif inner_cmd == 41: + if logger: + logger.info("[TEST] 收到TCP射箭触发命令") + self._manual_trigger_flag = True + self.safe_enqueue({"result": "trigger_ack"}, 2) + elif inner_cmd == 42: # 关机命令 + if logger: + logger.info("[SHUTDOWN] 收到TCP关机命令,准备关机...") + self.safe_enqueue({"result": "shutdown_ack"}, 2) + time.sleep_ms(1000) + self.disconnect_server() + # 尝试关闭4G模块 + try: + with self.get_uart_lock(): + hardware_manager.at_client.send("AT+CFUN=0", "OK", 5000) + except: + pass + time.sleep_ms(2000) + os.system("poweroff") + return + else: + time.sleep_ms(5) + + # 发送队列中的业务数据 + if logged_in and (self._high_send_queue or self._normal_send_queue): + msg_type = None + data_dict = None + if self.get_queue_lock().acquire(blocking=False): + try: + if self._high_send_queue: + msg_type, data_dict = self._high_send_queue.pop(0) + elif self._normal_send_queue: + msg_type, data_dict = self._normal_send_queue.pop(0) + finally: + self.get_queue_lock().release() + + if msg_type is not None and data_dict is not None: + pkt = self.make_packet(msg_type, data_dict) + if not self.tcp_send_raw(pkt): + self._tcp_connected = False + break + + # 发送激光校准结果 + if logged_in: + from laser_manager import laser_manager + result = laser_manager.get_calibration_result() + if result: + x, y = result + self.safe_enqueue({"result": "ok", "x": x, "y": y}, 2) + + # 定期发送心跳 + current_time = time.ticks_ms() + if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000: + vol_val = get_bus_voltage() + if not self.tcp_send_raw(self.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})): + if logger: + logger.error("心跳发送失败") + time.sleep_ms(3000) + send_hartbeat_fail_count += 1 + if send_hartbeat_fail_count >= 3: + send_hartbeat_fail_count = 0 + if logger: + logger.error("连续3次发送心跳失败,重连") + break + else: + continue + else: + send_hartbeat_fail_count = 0 + last_heartbeat_send_time = current_time + if logger: + logger.debug("心跳已发送") + + # 删除 pending 文件(心跳发送成功后) + if not pending_cleared: + try: + pending_path = f"{config.APP_DIR}/ota_pending.json" + if os.path.exists(pending_path): + try: + os.remove(pending_path) + pending_cleared = True + if logger: + logger.info("[OTA] 心跳发送成功,已删除 ota_pending.json") + except Exception as e: + if logger: + logger.error(f"[OTA] 删除 pending 文件失败: {e}") + except Exception as e: + if logger: + logger.error(f"[OTA] 检查 pending 文件时出错: {e}") + + # 心跳超时重连 + if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: + if logger: + logger.error("十分钟无心跳ACK,重连") + break + + time.sleep_ms(50) + + self._tcp_connected = False + if logger: + logger.error("连接异常,2秒后重连...") + time.sleep_ms(2000) + + except Exception as e: + # TCP主循环的顶层异常捕获,防止线程静默退出 + logger = logger_manager.logger + if logger: + logger.error(f"[NET] TCP主循环异常: {e}") + import traceback + logger.error(traceback.format_exc()) + else: + print(f"[NET] TCP主循环异常: {e}") + import traceback + traceback.print_exc() + self._tcp_connected = False + time.sleep_ms(5000) # 等待5秒后重试连接 + + +# 创建全局单例实例 +network_manager = NetworkManager() + +# ==================== 向后兼容的函数接口 ==================== + +def tcp_main(): + """TCP主循环(向后兼容接口)""" + return network_manager.tcp_main() + +def read_device_id(): + """读取设备ID(向后兼容接口)""" + return network_manager.read_device_id() + +def safe_enqueue(data_dict, msg_type=2, high=False): + """线程安全地加入队列(向后兼容接口)""" + return network_manager.safe_enqueue(data_dict, msg_type, high) + +def connect_server(): + """连接服务器(向后兼容接口)""" + return network_manager.connect_server() + +def disconnet_server(): + """断开服务器连接(向后兼容接口)""" + return network_manager.disconnect_server() + +def is_wifi_connected(): + """检查WiFi是否已连接(向后兼容接口)""" + return network_manager.is_wifi_connected() + +def connect_wifi(ssid, password): + """连接WiFi(向后兼容接口)""" + return network_manager.connect_wifi(ssid, password) + +def is_server_reachable(host, port=80, timeout=5): + """检查服务器是否可达(向后兼容接口)""" + return network_manager.is_server_reachable(host, port, timeout) + + diff --git a/ota_manager.py b/ota_manager.py index 2efe62b..7d3926e 100644 --- a/ota_manager.py +++ b/ota_manager.py @@ -46,7 +46,7 @@ class OTAManager: self._ota_url = None self._ota_mode = None self._lock = threading.Lock() - + self._is_https = False self._initialized = True # ==================== 状态访问(只读属性)==================== @@ -699,11 +699,14 @@ class OTAManager: return False, "bad_url (no host)" # 很多 ML307R 的 MHTTP 对 https 不稳定;对已知域名做降级 + if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"): - base_url = "http://static.shelingxingqiu.com" + base_url = "https://static.shelingxingqiu.com" + # TODO:使用https,看看是否能成功 + self._is_https = True else: base_url = f"http://{host}" - + self._is_https = False logger = logger_manager.logger def _log(*a): @@ -786,6 +789,15 @@ class OTAManager: _hard_reset_http() resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) hid = _parse_httpid(resp) + if self._is_https: + resp = hardware_manager.at_client.send(f'AT+MHTTPCFG="ssl",{hid},1,1', "OK", 2000) + if "ERROR" in resp or "CME ERROR" in resp: + logger_manager.logger.error(f"MHTTPCFG SSL failed: {resp}") + # 尝试https 降级到http + downgraded_base_url = base_url.replace("https://", "http://") + resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{downgraded_base_url}"', "OK", 8000) + hid = _parse_httpid(resp) + return hid, resp def _fetch_range_into_buf(start, want_len, out_buf, full_reset=False): diff --git a/shot_id_generator.py b/shot_id_generator.py new file mode 100644 index 0000000..ce1f1c7 --- /dev/null +++ b/shot_id_generator.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +射箭ID生成器 +为每次射箭生成唯一ID,格式:{timestamp_ms}_{counter} +""" +from maix import time +import threading + + +class ShotIDGenerator: + """射箭ID生成器(单例)""" + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super(ShotIDGenerator, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._counter = 0 + self._last_timestamp_ms = 0 + self._lock = threading.Lock() + + self._initialized = True + + def generate_id(self, device_id=None): + """ + 生成唯一的射箭ID + + Args: + device_id: 可选的设备ID,如果提供则包含在ID中(格式:{device_id}_{timestamp_ms}_{counter}) + 如果不提供,则使用简单格式(格式:{timestamp_ms}_{counter}) + + Returns: + str: 唯一的射箭ID + """ + with self._lock: + current_timestamp_ms = time.ticks_ms() + + # 如果时间戳相同,增加计数器;否则重置计数器 + if current_timestamp_ms == self._last_timestamp_ms: + self._counter += 1 + else: + self._counter = 0 + self._last_timestamp_ms = current_timestamp_ms + + # 生成ID + if device_id: + shot_id = f"{device_id}_{current_timestamp_ms}_{self._counter}" + else: + shot_id = f"{current_timestamp_ms}_{self._counter}" + + return shot_id + + def reset(self): + """重置计数器(通常不需要调用)""" + with self._lock: + self._counter = 0 + self._last_timestamp_ms = 0 + + +# 创建全局单例实例 +shot_id_generator = ShotIDGenerator() + + + + + diff --git a/version.py b/version.py index ef0a9b6..de0a931 100644 --- a/version.py +++ b/version.py @@ -4,7 +4,11 @@ 应用版本号 每次 OTA 更新时,只需要更新这个文件中的版本号 """ -VERSION = '1.1.1' +VERSION = '1.1.5' + + + + diff --git a/vision.py b/vision.py index 38847e8..c421539 100644 --- a/vision.py +++ b/vision.py @@ -12,6 +12,241 @@ from maix import image import config from logger_manager import logger_manager +def check_laser_point_sharpness(frame, laser_point=None, roi_size=30, threshold=100.0, ellipse_params=None): + """ + 检测激光点本身的清晰度(不是整个靶子) + + Args: + frame: 图像帧对象 + laser_point: 激光点坐标 (x, y),如果为None则自动查找 + roi_size: ROI区域大小(像素),默认30x30 + threshold: 清晰度阈值 + ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内 + + Returns: + (is_sharp, sharpness_score, laser_pos): (是否清晰, 清晰度分数, 激光点坐标) + """ + try: + # 1. 如果没有提供激光点,先查找 + if laser_point is None: + from laser_manager import laser_manager + laser_point = laser_manager.find_red_laser(frame, ellipse_params=ellipse_params) + if laser_point is None: + logger_manager.logger.debug(f"未找到激光点") + return False, 0.0, None + + x, y = laser_point + + # 2. 转换为 OpenCV 格式 + img_cv = image.image2cv(frame, False, False) + h, w = img_cv.shape[:2] + + # 3. 提取 ROI 区域(激光点周围) + roi_half = roi_size // 2 + x_min = max(0, int(x) - roi_half) + x_max = min(w, int(x) + roi_half) + y_min = max(0, int(y) - roi_half) + y_max = min(h, int(y) + roi_half) + + roi = img_cv[y_min:y_max, x_min:x_max] + + if roi.size == 0: + return False, 0.0, laser_point + + # 4. 转换为灰度图(用于清晰度检测) + gray_roi = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY) + + # 5. 方法1:检测点的扩散程度(能量集中度) + # 计算中心区域的能量集中度 + center_x, center_y = roi.shape[1] // 2, roi.shape[0] // 2 + center_radius = min(5, roi.shape[0] // 4) # 中心区域半径 + + # 创建中心区域的掩码 + y_coords, x_coords = np.ogrid[:roi.shape[0], :roi.shape[1]] + center_mask = (x_coords - center_x)**2 + (y_coords - center_y)**2 <= center_radius**2 + + # 计算中心区域和周围区域的亮度 + center_brightness = gray_roi[center_mask].mean() + outer_mask = ~center_mask + outer_brightness = gray_roi[outer_mask].mean() if np.any(outer_mask) else 0 + + # 对比度(清晰的点对比度高) + contrast = abs(center_brightness - outer_brightness) + + # 6. 方法2:检测点的边缘锐度(使用拉普拉斯) + laplacian = cv2.Laplacian(gray_roi, cv2.CV_64F) + edge_sharpness = abs(laplacian).var() + + # 7. 方法3:检测点的能量集中度(方差) + # 清晰的点:能量集中在中心,方差小 + # 模糊的点:能量分散,方差大 + # 但我们需要的是:清晰的点中心亮度高,周围低,所以梯度大 + sobel_x = cv2.Sobel(gray_roi, cv2.CV_64F, 1, 0, ksize=3) + sobel_y = cv2.Sobel(gray_roi, cv2.CV_64F, 0, 1, ksize=3) + gradient = np.sqrt(sobel_x**2 + sobel_y**2) + gradient_sharpness = gradient.var() + + # 8. 组合多个指标 + # 对比度权重0.3,边缘锐度权重0.4,梯度权重0.3 + sharpness_score = (contrast * 0.3 + edge_sharpness * 0.4 + gradient_sharpness * 0.3) + + is_sharp = sharpness_score >= threshold + + logger = logger_manager.logger + if logger: + logger.debug(f"[VISION] 激光点清晰度: 位置=({x}, {y}), 对比度={contrast:.2f}, 边缘={edge_sharpness:.2f}, 梯度={gradient_sharpness:.2f}, 综合={sharpness_score:.2f}, 是否清晰={is_sharp}") + + return is_sharp, sharpness_score, laser_point + + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] 激光点清晰度检测失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return False, 0.0, laser_point + +def check_image_sharpness(frame, threshold=100.0, save_debug_images=False): + """ + 检查图像清晰度(针对圆形靶子优化,基于圆形边缘检测) + 检测靶心的圆形边缘,计算边缘区域的梯度清晰度 + + Args: + frame: 图像帧对象 + threshold: 清晰度阈值,低于此值认为图像模糊(默认100.0) + 可以根据实际情况调整: + - 清晰图像通常 > 200 + - 模糊图像通常 < 100 + - 中等清晰度 100-200 + save_debug_images: 是否保存调试图像(原始图和边缘图),默认False + + Returns: + (is_sharp, sharpness_score): (是否清晰, 清晰度分数) + """ + try: + logger_manager.logger.debug(f"begin") + # 转换为 OpenCV 格式 + img_cv = image.image2cv(frame, False, False) + logger_manager.logger.debug(f"after image2cv") + + # 转换为 HSV 颜色空间 + hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) + h, s, v = cv2.split(hsv) + logger_manager.logger.debug(f"after HSV conversion") + + # 检测黄色区域(靶心) + # 调整饱和度策略:稍微增强,不要过度 + s_enhanced = np.clip(s * 1.1, 0, 255).astype(np.uint8) + hsv_enhanced = cv2.merge((h, s_enhanced, v)) + + # HSV 阈值范围(与 detect_circle_v3 保持一致) + lower_yellow = np.array([7, 80, 0]) + upper_yellow = np.array([32, 255, 255]) + mask_yellow = cv2.inRange(hsv_enhanced, lower_yellow, upper_yellow) + + # 形态学操作,填充小孔洞 + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) + logger_manager.logger.debug(f"after yellow mask detection") + + # 计算边缘区域:扩展黄色区域,然后减去原始区域,得到边缘区域 + mask_dilated = cv2.dilate(mask_yellow, kernel, iterations=2) + mask_edge = cv2.subtract(mask_dilated, mask_yellow) # 边缘区域 + + # 计算边缘区域的像素数量 + edge_pixel_count = np.sum(mask_edge > 0) + logger_manager.logger.debug(f"edge pixel count: {edge_pixel_count}") + + # 如果检测不到边缘区域,使用全局梯度作为后备方案 + if edge_pixel_count < 100: + logger_manager.logger.debug(f"edge region too small, using global gradient") + # 使用 V 通道计算全局梯度 + sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3) + sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3) + gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2) + sharpness_score = gradient.var() + logger_manager.logger.debug(f"global gradient variance: {sharpness_score:.2f}") + else: + # 在边缘区域计算梯度清晰度 + # 使用 V(亮度)通道计算梯度,因为边缘在亮度上通常很明显 + sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3) + sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3) + gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2) + + # 只在边缘区域计算清晰度 + edge_gradient = gradient[mask_edge > 0] + + if len(edge_gradient) > 0: + # 计算边缘梯度的方差(清晰图像的边缘梯度变化大) + sharpness_score = edge_gradient.var() + # 也可以使用均值作为补充指标(清晰图像的边缘梯度均值也较大) + gradient_mean = edge_gradient.mean() + logger_manager.logger.debug(f"edge gradient: mean={gradient_mean:.2f}, var={sharpness_score:.2f}, pixels={len(edge_gradient)}") + else: + # 如果边缘区域没有有效梯度,使用全局梯度 + sharpness_score = gradient.var() + logger_manager.logger.debug(f"no edge gradient, using global: {sharpness_score:.2f}") + + # 保存调试图像(如果启用) + if save_debug_images: + try: + debug_dir = config.PHOTO_DIR + if debug_dir not in os.listdir("/root"): + try: + os.mkdir(debug_dir) + except: + pass + + # 生成文件名 + try: + all_images = [f for f in os.listdir(debug_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] + img_count = len(all_images) + except: + img_count = 0 + + # 保存原始图像 + img_orig = image.cv2image(img_cv, False, False) + orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp" + img_orig.save(orig_filename) + + # # 保存边缘检测结果(可视化) + # # 创建可视化图像:原始图像 + 黄色区域 + 边缘区域 + # debug_img = img_cv.copy() + # # 在黄色区域绘制绿色 + # debug_img[mask_yellow > 0] = [0, 255, 0] # RGB格式,绿色 + # # 在边缘区域绘制红色 + # debug_img[mask_edge > 0] = [255, 0, 0] # RGB格式,红色 + + # debug_img_maix = image.cv2image(debug_img, False, False) + # debug_filename = f"{debug_dir}/sharpness_debug_edge_{img_count:04d}.bmp" + # debug_img_maix.save(debug_filename) + + # logger = logger_manager.logger + # if logger: + # logger.info(f"[VISION] 保存调试图像: {orig_filename}, {debug_filename}") + except Exception as e: + logger = logger_manager.logger + if logger: + logger.warning(f"[VISION] 保存调试图像失败: {e}") + import traceback + logger.error(traceback.format_exc()) + + is_sharp = sharpness_score >= threshold + + logger = logger_manager.logger + if logger: + logger.debug(f"[VISION] 清晰度检测: 分数={sharpness_score:.2f}, 边缘像素数={edge_pixel_count}, 是否清晰={is_sharp}, 阈值={threshold}") + + return is_sharp, sharpness_score + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] 清晰度检测失败: {e}") + import traceback + logger.error(traceback.format_exc()) + # 出错时返回 False,避免使用模糊图像 + return False, 0.0 + def save_calibration_image(frame, laser_pos, photo_dir=None): """ 保存激光校准图像(带标注) @@ -278,22 +513,24 @@ def estimate_distance(pixel_radius): return 0.0 return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0 - -def compute_laser_position(circle_center, laser_point, radius, method): - """计算激光相对于靶心的偏移量(单位:厘米)""" - if not all([circle_center, radius, method]): - return None, None - cx, cy = circle_center - lx, ly = 320, 230 - # 根据检测方法动态调整靶心物理半径(简化模型) - circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0 - dx = lx - cx - dy = ly - cy - return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) - +def estimate_pixel(physical_distance_cm, target_distance_m): + """ + 根据物理距离和目标距离计算对应的像素偏移 + + Args: + physical_distance_cm: 物理世界中的距离(厘米),例如激光与摄像头的距离 + target_distance_m: 目标距离(米),例如到靶心的距离 + + Returns: + float: 对应的像素偏移 + """ + if not target_distance_m or target_distance_m <= 0: + return 0.0 + # 公式:像素偏移 = (物理距离_米) * 焦距_像素 / 目标距离_米 + return (physical_distance_cm / 100.0) * config.FOCAL_LENGTH_PIX / target_distance_m def save_shot_image(result_img, center, radius, method, ellipse_params, - laser_point, distance_m, photo_dir=None): + laser_point, distance_m, shot_id=None, photo_dir=None): """ 保存射击图像(带标注) 即使没有检测到靶心也会保存图像,文件名会标注 "no_target" @@ -307,6 +544,7 @@ def save_shot_image(result_img, center, radius, method, ellipse_params, ellipse_params: 椭圆参数 ((center, (width, height), angle)) 或 None laser_point: 激光点坐标 (x, y) distance_m: 距离(米),可能为 None(未检测到靶心) + shot_id: 射箭ID,如果提供则用作文件名,否则使用旧的文件名格式 photo_dir: 照片存储目录,如果为None则使用 config.PHOTO_DIR Returns: @@ -327,28 +565,39 @@ def save_shot_image(result_img, center, radius, method, ellipse_params, except: pass - # 生成文件名 - # 统计所有图片文件(包括 .bmp 和 .jpg) - try: - all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] - img_count = len(all_images) - except: - img_count = 0 - x, y = laser_point - # 如果未检测到靶心,在文件名中标注 - if center is None or radius is None: - method_str = "no_target" - distance_str = "000" + # 生成文件名:优先使用 shot_id,否则使用旧格式 + if shot_id: + # 使用射箭ID作为文件名 + # 如果未检测到靶心,在文件名中标注 + if center is None or radius is None: + filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp" + else: + method_str = method or "unknown" + filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp" else: - method_str = method or "unknown" - distance_str = str(round((distance_m or 0.0) * 100)) - - filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp" + # 旧的文件名格式(向后兼容) + try: + all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] + img_count = len(all_images) + except: + img_count = 0 + + # 如果未检测到靶心,在文件名中标注 + if center is None or radius is None: + method_str = "no_target" + distance_str = "000" + else: + method_str = method or "unknown" + distance_str = str(round((distance_m or 0.0) * 100)) + + filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp" logger = logger_manager.logger if logger: + if shot_id: + logger.info(f"[VISION] 保存射箭图像,ID: {shot_id}, 文件名: {filename}") if center and radius: logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}") if ellipse_params: @@ -360,23 +609,35 @@ def save_shot_image(result_img, center, radius, method, ellipse_params, # 转换图像为 OpenCV 格式以便绘制 img_cv = image.image2cv(result_img, False, False) - # 确保激光十字线被绘制(使用OpenCV在图像上绘制,确保可见性) - laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) - thickness = max(config.LASER_THICKNESS, 2) # 至少2像素宽,确保可见 - length = max(config.LASER_LENGTH, 10) # 至少10像素长 + # # 确保激光十字线被绘制(使用OpenCV在图像上绘制,确保可见性) + # laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) + # thickness = max(config.LASER_THICKNESS, 2) # 至少2像素宽,确保可见 + # length = max(config.LASER_LENGTH, 10) # 至少10像素长 - # 绘制激光十字线(水平线) - cv2.line(img_cv, - (int(x - length), int(y)), - (int(x + length), int(y)), - laser_color, thickness) - # 绘制激光十字线(垂直线) - cv2.line(img_cv, - (int(x), int(y - length)), - (int(x), int(y + length)), - laser_color, thickness) - # 绘制激光点 - cv2.circle(img_cv, (int(x), int(y)), max(thickness, 3), laser_color, -1) + # # 绘制激光十字线(水平线) + # cv2.line(img_cv, + # (int(x - length), int(y)), + # (int(x + length), int(y)), + # laser_color, thickness) + # # 绘制激光十字线(垂直线) + # cv2.line(img_cv, + # (int(x), int(y - length)), + # (int(x), int(y + length)), + # laser_color, thickness) + # # 绘制激光点 + # cv2.circle(img_cv, (int(x), int(y)), max(thickness, 3), laser_color, -1) + # 在 vision.py 的 save_shot_image 函数中,替换第598-614行的代码: + + # 绘制激光点标注(使用空心圆圈,类似校准时的标注方式) + laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) + thickness = 1 # 圆圈线宽 + + # 绘制外圈(半径10,空心) + cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, thickness) + # 绘制中圈(半径5,空心) + cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, thickness) + # 绘制中心点(半径2,实心,用于精确定位) + cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1) # 如果检测到靶心,绘制靶心标注 if center and radius: