invole c++

This commit is contained in:
gcw_4spBpAfv
2026-01-22 17:55:11 +08:00
parent 945077a453
commit 42bfdd033c
15 changed files with 25424 additions and 1041 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/cpp_ext/build/

View File

@@ -1,11 +1,12 @@
id: t11 id: t11
name: t11 name: t11
version: 1.1.4 version: 1.1.10
author: t11 author: t11
icon: '' icon: ''
desc: t11 desc: t11
files: files:
- app.yaml - app.yaml
- archery_netcore.cpython-311-riscv64-linux-gnu.so
- at_client.py - at_client.py
- camera_manager.py - camera_manager.py
- config.py - config.py

View File

@@ -88,7 +88,7 @@ class ATClient:
self.uart.write((cmd + "\r\n").encode()) self.uart.write((cmd + "\r\n").encode())
t0 = time.ticks_ms() t0 = time.ticks_ms()
while time.ticks_ms() - t0 < timeout_ms: while abs(time.ticks_diff(time.ticks_ms(), t0)) < timeout_ms:
if (not self._waiting) or (self._expect in self._resp): if (not self._waiting) or (self._expect in self._resp):
self._waiting = False self._waiting = False
break break

View File

@@ -12,20 +12,14 @@ APP_DIR = "/maixapp/apps/t11"
LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py" LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py"
# ==================== 服务器配置 ==================== # ==================== 服务器配置 ====================
SERVER_IP = "stcp.shelingxingqiu.com" # SERVER_IP = "stcp.shelingxingqiu.com"
# SERVER_IP = "www.shelingxingqiu.com" SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005 SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒) HEARTBEAT_INTERVAL = 15 # 心跳间隔(秒)
# ==================== HTTP配置 ====================
HTTP_URL = "http://ws.shelingxingqiu.com"
HTTP_API_PATH = "/home/shoot/device_fire/arrow/fire"
# config.py 里新增(建议放到“服务器配置”附近)
# ===== TCP over SSL(TLS) 配置 ===== # ===== TCP over SSL(TLS) 配置 =====
USE_TCP_SSL = True # True=按手册走 MSSLCFG/MIPCFG 绑定 SSL USE_TCP_SSL = False # True=按手册走 MSSLCFG/MIPCFG 绑定 SSL
TCP_LINK_ID = 1 # TCP_LINK_ID = 2 #
TCP_SSL_PORT = 443 # TLS 端口(不一定必须 443以服务器为准 TCP_SSL_PORT = 443 # TLS 端口(不一定必须 443以服务器为准
# SSL profile # SSL profile
@@ -45,7 +39,7 @@ BACKUP_BASE = "/maixapp/apps/t11/backups"
# ==================== 硬件配置 ==================== # ==================== 硬件配置 ====================
# WiFi模块开关True=有WiFi模块False=无WiFi模块 # WiFi模块开关True=有WiFi模块False=无WiFi模块
HAS_WIFI_MODULE = True # 根据实际硬件情况设置 HAS_WIFI_MODULE = False # 根据实际硬件情况设置
# UART配置 # UART配置
UART4G_DEVICE = "/dev/ttyS2" UART4G_DEVICE = "/dev/ttyS2"

View File

