Files
archery/wifi.py
2026-04-03 11:24:29 +08:00

579 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WiFi管理模块
提供WiFi连接、网络检测、质量监测等功能
"""
import os
import re
import socket
import threading
import time as std_time
from maix import time
import config
from logger_manager import logger_manager
class WiFiManager:
"""WiFi管理器单例"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(WiFiManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# WiFi 相关状态
self._wifi_connected = False
self._wifi_ip = None
self._wifi_socket = None
self._wifi_socket_lock = threading.Lock()
self._prefer_wifi = True # 是否优先使用 WiFi
self._recv_buffer = b"" # TCP 接收缓冲区
# WiFi 质量监测(后台线程)
self._wifi_quality_monitor_thread = None
self._wifi_quality_stop_event = threading.Event()
self._last_wifi_rtt_ms = None # 最近一次测量的 RTT
self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI
# 服务器相关(用于网络检测)
try:
import archery_netcore as _netcore
self._server_ip = _netcore.get_config().get("SERVER_IP")
self._server_port = _netcore.get_config().get("SERVER_PORT")
except Exception:
self._server_ip = getattr(config, "SERVER_IP", None)
self._server_port = getattr(config, "SERVER_PORT", None)
self._initialized = True
@property
def logger(self):
"""获取 logger 对象"""
return logger_manager.logger
@property
def wifi_connected(self):
"""WiFi是否已连接"""
return self._wifi_connected
@property
def wifi_ip(self):
"""WiFi IP地址"""
return self._wifi_ip
@property
def wifi_socket(self):
"""WiFi socket对象"""
return self._wifi_socket
@wifi_socket.setter
def wifi_socket(self, value):
"""设置WiFi socket对象"""
self._wifi_socket = value
@property
def wifi_socket_lock(self):
"""获取WiFi socket锁"""
return self._wifi_socket_lock
@property
def prefer_wifi(self):
"""是否优先使用WiFi"""
return self._prefer_wifi
@prefer_wifi.setter
def prefer_wifi(self, value):
"""设置是否优先使用WiFi"""
self._prefer_wifi = value
@property
def last_wifi_rtt_ms(self):
"""最近一次测量的RTT"""
return self._last_wifi_rtt_ms
@property
def last_wifi_rssi_dbm(self):
"""最近一次测量的RSSI"""
return self._last_wifi_rssi_dbm
@property
def recv_buffer(self):
"""TCP接收缓冲区"""
return self._recv_buffer
@recv_buffer.setter
def recv_buffer(self, value):
"""设置TCP接收缓冲区"""
self._recv_buffer = value
# ==================== WiFi 连接方法 ====================
def is_wifi_connected(self):
"""检查WiFi是否已连接"""
# 优先用 MaixPy network如果可用
try:
from maix import network
wifi = network.wifi.Wifi()
if wifi.is_connected():
self._wifi_connected = True
return True
except:
self.logger.warning("Failed to check WiFi connection using MaixPy network", exc_info=True)
# 兜底:看系统 wlan0 有没有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
self._wifi_connected = True
self._wifi_ip = ip
return True
except:
self.logger.warning("Failed to check WiFi connection using system command", exc_info=True)
self._wifi_connected = False
return False
def connect_wifi(self, ssid, password, verify_callback=None, persist=True, timeout_s=20):
"""
连接 Wi-Fi先用新凭证尝试连接并验证可用性失败自动回滚成功后再决定是否落盘
重要:系统的 /etc/init.d/S30wifi 通常会读取 /boot/wifi.ssid 与 /boot/wifi.pass 来连接 WiFi。
因此要"真正尝试连接新 WiFi",必须临时写入 /boot/ 触发重启;若失败则把旧值写回去(回滚)。
Args:
ssid: WiFi SSID
password: WiFi密码
verify_callback: 验证回调函数,接收 (ip) 参数,返回 (success: bool, error: str)
persist: 是否持久化保存凭证
timeout_s: 连接超时时间(秒)
Returns:
(ip, error): IP地址和错误信息成功时error为None
"""
# 配置文件路径定义
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
def _read_text(path: str):
try:
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
return None
return None
def _write_text(path: str, content: str):
with open(path, "w", encoding="utf-8") as f:
f.write(content)
def _restore_boot(old_ssid: str | None, old_pass: str | None):
# 还原 /boot 凭证:原来没有就删除,原来有就写回
try:
if old_ssid is None:
if os.path.exists(ssid_file):
os.remove(ssid_file)
else:
_write_text(ssid_file, old_ssid)
except Exception:
pass
try:
if old_pass is None:
if os.path.exists(pass_file):
os.remove(pass_file)
else:
_write_text(pass_file, old_pass)
except Exception:
pass
old_conf = _read_text(conf_path)
old_boot_ssid = _read_text(ssid_file)
old_boot_pass = _read_text(pass_file)
try:
# 生成 wpa_supplicant 配置(写 /etc 作为辅助,具体是否生效取决于 S30wifi 脚本)
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
raise RuntimeError("Failed to generate wpa config")
try:
_write_text(
conf_path,
"ctrl_interface=/var/run/wpa_supplicant\n"
"update_config=1\n\n"
+ net_conf,
)
except Exception:
# 不强制要求写 /etc 成功(某些系统只用 /boot
pass
# ====== 临时写入 /boot 凭证,触发 WiFi 服务真正尝试连接新 SSID ======
_write_text(ssid_file, ssid.strip())
_write_text(pass_file, password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP
wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20
wait_s = min(max(wait_s, 5), 60)
for _ in range(wait_s):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
# 拿到 IP 不代表可上网/可访问目标;继续做可达性验证
self._wifi_connected = True
self._wifi_ip = ip
self.logger.info(f"[WIFI] 已连接IP: {ip},开始验证网络可用性...")
# 验证能访问指定目标(通过回调函数)
if verify_callback:
success, error = verify_callback(ip)
if not success:
raise RuntimeError(error or "Verification failed")
# ====== 验证通过 ======
if not persist:
# 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变)
_restore_boot(old_boot_ssid, old_boot_pass)
self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)")
else:
self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)")
return ip, None
std_time.sleep(1)
raise RuntimeError("Timeout: No IP obtained")
except Exception as e:
# 失败:回滚 /boot 和 /etc重启 WiFi 恢复旧网络
_restore_boot(old_boot_ssid, old_boot_pass)
try:
if old_conf is not None:
_write_text(conf_path, old_conf)
except Exception:
pass
try:
os.system("/etc/init.d/S30wifi restart")
except Exception:
pass
self._wifi_connected = False
self._wifi_ip = None
self.logger.error(f"[WIFI] 连接/验证失败,已回滚: {e}")
return None, str(e)
def disconnect_wifi(self):
"""断开WiFi连接并清理资源"""
if self._wifi_socket:
try:
self._wifi_socket.close()
except Exception:
pass
finally:
self._wifi_socket = None
self._wifi_connected = False
self._wifi_ip = None
# ==================== WiFi 质量监测 ====================
def _get_wifi_rssi_dbm(self):
"""
获取 WiFi 信号强度dBm越大越好比如 -40 比 -80 好)
由于不同固件实现差异,这里做多策略兜底,失败返回 None
"""
# 1) 优先使用iw dev wlan0 link
# 你提供的输出示例包含signal: -58 dBm
try:
out = os.popen("iw dev wlan0 link 2>/dev/null").read()
if out:
m = re.search(r"signal:\s*(-?\d+(?:\.\d+)?)\s*dBm", out, re.IGNORECASE)
if m:
v = float(m.group(1))
# 合理范围兜底
if -120.0 <= v <= 0.0:
return v
m2 = re.search(r"signal:\s*(-?\d+(?:\.\d+)?)", out, re.IGNORECASE)
if m2:
v = float(m2.group(1))
if -120.0 <= v <= 0.0:
return v
except Exception:
pass
# 2) 兜底iwconfig
try:
out = os.popen("iwconfig wlan0 2>/dev/null").read()
m = re.search(r"Signal level[=:]\s*(-?\d+(?:\.\d+)?)\s*dBm", out, re.IGNORECASE)
if m:
v = float(m.group(1))
if -120.0 <= v <= 0.0:
return v
m2 = re.search(r"Signal level[=:]\s*(-?\d+(?:\.\d+)?)", out, re.IGNORECASE)
if m2:
v = float(m2.group(1))
if -120.0 <= v <= 0.0:
return v
except Exception:
pass
return None
def _measure_wifi_tcp_rtt_ms(self, host, port, samples=3, per_sample_timeout_ms=900):
"""
测量:在当前 WiFi 下TCP 建连耗时RTT 的近似)
Args:
host: 目标主机
port: 目标端口
samples: 采样次数
per_sample_timeout_ms: 每次采样超时时间(毫秒)
Returns:
(median_rtt_ms, reachable_bool)
"""
rtts = []
reachable = False
addr = None
# 先解析一次地址,避免每次样本都做 DNS
try:
addr_info = socket.getaddrinfo(host, port)[0]
addr = (addr_info[0], addr_info[1], addr_info[2], addr_info[-1])
except Exception:
return float("inf"), False
for _ in range(max(1, int(samples or 1))):
s = None
try:
s = socket.socket(addr[0], addr[1], addr[2])
s.settimeout(max(0.1, float(per_sample_timeout_ms) / 1000.0))
t0 = time.ticks_ms()
s.connect(addr[-1])
elapsed_ms = abs(time.ticks_diff(time.ticks_ms(), t0))
rtts.append(float(elapsed_ms))
reachable = True
except Exception:
# 单个样本失败不影响整体,只要有成功样本就继续
pass
finally:
try:
if s:
s.close()
except Exception:
pass
# 小间隔,避免过度占用
try:
time.sleep_ms(100)
except Exception:
pass
if not rtts:
return float("inf"), False
rtts_sorted = sorted(rtts)
mid = len(rtts_sorted) // 2
if len(rtts_sorted) % 2 == 1:
median = rtts_sorted[mid]
else:
median = (rtts_sorted[mid - 1] + rtts_sorted[mid]) / 2.0
return median, reachable
def _is_wifi_quality_bad(self, wifi_rtt_ms, wifi_rssi_dbm):
"""
综合判断 WiFi 质量是否差:
- RTT中位数超过阈值 -> bad
- 若启用 RSSI信号弱(RSSI更差于阈值) 且 RTT 也偏高 -> bad
"""
if wifi_rtt_ms >= config.WIFI_QUALITY_RTT_BAD_MS:
return True
if not getattr(config, "WIFI_QUALITY_USE_RSSI", False):
return False
if wifi_rssi_dbm is None:
return False
# "rtt_warn + rssi_bad" 联合条件
if wifi_rtt_ms >= config.WIFI_QUALITY_RTT_WARN_MS and wifi_rssi_dbm <= config.WIFI_QUALITY_RSSI_BAD_DBM:
return True
return False
def get_wifi_quality_status(self):
"""
获取当前 WiFi 质量状态(用于调试或显示)
Returns:
dict: {"rtt_ms": float, "rssi_dbm": float, "is_bad": bool}
"""
rtt = self._last_wifi_rtt_ms
rssi = self._last_wifi_rssi_dbm
is_bad = False
if rtt is not None and rtt != float("inf"):
is_bad = self._is_wifi_quality_bad(rtt, rssi)
return {
"rtt_ms": rtt if rtt is not None and rtt != float("inf") else None,
"rssi_dbm": rssi,
"is_bad": is_bad
}
# ==================== 后台质量监测线程 ====================
def start_quality_monitor(self, network_type_callback, on_poor_quality_callback):
"""
启动 WiFi 质量后台监测线程(每 5 秒测量一次 RTT 和 RSSI
只在 WiFi 连接时运行,不影响业务发送性能
Args:
network_type_callback: 获取当前网络类型的回调函数
on_poor_quality_callback: WiFi质量差时的回调函数
"""
if self._wifi_quality_monitor_thread is not None:
self.logger.warning("[WiFi Monitor] 监测线程已在运行")
return
self._network_type_callback = network_type_callback
self._on_poor_quality_callback = on_poor_quality_callback
self._wifi_quality_stop_event.clear()
self._wifi_quality_monitor_thread = threading.Thread(
target=self._quality_monitor_loop,
daemon=True,
name="wifi_quality_monitor"
)
self._wifi_quality_monitor_thread.start()
self.logger.info("[WiFi Monitor] 已启动后台监测线程")
def stop_quality_monitor(self):
"""停止 WiFi 质量监测线程"""
if self._wifi_quality_monitor_thread is None:
return
self._wifi_quality_stop_event.set()
try:
self._wifi_quality_monitor_thread.join(timeout=2.0)
except Exception as e:
self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}")
finally:
self._wifi_quality_monitor_thread = None
self.logger.info("[WiFi Monitor] 已停止后台监测线程")
def _quality_monitor_loop(self):
"""
WiFi 质量监测循环(后台线程)
每 5 秒测量一次 RTT 和 RSSI发现质量差则触发切换
"""
while not self._wifi_quality_stop_event.is_set():
try:
# 只在 WiFi 连接时才测量
network_type = self._network_type_callback()
if network_type == "wifi" and self._wifi_socket:
# # 测量 RTT1 个样本,快速测量)
# rtt_ms, reachable = self._measure_wifi_tcp_rtt_ms(
# self._server_ip, self._server_port,
# samples=1, per_sample_timeout_ms=600
# )
# 获取 RSSI
rssi_dbm = self._get_wifi_rssi_dbm()
# 更新缓存
# 不使用 RTT 测量
rtt_ms = 0
reachable = True
self._last_wifi_rtt_ms = rtt_ms if reachable else None
self._last_wifi_rssi_dbm = rssi_dbm
self.logger.debug(f"[WiFi Monitor] - RTT={rtt_ms:.0f}ms, RSSI={rssi_dbm:.0f}dBm")
# 判断质量是否差(切换前做 2 次快速复测,防止瞬时抖动)
def _is_bad_now(_reachable, _rtt, _rssi):
if (not _reachable) or (_rtt is None) or (_rtt == float("inf")):
return True
return self._is_wifi_quality_bad(_rtt, _rssi)
bad = _is_bad_now(reachable, rtt_ms, rssi_dbm)
if bad:
self.logger.warning("[WiFi Monitor] 质量差,切换前快速重试 2 次每次间隔1秒")
for retry_idx in range(2):
time.sleep_ms(1000)
# 不使用 RTT 测量
rtt2 = 0
reachable2 = True
# rtt2, reachable2 = self._measure_wifi_tcp_rtt_ms(
# self._server_ip, self._server_port,
# samples=1, per_sample_timeout_ms=600
# )
rssi2 = self._get_wifi_rssi_dbm()
# 更新缓存,便于外部查看最新状态
self._last_wifi_rtt_ms = rtt2 if reachable2 else None
self._last_wifi_rssi_dbm = rssi2
bad2 = _is_bad_now(reachable2, rtt2, rssi2)
try:
self.logger.info(
f"[WiFi Monitor] 复测{retry_idx+1}/2: reachable={reachable2}, "
f"rtt={rtt2 if rtt2 != float('inf') else -1:.0f}ms, rssi={rssi2}, bad={bad2}"
)
except Exception:
pass
if not bad2:
self.logger.info("[WiFi Monitor] 复测恢复正常,继续保留 WiFi不切换")
bad = False
break
if bad:
self.logger.warning("[WiFi Monitor] 复测仍差/不通,尝试切换到 4G")
self._on_poor_quality_callback()
# 休眠 5 秒
time.sleep(5)
except Exception as e:
self.logger.error(f"[WiFi Monitor] 监测异常:{e}")
# 异常后继续循环,避免线程退出
continue
# 全局 WiFi 管理器实例
wifi_manager = WiFiManager()
# ==================== 兼容旧接口的函数 ====================
def is_wifi_connected():
"""尽量判断当前是否有 Wi-Fi有则走 Wi-Fi OTA否则走 4G OTA"""
return wifi_manager.is_wifi_connected()
def connect_wifi(ssid, password, verify_callback=None, persist=True, timeout_s=20):
"""
连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录,
以便设备重启后自动连接。
Args:
ssid: WiFi SSID
password: WiFi密码
verify_callback: 验证回调函数,接收 (ip) 参数,返回 (success: bool, error: str)
persist: 是否持久化保存
timeout_s: 超时时间(秒)
Returns:
(ip, error): IP地址和错误信息成功时error为None
"""
return wifi_manager.connect_wifi(ssid, password, verify_callback, persist, timeout_s)