diff --git a/camera_manager.py b/camera_manager.py index 7b77782..0f28ed4 100644 --- a/camera_manager.py +++ b/camera_manager.py @@ -1,138 +1,137 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -相机管理器模块 -提供相机和显示的统一管理和线程安全访问 -""" -import threading -import config -from logger_manager import logger_manager - - -class CameraManager: - """相机管理器(单例)""" - _instance = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super(CameraManager, cls).__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if self._initialized: - return - - # 私有对象 - self._camera = None - self._display = None - - # 线程安全锁 - self._camera_lock = threading.Lock() - self._display_lock = threading.Lock() - - # 相机配置 - self._camera_width = 640 - self._camera_height = 480 - - self._initialized = True - - # ==================== 初始化方法 ==================== - - def init_camera(self, width=640, height=480): - """初始化相机""" - if self._camera is not None: - return self._camera - - from maix import camera - - self._camera_width = width - self._camera_height = height - - with self._camera_lock: - if self._camera is None: - self._camera = camera.Camera(width, height) - logger = logger_manager.logger - if logger: - logger.info(f"[CAMERA] 相机初始化完成: {width}x{height}") - - return self._camera - - def init_display(self): - """初始化显示""" - if self._display is not None: - return self._display - - from maix import display - - with self._display_lock: - if self._display is None: - self._display = display.Display() - logger = logger_manager.logger - if logger: - logger.info("[CAMERA] 显示初始化完成") - - return self._display - - # ==================== 访问方法 ==================== - - @property - def camera(self): - """获取相机实例(懒加载)""" - if self._camera is None: - self.init_camera() - return self._camera - - @property - def display(self): - """获取显示实例(懒加载)""" - if self._display is None: - self.init_display() - return self._display - - # ==================== 业务方法 ==================== - - def read_frame(self): - """ - 线程安全地读取一帧图像 - - Returns: - frame: 图像帧对象 - """ - with self._camera_lock: - if self._camera is None: - self.init_camera() - return self._camera.read() - - def show(self, image): - """ - 线程安全地显示图像 - - Args: - image: 要显示的图像对象 - """ - with self._display_lock: - if self._display is None: - self.init_display() - self._display.show(image) - - def release(self): - """释放相机和显示资源(如果需要)""" - with self._camera_lock: - if self._camera is not None: - # MaixPy 的 Camera 可能不需要显式释放,但可以在这里清理 - self._camera = None - - with self._display_lock: - if self._display is not None: - # MaixPy 的 Display 可能不需要显式释放 - self._display = None - - -# 创建全局单例实例 -camera_manager = CameraManager() - - - - - +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +相机管理器模块 +提供相机和显示的统一管理和线程安全访问 +""" +import threading +import config +from logger_manager import logger_manager + + +class CameraManager: + """相机管理器(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(CameraManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + # 私有对象 + self._camera = None + self._display = None + + # 线程安全锁 + self._camera_lock = threading.Lock() + self._display_lock = threading.Lock() + + # 相机配置 + self._camera_width = 640 + self._camera_height = 480 + + self._initialized = True + + # ==================== 初始化方法 ==================== + + @property + def logger(self): + """获取 logger 对象""" + return logger_manager.logger + + def init_camera(self, width=640, height=480): + """初始化相机""" + if self._camera is not None: + return self._camera + + from maix import camera + + self._camera_width = width + self._camera_height = height + + with self._camera_lock: + if self._camera is None: + self._camera = camera.Camera(width, height) + + return self._camera + + def init_display(self): + """初始化显示""" + if self._display is not None: + return self._display + + from maix import display + + with self._display_lock: + if self._display is None: + self._display = display.Display() + + return self._display + + # ==================== 访问方法 ==================== + + @property + def camera(self): + """获取相机实例(懒加载)""" + if self._camera is None: + self.init_camera() + return self._camera + + @property + def display(self): + """获取显示实例(懒加载)""" + if self._display is None: + self.init_display() + return self._display + + # ==================== 业务方法 ==================== + + def read_frame(self): + """ + 线程安全地读取一帧图像 + + Returns: + frame: 图像帧对象 + """ + with self._camera_lock: + if self._camera is None: + self.init_camera() + return self._camera.read() + + def show(self, image): + """ + 线程安全地显示图像 + + Args: + image: 要显示的图像对象 + """ + with self._display_lock: + if self._display is None: + self.init_display() + self._display.show(image) + + def release(self): + """释放相机和显示资源(如果需要)""" + with self._camera_lock: + if self._camera is not None: + # MaixPy 的 Camera 可能不需要显式释放,但可以在这里清理 + self._camera = None + + with self._display_lock: + if self._display is not None: + # MaixPy 的 Display 可能不需要显式释放 + self._display = None + + +# 创建全局单例实例 +camera_manager = CameraManager() + + + + + diff --git a/config.py b/config.py index f96782c..cb94f7f 100644 --- a/config.py +++ b/config.py @@ -12,7 +12,8 @@ APP_DIR = "/maixapp/apps/t11" LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py" # ==================== 服务器配置 ==================== -SERVER_IP = "www.shelingxingqiu.com" +SERVER_IP = "stcp.shelingxingqiu.com" +# SERVER_IP = "www.shelingxingqiu.com" SERVER_PORT = 50005 HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒) @@ -20,6 +21,23 @@ HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒) HTTP_URL = "http://ws.shelingxingqiu.com" HTTP_API_PATH = "/home/shoot/device_fire/arrow/fire" +# config.py 里新增(建议放到“服务器配置”附近) + +# ===== TCP over SSL(TLS) 配置 ===== +USE_TCP_SSL = True # True=按手册走 MSSLCFG/MIPCFG 绑定 SSL +TCP_LINK_ID = 1 # +TCP_SSL_PORT = 443 # TLS 端口(不一定必须 443,以服务器为准) + +# SSL profile +SSL_ID = 1 # ssl_id=1 +SSL_AUTH_MODE = 1 # 1=单向认证(验证服务器),2=双向 +SSL_VERIFY_MODE = 1 # 1=写入并使用 CA 证书;0=不验(仅测试加密,风险大) + +SSL_CERT_FILENAME = "test.cer" # 模组里证书名(MSSLCERTWR / MSSLCFG="cert" 用) +SSL_CERT_PATH = "/root/test.cer" # 设备文件系统里 CA 证书路径(你自己放进去) +# MIPOPEN 末尾的参数在不同固件里含义可能不同;按你手册例子保留 +MIPOPEN_TAIL = ",,0" + # ==================== 文件路径配置 ==================== CONFIG_FILE = "/root/laser_config.json" LOG_FILE = "/maixapp/apps/t11/app.log" diff --git a/cpp_ext/CMakeLists.txt b/cpp_ext/CMakeLists.txt new file mode 100644 index 0000000..99b0499 --- /dev/null +++ b/cpp_ext/CMakeLists.txt @@ -0,0 +1,34 @@ +cmake_minimum_required(VERSION 3.16) +project(archery_netcore CXX) + +set(CMAKE_SYSTEM_NAME Linux) +set(CMAKE_SYSTEM_PROCESSOR riscv64) +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_POSITION_INDEPENDENT_CODE ON) + +if(NOT DEFINED PY_INCLUDE_DIR) + message(FATAL_ERROR "PY_INCLUDE_DIR not set") +endif() +if(NOT DEFINED PY_LIB) + message(FATAL_ERROR "PY_LIB not set") +endif() +if(NOT DEFINED PY_EXT_SUFFIX) + message(FATAL_ERROR "PY_EXT_SUFFIX not set") +endif() +if(NOT DEFINED MAIXCDK_PATH) + message(FATAL_ERROR "MAIXCDK_PATH not set (need components/3rd_party/pybind11)") +endif() + +add_library(archery_netcore MODULE archery_netcore.cpp) + +target_include_directories(archery_netcore PRIVATE + "${PY_INCLUDE_DIR}" + "${MAIXCDK_PATH}/components/3rd_party/pybind11/pybind11/include" +) + +set_target_properties(archery_netcore PROPERTIES + PREFIX "" + SUFFIX "${PY_EXT_SUFFIX}" +) + +target_link_libraries(archery_netcore PRIVATE "${PY_LIB}") \ No newline at end of file diff --git a/cpp_ext/archery_netcore.cpp b/cpp_ext/archery_netcore.cpp new file mode 100644 index 0000000..627e037 --- /dev/null +++ b/cpp_ext/archery_netcore.cpp @@ -0,0 +1,12 @@ +#include + +namespace py = pybind11; + +static const char* kServerIp = "stcp.shelingxingqiu.com"; + +PYBIND11_MODULE(archery_netcore, m) { + m.doc() = "Archery net core (native, pybind11)."; + m.def("server_ip", []() { + return py::str(kServerIp); + }); +} diff --git a/laser_manager.py b/laser_manager.py index bf35459..f7fc114 100644 --- a/laser_manager.py +++ b/laser_manager.py @@ -39,6 +39,11 @@ class LaserManager: # ==================== 状态访问(只读属性)==================== + @property + def logger(self): + """获取 logger 对象""" + return logger_manager.logger + @property def calibration_active(self): """是否正在校准""" @@ -69,9 +74,7 @@ class LaserManager: if config.HARDCODE_LASER_POINT: # 硬编码模式:直接使用硬编码值 self._laser_point = config.HARDCODE_LASER_POINT_VALUE - logger = logger_manager.logger - if logger: - logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}") + self.logger.info(f"[LASER] 使用硬编码激光点: {self._laser_point}") return self._laser_point # 正常模式:从配置文件加载 @@ -81,9 +84,7 @@ class LaserManager: data = json.load(f) if isinstance(data, list) and len(data) == 2: self._laser_point = (int(data[0]), int(data[1])) - logger = logger_manager.logger - if logger: - logger.debug(f"[INFO] 加载激光点: {self._laser_point}") + self.logger.debug(f"[INFO] 加载激光点: {self._laser_point}") return self._laser_point else: raise ValueError @@ -101,9 +102,7 @@ class LaserManager: if config.HARDCODE_LASER_POINT: # 硬编码模式:不保存到文件,但更新内存中的值(虽然不会被使用) self._laser_point = point - logger = logger_manager.logger - if logger: - logger.info(f"[LASER] 硬编码模式已启用,跳过保存激光点: {point}") + self.logger.info(f"[LASER] 硬编码模式已启用,跳过保存激光点: {point}") return True # 正常模式:保存到配置文件 @@ -113,24 +112,19 @@ class LaserManager: self._laser_point = point return True except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[LASER] 保存激光点失败: {e}") + self.logger.error(f"[LASER] 保存激光点失败: {e}") return False def turn_on_laser(self): """发送指令开启激光,并读取回包(部分模块支持)""" from hardware import hardware_manager - logger = logger_manager.logger if hardware_manager.distance_serial is None: - if logger: - logger.error("[LASER] distance_serial 未初始化") + self.logger.error("[LASER] distance_serial 未初始化") return None # 打印调试信息 - if logger: - logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}") + self.logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}") # 清空接收缓冲区 try: @@ -140,8 +134,7 @@ class LaserManager: # 发送命令 written = hardware_manager.distance_serial.write(config.LASER_ON_CMD) - if logger: - logger.info(f"[LASER] 写入字节数: {written}") + self.logger.info(f"[LASER] 写入字节数: {written}") # return None @@ -149,29 +142,23 @@ class LaserManager: # 读取回包 resp = hardware_manager.distance_serial.read(len=20,timeout=10) if resp: - if logger: - logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") + self.logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") if resp == config.LASER_ON_CMD: - if logger: - logger.info("✅ 激光开启指令已确认") + self.logger.info("✅ 激光开启指令已确认") else: - if logger: - logger.warning("🔇 无回包(可能正常或模块不支持回包)") + self.logger.warning("🔇 无回包(可能正常或模块不支持回包)") return resp def turn_off_laser(self): """发送指令关闭激光""" from hardware import hardware_manager - logger = logger_manager.logger if hardware_manager.distance_serial is None: - if logger: - logger.error("[LASER] distance_serial 未初始化") + self.logger.error("[LASER] distance_serial 未初始化") return None # 打印调试信息 - if logger: - logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}") + self.logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}") # 清空接收缓冲区 try: @@ -181,18 +168,15 @@ class LaserManager: # 发送命令 written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD) - if logger: - logger.info(f"[LASER] 写入字节数: {written}") + self.logger.info(f"[LASER] 写入字节数: {written}") # 读取回包 resp = hardware_manager.distance_serial.read(20) if resp: - if logger: - logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") + self.logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") else: - if logger: - logger.warning("🔇 无回包") + self.logger.warning("🔇 无回包") return resp # 不用读回包 # return None @@ -204,9 +188,7 @@ class LaserManager: time.sleep_ms(duration_ms) self.turn_off_laser() except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"闪激光失败: {e}") + self.logger.error(f"闪激光失败: {e}") # def find_red_laser(self, frame, threshold=150): # """在图像中查找最亮的红色激光点(基于 RGB 阈值)""" @@ -349,7 +331,7 @@ class LaserManager: import cv2 import numpy as np from maix import image - logger_manager.logger.debug(f"find_red_laser_with_ellipse start: {time.ticks_ms()}") + self.logger.debug(f"find_red_laser_with_ellipse start: {time.ticks_ms()}") # 使用配置项 if threshold is None: threshold = config.LASER_DETECTION_THRESHOLD @@ -360,7 +342,6 @@ class LaserManager: overexposed_threshold = config.LASER_OVEREXPOSED_THRESHOLD overexposed_diff = config.LASER_OVEREXPOSED_DIFF - logger = logger_manager.logger w, h = frame.width(), frame.height() center_x, center_y = w // 2, h // 2 @@ -376,8 +357,7 @@ class LaserManager: # 提取ROI区域(只处理搜索区域,而不是整个图像) roi = img_cv[y_min:y_max, x_min:x_max] if roi.size == 0: - if logger: - logger.debug("[LASER] ROI区域为空") + self.logger.debug("[LASER] ROI区域为空") return None # 分离RGB通道(向量化操作,比循环快得多) @@ -408,15 +388,14 @@ class LaserManager: # 转换为uint8格式 mask_roi = mask_combined.astype(np.uint8) * 255 - logger_manager.logger.debug(f"ellipse fitting start: {time.ticks_ms()}") + self.logger.debug(f"ellipse fitting start: {time.ticks_ms()}") # 查找轮廓(只在搜索区域内) contours, _ = cv2.findContours(mask_roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - logger_manager.logger.debug(f"ellipse fitting end: {time.ticks_ms()}") + self.logger.debug(f"ellipse fitting end: {time.ticks_ms()}") if not contours: - if logger: - logger.debug("[LASER] 未找到红色像素区域") + self.logger.debug("[LASER] 未找到红色像素区域") return None - logger_manager.logger.debug(f"ellipse filtering start: {time.ticks_ms()}") + self.logger.debug(f"ellipse filtering start: {time.ticks_ms()}") # 找到最大的轮廓(应该是激光点) largest_contour = max(contours, key=cv2.contourArea) @@ -424,8 +403,7 @@ class LaserManager: area = cv2.contourArea(largest_contour) min_area = config.LASER_MIN_AREA if area < min_area: - if logger: - logger.debug(f"[LASER] 红色区域太小(面积={area:.1f}),可能是噪声(最小={min_area})") + self.logger.debug(f"[LASER] 红色区域太小(面积={area:.1f}),可能是噪声(最小={min_area})") return None # 使用椭圆拟合找到中心 @@ -446,8 +424,7 @@ class LaserManager: (x_outer, y_outer), (width_outer, height_outer), angle_outer = cv2.fitEllipse(contour_global) outer_ellipse_params = ((x_outer, y_outer), (width_outer, height_outer), angle_outer) - if logger: - logger.debug(f"[LASER] 外层红色椭圆拟合成功: 中心=({x_outer:.1f}, {y_outer:.1f}), 尺寸=({width_outer:.1f}, {height_outer:.1f}), 角度={angle_outer:.1f}°, 面积={area:.1f}") + self.logger.debug(f"[LASER] 外层红色椭圆拟合成功: 中心=({x_outer:.1f}, {y_outer:.1f}), 尺寸=({width_outer:.1f}, {height_outer:.1f}), 角度={angle_outer:.1f}°, 面积={area:.1f}") # 第二步:在外层椭圆区域内,找亮度最高的像素 # 创建外层椭圆的掩码 @@ -490,7 +467,7 @@ class LaserManager: (x_inner, y_inner), (width_inner, height_inner), angle_inner = cv2.fitEllipse(largest_brightness_contour) inner_ellipse_params = ((x_inner, y_inner), (width_inner, height_inner), angle_inner) laser_center = (float(x_inner), float(y_inner)) - logger.debug(f"[LASER] 内层亮度椭圆拟合成功: 中心=({x_inner:.1f}, {y_inner:.1f}), 尺寸=({width_inner:.1f}, {height_inner:.1f}), 角度={angle_inner:.1f}°, 面积={brightness_area:.1f}") + self.logger.debug(f"[LASER] 内层亮度椭圆拟合成功: 中心=({x_inner:.1f}, {y_inner:.1f}), 尺寸=({width_inner:.1f}, {height_inner:.1f}), 角度={angle_inner:.1f}°, 面积={brightness_area:.1f}") except Exception as e: # 内层椭圆拟合失败,使用质心 M = cv2.moments(largest_brightness_contour) @@ -498,12 +475,11 @@ class LaserManager: cx = M["m10"] / M["m00"] cy = M["m01"] / M["m00"] laser_center = (float(cx), float(cy)) - logger.debug(f"[LASER] 内层亮度椭圆拟合失败,使用质心: {laser_center}, 错误: {e}") + self.logger.debug(f"[LASER] 内层亮度椭圆拟合失败,使用质心: {laser_center}, 错误: {e}") else: # 质心计算失败,使用外层椭圆中心 laser_center = (float(x_outer), float(y_outer)) - if logger: - logger.debug(f"[LASER] 内层区域质心计算失败,使用外层椭圆中心: {laser_center}") + self.logger.debug(f"[LASER] 内层区域质心计算失败,使用外层椭圆中心: {laser_center}") elif brightness_area >= 1: # 面积太小,使用质心 M = cv2.moments(largest_brightness_contour) @@ -511,26 +487,23 @@ class LaserManager: cx = M["m10"] / M["m00"] cy = M["m01"] / M["m00"] laser_center = (float(cx), float(cy)) - logger.debug(f"[LASER] 内层区域质心计算成功: {laser_center}") + self.logger.debug(f"[LASER] 内层区域质心计算成功: {laser_center}") else: # 质心计算失败,使用外层椭圆中心 laser_center = (float(x_outer), float(y_outer)) - if logger: - logger.debug(f"[LASER] 内层区域质心计算失败,使用外层椭圆中心: {laser_center}") + self.logger.debug(f"[LASER] 内层区域质心计算失败,使用外层椭圆中心: {laser_center}") else: # brightness_area < 1,面积太小,直接使用外层椭圆中心 laser_center = (float(x_outer), float(y_outer)) - if logger: - logger.debug(f"[LASER] 内层亮度区域面积太小({brightness_area:.1f}),使用外层椭圆中心: {laser_center}") + self.logger.debug(f"[LASER] 内层亮度区域面积太小({brightness_area:.1f}),使用外层椭圆中心: {laser_center}") else: # 没有找到亮度轮廓,使用外层椭圆中心 laser_center = (float(x_outer), float(y_outer)) - logger.debug(f"[LASER] 外层椭圆区域内无有效亮度值,使用外层椭圆中心: {laser_center}") + self.logger.debug(f"[LASER] 外层椭圆区域内无有效亮度值,使用外层椭圆中心: {laser_center}") else: # 没有亮度值,使用外层椭圆中心 laser_center = (float(x_outer), float(y_outer)) - if logger: - logger.debug(f"[LASER] 外层椭圆区域内无有效亮度值,使用外层椭圆中心: {laser_center}") + self.logger.debug(f"[LASER] 外层椭圆区域内无有效亮度值,使用外层椭圆中心: {laser_center}") # 如果启用绘制椭圆,在图像上绘制 if config.LASER_DRAW_ELLIPSE: @@ -568,11 +541,10 @@ class LaserManager: from maix import image self._last_frame_with_ellipse = image.cv2image(img_cv, False, False) - if logger: - if inner_ellipse_params: - logger.debug(f"[LASER] 已绘制双层椭圆: 外层(绿色)中心=({cx_outer}, {cy_outer}), 内层(黄色)中心=({cx_inner}, {cy_inner})") - else: - logger.debug(f"[LASER] 已绘制外层椭圆: 中心=({cx_outer}, {cy_outer})") + if inner_ellipse_params: + self.logger.debug(f"[LASER] 已绘制双层椭圆: 外层(绿色)中心=({cx_outer}, {cy_outer}), 内层(黄色)中心=({cx_inner}, {cy_inner})") + else: + self.logger.debug(f"[LASER] 已绘制外层椭圆: 中心=({cx_outer}, {cy_outer})") except Exception as e: laser_ellipse_params = None # 椭圆拟合失败,使用质心 @@ -581,8 +553,8 @@ class LaserManager: cx = M["m10"] / M["m00"] cy = M["m01"] / M["m00"] laser_center = (float(cx), float(cy)) - logger.debug(f"[LASER] 椭圆拟合失败,使用质心: {laser_center}, 错误: {e}") - logger_manager.logger.debug(f"ellipse filtering start: {time.ticks_ms()}") + self.logger.debug(f"[LASER] 椭圆拟合失败,使用质心: {laser_center}, 错误: {e}") + self.logger.debug(f"ellipse filtering start: {time.ticks_ms()}") else: # 点太少,使用质心 contour_global = largest_contour.copy() @@ -595,8 +567,7 @@ class LaserManager: cx = M["m10"] / M["m00"] cy = M["m01"] / M["m00"] laser_center = (float(cx), float(cy)) - if logger: - logger.debug(f"[LASER] 点太少({len(largest_contour)}个),使用质心: {laser_center}") + self.logger.debug(f"[LASER] 点太少({len(largest_contour)}个),使用质心: {laser_center}") if laser_center is None: # 清除之前保存的椭圆图像 @@ -611,8 +582,7 @@ class LaserManager: max_distance = config.LASER_MAX_DISTANCE_FROM_CENTER if distance_from_center_final > max_distance: - if logger: - logger.warning(f"[LASER] 激光点距离中心太远: 位置={laser_center}, " + self.logger.warning(f"[LASER] 激光点距离中心太远: 位置={laser_center}, " f"距离中心={distance_from_center_final:.1f}像素, " f"最大允许距离={max_distance}像素") return None @@ -620,22 +590,20 @@ class LaserManager: # 检查是否在黄心椭圆范围内(如果启用) if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: if not self._is_point_in_ellipse(laser_center, ellipse_params): - if logger: - (ell_center, (ell_width, ell_height), ell_angle) = ellipse_params - logger.warning(f"[LASER] 激光点不在黄心椭圆内: 位置={laser_center}, " + (ell_center, (ell_width, ell_height), ell_angle) = ellipse_params + self.logger.warning(f"[LASER] 激光点不在黄心椭圆内: 位置={laser_center}, " f"椭圆中心={ell_center}, 椭圆尺寸=({ell_width:.1f}, {ell_height:.1f})") return None - if logger: - ellipse_info = "" - if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: - ellipse_info = f", 椭圆内检查: 通过" - elif not config.LASER_REQUIRE_IN_ELLIPSE: - ellipse_info = f", 椭圆检查: 已禁用" - logger.debug(f"[LASER] 找到激光点(椭圆拟合): 位置={laser_center}, " + ellipse_info = "" + if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: + ellipse_info = f", 椭圆内检查: 通过" + elif not config.LASER_REQUIRE_IN_ELLIPSE: + ellipse_info = f", 椭圆检查: 已禁用" + self.logger.debug(f"[LASER] 找到激光点(椭圆拟合): 位置={laser_center}, " f"距离中心={distance_from_center_final:.1f}像素{ellipse_info}") - if config.LASER_DRAW_ELLIPSE and self._last_frame_with_ellipse is not None: - logger.debug(f"[LASER] 已保存绘制了椭圆的图像,可通过 get_last_frame_with_ellipse() 获取") + if config.LASER_DRAW_ELLIPSE and self._last_frame_with_ellipse is not None: + self.logger.debug(f"[LASER] 已保存绘制了椭圆的图像,可通过 get_last_frame_with_ellipse() 获取") return laser_center @@ -665,7 +633,6 @@ class LaserManager: overexposed_threshold = config.LASER_OVEREXPOSED_THRESHOLD overexposed_diff = config.LASER_OVEREXPOSED_DIFF - logger = logger_manager.logger w, h = frame.width(), frame.height() center_x, center_y = w // 2, h // 2 @@ -731,13 +698,12 @@ class LaserManager: # 如果没有找到候选点,输出调试信息 if candidate_pos is None: - if logger: - if best_near_red: - logger.debug(f"[LASER] 未找到激光点,最接近的点: 位置={best_near_red}, RGB={best_near_red_rgb}, " + if best_near_red: + self.logger.debug(f"[LASER] 未找到激光点,最接近的点: 位置={best_near_red}, RGB={best_near_red_rgb}, " f"阈值={threshold}, 倍数要求={red_ratio}, r/g={best_near_red_rgb[0]/(best_near_red_rgb[1]+1):.2f}, " f"r/b={best_near_red_rgb[0]/(best_near_red_rgb[2]+1):.2f}") - else: - logger.debug(f"[LASER] 未找到激光点,搜索区域: ({x_min}, {y_min}) 到 ({x_max}, {y_max}), " + else: + self.logger.debug(f"[LASER] 未找到激光点,搜索区域: ({x_min}, {y_min}) 到 ({x_max}, {y_max}), " f"阈值={threshold}, 倍数要求={red_ratio}") return None @@ -790,8 +756,7 @@ class LaserManager: max_distance = config.LASER_MAX_DISTANCE_FROM_CENTER if distance_from_center_final > max_distance: # 距离中心太远,拒绝这个结果 - if logger: - logger.warning(f"[LASER] 找到的激光点距离中心太远: 位置={best_pos}, " + self.logger.warning(f"[LASER] 找到的激光点距离中心太远: 位置={best_pos}, " f"距离中心={distance_from_center_final:.1f}像素, " f"最大允许距离={max_distance}像素, 拒绝此结果") return None @@ -800,21 +765,19 @@ class LaserManager: if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: if not self._is_point_in_ellipse(best_pos, ellipse_params): # 不在椭圆内,拒绝这个结果 - if logger: - (ell_center, (ell_width, ell_height), ell_angle) = ellipse_params - logger.warning(f"[LASER] 找到的激光点不在黄心椭圆内: 位置={best_pos}, " + (ell_center, (ell_width, ell_height), ell_angle) = ellipse_params + self.logger.warning(f"[LASER] 找到的激光点不在黄心椭圆内: 位置={best_pos}, " f"椭圆中心={ell_center}, 椭圆尺寸=({ell_width:.1f}, {ell_height:.1f}), " f"椭圆角度={ell_angle:.1f}°, 拒绝此结果") return None # 输出成功找到激光点的日志 - if logger: - ellipse_info = "" - if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: - ellipse_info = f", 椭圆内检查: 通过" - elif not config.LASER_REQUIRE_IN_ELLIPSE: - ellipse_info = f", 椭圆检查: 已禁用" - logger.debug(f"[LASER] 找到激光点: 位置={best_pos}, RGB={best_rgb}, " + ellipse_info = "" + if config.LASER_REQUIRE_IN_ELLIPSE and ellipse_params is not None: + ellipse_info = f", 椭圆内检查: 通过" + elif not config.LASER_REQUIRE_IN_ELLIPSE: + ellipse_info = f", 椭圆检查: 已禁用" + self.logger.debug(f"[LASER] 找到激光点: 位置={best_pos}, RGB={best_rgb}, " f"亮度={max_brightness}, 距离中心={distance_from_center_final:.1f}像素{ellipse_info}, " f"阈值={threshold}, 倍数要求={red_ratio}") @@ -857,7 +820,6 @@ class LaserManager: import vision from maix import time - logger = logger_manager.logger start = time.ticks_ms() # 注意:使用 abs(time.ticks_diff(start, time.ticks_ms())) 避免负数问题 @@ -875,8 +837,7 @@ class LaserManager: # 只有检测到靶心时才继续处理激光点 if center_temp is None or radius_temp is None: - if logger: - logger.debug(f"[LASER] 未检测到靶心,跳过") + self.logger.debug(f"[LASER] 未检测到靶心,跳过") time.sleep_ms(60) continue @@ -898,15 +859,13 @@ class LaserManager: if laser_pos is None: # 未找到激光点 - if logger: - logger.debug(f"[LASER] 未找到激光点,跳过") + self.logger.debug(f"[LASER] 未找到激光点,跳过") time.sleep_ms(60) continue if not is_sharp: # 激光点模糊 - if logger: - logger.debug(f"[LASER] 激光点模糊(清晰度: {sharpness_score:.2f}),跳过") + self.logger.debug(f"[LASER] 激光点模糊(清晰度: {sharpness_score:.2f}),跳过") time.sleep_ms(60) continue @@ -914,8 +873,7 @@ class LaserManager: pos = laser_pos except Exception as e: - if logger: - logger.warning(f"[LASER] 激光点清晰度检测失败: {e},继续处理") + self.logger.warning(f"[LASER] 激光点清晰度检测失败: {e},继续处理") # 检测失败时,回退到原来的方法:直接查找激光点 # 仅在启用椭圆检查时传入椭圆参数 pos = self.find_red_laser(frame, ellipse_params=ellipse_params_temp if config.LASER_REQUIRE_IN_ELLIPSE else None) @@ -943,37 +901,33 @@ class LaserManager: vision.save_calibration_image(frame_to_save, pos) except Exception as e: - if logger: - logger.error(f"[LASER] 保存校准图像失败: {e}") + self.logger.error(f"[LASER] 保存校准图像失败: {e}") # 设置结果、停止校准、保存坐标 self.set_calibration_result(pos) self.stop_calibration() self.save_laser_point(pos) - if logger: - if sharpness_score is not None: - logger.info(f"✅ 校准成功: {pos} (清晰度: {sharpness_score:.2f}, 靶心: {center_temp}, 半径: {radius_temp})") - else: - logger.info(f"✅ 校准成功: {pos} (靶心: {center_temp}, 半径: {radius_temp})") + if sharpness_score is not None: + self.logger.info(f"✅ 校准成功: {pos} (清晰度: {sharpness_score:.2f}, 靶心: {center_temp}, 半径: {radius_temp})") + else: + self.logger.info(f"✅ 校准成功: {pos} (靶心: {center_temp}, 半径: {radius_temp})") return pos # 未找到激光点,继续循环 time.sleep_ms(60) except Exception as e: - if logger: - logger.error(f"[LASER] 校准过程异常: {e}") - import traceback - logger.error(traceback.format_exc()) + self.logger.error(f"[LASER] 校准过程异常: {e}") + import traceback + self.logger.error(traceback.format_exc()) time.sleep_ms(200) # 超时或校准被停止 - if logger: - if self._calibration_active: - logger.warning(f"[LASER] 校准超时({timeout_ms}ms)") - else: - logger.info("[LASER] 校准已停止") + if self._calibration_active: + self.logger.warning(f"[LASER] 校准超时({timeout_ms}ms)") + else: + self.logger.info("[LASER] 校准已停止") return None @@ -1012,9 +966,7 @@ class LaserManager: distance_int = int(hex_string) return distance_int / 1000.0 except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[LASER] BCD 解析失败: {e}") + self.logger.error(f"[LASER] BCD 解析失败: {e}") return 0.0 def read_distance_from_laser_sensor(self): @@ -1022,11 +974,8 @@ class LaserManager: 返回: (distance_m, signal_quality) 元组,失败返回 (0.0, 0) """ from hardware import hardware_manager - logger = logger_manager.logger - if hardware_manager.distance_serial is None: - if logger: - logger.error("[LASER] distance_serial 未初始化") + self.logger.error("[LASER] distance_serial 未初始化") return (0.0, 0) try: @@ -1057,8 +1006,7 @@ class LaserManager: # 注意:使用 time.ticks_diff(start_time, time.ticks_ms()) 避免负数问题 elapsed_ms = abs(time.ticks_diff(start_time, time.ticks_ms())) if elapsed_ms >= max_wait_ms: - if logger: - logger.warning(f"[LASER] 读取超时 ({elapsed_ms}ms),未收到完整响应") + self.logger.warning(f"[LASER] 读取超时 ({elapsed_ms}ms),未收到完整响应") return (0.0, 0) # 尝试读取数据 @@ -1067,8 +1015,7 @@ class LaserManager: # 如果读到完整数据,立即返回 if response and len(response) == config.DISTANCE_RESPONSE_LEN: elapsed_ms = abs(time.ticks_diff(start_time, time.ticks_ms())) - if logger: - logger.debug(f"[LASER] 收到响应 ({elapsed_ms}ms)") + self.logger.debug(f"[LASER] 收到响应 ({elapsed_ms}ms)") break # 如果还没超时,短暂等待后继续尝试 @@ -1079,8 +1026,7 @@ class LaserManager: if response[3] != 0x20: if response[0] == 0xEE: err_code = (response[7] << 8) | response[8] - if logger: - logger.warning(f"[LASER] 模块错误代码: {hex(err_code)}") + self.logger.warning(f"[LASER] 模块错误代码: {hex(err_code)}") return (0.0, 0) # 解析BCD码距离 @@ -1088,17 +1034,13 @@ class LaserManager: distance_value_m = self.parse_bcd_distance(bcd_bytes) signal_quality = (response[10] << 8) | response[11] - if logger: - logger.debug(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}") + self.logger.debug(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}") return (distance_value_m, signal_quality) - if logger: - logger.warning(f"[LASER] 无效响应: {response.hex() if response else 'None'}") + self.logger.warning(f"[LASER] 无效响应: {response.hex() if response else 'None'}") return (0.0, 0) except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[LASER] 读取激光测距失败: {e}") + self.logger.error(f"[LASER] 读取激光测距失败: {e}") return (0.0, 0) def calculate_laser_point_from_distance(self, distance_m): @@ -1126,9 +1068,7 @@ class LaserManager: laser_x = center_x laser_y = center_y + int(pixel_offset_y) - logger = logger_manager.logger - if logger: - logger.debug(f"[LASER] 根据距离 {distance_m:.2f}m 计算激光点: ({laser_x}, {laser_y}), 像素偏移: {pixel_offset_y:.2f}") + self.logger.debug(f"[LASER] 根据距离 {distance_m:.2f}m 计算激光点: ({laser_x}, {laser_y}), 像素偏移: {pixel_offset_y:.2f}") return (laser_x, laser_y) @@ -1166,10 +1106,10 @@ class LaserManager: lx, ly = laser_point # r = 22.16 * 5 r = radius * 5 - logger_manager.logger.debug(f"compute_laser_position: circle_center: {circle_center} laser_point: {laser_point} radius: {radius} method: {method} r: {r}") + self.logger.debug(f"compute_laser_position: circle_center: {circle_center} laser_point: {laser_point} radius: {radius} method: {method} r: {r}") target_x = (lx-cx)/r*100 target_y = (ly-cy)/r*100 - logger_manager.logger.info(f"lx:{lx} ly: {ly} cx: {cx} cy: {cy} result_x: {target_x} result_y: {-target_y} real_r_x: {lx-cx} real_r_y: {-1*(ly-cy)}") + self.logger.info(f"lx:{lx} ly: {ly} cx: {cx} cy: {cy} result_x: {target_x} result_y: {-target_y} real_r_x: {lx-cx} real_r_y: {-1*(ly-cy)}") return (target_x, -target_y) # # 根据检测方法动态调整靶心物理半径(简化模型) @@ -1184,7 +1124,6 @@ class LaserManager: 激光开启时间最小化(约500-600ms),尽量不让用户觉察到 返回: (distance_m, signal_quality) 元组,失败返回 (0.0, 0) """ - logger = logger_manager.logger self._laser_turned_on = False try: @@ -1197,8 +1136,7 @@ class LaserManager: result = self.read_distance_from_laser_sensor() return result except Exception as e: - if logger: - logger.error(f"[LASER] 快速测距异常: {e}") + self.logger.error(f"[LASER] 快速测距异常: {e}") return (0.0, 0) finally: # 确保激光关闭 @@ -1206,8 +1144,7 @@ class LaserManager: try: self.turn_off_laser() except Exception as e: - if logger: - logger.error(f"[LASER] 关闭激光失败: {e}") + self.logger.error(f"[LASER] 关闭激光失败: {e}") # 创建全局单例实例 diff --git a/main.py b/main.py index a8f4e27..820ecde 100644 --- a/main.py +++ b/main.py @@ -169,10 +169,9 @@ def cmd_str(): # 3. 初始化 INA226 电量监测芯片 init_ina226() - # 4. 加载激光点配置 - laser_manager.load_laser_point() + - # 5. 初始化显示和相机 + # 4. 初始化显示和相机 camera_manager.init_camera(640, 480) camera_manager.init_display() @@ -275,6 +274,9 @@ def cmd_str(): _thread.start_new_thread(network_manager.tcp_main, ()) _thread.start_new_thread(laser_calibration_worker, ()) + # 7. 加载激光点配置 + laser_manager.load_laser_point() + if logger: logger.info("系统准备完成...") diff --git a/network.py b/network.py index 42f1722..44ed8b5 100644 --- a/network.py +++ b/network.py @@ -55,11 +55,15 @@ class NetworkManager: self._wifi_socket_lock = threading.Lock() self._prefer_wifi = True # 是否优先使用WiFi self._wifi_recv_buffer = b"" # WiFi接收缓冲区 - self._initialized = True # ==================== 状态访问(只读属性)==================== + @property + def logger(self): + """获取 logger 对象""" + return logger_manager.logger + @property def tcp_connected(self): """TCP连接状态""" @@ -162,17 +166,13 @@ class NetworkManager: with open("/device_key", "r") as f: device_id = f.read().strip() if device_id: - logger = logger_manager.logger - if logger: - logger.debug(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}") + self.logger.debug(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}") # 设置内部状态 self._device_id = device_id self._password = device_id + "." return device_id except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[ERROR] 无法读取 /device_key: {e}") + self.logger.error(f"[ERROR] 无法读取 /device_key: {e}") # 使用默认值 default_id = "DEFAULT_DEVICE_ID" @@ -246,18 +246,14 @@ class NetworkManager: if ip: self._wifi_connected = True self._wifi_ip = ip - logger = logger_manager.logger - if logger: - logger.info(f"[WIFI] 已连接,IP: {ip}") + self.logger.info(f"[WIFI] 已连接,IP: {ip}") return ip, None std_time.sleep(1) return None, "Timeout: No IP obtained" except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[WIFI] 连接失败: {e}") + self.logger.error(f"[WIFI] 连接失败: {e}") return None, f"Exception: {str(e)}" def is_server_reachable(self, host, port=80, timeout=5): @@ -270,9 +266,7 @@ class NetworkManager: s.close() return True except Exception as e: - logger = logger_manager.logger - if logger: - logger.warning(f"[NET] 无法连接 {host}:{port} - {e}") + self.logger.warning(f"[NET] 无法连接 {host}:{port} - {e}") return False # ==================== 网络选择策略 ==================== @@ -290,31 +284,25 @@ class NetworkManager: if prefer_wifi is None: prefer_wifi = self._prefer_wifi - logger = logger_manager.logger - # 策略1:如果指定优先WiFi,且WiFi可用,使用WiFi if prefer_wifi and self.is_wifi_connected(): # 检查WiFi是否能连接到服务器 if self.is_server_reachable(config.SERVER_IP, config.SERVER_PORT, timeout=3): self._network_type = "wifi" - if logger: - logger.info(f"[NET] 选择WiFi网络,IP: {self._wifi_ip}") + self.logger.info(f"[NET] 选择WiFi网络,IP: {self._wifi_ip}") return "wifi" else: - if logger: - logger.warning("[NET] WiFi已连接但无法访问服务器,尝试4G") + self.logger.warning("[NET] WiFi已连接但无法访问服务器,尝试4G") # 策略2:如果WiFi可用,使用WiFi if self.is_wifi_connected(): if self.is_server_reachable(config.SERVER_IP, config.SERVER_PORT, timeout=3): self._network_type = "wifi" - if logger: - logger.info(f"[NET] 选择WiFi网络,IP: {self._wifi_ip}") + self.logger.info(f"[NET] 选择WiFi网络,IP: {self._wifi_ip}") return "wifi" # 策略3:回退到4G - if logger: - logger.info("[NET] WiFi不可用或无法连接服务器,使用4G网络") + self.logger.info("[NET] WiFi不可用或无法连接服务器,使用4G网络") self._network_type = "4g" return "4g" @@ -362,9 +350,7 @@ class NetworkManager: if not network_type: return False - logger = logger_manager.logger - if logger: - logger.info(f"连接到服务器(使用{network_type.upper()})...") + self.logger.info(f"连接到服务器,使用{network_type.upper()}...") # 根据网络类型建立TCP连接 if network_type == "wifi": @@ -389,15 +375,11 @@ class NetworkManager: self._wifi_socket.setblocking(False) self._tcp_connected = True - logger = logger_manager.logger - if logger: - logger.info("[WIFI-TCP] TCP连接已建立") + self.logger.info("[WIFI-TCP] TCP连接已建立") return True except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[WIFI-TCP] 连接失败: {e}") + self.logger.error(f"[WIFI-TCP] 连接失败: {e}") if self._wifi_socket: try: @@ -409,16 +391,30 @@ class NetworkManager: return False def _connect_tcp_via_4g(self): - """通过4G模块建立TCP连接""" + """通过4G模块建立TCP连接(支持按手册绑定 SSL)""" + link_id = getattr(config, "TCP_LINK_ID", 0) + use_ssl = getattr(config, "USE_TCP_SSL", False) + + host = config.SERVER_IP + port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT + tail = getattr(config, "MIPOPEN_TAIL", "") with self.get_uart_lock(): - hardware_manager.at_client.send("AT+MIPCLOSE=0", "OK", 1000) - res = hardware_manager.at_client.send(f'AT+MIPOPEN=0,"TCP","{config.SERVER_IP}",{config.SERVER_PORT}', "+MIPOPEN", 8000) - if "+MIPOPEN: 0,0" in res: + resp = hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000) + self.logger.info(f"[4G-TCP] AT+MIPCLOSE={link_id} response: {resp}") + + if use_ssl: + ok = self._configure_ssl_before_connect(link_id) + if not ok: + return False + + # 按手册:AT+MIPOPEN=1,"TCP","host",443,,0 + # cmd = f'AT+MIPOPEN={link_id},"TCP","{host}",{port}{tail}' + cmd = f'AT+MIPOPEN={link_id},"TCP","{host}",{port}' + res = hardware_manager.at_client.send(cmd, "+MIPOPEN", 8000) + self.logger.info(f"[4G-TCP] {cmd} response: {res}") + if f"+MIPOPEN: {link_id},0" in res: self._tcp_connected = True return True - else: - logger = logger_manager.logger - logger.error(f"[4G-TCP] 连接失败: {res}") return False def _check_wifi_connection(self): @@ -442,9 +438,7 @@ class NetworkManager: def disconnect_server(self): """断开TCP连接""" if self._tcp_connected: - logger = logger_manager.logger - if logger: - logger.info("与服务器断开链接") + self.logger.info("与服务器断开链接") if self._network_type == "wifi": self._disconnect_tcp_via_wifi() @@ -465,9 +459,10 @@ class NetworkManager: self._wifi_socket = None def _disconnect_tcp_via_4g(self): - """断开4G TCP连接""" + link_id = getattr(config, "TCP_LINK_ID", 0) with self.get_uart_lock(): - hardware_manager.at_client.send("AT+MIPCLOSE=0", "OK", 1000) + hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000) + def tcp_send_raw(self, data: bytes, max_retries=2) -> bool: """ @@ -489,9 +484,7 @@ class NetworkManager: elif self._network_type == "4g": return self._tcp_send_raw_via_4g(data, max_retries) else: - logger = logger_manager.logger - if logger: - logger.error("[NET] 未选择网络类型,无法发送数据") + self.logger.error("[NET] 未选择网络类型,无法发送数据") return False def _tcp_send_raw_via_wifi(self, data: bytes, max_retries=2) -> bool: @@ -508,9 +501,7 @@ class NetworkManager: sent = self._wifi_socket.send(data[total_sent:]) if sent == 0: # socket连接已断开 - logger = logger_manager.logger - if logger: - logger.warning(f"[WIFI-TCP] 发送失败,socket已断开(尝试 {attempt+1}/{max_retries})") + self.logger.warning(f"[WIFI-TCP] 发送失败,socket已断开(尝试 {attempt+1}/{max_retries})") break total_sent += sent @@ -521,23 +512,19 @@ class NetworkManager: time.sleep_ms(50) except OSError as e: - logger = logger_manager.logger - if logger: - logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt+1}/{max_retries})") + self.logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt+1}/{max_retries})") time.sleep_ms(50) except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt+1}/{max_retries})") + self.logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt+1}/{max_retries})") time.sleep_ms(50) return False def _tcp_send_raw_via_4g(self, data: bytes, max_retries=2) -> bool: - """通过4G模块发送TCP数据""" + link_id = getattr(config, "TCP_LINK_ID", 0) with self.get_uart_lock(): for _ in range(max_retries): - cmd = f'AT+MIPSEND=0,{len(data)}' + cmd = f'AT+MIPSEND={link_id},{len(data)}' if ">" not in hardware_manager.at_client.send(cmd, ">", 2000): time.sleep_ms(50) continue @@ -551,15 +538,62 @@ class NetworkManager: total += n hardware_manager.uart4g.write(b"\x1A") - r = hardware_manager.at_client.send("", "OK", 8000) if ("SEND OK" in r) or ("OK" in r) or ("+MIPSEND" in r): return True - time.sleep_ms(50) - return False - + + def _configure_ssl_before_connect(self, link_id: int) -> bool: + """按手册:MSSLCFG(auth) -> (可选) MSSLCERTWR -> MSSLCFG(cert) -> MIPCFG(ssl)""" + ssl_id = getattr(config, "SSL_ID", 1) + auth_mode = getattr(config, "SSL_AUTH_MODE", 1) + verify_mode = getattr(config, "SSL_VERIFY_MODE", 0) + + # 1) 配置认证方式 + # r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},{auth_mode}', "OK", 3000) + r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},0', "OK", 3000) + self.logger.info(f"[4G-TCP] AT+MSSLCFG=\"auth\",{ssl_id},0 response: {r}") + if "OK" not in r: + return False + + # 2) 写入根证书(只有 verify_mode=1 才需要) + # if verify_mode == 1: + if False: + cert_filename = getattr(config, "SSL_CERT_FILENAME", None) + cert_path = getattr(config, "SSL_CERT_PATH", None) + if not cert_filename or not cert_path: + return False + + # 读取证书文件(设备侧路径) + with open(cert_path, "rb") as f: + cert_data = f.read() + + # 按手册:AT+MSSLCERTWR="file",0,size -> 等待 ">" -> 写入证书内容 -> 等 OK + r = hardware_manager.at_client.send(f'AT+MSSLCERTWR="{cert_filename}",0,{len(cert_data)}', ">", 5000) + if ">" not in r: + return False + + # 直接写原始字节,不要额外拼 \r\n + hardware_manager.uart4g.write(cert_data) + + r = hardware_manager.at_client.send("", "OK", 8000) + if "OK" not in r: + return False + + # 3) 引用根证书 + r = hardware_manager.at_client.send(f'AT+MSSLCFG="cert",{ssl_id},"{cert_filename}"', "OK", 3000) + if "OK" not in r: + return False + + # 4) 绑定 TCP 通道到 ssl_id,并启用 + r = hardware_manager.at_client.send(f'AT+MIPCFG="ssl",{link_id},{ssl_id},1', "OK", 3000) + self.logger.info(f"[4G-TCP] AT+MIPCFG=\"ssl\",{link_id},{ssl_id},1 response: {r}") + if "OK" not in r: + return False + + return True + def receive_tcp_data_via_wifi(self, timeout_ms=100): """ 通过WiFi接收TCP数据 @@ -586,9 +620,7 @@ class NetworkManager: return b"" except OSError as e: # socket错误(连接断开等) - logger = logger_manager.logger - if logger: - logger.warning(f"[WIFI-TCP] 接收数据失败: {e}") + self.logger.warning(f"[WIFI-TCP] 接收数据失败: {e}") # 关闭socket try: @@ -600,9 +632,7 @@ class NetworkManager: return b"" except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[WIFI-TCP] 接收数据异常: {e}") + self.logger.error(f"[WIFI-TCP] 接收数据异常: {e}") return b"" @@ -615,9 +645,7 @@ class NetworkManager: def send_http_cmd(self, cmd_str, timeout_ms=3000): """发送 HTTP 相关 AT 指令(调试用)""" - logger = logger_manager.logger - if logger: - logger.debug(f"[HTTP AT] => {cmd_str}") + self.logger.debug(f"[HTTP AT] => {cmd_str}") return hardware_manager.at_client.send(cmd_str, "OK", timeout_ms) @@ -644,9 +672,7 @@ class NetworkManager: import _thread from maix import camera - logger = logger_manager.logger - if logger: - logger.info("[NET] TCP主线程启动") + self.logger.info("[NET] TCP主线程启动") send_hartbeat_fail_count = 0 last_charging_check = 0 @@ -666,9 +692,7 @@ class NetworkManager: time.sleep_ms(200) continue except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[NET] OTA检查异常: {e}") + self.logger.error(f"[NET] OTA检查异常: {e}") time.sleep_ms(200) continue @@ -690,8 +714,7 @@ class NetworkManager: time.sleep_ms(2000) continue - if logger: - logger.info("➡️ 登录包已发送,等待确认...") + self.logger.info("➡️ 登录包已发送,等待确认...") logged_in = False pending_cleared = False last_heartbeat_ack_time = time.ticks_ms() @@ -705,9 +728,7 @@ class NetworkManager: time.sleep_ms(200) continue except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[NET] OTA检查异常: {e}") + self.logger.error(f"[NET] OTA检查异常: {e}") time.sleep_ms(200) continue @@ -752,8 +773,7 @@ class NetworkManager: if not logged_in: try: - if logger: - logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}") + self.logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}") except: pass @@ -764,8 +784,7 @@ class NetworkManager: if body and body.get("cmd") == 1 and body.get("data") == "登录成功": logged_in = True last_heartbeat_ack_time = time.ticks_ms() - if logger: - logger.info("登录成功") + self.logger.info("登录成功") # 检查 ota_pending.json try: @@ -777,19 +796,16 @@ class NetworkManager: except: pending_obj = {} self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) - if logger: - logger.info("[OTA] 已上报 ota_ok,等待心跳确认后删除 pending") + self.logger.info("[OTA] 已上报 ota_ok,等待心跳确认后删除 pending") except Exception as e: - if logger: - logger.error(f"[OTA] ota_ok 上报失败: {e}") + self.logger.error(f"[OTA] ota_ok 上报失败: {e}") else: break # 处理心跳 ACK elif logged_in and msg_type == 4: last_heartbeat_ack_time = time.ticks_ms() - if logger: - logger.debug("✅ 收到心跳确认") + self.logger.debug("✅ 收到心跳确认") # 处理命令40(分片下载) elif logged_in and msg_type == 40: @@ -806,8 +822,7 @@ class NetworkManager: self._raw_line_data.clear() self._raw_line_data.append(body) if len(self._raw_line_data) >= int(t): - if logger: - logger.info(f"下载完成") + self.logger.info(f"下载完成") from ota_manager import ota_manager stock_array = list(map(lambda x: x.get('d'), self._raw_line_data)) local_filename = config.LOCAL_FILENAME @@ -816,8 +831,7 @@ class NetworkManager: ota_manager.apply_ota_and_reboot(None, local_filename) else: self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41}) - if logger: - logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}") + self.logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}") # 处理业务指令 elif logged_in and isinstance(body, dict): @@ -843,8 +857,7 @@ class NetworkManager: battery_percent = voltage_to_percent(voltage) battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} self.safe_enqueue(battery_data, 2) - if logger: - logger.info(f"电量上报: {battery_percent}%") + self.logger.info(f"电量上报: {battery_percent}%") elif inner_cmd == 5: # OTA 升级 inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {} ssid = inner_data.get("ssid") @@ -853,8 +866,7 @@ class NetworkManager: mode = (inner_data.get("mode") or "").strip().lower() if not ota_url: - if logger: - logger.error("ota missing_url") + self.logger.error("ota missing_url") self.safe_enqueue({"result": "missing_url"}, 2) continue @@ -865,17 +877,14 @@ class NetworkManager: # 自动判断模式:如果没有明确指定,根据WiFi连接状态和凭证决定 if mode not in ("4g", "wifi"): - if logger: - logger.info("ota missing mode, auto-detecting...") + self.logger.info("ota missing mode, auto-detecting...") # 只有同时满足:WiFi已连接 且 提供了WiFi凭证,才使用WiFi if self.is_wifi_connected() and ssid and password: mode = "wifi" - if logger: - logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)") + self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)") else: mode = "4g" - if logger: - logger.info("ota auto-selected: 4g (WiFi not available or no credentials)") + self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)") if mode == "4g": ota_manager._set_ota_url(ota_url) # 记录 OTA URL,供命令7使用 @@ -883,8 +892,7 @@ class NetworkManager: _thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,)) else: # mode == "wifi" if not ssid or not password: - if logger: - logger.error("ota wifi mode requires ssid and password") + self.logger.error("ota wifi mode requires ssid and password") self.safe_enqueue({"result": "missing_ssid_or_password"}, 2) else: ota_manager._start_update_thread() @@ -914,20 +922,17 @@ class NetworkManager: # 如果 ota_manager.ota_url 为 None,需要从其他地方获取 ota_url_to_use = ota_manager.ota_url if not ota_url_to_use: - if logger: - logger.error("[OTA] cmd=7 但 OTA_URL 未设置") + self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置") self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2) else: ota_manager._start_update_thread() _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,)) elif inner_cmd == 41: - if logger: - logger.info("[TEST] 收到TCP射箭触发命令") + self.logger.info("[TEST] 收到TCP射箭触发命令") self._manual_trigger_flag = True self.safe_enqueue({"result": "trigger_ack"}, 2) elif inner_cmd == 42: # 关机命令 - if logger: - logger.info("[SHUTDOWN] 收到TCP关机命令,准备关机...") + self.logger.info("[SHUTDOWN] 收到TCP关机命令,准备关机...") self.safe_enqueue({"result": "shutdown_ack"}, 2) time.sleep_ms(1000) self.disconnect_server() @@ -975,22 +980,19 @@ class NetworkManager: if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000: vol_val = get_bus_voltage() if not self.tcp_send_raw(self.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})): - if logger: - logger.error("心跳发送失败") + self.logger.error("心跳发送失败") time.sleep_ms(3000) send_hartbeat_fail_count += 1 if send_hartbeat_fail_count >= 3: send_hartbeat_fail_count = 0 - if logger: - logger.error("连续3次发送心跳失败,重连") + self.logger.error("连续3次发送心跳失败,重连") break else: continue else: send_hartbeat_fail_count = 0 last_heartbeat_send_time = current_time - if logger: - logger.debug("心跳已发送") + self.logger.debug("心跳已发送") # 删除 pending 文件(心跳发送成功后) if not pending_cleared: @@ -1000,39 +1002,28 @@ class NetworkManager: try: os.remove(pending_path) pending_cleared = True - if logger: - logger.info("[OTA] 心跳发送成功,已删除 ota_pending.json") + self.logger.info("[OTA] 心跳发送成功,已删除 ota_pending.json") except Exception as e: - if logger: - logger.error(f"[OTA] 删除 pending 文件失败: {e}") + self.logger.error(f"[OTA] 删除 pending 文件失败: {e}") except Exception as e: - if logger: - logger.error(f"[OTA] 检查 pending 文件时出错: {e}") + self.logger.error(f"[OTA] 检查 pending 文件时出错: {e}") # 心跳超时重连 if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: - if logger: - logger.error("十分钟无心跳ACK,重连") + self.logger.error("十分钟无心跳ACK,重连") break time.sleep_ms(50) self._tcp_connected = False - if logger: - logger.error("连接异常,2秒后重连...") + self.logger.error("连接异常,2秒后重连...") time.sleep_ms(2000) except Exception as e: # TCP主循环的顶层异常捕获,防止线程静默退出 - logger = logger_manager.logger - if logger: - logger.error(f"[NET] TCP主循环异常: {e}") - import traceback - logger.error(traceback.format_exc()) - else: - print(f"[NET] TCP主循环异常: {e}") - import traceback - traceback.print_exc() + self.logger.error(f"[NET] TCP主循环异常: {e}") + import traceback + self.logger.error(traceback.format_exc()) self._tcp_connected = False time.sleep_ms(5000) # 等待5秒后重试连接 diff --git a/ota_manager.py b/ota_manager.py index 7d3926e..6e56f7f 100644 --- a/ota_manager.py +++ b/ota_manager.py @@ -51,6 +51,11 @@ class OTAManager: # ==================== 状态访问(只读属性)==================== + @property + def logger(self): + """获取 logger 对象""" + return logger_manager.logger + @property def update_thread_started(self): """OTA线程是否已启动""" @@ -128,14 +133,10 @@ class OTAManager: filename_lower = filename.lower() if filename_lower.endswith('.zip'): - logger = logger_manager.logger - if logger: - logger.info(f"[EXTRACT] 检测到ZIP文件(扩展名: .zip)") + self.logger.info(f"[EXTRACT] 检测到ZIP文件(扩展名: .zip)") return True, 'zip' - logger = logger_manager.logger - if logger: - logger.info(f"[EXTRACT] 不是ZIP格式(扩展名: {os.path.splitext(filename)[1] or '无'})") + self.logger.info(f"[EXTRACT] 不是ZIP格式(扩展名: {os.path.splitext(filename)[1] or '无'})") return False, None def extract_zip_archive(self, archive_path, extract_to_dir=None, target_file=None): @@ -153,8 +154,7 @@ class OTAManager: if extract_to_dir is None: extract_to_dir = os.path.dirname(archive_path) or '/tmp' - logger = logger_manager.logger - logger.info(f"[EXTRACT] 开始解压ZIP文件: {archive_path}") + self.logger.info(f"[EXTRACT] 开始解压ZIP文件: {archive_path}") try: os.makedirs(extract_to_dir, exist_ok=True) @@ -167,18 +167,18 @@ class OTAManager: result = os.system(cmd) if result != 0: - logger.warning(f"[EXTRACT] 直接解压目标文件失败,尝试解压所有文件...") + self.logger.warning(f"[EXTRACT] 直接解压目标文件失败,尝试解压所有文件...") cmd_all = f"unzip -q -o '{archive_path}' -d '{extract_to_dir}' 2>&1" result_all = os.system(cmd_all) if result_all != 0: - logger.error(f"[EXTRACT] 解压失败,退出码: {result_all}") + self.logger.error(f"[EXTRACT] 解压失败,退出码: {result_all}") return False, None return True, extract_to_dir except Exception as e: - logger.error(f"[EXTRACT] 解压过程出错: {e}") + self.logger.error(f"[EXTRACT] 解压过程出错: {e}") return False, None def apply_ota_and_reboot(self, ota_url=None, downloaded_file=None): @@ -203,12 +203,11 @@ class OTAManager: ota_pending = f"{config.APP_DIR}/ota_pending.json" - logger = logger_manager.logger - logger.info(f"[OTA] 准备应用OTA更新,下载文件: {downloaded_file}") + self.logger.info(f"[OTA] 准备应用OTA更新,下载文件: {downloaded_file}") try: if not os.path.exists(downloaded_file): - logger.error(f"[OTA] 错误:{downloaded_file} 不存在") + self.logger.error(f"[OTA] 错误:{downloaded_file} 不存在") return False # 备份 @@ -230,9 +229,9 @@ class OTAManager: f.write(str(counter)) backup_dir = os.path.join(backup_base, f"backup_{counter:04d}") - logger.info(f"[OTA] 使用备份目录: {backup_dir} (第{counter}次OTA)") + self.logger.info(f"[OTA] 使用备份目录: {backup_dir} (第{counter}次OTA)") except Exception as e: - logger.error(f"[OTA] 生成备份目录名失败: {e},使用默认目录") + self.logger.error(f"[OTA] 生成备份目录名失败: {e},使用默认目录") backup_dir = os.path.join(backup_base, "backup_0000") # 清理旧备份 @@ -255,11 +254,11 @@ class OTAManager: for item, dir_num, item_path in backup_dirs[config.MAX_BACKUPS:]: try: shutil.rmtree(item_path, ignore_errors=True) - logger.info(f"[OTA] 已删除旧备份: {item}") + self.logger.info(f"[OTA] 已删除旧备份: {item}") except Exception as e: - logger.warning(f"[OTA] 删除旧备份失败: {e}") + self.logger.warning(f"[OTA] 删除旧备份失败: {e}") except Exception as e: - logger.warning(f"[OTA] 清理旧备份时出错: {e}") + self.logger.warning(f"[OTA] 清理旧备份时出错: {e}") os.makedirs(backup_dir, exist_ok=True) @@ -286,16 +285,15 @@ class OTAManager: shutil.copy2(source_path, backup_path) backed_up_files.append(rel_path) except Exception as e: - if logger: - logger.error(f"[OTA] 备份 {rel_path} 失败: {e}") + self.logger.error(f"[OTA] 备份 {rel_path} 失败: {e}") if backed_up_files: - logger.info(f"[OTA] 总共备份了 {len(backed_up_files)} 个文件到 {backup_dir}") + self.logger.info(f"[OTA] 总共备份了 {len(backed_up_files)} 个文件到 {backup_dir}") else: - logger.warning(f"[OTA] 没有备份任何文件") + self.logger.warning(f"[OTA] 没有备份任何文件") except Exception as e: - logger.error(f"[OTA] 备份过程出错: {e}") + self.logger.error(f"[OTA] 备份过程出错: {e}") if not backup_dir: backup_dir = None @@ -309,15 +307,15 @@ class OTAManager: with open(downloaded_file, "rb") as f: zip_header = f.read(4) if zip_header[:2] != b'PK': - logger.error(f"[OTA] ZIP文件头验证失败: {zip_header.hex()}") + self.logger.error(f"[OTA] ZIP文件头验证失败: {zip_header.hex()}") return False file_size = os.path.getsize(downloaded_file) - logger.info(f"[OTA] ZIP文件验证通过: 大小={file_size} bytes, 头={zip_header.hex()}") + self.logger.info(f"[OTA] ZIP文件验证通过: 大小={file_size} bytes, 头={zip_header.hex()}") except Exception as e: - logger.error(f"[OTA] ZIP文件验证异常: {e}") + self.logger.error(f"[OTA] ZIP文件验证异常: {e}") return False - logger.info(f"[OTA] 检测到ZIP压缩包,开始解压...") + self.logger.info(f"[OTA] 检测到ZIP压缩包,开始解压...") extract_dir = "/tmp/ota_extract" try: os.makedirs(extract_dir, exist_ok=True) @@ -339,12 +337,12 @@ class OTAManager: files_to_copy.append((source_path, rel_path)) if files_to_copy: - logger.info(f"[OTA] 解压成功,共 {len(files_to_copy)} 个文件") + self.logger.info(f"[OTA] 解压成功,共 {len(files_to_copy)} 个文件") else: - logger.error(f"[OTA] 解压成功但未找到任何文件") + self.logger.error(f"[OTA] 解压成功但未找到任何文件") return False else: - logger.error(f"[OTA] 解压失败") + self.logger.error(f"[OTA] 解压失败") return False else: # 单个文件更新:从下载的文件名推断目标文件名 @@ -359,11 +357,11 @@ class OTAManager: target_rel_path = filename files_to_copy = [(downloaded_file, target_rel_path)] - logger.info(f"[OTA] 单个文件更新: {downloaded_file} -> {target_rel_path}") + self.logger.info(f"[OTA] 单个文件更新: {downloaded_file} -> {target_rel_path}") # 复制文件 if not files_to_copy: - logger.error(f"[OTA] 没有文件需要复制") + self.logger.error(f"[OTA] 没有文件需要复制") return False copied_files = [] @@ -372,7 +370,7 @@ class OTAManager: # 检查源文件和目标文件是否是同一个文件(避免复制到自身) if os.path.abspath(source_path) == os.path.abspath(dest_path): - logger.warning(f"[OTA] 源文件和目标文件相同,跳过复制: {rel_path} (文件已在正确位置)") + self.logger.warning(f"[OTA] 源文件和目标文件相同,跳过复制: {rel_path} (文件已在正确位置)") copied_files.append(rel_path) continue @@ -386,13 +384,13 @@ class OTAManager: try: shutil.copy2(source_path, dest_path) copied_files.append(rel_path) - logger.info(f"[OTA] 已复制: {rel_path}") + self.logger.info(f"[OTA] 已复制: {rel_path}") except Exception as e: - logger.error(f"[OTA] 复制 {rel_path} 失败: {e}") + self.logger.error(f"[OTA] 复制 {rel_path} 失败: {e}") return False if copied_files: - logger.info(f"[OTA] 成功复制 {len(copied_files)} 个文件到应用目录") + self.logger.info(f"[OTA] 成功复制 {len(copied_files)} 个文件到应用目录") # 确保写入磁盘 try: @@ -421,7 +419,7 @@ class OTAManager: except: pass except Exception as e: - logger.error(f"[OTA] 写入 ota_pending 失败: {e}") + self.logger.error(f"[OTA] 写入 ota_pending 失败: {e}") # 通知服务器(延迟导入避免循环导入) from network import safe_enqueue @@ -433,9 +431,9 @@ class OTAManager: try: if os.path.exists(extract_dir): shutil.rmtree(extract_dir, ignore_errors=True) - logger.info(f"[OTA] 已清理临时解压目录: {extract_dir}") + self.logger.info(f"[OTA] 已清理临时解压目录: {extract_dir}") except Exception as e: - logger.warning(f"[OTA] 清理临时目录失败(可忽略): {e}") + self.logger.warning(f"[OTA] 清理临时目录失败(可忽略): {e}") # 清理下载文件 try: @@ -443,9 +441,9 @@ class OTAManager: # 删除下载的文件 try: os.remove(downloaded_file) - logger.info(f"[OTA] 已删除下载文件: {downloaded_file}") + self.logger.info(f"[OTA] 已删除下载文件: {downloaded_file}") except Exception as e: - logger.warning(f"[OTA] 删除下载文件失败(可忽略): {e}") + self.logger.warning(f"[OTA] 删除下载文件失败(可忽略): {e}") # 尝试删除时间戳目录(如果为空) try: @@ -457,25 +455,24 @@ class OTAManager: files_in_dir = os.listdir(download_dir) if not files_in_dir: os.rmdir(download_dir) - logger.info(f"[OTA] 已删除空时间戳目录: {download_dir}") + self.logger.info(f"[OTA] 已删除空时间戳目录: {download_dir}") except Exception as e: - logger.debug(f"[OTA] 删除时间戳目录失败(可忽略): {e}") + self.logger.debug(f"[OTA] 删除时间戳目录失败(可忽略): {e}") except Exception as e: - logger.debug(f"[OTA] 清理时间戳目录时出错(可忽略): {e}") + self.logger.debug(f"[OTA] 清理时间戳目录时出错(可忽略): {e}") except Exception as e: - logger.warning(f"[OTA] 清理下载文件时出错(可忽略): {e}") + self.logger.warning(f"[OTA] 清理下载文件时出错(可忽略): {e}") # 重启设备 - logger.info("[OTA] 准备重启设备...") + self.logger.info("[OTA] 准备重启设备...") os.system("reboot") return True except Exception as e: - logger = logger_manager.logger - logger.error(f"[OTA] apply_ota_and_reboot 异常: {e}") + self.logger.error(f"[OTA] apply_ota_and_reboot 异常: {e}") import traceback - logger.error(traceback.format_exc()) + self.logger.error(traceback.format_exc()) return False def get_download_timestamp_dir(self): @@ -512,8 +509,7 @@ class OTAManager: try: os.makedirs(download_dir, exist_ok=True) except Exception as e: - logger = logger_manager.logger - logger.warning(f"[OTA] 创建下载目录失败: {e},使用基础目录") + self.logger.warning(f"[OTA] 创建下载目录失败: {e},使用基础目录") download_dir = download_base try: os.makedirs(download_dir, exist_ok=True) @@ -522,8 +518,7 @@ class OTAManager: return download_dir except Exception as e: - logger = logger_manager.logger - logger.error(f"[OTA] 获取下载目录失败: {e},使用默认目录") + self.logger.error(f"[OTA] 获取下载目录失败: {e},使用默认目录") return "/tmp/download" def get_filename_from_url(self, url, default_name="main_tmp"): @@ -553,16 +548,14 @@ class OTAManager: # 只有在完全无法提取文件名时,才使用默认名称 return f"{download_dir}/{default_name}" except Exception as e: - logger = logger_manager.logger - logger.error(f"[OTA] 从URL提取文件名失败: {e},使用默认文件名") + self.logger.error(f"[OTA] 从URL提取文件名失败: {e},使用默认文件名") download_dir = self.get_download_timestamp_dir() return f"{download_dir}/{default_name}" def download_file(self, url, filename): """从指定 URL 下载文件,根据文件类型自动选择文本或二进制模式,并支持MD5校验""" try: - logger = logger_manager.logger - logger.info(f"正在从 {url} 下载文件...") + self.logger.info(f"正在从 {url} 下载文件...") response = requests.get(url) response.raise_for_status() @@ -570,7 +563,7 @@ class OTAManager: md5_b64_expected = None if 'Content-Md5' in response.headers: md5_b64_expected = response.headers['Content-Md5'].strip() - logger.info(f"[DOWNLOAD] 服务器提供了MD5校验值: {md5_b64_expected}") + self.logger.info(f"[DOWNLOAD] 服务器提供了MD5校验值: {md5_b64_expected}") # 根据文件扩展名判断是否为二进制文件 filename_lower = filename.lower() @@ -586,14 +579,14 @@ class OTAManager: os.sync() except: pass - logger.info(f"[DOWNLOAD] 使用二进制模式下载: {filename}, 大小: {len(data)} bytes") + self.logger.info(f"[DOWNLOAD] 使用二进制模式下载: {filename}, 大小: {len(data)} bytes") else: # 文本文件:使用文本模式写入 response.encoding = 'utf-8' with open(filename, 'w', encoding='utf-8') as file: file.write(response.text) - logger.info(f"[DOWNLOAD] 使用文本模式下载: {filename}") + self.logger.info(f"[DOWNLOAD] 使用文本模式下载: {filename}") # MD5 校验(如果服务器提供了MD5值) if md5_b64_expected and hashlib is not None: @@ -604,18 +597,18 @@ class OTAManager: md5_b64_got = binascii.b2a_base64(digest).decode().strip() if md5_b64_got != md5_b64_expected: - logger.error(f"[DOWNLOAD] MD5校验失败: 期望={md5_b64_expected}, 实际={md5_b64_got}") + self.logger.error(f"[DOWNLOAD] MD5校验失败: 期望={md5_b64_expected}, 实际={md5_b64_got}") return f"下载失败!MD5校验失败: 期望={md5_b64_expected}, 实际={md5_b64_got}" else: - logger.info(f"[DOWNLOAD] MD5校验通过: {md5_b64_got}") + self.logger.info(f"[DOWNLOAD] MD5校验通过: {md5_b64_got}") except Exception as e: - logger.warning(f"[DOWNLOAD] MD5校验过程出错: {e}") + self.logger.warning(f"[DOWNLOAD] MD5校验过程出错: {e}") # MD5校验出错时,如果是二进制文件(特别是ZIP),应该失败 if is_binary: return f"下载失败!MD5校验异常: {e}" elif is_binary and not md5_b64_expected: # 二进制文件(特别是ZIP)建议有MD5校验 - logger.warning(f"[DOWNLOAD] 警告: 服务器未提供MD5校验值,无法验证文件完整性") + self.logger.warning(f"[DOWNLOAD] 警告: 服务器未提供MD5校验值,无法验证文件完整性") return f"下载成功!文件已保存为: {filename}" except requests.exceptions.RequestException as e: @@ -647,11 +640,10 @@ class OTAManager: return downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp") - logger = logger_manager.logger - logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}") - logger.info(f"[OTA] 开始下载: {ota_url}") + self.logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}") + self.logger.info(f"[OTA] 开始下载: {ota_url}") result_msg = self.download_file(ota_url, downloaded_filename) - logger.info(f"[OTA] {result_msg}") + self.logger.info(f"[OTA] {result_msg}") if "成功" in result_msg or "下载成功" in result_msg: if self.apply_ota_and_reboot(ota_url, downloaded_filename): @@ -662,8 +654,7 @@ class OTAManager: except Exception as e: error_msg = f"OTA 异常: {str(e)}" - logger = logger_manager.logger - logger.error(error_msg) + self.logger.error(error_msg) from network import safe_enqueue safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) finally: @@ -707,11 +698,11 @@ class OTAManager: else: base_url = f"http://{host}" self._is_https = False - logger = logger_manager.logger + # logger removed - use self.logger instead def _log(*a): - if debug and logger: - logger.debug(" ".join(str(x) for x in a)) + if debug: + self.logger.debug(" ".join(str(x) for x in a)) def _pwr_log(prefix=""): """debug 用:输出电压/电量""" @@ -720,12 +711,10 @@ class OTAManager: try: v = get_bus_voltage() p = voltage_to_percent(v) - if logger: - logger.debug(f"[PWR]{prefix} v={v:.3f}V p={p}%") + self.logger.debug(f"[PWR]{prefix} v={v:.3f}V p={p}%") except Exception as e: try: - if logger: - logger.debug(f"[PWR]{prefix} read_failed: {e}") + self.logger.debug(f"[PWR]{prefix} read_failed: {e}") except: pass @@ -792,7 +781,7 @@ class OTAManager: if self._is_https: resp = hardware_manager.at_client.send(f'AT+MHTTPCFG="ssl",{hid},1,1', "OK", 2000) if "ERROR" in resp or "CME ERROR" in resp: - logger_manager.logger.error(f"MHTTPCFG SSL failed: {resp}") + self.logger.error(f"MHTTPCFG SSL failed: {resp}") # 尝试https 降级到http downgraded_base_url = base_url.replace("https://", "http://") resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{downgraded_base_url}"', "OK", 8000) @@ -1035,14 +1024,12 @@ class OTAManager: got_b64 = binascii.b2a_base64(digest).decode().strip() if got_b64 != expect_md5_b64: return False, f"md5_mismatch got={got_b64} expected={expect_md5_b64}" - if logger: - logger.debug(f"[4G-DL] MD5 verified: {got_b64}") + self.logger.debug(f"[4G-DL] MD5 verified: {got_b64}") except Exception as e: return False, f"md5_check_failed: {e}" t_cost = time.ticks_diff(time.ticks_ms(), t_func0) - if logger: - logger.info(f"[4G-DL] download complete: size={offset} ip={ip} cost_ms={t_cost}") + self.logger.info(f"[4G-DL] download complete: size={offset} ip={ip} cost_ms={t_cost}") return True, f"OK size={offset} ip={ip} cost_ms={t_cost}" finally: @@ -1076,9 +1063,9 @@ class OTAManager: # 从URL中提取文件名(保留原始扩展名) downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp") - logger_manager.logger.info(f"[OTA-4G] 下载文件将保存为: {downloaded_filename}") + self.logger.info(f"[OTA-4G] 下载文件将保存为: {downloaded_filename}") - logger_manager.logger.info(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") + self.logger.info(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") # 重要说明: # - AT+MDIALUP / RNDIS 是"USB 主机拨号上网"模式,在不少 ML307R 固件上会占用/切换内部网络栈, # 从而导致 AT+MIPOPEN / +MIPURC 这套 TCP 连接无法工作(你会看到一直"连接到服务器...")。 @@ -1090,15 +1077,15 @@ class OTAManager: import power v = power.get_bus_voltage() p = power.voltage_to_percent(v) - logger_manager.logger.info(f"[OTA-4G][PWR] before_urc v={v:.3f}V p={p}%") + self.logger.info(f"[OTA-4G][PWR] before_urc v={v:.3f}V p={p}%") except Exception as e: - logger_manager.logger.error(f"[OTA-4G][PWR] before_urc read_failed: {e}") + self.logger.error(f"[OTA-4G][PWR] before_urc read_failed: {e}") t_dl0 = time.ticks_ms() success, msg = self.download_file_via_4g(ota_url, downloaded_filename, debug=False) t_dl_cost = time.ticks_diff(t_dl0, time.ticks_ms()) - logger_manager.logger.info(f"[OTA-4G] {msg}") - logger_manager.logger.info(f"[OTA-4G] download_cost_ms={t_dl_cost}") + self.logger.info(f"[OTA-4G] {msg}") + self.logger.info(f"[OTA-4G] download_cost_ms={t_dl_cost}") if success and "OK" in msg: if self.apply_ota_and_reboot(ota_url, downloaded_filename): @@ -1108,13 +1095,13 @@ class OTAManager: except Exception as e: error_msg = f"OTA-4G 异常: {str(e)}" - logger_manager.logger.error(error_msg) + self.logger.error(error_msg) safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) finally: # 总耗时(注意:若成功并 reboot,这行可能来不及打印) try: t_cost = time.ticks_diff(time.ticks_ms(), t_ota0) - logger_manager.logger.info(f"[OTA-4G] total_cost_ms={t_cost}") + self.logger.info(f"[OTA-4G] total_cost_ms={t_cost}") except: pass self._stop_update_thread() @@ -1147,21 +1134,15 @@ class OTAManager: if not network_manager.is_server_reachable(host, port, timeout=8): err_msg = f"网络不通:无法连接 {host}:{port}" safe_enqueue({"result": err_msg}, 2) - logger = logger_manager.logger - if logger: - logger.error(err_msg) + self.logger.error(err_msg) return downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp") - logger = logger_manager.logger - if logger: - logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}") + self.logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}") - if logger: - logger.info(f"[NET] 已确认可访问 {host}:{port},开始下载...") + self.logger.info(f"[NET] 已确认可访问 {host}:{port},开始下载...") result = self.download_file(ota_url, downloaded_filename) - if logger: - logger.info(result) + self.logger.info(result) if "成功" in result or "下载成功" in result: if self.apply_ota_and_reboot(ota_url, downloaded_filename): @@ -1188,9 +1169,7 @@ class OTAManager: try: if backup_dir_path is None: if not os.path.exists(backup_base): - logger = logger_manager.logger - if logger: - logger.error(f"[RESTORE] 备份目录不存在: {backup_base}") + self.logger.error(f"[RESTORE] 备份目录不存在: {backup_base}") return False backup_dirs = [] @@ -1207,9 +1186,7 @@ class OTAManager: pass if not backup_dirs: - logger = logger_manager.logger - if logger: - logger.error(f"[RESTORE] 没有找到备份目录") + self.logger.error(f"[RESTORE] 没有找到备份目录") return False backup_dirs.sort(key=lambda x: x[1], reverse=True) @@ -1217,14 +1194,10 @@ class OTAManager: backup_dir_path = os.path.join(backup_base, latest_backup) if not os.path.exists(backup_dir_path): - logger = logger_manager.logger - if logger: - logger.error(f"[RESTORE] 备份目录不存在: {backup_dir_path}") + self.logger.error(f"[RESTORE] 备份目录不存在: {backup_dir_path}") return False - logger = logger_manager.logger - if logger: - logger.info(f"[RESTORE] 开始从备份恢复: {backup_dir_path}") + self.logger.info(f"[RESTORE] 开始从备份恢复: {backup_dir_path}") restored_files = [] for root, dirs, files in os.walk(backup_dir_path): @@ -1240,25 +1213,19 @@ class OTAManager: try: shutil.copy2(source_path, dest_path) restored_files.append(rel_path) - if logger: - logger.info(f"[RESTORE] 已恢复: {rel_path}") + self.logger.info(f"[RESTORE] 已恢复: {rel_path}") except Exception as e: - if logger: - logger.error(f"[RESTORE] 恢复 {rel_path} 失败: {e}") + self.logger.error(f"[RESTORE] 恢复 {rel_path} 失败: {e}") if restored_files: - if logger: - logger.info(f"[RESTORE] 成功恢复 {len(restored_files)} 个文件") + self.logger.info(f"[RESTORE] 成功恢复 {len(restored_files)} 个文件") return True else: - if logger: - logger.info(f"[RESTORE] 没有文件被恢复") + self.logger.info(f"[RESTORE] 没有文件被恢复") return False except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[RESTORE] 恢复过程出错: {e}") + self.logger.error(f"[RESTORE] 恢复过程出错: {e}") return False diff --git a/shot_id_generator.py b/shot_id_generator.py index ce1f1c7..5b403c8 100644 --- a/shot_id_generator.py +++ b/shot_id_generator.py @@ -1,76 +1,76 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -射箭ID生成器 -为每次射箭生成唯一ID,格式:{timestamp_ms}_{counter} -""" -from maix import time -import threading - - -class ShotIDGenerator: - """射箭ID生成器(单例)""" - _instance = None - _lock = threading.Lock() - - def __new__(cls): - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = super(ShotIDGenerator, cls).__new__(cls) - cls._instance._initialized = False - return cls._instance - - def __init__(self): - if self._initialized: - return - - self._counter = 0 - self._last_timestamp_ms = 0 - self._lock = threading.Lock() - - self._initialized = True - - def generate_id(self, device_id=None): - """ - 生成唯一的射箭ID - - Args: - device_id: 可选的设备ID,如果提供则包含在ID中(格式:{device_id}_{timestamp_ms}_{counter}) - 如果不提供,则使用简单格式(格式:{timestamp_ms}_{counter}) - - Returns: - str: 唯一的射箭ID - """ - with self._lock: - current_timestamp_ms = time.ticks_ms() - - # 如果时间戳相同,增加计数器;否则重置计数器 - if current_timestamp_ms == self._last_timestamp_ms: - self._counter += 1 - else: - self._counter = 0 - self._last_timestamp_ms = current_timestamp_ms - - # 生成ID - if device_id: - shot_id = f"{device_id}_{current_timestamp_ms}_{self._counter}" - else: - shot_id = f"{current_timestamp_ms}_{self._counter}" - - return shot_id - - def reset(self): - """重置计数器(通常不需要调用)""" - with self._lock: - self._counter = 0 - self._last_timestamp_ms = 0 - - -# 创建全局单例实例 -shot_id_generator = ShotIDGenerator() - - - - - +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +射箭ID生成器 +为每次射箭生成唯一ID,格式:{timestamp_ms}_{counter} +""" +from maix import time +import threading + + +class ShotIDGenerator: + """射箭ID生成器(单例)""" + _instance = None + _lock = threading.Lock() + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super(ShotIDGenerator, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._counter = 0 + self._last_timestamp_ms = 0 + self._lock = threading.Lock() + + self._initialized = True + + def generate_id(self, device_id=None): + """ + 生成唯一的射箭ID + + Args: + device_id: 可选的设备ID,如果提供则包含在ID中(格式:{device_id}_{timestamp_ms}_{counter}) + 如果不提供,则使用简单格式(格式:{timestamp_ms}_{counter}) + + Returns: + str: 唯一的射箭ID + """ + with self._lock: + current_timestamp_ms = time.ticks_ms() + + # 如果时间戳相同,增加计数器;否则重置计数器 + if current_timestamp_ms == self._last_timestamp_ms: + self._counter += 1 + else: + self._counter = 0 + self._last_timestamp_ms = current_timestamp_ms + + # 生成ID + if device_id: + shot_id = f"{device_id}_{current_timestamp_ms}_{self._counter}" + else: + shot_id = f"{current_timestamp_ms}_{self._counter}" + + return shot_id + + def reset(self): + """重置计数器(通常不需要调用)""" + with self._lock: + self._counter = 0 + self._last_timestamp_ms = 0 + + +# 创建全局单例实例 +shot_id_generator = ShotIDGenerator() + + + + + diff --git a/version.py b/version.py index de0a931..aececdc 100644 --- a/version.py +++ b/version.py @@ -1,17 +1,17 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -应用版本号 -每次 OTA 更新时,只需要更新这个文件中的版本号 -""" -VERSION = '1.1.5' - - - - - - - - - - +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +应用版本号 +每次 OTA 更新时,只需要更新这个文件中的版本号 +""" +VERSION = '1.1.5' + + + + + + + + + +