#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WiFi hotspot provisioning: a mini HTTP server (GET/POST only, built on the
standard-library ``socket`` module) that runs in its own thread.

Policy (kept consistent with /etc/init.d/S30wifi):
- Only when STA is not associated with a WiFi network AND 4G is unavailable:
  write /boot/wifi.ap, remove /boot/wifi.sta and restart S30wifi so the system
  brings up the hotspot; then start the HTTP server in this process.
- After the user POSTs the router SSID/password: persist the credentials,
  stop S30wifi, delete /boot/wifi.ap, create /boot/wifi.sta, sync, reboot.
"""

import html
import os
import socket
import threading
import time as std_time
from urllib.parse import parse_qs

import config
from logger_manager import logger_manager
from wifi import wifi_manager

_http_thread = None
_http_stop = threading.Event()

# IEEE 802.11 limits an SSID to 32 octets; hostapd rejects longer values.
_MAX_SSID_BYTES = 32


def _run_shell(command, logger=None):
    """
    Run *command* through ``os.system`` and return its exit status.

    ``os.system`` does not raise when the command fails, so a non-zero status
    is logged here instead of being silently dropped.

    Returns:
        int: the status reported by ``os.system`` (0 on success), or -1 if the
        call itself raised.
    """
    try:
        status = os.system(command)
    except Exception as e:
        if logger:
            logger.warning(f"[WIFI-AP] 命令执行异常: {command}: {e}")
        return -1
    if status != 0 and logger:
        logger.warning(f"[WIFI-AP] 命令执行失败: {command} (status={status})")
    return status


def _truncate_utf8(text, max_bytes):
    """Cut *text* to at most *max_bytes* UTF-8 bytes without splitting a character."""
    return text.encode("utf-8")[:max_bytes].decode("utf-8", errors="ignore")


def _http_response(status, body_bytes, content_type="text/html; charset=utf-8"):
    """Build a complete HTTP/1.1 response (status line, headers, body) as bytes."""
    head = (
        f"HTTP/1.1 {status}\r\n"
        f"Content-Type: {content_type}\r\n"
        f"Content-Length: {len(body_bytes)}\r\n"
        f"Connection: close\r\n"
        f"\r\n"
    ).encode("utf-8")
    return head + body_bytes


def _read_http_request(conn, max_total=65536):
    """
    Read one HTTP request from *conn*.

    Args:
        conn: a connected socket.
        max_total: upper bound (bytes) for both the header section and the body.

    Returns:
        ``(method, path, headers_str, body_bytes)``, or ``None`` when the
        header terminator was never received.
    """
    buf = b""
    while b"\r\n\r\n" not in buf and len(buf) < max_total:
        chunk = conn.recv(4096)
        if not chunk:
            break
        buf += chunk

    header_bytes, sep, body = buf.partition(b"\r\n\r\n")
    if not sep:
        return None

    headers_str = header_bytes.decode("utf-8", errors="replace")
    lines = headers_str.split("\r\n")
    parts = lines[0].split()
    method = parts[0] if parts else "GET"
    path = parts[1] if len(parts) > 1 else "/"

    content_length = 0
    for line in lines[1:]:
        if line.lower().startswith("content-length:"):
            try:
                content_length = int(line.split(":", 1)[1].strip())
            except ValueError:
                content_length = 0
            break
    # A negative length would turn the final slice into "drop the tail".
    content_length = max(0, content_length)

    while len(body) < content_length and len(body) < max_total:
        chunk = conn.recv(4096)
        if not chunk:
            break
        body += chunk
    body = body[:content_length]
    return method, path, headers_str, body


def _page_form(msg_html=""):
    """
    Render the provisioning page as UTF-8 bytes.

    Args:
        msg_html: trusted, already-escaped HTML fragment shown above the hint
            (result of the previous POST); empty for a plain GET.
    """
    # The hotspot name shown on the page follows /boot/wifi.ssid so that it
    # matches the AP that is actually running.
    try:
        with open("/boot/wifi.ssid", "r", encoding="utf-8") as f:
            current_ssid = f.read().strip()
    except (OSError, ValueError):
        current_ssid = ""
    ap_ssid = html.escape(current_ssid or getattr(config, "WIFI_CONFIG_AP_SSID", "ArcherySetup"))
    port = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
    # Field names "ssid" / "password" are what the POST handler parses.
    body = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>WiFi 配网</title>
</head>
<body>
<p>热点:{ap_ssid} · 端口 {port}</p>
<p>请填写要连接的路由器 SSID 与密码(用于 STA 上网,不是热点密码)。提交后将关闭热点、保存并重启设备。</p>
<form method="post" action="/">
<p><label>SSID <input type="text" name="ssid" maxlength="32" required></label></p>
<p><label>密码 <input type="password" name="password" maxlength="63"></label></p>
<p><button type="submit">提交</button></p>
</form>
{msg_html}<p>提示:提交后设备会重启;请手机改连路由器 WiFi。</p>
</body>
</html>
"""
    return body.encode("utf-8")


