Files
archery/network.py

1078 lines
45 KiB
Python
Raw Normal View History

2026-01-20 11:25:17 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网络通信模块
提供TCP通信数据包打包/解析队列管理等功能
"""
import json
from math import e
import struct
from maix import time
import hmac
import hashlib
import ujson
import os
import threading
import socket
import config
from hardware import hardware_manager
from power import get_bus_voltage, voltage_to_percent
# from laser import laser_manager
# from ota import ota_manager
from logger_manager import logger_manager
class NetworkManager:
"""网络通信管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(NetworkManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有状态
self._tcp_connected = False
self._high_send_queue = []
self._normal_send_queue = []
self._queue_lock = threading.Lock()
self._uart4g_lock = threading.Lock()
self._device_id = None
self._password = None
self._raw_line_data = []
self._manual_trigger_flag = False
# WiFi 相关状态
self._network_type = None # "wifi" 或 "4g" 或 None
self._wifi_connected = False
self._wifi_ip = None
self._wifi_socket = None
self._wifi_socket_lock = threading.Lock()
self._prefer_wifi = True # 是否优先使用WiFi
self._wifi_recv_buffer = b"" # WiFi接收缓冲区
self._initialized = True
# ==================== 状态访问(只读属性)====================
@property
def tcp_connected(self):
"""TCP连接状态"""
return self._tcp_connected
@property
def device_id(self):
"""设备ID"""
return self._device_id
@property
def password(self):
"""密码"""
return self._password
@property
def has_pending_messages(self):
"""是否有待发送消息"""
with self._queue_lock:
return len(self._high_send_queue) > 0 or len(self._normal_send_queue) > 0
@property
def manual_trigger_flag(self):
"""手动触发标志"""
return self._manual_trigger_flag
@property
def network_type(self):
"""当前使用的网络类型("wifi""4g""""
return self._network_type
@property
def wifi_connected(self):
"""WiFi是否已连接"""
return self._wifi_connected
@property
def wifi_ip(self):
"""WiFi IP地址"""
return self._wifi_ip
# ==================== 内部状态管理方法 ====================
def set_manual_trigger(self, value=True):
"""设置手动触发标志(公共方法)"""
self._manual_trigger_flag = value
def clear_manual_trigger(self):
"""清除手动触发标志(公共方法)"""
self._manual_trigger_flag = False
def _set_tcp_connected(self, connected):
"""设置TCP连接状态内部方法"""
self._tcp_connected = connected
def _set_device_info(self, device_id, password):
"""设置设备信息(内部方法)"""
self._device_id = device_id
self._password = password
def _enqueue(self, item, high=False):
"""线程安全地加入队列(内部方法)"""
with self._queue_lock:
if high:
self._high_send_queue.append(item)
else:
self._normal_send_queue.append(item)
def _dequeue(self):
"""线程安全地从队列取出(内部方法)"""
with self._queue_lock:
if self._high_send_queue:
return self._high_send_queue.pop(0)
elif self._normal_send_queue:
return self._normal_send_queue.pop(0)
return None
def _set_raw_line_data(self, data):
"""设置原始行数据(内部方法)"""
self._raw_line_data = data
def _get_raw_line_data(self):
"""获取原始行数据(内部方法)"""
return self._raw_line_data
def get_uart_lock(self):
"""获取UART锁用于with语句"""
return self._uart4g_lock
def get_queue_lock(self):
"""获取队列锁用于with语句"""
return self._queue_lock
# ==================== 业务方法 ====================
def read_device_id(self):
"""从 /device_key 文件读取设备唯一 ID失败则使用默认值"""
try:
with open("/device_key", "r") as f:
device_id = f.read().strip()
if device_id:
logger = logger_manager.logger
if logger:
logger.debug(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
# 设置内部状态
self._device_id = device_id
self._password = device_id + "."
return device_id
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[ERROR] 无法读取 /device_key: {e}")
# 使用默认值
default_id = "DEFAULT_DEVICE_ID"
self._device_id = default_id
self._password = default_id + "."
return default_id
# ==================== WiFi 管理方法 ====================
def is_wifi_connected(self):
"""检查WiFi是否已连接"""
# 优先用 MaixPy network如果可用
try:
from maix import network
wlan = network.WLAN(network.TYPE_WIFI)
if wlan.isconnected():
self._wifi_connected = True
return True
except:
pass
# 兜底:看系统 wlan0 有没有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
self._wifi_connected = True
self._wifi_ip = ip
return True
except:
pass
self._wifi_connected = False
return False
def connect_wifi(self, ssid, password):
"""
连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录
Returns:
(ip, error): IP地址和错误信息成功时error为None
"""
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
try:
# 生成 wpa_supplicant 配置
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return None, "Failed to generate wpa config"
# 写入运行时配置
with open(conf_path, "w") as f:
f.write("ctrl_interface=/var/run/wpa_supplicant\n")
f.write("update_config=1\n\n")
f.write(net_conf)
# 持久化保存 SSID/PASS
with open(ssid_file, "w") as f:
f.write(ssid.strip())
with open(pass_file, "w") as f:
f.write(password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP
import time as std_time
for _ in range(20):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
self._wifi_connected = True
self._wifi_ip = ip
logger = logger_manager.logger
if logger:
logger.info(f"[WIFI] 已连接IP: {ip}")
return ip, None
std_time.sleep(1)
return None, "Timeout: No IP obtained"
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[WIFI] 连接失败: {e}")
return None, f"Exception: {str(e)}"
def is_server_reachable(self, host, port=80, timeout=5):
"""检查目标主机端口是否可达(用于网络检测)"""
try:
addr_info = socket.getaddrinfo(host, port)[0]
s = socket.socket(addr_info[0], addr_info[1], addr_info[2])
s.settimeout(timeout)
s.connect(addr_info[-1])
s.close()
return True
except Exception as e:
logger = logger_manager.logger
if logger:
logger.warning(f"[NET] 无法连接 {host}:{port} - {e}")
return False
# ==================== 网络选择策略 ====================
def select_network(self, prefer_wifi=None):
"""
自动选择网络WiFi优先
Args:
prefer_wifi: 是否优先使用WiFiNone表示使用默认策略
Returns:
"wifi" "4g" None无可用网络
"""
if prefer_wifi is None:
prefer_wifi = self._prefer_wifi
logger = logger_manager.logger
# 策略1如果指定优先WiFi且WiFi可用使用WiFi
if prefer_wifi and self.is_wifi_connected():
# 检查WiFi是否能连接到服务器
if self.is_server_reachable(config.SERVER_IP, config.SERVER_PORT, timeout=3):
self._network_type = "wifi"
if logger:
logger.info(f"[NET] 选择WiFi网络IP: {self._wifi_ip}")
return "wifi"
else:
if logger:
logger.warning("[NET] WiFi已连接但无法访问服务器尝试4G")
# 策略2如果WiFi可用使用WiFi
if self.is_wifi_connected():
if self.is_server_reachable(config.SERVER_IP, config.SERVER_PORT, timeout=3):
self._network_type = "wifi"
if logger:
logger.info(f"[NET] 选择WiFi网络IP: {self._wifi_ip}")
return "wifi"
# 策略3回退到4G
if logger:
logger.info("[NET] WiFi不可用或无法连接服务器使用4G网络")
self._network_type = "4g"
return "4g"
def safe_enqueue(self, data_dict, msg_type=2, high=False):
"""线程安全地将消息加入队列(公共方法)"""
self._enqueue((msg_type, data_dict), high)
def make_packet(self, msg_type: int, body_dict: dict) -> bytes:
"""打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文"""
body = json.dumps(body_dict).encode()
body_len = len(body)
checksum = body_len + msg_type
header = struct.pack(">III", body_len, msg_type, checksum)
return header + body
def parse_packet(self, data: bytes):
"""解析 TCP 数据包,返回 (类型, 正文字典)"""
if len(data) < 12:
return None, None
body_len, msg_type, checksum = struct.unpack(">III", data[:12])
body = data[12:12 + body_len]
try:
return msg_type, json.loads(body.decode())
except:
return msg_type, {"raw": body.decode(errors="ignore")}
def connect_server(self):
"""
连接到服务器自动选择WiFi或4G
Returns:
bool: 是否连接成功
"""
if self._tcp_connected:
# 检查当前连接是否仍然有效
if self._network_type == "wifi":
return self._check_wifi_connection()
elif self._network_type == "4g":
return True # 4G连接状态由AT命令维护
return False
# 自动选择网络
network_type = self.select_network()
if not network_type:
return False
logger = logger_manager.logger
if logger:
logger.info(f"连接到服务器(使用{network_type.upper()}...")
# 根据网络类型建立TCP连接
if network_type == "wifi":
return self._connect_tcp_via_wifi()
elif network_type == "4g":
return self._connect_tcp_via_4g()
return False
def _connect_tcp_via_wifi(self):
"""通过WiFi建立TCP连接"""
try:
# 创建TCP socket
self._wifi_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._wifi_socket.settimeout(5.0) # 5秒超时
# 连接到服务器
addr_info = socket.getaddrinfo(config.SERVER_IP, config.SERVER_PORT,
socket.AF_INET, socket.SOCK_STREAM)[0]
self._wifi_socket.connect(addr_info[-1])
# 设置非阻塞模式(用于接收数据)
self._wifi_socket.setblocking(False)
self._tcp_connected = True
logger = logger_manager.logger
if logger:
logger.info("[WIFI-TCP] TCP连接已建立")
return True
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[WIFI-TCP] 连接失败: {e}")
if self._wifi_socket:
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
return False
def _connect_tcp_via_4g(self):
"""通过4G模块建立TCP连接"""
with self.get_uart_lock():
hardware_manager.at_client.send("AT+MIPCLOSE=0", "OK", 1000)
res = hardware_manager.at_client.send(f'AT+MIPOPEN=0,"TCP","{config.SERVER_IP}",{config.SERVER_PORT}', "+MIPOPEN", 8000)
if "+MIPOPEN: 0,0" in res:
self._tcp_connected = True
return True
else:
logger = logger_manager.logger
logger.error(f"[4G-TCP] 连接失败: {res}")
return False
def _check_wifi_connection(self):
"""检查WiFi TCP连接是否仍然有效"""
if not self._wifi_socket:
return False
try:
# 尝试发送0字节来检测连接状态
self._wifi_socket.send(b"", socket.MSG_DONTWAIT)
return True
except:
# socket已断开
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
self._tcp_connected = False
return False
def disconnect_server(self):
"""断开TCP连接"""
if self._tcp_connected:
logger = logger_manager.logger
if logger:
logger.info("与服务器断开链接")
if self._network_type == "wifi":
self._disconnect_tcp_via_wifi()
elif self._network_type == "4g":
self._disconnect_tcp_via_4g()
self._tcp_connected = False
self._network_type = None
def _disconnect_tcp_via_wifi(self):
"""断开WiFi TCP连接"""
with self._wifi_socket_lock:
if self._wifi_socket:
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
def _disconnect_tcp_via_4g(self):
"""断开4G TCP连接"""
with self.get_uart_lock():
hardware_manager.at_client.send("AT+MIPCLOSE=0", "OK", 1000)
def tcp_send_raw(self, data: bytes, max_retries=2) -> bool:
"""
统一的TCP发送接口自动选择WiFi或4G
Args:
data: 要发送的数据
max_retries: 最大重试次数
Returns:
bool: 是否发送成功
"""
if not self._tcp_connected:
return False
# 根据网络类型选择发送方式
if self._network_type == "wifi":
return self._tcp_send_raw_via_wifi(data, max_retries)
elif self._network_type == "4g":
return self._tcp_send_raw_via_4g(data, max_retries)
else:
logger = logger_manager.logger
if logger:
logger.error("[NET] 未选择网络类型,无法发送数据")
return False
def _tcp_send_raw_via_wifi(self, data: bytes, max_retries=2) -> bool:
"""通过WiFi socket发送TCP数据"""
if not self._wifi_socket:
return False
with self._wifi_socket_lock:
for attempt in range(max_retries):
try:
# 标准socket发送
total_sent = 0
while total_sent < len(data):
sent = self._wifi_socket.send(data[total_sent:])
if sent == 0:
# socket连接已断开
logger = logger_manager.logger
if logger:
logger.warning(f"[WIFI-TCP] 发送失败socket已断开尝试 {attempt+1}/{max_retries}")
break
total_sent += sent
if total_sent == len(data):
return True
# 发送不完整,重试
time.sleep_ms(50)
except OSError as e:
logger = logger_manager.logger
if logger:
logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt+1}/{max_retries}")
time.sleep_ms(50)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt+1}/{max_retries}")
time.sleep_ms(50)
return False
def _tcp_send_raw_via_4g(self, data: bytes, max_retries=2) -> bool:
"""通过4G模块发送TCP数据"""
with self.get_uart_lock():
for _ in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in hardware_manager.at_client.send(cmd, ">", 2000):
time.sleep_ms(50)
continue
total = 0
while total < len(data):
n = hardware_manager.uart4g.write(data[total:])
if not n or n < 0:
time.sleep_ms(1)
continue
total += n
hardware_manager.uart4g.write(b"\x1A")
r = hardware_manager.at_client.send("", "OK", 8000)
if ("SEND OK" in r) or ("OK" in r) or ("+MIPSEND" in r):
return True
time.sleep_ms(50)
return False
def receive_tcp_data_via_wifi(self, timeout_ms=100):
"""
通过WiFi接收TCP数据
Args:
timeout_ms: 超时时间毫秒
Returns:
bytes: 接收到的数据如果没有数据则返回 b""
"""
if not self._wifi_socket:
return b""
try:
# 设置接收超时
self._wifi_socket.settimeout(timeout_ms / 1000.0)
# 尝试接收数据
data = self._wifi_socket.recv(4096) # 每次最多接收4KB
return data
except socket.timeout:
# 超时是正常的,表示没有数据
return b""
except OSError as e:
# socket错误连接断开等
logger = logger_manager.logger
if logger:
logger.warning(f"[WIFI-TCP] 接收数据失败: {e}")
# 关闭socket
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
self._tcp_connected = False
return b""
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[WIFI-TCP] 接收数据异常: {e}")
return b""
def generate_token(self, device_id):
"""生成用于 HTTP 接口鉴权的 TokenHMAC-SHA256"""
SALT = "shootMessageFire"
SALT2 = "shoot"
return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
def send_http_cmd(self, cmd_str, timeout_ms=3000):
"""发送 HTTP 相关 AT 指令(调试用)"""
logger = logger_manager.logger
if logger:
logger.debug(f"[HTTP AT] => {cmd_str}")
return hardware_manager.at_client.send(cmd_str, "OK", timeout_ms)
def upload_shoot_event(self,json_data):
"""通过 4G 模块上报射击事件到 HTTP 接口(备用通道)"""
token = self.generate_token(self.device_id)
if not self.send_http_cmd(f'AT+MHTTPCREATE="{config.HTTP_URL}"'):
return False
instance_id = 0
self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"')
self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"')
self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {self.device_id}"')
json_str = ujson.dumps(json_data)
if not self.send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'):
return False
if self.send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{config.HTTP_API_PATH}"'):
time.sleep_ms(5000)
return True
return False
def tcp_main(self):
"""TCP 主通信循环:登录、心跳、处理指令、发送数据"""
import _thread
from maix import camera
logger = logger_manager.logger
if logger:
logger.info("[NET] TCP主线程启动")
send_hartbeat_fail_count = 0
last_charging_check = 0
CHARGING_CHECK_INTERVAL = 5000 # 5秒检查一次充电状态
while True:
try:
# 检查充电状态每5秒检查一次
current_time = time.ticks_ms()
if current_time - last_charging_check > CHARGING_CHECK_INTERVAL:
last_charging_check = current_time
# OTA 期间不要 connect/登录/心跳/发送
try:
from ota_manager import ota_manager
if ota_manager.ota_in_progress:
time.sleep_ms(200)
continue
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[NET] OTA检查异常: {e}")
time.sleep_ms(200)
continue
if not self.connect_server():
time.sleep_ms(5000)
continue
# 发送登录包
vol_val = get_bus_voltage()
login_data = {
"deviceId": self.device_id,
"password": self.password,
"version": config.APP_VERSION,
"vol": vol_val,
"vol_per": voltage_to_percent(vol_val)
}
if not self.tcp_send_raw(self.make_packet(1, login_data)):
self._tcp_connected = False
time.sleep_ms(2000)
continue
if logger:
logger.info("➡️ 登录包已发送,等待确认...")
logged_in = False
pending_cleared = False
last_heartbeat_ack_time = time.ticks_ms()
last_heartbeat_send_time = time.ticks_ms()
while True:
# OTA 期间暂停 TCP 活动
try:
from ota_manager import ota_manager
if ota_manager.ota_in_progress:
time.sleep_ms(200)
continue
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[NET] OTA检查异常: {e}")
time.sleep_ms(200)
continue
# 接收数据(根据网络类型选择接收方式)
item = None
if self._network_type == "wifi":
# WiFi接收数据
data = self.receive_tcp_data_via_wifi(timeout_ms=50)
if data:
# 将数据添加到缓冲区
self._wifi_recv_buffer += data
# 尝试从缓冲区解析完整的数据包
while len(self._wifi_recv_buffer) >= 12: # 至少需要12字节的头部
# 解析头部
try:
body_len, msg_type, checksum = struct.unpack(">III", self._wifi_recv_buffer[:12])
total_len = 12 + body_len
if len(self._wifi_recv_buffer) >= total_len:
# 有完整的数据包
payload = self._wifi_recv_buffer[:total_len]
self._wifi_recv_buffer = self._wifi_recv_buffer[total_len:]
item = (0, payload) # link_id=0 for WiFi
break
else:
# 数据包不完整,等待更多数据
break
except:
# 解析失败,清空缓冲区
self._wifi_recv_buffer = b""
break
elif self._network_type == "4g":
# 4G接收数据
item = hardware_manager.at_client.pop_tcp_payload()
if item:
if isinstance(item, tuple) and len(item) == 2:
link_id, payload = item
else:
link_id, payload = 0, item
if not logged_in:
try:
if logger:
logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
except:
pass
msg_type, body = self.parse_packet(payload)
# 处理登录响应
if not logged_in and msg_type == 1:
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
if logger:
logger.info("登录成功")
# 检查 ota_pending.json
try:
pending_path = f"{config.APP_DIR}/ota_pending.json"
if os.path.exists(pending_path):
try:
with open(pending_path, "r", encoding="utf-8") as f:
pending_obj = json.load(f)
except:
pending_obj = {}
self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
if logger:
logger.info("[OTA] 已上报 ota_ok等待心跳确认后删除 pending")
except Exception as e:
if logger:
logger.error(f"[OTA] ota_ok 上报失败: {e}")
else:
break
# 处理心跳 ACK
elif logged_in and msg_type == 4:
last_heartbeat_ack_time = time.ticks_ms()
if logger:
logger.debug("✅ 收到心跳确认")
# 处理命令40分片下载
elif logged_in and msg_type == 40:
if isinstance(body, dict):
t = body.get('t', 0)
v = body.get('v')
# 如果是第一个分片,清空之前的缓存
if len(self._raw_line_data) == 0 or (len(self._raw_line_data) > 0 and self._raw_line_data[0].get('v') != v):
self._raw_line_data.clear()
# 或者更简单每次收到命令40时如果版本号不同清空缓存
if len(self._raw_line_data) > 0:
first_v = self._raw_line_data[0].get('v')
if first_v and first_v != v:
self._raw_line_data.clear()
self._raw_line_data.append(body)
if len(self._raw_line_data) >= int(t):
if logger:
logger.info(f"下载完成")
from ota_manager import ota_manager
stock_array = list(map(lambda x: x.get('d'), self._raw_line_data))
local_filename = config.LOCAL_FILENAME
with open(local_filename, 'w', encoding='utf-8') as file:
file.write("\n".join(stock_array))
ota_manager.apply_ota_and_reboot(None, local_filename)
else:
self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41})
if logger:
logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}")
# 处理业务指令
elif logged_in and isinstance(body, dict):
inner_cmd = None
data_obj = body.get("data")
if isinstance(data_obj, dict):
inner_cmd = data_obj.get("cmd")
if inner_cmd == 2: # 开启激光并校准
from laser_manager import laser_manager
if not laser_manager.calibration_active:
laser_manager.turn_on_laser()
time.sleep_ms(100)
laser_manager.start_calibration()
self.safe_enqueue({"result": "calibrating"}, 2)
elif inner_cmd == 3: # 关闭激光
from laser_manager import laser_manager
laser_manager.turn_off_laser()
laser_manager.stop_calibration()
self.safe_enqueue({"result": "laser_off"}, 2)
elif inner_cmd == 4: # 上报电量
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
self.safe_enqueue(battery_data, 2)
if logger:
logger.info(f"电量上报: {battery_percent}%")
elif inner_cmd == 5: # OTA 升级
inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
ssid = inner_data.get("ssid")
password = inner_data.get("password")
ota_url = inner_data.get("url")
mode = (inner_data.get("mode") or "").strip().lower()
if not ota_url:
if logger:
logger.error("ota missing_url")
self.safe_enqueue({"result": "missing_url"}, 2)
continue
from ota_manager import ota_manager
if ota_manager.update_thread_started:
self.safe_enqueue({"result": "update_already_started"}, 2)
continue
# 自动判断模式如果没有明确指定根据WiFi连接状态和凭证决定
if mode not in ("4g", "wifi"):
if logger:
logger.info("ota missing mode, auto-detecting...")
# 只有同时满足WiFi已连接 且 提供了WiFi凭证才使用WiFi
if self.is_wifi_connected() and ssid and password:
mode = "wifi"
if logger:
logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)")
else:
mode = "4g"
if logger:
logger.info("ota auto-selected: 4g (WiFi not available or no credentials)")
if mode == "4g":
ota_manager._set_ota_url(ota_url) # 记录 OTA URL供命令7使用
ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
else: # mode == "wifi"
if not ssid or not password:
if logger:
logger.error("ota wifi mode requires ssid and password")
self.safe_enqueue({"result": "missing_ssid_or_password"}, 2)
else:
ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url))
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
self.safe_enqueue({"result": "current_ip", "ip": ip}, 2)
elif inner_cmd == 7:
from ota_manager import ota_manager
if ota_manager.update_thread_started:
self.safe_enqueue({"result": "update_already_started"}, 2)
continue
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2)
else:
# 注意direct_ota_download 需要 ota_url 参数
# 如果 ota_manager.ota_url 为 None需要从其他地方获取
ota_url_to_use = ota_manager.ota_url
if not ota_url_to_use:
if logger:
logger.error("[OTA] cmd=7 但 OTA_URL 未设置")
self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2)
else:
ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,))
elif inner_cmd == 41:
if logger:
logger.info("[TEST] 收到TCP射箭触发命令")
self._manual_trigger_flag = True
self.safe_enqueue({"result": "trigger_ack"}, 2)
elif inner_cmd == 42: # 关机命令
if logger:
logger.info("[SHUTDOWN] 收到TCP关机命令准备关机...")
self.safe_enqueue({"result": "shutdown_ack"}, 2)
time.sleep_ms(1000)
self.disconnect_server()
# 尝试关闭4G模块
try:
with self.get_uart_lock():
hardware_manager.at_client.send("AT+CFUN=0", "OK", 5000)
except:
pass
time.sleep_ms(2000)
os.system("poweroff")
return
else:
time.sleep_ms(5)
# 发送队列中的业务数据
if logged_in and (self._high_send_queue or self._normal_send_queue):
msg_type = None
data_dict = None
if self.get_queue_lock().acquire(blocking=False):
try:
if self._high_send_queue:
msg_type, data_dict = self._high_send_queue.pop(0)
elif self._normal_send_queue:
msg_type, data_dict = self._normal_send_queue.pop(0)
finally:
self.get_queue_lock().release()
if msg_type is not None and data_dict is not None:
pkt = self.make_packet(msg_type, data_dict)
if not self.tcp_send_raw(pkt):
self._tcp_connected = False
break
# 发送激光校准结果
if logged_in:
from laser_manager import laser_manager
result = laser_manager.get_calibration_result()
if result:
x, y = result
self.safe_enqueue({"result": "ok", "x": x, "y": y}, 2)
# 定期发送心跳
current_time = time.ticks_ms()
if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000:
vol_val = get_bus_voltage()
if not self.tcp_send_raw(self.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
if logger:
logger.error("心跳发送失败")
time.sleep_ms(3000)
send_hartbeat_fail_count += 1
if send_hartbeat_fail_count >= 3:
send_hartbeat_fail_count = 0
if logger:
logger.error("连续3次发送心跳失败重连")
break
else:
continue
else:
send_hartbeat_fail_count = 0
last_heartbeat_send_time = current_time
if logger:
logger.debug("心跳已发送")
# 删除 pending 文件(心跳发送成功后)
if not pending_cleared:
try:
pending_path = f"{config.APP_DIR}/ota_pending.json"
if os.path.exists(pending_path):
try:
os.remove(pending_path)
pending_cleared = True
if logger:
logger.info("[OTA] 心跳发送成功,已删除 ota_pending.json")
except Exception as e:
if logger:
logger.error(f"[OTA] 删除 pending 文件失败: {e}")
except Exception as e:
if logger:
logger.error(f"[OTA] 检查 pending 文件时出错: {e}")
# 心跳超时重连
if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10:
if logger:
logger.error("十分钟无心跳ACK重连")
break
time.sleep_ms(50)
self._tcp_connected = False
if logger:
logger.error("连接异常2秒后重连...")
time.sleep_ms(2000)
except Exception as e:
# TCP主循环的顶层异常捕获防止线程静默退出
logger = logger_manager.logger
if logger:
logger.error(f"[NET] TCP主循环异常: {e}")
import traceback
logger.error(traceback.format_exc())
else:
print(f"[NET] TCP主循环异常: {e}")
import traceback
traceback.print_exc()
self._tcp_connected = False
time.sleep_ms(5000) # 等待5秒后重试连接
# 创建全局单例实例
network_manager = NetworkManager()
# ==================== 向后兼容的函数接口 ====================
def tcp_main():
"""TCP主循环向后兼容接口"""
return network_manager.tcp_main()
def read_device_id():
"""读取设备ID向后兼容接口"""
return network_manager.read_device_id()
def safe_enqueue(data_dict, msg_type=2, high=False):
"""线程安全地加入队列(向后兼容接口)"""
return network_manager.safe_enqueue(data_dict, msg_type, high)
def connect_server():
"""连接服务器(向后兼容接口)"""
return network_manager.connect_server()
def disconnet_server():
"""断开服务器连接(向后兼容接口)"""
return network_manager.disconnect_server()
def is_wifi_connected():
"""检查WiFi是否已连接向后兼容接口"""
return network_manager.is_wifi_connected()
def connect_wifi(ssid, password):
"""连接WiFi向后兼容接口"""
return network_manager.connect_wifi(ssid, password)
def is_server_reachable(host, port=80, timeout=5):
"""检查服务器是否可达(向后兼容接口)"""
return network_manager.is_server_reachable(host, port, timeout)