Compare commits

...

2 Commits

Author SHA1 Message Date
gcw_4spBpAfv
ead2060ab3 wifi config while no 4g and wifi 2026-04-07 17:29:24 +08:00
gcw_4spBpAfv
bdc3254ed2 fix wifi 2 pkg issue 2026-04-03 15:40:07 +08:00
7 changed files with 932 additions and 255 deletions

View File

@@ -24,6 +24,17 @@ WIFI_QUALITY_RTT_WARN_MS = 350.0 # 与 RSSI 联合:超过此值且信号弱
WIFI_QUALITY_RSSI_BAD_DBM = -80.0 # 低于此 dBm更负更差视为信号弱
WIFI_QUALITY_USE_RSSI = True # 是否把 RSSI 纳入综合判定False 则仅看 RTT
# WiFi 热点配网(手机连设备 AP浏览器提交路由器 SSID/密码;仅 GET/POST标准库 socket
WIFI_CONFIG_AP_FALLBACK = True # # WiFi 配网失败时,是否退回热点模式,并等待重新配网
WIFI_AP_FALLBACK_WAIT_SEC = 5 # 等待5秒后再检测STA/4G
WIFI_CONFIG_AP_TIMEOUT = 5 # 热点模式超时时间(秒)
WIFI_CONFIG_AP_ENABLED = True # True=启动时开热点并起迷你 HTTP 配网服务
WIFI_CONFIG_AP_SSID = "ArcherySetup" # 设备发出的热点名称
WIFI_CONFIG_AP_PASSWORD = "12345678" # 热点密码WPA2 通常至少 8 位)
WIFI_CONFIG_HTTP_HOST = "0.0.0.0" # HTTP 监听地址
WIFI_CONFIG_HTTP_PORT = 8080 # 默认 8080避免占用 80 需 root
WIFI_CONFIG_AP_IP = "192.168.66.1" # 与 MaixPy Wifi.start_ap 默认一致,手机访问 http://192.168.66.1:8080/
# ===== TCP over SSL(TLS) 配置 =====
USE_TCP_SSL = False # True=按手册走 MSSLCFG/MIPCFG 绑定 SSL
TCP_LINK_ID = 2 #
@@ -124,7 +135,7 @@ SAVE_IMAGE_ENABLED = True # 是否保存图像True=保存False=不保存
PHOTO_DIR = "/root/phot" # 照片存储目录
MAX_IMAGES = 1000
SHOW_CAMERA_PHOTO_WHILE_SHOOTING = True # 是否在拍摄时显示摄像头图像True=显示False=不显示建议在连着USB测试过程中打开
SHOW_CAMERA_PHOTO_WHILE_SHOOTING = False # 是否在拍摄时显示摄像头图像True=显示False=不显示建议在连着USB测试过程中打开
# ==================== OTA配置 ====================
MAX_BACKUPS = 5

View File

@@ -33,3 +33,6 @@ cat /dev/ttyS2
# 3. 发送下载命令(原窗口)
printf 'AT+MHTTPDLFILE="http://static.shelingxingqiu.com/shoot/v1/main.py","downloaded.py",5120\r\n' > /dev/ttyS2
4. wifi的启动条件在 /boot 目录下,看看是否有 wifi.sta 和 wifi.ssid wifi.pass 这些文件。其中 wifi.sta 是开关文件。
如果没有了它就不会启动wifi流程。具体的wifi流程 由 /etc/init.d/S30wifi 控制。它会判断 wifi.sta 是否存在然后是否启动wifi还是启动热点。

View File