def _apply_sta_and_reboot(router_ssid: str, router_password: str):
    """
    Persist router STA credentials -> stop the WiFi service -> delete
    /boot/wifi.ap -> create /boot/wifi.sta -> sync -> reboot.

    Returns:
        ``(ok, error_message)``; ``error_message`` is empty on success.
    """
    logger = logger_manager.logger
    # Reject an empty SSID before touching any flag: rebooting into STA mode
    # without a target network would only bounce back into AP fallback.
    if not (router_ssid or "").strip():
        return False, "SSID 不能为空"

    ok, err = wifi_manager.persist_sta_credentials(router_ssid, router_password, restart_service=False)
    if not ok:
        return False, err

    # Best effort: a failed stop is logged by _run_shell but does not abort.
    _run_shell("/etc/init.d/S30wifi stop", logger)

    ap_flag = "/boot/wifi.ap"
    sta_flag = "/boot/wifi.sta"
    try:
        os.remove(ap_flag)
    except FileNotFoundError:
        pass
    except OSError as e:
        return False, f"删除 {ap_flag} 失败: {e}"
    try:
        with open(sta_flag, "w", encoding="utf-8") as f:
            f.write("")
    except OSError as e:
        return False, f"创建 {sta_flag} 失败: {e}"

    _run_shell("sync", logger)
    logger.info("[WIFI-AP] 已切换为 STA 标志并准备 reboot")
    # os.system never raises on a failing command, so check the status.
    status = _run_shell("reboot", logger)
    if status != 0:
        return False, f"reboot 调用失败: status={status}"
    return True, ""


def _handle_client(conn, addr):
    """Serve a single HTTP connection and always close it afterwards."""
    logger = logger_manager.logger
    try:
        conn.settimeout(30.0)
        req = _read_http_request(conn)
        if not req:
            conn.sendall(_http_response("400 Bad Request", b"Bad Request"))
            return
        method, path, _headers, body = req
        path = path.split("?", 1)[0]

        if method == "GET" and path in ("/", "/index.html"):
            conn.sendall(_http_response("200 OK", _page_form()))
            return

        if method == "POST" and path in ("/", "/index.html"):
            qs = body.decode("utf-8", errors="replace")
            fields = parse_qs(qs, keep_blank_values=True)
            ssid = (fields.get("ssid") or [""])[0].strip()
            # The password is intentionally not stripped: spaces may be significant.
            password = (fields.get("password") or [""])[0]
            ok, err = _apply_sta_and_reboot(ssid, password)
            if ok:
                msg = "<p>已保存,设备正在重启…</p>"
            else:
                msg = f"<p>失败:{html.escape(str(err))}</p>"
            conn.sendall(_http_response("200 OK", _page_form(msg)))
            return

        if method == "GET" and path == "/favicon.ico":
            conn.sendall(_http_response("204 No Content", b""))
            return

        conn.sendall(_http_response("404 Not Found", b"Not Found"))
    except Exception as e:
        try:
            logger.error(f"[WIFI-HTTP] 处理请求异常 {addr}: {e}")
        except Exception:
            pass
    finally:
        try:
            conn.close()
        except Exception:
            pass


