diff --git a/config.py b/config.py index b95cb70..5cf6d02 100644 --- a/config.py +++ b/config.py @@ -24,6 +24,14 @@ WIFI_QUALITY_RTT_WARN_MS = 350.0 # 与 RSSI 联合:超过此值且信号弱 WIFI_QUALITY_RSSI_BAD_DBM = -80.0 # 低于此 dBm(更负更差)视为信号弱 WIFI_QUALITY_USE_RSSI = True # 是否把 RSSI 纳入综合判定(False 则仅看 RTT) +# WiFi 热点配网(手机连设备 AP,浏览器提交路由器 SSID/密码;仅 GET/POST,标准库 socket) +WIFI_CONFIG_AP_ENABLED = False # True=启动时开热点并起迷你 HTTP 配网服务 +WIFI_CONFIG_AP_SSID = "ArcherySetup" # 设备发出的热点名称 +WIFI_CONFIG_AP_PASSWORD = "12345678" # 热点密码(WPA2 通常至少 8 位) +WIFI_CONFIG_HTTP_HOST = "0.0.0.0" # HTTP 监听地址 +WIFI_CONFIG_HTTP_PORT = 8080 # 默认 8080,避免占用 80 需 root +WIFI_CONFIG_AP_IP = "192.168.66.1" # 与 MaixPy Wifi.start_ap 默认一致,手机访问 http://192.168.66.1:8080/ + # ===== TCP over SSL(TLS) 配置 ===== USE_TCP_SSL = False # True=按手册走 MSSLCFG/MIPCFG 绑定 SSL TCP_LINK_ID = 2 # diff --git a/main.py b/main.py index a29fbde..fa22767 100644 --- a/main.py +++ b/main.py @@ -115,6 +115,15 @@ def cmd_str(): # 2. 从4G模块同步系统时间(需要 at_client 已初始化) sync_system_time_from_4g() + # 2.1 WiFi 热点配网兜底:仅当 STA 与 4G 均不可用时起 AP + HTTP;提交后删 /boot/wifi.ap、建 wifi.sta 并 reboot + try: + from wifi_config_httpd import maybe_start_wifi_ap_fallback + + maybe_start_wifi_ap_fallback(logger) + except Exception as e: + if logger: + logger.error(f"[WIFI-AP] 兜底配网检测/启动失败: {e}") + # 2.5. 启动存图 worker 线程(队列 + worker,避免主循环阻塞) start_save_shot_worker() diff --git a/network.py b/network.py index 98ae2ad..23397ff 100644 --- a/network.py +++ b/network.py @@ -1291,265 +1291,268 @@ class NetworkManager: continue # 接收数据(根据网络类型选择接收方式) - item = None + # WiFi 粘包:一次 recv 可能含多条完整包;也可能缓冲里已有完整包但本轮 recv 超时为空 + rx_items = [] if self._network_type == "wifi": - # WiFi接收数据 - data = self.receive_tcp_data_via_wifi(timeout_ms=50) + data = self.receive_tcp_data_via_wifi(timeout_ms=5) if data: - # 将数据添加到缓冲区 self.logger.info(f"[NET] 接收WiFi数据, {time.time()}") wifi_manager.recv_buffer += data - - # 尝试从缓冲区解析完整的数据包 - while len(wifi_manager.recv_buffer) >= 12: # 至少需要12字节的头部 - # 解析头部 - try: - body_len, msg_type, checksum = struct.unpack(">III", wifi_manager.recv_buffer[:12]) - total_len = 12 + body_len - - if len(wifi_manager.recv_buffer) >= total_len: - # 有完整的数据包 - payload = wifi_manager.recv_buffer[:total_len] - wifi_manager.recv_buffer = wifi_manager.recv_buffer[total_len:] - item = (0, payload) # link_id=0 for WiFi - break - else: - # 数据包不完整,等待更多数据 - self.logger.info(f"[NET] 接收WiFi数据不完整, {time.time()}") - break - except: - # 解析失败,清空缓冲区 - wifi_manager.recv_buffer = b"" - self.logger.info(f"[NET] 接收WiFi数据解析失败, {time.time()}") - break - elif self._network_type == "4g": - # 4G接收数据 - item = hardware_manager.at_client.pop_tcp_payload() - - if item: - if isinstance(item, tuple) and len(item) == 2: - link_id, payload = item - else: - link_id, payload = 0, item - - if not logged_in: + while len(wifi_manager.recv_buffer) >= 12: try: - self.logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}") - except: - pass - - # msg_type, body = self.parse_packet(payload) - msg_type, body = self._netcore.parse_packet(payload) - - # 处理登录响应 - if not logged_in and msg_type == 1: - if body and body.get("cmd") == 1 and body.get("data") == "登录成功": - logged_in = True - last_heartbeat_ack_time = time.ticks_ms() - self.logger.info("登录成功") - - # 检查 ota_pending.json - try: - pending_path = f"{config.APP_DIR}/ota_pending.json" - if os.path.exists(pending_path): - try: - with open(pending_path, "r", encoding="utf-8") as f: - pending_obj = json.load(f) - except: - pending_obj = {} - self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) - self.logger.info("[OTA] 已上报 ota_ok,等待心跳确认后删除 pending") - except Exception as e: - self.logger.error(f"[OTA] ota_ok 上报失败: {e}") - else: - break - - # 处理心跳 ACK - elif logged_in and msg_type == 4: - last_heartbeat_ack_time = time.ticks_ms() - self.logger.debug("✅ 收到心跳确认") - - # 处理命令40(分片下载) - elif logged_in and msg_type == 40: - if isinstance(body, dict): - t = body.get('t', 0) - v = body.get('v') - # 如果是第一个分片,清空之前的缓存 - if len(self._raw_line_data) == 0 or (len(self._raw_line_data) > 0 and self._raw_line_data[0].get('v') != v): - self._raw_line_data.clear() - # 或者更简单:每次收到命令40时,如果版本号不同,清空缓存 - if len(self._raw_line_data) > 0: - first_v = self._raw_line_data[0].get('v') - if first_v and first_v != v: - self._raw_line_data.clear() - self._raw_line_data.append(body) - if len(self._raw_line_data) >= int(t): - self.logger.info(f"下载完成") - from ota_manager import ota_manager - stock_array = list(map(lambda x: x.get('d'), self._raw_line_data)) - local_filename = config.LOCAL_FILENAME - with open(local_filename, 'w', encoding='utf-8') as file: - file.write("\n".join(stock_array)) - ota_manager.apply_ota_and_reboot(None, local_filename) + body_len, msg_type, checksum = struct.unpack(">III", wifi_manager.recv_buffer[:12]) + total_len = 12 + body_len + if len(wifi_manager.recv_buffer) >= total_len: + payload = wifi_manager.recv_buffer[:total_len] + wifi_manager.recv_buffer = wifi_manager.recv_buffer[total_len:] + rx_items.append((0, payload)) else: - self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41}) - self.logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}") + self.logger.info(f"[NET] 接收WiFi数据不完整, {time.time()}") + break + except Exception: + wifi_manager.recv_buffer = b"" + self.logger.info(f"[NET] 接收WiFi数据解析失败, {time.time()}") + break + elif self._network_type == "4g": + item = hardware_manager.at_client.pop_tcp_payload() + if item: + rx_items.append(item) - # 处理业务指令 - elif logged_in and isinstance(body, dict): - inner_cmd = None - data_obj = body.get("data") - if isinstance(data_obj, dict): - inner_cmd = data_obj.get("cmd") - if inner_cmd == 2: # 开启激光并校准 - from laser_manager import laser_manager - if not laser_manager.calibration_active: - laser_manager.turn_on_laser() - time.sleep_ms(100) - hardware_manager.stop_idle_timer() # 停表 - if not config.HARDCODE_LASER_POINT: - laser_manager.start_calibration() - self.safe_enqueue({"result": "calibrating"}, 2) - else: - # 写死的逻辑,不需要校准激光点 - self.safe_enqueue({"result": "laser pos set by hard code"}, 2) - elif inner_cmd == 3: # 关闭激光 - from laser_manager import laser_manager - laser_manager.turn_off_laser() - laser_manager.stop_calibration() - hardware_manager.start_idle_timer() # 开表 - self.safe_enqueue({"result": "laser_off"}, 2) - elif inner_cmd == 4: # 上报电量 - voltage = get_bus_voltage() - battery_percent = voltage_to_percent(voltage) - battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} - self.safe_enqueue(battery_data, 2) - self.logger.info(f"电量上报: {battery_percent}%") - elif inner_cmd == 5: # OTA 升级 - inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {} - ssid = inner_data.get("ssid") - password = inner_data.get("password") - ota_url = inner_data.get("url") - mode = (inner_data.get("mode") or "").strip().lower() - - if not ota_url: - self.logger.error("ota missing_url") - self.safe_enqueue({"result": "missing_url"}, 2) - continue - - from ota_manager import ota_manager - if ota_manager.update_thread_started: - self.safe_enqueue({"result": "update_already_started"}, 2) - continue - - # 自动判断模式:如果没有明确指定,根据WiFi连接状态和凭证决定 - if mode not in ("4g", "wifi"): - self.logger.info("ota missing mode, auto-detecting...") - # 若本次会话已锁定 4G,则 OTA 自动也走 4G,避免后续回切导致体验不一致 - if self._session_force_4g: - mode = "4g" - self.logger.info("ota auto-selected: 4g (session locked on 4g)") - else: - # 只有同时满足:WiFi已连接 且 提供了WiFi凭证,才使用WiFi - if self.is_wifi_connected() and ssid and password: - mode = "wifi" - self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)") - else: - mode = "4g" - self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)") - - hardware_manager.stop_idle_timer() # 停表,注意OTA停表之后,就没有再开表,因为OTA后面会重启,会重新开表 - - if mode == "4g": - ota_manager._set_ota_url(ota_url) # 记录 OTA URL,供命令7使用 - ota_manager._start_update_thread() - _thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,)) - else: # mode == "wifi" - if not ssid or not password: - self.logger.error("ota wifi mode requires ssid and password") - self.safe_enqueue({"result": "missing_ssid_or_password"}, 2) - else: - ota_manager._start_update_thread() - _thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url)) - elif inner_cmd == 6: + _rx_login_fail = False + _rx_skip_tcp_iteration = False + if rx_items: + self.logger.info(f"total {len(rx_items)} items") + for item in rx_items: + if isinstance(item, tuple) and len(item) == 2: + link_id, payload = item + else: + link_id, payload = 0, item + + if not logged_in: + try: + self.logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}") + except: + pass + + # msg_type, body = self.parse_packet(payload) + msg_type, body = self._netcore.parse_packet(payload) + + # 处理登录响应 + if not logged_in and msg_type == 1: + if body and body.get("cmd") == 1 and body.get("data") == "登录成功": + logged_in = True + last_heartbeat_ack_time = time.ticks_ms() + self.logger.info("登录成功") + + # 检查 ota_pending.json try: - ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - ip = ip if ip else "no_ip" - except: - ip = "error_getting_ip" - self.safe_enqueue({"result": "current_ip", "ip": ip}, 2) - # elif inner_cmd == 7: - # from ota_manager import ota_manager - # if ota_manager.update_thread_started: - # self.safe_enqueue({"result": "update_already_started"}, 2) - # continue - - # try: - # ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - # except: - # ip = None - - # if not ip: - # self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2) - # else: - # # 注意:direct_ota_download 需要 ota_url 参数 - # # 如果 ota_manager.ota_url 为 None,需要从其他地方获取 - # ota_url_to_use = ota_manager.ota_url - # if not ota_url_to_use: - # self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置") - # self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2) - # else: - # ota_manager._start_update_thread() - # _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,)) - elif inner_cmd == 41: - self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}") - self._manual_trigger_flag = True - self.safe_enqueue({"result": "trigger_ack"}, 2) - hardware_manager.start_idle_timer() # 重新计时 - elif inner_cmd == 42: # 关机命令 - self.logger.info("[SHUTDOWN] 收到TCP关机命令,准备关机...") - self.safe_enqueue({"result": "shutdown_ack"}, 2) - time.sleep_ms(1000) - self.disconnect_server() - # 尝试关闭4G模块 - try: - with self.get_uart_lock(): - hardware_manager.at_client.send("AT+CFUN=0", "OK", 5000) - except: - pass - time.sleep_ms(2000) - os.system("sync") # 刷新文件系统缓存到磁盘,防止数据丢失 - time.sleep_ms(500) - # os.system("poweroff") - hardware_manager.power_off() - return - elif inner_cmd == 43: # 上传日志命令 - # 格式: {"cmd":43, "data":{"ssid":"xxx","password":"xxx","url":"xxx", ...}} - inner_data = data_obj.get("data", {}) - upload_url = inner_data.get("url") - wifi_ssid = inner_data.get("ssid") - wifi_password = inner_data.get("password") - include_rotated = inner_data.get("include_rotated", True) - max_files = inner_data.get("max_files") - archive_format = inner_data.get("archive", "tgz") # tgz 或 zip - - hardware_manager.start_idle_timer() # 重新计时 - - if not upload_url: - self.logger.error("[LOG_UPLOAD] 缺少 url 参数") - self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"}, 2) + pending_path = f"{config.APP_DIR}/ota_pending.json" + if os.path.exists(pending_path): + try: + with open(pending_path, "r", encoding="utf-8") as f: + pending_obj = json.load(f) + except: + pending_obj = {} + self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2) + self.logger.info("[OTA] 已上报 ota_ok,等待心跳确认后删除 pending") + except Exception as e: + self.logger.error(f"[OTA] ota_ok 上报失败: {e}") + else: + _rx_login_fail = True + break + + # 处理心跳 ACK + elif logged_in and msg_type == 4: + last_heartbeat_ack_time = time.ticks_ms() + self.logger.debug("✅ 收到心跳确认") + + # 处理命令40(分片下载) + elif logged_in and msg_type == 40: + if isinstance(body, dict): + t = body.get('t', 0) + v = body.get('v') + # 如果是第一个分片,清空之前的缓存 + if len(self._raw_line_data) == 0 or (len(self._raw_line_data) > 0 and self._raw_line_data[0].get('v') != v): + self._raw_line_data.clear() + # 或者更简单:每次收到命令40时,如果版本号不同,清空缓存 + if len(self._raw_line_data) > 0: + first_v = self._raw_line_data[0].get('v') + if first_v and first_v != v: + self._raw_line_data.clear() + self._raw_line_data.append(body) + if len(self._raw_line_data) >= int(t): + self.logger.info(f"下载完成") + from ota_manager import ota_manager + stock_array = list(map(lambda x: x.get('d'), self._raw_line_data)) + local_filename = config.LOCAL_FILENAME + with open(local_filename, 'w', encoding='utf-8') as file: + file.write("\n".join(stock_array)) + ota_manager.apply_ota_and_reboot(None, local_filename) else: - self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令,目标URL: {upload_url}") - # 在新线程中执行上传,避免阻塞主循环 - import _thread - _thread.start_new_thread( - self._upload_log_file, - (upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format) - ) - else: # data的结构不是 dict - self.logger.info(f"[NET] body={body}, {time.time()}") - else: - self.logger.info(f"[NET] 未知数据 {body}, {time.time()}") + self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41}) + self.logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}") + + # 处理业务指令 + elif logged_in and isinstance(body, dict): + inner_cmd = None + data_obj = body.get("data") + if isinstance(data_obj, dict): + inner_cmd = data_obj.get("cmd") + if inner_cmd == 2: # 开启激光并校准 + from laser_manager import laser_manager + if not laser_manager.calibration_active: + laser_manager.turn_on_laser() + time.sleep_ms(100) + hardware_manager.stop_idle_timer() # 停表 + if not config.HARDCODE_LASER_POINT: + laser_manager.start_calibration() + self.safe_enqueue({"result": "calibrating"}, 2) + else: + # 写死的逻辑,不需要校准激光点 + self.safe_enqueue({"result": "laser pos set by hard code"}, 2) + elif inner_cmd == 3: # 关闭激光 + from laser_manager import laser_manager + laser_manager.turn_off_laser() + laser_manager.stop_calibration() + hardware_manager.start_idle_timer() # 开表 + self.safe_enqueue({"result": "laser_off"}, 2) + elif inner_cmd == 4: # 上报电量 + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)} + self.safe_enqueue(battery_data, 2) + self.logger.info(f"电量上报: {battery_percent}%") + elif inner_cmd == 5: # OTA 升级 + inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {} + ssid = inner_data.get("ssid") + password = inner_data.get("password") + ota_url = inner_data.get("url") + mode = (inner_data.get("mode") or "").strip().lower() + + if not ota_url: + self.logger.error("ota missing_url") + self.safe_enqueue({"result": "missing_url"}, 2) + _rx_skip_tcp_iteration = True + break + + from ota_manager import ota_manager + if ota_manager.update_thread_started: + self.safe_enqueue({"result": "update_already_started"}, 2) + _rx_skip_tcp_iteration = True + break + + # 自动判断模式:如果没有明确指定,根据WiFi连接状态和凭证决定 + if mode not in ("4g", "wifi"): + self.logger.info("ota missing mode, auto-detecting...") + # 若本次会话已锁定 4G,则 OTA 自动也走 4G,避免后续回切导致体验不一致 + if self._session_force_4g: + mode = "4g" + self.logger.info("ota auto-selected: 4g (session locked on 4g)") + else: + # 只有同时满足:WiFi已连接 且 提供了WiFi凭证,才使用WiFi + if self.is_wifi_connected() and ssid and password: + mode = "wifi" + self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)") + else: + mode = "4g" + self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)") + + hardware_manager.stop_idle_timer() # 停表,注意OTA停表之后,就没有再开表,因为OTA后面会重启,会重新开表 + + if mode == "4g": + ota_manager._set_ota_url(ota_url) # 记录 OTA URL,供命令7使用 + ota_manager._start_update_thread() + _thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,)) + else: # mode == "wifi" + if not ssid or not password: + self.logger.error("ota wifi mode requires ssid and password") + self.safe_enqueue({"result": "missing_ssid_or_password"}, 2) + else: + ota_manager._start_update_thread() + _thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url)) + elif inner_cmd == 6: + try: + ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + ip = ip if ip else "no_ip" + except: + ip = "error_getting_ip" + self.safe_enqueue({"result": "current_ip", "ip": ip}, 2) + # elif inner_cmd == 7: + # from ota_manager import ota_manager + # if ota_manager.update_thread_started: + # self.safe_enqueue({"result": "update_already_started"}, 2) + # continue + + # try: + # ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() + # except: + # ip = None + + # if not ip: + # self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2) + # else: + # # 注意:direct_ota_download 需要 ota_url 参数 + # # 如果 ota_manager.ota_url 为 None,需要从其他地方获取 + # ota_url_to_use = ota_manager.ota_url + # if not ota_url_to_use: + # self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置") + # self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2) + # else: + # ota_manager._start_update_thread() + # _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,)) + elif inner_cmd == 41: + self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}") + self._manual_trigger_flag = True + self.safe_enqueue({"result": "trigger_ack"}, 2) + hardware_manager.start_idle_timer() # 重新计时 + elif inner_cmd == 42: # 关机命令 + self.logger.info("[SHUTDOWN] 收到TCP关机命令,准备关机...") + self.safe_enqueue({"result": "shutdown_ack"}, 2) + time.sleep_ms(1000) + self.disconnect_server() + # 尝试关闭4G模块 + try: + with self.get_uart_lock(): + hardware_manager.at_client.send("AT+CFUN=0", "OK", 5000) + except: + pass + time.sleep_ms(2000) + os.system("sync") # 刷新文件系统缓存到磁盘,防止数据丢失 + time.sleep_ms(500) + # os.system("poweroff") + hardware_manager.power_off() + return + elif inner_cmd == 43: # 上传日志命令 + # 格式: {"cmd":43, "data":{"ssid":"xxx","password":"xxx","url":"xxx", ...}} + inner_data = data_obj.get("data", {}) + upload_url = inner_data.get("url") + wifi_ssid = inner_data.get("ssid") + wifi_password = inner_data.get("password") + include_rotated = inner_data.get("include_rotated", True) + max_files = inner_data.get("max_files") + archive_format = inner_data.get("archive", "tgz") # tgz 或 zip + + hardware_manager.start_idle_timer() # 重新计时 + + if not upload_url: + self.logger.error("[LOG_UPLOAD] 缺少 url 参数") + self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"}, 2) + else: + self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令,目标URL: {upload_url}") + # 在新线程中执行上传,避免阻塞主循环 + import _thread + _thread.start_new_thread( + self._upload_log_file, + (upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format) + ) + else: # data的结构不是 dict + self.logger.info(f"[NET] body={body}, {time.time()}") + else: + self.logger.info(f"[NET] 未知数据 {body}, {time.time()}") + if _rx_login_fail: + break + if _rx_skip_tcp_iteration: + continue else: time.sleep_ms(5) diff --git a/wifi.py b/wifi.py index 899e422..dfc8c4d 100644 --- a/wifi.py +++ b/wifi.py @@ -116,6 +116,20 @@ class WiFiManager: # ==================== WiFi 连接方法 ==================== + def is_sta_associated(self): + """ + 是否作为 STA 已关联到上游 AP(用于与 AP 模式区分:AP 模式下 wlan0 可能有 IP 但 iw link 为 Not connected)。 + """ + try: + out = os.popen("iw dev wlan0 link 2>/dev/null").read() + if not out.strip(): + return False + if "Not connected" in out: + return False + return "Connected to" in out + except Exception: + return False + def is_wifi_connected(self): """检查WiFi是否已连接""" # 优先用 MaixPy network(如果可用) @@ -271,7 +285,67 @@ class WiFiManager: self._wifi_ip = None self.logger.error(f"[WIFI] 连接/验证失败,已回滚: {e}") return None, str(e) - + + def persist_sta_credentials(self, ssid: str, password: str, restart_service: bool = True): + """ + 仅写入 STA 凭证(/etc/wpa_supplicant.conf + /boot/wifi.ssid|pass), + 可选是否立即 /etc/init.d/S30wifi restart。 + 不做可达性验证。用于热点配网页提交后切换到连接指定路由器。 + password 为空时按开放网络(key_mgmt=NONE)写入。 + Returns: + (ok: bool, err_msg: str) + """ + ssid = (ssid or "").strip() + password = (password or "").strip() + if not ssid: + return False, "SSID 为空" + + conf_path = "/etc/wpa_supplicant.conf" + ssid_file = "/boot/wifi.ssid" + pass_file = "/boot/wifi.pass" + + def _write_text(path: str, content: str): + with open(path, "w", encoding="utf-8") as f: + f.write(content) + + try: + if password: + net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() + if "network={" not in net_conf: + return False, "wpa_passphrase 失败" + else: + esc = ssid.replace("\\", "\\\\").replace('"', '\\"') + net_conf = ( + "network={\n" + f' ssid="{esc}"\n' + " key_mgmt=NONE\n" + "}\n" + ) + _write_text( + conf_path, + "ctrl_interface=/var/run/wpa_supplicant\n" + "update_config=1\n\n" + + net_conf, + ) + except Exception as e: + return False, str(e) + + try: + _write_text(ssid_file, ssid) + _write_text(pass_file, password) + except Exception as e: + return False, str(e) + + if restart_service: + try: + os.system("/etc/init.d/S30wifi restart") + except Exception as e: + return False, str(e) + self.logger.info(f"[WIFI] persist_sta_credentials: 已写入并重启 S30wifi, ssid={ssid!r}") + else: + self.logger.info(f"[WIFI] persist_sta_credentials: 已写入凭证(未重启 S30wifi), ssid={ssid!r}") + return True, "" + def disconnect_wifi(self): """断开WiFi连接并清理资源""" if self._wifi_socket: