#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AT client module.

Handles AT command traffic with the 4G module and parses the URCs
(unsolicited result codes) it emits.
"""

import _thread
import re
import threading

from maix import time

# Compiled once: these are matched for every HTTP URC the reader thread sees.
_HTTP_HEADER_RE = re.compile(
    rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),'
)
_HTTP_CONTENT_RE = re.compile(
    rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),'
)


class ATClient:
    """
    Single-reader AT/URC client.

    One background thread is the only reader of the UART, so that other code
    paths (TCP loop, ad-hoc AT calls, OTA) do not compete for reads and lose
    data.

    - send(cmd, expect, timeout_ms): write an AT command and wait for `expect`.
    - pop_tcp_payload(): next `+MIPURC: "rtcp"` item as `(link_id, payload)`,
      with the payload already cut to its declared length.
    - pop_http_event(): next `+MHTTPURC` event, either
      `("header", urc_id, code, hdr_text)` or
      `("content", urc_id, total_len, sum_len, cur_len, payload)`.
    """

    _TAG_RTCP = b'+MIPURC: "rtcp",'
    _TAG_HTTP_HEADER = b'+MHTTPURC: "header",'
    _TAG_HTTP_CONTENT = b'+MHTTPURC: "content",'

    # Receive-buffer caps (bytes): once a limit is exceeded only the newest
    # `*_KEEP` bytes are retained.
    _RX_IDLE_LIMIT = 16384
    _RX_IDLE_KEEP = 4096
    _RX_HTTP_LIMIT = 512 * 1024
    _RX_HTTP_KEEP = 256 * 1024

    def __init__(self, uart_obj):
        """Wrap `uart_obj`; it must provide `read(n)` and `write(data)`."""
        self.uart = uart_obj

        self._cmd_lock = threading.Lock()  # serialises send()
        self._q_lock = threading.Lock()    # guards _rx and both queues

        self._rx = b""
        self._tcp_payloads = []
        self._http_events = []

        # State of the command currently being waited on (one in flight).
        self._waiting = False
        self._expect = b"OK"
        self._resp = b""

        self._running = False

    def start(self):
        """Start the reader thread; a no-op when it is already running."""
        if self._running:
            return
        self._running = True
        try:
            _thread.start_new_thread(self._reader_loop, ())
        except Exception:
            # Without a thread the flag would make every later start() a no-op.
            self._running = False
            raise

    def stop(self):
        """Ask the reader loop to exit; it checks the flag once per iteration."""
        self._running = False

    def flush(self):
        """Clear the internal buffer and queues (for OTA / error recovery)."""
        with self._q_lock:
            self._rx = b""
            self._tcp_payloads.clear()
            self._http_events.clear()
            self._resp = b""

    def pop_tcp_payload(self):
        """Return the oldest queued `(link_id, payload)` tuple, or None."""
        with self._q_lock:
            if self._tcp_payloads:
                return self._tcp_payloads.pop(0)
        return None

    def pop_http_event(self):
        """Return the oldest queued HTTP event tuple, or None."""
        with self._q_lock:
            if self._http_events:
                return self._http_events.pop(0)
        return None

    def _push_tcp_payload(self, payload: bytes):
        # NOTE: called from the URC parsers while _reader_loop already holds
        # _q_lock. Do not acquire it again here: the lock is not re-entrant
        # and that would deadlock.
        self._tcp_payloads.append(payload)

    def _push_http_event(self, ev):
        # Same as above: runs with _q_lock already held by _reader_loop.
        self._http_events.append(ev)

    def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000) -> str:
        """
        Send an AT command and wait until `expect` shows up in the response.

        Args:
            cmd: command text without line ending; "\\r\\n" is appended. An
                empty value writes nothing and only waits.
            expect: substring (str or bytes) that ends the wait. Use ">" to
                wait for a data prompt.
            timeout_ms: maximum time to wait, in milliseconds.

        Returns:
            Everything received while waiting, decoded as text. On timeout
            the partial response is still returned, which helps diagnosis.
        """
        expect_b = expect.encode() if isinstance(expect, str) else expect

        with self._cmd_lock:
            self._expect = expect_b
            self._resp = b""
            self._waiting = True
            try:
                if cmd:
                    # No extra UART lock here: an outer holder would deadlock.
                    # _cmd_lock already serialises writes.
                    self.uart.write((cmd + "\r\n").encode())

                t0 = time.ticks_ms()
                # abs(): makes the check independent of the argument order
                # that ticks_diff() uses.
                while abs(time.ticks_diff(time.ticks_ms(), t0)) < timeout_ms:
                    if (not self._waiting) or (self._expect in self._resp):
                        break
                    time.sleep_ms(5)
            finally:
                # Always stop capturing, even when the write raised; otherwise
                # the reader thread would keep appending to _resp forever.
                self._waiting = False
            resp = self._resp

        try:
            return resp.decode(errors="ignore")
        except Exception:
            return str(resp)

    def _find_urc_tag(self, tag: bytes) -> int:
        """
        Find `tag` only at a real URC boundary, so that text inside an HTTP
        payload is less likely to be mistaken for a URC.

        A boundary is the start of the buffer or the position right after a
        CRLF. Returns the index of the tag, or -1 when there is none.
        """
        try:
            rx = self._rx
            if rx.startswith(tag):
                return 0
            at = rx.find(b"\r\n" + tag)
            return at + 2 if at >= 0 else -1
        except (TypeError, AttributeError):
            return -1

    def _scan_digits(self, start: int) -> int:
        """Return the index of the first non-digit byte at or after `start`."""
        rx = self._rx
        end = start
        while end < len(rx) and 48 <= rx[end] <= 57:  # ASCII '0'..'9'
            end += 1
        return end

    def _skip_commas(self, start: int, count: int) -> int:
        """
        Return the index just past the `count`-th comma at or after `start`,
        or -1 when the buffer does not hold that many commas yet.
        """
        pos = start
        for _ in range(count):
            comma = self._rx.find(b",", pos)
            if comma < 0:
                return -1
            pos = comma + 1
        return pos

    def _parse_mipurc_rtcp(self) -> bool:
        """
        Parse one `+MIPURC: "rtcp",<link_id>,<len>,<payload>` URC.

        The link id is read from the URC rather than assumed, and is queued
        together with the payload as `(link_id, payload)`.

        Returns:
            True when the buffer changed (URC consumed, or one byte of a
            malformed URC skipped); False when there is no such URC or it is
            not completely received yet.
        """
        prefix = self._TAG_RTCP
        i = self._find_urc_tag(prefix)
        if i < 0:
            return False
        if i > 0:
            self._rx = self._rx[i:]  # drop noise in front of the URC

        fields = []
        pos = len(prefix)
        for _ in range(2):  # <link_id>, then <len>
            end = self._scan_digits(pos)
            if end >= len(self._rx):
                return False  # field may continue in the next chunk
            if end == pos or self._rx[end:end + 1] != b",":
                # Empty or badly terminated field: skip one byte so this tag
                # stops matching and later URCs are not blocked behind it.
                self._rx = self._rx[1:]
                return True
            try:
                fields.append(int(self._rx[pos:end].decode()))
            except ValueError:
                self._rx = self._rx[1:]
                return True
            pos = end + 1

        link_id, length = fields
        payload_end = pos + length
        if len(self._rx) < payload_end:
            return False  # payload not fully received yet

        self._push_tcp_payload((link_id, self._rx[pos:payload_end]))
        self._rx = self._rx[payload_end:]
        return True

    def _parse_mhttpurc_header(self) -> bool:
        """
        Parse one `+MHTTPURC: "header",<id>,<code>,<len>,<text>` URC and queue
        `("header", urc_id, code, hdr_text)`.

        Returns True when the buffer changed, False when there is no such URC
        or it is still incomplete.
        """
        tag = self._TAG_HTTP_HEADER
        i = self._find_urc_tag(tag)
        if i < 0:
            return False
        if i > 0:
            self._rx = self._rx[i:]

        text_start = self._skip_commas(len(tag), 3)
        if text_start < 0:
            return False

        m = _HTTP_HEADER_RE.match(self._rx[:text_start])
        if not m:
            self._rx = self._rx[1:]  # malformed: skip a byte and rescan
            return True

        urc_id = int(m.group(1))
        code = int(m.group(2))
        hdr_len = int(m.group(3))

        text_end = text_start + hdr_len
        if len(self._rx) < text_end:
            return False

        hdr_text = self._rx[text_start:text_end].decode("utf-8", "ignore")
        self._push_http_event(("header", urc_id, code, hdr_text))
        self._rx = self._rx[text_end:]
        return True

    def _parse_mhttpurc_content(self) -> bool:
        """
        Parse one
        `+MHTTPURC: "content",<id>,<total_len>,<sum_len>,<cur_len>,<payload>`
        URC and queue
        `("content", urc_id, total_len, sum_len, cur_len, payload)`.

        Returns True when the buffer changed, False when there is no such URC
        or it is still incomplete.
        """
        tag = self._TAG_HTTP_CONTENT
        i = self._find_urc_tag(tag)
        if i < 0:
            return False
        if i > 0:
            self._rx = self._rx[i:]

        payload_start = self._skip_commas(len(tag), 4)
        if payload_start < 0:
            return False

        m = _HTTP_CONTENT_RE.match(self._rx[:payload_start])
        if not m:
            self._rx = self._rx[1:]  # malformed: skip a byte and rescan
            return True

        urc_id = int(m.group(1))
        total_len = int(m.group(2))
        sum_len = int(m.group(3))
        cur_len = int(m.group(4))

        payload_end = payload_start + cur_len
        if len(self._rx) < payload_end:
            return False

        payload = self._rx[payload_start:payload_end]
        self._push_http_event(
            ("content", urc_id, total_len, sum_len, cur_len, payload)
        )
        self._rx = self._rx[payload_end:]
        return True

    def _parse_next_urc(self) -> bool:
        """
        Parse whichever known URC starts earliest in the buffer.

        Every parser discards the bytes in front of its own tag. Trying the
        parsers in a fixed order therefore lets one of them throw away a
        different, still unparsed URC that arrived first. Dispatching by
        position keeps URCs of different kinds from destroying each other.

        Returns True when the buffer changed, False when no URC is present or
        the earliest one is still incomplete.
        """
        first_pos = -1
        first_parser = None
        for tag, parser in (
            (self._TAG_RTCP, self._parse_mipurc_rtcp),
            (self._TAG_HTTP_HEADER, self._parse_mhttpurc_header),
            (self._TAG_HTTP_CONTENT, self._parse_mhttpurc_content),
        ):
            pos = self._find_urc_tag(tag)
            if pos >= 0 and (first_parser is None or pos < first_pos):
                first_pos, first_parser = pos, parser

        if first_parser is None:
            return False
        return first_parser()

    @staticmethod
    def _ota_in_progress() -> bool:
        """Read `ota_manager.ota_in_progress`; any failure counts as False."""
        try:
            from ota_manager import ota_manager
            return bool(ota_manager.ota_in_progress)
        except Exception:
            return False

    def _trim_rx(self):
        """
        Cap the receive buffer so unparsed noise cannot grow without bound.

        While an OTA is running or the buffer contains "+MHTTP", the larger
        HTTP limits apply; otherwise the small idle limits are used.
        """
        size = len(self._rx)
        if size <= self._RX_IDLE_LIMIT:
            return  # below every threshold: skip the OTA lookup and the scan

        if self._ota_in_progress() or (b"+MHTTP" in self._rx):
            if size > self._RX_HTTP_LIMIT:
                self._rx = self._rx[-self._RX_HTTP_KEEP:]
        else:
            self._rx = self._rx[-self._RX_IDLE_KEEP:]

    def _reader_loop(self):
        """Thread body: read the UART, feed send(), and parse URCs."""
        while self._running:
            # The UART driver occasionally fails a read. That must be caught
            # here: if this thread died, OTA and TCP would both hang.
            try:
                # 4096: larger reads (8192) trigger read failures more often
                # on some drivers.
                chunk = self.uart.read(4096)
            except Exception as e:
                try:
                    print("[ATClient] uart read failed:", e)
                except Exception:
                    pass
                time.sleep_ms(50)
                continue

            if not chunk:
                time.sleep_ms(1)
                continue

            with self._q_lock:
                self._rx += chunk
                if self._waiting:
                    self._resp += chunk

                while self._parse_next_urc():
                    pass

                self._trim_rx()