@@ -29,8 +29,14 @@
从日志看就是开始发送登录信息之后就崩溃了。出发了底层的read failed。经过排查是一定要插上电源板的数据连线以及电源板要插上电池。这个应该是
登录时需要读电源电压数据,
3. 问题描述202609 批次的拓展版有一块maixcam的蓝灯常亮询问maixcam的人他们觉得应该是卡没有插好。但是拓展版上的激光口挡住了数据卡的出口
3. a问题描述202609 批次的拓展版有一块maixcam的蓝灯常亮询问maixcam的人他们觉得应该是卡没有插好。但是拓展版上的激光口挡住了数据卡的出口
没法拔出检查,
解决方案:需要做拓展版的公司(深链鑫创)在做好板子之后,确定系统能正常启动
4.
b问题描述2022609 批次的拓展板有一次maixcam的蓝灯亮的时候很长不会闪烁后面把sd卡插进去一点又恢复正常了初步怀疑是射箭时没有缓冲
导致了sd 卡被撞松了
4. 问题描述4G模块不可用模块的绿灯没有闪亮
解决方案有这样的一种情况就是4G模块的天线触碰到了旁边的电容导致短路所以模块启动失败。需要保证电容和天线的金属头不会触碰
5.

18
main.py
View File

@@ -115,6 +115,15 @@ def cmd_str():
# 2. 从4G模块同步系统时间需要 at_client 已初始化)
sync_system_time_from_4g()
# 2.1 WiFi 热点配网兜底:仅当 STA 与 4G 均不可用时起 AP + HTTP提交后删 /boot/wifi.ap、建 wifi.sta 并 reboot
try:
from wifi_config_httpd import maybe_start_wifi_ap_fallback
maybe_start_wifi_ap_fallback(logger)
except Exception as e:
if logger:
logger.error(f"[WIFI-AP] 兜底配网检测/启动失败: {e}")
# 2.5. 启动存图 worker 线程(队列 + worker避免主循环阻塞
start_save_shot_worker()
@@ -204,7 +213,16 @@ def cmd_str():
pass
# 6. 启动通信与校准线程
# 若已进入 AP 配网兜底(/boot/wifi.ap则不启动 TCP 主循环;等待用户配网后 reboot。
try:
if os.path.exists("/boot/wifi.ap"):
if logger:
logger.warning("[NET] 当前处于 AP 配网模式(/boot/wifi.ap 存在),跳过 TCP 主线程启动")
else:
_thread.start_new_thread(network_manager.tcp_main, ())
except Exception as e:
if logger:
logger.error(f"[NET] 启动 TCP 主线程失败: {e}")
if not config.HARDCODE_LASER_POINT:
_thread.start_new_thread(laser_calibration_worker, ())

View File

