Files
archery/network.py

1904 lines
85 KiB
Python
Raw Normal View History

2026-01-20 11:25:17 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Network communication module.
Provides TCP communication, packet packing/parsing, send-queue management and related helpers.
"""
import json
import re
2026-01-20 11:25:17 +08:00
from math import e
import struct
from maix import time
import hmac
import hashlib
import ujson
import os
import threading
import socket
import config
2026-01-22 17:55:11 +08:00
2026-01-20 11:25:17 +08:00
from hardware import hardware_manager
from power import get_bus_voltage, voltage_to_percent
# from laser import laser_manager
# from ota import ota_manager
from logger_manager import logger_manager
from wifi import wifi_manager
2026-01-20 11:25:17 +08:00
2026-04-14 09:02:41 +08:00
def _calculate_tcp_ssl_password(device_id, iccid):
"""
与服务器 calculatePassword(deviceId, iccid) 一致
hex(md5(hex(md5(deviceId)) + iccid))iccid 为空则不拼接
"""
md5_device_hex = hashlib.md5(device_id.encode("utf-8")).hexdigest()
if iccid:
md5_device_hex = md5_device_hex + iccid
return hashlib.md5(md5_device_hex.encode("utf-8")).hexdigest()
def _wifi_tls_would_block(exc):
"""
非阻塞 TLS recv/send 常抛出 WANT_READ / WANT_WRITE或等价文案
表示需等待对端/内核缓冲区不是断线
"""
try:
import ssl as _ssl
except ImportError:
_ssl = None
if _ssl is not None and isinstance(exc, _ssl.SSLError):
err = getattr(exc, "errno", None)
if err in (
getattr(_ssl, "SSL_ERROR_WANT_READ", 2),
getattr(_ssl, "SSL_ERROR_WANT_WRITE", 3),
):
return True
msg = str(exc).lower()
if "did not complete" in msg and ("read" in msg or "write" in msg):
return True
return False
2026-01-20 11:25:17 +08:00
class NetworkManager:
"""网络通信管理器(单例)"""
_instance = None
def __new__(cls):
    """Return the single shared instance, creating it on first use.

    A newly created instance is tagged ``_initialized = False`` so that
    ``__init__`` knows it still has to populate the state.
    """
    if cls._instance is not None:
        return cls._instance
    cls._instance = super(NetworkManager, cls).__new__(cls)
    cls._instance._initialized = False
    return cls._instance
def __init__(self):
if self._initialized:
return
# 私有状态
self._tcp_connected = False
self._high_send_queue = []
self._normal_send_queue = []
self._queue_lock = threading.Lock()
2026-02-10 17:52:55 +08:00
self._send_event = threading.Event()
2026-01-20 11:25:17 +08:00
self._uart4g_lock = threading.Lock()
self._device_id = None
self._password = None
self._raw_line_data = []
self._manual_trigger_flag = False
# 网络类型状态
self._network_type = None # "wifi" 或 "4G" 或 None
# 本次上电曾因 WiFi 质量差切换到 4G 后,直至关机不再改回 WiFi
self._session_force_4g = False
2026-01-20 11:25:17 +08:00
self._initialized = True
2026-01-22 17:55:11 +08:00
# 导入 archery_netcore 模块,并检查是否存在 parse_packet 和 make_packet 函数
try:
import archery_netcore as _netcore
self._netcore = _netcore
if hasattr(self._netcore, "parse_packet") and hasattr(self._netcore, "make_packet") and hasattr(self._netcore, "actions_for_inner_cmd"):
print("[NET] archery_netcore found")
else:
print("[NET] archery_netcore not found parse_packet or make_packet")
exit(1)
except Exception:
print("[NET] import archery_netcore failed")
exit(1)
# 服务器相关
self._server_ip = self._netcore.get_config().get("SERVER_IP")
self._server_port = self._netcore.get_config().get("SERVER_PORT")
2026-01-20 11:25:17 +08:00
# ==================== 状态访问(只读属性)====================
2026-01-20 18:40:54 +08:00
@property
def logger(self):
    """Logger object exposed by ``logger_manager`` (looked up on every access)."""
    return logger_manager.logger
2026-01-20 11:25:17 +08:00
@property
def tcp_connected(self):
    """Whether the TCP link to the server is currently marked as connected."""
    return self._tcp_connected
@property
def device_id(self):
    """Device ID currently stored on the manager (None until it has been set)."""
    return self._device_id
@property
def password(self):
    """Login password currently stored on the manager (None until it has been set)."""
    return self._password
@property
def has_pending_messages(self):
    """True when either send queue still holds an item (checked under the queue lock)."""
    with self._queue_lock:
        return bool(self._high_send_queue) or bool(self._normal_send_queue)
@property
def manual_trigger_flag(self):
    """Current value of the manual-trigger flag."""
    return self._manual_trigger_flag
@property
def network_type(self):
    """Network type currently in use: "wifi", "4g", or None when none is selected."""
    return self._network_type
@property
def wifi_connected(self):
    """WiFi connection state as reported by ``wifi_manager.wifi_connected``."""
    return wifi_manager.wifi_connected
2026-01-20 11:25:17 +08:00
@property
def wifi_ip(self):
    """WiFi IP address as reported by ``wifi_manager.wifi_ip``."""
    return wifi_manager.wifi_ip
2026-01-20 11:25:17 +08:00
# ==================== 内部状态管理方法 ====================
def set_manual_trigger(self, value=True):
    """Store ``value`` as the manual-trigger flag (public API; defaults to True)."""
    self._manual_trigger_flag = value
def clear_manual_trigger(self):
    """Reset the manual-trigger flag to False (public API)."""
    self._manual_trigger_flag = False
def _set_tcp_connected(self, connected):
"""设置TCP连接状态内部方法"""
self._tcp_connected = connected
def _set_device_info(self, device_id, password):
"""设置设备信息(内部方法)"""
self._device_id = device_id
self._password = password
def _enqueue(self, item, high=False):
"""线程安全地加入队列(内部方法)"""
with self._queue_lock:
if high:
self._high_send_queue.append(item)
else:
self._normal_send_queue.append(item)
2026-02-10 17:52:55 +08:00
self._send_event.set()
2026-01-20 11:25:17 +08:00
def _dequeue(self):
"""线程安全地从队列取出(内部方法)"""
with self._queue_lock:
if self._high_send_queue:
return self._high_send_queue.pop(0)
elif self._normal_send_queue:
return self._normal_send_queue.pop(0)
return None
def _set_raw_line_data(self, data):
"""设置原始行数据(内部方法)"""
self._raw_line_data = data
def _get_raw_line_data(self):
"""获取原始行数据(内部方法)"""
return self._raw_line_data
def get_uart_lock(self):
    """Return the lock guarding the 4G UART, for use in a ``with`` statement."""
    return self._uart4g_lock
def get_queue_lock(self):
    """Return the lock guarding the send queues, for use in a ``with`` statement."""
    return self._queue_lock
# ==================== 业务方法 ====================
def read_device_id(self):
    """Load the device's unique ID from ``/device_key`` and derive the login password.

    If the file cannot be read, or is empty after stripping, the ID falls
    back to ``"DEFAULT_DEVICE_ID"``. In both cases ``self._device_id`` and
    ``self._password`` are updated.

    Returns:
        str: The device ID that was stored.
    """
    def _set_password_for_device_id(device_id):
        # With TCP/SSL enabled the password is derived from the device ID and
        # the SIM identifier; otherwise it is "<device_id>.".
        if getattr(config, "USE_TCP_SSL", False):
            iccid = self.get_4g_mccid() or ""
            print(f"iccid: {iccid}")
            self._password = _calculate_tcp_ssl_password(device_id, iccid)
        else:
            self._password = device_id + "."
        # The password itself is deliberately not printed: it is a login
        # credential and must not end up on the console or in captured logs.

    try:
        with open("/device_key", "r") as f:
            device_id = f.read().strip()
        if device_id:
            self.logger.debug(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
            # Update internal state
            self._device_id = device_id
            _set_password_for_device_id(device_id)
            return device_id
    except Exception as e:
        self.logger.error(f"[ERROR] 无法读取 /device_key: {e}")

    # Fall back to the default ID
    default_id = "DEFAULT_DEVICE_ID"
    self._device_id = default_id
    _set_password_for_device_id(default_id)
    return default_id
# ==================== WiFi 管理方法(委托给 wifi_manager====================
2026-01-20 11:25:17 +08:00
def is_wifi_connected(self):
    """Return the result of ``wifi_manager.is_wifi_connected()``."""
    return wifi_manager.is_wifi_connected()
2026-01-20 11:25:17 +08:00
2026-02-09 11:24:46 +08:00
def connect_wifi(self, ssid, password, verify_host=None, verify_port=None, persist=True, timeout_s=20):
    """Join a Wi-Fi network, verify it can reach a target, and roll back on failure.

    Steps: snapshot the current configuration, generate a wpa_supplicant
    network block with ``wpa_passphrase``, write ``/etc/wpa_supplicant.conf``
    (best effort) and the ``/boot/wifi.ssid`` / ``/boot/wifi.pass`` files,
    restart the ``S30wifi`` service, poll ``wlan0`` for an IPv4 address, then
    check TCP reachability of the verification target.

    Args:
        ssid: Network name.
        password: Network passphrase.
        verify_host: Host for the reachability check; defaults to the
            configured server IP.
        verify_port: Port for the reachability check; defaults to the
            configured server port.
        persist: When False, the /boot credentials are restored to their
            previous content after a successful verification (no restart).
        timeout_s: Seconds to wait for an IP address; clamped to 5..60,
            and 20 is used for falsy or non-positive values.

    Returns:
        tuple: ``(ip, None)`` on success, ``(None, error_message)`` on failure.
    """
    import subprocess
    import time as std_time

    # Configuration file locations
    conf_path = "/etc/wpa_supplicant.conf"
    ssid_file = "/boot/wifi.ssid"
    pass_file = "/boot/wifi.pass"

    def _read_text(path: str):
        # Returns the file content, or None when missing or unreadable.
        try:
            if os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    return f.read()
        except Exception:
            return None
        return None

    def _write_text(path: str, content: str):
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)

    def _restore_one(path: str, old_content):
        # Best effort: delete the file if it did not exist before, else write
        # the previous content back.
        try:
            if old_content is None:
                if os.path.exists(path):
                    os.remove(path)
            else:
                _write_text(path, old_content)
        except Exception:
            pass

    def _restore_boot(old_ssid: str | None, old_pass: str | None):
        # Restore both /boot credential files independently of each other.
        _restore_one(ssid_file, old_ssid)
        _restore_one(pass_file, old_pass)

    old_conf = _read_text(conf_path)
    old_boot_ssid = _read_text(ssid_file)
    old_boot_pass = _read_text(pass_file)

    try:
        # Generate the wpa_supplicant config. The arguments are passed as a
        # list, without a shell, so quotes or shell metacharacters in the SSID
        # or passphrase cannot inject commands.
        generated = subprocess.run(
            ["wpa_passphrase", ssid, password],
            capture_output=True,
            text=True,
        )
        net_conf = generated.stdout or ""
        if "network={" not in net_conf:
            raise RuntimeError("Failed to generate wpa config")

        try:
            _write_text(
                conf_path,
                "ctrl_interface=/var/run/wpa_supplicant\n"
                "update_config=1\n\n"
                + net_conf,
            )
        except Exception:
            # Writing /etc is optional (some systems only use /boot).
            pass

        # Temporarily write the /boot credentials so the WiFi service really
        # tries the new SSID.
        _write_text(ssid_file, ssid.strip())
        _write_text(pass_file, password.strip())

        # Restart the Wi-Fi service
        os.system("/etc/init.d/S30wifi restart")

        # Wait for an IP address, polling once per second
        wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20
        wait_s = min(max(wait_s, 5), 60)
        for _ in range(wait_s):
            ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
            if ip:
                # Having an IP does not prove the target is reachable, so the
                # reachability check follows.
                # NOTE(review): these two attributes are set on this object,
                # while the wifi_connected / wifi_ip properties read from
                # wifi_manager - confirm whether anything still reads them.
                self._wifi_connected = True
                self._wifi_ip = ip
                self.logger.info(f"[WIFI] 已连接IP: {ip}，开始验证网络可用性...")

                # Verify the target is reachable (defaults to the TCP server)
                v_host = verify_host if verify_host is not None else self._server_ip
                v_port = int(verify_port) if verify_port is not None else int(self._server_port)
                if v_host and v_port:
                    if not self.is_server_reachable(v_host, v_port, timeout=5):
                        raise RuntimeError(f"Target unreachable ({v_host}:{v_port})")

                # Verification passed
                if not persist:
                    # Not persisting: put /boot back to the old values without
                    # restarting, so the current connection stays up.
                    _restore_boot(old_boot_ssid, old_boot_pass)
                    self.logger.info("[WIFI] 网络验证通过，但按 persist=False 回滚 /boot 凭证（不重启）")
                else:
                    self.logger.info("[WIFI] 网络验证通过，/boot 凭证已保留（持久化）")
                return ip, None
            std_time.sleep(1)

        raise RuntimeError("Timeout: No IP obtained")
    except Exception as e:
        # Failure: roll back /boot and /etc, then restart WiFi to return to
        # the previous network.
        _restore_boot(old_boot_ssid, old_boot_pass)
        try:
            if old_conf is not None:
                _write_text(conf_path, old_conf)
        except Exception:
            pass
        try:
            os.system("/etc/init.d/S30wifi restart")
        except Exception:
            pass
        self._wifi_connected = False
        self._wifi_ip = None
        self.logger.error(f"[WIFI] 连接/验证失败，已回滚: {e}")
        return None, str(e)
2026-01-20 11:25:17 +08:00
def is_server_reachable(self, host, port=80, timeout=5):
    """Check whether a TCP connection to ``host:port`` can be opened.

    Args:
        host: Hostname or IP address to resolve.
        port: Target port.
        timeout: Socket timeout in seconds.

    Returns:
        bool: True if the connect succeeded, False on any error (a warning
        is logged).
    """
    sock = None
    try:
        # Ask for stream sockets explicitly so the first result is a TCP
        # address; a connect on a datagram socket would succeed trivially.
        family, socktype, proto, _canon, sockaddr = socket.getaddrinfo(
            host, port, 0, socket.SOCK_STREAM
        )[0]
        sock = socket.socket(family, socktype, proto)
        sock.settimeout(timeout)
        sock.connect(sockaddr)
        return True
    except Exception as e:
        self.logger.warning(f"[NET] 无法连接 {host}:{port} - {e}")
        return False
    finally:
        # Always release the descriptor, including when connect() failed.
        if sock is not None:
            try:
                sock.close()
            except Exception:
                pass
# ==================== 网络选择策略 ====================
def _get_wifi_rssi_dbm(self):
    """Return the WiFi signal strength from ``wifi_manager._get_wifi_rssi_dbm()``."""
    return wifi_manager._get_wifi_rssi_dbm()
def _measure_wifi_tcp_rtt_ms(self, host, port, samples=3, per_sample_timeout_ms=900):
    """Measure the WiFi TCP round-trip time by delegating to ``wifi_manager``.

    All four arguments are forwarded positionally and the delegate's result
    is returned unchanged.
    """
    return wifi_manager._measure_wifi_tcp_rtt_ms(
        host,
        port,
        samples,
        per_sample_timeout_ms,
    )
def _is_wifi_quality_bad(self, wifi_rtt_ms, wifi_rssi_dbm):
    """Return ``wifi_manager``'s verdict on whether the given RTT/RSSI mean poor WiFi."""
    return wifi_manager._is_wifi_quality_bad(wifi_rtt_ms, wifi_rssi_dbm)
