Compare commits

...

7 Commits

Author SHA1 Message Date
gcw_4spBpAfv
ff629e596d 一般时候不预览照片 2026-02-10 17:54:11 +08:00
gcw_4spBpAfv
592dc6ceb1 v1.2.8 2026-02-10 17:52:55 +08:00
gcw_4spBpAfv
573c0a3385 v1.2.7 2026-02-09 11:24:46 +08:00
gcw_4spBpAfv
8aea76d99b v1.2.5 2026-02-07 17:09:39 +08:00
gcw_4spBpAfv
61096ba190 'v1.2.3' 2026-02-05 12:45:52 +08:00
gcw_4spBpAfv
f476545172 v1.2.2 2026-01-24 15:50:25 +08:00
gcw_4spBpAfv
aae97f6ce9 v1.2.2 2026-01-24 15:45:32 +08:00
19 changed files with 1656 additions and 532 deletions

399
4g_download_manager.py Normal file
View File

@@ -0,0 +1,399 @@
import re
import hashlib
import binascii
from maix import time
from power import get_bus_voltage, voltage_to_percent
from urllib.parse import urlparse
from hardware import hardware_manager
class DownloadManager4G:
"""4g下载管理器单例"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(DownloadManager4G, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有状态
self.FRAG_SIZE = 1024
self.FRAG_DELAY = 10
self._initialized = True
def _log(self, *a):
if debug:
self.logger.debug(" ".join(str(x) for x in a))
def _pwr_log(self, prefix=""):
"""debug 用:输出电压/电量"""
if not debug:
return
try:
v = get_bus_voltage()
p = voltage_to_percent(v)
self.logger.debug(f"[PWR]{prefix} v={v:.3f}V p={p}%")
except Exception as e:
try:
self.logger.debug(f"[PWR]{prefix} read_failed: {e}")
except:
pass
def _clear_http_events(self):
if hardware_manager.at_client:
while hardware_manager.at_client.pop_http_event() is not None:
pass
def _parse_httpid(self, resp: str):
m = re.search(r"\+MHTTPCREATE:\s*(\d+)", resp)
return int(m.group(1)) if m else None
def _get_ip(self, ):
r = hardware_manager.at_client.send("AT+CGPADDR=1", "OK", 3000)
m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
return m.group(1) if m else ""
def _ensure_pdp(self, ):
ip = self._get_ip()
if ip and ip != "0.0.0.0":
return True, ip
hardware_manager.at_client.send("AT+MIPCALL=1,1", "OK", 15000)
for _ in range(10):
ip = self._get_ip()
if ip and ip != "0.0.0.0":
return True, ip
time.sleep(1)
return False, ip
def _extract_hdr_fields(self, hdr_text: str):
mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
clen = int(mlen.group(1)) if mlen else None
mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
md5_b64 = mmd5.group(1).strip() if mmd5 else None
return clen, md5_b64
def _extract_content_range(self, hdr_text: str):
m = re.search(r"Content-Range:\s*bytes\s*(\d+)\s*-\s*(\d+)\s*/\s*(\d+)", hdr_text, re.IGNORECASE)
if not m:
return None, None, None
try:
return int(m.group(1)), int(m.group(2)), int(m.group(3))
except:
return None, None, None
def _hard_reset_http(self, ):
"""模块进入"坏状态"时的保守清场"""
self._clear_http_events()
for i in range(0, 6):
try:
hardware_manager.at_client.send(f"AT+MHTTPDEL={i}", "OK", 1200)
except:
pass
self._clear_http_events()
def _create_httpid(self, full_reset=False):
self._clear_http_events()
if hardware_manager.at_client:
hardware_manager.at_client.flush()
if full_reset:
self._hard_reset_http()
resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{base_url}"', "OK", 8000)
hid = self._parse_httpid(resp)
if self._is_https:
resp = hardware_manager.at_client.send(f'AT+MHTTPCFG="ssl",{hid},1,1', "OK", 2000)
if "ERROR" in resp or "CME ERROR" in resp:
self.logger.error(f"MHTTPCFG SSL failed: {resp}")
# 尝试https 降级到http
downgraded_base_url = base_url.replace("https://", "http://")
resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{downgraded_base_url}"', "OK", 8000)
hid = self._parse_httpid(resp)
return hid, resp
def _fetch_range_into_buf(self, start, want_len, out_buf, path, full_reset=False):
"""
请求 Range [start, start+want_len),写入 out_bufbytearray长度=want_len
返回 (ok, msg, total_len, md5_b64, got_len)
"""
end_incl = start + want_len - 1
hid, cresp = self._create_httpid(full_reset=full_reset)
if hid is None:
return False, f"MHTTPCREATE failed: {cresp}", None, None, 0
# 降低 URC 压力(分片/延迟)
hardware_manager.at_client.send(f'AT+MHTTPCFG="fragment",{hid},{self.FRAG_SIZE},{self.FRAG_DELAY}', "OK", 1500)
# 设置 Range headerinclusive
hardware_manager.at_client.send(f'AT+MHTTPCFG="header",{hid},"Range: bytes={start}-{end_incl}"', "OK", 3000)
req = hardware_manager.at_client.send(f'AT+MHTTPREQUEST={hid},1,0,"{path}"', "OK", 15000)
if "ERROR" in req or "CME ERROR" in req:
hardware_manager.at_client.send(f"AT+MHTTPDEL={hid}", "OK", 2000)
return False, f"MHTTPREQUEST failed: {req}", None, None, 0
# 等 header + content
hdr_text = None
hdr_accum = ""
code = None
resp_total = None
total_len = None
md5_b64 = None
got_ranges = set()
last_sum = 0
t0 = time.ticks_ms()
timeout_ms = 9000
logged_hdr = False
while time.ticks_ms() - t0 < timeout_ms:
ev = hardware_manager.at_client.pop_http_event() if hardware_manager.at_client else None
if not ev:
time.sleep_ms(5)
continue
if ev[0] == "header":
_, ehid, ecode, ehdr = ev
if ehid != hid:
continue
code = ecode
hdr_text = ehdr
if ehdr:
hdr_accum = (hdr_accum + "\n" + ehdr) if hdr_accum else ehdr
resp_total_tmp, md5_tmp = self._extract_hdr_fields(hdr_accum)
if md5_tmp:
md5_b64 = md5_tmp
cr_s, cr_e, cr_total = self._extract_content_range(hdr_accum)
if cr_total is not None:
total_len = cr_total
if resp_total_tmp is not None:
resp_total = resp_total_tmp
elif resp_total is None and (cr_s is not None) and (cr_e is not None) and (cr_e >= cr_s):
resp_total = (cr_e - cr_s + 1)
if (not logged_hdr) and (resp_total is not None or total_len is not None):
self._log(f"[HDR] id={hid} code={code} clen={resp_total} cr={cr_s}-{cr_e}/{cr_total}")
logged_hdr = True
continue
if ev[0] == "content":
_, ehid, _total, _sum, _cur, payload = ev
if ehid != hid:
continue
if resp_total is None:
resp_total = _total
if resp_total is None or resp_total <= 0:
continue
start_rel = _sum - _cur
end_rel = _sum
if start_rel < 0 or start_rel >= resp_total:
continue
if end_rel > resp_total:
end_rel = resp_total
actual_len = min(len(payload), end_rel - start_rel)
if actual_len <= 0:
continue
out_buf[start_rel:start_rel + actual_len] = payload[:actual_len]
got_ranges.add((start_rel, start_rel + actual_len))
if _sum > last_sum:
last_sum = _sum
if debug and (last_sum >= resp_total or (last_sum % 512 == 0)):
self._log(f"[CHUNK] {start}+{last_sum}/{resp_total}")
if last_sum >= resp_total:
break
# 清理实例(快路径:只删当前 hid
try:
hardware_manager.at_client.send(f"AT+MHTTPDEL={hid}", "OK", 2000)
except:
pass
if resp_total is None:
return False, "no_header_or_total", total_len, md5_b64, 0
# 计算实际填充长度
merged = sorted(got_ranges)
merged2 = []
for s, e in merged:
if not merged2 or s > merged2[-1][1]:
merged2.append((s, e))
else:
merged2[-1] = (merged2[-1][0], max(merged2[-1][1], e))
filled = sum(e - s for s, e in merged2)
if filled < resp_total:
return False, f"incomplete_chunk got={filled} expected={resp_total} code={code}", total_len, md5_b64, filled
got_len = resp_total
return True, "OK", total_len, md5_b64, got_len
def download_file_via_4g(self, url, filename,
total_timeout_ms=600000,
retries=3,
debug=False):
"""
ML307R HTTP 下载(更稳的"固定小块 Range 顺序下载"基于main109.py
- 只依赖 +MHTTPURC:"header"/"content"(不依赖 MHTTPREAD/cached
- 每次只请求一个小块 Range默认 10240B失败就重试同一块必要时缩小块大小
- 每个 chunk 都重新 MHTTPCREATE/MHTTPREQUEST避免卡在"206 header 但不吐 content"的坏状态
- 使用二进制模式下载,确保文件完整性
"""
# 小块策略与main109.py保持一致
CHUNK_MAX = 10240
CHUNK_MIN = 128
CHUNK_RETRIES = 12
t_func0 = time.ticks_ms()
parsed = urlparse(url)
host = parsed.hostname
path = parsed.path or "/"
if not host:
return False, "bad_url (no host)"
if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"):
base_url = "https://static.shelingxingqiu.com"
# TODO使用https看看是否能成功
self._is_https = True
else:
base_url = f"http://{host}"
self._is_https = False
try:
self._begin_ota()
except:
pass
from network import network_manager
with network_manager.get_uart_lock():
try:
ok_pdp, ip = self._ensure_pdp()
if not ok_pdp:
return False, f"PDP not ready (ip={ip})"
# 先清空旧事件,避免串台
self._clear_http_events()
# 为了支持随机写入,先创建空文件
try:
with open(filename, "wb") as f:
f.write(b"")
except Exception as e:
return False, f"open_file_failed: {e}"
total_len = None
expect_md5_b64 = None
offset = 0
chunk = CHUNK_MAX
t_start = time.ticks_ms()
last_progress_ms = t_start
STALL_TIMEOUT_MS = 60000
last_pwr_ms = t_start
self._pwr_log(prefix=" ota_start")
bad_http_state = 0
while True:
now = time.ticks_ms()
if debug and time.ticks_diff(now, last_pwr_ms) >= 5000:
last_pwr_ms = now
self._pwr_log(prefix=f" off={offset}/{total_len or '?'}")
if time.ticks_diff(now, t_start) > total_timeout_ms:
return False, f"timeout overall after {total_timeout_ms}ms offset={offset} total={total_len}"
if time.ticks_diff(now, last_progress_ms) > STALL_TIMEOUT_MS:
return False, f"timeout stalled {STALL_TIMEOUT_MS}ms offset={offset} total={total_len}"
if total_len is not None and offset >= total_len:
break
want = chunk
if total_len is not None:
remain = total_len - offset
if remain <= 0:
break
if want > remain:
want = remain
# 本 chunk 的 buffer长度=want
buf = bytearray(want)
success = False
last_err = "unknown"
md5_seen = None
got_len = 0
for k in range(1, CHUNK_RETRIES + 1):
do_full_reset = (bad_http_state >= 2)
ok, msg, tlen, md5_b64, got = self._fetch_range_into_buf(offset, want, buf, base_url, path, full_reset=do_full_reset)
last_err = msg
if tlen is not None and total_len is None:
total_len = tlen
if md5_b64 and not expect_md5_b64:
expect_md5_b64 = md5_b64
if ok:
success = True
got_len = got
bad_http_state = 0
break
try:
if ("no_header_or_total" in msg) or ("MHTTPREQUEST failed" in msg) or (
"MHTTPCREATE failed" in msg):
bad_http_state += 1
else:
bad_http_state = max(0, bad_http_state - 1)
except:
pass
if chunk > CHUNK_MIN:
chunk = max(CHUNK_MIN, chunk // 2)
want = min(chunk, want)
buf = bytearray(want)
self._log(f"[RETRY] off={offset} want={want} try={k} err={msg}")
self._pwr_log(prefix=f" retry{k} off={offset}")
time.sleep_ms(120)
if not success:
return False, f"chunk_failed off={offset} want={want} err={last_err} total={total_len}"
# 写入文件(二进制模式)
try:
with open(filename, "r+b") as f:
f.seek(offset)
f.write(bytes(buf))
except Exception as e:
return False, f"write_failed off={offset}: {e}"
offset += len(buf)
last_progress_ms = time.ticks_ms()
chunk = CHUNK_MAX
if debug:
self._log(f"[OK] offset={offset}/{total_len or '?'}")
# MD5 校验
if expect_md5_b64 and hashlib is not None:
try:
with open(filename, "rb") as f:
data = f.read()
digest = hashlib.md5(data).digest()
got_b64 = binascii.b2a_base64(digest).decode().strip()
if got_b64 != expect_md5_b64:
return False, f"md5_mismatch got={got_b64} expected={expect_md5_b64}"
self.logger.debug(f"[4G-DL] MD5 verified: {got_b64}")
except Exception as e:
return False, f"md5_check_failed: {e}"
t_cost = time.ticks_diff(time.ticks_ms(), t_func0)
self.logger.info(f"[4G-DL] download complete: size={offset} ip={ip} cost_ms={t_cost}")
return True, f"OK size={offset} ip={ip} cost_ms={t_cost}"
finally:
self._end_ota()

View File

@@ -1,6 +1,6 @@
id: t11
name: t11
version: 1.2.1
version: 1.2.7
author: t11
icon: ''
desc: t11

View File

@@ -58,9 +58,12 @@ REG_CURRENT = 0x04 # 电流寄存器
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# ==================== 空气传感器配置 ====================
ADC_TRIGGER_THRESHOLD = 2500 # TODO:只是用于测试,最终需要改为正常值
AIR_PRESSURE_lOG = False # TODO: 在正式环境中关闭
# ADC配置
ADC_CHANNEL = 0
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# ==================== 激光配置 ====================
@@ -100,6 +103,9 @@ LASER_CAMERA_OFFSET_CM = 1.4 # 激光在摄像头下方的物理距离(厘米
IMAGE_CENTER_X = 320 # 图像中心 X 坐标
IMAGE_CENTER_Y = 240 # 图像中心 Y 坐标
FLASH_LASER_WHILE_SHOOTING = True # 是否在拍摄时闪一下激光True=闪False=不闪)
FLASH_LASER_DURATION_MS = 1000 # 闪一下激光的持续时间(毫秒)
# ==================== 显示配置 ====================
LASER_COLOR = (0, 255, 0) # RGB颜色
LASER_THICKNESS = 1
@@ -110,6 +116,8 @@ SAVE_IMAGE_ENABLED = True # 是否保存图像True=保存False=不保存
PHOTO_DIR = "/root/phot" # 照片存储目录
MAX_IMAGES = 1000
SHOW_CAMERA_PHOTO_WHILE_SHOOTING = False # 是否在拍摄时显示摄像头图像True=显示False=不显示建议在连着USB测试过程中打开
# ==================== OTA配置 ====================
MAX_BACKUPS = 5
LOG_MAX_BYTES = 10 * 1024 * 1024 # 10MB

View File

@@ -22,6 +22,9 @@ endif()
add_library(archery_netcore MODULE
archery_netcore.cpp
native_logger.cpp
utils.cpp
decrypt_ota_file.cpp
msg_handler.cpp
)
target_include_directories(archery_netcore PRIVATE

View File

@@ -7,9 +7,11 @@
#include <string>
#include <fstream>
#include <array>
#include <algorithm>
#include <openssl/evp.h>
#include "msg_handler.hpp"
#include "native_logger.hpp"
#include "decrypt_ota_file.hpp"
#include "utils.hpp"
namespace py = pybind11;
using json = nlohmann::json;
@@ -18,29 +20,9 @@ namespace {
// 配置项
const std::string _cfg_server_ip = "www.shelingxingqiu.com";
const int _cfg_server_port = 50005;
const std::string _cfg_device_id_file = "/device_key";
// OTA AEAD format: MAGIC(7) | nonce(12) | ciphertext(N) | tag(16)
constexpr const char* kOtaMagic = "AROTAE1";
constexpr size_t kOtaMagicLen = 7;
constexpr size_t kGcmNonceLen = 12;
constexpr size_t kGcmTagLen = 16;
// 固定 32-byte AES-256-GCM key提高被直接查看的成本不是绝对安全
// 注意:需要与打包端传入的 --aead-key-hex 保持一致。
static std::array<uint8_t, 32> ota_key_bytes() {
// 简单拆分混淆key = a XOR b
static const std::array<uint8_t, 32> a = {
0x92,0x99,0x4d,0x06,0x6f,0xb6,0xa6,0x3d,0x85,0x08,0xbe,0x73,0x5e,0x73,0x4d,0x8a,
0x53,0x88,0xe6,0x99,0xfc,0x10,0x29,0xb9,0x16,0x9b,0xe7,0x0c,0x65,0x21,0x1c,0xce
};
static const std::array<uint8_t, 32> b = {
0xcf,0x60,0xa2,0xc2,0x32,0x7a,0x61,0xb0,0x4c,0x8e,0x8a,0x62,0x31,0xc7,0x82,0xff,
0xec,0xac,0xa1,0x04,0x2a,0x4d,0xaa,0xf2,0xb0,0x5b,0x39,0x2b,0xf4,0xb3,0xad,0xad
};
std::array<uint8_t, 32> k{};
for (size_t i = 0; i < k.size(); i++) k[i] = static_cast<uint8_t>(a[i] ^ b[i]);
return k;
}
}
// 定义获取配置的函数
@@ -52,286 +34,8 @@ py::dict get_config() {
}
// 辅助函数:将 py::dict 转为 nlohmann::json
json py_dict_to_json(py::dict d) {
json j;
for (auto item : d) {
std::string key = py::str(item.first);
py::object val = py::reinterpret_borrow<py::object>(item.second);
if (py::isinstance<py::dict>(val)) {
j[key] = py_dict_to_json(py::cast<py::dict>(val));
} else if (py::isinstance<py::list>(val)) {
py::list py_list = py::cast<py::list>(val);
json arr = json::array();
for (auto elem : py_list) {
py::object elem_obj = py::reinterpret_borrow<py::object>(elem);
if (py::isinstance<py::dict>(elem_obj)) {
arr.push_back(py_dict_to_json(py::cast<py::dict>(elem_obj)));
} else if (py::isinstance<py::int_>(elem_obj)) {
arr.push_back(py::cast<int64_t>(elem_obj));
} else if (py::isinstance<py::float_>(elem_obj)) {
arr.push_back(py::cast<double>(elem_obj));
} else {
arr.push_back(py::str(elem_obj));
}
}
j[key] = arr;
} else if (py::isinstance<py::int_>(val)) {
j[key] = py::cast<int64_t>(val);
} else if (py::isinstance<py::float_>(val)) {
j[key] = py::cast<double>(val);
} else if (py::isinstance<py::bool_>(val)) {
j[key] = py::cast<bool>(val);
} else if (val.is_none()) {
j[key] = nullptr;
} else {
j[key] = py::str(val);
}
}
return j;
}
// 辅助函数:将 nlohmann::json 转为 py::dict
py::dict json_to_py_dict(const json& j) {
py::dict d;
if (j.is_object()) {
for (auto& item : j.items()) {
std::string key = item.key();
json val = item.value();
if (val.is_object()) {
d[py::str(key)] = json_to_py_dict(val);
} else if (val.is_array()) {
py::list py_list;
for (auto& elem : val) {
if (elem.is_object()) {
py_list.append(json_to_py_dict(elem));
} else if (elem.is_number_integer()) {
py_list.append(py::int_(elem.get<int64_t>()));
} else if (elem.is_number_float()) {
py_list.append(py::float_(elem.get<double>()));
} else if (elem.is_boolean()) {
py_list.append(py::bool_(elem.get<bool>()));
} else if (elem.is_null()) {
py_list.append(py::none());
} else {
py_list.append(py::str(elem.get<std::string>()));
}
}
d[py::str(key)] = py_list;
} else if (val.is_number_integer()) {
d[py::str(key)] = py::int_(val.get<int64_t>());
} else if (val.is_number_float()) {
d[py::str(key)] = py::float_(val.get<double>());
} else if (val.is_boolean()) {
d[py::str(key)] = py::bool_(val.get<bool>());
} else if (val.is_null()) {
d[py::str(key)] = py::none();
} else {
d[py::str(key)] = py::str(val.get<std::string>());
}
}
}
return d;
}
static bool read_file_all(const std::string& path, std::vector<uint8_t>& out) {
std::ifstream ifs(path, std::ios::binary);
if (!ifs) return false;
ifs.seekg(0, std::ios::end);
std::streampos size = ifs.tellg();
if (size <= 0) return false;
ifs.seekg(0, std::ios::beg);
out.resize(static_cast<size_t>(size));
if (!ifs.read(reinterpret_cast<char*>(out.data()), size)) return false;
return true;
}
static bool write_file_all(const std::string& path, const uint8_t* data, size_t len) {
std::ofstream ofs(path, std::ios::binary | std::ios::trunc);
if (!ofs) return false;
ofs.write(reinterpret_cast<const char*>(data), static_cast<std::streamsize>(len));
return static_cast<bool>(ofs);
}
static bool decrypt_ota_file_impl(const std::string& input_path, const std::string& output_zip_path) {
std::vector<uint8_t> in;
if (!read_file_all(input_path, in)) {
netcore::log_error(std::string("decrypt_ota_file: read failed: ") + input_path);
return false;
}
const size_t min_len = kOtaMagicLen + kGcmNonceLen + kGcmTagLen + 1;
if (in.size() < min_len) {
netcore::log_error("decrypt_ota_file: too short");
return false;
}
if (!std::equal(in.begin(), in.begin() + kOtaMagicLen, reinterpret_cast<const uint8_t*>(kOtaMagic))) {
netcore::log_error("decrypt_ota_file: bad magic");
return false;
}
const uint8_t* nonce = in.data() + kOtaMagicLen;
const uint8_t* ct_and_tag = in.data() + kOtaMagicLen + kGcmNonceLen;
const size_t ct_and_tag_len = in.size() - (kOtaMagicLen + kGcmNonceLen);
if (ct_and_tag_len <= kGcmTagLen) {
netcore::log_error("decrypt_ota_file: no ciphertext");
return false;
}
const size_t ciphertext_len = ct_and_tag_len - kGcmTagLen;
const uint8_t* ciphertext = ct_and_tag;
const uint8_t* tag = ct_and_tag + ciphertext_len;
std::vector<uint8_t> plain(ciphertext_len);
int out_len1 = 0;
int out_len2 = 0;
EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
if (!ctx) {
netcore::log_error("decrypt_ota_file: EVP_CIPHER_CTX_new failed");
return false;
}
bool ok = false;
auto key = ota_key_bytes();
do {
if (1 != EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), nullptr, nullptr, nullptr)) {
netcore::log_error("decrypt_ota_file: DecryptInit failed");
break;
}
if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, static_cast<int>(kGcmNonceLen), nullptr)) {
netcore::log_error("decrypt_ota_file: set ivlen failed");
break;
}
if (1 != EVP_DecryptInit_ex(ctx, nullptr, nullptr, key.data(), nonce)) {
netcore::log_error("decrypt_ota_file: set key/iv failed");
break;
}
if (1 != EVP_DecryptUpdate(ctx, plain.data(), &out_len1, ciphertext, static_cast<int>(ciphertext_len))) {
netcore::log_error("decrypt_ota_file: update failed");
break;
}
if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, static_cast<int>(kGcmTagLen), const_cast<uint8_t*>(tag))) {
netcore::log_error("decrypt_ota_file: set tag failed");
break;
}
if (1 != EVP_DecryptFinal_ex(ctx, plain.data() + out_len1, &out_len2)) {
netcore::log_error("decrypt_ota_file: final failed (auth tag mismatch?)");
break;
}
const size_t plain_len = static_cast<size_t>(out_len1 + out_len2);
if (!write_file_all(output_zip_path, plain.data(), plain_len)) {
netcore::log_error(std::string("decrypt_ota_file: write failed: ") + output_zip_path);
break;
}
ok = true;
} while (false);
EVP_CIPHER_CTX_free(ctx);
return ok;
}
// 打包 TCP 数据包
py::bytes make_packet(int msg_type, py::dict body_dict) {
netcore::log_debug(std::string("make_packet msg_type=") + std::to_string(msg_type));
// 1) 将 py::dict 转为 JSON 字符串
json j = py_dict_to_json(body_dict);
std::string body_str = j.dump();
// 2) 计算 body_len 和 checksum
uint32_t body_len = body_str.size();
uint32_t checksum = body_len + msg_type;
// 3) 打包头部(大端序)
std::vector<uint8_t> packet;
packet.reserve(12 + body_len);
// body_len (big-endian, 4 bytes)
packet.push_back((body_len >> 24) & 0xFF);
packet.push_back((body_len >> 16) & 0xFF);
packet.push_back((body_len >> 8) & 0xFF);
packet.push_back(body_len & 0xFF);
// msg_type (big-endian, 4 bytes)
packet.push_back((msg_type >> 24) & 0xFF);
packet.push_back((msg_type >> 16) & 0xFF);
packet.push_back((msg_type >> 8) & 0xFF);
packet.push_back(msg_type & 0xFF);
// checksum (big-endian, 4 bytes)
packet.push_back((checksum >> 24) & 0xFF);
packet.push_back((checksum >> 16) & 0xFF);
packet.push_back((checksum >> 8) & 0xFF);
packet.push_back(checksum & 0xFF);
// 4) 追加 body
packet.insert(packet.end(), body_str.begin(), body_str.end());
netcore::log_debug(std::string("make_packet done bytes=") + std::to_string(packet.size()));
return py::bytes(reinterpret_cast<const char*>(packet.data()), packet.size());
}
// 解析 TCP 数据包
py::tuple parse_packet(py::bytes data) {
// 1) 转换为 bytes view
py::buffer_info buf = py::buffer(data).request();
if (buf.size < 12) {
netcore::log_error(std::string("parse_packet too_short len=") + std::to_string(buf.size));
return py::make_tuple(py::none(), py::none());
}
const uint8_t* ptr = static_cast<const uint8_t*>(buf.ptr);
// 2) 解析头部(大端序)
uint32_t body_len = (ptr[0] << 24) | (ptr[1] << 16) | (ptr[2] << 8) | ptr[3];
uint32_t msg_type = (ptr[4] << 24) | (ptr[5] << 16) | (ptr[6] << 8) | ptr[7];
uint32_t checksum = (ptr[8] << 24) | (ptr[9] << 16) | (ptr[10] << 8) | ptr[11];
// 3) 校验 checksum可选你现有代码不强制校验
// if (checksum != (body_len + msg_type)) {
// return py::make_tuple(py::none(), py::none());
// }
// 4) 检查长度
uint32_t expected_len = 12 + body_len;
if (buf.size < expected_len) {
// 半包
netcore::log_warn(std::string("parse_packet incomplete got=") + std::to_string(buf.size) +
" expected=" + std::to_string(expected_len));
return py::make_tuple(py::none(), py::none());
}
// 5) 防御性检查:如果 data 比预期长,说明可能有粘包
// (只解析第一个包,忽略多余数据)
if (buf.size > expected_len) {
netcore::log_warn(std::string("parse_packet concat got=") + std::to_string(buf.size) +
" expected=" + std::to_string(expected_len) +
" body_len=" + std::to_string(body_len) +
" msg_type=" + std::to_string(msg_type));
}
// 6) 提取 body 并解析 JSON
std::string body_str(reinterpret_cast<const char*>(ptr + 12), body_len);
try {
json j = json::parse(body_str);
py::dict body_dict = json_to_py_dict(j);
return py::make_tuple(py::int_(msg_type), body_dict);
} catch (const json::parse_error& e) {
// JSON 解析失败,返回 raw兼容你现有的逻辑
netcore::log_error(std::string("parse_packet json_parse_error: ") + e.what());
py::dict raw_dict;
raw_dict["raw"] = body_str;
return py::make_tuple(py::int_(msg_type), raw_dict);
} catch (const std::exception& e) {
netcore::log_error(std::string("parse_packet json_parse_error: ") + e.what());
py::dict raw_dict;
raw_dict["raw"] = body_str;
return py::make_tuple(py::int_(msg_type), raw_dict);
}
}
PYBIND11_MODULE(archery_netcore, m) {
m.doc() = "Archery net core (native, pybind11).";
@@ -348,11 +52,11 @@ PYBIND11_MODULE(archery_netcore, m) {
netcore::log_info(std::string("log_test: ") + msg);
}, py::arg("msg"));
m.def("make_packet", &make_packet,
m.def("make_packet", &netcore::make_packet,
"Pack TCP packet: header (len+type+checksum) + JSON body",
py::arg("msg_type"), py::arg("body_dict"));
m.def("parse_packet", &parse_packet,
m.def("parse_packet", &netcore::parse_packet,
"Parse TCP packet, return (msg_type, body_dict)");
m.def("get_config", &get_config, "Get system configuration");
@@ -361,7 +65,7 @@ PYBIND11_MODULE(archery_netcore, m) {
"decrypt_ota_file",
[](const std::string& input_path, const std::string& output_zip_path) {
netcore::log_info(std::string("decrypt_ota_file in=") + input_path + " out=" + output_zip_path);
return decrypt_ota_file_impl(input_path, output_zip_path);
return netcore::decrypt_ota_file_impl(input_path, output_zip_path);
},
py::arg("input_path"),
py::arg("output_zip_path"),

View File

@@ -0,0 +1,135 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h> // 支持 std::vector, std::map 等
#include <nlohmann/json.hpp>
#include <cstring>
#include <cstdint>
#include <vector>
#include <string>
#include <fstream>
#include <array>
#include <openssl/evp.h>
#include <algorithm>
#include "native_logger.hpp"
namespace netcore{
// OTA AEAD format: MAGIC(7) | nonce(12) | ciphertext(N) | tag(16)
constexpr const char* kOtaMagic = "AROTAE1";
constexpr size_t kOtaMagicLen = 7;
constexpr size_t kGcmNonceLen = 12;
constexpr size_t kGcmTagLen = 16;
// 固定 32-byte AES-256-GCM key提高被直接查看的成本不是绝对安全
// 注意:需要与打包端传入的 --aead-key-hex 保持一致。
static std::array<uint8_t, 32> ota_key_bytes() {
// 简单拆分混淆key = a XOR b
static const std::array<uint8_t, 32> a = {
0x92,0x99,0x4d,0x06,0x6f,0xb6,0xa6,0x3d,0x85,0x08,0xbe,0x73,0x5e,0x73,0x4d,0x8a,
0x53,0x88,0xe6,0x99,0xfc,0x10,0x29,0xb9,0x16,0x9b,0xe7,0x0c,0x65,0x21,0x1c,0xce
};
static const std::array<uint8_t, 32> b = {
0xcf,0x60,0xa2,0xc2,0x32,0x7a,0x61,0xb0,0x4c,0x8e,0x8a,0x62,0x31,0xc7,0x82,0xff,
0xec,0xac,0xa1,0x04,0x2a,0x4d,0xaa,0xf2,0xb0,0x5b,0x39,0x2b,0xf4,0xb3,0xad,0xad
};
std::array<uint8_t, 32> k{};
for (size_t i = 0; i < k.size(); i++) k[i] = static_cast<uint8_t>(a[i] ^ b[i]);
return k;
}
static bool read_file_all(const std::string& path, std::vector<uint8_t>& out) {
std::ifstream ifs(path, std::ios::binary);
if (!ifs) return false;
ifs.seekg(0, std::ios::end);
std::streampos size = ifs.tellg();
if (size <= 0) return false;
ifs.seekg(0, std::ios::beg);
out.resize(static_cast<size_t>(size));
if (!ifs.read(reinterpret_cast<char*>(out.data()), size)) return false;
return true;
}
static bool write_file_all(const std::string& path, const uint8_t* data, size_t len) {
std::ofstream ofs(path, std::ios::binary | std::ios::trunc);
if (!ofs) return false;
ofs.write(reinterpret_cast<const char*>(data), static_cast<std::streamsize>(len));
return static_cast<bool>(ofs);
}
bool decrypt_ota_file_impl(const std::string& input_path, const std::string& output_zip_path) {
std::vector<uint8_t> in;
if (!netcore::read_file_all(input_path, in)) {
netcore::log_error(std::string("decrypt_ota_file: read failed: ") + input_path);
return false;
}
const size_t min_len = kOtaMagicLen + kGcmNonceLen + kGcmTagLen + 1;
if (in.size() < min_len) {
netcore::log_error("decrypt_ota_file: too short");
return false;
}
if (!std::equal(in.begin(), in.begin() + kOtaMagicLen, reinterpret_cast<const uint8_t*>(kOtaMagic))) {
netcore::log_error("decrypt_ota_file: bad magic");
return false;
}
const uint8_t* nonce = in.data() + kOtaMagicLen;
const uint8_t* ct_and_tag = in.data() + kOtaMagicLen + kGcmNonceLen;
const size_t ct_and_tag_len = in.size() - (kOtaMagicLen + kGcmNonceLen);
if (ct_and_tag_len <= kGcmTagLen) {
netcore::log_error("decrypt_ota_file: no ciphertext");
return false;
}
const size_t ciphertext_len = ct_and_tag_len - kGcmTagLen;
const uint8_t* ciphertext = ct_and_tag;
const uint8_t* tag = ct_and_tag + ciphertext_len;
std::vector<uint8_t> plain(ciphertext_len);
int out_len1 = 0;
int out_len2 = 0;
EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new();
if (!ctx) {
netcore::log_error("decrypt_ota_file: EVP_CIPHER_CTX_new failed");
return false;
}
bool ok = false;
auto key = ota_key_bytes();
do {
if (1 != EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), nullptr, nullptr, nullptr)) {
netcore::log_error("decrypt_ota_file: DecryptInit failed");
break;
}
if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, static_cast<int>(kGcmNonceLen), nullptr)) {
netcore::log_error("decrypt_ota_file: set ivlen failed");
break;
}
if (1 != EVP_DecryptInit_ex(ctx, nullptr, nullptr, key.data(), nonce)) {
netcore::log_error("decrypt_ota_file: set key/iv failed");
break;
}
if (1 != EVP_DecryptUpdate(ctx, plain.data(), &out_len1, ciphertext, static_cast<int>(ciphertext_len))) {
netcore::log_error("decrypt_ota_file: update failed");
break;
}
if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, static_cast<int>(kGcmTagLen), const_cast<uint8_t*>(tag))) {
netcore::log_error("decrypt_ota_file: set tag failed");
break;
}
if (1 != EVP_DecryptFinal_ex(ctx, plain.data() + out_len1, &out_len2)) {
netcore::log_error("decrypt_ota_file: final failed (auth tag mismatch?)");
break;
}
const size_t plain_len = static_cast<size_t>(out_len1 + out_len2);
if (!netcore::write_file_all(output_zip_path, plain.data(), plain_len)) {
netcore::log_error(std::string("decrypt_ota_file: write failed: ") + output_zip_path);
break;
}
ok = true;
} while (false);
EVP_CIPHER_CTX_free(ctx);
return ok;
}
}

View File

@@ -0,0 +1,7 @@
#pragma once
#include <string>
namespace netcore{
bool decrypt_ota_file_impl(const std::string& input_path, const std::string& output_zip_path);
}

113
cpp_ext/msg_handler.cpp Normal file
View File

@@ -0,0 +1,113 @@
#include <nlohmann/json.hpp>
#include <string>
#include <cstring>
#include <cstdint>
#include <vector>
#include "native_logger.hpp"
#include "msg_handler.hpp"
#include "utils.hpp"
namespace py = pybind11;
using json = nlohmann::json;
namespace netcore {
// 打包 TCP 数据包
py::bytes make_packet(int msg_type, py::dict body_dict) {
netcore::log_debug(std::string("make_packet msg_type=") + std::to_string(msg_type));
// 1) 将 py::dict 转为 JSON 字符串
json j = netcore::py_dict_to_json(body_dict);
std::string body_str = j.dump();
// 2) 计算 body_len 和 checksum
uint32_t body_len = body_str.size();
uint32_t checksum = body_len + msg_type;
// 3) 打包头部(大端序)
std::vector<uint8_t> packet;
packet.reserve(12 + body_len);
// body_len (big-endian, 4 bytes)
packet.push_back((body_len >> 24) & 0xFF);
packet.push_back((body_len >> 16) & 0xFF);
packet.push_back((body_len >> 8) & 0xFF);
packet.push_back(body_len & 0xFF);
// msg_type (big-endian, 4 bytes)
packet.push_back((msg_type >> 24) & 0xFF);
packet.push_back((msg_type >> 16) & 0xFF);
packet.push_back((msg_type >> 8) & 0xFF);
packet.push_back(msg_type & 0xFF);
// checksum (big-endian, 4 bytes)
packet.push_back((checksum >> 24) & 0xFF);
packet.push_back((checksum >> 16) & 0xFF);
packet.push_back((checksum >> 8) & 0xFF);
packet.push_back(checksum & 0xFF);
// 4) 追加 body
packet.insert(packet.end(), body_str.begin(), body_str.end());
netcore::log_debug(std::string("make_packet done bytes=") + std::to_string(packet.size()));
return py::bytes(reinterpret_cast<const char*>(packet.data()), packet.size());
}
// 解析 TCP 数据包
py::tuple parse_packet(py::bytes data) {
// 1) 转换为 bytes view
py::buffer_info buf = py::buffer(data).request();
if (buf.size < 12) {
netcore::log_error(std::string("parse_packet too_short len=") + std::to_string(buf.size));
return py::make_tuple(py::none(), py::none());
}
const uint8_t* ptr = static_cast<const uint8_t*>(buf.ptr);
// 2) 解析头部(大端序)
uint32_t body_len = (ptr[0] << 24) | (ptr[1] << 16) | (ptr[2] << 8) | ptr[3];
uint32_t msg_type = (ptr[4] << 24) | (ptr[5] << 16) | (ptr[6] << 8) | ptr[7];
uint32_t checksum = (ptr[8] << 24) | (ptr[9] << 16) | (ptr[10] << 8) | ptr[11];
// 3) 校验 checksum可选你现有代码不强制校验
// if (checksum != (body_len + msg_type)) {
// return py::make_tuple(py::none(), py::none());
// }
// 4) 检查长度
uint32_t expected_len = 12 + body_len;
if (buf.size < expected_len) {
// 半包
netcore::log_warn(std::string("parse_packet incomplete got=") + std::to_string(buf.size) +
" expected=" + std::to_string(expected_len));
return py::make_tuple(py::none(), py::none());
}
// 5) 防御性检查:如果 data 比预期长,说明可能有粘包
// (只解析第一个包,忽略多余数据)
if (buf.size > expected_len) {
netcore::log_warn(std::string("parse_packet concat got=") + std::to_string(buf.size) +
" expected=" + std::to_string(expected_len) +
" body_len=" + std::to_string(body_len) +
" msg_type=" + std::to_string(msg_type));
}
// 6) 提取 body 并解析 JSON
std::string body_str(reinterpret_cast<const char*>(ptr + 12), body_len);
try {
json j = json::parse(body_str);
py::dict body_dict = netcore::json_to_py_dict(j);
return py::make_tuple(py::int_(msg_type), body_dict);
} catch (const json::parse_error& e) {
// JSON 解析失败,返回 raw兼容你现有的逻辑
netcore::log_error(std::string("parse_packet json_parse_error: ") + e.what());
py::dict raw_dict;
raw_dict["raw"] = body_str;
return py::make_tuple(py::int_(msg_type), raw_dict);
} catch (const std::exception& e) {
netcore::log_error(std::string("parse_packet json_parse_error: ") + e.what());
py::dict raw_dict;
raw_dict["raw"] = body_str;
return py::make_tuple(py::int_(msg_type), raw_dict);
}
}
}

14
cpp_ext/msg_handler.hpp Normal file
View File

@@ -0,0 +1,14 @@
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/stl.h> // 支持 std::vector, std::map 等
namespace py = pybind11;
namespace netcore {
// 打包 TCP 数据包
py::bytes make_packet(int msg_type, py::dict body_dict);
// 解包 TCP 数据包
py::tuple parse_packet(py::bytes data);
}

95
cpp_ext/utils.cpp Normal file
View File

@@ -0,0 +1,95 @@
#include <fstream>
#include <cstring>
#include <cstdint>
#include <string>
#include <fstream>
#include "utils.hpp"
namespace netcore {
// 辅助函数:将 py::dict 转为 nlohmann::json
json py_dict_to_json(py::dict d) {
json j;
for (auto item : d) {
std::string key = py::str(item.first);
py::object val = py::reinterpret_borrow<py::object>(item.second);
if (py::isinstance<py::dict>(val)) {
j[key] = py_dict_to_json(py::cast<py::dict>(val));
} else if (py::isinstance<py::list>(val)) {
py::list py_list = py::cast<py::list>(val);
json arr = json::array();
for (auto elem : py_list) {
py::object elem_obj = py::reinterpret_borrow<py::object>(elem);
if (py::isinstance<py::dict>(elem_obj)) {
arr.push_back(py_dict_to_json(py::cast<py::dict>(elem_obj)));
} else if (py::isinstance<py::int_>(elem_obj)) {
arr.push_back(py::cast<int64_t>(elem_obj));
} else if (py::isinstance<py::float_>(elem_obj)) {
arr.push_back(py::cast<double>(elem_obj));
} else {
arr.push_back(py::str(elem_obj));
}
}
j[key] = arr;
} else if (py::isinstance<py::int_>(val)) {
j[key] = py::cast<int64_t>(val);
} else if (py::isinstance<py::float_>(val)) {
j[key] = py::cast<double>(val);
} else if (py::isinstance<py::bool_>(val)) {
j[key] = py::cast<bool>(val);
} else if (val.is_none()) {
j[key] = nullptr;
} else {
j[key] = py::str(val);
}
}
return j;
}
// 辅助函数:将 nlohmann::json 转为 py::dict
py::dict json_to_py_dict(const json& j) {
py::dict d;
if (j.is_object()) {
for (auto& item : j.items()) {
std::string key = item.key();
json val = item.value();
if (val.is_object()) {
d[py::str(key)] = json_to_py_dict(val);
} else if (val.is_array()) {
py::list py_list;
for (auto& elem : val) {
if (elem.is_object()) {
py_list.append(json_to_py_dict(elem));
} else if (elem.is_number_integer()) {
py_list.append(py::int_(elem.get<int64_t>()));
} else if (elem.is_number_float()) {
py_list.append(py::float_(elem.get<double>()));
} else if (elem.is_boolean()) {
py_list.append(py::bool_(elem.get<bool>()));
} else if (elem.is_null()) {
py_list.append(py::none());
} else {
py_list.append(py::str(elem.get<std::string>()));
}
}
d[py::str(key)] = py_list;
} else if (val.is_number_integer()) {
d[py::str(key)] = py::int_(val.get<int64_t>());
} else if (val.is_number_float()) {
d[py::str(key)] = py::float_(val.get<double>());
} else if (val.is_boolean()) {
d[py::str(key)] = py::bool_(val.get<bool>());
} else if (val.is_null()) {
d[py::str(key)] = py::none();
} else {
d[py::str(key)] = py::str(val.get<std::string>());
}
}
}
return d;
}
}

15
cpp_ext/utils.hpp Normal file
View File

@@ -0,0 +1,15 @@
#pragma once
#include <pybind11/pybind11.h>
#include <pybind11/stl.h> // 支持 std::vector, std::map 等
#include <nlohmann/json.hpp>
#include <string>
namespace py = pybind11;
using json = nlohmann::json;
namespace netcore {
json py_dict_to_json(py::dict d);
py::dict json_to_py_dict(const json& j);
}

View File

@@ -1,13 +1,76 @@
1. OTA 下载的时候,为什么使用十六进制下载,读取 URC 事件?
1. 4G OTA 下载的时候,为什么使用十六进制下载,读取 URC 事件?
因为使用二进制下载的时候,经常会出现错误,并且会失败?然后最稳定传输的办法,是每次传输的时候,是分块,而且每次分块都要“删/建”http实例。推测原因是因为我们现在是直接传输文件的源代码代码中含有了一些字符串可能和 AT指令重复导致了 AT 模块在解释的时候出错。而使用 16 进制的方式,可以避免这个问题。因为十六进制直接把数据先转成了字符串,然后在设备端再把字符串转成数据,这样就不可能出现 AT的指令从而减少了麻烦。
2. OTA 下载的时候,为什么不用 AT 模块里 HTTPDLFILE 的指令?
2. 4G OTA 下载的时候,为什么不用 AT 模块里 HTTPDLFILE 的指令?
因为在测试中发现,使用 HTTPDLFILE其实是下载到了 4G 模块内部,需要重新从模块内部转到存储卡,而且 4G 模块的存储较小,大概只有 40k所以还需要分块来下载和转存比较麻烦于是最终使用了使用读取串口事件的模式。
3. OTA 下载的时候,为什么不用 AT 模块里 HTTPREAD 的指令?
3. 4G OTA 下载的时候,为什么不用 AT 模块里 HTTPREAD 的指令?
因为之前测试发现READ模式其实是需要多步
3.1. AT+MHTTPCREATE
3.2. AT+MHTTPCFG
3.3. AT+MHTTPREQUEST
3.4. AT+MHTTPREAD
它其实也是把数据下载到 4g 模块的缓存里,然后再从缓存里读取出来。所以也是比较繁琐的,还不如 HTTPDLFILE 简单。
4.
4.
4. WiFi OTA 流程ota_manager.handle_wifi_and_update()
* 解析 ota_url 得到 host:port
* 调用 network_manager.connect_wifi(ssid, password, verify_host=host, verify_port=port, persist=True)
* 只有“能连上 WiFi 且能访问 OTA host:port”才会把新凭证保留在 /boot
* 连接成功后开始下载 OTA 文件download_file()
* 下载成功则 apply_ota_and_reboot()
5. TCP 通信
1) 平时 TCP 通信主流程network_manager.tcp_main()
外层无限循环:一直尝试保持与服务器的 TCP 会话。
每轮开始:
如果 OTA 正在进行:暂停(避免抢占资源/串口)。
connect_server():建立 TCP 连接(自动选 WiFi 或 4G
发送“登录包”msg_type=1等待服务器返回“登录成功”。
登录成功后进入内层循环:
接收数据:
WiFi非阻塞 recv();没数据返回 b"";有数据进入缓冲区拼包解析。
4G从 ATClient 的队列 pop_tcp_payload() 取数据。
处理命令/ACK
登录响应、心跳 ACK、OTA 命令、关机命令、日志上传命令等。
发送业务队列:
从高优/普通队列取 1 条,发送失败会放回队首,并断线重连(不再丢消息)。
发送心跳:
按 HEARTBEAT_INTERVAL 发心跳包。
心跳失败会计数(当前为连续失败到阈值才重连)。
任何发送/接收致命失败:
关闭 socket/断开连接 → 跳出内层循环 → 外层等待一会儿后重新 connect_server() → 重新登录。
6. “WiFi 连接/验证”
TCP 连接建立与网络选择connect_server() / select_network()
* select_network()WiFi 优先,但要求:
is_wifi_connected() 为 True系统层面有 WiFi IP 或 Maix WLAN connected
且能连到 TCP 服务器 SERVER_IP:SERVER_PORT
否则回退到 4G
* connect_server()
若已有连接WiFi 会做 _check_wifi_connection() 轻量检查4G 直接认为 OK由 AT 层维护)。
否则按网络类型走:
WiFi创建 socket → connect → setblocking(False)(接收用非阻塞)
4GAT+MIPOPEN 建链
WiFi 链接connect_wifi()
当前 connect_wifi() 的关键特点是:必须让 /etc/init.d/S30wifi restart 真正用新 SSID 去连,所以会临时写 /boot/wifi.ssid 和 /boot/wifi.pass失败自动回滚。
流程是:
(1) 备份旧配置
* /boot/wifi.ssid、/boot/wifi.pass
* /etc/wpa_supplicant.conf尽量备份
(2) 写入新凭证
* 把新 ssid/pass 写到 /boot/*
-(同时尽量写 /etc/wpa_supplicant.conf但不强依赖
(3) 重启 WiFi 服务:/etc/init.d/S30wifi restart
(4) 等待获取 IP默认 20 秒,可调)
(5) 验证可用性,连到 verify_host:verify_port
(6) 成功
* persist=True保留 /boot/*(持久化)
* persist=False回滚 /boot/* 到旧值(不重启,当前连接仍可继续)
(7) 失败
* 回滚 /boot/* + 回滚 /etc/wpa_supplicant.conf如果有备份
* 再 S30wifi restart 恢复旧网络
* 返回错误
7. 日志上传inner_cmd == 43当前只支持 wifi 上传日志
命令带 ssid/password/url 时:
* 若 WiFi 未连接:先 connect_wifi(..., verify_host=upload_host, verify_port=upload_port, persist=True)
上传内容:
* sync # 把日志从内存同步到文件
* 快照 app.log* 到 /tmp staging
* 打包成 tar.gz默认或 zip
* 以 multipart/form-data 的 file 字段 POST 到 url

View File

@@ -25,7 +25,6 @@ class HardwareManager:
# 私有硬件对象
self._uart4g = None # 4G模块UART
self._distance_serial = None # 激光测距串口
self._bus = None # I2C总线
self._adc_obj = None # ADC对象
self._at_client = None # AT客户端
@@ -39,11 +38,6 @@ class HardwareManager:
"""4G模块UART只读"""
return self._uart4g
@property
def distance_serial(self):
"""激光测距串口(只读)"""
return self._distance_serial
@property
def bus(self):
"""I2C总线只读"""
@@ -71,19 +65,6 @@ class HardwareManager:
self._uart4g = uart.UART(device, baudrate)
return self._uart4g
def init_distance_serial(self, device=None, baudrate=None):
"""初始化激光测距串口(激光控制)"""
from maix import uart
if device is None:
device = config.DISTANCE_SERIAL_DEVICE
if baudrate is None:
baudrate = config.DISTANCE_SERIAL_BAUDRATE
print(f"[HW] 初始化激光串口: device={device}, baudrate={baudrate}")
self._distance_serial = uart.UART(device, baudrate)
print(f"[HW] 激光串口初始化完成: {self._distance_serial}")
return self._distance_serial
def init_bus(self, bus_num=None):
"""初始化I2C总线"""
from maix import i2c

52
keygen.py Normal file
View File

@@ -0,0 +1,52 @@
import os
def generate_key_pair():
"""
生成一对新的密钥a和b使得a XOR b等于原始key
:return: (a, b, key) 元组每个元素都是32字节的字节数组
"""
# 原始key值
key = bytes([
0x5d, 0xf9, 0xef, 0xc4, 0x5d, 0xcc, 0xc7, 0x8d, 0xc9, 0x86, 0x34, 0x11, 0x6f, 0xb4, 0xcf, 0x75,
0xbf, 0x24, 0x47, 0x9d, 0xd6, 0x5d, 0x83, 0x4b, 0xa6, 0xc0, 0xde, 0x27, 0x91, 0x92, 0xb1, 0x63
])
# 随机生成a
a = os.urandom(32)
# 计算b = key XOR a
b = bytes([key[i] ^ a[i] for i in range(32)])
return a, b, key
def format_hex_array(data):
"""
将字节数组格式化为C++风格的十六进制数组
:param data: 字节数组
:return: 格式化后的字符串
"""
return "{" + ",".join([f"0x{b:02x}" for b in data]) + "}"
def generate_new_key_pair():
"""
生成新的密钥对并打印出来
"""
a, b, key = generate_key_pair()
print("原始key:")
print(format_hex_array(key))
print("\n新的密钥对:")
print("a =", format_hex_array(a))
print("b =", format_hex_array(b))
# 验证a XOR b是否等于key
verify_key = bytes([a[i] ^ b[i] for i in range(32)])
assert verify_key == key, "验证失败a XOR b 不等于 key"
print("\n验证成功a XOR b 等于 key")
if __name__ == "__main__":
generate_new_key_pair()

View File

@@ -29,6 +29,7 @@ class LaserManager:
return
# 私有状态
self._serial = None # 激光串口,由 laser_manager 自己持有
self._calibration_active = False
self._calibration_result = None
self._calibration_lock = threading.Lock()
@@ -65,6 +66,38 @@ class LaserManager:
"""
return self._last_frame_with_ellipse
# ==================== 初始化方法 ====================
def init(self, serial_device=None, baudrate=None):
"""
初始化激光模块(包括串口)
初始化完成后主动发送关闭命令,防止 UART 初始化噪声误触发激光
Args:
serial_device: 串口设备路径,默认使用 config.DISTANCE_SERIAL_DEVICE
baudrate: 波特率,默认使用 config.DISTANCE_SERIAL_BAUDRATE
"""
from maix import uart
device = serial_device or config.DISTANCE_SERIAL_DEVICE
baud = baudrate or config.DISTANCE_SERIAL_BAUDRATE
self._serial = uart.UART(device, baud)
print(f"[LASER] 激光串口初始化完成: device={device}, baudrate={baud}")
# 等待串口稳定后主动关闭激光,防止初始化噪声误触发
time.sleep_ms(100)
try:
self._serial.read(-1) # 清空接收缓冲区
except Exception:
pass
self._serial.write(config.LASER_OFF_CMD)
time.sleep_ms(60)
try:
self._serial.read(-1) # 清空回包
except Exception:
pass
print("[LASER] 已发送关闭命令(防止开机误触发)")
# ==================== 业务方法 ====================
def load_laser_point(self):
@@ -117,10 +150,8 @@ class LaserManager:
def turn_on_laser(self):
"""发送指令开启激光,并读取回包(部分模块支持)"""
from hardware import hardware_manager
if hardware_manager.distance_serial is None:
self.logger.error("[LASER] distance_serial 未初始化")
if self._serial is None:
self.logger.error("[LASER] 激光串口未初始化,请先调用 init()")
return None
# 打印调试信息
@@ -128,32 +159,31 @@ class LaserManager:
# 清空接收缓冲区
try:
hardware_manager.distance_serial.read(-1) # 清空缓冲区
self._serial.read(-1) # 清空缓冲区
except:
pass
# 发送命令
written = hardware_manager.distance_serial.write(config.LASER_ON_CMD)
written = self._serial.write(config.LASER_ON_CMD)
self.logger.info(f"[LASER] 写入字节数: {written}")
time.sleep_ms(60)
# 读取回包
resp = hardware_manager.distance_serial.read(len=20,timeout=10)
resp = self._serial.read(len=20, timeout=10)
if resp:
self.logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
if resp == config.LASER_ON_CMD:
self.logger.info("✅ 激光开启指令已确认")
else:
self.logger.warning("🔇 无回包(可能正常或模块不支持回包)")
self._laser_turned_on = True
return resp
def turn_off_laser(self):
"""发送指令关闭激光"""
from hardware import hardware_manager
if hardware_manager.distance_serial is None:
self.logger.error("[LASER] distance_serial 未初始化")
if self._serial is None:
self.logger.error("[LASER] 激光串口未初始化,请先调用 init()")
return None
# 打印调试信息
@@ -161,32 +191,35 @@ class LaserManager:
# 清空接收缓冲区
try:
hardware_manager.distance_serial.read(-1)
self._serial.read(-1)
except:
pass
# 发送命令
written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD)
written = self._serial.write(config.LASER_OFF_CMD)
self.logger.info(f"[LASER] 写入字节数: {written}")
time.sleep_ms(60)
# 读取回包
resp = hardware_manager.distance_serial.read(20)
resp = self._serial.read(20)
if resp:
self.logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
else:
self.logger.warning("🔇 无回包")
self._laser_turned_on = False
return resp
# 不用读回包
# return None
def flash_laser(self, duration_ms=1000):
"""闪一下激光(用于射箭反馈)"""
"""闪一下激光(用于射箭反馈),如果射箭前激光已亮则保持亮"""
try:
was_on = self._laser_turned_on # 记住射箭前的激光状态
self.turn_on_laser()
time.sleep_ms(duration_ms)
self.turn_off_laser()
if not was_on:
self.turn_off_laser() # 射箭前是灭的,才关闭
else:
self.logger.info("[LASER] 射箭前激光已亮(调瞄中),保持激光开启")
except Exception as e:
self.logger.error(f"闪激光失败: {e}")
@@ -956,15 +989,14 @@ class LaserManager:
"""发送测距指令并返回距离(米)和信号质量
返回: (distance_m, signal_quality) 元组,失败返回 (0.0, 0)
"""
from hardware import hardware_manager
if hardware_manager.distance_serial is None:
self.logger.error("[LASER] distance_serial 未初始化")
if self._serial is None:
self.logger.error("[LASER] 激光串口未初始化,请先调用 init()")
return (0.0, 0)
try:
# 清空缓冲区
try:
hardware_manager.distance_serial.read(-1)
self._serial.read(-1)
except:
pass
# 打开激光
@@ -973,7 +1005,7 @@ class LaserManager:
self._laser_turned_on = True
# time.sleep_ms(500) # 需要一定时间让激光稳定
# 发送测距查询命令
hardware_manager.distance_serial.write(config.DISTANCE_QUERY_CMD)
self._serial.write(config.DISTANCE_QUERY_CMD)
# time.sleep_ms(500) # 测试结果:这里的等待没有用!
self.turn_off_laser()
self._laser_turned_on = False
@@ -993,7 +1025,7 @@ class LaserManager:
return (0.0, 0)
# 尝试读取数据
response = hardware_manager.distance_serial.read(config.DISTANCE_RESPONSE_LEN)
response = self._serial.read(config.DISTANCE_RESPONSE_LEN)
# 如果读到完整数据,立即返回
if response and len(response) == config.DISTANCE_RESPONSE_LEN:

126
main.py
View File

@@ -88,11 +88,13 @@ def cmd_str():
# 2. 初始化硬件对象UART、I2C、ADC
hardware_manager.init_uart4g()
hardware_manager.init_distance_serial()
hardware_manager.init_bus()
hardware_manager.init_adc()
hardware_manager.init_at_client()
# 3. 初始化激光模块(串口 + 开机关闭激光防误触发)
laser_manager.init()
# 3. 初始化 INA226 电量监测芯片
init_ina226()
@@ -109,6 +111,9 @@ def cmd_str():
logger_manager.init_logging(log_level=logging.DEBUG)
logger = logger_manager.logger
# 补充:因为初始化的时候,激光会亮,先关了它
# laser_manager.turn_off_laser()
# 2. 从4G模块同步系统时间需要 at_client 已初始化)
sync_system_time_from_4g()
@@ -212,6 +217,39 @@ def cmd_str():
logger.info("系统准备完成...")
last_adc_trigger = 0
# 气压采样:减少日志频率(每 N 个点输出一条),避免 logger.debug 拖慢采样
PRESSURE_BATCH_SIZE = 100
pressure_buf = []
pressure_sum = 0
pressure_min = 4095
pressure_max = 0
pressure_t0_ms = None
def _flush_pressure_buf(reason: str):
if not config.AIR_PRESSURE_lOG:
return
nonlocal pressure_buf, pressure_sum, pressure_min, pressure_max, pressure_t0_ms, logger
if not pressure_buf:
return
t1_ms = time.ticks_ms()
n = len(pressure_buf)
avg = (pressure_sum / n) if n else 0
# 一行输出:方便后处理画曲线;同时带上统计信息便于快速看波峰
line = (
f"[气压批量] reason={reason} "
f"t0={pressure_t0_ms} t1={t1_ms} n={n} "
f"min={pressure_min} max={pressure_max} avg={avg:.1f} "
f"values={','.join(map(str, pressure_buf))}"
)
if logger:
logger.debug(line)
else:
print(line)
pressure_buf = []
pressure_sum = 0
pressure_min = 4095
pressure_max = 0
pressure_t0_ms = None
# 主循环:检测扳机触发 → 拍照 → 分析 → 上报
while not app.need_exit():
@@ -245,12 +283,27 @@ def cmd_str():
logger.error(f"[MAIN] ADC读取异常: {e}")
time.sleep_ms(100)
continue
if adc_val > config.ADC_TRIGGER_THRESHOLD:
# ====== 气压采样缓存(每次循环都记录,批量输出日志)======
if pressure_t0_ms is None:
pressure_t0_ms = current_time
pressure_buf.append(adc_val)
pressure_sum += adc_val
if adc_val < pressure_min:
pressure_min = adc_val
if adc_val > pressure_max:
pressure_max = adc_val
if len(pressure_buf) >= PRESSURE_BATCH_SIZE:
_flush_pressure_buf("batch")
# if adc_val >= 2000:
# print(f"adc :{adc_val}")
if adc_val >= config.ADC_TRIGGER_THRESHOLD:
diff_ms = current_time - last_adc_trigger
if diff_ms < 3000:
continue
last_adc_trigger = current_time
# 触发前先把缓存刷出来,避免波形被长耗时处理截断
_flush_pressure_buf("before_trigger")
try:
frame = camera_manager.read_frame()
@@ -286,7 +339,8 @@ def cmd_str():
# 检测靶心
result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point)
camera_manager.show(result_img)
if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING:
camera_manager.show(result_img)
# 计算偏移与距离(如果检测到靶心)
if center and radius:
@@ -327,21 +381,7 @@ def cmd_str():
from shot_id_generator import shot_id_generator
shot_id = shot_id_generator.generate_id() # 不需要使用device_id
if logger:
logger.info(f"[MAIN] 射箭ID: {shot_id}")
# 保存图像(无论是否检测到靶心都保存):放入队列由 worker 异步保存,不阻塞主循环
enqueue_save_shot(
result_img,
center,
radius,
method,
ellipse_params,
(x, y),
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None,
)
# 构造上报数据
inner_data = {
@@ -378,14 +418,30 @@ def cmd_str():
report_data = {"cmd": 1, "data": inner_data}
network_manager.safe_enqueue(report_data, msg_type=2, high=True)
if logger:
if center and radius:
logger.info(f"射箭事件已加入发送队列已检测到靶心ID: {shot_id}")
else:
logger.info(f"射箭事件已加入发送队列未检测到靶心已保存图像ID: {shot_id}")
# 闪一下激光(射箭反馈)
laser_manager.flash_laser(1000)
if config.FLASH_LASER_WHILE_SHOOTING:
laser_manager.flash_laser(config.FLASH_LASER_DURATION_MS)
# 保存图像(无论是否检测到靶心都保存):放入队列由 worker 异步保存,不阻塞主循环
enqueue_save_shot(
result_img,
center,
radius,
method,
ellipse_params,
(x, y),
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None,
)
if center and radius:
logger.info(f"射箭事件已加入发送队列已检测到靶心ID: {shot_id}")
else:
logger.info(f"射箭事件已加入发送队列未检测到靶心已保存图像ID: {shot_id}")
time.sleep_ms(100)
except Exception as e:
@@ -397,13 +453,14 @@ def cmd_str():
time.sleep_ms(100)
continue
else:
try:
camera_manager.show(camera_manager.read_frame())
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 显示异常: {e}")
time.sleep_ms(50)
if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING:
try:
camera_manager.show(camera_manager.read_frame())
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[MAIN] 显示异常: {e}")
time.sleep_ms(10)
except Exception as e:
# 主循环的顶层异常捕获,防止程序静默退出
@@ -416,6 +473,11 @@ def cmd_str():
print(f"[MAIN] 主循环异常: {e}")
import traceback
traceback.print_exc()
# 异常发生时尽量把缓存刷盘,方便定位问题
try:
_flush_pressure_buf("exception")
except:
pass
time.sleep_ms(1000) # 等待1秒后继续

View File

@@ -42,6 +42,7 @@ class NetworkManager:
self._high_send_queue = []
self._normal_send_queue = []
self._queue_lock = threading.Lock()
self._send_event = threading.Event()
self._uart4g_lock = threading.Lock()
self._device_id = None
self._password = None
@@ -150,6 +151,7 @@ class NetworkManager:
self._high_send_queue.append(item)
else:
self._normal_send_queue.append(item)
self._send_event.set()
def _dequeue(self):
"""线程安全地从队列取出(内部方法)"""
@@ -225,56 +227,132 @@ class NetworkManager:
self._wifi_connected = False
return False
def connect_wifi(self, ssid, password):
def connect_wifi(self, ssid, password, verify_host=None, verify_port=None, persist=True, timeout_s=20):
"""
连接 Wi-Fi 并将凭证持久化保存到 /boot/ 目录
连接 Wi-Fi(先用新凭证尝试连接并验证可用性;失败自动回滚;成功后再决定是否落盘)
重要:系统的 /etc/init.d/S30wifi 通常会读取 /boot/wifi.ssid 与 /boot/wifi.pass 来连接 WiFi。
因此要“真正尝试连接新 WiFi”必须临时写入 /boot/ 触发重启;若失败则把旧值写回去(回滚)。
Returns:
(ip, error): IP地址和错误信息成功时error为None
"""
# 配置文件路径定义
conf_path = "/etc/wpa_supplicant.conf" # wpa_supplicant配置文件路径
ssid_file = "/boot/wifi.ssid" # 用于保存SSID的文件路径
pass_file = "/boot/wifi.pass" # 用于保存密码的文件路径
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
def _read_text(path: str):
try:
if os.path.exists(path):
with open(path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
return None
return None
def _write_text(path: str, content: str):
with open(path, "w", encoding="utf-8") as f:
f.write(content)
def _restore_boot(old_ssid: str | None, old_pass: str | None):
# 还原 /boot 凭证:原来没有就删除,原来有就写回
try:
if old_ssid is None:
if os.path.exists(ssid_file):
os.remove(ssid_file)
else:
_write_text(ssid_file, old_ssid)
except Exception:
pass
try:
if old_pass is None:
if os.path.exists(pass_file):
os.remove(pass_file)
else:
_write_text(pass_file, old_pass)
except Exception:
pass
old_conf = _read_text(conf_path)
old_boot_ssid = _read_text(ssid_file)
old_boot_pass = _read_text(pass_file)
try:
# 生成 wpa_supplicant 配置
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() # 调用系统命令生成配置
if "network={" not in net_conf: # 检查配置是否生成成功
return None, "Failed to generate wpa config"
# 写入运行时配置
with open(conf_path, "w") as f: # 打开配置文件准备写入
f.write("ctrl_interface=/var/run/wpa_supplicant\n") # 设置控制接口路径
f.write("update_config=1\n\n") # 允许更新配置
f.write(net_conf) # 写入网络配置
# 持久化保存 SSID/PASS
with open(ssid_file, "w") as f: # 打开SSID文件准备写入
f.write(ssid.strip()) # 写入SSID去除首尾空格
with open(pass_file, "w") as f: # 打开密码文件准备写入
f.write(password.strip()) # 写入密码(去除首尾空格)
# 生成 wpa_supplicant 配置(写 /etc 作为辅助,具体是否生效取决于 S30wifi 脚本)
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
raise RuntimeError("Failed to generate wpa config")
try:
_write_text(
conf_path,
"ctrl_interface=/var/run/wpa_supplicant\n"
"update_config=1\n\n"
+ net_conf,
)
except Exception:
# 不强制要求写 /etc 成功(某些系统只用 /boot
pass
# ====== 临时写入 /boot 凭证,触发 WiFi 服务真正尝试连接新 SSID ======
_write_text(ssid_file, ssid.strip())
_write_text(pass_file, password.strip())
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart") # 执行WiFi服务重启命令
os.system("/etc/init.d/S30wifi restart")
# 等待获取 IP
import time as std_time # 导入time模块并重命名为std_time
for _ in range(50): # 最多等待50秒
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() # 获取wlan0的IP地址
if ip: # 如果获取到IP地址
self._wifi_connected = True # 设置WiFi连接状态为已连接
self._wifi_ip = ip # 保存IP地址
self.logger.info(f"[WIFI] 已连接IP: {ip}") # 记录连接成功日志
import time as std_time
wait_s = int(timeout_s) if timeout_s and timeout_s > 0 else 20
wait_s = min(max(wait_s, 5), 60)
for _ in range(wait_s):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
# 拿到 IP 不代表可上网/可访问目标;继续做可达性验证
self._wifi_connected = True
self._wifi_ip = ip
self.logger.info(f"[WIFI] 已连接IP: {ip},开始验证网络可用性...")
# 验证能访问指定目标(默认使用 TCP 服务器)
v_host = verify_host if verify_host is not None else self._server_ip
v_port = int(verify_port) if verify_port is not None else int(self._server_port)
if v_host and v_port:
if not self.is_server_reachable(v_host, v_port, timeout=5):
raise RuntimeError(f"Target unreachable ({v_host}:{v_port})")
# ====== 验证通过 ======
if not persist:
# 不持久化:把 /boot 恢复成旧值(不重启,当前连接保持不变)
_restore_boot(old_boot_ssid, old_boot_pass)
self.logger.info("[WIFI] 网络验证通过,但按 persist=False 回滚 /boot 凭证(不重启)")
else:
self.logger.info("[WIFI] 网络验证通过,/boot 凭证已保留(持久化)")
return ip, None
std_time.sleep(1) # 每次循环等待1秒
return None, "Timeout: No IP obtained" # 超时未获取到IP
except Exception as e: # 捕获所有异常
self.logger.error(f"[WIFI] 连接失败: {e}") # 记录错误日志
return None, f"Exception: {str(e)}" # 返回异常信息
std_time.sleep(1)
raise RuntimeError("Timeout: No IP obtained")
except Exception as e:
# 失败:回滚 /boot 和 /etc重启 WiFi 恢复旧网络
_restore_boot(old_boot_ssid, old_boot_pass)
try:
if old_conf is not None:
_write_text(conf_path, old_conf)
except Exception:
pass
try:
os.system("/etc/init.d/S30wifi restart")
except Exception:
pass
self._wifi_connected = False
self._wifi_ip = None
self.logger.error(f"[WIFI] 连接/验证失败,已回滚: {e}")
return None, str(e)
def is_server_reachable(self, host, port=80, timeout=5):
"""检查目标主机端口是否可达(用于网络检测)"""
@@ -417,6 +495,8 @@ class NetworkManager:
# 设置非阻塞模式(用于接收数据)
self._wifi_socket.setblocking(False)
# 加快消息发送
self._wifi_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._tcp_connected = True
self.logger.info("[WIFI-TCP] TCP连接已建立")
@@ -466,11 +546,66 @@ class NetworkManager:
if not self._wifi_socket:
return False
try:
# 尝试发送0字节来检测连接状态
self._wifi_socket.send(b"", socket.MSG_DONTWAIT)
# send(b"") 在很多实现里是 no-op无法可靠探测断线。
# 用非阻塞 peek 来判断若对端已关闭recv 会返回 b""。
data = self._wifi_socket.recv(1, socket.MSG_PEEK | socket.MSG_DONTWAIT)
if data == b"":
raise OSError("wifi socket closed")
return True
except:
# socket已断开
except BlockingIOError:
# 无数据可读但连接仍在EAGAIN
return True
except OSError as e:
# 兼容不同平台的 EAGAIN / would block
err = getattr(e, "errno", None)
if err in (11, 35, 10035): # EAGAIN/EWOULDBLOCK on linux/mac/win
return True
# 某些平台会把“无数据可读/超时”抛成 socket.timeout / TimeoutErrorerrno 可能为 None
# 这不代表断线:视为 benign交给真正的 send/recv 去判定断线。
if (err is None) and (("timed out" in str(e).lower()) or isinstance(e, (TimeoutError, socket.timeout))):
try:
self.logger.warning(f"[WIFI-TCP] conncheck timeout treated as benign: {e}")
except:
pass
return True
# 某些嵌入式 socket 实现可能不支持 MSG_PEEK/MSG_DONTWAIT或返回 EINVAL/ENOTSUP。
# 这种情况不代表断线:选择“无法检测但不判死”,交给真正的 send/recv 去触发断线处理。
# 常见EINVAL(22), ENOTSUP(95), EOPNOTSUPP(95), EINTR(4)
if err in (4, 22, 95):
try:
self.logger.warning(f"[WIFI-TCP] conncheck unsupported/benign errno={err}: {e}")
except:
pass
return True
# 记录真实错误,便于定位
try:
self.logger.error(f"[WIFI-TCP] conncheck failed errno={err}: {e}")
except:
pass
# 明确的“连接不可用”错误才判定断线并清理
# 常见ENOTCONN(107), ECONNRESET(104), EPIPE(32), EBADF(9)
if err in (9, 32, 104, 107, 10054, 10057):
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
self._tcp_connected = False
return False
# socket已断开或不可用清理
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
self._tcp_connected = False
return False
except Exception:
try:
self._wifi_socket.close()
except:
@@ -524,6 +659,10 @@ class NetworkManager:
# 根据网络类型选择发送方式
if self._network_type == "wifi":
# 先快速校验 WiFi socket 是否仍有效,避免卡在半开连接上
if not self._check_wifi_connection():
print("_check_wifi_connection failed")
return False
return self._tcp_send_raw_via_wifi(data, max_retries)
elif self._network_type == "4g":
return self._tcp_send_raw_via_4g(data, max_retries)
@@ -546,7 +685,7 @@ class NetworkManager:
if sent == 0:
# socket连接已断开
self.logger.warning(f"[WIFI-TCP] 发送失败socket已断开尝试 {attempt+1}/{max_retries}")
break
raise OSError("wifi socket closed (send returned 0)")
total_sent += sent
if total_sent == len(data):
@@ -557,10 +696,23 @@ class NetworkManager:
except OSError as e:
self.logger.error(f"[WIFI-TCP] 发送异常: {e}(尝试 {attempt+1}/{max_retries}")
time.sleep_ms(50)
# 发送异常通常意味着连接已不可用,主动关闭以触发重连
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
self._tcp_connected = False
return False
except Exception as e:
self.logger.error(f"[WIFI-TCP] 未知错误: {e}(尝试 {attempt+1}/{max_retries}")
time.sleep_ms(50)
try:
self._wifi_socket.close()
except:
pass
self._wifi_socket = None
self._tcp_connected = False
return False
return False
@@ -652,15 +804,13 @@ class NetworkManager:
return b""
try:
# 设置接收超时
self._wifi_socket.settimeout(timeout_ms / 1000.0)
# 尝试接收数据
data = self._wifi_socket.recv(4096) # 每次最多接收4KB
# 这里保持 socket 为非阻塞模式(连接时已 setblocking(False))。
# 不要反复 settimeout(),否则会把 socket 切回“阻塞+超时”,并导致 conncheck 误报 timed out
data = self._wifi_socket.recv(4096) # 每次最多接收4KB无数据会抛 BlockingIOError
return data
except socket.timeout:
# 超时是正常的,表示没有数据
except BlockingIOError:
# 无数据可读是正常的
return b""
except OSError as e:
# socket错误连接断开等
@@ -679,6 +829,240 @@ class NetworkManager:
self.logger.error(f"[WIFI-TCP] 接收数据异常: {e}")
return b""
def _upload_log_file(self, upload_url, wifi_ssid=None, wifi_password=None, include_rotated=True, max_files=None, archive_format="tgz"):
"""上传日志文件到指定URL
Args:
upload_url: 上传目标URL例如 "https://example.com/upload/"
wifi_ssid: WiFi SSID可选如果未连接WiFi则尝试连接
wifi_password: WiFi 密码(可选)
include_rotated: 是否包含轮转日志app.log.1 等)
max_files: 最多打包多少个日志文件(包含 app.log 本身None=按 backupCount 自动推断
archive_format: 打包格式tgz 或 zip
Note:
该功能仅在 WiFi 连接时可用4G 网络暂不支持文件上传
"""
import requests
import shutil
from datetime import datetime
import glob
try:
# 检查 WiFi 连接状态,如果未连接则尝试连接
if not self.is_wifi_connected():
if wifi_ssid and wifi_password:
self.logger.info(f"[LOG_UPLOAD] WiFi 未连接,尝试连接 WiFi: {wifi_ssid}")
self.safe_enqueue({"result": "log_upload_connecting_wifi", "ssid": wifi_ssid}, 2)
# 连接前先把“目标上传 URL”作为可达性验证目标只有验证通过才落盘保存 SSID/PASS
try:
from urllib.parse import urlparse
parsed = urlparse(upload_url)
v_host = parsed.hostname
v_port = parsed.port or (443 if parsed.scheme == "https" else 80)
except Exception:
v_host, v_port = None, None
ip, error = self.connect_wifi(
wifi_ssid,
wifi_password,
verify_host=v_host,
verify_port=v_port,
persist=True,
)
if error:
self.logger.error(f"[LOG_UPLOAD] WiFi 连接失败: {error}")
self.safe_enqueue({
"result": "log_upload_failed",
"reason": "wifi_connect_failed",
"detail": error
}, 2)
return
self.logger.info(f"[LOG_UPLOAD] WiFi 连接成功IP: {ip}")
else:
self.logger.warning("[LOG_UPLOAD] WiFi 未连接且未提供 WiFi 凭证,无法上传日志")
self.safe_enqueue({
"result": "log_upload_failed",
"reason": "wifi_not_connected",
"detail": "WiFi not connected and no credentials provided"
}, 2)
return
else:
self.logger.info("[LOG_UPLOAD] WiFi 已连接,跳过连接步骤")
self.logger.info(f"[LOG_UPLOAD] 开始上传日志文件...")
# 获取日志文件路径
log_file_path = config.LOG_FILE # /maixapp/apps/t11/app.log
if not os.path.exists(log_file_path):
self.logger.error(f"[LOG_UPLOAD] 日志文件不存在: {log_file_path}")
self.safe_enqueue({"result": "log_upload_failed", "reason": "log_file_not_found"}, 2)
return
# 生成带时间戳的文件名(归档)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
device_id = self._device_id or "unknown"
base_name = f"logs_{timestamp}_{device_id}"
archive_format = (archive_format or "tgz").strip().lower()
if archive_format not in ("tgz", "zip"):
archive_format = "tgz"
# 选择要打包的日志文件
candidates = [log_file_path]
if include_rotated:
candidates = sorted(set(glob.glob(log_file_path + "*")))
candidates = [p for p in candidates if os.path.isfile(p)]
# app.log 优先其余按“数字后缀”排序app.log.1, app.log.2...
def _log_sort_key(p: str):
if p == log_file_path:
return (0, 0, p)
suffix = p[len(log_file_path):]
if suffix.startswith("."):
try:
return (1, int(suffix[1:]), p)
except:
return (2, 999999, p)
return (3, 999999, p)
candidates.sort(key=_log_sort_key)
# 限制最大文件数默认app.log + 轮转数量)
if max_files is None:
try:
max_files = 1 + int(getattr(config, "LOG_BACKUP_COUNT", 5))
except:
max_files = 6
try:
max_files = int(max_files)
except:
max_files = 6
max_files = max(1, min(max_files, 20))
selected = candidates[:max_files]
if not selected:
self.logger.error("[LOG_UPLOAD] 未找到可打包的日志文件")
self.safe_enqueue({"result": "log_upload_failed", "reason": "no_log_files"}, 2)
return
self.logger.info(f"[LOG_UPLOAD] 将打包日志文件数: {len(selected)}")
# 先 sync尽量确保日志落盘再复制快照到 /tmp避免打包过程中日志被追加导致内容不一致
os.system("sync")
temp_dir = "/tmp"
staging_dir = os.path.join(temp_dir, f"log_upload_{base_name}")
os.makedirs(staging_dir, exist_ok=True)
staged_paths = []
try:
for p in selected:
dst = os.path.join(staging_dir, os.path.basename(p))
shutil.copy2(p, dst)
staged_paths.append(dst)
except Exception as e:
self.logger.error(f"[LOG_UPLOAD] 复制日志快照失败: {e}")
self.safe_enqueue({"result": "log_upload_failed", "reason": "snapshot_failed", "detail": str(e)[:100]}, 2)
try:
shutil.rmtree(staging_dir)
except:
pass
return
# 打包压缩
if archive_format == "zip":
archive_filename = f"{base_name}.zip"
else:
archive_filename = f"{base_name}.tar.gz"
archive_path = os.path.join(temp_dir, archive_filename)
try:
if archive_format == "zip":
import zipfile
with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for p in staged_paths:
zf.write(p, arcname=os.path.basename(p))
else:
import tarfile
with tarfile.open(archive_path, "w:gz") as tf:
for p in staged_paths:
tf.add(p, arcname=os.path.basename(p))
self.logger.info(f"[LOG_UPLOAD] 日志压缩包已生成: {archive_path}")
except Exception as e:
self.logger.error(f"[LOG_UPLOAD] 打包压缩失败: {e}")
self.safe_enqueue({"result": "log_upload_failed", "reason": "archive_failed", "detail": str(e)[:100]}, 2)
try:
shutil.rmtree(staging_dir)
except:
pass
try:
if os.path.exists(archive_path):
os.remove(archive_path)
except:
pass
return
finally:
try:
shutil.rmtree(staging_dir)
except:
pass
# 使用 multipart/form-data 上传压缩包
with open(archive_path, 'rb') as f:
mime = "application/gzip" if archive_format == "tgz" else "application/zip"
files = {'file': (archive_filename, f, mime)}
# 添加额外的头部信息
headers = {
'User-Agent': 'Archery-Device/1.0',
'X-Device-ID': device_id,
}
# 如果是 ngrok-free.dev添加绕过警告页面的头
if 'ngrok-free.dev' in upload_url or 'ngrok.io' in upload_url:
headers['ngrok-skip-browser-warning'] = 'true'
self.logger.info(f"[LOG_UPLOAD] 正在上传到: {upload_url}")
# 禁用 SSL 警告(用于自签名证书或 SSL 兼容性问题)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 发送请求verify=False 跳过 SSL 证书验证(解决 MaixCAM SSL 兼容性问题)
response = requests.post(upload_url, files=files, headers=headers, timeout=60, verify=False)
if response.status_code in (200, 201, 204):
self.logger.info(f"[LOG_UPLOAD] 上传成功! 状态码: {response.status_code}")
self.safe_enqueue({
"result": "log_upload_ok",
"filename": archive_filename,
"status_code": response.status_code
}, 2)
else:
self.logger.error(f"[LOG_UPLOAD] 上传失败! 状态码: {response.status_code}, 响应: {response.text[:200]}")
self.safe_enqueue({
"result": "log_upload_failed",
"reason": f"http_{response.status_code}",
"detail": response.text[:100]
}, 2)
# 清理临时文件
try:
os.remove(archive_path)
self.logger.debug(f"[LOG_UPLOAD] 临时文件已删除: {archive_path}")
except Exception as e:
self.logger.warning(f"[LOG_UPLOAD] 删除临时文件失败: {e}")
except requests.exceptions.Timeout:
self.logger.error("[LOG_UPLOAD] 上传超时")
self.safe_enqueue({"result": "log_upload_failed", "reason": "timeout"}, 2)
except requests.exceptions.ConnectionError as e:
self.logger.error(f"[LOG_UPLOAD] 连接失败: {e}")
self.safe_enqueue({"result": "log_upload_failed", "reason": "connection_error"}, 2)
except Exception as e:
self.logger.error(f"[LOG_UPLOAD] 上传异常: {e}")
self.safe_enqueue({"result": "log_upload_failed", "reason": str(e)[:100]}, 2)
def generate_token(self, device_id):
"""生成用于 HTTP 接口鉴权的 TokenHMAC-SHA256"""
@@ -731,6 +1115,10 @@ class NetworkManager:
# if not self.tcp_send_raw(self.make_packet(1, login_data)):
if not self.tcp_send_raw(self._netcore.make_packet(1, login_data)):
self._tcp_connected = False
try:
self.disconnect_server()
except:
pass
time.sleep_ms(2000)
continue
@@ -741,6 +1129,10 @@ class NetworkManager:
last_heartbeat_send_time = time.ticks_ms()
while True:
# 如果底层连接已断开,尽快跳出内层循环触发重连/重选网络
if not self._tcp_connected:
break
# OTA 期间暂停 TCP 活动
try:
from ota_manager import ota_manager
@@ -929,29 +1321,29 @@ class NetworkManager:
except:
ip = "error_getting_ip"
self.safe_enqueue({"result": "current_ip", "ip": ip}, 2)
elif inner_cmd == 7:
from ota_manager import ota_manager
if ota_manager.update_thread_started:
self.safe_enqueue({"result": "update_already_started"}, 2)
continue
# elif inner_cmd == 7:
# from ota_manager import ota_manager
# if ota_manager.update_thread_started:
# self.safe_enqueue({"result": "update_already_started"}, 2)
# continue
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
# try:
# ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
# except:
# ip = None
if not ip:
self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2)
else:
# 注意direct_ota_download 需要 ota_url 参数
# 如果 ota_manager.ota_url 为 None需要从其他地方获取
ota_url_to_use = ota_manager.ota_url
if not ota_url_to_use:
self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置")
self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2)
else:
ota_manager._start_update_thread()
_thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,))
# if not ip:
# self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2)
# else:
# # 注意direct_ota_download 需要 ota_url 参数
# # 如果 ota_manager.ota_url 为 None需要从其他地方获取
# ota_url_to_use = ota_manager.ota_url
# if not ota_url_to_use:
# self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置")
# self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2)
# else:
# ota_manager._start_update_thread()
# _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,))
elif inner_cmd == 41:
self.logger.info("[TEST] 收到TCP射箭触发命令")
self._manual_trigger_flag = True
@@ -968,29 +1360,62 @@ class NetworkManager:
except:
pass
time.sleep_ms(2000)
os.system("sync") # 刷新文件系统缓存到磁盘,防止数据丢失
time.sleep_ms(500)
os.system("poweroff")
return
elif inner_cmd == 43: # 上传日志命令
# 格式: {"cmd":43, "data":{"ssid":"xxx","password":"xxx","url":"xxx", ...}}
inner_data = data_obj.get("data", {})
upload_url = inner_data.get("url")
wifi_ssid = inner_data.get("ssid")
wifi_password = inner_data.get("password")
include_rotated = inner_data.get("include_rotated", True)
max_files = inner_data.get("max_files")
archive_format = inner_data.get("archive", "tgz") # tgz 或 zip
if not upload_url:
self.logger.error("[LOG_UPLOAD] 缺少 url 参数")
self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_url"}, 2)
else:
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令目标URL: {upload_url}")
# 在新线程中执行上传,避免阻塞主循环
import _thread
_thread.start_new_thread(
self._upload_log_file,
(upload_url, wifi_ssid, wifi_password, include_rotated, max_files, archive_format)
)
else:
time.sleep_ms(5)
# 发送队列中的业务数据
if logged_in and (self._high_send_queue or self._normal_send_queue):
msg_type = None
data_dict = None
if self.get_queue_lock().acquire(blocking=False):
try:
if self._high_send_queue:
msg_type, data_dict = self._high_send_queue.pop(0)
elif self._normal_send_queue:
msg_type, data_dict = self._normal_send_queue.pop(0)
finally:
self.get_queue_lock().release()
if logged_in:
item = None
item_is_high = False
# 出队:发送失败时会把 item 放回队首,避免丢数据
with self.get_queue_lock():
if self._high_send_queue:
item = self._high_send_queue.pop(0)
item_is_high = True
elif self._normal_send_queue:
item = self._normal_send_queue.pop(0)
item_is_high = False
if msg_type is not None and data_dict is not None:
if item:
msg_type, data_dict = item
pkt = self._netcore.make_packet(msg_type, data_dict)
# pkt = self.make_packet(msg_type, data_dict)
if not self.tcp_send_raw(pkt):
# 发送失败:将消息放回队首,触发重连(避免丢消息)
with self.get_queue_lock():
if item_is_high:
self._high_send_queue.insert(0, item)
else:
self._normal_send_queue.insert(0, item)
self._tcp_connected = False
try:
self.disconnect_server()
except:
pass
break
# 发送激光校准结果
@@ -1007,14 +1432,20 @@ class NetworkManager:
vol_val = get_bus_voltage()
if not self.tcp_send_raw(self._netcore.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
# if not self.tcp_send_raw(self.make_packet(4, {"vol": vol_val, "vol_per": voltage_to_percent(vol_val)})):
self.logger.error("心跳发送失败")
time.sleep_ms(3000)
send_hartbeat_fail_count += 1
# 短暂波动可能导致一次发送失败:连续失败达到阈值才重连,避免重连风暴
self.logger.error(f"心跳发送失败({send_hartbeat_fail_count}/3),准备重试")
if send_hartbeat_fail_count >= 3:
send_hartbeat_fail_count = 0
self.logger.error("连续3次发送心跳失败重连")
self.logger.error("心跳连续失败>=3准备重连")
self._tcp_connected = False
try:
self.disconnect_server()
except:
pass
break
else:
# 不立即断开,让下一轮心跳再试;同时缩短一点等待,提升恢复速度
time.sleep_ms(200)
continue
else:
send_hartbeat_fail_count = 0
@@ -1040,7 +1471,8 @@ class NetworkManager:
self.logger.error("十分钟无心跳ACK重连")
break
time.sleep_ms(50)
self._send_event.wait(timeout=0.05) # 0.05秒 = 50ms
self._send_event.clear()
self._tcp_connected = False
self.logger.error("连接异常2秒后重连...")

View File

@@ -115,10 +115,7 @@ class OTAManager:
"""设置OTA模式内部方法"""
with self._lock:
self._ota_mode = mode
# ==================== 业务方法 ====================
# 注意:这些方法会调用 ota.py 中的实际实现函数
# 为了保持向后兼容,实际的实现仍然在 ota.py 中
def is_archive_file(self, filename):
"""
@@ -628,7 +625,7 @@ class OTAManager:
download_dir = self.get_download_timestamp_dir()
return f"{download_dir}/{default_name}"
def download_file(self, url, filename):
def download_file_via_wifi(self, url, filename):
"""从指定 URL 下载文件根据文件类型自动选择文本或二进制模式并支持MD5校验"""
try:
self.logger.info(f"正在从 {url} 下载文件...")
@@ -694,47 +691,47 @@ class OTAManager:
except Exception as e:
return f"下载失败!发生未知错误: {e}"
def direct_ota_download(self, ota_url):
"""直接执行 OTA 下载(假设已有网络)"""
# def direct_ota_download(self, ota_url):
# """直接执行 OTA 下载(假设已有网络)"""
self._set_ota_url(ota_url)
self._start_update_thread()
# self._set_ota_url(ota_url)
# self._start_update_thread()
try:
if not ota_url:
from network import safe_enqueue
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
return
# try:
# if not ota_url:
# from network import safe_enqueue
# safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
# return
parsed_url = urlparse(ota_url)
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
# parsed_url = urlparse(ota_url)
# host = parsed_url.hostname
# port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not network_manager.is_server_reachable(host, port, timeout=8):
from network import safe_enqueue
safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2)
return
# if not network_manager.is_server_reachable(host, port, timeout=8):
# from network import safe_enqueue
# safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2)
# return
downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp")
self.logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}")
self.logger.info(f"[OTA] 开始下载: {ota_url}")
result_msg = self.download_file(ota_url, downloaded_filename)
self.logger.info(f"[OTA] {result_msg}")
# downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp")
# self.logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}")
# self.logger.info(f"[OTA] 开始下载: {ota_url}")
# result_msg = self.download_file(ota_url, downloaded_filename)
# self.logger.info(f"[OTA] {result_msg}")
if "成功" in result_msg or "下载成功" in result_msg:
if self.apply_ota_and_reboot(ota_url, downloaded_filename):
return
else:
from network import safe_enqueue
safe_enqueue({"result": result_msg}, 2)
# if "成功" in result_msg or "下载成功" in result_msg:
# if self.apply_ota_and_reboot(ota_url, downloaded_filename):
# return
# else:
# from network import safe_enqueue
# safe_enqueue({"result": result_msg}, 2)
except Exception as e:
error_msg = f"OTA 异常: {str(e)}"
self.logger.error(error_msg)
from network import safe_enqueue
safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
finally:
self._stop_update_thread()
# except Exception as e:
# error_msg = f"OTA 异常: {str(e)}"
# self.logger.error(error_msg)
# from network import safe_enqueue
# safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
# finally:
# self._stop_update_thread()
def download_file_via_4g(self, url, filename,
total_timeout_ms=600000,
@@ -766,7 +763,7 @@ class OTAManager:
return False, "bad_url (no host)"
# 很多 ML307R 的 MHTTP 对 https 不稳定;对已知域名做降级
if isinstance(url, str) and url.startswith("https://static.shelingxingqiu.com/"):
base_url = "https://static.shelingxingqiu.com"
# TODO使用https看看是否能成功
@@ -862,7 +859,7 @@ class OTAManager:
downgraded_base_url = base_url.replace("https://", "http://")
resp = hardware_manager.at_client.send(f'AT+MHTTPCREATE="{downgraded_base_url}"', "OK", 8000)
hid = _parse_httpid(resp)
return hid, resp
def _fetch_range_into_buf(start, want_len, out_buf, full_reset=False):
@@ -1193,12 +1190,8 @@ class OTAManager:
from network import network_manager, safe_enqueue
try:
ip, error = network_manager.connect_wifi(ssid, password)
if error:
safe_enqueue({"result": "wifi_failed", "error": error}, 2)
return
safe_enqueue({"result": "wifi_connected", "ip": ip}, 2)
# 与 4G 一致OTA 期间暂停主循环 / 心跳等
self._begin_ota()
if not ota_url:
safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
return
@@ -1207,17 +1200,25 @@ class OTAManager:
host = parsed_url.hostname
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
if not network_manager.is_server_reachable(host, port, timeout=8):
err_msg = f"网络不通:无法连接 {host}:{port}"
safe_enqueue({"result": err_msg}, 2)
self.logger.error(err_msg)
# 先连接 WiFi并把 OTA host:port 作为“可达性验证目标”
# 只有连接成功 + 可访问 OTA 地址,才会把 SSID/PASS 落盘到 /boot/
ip, error = network_manager.connect_wifi(
ssid,
password,
verify_host=host,
verify_port=port,
persist=True,
)
if error:
safe_enqueue({"result": "wifi_failed", "error": error}, 2)
return
safe_enqueue({"result": "wifi_connected", "ip": ip}, 2)
downloaded_filename = self.get_filename_from_url(ota_url, default_name="main_tmp")
self.logger.info(f"[OTA] 下载文件将保存为: {downloaded_filename}")
self.logger.info(f"[NET] 已确认可访问 {host}:{port},开始下载...")
result = self.download_file(ota_url, downloaded_filename)
result = self.download_file_via_wifi(ota_url, downloaded_filename)
self.logger.info(result)
if "成功" in result or "下载成功" in result:
@@ -1231,6 +1232,7 @@ class OTAManager:
self.logger.error(err_msg)
finally:
self._stop_update_thread()
self._end_ota() # 与 4G 一致
print("[UPDATE] 更新线程执行完毕,即将退出。")
def restore_from_backup(self, backup_dir_path=None):

View File

@@ -4,11 +4,18 @@
应用版本号
每次 OTA 更新时,只需要更新这个文件中的版本号
"""
VERSION = '1.2.1'
VERSION = '1.2.7'
# 1.2.0 开始使用C++编译成.so替换部分代码
# 1.2.1 ota使用加密包
# 1.2.2 支持wifi ota并且设定时区并使用单独线程保存图片
# 1.2.3 修改ADC_TRIGGER_THRESHOLD 为2300支持上传日志到服务器
# 1.2.4 修改ADC_TRIGGER_THRESHOLD 为3000并默认关闭摄像头的显示并把ADC的采样间隔从50ms降低到10ms
# 1.2.5 支持空气传感器采样,并默认关闭日志。优化断网时的发送队列丢消息问题,解决 WiFi 断线检测不可靠问题。
# 1.2.6 在链接 wifi 前先判断 wifi 的可用性,假如不可用,则不落盘。增加日志批量压缩上传功能
# 1.2.7 修复OTA失败的bug, 空气压力传感器的阈值是2500
# 1.2.8 1 加快 wifi 下数据传输的速度。2 调整射箭时处理的逻辑优先上报数据再存照片之类的操作。3假如是用户打开激光的射箭触发后不再关闭激光因为是调瞄阶段