@@ -410,6 +410,57 @@ class NetworkManager:
except Exception:
return False
def get_4g_phone_number(self):
"""
读取 SIM 本机号码AT+CNUM
典型响应:+CNUM: "","+861442093407954",145
部分运营商/卡未在卡内写入号码时可能为空。
Returns:
str: 号码(含国家码,如 +86138...),失败或未写入时返回 None
"""
try:
atc = hardware_manager.at_client
if atc is None:
return None
with self.get_uart_lock():
resp = atc.send("AT+CNUM", "OK", 3000)
if not resp:
return None
# 可能多行 +CNUM取第一个非空号码
for m in re.finditer(r'\+CNUM:\s*"[^"]*"\s*,\s*"([^"]*)"', resp):
num = (m.group(1) or "").strip()
if num:
return num
return None
except Exception:
return None
def get_4g_mccid(self):
"""
读取 MCCIDAT+MCCID模组侧命令常用于 SIM/卡标识类信息)。
典型响应行:+MCCID: <值> 或 +MCCID: \"...\"
Returns:
str: 解析到的字符串;失败时返回 None
"""
try:
atc = hardware_manager.at_client
if atc is None:
return None
with self.get_uart_lock():
resp = atc.send("AT+MCCID", "OK", 3000)
if not resp or "ERROR" in resp.upper():
return None
m = re.search(r"\+MCCID:\s*(.+)", resp, re.IGNORECASE)
if not m:
return None
val = (m.group(1) or "").strip()
# 去掉行尾 OK 之前可能粘在一起的杂质:只取第一行有效内容
val = val.split("\r")[0].split("\n")[0].strip()
val = val.strip('"').strip()
return val if val else None
except Exception:
return None
def _apply_session_force_4g(self):
"""锁定本次会话为 4G直到关机期间不再回切 WiFi"""
self._session_force_4g = True
@@ -1291,42 +1342,38 @@ class NetworkManager:
continue
# 接收数据(根据网络类型选择接收方式)
item = None
# WiFi 粘包:一次 recv 可能含多条完整包;也可能缓冲里已有完整包但本轮 recv 超时为空
rx_items = []
if self._network_type == "wifi":
# WiFi接收数据
data = self.receive_tcp_data_via_wifi(timeout_ms=50)
data = self.receive_tcp_data_via_wifi(timeout_ms=5)
if data:
# 将数据添加到缓冲区
self.logger.info(f"[NET] 接收WiFi数据, {time.time()}")
wifi_manager.recv_buffer += data
# 尝试从缓冲区解析完整的数据包
while len(wifi_manager.recv_buffer) >= 12: # 至少需要12字节的头部
# 解析头部
while len(wifi_manager.recv_buffer) >= 12:
try:
body_len, msg_type, checksum = struct.unpack(">III", wifi_manager.recv_buffer[:12])
total_len = 12 + body_len
if len(wifi_manager.recv_buffer) >= total_len:
# 有完整的数据包
payload = wifi_manager.recv_buffer[:total_len]
wifi_manager.recv_buffer = wifi_manager.recv_buffer[total_len:]
item = (0, payload) # link_id=0 for WiFi
break
rx_items.append((0, payload))
else:
# 数据包不完整,等待更多数据
self.logger.info(f"[NET] 接收WiFi数据不完整, {time.time()}")
break
except:
# 解析失败,清空缓冲区
except Exception:
wifi_manager.recv_buffer = b""
self.logger.info(f"[NET] 接收WiFi数据解析失败, {time.time()}")
break
elif self._network_type == "4g":
# 4G接收数据
item = hardware_manager.at_client.pop_tcp_payload()
if item:
rx_items.append(item)
_rx_login_fail = False
_rx_skip_tcp_iteration = False
if rx_items:
self.logger.info(f"total {len(rx_items)} items")
for item in rx_items:
if isinstance(item, tuple) and len(item) == 2:
link_id, payload = item
else:
@@ -1362,6 +1409,7 @@ class NetworkManager:
except Exception as e:
self.logger.error(f"[OTA] ota_ok 上报失败: {e}")
else:
_rx_login_fail = True
break
# 处理心跳 ACK
@@ -1435,12 +1483,14 @@ class NetworkManager:
if not ota_url:
self.logger.error("ota missing_url")
self.safe_enqueue({"result": "missing_url"}, 2)
continue
_rx_skip_tcp_iteration = True
break
from ota_manager import ota_manager
if ota_manager.update_thread_started:
self.safe_enqueue({"result": "update_already_started"}, 2)
continue
_rx_skip_tcp_iteration = True
break
# 自动判断模式如果没有明确指定根据WiFi连接状态和凭证决定
if mode not in ("4g", "wifi"):
@@ -1478,6 +1528,14 @@ class NetworkManager:
except:
ip = "error_getting_ip"
self.safe_enqueue({"result": "current_ip", "ip": ip}, 2)
elif inner_cmd == 44: # 读 4G 本机号码AT+CNUM
cnum = self.get_4g_phone_number()
self.logger.info(f"4G 本机号码: {cnum}")
self.safe_enqueue({"result": "cnum", "number": cnum if cnum is not None else ""}, 2)
elif inner_cmd == 45: # 读 MCCIDAT+MCCID
mccid = self.get_4g_mccid()
self.logger.info(f"4G MCCID: {mccid}")
self.safe_enqueue({"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2)
# elif inner_cmd == 7:
# from ota_manager import ota_manager
# if ota_manager.update_thread_started:
@@ -1550,6 +1608,10 @@ class NetworkManager:
self.logger.info(f"[NET] body={body}, {time.time()}")
else:
self.logger.info(f"[NET] 未知数据 {body}, {time.time()}")
if _rx_login_fail:
break
if _rx_skip_tcp_iteration:
continue
else:
time.sleep_ms(5)

80
wifi.py
View File

@@ -116,8 +116,28 @@ class WiFiManager:
# ==================== WiFi 连接方法 ====================
def is_sta_associated(self):
"""
是否作为 STA 已关联到上游 AP用于与 AP 模式区分AP 模式下 wlan0 可能有 IP 但 iw link 为 Not connected
"""
try:
out = os.popen("iw dev wlan0 link 2>/dev/null").read()
if not out.strip():
return False
if "Not connected" in out:
return False
return "Connected to" in out
except Exception:
return False
def is_wifi_connected(self):
"""检查WiFi是否已连接"""
# AP 模式下 wlan0 也可能有 IP如 192.168.66.1),但这不代表已作为 STA 连上路由器。
# 业务侧(选网/TCP只应在 STA 已关联到上游 AP 时认为 WiFi 可用。
if not self.is_sta_associated():
self._wifi_connected = False
return False
# 优先用 MaixPy network如果可用
try:
from maix import network
@@ -272,6 +292,66 @@ class WiFiManager:
self.logger.error(f"[WIFI] 连接/验证失败,已回滚: {e}")
return None, str(e)
def persist_sta_credentials(self, ssid: str, password: str, restart_service: bool = True):
"""
仅写入 STA 凭证(/etc/wpa_supplicant.conf + /boot/wifi.ssid|pass
可选是否立即 /etc/init.d/S30wifi restart。
不做可达性验证。用于热点配网页提交后切换到连接指定路由器。
password 为空时按开放网络key_mgmt=NONE写入。
Returns:
(ok: bool, err_msg: str)
"""
ssid = (ssid or "").strip()
password = (password or "").strip()
if not ssid:
return False, "SSID 为空"
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
def _write_text(path: str, content: str):
with open(path, "w", encoding="utf-8") as f:
f.write(content)
try:
if password:
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return False, "wpa_passphrase 失败"
else:
esc = ssid.replace("\\", "\\\\").replace('"', '\\"')
net_conf = (
"network={\n"
f' ssid="{esc}"\n'
" key_mgmt=NONE\n"
"}\n"
)
_write_text(
conf_path,
"ctrl_interface=/var/run/wpa_supplicant\n"
"update_config=1\n\n"
+ net_conf,
)
except Exception as e:
return False, str(e)
try:
_write_text(ssid_file, ssid)
_write_text(pass_file, password)
except Exception as e:
return False, str(e)
if restart_service:
try:
os.system("/etc/init.d/S30wifi restart")
except Exception as e:
return False, str(e)
self.logger.info(f"[WIFI] persist_sta_credentials: 已写入并重启 S30wifi, ssid={ssid!r}")
else:
self.logger.info(f"[WIFI] persist_sta_credentials: 已写入凭证(未重启 S30wifi, ssid={ssid!r}")
return True, ""
def disconnect_wifi(self):
"""断开WiFi连接并清理资源"""
if self._wifi_socket:

497
wifi_config_httpd.py Normal file
View File

@@ -0,0 +1,497 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WiFi 热点配网:迷你 HTTP 服务器(仅 GET/POST标准库 socket独立线程运行。
策略(与 /etc/init.d/S30wifi 一致):
- 仅当 STA 未连上 WiFi 且 4G 也不可用时,写入 /boot/wifi.ap、去掉 /boot/wifi.sta
并重启 S30wifi 由系统起热点;再在本进程起 HTTP。
- 用户 POST 提交路由器 SSID/密码后仅写凭证、stop S30wifi、删 /boot/wifi.ap、建 /boot/wifi.sta、sync、reboot。
"""
import html
import os
import socket
import threading
import time as std_time
from urllib.parse import parse_qs
import config
from logger_manager import logger_manager
from wifi import wifi_manager
_http_thread = None
_http_stop = threading.Event()
def _http_response(status, body_bytes, content_type="text/html; charset=utf-8"):
head = (
f"HTTP/1.1 {status}\r\n"
f"Content-Type: {content_type}\r\n"
f"Content-Length: {len(body_bytes)}\r\n"
f"Connection: close\r\n"
f"\r\n"
).encode("utf-8")
return head + body_bytes
def _read_http_request(conn, max_total=65536):
"""返回 (method, path, headers_str, body_bytes) 或 None。"""
buf = b""
while b"\r\n\r\n" not in buf and len(buf) < max_total:
chunk = conn.recv(4096)
if not chunk:
break
buf += chunk
if b"\r\n\r\n" not in buf:
return None
idx = buf.index(b"\r\n\r\n")
header_bytes = buf[:idx]
rest = buf[idx + 4 :]
try:
headers_str = header_bytes.decode("utf-8", errors="replace")
except Exception:
headers_str = ""
lines = headers_str.split("\r\n")
if not lines:
return None
parts = lines[0].split()
method = parts[0] if parts else "GET"
path = parts[1] if len(parts) > 1 else "/"
content_length = 0
for line in lines[1:]:
if line.lower().startswith("content-length:"):
try:
content_length = int(line.split(":", 1)[1].strip())
except Exception:
content_length = 0
break
body = rest
while content_length > 0 and len(body) < content_length and len(body) < max_total:
chunk = conn.recv(4096)
if not chunk:
break
body += chunk
body = body[:content_length]
return method, path, headers_str, body
def _page_form(msg_html=""):
# 页面展示的热点名:以 /boot/wifi.ssid 为准(与实际 AP 保持一致)
try:
if os.path.exists("/boot/wifi.ssid"):
with open("/boot/wifi.ssid", "r", encoding="utf-8") as f:
_ssid = f.read().strip()
else:
_ssid = ""
except Exception:
_ssid = ""
ap_ssid = html.escape(_ssid or getattr(config, "WIFI_CONFIG_AP_SSID", "ArcherySetup"))
port = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
ap_ip = html.escape(getattr(config, "WIFI_CONFIG_AP_IP", "192.168.66.1"))
body = f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1"/>
<title>WiFi 配网</title></head><body>
<h1>WiFi 配网</h1>
<p>热点:<b>{ap_ssid}</b> · 端口 <b>{port}</b></p>
<p>请填写要连接的<b>路由器</b> SSID 与密码(用于 STA 上网,不是热点密码)。提交后将关闭热点、保存并<b>重启设备</b>。</p>
{msg_html}
<form method="POST" action="/" accept-charset="utf-8">
<p>SSID<br/><input name="ssid" type="text" style="width:100%;max-width:320px" required/></p>
<p>密码(开放网络可留空)<br/><input name="password" type="password" style="width:100%;max-width:320px"/></p>
<p><button type="submit">保存并重启</button></p>
</form>
<p style="color:#666;font-size:12px">提示:提交后设备会重启;请手机改连路由器 WiFi。</p>
</body></html>"""
return body.encode("utf-8")
def _apply_sta_and_reboot(router_ssid: str, router_password: str):
"""
写路由器 STA 凭证 -> 停 WiFi 服务 -> 删 /boot/wifi.ap -> 建 /boot/wifi.sta -> sync -> reboot
"""
logger = logger_manager.logger
ok, err = wifi_manager.persist_sta_credentials(router_ssid, router_password, restart_service=False)
if not ok:
return False, err
try:
os.system("/etc/init.d/S30wifi stop")
except Exception as e:
logger.warning(f"[WIFI-AP] S30wifi stop: {e}")
ap_flag = "/boot/wifi.ap"
sta_flag = "/boot/wifi.sta"
try:
if os.path.exists(ap_flag):
os.remove(ap_flag)
except Exception as e:
return False, f"删除 {ap_flag} 失败: {e}"
try:
with open(sta_flag, "w", encoding="utf-8") as f:
f.write("")
except Exception as e:
return False, f"创建 {sta_flag} 失败: {e}"
try:
os.system("sync")
except Exception:
pass
logger.info("[WIFI-AP] 已切换为 STA 标志并准备 reboot")
try:
os.system("reboot")
except Exception as e:
return False, f"reboot 调用失败: {e}"
return True, ""
def _handle_client(conn, addr):
logger = logger_manager.logger
try:
conn.settimeout(30.0)
req = _read_http_request(conn)
if not req:
conn.sendall(_http_response("400 Bad Request", b"Bad Request"))
return
method, path, _headers, body = req
path = path.split("?", 1)[0]
if method == "GET" and path in ("/", "/index.html"):
conn.sendall(_http_response("200 OK", _page_form()))
return
if method == "POST" and path in ("/", "/index.html"):
try:
qs = body.decode("utf-8", errors="replace")
except Exception:
qs = ""
fields = parse_qs(qs, keep_blank_values=True)
ssid = (fields.get("ssid") or [""])[0].strip()
password = (fields.get("password") or [""])[0]
ok, err = _apply_sta_and_reboot(ssid, password)
if ok:
msg = '<p style="color:green"><b>已保存,设备正在重启…</b></p>'
else:
msg = f'<p style="color:red"><b>失败:</b>{html.escape(err)}</p>'
conn.sendall(_http_response("200 OK", _page_form(msg)))
return
if method == "GET" and path == "/favicon.ico":
conn.sendall(_http_response("204 No Content", b""))
return
conn.sendall(_http_response("404 Not Found", b"Not Found"))
except Exception as e:
try:
logger.error(f"[WIFI-HTTP] 处理请求异常 {addr}: {e}")
except Exception:
pass
finally:
try:
conn.close()
except Exception:
pass
def _serve_loop(host, port):
logger = logger_manager.logger
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind((host, port))
srv.listen(5)
srv.settimeout(1.0)
logger.info(f"[WIFI-HTTP] 监听 {host}:{port}")
except Exception as e:
logger.error(f"[WIFI-HTTP] bind 失败: {e}")
try:
srv.close()
except Exception:
pass
return
while not _http_stop.is_set():
try:
conn, addr = srv.accept()
except socket.timeout:
continue
except Exception as e:
if _http_stop.is_set():
break
logger.warning(f"[WIFI-HTTP] accept: {e}")
continue
t = threading.Thread(target=_handle_client, args=(conn, addr), daemon=True)
t.start()
try:
srv.close()
except Exception:
pass
logger.info("[WIFI-HTTP] 服务已停止")
def _ensure_hostapd_ssid(ssid: str, logger=None) -> bool:
"""
某些固件会把 SSID 写到 /etc/hostapd.conf 或 /boot/hostapd.conf。
为避免只改 /boot/wifi.ssid 不生效,这里同步更新已存在的 hostapd.conf。
Returns:
bool: 任一文件被修改则 True
"""
if logger is None:
logger = logger_manager.logger
if not ssid:
return False
changed_any = False
for conf_path in ("/etc/hostapd.conf", "/boot/hostapd.conf"):
try:
if not os.path.exists(conf_path):
continue
with open(conf_path, "r", encoding="utf-8") as f:
lines = f.read().splitlines()
except Exception:
continue
changed = False
out = []
seen = False
for ln in lines:
s = ln.strip()
if s.lower().startswith("ssid="):
seen = True
cur = s.split("=", 1)[1].strip()
if cur != ssid:
out.append(f"ssid={ssid}")
changed = True
else:
out.append(ln)
else:
out.append(ln)
if not seen:
out.append(f"ssid={ssid}")
changed = True
if changed:
try:
with open(conf_path, "w", encoding="utf-8") as f:
f.write("\n".join(out).rstrip() + "\n")
changed_any = True
except Exception as e:
if logger:
logger.warning(f"[WIFI-AP] 写入 {conf_path} 失败: {e}")
if changed_any and logger:
logger.info(f"[WIFI-AP] 已同步热点 SSID 到 hostapd.conf: {ssid}")
return changed_any
def _write_boot_ap_credentials_for_s30wifi():
"""供 S30wifi AP 分支 gen_hostapd 使用的热点 SSID/密码。"""
base = (getattr(config, "WIFI_CONFIG_AP_SSID", "ArcherySetup") or "ArcherySetup").strip()
# 追加设备码,便于区分多台设备(读取 /device_key失败则不加后缀
suffix = ""
try:
with open("/device_key", "r", encoding="utf-8") as f:
dev = (f.read() or "").strip()
if dev:
s = dev
# 只保留字母数字,避免 SSID 出现不可见字符
s = "".join([c for c in s if c.isalnum()])
if s:
suffix = s
except Exception:
suffix = ""
ssid = f"{base}_{suffix}" if suffix else base
pwd = getattr(config, "WIFI_CONFIG_AP_PASSWORD", "12345678")
with open("/boot/wifi.ssid", "w", encoding="utf-8") as f:
f.write(ssid.strip())
with open("/boot/wifi.pass", "w", encoding="utf-8") as f:
f.write(pwd.strip())
try:
_ensure_hostapd_ssid(ssid.strip())
except Exception:
pass
def _ensure_hostapd_modern_security(logger=None) -> bool:
"""
确保 AP 使用较新的安全标准(至少 WPA2-PSK + CCMP
你现场验证需要的两行:
- wpa_key_mgmt=WPA-PSK
- rsn_pairwise=CCMP
Returns:
bool: 若文件被修改返回 True否则 False
"""
if logger is None:
logger = logger_manager.logger
conf_path = "/etc/hostapd.conf"
try:
if not os.path.exists(conf_path):
return False
with open(conf_path, "r", encoding="utf-8") as f:
lines = f.read().splitlines()
except Exception as e:
logger.warning(f"[WIFI-AP] 读取 hostapd.conf 失败: {e}")
return False
wanted = {
"wpa_key_mgmt": "WPA-PSK",
"rsn_pairwise": "CCMP",
}
changed = False
seen = set()
new_lines = []
for ln in lines:
s = ln.strip()
if not s or s.startswith("#") or "=" not in s:
new_lines.append(ln)
continue
k, v = s.split("=", 1)
k = k.strip()
if k in wanted:
seen.add(k)
new_v = wanted[k]
if v.strip() != new_v:
new_lines.append(f"{k}={new_v}")
changed = True
else:
new_lines.append(ln)
continue
new_lines.append(ln)
# 缺的补到末尾
for k, v in wanted.items():
if k not in seen:
new_lines.append(f"{k}={v}")
changed = True
if not changed:
return False
try:
with open(conf_path, "w", encoding="utf-8") as f:
f.write("\n".join(new_lines).rstrip() + "\n")
logger.info("[WIFI-AP] 已更新 /etc/hostapd.conf 安全参数WPA-PSK + CCMP")
return True
except Exception as e:
logger.warning(f"[WIFI-AP] 写入 hostapd.conf 失败: {e}")
return False
def _switch_boot_to_ap_mode(logger):
"""
去掉 STA 标志、建立 AP 标志,由 S30wifi 起 hostapd与 Maix start_ap 二选一,以系统脚本为准)。
"""
try:
sta = "/boot/wifi.sta"
ap = "/boot/wifi.ap"
if os.path.exists(sta):
os.remove(sta)
with open(ap, "w", encoding="utf-8") as f:
f.write("")
os.system("/etc/init.d/S30wifi restart")
# 某些固件生成的 hostapd.conf 缺少新安全参数,导致 Windows 提示“较旧的安全标准”。
# 若本次修改了 hostapd.conf则再重启一次让 hostapd 重新加载配置。
try:
if _ensure_hostapd_modern_security(logger):
os.system("/etc/init.d/S30wifi restart")
except Exception:
pass
return True
except Exception as e:
logger.error(f"[WIFI-AP] 切换 /boot 为 AP 模式失败: {e}")
return False
def start_http_server_thread():
"""仅启动 HTTP 线程(假定 AP 已由 S30wifi 拉起)。"""
global _http_thread
logger = logger_manager.logger
if _http_thread is not None and _http_thread.is_alive():
logger.warning("[WIFI-HTTP] 配网线程已在运行")
return
_http_stop.clear()
host = getattr(config, "WIFI_CONFIG_HTTP_HOST", "0.0.0.0")
port = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
_http_thread = threading.Thread(
target=_serve_loop,
args=(host, port),
daemon=True,
name="wifi_config_httpd",
)
_http_thread.start()
def maybe_start_wifi_ap_fallback(logger=None):
"""
若启用 WIFI_CONFIG_AP_FALLBACK等待若干秒后检测 STA WiFi 与 4G
仅当二者均不可用时,写热点用的 /boot/wifi.ssid|pass、切到 /boot/wifi.ap 并 restart S30wifi再启动 HTTP。
"""
if logger is None:
logger = logger_manager.logger
if not getattr(config, "WIFI_CONFIG_AP_FALLBACK", False):
return
from network import network_manager
# 先快速检测一次:若 STA 或 4G 已可用,直接返回,避免不必要的等待
wifi_ok = wifi_manager.is_sta_associated()
g4_ok = network_manager.is_4g_available()
logger.info(f"[WIFI-AP] 兜底检测(quick)sta关联={wifi_ok}, 4g={g4_ok}")
if wifi_ok or g4_ok:
logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网")
return
# 两者均不可用:再按配置等待一段时间后复检,避免开机瞬态误判
wait_sec = int(getattr(config, "WIFI_AP_FALLBACK_WAIT_SEC", 10))
wait_sec = max(0, min(wait_sec, 120))
if wait_sec > 0:
logger.info(f"[WIFI-AP] 兜底配网:等待 {wait_sec}s 后再检测 STA/4G…")
std_time.sleep(wait_sec)
# 必须用 STA 关联判断is_wifi_connected() 在 AP 模式会因 192.168.66.1 误判为已连接
wifi_ok = wifi_manager.is_sta_associated()
g4_ok = network_manager.is_4g_available()
logger.info(f"[WIFI-AP] 兜底检测sta关联={wifi_ok}, 4g={g4_ok}")
if wifi_ok or g4_ok:
logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网")
return
logger.warning("[WIFI-AP] STA 与 4G 均不可用,启动热点配网(/boot/wifi.ap + HTTP")
try:
_write_boot_ap_credentials_for_s30wifi()
except Exception as e:
logger.error(f"[WIFI-AP] 写热点 /boot 凭证失败: {e}")
return
if not _switch_boot_to_ap_mode(logger):
return
std_time.sleep(3)
start_http_server_thread()
p = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
ip = getattr(config, "WIFI_CONFIG_AP_IP", "192.168.66.1")
logger.info(f"[WIFI-AP] 请连接热点后访问 http://{ip}:{p}/ (若 IP 以 S30wifi 为准)")
def stop_wifi_config_http():
"""请求停止 HTTP 线程(下次 accept 超时后退出)。"""
_http_stop.set()
# 兼容旧名:不再使用「强制开 AP」逻辑统一走 maybe_start_wifi_ap_fallback
def start_wifi_config_ap_thread():
maybe_start_wifi_ap_fallback()