def is_4g_available(self):
    """Quickly decide whether the 4G modem is usable.

    No DNS lookup is done; only SIM readiness, network attach and a PDP IP
    address are checked over AT commands. As a compatibility fallback, when
    no IP is reported the APN is set and the PDP context is activated before
    asking for the address once more.

    Returns:
        bool: True when the modem reports a usable IP address; False
        otherwise, including when there is no AT client or any error occurs.
    """
    def _usable_ip(resp: str):
        # First dotted-quad in the response, ignoring 0.x and loopback.
        found = re.search(r"(\d{1,3}\.){3}\d{1,3}", resp or "")
        if found is None:
            return None
        addr = found.group(0)
        if addr.startswith("0.") or addr.startswith("127."):
            return None
        return addr

    try:
        modem = hardware_manager.at_client
        if modem is None:
            return False

        with self.get_uart_lock():
            # 1) SIM must be ready
            sim_reply = modem.send("AT+CPIN?", "READY", 3000)
            if "READY" not in sim_reply:
                return False

            # 2) Try to attach to the network. Parsing is lenient because
            #    modules differ slightly in what they return.
            attach_reply = modem.send("AT+CGATT?", "OK", 3000)
            attach_hit = re.search(r"\+CGATT:\s*(\d+)", attach_reply)
            already_attached = bool(attach_hit) and int(attach_hit.group(1)) == 1
            if not already_attached:
                modem.send("AT+CGATT=1", "OK", 5000)

            # 3) Query the PDP address (an IP means the network is usable)
            if _usable_ip(modem.send("AT+CGPADDR=1", "OK", 3000)):
                return True

            # 4) No IP yet: set the APN and activate the PDP context (some
            #    networks/modules need this step), then query again.
            modem.send('AT+CGDCONT=1,"IP","CMNET"', "OK", 3000)
            act_reply = modem.send("AT+CGACT?", "OK", 3000) or ""
            if "+CGACT:" not in act_reply or "1,1" not in act_reply:
                modem.send("AT+CGACT=1,1", "OK", 10000)

            if _usable_ip(modem.send("AT+CGPADDR=1", "OK", 3000)):
                return True
            return False
    except Exception:
        return False
2026-04-07 17:29:24 +08:00
def get_4g_phone_number(self):
    """Read the SIM's own phone number with ``AT+CNUM``.

    A typical response line is ``+CNUM: "","+861442093407954",145``. The
    number may be empty when it was never written to the card.

    Returns:
        str | None: The first non-empty number found in the response, or
        None on failure or when no number is present.
    """
    try:
        modem = hardware_manager.at_client
        if modem is None:
            return None

        with self.get_uart_lock():
            reply = modem.send("AT+CNUM", "OK", 3000)
        if not reply:
            return None

        # There may be several +CNUM lines; take the first non-empty number.
        candidates = (
            (hit.group(1) or "").strip()
            for hit in re.finditer(r'\+CNUM:\s*"[^"]*"\s*,\s*"([^"]*)"', reply)
        )
        return next((number for number in candidates if number), None)
    except Exception:
        return None
def get_4g_mccid(self):
    """Read the value reported by the modem's ``AT+MCCID`` command.

    The value is taken from a response line of the form ``+MCCID: <value>``
    or ``+MCCID: "<value>"``.

    Returns:
        str | None: The parsed value, or None when there is no AT client, the
        response is empty or contains "ERROR", nothing matches, or an error
        occurs.
    """
    try:
        modem = hardware_manager.at_client
        if modem is None:
            return None

        with self.get_uart_lock():
            reply = modem.send("AT+MCCID", "OK", 3000)
        if not reply or "ERROR" in reply.upper():
            return None

        hit = re.search(r"\+MCCID:\s*(.+)", reply, re.IGNORECASE)
        if hit is None:
            return None

        value = (hit.group(1) or "").strip()
        # Keep only the first line, in case the trailing OK is glued on.
        value = value.split("\r")[0].split("\n")[0].strip()
        value = value.strip('"').strip()
        return value or None
    except Exception:
        return None
2026-04-14 09:02:41 +08:00
def _maybe_add_iccid_to_login(self, login_data):
    """Add an ``iccid`` field to the login packet unless the marker file exists.

    The marker file is ``<config.APP_DIR>/iccid``. It is not created here;
    that is done by ``_create_iccid_marker_file``.

    Args:
        login_data: Login payload dict; modified in place.

    Returns:
        bool: True when ``iccid`` was added to ``login_data`` (the value is
        "" when the card ID could not be read), False when the marker file
        already exists.
    """
    if os.path.exists(os.path.join(config.APP_DIR, "iccid")):
        return False

    card_id = self.get_4g_mccid()
    login_data["iccid"] = "" if card_id is None else card_id
    return True
def _create_iccid_marker_file(self):
    """Create the empty ``<config.APP_DIR>/iccid`` marker file if it is missing.

    Once the marker exists, ``_maybe_add_iccid_to_login`` stops adding the
    ``iccid`` field. A failure to create it is logged as a warning and not
    raised.
    """
    marker = os.path.join(config.APP_DIR, "iccid")
    if not os.path.exists(marker):
        try:
            with open(marker, "w"):
                pass
        except Exception as e:
            self.logger.warning(f"[NET] 创建 iccid 标记文件失败: {e}")
def _apply_session_force_4g(self):
"""锁定本次会话为 4G直到关机期间不再回切 WiFi"""
self._session_force_4g = True
self._network_type = "4g"
2026-01-20 11:25:17 +08:00
def select_network(self, prefer_wifi=None):
    """
    Choose the network for the server connection, trying WiFi first.

    Args:
        prefer_wifi: Whether to try WiFi first. None means use
            ``wifi_manager.prefer_wifi``.

    Returns:
        "wifi" or "4g", or None when neither network is usable.
    """
    if prefer_wifi is None:
        prefer_wifi = wifi_manager.prefer_wifi
    # Session lock: once set, 4G is returned straight away.
    # NOTE(review): 4G availability is not re-checked on this path.
    if self._session_force_4g:
        self.logger.info("[NET] 会话锁定 4G继续使用 4G跳过 WiFi 质量评估）")
        self._network_type = "4g"
        return "4g"
    host = self._server_ip
    port = self._server_port
    # 1) Try WiFi first and evaluate its quality
    if prefer_wifi and self.is_wifi_connected():
        wifi_rssi_dbm = self._get_wifi_rssi_dbm()
        # The RTT measurement is currently disabled: RTT is fixed at 0 and the
        # server is assumed reachable, so only RSSI feeds the quality check.
        wifi_rtt_ms = 0
        wifi_reachable = True
        # wifi_rtt_ms, wifi_reachable = self._measure_wifi_tcp_rtt_ms(
        #     host, port,
        #     samples=getattr(config, "WIFI_QUALITY_RTT_SAMPLES", 3),
        #     per_sample_timeout_ms=900,
        # )
        wifi_bad = self._is_wifi_quality_bad(wifi_rtt_ms, wifi_rssi_dbm)
        self.logger.info(
            f"[NET] WiFi质量评估rtt_ms(median)={wifi_rtt_ms:.1f}, rssi_dbm={wifi_rssi_dbm}, "
            f"reachable={wifi_reachable}, bad={wifi_bad}"
        )
        # Poor quality and 4G available -> switch to 4G and lock the session
        if wifi_bad and self.is_4g_available():
            self._apply_session_force_4g()
            self.logger.warning("[NET] WiFi质量差且 4G 可用 -> 切换到 4G并锁定本次会话")
            return "4g"
        # Otherwise stay on WiFi, falling back to 4G if the server is unreachable.
        # With wifi_reachable fixed to True above, the reachability probe is
        # short-circuited and never runs.
        if wifi_reachable or self.is_server_reachable(host, port, timeout=3):
            self._network_type = "wifi"
            self.logger.info(f"[NET] 选择WiFi网络IP: {wifi_manager.wifi_ip}")
            # Best-effort clock sync over WiFi; failures are ignored.
            try:
                os.environ["TZ"] = "Asia/Shanghai"
                os.system("ntpdate pool.ntp.org")
            except Exception:
                pass
            return "wifi"
        # WiFi is up but the server is unreachable
        self.logger.warning("[NET] WiFi可用但服务器不可达尝试4G")
    # 2) WiFi unusable or not wanted: fall back to 4G
    if self.is_4g_available():
        self._network_type = "4g"
        self.logger.info("[NET] 选择4G网络")
        return "4g"
    # 3) Neither is available
    self.logger.error("[NET] WiFi 与 4G 均不可用")
    return None
def _start_wifi_quality_monitor(self):
    """Start the WiFi quality monitor via ``wifi_manager.start_quality_monitor``.

    Two callbacks are passed: one returning the current network type, and
    ``_switch_to_4g_due_to_poor_wifi`` as the poor-quality callback.
    """
    def _current_network_type():
        return self._network_type

    wifi_manager.start_quality_monitor(
        network_type_callback=_current_network_type,
        on_poor_quality_callback=self._switch_to_4g_due_to_poor_wifi,
    )
def _stop_wifi_quality_monitor(self):
    """Stop the WiFi quality monitor via ``wifi_manager.stop_quality_monitor()``."""
    wifi_manager.stop_quality_monitor()
def get_wifi_quality_status(self):
    """Return the current WiFi quality status from ``wifi_manager.get_wifi_quality_status()``."""
    return wifi_manager.get_wifi_quality_status()
2026-01-20 11:25:17 +08:00
def _switch_to_4g_due_to_poor_wifi(self):
    """Switch from WiFi to 4G because the WiFi quality is poor.

    Sets the session-force-4G flag, disconnects WiFi through
    ``wifi_manager``, and resets the TCP state. If 4G turns out to be
    unavailable the force flag is cleared again.

    Returns:
        bool: True when 4G is available and selected, False otherwise.
    """
    self.logger.info("[WiFi->4G] 开始切换到 4G 网络")

    # 1. Force 4G for the rest of this power cycle
    self._session_force_4g = True
    # 2. Close the WiFi socket
    wifi_manager.disconnect_wifi()
    # 3. Reset the connection state; the type is cleared so that
    #    select_network chooses again
    self._tcp_connected = False
    self._network_type = None

    # 4. Check that 4G is actually usable
    if not self.is_4g_available():
        self.logger.error("[WiFi->4G] 4G 不可用，无法切换")
        # Roll back the lock: keep using WiFi even though quality is poor
        self._session_force_4g = False
        return False

    self._network_type = "4g"
    self.logger.info("[WiFi->4G] 切换成功，将使用 4G 网络")
    return True
2026-01-20 11:25:17 +08:00
def safe_enqueue(self, data_dict, msg_type=2, high=False):
    """Queue a message for sending (public, thread-safe via ``_enqueue``).

    Args:
        data_dict: Message body.
        msg_type: Message type stored alongside the body.
        high: True to use the high-priority queue.
    """
    entry = (msg_type, data_dict)
    self._enqueue(entry, high)
def make_packet(self, msg_type: int, body_dict: dict) -> bytes:
    """Build a TCP packet: 12-byte header followed by the JSON body.

    The header is three big-endian unsigned 32-bit integers: body length,
    message type, and a checksum equal to ``body length + message type``.

    Args:
        msg_type: Message type number.
        body_dict: JSON-serialisable body.

    Returns:
        bytes: Header plus encoded JSON body.
    """
    payload = json.dumps(body_dict).encode()
    size = len(payload)
    return struct.pack(">III", size, msg_type, size + msg_type) + payload
def parse_packet(self, data: bytes):
    """Parse one TCP packet into ``(msg_type, body_dict)``.

    The 12-byte header is three big-endian unsigned 32-bit integers: body
    length, message type and checksum. The checksum is read but not verified.

    Args:
        data: Raw bytes starting at a packet header.

    Returns:
        tuple: ``(msg_type, body)`` where ``body`` is the decoded JSON value,
        or ``{"raw": <text>}`` when the body is not valid UTF-8 JSON.
        ``(None, None)`` when ``data`` is shorter than the header or shorter
        than the length the header announces.
    """
    header_size = 12
    if len(data) < header_size:
        return None, None

    body_len, msg_type, _checksum = struct.unpack(">III", data[:header_size])
    expected_len = header_size + body_len

    # Defensive check: more data than expected suggests several packets were
    # concatenated. Only the first packet is parsed; the rest is ignored.
    if len(data) > expected_len:
        self.logger.warning(
            f"[TCP] parse_packet: data length ({len(data)}) > expected ({expected_len}), "
            f"possible packet concatenation. body_len={body_len}, msg_type={msg_type}"
        )

    # Less data than expected means the packet is incomplete.
    if len(data) < expected_len:
        self.logger.warning(
            f"[TCP] parse_packet: data length ({len(data)}) < expected ({expected_len}), "
            f"incomplete packet. body_len={body_len}, msg_type={msg_type}"
        )
        return None, None

    body = data[header_size:expected_len]
    try:
        return msg_type, json.loads(body.decode())
    except Exception:
        # Not JSON (or not UTF-8): hand back the raw text. Exception is caught
        # rather than using a bare except, so KeyboardInterrupt and SystemExit
        # still propagate.
        return msg_type, {"raw": body.decode(errors="ignore")}
def connect_server(self):
    """Connect to the server, choosing WiFi or 4G automatically.

    When already marked connected, the existing link is checked instead: a
    WiFi link is probed through ``_check_wifi_connection``, and a 4G link is
    reported as up because the modem maintains it through AT commands.

    Returns:
        bool: Whether the connection is (or was successfully) established.
    """
    if self._tcp_connected:
        # Already connected: check that the current link is still valid.
        if self._network_type == "wifi":
            return self._check_wifi_connection()
        return self._network_type == "4g"

    # Pick a network automatically
    chosen = self.select_network()
    if not chosen:
        return False
    self.logger.info(f"连接到服务器，使用{chosen.upper()}...")

    # Open the TCP connection over the chosen transport
    if chosen == "wifi":
        return self._connect_tcp_via_wifi()
    if chosen == "4g":
        return self._connect_tcp_via_4g()
    return False
