Files
archery/wifi.py

659 lines
24 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WiFi管理模块
提供WiFi连接网络检测质量监测等功能
"""
import os
import re
import socket
import threading
import time as std_time
from maix import time
import config
from logger_manager import logger_manager
class WiFiManager:
"""WiFi管理器单例"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(WiFiManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# WiFi 相关状态
self._wifi_connected = False
self._wifi_ip = None
self._wifi_socket = None
self._wifi_socket_lock = threading.Lock()
self._prefer_wifi = True # 是否优先使用 WiFi
self._recv_buffer = b"" # TCP 接收缓冲区
# WiFi 质量监测(后台线程)
self._wifi_quality_monitor_thread = None
self._wifi_quality_stop_event = threading.Event()
self._last_wifi_rtt_ms = None # 最近一次测量的 RTT
self._last_wifi_rssi_dbm = None # 最近一次测量的 RSSI
# 服务器相关(用于网络检测)
try:
import archery_netcore as _netcore
self._server_ip = _netcore.get_config().get("SERVER_IP")
self._server_port = _netcore.get_config().get("SERVER_PORT")
except Exception:
self._server_ip = getattr(config, "SERVER_IP", None)
self._server_port = getattr(config, "SERVER_PORT", None)
self._initialized = True
@property
def logger(self):
"""获取 logger 对象"""
return logger_manager.logger
@property
def wifi_connected(self):
"""WiFi是否已连接"""
return self._wifi_connected
@property
def wifi_ip(self):
"""WiFi IP地址"""
return self._wifi_ip
@property
def wifi_socket(self):
"""WiFi socket对象"""
return self._wifi_socket
@wifi_socket.setter
def wifi_socket(self, value):
"""设置WiFi socket对象"""
self._wifi_socket = value
@property
def wifi_socket_lock(self):
"""获取WiFi socket锁"""
return self._wifi_socket_lock
@property
def prefer_wifi(self):
"""是否优先使用WiFi"""
return self._prefer_wifi
@prefer_wifi.setter
def prefer_wifi(self, value):
"""设置是否优先使用WiFi"""
self._prefer_wifi = value
@property
def last_wifi_rtt_ms(self):
"""最近一次测量的RTT"""
return self._last_wifi_rtt_ms
@property
def last_wifi_rssi_dbm(self):
"""最近一次测量的RSSI"""
return self._last_wifi_rssi_dbm
@property
def recv_buffer(self):
"""TCP接收缓冲区"""
return self._recv_buffer
@recv_buffer.setter
def recv_buffer(self, value):
"""设置TCP接收缓冲区"""
self._recv_buffer = value
# ==================== WiFi 连接方法 ====================
2026-04-03 15:40:07 +08:00
def is_sta_associated(self):
"""
是否作为 STA 已关联到上游 AP用于与 AP 模式区分AP 模式下 wlan0 可能有 IP iw link Not connected
"""
try:
out = os.popen("iw dev wlan0 link 2>/dev/null").read()
if not out.strip():
return False
if "Not connected" in out:
return False
return "Connected to" in out
except Exception:
return False
def is_wifi_connected(self):
"""检查WiFi是否已连接"""
2026-04-07 17:29:24 +08:00
# AP 模式下 wlan0 也可能有 IP如 192.168.66.1),但这不代表已作为 STA 连上路由器。
# 业务侧(选网/TCP只应在 STA 已关联到上游 AP 时认为 WiFi 可用。
if not self.is_sta_associated():
self._wifi_connected = False
return False
# 优先用 MaixPy network如果可用
try:
from maix import network
2026-04-03 11:24:29 +08:00
wifi = network.wifi.Wifi()
if wifi.is_connected():
self._wifi_connected = True
return True
except:
self.logger.warning("Failed to check WiFi connection using MaixPy network", exc_info=True)
# 兜底:看系统 wlan0 有没有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
self._wifi_connected = True
self._wifi_ip = ip
return True
except:
self.logger.warning("Failed to check WiFi connection using system command", exc_info=True)
self._wifi_connected = False
return False
def connect_wifi(self, ssid, password, verify_callback=None, persist=True, timeout_s=20):
"""
连接 Wi-Fi先用新凭证尝试连接并验证可用性失败自动回滚成功后再决定是否落盘
重要系统的 /etc/init.d/S30wifi 通常会读取 /boot/wifi.ssid /boot/wifi.pass 来连接 WiFi
因此要"真正尝试连接新 WiFi"必须临时写入 /boot/ 触发重启若失败则把旧值写回去回滚
Args:
ssid: WiFi SSID
password: WiFi密码
verify_callback: 验证回调函数接收 (ip) 参数返回 (success: bool, error: str)
persist: 是否持久化保存凭证
timeout_s: 连接超时时间
Returns:
(ip, error): IP地址和错误信息成功时error为None
"""
# 配置文件路径定义
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
def _read_text(path: str):
try:
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
return None
return None
def _write_text(path: str, content: str):
with open(path, "w", encoding="utf-8") as f:
f.write(content)
def _restore_boot(old_ssid: str | None, old_pass: str | None):
# 还原 /boot 凭证:原来没有就删除,原来有就写回
try:
if old_ssid is None:
if os.path.exists(ssid_file):
os.remove(ssid_file)
else:
_write_text(ssid_file, old_ssid)
except Exception:
pass
try:
if old_pass is None:
if os.path.exists(pass_file):
os.remove(pass_file)
else:
_write_text(pass_file, old_pass)
except Exception:
pass
old_conf = _read_text(conf_path)
old_boot_ssid = _read_text(ssid_file)
old_boot_pass = _read_text(pass_file)
try:
# 生成 wpa_supplicant 配置(写 /etc 作为辅助,具体是否生效取决于 S30wifi 脚本)
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
raise RuntimeError("Failed to generate wpa config")
try:
_write_text(
conf_path,
"ctrl_interface=/var/run/wpa_supplicant\n"
"update_config=1\n\n"
+ net_conf,
)
except Exception:
# 不强制要求写 /etc 成功(某些系统只用 /boot
pass
# ====== 临时写入 /boot 凭证,触发 WiFi 服务真正尝试连接新 SSID ======
_write_text(ssid_file, ssid.strip())
_write_text(pass_file, password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP
wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20
wait_s = min(max(wait_s, 5), 60)
for _ in range(wait_s):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
# 拿到 IP 不代表可上网/可访问目标;继续做可达性验证
self._wifi_connected = True
self._wifi_ip = ip
self.logger.info(f"[WIFI] 已连接IP: {ip},开始验证网络可用性...")
# 验证能访问指定目标(通过回调函数)
if verify_callback:
success, error = verify_callback(ip)
if not success:
raise RuntimeError(error or "Verification failed")
# ====== 验证通过 ======
if not persist:
# 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变)
_restore_boot(old_boot_ssid, old_boot_pass)
self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)")
else:
self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)")
return ip, None
std_time.sleep(1)
raise RuntimeError("Timeout: No IP obtained")
except Exception as e:
# 失败:回滚 /boot 和 /etc重启 WiFi 恢复旧网络
_restore_boot(old_boot_ssid, old_boot_pass)
try:
if old_conf is not None:
_write_text(conf_path, old_conf)
except Exception:
pass
try:
os.system("/etc/init.d/S30wifi restart")
except Exception:
pass
self._wifi_connected = False
self._wifi_ip = None
self.logger.error(f"[WIFI] 连接/验证失败,已回滚: {e}")
return None, str(e)
2026-04-03 15:40:07 +08:00
def persist_sta_credentials(self, ssid: str, password: str, restart_service: bool = True):
"""
仅写入 STA 凭证/etc/wpa_supplicant.conf + /boot/wifi.ssid|pass
可选是否立即 /etc/init.d/S30wifi restart
不做可达性验证用于热点配网页提交后切换到连接指定路由器
password 为空时按开放网络key_mgmt=NONE写入
Returns:
(ok: bool, err_msg: str)
"""
ssid = (ssid or "").strip()
password = (password or "").strip()
if not ssid:
return False, "SSID 为空"
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
def _write_text(path: str, content: str):
with open(path, "w", encoding="utf-8") as f:
f.write(content)
try:
if password:
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return False, "wpa_passphrase 失败"
else:
esc = ssid.replace("\\", "\\\\").replace('"', '\\"')
net_conf = (
"network={\n"
f' ssid="{esc}"\n'
" key_mgmt=NONE\n"
"}\n"
)
_write_text(
conf_path,
"ctrl_interface=/var/run/wpa_supplicant\n"
"update_config=1\n\n"
+ net_conf,
)
except Exception as e:
return False, str(e)
try:
_write_text(ssid_file, ssid)
_write_text(pass_file, password)
except Exception as e:
return False, str(e)
if restart_service:
try:
os.system("/etc/init.d/S30wifi restart")
except Exception as e:
return False, str(e)
self.logger.info(f"[WIFI] persist_sta_credentials: 已写入并重启 S30wifi, ssid={ssid!r}")
else:
self.logger.info(f"[WIFI] persist_sta_credentials: 已写入凭证(未重启 S30wifi, ssid={ssid!r}")
return True, ""
def disconnect_wifi(self):
"""断开WiFi连接并清理资源"""
if self._wifi_socket:
try:
self._wifi_socket.close()
except Exception:
pass
finally:
self._wifi_socket = None
self._wifi_connected = False
self._wifi_ip = None
# ==================== WiFi 质量监测 ====================
def _get_wifi_rssi_dbm(self):
"""
获取 WiFi 信号强度dBm越大越好比如 -40 -80
由于不同固件实现差异这里做多策略兜底失败返回 None
"""
# 1) 优先使用iw dev wlan0 link
# 你提供的输出示例包含signal: -58 dBm
try:
out = os.popen("iw dev wlan0 link 2>/dev/null").read()
if out:
m = re.search(r"signal:\s*(-?\d+(?:\.\d+)?)\s*dBm", out, re.IGNORECASE)
if m:
v = float(m.group(1))
# 合理范围兜底
if -120.0 <= v <= 0.0:
return v
m2 = re.search(r"signal:\s*(-?\d+(?:\.\d+)?)", out, re.IGNORECASE)
if m2:
v = float(m2.group(1))
if -120.0 <= v <= 0.0:
return v
except Exception:
pass
# 2) 兜底iwconfig
try:
out = os.popen("iwconfig wlan0 2>/dev/null").read()
m = re.search(r"Signal level[=:]\s*(-?\d+(?:\.\d+)?)\s*dBm", out, re.IGNORECASE)
if m:
v = float(m.group(1))
if -120.0 <= v <= 0.0:
return v
m2 = re.search(r"Signal level[=:]\s*(-?\d+(?:\.\d+)?)", out, re.IGNORECASE)
if m2:
v = float(m2.group(1))
if -120.0 <= v <= 0.0:
return v
except Exception:
pass
return None
def _measure_wifi_tcp_rtt_ms(self, host, port, samples=3, per_sample_timeout_ms=900):
"""
测量在当前 WiFi TCP 建连耗时RTT 的近似
Args:
host: 目标主机
port: 目标端口
samples: 采样次数
per_sample_timeout_ms: 每次采样超时时间毫秒
Returns:
(median_rtt_ms, reachable_bool)
"""
rtts = []
reachable = False
addr = None
# 先解析一次地址,避免每次样本都做 DNS
try:
addr_info = socket.getaddrinfo(host, port)[0]
addr = (addr_info[0], addr_info[1], addr_info[2], addr_info[-1])
except Exception:
return float("inf"), False
for _ in range(max(1, int(samples or 1))):
s = None
try:
s = socket.socket(addr[0], addr[1], addr[2])
s.settimeout(max(0.1, float(per_sample_timeout_ms) / 1000.0))
t0 = time.ticks_ms()
s.connect(addr[-1])
elapsed_ms = abs(time.ticks_diff(time.ticks_ms(), t0))
rtts.append(float(elapsed_ms))
reachable = True
except Exception:
# 单个样本失败不影响整体,只要有成功样本就继续
pass
finally:
try:
if s:
s.close()
except Exception:
pass
# 小间隔,避免过度占用
try:
time.sleep_ms(100)
except Exception:
pass
if not rtts:
return float("inf"), False
rtts_sorted = sorted(rtts)
mid = len(rtts_sorted) // 2
if len(rtts_sorted) % 2 == 1:
median = rtts_sorted[mid]
else:
median = (rtts_sorted[mid - 1] + rtts_sorted[mid]) / 2.0
return median, reachable
def _is_wifi_quality_bad(self, wifi_rtt_ms, wifi_rssi_dbm):
"""
综合判断 WiFi 质量是否差
- RTT中位数超过阈值 -> bad
- 若启用 RSSI信号弱(RSSI更差于阈值) RTT 也偏高 -> bad
"""
if wifi_rtt_ms >= config.WIFI_QUALITY_RTT_BAD_MS:
return True
if not getattr(config, "WIFI_QUALITY_USE_RSSI", False):
return False
if wifi_rssi_dbm is None:
return False
# "rtt_warn + rssi_bad" 联合条件
if wifi_rtt_ms >= config.WIFI_QUALITY_RTT_WARN_MS and wifi_rssi_dbm <= config.WIFI_QUALITY_RSSI_BAD_DBM:
return True
return False
def get_wifi_quality_status(self):
"""
获取当前 WiFi 质量状态用于调试或显示
Returns:
dict: {"rtt_ms": float, "rssi_dbm": float, "is_bad": bool}
"""
rtt = self._last_wifi_rtt_ms
rssi = self._last_wifi_rssi_dbm
is_bad = False
if rtt is not None and rtt != float("inf"):
is_bad = self._is_wifi_quality_bad(rtt, rssi)
return {
"rtt_ms": rtt if rtt is not None and rtt != float("inf") else None,
"rssi_dbm": rssi,
"is_bad": is_bad
}
# ==================== 后台质量监测线程 ====================
def start_quality_monitor(self, network_type_callback, on_poor_quality_callback):
"""
启动 WiFi 质量后台监测线程 5 秒测量一次 RTT RSSI
只在 WiFi 连接时运行不影响业务发送性能
Args:
network_type_callback: 获取当前网络类型的回调函数
on_poor_quality_callback: WiFi质量差时的回调函数
"""
if self._wifi_quality_monitor_thread is not None:
self.logger.warning("[WiFi Monitor] 监测线程已在运行")
return
self._network_type_callback = network_type_callback
self._on_poor_quality_callback = on_poor_quality_callback
self._wifi_quality_stop_event.clear()
self._wifi_quality_monitor_thread = threading.Thread(
target=self._quality_monitor_loop,
daemon=True,
name="wifi_quality_monitor"
)
self._wifi_quality_monitor_thread.start()
self.logger.info("[WiFi Monitor] 已启动后台监测线程")
def stop_quality_monitor(self):
"""停止 WiFi 质量监测线程"""
if self._wifi_quality_monitor_thread is None:
return
self._wifi_quality_stop_event.set()
try:
self._wifi_quality_monitor_thread.join(timeout=2.0)
except Exception as e:
self.logger.error(f"[WiFi Monitor] 停止线程失败:{e}")
finally:
self._wifi_quality_monitor_thread = None
self.logger.info("[WiFi Monitor] 已停止后台监测线程")
def _quality_monitor_loop(self):
"""
WiFi 质量监测循环后台线程
5 秒测量一次 RTT RSSI发现质量差则触发切换
"""
while not self._wifi_quality_stop_event.is_set():
try:
# 只在 WiFi 连接时才测量
network_type = self._network_type_callback()
if network_type == "wifi" and self._wifi_socket:
2026-04-03 11:24:29 +08:00
# # 测量 RTT1 个样本,快速测量)
# rtt_ms, reachable = self._measure_wifi_tcp_rtt_ms(
# self._server_ip, self._server_port,
# samples=1, per_sample_timeout_ms=600
# )
# 获取 RSSI
rssi_dbm = self._get_wifi_rssi_dbm()
# 更新缓存
2026-04-03 11:24:29 +08:00
# 不使用 RTT 测量
rtt_ms = 0
reachable = True
self._last_wifi_rtt_ms = rtt_ms if reachable else None
self._last_wifi_rssi_dbm = rssi_dbm
self.logger.debug(f"[WiFi Monitor] - RTT={rtt_ms:.0f}ms, RSSI={rssi_dbm:.0f}dBm")
# 判断质量是否差(切换前做 2 次快速复测,防止瞬时抖动)
def _is_bad_now(_reachable, _rtt, _rssi):
if (not _reachable) or (_rtt is None) or (_rtt == float("inf")):
return True
return self._is_wifi_quality_bad(_rtt, _rssi)
bad = _is_bad_now(reachable, rtt_ms, rssi_dbm)
if bad:
self.logger.warning("[WiFi Monitor] 质量差,切换前快速重试 2 次每次间隔1秒")
for retry_idx in range(2):
time.sleep_ms(1000)
2026-04-03 11:24:29 +08:00
# 不使用 RTT 测量
rtt2 = 0
reachable2 = True
# rtt2, reachable2 = self._measure_wifi_tcp_rtt_ms(
# self._server_ip, self._server_port,
# samples=1, per_sample_timeout_ms=600
# )
rssi2 = self._get_wifi_rssi_dbm()
# 更新缓存,便于外部查看最新状态
self._last_wifi_rtt_ms = rtt2 if reachable2 else None
self._last_wifi_rssi_dbm = rssi2
bad2 = _is_bad_now(reachable2, rtt2, rssi2)
try:
self.logger.info(
f"[WiFi Monitor] 复测{retry_idx+1}/2: reachable={reachable2}, "
f"rtt={rtt2 if rtt2 != float('inf') else -1:.0f}ms, rssi={rssi2}, bad={bad2}"
)
except Exception:
pass
if not bad2:
self.logger.info("[WiFi Monitor] 复测恢复正常,继续保留 WiFi不切换")
bad = False
break
if bad:
self.logger.warning("[WiFi Monitor] 复测仍差/不通,尝试切换到 4G")
self._on_poor_quality_callback()
# 休眠 5 秒
time.sleep(5)
except Exception as e:
self.logger.error(f"[WiFi Monitor] 监测异常:{e}")
# 异常后继续循环,避免线程退出
continue
# 全局 WiFi 管理器实例
wifi_manager = WiFiManager()
# ==================== 兼容旧接口的函数 ====================
def is_wifi_connected():
"""尽量判断当前是否有 Wi-Fi有则走 Wi-Fi OTA否则走 4G OTA"""
return wifi_manager.is_wifi_connected()
def connect_wifi(ssid, password, verify_callback=None, persist=True, timeout_s=20):
"""
连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录
以便设备重启后自动连接
Args:
ssid: WiFi SSID
password: WiFi密码
verify_callback: 验证回调函数接收 (ip) 参数返回 (success: bool, error: str)
persist: 是否持久化保存
timeout_s: 超时时间
Returns:
(ip, error): IP地址和错误信息成功时error为None
"""
return wifi_manager.connect_wifi(ssid, password, verify_callback, persist, timeout_s)