def _serve_loop(host, port):
    """Accept connections on *host*:*port* until ``_http_stop`` is set."""
    logger = logger_manager.logger
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((host, port))
        srv.listen(5)
        # Short accept timeout so the stop event is polled about once a second.
        srv.settimeout(1.0)
        logger.info(f"[WIFI-HTTP] 监听 {host}:{port}")
    except Exception as e:
        logger.error(f"[WIFI-HTTP] bind 失败: {e}")
        try:
            srv.close()
        except Exception:
            pass
        return

    try:
        while not _http_stop.is_set():
            try:
                conn, addr = srv.accept()
            except socket.timeout:
                continue
            except Exception as e:
                if _http_stop.is_set():
                    break
                logger.warning(f"[WIFI-HTTP] accept: {e}")
                continue
            worker = threading.Thread(target=_handle_client, args=(conn, addr), daemon=True)
            worker.start()
    finally:
        # Close the listening socket even if the loop exits unexpectedly.
        try:
            srv.close()
        except Exception:
            pass
        logger.info("[WIFI-HTTP] 服务已停止")


def _ensure_hostapd_ssid(ssid: str, logger=None) -> bool:
    """
    Mirror *ssid* into any existing hostapd.conf.

    Some firmware keeps the SSID in /etc/hostapd.conf or /boot/hostapd.conf;
    updating only /boot/wifi.ssid would then have no effect, so every
    hostapd.conf that already exists is updated as well.

    Returns:
        bool: True if at least one file was modified.
    """
    if logger is None:
        logger = logger_manager.logger
    if not ssid:
        return False

    changed_any = False
    for conf_path in ("/etc/hostapd.conf", "/boot/hostapd.conf"):
        try:
            with open(conf_path, "r", encoding="utf-8") as f:
                lines = f.read().splitlines()
        except (OSError, ValueError):
            # Missing or unreadable file: nothing to synchronise.
            continue

        out = []
        seen = False
        changed = False
        for ln in lines:
            stripped = ln.strip()
            if stripped.lower().startswith("ssid="):
                seen = True
                if stripped.split("=", 1)[1].strip() != ssid:
                    out.append(f"ssid={ssid}")
                    changed = True
                    continue
            out.append(ln)
        if not seen:
            out.append(f"ssid={ssid}")
            changed = True
        if not changed:
            continue

        try:
            with open(conf_path, "w", encoding="utf-8") as f:
                f.write("\n".join(out).rstrip() + "\n")
            changed_any = True
        except (OSError, ValueError) as e:
            if logger:
                logger.warning(f"[WIFI-AP] 写入 {conf_path} 失败: {e}")

    if changed_any and logger:
        logger.info(f"[WIFI-AP] 已同步热点 SSID 到 hostapd.conf: {ssid}")
    return changed_any