2026-04-14 09:02:41 +08:00
def _wrap_wifi_tls(self, plain_sock, hostname):
    """
    Wrap an already-connected TCP socket in TLS using the host ``ssl`` module.

    This is the WiFi path; the 4G path uses the modem's own SSL AT commands.
    Certificate verification follows ``config.SSL_VERIFY_MODE`` and
    ``config.SSL_CERT_PATH``: the peer is verified only when the mode is 1
    and the certificate file exists, otherwise no verification is done.

    Args:
        plain_sock: Connected TCP socket.
        hostname: Server name passed as ``server_hostname``.

    Returns:
        The TLS-wrapped socket.
    """
    import ssl
    verify_mode = getattr(config, "SSL_VERIFY_MODE", 0)
    cert_path = getattr(config, "SSL_CERT_PATH", None) or ""
    # Truthy only when verification is requested AND the CA file is present.
    ca_ok = verify_mode == 1 and cert_path and os.path.exists(cert_path)
    cert_none = getattr(ssl, "CERT_NONE", 0)
    cert_required = getattr(ssl, "CERT_REQUIRED", 2)
    # Preferred path: SSLContext, when this ssl module provides it.
    if hasattr(ssl, "SSLContext"):
        try:
            proto = getattr(ssl, "PROTOCOL_TLS_CLIENT", None)
            if proto is None:
                proto = getattr(ssl, "PROTOCOL_TLS", 0)
            ctx = ssl.SSLContext(proto)
            if ca_ok:
                ctx.load_verify_locations(cafile=cert_path)
                ctx.verify_mode = cert_required
                ctx.check_hostname = True
            else:
                # check_hostname must be switched off before verify_mode can
                # be set to CERT_NONE.
                ctx.check_hostname = False
                ctx.verify_mode = cert_none
            return ctx.wrap_socket(plain_sock, server_hostname=hostname)
        except Exception as e:
            # NOTE(review): if the failure was a TLS handshake error, the
            # socket may no longer be usable for the retry below - confirm.
            self.logger.warning(f"[WIFI-TCP] SSLContext.wrap_socket 失败，改用 wrap_socket: {e}")
    # Fallback: module-level ssl.wrap_socket; retried without server_hostname
    # when that keyword is rejected with TypeError.
    if ca_ok:
        try:
            return ssl.wrap_socket(
                plain_sock,
                server_hostname=hostname,
                cert_reqs=cert_required,
                ca_certs=cert_path,
            )
        except TypeError:
            return ssl.wrap_socket(plain_sock, cert_reqs=cert_required, ca_certs=cert_path)
    try:
        return ssl.wrap_socket(plain_sock, server_hostname=hostname, cert_reqs=cert_none)
    except TypeError:
        return ssl.wrap_socket(plain_sock, cert_reqs=cert_none)
2026-01-20 11:25:17 +08:00
def _connect_tcp_via_wifi(self):
    """Open the TCP connection over WiFi; with USE_TCP_SSL, TLS is layered on top.

    The socket is stored in ``wifi_manager.wifi_socket``. On success the
    connection flag is set and the WiFi quality monitor is started.

    Returns:
        bool: True on success. On any error the socket is closed and cleared
        and False is returned.
    """
    try:
        # Create the TCP socket
        wifi_manager.wifi_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        wifi_manager.wifi_socket.settimeout(5.0)  # 5 second timeout
        # Connect to the server
        # NOTE(review): the host comes from self._server_ip but the plain-TCP
        # port comes from config.SERVER_PORT, not self._server_port - confirm
        # both sources agree.
        use_ssl = getattr(config, "USE_TCP_SSL", False)
        host = self._server_ip
        port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT
        addr_info = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)[0]
        wifi_manager.wifi_socket.connect(addr_info[-1])
        if use_ssl:
            wifi_manager.wifi_socket = self._wrap_wifi_tls(wifi_manager.wifi_socket, host)
        # Switch to non-blocking mode (used when receiving data); this happens
        # only after the connect and the TLS wrap.
        wifi_manager.wifi_socket.setblocking(False)
        # Speed up message sending
        wifi_manager.wifi_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self._tcp_connected = True
        if use_ssl:
            self.logger.info("[WIFI-TCP] TLS 连接已建立")
        else:
            self.logger.info("[WIFI-TCP] TCP 连接已建立")
        # Start the background WiFi quality check
        self._start_wifi_quality_monitor()
        return True
    except Exception as e:
        self.logger.error(f"[WIFI-TCP] 连接失败: {e}")
        if wifi_manager.wifi_socket:
            try:
                wifi_manager.wifi_socket.close()
            except:
                pass
            wifi_manager.wifi_socket = None
        return False
def _connect_tcp_via_4g(self):
    """Open the TCP connection through the 4G module, optionally bound to SSL.

    Under the UART lock: close the link ID first, configure SSL when
    ``config.USE_TCP_SSL`` is set, then issue ``AT+MIPOPEN`` and wait for the
    ``+MIPOPEN`` result.

    Returns:
        bool: True when the module reports ``+MIPOPEN: <link_id>,0``; False
        when SSL configuration fails or the open result differs.
    """
    link_id = getattr(config, "TCP_LINK_ID", 0)
    use_ssl = getattr(config, "USE_TCP_SSL", False)
    host = self._server_ip
    port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT
    # config.MIPOPEN_TAIL is intentionally not read: the command below is
    # sent without any tail parameters.

    with self.get_uart_lock():
        # Close the link first, in case it is still open.
        resp = hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000)
        self.logger.info(f"[4G-TCP] AT+MIPCLOSE={link_id} response: {resp}")

        if use_ssl and not self._configure_ssl_before_connect(link_id):
            return False

        cmd = f'AT+MIPOPEN={link_id},"TCP","{host}",{port}'
        res = hardware_manager.at_client.send(cmd, "+MIPOPEN", 8000)
        self.logger.info(f"[4G-TCP] {cmd} response: {res}")
        if f"+MIPOPEN: {link_id},0" in res:
            self._tcp_connected = True
            return True
        return False
def _check_wifi_connection(self):
    """Check whether the WiFi TCP socket still looks usable.

    TLS sockets are not probed: a wrapped socket often does not support
    MSG_PEEK / MSG_DONTWAIT, so probing would produce false disconnects, and
    the real send/recv is left to detect a dead link. Plain sockets are
    probed with a non-blocking one-byte peek; an empty result means the peer
    closed the connection.

    Returns:
        bool: True when the connection is believed alive or cannot be probed.
        False when there is no socket or the probe shows the link is dead, in
        which case the socket is closed and cleared and the connected flag is
        reset.
    """
    if not wifi_manager.wifi_socket:
        return False

    def _log_quietly(log_method, message):
        # Logging must never turn a connection check into a failure.
        try:
            log_method(message)
        except Exception:
            pass

    def _mark_disconnected():
        # Close and forget the socket, reset the flag, and report failure.
        try:
            wifi_manager.wifi_socket.close()
        except Exception:
            pass
        wifi_manager.wifi_socket = None
        self._tcp_connected = False
        return False

    try:
        if getattr(config, "USE_TCP_SSL", False) or hasattr(wifi_manager.wifi_socket, "cipher"):
            return True
    except Exception:
        # An error while inspecting the socket must not trigger cleanup.
        return True

    try:
        # send(b"") is a no-op in many implementations and cannot detect a
        # closed link, so peek one byte without blocking instead.
        data = wifi_manager.wifi_socket.recv(1, socket.MSG_PEEK | socket.MSG_DONTWAIT)
        if data == b"":
            raise OSError("wifi socket closed")
        return True
    except TypeError as e:
        # Some implementations do not accept the flags argument; that is not
        # a disconnect.
        _log_quietly(self.logger.warning, f"[WIFI-TCP] conncheck flags unsupported (TypeError): {e}")
        return True
    except BlockingIOError:
        # Nothing to read but the connection is still there (EAGAIN).
        return True
    except OSError as e:
        err = getattr(e, "errno", None)
        # EAGAIN / EWOULDBLOCK on Linux, macOS and Windows.
        if err in (11, 35, 10035):
            return True
        # Some platforms raise "no data / timed out" as socket.timeout or
        # TimeoutError with errno None. Not a disconnect.
        if (err is None) and (("timed out" in str(e).lower()) or isinstance(e, (TimeoutError, socket.timeout))):
            _log_quietly(self.logger.warning, f"[WIFI-TCP] conncheck timeout treated as benign: {e}")
            return True
        # MSG_PEEK / MSG_DONTWAIT unsupported or interrupted:
        # EINTR(4), EINVAL(22), ENOTSUP/EOPNOTSUPP(95). Cannot probe, so the
        # link is not declared dead.
        if err in (4, 22, 95):
            _log_quietly(self.logger.warning, f"[WIFI-TCP] conncheck unsupported/benign errno={err}: {e}")
            return True
        # Anything else (ENOTCONN, ECONNRESET, EPIPE, EBADF, a peer close
        # detected above, ...) is treated as a dead connection.
        _log_quietly(self.logger.error, f"[WIFI-TCP] conncheck failed errno={err}: {e}")
        return _mark_disconnected()
    except Exception:
        return _mark_disconnected()
def disconnect_server(self):
    """Close the TCP connection to the server, if one is marked as open.

    The transport-specific disconnect is chosen by the current network type;
    afterwards the connected flag and the network type are cleared.
    """
    # NOTE(review): indentation was lost in the reviewed copy of this file.
    # The state reset is assumed to belong to the "connected" branch - confirm.
    if not self._tcp_connected:
        return

    self.logger.info("与服务器断开链接")
    if self._network_type == "wifi":
        self._disconnect_tcp_via_wifi()
    elif self._network_type == "4g":
        self._disconnect_tcp_via_4g()

    self._tcp_connected = False
    self._network_type = None
def _disconnect_tcp_via_wifi(self):
    """Stop the WiFi quality monitor, then close and clear the WiFi TCP socket.

    The socket is closed under ``wifi_manager.wifi_socket_lock``. Errors from
    ``close()`` are ignored.
    """
    # Stop the quality monitor first
    self._stop_wifi_quality_monitor()

    # Then close the socket
    with wifi_manager.wifi_socket_lock:
        if wifi_manager.wifi_socket:
            try:
                wifi_manager.wifi_socket.close()
            except Exception:
                # Best effort. Exception is caught rather than using a bare
                # except, so KeyboardInterrupt and SystemExit still propagate.
                pass
            wifi_manager.wifi_socket = None
