Files
archery/network.py
gcw_4spBpAfv 61096ba190 'v1.2.3'
2026-02-05 12:45:52 +08:00

1233 lines
54 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
网络通信模块
提供TCP通信、数据包打包/解析、队列管理等功能
"""
import json
from math import e
import struct
from maix import time
import hmac
import hashlib
import ujson
import os
import threading
import socket
import config
from hardware import hardware_manager
from power import get_bus_voltage, voltage_to_percent
# from laser import laser_manager
# from ota import ota_manager
from logger_manager import logger_manager
class NetworkManager:
    """Network communication manager, exposed as a process-wide singleton."""

    # The single shared instance; allocated lazily by __new__.
    _instance = None

    def __new__(cls):
        """Return the shared instance, allocating it on first use.

        A newly allocated instance is tagged ``_initialized = False`` so that
        ``__init__`` can tell it still has to perform its one-time setup.
        """
        instance = cls._instance
        if instance is None:
            instance = super().__new__(cls)
            instance._initialized = False
            cls._instance = instance
        return instance
def __init__(self):
if self._initialized:
return
# 私有状态
self._tcp_connected = False
self._high_send_queue = []
self._normal_send_queue = []
self._queue_lock = threading.Lock()
self._uart4g_lock = threading.Lock()
self._device_id = None
self._password = None
self._raw_line_data = []
self._manual_trigger_flag = False
# WiFi 相关状态
self._network_type = None # "wifi" 或 "4g" 或 None
self._wifi_connected = False
self._wifi_ip = None
self._wifi_socket = None
self._wifi_socket_lock = threading.Lock()
self._prefer_wifi = True # 是否优先使用WiFi
self._wifi_recv_buffer = b"" # WiFi接收缓冲区
self._initialized = True
# 导入 archery_netcore 模块,并检查是否存在 parse_packet 和 make_packet 函数
try:
import archery_netcore as _netcore
self._netcore = _netcore
if hasattr(self._netcore, "parse_packet") and hasattr(self._netcore, "make_packet") and hasattr(self._netcore, "actions_for_inner_cmd"):
print("[NET] archery_netcore found")
else:
print("[NET] archery_netcore not found parse_packet or make_packet")
exit(1)
except Exception:
print("[NET] import archery_netcore failed")
exit(1)
# 服务器相关
self._server_ip = self._netcore.get_config().get("SERVER_IP")
self._server_port = self._netcore.get_config().get("SERVER_PORT")
# ==================== 状态访问(只读属性)====================
@property
def logger(self):
    """The logger object currently held by ``logger_manager``."""
    return logger_manager.logger
@property
def tcp_connected(self):
    """Read-only view of the TCP connection flag."""
    return self._tcp_connected
@property
def device_id(self):
    """Read-only view of the stored device ID."""
    return self._device_id
@property
def password(self):
    """Read-only view of the stored login password."""
    return self._password
@property
def has_pending_messages(self):
    """True when either send queue holds at least one message.

    Both queues are inspected while holding the queue lock.
    """
    with self._queue_lock:
        return bool(self._high_send_queue or self._normal_send_queue)
@property
def manual_trigger_flag(self):
    """Read-only view of the manual-trigger flag."""
    return self._manual_trigger_flag
@property
def network_type(self):
    """Network currently selected: ``"wifi"``, ``"4g"`` or ``None``."""
    # The previous docstring ended in four double quotes, which left an
    # unterminated string literal behind the closing triple quote.
    return self._network_type
@property
def wifi_connected(self):
    """Read-only view of the cached WiFi-connected flag."""
    return self._wifi_connected
@property
def wifi_ip(self):
    """Read-only view of the cached WiFi IP address."""
    return self._wifi_ip
# ==================== 内部状态管理方法 ====================
def set_manual_trigger(self, value=True):
    """Store *value* as the manual-trigger flag (public setter)."""
    self._manual_trigger_flag = value
def clear_manual_trigger(self):
    """Reset the manual-trigger flag to False (public)."""
    self._manual_trigger_flag = False
def _set_tcp_connected(self, connected):
"""设置TCP连接状态内部方法"""
self._tcp_connected = connected
def _set_device_info(self, device_id, password):
"""设置设备信息(内部方法)"""
self._device_id = device_id
self._password = password
def _enqueue(self, item, high=False):
"""线程安全地加入队列(内部方法)"""
with self._queue_lock:
if high:
self._high_send_queue.append(item)
else:
self._normal_send_queue.append(item)
def _dequeue(self):
"""线程安全地从队列取出(内部方法)"""
with self._queue_lock:
if self._high_send_queue:
return self._high_send_queue.pop(0)
elif self._normal_send_queue:
return self._normal_send_queue.pop(0)
return None
def _set_raw_line_data(self, data):
"""设置原始行数据(内部方法)"""
self._raw_line_data = data
def _get_raw_line_data(self):
"""获取原始行数据(内部方法)"""
return self._raw_line_data
def get_uart_lock(self):
    """Return the 4G UART lock object, intended for use in a ``with`` block."""
    return self._uart4g_lock
def get_queue_lock(self):
    """Return the send-queue lock object, intended for use in a ``with`` block."""
    return self._queue_lock
# ==================== 业务方法 ====================
def read_device_id(self):
    """Load the device ID from ``/device_key`` and derive the password.

    On success the stripped file content becomes the device ID and the
    password is that ID followed by ``"."``. If the file cannot be read, or
    is empty after stripping, ``"DEFAULT_DEVICE_ID"`` is used instead.

    Returns:
        The device ID that was stored.
    """
    try:
        with open("/device_key", "r") as key_file:
            device_id = key_file.read().strip()
        if device_id:
            self.logger.debug(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
            self._device_id = device_id
            self._password = device_id + "."
            return device_id
    except Exception as e:
        self.logger.error(f"[ERROR] 无法读取 /device_key: {e}")

    # Fallback when no usable key was read.
    fallback_id = "DEFAULT_DEVICE_ID"
    self._device_id = fallback_id
    self._password = fallback_id + "."
    return fallback_id
# ==================== WiFi 管理方法 ====================
def is_wifi_connected(self):
    """Report whether WiFi is up, refreshing the cached WiFi state.

    Two probes are tried in order:
      1. MaixPy's ``network.WLAN(...).isconnected()``, if importable.
      2. Whether ``ifconfig wlan0`` reports an ``inet`` address; when it
         does, that address is also cached as the WiFi IP.

    Both probes are best-effort: an ordinary failure in one just moves on to
    the next. Only ``Exception`` is suppressed, so ``KeyboardInterrupt`` and
    ``SystemExit`` still propagate.

    Returns:
        True if either probe reports a connection, else False.
    """
    # Probe 1: MaixPy network API.
    try:
        from maix import network
        wlan = network.WLAN(network.TYPE_WIFI)
        if wlan.isconnected():
            self._wifi_connected = True
            return True
    except Exception:
        pass  # MaixPy missing or query failed; fall through to probe 2.

    # Probe 2: does the wlan0 interface hold an IPv4 address?
    try:
        ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
        if ip:
            self._wifi_connected = True
            self._wifi_ip = ip
            return True
    except Exception:
        pass  # Treat a failed shell probe as "not connected".

    self._wifi_connected = False
    return False
def connect_wifi(self, ssid, password):
    """Join a WiFi network and persist its credentials under ``/boot``.

    Steps: generate a ``wpa_supplicant`` network block with
    ``wpa_passphrase``, write ``/etc/wpa_supplicant.conf``, save the SSID and
    password to ``/boot/wifi.ssid`` / ``/boot/wifi.pass``, restart the WiFi
    init script, then poll ``wlan0`` once per second (50 attempts) for an IP.

    Args:
        ssid: Network name.
        password: Network passphrase.

    Returns:
        ``(ip, error)``: on success ``error`` is None; on failure ``ip`` is
        None and ``error`` is a short description.
    """
    import subprocess
    import time as std_time  # stdlib time; the module-level ``time`` is maix's

    conf_path = "/etc/wpa_supplicant.conf"  # runtime wpa_supplicant config
    ssid_file = "/boot/wifi.ssid"  # persisted SSID
    pass_file = "/boot/wifi.pass"  # persisted passphrase
    try:
        # SECURITY: ssid/password can come from remote commands, so they are
        # passed as an argument vector. Interpolating them into a shell
        # string allowed command injection via quotes in the SSID.
        try:
            completed = subprocess.run(
                ["wpa_passphrase", ssid, password],
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
                universal_newlines=True,
            )
            net_conf = completed.stdout
        except OSError:
            # Tool missing or not executable: same outcome as empty output.
            net_conf = ""
        if "network={" not in net_conf:
            return None, "Failed to generate wpa config"

        # Write the runtime configuration.
        with open(conf_path, "w") as f:
            f.write("ctrl_interface=/var/run/wpa_supplicant\n")
            f.write("update_config=1\n\n")
            f.write(net_conf)

        # Persist the credentials.
        with open(ssid_file, "w") as f:
            f.write(ssid.strip())
        with open(pass_file, "w") as f:
            f.write(password.strip())

        # Restart the WiFi service so the new configuration is applied.
        os.system("/etc/init.d/S30wifi restart")

        # Wait for wlan0 to obtain an address.
        for _ in range(50):
            ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
            if ip:
                self._wifi_connected = True
                self._wifi_ip = ip
                self.logger.info(f"[WIFI] 已连接IP: {ip}")
                return ip, None
            std_time.sleep(1)
        return None, "Timeout: No IP obtained"
    except Exception as e:
        self.logger.error(f"[WIFI] 连接失败: {e}")
        return None, f"Exception: {str(e)}"
def is_server_reachable(self, host, port=80, timeout=5):
    """Check whether a TCP connection to ``host:port`` can be opened.

    Used as a network reachability probe. The probe socket is always closed,
    whether or not the connection attempt succeeds.

    Args:
        host: Host name or IP address.
        port: TCP port.
        timeout: Timeout in seconds applied to each connection attempt.

    Returns:
        True if a connection was established, else False (the failure is
        logged as a warning).
    """
    try:
        # create_connection() resolves the host for TCP and tries each
        # resolved address in turn; the ``with`` block closes the socket.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except Exception as e:
        self.logger.warning(f"[NET] 无法连接 {host}:{port} - {e}")
        return False
# ==================== 网络选择策略 ====================
def select_network(self, prefer_wifi=None):
    """Pick the network to use and record it in ``_network_type``.

    WiFi is chosen when it is connected and the server answers on it;
    otherwise the method falls back to 4G. WiFi availability and server
    reachability are each probed once per call.

    Args:
        prefer_wifi: Overrides the instance default when not None. When
            true, choosing WiFi also sets ``TZ=Asia/Shanghai`` and runs
            ``ntpdate pool.ntp.org``, and a connected-but-unreachable WiFi
            is logged as a warning.

    Returns:
        ``"wifi"`` or ``"4g"`` (this implementation never returns None).
    """
    if prefer_wifi is None:
        prefer_wifi = self._prefer_wifi

    if self.is_wifi_connected():
        if self.is_server_reachable(self._server_ip, self._server_port, timeout=3):
            self._network_type = "wifi"
            self.logger.info(f"[NET] 选择WiFi网络IP: {self._wifi_ip}")
            if prefer_wifi:
                # Clock sync is only performed on the preferred-WiFi path.
                os.environ["TZ"] = "Asia/Shanghai"
                os.system("ntpdate pool.ntp.org")
            return "wifi"
        if prefer_wifi:
            self.logger.warning("[NET] WiFi已连接但无法访问服务器尝试4G")

    # Fall back to 4G.
    self.logger.info("[NET] WiFi不可用或无法连接服务器使用4G网络")
    self._network_type = "4g"
    return "4g"
def safe_enqueue(self, data_dict, msg_type=2, high=False):
    """Queue a message for sending (public).

    The queued entry is the tuple ``(msg_type, data_dict)``; *high* selects
    the high-priority queue. Locking is handled by ``_enqueue``.
    """
    entry = (msg_type, data_dict)
    self._enqueue(entry, high)
def make_packet(self, msg_type: int, body_dict: dict) -> bytes:
    """Serialize a message into the wire format.

    Layout: a 12-byte big-endian header of three unsigned 32-bit integers
    (body length, message type, checksum = body length + message type),
    followed by the JSON-encoded body.
    """
    payload = json.dumps(body_dict).encode()
    size = len(payload)
    return struct.pack(">III", size, msg_type, size + msg_type) + payload
def parse_packet(self, data: bytes):
    """Decode one wire packet into ``(msg_type, body)``.

    The packet is a 12-byte big-endian header (body length, message type,
    checksum) followed by a JSON body. The checksum field is read but not
    verified.

    Returns:
        ``(None, None)`` if *data* is shorter than the header or than the
        length the header announces. Otherwise ``(msg_type, body)`` where
        *body* is the parsed JSON, or ``{"raw": <text>}`` when the body is
        not valid UTF-8 JSON. Bytes beyond the first packet are ignored
        (a warning is logged).
    """
    header_len = 12
    if len(data) < header_len:
        return None, None

    body_len, msg_type, _checksum = struct.unpack(">III", data[:header_len])
    expected_len = header_len + body_len

    # Longer than announced: probably several packets stuck together.
    # Only the first one is parsed; the remainder is not returned.
    if len(data) > expected_len:
        self.logger.warning(
            f"[TCP] parse_packet: data length ({len(data)}) > expected ({expected_len}), "
            f"possible packet concatenation. body_len={body_len}, msg_type={msg_type}"
        )

    # Shorter than announced: the packet is incomplete.
    if len(data) < expected_len:
        self.logger.warning(
            f"[TCP] parse_packet: data length ({len(data)}) < expected ({expected_len}), "
            f"incomplete packet. body_len={body_len}, msg_type={msg_type}"
        )
        return None, None

    body = data[header_len:expected_len]
    try:
        return msg_type, json.loads(body.decode())
    except (ValueError, RecursionError):
        # UnicodeDecodeError and JSONDecodeError are both ValueError
        # subclasses; RecursionError covers pathologically nested input.
        return msg_type, {"raw": body.decode(errors="ignore")}
def connect_server(self):
    """Ensure there is a TCP connection to the server.

    If the connection flag is already set, the result depends on the active
    network: WiFi re-checks the socket, 4G is reported as connected without
    a check, and any other network type yields False. Otherwise a network
    is selected and the matching connect routine is invoked.

    Returns:
        Whether a connection is (now) established.
    """
    if self._tcp_connected:
        if self._network_type == "wifi":
            # Verify the existing WiFi socket is still usable.
            return self._check_wifi_connection()
        # A 4G link is reported as up without probing; anything else is not.
        return self._network_type == "4g"

    network_type = self.select_network()
    if not network_type:
        return False
    self.logger.info(f"连接到服务器,使用{network_type.upper()}...")

    # Open the TCP link over whichever network was selected.
    if network_type == "4g":
        return self._connect_tcp_via_4g()
    if network_type == "wifi":
        return self._connect_tcp_via_wifi()
    return False
def _connect_tcp_via_wifi(self):
    """Open the TCP connection to the server over WiFi.

    Any socket left from a previous session is closed first, and the WiFi
    receive buffer is emptied so that leftover bytes from the old stream
    cannot be mistaken for the start of a packet on the new one.

    Returns:
        True on success; False on any failure (the socket is released).
    """
    # Release a leftover socket instead of silently overwriting it.
    stale_socket = self._wifi_socket
    if stale_socket is not None:
        try:
            stale_socket.close()
        except Exception:
            pass  # best effort; a fresh socket is created below
        self._wifi_socket = None

    try:
        self._wifi_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._wifi_socket.settimeout(5.0)  # 5 s connect timeout

        # NOTE(review): this connects to config.SERVER_IP/SERVER_PORT while
        # select_network() probes self._server_ip/_server_port; confirm the
        # two sources are meant to name the same endpoint.
        addr_info = socket.getaddrinfo(config.SERVER_IP, config.SERVER_PORT,
                                       socket.AF_INET, socket.SOCK_STREAM)[0]
        self._wifi_socket.connect(addr_info[-1])

        # Switch to non-blocking mode for the receive path.
        self._wifi_socket.setblocking(False)
        self._wifi_recv_buffer = b""  # new stream: drop old partial data
        self._tcp_connected = True
        self.logger.info("[WIFI-TCP] TCP连接已建立")
        return True
    except Exception as e:
        self.logger.error(f"[WIFI-TCP] 连接失败: {e}")
        if self._wifi_socket is not None:
            try:
                self._wifi_socket.close()
            except Exception:
                pass
            self._wifi_socket = None
        return False
def _connect_tcp_via_4g(self):
    """Open the TCP connection through the 4G module's AT interface.

    While holding the UART lock: close the link ID, optionally run the SSL
    configuration (``config.USE_TCP_SSL``), then issue ``AT+MIPOPEN`` and
    look for ``+MIPOPEN: <link_id>,0`` in the reply.

    Returns:
        True if the module reported the link as opened, else False.
    """
    link_id = getattr(config, "TCP_LINK_ID", 0)
    use_ssl = getattr(config, "USE_TCP_SSL", False)
    host = self._server_ip
    port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT
    # Optional MIPOPEN suffix; read but not used by the command built below.
    tail = getattr(config, "MIPOPEN_TAIL", "")

    with self.get_uart_lock():
        # Close the link first so a stale one does not block the open.
        resp = hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000)
        self.logger.info(f"[4G-TCP] AT+MIPCLOSE={link_id} response: {resp}")

        if use_ssl and not self._configure_ssl_before_connect(link_id):
            return False

        # Manual form with a tail would be: AT+MIPOPEN=1,"TCP","host",443,,0
        cmd = f'AT+MIPOPEN={link_id},"TCP","{host}",{port}'
        res = hardware_manager.at_client.send(cmd, "+MIPOPEN", 8000)
        self.logger.info(f"[4G-TCP] {cmd} response: {res}")

        opened = f"+MIPOPEN: {link_id},0" in res
        if opened:
            self._tcp_connected = True
        return opened
def _check_wifi_connection(self):
"""检查WiFi TCP连接是否仍然有效"""
if not self._wifi_socket:
return False
try:
# 尝试发送0字节来检测连接状态
self._wifi_socket.send(b"", socket.MSG_DONTWAIT)
return True
except:
# socket已断开
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
self._tcp_connected = False
return False
def disconnect_server(self):
    """Tear down the TCP connection if one is flagged as open.

    Calls the disconnect routine for the active network, then clears the
    connection flag and the network type. Does nothing when not connected.
    """
    # NOTE(review): nesting reconstructed from an indentation-stripped
    # listing; both resets are assumed to sit inside the connected branch.
    if not self._tcp_connected:
        return

    self.logger.info("与服务器断开链接")
    active_network = self._network_type
    if active_network == "wifi":
        self._disconnect_tcp_via_wifi()
    elif active_network == "4g":
        self._disconnect_tcp_via_4g()

    self._tcp_connected = False
    self._network_type = None
def _disconnect_tcp_via_wifi(self):
"""断开WiFi TCP连接"""
with self._wifi_socket_lock:
if self._wifi_socket:
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
def _disconnect_tcp_via_4g(self):
    """Send ``AT+MIPCLOSE`` for the configured link ID while holding the UART lock.

    The link ID comes from ``config.TCP_LINK_ID`` (default 0). The module's
    reply is not inspected.
    """
    link_id = getattr(config, "TCP_LINK_ID", 0)
    uart_lock = self.get_uart_lock()
    with uart_lock:
        hardware_manager.at_client.send(f"AT+MIPCLOSE={link_id}", "OK", 1000)
def tcp_send_raw(self, data: bytes, max_retries=2) -> bool:
    """Send raw bytes over the active network (WiFi or 4G).

    Args:
        data: Bytes to send.
        max_retries: Attempt count passed on to the transport routine.

    Returns:
        The transport routine's result; False when not connected or when no
        network type is selected (the latter is logged as an error).
    """
    if not self._tcp_connected:
        return False

    network = self._network_type
    if network == "wifi":
        return self._tcp_send_raw_via_wifi(data, max_retries)
    if network == "4g":
        return self._tcp_send_raw_via_4g(data, max_retries)

    self.logger.error("[NET] 未选择网络类型,无法发送数据")
    return False
def _tcp_send_raw_via_wifi(self, data: bytes, max_retries=2) -> bool:
"""通过WiFi socket发送TCP数据"""
if not self._wifi_socket:
return False
with self._wifi_socket_lock:
for attempt in range(max_retries):
try:
# 标准socket发送
total_sent = 0
while total_sent < len(data):
sent = self._wifi_socket.send(data[total_sent:])
if sent == 0:
# socket连接已断开
self.logger.warning(f"[WIFI-TCP] 发送失败socket已断开尝试 {attempt+1}/{max_retries}")
break
total_sent += sent
if total_sent == len(data):
return True
# 发送不完整,重试
time.sleep_ms(50)
except OSError as e:
self.logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt+1}/{max_retries}")
time.sleep_ms(50)
except Exception as e:
self.logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt+1}/{max_retries}")
time.sleep_ms(50)
return False
def _tcp_send_raw_via_4g(self, data: bytes, max_retries=2) -> bool:
    """Send *data* through the 4G module with ``AT+MIPSEND``.

    Per attempt, while holding the UART lock: request a send of
    ``len(data)`` bytes, wait for the ``>`` prompt, write the payload to the
    UART followed by ``0x1A``, then wait for the module's reply.

    Args:
        data: Bytes to send.
        max_retries: Maximum number of attempts; 50 ms pause between them.

    Returns:
        True if a reply contained ``SEND OK``, ``OK`` or ``+MIPSEND``.
    """
    link_id = getattr(config, "TCP_LINK_ID", 0)
    with self.get_uart_lock():
        for _attempt in range(max_retries):
            prompt = hardware_manager.at_client.send(f'AT+MIPSEND={link_id},{len(data)}', ">", 2000)
            if ">" not in prompt:
                # No prompt from the module: pause and start over.
                time.sleep_ms(50)
                continue

            # Push the payload, tolerating short or empty writes.
            written = 0
            while written < len(data):
                count = hardware_manager.uart4g.write(data[written:])
                if not count or count < 0:
                    time.sleep_ms(1)
                    continue
                written += count

            hardware_manager.uart4g.write(b"\x1A")
            reply = hardware_manager.at_client.send("", "OK", 8000)
            if any(token in reply for token in ("SEND OK", "OK", "+MIPSEND")):
                return True
            time.sleep_ms(50)
        return False
def _configure_ssl_before_connect(self, link_id: int) -> bool:
    """Configure SSL on the 4G module and bind it to a TCP link.

    Manual sequence: MSSLCFG(auth) -> (optional) MSSLCERTWR -> MSSLCFG(cert)
    -> MIPCFG(ssl). In the current code the certificate steps are disabled
    (``if False``) and the auth command always sends mode 0, so only the auth
    and bind commands are actually issued.

    This method sends AT commands without taking the UART lock itself; the
    visible caller (``_connect_tcp_via_4g``) invokes it while holding it.

    Args:
        link_id: TCP link ID to bind to the SSL context.

    Returns:
        True if every issued command's reply contained ``OK``, else False.
    """
    ssl_id = getattr(config, "SSL_ID", 1)
    # auth_mode and verify_mode are read here but not used below.
    auth_mode = getattr(config, "SSL_AUTH_MODE", 1)
    verify_mode = getattr(config, "SSL_VERIFY_MODE", 0)
    # 1) Configure the authentication mode (hard-coded to 0; the configurable
    #    variant is kept below, commented out).
    # r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},{auth_mode}', "OK", 3000)
    r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},0', "OK", 3000)
    self.logger.info(f"[4G-TCP] AT+MSSLCFG=\"auth\",{ssl_id},0 response: {r}")
    if "OK" not in r:
        return False
    # 2) Write the root certificate (only needed when verify_mode == 1).
    #    Disabled: the branch below never runs.
    # if verify_mode == 1:
    if False:
        cert_filename = getattr(config, "SSL_CERT_FILENAME", None)
        cert_path = getattr(config, "SSL_CERT_PATH", None)
        if not cert_filename or not cert_path:
            return False
        # Read the certificate file (path on the device).
        with open(cert_path, "rb") as f:
            cert_data = f.read()
        # Manual: AT+MSSLCERTWR="file",0,size -> wait for ">" -> write the
        # certificate bytes -> wait for OK.
        r = hardware_manager.at_client.send(f'AT+MSSLCERTWR="{cert_filename}",0,{len(cert_data)}', ">", 5000)
        if ">" not in r:
            return False
        # Write the raw bytes as-is; do not append \r\n.
        hardware_manager.uart4g.write(cert_data)
        r = hardware_manager.at_client.send("", "OK", 8000)
        if "OK" not in r:
            return False
        # 3) Reference the root certificate from the SSL context.
        r = hardware_manager.at_client.send(f'AT+MSSLCFG="cert",{ssl_id},"{cert_filename}"', "OK", 3000)
        if "OK" not in r:
            return False
    # 4) Bind the TCP link to ssl_id and enable SSL on it.
    r = hardware_manager.at_client.send(f'AT+MIPCFG="ssl",{link_id},{ssl_id},1', "OK", 3000)
    self.logger.info(f"[4G-TCP] AT+MIPCFG=\"ssl\",{link_id},{ssl_id},1 response: {r}")
    if "OK" not in r:
        return False
    return True
def receive_tcp_data_via_wifi(self, timeout_ms=100):
    """Receive up to 4 KiB from the WiFi TCP socket.

    Args:
        timeout_ms: How long to wait for data, in milliseconds.

    Returns:
        The bytes received, or ``b""`` when nothing is available.

    Side effects:
        On a socket error, or when ``recv()`` returns an empty result (the
        peer closed the connection), the socket is closed and released and
        the TCP connection flag is cleared.
    """
    sock = self._wifi_socket
    if not sock:
        return b""

    try:
        sock.settimeout(timeout_ms / 1000.0)
        data = sock.recv(4096)  # at most 4 KiB per call
    except (socket.timeout, BlockingIOError):
        # Nothing to read within the timeout (or in non-blocking mode when
        # timeout_ms is 0): normal, not an error.
        return b""
    except OSError as e:
        # Connection reset or similar.
        self.logger.warning(f"[WIFI-TCP] 接收数据失败: {e}")
        self._release_failed = None  # placeholder removed below
        del self._release_failed
        try:
            sock.close()
        except Exception:
            pass
        self._wifi_socket = None
        self._tcp_connected = False
        return b""
    except Exception as e:
        self.logger.error(f"[WIFI-TCP] 接收数据异常: {e}")
        return b""

    if not data:
        # recv() returning b"" means the peer performed an orderly shutdown.
        # Previously this was reported as "no data" and went undetected.
        self.logger.warning("[WIFI-TCP] 接收数据失败: connection closed by peer")
        try:
            sock.close()
        except Exception:
            pass
        self._wifi_socket = None
        self._tcp_connected = False
        return b""
    return data
def _upload_log_file(self, upload_url, wifi_ssid=None, wifi_password=None):
    """Upload the application log file to *upload_url* over WiFi.

    The log (``config.LOG_FILE``) is copied to ``/tmp`` under a name of the
    form ``app_<YYYYmmdd_HHMMSS>_<device_id>.log`` and posted as
    multipart/form-data. Progress and the outcome are reported by enqueuing
    result messages (msg type 2). The temporary copy is removed whether the
    upload succeeds, fails or raises.

    Args:
        upload_url: Target URL, e.g. "https://example.com/upload/".
        wifi_ssid: Optional SSID to join if WiFi is not connected.
        wifi_password: Optional passphrase for *wifi_ssid*.

    Note:
        Only available over WiFi; file upload over 4G is not supported.
    """
    import requests
    import shutil
    from datetime import datetime

    try:
        # Make sure WiFi is up, joining the given network if necessary.
        if not self.is_wifi_connected():
            if wifi_ssid and wifi_password:
                self.logger.info(f"[LOG_UPLOAD] WiFi 未连接,尝试连接 WiFi: {wifi_ssid}")
                self.safe_enqueue({"result": "log_upload_connecting_wifi", "ssid": wifi_ssid}, 2)
                ip, error = self.connect_wifi(wifi_ssid, wifi_password)
                if error:
                    self.logger.error(f"[LOG_UPLOAD] WiFi 连接失败: {error}")
                    self.safe_enqueue({
                        "result": "log_upload_failed",
                        "reason": "wifi_connect_failed",
                        "detail": error
                    }, 2)
                    return
                self.logger.info(f"[LOG_UPLOAD] WiFi 连接成功IP: {ip}")
            else:
                self.logger.warning("[LOG_UPLOAD] WiFi 未连接且未提供 WiFi 凭证,无法上传日志")
                self.safe_enqueue({
                    "result": "log_upload_failed",
                    "reason": "wifi_not_connected",
                    "detail": "WiFi not connected and no credentials provided"
                }, 2)
                return
        else:
            self.logger.info("[LOG_UPLOAD] WiFi 已连接,跳过连接步骤")

        self.logger.info(f"[LOG_UPLOAD] 开始上传日志文件...")

        # Locate the log file.
        log_file_path = config.LOG_FILE
        if not os.path.exists(log_file_path):
            self.logger.error(f"[LOG_UPLOAD] 日志文件不存在: {log_file_path}")
            self.safe_enqueue({"result": "log_upload_failed", "reason": "log_file_not_found"}, 2)
            return

        # Build the timestamped upload name.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        device_id = self._device_id or "unknown"
        new_filename = f"app_{timestamp}_{device_id}.log"

        # Upload a snapshot copy rather than the live log file.
        temp_dir = "/tmp"
        temp_file_path = os.path.join(temp_dir, new_filename)
        shutil.copy2(log_file_path, temp_file_path)
        self.logger.info(f"[LOG_UPLOAD] 日志文件已复制到: {temp_file_path}")

        try:
            with open(temp_file_path, 'rb') as f:
                files = {'file': (new_filename, f, 'text/plain')}
                headers = {
                    'User-Agent': 'Archery-Device/1.0',
                    'X-Device-ID': device_id,
                }
                # ngrok free tunnels show an interstitial page unless this
                # header is present.
                if 'ngrok-free.dev' in upload_url or 'ngrok.io' in upload_url:
                    headers['ngrok-skip-browser-warning'] = 'true'
                self.logger.info(f"[LOG_UPLOAD] 正在上传到: {upload_url}")

                # SECURITY: certificate verification is deliberately turned
                # off (self-signed certificates / MaixCAM SSL compatibility),
                # so the server's identity is not authenticated.
                import urllib3
                urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
                response = requests.post(upload_url, files=files, headers=headers, timeout=60, verify=False)

            if response.status_code in (200, 201, 204):
                self.logger.info(f"[LOG_UPLOAD] 上传成功! 状态码: {response.status_code}")
                self.safe_enqueue({
                    "result": "log_upload_ok",
                    "filename": new_filename,
                    "status_code": response.status_code
                }, 2)
            else:
                self.logger.error(f"[LOG_UPLOAD] 上传失败! 状态码: {response.status_code}, 响应: {response.text[:200]}")
                self.safe_enqueue({
                    "result": "log_upload_failed",
                    "reason": f"http_{response.status_code}",
                    "detail": response.text[:100]
                }, 2)
        finally:
            # Always remove the temporary copy, including when the upload
            # raised (timeout, connection error, ...).
            try:
                os.remove(temp_file_path)
                self.logger.debug(f"[LOG_UPLOAD] 临时文件已删除: {temp_file_path}")
            except Exception as e:
                self.logger.warning(f"[LOG_UPLOAD] 删除临时文件失败: {e}")
    except requests.exceptions.Timeout:
        self.logger.error("[LOG_UPLOAD] 上传超时")
        self.safe_enqueue({"result": "log_upload_failed", "reason": "timeout"}, 2)
    except requests.exceptions.ConnectionError as e:
        self.logger.error(f"[LOG_UPLOAD] 连接失败: {e}")
        self.safe_enqueue({"result": "log_upload_failed", "reason": "connection_error"}, 2)
    except Exception as e:
        self.logger.error(f"[LOG_UPLOAD] 上传异常: {e}")
        self.safe_enqueue({"result": "log_upload_failed", "reason": str(e)[:100]}, 2)
def generate_token(self, device_id):
    """Build the token used to authenticate HTTP API calls.

    The token is ``"Arrow_"`` followed by the hex HMAC-SHA256 digest of the
    message ``"shoot"`` under the key ``"shootMessageFire" + device_id``.
    """
    key = ("shootMessageFire" + device_id).encode()
    message = "shoot".encode()
    digest = hmac.new(key, message, hashlib.sha256).hexdigest()
    return "Arrow_" + digest
def tcp_main(self):
    """Run the TCP session loop: connect, log in, then service traffic.

    Outer loop: wait out any OTA in progress, (re)connect through
    ``connect_server()`` and send the login packet. Inner loop, until the
    session breaks:
      * pause while an OTA update is in progress;
      * take at most one inbound packet (WiFi receive buffer or the 4G AT
        client) and dispatch it: login reply, heartbeat ACK (msg 4), chunked
        download (msg 40) or an inner business command;
      * send at most one queued message, high priority first;
      * enqueue a finished laser-calibration result;
      * send a heartbeat every ``config.HEARTBEAT_INTERVAL`` seconds; the
        session is dropped after three consecutive heartbeat send failures
        or ten minutes without a heartbeat ACK.

    The method returns only after the shutdown command (inner cmd 42); any
    other exception is logged and followed by a reconnect after 5 s.
    """
    # NOTE(review): block nesting was reconstructed from an indentation-
    # stripped listing. Points to confirm against the repository copy: the
    # ``else: time.sleep_ms(5)`` is paired with ``if item:``; the progress
    # log for msg 40 sits in the "not yet complete" branch; in cmd 2 the
    # HARDCODE_LASER_POINT test is nested under "calibration not active".
    import _thread
    from maix import camera
    self.logger.info("[NET] TCP主线程启动")
    send_hartbeat_fail_count = 0
    last_charging_check = 0
    CHARGING_CHECK_INTERVAL = 5000  # charging-state check interval: 5 s
    while True:
        try:
            # Charging-state check slot (every 5 s); only the timestamp is
            # updated here.
            current_time = time.ticks_ms()
            if current_time - last_charging_check > CHARGING_CHECK_INTERVAL:
                last_charging_check = current_time

            # No connect/login/heartbeat/send while an OTA is running.
            try:
                from ota_manager import ota_manager
                if ota_manager.ota_in_progress:
                    time.sleep_ms(200)
                    continue
            except Exception as e:
                self.logger.error(f"[NET] OTA检查异常: {e}")
                time.sleep_ms(200)
                continue

            if not self.connect_server():
                time.sleep_ms(5000)
                continue

            # Send the login packet.
            vol_val = get_bus_voltage()
            login_data = {
                "deviceId": self.device_id,
                "password": self.password,
                "version": config.APP_VERSION,
                "vol": vol_val,
                "vol_per": voltage_to_percent(vol_val)
            }
            if not self.tcp_send_raw(self._netcore.make_packet(1, login_data)):
                self._tcp_connected = False
                time.sleep_ms(2000)
                continue
            self.logger.info("➡️ 登录包已发送,等待确认...")

            logged_in = False
            pending_cleared = False
            last_heartbeat_ack_time = time.ticks_ms()
            last_heartbeat_send_time = time.ticks_ms()

            while True:
                # Pause TCP activity while an OTA is running.
                try:
                    from ota_manager import ota_manager
                    if ota_manager.ota_in_progress:
                        time.sleep_ms(200)
                        continue
                except Exception as e:
                    self.logger.error(f"[NET] OTA检查异常: {e}")
                    time.sleep_ms(200)
                    continue

                # ---- Receive (transport depends on the active network) ----
                item = None
                if self._network_type == "wifi":
                    data = self.receive_tcp_data_via_wifi(timeout_ms=50)
                    if data:
                        self._wifi_recv_buffer += data
                    # Try to frame a packet on EVERY pass, not only when new
                    # bytes arrived: several packets can arrive in a single
                    # recv(), and the ones after the first would otherwise
                    # sit in the buffer until the server sent more data.
                    buffered = self._wifi_recv_buffer
                    if len(buffered) >= 12:  # 12-byte header available
                        body_len = struct.unpack(">III", buffered[:12])[0]
                        total_len = 12 + body_len
                        if len(buffered) >= total_len:
                            item = (0, buffered[:total_len])  # link_id 0 for WiFi
                            self._wifi_recv_buffer = buffered[total_len:]
                        # else: incomplete packet, wait for more bytes
                elif self._network_type == "4g":
                    item = hardware_manager.at_client.pop_tcp_payload()

                if item:
                    if isinstance(item, tuple) and len(item) == 2:
                        link_id, payload = item
                    else:
                        link_id, payload = 0, item
                    if not logged_in:
                        try:
                            self.logger.debug(f"[TCP] rx link={link_id} len={len(payload)} head={payload[:12].hex()}")
                        except Exception:
                            pass
                    msg_type, body = self._netcore.parse_packet(payload)

                    # ---- Login reply ----
                    if not logged_in and msg_type == 1:
                        if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
                            logged_in = True
                            last_heartbeat_ack_time = time.ticks_ms()
                            self.logger.info("登录成功")
                            # Report a pending OTA result, if one was left behind.
                            try:
                                pending_path = f"{config.APP_DIR}/ota_pending.json"
                                if os.path.exists(pending_path):
                                    try:
                                        with open(pending_path, "r", encoding="utf-8") as f:
                                            pending_obj = json.load(f)
                                    except Exception:
                                        pending_obj = {}
                                    self.safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
                                    self.logger.info("[OTA] 已上报 ota_ok等待心跳确认后删除 pending")
                            except Exception as e:
                                self.logger.error(f"[OTA] ota_ok 上报失败: {e}")
                        else:
                            # Login rejected or unexpected reply: reconnect.
                            break

                    # ---- Heartbeat ACK ----
                    elif logged_in and msg_type == 4:
                        last_heartbeat_ack_time = time.ticks_ms()
                        self.logger.debug("✅ 收到心跳确认")

                    # ---- Msg 40: chunked download ----
                    elif logged_in and msg_type == 40:
                        if isinstance(body, dict):
                            t = body.get('t', 0)
                            v = body.get('v')
                            # A chunk whose 'v' differs from the cached
                            # chunks starts over with an empty cache.
                            if self._raw_line_data and self._raw_line_data[0].get('v') != v:
                                self._raw_line_data.clear()
                            self._raw_line_data.append(body)
                            if len(self._raw_line_data) >= int(t):
                                self.logger.info(f"下载完成")
                                from ota_manager import ota_manager
                                stock_array = [chunk.get('d') for chunk in self._raw_line_data]
                                local_filename = config.LOCAL_FILENAME
                                with open(local_filename, 'w', encoding='utf-8') as file:
                                    file.write("\n".join(stock_array))
                                ota_manager.apply_ota_and_reboot(None, local_filename)
                            else:
                                self.safe_enqueue({'data':{'l': len(self._raw_line_data), 'v': v}, 'cmd': 41})
                                self.logger.info(f"已下载{len(self._raw_line_data)} 全部:{t} 版本:{v}")

                    # ---- Business commands ----
                    elif logged_in and isinstance(body, dict):
                        inner_cmd = None
                        data_obj = body.get("data")
                        if isinstance(data_obj, dict):
                            inner_cmd = data_obj.get("cmd")

                        if inner_cmd == 2:  # laser on + calibrate
                            from laser_manager import laser_manager
                            if not laser_manager.calibration_active:
                                laser_manager.turn_on_laser()
                                time.sleep_ms(100)
                                if not config.HARDCODE_LASER_POINT:
                                    laser_manager.start_calibration()
                                    self.safe_enqueue({"result": "calibrating"}, 2)
                                else:
                                    # Laser point is hard-coded: no calibration needed.
                                    self.safe_enqueue({"result": "laser pos set by hard code"}, 2)

                        elif inner_cmd == 3:  # laser off
                            from laser_manager import laser_manager
                            laser_manager.turn_off_laser()
                            laser_manager.stop_calibration()
                            self.safe_enqueue({"result": "laser_off"}, 2)

                        elif inner_cmd == 4:  # report battery
                            voltage = get_bus_voltage()
                            battery_percent = voltage_to_percent(voltage)
                            battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
                            self.safe_enqueue(battery_data, 2)
                            self.logger.info(f"电量上报: {battery_percent}%")

                        elif inner_cmd == 5:  # OTA upgrade
                            inner_data = data_obj.get("data", {}) if isinstance(data_obj, dict) else {}
                            ssid = inner_data.get("ssid")
                            password = inner_data.get("password")
                            ota_url = inner_data.get("url")
                            mode = (inner_data.get("mode") or "").strip().lower()
                            if not ota_url:
                                self.logger.error("ota missing_url")
                                self.safe_enqueue({"result": "missing_url"}, 2)
                                continue
                            from ota_manager import ota_manager
                            if ota_manager.update_thread_started:
                                self.safe_enqueue({"result": "update_already_started"}, 2)
                                continue
                            # No explicit mode: WiFi only if it is connected
                            # AND credentials were supplied, else 4G.
                            if mode not in ("4g", "wifi"):
                                self.logger.info("ota missing mode, auto-detecting...")
                                if self.is_wifi_connected() and ssid and password:
                                    mode = "wifi"
                                    self.logger.info("ota auto-selected: wifi (WiFi connected and credentials provided)")
                                else:
                                    mode = "4g"
                                    self.logger.info("ota auto-selected: 4g (WiFi not available or no credentials)")
                            if mode == "4g":
                                ota_manager._set_ota_url(ota_url)  # kept for cmd 7
                                ota_manager._start_update_thread()
                                _thread.start_new_thread(ota_manager.direct_ota_download_via_4g, (ota_url,))
                            else:  # mode == "wifi"
                                if not ssid or not password:
                                    self.logger.error("ota wifi mode requires ssid and password")
                                    self.safe_enqueue({"result": "missing_ssid_or_password"}, 2)
                                else:
                                    ota_manager._start_update_thread()
                                    _thread.start_new_thread(ota_manager.handle_wifi_and_update, (ssid, password, ota_url))

                        elif inner_cmd == 6:  # report wlan0 IP
                            try:
                                ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
                                ip = ip if ip else "no_ip"
                            except Exception:
                                ip = "error_getting_ip"
                            self.safe_enqueue({"result": "current_ip", "ip": ip}, 2)

                        elif inner_cmd == 7:  # OTA over WiFi using the stored URL
                            from ota_manager import ota_manager
                            if ota_manager.update_thread_started:
                                self.safe_enqueue({"result": "update_already_started"}, 2)
                                continue
                            try:
                                ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
                            except Exception:
                                ip = None
                            if not ip:
                                self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2)
                            else:
                                # direct_ota_download needs a URL; it must
                                # have been stored earlier on ota_manager.
                                ota_url_to_use = ota_manager.ota_url
                                if not ota_url_to_use:
                                    self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置")
                                    self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2)
                                else:
                                    ota_manager._start_update_thread()
                                    _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,))

                        elif inner_cmd == 41:  # manual shot trigger
                            self.logger.info("[TEST] 收到TCP射箭触发命令")
                            self._manual_trigger_flag = True
                            self.safe_enqueue({"result": "trigger_ack"}, 2)

                        elif inner_cmd == 42:  # power off
                            self.logger.info("[SHUTDOWN] 收到TCP关机命令准备关机...")
                            # NOTE(review): this ack is only queued; the link
                            # is closed below before the send step runs, so
                            # it presumably never reaches the server.
                            self.safe_enqueue({"result": "shutdown_ack"}, 2)
                            time.sleep_ms(1000)
                            self.disconnect_server()
                            # Try to switch the 4G module off.
                            try:
                                with self.get_uart_lock():
                                    hardware_manager.at_client.send("AT+CFUN=0", "OK", 5000)
                            except Exception:
                                pass
                            time.sleep_ms(2000)
                            os.system("poweroff")
                            return

                        elif inner_cmd == 43:  # upload log file
                            # Format: {"cmd":43, "data":{"ssid":"xxx","password":"xxx","url":"xxx"}}
                            inner_data = data_obj.get("data", {})
                            upload_url = inner_data.get("url")
                            wifi_ssid = inner_data.get("ssid")
                            wifi_password = inner_data.get("password")
                            if not upload_url:
                                self.logger.error("[LOG_UPLOAD] 缺少 url 参数")
                                self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"}, 2)
                            else:
                                self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令目标URL: {upload_url}")
                                # Upload on a separate thread so this loop is not blocked.
                                _thread.start_new_thread(self._upload_log_file, (upload_url, wifi_ssid, wifi_password))
                else:
                    time.sleep_ms(5)

                # ---- Send one queued message ----
                if logged_in and (self._high_send_queue or self._normal_send_queue):
                    msg_type = None
                    data_dict = None
                    # Non-blocking acquire: if a producer holds the lock,
                    # skip this pass rather than stall the loop.
                    if self.get_queue_lock().acquire(blocking=False):
                        try:
                            if self._high_send_queue:
                                msg_type, data_dict = self._high_send_queue.pop(0)
                            elif self._normal_send_queue:
                                msg_type, data_dict = self._normal_send_queue.pop(0)
                        finally:
                            self.get_queue_lock().release()
                    if msg_type is not None and data_dict is not None:
                        pkt = self._netcore.make_packet(msg_type, data_dict)
                        if not self.tcp_send_raw(pkt):
                            self._tcp_connected = False
                            break

                # ---- Report a finished laser calibration ----
                if logged_in:
                    from laser_manager import laser_manager
                    result = laser_manager.get_calibration_result()
                    if result:
                        x, y = result
                        self.safe_enqueue({"result": "ok", "x": x, "y": y}, 2)

                # ---- Periodic heartbeat ----
                current_time = time.ticks_ms()
                if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000:
                    vol_val = get_bus_voltage()
                    if not self.tcp_send_raw(self._netcore.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
                        self.logger.error("心跳发送失败")
                        time.sleep_ms(3000)
                        send_hartbeat_fail_count += 1
                        if send_hartbeat_fail_count >= 3:
                            send_hartbeat_fail_count = 0
                            self.logger.error("连续3次发送心跳失败重连")
                            break
                        else:
                            continue
                    else:
                        send_hartbeat_fail_count = 0
                    last_heartbeat_send_time = current_time
                    self.logger.debug("心跳已发送")

                    # After a successful heartbeat, delete the OTA pending file.
                    if not pending_cleared:
                        try:
                            pending_path = f"{config.APP_DIR}/ota_pending.json"
                            if os.path.exists(pending_path):
                                try:
                                    os.remove(pending_path)
                                    pending_cleared = True
                                    self.logger.info("[OTA] 心跳发送成功,已删除 ota_pending.json")
                                except Exception as e:
                                    self.logger.error(f"[OTA] 删除 pending 文件失败: {e}")
                        except Exception as e:
                            self.logger.error(f"[OTA] 检查 pending 文件时出错: {e}")

                # ---- Heartbeat ACK timeout (10 minutes) ----
                if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10:
                    self.logger.error("十分钟无心跳ACK重连")
                    break

                time.sleep_ms(50)

            # Session ended: mark the link down and retry shortly.
            self._tcp_connected = False
            self.logger.error("连接异常2秒后重连...")
            time.sleep_ms(2000)
        except Exception as e:
            # Top-level guard so the thread never dies silently.
            self.logger.error(f"[NET] TCP主循环异常: {e}")
            import traceback
            self.logger.error(traceback.format_exc())
            self._tcp_connected = False
            time.sleep_ms(5000)  # wait 5 s before reconnecting
# Create the module-level singleton instance used by the wrappers below.
network_manager = NetworkManager()
# ==================== 向后兼容的函数接口 ====================
def tcp_main():
    """Backward-compatible wrapper: run ``network_manager.tcp_main()``."""
    return network_manager.tcp_main()
def read_device_id():
    """Backward-compatible wrapper: return ``network_manager.read_device_id()``."""
    return network_manager.read_device_id()
def safe_enqueue(data_dict, msg_type=2, high=False):
    """Backward-compatible wrapper: queue a message on ``network_manager``."""
    return network_manager.safe_enqueue(data_dict, msg_type, high)
def connect_server():
    """Backward-compatible wrapper: return ``network_manager.connect_server()``."""
    return network_manager.connect_server()
def disconnet_server():
    """Backward-compatible wrapper: call ``network_manager.disconnect_server()``.

    The misspelled name is the existing public interface and is kept as-is.
    """
    return network_manager.disconnect_server()
def is_wifi_connected():
    """Backward-compatible wrapper: return ``network_manager.is_wifi_connected()``."""
    return network_manager.is_wifi_connected()
def connect_wifi(ssid, password):
    """Backward-compatible wrapper: return ``network_manager.connect_wifi(ssid, password)``."""
    return network_manager.connect_wifi(ssid, password)
def is_server_reachable(host, port=80, timeout=5):
    """Backward-compatible wrapper: probe ``host:port`` via ``network_manager``."""
    return network_manager.is_server_reachable(host, port, timeout)