def _write_boot_ap_credentials_for_s30wifi():
    """
    Write the hotspot SSID/password files read by the S30wifi AP branch
    (/boot/wifi.ssid and /boot/wifi.pass).

    Raises:
        OSError: if either /boot file cannot be written.
    """
    base = (getattr(config, "WIFI_CONFIG_AP_SSID", "ArcherySetup") or "ArcherySetup").strip()

    # Append the device code so several devices can be told apart
    # (read from /device_key; no suffix if it cannot be read).
    suffix = ""
    try:
        with open("/device_key", "r", encoding="utf-8") as f:
            dev = (f.read() or "").strip()
        # Keep only alphanumerics so the SSID has no invisible characters.
        suffix = "".join(c for c in dev if c.isalnum())
    except (OSError, ValueError):
        suffix = ""

    ssid = f"{base}_{suffix}" if suffix else base
    # An SSID longer than 32 octets is rejected by hostapd; cap it.
    ssid = _truncate_utf8(ssid.strip(), _MAX_SSID_BYTES)
    password = getattr(config, "WIFI_CONFIG_AP_PASSWORD", "12345678")

    with open("/boot/wifi.ssid", "w", encoding="utf-8") as f:
        f.write(ssid)
    with open("/boot/wifi.pass", "w", encoding="utf-8") as f:
        f.write(password.strip())

    # Best effort: a hostapd.conf sync failure must not block AP start-up.
    try:
        _ensure_hostapd_ssid(ssid)
    except Exception as e:
        logger_manager.logger.warning(f"[WIFI-AP] 同步 hostapd.conf SSID 失败: {e}")


def _ensure_hostapd_modern_security(logger=None) -> bool:
    """
    Make sure the AP uses at least WPA2-PSK + CCMP.

    The two lines enforced in /etc/hostapd.conf are:
    - wpa_key_mgmt=WPA-PSK
    - rsn_pairwise=CCMP

    Returns:
        bool: True if the file was modified, otherwise False.
    """
    if logger is None:
        logger = logger_manager.logger
    conf_path = "/etc/hostapd.conf"
    try:
        with open(conf_path, "r", encoding="utf-8") as f:
            lines = f.read().splitlines()
    except FileNotFoundError:
        return False
    except (OSError, ValueError) as e:
        logger.warning(f"[WIFI-AP] 读取 hostapd.conf 失败: {e}")
        return False

    wanted = {
        "wpa_key_mgmt": "WPA-PSK",
        "rsn_pairwise": "CCMP",
    }
    changed = False
    seen = set()
    new_lines = []
    for ln in lines:
        stripped = ln.strip()
        # Blank lines, comments and non key=value lines pass through untouched.
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            new_lines.append(ln)
            continue
        key, value = stripped.split("=", 1)
        key = key.strip()
        if key in wanted:
            seen.add(key)
            if value.strip() != wanted[key]:
                new_lines.append(f"{key}={wanted[key]}")
                changed = True
                continue
        new_lines.append(ln)

    # Append whatever is still missing.
    for key, value in wanted.items():
        if key not in seen:
            new_lines.append(f"{key}={value}")
            changed = True

    if not changed:
        return False
    try:
        with open(conf_path, "w", encoding="utf-8") as f:
            f.write("\n".join(new_lines).rstrip() + "\n")
        logger.info("[WIFI-AP] 已更新 /etc/hostapd.conf 安全参数(WPA-PSK + CCMP)")
        return True
    except (OSError, ValueError) as e:
        logger.warning(f"[WIFI-AP] 写入 hostapd.conf 失败: {e}")
        return False


def _switch_boot_to_ap_mode(logger):
    """
    Remove the STA flag, create the AP flag and let S30wifi start hostapd
    (an alternative to Maix ``start_ap``; the system script is authoritative).

    Returns:
        bool: False if switching the /boot flags failed, otherwise True.
    """
    try:
        sta = "/boot/wifi.sta"
        ap = "/boot/wifi.ap"
        if os.path.exists(sta):
            os.remove(sta)
        with open(ap, "w", encoding="utf-8") as f:
            f.write("")
        _run_shell("/etc/init.d/S30wifi restart", logger)
        # hostapd.conf generated by some firmware lacks the newer security
        # parameters, which makes Windows warn about an "older security
        # standard". If the file was changed here, restart once more so
        # hostapd reloads it.
        try:
            if _ensure_hostapd_modern_security(logger):
                _run_shell("/etc/init.d/S30wifi restart", logger)
        except Exception as e:
            logger.warning(f"[WIFI-AP] 更新 hostapd 安全参数失败: {e}")
        return True
    except Exception as e:
        logger.error(f"[WIFI-AP] 切换 /boot 为 AP 模式失败: {e}")
        return False


