From ec80107128da317f5d904bba39a9d5047168a7fe Mon Sep 17 00:00:00 2001 From: gcw_4spBpAfv Date: Thu, 2 Apr 2026 18:02:34 +0800 Subject: [PATCH] refind network and monitor wifi connection --- app.yaml | 3 +- design_doc/solution_record.md | 24 +- network.py | 407 ++++++++++++++++-------- wifi.py | 572 ++++++++++++++++++++++++++++++++++ 4 files changed, 877 insertions(+), 129 deletions(-) create mode 100644 wifi.py diff --git a/app.yaml b/app.yaml index 2b48e71..40b272a 100644 --- a/app.yaml +++ b/app.yaml @@ -21,4 +21,5 @@ files: - shot_id_generator.py - time_sync.py - version.py - - vision.py + - vision.cpython-311-riscv64-linux-gnu.so + - wifi.py diff --git a/design_doc/solution_record.md b/design_doc/solution_record.md index 0884276..f76bdb6 100644 --- a/design_doc/solution_record.md +++ b/design_doc/solution_record.md @@ -80,4 +80,26 @@ 自动关机的时机: 超过配置的idle时长, 禁止自动关机的情况:1.校准中,2.OTA中 重启计时的时机:1.校准完成,2.命令触发射箭,3.真实触发射箭,4.初始化完成 -9. \ No newline at end of file +9. Wifi网络监控: +有两次发现wifi网络下,有些消息发送很慢,但具体是什么缘故还不清楚,现在增加了wifi网络下的检测,并一旦发现wifi的网络质量差,就会切换到4G。 +WiFi 连接成功 + ↓ +启动后台监测线程 + ↓ +每 5 秒循环: + 测量 RTT (1 样本,600ms timeout) + 获取 RSSI + 更新缓存 + 判断是否差: + - RTT >= 600ms → 差 + - RTT >= 350ms 且 RSSI <= -80dBm → 差 + ↓ +如果质量差: + 快速重试2次,如果其中任意一次网络恢复了,继续使用wifi。否则, + 调用 _switch_to_4g_due_to_poor_wifi() + 关闭 WiFi socket + 重置连接状态 + 尝试切换到 4G + ↓ +上层检测到连接断开: + 重新 connect_server() → 自动选择 4G \ No newline at end of file diff --git a/network.py b/network.py index dd8a4a3..96d3612 100644 --- a/network.py +++ b/network.py @@ -5,6 +5,7 @@ 提供TCP通信、数据包打包/解析、队列管理等功能 """ import json +import re from math import e import struct from maix import time @@ -21,6 +22,7 @@ from power import get_bus_voltage, voltage_to_percent # from laser import laser_manager # from ota import ota_manager from logger_manager import logger_manager +from wifi import wifi_manager class NetworkManager: @@ -49,14 +51,11 @@ class NetworkManager: self._raw_line_data = [] self._manual_trigger_flag = False - # WiFi 相关状态 - self._network_type = None # "wifi" 或 "4g" 或 None - self._wifi_connected = False - self._wifi_ip = None - self._wifi_socket = None - self._wifi_socket_lock = threading.Lock() - self._prefer_wifi = True # 是否优先使用WiFi - self._wifi_recv_buffer = b"" # WiFi接收缓冲区 + # 网络类型状态 + self._network_type = None # "wifi" 或 "4G" 或 None + # 本次上电曾因 WiFi 质量差切换到 4G 后,直至关机不再改回 WiFi + self._session_force_4g = False + self._initialized = True # 导入 archery_netcore 模块,并检查是否存在 parse_packet 和 make_packet 函数 @@ -117,12 +116,12 @@ class NetworkManager: @property def wifi_connected(self): """WiFi是否已连接""" - return self._wifi_connected + return wifi_manager.wifi_connected @property def wifi_ip(self): """WiFi IP地址""" - return self._wifi_ip + return wifi_manager.wifi_ip # ==================== 内部状态管理方法 ==================== @@ -200,40 +199,16 @@ class NetworkManager: self._password = default_id + "." return default_id - # ==================== WiFi 管理方法 ==================== - + # ==================== WiFi 管理方法(委托给 wifi_manager)==================== + def is_wifi_connected(self): """检查WiFi是否已连接""" - # 优先用 MaixPy network(如果可用) - try: - from maix import network - wlan = network.WLAN(network.TYPE_WIFI) - if wlan.isconnected(): - self._wifi_connected = True - return True - except: - pass + return wifi_manager.is_wifi_connected() - # 兜底:看系统 wlan0 有没有 IP - try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - if ip: - self._wifi_connected = True - self._wifi_ip = ip - return True - except: - pass - - self._wifi_connected = False - return False - def connect_wifi(self, ssid, password, verify_host=None, verify_port=None, persist=True, timeout_s=20): """ - 连接 Wi-Fi(先用新凭证尝试连接并验证可用性;失败自动回滚;成功后再决定是否落盘) - - 重要:系统的 /etc/init.d/S30wifi 通常会读取 /boot/wifi.ssid 与 /boot/wifi.pass 来连接 WiFi。 - 因此要“真正尝试连接新 WiFi”,必须临时写入 /boot/ 触发重启;若失败则把旧值写回去(回滚)。 - + 连接 Wi-Fi(委托给 wifi_manager) + Returns: (ip, error): IP地址和错误信息(成功时error为None) """ @@ -369,6 +344,77 @@ class NetworkManager: # ==================== 网络选择策略 ==================== + def _get_wifi_rssi_dbm(self): + """获取 WiFi 信号强度(委托给 wifi_manager)""" + return wifi_manager._get_wifi_rssi_dbm() + + def _measure_wifi_tcp_rtt_ms(self, host, port, samples=3, per_sample_timeout_ms=900): + """测量 WiFi TCP RTT(委托给 wifi_manager)""" + return wifi_manager._measure_wifi_tcp_rtt_ms(host, port, samples, per_sample_timeout_ms) + + def _is_wifi_quality_bad(self, wifi_rtt_ms, wifi_rssi_dbm): + """判断 WiFi 质量是否差(委托给 wifi_manager)""" + return wifi_manager._is_wifi_quality_bad(wifi_rtt_ms, wifi_rssi_dbm) + + def is_4g_available(self): + """ + 快速判断 4G 是否可用(不做 DNS,仅验证 SIM/附着/有 IP) + 兼容性兜底:若未拿到 IP,再补充设置 APN 并激活 PDP context + """ + try: + atc = hardware_manager.at_client + if atc is None: + return False + + with self.get_uart_lock(): + # 1) SIM 就绪 + r = atc.send("AT+CPIN?", "READY", 3000) + if "READY" not in r: + return False + + # 2) 尝试附着到网络 + # 不同模块返回可能略有差异,所以做宽松解析 + r2 = atc.send("AT+CGATT?", "OK", 3000) + m = re.search(r"\+CGATT:\s*(\d+)", r2) + attached = int(m.group(1)) == 1 if m else None + if attached is False or attached is None: + atc.send("AT+CGATT=1", "OK", 5000) + + # 3) 查询 PDP 地址(有 IP 表示网络可用) + def _extract_ip(resp: str): + m3 = re.search(r"(\d{1,3}\.){3}\d{1,3}", resp or "") + if not m3: + return None + ip = m3.group(0) + if ip.startswith("0.") or ip.startswith("127."): + return None + return ip + + r3 = atc.send("AT+CGPADDR=1", "OK", 3000) + ip = _extract_ip(r3) + if ip: + return True + + # 4) 若没 IP:补充设置 APN + 激活 PDP(某些网络/模组需要这一步) + atc.send('AT+CGDCONT=1,"IP","CMNET"', "OK", 3000) + + qact_resp = atc.send("AT+CGACT?", "OK", 3000) + if "+CGACT:" not in (qact_resp or "") or "1,1" not in (qact_resp or ""): + atc.send("AT+CGACT=1,1", "OK", 10000) + + r4 = atc.send("AT+CGPADDR=1", "OK", 3000) + ip2 = _extract_ip(r4) + if ip2: + return True + return False + except Exception: + return False + + def _apply_session_force_4g(self): + """锁定本次会话为 4G(直到关机,期间不再回切 WiFi)""" + self._session_force_4g = True + self._network_type = "4g" + def select_network(self, prefer_wifi=None): """ 自动选择网络(WiFi优先) @@ -380,32 +426,108 @@ class NetworkManager: "wifi" 或 "4g" 或 None(无可用网络) """ if prefer_wifi is None: - prefer_wifi = self._prefer_wifi + prefer_wifi = wifi_manager.prefer_wifi + + # 本次会话锁定:只要 4G 可用就一直用 4G + if self._session_force_4g: + self.logger.info("[NET] 会话锁定 4G:继续使用 4G(跳过 WiFi 质量评估)") + self._network_type = "4g" + return "4g" - # 策略1:如果指定优先WiFi,且WiFi可用,使用WiFi + host = self._server_ip + port = self._server_port + + # 1) 开机先尝试 WiFi,并评估质量 if prefer_wifi and self.is_wifi_connected(): - # 检查WiFi是否能连接到服务器 - if self.is_server_reachable(self._server_ip, self._server_port, timeout=3): + wifi_rssi_dbm = self._get_wifi_rssi_dbm() + wifi_rtt_ms, wifi_reachable = self._measure_wifi_tcp_rtt_ms( + host, port, + samples=getattr(config, "WIFI_QUALITY_RTT_SAMPLES", 3), + per_sample_timeout_ms=900, + ) + wifi_bad = self._is_wifi_quality_bad(wifi_rtt_ms, wifi_rssi_dbm) + + try: + self.logger.info( + f"[NET] WiFi质量评估:rtt_ms(median)={wifi_rtt_ms:.1f}, rssi_dbm={wifi_rssi_dbm}, " + f"reachable={wifi_reachable}, bad={wifi_bad}" + ) + except Exception: + pass + + # 如果质量差且 4G 可用 -> 切换 4G 并锁定本次会话 + if wifi_bad and self.is_4g_available(): + self._apply_session_force_4g() + self.logger.warning("[NET] WiFi质量差且 4G 可用 -> 切换到 4G并锁定本次会话") + return "4g" + + # 否则:仍以 WiFi 为主,但如果服务器不可达则回退到 4G + if wifi_reachable or self.is_server_reachable(host, port, timeout=3): self._network_type = "wifi" - self.logger.info(f"[NET] 选择WiFi网络,IP: {self._wifi_ip}") - import os - os.environ["TZ"] = "Asia/Shanghai" - os.system("ntpdate pool.ntp.org") + self.logger.info(f"[NET] 选择WiFi网络,IP: {wifi_manager.wifi_ip}") + try: + os.environ["TZ"] = "Asia/Shanghai" + os.system("ntpdate pool.ntp.org") + except Exception: + pass return "wifi" - else: - self.logger.warning("[NET] WiFi已连接但无法访问服务器,尝试4G") + + # WiFi可用但服务器不可达 + self.logger.warning("[NET] WiFi可用但服务器不可达,尝试4G") + + # 2) 如果 WiFi 没法用/不想用,回退到 4G + if self.is_4g_available(): + self._network_type = "4g" + self.logger.info("[NET] 选择4G网络") + return "4g" + + # 3) 两者都不可用 + self.logger.error("[NET] WiFi 与 4G 均不可用") + return None + + def _start_wifi_quality_monitor(self): + """ + 启动 WiFi 质量后台监测线程(委托给 wifi_manager) + """ + wifi_manager.start_quality_monitor( + network_type_callback=lambda: self._network_type, + on_poor_quality_callback=self._switch_to_4g_due_to_poor_wifi + ) + + def _stop_wifi_quality_monitor(self): + """停止 WiFi 质量监测线程(委托给 wifi_manager)""" + wifi_manager.stop_quality_monitor() + + def get_wifi_quality_status(self): + """获取当前 WiFi 质量状态(委托给 wifi_manager)""" + return wifi_manager.get_wifi_quality_status() - # 策略2:如果WiFi可用,使用WiFi - if self.is_wifi_connected(): - if self.is_server_reachable(self._server_ip, self._server_port, timeout=3): - self._network_type = "wifi" - self.logger.info(f"[NET] 选择WiFi网络,IP: {self._wifi_ip}") - return "wifi" - - # 策略3:回退到4G - self.logger.info("[NET] WiFi不可用或无法连接服务器,使用4G网络") - self._network_type = "4g" - return "4g" + def _switch_to_4g_due_to_poor_wifi(self): + """ + 由于 WiFi 质量差,切换到 4G 网络 + """ + self.logger.info("[WiFi->4G] 开始切换到 4G 网络") + + # 1. 标记本次上电强制使用 4G + self._session_force_4g = True + + # 2. 关闭 WiFi socket + wifi_manager.disconnect_wifi() + + # 3. 重置连接状态 + self._tcp_connected = False + self._network_type = None # 清空,让 select_network 重新选择 + + # 4. 检查 4G 是否可用 + if self.is_4g_available(): + self._network_type = "4g" + self.logger.info("[WiFi->4G] 切换成功,将使用 4G 网络") + return True + else: + self.logger.error("[WiFi->4G] 4G 不可用,无法切换") + # 回退:继续使用 WiFi(虽然质量差) + self._session_force_4g = False + return False def safe_enqueue(self, data_dict, msg_type=2, high=False): @@ -485,32 +607,36 @@ class NetworkManager: """通过WiFi建立TCP连接""" try: # 创建TCP socket - self._wifi_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._wifi_socket.settimeout(5.0) # 5秒超时 + wifi_manager.wifi_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + wifi_manager.wifi_socket.settimeout(5.0) # 5秒超时 # 连接到服务器 addr_info = socket.getaddrinfo(config.SERVER_IP, config.SERVER_PORT, socket.AF_INET, socket.SOCK_STREAM)[0] - self._wifi_socket.connect(addr_info[-1]) + wifi_manager.wifi_socket.connect(addr_info[-1]) # 设置非阻塞模式(用于接收数据) - self._wifi_socket.setblocking(False) + wifi_manager.wifi_socket.setblocking(False) # 加快消息发送 - self._wifi_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - + wifi_manager.wifi_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self._tcp_connected = True - self.logger.info("[WIFI-TCP] TCP连接已建立") + self.logger.info("[WIFI-TCP] TCP 连接已建立") + + # 启动 WiFi 质量后台监测 + self._start_wifi_quality_monitor() + return True except Exception as e: self.logger.error(f"[WIFI-TCP] 连接失败: {e}") - if self._wifi_socket: + if wifi_manager.wifi_socket: try: - self._wifi_socket.close() + wifi_manager.wifi_socket.close() except: pass - self._wifi_socket = None + wifi_manager.wifi_socket = None return False @@ -543,12 +669,12 @@ class NetworkManager: def _check_wifi_connection(self): """检查WiFi TCP连接是否仍然有效""" - if not self._wifi_socket: + if not wifi_manager.wifi_socket: return False try: # send(b"") 在很多实现里是 no-op,无法可靠探测断线。 # 用非阻塞 peek 来判断:若对端已关闭,recv 会返回 b""。 - data = self._wifi_socket.recv(1, socket.MSG_PEEK | socket.MSG_DONTWAIT) + data = wifi_manager.wifi_socket.recv(1, socket.MSG_PEEK | socket.MSG_DONTWAIT) if data == b"": raise OSError("wifi socket closed") return True @@ -560,8 +686,8 @@ class NetworkManager: err = getattr(e, "errno", None) if err in (11, 35, 10035): # EAGAIN/EWOULDBLOCK on linux/mac/win return True - - # 某些平台会把“无数据可读/超时”抛成 socket.timeout / TimeoutError,errno 可能为 None, + + # 某些平台会把"无数据可读/超时"抛成 socket.timeout / TimeoutError,errno 可能为 None, # 这不代表断线:视为 benign,交给真正的 send/recv 去判定断线。 if (err is None) and (("timed out" in str(e).lower()) or isinstance(e, (TimeoutError, socket.timeout))): try: @@ -569,9 +695,9 @@ class NetworkManager: except: pass return True - + # 某些嵌入式 socket 实现可能不支持 MSG_PEEK/MSG_DONTWAIT,或返回 EINVAL/ENOTSUP。 - # 这种情况不代表断线:选择“无法检测但不判死”,交给真正的 send/recv 去触发断线处理。 + # 这种情况不代表断线:选择"无法检测但不判死",交给真正的 send/recv 去触发断线处理。 # 常见:EINVAL(22), ENOTSUP(95), EOPNOTSUPP(95), EINTR(4) if err in (4, 22, 95): try: @@ -579,38 +705,38 @@ class NetworkManager: except: pass return True - + # 记录真实错误,便于定位 try: self.logger.error(f"[WIFI-TCP] conncheck failed errno={err}: {e}") except: pass - - # 明确的“连接不可用”错误才判定断线并清理 + + # 明确的"连接不可用"错误才判定断线并清理 # 常见:ENOTCONN(107), ECONNRESET(104), EPIPE(32), EBADF(9) if err in (9, 32, 104, 107, 10054, 10057): try: - self._wifi_socket.close() + wifi_manager.wifi_socket.close() except: pass - self._wifi_socket = None + wifi_manager.wifi_socket = None self._tcp_connected = False return False - + # socket已断开或不可用,清理 try: - self._wifi_socket.close() + wifi_manager.wifi_socket.close() except: pass - self._wifi_socket = None + wifi_manager.wifi_socket = None self._tcp_connected = False return False except Exception: try: - self._wifi_socket.close() + wifi_manager.wifi_socket.close() except: pass - self._wifi_socket = None + wifi_manager.wifi_socket = None self._tcp_connected = False return False @@ -628,14 +754,18 @@ class NetworkManager: self._network_type = None def _disconnect_tcp_via_wifi(self): - """断开WiFi TCP连接""" - with self._wifi_socket_lock: - if self._wifi_socket: + """断开 WiFi TCP 连接并停止监测""" + # 先停止监测线程 + self._stop_wifi_quality_monitor() + + # 再关闭 socket + with wifi_manager.wifi_socket_lock: + if wifi_manager.wifi_socket: try: - self._wifi_socket.close() + wifi_manager.wifi_socket.close() except: pass - self._wifi_socket = None + wifi_manager.wifi_socket = None def _disconnect_tcp_via_4g(self): link_id = getattr(config, "TCP_LINK_ID", 0) @@ -672,16 +802,16 @@ class NetworkManager: def _tcp_send_raw_via_wifi(self, data: bytes, max_retries=2) -> bool: """通过WiFi socket发送TCP数据""" - if not self._wifi_socket: + if not wifi_manager.wifi_socket: return False - with self._wifi_socket_lock: + with wifi_manager.wifi_socket_lock: for attempt in range(max_retries): try: - # 标准socket发送 + # 标准 socket 发送 total_sent = 0 while total_sent < len(data): - sent = self._wifi_socket.send(data[total_sent:]) + sent = wifi_manager.wifi_socket.send(data[total_sent:]) if sent == 0: # socket连接已断开 self.logger.warning(f"[WIFI-TCP] 发送失败,socket已断开(尝试 {attempt+1}/{max_retries})") @@ -698,19 +828,19 @@ class NetworkManager: self.logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt+1}/{max_retries})") # 发送异常通常意味着连接已不可用,主动关闭以触发重连 try: - self._wifi_socket.close() + wifi_manager.wifi_socket.close() except: pass - self._wifi_socket = None + wifi_manager.wifi_socket = None self._tcp_connected = False return False except Exception as e: self.logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt+1}/{max_retries})") try: - self._wifi_socket.close() + wifi_manager.wifi_socket.close() except: pass - self._wifi_socket = None + wifi_manager.wifi_socket = None self._tcp_connected = False return False @@ -747,15 +877,14 @@ class NetworkManager: verify_mode = getattr(config, "SSL_VERIFY_MODE", 0) # 1) 配置认证方式 - # r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},{auth_mode}', "OK", 3000) - r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},0', "OK", 3000) + r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},{auth_mode}', "OK", 3000) + # r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},0', "OK", 3000) self.logger.info(f"[4G-TCP] AT+MSSLCFG=\"auth\",{ssl_id},0 response: {r}") if "OK" not in r: return False # 2) 写入根证书(只有 verify_mode=1 才需要) - # if verify_mode == 1: - if False: + if verify_mode == 1: cert_filename = getattr(config, "SSL_CERT_FILENAME", None) cert_path = getattr(config, "SSL_CERT_PATH", None) if not cert_filename or not cert_path: @@ -767,6 +896,7 @@ class NetworkManager: # 按手册:AT+MSSLCERTWR="file",0,size -> 等待 ">" -> 写入证书内容 -> 等 OK r = hardware_manager.at_client.send(f'AT+MSSLCERTWR="{cert_filename}",0,{len(cert_data)}', ">", 5000) + self.logger.info(f"[4G-TCP] AT+MSSLCERTWR=\"{cert_filename}\",0,{len(cert_data)} response: {r}") if ">" not in r: return False @@ -774,18 +904,36 @@ class NetworkManager: hardware_manager.uart4g.write(cert_data) r = hardware_manager.at_client.send("", "OK", 8000) + self.logger.info(f"[4G-TCP] AT+MSSLCERTWR=\"{cert_filename}\",0,{len(cert_data)} response: {r}") if "OK" not in r: return False + r = hardware_manager.at_client.send(f'AT+MSSLCHECK="{cert_filename}"', "OK", 8000) + if "OK" not in r: + self.logger.error(f"[4G-TCP] MSSLCHECK failed, response: {r}") + return False + else: + self.logger.info(f"[4G-TCP] MSSLCHECK response: {r}") + + r = hardware_manager.at_client.send(f'AT+MSSLLIST=1', "OK", 3000) + self.logger.info(f"[4G-TCP] AT+MSSLLIST=1 response: {r}") + + r = hardware_manager.at_client.send(f'AT+MSSLCERTRD="{cert_filename}"', "OK", 3000) + self.logger.info(f"[4G-TCP] AT+MSSLCERTRD=\"{cert_filename}\" response: {r}") + + # 3) 引用根证书 r = hardware_manager.at_client.send(f'AT+MSSLCFG="cert",{ssl_id},"{cert_filename}"', "OK", 3000) if "OK" not in r: return False + else: + self.logger.info(f"[4G-TCP] MSSLCFG(cert) response: {r}") # 4) 绑定 TCP 通道到 ssl_id,并启用 r = hardware_manager.at_client.send(f'AT+MIPCFG="ssl",{link_id},{ssl_id},1', "OK", 3000) self.logger.info(f"[4G-TCP] AT+MIPCFG=\"ssl\",{link_id},{ssl_id},1 response: {r}") if "OK" not in r: + self.logger.error(f"[4G-TCP] MIPCFG(ssl) failed, response: {r}") return False return True @@ -793,37 +941,37 @@ class NetworkManager: def receive_tcp_data_via_wifi(self, timeout_ms=100): """ 通过WiFi接收TCP数据 - + Args: timeout_ms: 超时时间(毫秒) - + Returns: bytes: 接收到的数据,如果没有数据则返回 b"" """ - if not self._wifi_socket: + if not wifi_manager.wifi_socket: return b"" - + try: # 这里保持 socket 为非阻塞模式(连接时已 setblocking(False))。 - # 不要反复 settimeout(),否则会把 socket 切回“阻塞+超时”,并导致 conncheck 误报 timed out。 - data = self._wifi_socket.recv(4096) # 每次最多接收4KB(无数据会抛 BlockingIOError) + # 不要反复 settimeout(),否则会把 socket 切回"阻塞+超时",并导致 conncheck 误报 timed out。 + data = wifi_manager.wifi_socket.recv(4096) # 每次最多接收4KB(无数据会抛 BlockingIOError) return data - + except BlockingIOError: # 无数据可读是正常的 return b"" except OSError as e: # socket错误(连接断开等) self.logger.warning(f"[WIFI-TCP] 接收数据失败: {e}") - + # 关闭socket try: - self._wifi_socket.close() + wifi_manager.wifi_socket.close() except: pass - self._wifi_socket = None + wifi_manager.wifi_socket = None self._tcp_connected = False - + return b"" except Exception as e: self.logger.error(f"[WIFI-TCP] 接收数据异常: {e}") @@ -1150,19 +1298,19 @@ class NetworkManager: data = self.receive_tcp_data_via_wifi(timeout_ms=50) if data: # 将数据添加到缓冲区 - self._wifi_recv_buffer += data + wifi_manager.recv_buffer += data # 尝试从缓冲区解析完整的数据包 - while len(self._wifi_recv_buffer) >= 12: # 至少需要12字节的头部 + while len(wifi_manager.recv_buffer) >= 12: # 至少需要12字节的头部 # 解析头部 try: - body_len, msg_type, checksum = struct.unpack(">III", self._wifi_recv_buffer[:12]) + body_len, msg_type, checksum = struct.unpack(">III", wifi_manager.recv_buffer[:12]) total_len = 12 + body_len - if len(self._wifi_recv_buffer) >= total_len: + if len(wifi_manager.recv_buffer) >= total_len: # 有完整的数据包 - payload = self._wifi_recv_buffer[:total_len] - self._wifi_recv_buffer = self._wifi_recv_buffer[total_len:] + payload = wifi_manager.recv_buffer[:total_len] + wifi_manager.recv_buffer = wifi_manager.recv_buffer[total_len:] item = (0, payload) # link_id=0 for WiFi break else: @@ -1170,7 +1318,7 @@ class NetworkManager: break except: # 解析失败,清空缓冲区 - self._wifi_recv_buffer = b"" + wifi_manager.recv_buffer = b"" break elif self._network_type == "4g": # 4G接收数据 @@ -1296,13 +1444,18 @@ class NetworkManager: # 自动判断模式:如果没有明确指定,根据WiFi连接状态和凭证决定 if mode not in ("4g", "wifi"): self.logger.info("ota missing mode, auto-detecting...") - # 只有同时满足:WiFi已连接 且 提供了WiFi凭证,才使用WiFi - if self.is_wifi_connected() and ssid and password: - mode = "wifi" - self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)") - else: + # 若本次会话已锁定 4G,则 OTA 自动也走 4G,避免后续回切导致体验不一致 + if self._session_force_4g: mode = "4g" - self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)") + self.logger.info("ota auto-selected: 4g (session locked on 4g)") + else: + # 只有同时满足:WiFi已连接 且 提供了WiFi凭证,才使用WiFi + if self.is_wifi_connected() and ssid and password: + mode = "wifi" + self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)") + else: + mode = "4g" + self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)") hardware_manager.stop_idle_timer() # 停表,注意OTA停表之后,就没有再开表,因为OTA后面会重启,会重新开表 diff --git a/wifi.py b/wifi.py new file mode 100644 index 0000000..f2636d4 --- /dev/null +++ b/wifi.py @@ -0,0 +1,572 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +WiFi管理模块 +提供WiFi连接、网络检测、质量监测等功能 +""" +import os +import re +import socket +import threading +import time as std_time +from maix import time + +import config +from logger_manager import logger_manager + + +class WiFiManager: + """WiFi管理器(单例)""" + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(WiFiManager, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + if self._initialized: + return + + # WiFi 相关状态 + self._wifi_connected = False + self._wifi_ip = None + self._wifi_socket = None + self._wifi_socket_lock = threading.Lock() + self._prefer_wifi = True # 是否优先使用 WiFi + self._recv_buffer = b"" # TCP 接收缓冲区 + + # WiFi 质量监测(后台线程) + self._wifi_quality_monitor_thread = None + self._wifi_quality_stop_event = threading.Event() + self._last_wifi_rtt_ms = None # 最近一次测量的 RTT + self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI + + # 服务器相关(用于网络检测) + try: + import archery_netcore as _netcore + self._server_ip = _netcore.get_config().get("SERVER_IP") + self._server_port = _netcore.get_config().get("SERVER_PORT") + except Exception: + self._server_ip = getattr(config, "SERVER_IP", None) + self._server_port = getattr(config, "SERVER_PORT", None) + + self._initialized = True + + @property + def logger(self): + """获取 logger 对象""" + return logger_manager.logger + + @property + def wifi_connected(self): + """WiFi是否已连接""" + return self._wifi_connected + + @property + def wifi_ip(self): + """WiFi IP地址""" + return self._wifi_ip + + @property + def wifi_socket(self): + """WiFi socket对象""" + return self._wifi_socket + + @wifi_socket.setter + def wifi_socket(self, value): + """设置WiFi socket对象""" + self._wifi_socket = value + + @property + def wifi_socket_lock(self): + """获取WiFi socket锁""" + return self._wifi_socket_lock + + @property + def prefer_wifi(self): + """是否优先使用WiFi""" + return self._prefer_wifi + + @prefer_wifi.setter + def prefer_wifi(self, value): + """设置是否优先使用WiFi""" + self._prefer_wifi = value + + @property + def last_wifi_rtt_ms(self): + """最近一次测量的RTT""" + return self._last_wifi_rtt_ms + + @property + def last_wifi_rssi_dbm(self): + """最近一次测量的RSSI""" + return self._last_wifi_rssi_dbm + + @property + def recv_buffer(self): + """TCP接收缓冲区""" + return self._recv_buffer + + @recv_buffer.setter + def recv_buffer(self, value): + """设置TCP接收缓冲区""" + self._recv_buffer = value + + # ==================== WiFi 连接方法 ==================== + + def is_wifi_connected(self): + """检查WiFi是否已连接""" + # 优先用 MaixPy network(如果可用) + try: + from maix import network + wlan = network.WLAN(network.TYPE_WIFI) + if wlan.isconnected(): + self._wifi_connected = True + return True + except: + self.logger.warning("Failed to check WiFi connection using MaixPy network", exc_info=True) + + # 兜底:看系统 wlan0 有没有 IP + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + if ip: + self._wifi_connected = True + self._wifi_ip = ip + return True + except: + self.logger.warning("Failed to check WiFi connection using system command", exc_info=True) + + self._wifi_connected = False + return False + + def connect_wifi(self, ssid, password, verify_callback=None, persist=True, timeout_s=20): + """ + 连接 Wi-Fi(先用新凭证尝试连接并验证可用性;失败自动回滚;成功后再决定是否落盘) + + 重要:系统的 /etc/init.d/S30wifi 通常会读取 /boot/wifi.ssid 与 /boot/wifi.pass 来连接 WiFi。 + 因此要"真正尝试连接新 WiFi",必须临时写入 /boot/ 触发重启;若失败则把旧值写回去(回滚)。 + + Args: + ssid: WiFi SSID + password: WiFi密码 + verify_callback: 验证回调函数,接收 (ip) 参数,返回 (success: bool, error: str) + persist: 是否持久化保存凭证 + timeout_s: 连接超时时间(秒) + + Returns: + (ip, error): IP地址和错误信息(成功时error为None) + """ + # 配置文件路径定义 + conf_path = "/etc/wpa_supplicant.conf" + ssid_file = "/boot/wifi.ssid" + pass_file = "/boot/wifi.pass" + + def _read_text(path: str): + try: + if os.path.exists(path): + with open(path, "r", encoding="utf-8") as f: + return f.read() + except Exception: + return None + return None + + def _write_text(path: str, content: str): + with open(path, "w", encoding="utf-8") as f: + f.write(content) + + def _restore_boot(old_ssid: str | None, old_pass: str | None): + # 还原 /boot 凭证:原来没有就删除,原来有就写回 + try: + if old_ssid is None: + if os.path.exists(ssid_file): + os.remove(ssid_file) + else: + _write_text(ssid_file, old_ssid) + except Exception: + pass + try: + if old_pass is None: + if os.path.exists(pass_file): + os.remove(pass_file) + else: + _write_text(pass_file, old_pass) + except Exception: + pass + + old_conf = _read_text(conf_path) + old_boot_ssid = _read_text(ssid_file) + old_boot_pass = _read_text(pass_file) + + try: + # 生成 wpa_supplicant 配置(写 /etc 作为辅助,具体是否生效取决于 S30wifi 脚本) + net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() + if "network={" not in net_conf: + raise RuntimeError("Failed to generate wpa config") + + try: + _write_text( + conf_path, + "ctrl_interface=/var/run/wpa_supplicant\n" + "update_config=1\n\n" + + net_conf, + ) + except Exception: + # 不强制要求写 /etc 成功(某些系统只用 /boot) + pass + + # ====== 临时写入 /boot 凭证,触发 WiFi 服务真正尝试连接新 SSID ====== + _write_text(ssid_file, ssid.strip()) + _write_text(pass_file, password.strip()) + + # 重启 Wi-Fi 服务 + os.system("/etc/init.d/S30wifi restart") + + # 等待获取 IP + wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20 + wait_s = min(max(wait_s, 5), 60) + for _ in range(wait_s): + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + if ip: + # 拿到 IP 不代表可上网/可访问目标;继续做可达性验证 + self._wifi_connected = True + self._wifi_ip = ip + self.logger.info(f"[WIFI] 已连接,IP: {ip},开始验证网络可用性...") + + # 验证能访问指定目标(通过回调函数) + if verify_callback: + success, error = verify_callback(ip) + if not success: + raise RuntimeError(error or "Verification failed") + + # ====== 验证通过 ====== + if not persist: + # 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变) + _restore_boot(old_boot_ssid, old_boot_pass) + self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)") + else: + self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)") + + return ip, None + + std_time.sleep(1) + + raise RuntimeError("Timeout: No IP obtained") + + except Exception as e: + # 失败:回滚 /boot 和 /etc,重启 WiFi 恢复旧网络 + _restore_boot(old_boot_ssid, old_boot_pass) + try: + if old_conf is not None: + _write_text(conf_path, old_conf) + except Exception: + pass + try: + os.system("/etc/init.d/S30wifi restart") + except Exception: + pass + + self._wifi_connected = False + self._wifi_ip = None + self.logger.error(f"[WIFI] 连接/验证失败,已回滚: {e}") + return None, str(e) + + def disconnect_wifi(self): + """断开WiFi连接并清理资源""" + if self._wifi_socket: + try: + self._wifi_socket.close() + except Exception: + pass + finally: + self._wifi_socket = None + self._wifi_connected = False + self._wifi_ip = None + + # ==================== WiFi 质量监测 ==================== + + def _get_wifi_rssi_dbm(self): + """ + 获取 WiFi 信号强度(dBm,越大越好;比如 -40 比 -80 好) + 由于不同固件实现差异,这里做多策略兜底,失败返回 None + """ + # 1) 优先使用:iw dev wlan0 link + # 你提供的输出示例包含:signal: -58 dBm + try: + out = os.popen("iw dev wlan0 link 2>/dev/null").read() + if out: + m = re.search(r"signal:\s*(-?\d+(?:\.\d+)?)\s*dBm", out, re.IGNORECASE) + if m: + v = float(m.group(1)) + # 合理范围兜底 + if -120.0 <= v <= 0.0: + return v + m2 = re.search(r"signal:\s*(-?\d+(?:\.\d+)?)", out, re.IGNORECASE) + if m2: + v = float(m2.group(1)) + if -120.0 <= v <= 0.0: + return v + except Exception: + pass + + # 2) 兜底:iwconfig + try: + out = os.popen("iwconfig wlan0 2>/dev/null").read() + m = re.search(r"Signal level[=:]\s*(-?\d+(?:\.\d+)?)\s*dBm", out, re.IGNORECASE) + if m: + v = float(m.group(1)) + if -120.0 <= v <= 0.0: + return v + m2 = re.search(r"Signal level[=:]\s*(-?\d+(?:\.\d+)?)", out, re.IGNORECASE) + if m2: + v = float(m2.group(1)) + if -120.0 <= v <= 0.0: + return v + except Exception: + pass + + return None + + def _measure_wifi_tcp_rtt_ms(self, host, port, samples=3, per_sample_timeout_ms=900): + """ + 测量:在当前 WiFi 下,TCP 建连耗时(RTT 的近似) + + Args: + host: 目标主机 + port: 目标端口 + samples: 采样次数 + per_sample_timeout_ms: 每次采样超时时间(毫秒) + + Returns: + (median_rtt_ms, reachable_bool) + """ + rtts = [] + reachable = False + addr = None + + # 先解析一次地址,避免每次样本都做 DNS + try: + addr_info = socket.getaddrinfo(host, port)[0] + addr = (addr_info[0], addr_info[1], addr_info[2], addr_info[-1]) + except Exception: + return float("inf"), False + + for _ in range(max(1, int(samples or 1))): + s = None + try: + s = socket.socket(addr[0], addr[1], addr[2]) + s.settimeout(max(0.1, float(per_sample_timeout_ms) / 1000.0)) + t0 = time.ticks_ms() + s.connect(addr[-1]) + elapsed_ms = abs(time.ticks_diff(time.ticks_ms(), t0)) + rtts.append(float(elapsed_ms)) + reachable = True + except Exception: + # 单个样本失败不影响整体,只要有成功样本就继续 + pass + finally: + try: + if s: + s.close() + except Exception: + pass + + # 小间隔,避免过度占用 + try: + time.sleep_ms(100) + except Exception: + pass + + if not rtts: + return float("inf"), False + + rtts_sorted = sorted(rtts) + mid = len(rtts_sorted) // 2 + if len(rtts_sorted) % 2 == 1: + median = rtts_sorted[mid] + else: + median = (rtts_sorted[mid - 1] + rtts_sorted[mid]) / 2.0 + return median, reachable + + def _is_wifi_quality_bad(self, wifi_rtt_ms, wifi_rssi_dbm): + """ + 综合判断 WiFi 质量是否差: + - RTT中位数超过阈值 -> bad + - 若启用 RSSI:信号弱(RSSI更差于阈值) 且 RTT 也偏高 -> bad + """ + if wifi_rtt_ms >= config.WIFI_QUALITY_RTT_BAD_MS: + return True + + if not getattr(config, "WIFI_QUALITY_USE_RSSI", False): + return False + + if wifi_rssi_dbm is None: + return False + + # "rtt_warn + rssi_bad" 联合条件 + if wifi_rtt_ms >= config.WIFI_QUALITY_RTT_WARN_MS and wifi_rssi_dbm <= config.WIFI_QUALITY_RSSI_BAD_DBM: + return True + + return False + + def get_wifi_quality_status(self): + """ + 获取当前 WiFi 质量状态(用于调试或显示) + + Returns: + dict: {"rtt_ms": float, "rssi_dbm": float, "is_bad": bool} + """ + rtt = self._last_wifi_rtt_ms + rssi = self._last_wifi_rssi_dbm + is_bad = False + + if rtt is not None and rtt != float("inf"): + is_bad = self._is_wifi_quality_bad(rtt, rssi) + + return { + "rtt_ms": rtt if rtt is not None and rtt != float("inf") else None, + "rssi_dbm": rssi, + "is_bad": is_bad + } + + # ==================== 后台质量监测线程 ==================== + + def start_quality_monitor(self, network_type_callback, on_poor_quality_callback): + """ + 启动 WiFi 质量后台监测线程(每 5 秒测量一次 RTT 和 RSSI) + 只在 WiFi 连接时运行,不影响业务发送性能 + + Args: + network_type_callback: 获取当前网络类型的回调函数 + on_poor_quality_callback: WiFi质量差时的回调函数 + """ + if self._wifi_quality_monitor_thread is not None: + self.logger.warning("[WiFi Monitor] 监测线程已在运行") + return + + self._network_type_callback = network_type_callback + self._on_poor_quality_callback = on_poor_quality_callback + self._wifi_quality_stop_event.clear() + self._wifi_quality_monitor_thread = threading.Thread( + target=self._quality_monitor_loop, + daemon=True, + name="wifi_quality_monitor" + ) + self._wifi_quality_monitor_thread.start() + self.logger.info("[WiFi Monitor] 已启动后台监测线程") + + def stop_quality_monitor(self): + """停止 WiFi 质量监测线程""" + if self._wifi_quality_monitor_thread is None: + return + + self._wifi_quality_stop_event.set() + try: + self._wifi_quality_monitor_thread.join(timeout=2.0) + except Exception as e: + self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}") + finally: + self._wifi_quality_monitor_thread = None + self.logger.info("[WiFi Monitor] 已停止后台监测线程") + + def _quality_monitor_loop(self): + """ + WiFi 质量监测循环(后台线程) + 每 5 秒测量一次 RTT 和 RSSI,发现质量差则触发切换 + """ + while not self._wifi_quality_stop_event.is_set(): + try: + # 只在 WiFi 连接时才测量 + network_type = self._network_type_callback() + if network_type == "wifi" and self._wifi_socket: + # 测量 RTT(1 个样本,快速测量) + rtt_ms, reachable = self._measure_wifi_tcp_rtt_ms( + self._server_ip, self._server_port, + samples=1, per_sample_timeout_ms=600 + ) + + # 获取 RSSI + rssi_dbm = self._get_wifi_rssi_dbm() + + # 更新缓存 + self._last_wifi_rtt_ms = rtt_ms if reachable else None + self._last_wifi_rssi_dbm = rssi_dbm + self.logger.debug(f"[WiFi Monitor] - RTT={rtt_ms:.0f}ms, RSSI={rssi_dbm:.0f}dBm") + + # 判断质量是否差(切换前做 2 次快速复测,防止瞬时抖动) + def _is_bad_now(_reachable, _rtt, _rssi): + if (not _reachable) or (_rtt is None) or (_rtt == float("inf")): + return True + return self._is_wifi_quality_bad(_rtt, _rssi) + + bad = _is_bad_now(reachable, rtt_ms, rssi_dbm) + if bad: + self.logger.warning("[WiFi Monitor] 质量差,切换前快速重试 2 次(每次间隔1秒)") + + for retry_idx in range(2): + time.sleep_ms(1000) + rtt2, reachable2 = self._measure_wifi_tcp_rtt_ms( + self._server_ip, self._server_port, + samples=1, per_sample_timeout_ms=600 + ) + rssi2 = self._get_wifi_rssi_dbm() + + # 更新缓存,便于外部查看最新状态 + self._last_wifi_rtt_ms = rtt2 if reachable2 else None + self._last_wifi_rssi_dbm = rssi2 + + bad2 = _is_bad_now(reachable2, rtt2, rssi2) + try: + self.logger.info( + f"[WiFi Monitor] 复测{retry_idx+1}/2: reachable={reachable2}, " + f"rtt={rtt2 if rtt2 != float('inf') else -1:.0f}ms, rssi={rssi2}, bad={bad2}" + ) + except Exception: + pass + + if not bad2: + self.logger.info("[WiFi Monitor] 复测恢复正常,继续保留 WiFi(不切换)") + bad = False + break + + if bad: + self.logger.warning("[WiFi Monitor] 复测仍差/不通,尝试切换到 4G") + self._on_poor_quality_callback() + + # 休眠 5 秒 + time.sleep(5) + + except Exception as e: + self.logger.error(f"[WiFi Monitor] 监测异常:{e}") + # 异常后继续循环,避免线程退出 + continue + + +# 全局 WiFi 管理器实例 +wifi_manager = WiFiManager() + + +# ==================== 兼容旧接口的函数 ==================== + +def is_wifi_connected(): + """尽量判断当前是否有 Wi-Fi(有则走 Wi-Fi OTA,否则走 4G OTA)""" + return wifi_manager.is_wifi_connected() + + +def connect_wifi(ssid, password, verify_callback=None, persist=True, timeout_s=20): + """ + 连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录, + 以便设备重启后自动连接。 + + Args: + ssid: WiFi SSID + password: WiFi密码 + verify_callback: 验证回调函数,接收 (ip) 参数,返回 (success: bool, error: str) + persist: 是否持久化保存 + timeout_s: 超时时间(秒) + + Returns: + (ip, error): IP地址和错误信息(成功时error为None) + """ + return wifi_manager.connect_wifi(ssid, password, verify_callback, persist, timeout_s)