#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ WiFi管理模块 提供WiFi连接、网络检测、质量监测等功能 """ import os import re import socket import threading import time as std_time from maix import time import config from logger_manager import logger_manager class WiFiManager: """WiFi管理器(单例)""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(WiFiManager, cls).__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return # WiFi 相关状态 self._wifi_connected = False self._wifi_ip = None self._wifi_socket = None self._wifi_socket_lock = threading.Lock() self._prefer_wifi = True # 是否优先使用 WiFi self._recv_buffer = b"" # TCP 接收缓冲区 # WiFi 质量监测(后台线程) self._wifi_quality_monitor_thread = None self._wifi_quality_stop_event = threading.Event() self._last_wifi_rtt_ms = None # 最近一次测量的 RTT self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI # 服务器相关(用于网络检测) try: import archery_netcore as _netcore self._server_ip = _netcore.get_config().get("SERVER_IP") self._server_port = _netcore.get_config().get("SERVER_PORT") except Exception: self._server_ip = getattr(config, "SERVER_IP", None) self._server_port = getattr(config, "SERVER_PORT", None) self._initialized = True @property def logger(self): """获取 logger 对象""" return logger_manager.logger @property def wifi_connected(self): """WiFi是否已连接""" return self._wifi_connected @property def wifi_ip(self): """WiFi IP地址""" return self._wifi_ip @property def wifi_socket(self): """WiFi socket对象""" return self._wifi_socket @wifi_socket.setter def wifi_socket(self, value): """设置WiFi socket对象""" self._wifi_socket = value @property def wifi_socket_lock(self): """获取WiFi socket锁""" return self._wifi_socket_lock @property def prefer_wifi(self): """是否优先使用WiFi""" return self._prefer_wifi @prefer_wifi.setter def prefer_wifi(self, value): """设置是否优先使用WiFi""" self._prefer_wifi = value @property def last_wifi_rtt_ms(self): """最近一次测量的RTT""" return self._last_wifi_rtt_ms @property def last_wifi_rssi_dbm(self): """最近一次测量的RSSI""" return self._last_wifi_rssi_dbm @property def recv_buffer(self): """TCP接收缓冲区""" return self._recv_buffer @recv_buffer.setter def recv_buffer(self, value): """设置TCP接收缓冲区""" self._recv_buffer = value # ==================== WiFi 连接方法 ==================== def is_sta_associated(self): """ 是否作为 STA 已关联到上游 AP(用于与 AP 模式区分:AP 模式下 wlan0 可能有 IP 但 iw link 为 Not connected)。 """ try: out = os.popen("iw dev wlan0 link 2>/dev/null").read() if not out.strip(): return False if "Not connected" in out: return False return "Connected to" in out except Exception: return False def is_wifi_connected(self): """检查WiFi是否已连接""" # AP 模式下 wlan0 也可能有 IP(如 192.168.66.1),但这不代表已作为 STA 连上路由器。 # 业务侧(选网/TCP)只应在 STA 已关联到上游 AP 时认为 WiFi 可用。 if not self.is_sta_associated(): self._wifi_connected = False return False # 优先用 MaixPy network(如果可用) try: from maix import network wifi = network.wifi.Wifi() if wifi.is_connected(): self._wifi_connected = True return True except: self.logger.warning("Failed to check WiFi connection using MaixPy network", exc_info=True) # 兜底:看系统 wlan0 有没有 IP try: ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() if ip: self._wifi_connected = True self._wifi_ip = ip return True except: self.logger.warning("Failed to check WiFi connection using system command", exc_info=True) self._wifi_connected = False return False def connect_wifi(self, ssid, password, verify_callback=None, persist=True, timeout_s=20): """ 连接 Wi-Fi(先用新凭证尝试连接并验证可用性;失败自动回滚;成功后再决定是否落盘) 重要:系统的 /etc/init.d/S30wifi 通常会读取 /boot/wifi.ssid 与 /boot/wifi.pass 来连接 WiFi。 因此要"真正尝试连接新 WiFi",必须临时写入 /boot/ 触发重启;若失败则把旧值写回去(回滚)。 Args: ssid: WiFi SSID password: WiFi密码 verify_callback: 验证回调函数,接收 (ip) 参数,返回 (success: bool, error: str) persist: 是否持久化保存凭证 timeout_s: 连接超时时间(秒) Returns: (ip, error): IP地址和错误信息(成功时error为None) """ # 配置文件路径定义 conf_path = "/etc/wpa_supplicant.conf" ssid_file = "/boot/wifi.ssid" pass_file = "/boot/wifi.pass" def _read_text(path: str): try: if os.path.exists(path): with open(path, "r", encoding="utf-8") as f: return f.read() except Exception: return None return None def _write_text(path: str, content: str): with open(path, "w", encoding="utf-8") as f: f.write(content) def _restore_boot(old_ssid: str | None, old_pass: str | None): # 还原 /boot 凭证:原来没有就删除,原来有就写回 try: if old_ssid is None: if os.path.exists(ssid_file): os.remove(ssid_file) else: _write_text(ssid_file, old_ssid) except Exception: pass try: if old_pass is None: if os.path.exists(pass_file): os.remove(pass_file) else: _write_text(pass_file, old_pass) except Exception: pass old_conf = _read_text(conf_path) old_boot_ssid = _read_text(ssid_file) old_boot_pass = _read_text(pass_file) try: # 生成 wpa_supplicant 配置(写 /etc 作为辅助,具体是否生效取决于 S30wifi 脚本) net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() if "network={" not in net_conf: raise RuntimeError("Failed to generate wpa config") try: _write_text( conf_path, "ctrl_interface=/var/run/wpa_supplicant\n" "update_config=1\n\n" + net_conf, ) except Exception: # 不强制要求写 /etc 成功(某些系统只用 /boot) pass # ====== 临时写入 /boot 凭证,触发 WiFi 服务真正尝试连接新 SSID ====== _write_text(ssid_file, ssid.strip()) _write_text(pass_file, password.strip()) # 重启 Wi-Fi 服务 os.system("/etc/init.d/S30wifi restart") # 等待获取 IP wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20 wait_s = min(max(wait_s, 5), 60) for _ in range(wait_s): ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() if ip: # 拿到 IP 不代表可上网/可访问目标;继续做可达性验证 self._wifi_connected = True self._wifi_ip = ip self.logger.info(f"[WIFI] 已连接,IP: {ip},开始验证网络可用性...") # 验证能访问指定目标(通过回调函数) if verify_callback: success, error = verify_callback(ip) if not success: raise RuntimeError(error or "Verification failed") # ====== 验证通过 ====== if not persist: # 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变) _restore_boot(old_boot_ssid, old_boot_pass) self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)") else: self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)") return ip, None std_time.sleep(1) raise RuntimeError("Timeout: No IP obtained") except Exception as e: # 失败:回滚 /boot 和 /etc,重启 WiFi 恢复旧网络 _restore_boot(old_boot_ssid, old_boot_pass) try: if old_conf is not None: _write_text(conf_path, old_conf) except Exception: pass try: os.system("/etc/init.d/S30wifi restart") except Exception: pass self._wifi_connected = False self._wifi_ip = None self.logger.error(f"[WIFI] 连接/验证失败,已回滚: {e}") return None, str(e) def persist_sta_credentials(self, ssid: str, password: str, restart_service: bool = True): """ 仅写入 STA 凭证(/etc/wpa_supplicant.conf + /boot/wifi.ssid|pass), 可选是否立即 /etc/init.d/S30wifi restart。 不做可达性验证。用于热点配网页提交后切换到连接指定路由器。 password 为空时按开放网络(key_mgmt=NONE)写入。 Returns: (ok: bool, err_msg: str) """ ssid = (ssid or "").strip() password = (password or "").strip() if not ssid: return False, "SSID 为空" conf_path = "/etc/wpa_supplicant.conf" ssid_file = "/boot/wifi.ssid" pass_file = "/boot/wifi.pass" def _write_text(path: str, content: str): with open(path, "w", encoding="utf-8") as f: f.write(content) try: if password: net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() if "network={" not in net_conf: return False, "wpa_passphrase 失败" else: esc = ssid.replace("\\", "\\\\").replace('"', '\\"') net_conf = ( "network={\n" f' ssid="{esc}"\n' " key_mgmt=NONE\n" "}\n" ) _write_text( conf_path, "ctrl_interface=/var/run/wpa_supplicant\n" "update_config=1\n\n" + net_conf, ) except Exception as e: return False, str(e) try: _write_text(ssid_file, ssid) _write_text(pass_file, password) except Exception as e: return False, str(e) if restart_service: try: os.system("/etc/init.d/S30wifi restart") except Exception as e: return False, str(e) self.logger.info(f"[WIFI] persist_sta_credentials: 已写入并重启 S30wifi, ssid={ssid!r}") else: self.logger.info(f"[WIFI] persist_sta_credentials: 已写入凭证(未重启 S30wifi), ssid={ssid!r}") return True, "" def disconnect_wifi(self): """断开WiFi连接并清理资源""" if self._wifi_socket: try: self._wifi_socket.close() except Exception: pass finally: self._wifi_socket = None self._wifi_connected = False self._wifi_ip = None # ==================== WiFi 质量监测 ==================== def _get_wifi_rssi_dbm(self): """ 获取 WiFi 信号强度(dBm,越大越好;比如 -40 比 -80 好) 由于不同固件实现差异,这里做多策略兜底,失败返回 None """ # 1) 优先使用:iw dev wlan0 link # 你提供的输出示例包含:signal: -58 dBm try: out = os.popen("iw dev wlan0 link 2>/dev/null").read() if out: m = re.search(r"signal:\s*(-?\d+(?:\.\d+)?)\s*dBm", out, re.IGNORECASE) if m: v = float(m.group(1)) # 合理范围兜底 if -120.0 <= v <= 0.0: return v m2 = re.search(r"signal:\s*(-?\d+(?:\.\d+)?)", out, re.IGNORECASE) if m2: v = float(m2.group(1)) if -120.0 <= v <= 0.0: return v except Exception: pass # 2) 兜底:iwconfig try: out = os.popen("iwconfig wlan0 2>/dev/null").read() m = re.search(r"Signal level[=:]\s*(-?\d+(?:\.\d+)?)\s*dBm", out, re.IGNORECASE) if m: v = float(m.group(1)) if -120.0 <= v <= 0.0: return v m2 = re.search(r"Signal level[=:]\s*(-?\d+(?:\.\d+)?)", out, re.IGNORECASE) if m2: v = float(m2.group(1)) if -120.0 <= v <= 0.0: return v except Exception: pass return None def _measure_wifi_tcp_rtt_ms(self, host, port, samples=3, per_sample_timeout_ms=900): """ 测量:在当前 WiFi 下,TCP 建连耗时(RTT 的近似) Args: host: 目标主机 port: 目标端口 samples: 采样次数 per_sample_timeout_ms: 每次采样超时时间(毫秒) Returns: (median_rtt_ms, reachable_bool) """ rtts = [] reachable = False addr = None # 先解析一次地址,避免每次样本都做 DNS try: addr_info = socket.getaddrinfo(host, port)[0] addr = (addr_info[0], addr_info[1], addr_info[2], addr_info[-1]) except Exception: return float("inf"), False for _ in range(max(1, int(samples or 1))): s = None try: s = socket.socket(addr[0], addr[1], addr[2]) s.settimeout(max(0.1, float(per_sample_timeout_ms) / 1000.0)) t0 = time.ticks_ms() s.connect(addr[-1]) elapsed_ms = abs(time.ticks_diff(time.ticks_ms(), t0)) rtts.append(float(elapsed_ms)) reachable = True except Exception: # 单个样本失败不影响整体,只要有成功样本就继续 pass finally: try: if s: s.close() except Exception: pass # 小间隔,避免过度占用 try: time.sleep_ms(100) except Exception: pass if not rtts: return float("inf"), False rtts_sorted = sorted(rtts) mid = len(rtts_sorted) // 2 if len(rtts_sorted) % 2 == 1: median = rtts_sorted[mid] else: median = (rtts_sorted[mid - 1] + rtts_sorted[mid]) / 2.0 return median, reachable def _is_wifi_quality_bad(self, wifi_rtt_ms, wifi_rssi_dbm): """ 综合判断 WiFi 质量是否差: - RTT中位数超过阈值 -> bad - 若启用 RSSI:信号弱(RSSI更差于阈值) 且 RTT 也偏高 -> bad """ if wifi_rtt_ms >= config.WIFI_QUALITY_RTT_BAD_MS: return True if not getattr(config, "WIFI_QUALITY_USE_RSSI", False): return False if wifi_rssi_dbm is None: return False # "rtt_warn + rssi_bad" 联合条件 if wifi_rtt_ms >= config.WIFI_QUALITY_RTT_WARN_MS and wifi_rssi_dbm <= config.WIFI_QUALITY_RSSI_BAD_DBM: return True return False def get_wifi_quality_status(self): """ 获取当前 WiFi 质量状态(用于调试或显示) Returns: dict: {"rtt_ms": float, "rssi_dbm": float, "is_bad": bool} """ rtt = self._last_wifi_rtt_ms rssi = self._last_wifi_rssi_dbm is_bad = False if rtt is not None and rtt != float("inf"): is_bad = self._is_wifi_quality_bad(rtt, rssi) return { "rtt_ms": rtt if rtt is not None and rtt != float("inf") else None, "rssi_dbm": rssi, "is_bad": is_bad } # ==================== 后台质量监测线程 ==================== def start_quality_monitor(self, network_type_callback, on_poor_quality_callback): """ 启动 WiFi 质量后台监测线程(每 5 秒测量一次 RTT 和 RSSI) 只在 WiFi 连接时运行,不影响业务发送性能 Args: network_type_callback: 获取当前网络类型的回调函数 on_poor_quality_callback: WiFi质量差时的回调函数 """ if self._wifi_quality_monitor_thread is not None: self.logger.warning("[WiFi Monitor] 监测线程已在运行") return self._network_type_callback = network_type_callback self._on_poor_quality_callback = on_poor_quality_callback self._wifi_quality_stop_event.clear() self._wifi_quality_monitor_thread = threading.Thread( target=self._quality_monitor_loop, daemon=True, name="wifi_quality_monitor" ) self._wifi_quality_monitor_thread.start() self.logger.info("[WiFi Monitor] 已启动后台监测线程") def stop_quality_monitor(self): """停止 WiFi 质量监测线程""" if self._wifi_quality_monitor_thread is None: return self._wifi_quality_stop_event.set() try: self._wifi_quality_monitor_thread.join(timeout=2.0) except Exception as e: self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}") finally: self._wifi_quality_monitor_thread = None self.logger.info("[WiFi Monitor] 已停止后台监测线程") def _quality_monitor_loop(self): """ WiFi 质量监测循环(后台线程) 每 5 秒测量一次 RTT 和 RSSI,发现质量差则触发切换 """ while not self._wifi_quality_stop_event.is_set(): try: # 只在 WiFi 连接时才测量 network_type = self._network_type_callback() if network_type == "wifi" and self._wifi_socket: # # 测量 RTT(1 个样本,快速测量) # rtt_ms, reachable = self._measure_wifi_tcp_rtt_ms( # self._server_ip, self._server_port, # samples=1, per_sample_timeout_ms=600 # ) # 获取 RSSI rssi_dbm = self._get_wifi_rssi_dbm() # 更新缓存 # 不使用 RTT 测量 rtt_ms = 0 reachable = True self._last_wifi_rtt_ms = rtt_ms if reachable else None self._last_wifi_rssi_dbm = rssi_dbm self.logger.debug(f"[WiFi Monitor] - RTT={rtt_ms:.0f}ms, RSSI={rssi_dbm:.0f}dBm") # 判断质量是否差(切换前做 2 次快速复测,防止瞬时抖动) def _is_bad_now(_reachable, _rtt, _rssi): if (not _reachable) or (_rtt is None) or (_rtt == float("inf")): return True return self._is_wifi_quality_bad(_rtt, _rssi) bad = _is_bad_now(reachable, rtt_ms, rssi_dbm) if bad: self.logger.warning("[WiFi Monitor] 质量差,切换前快速重试 2 次(每次间隔1秒)") for retry_idx in range(2): time.sleep_ms(1000) # 不使用 RTT 测量 rtt2 = 0 reachable2 = True # rtt2, reachable2 = self._measure_wifi_tcp_rtt_ms( # self._server_ip, self._server_port, # samples=1, per_sample_timeout_ms=600 # ) rssi2 = self._get_wifi_rssi_dbm() # 更新缓存,便于外部查看最新状态 self._last_wifi_rtt_ms = rtt2 if reachable2 else None self._last_wifi_rssi_dbm = rssi2 bad2 = _is_bad_now(reachable2, rtt2, rssi2) try: self.logger.info( f"[WiFi Monitor] 复测{retry_idx+1}/2: reachable={reachable2}, " f"rtt={rtt2 if rtt2 != float('inf') else -1:.0f}ms, rssi={rssi2}, bad={bad2}" ) except Exception: pass if not bad2: self.logger.info("[WiFi Monitor] 复测恢复正常,继续保留 WiFi(不切换)") bad = False break if bad: self.logger.warning("[WiFi Monitor] 复测仍差/不通,尝试切换到 4G") self._on_poor_quality_callback() # 休眠 5 秒 time.sleep(5) except Exception as e: self.logger.error(f"[WiFi Monitor] 监测异常:{e}") # 异常后继续循环,避免线程退出 continue # 全局 WiFi 管理器实例 wifi_manager = WiFiManager() # ==================== 兼容旧接口的函数 ==================== def is_wifi_connected(): """尽量判断当前是否有 Wi-Fi(有则走 Wi-Fi OTA,否则走 4G OTA)""" return wifi_manager.is_wifi_connected() def connect_wifi(ssid, password, verify_callback=None, persist=True, timeout_s=20): """ 连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录, 以便设备重启后自动连接。 Args: ssid: WiFi SSID password: WiFi密码 verify_callback: 验证回调函数,接收 (ip) 参数,返回 (success: bool, error: str) persist: 是否持久化保存 timeout_s: 超时时间(秒) Returns: (ip, error): IP地址和错误信息(成功时error为None) """ return wifi_manager.connect_wifi(ssid, password, verify_callback, persist, timeout_s)