diff --git a/S99archery b/S99archery new file mode 100644 index 0000000..e70c36c --- /dev/null +++ b/S99archery @@ -0,0 +1,79 @@ +#!/bin/sh +# /etc/init.d/S99archery +# 系统启动时处理致命错误恢复(仅处理无法启动的情况) +# 注意:应用的启动由系统自动启动机制处理(通过 auto_start.txt) +# 功能: +# 1. 处理致命错误(无法启动)- 恢复 main.py +# 2. 如果重启次数超过阈值,恢复 main.py 并重启系统 + +APP_DIR="/maixapp/apps/t11" +MAIN_PY="$APP_DIR/main.py" +PENDING_FILE="$APP_DIR/ota_pending.json" +BACKUP_BASE="$APP_DIR/backups" + +# 进入应用目录 +cd "$APP_DIR" || exit 0 + +# 检查 pending 文件,如果存在且超过重启次数,恢复 main.py(处理致命错误) +if [ -f "$PENDING_FILE" ]; then + echo "[S99] 检测到 ota_pending.json,检查重启计数..." + + # 尝试从JSON中提取重启计数(使用grep简单提取) + RESTART_COUNT=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"restart_count":[0-9]*' | grep -o '[0-9]*' || echo "0") + MAX_RESTARTS=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"max_restarts":[0-9]*' | grep -o '[0-9]*' || echo "3") + + if [ -n "$RESTART_COUNT" ] && [ "$RESTART_COUNT" -ge "$MAX_RESTARTS" ]; then + echo "[S99] 检测到重启次数 ($RESTART_COUNT) 超过阈值 ($MAX_RESTARTS),恢复 main.py..." + + # 尝试从JSON中提取备份目录 + BACKUP_DIR=$(cat "$PENDING_FILE" 2>/dev/null | grep -o '"backup_dir":"[^"]*"' | grep -o '/[^"]*' || echo "") + + if [ -n "$BACKUP_DIR" ] && [ -f "$BACKUP_DIR/main.py" ]; then + # 使用指定的备份目录 + echo "[S99] 从备份目录恢复: $BACKUP_DIR/main.py" + cp "$BACKUP_DIR/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py" + else + # 查找最新的备份目录 + LATEST_BACKUP=$(ls -dt "$BACKUP_BASE"/backup_* 2>/dev/null | head -1) + if [ -n "$LATEST_BACKUP" ] && [ -f "$LATEST_BACKUP/main.py" ]; then + echo "[S99] 从最新备份恢复: $LATEST_BACKUP/main.py" + cp "$LATEST_BACKUP/main.py" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py" + else + # 如果没有备份目录,尝试使用 main.py.bak + if [ -f "$APP_DIR/main.py.bak" ]; then + echo "[S99] 从 main.py.bak 恢复" + cp "$APP_DIR/main.py.bak" "$MAIN_PY" 2>/dev/null && echo "[S99] 已恢复 main.py" + fi + fi + fi + + # 恢复后重置重启计数,避免循环恢复 + # 注意:不在这里删除 pending 文件,让 main.py 在心跳成功后删除 + # 但是重置重启计数,以便恢复后的版本可以重新开始计数 + python3 -c " +import json, os +try: + pending_path = '$PENDING_FILE' + if os.path.exists(pending_path): + with open(pending_path, 'r', encoding='utf-8') as f: + d = json.load(f) + d['restart_count'] = 0 # 重置重启计数 + with open(pending_path, 'w', encoding='utf-8') as f: + json.dump(d, f) + print('[S99] 已重置重启计数为 0') +except Exception as e: + print(f'[S99] 重置重启计数失败: {e}') +" 2>/dev/null || echo "[S99] 无法重置重启计数(可能需要Python支持)" + + echo "[S99] 已恢复 main.py,重启系统..." + echo "[S99] 注意:pending 文件将在心跳成功后由 main.py 删除" + sleep 2 + reboot + exit 0 + fi +fi + +# 不启动应用,让系统自动启动机制处理 +# 这个脚本只负责处理致命错误恢复 +exit 0 + diff --git a/at_client.py b/at_client.py new file mode 100644 index 0000000..a87da55 --- /dev/null +++ b/at_client.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +AT客户端模块 +负责4G模块的AT命令通信和URC解析 +""" +import _thread +from maix import time +import re +import threading + +class ATClient: + """ + 单读者 AT/URC 客户端:唯一读取 uart4g,避免 tcp_main/at()/OTA 抢读导致 EOF / 丢包。 + - send(cmd, expect, timeout_ms) : 发送 AT 并等待 expect + - pop_tcp_payload() : 获取 +MIPURC:"rtcp" 的 payload(已按长度裁剪) + - pop_http_event() : 获取 +MHTTPURC 事件(header/content) + """ + def __init__(self, uart_obj): + self.uart = uart_obj + self._cmd_lock = threading.Lock() + self._q_lock = threading.Lock() + self._rx = b"" + self._tcp_payloads = [] + self._http_events = [] + + # 当前命令等待状态(仅允许单命令 in-flight) + self._waiting = False + self._expect = b"OK" + self._resp = b"" + + self._running = False + + def start(self): + if self._running: + return + self._running = True + _thread.start_new_thread(self._reader_loop, ()) + + def stop(self): + self._running = False + + def flush(self): + """清空内部缓存与队列(用于 OTA/异常恢复)""" + with self._q_lock: + self._rx = b"" + self._tcp_payloads.clear() + self._http_events.clear() + self._resp = b"" + + def pop_tcp_payload(self): + with self._q_lock: + if self._tcp_payloads: + return self._tcp_payloads.pop(0) + return None + + def pop_http_event(self): + with self._q_lock: + if self._http_events: + return self._http_events.pop(0) + return None + + def _push_tcp_payload(self, payload: bytes): + # 注意:在 _reader_loop 内部解析 URC 时已经持有 _q_lock, + # 这里不要再次 acquire(锁不可重入,会死锁)。 + self._tcp_payloads.append(payload) + + def _push_http_event(self, ev): + # 同上:避免在 _reader_loop 持锁期间二次 acquire + self._http_events.append(ev) + + def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000): + """ + 发送 AT 命令并等待 expect(子串匹配)。 + 注意:expect=">" 用于等待 prompt。 + """ + expect_b = expect.encode() if isinstance(expect, str) else expect + with self._cmd_lock: + # 初始化等待 + self._waiting = True + self._expect = expect_b + self._resp = b"" + + # 发送 + if cmd: + # 注意:这里不要再用 uart4g_lock(否则外层已经持锁时会死锁)。 + # 写入由 _cmd_lock 串行化即可。 + self.uart.write((cmd + "\r\n").encode()) + + t0 = time.ticks_ms() + while time.ticks_ms() - t0 < timeout_ms: + if (not self._waiting) or (self._expect in self._resp): + self._waiting = False + break + time.sleep_ms(5) + + # 超时也返回已收集内容(便于诊断) + self._waiting = False + try: + return self._resp.decode(errors="ignore") + except: + return str(self._resp) + + def _find_urc_tag(self, tag: bytes): + """ + 只在"真正的 URC 边界"查找 tag,避免误命中 HTTP payload 内容。 + 规则:tag 必须出现在 buffer 开头,或紧跟在 b"\\r\\n" 后面。 + """ + try: + i = 0 + rx = self._rx + while True: + j = rx.find(tag, i) + if j < 0: + return -1 + if j == 0: + return 0 + if j >= 2 and rx[j - 2:j] == b"\r\n": + return j + i = j + 1 + except: + return -1 + + def _parse_mipurc_rtcp(self): + """ + 解析:+MIPURC: "rtcp",,, + 之前硬编码 link_id=0 会导致在多连接/重连场景下收不到数据。 + """ + prefix = b'+MIPURC: "rtcp",' + i = self._find_urc_tag(prefix) + if i < 0: + return False + # 丢掉前置噪声 + if i > 0: + self._rx = self._rx[i:] + i = 0 + + j = len(prefix) + # 解析 link_id + k = j + while k < len(self._rx) and 48 <= self._rx[k] <= 57: + k += 1 + if k == j or k >= len(self._rx): + return False + if self._rx[k:k+1] != b",": + self._rx = self._rx[1:] + return True + try: + link_id = int(self._rx[j:k].decode()) + except: + self._rx = self._rx[1:] + return True + + # 解析 len + j2 = k + 1 + k2 = j2 + while k2 < len(self._rx) and 48 <= self._rx[k2] <= 57: + k2 += 1 + if k2 == j2 or k2 >= len(self._rx): + return False + if self._rx[k2:k2+1] != b",": + self._rx = self._rx[1:] + return True + try: + n = int(self._rx[j2:k2].decode()) + except: + self._rx = self._rx[1:] + return True + + payload_start = k2 + 1 + payload_end = payload_start + n + if len(self._rx) < payload_end: + return False # payload 未收齐 + + payload = self._rx[payload_start:payload_end] + # 把 link_id 一起带上,便于上层过滤(如果需要) + self._push_tcp_payload((link_id, payload)) + self._rx = self._rx[payload_end:] + return True + + def _parse_mhttpurc_header(self): + tag = b'+MHTTPURC: "header",' + i = self._find_urc_tag(tag) + if i < 0: + return False + if i > 0: + self._rx = self._rx[i:] + i = 0 + + # header: +MHTTPURC: "header",,,, + j = len(tag) + comma_count = 0 + k = j + while k < len(self._rx) and comma_count < 3: + if self._rx[k:k+1] == b",": + comma_count += 1 + k += 1 + if comma_count < 3: + return False + + prefix = self._rx[:k] + m = re.search(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', prefix) + if not m: + self._rx = self._rx[1:] + return True + urc_id = int(m.group(1)) + code = int(m.group(2)) + hdr_len = int(m.group(3)) + + text_start = k + text_end = text_start + hdr_len + if len(self._rx) < text_end: + return False + + hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore") + self._push_http_event(("header", urc_id, code, hdr_text)) + self._rx = self._rx[text_end:] + return True + + def _parse_mhttpurc_content(self): + tag = b'+MHTTPURC: "content",' + i = self._find_urc_tag(tag) + if i < 0: + return False + if i > 0: + self._rx = self._rx[i:] + i = 0 + + # content: +MHTTPURC: "content",,,,, + j = len(tag) + comma_count = 0 + k = j + while k < len(self._rx) and comma_count < 4: + if self._rx[k:k+1] == b",": + comma_count += 1 + k += 1 + if comma_count < 4: + return False + + prefix = self._rx[:k] + m = re.search(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', prefix) + if not m: + self._rx = self._rx[1:] + return True + urc_id = int(m.group(1)) + total_len = int(m.group(2)) + sum_len = int(m.group(3)) + cur_len = int(m.group(4)) + + payload_start = k + payload_end = payload_start + cur_len + if len(self._rx) < payload_end: + return False + + payload = self._rx[payload_start:payload_end] + self._push_http_event(("content", urc_id, total_len, sum_len, cur_len, payload)) + self._rx = self._rx[payload_end:] + return True + + def _reader_loop(self): + while self._running: + # 关键:UART 驱动偶发 read failed,必须兜住,否则线程挂了 OTA/TCP 都会卡死 + try: + d = self.uart.read(4096) # 8192 在一些驱动上更容易触发 read failed + except Exception as e: + try: + print("[ATClient] uart read failed:", e) + except: + pass + time.sleep_ms(50) + continue + + if not d: + time.sleep_ms(1) + continue + + with self._q_lock: + self._rx += d + if self._waiting: + self._resp += d + + while True: + progressed = ( + self._parse_mipurc_rtcp() + or self._parse_mhttpurc_header() + or self._parse_mhttpurc_content() + ) + if not progressed: + break + + # 使用 ota_manager 访问 ota_in_progress + try: + from ota_manager import ota_manager + ota_flag = ota_manager.ota_in_progress + except: + ota_flag = False + + has_http_hint = (b"+MHTTP" in self._rx) or (b"+MHTTPURC" in self._rx) + if ota_flag or has_http_hint: + if len(self._rx) > 512 * 1024: + self._rx = self._rx[-256 * 1024:] + else: + if len(self._rx) > 16384: + self._rx = self._rx[-4096:] + + + diff --git a/config.py b/config.py new file mode 100644 index 0000000..b62fe63 --- /dev/null +++ b/config.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +系统配置常量 +这些值在程序运行期间基本不变,或只在配置时改变 +""" +from version import VERSION + +# ==================== 应用配置 ==================== +APP_VERSION = VERSION +APP_DIR = "/maixapp/apps/t11" +LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py" + +# ==================== 服务器配置 ==================== +SERVER_IP = "www.shelingxingqiu.com" +SERVER_PORT = 50005 +HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒) + +# ==================== HTTP配置 ==================== +HTTP_URL = "http://ws.shelingxingqiu.com" +HTTP_API_PATH = "/home/shoot/device_fire/arrow/fire" + +# ==================== 文件路径配置 ==================== +CONFIG_FILE = "/root/laser_config.json" +LOG_FILE = "/maixapp/apps/t11/app.log" +BACKUP_BASE = "/maixapp/apps/t11/backups" + +# ==================== 硬件配置 ==================== +# UART配置 +UART4G_DEVICE = "/dev/ttyS2" +UART4G_BAUDRATE = 115200 +DISTANCE_SERIAL_DEVICE = "/dev/ttyS1" +DISTANCE_SERIAL_BAUDRATE = 9600 + +# I2C配置 +I2C_BUS_NUM = 1 +INA226_ADDR = 0x40 +REG_CONFIGURATION = 0x00 +REG_BUS_VOLTAGE = 0x02 +REG_CURRENT = 0x04 # 电流寄存器 +REG_CALIBRATION = 0x05 +CALIBRATION_VALUE = 0x1400 + +# ADC配置 +ADC_CHANNEL = 0 +ADC_TRIGGER_THRESHOLD = 3000 +ADC_LASER_THRESHOLD = 3000 + +# ==================== 激光配置 ==================== +MODULE_ADDR = 0x00 +LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1]) +LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0]) +DEFAULT_LASER_POINT = (640, 480) # 默认激光中心点 + +# ==================== 视觉检测配置 ==================== +FOCAL_LENGTH_PIX = 2250.0 # 焦距(像素) +REAL_RADIUS_CM = 20 # 靶心实际半径(厘米) + +# ==================== 显示配置 ==================== +LASER_COLOR = (255, 100, 0) # RGB颜色 +LASER_THICKNESS = 1 +LASER_LENGTH = 2 + +# ==================== 图像保存配置 ==================== +SAVE_IMAGE_ENABLED = True # 是否保存图像(True=保存,False=不保存) +PHOTO_DIR = "/root/phot" # 照片存储目录 + +# ==================== OTA配置 ==================== +MAX_BACKUPS = 5 +LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB +LOG_BACKUP_COUNT = 5 + +# ==================== 引脚映射配置 ==================== +PIN_MAPPINGS = { + "A18": "UART1_RX", + "A19": "UART1_TX", + "A29": "UART2_RX", + "A28": "UART2_TX", + "P18": "I2C1_SCL", + "P21": "I2C1_SDA", +} diff --git a/hardware.py b/hardware.py new file mode 100644 index 0000000..90c5de7 --- /dev/null +++ b/hardware.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +硬件管理器模块 +提供硬件对象的统一管理和访问 +""" +import threading +import config +from at_client import ATClient + + +class HardwareManager: + """硬件管理器(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(HardwareManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + # 私有硬件对象 + self._uart4g = None # 4G模块UART + self._distance_serial = None # 激光测距串口 + self._bus = None # I2C总线 + self._adc_obj = None # ADC对象 + self._at_client = None # AT客户端 + + self._initialized = True + + # ==================== 硬件访问(只读属性)==================== + + @property + def uart4g(self): + """4G模块UART(只读)""" + return self._uart4g + + @property + def distance_serial(self): + """激光测距串口(只读)""" + return self._distance_serial + + @property + def bus(self): + """I2C总线(只读)""" + return self._bus + + @property + def adc_obj(self): + """ADC对象(只读)""" + return self._adc_obj + + @property + def at_client(self): + """AT客户端(只读)""" + return self._at_client + + # ==================== 初始化方法 ==================== + + def init_uart4g(self, device=None, baudrate=None): + """初始化4G模块UART""" + from maix import uart + if device is None: + device = config.UART4G_DEVICE + if baudrate is None: + baudrate = config.UART4G_BAUDRATE + self._uart4g = uart.UART(device, baudrate) + return self._uart4g + + def init_distance_serial(self, device=None, baudrate=None): + """初始化激光测距串口(激光控制)""" + from maix import uart + if device is None: + device = config.DISTANCE_SERIAL_DEVICE + if baudrate is None: + baudrate = config.DISTANCE_SERIAL_BAUDRATE + + print(f"[HW] 初始化激光串口: device={device}, baudrate={baudrate}") + self._distance_serial = uart.UART(device, baudrate) + print(f"[HW] 激光串口初始化完成: {self._distance_serial}") + return self._distance_serial + + def init_bus(self, bus_num=None): + """初始化I2C总线""" + from maix import i2c + if bus_num is None: + bus_num = config.I2C_BUS_NUM + self._bus = i2c.I2C(bus_num, i2c.Mode.MASTER) + return self._bus + + def init_adc(self, channel=None, res_bit=None): + """初始化ADC""" + from maix.peripheral import adc + if channel is None: + channel = config.ADC_CHANNEL + if res_bit is None: + res_bit = adc.RES_BIT_12 + self._adc_obj = adc.ADC(channel, res_bit) + return self._adc_obj + + def init_at_client(self, uart_obj=None): + """初始化AT客户端""" + if uart_obj is None: + if self._uart4g is None: + raise ValueError("uart4g must be initialized before at_client") + uart_obj = self._uart4g + self._at_client = ATClient(uart_obj) + self._at_client.start() + return self._at_client + + +# 创建全局单例实例 +hardware_manager = HardwareManager() + + diff --git a/laser_manager.py b/laser_manager.py new file mode 100644 index 0000000..0555de5 --- /dev/null +++ b/laser_manager.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +激光管理器模块 +提供激光控制、校准等功能 +""" +import json +import os +from maix import time, camera +import threading +import config +from logger_manager import logger_manager + + +class LaserManager: + """激光控制管理器(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(LaserManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + # 私有状态 + self._calibration_active = False + self._calibration_result = None + self._calibration_lock = threading.Lock() + self._laser_point = None + + self._initialized = True + + # ==================== 状态访问(只读属性)==================== + + @property + def calibration_active(self): + """是否正在校准""" + return self._calibration_active + + @property + def laser_point(self): + """当前激光点""" + return self._laser_point + + # ==================== 业务方法 ==================== + + def load_laser_point(self): + """从配置文件加载激光中心点,失败则使用默认值""" + try: + if "laser_config.json" in os.listdir("/root"): + with open(config.CONFIG_FILE, "r") as f: + data = json.load(f) + if isinstance(data, list) and len(data) == 2: + self._laser_point = (int(data[0]), int(data[1])) + logger = logger_manager.logger + if logger: + logger.debug(f"[INFO] 加载激光点: {self._laser_point}") + return self._laser_point + else: + raise ValueError + else: + self._laser_point = config.DEFAULT_LASER_POINT + except: + self._laser_point = config.DEFAULT_LASER_POINT + + return self._laser_point + + def save_laser_point(self, point): + """保存激光中心点到配置文件""" + try: + with open(config.CONFIG_FILE, "w") as f: + json.dump([point[0], point[1]], f) + self._laser_point = point + return True + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[LASER] 保存激光点失败: {e}") + return False + + def turn_on_laser(self): + """发送指令开启激光,并读取回包(部分模块支持)""" + from hardware import hardware_manager + logger = logger_manager.logger + + if hardware_manager.distance_serial is None: + if logger: + logger.error("[LASER] distance_serial 未初始化") + return None + + # 打印调试信息 + if logger: + logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}") + + # 清空接收缓冲区 + try: + hardware_manager.distance_serial.read(-1) # 清空缓冲区 + except: + pass + + # 发送命令 + written = hardware_manager.distance_serial.write(config.LASER_ON_CMD) + if logger: + logger.info(f"[LASER] 写入字节数: {written}") + + time.sleep_ms(50) # 增加等待时间,让模块有时间响应 + + # 读取回包 + resp = hardware_manager.distance_serial.read(20) + if resp: + if logger: + logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") + if resp == config.LASER_ON_CMD: + if logger: + logger.info("✅ 激光开启指令已确认") + else: + if logger: + logger.warning("🔇 无回包(可能正常或模块不支持回包)") + return resp + + def turn_off_laser(self): + """发送指令关闭激光""" + from hardware import hardware_manager + logger = logger_manager.logger + + if hardware_manager.distance_serial is None: + if logger: + logger.error("[LASER] distance_serial 未初始化") + return None + + # 打印调试信息 + if logger: + logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}") + + # 清空接收缓冲区 + try: + hardware_manager.distance_serial.read(-1) + except: + pass + + # 发送命令 + written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD) + if logger: + logger.info(f"[LASER] 写入字节数: {written}") + + time.sleep_ms(50) # 增加等待时间 + + # 读取回包 + resp = hardware_manager.distance_serial.read(20) + if resp: + if logger: + logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}") + else: + if logger: + logger.warning("🔇 无回包") + return resp + + def flash_laser(self, duration_ms=1000): + """闪一下激光(用于射箭反馈)""" + try: + self.turn_on_laser() + time.sleep_ms(duration_ms) + self.turn_off_laser() + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"闪激光失败: {e}") + + def find_red_laser(self, frame, threshold=150): + """在图像中查找最亮的红色激光点(基于 RGB 阈值)""" + w, h = frame.width(), frame.height() + img_bytes = frame.to_bytes() + max_sum = 0 + best_pos = None + for y in range(0, h, 2): + for x in range(0, w, 2): + idx = (y * w + x) * 3 + r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2] + if r > threshold and r > g * 2 and r > b * 2: + rgb_sum = r + g + b + if rgb_sum > max_sum: + max_sum = rgb_sum + best_pos = (x, y) + return best_pos + + def calibrate_laser_position(self): + """执行一次激光校准:拍照 → 找红点 → 保存坐标""" + time.sleep_ms(80) + cam = camera.Camera(640, 480) + frame = cam.read() + pos = self.find_red_laser(frame) + if pos: + self.save_laser_point(pos) + return pos + return None + + def start_calibration(self): + """开始校准(公共方法)""" + with self._calibration_lock: + if self._calibration_active: + return False + self._calibration_active = True + self._calibration_result = None + return True + + def stop_calibration(self): + """停止校准(公共方法)""" + with self._calibration_lock: + self._calibration_active = False + + def set_calibration_result(self, result): + """设置校准结果(内部方法)""" + with self._calibration_lock: + self._calibration_result = result + + def get_calibration_result(self): + """获取并清除校准结果(内部方法)""" + with self._calibration_lock: + result = self._calibration_result + self._calibration_result = None + return result + + +# 创建全局单例实例 +laser_manager = LaserManager() + +# ==================== 向后兼容的函数接口 ==================== + +def load_laser_point(): + """加载激光点(向后兼容接口)""" + return laser_manager.load_laser_point() + +def save_laser_point(point): + """保存激光点(向后兼容接口)""" + return laser_manager.save_laser_point(point) + +def turn_on_laser(): + """开启激光(向后兼容接口)""" + return laser_manager.turn_on_laser() + +def turn_off_laser(): + """关闭激光(向后兼容接口)""" + return laser_manager.turn_off_laser() + +def flash_laser(duration_ms=1000): + """闪激光(向后兼容接口)""" + return laser_manager.flash_laser(duration_ms) + +def find_red_laser(frame, threshold=150): + """查找红色激光点(向后兼容接口)""" + return laser_manager.find_red_laser(frame, threshold) + +def calibrate_laser_position(): + """校准激光位置(向后兼容接口)""" + return laser_manager.calibrate_laser_position() + diff --git a/ota_manager.py b/ota_manager.py new file mode 100644 index 0000000..2efe62b --- /dev/null +++ b/ota_manager.py @@ -0,0 +1,1278 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +OTA管理器模块 +提供OTA升级的状态管理和主要功能封装 +""" +import binascii +import hashlib +import re +import threading +import os +import json +import shutil +from urllib.parse import urlparse, unquote + +import requests +from maix import time +import config +from hardware import hardware_manager +from network import network_manager +from logger_manager import logger_manager +from power import get_bus_voltage, voltage_to_percent + + +# 延迟导入避免循环依赖 +# from network import network_manager + + +class OTAManager: + """OTA升级管理器(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(OTAManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + # 私有状态 + self._update_thread_started = False + self._ota_in_progress = 0 + self._ota_url = None + self._ota_mode = None + self._lock = threading.Lock() + + self._initialized = True + + # ==================== 状态访问(只读属性)==================== + + @property + def update_thread_started(self): + """OTA线程是否已启动""" + return self._update_thread_started + + @property + def ota_in_progress(self): + """OTA是否正在进行""" + with self._lock: + return self._ota_in_progress > 0 + + @property + def ota_url(self): + """当前OTA URL""" + return self._ota_url + + @property + def ota_mode(self): + """当前OTA模式""" + return self._ota_mode + + # ==================== 内部状态管理方法 ==================== + + def _start_update_thread(self): + """标记OTA线程已启动(内部方法)""" + with self._lock: + if self._update_thread_started: + return False + self._update_thread_started = True + return True + + def _stop_update_thread(self): + """标记OTA线程已停止(内部方法)""" + with self._lock: + self._update_thread_started = False + + def _begin_ota(self, url=None, mode=None): + """开始OTA(增加计数,内部方法)""" + with self._lock: + self._ota_in_progress += 1 + if url: + self._ota_url = url + if mode: + self._ota_mode = mode + + def _end_ota(self): + """结束OTA(减少计数,内部方法)""" + with self._lock: + self._ota_in_progress = max(0, self._ota_in_progress - 1) + + def _set_ota_url(self, url): + """设置OTA URL(内部方法)""" + with self._lock: + self._ota_url = url + + def _set_ota_mode(self, mode): + """设置OTA模式(内部方法)""" + with self._lock: + self._ota_mode = mode + + # ==================== 业务方法 ==================== + # 注意:这些方法会调用 ota.py 中的实际实现函数 + # 为了保持向后兼容,实际的实现仍然在 ota.py 中 + + def is_archive_file(self, filename): + """ + 检查文件是否是ZIP压缩包(通过扩展名判断) + 约定:上传的代码要么是ZIP压缩包(.zip),要么是直接的PY文件(.py) + + Returns: + (is_archive, archive_type): (True/False, 'zip'/None) + """ + if not os.path.exists(filename): + return False, None + + filename_lower = filename.lower() + if filename_lower.endswith('.zip'): + logger = logger_manager.logger + if logger: + logger.info(f"[EXTRACT] 检测到ZIP文件(扩展名: .zip)") + return True, 'zip' + + logger = logger_manager.logger + if logger: + logger.info(f"[EXTRACT] 不是ZIP格式(扩展名: {os.path.splitext(filename)[1] or '无'})") + return False, None + + def extract_zip_archive(self, archive_path, extract_to_dir=None, target_file=None): + """ + 使用系统 unzip 命令解压ZIP文件 + + Args: + archive_path: ZIP文件路径 + extract_to_dir: 解压到的目录(如果为None,解压到压缩包所在目录) + target_file: 目标文件名(如'main.py'),如果指定,只提取该文件;None表示解压所有文件 + + Returns: + (success, extracted_dir): 成功则返回(True, 解压目录路径),失败返回(False, None) + """ + if extract_to_dir is None: + extract_to_dir = os.path.dirname(archive_path) or '/tmp' + + logger = logger_manager.logger + logger.info(f"[EXTRACT] 开始解压ZIP文件: {archive_path}") + + try: + os.makedirs(extract_to_dir, exist_ok=True) + + if target_file: + cmd = f"unzip -q -o '{archive_path}' '{target_file}' -d '{extract_to_dir}' 2>&1" + else: + cmd = f"unzip -q -o '{archive_path}' -d '{extract_to_dir}' 2>&1" + + result = os.system(cmd) + + if result != 0: + logger.warning(f"[EXTRACT] 直接解压目标文件失败,尝试解压所有文件...") + cmd_all = f"unzip -q -o '{archive_path}' -d '{extract_to_dir}' 2>&1" + result_all = os.system(cmd_all) + + if result_all != 0: + logger.error(f"[EXTRACT] 解压失败,退出码: {result_all}") + return False, None + + return True, extract_to_dir + + except Exception as e: + logger.error(f"[EXTRACT] 解压过程出错: {e}") + return False, None + + def apply_ota_and_reboot(self, ota_url=None, downloaded_file=None): + """ + OTA 文件下载成功后: + 1. 备份应用目录中的所有代码文件 + 2. 如果是ZIP则解压,如果是单个文件则直接使用 + 3. 复制/覆盖所有更新文件到应用目录 + 4. 重启设备 + + Args: + ota_url: OTA URL(用于记录) + downloaded_file: 下载的文件路径(如果为None,使用默认的main_tmp.py) + """ + + # 在调用前设置状态 + if ota_url: + self._set_ota_url(ota_url) + + if downloaded_file is None: + downloaded_file = config.LOCAL_FILENAME + + ota_pending = f"{config.APP_DIR}/ota_pending.json" + + logger = logger_manager.logger + logger.info(f"[OTA] 准备应用OTA更新,下载文件: {downloaded_file}") + + try: + if not os.path.exists(downloaded_file): + logger.error(f"[OTA] 错误:{downloaded_file} 不存在") + return False + + # 备份 + backup_base = config.BACKUP_BASE + backup_dir = None + + try: + os.makedirs(backup_base, exist_ok=True) + + counter_file = os.path.join(backup_base, ".counter") + try: + if os.path.exists(counter_file): + with open(counter_file, 'r') as f: + counter = int(f.read().strip()) + 1 + else: + counter = 1 + + with open(counter_file, 'w') as f: + f.write(str(counter)) + + backup_dir = os.path.join(backup_base, f"backup_{counter:04d}") + logger.info(f"[OTA] 使用备份目录: {backup_dir} (第{counter}次OTA)") + except Exception as e: + logger.error(f"[OTA] 生成备份目录名失败: {e},使用默认目录") + backup_dir = os.path.join(backup_base, "backup_0000") + + # 清理旧备份 + try: + backup_dirs = [] + for item in os.listdir(backup_base): + if item == ".counter": + continue + item_path = os.path.join(backup_base, item) + if os.path.isdir(item_path) and item.startswith("backup_"): + try: + dir_num_str = item.replace("backup_", "") + dir_num = int(dir_num_str) + backup_dirs.append((item, dir_num, item_path)) + except: + pass + + backup_dirs.sort(key=lambda x: x[1], reverse=True) + if len(backup_dirs) > config.MAX_BACKUPS: + for item, dir_num, item_path in backup_dirs[config.MAX_BACKUPS:]: + try: + shutil.rmtree(item_path, ignore_errors=True) + logger.info(f"[OTA] 已删除旧备份: {item}") + except Exception as e: + logger.warning(f"[OTA] 删除旧备份失败: {e}") + except Exception as e: + logger.warning(f"[OTA] 清理旧备份时出错: {e}") + + os.makedirs(backup_dir, exist_ok=True) + + exclude_patterns = ['.pyc', '__pycache__', '.log', 'backups', 'ota_extract', '.bak', 'download'] + backed_up_files = [] + + if os.path.exists(config.APP_DIR): + for root, dirs, files in os.walk(config.APP_DIR): + dirs[:] = [d for d in dirs if not any(ex in d for ex in exclude_patterns)] + + for f in files: + if any(ex in f for ex in exclude_patterns): + continue + + source_path = os.path.join(root, f) + rel_path = os.path.relpath(source_path, config.APP_DIR) + backup_path = os.path.join(backup_dir, rel_path) + + backup_parent = os.path.dirname(backup_path) + if backup_parent != backup_dir: + os.makedirs(backup_parent, exist_ok=True) + + try: + shutil.copy2(source_path, backup_path) + backed_up_files.append(rel_path) + except Exception as e: + if logger: + logger.error(f"[OTA] 备份 {rel_path} 失败: {e}") + + if backed_up_files: + logger.info(f"[OTA] 总共备份了 {len(backed_up_files)} 个文件到 {backup_dir}") + else: + logger.warning(f"[OTA] 没有备份任何文件") + + except Exception as e: + logger.error(f"[OTA] 备份过程出错: {e}") + if not backup_dir: + backup_dir = None + + # 检查是否是ZIP压缩包 + is_archive, archive_type = self.is_archive_file(downloaded_file) + files_to_copy = [] + + if is_archive and archive_type == 'zip': + # 在解压前验证ZIP文件完整性 + try: + with open(downloaded_file, "rb") as f: + zip_header = f.read(4) + if zip_header[:2] != b'PK': + logger.error(f"[OTA] ZIP文件头验证失败: {zip_header.hex()}") + return False + file_size = os.path.getsize(downloaded_file) + logger.info(f"[OTA] ZIP文件验证通过: 大小={file_size} bytes, 头={zip_header.hex()}") + except Exception as e: + logger.error(f"[OTA] ZIP文件验证异常: {e}") + return False + + logger.info(f"[OTA] 检测到ZIP压缩包,开始解压...") + extract_dir = "/tmp/ota_extract" + try: + os.makedirs(extract_dir, exist_ok=True) + except: + extract_dir = f"{config.APP_DIR}/ota_extract" + os.makedirs(extract_dir, exist_ok=True) + + success, extracted_dir = self.extract_zip_archive( + downloaded_file, + extract_to_dir=extract_dir, + target_file=None + ) + + if success and extracted_dir and os.path.exists(extracted_dir): + for root, dirs, files in os.walk(extracted_dir): + for f in files: + source_path = os.path.join(root, f) + rel_path = os.path.relpath(source_path, extracted_dir) + files_to_copy.append((source_path, rel_path)) + + if files_to_copy: + logger.info(f"[OTA] 解压成功,共 {len(files_to_copy)} 个文件") + else: + logger.error(f"[OTA] 解压成功但未找到任何文件") + return False + else: + logger.error(f"[OTA] 解压失败") + return False + else: + # 单个文件更新:从下载的文件名推断目标文件名 + filename = os.path.basename(downloaded_file) + + # 如果下载的文件是 main_tmp.py,目标应该是 main.py + # 如果下载的文件是 main.py,目标也是 main.py + # 其他文件名,保持原样 + if filename == "main_tmp.py": + target_rel_path = "main.py" + else: + target_rel_path = filename + + files_to_copy = [(downloaded_file, target_rel_path)] + logger.info(f"[OTA] 单个文件更新: {downloaded_file} -> {target_rel_path}") + + # 复制文件 + if not files_to_copy: + logger.error(f"[OTA] 没有文件需要复制") + return False + + copied_files = [] + for source_path, rel_path in files_to_copy: + dest_path = os.path.join(config.APP_DIR, rel_path) + + # 检查源文件和目标文件是否是同一个文件(避免复制到自身) + if os.path.abspath(source_path) == os.path.abspath(dest_path): + logger.warning(f"[OTA] 源文件和目标文件相同,跳过复制: {rel_path} (文件已在正确位置)") + copied_files.append(rel_path) + continue + + dest_dir = os.path.dirname(dest_path) + if dest_dir: + try: + os.makedirs(dest_dir, exist_ok=True) + except Exception: + pass + + try: + shutil.copy2(source_path, dest_path) + copied_files.append(rel_path) + logger.info(f"[OTA] 已复制: {rel_path}") + except Exception as e: + logger.error(f"[OTA] 复制 {rel_path} 失败: {e}") + return False + + if copied_files: + logger.info(f"[OTA] 成功复制 {len(copied_files)} 个文件到应用目录") + + # 确保写入磁盘 + try: + os.sync() + except: + pass + time.sleep_ms(500) + + # 写入 pending + try: + pending_obj = { + "ts": 0, # MaixPy time 模块没有 time() 函数,使用 0 + "url": ota_url or "", + "downloaded_file": downloaded_file, + "was_archive": is_archive, + "archive_type": archive_type if is_archive else None, + "backup_dir": backup_dir, + "updated_files": copied_files, + "restart_count": 0, + "max_restarts": 3, + } + with open(ota_pending, "w", encoding="utf-8") as f: + json.dump(pending_obj, f) + try: + os.sync() + except: + pass + except Exception as e: + logger.error(f"[OTA] 写入 ota_pending 失败: {e}") + + # 通知服务器(延迟导入避免循环导入) + from network import safe_enqueue + safe_enqueue({"result": "ota_applied_rebooting", "files": copied_files}, 2) + time.sleep_ms(1000) + + # 清理临时解压目录 + if is_archive and 'extract_dir' in locals(): + try: + if os.path.exists(extract_dir): + shutil.rmtree(extract_dir, ignore_errors=True) + logger.info(f"[OTA] 已清理临时解压目录: {extract_dir}") + except Exception as e: + logger.warning(f"[OTA] 清理临时目录失败(可忽略): {e}") + + # 清理下载文件 + try: + if os.path.exists(downloaded_file): + # 删除下载的文件 + try: + os.remove(downloaded_file) + logger.info(f"[OTA] 已删除下载文件: {downloaded_file}") + except Exception as e: + logger.warning(f"[OTA] 删除下载文件失败(可忽略): {e}") + + # 尝试删除时间戳目录(如果为空) + try: + download_dir = os.path.dirname(downloaded_file) + if download_dir.startswith("/tmp/download/"): + # 检查时间戳目录是否为空 + if os.path.exists(download_dir): + try: + files_in_dir = os.listdir(download_dir) + if not files_in_dir: + os.rmdir(download_dir) + logger.info(f"[OTA] 已删除空时间戳目录: {download_dir}") + except Exception as e: + logger.debug(f"[OTA] 删除时间戳目录失败(可忽略): {e}") + except Exception as e: + logger.debug(f"[OTA] 清理时间戳目录时出错(可忽略): {e}") + except Exception as e: + logger.warning(f"[OTA] 清理下载文件时出错(可忽略): {e}") + + # 重启设备 + logger.info("[OTA] 准备重启设备...") + os.system("reboot") + + return True + + except Exception as e: + logger = logger_manager.logger + logger.error(f"[OTA] apply_ota_and_reboot 异常: {e}") + import traceback + logger.error(traceback.format_exc()) + return False + + def get_download_timestamp_dir(self): + """ + 获取下载目录(带时间戳),格式:/tmp/download/YYYYMMDD_HHMMSS + 使用时间戳而不是日期,避免跨天问题 + + Returns: + 下载目录路径 + """ + try: + # 尝试从系统获取时间戳 + try: + # 方法1:使用系统 date 命令(精确到秒) + timestamp_str = os.popen("date +%Y%m%d_%H%M%S 2>/dev/null").read().strip() + if timestamp_str and len(timestamp_str) == 15: # YYYYMMDD_HHMMSS = 15字符 + timestamp_dir = timestamp_str + else: + raise ValueError("date command failed") + except: + # 方法2:使用 Python datetime(如果系统时间已同步) + try: + from datetime import datetime + now = datetime.now() + timestamp_dir = now.strftime("%Y%m%d_%H%M%S") + except: + # 方法3:如果都失败,使用默认时间戳 + timestamp_dir = "00000000_000000" + + download_base = "/tmp/download" + download_dir = f"{download_base}/{timestamp_dir}" + + # 确保目录存在 + try: + os.makedirs(download_dir, exist_ok=True) + except Exception as e: + logger = logger_manager.logger + logger.warning(f"[OTA] 创建下载目录失败: {e},使用基础目录") + download_dir = download_base + try: + os.makedirs(download_dir, exist_ok=True) + except: + pass + + return download_dir + except Exception as e: + logger = logger_manager.logger + logger.error(f"[OTA] 获取下载目录失败: {e},使用默认目录") + return "/tmp/download" + + def get_filename_from_url(self, url, default_name="main_tmp"): + """ + 从URL中提取文件名和扩展名,保存到带时间戳的下载目录 + + Args: + url: 下载URL + default_name: 如果无法从URL提取文件名,使用的默认名称 + + Returns: + 完整的文件路径,例如: "/tmp/download/20250108_143025/main.zip" + """ + try: + # 获取下载目录(带时间戳) + download_dir = self.get_download_timestamp_dir() + + parsed = urlparse(url) + path = parsed.path + filename = os.path.basename(path) + filename = unquote(filename) + + # 如果从URL提取到了文件名(无论是否有扩展名),都使用该文件名 + if filename and filename.strip(): + return f"{download_dir}/{filename}" + else: + # 只有在完全无法提取文件名时,才使用默认名称 + return f"{download_dir}/{default_name}" + except Exception as e: + logger = logger_manager.logger + logger.error(f"[OTA] 从URL提取文件名失败: {e},使用默认文件名") + download_dir = self.get_download_timestamp_dir() + return f"{download_dir}/{default_name}" + + def download_file(self, url, filename): + """从指定 URL 下载文件,根据文件类型自动选择文本或二进制模式,并支持MD5校验""" + try: + logger = logger_manager.logger + logger.info(f"正在从 {url} 下载文件...") + response = requests.get(url) + response.raise_for_status() + + # 从响应头中提取MD5(如果服务器提供) + md5_b64_expected = None + if 'Content-Md5' in response.headers: + md5_b64_expected = response.headers['Content-Md5'].strip() + logger.info(f"[DOWNLOAD] 服务器提供了MD5校验值: {md5_b64_expected}") + + # 根据文件扩展名判断是否为二进制文件 + filename_lower = filename.lower() + is_binary = filename_lower.endswith(('.zip', '.bin', '.tar', '.gz', '.exe', '.dll', '.so', '.dylib')) + + if is_binary: + # 二进制文件:使用二进制模式写入 + data = response.content + with open(filename, 'wb') as file: + file.write(data) + # 强制刷新到磁盘 + try: + os.sync() + except: + pass + logger.info(f"[DOWNLOAD] 使用二进制模式下载: {filename}, 大小: {len(data)} bytes") + else: + # 文本文件:使用文本模式写入 + response.encoding = 'utf-8' + with open(filename, 'w', encoding='utf-8') as file: + file.write(response.text) + + logger.info(f"[DOWNLOAD] 使用文本模式下载: {filename}") + + # MD5 校验(如果服务器提供了MD5值) + if md5_b64_expected and hashlib is not None: + try: + with open(filename, "rb") as f: + file_data = f.read() + digest = hashlib.md5(file_data).digest() + md5_b64_got = binascii.b2a_base64(digest).decode().strip() + + if md5_b64_got != md5_b64_expected: + logger.error(f"[DOWNLOAD] MD5校验失败: 期望={md5_b64_expected}, 实际={md5_b64_got}") + return f"下载失败!MD5校验失败: 期望={md5_b64_expected}, 实际={md5_b64_got}" + else: + logger.info(f"[DOWNLOAD] MD5校验通过: {md5_b64_got}") + except Exception as e: + logger.warning(f"[DOWNLOAD] MD5校验过程出错: {e}") + # MD5校验出错时,如果是二进制文件(特别是ZIP),应该失败 + if is_binary: + return f"下载失败!MD5校验异常: {e}" + elif is_binary and not md5_b64_expected: + # 二进制文件(特别是ZIP)建议有MD5校验 + logger.warning(f"[DOWNLOAD] 警告: 服务器未提供MD5校验值,无法验证文件完整性") + + return f"下载成功!文件已保存为: {filename}" + except requests.exceptions.RequestException as e: + return f"下载失败!网络请求错误: {e}" + except OSError as e: + return f"下载失败!文件写入错误: {e}" + except Exception as e: + return f"下载失败!发生未知错误: {e}" + + def direct_ota_download(self, ota_url): + """直接执行 OTA 下载(假设已有网络)""" + + self._set_ota_url(ota_url) + self._start_update_thread() + + try: + if not ota_url: + from network import safe_enqueue + safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) + return + + parsed_url = urlparse(ota_url) + host = parsed_url.hostname + port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) + + if not network_manager.is_server_reachable(host, port, timeout=8): + from network import safe_enqueue + safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2) + return + + downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp") + logger = logger_manager.logger + logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}") + logger.info(f"[OTA] 开始下载: {ota_url}") + result_msg = self.download_file(ota_url, downloaded_filename) + logger.info(f"[OTA] {result_msg}") + + if "成功" in result_msg or "下载成功" in result_msg: + if self.apply_ota_and_reboot(ota_url, downloaded_filename): + return + else: + from network import safe_enqueue + safe_enqueue({"result": result_msg}, 2) + + except Exception as e: + error_msg = f"OTA 异常: {str(e)}" + logger = logger_manager.logger + logger.error(error_msg) + from network import safe_enqueue + safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) + finally: + self._stop_update_thread() + + def download_file_via_4g(self, url, filename, + total_timeout_ms=600000, + retries=3, + debug=False): + """ + ML307R HTTP 下载(更稳的"固定小块 Range 顺序下载",基于main109.py): + - 只依赖 +MHTTPURC:"header"/"content"(不依赖 MHTTPREAD/cached) + - 每次只请求一个小块 Range(默认 10240B),失败就重试同一块,必要时缩小块大小 + - 每个 chunk 都重新 MHTTPCREATE/MHTTPREQUEST,避免卡在"206 header 但不吐 content"的坏状态 + - 使用二进制模式下载,确保文件完整性 + """ + from urllib.parse import urlparse + from hardware import hardware_manager + + # 小块策略(与main109.py保持一致) + CHUNK_MAX = 10240 + CHUNK_MIN = 128 + CHUNK_RETRIES = 12 + FRAG_SIZE = 1024 + FRAG_DELAY = 10 + + t_func0 = time.ticks_ms() + + parsed = urlparse(url) + host = parsed.hostname + path = parsed.path or "/" + if not host: + return False, "bad_url (no host)" + + # 很多 ML307R 的 MHTTP 对 https 不稳定;对已知域名做降级 + if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"): + base_url = "http://static.shelingxingqiu.com" + else: + base_url = f"http://{host}" + + logger = logger_manager.logger + + def _log(*a): + if debug and logger: + logger.debug(" ".join(str(x) for x in a)) + + def _pwr_log(prefix=""): + """debug 用:输出电压/电量""" + if not debug: + return + try: + v = get_bus_voltage() + p = voltage_to_percent(v) + if logger: + logger.debug(f"[PWR]{prefix} v={v:.3f}V p={p}%") + except Exception as e: + try: + if logger: + logger.debug(f"[PWR]{prefix} read_failed: {e}") + except: + pass + + def _clear_http_events(): + if hardware_manager.at_client: + while hardware_manager.at_client.pop_http_event() is not None: + pass + + def _parse_httpid(resp: str): + m = re.search(r"\+MHTTPCREATE:\s*(\d+)", resp) + return int(m.group(1)) if m else None + + def _get_ip(): + r = hardware_manager.at_client.send("AT+CGPADDR=1", "OK", 3000) + m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r) + return m.group(1) if m else "" + + def _ensure_pdp(): + ip = _get_ip() + if ip and ip != "0.0.0.0": + return True, ip + hardware_manager.at_client.send("AT+MIPCALL=1,1", "OK", 15000) + for _ in range(10): + ip = _get_ip() + if ip and ip != "0.0.0.0": + return True, ip + time.sleep(1) + return False, ip + + def _extract_hdr_fields(hdr_text: str): + mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE) + clen = int(mlen.group(1)) if mlen else None + mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE) + md5_b64 = mmd5.group(1).strip() if mmd5 else None + return clen, md5_b64 + + def _extract_content_range(hdr_text: str): + m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE) + if not m: + return None, None, None + try: + return int(m.group(1)), int(m.group(2)), int(m.group(3)) + except: + return None, None, None + + def _hard_reset_http(): + """模块进入"坏状态"时的保守清场""" + _clear_http_events() + for i in range(0, 6): + try: + hardware_manager.at_client.send(f"AT+MHTTPDEL={i}", "OK", 1200) + except: + pass + _clear_http_events() + + def _create_httpid(full_reset=False): + _clear_http_events() + if hardware_manager.at_client: + hardware_manager.at_client.flush() + if full_reset: + _hard_reset_http() + resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000) + hid = _parse_httpid(resp) + return hid, resp + + def _fetch_range_into_buf(start, want_len, out_buf, full_reset=False): + """ + 请求 Range [start, start+want_len),写入 out_buf(bytearray,长度=want_len) + 返回 (ok, msg, total_len, md5_b64, got_len) + """ + end_incl = start + want_len - 1 + hid, cresp = _create_httpid(full_reset=full_reset) + if hid is None: + return False, f"MHTTPCREATE failed: {cresp}", None, None, 0 + + # 降低 URC 压力(分片/延迟) + hardware_manager.at_client.send(f'AT+MHTTPCFG="fragment",{hid},{FRAG_SIZE},{FRAG_DELAY}', "OK", 1500) + # 设置 Range header(inclusive) + hardware_manager.at_client.send(f'AT+MHTTPCFG="header",{hid},"Range: bytes={start}-{end_incl}"', "OK", 3000) + + req = hardware_manager.at_client.send(f'AT+MHTTPREQUEST={hid},1,0,"{path}"', "OK", 15000) + if "ERROR" in req or "CME ERROR" in req: + hardware_manager.at_client.send(f"AT+MHTTPDEL={hid}", "OK", 2000) + return False, f"MHTTPREQUEST failed: {req}", None, None, 0 + + # 等 header + content + hdr_text = None + hdr_accum = "" + code = None + resp_total = None + total_len = None + md5_b64 = None + + got_ranges = set() + last_sum = 0 + t0 = time.ticks_ms() + timeout_ms = 9000 + logged_hdr = False + + while time.ticks_ms() - t0 < timeout_ms: + ev = hardware_manager.at_client.pop_http_event() if hardware_manager.at_client else None + if not ev: + time.sleep_ms(5) + continue + + if ev[0] == "header": + _, ehid, ecode, ehdr = ev + if ehid != hid: + continue + code = ecode + hdr_text = ehdr + if ehdr: + hdr_accum = (hdr_accum + "\n" + ehdr) if hdr_accum else ehdr + + resp_total_tmp, md5_tmp = _extract_hdr_fields(hdr_accum) + if md5_tmp: + md5_b64 = md5_tmp + cr_s, cr_e, cr_total = _extract_content_range(hdr_accum) + if cr_total is not None: + total_len = cr_total + if resp_total_tmp is not None: + resp_total = resp_total_tmp + elif resp_total is None and (cr_s is not None) and (cr_e is not None) and (cr_e >= cr_s): + resp_total = (cr_e - cr_s + 1) + if (not logged_hdr) and (resp_total is not None or total_len is not None): + _log(f"[HDR] id={hid} code={code} clen={resp_total} cr={cr_s}-{cr_e}/{cr_total}") + logged_hdr = True + continue + + if ev[0] == "content": + _, ehid, _total, _sum, _cur, payload = ev + if ehid != hid: + continue + if resp_total is None: + resp_total = _total + if resp_total is None or resp_total <= 0: + continue + start_rel = _sum - _cur + end_rel = _sum + if start_rel < 0 or start_rel >= resp_total: + continue + if end_rel > resp_total: + end_rel = resp_total + actual_len = min(len(payload), end_rel - start_rel) + if actual_len <= 0: + continue + out_buf[start_rel:start_rel + actual_len] = payload[:actual_len] + got_ranges.add((start_rel, start_rel + actual_len)) + if _sum > last_sum: + last_sum = _sum + if debug and (last_sum >= resp_total or (last_sum % 512 == 0)): + _log(f"[CHUNK] {start}+{last_sum}/{resp_total}") + + if last_sum >= resp_total: + break + + # 清理实例(快路径:只删当前 hid) + try: + hardware_manager.at_client.send(f"AT+MHTTPDEL={hid}", "OK", 2000) + except: + pass + + if resp_total is None: + return False, "no_header_or_total", total_len, md5_b64, 0 + + # 计算实际填充长度 + merged = sorted(got_ranges) + merged2 = [] + for s, e in merged: + if not merged2 or s > merged2[-1][1]: + merged2.append((s, e)) + else: + merged2[-1] = (merged2[-1][0], max(merged2[-1][1], e)) + filled = sum(e - s for s, e in merged2) + + if filled < resp_total: + return False, f"incomplete_chunk got={filled} expected={resp_total} code={code}", total_len, md5_b64, filled + + got_len = resp_total + return True, "OK", total_len, md5_b64, got_len + + try: + self._begin_ota() + except: + pass + + from network import network_manager + with network_manager.get_uart_lock(): + try: + ok_pdp, ip = _ensure_pdp() + if not ok_pdp: + return False, f"PDP not ready (ip={ip})" + + # 先清空旧事件,避免串台 + _clear_http_events() + + # 为了支持随机写入,先创建空文件 + try: + with open(filename, "wb") as f: + f.write(b"") + except Exception as e: + return False, f"open_file_failed: {e}" + + total_len = None + expect_md5_b64 = None + + offset = 0 + chunk = CHUNK_MAX + t_start = time.ticks_ms() + last_progress_ms = t_start + STALL_TIMEOUT_MS = 60000 + last_pwr_ms = t_start + _pwr_log(prefix=" ota_start") + bad_http_state = 0 + + while True: + now = time.ticks_ms() + if debug and time.ticks_diff(now, last_pwr_ms) >= 5000: + last_pwr_ms = now + _pwr_log(prefix=f" off={offset}/{total_len or '?'}") + if time.ticks_diff(now, t_start) > total_timeout_ms: + return False, f"timeout overall after {total_timeout_ms}ms offset={offset} total={total_len}" + + if time.ticks_diff(now, last_progress_ms) > STALL_TIMEOUT_MS: + return False, f"timeout stalled {STALL_TIMEOUT_MS}ms offset={offset} total={total_len}" + + if total_len is not None and offset >= total_len: + break + + want = chunk + if total_len is not None: + remain = total_len - offset + if remain <= 0: + break + if want > remain: + want = remain + + # 本 chunk 的 buffer(长度=want) + buf = bytearray(want) + + success = False + last_err = "unknown" + md5_seen = None + got_len = 0 + for k in range(1, CHUNK_RETRIES + 1): + do_full_reset = (bad_http_state >= 2) + ok, msg, tlen, md5_b64, got = _fetch_range_into_buf(offset, want, buf, full_reset=do_full_reset) + last_err = msg + if tlen is not None and total_len is None: + total_len = tlen + if md5_b64 and not expect_md5_b64: + expect_md5_b64 = md5_b64 + if ok: + success = True + got_len = got + bad_http_state = 0 + break + + try: + if ("no_header_or_total" in msg) or ("MHTTPREQUEST failed" in msg) or ("MHTTPCREATE failed" in msg): + bad_http_state += 1 + else: + bad_http_state = max(0, bad_http_state - 1) + except: + pass + + if chunk > CHUNK_MIN: + chunk = max(CHUNK_MIN, chunk // 2) + want = min(chunk, want) + buf = bytearray(want) + _log(f"[RETRY] off={offset} want={want} try={k} err={msg}") + _pwr_log(prefix=f" retry{k} off={offset}") + time.sleep_ms(120) + + if not success: + return False, f"chunk_failed off={offset} want={want} err={last_err} total={total_len}" + + # 写入文件(二进制模式) + try: + with open(filename, "r+b") as f: + f.seek(offset) + f.write(bytes(buf)) + except Exception as e: + return False, f"write_failed off={offset}: {e}" + + offset += len(buf) + last_progress_ms = time.ticks_ms() + chunk = CHUNK_MAX + if debug: + _log(f"[OK] offset={offset}/{total_len or '?'}") + + # MD5 校验 + if expect_md5_b64 and hashlib is not None: + try: + with open(filename, "rb") as f: + data = f.read() + digest = hashlib.md5(data).digest() + got_b64 = binascii.b2a_base64(digest).decode().strip() + if got_b64 != expect_md5_b64: + return False, f"md5_mismatch got={got_b64} expected={expect_md5_b64}" + if logger: + logger.debug(f"[4G-DL] MD5 verified: {got_b64}") + except Exception as e: + return False, f"md5_check_failed: {e}" + + t_cost = time.ticks_diff(time.ticks_ms(), t_func0) + if logger: + logger.info(f"[4G-DL] download complete: size={offset} ip={ip} cost_ms={t_cost}") + return True, f"OK size={offset} ip={ip} cost_ms={t_cost}" + + finally: + self._end_ota() + + def direct_ota_download_via_4g(self, ota_url): + """通过 4G 模块下载 OTA(不需要 Wi-Fi)""" + self._set_ota_url(ota_url) + self._set_ota_mode("4g") + self._start_update_thread() + # 延迟导入避免循环依赖 + from network import safe_enqueue + + try: + t_ota0 = time.ticks_ms() + if not ota_url: + safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) + return + + # OTA 全程暂停 TCP(避免心跳/重连抢占 uart4g_lock,导致 server 断链 + HTTP URC 更容易丢) + self._begin_ota() + + # 主动断开 AT TCP,减少 +MIPURC 噪声干扰 HTTP URC 下载 + from network import network_manager + network_manager.disconnect_server() + try: + with network_manager.get_uart_lock(): + hardware_manager.at_client.send("AT+MIPCLOSE=0", "OK", 1500) + except: + pass + + # 从URL中提取文件名(保留原始扩展名) + downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp") + logger_manager.logger.info(f"[OTA-4G] 下载文件将保存为: {downloaded_filename}") + + logger_manager.logger.info(f"[OTA-4G] 开始通过 4G 下载: {ota_url}") + # 重要说明: + # - AT+MDIALUP / RNDIS 是"USB 主机拨号上网"模式,在不少 ML307R 固件上会占用/切换内部网络栈, + # 从而导致 AT+MIPOPEN / +MIPURC 这套 TCP 连接无法工作(你会看到一直"连接到服务器...")。 + # - 这个设备当前 4G 是走 UART + AT Socket(MIPOPEN),并没有把 4G 变成系统网卡(如 ppp0)。 + # 因此这里不再自动拨号/改路由;只有当系统本来就有 default route(例如 eth0 已联网)时,才尝试走 requests 下载。 + + msg_sys = "" + try: + import power + v = power.get_bus_voltage() + p = power.voltage_to_percent(v) + logger_manager.logger.info(f"[OTA-4G][PWR] before_urc v={v:.3f}V p={p}%") + except Exception as e: + logger_manager.logger.error(f"[OTA-4G][PWR] before_urc read_failed: {e}") + + t_dl0 = time.ticks_ms() + success, msg = self.download_file_via_4g(ota_url, downloaded_filename, debug=False) + t_dl_cost = time.ticks_diff(t_dl0, time.ticks_ms()) + logger_manager.logger.info(f"[OTA-4G] {msg}") + logger_manager.logger.info(f"[OTA-4G] download_cost_ms={t_dl_cost}") + + if success and "OK" in msg: + if self.apply_ota_and_reboot(ota_url, downloaded_filename): + return + else: + safe_enqueue({"result": msg_sys or msg}, 2) + + except Exception as e: + error_msg = f"OTA-4G 异常: {str(e)}" + logger_manager.logger.error(error_msg) + safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2) + finally: + # 总耗时(注意:若成功并 reboot,这行可能来不及打印) + try: + t_cost = time.ticks_diff(time.ticks_ms(), t_ota0) + logger_manager.logger.info(f"[OTA-4G] total_cost_ms={t_cost}") + except: + pass + self._stop_update_thread() + # 对应上面的 _begin_ota() + self._end_ota() + + def handle_wifi_and_update(self, ssid, password, ota_url): + """在子线程中执行 Wi-Fi 连接 + OTA 更新流程""" + self._set_ota_url(ota_url) + self._set_ota_mode("wifi") + self._start_update_thread() + # 延迟导入避免循环导入 + from network import network_manager, safe_enqueue + + try: + ip, error = network_manager.connect_wifi(ssid, password) + if error: + safe_enqueue({"result": "wifi_failed", "error": error}, 2) + return + safe_enqueue({"result": "wifi_connected", "ip": ip}, 2) + + if not ota_url: + safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2) + return + from urllib.parse import urlparse + parsed_url = urlparse(ota_url) + host = parsed_url.hostname + port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) + + if not network_manager.is_server_reachable(host, port, timeout=8): + err_msg = f"网络不通:无法连接 {host}:{port}" + safe_enqueue({"result": err_msg}, 2) + logger = logger_manager.logger + if logger: + logger.error(err_msg) + return + + downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp") + logger = logger_manager.logger + if logger: + logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}") + + if logger: + logger.info(f"[NET] 已确认可访问 {host}:{port},开始下载...") + result = self.download_file(ota_url, downloaded_filename) + if logger: + logger.info(result) + + if "成功" in result or "下载成功" in result: + if self.apply_ota_and_reboot(ota_url, downloaded_filename): + return + else: + safe_enqueue({"result": result}, 2) + + finally: + self._stop_update_thread() + print("[UPDATE] 更新线程执行完毕,即将退出。") + + def restore_from_backup(self, backup_dir_path=None): + """ + 从备份目录恢复所有文件到应用目录 + + Args: + backup_dir_path: 备份目录路径,如果为None,自动查找最新的备份目录 + + Returns: + bool: 是否成功恢复 + """ + backup_base = config.BACKUP_BASE + + try: + if backup_dir_path is None: + if not os.path.exists(backup_base): + logger = logger_manager.logger + if logger: + logger.error(f"[RESTORE] 备份目录不存在: {backup_base}") + return False + + backup_dirs = [] + for item in os.listdir(backup_base): + if item == ".counter": + continue + item_path = os.path.join(backup_base, item) + if os.path.isdir(item_path) and item.startswith("backup_"): + try: + dir_num_str = item.replace("backup_", "") + dir_num = int(dir_num_str) + backup_dirs.append((item, dir_num)) + except: + pass + + if not backup_dirs: + logger = logger_manager.logger + if logger: + logger.error(f"[RESTORE] 没有找到备份目录") + return False + + backup_dirs.sort(key=lambda x: x[1], reverse=True) + latest_backup = backup_dirs[0][0] + backup_dir_path = os.path.join(backup_base, latest_backup) + + if not os.path.exists(backup_dir_path): + logger = logger_manager.logger + if logger: + logger.error(f"[RESTORE] 备份目录不存在: {backup_dir_path}") + return False + + logger = logger_manager.logger + if logger: + logger.info(f"[RESTORE] 开始从备份恢复: {backup_dir_path}") + + restored_files = [] + for root, dirs, files in os.walk(backup_dir_path): + for f in files: + source_path = os.path.join(root, f) + rel_path = os.path.relpath(source_path, backup_dir_path) + dest_path = os.path.join(config.APP_DIR, rel_path) + + dest_dir = os.path.dirname(dest_path) + if dest_dir: + os.makedirs(dest_dir, exist_ok=True) + + try: + shutil.copy2(source_path, dest_path) + restored_files.append(rel_path) + if logger: + logger.info(f"[RESTORE] 已恢复: {rel_path}") + except Exception as e: + if logger: + logger.error(f"[RESTORE] 恢复 {rel_path} 失败: {e}") + + if restored_files: + if logger: + logger.info(f"[RESTORE] 成功恢复 {len(restored_files)} 个文件") + return True + else: + if logger: + logger.info(f"[RESTORE] 没有文件被恢复") + return False + + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[RESTORE] 恢复过程出错: {e}") + return False + + +# 创建全局单例实例 +ota_manager = OTAManager() + +# ==================== 向后兼容的函数接口 ==================== +# 这些函数会更新 ota_manager 的状态,并调用实际实现 + +def apply_ota_and_reboot(ota_url=None, downloaded_file=None): + """应用OTA并重启(向后兼容接口)""" + return ota_manager.apply_ota_and_reboot(ota_url, downloaded_file) + +def direct_ota_download(ota_url): + """直接执行OTA下载(向后兼容接口)""" + return ota_manager.direct_ota_download(ota_url) + +def direct_ota_download_via_4g(ota_url): + """通过4G模块下载OTA(向后兼容接口)""" + return ota_manager.direct_ota_download_via_4g(ota_url) + +def handle_wifi_and_update(ssid, password, ota_url): + """处理WiFi连接并更新(向后兼容接口)""" + return ota_manager.handle_wifi_and_update(ssid, password, ota_url) + +def restore_from_backup(backup_dir_path=None): + """从备份恢复(向后兼容接口)""" + return ota_manager.restore_from_backup(backup_dir_path) + diff --git a/power.py b/power.py new file mode 100644 index 0000000..c7aebfa --- /dev/null +++ b/power.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +电源管理模块(INA226) +提供电压、电流监测和充电状态检测 +""" +import config +from logger_manager import logger_manager + + +def write_register(reg, value): + """写入INA226寄存器""" + from hardware import hardware_manager + data = [(value >> 8) & 0xFF, value & 0xFF] + hardware_manager.bus.writeto_mem(config.INA226_ADDR, reg, bytes(data)) + + +def read_register(reg): + """读取INA226寄存器""" + from hardware import hardware_manager + data = hardware_manager.bus.readfrom_mem(config.INA226_ADDR, reg, 2) + return (data[0] << 8) | data[1] + + +def init_ina226(): + """初始化 INA226 芯片:配置模式 + 校准值""" + write_register(config.REG_CONFIGURATION, 0x4527) + write_register(config.REG_CALIBRATION, config.CALIBRATION_VALUE) + + +def get_bus_voltage(): + """读取总线电压(单位:V)""" + raw = read_register(config.REG_BUS_VOLTAGE) + return raw * 1.25 / 1000 + + +def get_current(): + """ + 读取电流(单位:mA) + 正数表示充电,负数表示放电 + + INA226 电流计算公式: + Current = (Current Register Value) × Current_LSB + Current_LSB = 0.001 × CALIBRATION_VALUE / 4096 + """ + try: + raw = read_register(config.REG_CURRENT) + # INA226 电流寄存器是16位有符号整数 + # 最高位是符号位:0=正(充电),1=负(放电) + # 计算 Current_LSB(根据 CALIBRATION_VALUE) + current_lsb = 0.001 * config.CALIBRATION_VALUE / 4096 # 单位:A + # 处理有符号数:如果最高位为1,转换为负数 + if raw & 0x8000: # 最高位为1,表示负数(放电) + signed_raw = raw - 0x10000 # 转换为有符号整数 + else: # 最高位为0,表示正数(充电) + signed_raw = raw + # 转换为毫安 + current_ma = signed_raw * current_lsb * 1000 + return current_ma + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[INA226] 读取电流失败: {e}") + else: + print(f"[INA226] 读取电流失败: {e}") + return 0.0 + + +def is_charging(threshold_ma=10.0): + """ + 检测是否在充电(通过电流方向判断) + + Args: + threshold_ma: 电流阈值(毫安),超过此值认为在充电,默认10mA + + Returns: + True: 正在充电 + False: 未充电或读取失败 + """ + try: + current = get_current() + is_charge = current > threshold_ma + return is_charge + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[CHARGE] 检测充电状态失败: {e}") + else: + print(f"[CHARGE] 检测充电状态失败: {e}") + return False + + +def voltage_to_percent(voltage): + """根据电压估算电池百分比(查表插值)""" + points = [ + (4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65), + (3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15), + (3.65, 5), (3.60, 0) + ] + if voltage >= points[0][0]: + return 100 + if voltage <= points[-1][0]: + return 0 + for i in range(len(points) - 1): + v1, p1 = points[i] + v2, p2 = points[i + 1] + if voltage >= v2: + ratio = (voltage - v1) / (v2 - v1) + percent = p1 + (p2 - p1) * ratio + return max(0, min(100, int(round(percent)))) + return 0 + diff --git a/time_sync.py b/time_sync.py new file mode 100644 index 0000000..c0bfd93 --- /dev/null +++ b/time_sync.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +时间同步模块 +从4G模块获取时间并同步到系统 +""" +import re +import os +from datetime import datetime, timedelta +import config +# from logger_bak import get_logger +from logger_manager import logger_manager + + +def parse_4g_time(cclk_response, timezone_offset=8): + """ + 解析 AT+CCLK? 返回的时间字符串,并转换为本地时间 + + Args: + cclk_response: AT+CCLK? 的响应字符串 + timezone_offset: 时区偏移(小时),默认8(中国时区 UTC+8) + + Returns: + datetime 对象(已转换为本地时间),如果解析失败返回 None + """ + try: + # 匹配格式: +CCLK: "YY/MM/DD,HH:MM:SS+TZ" + # 时区单位是四分之一小时(quarters of an hour) + match = re.search(r'\+CCLK:\s*"(\d{2})/(\d{2})/(\d{2}),(\d{2}):(\d{2}):(\d{2})([+-]\d{1,3})?"', cclk_response) + if not match: + return None + + yy, mm, dd, hh, MM, ss, tz_str = match.groups() + + # 年份处理:26 -> 2026 + year = 2000 + int(yy) + month = int(mm) + day = int(dd) + hour = int(hh) + minute = int(MM) + second = int(ss) + + # 创建 UTC 时间的 datetime 对象 + dt_utc = datetime(year, month, day, hour, minute, second) + + # 解析时区偏移(单位:四分之一小时) + if tz_str: + try: + # 时区偏移值(四分之一小时) + tz_quarters = int(tz_str) + + # 转换为小时(除以4) + tz_hours = tz_quarters / 4.0 + + logger = logger_manager.logger + if logger: + logger.info(f"[TIME] 时区偏移: {tz_str} (四分之一小时) = {tz_hours} 小时") + + # 转换为本地时间 + dt_local = dt_utc + timedelta(hours=tz_hours) + except ValueError: + # 如果时区解析失败,使用默认值 + logger = logger_manager.logger + if logger: + logger.warning(f"[TIME] 时区解析失败: {tz_str},使用默认 UTC+{timezone_offset}") + dt_local = dt_utc + timedelta(hours=timezone_offset) + else: + # 没有时区信息,使用默认值 + logger = logger_manager.logger + if logger: + logger.info(f"[TIME] 未找到时区信息,使用默认 UTC+{timezone_offset}") + dt_local = dt_utc + timedelta(hours=timezone_offset) + + logger = logger_manager.logger + if logger: + logger.info(f"[TIME] UTC时间: {dt_utc.strftime('%Y-%m-%d %H:%M:%S')}") + logger.info(f"[TIME] 本地时间: {dt_local.strftime('%Y-%m-%d %H:%M:%S')}") + + return dt_local + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}") + else: + print(f"[TIME] 解析时间失败: {e}, 响应: {cclk_response}") + return None + + +def get_time_from_4g(timezone_offset=8): + """ + 通过4G模块获取当前时间(已转换为本地时间) + + Args: + timezone_offset: 时区偏移(小时),默认8(中国时区) + + Returns: + datetime 对象(本地时间),如果获取失败返回 None + """ + try: + # 发送 AT+CCLK? 命令(延迟导入避免循环依赖) + from hardware import hardware_manager + # 检查 at_client 是否已初始化 + if hardware_manager.at_client is None: + logger = logger_manager.logger + if logger: + logger.warning("[TIME] ATClient 尚未初始化,无法获取4G时间") + else: + print("[TIME] ATClient 尚未初始化,无法获取4G时间") + return None + resp = hardware_manager.at_client.send("AT+CCLK?", "OK", 3000) + + if not resp or "+CCLK:" not in resp: + logger = logger_manager.logger + if logger: + logger.warning(f"[TIME] 未获取到时间响应: {resp}") + else: + print(f"[TIME] 未获取到时间响应: {resp}") + return None + + # 解析并转换时区 + dt = parse_4g_time(resp, timezone_offset) + return dt + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[TIME] 获取4G时间异常: {e}") + else: + print(f"[TIME] 获取4G时间异常: {e}") + return None + + +def sync_system_time_from_4g(timezone_offset=8): + """ + 从4G模块同步时间到系统 + + Args: + timezone_offset: 时区偏移(小时),默认8(中国时区) + + Returns: + bool: 是否成功 + """ + dt = get_time_from_4g(timezone_offset) + if not dt: + return False + + try: + # 转换为系统 date 命令需要的格式 + time_str = dt.strftime('%Y-%m-%d %H:%M:%S') + + # 设置系统时间 + cmd = f'date -s "{time_str}" 2>&1' + result = os.system(cmd) + + if result == 0: + logger = logger_manager.logger + if logger: + logger.info(f"[TIME] 系统时间已设置为: {time_str}") + else: + print(f"[TIME] 系统时间已设置为: {time_str}") + + # 可选:同步到硬件时钟 + try: + os.system('hwclock -w 2>/dev/null') + logger = logger_manager.logger + if logger: + logger.info("[TIME] 已同步到硬件时钟") + except: + pass + + return True + else: + logger = logger_manager.logger + if logger: + logger.error(f"[TIME] 设置系统时间失败,退出码: {result}") + else: + print(f"[TIME] 设置系统时间失败,退出码: {result}") + return False + + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[TIME] 同步系统时间异常: {e}") + else: + print(f"[TIME] 同步系统时间异常: {e}") + return False + diff --git a/version.py b/version.py new file mode 100644 index 0000000..ef0a9b6 --- /dev/null +++ b/version.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +应用版本号 +每次 OTA 更新时,只需要更新这个文件中的版本号 +""" +VERSION = '1.1.1' + + + + + + diff --git a/vision.py b/vision.py new file mode 100644 index 0000000..d601c09 --- /dev/null +++ b/vision.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +视觉检测模块 +提供靶心检测、距离估算、图像保存等功能 +""" +import cv2 +import numpy as np +import os +import math +from maix import image +import globals +import config +from logger_manager import logger_manager + + +def detect_circle_v3(frame, laser_point=None): + """检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本 + 增加红色圆圈检测,验证黄色圆圈是否为真正的靶心 + 如果提供 laser_point,会选择最接近激光点的目标 + + Args: + frame: 图像帧 + laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择 + + Returns: + (result_img, best_center, best_radius, method, best_radius1, ellipse_params) + """ + img_cv = image.image2cv(frame, False, False) + + best_center = best_radius = best_radius1 = method = None + ellipse_params = None + + # HSV 黄色掩码检测(模糊靶心) + hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) + h, s, v = cv2.split(hsv) + + # 调整饱和度策略:稍微增强,不要过度 + s = np.clip(s * 1.1, 0, 255).astype(np.uint8) + + hsv = cv2.merge((h, s, v)) + + # 放宽 HSV 阈值范围(针对模糊图像的关键调整) + lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色 + upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满 + + mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow) + + # 调整形态学操作 + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) + + contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + # 存储所有有效的黄色-红色组合 + valid_targets = [] + + if contours_yellow: + for cnt_yellow in contours_yellow: + area = cv2.contourArea(cnt_yellow) + perimeter = cv2.arcLength(cnt_yellow, True) + + # 计算圆度 + if perimeter > 0: + circularity = (4 * np.pi * area) / (perimeter * perimeter) + else: + circularity = 0 + + logger = logger_manager.logger + if area > 50 and circularity > 0.7: + if logger: + logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}") + # 尝试拟合椭圆 + yellow_center = None + yellow_radius = None + yellow_ellipse = None + + if len(cnt_yellow) >= 5: + (x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow) + yellow_ellipse = ((x, y), (width, height), angle) + axes_minor = min(width, height) + radius = axes_minor / 2 + yellow_center = (int(x), int(y)) + yellow_radius = int(radius) + else: + (x, y), radius = cv2.minEnclosingCircle(cnt_yellow) + yellow_center = (int(x), int(y)) + yellow_radius = int(radius) + yellow_ellipse = None + + # 如果检测到黄色圆圈,再检测红色圆圈进行验证 + if yellow_center and yellow_radius: + # HSV 红色掩码检测(红色在HSV中跨越0度,需要两个范围) + # 红色范围1: 0-10度(接近0度的红色) + lower_red1 = np.array([0, 80, 0]) + upper_red1 = np.array([10, 255, 255]) + mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1) + + # 红色范围2: 170-180度(接近180度的红色) + lower_red2 = np.array([170, 80, 0]) + upper_red2 = np.array([180, 255, 255]) + mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2) + + # 合并两个红色掩码 + mask_red = cv2.bitwise_or(mask_red1, mask_red2) + + # 形态学操作 + kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red) + + contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + found_valid_red = False + + if contours_red: + # 找到所有符合条件的红色圆圈 + for cnt_red in contours_red: + area_red = cv2.contourArea(cnt_red) + perimeter_red = cv2.arcLength(cnt_red, True) + + if perimeter_red > 0: + circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red) + else: + circularity_red = 0 + + # 红色圆圈也应该有一定的圆度 + if area_red > 50 and circularity_red > 0.6: + # 计算红色圆圈的中心和半径 + if len(cnt_red) >= 5: + (x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red) + radius_red = min(w_red, h_red) / 2 + red_center = (int(x_red), int(y_red)) + red_radius = int(radius_red) + else: + (x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red) + red_center = (int(x_red), int(y_red)) + red_radius = int(radius_red) + + # 计算黄色和红色圆心的距离 + if red_center: + dx = yellow_center[0] - red_center[0] + dy = yellow_center[1] - red_center[1] + distance = np.sqrt(dx*dx + dy*dy) + + # 圆心距离阈值:应该小于黄色半径的某个倍数(比如1.5倍) + max_distance = yellow_radius * 1.5 + + # 红色圆圈应该比黄色圆圈大(外圈) + if distance < max_distance and red_radius > yellow_radius * 0.8: + found_valid_red = True + logger = logger_manager.logger + if logger: + logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}") + + # 记录这个有效目标 + valid_targets.append({ + 'center': yellow_center, + 'radius': yellow_radius, + 'ellipse': yellow_ellipse, + 'area': area + }) + break + + if not found_valid_red: + logger = logger_manager.logger + if logger: + logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别") + + # 从所有有效目标中选择最佳目标 + if valid_targets: + if laser_point: + # 如果有激光点,选择最接近激光点的目标 + best_target = None + min_distance = float('inf') + for target in valid_targets: + dx = target['center'][0] - laser_point[0] + dy = target['center'][1] - laser_point[1] + distance = np.sqrt(dx*dx + dy*dy) + if distance < min_distance: + min_distance = distance + best_target = target + if best_target: + best_center = best_target['center'] + best_radius = best_target['radius'] + ellipse_params = best_target['ellipse'] + method = "v3_ellipse_red_validated_laser_selected" + best_radius1 = best_radius * 5 + else: + # 如果没有激光点,选择面积最大的目标 + best_target = max(valid_targets, key=lambda t: t['area']) + best_center = best_target['center'] + best_radius = best_target['radius'] + ellipse_params = best_target['ellipse'] + method = "v3_ellipse_red_validated" + best_radius1 = best_radius * 5 + + result_img = image.cv2image(img_cv, False, False) + return result_img, best_center, best_radius, method, best_radius1, ellipse_params + + +def estimate_distance(pixel_radius): + """根据像素半径估算实际距离(单位:米)""" + if not pixel_radius: + return 0.0 + return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0 + + +def compute_laser_position(circle_center, laser_point, radius, method): + """计算激光相对于靶心的偏移量(单位:厘米)""" + if not all([circle_center, radius, method]): + return None, None + cx, cy = circle_center + lx, ly = laser_point + # 根据检测方法动态调整靶心物理半径(简化模型) + circle_r = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0 + dx = lx - cx + dy = ly - cy + return dx / (circle_r / 100.0), -dy / (circle_r / 100.0) + + +def save_shot_image(result_img, center, radius, method, ellipse_params, + laser_point, distance_m, photo_dir=None): + """ + 保存射击图像(带标注) + 即使没有检测到靶心也会保存图像,文件名会标注 "no_target" + 确保保存的图像总是包含激光十字线 + + Args: + result_img: 处理后的图像对象(可能已经包含激光十字线或检测标注) + center: 靶心中心坐标 (x, y),可能为 None(未检测到靶心) + radius: 靶心半径,可能为 None(未检测到靶心) + method: 检测方法,可能为 None(未检测到靶心) + ellipse_params: 椭圆参数 ((center, (width, height), angle)) 或 None + laser_point: 激光点坐标 (x, y) + distance_m: 距离(米),可能为 None(未检测到靶心) + photo_dir: 照片存储目录,如果为None则使用 config.PHOTO_DIR + + Returns: + str: 保存的文件路径,如果保存失败或未启用则返回 None + """ + # 检查是否启用图像保存 + if not config.SAVE_IMAGE_ENABLED: + return None + + if photo_dir is None: + photo_dir = config.PHOTO_DIR + + try: + # 确保照片目录存在 + try: + if photo_dir not in os.listdir("/root"): + os.mkdir(photo_dir) + except: + pass + + # 生成文件名 + # 统计所有图片文件(包括 .bmp 和 .jpg) + try: + all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] + img_count = len(all_images) + except: + img_count = 0 + + x, y = laser_point + + # 如果未检测到靶心,在文件名中标注 + if center is None or radius is None: + method_str = "no_target" + distance_str = "000" + else: + method_str = method or "unknown" + distance_str = str(round((distance_m or 0.0) * 100)) + + filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp" + + logger = logger_manager.logger + if logger: + if center and radius: + logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}") + if ellipse_params: + (ell_center, (width, height), angle) = ellipse_params + logger.info(f"椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°") + else: + logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y}))") + + # 转换图像为 OpenCV 格式以便绘制 + img_cv = image.image2cv(result_img, False, False) + + # 确保激光十字线被绘制(使用OpenCV在图像上绘制,确保可见性) + laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) + thickness = max(config.LASER_THICKNESS, 2) # 至少2像素宽,确保可见 + length = max(config.LASER_LENGTH, 10) # 至少10像素长 + + # 绘制激光十字线(水平线) + cv2.line(img_cv, + (int(x - length), int(y)), + (int(x + length), int(y)), + laser_color, thickness) + # 绘制激光十字线(垂直线) + cv2.line(img_cv, + (int(x), int(y - length)), + (int(x), int(y + length)), + laser_color, thickness) + # 绘制激光点 + cv2.circle(img_cv, (int(x), int(y)), max(thickness, 3), laser_color, -1) + + # 如果检测到靶心,绘制靶心标注 + if center and radius: + cx, cy = center + + if ellipse_params: + (ell_center, (width, height), angle) = ellipse_params + cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1]) + + # 绘制椭圆 + cv2.ellipse(img_cv, + (cx_ell, cy_ell), + (int(width/2), int(height/2)), + angle, + 0, 360, + (0, 255, 0), + 2) + cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1) + + # 绘制短轴 + minor_length = min(width, height) / 2 + minor_angle = angle + 90 if width >= height else angle + minor_angle_rad = math.radians(minor_angle) + dx_minor = minor_length * math.cos(minor_angle_rad) + dy_minor = minor_length * math.sin(minor_angle_rad) + pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor)) + pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor)) + cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2) + else: + # 绘制圆形靶心 + cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2) + cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1) + + # 如果检测到靶心,绘制从激光点到靶心的连线(可选,用于可视化偏移) + cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1) + + # 转换回 MaixPy 图像格式并保存 + result_img = image.cv2image(img_cv, False, False) + result_img.save(filename) + + if logger: + if center and radius: + logger.debug(f"图像已保存(含靶心标注): {filename}") + else: + logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}") + + return filename + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"保存图像失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return None +