remove unseed code

This commit is contained in:
gcw_4spBpAfv
2026-04-23 11:14:08 +08:00
parent 1bace88f37
commit 12fac4ea1c
5 changed files with 38 additions and 58 deletions

View File

@@ -25,6 +25,7 @@ add_library(archery_netcore MODULE
utils.cpp utils.cpp
decrypt_ota_file.cpp decrypt_ota_file.cpp
msg_handler.cpp msg_handler.cpp
tcp_ssl_password.cpp
) )
target_include_directories(archery_netcore PRIVATE target_include_directories(archery_netcore PRIVATE

View File

@@ -12,6 +12,7 @@
#include "native_logger.hpp" #include "native_logger.hpp"
#include "decrypt_ota_file.hpp" #include "decrypt_ota_file.hpp"
#include "utils.hpp" #include "utils.hpp"
#include "tcp_ssl_password.hpp"
namespace py = pybind11; namespace py = pybind11;
using json = nlohmann::json; using json = nlohmann::json;
@@ -61,6 +62,14 @@ PYBIND11_MODULE(archery_netcore, m) {
m.def("get_config", &get_config, "Get system configuration"); m.def("get_config", &get_config, "Get system configuration");
m.def(
"calculate_tcp_ssl_password",
&netcore::calculate_tcp_ssl_password,
"Calculate TCP SSL password: hex(md5(hex(md5(device_id)) + iccid))",
py::arg("device_id"),
py::arg("iccid")
);
m.def( m.def(
"decrypt_ota_file", "decrypt_ota_file",
[](const std::string& input_path, const std::string& output_zip_path) { [](const std::string& input_path, const std::string& output_zip_path) {

View File

@@ -19,21 +19,10 @@ import config
from hardware import hardware_manager from hardware import hardware_manager
from power import get_bus_voltage, voltage_to_percent from power import get_bus_voltage, voltage_to_percent
# from laser import laser_manager
# from ota import ota_manager
from logger_manager import logger_manager from logger_manager import logger_manager
from wifi import wifi_manager from wifi import wifi_manager
def _calculate_tcp_ssl_password(device_id, iccid):
"""
与服务器 calculatePassword(deviceId, iccid) 一致:
hex(md5(hex(md5(deviceId)) + iccid))iccid 为空则不拼接。
"""
md5_device_hex = hashlib.md5(device_id.encode("utf-8")).hexdigest()
if iccid:
md5_device_hex = md5_device_hex + iccid
return hashlib.md5(md5_device_hex.encode("utf-8")).hexdigest()
def _wifi_tls_would_block(exc): def _wifi_tls_would_block(exc):
@@ -219,10 +208,10 @@ class NetworkManager:
iccid = self.get_4g_mccid() iccid = self.get_4g_mccid()
iccid = iccid if iccid else "" iccid = iccid if iccid else ""
print(f"iccid: {iccid}") print(f"iccid: {iccid}")
self._password = _calculate_tcp_ssl_password(device_id, iccid) self._password = self._netcore.calculate_tcp_ssl_password(device_id, iccid)
else: else:
self._password = device_id + "." self.logger.error("[SSL] TCP SSL NOT enabled! exit!")
print(f"self._password: {self._password}") exit(1)
try: try:
with open("/device_key", "r") as f: with open("/device_key", "r") as f:
@@ -652,45 +641,8 @@ class NetworkManager:
"""线程安全地将消息加入队列(公共方法)""" """线程安全地将消息加入队列(公共方法)"""
self._enqueue((msg_type, data_dict), high) self._enqueue((msg_type, data_dict), high)
def make_packet(self, msg_type: int, body_dict: dict) -> bytes:
"""打包 TCP 数据包:头部(长度+类型+校验)+ JSON 正文"""
body = json.dumps(body_dict).encode()
body_len = len(body)
checksum = body_len + msg_type
header = struct.pack(">III", body_len, msg_type, checksum)
return header + body
def parse_packet(self, data: bytes):
"""解析 TCP 数据包,返回 (类型, 正文字典)"""
if len(data) < 12:
return None, None
body_len, msg_type, checksum = struct.unpack(">III", data[:12])
expected_len = 12 + body_len
# 防御性检查:如果 data 比预期长,说明可能有粘包
if len(data) > expected_len:
self.logger.warning(
f"[TCP] parse_packet: data length ({len(data)}) > expected ({expected_len}), "
f"possible packet concatenation. body_len={body_len}, msg_type={msg_type}"
)
# 只解析第一个包,忽略多余数据(或者可以返回剩余部分)
# data = data[:expected_len]
# TODO 是否需要解析剩余部分?
# 如果 data 比预期短,说明包不完整(半包)
if len(data) < expected_len:
self.logger.warning(
f"[TCP] parse_packet: data length ({len(data)}) < expected ({expected_len}), "
f"incomplete packet. body_len={body_len}, msg_type={msg_type}"
)
return None, None
body = data[12:12 + body_len]
try:
return msg_type, json.loads(body.decode())
except:
return msg_type, {"raw": body.decode(errors="ignore")}
def connect_server(self): def connect_server(self):
""" """
@@ -1512,12 +1464,6 @@ class NetworkManager:
"reason": str(e)[:100] "reason": str(e)[:100]
}, 2) }, 2)
def generate_token(self, device_id):
"""生成用于 HTTP 接口鉴权的 TokenHMAC-SHA256"""
SALT = "shootMessageFire"
SALT2 = "shoot"
return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
def tcp_main(self): def tcp_main(self):
"""TCP 主通信循环:登录、心跳、处理指令、发送数据""" """TCP 主通信循环:登录、心跳、处理指令、发送数据"""
import _thread import _thread

View File

@@ -383,6 +383,27 @@ def _ensure_hostapd_modern_security(logger=None) -> bool:
return False return False
def _cleanup_ap_flag_if_needed(logger):
"""若 /boot/wifi.ap 残留,删除它并恢复 /boot/wifi.sta避免 main.py 误判为 AP 配网模式。"""
ap_flag = "/boot/wifi.ap"
sta_flag = "/boot/wifi.sta"
if not os.path.exists(ap_flag):
return
try:
os.remove(ap_flag)
logger.info(f"[WIFI-AP] 已清理残留标记 {ap_flag}")
except Exception as e:
logger.warning(f"[WIFI-AP] 清理 {ap_flag} 失败: {e}")
return
if not os.path.exists(sta_flag):
try:
with open(sta_flag, "w", encoding="utf-8") as f:
f.write("")
logger.info(f"[WIFI-AP] 已恢复 {sta_flag}")
except Exception as e:
logger.warning(f"[WIFI-AP] 恢复 {sta_flag} 失败: {e}")
def _switch_boot_to_ap_mode(logger): def _switch_boot_to_ap_mode(logger):
""" """
去掉 STA 标志、建立 AP 标志,由 S30wifi 起 hostapd与 Maix start_ap 二选一,以系统脚本为准)。 去掉 STA 标志、建立 AP 标志,由 S30wifi 起 hostapd与 Maix start_ap 二选一,以系统脚本为准)。
@@ -449,6 +470,8 @@ def maybe_start_wifi_ap_fallback(logger=None):
logger.info(f"[WIFI-AP] 兜底检测(quick)sta关联={wifi_ok}, 4g={g4_ok}") logger.info(f"[WIFI-AP] 兜底检测(quick)sta关联={wifi_ok}, 4g={g4_ok}")
if wifi_ok or g4_ok: if wifi_ok or g4_ok:
logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网") logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网")
# 清理上次开机可能残留的 /boot/wifi.ap 标记,避免 main.py 误判为 AP 配网模式
_cleanup_ap_flag_if_needed(logger)
return return
# 两者均不可用:再按配置等待一段时间后复检,避免开机瞬态误判 # 两者均不可用:再按配置等待一段时间后复检,避免开机瞬态误判
@@ -466,6 +489,7 @@ def maybe_start_wifi_ap_fallback(logger=None):
if wifi_ok or g4_ok: if wifi_ok or g4_ok:
logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网") logger.info("[WIFI-AP] STA 或 4G 可用,不启动热点配网")
_cleanup_ap_flag_if_needed(logger)
return return
logger.warning("[WIFI-AP] STA 与 4G 均不可用,启动热点配网(/boot/wifi.ap + HTTP") logger.warning("[WIFI-AP] STA 与 4G 均不可用,启动热点配网(/boot/wifi.ap + HTTP")