diff --git a/app.yaml b/app.yaml index 4232cbc..464bf28 100644 --- a/app.yaml +++ b/app.yaml @@ -1,6 +1,6 @@ id: t11 name: t11 -version: 1.1.10 +version: 1.2.1 author: t11 icon: '' desc: t11 diff --git a/cpp_ext/CMakeLists.txt b/cpp_ext/CMakeLists.txt index aefc191..590db1d 100644 --- a/cpp_ext/CMakeLists.txt +++ b/cpp_ext/CMakeLists.txt @@ -27,12 +27,42 @@ add_library(archery_netcore MODULE target_include_directories(archery_netcore PRIVATE "${PY_INCLUDE_DIR}" "${MAIXCDK_PATH}/components/3rd_party/pybind11/pybind11/include" + "${MAIXCDK_PATH}/components/3rd_party/openssl/include" "${CMAKE_CURRENT_SOURCE_DIR}/third_party" # 添加 nlohmann/json 路径 ) +# 尽量减少 .so 体积并增加逆向成本 +target_compile_options(archery_netcore PRIVATE + -Os + -ffunction-sections + -fdata-sections + -fvisibility=hidden + -fvisibility-inlines-hidden +) +target_link_options(archery_netcore PRIVATE + -Wl,--gc-sections + -Wl,-s +) + set_target_properties(archery_netcore PROPERTIES PREFIX "" SUFFIX "${PY_EXT_SUFFIX}" ) -target_link_libraries(archery_netcore PRIVATE "${PY_LIB}") \ No newline at end of file +# OpenSSL (for AES-256-GCM decrypt) +# 使用 MaixCDK 提供的 OpenSSL 库(在 so/maixcam 目录下) +set(OPENSSL_LIB_DIR "${MAIXCDK_PATH}/components/3rd_party/openssl/so/maixcam") +if(EXISTS "${OPENSSL_LIB_DIR}/libcrypto.so") + target_link_directories(archery_netcore PRIVATE "${OPENSSL_LIB_DIR}") + target_link_libraries(archery_netcore PRIVATE "${PY_LIB}" crypto ssl) + message(STATUS "Using OpenSSL from MaixCDK: ${OPENSSL_LIB_DIR}") +else() + # Fallback: 尝试 find_package 或系统库 + find_package(OpenSSL QUIET) + if(OpenSSL_FOUND) + target_link_libraries(archery_netcore PRIVATE "${PY_LIB}" OpenSSL::Crypto OpenSSL::SSL) + else() + message(WARNING "OpenSSL not found in MaixCDK, trying system libraries (may fail)") + target_link_libraries(archery_netcore PRIVATE "${PY_LIB}" crypto ssl) + endif() +endif() \ No newline at end of file diff --git a/cpp_ext/archery_netcore.cpp b/cpp_ext/archery_netcore.cpp index 375a896..56a43ec 100644 --- a/cpp_ext/archery_netcore.cpp +++ b/cpp_ext/archery_netcore.cpp @@ -5,6 +5,10 @@ #include #include #include +#include +#include +#include +#include #include "native_logger.hpp" namespace py = pybind11; @@ -14,6 +18,29 @@ namespace { // 配置项 const std::string _cfg_server_ip = "www.shelingxingqiu.com"; const int _cfg_server_port = 50005; + + // OTA AEAD format: MAGIC(7) | nonce(12) | ciphertext(N) | tag(16) + constexpr const char* kOtaMagic = "AROTAE1"; + constexpr size_t kOtaMagicLen = 7; + constexpr size_t kGcmNonceLen = 12; + constexpr size_t kGcmTagLen = 16; + + // 固定 32-byte AES-256-GCM key(提高被直接查看的成本;不是绝对安全) + // 注意:需要与打包端传入的 --aead-key-hex 保持一致。 + static std::array ota_key_bytes() { + // 简单拆分混淆:key = a XOR b + static const std::array a = { + 0x92,0x99,0x4d,0x06,0x6f,0xb6,0xa6,0x3d,0x85,0x08,0xbe,0x73,0x5e,0x73,0x4d,0x8a, + 0x53,0x88,0xe6,0x99,0xfc,0x10,0x29,0xb9,0x16,0x9b,0xe7,0x0c,0x65,0x21,0x1c,0xce + }; + static const std::array b = { + 0xcf,0x60,0xa2,0xc2,0x32,0x7a,0x61,0xb0,0x4c,0x8e,0x8a,0x62,0x31,0xc7,0x82,0xff, + 0xec,0xac,0xa1,0x04,0x2a,0x4d,0xaa,0xf2,0xb0,0x5b,0x39,0x2b,0xf4,0xb3,0xad,0xad + }; + std::array k{}; + for (size_t i = 0; i < k.size(); i++) k[i] = static_cast(a[i] ^ b[i]); + return k; + } } // 定义获取配置的函数 @@ -109,6 +136,103 @@ py::dict json_to_py_dict(const json& j) { return d; } +static bool read_file_all(const std::string& path, std::vector& out) { + std::ifstream ifs(path, std::ios::binary); + if (!ifs) return false; + ifs.seekg(0, std::ios::end); + std::streampos size = ifs.tellg(); + if (size <= 0) return false; + ifs.seekg(0, std::ios::beg); + out.resize(static_cast(size)); + if (!ifs.read(reinterpret_cast(out.data()), size)) return false; + return true; +} + +static bool write_file_all(const std::string& path, const uint8_t* data, size_t len) { + std::ofstream ofs(path, std::ios::binary | std::ios::trunc); + if (!ofs) return false; + ofs.write(reinterpret_cast(data), static_cast(len)); + return static_cast(ofs); +} + +static bool decrypt_ota_file_impl(const std::string& input_path, const std::string& output_zip_path) { + std::vector in; + if (!read_file_all(input_path, in)) { + netcore::log_error(std::string("decrypt_ota_file: read failed: ") + input_path); + return false; + } + + const size_t min_len = kOtaMagicLen + kGcmNonceLen + kGcmTagLen + 1; + if (in.size() < min_len) { + netcore::log_error("decrypt_ota_file: too short"); + return false; + } + if (!std::equal(in.begin(), in.begin() + kOtaMagicLen, reinterpret_cast(kOtaMagic))) { + netcore::log_error("decrypt_ota_file: bad magic"); + return false; + } + + const uint8_t* nonce = in.data() + kOtaMagicLen; + const uint8_t* ct_and_tag = in.data() + kOtaMagicLen + kGcmNonceLen; + const size_t ct_and_tag_len = in.size() - (kOtaMagicLen + kGcmNonceLen); + if (ct_and_tag_len <= kGcmTagLen) { + netcore::log_error("decrypt_ota_file: no ciphertext"); + return false; + } + const size_t ciphertext_len = ct_and_tag_len - kGcmTagLen; + const uint8_t* ciphertext = ct_and_tag; + const uint8_t* tag = ct_and_tag + ciphertext_len; + + std::vector plain(ciphertext_len); + int out_len1 = 0; + int out_len2 = 0; + + EVP_CIPHER_CTX* ctx = EVP_CIPHER_CTX_new(); + if (!ctx) { + netcore::log_error("decrypt_ota_file: EVP_CIPHER_CTX_new failed"); + return false; + } + + bool ok = false; + auto key = ota_key_bytes(); + + do { + if (1 != EVP_DecryptInit_ex(ctx, EVP_aes_256_gcm(), nullptr, nullptr, nullptr)) { + netcore::log_error("decrypt_ota_file: DecryptInit failed"); + break; + } + if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_IVLEN, static_cast(kGcmNonceLen), nullptr)) { + netcore::log_error("decrypt_ota_file: set ivlen failed"); + break; + } + if (1 != EVP_DecryptInit_ex(ctx, nullptr, nullptr, key.data(), nonce)) { + netcore::log_error("decrypt_ota_file: set key/iv failed"); + break; + } + if (1 != EVP_DecryptUpdate(ctx, plain.data(), &out_len1, ciphertext, static_cast(ciphertext_len))) { + netcore::log_error("decrypt_ota_file: update failed"); + break; + } + if (1 != EVP_CIPHER_CTX_ctrl(ctx, EVP_CTRL_GCM_SET_TAG, static_cast(kGcmTagLen), const_cast(tag))) { + netcore::log_error("decrypt_ota_file: set tag failed"); + break; + } + if (1 != EVP_DecryptFinal_ex(ctx, plain.data() + out_len1, &out_len2)) { + netcore::log_error("decrypt_ota_file: final failed (auth tag mismatch?)"); + break; + } + const size_t plain_len = static_cast(out_len1 + out_len2); + if (!write_file_all(output_zip_path, plain.data(), plain_len)) { + netcore::log_error(std::string("decrypt_ota_file: write failed: ") + output_zip_path); + break; + } + ok = true; + } while (false); + + EVP_CIPHER_CTX_free(ctx); + return ok; +} + // 打包 TCP 数据包 py::bytes make_packet(int msg_type, py::dict body_dict) { netcore::log_debug(std::string("make_packet msg_type=") + std::to_string(msg_type)); @@ -233,6 +357,17 @@ PYBIND11_MODULE(archery_netcore, m) { m.def("get_config", &get_config, "Get system configuration"); + m.def( + "decrypt_ota_file", + [](const std::string& input_path, const std::string& output_zip_path) { + netcore::log_info(std::string("decrypt_ota_file in=") + input_path + " out=" + output_zip_path); + return decrypt_ota_file_impl(input_path, output_zip_path); + }, + py::arg("input_path"), + py::arg("output_zip_path"), + "Decrypt OTA encrypted file (MAGIC|nonce|ciphertext|tag) to plaintext zip." + ); + // Minimal demo: return actions for inner_cmd=41 (manual trigger + ack) m.def("actions_for_inner_cmd", [](int inner_cmd) { py::list actions; diff --git a/design_doc/command_record.md b/design_doc/command_record.md new file mode 100644 index 0000000..0beadaf --- /dev/null +++ b/design_doc/command_record.md @@ -0,0 +1,23 @@ + +1. CPP构建命令: + +cd /mnt/d/code/archery/cpp_ext +rm -rf build && mkdir build && cd build + +TOOLCHAIN_BIN=/mnt/d/code/MaixCDK/dl/extracted/toolchains/maixcam/host-tools/gcc/riscv64-linux-musl-x86_64/bin +PYDEV=/mnt/d/code/shooting/python3_lib_maixcam_musl_3.11.6 +MAIXCDK=/mnt/d/code/MaixCDK + +cmake .. -G Ninja \ + -DCMAKE_C_COMPILER="${TOOLCHAIN_BIN}/riscv64-unknown-linux-musl-gcc" \ + -DCMAKE_CXX_COMPILER="${TOOLCHAIN_BIN}/riscv64-unknown-linux-musl-g++" \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_C_FLAGS="-mcpu=c906fdv -march=rv64imafdcv0p7xthead -mcmodel=medany -mabi=lp64d" \ + -DCMAKE_CXX_FLAGS="-mcpu=c906fdv -march=rv64imafdcv0p7xthead -mcmodel=medany -mabi=lp64d" \ + -DPY_INCLUDE_DIR="${PYDEV}/include/python3.11" \ + -DPY_LIB="${PYDEV}/lib/libpython3.11.so" \ + -DPY_EXT_SUFFIX=".cpython-311-riscv64-linux-gnu.so" \ + -DMAIXCDK_PATH="${MAIXCDK}" + +ninja + diff --git a/design_doc/solution_record.md b/design_doc/solution_record.md new file mode 100644 index 0000000..1e7629d --- /dev/null +++ b/design_doc/solution_record.md @@ -0,0 +1,13 @@ +1. OTA 下载的时候,为什么使用十六进制下载,读取 URC 事件? + 因为使用二进制下载的时候,经常会出现错误,并且会失败?然后最稳定传输的办法,是每次传输的时候,是分块,而且每次分块都要“删/建”http实例。推测原因是因为我们现在是直接传输文件的源代码,代码中含有了一些字符串可能和 AT指令重复,导致了 AT 模块在解释的时候出错。而使用 16 进制的方式,可以避免这个问题。因为十六进制直接把数据先转成了字符串,然后在设备端再把字符串转成数据,这样就不可能出现 AT的指令,从而减少了麻烦。 +2. OTA 下载的时候,为什么不用 AT 模块里 HTTPDLFILE 的指令? + 因为在测试中发现,使用 HTTPDLFILE,其实是下载到了 4G 模块内部,需要重新从模块内部转到存储卡,而且 4G 模块的存储较小,大概只有 40k,所以还需要分块来下载和转存,比较麻烦,于是最终使用了使用读取串口事件的模式。 +3. OTA 下载的时候,为什么不用 AT 模块里 HTTPREAD 的指令? + 因为之前测试发现,READ模式其实是需要多步: + 3.1. AT+MHTTPCREATE + 3.2. AT+MHTTPCFG + 3.3. AT+MHTTPREQUEST + 3.4. AT+MHTTPREAD + 它其实也是把数据下载到 4g 模块的缓存里,然后再从缓存里读取出来。所以也是比较繁琐的,还不如 HTTPDLFILE 简单。 +4. +4. \ No newline at end of file diff --git a/design_doc/todo.md b/design_doc/todo.md new file mode 100644 index 0000000..7c8a2b9 --- /dev/null +++ b/design_doc/todo.md @@ -0,0 +1,41 @@ +你现在要防的是“别人拿到设备/拿到代码包后,能伪造请求、刷接口、下发恶意 OTA、甚至劫持通信”。单靠隐藏 Python 源码只能提高门槛,真正的安全要靠协议和密钥设计。结合你仓库里实际内容,建议你重点隐藏/整改这些点(按风险排序)。 +1. 必须隐藏/必须整改(高风险) +1.1 登录口令规则太弱(几乎等于明文) +你现在的登录是 password = device_id + "."(见 network.py 读取设备 ID 后直接拼出来),这意味着只要攻击者知道/猜到 device_id,就能直接登录伪装设备。 +相关位置: +with open("/device_key", "r") as f: device_id = f.read().strip() ... self._device_id = device_id self._password = device_id + "." +1.2 HTTP 鉴权 token 的盐值是硬编码常量(泄露后可离线伪造) +你 token 是 HMAC-SHA256((SALT+device_id), SALT2),而 SALT/SALT2 是固定字符串:"shootMessageFire" / "shoot"。这类“硬编码盐值 + 可猜/可读的 device_id”意味着:攻击者只要拿到代码包/逆向 .so,就能在自己电脑上批量算 token,伪造 HTTP 请求。 +相关位置: +SALT = "shootMessageFire"SALT2 = "shoot"return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() +1.3 TLS 配置目前看起来没有做证书校验(容易被中间人攻击) +config.py 虽然 USE_TCP_SSL=True,但你在 network.py 里实际把 MSSLCFG="auth" 固定成 0(不验),且写证书分支被 if False 禁用了。这样“看起来是 TLS”,但仍可能被抓包/篡改/假服务器接入。 +相关位置: +r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},0', "OK", 3000)...if False: # 写证书/校验被禁用 ...r = hardware_manager.at_client.send(f'AT+MIPCFG="ssl",{link_id},{ssl_id},1', "OK", 3000) +1.4 OTA 下发“url”如果缺少强校验,就是远程代码执行入口 +你 OTA 逻辑里从服务器指令拿到 url 就去下载并替换文件/重启(这是正常 OTA),但安全性取决于: +是否只允许白名单域名/路径 +是否强制 https 并校验服务器证书 +是否对 OTA 包做签名校验(最关键) +你这里能看到固定域名 static.shelingxingqiu.com 的特殊处理(ota_manager.py 里还在纠结 http/https),这块一定要“服务端签名 + 设备端验签”,否则隐藏源码也没用。 + +2. 建议隐藏(中风险,但很容易被人利用) +2.1 所有服务器地址/端口/API 路径(可被用于扫描、压测、撞库、协议逆向) +这些在 config.py 是明文: +SERVER_IP = "stcp.shelingxingqiu.com"SERVER_PORT = 50005HTTP_URL = "http://ws.shelingxingqiu.com"HTTP_API_PATH = "/home/shoot/device_fire/arrow/fire" +注意:即使你把它们藏进 .so,攻击者仍可能通过抓包/观察 DNS/SNI/流量拿到域名或 IP,所以“隐藏”只能降低静态分析风险,不能替代鉴权/签名。 +2.2 WiFi 凭证落盘位置 +你会把 SSID/密码写到 /boot/wifi.ssid 和 /boot/wifi.pass(network.py/wifi.py 都有)。拿到设备存储就能读到明文密码,这属于设备侧安全问题。 +2.3 日志/调试信息泄露 +你仓库里 .cursor/debug.log* 已经记录了完整的 AT+MIPOPEN ... host:port 等信息;如果这些被打进包或被用户导出,也会泄露通信细节。 +你发布包里“真正会被带走的敏感内容” +你现在 app.yaml 的打包清单只包含核心运行文件(不包含 test/、backup_code/、.cursor/ 等),这一点很好: +files: - app.yaml - at_client.py - camera_manager.py - config.py - hardware.py - laser_manager.py - logger_manager.py - main.py - network.py - ota_manager.py - power.py - shot_id_generator.py - time_sync.py - version.py - vision.py +但要注意:你当前最敏感的“盐值/口令规则/HTTP header 逻辑/OTA 策略”都在这些会打包的文件里(尤其是 network.py、ota_manager.py、config.py)。 + +3. 最有效的“防黑”措施(比隐藏更关键) +把“password=device_id+.” 改为真正的 per-device secret(设备出厂烧录,服务端保存;或服务端下发公钥,设备用私钥签名) +所有上报/控制命令加签名 + nonce/timestamp + 服务端防重放(别人抓到一次包也不能复用) +OTA 包必须做签名校验(设备端内置公钥,下载后验签通过才应用) +TLS 必须做证书校验/最好做 pinning(至少别用 auth=0) +如果你告诉我:你们服务端目前能不能改协议(例如新增签名字段、下发 challenge、做 OTA 签名),我可以按“最小改动但提升最大安全”的顺序,帮你规划一套从现状平滑升级的方案。 \ No newline at end of file diff --git a/main.py b/main.py index b7acd1e..6641693 100644 --- a/main.py +++ b/main.py @@ -252,14 +252,6 @@ def cmd_str(): try: frame = camera_manager.read_frame() - # 先检测靶心以获取距离(用于计算激光点) - # 第一次检测不使用激光点,仅用于获取距离 - result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3(frame, None) - - # 计算距离 - distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None - - # 根据距离动态计算激光点坐标(如果未检测到靶心,使用硬编码值或默认值) laser_point_method = None # 记录激光点选择方法 if config.HARDCODE_LASER_POINT: # 硬编码模式:使用硬编码值 @@ -272,15 +264,13 @@ def cmd_str(): logger_manager.logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}") elif distance_m and distance_m > 0: # 动态计算模式:根据距离计算激光点 + # 先检测靶心以获取距离(用于计算激光点) + # 第一次检测不使用激光点,仅用于获取距离 + result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3(frame, None) + # 计算距离 + distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None laser_point = laser_manager.calculate_laser_point_from_distance(distance_m) laser_point_method = "dynamic" - logger_manager.logger.info(f"[算法] 使用比例尺: {laser_point}") - else: - # 未检测到靶心且未启用硬编码:使用默认激光点或从配置文件加载 - laser_point = laser_manager.laser_point - laser_point_method = "default" - logger_manager.logger.info(f"[算法] 使用默认值: {laser_point}") - if laser_point is None: logger = logger_manager.logger if logger: @@ -290,22 +280,9 @@ def cmd_str(): x, y = laser_point - # 绘制激光十字线 - color = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) - frame.draw_line( - int(x - config.LASER_LENGTH), int(y), - int(x + config.LASER_LENGTH), int(y), - color, config.LASER_THICKNESS - ) - frame.draw_line( - int(x), int(y - config.LASER_LENGTH), - int(x), int(y + config.LASER_LENGTH), - color, config.LASER_THICKNESS - ) - frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS) - - # 重新检测靶心(使用计算出的激光点) + # 检测靶心 result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point) + camera_manager.show(result_img) # 计算偏移与距离(如果检测到靶心) diff --git a/ota_manager.py b/ota_manager.py index 6e56f7f..2436983 100644 --- a/ota_manager.py +++ b/ota_manager.py @@ -210,6 +210,64 @@ class OTAManager: self.logger.error(f"[OTA] 错误:{downloaded_file} 不存在") return False + # ====== 第一步:如果是 AEAD 加密包,先解密成临时 zip(再走原有 unzip 流程) ====== + downloaded_file_original = downloaded_file + decrypted_tmp_zip = None + try: + magic = b"AROTAE1" # must match packager/C++ side + is_enc_ext = downloaded_file.lower().endswith((".enc", ".zip.enc")) + is_enc_magic = False + try: + with open(downloaded_file, "rb") as f: + head = f.read(len(magic)) + is_enc_magic = (head == magic) + except Exception: + is_enc_magic = False + + if is_enc_ext or is_enc_magic: + # Choose output zip path (same dir) + tmp_zip = downloaded_file + if tmp_zip.lower().endswith(".zip.enc"): + tmp_zip = tmp_zip[:-4] # remove ".enc" -> ".zip" + elif tmp_zip.lower().endswith(".enc"): + tmp_zip = tmp_zip[:-4] + if not tmp_zip.lower().endswith(".zip"): + tmp_zip = tmp_zip + ".zip" + else: + tmp_zip = tmp_zip + ".zip" + + decrypted_tmp_zip = tmp_zip + + # Remove stale tmp if exists + try: + if os.path.exists(decrypted_tmp_zip): + os.remove(decrypted_tmp_zip) + except Exception: + pass + + self.logger.info(f"[OTA] 检测到加密包,开始解密: {downloaded_file} -> {decrypted_tmp_zip}") + ok = False + try: + core = getattr(network_manager, "_netcore", None) + if core and hasattr(core, "decrypt_ota_file"): + ok = bool(core.decrypt_ota_file(downloaded_file, decrypted_tmp_zip)) + else: + import archery_netcore as _netcore + ok = bool(_netcore.decrypt_ota_file(downloaded_file, decrypted_tmp_zip)) + except Exception as e: + self.logger.error(f"[OTA] 解密异常: {e}") + ok = False + + if not ok or (not os.path.exists(decrypted_tmp_zip)): + self.logger.error("[OTA] 解密失败,终止更新") + return False + + downloaded_file = decrypted_tmp_zip + self.logger.info(f"[OTA] 解密成功,后续使用明文ZIP: {downloaded_file}") + except Exception as e: + self.logger.error(f"[OTA] 解密流程异常: {e}") + return False + # 备份 backup_base = config.BACKUP_BASE backup_dir = None @@ -437,17 +495,35 @@ class OTAManager: # 清理下载文件 try: - if os.path.exists(downloaded_file): - # 删除下载的文件 + # 删除下载的文件(可能包含:原始加密包 + 临时明文zip) + files_to_remove = [] + try: + if 'downloaded_file_original' in locals() and downloaded_file_original: + files_to_remove.append(downloaded_file_original) + except Exception: + pass + try: + if 'decrypted_tmp_zip' in locals() and decrypted_tmp_zip: + files_to_remove.append(decrypted_tmp_zip) + except Exception: + pass + # 兼容:如果变量不存在,至少清理当前 downloaded_file + if not files_to_remove: + files_to_remove = [downloaded_file] + + removed_any = False + for fp in list(dict.fromkeys(files_to_remove)): try: - os.remove(downloaded_file) - self.logger.info(f"[OTA] 已删除下载文件: {downloaded_file}") + if fp and os.path.exists(fp): + os.remove(fp) + removed_any = True + self.logger.info(f"[OTA] 已删除下载文件: {fp}") except Exception as e: self.logger.warning(f"[OTA] 删除下载文件失败(可忽略): {e}") # 尝试删除时间戳目录(如果为空) try: - download_dir = os.path.dirname(downloaded_file) + download_dir = os.path.dirname(files_to_remove[0] if files_to_remove else downloaded_file) if download_dir.startswith("/tmp/download/"): # 检查时间戳目录是否为空 if os.path.exists(download_dir): @@ -567,7 +643,7 @@ class OTAManager: # 根据文件扩展名判断是否为二进制文件 filename_lower = filename.lower() - is_binary = filename_lower.endswith(('.zip', '.bin', '.tar', '.gz', '.exe', '.dll', '.so', '.dylib')) + is_binary = filename_lower.endswith(('.zip', '.zip.enc', '.enc', '.bin', '.tar', '.gz', '.exe', '.dll', '.so', '.dylib')) if is_binary: # 二进制文件:使用二进制模式写入 diff --git a/package.py b/package.py index fa8c4d7..c07d3f2 100644 --- a/package.py +++ b/package.py @@ -5,11 +5,17 @@ 根据 app.yaml 中列出的文件,打包成 zip 文件 版本号从 version.py 中读取 """ +import argparse import os import yaml import zipfile from datetime import datetime import sys +import secrets + +MAGIC = b"AROTAE1" # 7 bytes: Archery OTA Encrypted v1 +GCM_NONCE_LEN = 12 +GCM_TAG_LEN = 16 # 添加当前目录到路径,以便导入 version 模块 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) @@ -91,8 +97,63 @@ def create_zip_package(app_info, files, output_dir='.', base_dir='.'): return None +def _validate_key_hex(key_hex: str) -> bytes: + if not isinstance(key_hex, str): + raise ValueError("aead key must be hex string") + key_hex = key_hex.strip().lower() + if key_hex.startswith("0x"): + key_hex = key_hex[2:] + if len(key_hex) != 64: + raise ValueError("aead key must be 64 hex chars (32 bytes)") + try: + key = bytes.fromhex(key_hex) + except Exception as e: + raise ValueError(f"invalid hex key: {e}") + if len(key) != 32: + raise ValueError("aead key must be 32 bytes") + return key + + +def encrypt_zip_aead(zip_path: str, key_hex: str, out_ext: str = ".enc") -> str: + """ + Encrypt the whole zip file as one blob: + output format: MAGIC(7) | nonce(12) | ciphertext(N) | tag(16) + using AES-256-GCM (AEAD). + """ + # Lazy import: packaging-only dependency + try: + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + except Exception as e: + raise RuntimeError( + "Missing dependency: cryptography. Install with: pip install cryptography. " + f"Import error: {e}" + ) + + key = _validate_key_hex(key_hex) + with open(zip_path, "rb") as f: + plain = f.read() + + nonce = secrets.token_bytes(GCM_NONCE_LEN) + aesgcm = AESGCM(key) + ct_and_tag = aesgcm.encrypt(nonce, plain, None) # ciphertext || tag (16 bytes) + + enc_path = zip_path + out_ext if out_ext else (zip_path + ".enc") + with open(enc_path, "wb") as f: + f.write(MAGIC) + f.write(nonce) + f.write(ct_and_tag) + + return enc_path + + def main(): """主函数""" + parser = argparse.ArgumentParser(description="打包 app.yaml 文件列表到 zip,并可选进行 AES-256-GCM 加密输出 .enc") + parser.add_argument("--aead-key-hex", default=None, help="AES-256-GCM key (64 hex chars = 32 bytes). If set, output encrypted file.") + parser.add_argument("--keep-zip", action="store_true", help="Keep the plaintext zip when encryption is enabled.") + parser.add_argument("--out-ext", default=".enc", help="Encrypted output extension appended to zip path. Default: .enc (produces *.zip.enc)") + args = parser.parse_args() + print("=" * 60) print("应用打包脚本") print("=" * 60) @@ -139,6 +200,23 @@ def main(): zip_path = create_zip_package(app_info, existing_files) if zip_path: + enc_path = None + if args.aead_key_hex: + try: + enc_path = encrypt_zip_aead(zip_path, args.aead_key_hex, out_ext=args.out_ext) + enc_size = os.path.getsize(enc_path) + print(f"\n[SUCCESS] AEAD加密完成: {os.path.basename(enc_path)} ({enc_size:,} bytes)") + print(f" 文件路径: {os.path.abspath(enc_path)}") + if not args.keep_zip: + try: + os.remove(zip_path) + print(f"[INFO] 已删除明文zip: {os.path.basename(zip_path)}") + except Exception as e: + print(f"[WARNING] 删除明文zip失败(可忽略): {e}") + except Exception as e: + print(f"\n[ERROR] AEAD加密失败: {e}") + print("[ERROR] 保留明文zip用于排查。") + print("\n" + "=" * 60) print("打包成功完成!") print("=" * 60) diff --git a/shoot_manager.py b/shoot_manager.py new file mode 100644 index 0000000..d80c75b --- /dev/null +++ b/shoot_manager.py @@ -0,0 +1,208 @@ +import config +from camera_manager import camera_manager +from laser_manager import laser_manager +from logger_manager import logger_manager +from network import network_manager +from power import get_bus_voltage, voltage_to_percent +from vision import estimate_distance, detect_circle_v3, save_shot_image +from maix import camera, display, image, app, time, uart, pinmap, i2c + +def analyze_shot(frame, laser_point=None): + """ + 分析射箭结果(算法部分,可迁移到C++) + :param frame: 图像帧 + :param laser_point: 激光点坐标 (x, y) + :return: 包含分析结果的字典 + """ + logger = logger_manager.logger + + # 先检测靶心以获取距离(用于计算激光点) + result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3( + frame, None) + + # 计算距离 + distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None + + # 根据距离动态计算激光点坐标 + laser_point_method = None + if config.HARDCODE_LASER_POINT: + laser_point = laser_manager.laser_point + laser_point_method = "hardcode" + elif laser_manager.has_calibrated_point(): + laser_point = laser_manager.laser_point + laser_point_method = "calibrated" + if logger: + logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}") + elif distance_m and distance_m > 0: + laser_point = laser_manager.calculate_laser_point_from_distance(distance_m) + laser_point_method = "dynamic" + if logger: + logger.info(f"[算法] 使用比例尺: {laser_point}") + else: + laser_point = laser_manager.laser_point + laser_point_method = "default" + if logger: + logger.info(f"[算法] 使用默认值: {laser_point}") + + if laser_point is None: + return { + "success": False, + "reason": "laser_point_not_initialized" + } + + x, y = laser_point + + # 绘制激光十字线 + color = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) + frame.draw_line( + int(x - config.LASER_LENGTH), int(y), + int(x + config.LASER_LENGTH), int(y), + color, config.LASER_THICKNESS + ) + frame.draw_line( + int(x), int(y - config.LASER_LENGTH), + int(x), int(y + config.LASER_LENGTH), + color, config.LASER_THICKNESS + ) + frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS) + + # 重新检测靶心(使用计算出的激光点) + result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point) + + # 计算偏移与距离 + if center and radius: + dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method) + distance_m = estimate_distance(best_radius1) + else: + dx, dy = None, None + distance_m = None + + # 返回分析结果 + return { + "success": True, + "result_img": result_img, + "center": center, + "radius": radius, + "method": method, + "best_radius1": best_radius1, + "ellipse_params": ellipse_params, + "dx": dx, + "dy": dy, + "distance_m": distance_m, + "laser_point": laser_point, + "laser_point_method": laser_point_method + } + + +def process_shot(adc_val): + """ + 处理射箭事件(逻辑控制部分) + :param adc_val: ADC触发值 + :return: None + """ + logger = logger_manager.logger + + try: + frame = camera_manager.read_frame() + + # 调用算法分析 + analysis_result = analyze_shot(frame) + + if not analysis_result.get("success"): + reason = analysis_result.get("reason", "unknown") + if logger: + logger.warning(f"[MAIN] 射箭分析失败: {reason}") + time.sleep_ms(100) + return + + # 提取分析结果 + result_img = analysis_result["result_img"] + center = analysis_result["center"] + radius = analysis_result["radius"] + method = analysis_result["method"] + ellipse_params = analysis_result["ellipse_params"] + dx = analysis_result["dx"] + dy = analysis_result["dy"] + distance_m = analysis_result["distance_m"] + laser_point = analysis_result["laser_point"] + laser_point_method = analysis_result["laser_point_method"] + x, y = laser_point + + camera_manager.show(result_img) + + if not (center and radius) and logger: + logger.warning("[MAIN] 未检测到靶心,但会保存图像") + + # 读取电量 + voltage = get_bus_voltage() + battery_percent = voltage_to_percent(voltage) + + # 生成射箭ID + from shot_id_generator import shot_id_generator + shot_id = shot_id_generator.generate_id() + + if logger: + logger.info(f"[MAIN] 射箭ID: {shot_id}") + + # 保存图像 + save_shot_image( + result_img, + center, + radius, + method, + ellipse_params, + (x, y), + distance_m, + shot_id=shot_id, + photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None + ) + + # 构造上报数据 + inner_data = { + "shot_id": shot_id, + "x": float(dx) if dx is not None else 200.0, + "y": float(dy) if dy is not None else 200.0, + "r": 90.0, + "d": round((distance_m or 0.0) * 100), + "d_laser": 0.0, + "d_laser_quality": 0, + "m": method if method else "no_target", + "adc": adc_val, + "laser_method": laser_point_method, + "target_x": float(x), + "target_y": float(y), + } + + if ellipse_params: + (ell_center, (width, height), angle) = ellipse_params + inner_data["ellipse_major_axis"] = float(max(width, height)) + inner_data["ellipse_minor_axis"] = float(min(width, height)) + inner_data["ellipse_angle"] = float(angle) + inner_data["ellipse_center_x"] = float(ell_center[0]) + inner_data["ellipse_center_y"] = float(ell_center[1]) + else: + inner_data["ellipse_major_axis"] = None + inner_data["ellipse_minor_axis"] = None + inner_data["ellipse_angle"] = None + inner_data["ellipse_center_x"] = None + inner_data["ellipse_center_y"] = None + + report_data = {"cmd": 1, "data": inner_data} + network_manager.safe_enqueue(report_data, msg_type=2, high=True) + + if logger: + if center and radius: + logger.info(f"射箭事件已加入发送队列(已检测到靶心),ID: {shot_id}") + else: + logger.info(f"射箭事件已加入发送队列(未检测到靶心,已保存图像),ID: {shot_id}") + + # 闪一下激光(射箭反馈) + laser_manager.flash_laser(1000) + + time.sleep_ms(100) + except Exception as e: + if logger: + logger.error(f"[MAIN] 图像处理异常: {e}") + import traceback + logger.error(traceback.format_exc()) + time.sleep_ms(100) diff --git a/version.py b/version.py index cc60361..6550c1c 100644 --- a/version.py +++ b/version.py @@ -4,10 +4,10 @@ 应用版本号 每次 OTA 更新时,只需要更新这个文件中的版本号 """ -VERSION = '1.2.0' +VERSION = '1.2.1' # 1.2.0 开始使用C++编译成.so,替换部分代码 - +# 1.2.1 ota使用加密包 diff --git a/vision.py b/vision.py index c421539..23c76ea 100644 --- a/vision.py +++ b/vision.py @@ -608,35 +608,35 @@ def save_shot_image(result_img, center, radius, method, ellipse_params, # 转换图像为 OpenCV 格式以便绘制 img_cv = image.image2cv(result_img, False, False) - - # # 确保激光十字线被绘制(使用OpenCV在图像上绘制,确保可见性) - # laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) - # thickness = max(config.LASER_THICKNESS, 2) # 至少2像素宽,确保可见 - # length = max(config.LASER_LENGTH, 10) # 至少10像素长 - - # # 绘制激光十字线(水平线) - # cv2.line(img_cv, - # (int(x - length), int(y)), - # (int(x + length), int(y)), - # laser_color, thickness) - # # 绘制激光十字线(垂直线) - # cv2.line(img_cv, - # (int(x), int(y - length)), - # (int(x), int(y + length)), - # laser_color, thickness) - # # 绘制激光点 - # cv2.circle(img_cv, (int(x), int(y)), max(thickness, 3), laser_color, -1) - # 在 vision.py 的 save_shot_image 函数中,替换第598-614行的代码: - # 绘制激光点标注(使用空心圆圈,类似校准时的标注方式) + # 绘制激光十字线(保存图像时统一绘制,避免影响检测) laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) - thickness = 1 # 圆圈线宽 - - # 绘制外圈(半径10,空心) - cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, thickness) - # 绘制中圈(半径5,空心) - cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, thickness) - # 绘制中心点(半径2,实心,用于精确定位) + cross_thickness = int(max(getattr(config, "LASER_THICKNESS", 1), 1)) + cross_length = int(max(getattr(config, "LASER_LENGTH", 10), 10)) + + # 水平线 + cv2.line( + img_cv, + (int(x - cross_length), int(y)), + (int(x + cross_length), int(y)), + laser_color, + cross_thickness, + ) + # 垂直线 + cv2.line( + img_cv, + (int(x), int(y - cross_length)), + (int(x), int(y + cross_length)), + laser_color, + cross_thickness, + ) + # 小点(与原 main.py 行为一致) + cv2.circle(img_cv, (int(x), int(y)), 1, laser_color, cross_thickness) + + # 额外的激光点标注(空心圆圈,便于肉眼查看) + ring_thickness = 1 + cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, ring_thickness) + cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, ring_thickness) cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1) # 如果检测到靶心,绘制靶心标注