def start_http_server_thread():
    """Start only the HTTP thread (assumes S30wifi has already brought up the AP)."""
    global _http_thread
    logger = logger_manager.logger
    if _http_thread is not None and _http_thread.is_alive():
        logger.warning("[WIFI-HTTP] 配网线程已在运行")
        return
    _http_stop.clear()
    host = getattr(config, "WIFI_CONFIG_HTTP_HOST", "0.0.0.0")
    port = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
    _http_thread = threading.Thread(
        target=_serve_loop,
        args=(host, port),
        daemon=True,
        name="wifi_config_httpd",
    )
    _http_thread.start()


def maybe_start_wifi_ap_fallback(logger=None):
    """
    AP fallback entry point, active only when ``WIFI_CONFIG_AP_FALLBACK`` is set.

    Checks STA WiFi and 4G once, waits the configured number of seconds and
    checks again; only when both are still unavailable does it write the
    hotspot /boot/wifi.ssid|pass files, switch to /boot/wifi.ap, restart
    S30wifi and start the HTTP server.
    """
    if logger is None:
        logger = logger_manager.logger
    if not getattr(config, "WIFI_CONFIG_AP_FALLBACK", False):
        return

    # Imported at call time rather than at module import.
    from network import network_manager

    # Quick first check: if STA or 4G is already up, return right away and
    # skip the wait.
    wifi_ok = wifi_manager.is_sta_associated()
    g4_ok = network_manager.is_4g_available()
    logger.info(f"[WIFI-AP] 兜底检测(quick):sta关联={wifi_ok}, 4g={g4_ok}")
    if wifi_ok or g4_ok:
        logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网")
        return

    # Both unavailable: wait as configured and re-check, so a transient state
    # right after boot is not misjudged.
    wait_sec = int(getattr(config, "WIFI_AP_FALLBACK_WAIT_SEC", 10))
    wait_sec = max(0, min(wait_sec, 120))
    if wait_sec > 0:
        logger.info(f"[WIFI-AP] 兜底配网:等待 {wait_sec}s 后再检测 STA/4G…")
        std_time.sleep(wait_sec)

    # STA association must be used here; is_wifi_connected() reports
    # "connected" in AP mode because of the 192.168.66.1 address.
    wifi_ok = wifi_manager.is_sta_associated()
    g4_ok = network_manager.is_4g_available()
    logger.info(f"[WIFI-AP] 兜底检测:sta关联={wifi_ok}, 4g={g4_ok}")
    if wifi_ok or g4_ok:
        logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网")
        return

    logger.warning("[WIFI-AP] STA 与 4G 均不可用,启动热点配网(/boot/wifi.ap + HTTP)")
    try:
        _write_boot_ap_credentials_for_s30wifi()
    except Exception as e:
        logger.error(f"[WIFI-AP] 写热点 /boot 凭证失败: {e}")
        return

    if not _switch_boot_to_ap_mode(logger):
        return

    # Give the AP a moment to come up before listening.
    std_time.sleep(3)
    start_http_server_thread()
    p = int(getattr(config, "WIFI_CONFIG_HTTP_PORT", 8080))
    ip = getattr(config, "WIFI_CONFIG_AP_IP", "192.168.66.1")
    logger.info(f"[WIFI-AP] 请连接热点后访问 http://{ip}:{p}/ (若 IP 以 S30wifi 为准)")


def stop_wifi_config_http():
    """Ask the HTTP thread to stop (it exits after the next accept timeout)."""
    _http_stop.set()


# Backward-compatible old name: the "force AP on" logic is gone; everything
# goes through maybe_start_wifi_ap_fallback.
def start_wifi_config_ap_thread():
    """Deprecated alias for :func:`maybe_start_wifi_ap_fallback`."""
    maybe_start_wifi_ap_fallback()