@@ -19,11 +19,15 @@ if(NOT DEFINED MAIXCDK_PATH)
message(FATAL_ERROR "MAIXCDK_PATH not set (need components/3rd_party/pybind11)") message(FATAL_ERROR "MAIXCDK_PATH not set (need components/3rd_party/pybind11)")
endif() endif()
add_library(archery_netcore MODULE archery_netcore.cpp) add_library(archery_netcore MODULE
archery_netcore.cpp
native_logger.cpp
)
target_include_directories(archery_netcore PRIVATE target_include_directories(archery_netcore PRIVATE
"${PY_INCLUDE_DIR}" "${PY_INCLUDE_DIR}"
"${MAIXCDK_PATH}/components/3rd_party/pybind11/pybind11/include" "${MAIXCDK_PATH}/components/3rd_party/pybind11/pybind11/include"
"${CMAKE_CURRENT_SOURCE_DIR}/third_party" # 添加 nlohmann/json 路径
) )
set_target_properties(archery_netcore PROPERTIES set_target_properties(archery_netcore PROPERTIES

View File

@@ -1,12 +1,269 @@
#include <pybind11/pybind11.h> #include <pybind11/pybind11.h>
#include <pybind11/stl.h> // 支持 std::vector, std::map 等
#include <nlohmann/json.hpp>
#include <cstring>
#include <cstdint>
#include <vector>
#include <string>
#include "native_logger.hpp"
namespace py = pybind11; namespace py = pybind11;
using json = nlohmann::json;
static const char* kServerIp = "stcp.shelingxingqiu.com"; namespace {
// 配置项
const std::string _cfg_server_ip = "www.shelingxingqiu.com";
const int _cfg_server_port = 50005;
}
// 定义获取配置的函数
py::dict get_config() {
py::dict config;
config["SERVER_IP"] = _cfg_server_ip;
config["SERVER_PORT"] = _cfg_server_port;
return config;
}
// 辅助函数:将 py::dict 转为 nlohmann::json
json py_dict_to_json(py::dict d) {
json j;
for (auto item : d) {
std::string key = py::str(item.first);
py::object val = py::reinterpret_borrow<py::object>(item.second);
if (py::isinstance<py::dict>(val)) {
j[key] = py_dict_to_json(py::cast<py::dict>(val));
} else if (py::isinstance<py::list>(val)) {
py::list py_list = py::cast<py::list>(val);
json arr = json::array();
for (auto elem : py_list) {
py::object elem_obj = py::reinterpret_borrow<py::object>(elem);
if (py::isinstance<py::dict>(elem_obj)) {
arr.push_back(py_dict_to_json(py::cast<py::dict>(elem_obj)));
} else if (py::isinstance<py::int_>(elem_obj)) {
arr.push_back(py::cast<int64_t>(elem_obj));
} else if (py::isinstance<py::float_>(elem_obj)) {
arr.push_back(py::cast<double>(elem_obj));
} else {
arr.push_back(py::str(elem_obj));
}
}
j[key] = arr;
} else if (py::isinstance<py::int_>(val)) {
j[key] = py::cast<int64_t>(val);
} else if (py::isinstance<py::float_>(val)) {
j[key] = py::cast<double>(val);
} else if (py::isinstance<py::bool_>(val)) {
j[key] = py::cast<bool>(val);
} else if (val.is_none()) {
j[key] = nullptr;
} else {
j[key] = py::str(val);
}
}
return j;
}
// 辅助函数:将 nlohmann::json 转为 py::dict
py::dict json_to_py_dict(const json& j) {
py::dict d;
if (j.is_object()) {
for (auto& item : j.items()) {
std::string key = item.key();
json val = item.value();
if (val.is_object()) {
d[py::str(key)] = json_to_py_dict(val);
} else if (val.is_array()) {
py::list py_list;
for (auto& elem : val) {
if (elem.is_object()) {
py_list.append(json_to_py_dict(elem));
} else if (elem.is_number_integer()) {
py_list.append(py::int_(elem.get<int64_t>()));
} else if (elem.is_number_float()) {
py_list.append(py::float_(elem.get<double>()));
} else if (elem.is_boolean()) {
py_list.append(py::bool_(elem.get<bool>()));
} else if (elem.is_null()) {
py_list.append(py::none());
} else {
py_list.append(py::str(elem.get<std::string>()));
}
}
d[py::str(key)] = py_list;
} else if (val.is_number_integer()) {
d[py::str(key)] = py::int_(val.get<int64_t>());
} else if (val.is_number_float()) {
d[py::str(key)] = py::float_(val.get<double>());
} else if (val.is_boolean()) {
d[py::str(key)] = py::bool_(val.get<bool>());
} else if (val.is_null()) {
d[py::str(key)] = py::none();
} else {
d[py::str(key)] = py::str(val.get<std::string>());
}
}
}
return d;
}
// 打包 TCP 数据包
py::bytes make_packet(int msg_type, py::dict body_dict) {
netcore::log_debug(std::string("make_packet msg_type=") + std::to_string(msg_type));
// 1) 将 py::dict 转为 JSON 字符串
json j = py_dict_to_json(body_dict);
std::string body_str = j.dump();
// 2) 计算 body_len 和 checksum
uint32_t body_len = body_str.size();
uint32_t checksum = body_len + msg_type;
// 3) 打包头部(大端序)
std::vector<uint8_t> packet;
packet.reserve(12 + body_len);
// body_len (big-endian, 4 bytes)
packet.push_back((body_len >> 24) & 0xFF);
packet.push_back((body_len >> 16) & 0xFF);
packet.push_back((body_len >> 8) & 0xFF);
packet.push_back(body_len & 0xFF);
// msg_type (big-endian, 4 bytes)
packet.push_back((msg_type >> 24) & 0xFF);
packet.push_back((msg_type >> 16) & 0xFF);
packet.push_back((msg_type >> 8) & 0xFF);
packet.push_back(msg_type & 0xFF);
// checksum (big-endian, 4 bytes)
packet.push_back((checksum >> 24) & 0xFF);
packet.push_back((checksum >> 16) & 0xFF);
packet.push_back((checksum >> 8) & 0xFF);
packet.push_back(checksum & 0xFF);
// 4) 追加 body
packet.insert(packet.end(), body_str.begin(), body_str.end());
netcore::log_debug(std::string("make_packet done bytes=") + std::to_string(packet.size()));
return py::bytes(reinterpret_cast<const char*>(packet.data()), packet.size());
}
// 解析 TCP 数据包
py::tuple parse_packet(py::bytes data) {
// 1) 转换为 bytes view
py::buffer_info buf = py::buffer(data).request();
if (buf.size < 12) {
netcore::log_error(std::string("parse_packet too_short len=") + std::to_string(buf.size));
return py::make_tuple(py::none(), py::none());
}
const uint8_t* ptr = static_cast<const uint8_t*>(buf.ptr);
// 2) 解析头部(大端序)
uint32_t body_len = (ptr[0] << 24) | (ptr[1] << 16) | (ptr[2] << 8) | ptr[3];
uint32_t msg_type = (ptr[4] << 24) | (ptr[5] << 16) | (ptr[6] << 8) | ptr[7];
uint32_t checksum = (ptr[8] << 24) | (ptr[9] << 16) | (ptr[10] << 8) | ptr[11];
// 3) 校验 checksum可选你现有代码不强制校验
// if (checksum != (body_len + msg_type)) {
// return py::make_tuple(py::none(), py::none());
// }
// 4) 检查长度
uint32_t expected_len = 12 + body_len;
if (buf.size < expected_len) {
// 半包
netcore::log_warn(std::string("parse_packet incomplete got=") + std::to_string(buf.size) +
" expected=" + std::to_string(expected_len));
return py::make_tuple(py::none(), py::none());
}
// 5) 防御性检查:如果 data 比预期长,说明可能有粘包
// (只解析第一个包,忽略多余数据)
if (buf.size > expected_len) {
netcore::log_warn(std::string("parse_packet concat got=") + std::to_string(buf.size) +
" expected=" + std::to_string(expected_len) +
" body_len=" + std::to_string(body_len) +
" msg_type=" + std::to_string(msg_type));
}
// 6) 提取 body 并解析 JSON
std::string body_str(reinterpret_cast<const char*>(ptr + 12), body_len);
try {
json j = json::parse(body_str);
py::dict body_dict = json_to_py_dict(j);
return py::make_tuple(py::int_(msg_type), body_dict);
} catch (const json::parse_error& e) {
// JSON 解析失败,返回 raw兼容你现有的逻辑
netcore::log_error(std::string("parse_packet json_parse_error: ") + e.what());
py::dict raw_dict;
raw_dict["raw"] = body_str;
return py::make_tuple(py::int_(msg_type), raw_dict);
} catch (const std::exception& e) {
netcore::log_error(std::string("parse_packet json_parse_error: ") + e.what());
py::dict raw_dict;
raw_dict["raw"] = body_str;
return py::make_tuple(py::int_(msg_type), raw_dict);
}
}
PYBIND11_MODULE(archery_netcore, m) { PYBIND11_MODULE(archery_netcore, m) {
m.doc() = "Archery net core (native, pybind11)."; m.doc() = "Archery net core (native, pybind11).";
m.def("server_ip", []() {
return py::str(kServerIp); // Optional: configure native logger from Python.
// Default log file: /maixapp/apps/t11/netcore.log
m.def("set_log_file", [](const std::string& path) { netcore::set_log_file(path); }, py::arg("path"));
m.def("set_log_level", [](int level) {
if (level < 0) level = 0;
if (level > 3) level = 3;
netcore::set_log_level(static_cast<netcore::LogLevel>(level));
}, py::arg("level"));
m.def("log_test", [](const std::string& msg) {
netcore::log_info(std::string("log_test: ") + msg);
}, py::arg("msg"));
m.def("make_packet", &make_packet,
"Pack TCP packet: header (len+type+checksum) + JSON body",
py::arg("msg_type"), py::arg("body_dict"));
m.def("parse_packet", &parse_packet,
"Parse TCP packet, return (msg_type, body_dict)");
m.def("get_config", &get_config, "Get system configuration");
// Minimal demo: return actions for inner_cmd=41 (manual trigger + ack)
m.def("actions_for_inner_cmd", [](int inner_cmd) {
py::list actions;
if (inner_cmd == 41) {
// 1) set manual trigger flag
{
py::dict a;
a["type"] = "SET_FLAG";
py::dict args;
args["name"] = "manual_trigger_flag";
args["value"] = true;
a["args"] = args;
actions.append(a);
}
// 2) enqueue trigger_ack
{
py::dict a;
a["type"] = "ENQUEUE";
py::dict args;
args["msg_type"] = 2;
args["high"] = false;
py::dict body;
body["result"] = "trigger_ack";
args["body"] = body;
a["args"] = args;
actions.append(a);
}
}
return actions;
}); });
} }

100
cpp_ext/native_logger.cpp Normal file
View File

@@ -0,0 +1,100 @@
#include "native_logger.hpp"
#include <cerrno>
#include <cstring>
#include <mutex>
#include <string>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
namespace netcore {
static std::mutex g_mu;
static int g_fd = -1;
static std::string g_path = "netcore.log";
static LogLevel g_level = LogLevel::kDebug; //LogLevel::kInfo;
static const char* level_name(LogLevel lvl) {
switch (lvl) {
case LogLevel::kError: return "E";
case LogLevel::kWarn: return "W";
case LogLevel::kInfo: return "I";
case LogLevel::kDebug: return "D";
default: return "?";
}
}
static void ensure_open_locked() {
if (g_path.empty()) return;
if (g_fd >= 0) return;
g_fd = ::open(g_path.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0644);
}
void set_log_file(const std::string& path) {
std::lock_guard<std::mutex> lk(g_mu);
g_path = path;
if (g_fd >= 0) {
::close(g_fd);
g_fd = -1;
}
ensure_open_locked();
}
void set_log_level(LogLevel level) {
std::lock_guard<std::mutex> lk(g_mu);
g_level = level;
}
void log(LogLevel level, const std::string& msg) {
std::lock_guard<std::mutex> lk(g_mu);
if (static_cast<int>(level) > static_cast<int>(g_level)) return;
if (g_path.empty()) return;
ensure_open_locked();
if (g_fd < 0) {
// Last resort: stderr (avoid any Python APIs)
::write(STDERR_FILENO, msg.c_str(), msg.size());
::write(STDERR_FILENO, "\n", 1);
return;
}
// Timestamp: epoch milliseconds (simple and cheap)
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// long long ms = (long long)ts.tv_sec * 1000LL + ts.tv_nsec / 1000000LL;
// 1. 将秒数转换为本地时间结构体 struct tm
struct tm *tm_info = localtime(&ts.tv_sec);
// 2. 准备一个缓冲区来存储时间字符串
char buffer[30];
// 3. 格式化秒的部分
// 格式: 年-月-日 时:分:秒
strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", tm_info);
// 4. 计算毫秒部分并追加到字符串中
// ts.tv_nsec 是纳秒,除以 1,000,000 得到毫秒
char ms_buffer[8];
snprintf(ms_buffer, sizeof(ms_buffer), ".%03ld", ts.tv_nsec / 1000000);
// Build one line to keep writes atomic-ish
char head[256];
int n = ::snprintf(head, sizeof(head), "[%s%s] [%s] ", buffer, ms_buffer, level_name(level));
if (n < 0) n = 0;
::write(g_fd, head, (size_t)n);
::write(g_fd, msg.c_str(), msg.size());
::write(g_fd, "\n", 1);
}
void log_debug(const std::string& msg) { log(LogLevel::kDebug, msg); }
void log_info (const std::string& msg) { log(LogLevel::kInfo, msg); }
void log_warn (const std::string& msg) { log(LogLevel::kWarn, msg); }
void log_error(const std::string& msg) { log(LogLevel::kError, msg); }
} // namespace netcore

28
cpp_ext/native_logger.hpp Normal file
View File

@@ -0,0 +1,28 @@
#pragma once
#include <string>
namespace netcore {
enum class LogLevel : int {
kError = 0,
kWarn = 1,
kInfo = 2,
kDebug = 3,
};
// Set log file path. If empty, logging is disabled.
void set_log_file(const std::string& path);
// Set minimum log level to write (default: kInfo).
void set_log_level(LogLevel level);
// Log helpers (thread-safe, never calls into Python).
void log(LogLevel level, const std::string& msg);
void log_debug(const std::string& msg);
void log_info(const std::string& msg);
void log_warn(const std::string& msg);
void log_error(const std::string& msg);
} // namespace netcore

24765
cpp_ext/third_party/nlohmann/json.hpp vendored Normal file

File diff suppressed because it is too large Load Diff

826
laser.py
View File

@@ -1,826 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光射击系统主程序(激光测距版)
功能目标检测、激光校准、4G TCP 通信、OTA 升级、M01 激光测距、INA226 电量监测
平台MaixPy (Sipeed MAIX)
作者ZZH
最后更新2025-11-21
"""
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import requests
import socket
import binascii
# ==============================
# 全局配置
# ==============================
# OTA 升级地址(建议后续改为动态下发)
url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
local_filename = "/maixapp/apps/t11/main.py"
DEVICE_ID = None
PASSWORD = None
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
HEARTBEAT_INTERVAL = 2 # 心跳间隔(秒)
CONFIG_FILE = "/root/laser_config.json"
DEFAULT_POINT = (640, 480) # 图像中心点
laser_point = DEFAULT_POINT
# HTTP API当前未使用保留备用
URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"
# UART 设备初始化
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块TCP 透传)
distance_serial = uart.UART("/dev/ttyS1", 9600) # M01 激光测距模块
# 消息类型常量
MSG_TYPE_LOGIN_REQ = 1 # 登录请求
MSG_TYPE_STATUS = 2 # 状态上报
MSG_TYPE_HEARTBEAT = 4 # 心跳包
# 引脚功能映射
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
pinmap.set_pin_function("A29", "UART2_RX")
pinmap.set_pin_function("A28", "UART2_TX")
pinmap.set_pin_function("P18", "I2C1_SCL")
pinmap.set_pin_function("P21", "I2C1_SDA")
# pinmap.set_pin_function("A15", "I2C5_SCL")
# pinmap.set_pin_function("A27", "I2C5_SDA")#ota升级要修改的
# ADC 触发阈值(用于检测扳机/激光触发)
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# 显示参数
color = image.Color(255, 100, 0) # 橙色十字线
thickness = 1
length = 2
# ADC 扳机触发阈值0~4095
ADC_TRIGGER_THRESHOLD = 3000
# I2C 电源监测INA226
adc_obj = adc.ADC(0, adc.RES_BIT_12)
bus = i2c.I2C(1, i2c.Mode.MASTER)
# bus = i2c.I2C(5, i2c.Mode.MASTER)#ota升级总线
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# M01 激光模块指令
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
DISTANCE_QUERY_CMD = bytes([0xAA, MODULE_ADDR, 0x00, 0x20, 0x00, 0x01, 0x00, 0x00, 0x21])
DISTANCE_RESPONSE_LEN = 13
# TCP / 线程状态
tcp_connected = False
send_queue = []
update_thread_started = False # 防止重复 OTA
send_queue_lock = _thread.allocate_lock()
laser_calibration_data_lock = _thread.allocate_lock()
laser_calibration_active = False
laser_calibration_result = None
# ==============================
# 网络工具函数
# ==============================
def is_server_reachable(host, port=80, timeout=5):
"""检查能否连接到指定主机和端口(用于 OTA 前网络检测)"""
try:
addr_info = socket.getaddrinfo(host, port)[0]
s = socket.socket(addr_info[0], addr_info[1], addr_info[2])
s.settimeout(timeout)
s.connect(addr_info[-1])
s.close()
return True
except Exception as e:
print(f"[NET] 无法连接 {host}:{port} - {e}")
return False
def download_file(url, filename):
"""
从指定 URL 下载文件并保存为 UTF-8 文本。
注意:此操作会覆盖本地 main.py
"""
try:
print(f"[OTA] 正在从 {url} 下载文件...")
response = requests.get(url, timeout=10) # ⏱️ 防止卡死
response.raise_for_status()
response.encoding = 'utf-8'
with open(filename, 'w', encoding='utf-8') as file:
file.write(response.text)
return f"下载成功!文件已保存为: {filename}"
except requests.exceptions.RequestException as e:
return f"下载失败!网络请求错误: {e}"
except OSError as e:
return f"下载失败!文件写入错误: {e}"
except Exception as e:
return f"下载失败!发生未知错误: {e}"
def connect_wifi(ssid, password):
"""
连接 Wi-Fi 并持久化凭证到 /boot/ 目录,使设备重启后自动连接。
返回 (ip, error) 元组。
"""
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
try:
# 生成 wpa_supplicant 配置
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
return None, "Failed to generate wpa config"
# 写入运行时配置
with open(conf_path, "w") as f:
f.write("ctrl_interface=/var/run/wpa_supplicant\n")
f.write("update_config=1\n\n")
f.write(net_conf)
# 持久化保存(供开机脚本读取)
with open(ssid_file, "w") as f:
f.write(ssid.strip())
with open(pass_file, "w") as f:
f.write(password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP最多 20 秒)
for _ in range(20):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
return ip, None
time.sleep(1)
return None, "Timeout: No IP obtained"
except Exception as e:
return None, f"Exception: {str(e)}"
def direct_ota_download():
"""
直接执行 OTA 下载(假设已有网络)
用于 cmd=7 触发
"""
global update_thread_started
try:
# 再次确认网络可达(可选但推荐)
from urllib.parse import urlparse
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, MSG_TYPE_STATUS)
return
print(f"[OTA] 开始直接下载固件...")
result_msg = download_file(url, local_filename)
print(f"[OTA] {result_msg}")
safe_enqueue({"result": result_msg}, MSG_TYPE_STATUS)
except Exception as e:
error_msg = f"OTA 异常: {str(e)}"
print(error_msg)
safe_enqueue({"result": "ota_failed", "reason": error_msg}, MSG_TYPE_STATUS)
finally:
update_thread_started = False # 允许下次 OTA
def handle_wifi_and_update(ssid, password):
"""
OTA 更新线程入口。
注意:必须在 finally 中重置 update_thread_started
"""
global update_thread_started
try:
ip, error = connect_wifi(ssid, password)
if error:
safe_enqueue({"result": "wifi_failed", "error": error}, MSG_TYPE_STATUS)
return
safe_enqueue({"result": "wifi_connected", "ip": ip}, MSG_TYPE_STATUS)
from urllib.parse import urlparse
parsed_url = urlparse(url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not is_server_reachable(host, port, timeout=8):
err_msg = f"网络不通:无法连接 {host}:{port}"
safe_enqueue({"result": err_msg}, MSG_TYPE_STATUS)
return
print(f"[OTA] 已确认可访问 {host}:{port},开始下载...")
try:
cs = download_file(url, local_filename)
except Exception as e:
cs = f"下载失败: {str(e)}"
print(cs)
safe_enqueue({"result": cs}, MSG_TYPE_STATUS)
finally:
# ✅ 关键修复:允许下次 OTA
update_thread_started = False
print("[UPDATE] OTA 线程执行完毕,标志已重置。")
# ==============================
# 工具函数
# ==============================
def read_device_id():
"""从 /device_key 读取设备唯一 ID"""
try:
with open("/device_key", "r") as f:
device_id = f.read().strip()
if device_id:
print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {device_id}")
return device_id
else:
raise ValueError("文件为空")
except Exception as e:
print(f"[ERROR] 无法读取 /device_key: {e}")
return "DEFAULT_DEVICE_ID"
def safe_enqueue(data_dict, msg_type=MSG_TYPE_STATUS):
"""线程安全地将消息加入发送队列"""
global send_queue, send_queue_lock
with send_queue_lock:
send_queue.append((msg_type, data_dict))
def at(cmd, wait="OK", timeout=2000):
"""向 4G 模块发送 AT 指令并等待响应"""
if cmd:
uart4g.write((cmd + "\r\n").encode())
t0 = time.ticks_ms()
buf = b""
while time.ticks_ms() - t0 < timeout:
data = uart4g.read()
if data:
buf += data
if wait.encode() in buf:
return buf.decode(errors="ignore")
return buf.decode(errors="ignore")
def make_packet(msg_type: int, body_dict: dict) -> bytes:
"""构造二进制数据包:[body_len][msg_type][checksum][body]"""
body = json.dumps(body_dict, ensure_ascii=False).encode('utf-8')
body_len = len(body)
checksum = body_len + msg_type
header = struct.pack(">III", body_len, msg_type, checksum)
return header + body
def parse_packet(data: bytes):
"""解析二进制数据包"""
if len(data) < 12:
return None, None
body_len, msg_type, checksum = struct.unpack(">III", data[:12])
body = data[12:12 + body_len]
try:
# ✅ 显式指定 UTF-8 编码
return msg_type, json.loads(body.decode('utf-8'))
except Exception as e:
print(f"[ERROR] 解析包体失败: {e}")
return msg_type, {"raw": body.decode('utf-8', errors='ignore')}
def tcp_send_raw(data: bytes, max_retries=2) -> bool:
"""通过 4G 模块发送原始 TCP 数据(仅在 tcp_main 线程调用)"""
global tcp_connected
if not tcp_connected:
return False
for attempt in range(max_retries):
cmd = f'AT+MIPSEND=0,{len(data)}'
if ">" not in at(cmd, ">", 1500):
time.sleep_ms(100)
continue
time.sleep_ms(10)
full = data + b"\x1A"
try:
sent = uart4g.write(full)
if sent != len(full):
time.sleep_ms(100)
continue
except:
time.sleep_ms(100)
continue
if "OK" in at("", "OK", 1000):
return True
time.sleep_ms(100)
return False
def load_laser_point():
"""从配置文件加载激光点坐标"""
global laser_point
try:
if "laser_config.json" in os.listdir("/root"):
with open(CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
laser_point = (int(data[0]), int(data[1]))
print(f"[INFO] 加载激光点: {laser_point}")
else:
raise ValueError
else:
laser_point = DEFAULT_POINT
except:
laser_point = DEFAULT_POINT
def save_laser_point(point):
"""保存激光点坐标到文件"""
global laser_point
try:
with open(CONFIG_FILE, "w") as f:
json.dump([point[0], point[1]], f)
laser_point = point
except:
pass
def turn_on_laser():
"""发送激光开启指令"""
distance_serial.write(LASER_ON_CMD)
time.sleep_ms(10)
resp = distance_serial.read(20)
if resp:
if resp == LASER_ON_CMD:
print("✅ 激光指令已确认")
else:
print("🔇 无回包(正常或模块不支持)")
return resp
# ==============================
# M01 激光测距模块
# ==============================
def parse_bcd_distance(bcd_bytes: bytes) -> float:
"""将 4 字节 BCD 码转换为距离(米)"""
if len(bcd_bytes) != 4:
return 0.0
try:
hex_string = binascii.hexlify(bcd_bytes).decode()
distance_int = int(hex_string)
return distance_int / 1000.0
except Exception as e:
print(f"[ERROR] BCD 解析失败: {e}")
return 0.0
def read_distance_from_laser_sensor():
"""发送测距指令并返回距离(米)"""
global distance_serial
try:
distance_serial.read() # 清空缓冲区
distance_serial.write(DISTANCE_QUERY_CMD)
time.sleep_ms(500)
response = distance_serial.read(DISTANCE_RESPONSE_LEN)
if response and len(response) == DISTANCE_RESPONSE_LEN:
if response[3] != 0x20:
if response[0] == 0xEE:
err_code = (response[7] << 8) | response[8]
print(f"[LASER] 模块错误代码: {hex(err_code)}")
return 0.0
bcd_bytes = response[6:10]
distance_value_m = parse_bcd_distance(bcd_bytes)
signal_quality = (response[10] << 8) | response[11]
print(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}")
return distance_value_m
print(f"[LASER] 无效响应: {response.hex() if response else 'None'}")
return 0.0
except Exception as e:
print(f"[ERROR] 读取激光测距失败: {e}")
return 0.0
# ==============================
# 激光点校准
# ==============================
def find_red_laser(frame, threshold=150):
"""在图像中查找最亮的红色点(简单 RGB 判定)"""
w, h = frame.width(), frame.height()
img_bytes = frame.to_bytes()
max_sum = 0
best_pos = None
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
max_sum = rgb_sum
best_pos = (x, y)
return best_pos
def calibrate_laser_position():
"""拍摄一帧并识别激光点位置"""
time.sleep_ms(80)
cam = camera.Camera(640, 480)
frame = cam.read()
pos = find_red_laser(frame)
if pos:
save_laser_point(pos)
return pos
return None
# ==============================
# 电量监测INA226
# ==============================
def write_register(reg, value):
data = [(value >> 8) & 0xFF, value & 0xFF]
bus.writeto_mem(INA226_ADDR, reg, bytes(data))
def read_register(reg):
data = bus.readfrom_mem(INA226_ADDR, reg, 2)
return (data[0] << 8) | data[1]
def init_ina226():
write_register(REG_CONFIGURATION, 0x4527)
write_register(REG_CALIBRATION, CALIBRATION_VALUE)
def get_bus_voltage():
raw = read_register(REG_BUS_VOLTAGE)
return raw * 1.25 / 1000
def voltage_to_percent(voltage):
points = [
(4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
(3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
(3.65, 5), (3.60, 0)
]
if voltage >= points[0][0]: return 100
if voltage <= points[-1][0]: return 0
for i in range(len(points) - 1):
v1, p1 = points[i]; v2, p2 = points[i + 1]
if v2 <= voltage <= v1:
ratio = (voltage - v1) / (v2 - v1)
percent = p1 + (p2 - p1) * ratio
return max(0, min(100, int(round(percent))))
return 0
# ==============================
# 目标检测
# ==============================
def detect_circle(frame):
"""检测靶心圆(清晰/模糊两种模式)"""
img_cv = image.image2cv(frame, False, False)
gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
edged = cv2.Canny(blurred, 50, 150)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
best_center = best_radius = method = None
for cnt in contours:
area = cv2.contourArea(cnt)
perimeter = cv2.arcLength(cnt, True)
if perimeter < 100 or area < 100: continue
circularity = 4 * np.pi * area / (perimeter ** 2)
if circularity > 0.75 and len(cnt) >= 5:
center, axes, angle = cv2.fitEllipse(cnt)
radius = (axes[0] + axes[1]) / 4
best_center = (int(center[0]), int(center[1]))
best_radius = int(radius)
method = "清晰"
break
if not best_center:
hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV)
h, s, v = cv2.split(hsv)
s = np.clip(s * 2, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 182])
mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours:
largest = max(contours, key=cv2.contourArea)
if cv2.contourArea(largest) > 50:
(x, y), radius = cv2.minEnclosingCircle(largest)
best_center = (int(x), int(y))
best_radius = int(radius)
method = "模糊"
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius
def compute_laser_position(circle_center, laser_point, radius, method):
"""计算激光相对于靶心的偏差(单位:厘米)"""
if not all([circle_center, radius, method]):
return None, None
cx, cy = circle_center
lx, ly = laser_point
# 根据检测模式估算实际半径(单位:像素 → 厘米)
circle_r_cm = (radius / 4.0) * 20.0 if method == "模糊" else (68 / 16.0) * 20.0
dx = lx - cx
dy = ly - cy
scale = circle_r_cm / radius if radius != 0 else 1.0
return dx * scale, -dy * scale
# ==============================
# TCP 通信主线程
# ==============================
def connect_server():
"""连接服务器(通过 4G 模块 AT 指令)"""
global tcp_connected
if tcp_connected:
return True
print("正在连接服务器...")
at("AT+MIPCLOSE=0", "OK", 1000)
res = at(f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}', "+MIPOPEN", 8000)
if "+MIPOPEN: 0,0" in res:
tcp_connected = True
return True
return False
def tcp_main():
"""TCP 通信主循环(独立线程)"""
global tcp_connected, send_queue, laser_calibration_active, laser_calibration_result,update_thread_started
while not app.need_exit():
if not connect_server():
time.sleep_ms(5000)
continue
login_data = {"deviceId": DEVICE_ID, "password": PASSWORD}
if not tcp_send_raw(make_packet(MSG_TYPE_LOGIN_REQ, login_data)):
tcp_connected = False
time.sleep_ms(2000)
continue
print("➡️ 登录包已发送,等待确认...")
logged_in = False
last_heartbeat_ack_time = time.ticks_ms()
last_heartbeat_send_time = time.ticks_ms()
rx_buf = b""
while True:
data = uart4g.read()
if data:
rx_buf += data
while b'+MIPURC: "rtcp"' in rx_buf:
try:
match = re.search(b'\+MIPURC: "rtcp",0,(\d+),(.+)', rx_buf, re.DOTALL)
if match:
payload_len = int(match.group(1))
payload = match.group(2)[:payload_len]
msg_type, body = parse_packet(payload)
if not logged_in and msg_type == MSG_TYPE_LOGIN_REQ:
if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
logged_in = True
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 登录成功")
else:
break
elif logged_in and msg_type == MSG_TYPE_HEARTBEAT:
last_heartbeat_ack_time = time.ticks_ms()
print("✅ 收到心跳确认")
elif logged_in and isinstance(body, dict):
inner_data = body.get("data", {})
if isinstance(inner_data, dict) and "cmd" in inner_data:
inner_cmd = inner_data["cmd"]
if inner_cmd == 2:
turn_on_laser()
time.sleep_ms(100)
laser_calibration_active = True
safe_enqueue({"result": "calibrating"}, MSG_TYPE_STATUS)
elif inner_cmd == 3:
distance_serial.write(LASER_OFF_CMD)
laser_calibration_active = False
safe_enqueue({"result": "laser_off"}, MSG_TYPE_STATUS)
elif inner_cmd == 4:
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
safe_enqueue(battery_data, MSG_TYPE_STATUS)
elif inner_cmd == 5:
ssid = inner_data.get("ssid")
password = inner_data.get("password")
if not ssid or not password:
safe_enqueue({"result": "missing_ssid_or_password"}, MSG_TYPE_STATUS)
else:
# global update_thread_started
if not update_thread_started:
update_thread_started = True
_thread.start_new_thread(handle_wifi_and_update, (ssid, password))
else:
safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS)
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
safe_enqueue({"result": "current_ip", "ip": ip}, MSG_TYPE_STATUS)
elif inner_cmd == 7:
# global update_thread_started
if update_thread_started:
safe_enqueue({"result": "update_already_started"}, MSG_TYPE_STATUS)
continue
# 实时检查是否有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
_thread.start_new_thread(direct_ota_download, ())
rx_buf = rx_buf[match.end():]
else:
break
except Exception as e:
print(f"[ERROR] 解析/处理数据包失败: {e}")
rx_buf = b""
break
# 发送队列处理
msg_type = None
if logged_in:
with send_queue_lock:
if send_queue:
msg_type, data_dict = send_queue.pop(0)
if msg_type is not None:
pkt = make_packet(msg_type, data_dict)
if not tcp_send_raw(pkt):
print("💔 发送失败,断开重连")
break
# 校准结果上报
if logged_in:
x = y = None
with laser_calibration_data_lock:
if laser_calibration_result is not None:
x, y = laser_calibration_result
laser_calibration_result = None
if x is not None:
safe_enqueue({"result": "ok", "x": x, "y": y}, MSG_TYPE_STATUS)
# 心跳机制
current_time = time.ticks_ms()
if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
if not tcp_send_raw(make_packet(MSG_TYPE_HEARTBEAT, {"t": int(time.time())})):
print("💔 心跳发送失败")
break
last_heartbeat_send_time = current_time
if logged_in and current_time - last_heartbeat_ack_time > 6000:
print("⏰ 6秒无心跳ACK重连")
break
time.sleep_ms(50)
tcp_connected = False
time.sleep_ms(2000)
def laser_calibration_worker():
"""后台激光校准线程"""
global laser_calibration_active, laser_calibration_result
while True:
if laser_calibration_active:
result = calibrate_laser_position()
if result and len(result) == 2:
with laser_calibration_data_lock:
laser_calibration_result = result
laser_calibration_active = False
print(f"✅ 后台校准成功: {result}")
else:
time.sleep_ms(80)
else:
time.sleep_ms(50)
# ==============================
# 主程序入口
# ==============================
def cmd_str():
global DEVICE_ID, PASSWORD
DEVICE_ID = read_device_id()
PASSWORD = DEVICE_ID + "."
photo_dir = "/root/phot"
if photo_dir not in os.listdir("/root"):
try:
os.mkdir(photo_dir)
except:
pass
init_ina226()
load_laser_point()
disp = display.Display()
cam = camera.Camera(640, 480)
_thread.start_new_thread(tcp_main, ())
_thread.start_new_thread(laser_calibration_worker, ())
print("系统准备完成...")
while not app.need_exit():
if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
time.sleep_ms(60)
frame = cam.read()
x, y = laser_point
frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness)
frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness)
frame.draw_circle(int(x), int(y), 1, color, thickness)
result_img, center, radius, method, _ = detect_circle(frame)
disp.show(result_img)
dx, dy = compute_laser_position(center, (x, y), radius, method)
distance_m = read_distance_from_laser_sensor()
voltage = get_bus_voltage()
battery_percent = voltage_to_percent(voltage)
try:
jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
filename = f"{photo_dir}/{int(x)}_{int(y)}_{round((distance_m or 0.0) * 100)}_{method}_{jpg_count:04d}.jpg"
result_img.save(filename, quality=70)
except Exception as e:
print(f"❌ 保存照片失败: {e}")
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100),
"m": method
}
report_data = {"cmd": 1, "data": inner_data}
safe_enqueue(report_data, MSG_TYPE_STATUS)
time.sleep_ms(100)
else:
disp.show(cam.read())
time.sleep_ms(50)
if __name__ == "__main__":
cmd_str()

View File

@@ -136,9 +136,8 @@ class LaserManager:
written = hardware_manager.distance_serial.write(config.LASER_ON_CMD) written = hardware_manager.distance_serial.write(config.LASER_ON_CMD)
self.logger.info(f"[LASER] 写入字节数: {written}") self.logger.info(f"[LASER] 写入字节数: {written}")
# return None time.sleep_ms(60)
# TODO: 暂时去掉这个等待
# 读取回包 # 读取回包
resp = hardware_manager.distance_serial.read(len=20,timeout=10) resp = hardware_manager.distance_serial.read(len=20,timeout=10)
if resp: if resp:
@@ -170,6 +169,7 @@ class LaserManager:
written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD) written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD)
self.logger.info(f"[LASER] 写入字节数: {written}") self.logger.info(f"[LASER] 写入字节数: {written}")
time.sleep_ms(60)
# 读取回包 # 读取回包
resp = hardware_manager.distance_serial.read(20) resp = hardware_manager.distance_serial.read(20)
@@ -190,23 +190,6 @@ class LaserManager:
except Exception as e: except Exception as e:
self.logger.error(f"闪激光失败: {e}") self.logger.error(f"闪激光失败: {e}")
# def find_red_laser(self, frame, threshold=150):
# """在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
# w, h = frame.width(), frame.height()
# img_bytes = frame.to_bytes()
# max_sum = 0
# best_pos = None
# for y in range(0, h, 2):
# for x in range(0, w, 2):
# idx = (y * w + x) * 3
# r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
# if r > threshold and r > g * 2 and r > b * 2:
# rgb_sum = r + g + b
# if rgb_sum > max_sum:
# max_sum = rgb_sum
# best_pos = (x, y)
# return best_pos
# def find_red_laser(self, frame, threshold=150, search_radius=50): # def find_red_laser(self, frame, threshold=150, search_radius=50):
# """ # """
# 在图像中心附近查找最亮的红色激光点(基于 RGB 阈值) # 在图像中心附近查找最亮的红色激光点(基于 RGB 阈值)
@@ -1150,33 +1133,5 @@ class LaserManager:
# 创建全局单例实例 # 创建全局单例实例
laser_manager = LaserManager() laser_manager = LaserManager()
# ==================== 向后兼容的函数接口 ====================
def load_laser_point():
"""加载激光点(向后兼容接口)"""
return laser_manager.load_laser_point()
def save_laser_point(point):
"""保存激光点(向后兼容接口)"""
return laser_manager.save_laser_point(point)
def turn_on_laser():
"""开启激光(向后兼容接口)"""
return laser_manager.turn_on_laser()
def turn_off_laser():
"""关闭激光(向后兼容接口)"""
return laser_manager.turn_off_laser()
def flash_laser(duration_ms=1000):
"""闪激光(向后兼容接口)"""
return laser_manager.flash_laser(duration_ms)
def find_red_laser(frame, threshold=150):
"""查找红色激光点(向后兼容接口)"""
return laser_manager.find_red_laser(frame, threshold)
def calibrate_laser_position():
"""校准激光位置(向后兼容接口)"""
return laser_manager.calibrate_laser_position()

76
main.py
View File

@@ -30,79 +30,6 @@ from hardware import hardware_manager
from camera_manager import camera_manager from camera_manager import camera_manager
# def laser_calibration_worker():
# """后台线程:持续检测是否需要执行激光校准"""
# from maix import camera
# from laser_manager import laser_manager
# from ota_manager import ota_manager
# logger = logger_manager.logger
# if logger:
# logger.info("[LASER] 激光校准线程启动")
# while True:
# try:
# try:
# if ota_manager.ota_in_progress:
# time.sleep_ms(200)
# continue
# except Exception as e:
# logger = logger_manager.logger
# if logger:
# logger.error(f"[LASER] OTA检查异常: {e}")
# time.sleep_ms(200)
# continue
# if laser_manager.calibration_active:
# cam = None
# try:
# cam = camera.Camera(640, 480)
# start = time.ticks_ms()
# timeout_ms = 8000
# while laser_manager.calibration_active and time.ticks_diff(time.ticks_ms(), start) < timeout_ms:
# frame = cam.read()
# pos = laser_manager.find_red_laser(frame)
# if pos:
# laser_manager.set_calibration_result(pos)
# laser_manager.stop_calibration()
# laser_manager.save_laser_point(pos)
# logger = logger_manager.logger
# if logger:
# logger.info(f"✅ 后台校准成功: {pos}")
# break
# time.sleep_ms(60)
# except Exception as e:
# logger = logger_manager.logger
# if logger:
# logger.error(f"[LASER] calibration error: {e}")
# import traceback
# logger.error(traceback.format_exc())
# time.sleep_ms(200)
# finally:
# try:
# if cam is not None:
# del cam
# except Exception as e:
# logger = logger_manager.logger
# if logger:
# logger.error(f"[LASER] 释放相机资源异常: {e}")
# if laser_manager.calibration_active:
# time.sleep_ms(300)
# else:
# time.sleep_ms(50)
# except Exception as e:
# # 线程顶层异常捕获,防止线程静默退出
# logger = logger_manager.logger
# if logger:
# logger.error(f"[LASER] 校准线程异常: {e}")
# import traceback
# logger.error(traceback.format_exc())
# else:
# print(f"[LASER] 校准线程异常: {e}")
# import traceback
# traceback.print_exc()
# time.sleep_ms(1000) # 等待1秒后继续
def laser_calibration_worker(): def laser_calibration_worker():
"""后台线程:持续检测是否需要执行激光校准""" """后台线程:持续检测是否需要执行激光校准"""
from laser_manager import laser_manager from laser_manager import laser_manager
@@ -272,7 +199,8 @@ def cmd_str():
# 6. 启动通信与校准线程 # 6. 启动通信与校准线程
_thread.start_new_thread(network_manager.tcp_main, ()) _thread.start_new_thread(network_manager.tcp_main, ())
_thread.start_new_thread(laser_calibration_worker, ()) if not config.HARDCODE_LASER_POINT:
_thread.start_new_thread(laser_calibration_worker, ())
# 7. 加载激光点配置 # 7. 加载激光点配置
laser_manager.load_laser_point() laser_manager.load_laser_point()

View File

@@ -15,6 +15,7 @@ import os
import threading import threading
import socket import socket
import config import config
from hardware import hardware_manager from hardware import hardware_manager
from power import get_bus_voltage, voltage_to_percent from power import get_bus_voltage, voltage_to_percent
# from laser import laser_manager # from laser import laser_manager
@@ -57,6 +58,23 @@ class NetworkManager:
self._wifi_recv_buffer = b"" # WiFi接收缓冲区 self._wifi_recv_buffer = b"" # WiFi接收缓冲区
self._initialized = True self._initialized = True
# 导入 archery_netcore 模块,并检查是否存在 parse_packet 和 make_packet 函数
try:
import archery_netcore as _netcore
self._netcore = _netcore
if hasattr(self._netcore, "parse_packet") and hasattr(self._netcore, "make_packet") and hasattr(self._netcore, "actions_for_inner_cmd"):
print("[NET] archery_netcore found")
else:
print("[NET] archery_netcore not found parse_packet or make_packet")
exit(1)
except Exception:
print("[NET] import archery_netcore failed")
exit(1)
# 服务器相关
self._server_ip = self._netcore.get_config().get("SERVER_IP")
self._server_port = self._netcore.get_config().get("SERVER_PORT")
# ==================== 状态访问(只读属性)==================== # ==================== 状态访问(只读属性)====================
@property @property
@@ -287,7 +305,7 @@ class NetworkManager:
# 策略1如果指定优先WiFi且WiFi可用使用WiFi # 策略1如果指定优先WiFi且WiFi可用使用WiFi
if prefer_wifi and self.is_wifi_connected(): if prefer_wifi and self.is_wifi_connected():
# 检查WiFi是否能连接到服务器 # 检查WiFi是否能连接到服务器
if self.is_server_reachable(config.SERVER_IP, config.SERVER_PORT, timeout=3): if self.is_server_reachable(self._server_ip, self._server_port, timeout=3):
self._network_type = "wifi" self._network_type = "wifi"
self.logger.info(f"[NET] 选择WiFi网络IP: {self._wifi_ip}") self.logger.info(f"[NET] 选择WiFi网络IP: {self._wifi_ip}")
return "wifi" return "wifi"
@@ -296,7 +314,7 @@ class NetworkManager:
# 策略2如果WiFi可用使用WiFi # 策略2如果WiFi可用使用WiFi
if self.is_wifi_connected(): if self.is_wifi_connected():
if self.is_server_reachable(config.SERVER_IP, config.SERVER_PORT, timeout=3): if self.is_server_reachable(self._server_ip, self._server_port, timeout=3):
self._network_type = "wifi" self._network_type = "wifi"
self.logger.info(f"[NET] 选择WiFi网络IP: {self._wifi_ip}") self.logger.info(f"[NET] 选择WiFi网络IP: {self._wifi_ip}")
return "wifi" return "wifi"
@@ -324,6 +342,27 @@ class NetworkManager:
if len(data) < 12: if len(data) < 12:
return None, None return None, None
body_len, msg_type, checksum = struct.unpack(">III", data[:12]) body_len, msg_type, checksum = struct.unpack(">III", data[:12])
expected_len = 12 + body_len
# 防御性检查:如果 data 比预期长,说明可能有粘包
if len(data) > expected_len:
self.logger.warning(
f"[TCP] parse_packet: data length ({len(data)}) > expected ({expected_len}), "
f"possible packet concatenation. body_len={body_len}, msg_type={msg_type}"
)
# 只解析第一个包,忽略多余数据(或者可以返回剩余部分)
# data = data[:expected_len]
# TODO 是否需要解析剩余部分?
# 如果 data 比预期短,说明包不完整(半包)
if len(data) < expected_len:
self.logger.warning(
f"[TCP] parse_packet: data length ({len(data)}) < expected ({expected_len}), "
f"incomplete packet. body_len={body_len}, msg_type={msg_type}"
)
return None, None
body = data[12:12 + body_len] body = data[12:12 + body_len]
try: try:
return msg_type, json.loads(body.decode()) return msg_type, json.loads(body.decode())
@@ -395,7 +434,7 @@ class NetworkManager:
link_id = getattr(config, "TCP_LINK_ID", 0) link_id = getattr(config, "TCP_LINK_ID", 0)
use_ssl = getattr(config, "USE_TCP_SSL", False) use_ssl = getattr(config, "USE_TCP_SSL", False)
host = config.SERVER_IP host = self._server_ip
port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT port = getattr(config, "TCP_SSL_PORT", 443) if use_ssl else config.SERVER_PORT
tail = getattr(config, "MIPOPEN_TAIL", "") tail = getattr(config, "MIPOPEN_TAIL", "")
with self.get_uart_lock(): with self.get_uart_lock():
@@ -642,31 +681,6 @@ class NetworkManager:
SALT2 = "shoot" SALT2 = "shoot"
return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
def send_http_cmd(self, cmd_str, timeout_ms=3000):
"""发送 HTTP 相关 AT 指令(调试用)"""
self.logger.debug(f"[HTTP AT] => {cmd_str}")
return hardware_manager.at_client.send(cmd_str, "OK", timeout_ms)
def upload_shoot_event(self,json_data):
"""通过 4G 模块上报射击事件到 HTTP 接口(备用通道)"""
token = self.generate_token(self.device_id)
if not self.send_http_cmd(f'AT+MHTTPCREATE="{config.HTTP_URL}"'):
return False
instance_id = 0
self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"')
self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"')
self.send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {self.device_id}"')
json_str = ujson.dumps(json_data)
if not self.send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'):
return False
if self.send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{config.HTTP_API_PATH}"'):
time.sleep_ms(5000)
return True
return False
def tcp_main(self): def tcp_main(self):
"""TCP 主通信循环:登录、心跳、处理指令、发送数据""" """TCP 主通信循环:登录、心跳、处理指令、发送数据"""
import _thread import _thread
@@ -709,7 +723,8 @@ class NetworkManager:
"vol": vol_val, "vol": vol_val,
"vol_per": voltage_to_percent(vol_val) "vol_per": voltage_to_percent(vol_val)
} }
if not self.tcp_send_raw(self.make_packet(1, login_data)): # if not self.tcp_send_raw(self.make_packet(1, login_data)):
if not self.tcp_send_raw(self._netcore.make_packet(1, login_data)):
self._tcp_connected = False self._tcp_connected = False
time.sleep_ms(2000) time.sleep_ms(2000)
continue continue
@@ -777,7 +792,8 @@ class NetworkManager:
except: except:
pass pass
msg_type, body = self.parse_packet(payload) # msg_type, body = self.parse_packet(payload)
msg_type, body = self._netcore.parse_packet(payload)
# 处理登录响应 # 处理登录响应
if not logged_in and msg_type == 1: if not logged_in and msg_type == 1:
@@ -845,8 +861,12 @@ class NetworkManager:
if not laser_manager.calibration_active: if not laser_manager.calibration_active:
laser_manager.turn_on_laser() laser_manager.turn_on_laser()
time.sleep_ms(100) time.sleep_ms(100)
laser_manager.start_calibration() if not config.HARDCODE_LASER_POINT:
self.safe_enqueue({"result": "calibrating"}, 2) laser_manager.start_calibration()
self.safe_enqueue({"result": "calibrating"}, 2)
else:
# 写死的逻辑,不需要校准激光点
self.safe_enqueue({"result": "laser pos set by hard code"}, 2)
elif inner_cmd == 3: # 关闭激光 elif inner_cmd == 3: # 关闭激光
from laser_manager import laser_manager from laser_manager import laser_manager
laser_manager.turn_off_laser() laser_manager.turn_off_laser()
@@ -962,7 +982,8 @@ class NetworkManager:
self.get_queue_lock().release() self.get_queue_lock().release()
if msg_type is not None and data_dict is not None: if msg_type is not None and data_dict is not None:
pkt = self.make_packet(msg_type, data_dict) pkt = self._netcore.make_packet(msg_type, data_dict)
# pkt = self.make_packet(msg_type, data_dict)
if not self.tcp_send_raw(pkt): if not self.tcp_send_raw(pkt):
self._tcp_connected = False self._tcp_connected = False
break break
@@ -979,7 +1000,8 @@ class NetworkManager:
current_time = time.ticks_ms() current_time = time.ticks_ms()
if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000: if logged_in and current_time - last_heartbeat_send_time > config.HEARTBEAT_INTERVAL * 1000:
vol_val = get_bus_voltage() vol_val = get_bus_voltage()
if not self.tcp_send_raw(self.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})): if not self.tcp_send_raw(self._netcore.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
# if not self.tcp_send_raw(self.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
self.logger.error("心跳发送失败") self.logger.error("心跳发送失败")
time.sleep_ms(3000) time.sleep_ms(3000)
send_hartbeat_fail_count += 1 send_hartbeat_fail_count += 1

152
package.py Normal file
View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用打包脚本
根据 app.yaml 中列出的文件,打包成 zip 文件
版本号从 version.py 中读取
"""
import os
import yaml
import zipfile
from datetime import datetime
import sys
# 添加当前目录到路径,以便导入 version 模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def load_app_yaml(yaml_path='app.yaml'):
"""加载 app.yaml 文件"""
try:
with open(yaml_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except Exception as e:
print(f"[ERROR] 读取 {yaml_path} 失败: {e}")
return None
def check_files_exist(files, base_dir='.'):
"""检查文件是否存在"""
missing_files = []
existing_files = []
for file_path in files:
full_path = os.path.join(base_dir, file_path)
if os.path.exists(full_path):
existing_files.append(file_path)
else:
missing_files.append(file_path)
return existing_files, missing_files
def get_version_from_version_py():
"""从 version.py 读取版本号"""
try:
from version import VERSION
return VERSION
except ImportError:
print("[WARNING] 无法导入 version.py使用默认版本号 1.0.0")
return '1.0.0'
except Exception as e:
print(f"[WARNING] 读取 version.py 失败: {e},使用默认版本号 1.0.0")
return '1.0.0'
def create_zip_package(app_info, files, output_dir='.', base_dir='.'):
"""创建 zip 打包文件"""
# 生成输出文件名:{name}_v{version}_{timestamp}.zip
# 版本号从 version.py 读取,而不是从 app.yaml
app_name = app_info.get('name', 'app')
version = get_version_from_version_py() # 从 version.py 读取版本号
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
zip_filename = f"{app_name}_v{version}_{timestamp}.zip"
zip_path = os.path.join(output_dir, zip_filename)
print(f"[INFO] 开始打包: {zip_filename}")
print(f"[INFO] 包含文件数: {len(files)}")
try:
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file_path in files:
full_path = os.path.join(base_dir, file_path)
# 使用相对路径作为 zip 内的路径
zipf.write(full_path, file_path)
print(f"{file_path}")
# 获取文件大小
file_size = os.path.getsize(zip_path)
file_size_mb = file_size / (1024 * 1024)
print(f"\n[SUCCESS] 打包完成!")
print(f" 文件名: {zip_filename}")
print(f" 文件大小: {file_size_mb:.2f} MB ({file_size:,} 字节)")
print(f" 文件路径: {os.path.abspath(zip_path)}")
return zip_path
except Exception as e:
print(f"[ERROR] 打包失败: {e}")
import traceback
traceback.print_exc()
return None
def main():
"""主函数"""
print("=" * 60)
print("应用打包脚本")
print("=" * 60)
# 1. 加载 app.yaml
app_info = load_app_yaml('app.yaml')
if app_info is None:
return
# 从 version.py 读取版本号
version = get_version_from_version_py()
print(f"\n[INFO] 应用信息:")
print(f" ID: {app_info.get('id', 'N/A')}")
print(f" 名称: {app_info.get('name', 'N/A')}")
print(f" 版本: {version} (来自 version.py)")
print(f" 作者: {app_info.get('author', 'N/A')}")
if app_info.get('version') != version:
print(f" [注意] app.yaml 中的版本 ({app_info.get('version', 'N/A')}) 与 version.py 不一致")
# 2. 获取文件列表
files = app_info.get('files', [])
if not files:
print("[ERROR] app.yaml 中没有找到 files 列表")
return
print(f"\n[INFO] 文件列表 ({len(files)} 个文件):")
# 3. 检查文件是否存在
existing_files, missing_files = check_files_exist(files)
if missing_files:
print(f"\n[WARNING] 以下文件不存在,将被跳过:")
for f in missing_files:
print(f"{f}")
if not existing_files:
print("\n[ERROR] 没有找到任何有效文件,无法打包")
return
print(f"\n[INFO] 找到 {len(existing_files)} 个有效文件")
# 4. 创建 zip 包
zip_path = create_zip_package(app_info, existing_files)
if zip_path:
print("\n" + "=" * 60)
print("打包成功完成!")
print("=" * 60)
else:
print("\n" + "=" * 60)
print("打包失败!")
print("=" * 60)
if __name__ == "__main__":
main()

View File

@@ -4,7 +4,9 @@
应用版本号 应用版本号
每次 OTA 更新时,只需要更新这个文件中的版本号 每次 OTA 更新时,只需要更新这个文件中的版本号
""" """
VERSION = '1.1.5' VERSION = '1.2.0'
# 1.2.0 开始使用C++编译成.so替换部分代码