fix wifi 2 pkg issue

This commit is contained in:
gcw_4spBpAfv
2026-04-03 15:40:07 +08:00
parent 685dce2519
commit bdc3254ed2
4 changed files with 345 additions and 251 deletions

View File

@@ -1291,265 +1291,268 @@ class NetworkManager:
continue
# 接收数据(根据网络类型选择接收方式)
item = None
# WiFi 粘包:一次 recv 可能含多条完整包;也可能缓冲里已有完整包但本轮 recv 超时为空
rx_items = []
if self._network_type == "wifi":
# WiFi接收数据
data = self.receive_tcp_data_via_wifi(timeout_ms=50)
data = self.receive_tcp_data_via_wifi(timeout_ms=5)
if data:
# 将数据添加到缓冲区
self.logger.info(f"[NET] 接收WiFi数据, {time.time()}")
wifi_manager.recv_buffer += data
# 尝试从缓冲区解析完整的数据包
while len(wifi_manager.recv_buffer) >= 12: # 至少需要12字节的头部
# 解析头部
try:
body_len, msg_type, checksum = struct.unpack(">III", wifi_manager.recv_buffer[:12])
total_len = 12 + body_len
if len(wifi_manager.recv_buffer) >= total_len:
# 有完整的数据包
payload = wifi_manager.recv_buffer[:total_len]
wifi_manager.recv_buffer = wifi_manager.recv_buffer[total_len:]
item = (0, payload) # link_id=0 for WiFi
break
else:
# 数据包不完整,等待更多数据
self.logger.info(f"[NET] 接收WiFi数据不完整, {time.time()}")
break
except:
# 解析失败,清空缓冲区
wifi_manager.recv_buffer = b""
self.logger.info(f"[NET] 接收WiFi数据解析失败, {time.time()}")
break
elif self._network_type == "4g":
# 4G接收数据
item = hardware_manager.at_client.pop_tcp_payload()
if item:
if isinstance(item, tuple) and len(item) == 2:
link_id, payload = item
else:
link_id, payload = 0, item
if not logged_in:
while len(wifi_manager.recv_buffer) >= 12:
try:
self.logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
except:
pass
# msg_type, body = self.parse_packet(payload)
msg_type, body = self._netcore.parse_packet(payload)
# 处理登录响应
if not logged_in and msg_type == 1:
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
self.logger.info("登录成功")
# 检查 ota_pending.json
try:
pending_path = f"{config.APP_DIR}/ota_pending.json"
if os.path.exists(pending_path):
try:
with open(pending_path, "r", encoding="utf-8") as f:
pending_obj = json.load(f)
except:
pending_obj = {}
self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
self.logger.info("[OTA] 已上报 ota_ok等待心跳确认后删除 pending")
except Exception as e:
self.logger.error(f"[OTA] ota_ok 上报失败: {e}")
else:
break
# 处理心跳 ACK
elif logged_in and msg_type == 4:
last_heartbeat_ack_time = time.ticks_ms()
self.logger.debug("✅ 收到心跳确认")
# 处理命令40分片下载
elif logged_in and msg_type == 40:
if isinstance(body, dict):
t = body.get('t', 0)
v = body.get('v')
# 如果是第一个分片,清空之前的缓存
if len(self._raw_line_data) == 0 or (len(self._raw_line_data) > 0 and self._raw_line_data[0].get('v') != v):
self._raw_line_data.clear()
# 或者更简单每次收到命令40时如果版本号不同清空缓存
if len(self._raw_line_data) > 0:
first_v = self._raw_line_data[0].get('v')
if first_v and first_v != v:
self._raw_line_data.clear()
self._raw_line_data.append(body)
if len(self._raw_line_data) >= int(t):
self.logger.info(f"下载完成")
from ota_manager import ota_manager
stock_array = list(map(lambda x: x.get('d'), self._raw_line_data))
local_filename = config.LOCAL_FILENAME
with open(local_filename, 'w', encoding='utf-8') as file:
file.write("\n".join(stock_array))
ota_manager.apply_ota_and_reboot(None, local_filename)
body_len, msg_type, checksum = struct.unpack(">III", wifi_manager.recv_buffer[:12])
total_len = 12 + body_len
if len(wifi_manager.recv_buffer) >= total_len:
payload = wifi_manager.recv_buffer[:total_len]
wifi_manager.recv_buffer = wifi_manager.recv_buffer[total_len:]
rx_items.append((0, payload))
else:
self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41})
self.logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}")
self.logger.info(f"[NET] 接收WiFi数据不完整, {time.time()}")
break
except Exception:
wifi_manager.recv_buffer = b""
self.logger.info(f"[NET] 接收WiFi数据解析失败, {time.time()}")
break
elif self._network_type == "4g":
item = hardware_manager.at_client.pop_tcp_payload()
if item:
rx_items.append(item)
# 处理业务指令
elif logged_in and isinstance(body, dict):
inner_cmd = None
data_obj = body.get("data")
if isinstance(data_obj, dict):
inner_cmd = data_obj.get("cmd")
if inner_cmd == 2: # 开启激光并校准
from laser_manager import laser_manager
if not laser_manager.calibration_active:
laser_manager.turn_on_laser()
time.sleep_ms(100)
hardware_manager.stop_idle_timer() # 停表
if not config.HARDCODE_LASER_POINT:
laser_manager.start_calibration()
self.safe_enqueue({"result": "calibrating"}, 2)
else:
# 写死的逻辑,不需要校准激光点
self.safe_enqueue({"result": "laser pos set by hard code"}, 2)
elif inner_cmd == 3: # 关闭激光
from laser_manager import laser_manager
laser_manager.turn_off_laser()
laser_manager.stop_calibration()
hardware_manager.start_idle_timer() # 开表
self.safe_enqueue({"result": "laser_off"}, 2)
elif inner_cmd == 4: # 上报电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
self.safe_enqueue(battery_data, 2)
self.logger.info(f"电量上报: {battery_percent}%")
elif inner_cmd == 5: # OTA 升级
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
ssid = inner_data.get("ssid")
password = inner_data.get("password")
ota_url = inner_data.get("url")
mode = (inner_data.get("mode") or "").strip().lower()
if not ota_url:
self.logger.error("ota missing_url")
self.safe_enqueue({"result": "missing_url"}, 2)
continue
from ota_manager import ota_manager
if ota_manager.update_thread_started:
self.safe_enqueue({"result": "update_already_started"}, 2)
continue
# 自动判断模式如果没有明确指定根据WiFi连接状态和凭证决定
if mode not in ("4g", "wifi"):
self.logger.info("ota missing mode, auto-detecting...")
# 若本次会话已锁定 4G则 OTA 自动也走 4G避免后续回切导致体验不一致
if self._session_force_4g:
mode = "4g"
self.logger.info("ota auto-selected: 4g (session locked on 4g)")
else:
# 只有同时满足WiFi已连接 且 提供了WiFi凭证才使用WiFi
if self.is_wifi_connected() and ssid and password:
mode = "wifi"
self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)")
else:
mode = "4g"
self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)")
hardware_manager.stop_idle_timer() # 停表注意OTA停表之后就没有再开表因为OTA后面会重启会重新开表
if mode == "4g":
ota_manager._set_ota_url(ota_url) # 记录 OTA URL供命令7使用
ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
else: # mode == "wifi"
if not ssid or not password:
self.logger.error("ota wifi mode requires ssid and password")
self.safe_enqueue({"result": "missing_ssid_or_password"}, 2)
else:
ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url))
elif inner_cmd == 6:
_rx_login_fail = False
_rx_skip_tcp_iteration = False
if rx_items:
self.logger.info(f"total {len(rx_items)} items")
for item in rx_items:
if isinstance(item, tuple) and len(item) == 2:
link_id, payload = item
else:
link_id, payload = 0, item
if not logged_in:
try:
self.logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
except:
pass
# msg_type, body = self.parse_packet(payload)
msg_type, body = self._netcore.parse_packet(payload)
# 处理登录响应
if not logged_in and msg_type == 1:
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
self.logger.info("登录成功")
# 检查 ota_pending.json
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
self.safe_enqueue({"result": "current_ip", "ip": ip}, 2)
# elif inner_cmd == 7:
# from ota_manager import ota_manager
# if ota_manager.update_thread_started:
# self.safe_enqueue({"result": "update_already_started"}, 2)
# continue
# try:
# ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
# except:
# ip = None
# if not ip:
# self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2)
# else:
# # 注意direct_ota_download 需要 ota_url 参数
# # 如果 ota_manager.ota_url 为 None需要从其他地方获取
# ota_url_to_use = ota_manager.ota_url
# if not ota_url_to_use:
# self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置")
# self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2)
# else:
# ota_manager._start_update_thread()
# _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,))
elif inner_cmd == 41:
self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}")
self._manual_trigger_flag = True
self.safe_enqueue({"result": "trigger_ack"}, 2)
hardware_manager.start_idle_timer() # 重新计时
elif inner_cmd == 42: # 关机命令
self.logger.info("[SHUTDOWN] 收到TCP关机命令准备关机...")
self.safe_enqueue({"result": "shutdown_ack"}, 2)
time.sleep_ms(1000)
self.disconnect_server()
# 尝试关闭4G模块
try:
with self.get_uart_lock():
hardware_manager.at_client.send("AT+CFUN=0", "OK", 5000)
except:
pass
time.sleep_ms(2000)
os.system("sync") # 刷新文件系统缓存到磁盘,防止数据丢失
time.sleep_ms(500)
# os.system("poweroff")
hardware_manager.power_off()
return
elif inner_cmd == 43: # 上传日志命令
# 格式: {"cmd":43, "data":{"ssid":"xxx","password":"xxx","url":"xxx", ...}}
inner_data = data_obj.get("data", {})
upload_url = inner_data.get("url")
wifi_ssid = inner_data.get("ssid")
wifi_password = inner_data.get("password")
include_rotated = inner_data.get("include_rotated", True)
max_files = inner_data.get("max_files")
archive_format = inner_data.get("archive", "tgz") # tgz 或 zip
hardware_manager.start_idle_timer() # 重新计时
if not upload_url:
self.logger.error("[LOG_UPLOAD] 缺少 url 参数")
self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"}, 2)
pending_path = f"{config.APP_DIR}/ota_pending.json"
if os.path.exists(pending_path):
try:
with open(pending_path, "r", encoding="utf-8") as f:
pending_obj = json.load(f)
except:
pending_obj = {}
self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
self.logger.info("[OTA] 已上报 ota_ok等待心跳确认后删除 pending")
except Exception as e:
self.logger.error(f"[OTA] ota_ok 上报失败: {e}")
else:
_rx_login_fail = True
break
# 处理心跳 ACK
elif logged_in and msg_type == 4:
last_heartbeat_ack_time = time.ticks_ms()
self.logger.debug("✅ 收到心跳确认")
# 处理命令40分片下载
elif logged_in and msg_type == 40:
if isinstance(body, dict):
t = body.get('t', 0)
v = body.get('v')
# 如果是第一个分片,清空之前的缓存
if len(self._raw_line_data) == 0 or (len(self._raw_line_data) > 0 and self._raw_line_data[0].get('v') != v):
self._raw_line_data.clear()
# 或者更简单每次收到命令40时如果版本号不同清空缓存
if len(self._raw_line_data) > 0:
first_v = self._raw_line_data[0].get('v')
if first_v and first_v != v:
self._raw_line_data.clear()
self._raw_line_data.append(body)
if len(self._raw_line_data) >= int(t):
self.logger.info(f"下载完成")
from ota_manager import ota_manager
stock_array = list(map(lambda x: x.get('d'), self._raw_line_data))
local_filename = config.LOCAL_FILENAME
with open(local_filename, 'w', encoding='utf-8') as file:
file.write("\n".join(stock_array))
ota_manager.apply_ota_and_reboot(None, local_filename)
else:
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令目标URL: {upload_url}")
# 在新线程中执行上传,避免阻塞主循环
import _thread
_thread.start_new_thread(
self._upload_log_file,
(upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format)
)
else: # data的结构不是 dict
self.logger.info(f"[NET] body={body}, {time.time()}")
else:
self.logger.info(f"[NET] 未知数据 {body}, {time.time()}")
self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41})
self.logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}")
# 处理业务指令
elif logged_in and isinstance(body, dict):
inner_cmd = None
data_obj = body.get("data")
if isinstance(data_obj, dict):
inner_cmd = data_obj.get("cmd")
if inner_cmd == 2: # 开启激光并校准
from laser_manager import laser_manager
if not laser_manager.calibration_active:
laser_manager.turn_on_laser()
time.sleep_ms(100)
hardware_manager.stop_idle_timer() # 停表
if not config.HARDCODE_LASER_POINT:
laser_manager.start_calibration()
self.safe_enqueue({"result": "calibrating"}, 2)
else:
# 写死的逻辑,不需要校准激光点
self.safe_enqueue({"result": "laser pos set by hard code"}, 2)
elif inner_cmd == 3: # 关闭激光
from laser_manager import laser_manager
laser_manager.turn_off_laser()
laser_manager.stop_calibration()
hardware_manager.start_idle_timer() # 开表
self.safe_enqueue({"result": "laser_off"}, 2)
elif inner_cmd == 4: # 上报电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
self.safe_enqueue(battery_data, 2)
self.logger.info(f"电量上报: {battery_percent}%")
elif inner_cmd == 5: # OTA 升级
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
ssid = inner_data.get("ssid")
password = inner_data.get("password")
ota_url = inner_data.get("url")
mode = (inner_data.get("mode") or "").strip().lower()
if not ota_url:
self.logger.error("ota missing_url")
self.safe_enqueue({"result": "missing_url"}, 2)
_rx_skip_tcp_iteration = True
break
from ota_manager import ota_manager
if ota_manager.update_thread_started:
self.safe_enqueue({"result": "update_already_started"}, 2)
_rx_skip_tcp_iteration = True
break
# 自动判断模式如果没有明确指定根据WiFi连接状态和凭证决定
if mode not in ("4g", "wifi"):
self.logger.info("ota missing mode, auto-detecting...")
# 若本次会话已锁定 4G则 OTA 自动也走 4G避免后续回切导致体验不一致
if self._session_force_4g:
mode = "4g"
self.logger.info("ota auto-selected: 4g (session locked on 4g)")
else:
# 只有同时满足WiFi已连接 且 提供了WiFi凭证才使用WiFi
if self.is_wifi_connected() and ssid and password:
mode = "wifi"
self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)")
else:
mode = "4g"
self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)")
hardware_manager.stop_idle_timer() # 停表注意OTA停表之后就没有再开表因为OTA后面会重启会重新开表
if mode == "4g":
ota_manager._set_ota_url(ota_url) # 记录 OTA URL供命令7使用
ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
else: # mode == "wifi"
if not ssid or not password:
self.logger.error("ota wifi mode requires ssid and password")
self.safe_enqueue({"result": "missing_ssid_or_password"}, 2)
else:
ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url))
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
self.safe_enqueue({"result": "current_ip", "ip": ip}, 2)
# elif inner_cmd == 7:
# from ota_manager import ota_manager
# if ota_manager.update_thread_started:
# self.safe_enqueue({"result": "update_already_started"}, 2)
# continue
# try:
# ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
# except:
# ip = None
# if not ip:
# self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2)
# else:
# # 注意direct_ota_download 需要 ota_url 参数
# # 如果 ota_manager.ota_url 为 None需要从其他地方获取
# ota_url_to_use = ota_manager.ota_url
# if not ota_url_to_use:
# self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置")
# self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2)
# else:
# ota_manager._start_update_thread()
# _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,))
elif inner_cmd == 41:
self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}")
self._manual_trigger_flag = True
self.safe_enqueue({"result": "trigger_ack"}, 2)
hardware_manager.start_idle_timer() # 重新计时
elif inner_cmd == 42: # 关机命令
self.logger.info("[SHUTDOWN] 收到TCP关机命令准备关机...")
self.safe_enqueue({"result": "shutdown_ack"}, 2)
time.sleep_ms(1000)
self.disconnect_server()
# 尝试关闭4G模块
try:
with self.get_uart_lock():
hardware_manager.at_client.send("AT+CFUN=0", "OK", 5000)
except:
pass
time.sleep_ms(2000)
os.system("sync") # 刷新文件系统缓存到磁盘,防止数据丢失
time.sleep_ms(500)
# os.system("poweroff")
hardware_manager.power_off()
return
elif inner_cmd == 43: # 上传日志命令
# 格式: {"cmd":43, "data":{"ssid":"xxx","password":"xxx","url":"xxx", ...}}
inner_data = data_obj.get("data", {})
upload_url = inner_data.get("url")
wifi_ssid = inner_data.get("ssid")
wifi_password = inner_data.get("password")
include_rotated = inner_data.get("include_rotated", True)
max_files = inner_data.get("max_files")
archive_format = inner_data.get("archive", "tgz") # tgz 或 zip
hardware_manager.start_idle_timer() # 重新计时
if not upload_url:
self.logger.error("[LOG_UPLOAD] 缺少 url 参数")
self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"}, 2)
else:
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令目标URL: {upload_url}")
# 在新线程中执行上传,避免阻塞主循环
import _thread
_thread.start_new_thread(
self._upload_log_file,
(upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format)
)
else: # data的结构不是 dict
self.logger.info(f"[NET] body={body}, {time.time()}")
else:
self.logger.info(f"[NET] 未知数据 {body}, {time.time()}")
if _rx_login_fail:
break
if _rx_skip_tcp_iteration:
continue
else:
time.sleep_ms(5)