2026-01-20 11:25:17 +08:00
def _disconnect_tcp_via_4g(self):
    """Close the 4G TCP link by sending ``AT+MIPCLOSE`` under the UART lock."""
    close_cmd = f"AT+MIPCLOSE={getattr(config, 'TCP_LINK_ID', 0)}"
    with self.get_uart_lock():
        hardware_manager.at_client.send(close_cmd, "OK", 1000)
2026-01-20 11:25:17 +08:00
def tcp_send_raw(self, data: bytes, max_retries=2) -> bool:
    """Send raw bytes over the active transport (WiFi or 4G).

    Args:
        data: Bytes to send.
        max_retries: Maximum number of attempts, passed to the transport.

    Returns:
        bool: Whether the data was sent. False when not connected, when no
        network type is selected, or when the WiFi socket check fails.
    """
    if not self._tcp_connected:
        return False

    if self._network_type == "wifi":
        # Check the WiFi socket first to avoid stalling on a half-open link.
        if self._check_wifi_connection():
            return self._tcp_send_raw_via_wifi(data, max_retries)
        print("_check_wifi_connection failed")
        return False

    if self._network_type == "4g":
        return self._tcp_send_raw_via_4g(data, max_retries)

    self.logger.error("[NET] 未选择网络类型，无法发送数据")
    return False
def _tcp_send_raw_via_wifi(self, data: bytes, max_retries=2) -> bool:
    """Send ``data`` completely over the WiFi socket.

    The socket is non-blocking, so a full send buffer shows up as
    ``BlockingIOError`` on a plain socket or as a TLS WANT_READ/WANT_WRITE
    error on a wrapped one. Both mean "retry shortly" and are retried after
    a 2 ms sleep. Any other error closes and clears the socket and resets
    the connected flag so that a reconnect is triggered.

    Args:
        data: Bytes to send.
        max_retries: Number of attempts allowed; no attempt is made (and
            False is returned) when it is not positive.

    Returns:
        bool: True when every byte was sent, False otherwise.
    """
    if not wifi_manager.wifi_socket:
        return False

    def _drop_socket():
        # A send failure normally means the link is unusable: close it so
        # the caller reconnects.
        try:
            wifi_manager.wifi_socket.close()
        except Exception:
            pass
        wifi_manager.wifi_socket = None
        self._tcp_connected = False

    with wifi_manager.wifi_socket_lock:
        for attempt in range(max_retries):
            try:
                total_sent = 0
                while total_sent < len(data):
                    try:
                        sent = wifi_manager.wifi_socket.send(data[total_sent:])
                    except BlockingIOError:
                        # Send buffer full on the non-blocking socket: this
                        # is transient, not a disconnect.
                        time.sleep_ms(2)
                        continue
                    except OSError as se:
                        if _wifi_tls_would_block(se):
                            time.sleep_ms(2)
                            continue
                        raise
                    if sent == 0:
                        # The socket connection is gone
                        self.logger.warning(f"[WIFI-TCP] 发送失败socket已断开尝试 {attempt+1}/{max_retries}")
                        raise OSError("wifi socket closed (send returned 0)")
                    total_sent += sent
                # The loop only ends once every byte has been sent.
                return True
            except OSError as e:
                self.logger.error(f"[WIFI-TCP] 发送异常: {e}（尝试 {attempt+1}/{max_retries}")
                _drop_socket()
                return False
            except Exception as e:
                self.logger.error(f"[WIFI-TCP] 未知错误: {e}（尝试 {attempt+1}/{max_retries}")
                _drop_socket()
                return False
    return False
def _tcp_send_raw_via_4g(self, data: bytes, max_retries=2) -> bool:
    """Send ``data`` through the 4G module with ``AT+MIPSEND``.

    Each attempt, under the UART lock: announce the length and wait for the
    ``>`` prompt, write the bytes to the UART, terminate with Ctrl-Z (0x1A),
    then wait for the module's reply.

    Args:
        data: Bytes to send.
        max_retries: Maximum number of attempts.

    Returns:
        bool: True when the reply contains "SEND OK", "OK" or "+MIPSEND";
        False when every attempt failed.
    """
    link_id = getattr(config, "TCP_LINK_ID", 0)
    success_markers = ("SEND OK", "OK", "+MIPSEND")

    with self.get_uart_lock():
        for _ in range(max_retries):
            prompt = hardware_manager.at_client.send(f'AT+MIPSEND={link_id},{len(data)}', ">", 2000)
            if ">" not in prompt:
                # No prompt: pause briefly and try again.
                time.sleep_ms(50)
                continue

            written = 0
            while written < len(data):
                count = hardware_manager.uart4g.write(data[written:])
                if not count or count < 0:
                    # Nothing was accepted this time: wait and retry the write.
                    time.sleep_ms(1)
                    continue
                written += count

            hardware_manager.uart4g.write(b"\x1A")
            reply = hardware_manager.at_client.send("", "OK", 8000)
            if any(marker in reply for marker in success_markers):
                return True
            time.sleep_ms(50)
    return False
2026-01-20 11:25:17 +08:00
2026-01-20 18:40:54 +08:00
def _configure_ssl_before_connect(self, link_id: int) -> bool:
    """Configure the 4G module's SSL settings before the TCP link is opened.

    Sequence: MSSLCFG(auth), then (only when ``config.SSL_VERIFY_MODE`` is 1)
    MSSLCERTWR + MSSLCHECK + MSSLCFG(cert), then MIPCFG(ssl).

    This method does not take the UART lock itself; ``_connect_tcp_via_4g``
    calls it while already holding it.

    Args:
        link_id: TCP link ID to bind to the SSL context.

    Returns:
        bool: True when every step was acknowledged with OK. False on the
        first failed step, when the certificate settings are missing, or when
        the certificate file cannot be read.
    """
    ssl_id = getattr(config, "SSL_ID", 1)
    auth_mode = getattr(config, "SSL_AUTH_MODE", 1)
    verify_mode = getattr(config, "SSL_VERIFY_MODE", 0)
    at = hardware_manager.at_client

    # 1) Authentication mode. The log shows the command actually sent.
    r = at.send(f'AT+MSSLCFG="auth",{ssl_id},{auth_mode}', "OK", 3000)
    self.logger.info(f"[4G-TCP] AT+MSSLCFG=\"auth\",{ssl_id},{auth_mode} response: {r}")
    if "OK" not in r:
        return False

    # 2) Write the root certificate (only needed when verify_mode is 1)
    if verify_mode == 1:
        cert_filename = getattr(config, "SSL_CERT_FILENAME", None)
        cert_path = getattr(config, "SSL_CERT_PATH", None)
        if not cert_filename or not cert_path:
            return False

        # Read the certificate file (path on the device). A missing or
        # unreadable file is a configuration failure, reported as False.
        try:
            with open(cert_path, "rb") as f:
                cert_data = f.read()
        except OSError as e:
            self.logger.error(f"[4G-TCP] failed to read SSL certificate {cert_path}: {e}")
            return False

        # AT+MSSLCERTWR="file",0,size -> wait for ">" -> write the
        # certificate content -> wait for OK
        r = at.send(f'AT+MSSLCERTWR="{cert_filename}",0,{len(cert_data)}', ">", 5000)
        self.logger.info(f"[4G-TCP] AT+MSSLCERTWR=\"{cert_filename}\",0,{len(cert_data)} response: {r}")
        if ">" not in r:
            return False

        # Write the raw bytes as they are; no extra \r\n is appended.
        hardware_manager.uart4g.write(cert_data)
        r = at.send("", "OK", 8000)
        self.logger.info(f"[4G-TCP] AT+MSSLCERTWR=\"{cert_filename}\",0,{len(cert_data)} response: {r}")
        if "OK" not in r:
            return False

        r = at.send(f'AT+MSSLCHECK="{cert_filename}"', "OK", 8000)
        if "OK" not in r:
            self.logger.error(f"[4G-TCP] MSSLCHECK failed, response: {r}")
            return False
        self.logger.info(f"[4G-TCP] MSSLCHECK response: {r}")

        # Diagnostics only: the results of these two commands are logged and
        # not checked.
        r = at.send('AT+MSSLLIST=1', "OK", 3000)
        self.logger.info(f"[4G-TCP] AT+MSSLLIST=1 response: {r}")
        r = at.send(f'AT+MSSLCERTRD="{cert_filename}"', "OK", 3000)
        self.logger.info(f"[4G-TCP] AT+MSSLCERTRD=\"{cert_filename}\" response: {r}")

        # 3) Reference the root certificate
        r = at.send(f'AT+MSSLCFG="cert",{ssl_id},"{cert_filename}"', "OK", 3000)
        if "OK" not in r:
            return False
        self.logger.info(f"[4G-TCP] MSSLCFG(cert) response: {r}")

    # 4) Bind the TCP link to ssl_id and enable SSL
    r = at.send(f'AT+MIPCFG="ssl",{link_id},{ssl_id},1', "OK", 3000)
    self.logger.info(f"[4G-TCP] AT+MIPCFG=\"ssl\",{link_id},{ssl_id},1 response: {r}")
    if "OK" not in r:
        self.logger.error(f"[4G-TCP] MIPCFG(ssl) failed, response: {r}")
        return False
    return True
