Files
archery/at_client.py

308 lines
9.5 KiB
Python
Raw Normal View History

2026-01-12 11:39:27 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AT客户端模块
负责4G模块的AT命令通信和URC解析
"""
import _thread
from maix import time
import re
import threading
class ATClient:
    """Single-reader AT/URC client for a 4G modem UART.

    A background thread started by :meth:`start` is the only reader of the
    UART object, so command responses and unsolicited result codes (URCs)
    are never split between competing readers.

    Public API:
      * ``send(cmd, expect, timeout_ms)`` -- write an AT command and wait
        until ``expect`` shows up in the received bytes.
      * ``pop_tcp_payload()`` -- oldest ``+MIPURC: "rtcp"`` payload as a
        ``(link_id, payload_bytes)`` tuple (payload cut to the declared
        length), or ``None``.
      * ``pop_http_event()`` -- oldest ``+MHTTPURC`` event, either
        ``("header", id, code, header_text)`` or
        ``("content", id, total_len, sum_len, cur_len, payload_bytes)``,
        or ``None``.
    """

    # URC prefixes recognised by the reader.
    _TAG_RTCP = b'+MIPURC: "rtcp",'
    _TAG_HTTP_HEADER = b'+MHTTPURC: "header",'
    _TAG_HTTP_CONTENT = b'+MHTTPURC: "content",'

    # Numeric-field patterns for the two HTTP URC flavours (compiled once).
    _RE_HTTP_HEADER = re.compile(
        rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),'
    )
    _RE_HTTP_CONTENT = re.compile(
        rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),'
    )

    # Receive-buffer caps as (limit, bytes kept after trimming).
    _RX_CAP_HTTP = (512 * 1024, 256 * 1024)
    _RX_CAP_DEFAULT = (16384, 4096)

    def __init__(self, uart_obj):
        """Wrap ``uart_obj``; it must provide ``read(n)`` and ``write(bytes)``."""
        self.uart = uart_obj
        self._cmd_lock = threading.Lock()  # serialises send() callers
        self._q_lock = threading.Lock()    # guards _rx and both queues
        self._rx = b""
        self._tcp_payloads = []
        self._http_events = []
        # State of the (single) in-flight command.
        self._waiting = False
        self._expect = b"OK"
        self._resp = b""
        self._running = False

    def start(self):
        """Start the background reader thread; no-op if already running."""
        if self._running:
            return
        self._running = True
        _thread.start_new_thread(self._reader_loop, ())

    def stop(self):
        """Ask the reader thread to exit after its current iteration."""
        self._running = False

    def flush(self):
        """Drop buffered bytes, queued events and the captured response.

        Intended for OTA start / error recovery.
        """
        with self._q_lock:
            self._rx = b""
            self._tcp_payloads.clear()
            self._http_events.clear()
            self._resp = b""

    def pop_tcp_payload(self):
        """Return the oldest ``(link_id, payload)`` tuple, or ``None``."""
        with self._q_lock:
            if self._tcp_payloads:
                return self._tcp_payloads.pop(0)
        return None

    def pop_http_event(self):
        """Return the oldest HTTP event tuple, or ``None``."""
        with self._q_lock:
            if self._http_events:
                return self._http_events.pop(0)
        return None

    def _push_tcp_payload(self, payload):
        """Queue a ``(link_id, payload)`` tuple.

        Called by the parsers while the reader thread already holds
        ``_q_lock``; the lock is not re-entrant, so it must NOT be taken
        again here.
        """
        self._tcp_payloads.append(payload)

    def _push_http_event(self, ev):
        """Queue an HTTP event tuple (same locking rule as above)."""
        self._http_events.append(ev)

    def send(self, cmd: str, expect: str = "OK", timeout_ms: int = 2000):
        """Send an AT command and wait for ``expect`` (substring match).

        Args:
            cmd: command text without line ending; an empty value sends
                nothing and only waits.
            expect: str or bytes to look for, e.g. ``">"`` for a prompt.
            timeout_ms: maximum time to wait.

        Returns:
            Everything received while waiting, decoded leniently. On
            timeout the partial data is still returned to aid diagnosis.
        """
        expect_b = expect.encode() if isinstance(expect, str) else expect
        with self._cmd_lock:
            # Reset the capture buffer BEFORE raising _waiting: the reader
            # only appends while _waiting is set, so it can never glue new
            # bytes onto the previous command's response.
            self._expect = expect_b
            self._resp = b""
            self._waiting = True
            try:
                if cmd:
                    # Writes are serialised by _cmd_lock alone; taking an
                    # outer UART lock here could deadlock callers that
                    # already hold it.
                    self.uart.write((cmd + "\r\n").encode())
                t0 = time.ticks_ms()
                while time.ticks_ms() - t0 < timeout_ms:
                    if (not self._waiting) or (self._expect in self._resp):
                        break
                    time.sleep_ms(5)
            finally:
                # Always stop capturing, even if the write raised, so the
                # reader does not grow _resp without bound.
                self._waiting = False
            try:
                return self._resp.decode(errors="ignore")
            except Exception:
                return str(self._resp)

    def _find_urc_tag(self, tag: bytes):
        """Return the index of ``tag`` at a real URC boundary, else -1.

        A match only counts at the very start of the buffer or directly
        after ``b"\\r\\n"``; this avoids matching tag-like bytes inside an
        HTTP/TCP payload.
        """
        rx = self._rx
        try:
            pos = rx.find(tag)
            while pos > 0:
                if pos >= 2 and rx[pos - 2:pos] == b"\r\n":
                    break
                pos = rx.find(tag, pos + 1)
        except Exception:
            return -1
        return pos

    def _parse_mipurc_rtcp(self):
        """Consume one ``+MIPURC: "rtcp",<link_id>,<len>,<payload>`` URC.

        Returns:
            True if the buffer changed in a way that warrants another
            parsing pass (a URC was queued, or one byte of a malformed URC
            was skipped); False if no tag is present or more bytes are
            needed.
        """
        start = self._find_urc_tag(self._TAG_RTCP)
        if start < 0:
            return False
        if start > 0:
            # Discard whatever precedes the tag.
            self._rx = self._rx[start:]
        rx = self._rx

        # Read the two comma-terminated decimal fields: link_id and len.
        numbers = []
        pos = len(self._TAG_RTCP)
        for _ in range(2):
            end = pos
            while end < len(rx) and 48 <= rx[end] <= 57:  # ASCII '0'..'9'
                end += 1
            if end >= len(rx):
                return False  # field not terminated yet, wait for more
            if end == pos or rx[end:end + 1] != b",":
                # Empty or badly terminated field: the URC is malformed.
                # Skip one byte so this tag stops matching and later URCs
                # can still be parsed (instead of stalling forever).
                self._rx = rx[1:]
                return True
            try:
                numbers.append(int(rx[pos:end].decode()))
            except ValueError:
                self._rx = rx[1:]
                return True
            pos = end + 1

        link_id, size = numbers
        payload_end = pos + size
        if len(rx) < payload_end:
            return False  # payload not fully received yet
        # link_id travels with the payload so callers can filter by link.
        self._push_tcp_payload((link_id, rx[pos:payload_end]))
        self._rx = rx[payload_end:]
        return True

    def _take_http_urc(self, tag, pattern, field_count):
        """Cut one ``+MHTTPURC`` record off the front of the buffer.

        ``field_count`` numeric fields follow ``tag``; the last one is the
        byte length of the body that follows.

        Returns:
            ``(progressed, parsed)`` where ``parsed`` is
            ``(numbers, body_bytes)`` on success and ``None`` otherwise.
            ``progressed`` is True on success or when a byte of a malformed
            record was skipped, False when nothing could be done yet.
        """
        start = self._find_urc_tag(tag)
        if start < 0:
            return False, None
        if start > 0:
            self._rx = self._rx[start:]
        rx = self._rx

        # Advance just past the comma that closes the last numeric field.
        pos = len(tag)
        seen = 0
        while pos < len(rx) and seen < field_count:
            if rx[pos:pos + 1] == b",":
                seen += 1
            pos += 1
        if seen < field_count:
            return False, None

        m = pattern.search(rx[:pos])
        if not m:
            self._rx = rx[1:]  # malformed: skip a byte and rescan
            return True, None

        numbers = [int(g) for g in m.groups()]
        body_end = pos + numbers[-1]
        if len(rx) < body_end:
            return False, None
        self._rx = rx[body_end:]
        return True, (numbers, rx[pos:body_end])

    def _parse_mhttpurc_header(self):
        """Consume ``+MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text>``.

        Queues ``("header", id, code, hdr_text)``; return value has the
        same meaning as for :meth:`_parse_mipurc_rtcp`.
        """
        progressed, parsed = self._take_http_urc(
            self._TAG_HTTP_HEADER, self._RE_HTTP_HEADER, 3
        )
        if parsed is not None:
            (urc_id, code, _hdr_len), raw = parsed
            hdr_text = raw.decode("utf-8", "ignore")
            self._push_http_event(("header", urc_id, code, hdr_text))
        return progressed

    def _parse_mhttpurc_content(self):
        """Consume ``+MHTTPURC: "content",<id>,<total>,<sum>,<cur>,<payload>``.

        Queues ``("content", id, total_len, sum_len, cur_len, payload)``;
        return value has the same meaning as for
        :meth:`_parse_mipurc_rtcp`.
        """
        progressed, parsed = self._take_http_urc(
            self._TAG_HTTP_CONTENT, self._RE_HTTP_CONTENT, 4
        )
        if parsed is not None:
            (urc_id, total_len, sum_len, cur_len), payload = parsed
            self._push_http_event(
                ("content", urc_id, total_len, sum_len, cur_len, payload)
            )
        return progressed

    def _parse_next_urc(self):
        """Run the parser whose tag appears earliest in the buffer.

        Every parser throws away the bytes in front of its own tag, so
        trying them in a fixed order could silently drop a different URC
        that arrived first. Picking the earliest tag avoids that.
        """
        best_pos = -1
        best_parser = None
        for tag, parser in (
            (self._TAG_RTCP, self._parse_mipurc_rtcp),
            (self._TAG_HTTP_HEADER, self._parse_mhttpurc_header),
            (self._TAG_HTTP_CONTENT, self._parse_mhttpurc_content),
        ):
            pos = self._find_urc_tag(tag)
            if pos >= 0 and (best_parser is None or pos < best_pos):
                best_pos, best_parser = pos, parser
        if best_parser is None:
            return False
        return best_parser()

    @staticmethod
    def _ota_in_progress():
        """Best-effort read of ``ota_manager.ota_in_progress``.

        Imported lazily on each call; any failure counts as "no OTA".
        """
        try:
            from ota_manager import ota_manager
            return ota_manager.ota_in_progress
        except Exception:
            return False

    def _trim_rx(self, ota_active):
        """Cap the receive buffer, keeping only its newest bytes.

        A much larger cap applies while an OTA is active or HTTP URC data
        is still buffered.
        """
        if ota_active or (b"+MHTTP" in self._rx):
            limit, keep = self._RX_CAP_HTTP
        else:
            limit, keep = self._RX_CAP_DEFAULT
        if len(self._rx) > limit:
            self._rx = self._rx[-keep:]

    def _reader_loop(self):
        """Thread body: read the UART, feed send(), parse URCs, cap buffer."""
        while self._running:
            # UART read errors must not kill this thread, otherwise every
            # consumer of the queues would hang.
            try:
                chunk = self.uart.read(4096)
            except Exception as e:
                try:
                    print("[ATClient] uart read failed:", e)
                except Exception:
                    pass
                time.sleep_ms(50)
                continue
            if not chunk:
                time.sleep_ms(1)
                continue

            # Looked up before taking _q_lock so a slow import never blocks
            # the pop_* callers.
            ota_active = self._ota_in_progress()

            with self._q_lock:
                self._rx += chunk
                if self._waiting:
                    self._resp += chunk
                while self._parse_next_urc():
                    pass
                self._trim_rx(ota_active)