Files
archery/wifi_config_httpd.py
2026-04-07 17:29:24 +08:00

498 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
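"""
WiFi hotspot provisioning: a mini HTTP server (GET/POST only, standard-library sockets, runs in its own thread).

Policy (kept consistent with /etc/init.d/S30wifi):
- Only when STA is not connected to WiFi and 4G is also unavailable: write /boot/wifi.ap, remove /boot/wifi.sta,
  and restart S30wifi so the system brings up the hotspot; then start the HTTP server in this process.
- After the user POSTs the router SSID/password: only write the credentials, stop S30wifi, delete /boot/wifi.ap,
  create /boot/wifi.sta, sync, reboot.
"""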
import html
import os
import socket
import threading
import time as std_time
from urllib.parse import parse_qs
import config
from logger_manager import logger_manager
from wifi import wifi_manager
# Handle of the background HTTP server thread; stays None until start_http_server_thread() creates one.
_http_thread = None
# Set by stop_wifi_config_http() to ask the accept loop in _serve_loop() to exit.
_http_stop = threading.Event()
def _http_response(status, body_bytes, content_type="text/html; charset=utf-8"):
    """Build a complete HTTP/1.1 response as bytes.

    Args:
        status: Status code plus reason phrase, e.g. ``"200 OK"``.
        body_bytes: The already-encoded response body.
        content_type: Value of the ``Content-Type`` header.

    Returns:
        The header block (always with ``Connection: close``) followed by the body.
    """
    header_lines = [
        f"HTTP/1.1 {status}",
        f"Content-Type: {content_type}",
        f"Content-Length: {len(body_bytes)}",
        "Connection: close",
        "",  # the two empty entries produce the blank line that ends the header block
        "",
    ]
    return "\r\n".join(header_lines).encode("utf-8") + body_bytes
def _read_http_request(conn, max_total=65536):
    """Read a single HTTP request from a connected socket-like object.

    Args:
        conn: Object exposing ``recv(n) -> bytes`` (an empty result means the peer closed).
        max_total: Upper bound, in bytes, applied separately to the header read
            and to the body read.

    Returns:
        ``(method, path, headers_str, body_bytes)``, or ``None`` when the header
        terminator (blank line) was never received. ``headers_str`` is the request
        line plus headers without the trailing blank line. A missing request line
        falls back to method ``"GET"`` and path ``"/"``.

    Any exception raised by ``conn.recv`` (e.g. a timeout) propagates to the caller.
    """
    terminator = b"\r\n\r\n"
    buf = b""
    while terminator not in buf and len(buf) < max_total:
        chunk = conn.recv(4096)
        if not chunk:
            break
        buf += chunk
    header_bytes, sep, body = buf.partition(terminator)
    if not sep:
        return None

    # errors="replace" guarantees decoding cannot fail on malformed bytes.
    headers_str = header_bytes.decode("utf-8", errors="replace")
    lines = headers_str.split("\r\n")
    parts = lines[0].split()
    method = parts[0] if parts else "GET"
    path = parts[1] if len(parts) > 1 else "/"

    content_length = 0
    for line in lines[1:]:
        if line.lower().startswith("content-length:"):
            try:
                # Clamp to zero: a negative length would otherwise turn the final
                # slice into body[:-n] and return a truncated, non-empty body.
                content_length = max(0, int(line.split(":", 1)[1].strip()))
            except ValueError:
                content_length = 0
            break  # only the first Content-Length header is honoured

    while len(body) < content_length and len(body) < max_total:
        chunk = conn.recv(4096)
        if not chunk:
            break
        body += chunk
    # Drop anything the client sent beyond the declared length.
    return method, path, headers_str, body[:content_length]
def _page_form(msg_html=""):
    """Render the provisioning page (router SSID/password form) as UTF-8 bytes.

    Args:
        msg_html: Optional HTML fragment placed above the form. It is inserted
            verbatim (NOT escaped here), so callers must escape any untrusted text.

    Returns:
        The encoded HTML document.
    """
    # The hotspot name shown on the page follows /boot/wifi.ssid so that it
    # matches the AP that is actually running; fall back to the configured name.
    try:
        with open("/boot/wifi.ssid", "r", encoding="utf-8") as f:
            current_ssid = f.read().strip()
    except (OSError, ValueError):
        # Missing/unreadable file or undecodable content: use the config default.
        current_ssid = ""
    ap_ssid = html.escape(current_ssid or getattr(config, "WIFI_CONFIG_AP_SSID", "ArcherySetup"))
    port = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
    body = f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1"/>
<title>WiFi 配网</title></head><body>
<h1>WiFi 配网</h1>
<p>热点:<b>{ap_ssid}</b> · 端口 <b>{port}</b></p>
<p>请填写要连接的<b>路由器</b> SSID 与密码(用于 STA 上网,不是热点密码)。提交后将关闭热点、保存并<b>重启设备</b>。</p>
{msg_html}
<form method="POST" action="/" accept-charset="utf-8">
<p>SSID<br/><input name="ssid" type="text" style="width:100%;max-width:320px" required/></p>
<p>密码(开放网络可留空)<br/><input name="password" type="password" style="width:100%;max-width:320px"/></p>
<p><button type="submit">保存并重启</button></p>
</form>
<p style="color:#666;font-size:12px">提示:提交后设备会重启;请手机改连路由器 WiFi。</p>
</body></html>"""
    return body.encode("utf-8")
def _apply_sta_and_reboot(router_ssid: str, router_password: str):
    """Persist the router credentials, flip the boot flags to STA mode and reboot.

    Sequence: write STA credentials -> stop the WiFi service -> delete
    /boot/wifi.ap -> create /boot/wifi.sta -> sync -> reboot.

    Args:
        router_ssid: SSID of the router the device should join as a station.
        router_password: Password for that network.

    Returns:
        ``(True, "")`` once the reboot command was issued successfully, otherwise
        ``(False, error_message)``. Failing to stop the service or to sync is only
        logged; a failure in any other step aborts the sequence.
    """
    logger = logger_manager.logger
    ok, err = wifi_manager.persist_sta_credentials(router_ssid, router_password, restart_service=False)
    if not ok:
        return False, err

    # os.system() signals a failed command through its return value rather than
    # an exception, so the exit status has to be inspected explicitly.
    try:
        rc = os.system("/etc/init.d/S30wifi stop")
        if rc != 0:
            logger.warning(f"[WIFI-AP] S30wifi stop: exit status {rc}")
    except Exception as e:
        logger.warning(f"[WIFI-AP] S30wifi stop: {e}")

    ap_flag = "/boot/wifi.ap"
    sta_flag = "/boot/wifi.sta"
    try:
        os.remove(ap_flag)
    except FileNotFoundError:
        pass  # already absent, nothing to do
    except Exception as e:
        return False, f"删除 {ap_flag} 失败: {e}"
    try:
        # An empty marker file is enough; only its existence matters here.
        with open(sta_flag, "w", encoding="utf-8") as f:
            f.write("")
    except Exception as e:
        return False, f"创建 {sta_flag} 失败: {e}"

    # Best effort: flush the flag files to storage before rebooting.
    try:
        rc = os.system("sync")
        if rc != 0:
            logger.warning(f"[WIFI-AP] sync: exit status {rc}")
    except Exception as e:
        logger.warning(f"[WIFI-AP] sync: {e}")

    logger.info("[WIFI-AP] 已切换为 STA 标志并准备 reboot")
    try:
        rc = os.system("reboot")
    except Exception as e:
        return False, f"reboot 调用失败: {e}"
    if rc != 0:
        # Without this check a failed reboot would be reported to the user as success.
        return False, f"reboot 调用失败: exit status {rc}"
    return True, ""
def _handle_client(conn, addr):
    """Serve exactly one HTTP request on ``conn`` and then close the connection.

    Routes:
        GET  / or /index.html -> provisioning form
        POST / or /index.html -> apply submitted credentials (triggers a reboot on success)
        GET  /favicon.ico     -> 204
        anything else         -> 404

    Args:
        conn: Accepted client socket.
        addr: Peer address, used only for logging.

    Exceptions are logged and swallowed so that a bad client cannot take down
    the thread running this handler.
    """
    logger = logger_manager.logger
    form_paths = ("/", "/index.html")
    try:
        conn.settimeout(30.0)
        req = _read_http_request(conn)
        if not req:
            conn.sendall(_http_response("400 Bad Request", b"Bad Request"))
            return
        method, path, _headers, body = req
        path = path.split("?", 1)[0]  # ignore any query string when routing

        if method == "GET" and path in form_paths:
            conn.sendall(_http_response("200 OK", _page_form()))
            return

        if method == "POST" and path in form_paths:
            # errors="replace" means decoding cannot raise on malformed bytes.
            fields = parse_qs(body.decode("utf-8", errors="replace"), keep_blank_values=True)
            ssid = (fields.get("ssid") or [""])[0].strip()
            # The password is deliberately not stripped: spaces may be significant.
            password = (fields.get("password") or [""])[0]
            if not ssid:
                # The form's "required" attribute is enforced only by browsers;
                # never start the credential-write/reboot sequence without an SSID.
                ok, err = False, "SSID 不能为空"
            else:
                ok, err = _apply_sta_and_reboot(ssid, password)
            if ok:
                msg = '<p style="color:green"><b>已保存,设备正在重启…</b></p>'
            else:
                msg = f'<p style="color:red"><b>失败:</b>{html.escape(str(err))}</p>'
            conn.sendall(_http_response("200 OK", _page_form(msg)))
            return

        if method == "GET" and path == "/favicon.ico":
            conn.sendall(_http_response("204 No Content", b""))
            return

        conn.sendall(_http_response("404 Not Found", b"Not Found"))
    except Exception as e:
        try:
            logger.error(f"[WIFI-HTTP] 处理请求异常 {addr}: {e}")
        except Exception:
            pass
    finally:
        try:
            conn.close()
        except Exception:
            pass
def _serve_loop(host, port):
    """Accept connections on ``host:port`` until the module stop event is set.

    Each accepted connection is handed to ``_handle_client`` on its own daemon
    thread. The listening socket uses a 1-second accept timeout so the stop
    event is polled regularly. A failure while setting up the listener is
    logged and ends the function immediately.
    """
    log = logger_manager.logger
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def _close_listener():
        # Closing is best effort; errors at this point are not actionable.
        try:
            listener.close()
        except Exception:
            pass

    try:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((host, port))
        listener.listen(5)
        listener.settimeout(1.0)
        log.info(f"[WIFI-HTTP] 监听 {host}:{port}")
    except Exception as exc:
        log.error(f"[WIFI-HTTP] bind 失败: {exc}")
        _close_listener()
        return

    while True:
        if _http_stop.is_set():
            break
        try:
            client, peer = listener.accept()
        except socket.timeout:
            # No client within the timeout window: loop to re-check the stop flag.
            continue
        except Exception as exc:
            if _http_stop.is_set():
                break
            log.warning(f"[WIFI-HTTP] accept: {exc}")
            continue
        worker = threading.Thread(target=_handle_client, args=(client, peer), daemon=True)
        worker.start()

    _close_listener()
    log.info("[WIFI-HTTP] 服务已停止")
def _ensure_hostapd_ssid(ssid: str, logger=None, conf_paths=("/etc/hostapd.conf", "/boot/hostapd.conf")) -> bool:
    """Make every existing hostapd config file carry ``ssid=<ssid>``.

    Some firmware keeps the SSID in /etc/hostapd.conf or /boot/hostapd.conf.
    To avoid a change to /boot/wifi.ssid having no effect, the already existing
    hostapd config files are updated as well. Files that do not exist or cannot
    be read are skipped; they are never created.

    Args:
        ssid: Desired hotspot SSID; an empty value is a no-op.
        logger: Logger to use; defaults to the module-wide logger.
        conf_paths: Candidate hostapd config files to update. The default keeps
            the two standard locations.

    Returns:
        bool: True if at least one file was rewritten.
    """
    if logger is None:
        logger = logger_manager.logger
    if not ssid:
        return False

    wanted_line = f"ssid={ssid}"
    changed_any = False
    for conf_path in conf_paths:
        try:
            with open(conf_path, "r", encoding="utf-8") as f:
                lines = f.read().splitlines()
        except (OSError, ValueError):
            # Missing, unreadable or undecodable file: leave it alone.
            continue

        changed = False
        seen = False
        out = []
        for ln in lines:
            s = ln.strip()
            # Commented-out entries start with '#' and therefore never match.
            if s.lower().startswith("ssid="):
                seen = True
                if s.split("=", 1)[1].strip() != ssid:
                    ln = wanted_line
                    changed = True
            out.append(ln)
        if not seen:
            out.append(wanted_line)
            changed = True
        if not changed:
            continue

        try:
            with open(conf_path, "w", encoding="utf-8") as f:
                f.write("\n".join(out).rstrip() + "\n")
            changed_any = True
        except Exception as e:
            if logger:
                logger.warning(f"[WIFI-AP] 写入 {conf_path} 失败: {e}")

    if changed_any and logger:
        logger.info(f"[WIFI-AP] 已同步热点 SSID 到 hostapd.conf: {ssid}")
    return changed_any
def _write_boot_ap_credentials_for_s30wifi():
    """Write the hotspot SSID/password consumed by the AP branch of S30wifi (gen_hostapd).

    The SSID is the configured base name, optionally followed by ``_<device key>``
    so that several devices can be told apart. The result is written to
    /boot/wifi.ssid, the password to /boot/wifi.pass, and existing hostapd config
    files are synchronised on a best-effort basis.

    Raises:
        OSError: If /boot/wifi.ssid or /boot/wifi.pass cannot be written.
    """
    base = (getattr(config, "WIFI_CONFIG_AP_SSID", "ArcherySetup") or "ArcherySetup").strip()

    # Append the device code read from /device_key; on any failure use no suffix.
    suffix = ""
    try:
        with open("/device_key", "r", encoding="utf-8") as f:
            dev = (f.read() or "").strip()
        # Keep only letters and digits so the SSID has no invisible characters.
        suffix = "".join(c for c in dev if c.isalnum())
    except Exception:
        suffix = ""

    ssid = (f"{base}_{suffix}" if suffix else base).strip()
    # An SSID may be at most 32 octets and hostapd rejects longer values, which
    # would prevent the hotspot from starting. Cut on a UTF-8 boundary
    # (errors="ignore" drops a split trailing character).
    ssid = ssid.encode("utf-8")[:32].decode("utf-8", errors="ignore").strip()

    pwd = getattr(config, "WIFI_CONFIG_AP_PASSWORD", "12345678")
    with open("/boot/wifi.ssid", "w", encoding="utf-8") as f:
        f.write(ssid)
    with open("/boot/wifi.pass", "w", encoding="utf-8") as f:
        f.write(pwd.strip())

    # Best effort: failing to sync hostapd.conf must not block provisioning.
    try:
        _ensure_hostapd_ssid(ssid)
    except Exception:
        pass
def _ensure_hostapd_modern_security(logger=None, conf_path="/etc/hostapd.conf") -> bool:
    """Make sure the AP config uses a recent security standard (at least WPA2-PSK + CCMP).

    The two settings that were verified on site are enforced:
        - wpa_key_mgmt=WPA-PSK
        - rsn_pairwise=CCMP
    Existing entries with another value are replaced in place; missing entries
    are appended at the end. Comments and unrelated lines are left untouched.

    Args:
        logger: Logger to use; defaults to the module-wide logger.
        conf_path: hostapd config file to patch. The default keeps the standard
            location.

    Returns:
        bool: True if the file was modified, otherwise False (including when the
        file does not exist or cannot be read or written).
    """
    if logger is None:
        logger = logger_manager.logger

    try:
        with open(conf_path, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()
    except FileNotFoundError:
        # No config file: nothing to patch, and not worth a warning.
        return False
    except Exception as e:
        logger.warning(f"[WIFI-AP] 读取 hostapd.conf 失败: {e}")
        return False

    wanted = {
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
    }
    changed = False
    seen = set()
    new_lines = []
    for ln in lines:
        s = ln.strip()
        if s and not s.startswith("#") and "=" in s:
            k, v = s.split("=", 1)
            k = k.strip()
            if k in wanted:
                seen.add(k)
                if v.strip() != wanted[k]:
                    ln = f"{k}={wanted[k]}"
                    changed = True
        new_lines.append(ln)

    # Append whatever is missing.
    for k, v in wanted.items():
        if k not in seen:
            new_lines.append(f"{k}={v}")
            changed = True

    if not changed:
        return False
    try:
        with open(conf_path, "w", encoding="utf-8") as f:
            f.write("\n".join(new_lines).rstrip() + "\n")
        logger.info(f"[WIFI-AP] 已更新 {conf_path} 安全参数WPA-PSK + CCMP")
        return True
    except Exception as e:
        logger.warning(f"[WIFI-AP] 写入 hostapd.conf 失败: {e}")
        return False
def _switch_boot_to_ap_mode(logger):
    """Switch the /boot flags from STA to AP and restart the WiFi service.

    Removes /boot/wifi.sta, creates /boot/wifi.ap and restarts S30wifi so that
    the system script brings up hostapd (the system script is authoritative,
    as opposed to Maix start_ap).

    Args:
        logger: Logger used to report a failure.

    Returns:
        True once the flags were switched and the restart was issued, False if
        switching the flags raised.
    """
    restart_cmd = "/etc/init.d/S30wifi restart"
    sta_flag = "/boot/wifi.sta"
    ap_flag = "/boot/wifi.ap"

    try:
        if os.path.exists(sta_flag):
            os.remove(sta_flag)
        with open(ap_flag, "w", encoding="utf-8") as marker:
            marker.write("")
        os.system(restart_cmd)
    except Exception as exc:
        logger.error(f"[WIFI-AP] 切换 /boot 为 AP 模式失败: {exc}")
        return False

    # hostapd.conf generated by some firmware lacks the newer security settings,
    # which makes Windows warn about an "older security standard". If the file
    # was patched just now, restart once more so hostapd reloads it.
    try:
        conf_patched = _ensure_hostapd_modern_security(logger)
        if conf_patched:
            os.system(restart_cmd)
    except Exception:
        pass
    return True
def start_http_server_thread():
    """Start only the HTTP thread (assumes S30wifi has already brought up the AP).

    Does nothing except log a warning when a previous server thread is still
    alive. Host and port come from ``config`` (defaults ``0.0.0.0`` and 8080).
    """
    global _http_thread
    log = logger_manager.logger

    already_running = _http_thread is not None and _http_thread.is_alive()
    if already_running:
        log.warning("[WIFI-HTTP] 配网线程已在运行")
        return

    # Reset a stop request left over from an earlier run.
    _http_stop.clear()
    bind_host = getattr(config, "WIFI_CONFIG_HTTP_HOST", "0.0.0.0")
    bind_port = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
    worker = threading.Thread(
        name="wifi_config_httpd",
        target=_serve_loop,
        args=(bind_host, bind_port),
        daemon=True,
    )
    _http_thread = worker
    worker.start()
def maybe_start_wifi_ap_fallback(logger=None):
    """Fall back to hotspot provisioning when neither STA WiFi nor 4G is usable.

    Only active when ``config.WIFI_CONFIG_AP_FALLBACK`` is truthy. Connectivity is
    probed once immediately and, if both links are down, once more after waiting
    ``WIFI_AP_FALLBACK_WAIT_SEC`` (clamped to 0..120 s). Only when both are still
    unavailable are the hotspot /boot/wifi.ssid|pass files written, the boot flags
    switched to /boot/wifi.ap with an S30wifi restart, and the HTTP server started.

    Args:
        logger: Logger to use; defaults to the module-wide logger.
    """
    if logger is None:
        logger = logger_manager.logger
    fallback_enabled = getattr(config, "WIFI_CONFIG_AP_FALLBACK", False)
    if not fallback_enabled:
        return
    from network import network_manager

    def _probe_links():
        # STA association must be used here: is_wifi_connected() would report
        # "connected" in AP mode because of the 192.168.66.1 address.
        return wifi_manager.is_sta_associated(), network_manager.is_4g_available()

    # Quick first probe: if STA or 4G already works, return without waiting.
    sta_ok, cell_ok = _probe_links()
    logger.info(f"[WIFI-AP] 兜底检测(quick)sta关联={sta_ok}, 4g={cell_ok}")
    if sta_ok or cell_ok:
        logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网")
        return

    # Both down: wait as configured and probe again to avoid a false verdict
    # caused by transient state right after boot.
    wait_sec = int(getattr(config, "WIFI_AP_FALLBACK_WAIT_SEC", 10))
    wait_sec = min(max(wait_sec, 0), 120)
    if wait_sec > 0:
        logger.info(f"[WIFI-AP] 兜底配网:等待 {wait_sec}s 后再检测 STA/4G…")
        std_time.sleep(wait_sec)

    sta_ok, cell_ok = _probe_links()
    logger.info(f"[WIFI-AP] 兜底检测sta关联={sta_ok}, 4g={cell_ok}")
    if sta_ok or cell_ok:
        logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网")
        return

    logger.warning("[WIFI-AP] STA 与 4G 均不可用,启动热点配网(/boot/wifi.ap + HTTP")
    try:
        _write_boot_ap_credentials_for_s30wifi()
    except Exception as exc:
        logger.error(f"[WIFI-AP] 写热点 /boot 凭证失败: {exc}")
        return
    if not _switch_boot_to_ap_mode(logger):
        return

    # Short pause after the service restart before the HTTP server is started.
    std_time.sleep(3)
    start_http_server_thread()
    http_port = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
    ap_ip = getattr(config, "WIFI_CONFIG_AP_IP", "192.168.66.1")
    logger.info(f"[WIFI-AP] 请连接热点后访问 http://{ap_ip}:{http_port}/ (若 IP 以 S30wifi 为准)")
def stop_wifi_config_http():
    """Ask the HTTP thread to stop; it exits after its next accept timeout.

    Only sets the stop event. It does not wait for the thread to finish.
    """
    _http_stop.set()
# Backward-compatible name: the old "force the AP on" logic is no longer used;
# everything goes through maybe_start_wifi_ap_fallback.
def start_wifi_config_ap_thread():
    """Legacy entry point; delegates to maybe_start_wifi_ap_fallback() with its default logger."""
    maybe_start_wifi_ap_fallback()