2026-01-20 11:25:17 +08:00
def receive_tcp_data_via_wifi(self, timeout_ms=100):
    """
    Receive pending TCP data from the WiFi socket without blocking.

    Args:
        timeout_ms: Kept for interface compatibility only. It is not used:
            the socket stays in non-blocking mode, and calling settimeout()
            here would switch it back to "blocking + timeout" mode.

    Returns:
        bytes: The received data (at most 4096 bytes per call), or b"" when
        no data is available, the peer closed the connection, or an error
        occurred. On a closed/broken connection the socket is closed,
        ``wifi_manager.wifi_socket`` is reset to None and
        ``self._tcp_connected`` is cleared.
    """
    if not wifi_manager.wifi_socket:
        return b""

    try:
        # Keep the socket non-blocking (it was set with setblocking(False) at
        # connect time). Do not call settimeout() repeatedly: that flips the
        # socket back to "blocking + timeout" and makes the connection check
        # report spurious "timed out" errors.
        data = wifi_manager.wifi_socket.recv(4096)
        if data:
            return data
        # On a stream socket an empty result from recv() means the peer has
        # closed the connection; "no data yet" is signalled by an exception.
        self.logger.warning("[WIFI-TCP] 对端已关闭连接")
    except BlockingIOError:
        # No data available right now; this is the normal idle case.
        return b""
    except OSError as e:
        if _wifi_tls_would_block(e):
            # TLS WANT_READ / WANT_WRITE: wait for more bytes, not a disconnect.
            return b""
        # Genuine socket error (connection reset, etc.).
        self.logger.warning(f"[WIFI-TCP] 接收数据失败: {e}")
    except Exception as e:
        self.logger.error(f"[WIFI-TCP] 接收数据异常: {e}")
        return b""

    # Reached only when the connection is closed or broken: tear it down so
    # the main loop notices and reconnects.
    try:
        wifi_manager.wifi_socket.close()
    except Exception:
        # Best effort: the socket may already be closed or reset elsewhere.
        pass
    wifi_manager.wifi_socket = None
    self._tcp_connected = False
    return b""
2026-02-09 11:24:46 +08:00
def _upload_log_file(self, upload_url, wifi_ssid=None, wifi_password=None, include_rotated=True, max_files=None, archive_format="tgz"):
    """Pack the log files into an archive and upload it to ``upload_url``.

    Progress and the outcome are reported through ``self.safe_enqueue``
    (``log_upload_connecting_wifi``, ``log_upload_ok``, ``log_upload_failed``).
    Errors raised after the imports are caught and reported the same way.

    Args:
        upload_url: Upload target URL, e.g. "https://example.com/upload/".
        wifi_ssid: WiFi SSID (optional); used to connect when WiFi is down.
        wifi_password: WiFi password (optional).
        include_rotated: Whether to include rotated logs (app.log.1, ...).
        max_files: Maximum number of log files to pack, app.log included.
            None derives it from ``config.LOG_BACKUP_COUNT``. The value is
            clamped to the range 1..20.
        archive_format: "tgz" or "zip"; any other value falls back to "tgz".

    Note:
        Only available over WiFi; file upload over 4G is not supported.
    """
    import glob
    import shutil
    from datetime import datetime

    import requests

    def _fail(reason, detail=None):
        # Report a failed upload to the server, with an optional detail text.
        msg = {"result": "log_upload_failed", "reason": reason}
        if detail is not None:
            msg["detail"] = detail
        self.safe_enqueue(msg, 2)

    staging_dir = None
    archive_path = None
    try:
        # Make sure WiFi is up; try to connect if credentials were supplied.
        if not self.is_wifi_connected():
            if not (wifi_ssid and wifi_password):
                self.logger.warning("[LOG_UPLOAD] WiFi 未连接且未提供 WiFi 凭证,无法上传日志")
                _fail("wifi_not_connected", "WiFi not connected and no credentials provided")
                return

            self.logger.info(f"[LOG_UPLOAD] WiFi 未连接,尝试连接 WiFi: {wifi_ssid}")
            self.safe_enqueue({"result": "log_upload_connecting_wifi", "ssid": wifi_ssid}, 2)
            # Use the upload target as the reachability probe, so the
            # SSID/password are only persisted once the target is reachable.
            try:
                from urllib.parse import urlparse
                parsed = urlparse(upload_url)
                v_host = parsed.hostname
                v_port = parsed.port or (443 if parsed.scheme == "https" else 80)
            except Exception:
                v_host, v_port = None, None
            ip, error = self.connect_wifi(
                wifi_ssid,
                wifi_password,
                verify_host=v_host,
                verify_port=v_port,
                persist=True,
            )
            if error:
                self.logger.error(f"[LOG_UPLOAD] WiFi 连接失败: {error}")
                _fail("wifi_connect_failed", error)
                return
            self.logger.info(f"[LOG_UPLOAD] WiFi 连接成功IP: {ip}")
        else:
            self.logger.info("[LOG_UPLOAD] WiFi 已连接,跳过连接步骤")

        self.logger.info(f"[LOG_UPLOAD] 开始上传日志文件...")

        log_file_path = config.LOG_FILE
        if not os.path.exists(log_file_path):
            self.logger.error(f"[LOG_UPLOAD] 日志文件不存在: {log_file_path}")
            _fail("log_file_not_found")
            return

        # Timestamped archive base name.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        device_id = self._device_id or "unknown"
        base_name = f"logs_{timestamp}_{device_id}"
        archive_format = str(archive_format or "tgz").strip().lower()
        if archive_format not in ("tgz", "zip"):
            archive_format = "tgz"

        # Choose the log files to pack.
        candidates = [log_file_path]
        if include_rotated:
            candidates = sorted(set(glob.glob(log_file_path + "*")))
            candidates = [p for p in candidates if os.path.isfile(p)]

            def _log_sort_key(p: str):
                # app.log first, then numeric suffixes (app.log.1, app.log.2, ...),
                # then anything else.
                if p == log_file_path:
                    return (0, 0, p)
                suffix = p[len(log_file_path):]
                if suffix.startswith("."):
                    try:
                        return (1, int(suffix[1:]), p)
                    except ValueError:
                        return (2, 999999, p)
                return (3, 999999, p)

            candidates.sort(key=_log_sort_key)

        # Limit the number of files (default: app.log + rotation count).
        if max_files is None:
            try:
                max_files = 1 + int(getattr(config, "LOG_BACKUP_COUNT", 5))
            except (TypeError, ValueError, OverflowError):
                max_files = 6
        try:
            max_files = int(max_files)
        except (TypeError, ValueError, OverflowError):
            max_files = 6
        max_files = max(1, min(max_files, 20))

        selected = candidates[:max_files]
        if not selected:
            self.logger.error("[LOG_UPLOAD] 未找到可打包的日志文件")
            _fail("no_log_files")
            return
        self.logger.info(f"[LOG_UPLOAD] 将打包日志文件数: {len(selected)}")

        # Sync first so the logs are flushed to disk, then snapshot them into
        # /tmp so that appends during packing cannot produce inconsistent content.
        os.system("sync")
        temp_dir = "/tmp"
        staging_dir = os.path.join(temp_dir, f"log_upload_{base_name}")
        os.makedirs(staging_dir, exist_ok=True)
        staged_paths = []
        try:
            for p in selected:
                dst = os.path.join(staging_dir, os.path.basename(p))
                shutil.copy2(p, dst)
                staged_paths.append(dst)
        except Exception as e:
            self.logger.error(f"[LOG_UPLOAD] 复制日志快照失败: {e}")
            _fail("snapshot_failed", str(e)[:100])
            return

        # Pack and compress.
        if archive_format == "zip":
            archive_filename = f"{base_name}.zip"
        else:
            archive_filename = f"{base_name}.tar.gz"
        archive_path = os.path.join(temp_dir, archive_filename)
        try:
            if archive_format == "zip":
                import zipfile
                with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
                    for p in staged_paths:
                        zf.write(p, arcname=os.path.basename(p))
            else:
                import tarfile
                with tarfile.open(archive_path, "w:gz") as tf:
                    for p in staged_paths:
                        tf.add(p, arcname=os.path.basename(p))
            self.logger.info(f"[LOG_UPLOAD] 日志压缩包已生成: {archive_path}")
        except Exception as e:
            self.logger.error(f"[LOG_UPLOAD] 打包压缩失败: {e}")
            _fail("archive_failed", str(e)[:100])
            return

        # The snapshot is no longer needed once the archive exists; free the
        # space before the upload starts.
        shutil.rmtree(staging_dir, ignore_errors=True)
        staging_dir = None

        # Extra request headers.
        headers = {
            'User-Agent': 'Archery-Device/1.0',
            'X-Device-ID': device_id,
        }
        # ngrok tunnels need this header to bypass the browser warning page.
        if 'ngrok-free.dev' in upload_url or 'ngrok.io' in upload_url:
            headers['ngrok-skip-browser-warning'] = 'true'

        self.logger.info(f"[LOG_UPLOAD] 正在上传到: {upload_url}")

        # Silence the warning caused by verify=False below.
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # Upload the archive as multipart/form-data.
        # NOTE: verify=False deliberately skips TLS certificate verification
        # (workaround for MaixCAM SSL compatibility problems); the server's
        # identity is therefore not authenticated.
        mime = "application/gzip" if archive_format == "tgz" else "application/zip"
        with open(archive_path, 'rb') as f:
            files = {'file': (archive_filename, f, mime)}
            response = requests.post(upload_url, files=files, headers=headers, timeout=60, verify=False)

        if response.status_code in (200, 201, 204):
            self.logger.info(f"[LOG_UPLOAD] 上传成功! 状态码: {response.status_code}")
            self.safe_enqueue({
                "result": "log_upload_ok",
                "filename": archive_filename,
                "status_code": response.status_code
            }, 2)
        else:
            self.logger.error(f"[LOG_UPLOAD] 上传失败! 状态码: {response.status_code}, 响应: {response.text[:200]}")
            _fail(f"http_{response.status_code}", response.text[:100])
    except requests.exceptions.Timeout:
        self.logger.error("[LOG_UPLOAD] 上传超时")
        _fail("timeout")
    except requests.exceptions.ConnectionError as e:
        self.logger.error(f"[LOG_UPLOAD] 连接失败: {e}")
        _fail("connection_error")
    except Exception as e:
        self.logger.error(f"[LOG_UPLOAD] 上传异常: {e}")
        _fail(str(e)[:100])
    finally:
        # Always clean up temporary files, including when the upload raised
        # (timeout / connection error); otherwise archives pile up in /tmp.
        if staging_dir is not None:
            shutil.rmtree(staging_dir, ignore_errors=True)
        if archive_path is not None and os.path.exists(archive_path):
            try:
                os.remove(archive_path)
                self.logger.debug(f"[LOG_UPLOAD] 临时文件已删除: {archive_path}")
            except Exception as e:
                self.logger.warning(f"[LOG_UPLOAD] 删除临时文件失败: {e}")
2026-01-20 11:25:17 +08:00
def generate_token(self, device_id):
    """Build the token used to authenticate HTTP API calls (HMAC-SHA256).

    The HMAC key is the fixed salt followed by ``device_id``; the message is
    a second fixed salt. The hex digest is returned with an "Arrow_" prefix.
    """
    key = ("shootMessageFire" + device_id).encode()
    message = "shoot".encode()
    signature = hmac.new(key, message, hashlib.sha256)
    return "Arrow_" + signature.hexdigest()
def tcp_main(self):
    """TCP main communication loop: log in, heartbeat, handle commands, send data.

    Runs forever: connects to the server, sends the login packet, then loops
    receiving packets (WiFi or 4G), dispatching server commands, flushing the
    send queues and sending heartbeats. When the connection is lost it
    reconnects. It returns only after handling the shutdown command (cmd 42).
    Unexpected exceptions are logged and followed by a reconnect after 5 s.
    """
    import _thread

    self.logger.info("[NET] TCP主线程启动")

    heartbeat_fail_count = 0
    last_charging_check = 0
    CHARGING_CHECK_INTERVAL = 5000  # charging-state check period: 5 seconds

    while True:
        try:
            # Charging-state check slot (every 5 s). At present it only
            # refreshes the timestamp.
            current_time = time.ticks_ms()
            if current_time - last_charging_check > CHARGING_CHECK_INTERVAL:
                last_charging_check = current_time

            # No connect / login / heartbeat / send while an OTA is running.
            try:
                from ota_manager import ota_manager
                if ota_manager.ota_in_progress:
                    time.sleep_ms(200)
                    continue
            except Exception as e:
                self.logger.error(f"[NET] OTA检查异常: {e}")
                time.sleep_ms(200)
                continue

            if not self.connect_server():
                time.sleep_ms(5000)
                continue

            # Send the login packet.
            vol_val = get_bus_voltage()
            login_data = {
                "deviceId": self.device_id,
                "password": self.password,
                "version": config.APP_VERSION,
                "vol": vol_val,
                "vol_per": voltage_to_percent(vol_val)
            }
            iccid_pending_marker = self._maybe_add_iccid_to_login(login_data)
            # Never emit the plaintext password; log a redacted copy instead.
            self.logger.debug(f"login_data: {dict(login_data, password='***')}")
            if not self.tcp_send_raw(self._netcore.make_packet(1, login_data)):
                self._tcp_connected = False
                try:
                    self.disconnect_server()
                except Exception:
                    pass
                time.sleep_ms(2000)
                continue

            self.logger.info("➡️ 登录包已发送,等待确认...")

            logged_in = False
            pending_cleared = False
            # Start every session with a clean failure counter so failures
            # from a previous connection do not trigger an early reconnect.
            heartbeat_fail_count = 0
            last_heartbeat_ack_time = time.ticks_ms()
            last_heartbeat_send_time = time.ticks_ms()

            while True:
                # If the underlying connection is gone, leave the session loop
                # quickly to trigger a reconnect / network re-selection.
                if not self._tcp_connected:
                    break

                # Pause TCP activity during OTA.
                try:
                    from ota_manager import ota_manager
                    if ota_manager.ota_in_progress:
                        time.sleep_ms(200)
                        continue
                except Exception as e:
                    self.logger.error(f"[NET] OTA检查异常: {e}")
                    time.sleep_ms(200)
                    continue

                # Receive data (the method depends on the network type).
                # WiFi coalescing: one recv may carry several complete packets,
                # and the buffer may already hold a complete packet even when
                # this round's recv returned nothing.
                rx_items = []
                if self._network_type == "wifi":
                    data = self.receive_tcp_data_via_wifi(timeout_ms=5)
                    if data:
                        wifi_manager.recv_buffer += data
                    # Frame layout: 12-byte header (body_len, msg_type,
                    # checksum; big-endian uint32 each) followed by the body.
                    while len(wifi_manager.recv_buffer) >= 12:
                        try:
                            body_len, msg_type, checksum = struct.unpack(">III", wifi_manager.recv_buffer[:12])
                            total_len = 12 + body_len
                            if len(wifi_manager.recv_buffer) >= total_len:
                                payload = wifi_manager.recv_buffer[:total_len]
                                wifi_manager.recv_buffer = wifi_manager.recv_buffer[total_len:]
                                rx_items.append((0, payload))
                            else:
                                # Log only when new bytes arrived, so an
                                # unchanged partial frame does not flood the log.
                                if data:
                                    self.logger.info(f"[NET] 接收WiFi数据不完整, {time.time()}")
                                break
                        except Exception:
                            wifi_manager.recv_buffer = b""
                            self.logger.info(f"[NET] 接收WiFi数据解析失败, {time.time()}")
                            break
                elif self._network_type == "4g":
                    item = hardware_manager.at_client.pop_tcp_payload()
                    if item:
                        rx_items.append(item)

                login_failed = False
                if rx_items:
                    self.logger.info(f"total {len(rx_items)} items")
                    for item in rx_items:
                        if isinstance(item, tuple) and len(item) == 2:
                            link_id, payload = item
                        else:
                            link_id, payload = 0, item

                        if not logged_in:
                            try:
                                self.logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
                            except Exception:
                                pass

                        msg_type, body = self._netcore.parse_packet(payload)

                        # Login response.
                        if not logged_in and msg_type == 1:
                            if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
                                logged_in = True
                                last_heartbeat_ack_time = time.ticks_ms()
                                self.logger.info("登录成功")
                                if iccid_pending_marker:
                                    self._create_iccid_marker_file()
                                    iccid_pending_marker = False
                                # If ota_pending.json exists, report the OTA result.
                                try:
                                    pending_path = f"{config.APP_DIR}/ota_pending.json"
                                    if os.path.exists(pending_path):
                                        try:
                                            with open(pending_path, "r", encoding="utf-8") as f:
                                                pending_obj = json.load(f)
                                        except Exception:
                                            pending_obj = {}
                                        self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
                                        self.logger.info("[OTA] 已上报 ota_ok等待心跳确认后删除 pending")
                                except Exception as e:
                                    self.logger.error(f"[OTA] ota_ok 上报失败: {e}")
                            else:
                                # Login rejected: abandon this session and reconnect.
                                login_failed = True
                                break

                        # Heartbeat ACK.
                        elif logged_in and msg_type == 4:
                            last_heartbeat_ack_time = time.ticks_ms()
                            self.logger.debug("✅ 收到心跳确认")

                        # Command 40: chunked download.
                        elif logged_in and msg_type == 40:
                            if isinstance(body, dict):
                                t = body.get('t', 0)
                                v = body.get('v')
                                # A chunk with a different version starts a new
                                # download: drop the chunks cached so far.
                                if self._raw_line_data and self._raw_line_data[0].get('v') != v:
                                    self._raw_line_data.clear()
                                self._raw_line_data.append(body)
                                if len(self._raw_line_data) >= int(t):
                                    self.logger.info(f"下载完成")
                                    from ota_manager import ota_manager
                                    stock_array = [chunk.get('d') for chunk in self._raw_line_data]
                                    local_filename = config.LOCAL_FILENAME
                                    with open(local_filename, 'w', encoding='utf-8') as file:
                                        file.write("\n".join(stock_array))
                                    ota_manager.apply_ota_and_reboot(None, local_filename)
                                else:
                                    self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41})
                                    self.logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}")

                        # Business commands.
                        elif logged_in and isinstance(body, dict):
                            inner_cmd = None
                            data_obj = body.get("data")
                            if isinstance(data_obj, dict):
                                inner_cmd = data_obj.get("cmd")
                                if inner_cmd == 2:  # turn the laser on and calibrate
                                    from laser_manager import laser_manager
                                    if not laser_manager.calibration_active:
                                        laser_manager.turn_on_laser()
                                        time.sleep_ms(100)
                                        hardware_manager.stop_idle_timer()  # stop the idle timer
                                        if not config.HARDCODE_LASER_POINT:
                                            laser_manager.start_calibration()
                                            self.safe_enqueue({"result": "calibrating"}, 2)
                                        else:
                                            # Hard-coded laser point: no calibration needed.
                                            self.safe_enqueue({"result": "laser pos set by hard code"}, 2)
                                elif inner_cmd == 3:  # turn the laser off
                                    from laser_manager import laser_manager
                                    laser_manager.turn_off_laser()
                                    laser_manager.stop_calibration()
                                    hardware_manager.start_idle_timer()  # restart the idle timer
                                    self.safe_enqueue({"result": "laser_off"}, 2)
                                elif inner_cmd == 4:  # report battery level
                                    voltage = get_bus_voltage()
                                    battery_percent = voltage_to_percent(voltage)
                                    battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
                                    self.safe_enqueue(battery_data, 2)
                                    self.logger.info(f"电量上报: {battery_percent}%")
                                elif inner_cmd == 5:  # OTA upgrade
                                    inner_data = data_obj.get("data")
                                    if not isinstance(inner_data, dict):
                                        # Malformed payload: treat it as empty so it
                                        # is rejected below as "missing_url".
                                        inner_data = {}
                                    ssid = inner_data.get("ssid")
                                    password = inner_data.get("password")
                                    ota_url = inner_data.get("url")
                                    mode = str(inner_data.get("mode") or "").strip().lower()
                                    if not ota_url:
                                        self.logger.error("ota missing_url")
                                        self.safe_enqueue({"result": "missing_url"}, 2)
                                        # Move on to the next packet; breaking here
                                        # would drop the rest of this batch.
                                        continue
                                    from ota_manager import ota_manager
                                    if ota_manager.update_thread_started:
                                        self.safe_enqueue({"result": "update_already_started"}, 2)
                                        continue
                                    # Auto-detect the mode when it is not given explicitly.
                                    if mode not in ("4g", "wifi"):
                                        self.logger.info("ota missing mode, auto-detecting...")
                                        # A session locked on 4G keeps OTA on 4G too,
                                        # so the behavior stays consistent.
                                        if self._session_force_4g:
                                            mode = "4g"
                                            self.logger.info("ota auto-selected: 4g (session locked on 4g)")
                                        else:
                                            # WiFi only when it is connected AND
                                            # credentials were provided.
                                            if self.is_wifi_connected() and ssid and password:
                                                mode = "wifi"
                                                self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)")
                                            else:
                                                mode = "4g"
                                                self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)")
                                    # The idle timer is not restarted afterwards: the
                                    # device reboots after OTA and restarts it then.
                                    hardware_manager.stop_idle_timer()
                                    if mode == "4g":
                                        ota_manager._set_ota_url(ota_url)  # remember the OTA URL for command 7
                                        ota_manager._start_update_thread()
                                        _thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
                                    else:  # mode == "wifi"
                                        if not ssid or not password:
                                            self.logger.error("ota wifi mode requires ssid and password")
                                            self.safe_enqueue({"result": "missing_ssid_or_password"}, 2)
                                        else:
                                            ota_manager._start_update_thread()
                                            _thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url))
                                elif inner_cmd == 6:  # report the current wlan0 IP
                                    try:
                                        ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
                                        ip = ip if ip else "no_ip"
                                    except Exception:
                                        ip = "error_getting_ip"
                                    self.safe_enqueue({"result": "current_ip", "ip": ip}, 2)
                                elif inner_cmd == 44:  # read the 4G phone number (AT+CNUM)
                                    cnum = self.get_4g_phone_number()
                                    self.logger.info(f"4G 本机号码: {cnum}")
                                    self.safe_enqueue({"result": "cnum", "number": cnum if cnum is not None else ""}, 2)
                                elif inner_cmd == 45:  # read the MCCID (AT+MCCID)
                                    mccid = self.get_4g_mccid()
                                    self.logger.info(f"4G MCCID: {mccid}")
                                    self.safe_enqueue({"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2)
                                # Command 7 (direct OTA download over WiFi using
                                # the stored OTA URL) is currently disabled.
                                elif inner_cmd == 41:  # manual shot trigger
                                    self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}")
                                    self._manual_trigger_flag = True
                                    self.safe_enqueue({"result": "trigger_ack"}, 2)
                                    hardware_manager.start_idle_timer()  # restart idle timing
                                elif inner_cmd == 42:  # shutdown
                                    self.logger.info("[SHUTDOWN] 收到TCP关机命令准备关机...")
                                    # NOTE(review): this ack is only queued; the queue is
                                    # flushed later in this loop, so it looks like it is
                                    # never sent before the disconnect below - confirm.
                                    self.safe_enqueue({"result": "shutdown_ack"}, 2)
                                    time.sleep_ms(1000)
                                    self.disconnect_server()
                                    # Try to switch the 4G module off.
                                    try:
                                        with self.get_uart_lock():
                                            hardware_manager.at_client.send("AT+CFUN=0", "OK", 5000)
                                    except Exception:
                                        pass
                                    time.sleep_ms(2000)
                                    os.system("sync")  # flush filesystem caches to avoid data loss
                                    time.sleep_ms(500)
                                    hardware_manager.power_off()
                                    return
                                elif inner_cmd == 43:  # upload logs
                                    # Format: {"cmd":43, "data":{"ssid":"xxx","password":"xxx","url":"xxx", ...}}
                                    inner_data = data_obj.get("data")
                                    if not isinstance(inner_data, dict):
                                        # Malformed payload: rejected below as "missing_url".
                                        inner_data = {}
                                    upload_url = inner_data.get("url")
                                    wifi_ssid = inner_data.get("ssid")
                                    wifi_password = inner_data.get("password")
                                    include_rotated = inner_data.get("include_rotated", True)
                                    max_files = inner_data.get("max_files")
                                    archive_format = inner_data.get("archive", "tgz")  # "tgz" or "zip"
                                    hardware_manager.start_idle_timer()  # restart idle timing
                                    if not upload_url:
                                        self.logger.error("[LOG_UPLOAD] 缺少 url 参数")
                                        self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"}, 2)
                                    else:
                                        self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令目标URL: {upload_url}")
                                        # Upload in a new thread so the main loop is not blocked.
                                        _thread.start_new_thread(
                                            self._upload_log_file,
                                            (upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format)
                                        )
                            else:
                                # "data" is not a dict.
                                self.logger.info(f"[NET] body={body}, {time.time()}")
                        else:
                            self.logger.info(f"[NET] 未知数据 {body}, {time.time()}")

                    if login_failed:
                        break
                else:
                    time.sleep_ms(5)

                # Send queued business data.
                if logged_in:
                    item = None
                    item_is_high = False
                    # Dequeue; on send failure the item is put back at the head
                    # of its queue so no message is lost.
                    with self.get_queue_lock():
                        if self._high_send_queue:
                            item = self._high_send_queue.pop(0)
                            item_is_high = True
                        elif self._normal_send_queue:
                            item = self._normal_send_queue.pop(0)
                            item_is_high = False

                    if item:
                        msg_type, data_dict = item
                        pkt = self._netcore.make_packet(msg_type, data_dict)
                        if not self.tcp_send_raw(pkt):
                            # Send failed: requeue at the head and reconnect.
                            with self.get_queue_lock():
                                if item_is_high:
                                    self._high_send_queue.insert(0, item)
                                else:
                                    self._normal_send_queue.insert(0, item)
                            self._tcp_connected = False
                            try:
                                self.disconnect_server()
                            except Exception:
                                pass
                            break

                # Report the laser calibration result when one is available.
                if logged_in:
                    from laser_manager import laser_manager
                    result = laser_manager.get_calibration_result()
                    if result:
                        x, y = result
                        self.safe_enqueue({"result": "ok", "x": x, "y": y}, 2)

                # Periodic heartbeat.
                current_time = time.ticks_ms()
                if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000:
                    vol_val = get_bus_voltage()
                    if not self.tcp_send_raw(self._netcore.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
                        heartbeat_fail_count += 1
                        # A short glitch can fail a single send: reconnect only
                        # after consecutive failures, to avoid reconnect storms.
                        self.logger.error(f"心跳发送失败({heartbeat_fail_count}/3),准备重试")
                        if heartbeat_fail_count >= 3:
                            self.logger.error("心跳连续失败>=3准备重连")
                            self._tcp_connected = False
                            try:
                                self.disconnect_server()
                            except Exception:
                                pass
                            break
                        else:
                            # Do not disconnect yet; retry after a short wait.
                            time.sleep_ms(200)
                            continue
                    else:
                        heartbeat_fail_count = 0
                        last_heartbeat_send_time = current_time
                        self.logger.debug("心跳已发送")

                        # Delete the pending file once a heartbeat was sent.
                        if not pending_cleared:
                            try:
                                pending_path = f"{config.APP_DIR}/ota_pending.json"
                                if os.path.exists(pending_path):
                                    try:
                                        os.remove(pending_path)
                                        pending_cleared = True
                                        self.logger.info("[OTA] 心跳发送成功,已删除 ota_pending.json")
                                    except Exception as e:
                                        self.logger.error(f"[OTA] 删除 pending 文件失败: {e}")
                            except Exception as e:
                                self.logger.error(f"[OTA] 检查 pending 文件时出错: {e}")

                # Reconnect when no heartbeat ACK arrived for ten minutes.
                if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10:
                    self.logger.error("十分钟无心跳ACK重连")
                    break

                # Wait up to 50 ms for the send event, then reset it.
                self._send_event.wait(timeout=0.05)
                self._send_event.clear()

            self._tcp_connected = False
            self.logger.error("连接异常2秒后重连...")
            time.sleep_ms(2000)
        except Exception as e:
            # Top-level guard so the TCP thread never exits silently.
            self.logger.error(f"[NET] TCP主循环异常: {e}")
            import traceback
            self.logger.error(traceback.format_exc())
            self._tcp_connected = False
            time.sleep_ms(5000)  # wait 5 s before reconnecting
# Create the global singleton instance.
network_manager = NetworkManager()
# ==================== Backward-compatible function interface ====================
def tcp_main():
    """Run the TCP main loop on the shared manager (backward-compatible wrapper)."""
    manager = network_manager
    return manager.tcp_main()
def read_device_id():
    """Read the device ID via the shared manager (backward-compatible wrapper)."""
    device_id = network_manager.read_device_id()
    return device_id
def safe_enqueue(data_dict, msg_type=2, high=False):
    """Enqueue a message via the shared manager (backward-compatible wrapper).

    The three arguments are passed positionally, unchanged, to
    ``network_manager.safe_enqueue``.
    """
    enqueue = network_manager.safe_enqueue
    return enqueue(data_dict, msg_type, high)
def connect_server():
    """Connect to the server via the shared manager (backward-compatible wrapper)."""
    outcome = network_manager.connect_server()
    return outcome
def disconnet_server():
    """Disconnect from the server (backward-compatible wrapper).

    The misspelled name is kept on purpose: it is the existing public name.
    """
    return network_manager.disconnect_server()


# Correctly spelled alias for the wrapper above; both names stay available.
disconnect_server = disconnet_server
def is_wifi_connected():
    """Report whether WiFi is connected (backward-compatible wrapper)."""
    connected = network_manager.is_wifi_connected()
    return connected
def connect_wifi(ssid, password, **kwargs):
    """Connect to WiFi via the shared manager (backward-compatible wrapper).

    Args:
        ssid: WiFi SSID.
        password: WiFi password.
        **kwargs: Optional keyword arguments forwarded unchanged to
            ``network_manager.connect_wifi`` (for example ``verify_host``,
            ``verify_port`` and ``persist``).

    Returns:
        Whatever ``network_manager.connect_wifi`` returns.
    """
    return network_manager.connect_wifi(ssid, password, **kwargs)
def is_server_reachable(host, port=80, timeout=5):
    """Check whether the server is reachable (backward-compatible wrapper).

    The arguments are passed positionally, unchanged, to
    ``network_manager.is_server_reachable``.
    """
    reachable = network_manager.is_server_reachable(host, port, timeout)
    return reachable