diff --git a/config.py b/config.py index 53e96fa..59714b9 100644 --- a/config.py +++ b/config.py @@ -9,7 +9,7 @@ from version import VERSION # ==================== 应用配置 ==================== APP_VERSION = VERSION APP_DIR = "/maixapp/apps/t11" -LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py" +LOCAL_FILENAME = APP_DIR + "/main_tmp.py" # ==================== 服务器配置 ==================== # SERVER_IP = "stcp.shelingxingqiu.com" @@ -22,7 +22,7 @@ WIFI_QUALITY_RTT_SAMPLES = 3 # 到业务服务器 TCP 建连耗时采样次数 WIFI_QUALITY_RTT_BAD_MS = 600.0 # 中位数超过此值认为延迟过高 WIFI_QUALITY_RTT_WARN_MS = 350.0 # 与 RSSI 联合:超过此值且信号弱也判为差 WIFI_QUALITY_RSSI_BAD_DBM = -80.0 # 低于此 dBm(更负更差)视为信号弱 -WIFI_QUALITY_USE_RSSI = True # 是否把 RSSI 纳入综合判定(False 则仅看 RTT) +WIFI_QUALITY_USE_RSSI = True # 是否把 RSSI 纳入综合判定 # WiFi 热点配网(手机连设备 AP,浏览器提交路由器 SSID/密码;仅 GET/POST,标准库 socket) WIFI_CONFIG_AP_FALLBACK = True # # WiFi 配网失败时,是否退回热点模式,并等待重新配网 @@ -46,14 +46,14 @@ SSL_AUTH_MODE = 1 # 1=单向认证(验证服务器),2=双向 SSL_VERIFY_MODE = 1 # 0=不验(仅测试用);1=写入并使用 CA 证书 SSL_CERT_FILENAME = "server.pem" # 模组里证书名(MSSLCERTWR / MSSLCFG="cert" 用) -SSL_CERT_PATH = "/maixapp/apps/t11/server.pem" # 设备文件系统里 CA 证书路径(你自己放进去) +SSL_CERT_PATH = APP_DIR + "/server.pem" # 设备文件系统里 CA 证书路径(你自己放进去) # MIPOPEN 末尾的参数在不同固件里含义可能不同;按你手册例子保留 MIPOPEN_TAIL = ",,0" # ==================== 文件路径配置 ==================== CONFIG_FILE = "/root/laser_config.json" -LOG_FILE = "/maixapp/apps/t11/app.log" -BACKUP_BASE = "/maixapp/apps/t11/backups" +LOG_FILE = APP_DIR + "/app.log" +BACKUP_BASE = APP_DIR + "/backups" # ==================== 硬件配置 ==================== # WiFi模块开关(True=有WiFi模块,False=无WiFi模块) diff --git a/design_doc/todo.md b/design_doc/todo.md index 0fa3c7f..32baf71 100644 --- a/design_doc/todo.md +++ b/design_doc/todo.md @@ -1,17 +1,6 @@ 你现在要防的是“别人拿到设备/拿到代码包后,能伪造请求、刷接口、下发恶意 OTA、甚至劫持通信”。单靠隐藏 Python 源码只能提高门槛,真正的安全要靠协议和密钥设计。结合你仓库里实际内容,建议你重点隐藏/整改这些点(按风险排序)。 -1. 必须隐藏/必须整改(高风险) -1.1 登录口令规则太弱(几乎等于明文) -你现在的登录是 password = device_id + "."(见 network.py 读取设备 ID 后直接拼出来),这意味着只要攻击者知道/猜到 device_id,就能直接登录伪装设备。 -相关位置: -with open("/device_key", "r") as f: device_id = f.read().strip() ... self._device_id = device_id self._password = device_id + "." -1.2 HTTP 鉴权 token 的盐值是硬编码常量(泄露后可离线伪造) -你 token 是 HMAC-SHA256((SALT+device_id), SALT2),而 SALT/SALT2 是固定字符串:"shootMessageFire" / "shoot"。这类“硬编码盐值 + 可猜/可读的 device_id”意味着:攻击者只要拿到代码包/逆向 .so,就能在自己电脑上批量算 token,伪造 HTTP 请求。 -相关位置: -SALT = "shootMessageFire"SALT2 = "shoot"return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest() -1.3 TLS 配置目前看起来没有做证书校验(容易被中间人攻击) -config.py 虽然 USE_TCP_SSL=True,但你在 network.py 里实际把 MSSLCFG="auth" 固定成 0(不验),且写证书分支被 if False 禁用了。这样“看起来是 TLS”,但仍可能被抓包/篡改/假服务器接入。 -相关位置: -r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},0', "OK", 3000)...if False: # 写证书/校验被禁用 ...r = hardware_manager.at_client.send(f'AT+MIPCFG="ssl",{link_id},{ssl_id},1', "OK", 3000) + + 1.4 OTA 下发“url”如果缺少强校验,就是远程代码执行入口 你 OTA 逻辑里从服务器指令拿到 url 就去下载并替换文件/重启(这是正常 OTA),但安全性取决于: 是否只允许白名单域名/路径 @@ -26,20 +15,11 @@ SERVER_IP = "stcp.shelingxingqiu.com"SERVER_PORT = 50005HTTP_URL = "http://ws.sh 注意:即使你把它们藏进 .so,攻击者仍可能通过抓包/观察 DNS/SNI/流量拿到域名或 IP,所以“隐藏”只能降低静态分析风险,不能替代鉴权/签名。 -2.3 日志/调试信息泄露 -你仓库里 .cursor/debug.log* 已经记录了完整的 AT+MIPOPEN ... host:port 等信息;如果这些被打进包或被用户导出,也会泄露通信细节。 -你发布包里“真正会被带走的敏感内容” -你现在 app.yaml 的打包清单只包含核心运行文件(不包含 test/、backup_code/、.cursor/ 等),这一点很好: -files: - app.yaml - at_client.py - camera_manager.py - config.py - hardware.py - laser_manager.py - logger_manager.py - main.py - network.py - ota_manager.py - power.py - shot_id_generator.py - time_sync.py - version.py - vision.py -但要注意:你当前最敏感的“盐值/口令规则/HTTP header 逻辑/OTA 策略”都在这些会打包的文件里(尤其是 network.py、ota_manager.py、config.py)。 3. 最有效的“防黑”措施(比隐藏更关键) -把“password=device_id+.” 改为真正的 per-device secret(设备出厂烧录,服务端保存;或服务端下发公钥,设备用私钥签名) + 所有上报/控制命令加签名 + nonce/timestamp + 服务端防重放(别人抓到一次包也不能复用) OTA 包必须做签名校验(设备端内置公钥,下载后验签通过才应用) 如果你告诉我:你们服务端目前能不能改协议(例如新增签名字段、下发 challenge、做 OTA 签名),我可以按“最小改动但提升最大安全”的顺序,帮你规划一套从现状平滑升级的方案。 - - -https://wiki.sipeed.com/maixpy/doc/zh/pro/compile_os.html \ No newline at end of file diff --git a/network.py b/network.py index af4bee9..fca076a 100644 --- a/network.py +++ b/network.py @@ -1358,6 +1358,237 @@ class NetworkManager: self.logger.error(f"[LOG_UPLOAD] 上传异常: {e}") self.safe_enqueue({"result": "log_upload_failed", "reason": str(e)[:100]}, 2) + def _prepare_log_archive(self, include_rotated=True, max_files=None, archive_format="tgz"): + """准备日志归档压缩包,返回 (archive_path, archive_filename) 或 (None, error_msg) + + Args: + include_rotated: 是否包含轮转日志 + max_files: 最多打包多少个日志文件 + archive_format: tgz 或 zip + """ + import shutil + from datetime import datetime + import glob + + try: + log_file_path = config.LOG_FILE + if not os.path.exists(log_file_path): + return None, "log_file_not_found" + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + device_id = self._device_id or "unknown" + base_name = f"logs_{timestamp}_{device_id}" + archive_format = (archive_format or "tgz").strip().lower() + if archive_format not in ("tgz", "zip"): + archive_format = "tgz" + + candidates = [log_file_path] + if include_rotated: + candidates = sorted(set(glob.glob(log_file_path + "*"))) + candidates = [p for p in candidates if os.path.isfile(p)] + + def _log_sort_key(p): + if p == log_file_path: + return (0, 0, p) + suffix = p[len(log_file_path):] + if suffix.startswith("."): + try: + return (1, int(suffix[1:]), p) + except: + return (2, 999999, p) + return (3, 999999, p) + + candidates.sort(key=_log_sort_key) + + if max_files is None: + try: + max_files = 1 + int(getattr(config, "LOG_BACKUP_COUNT", 5)) + except: + max_files = 6 + try: + max_files = int(max_files) + except: + max_files = 6 + max_files = max(1, min(max_files, 20)) + selected = candidates[:max_files] + + if not selected: + return None, "no_log_files" + + os.system("sync") + temp_dir = "/tmp" + staging_dir = os.path.join(temp_dir, f"log_upload_{base_name}") + os.makedirs(staging_dir, exist_ok=True) + staged_paths = [] + try: + for p in selected: + dst = os.path.join(staging_dir, os.path.basename(p)) + shutil.copy2(p, dst) + staged_paths.append(dst) + except Exception as e: + try: + shutil.rmtree(staging_dir) + except: + pass + return None, f"snapshot_failed: {e}" + + if archive_format == "zip": + archive_filename = f"{base_name}.zip" + else: + archive_filename = f"{base_name}.tar.gz" + archive_path = os.path.join(temp_dir, archive_filename) + + try: + if archive_format == "zip": + import zipfile + with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for p in staged_paths: + zf.write(p, arcname=os.path.basename(p)) + else: + import tarfile + with tarfile.open(archive_path, "w:gz") as tf: + for p in staged_paths: + tf.add(p, arcname=os.path.basename(p)) + except Exception as e: + try: + shutil.rmtree(staging_dir) + except: + pass + try: + if os.path.exists(archive_path): + os.remove(archive_path) + except: + pass + return None, f"archive_failed: {e}" + finally: + try: + shutil.rmtree(staging_dir) + except: + pass + + return archive_path, archive_filename + except Exception as e: + return None, f"prepare_exception: {e}" + + def _upload_log_file_v2(self, upload_url, upload_token, key, outlink="", include_rotated=True, max_files=None, archive_format="tgz"): + """上传日志到 Qiniu(支持 WiFi 和 4G 双路径) + + 流程:准备日志归档 -> 自动检测网络 -> WiFi(requests) 或 4G(AT命令) 上传 + """ + import shutil + + # 1) 准备日志归档 + archive_path, info = self._prepare_log_archive(include_rotated, max_files, archive_format) + if archive_path is None: + self.logger.error(f"[LOG_UPLOAD] 准备归档失败: {info}") + self.safe_enqueue({"result": "log_upload_failed", "reason": info}, 2) + return + + archive_filename = info + # key 是服务器下发的目录前缀,最终 key = prefix/filename + qiniu_key = key.rstrip("/") + "/" + archive_filename + self.logger.info(f"[LOG_UPLOAD] 日志归档已生成: {archive_path}, qiniu_key: {qiniu_key}") + + try: + # 2) WiFi 优先:只要 WiFi 已连接就先尝试 WiFi,失败再回落到 4G + wifi_tried = False + wifi_ok = False + + if self.is_wifi_connected(): + wifi_tried = True + self.logger.info(f"[LOG_UPLOAD] Using wifi path (preferred), archive: {archive_path}") + try: + # ---- WiFi path: 使用 requests 库上传 ---- + import requests + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + with open(archive_path, 'rb') as f: + files = {'file': (archive_filename, f, 'application/octet-stream')} + data = {'token': upload_token, 'key': qiniu_key} + wifi_upload_url = upload_url.replace('https://', 'http://', 1) + self.logger.info(f"[LOG_UPLOAD] WiFi upload URL: {wifi_upload_url}") + response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False) + response.raise_for_status() + result_json = response.json() + uploaded_key = result_json.get('key', qiniu_key) + + self.logger.info(f"[LOG_UPLOAD] WiFi upload ok: key={uploaded_key}") + + access_url = None + if outlink: + access_url = f"https://{outlink}/{uploaded_key}" + + response_data = { + "result": "log_upload_ok", + "key": uploaded_key, + "via": "wifi", + } + if access_url: + response_data["url"] = access_url + + self.safe_enqueue(response_data, 2) + wifi_ok = True + except Exception as e: + # WiFi 上传失败不影响主链路:记录原因并回落 4G + self.logger.warning(f"[LOG_UPLOAD] WiFi upload failed, fallback to 4g: {e}") + + if not wifi_ok: + if not wifi_tried: + self.logger.info(f"[LOG_UPLOAD] WiFi not connected, using 4g path, archive: {archive_path}") + else: + self.logger.info(f"[LOG_UPLOAD] Using 4g fallback path, archive: {archive_path}") + + # ---- 4G path: 使用 FourGUploadManager AT命令上传 ---- + import importlib.util + spec = importlib.util.spec_from_file_location( + "four_g_upload_manager", + os.path.join(os.path.dirname(__file__), "4g_upload_manager.py") + ) + upload_module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(upload_module) + FourGUploadManager = upload_module.FourGUploadManager + + uploader = FourGUploadManager(hardware_manager.at_client) + result = uploader.upload_file(archive_path, upload_url, upload_token, qiniu_key) + + if result.get("success"): + uploaded_key = result.get("key", qiniu_key) + self.logger.info(f"[LOG_UPLOAD] 4G upload ok: key={uploaded_key}") + + access_url = None + if outlink: + access_url = f"https://{outlink}/{uploaded_key}" + + response_data = { + "result": "log_upload_ok", + "key": uploaded_key, + "via": "4g", + } + if access_url: + response_data["url"] = access_url + + self.safe_enqueue(response_data, 2) + else: + error_msg = result.get("error", "unknown_error") + self.logger.error(f"[LOG_UPLOAD] 4G upload failed: {error_msg}") + self.safe_enqueue({ + "result": "log_upload_failed", + "reason": error_msg[:100] + }, 2) + + except Exception as e: + self.logger.error(f"[LOG_UPLOAD] upload exception: {e}") + self.safe_enqueue({"result": "log_upload_failed", "reason": str(e)[:100]}, 2) + finally: + # 清理临时归档文件 + try: + if archive_path and os.path.exists(archive_path): + os.remove(archive_path) + self.logger.debug(f"[LOG_UPLOAD] 临时归档已删除: {archive_path}") + except Exception as e: + self.logger.warning(f"[LOG_UPLOAD] 删除临时归档失败: {e}") + def _upload_image_file(self, image_path, upload_url, upload_token, key, shoot_id, outlink): """上传图片文件到指定URL(自动检测网络类型,WiFi使用requests,4G使用AT HTTP命令) @@ -1369,51 +1600,57 @@ class NetworkManager: shoot_id: 射击ID outlink: 外链域名(可选,用于构建访问URL) """ - # 自动检测网络类型,选择上传路径 - if self._network_type == "wifi" and self.is_wifi_connected(): - mode = "wifi" - else: - mode = "4g" + # WiFi 优先(独立于 TCP 主链路):只要 WiFi 已连接就先走 WiFi,失败再回落 4G + mode = "wifi" if self.is_wifi_connected() else "4g" self.logger.info(f"[IMAGE_UPLOAD] Using {mode} path, image: {image_path}") try: + wifi_ok = False + if mode == "wifi": - # ---- WiFi path: 使用 requests 库上传 ---- - import requests - import urllib3 - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + try: + # ---- WiFi path: 使用 requests 库上传 ---- + import requests + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - with open(image_path, 'rb') as f: - files = {'file': (os.path.basename(image_path), f, 'application/octet-stream')} - data = {'token': upload_token, 'key': key} - # 测试:将HTTPS转为HTTP - wifi_upload_url = upload_url.replace('https://', 'http://', 1) - self.logger.info(f"[IMAGE_UPLOAD] WiFi upload URL: {wifi_upload_url}") - response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False) - response.raise_for_status() - result_json = response.json() - uploaded_key = result_json.get('key', key) + with open(image_path, 'rb') as f: + files = {'file': (os.path.basename(image_path), f, 'application/octet-stream')} + data = {'token': upload_token, 'key': key} + # 将 HTTPS 转为 HTTP(设备端 SSL 兼容性) + wifi_upload_url = upload_url.replace('https://', 'http://', 1) + self.logger.info(f"[IMAGE_UPLOAD] WiFi upload URL: {wifi_upload_url}") + response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False) + response.raise_for_status() + result_json = response.json() + uploaded_key = result_json.get('key', key) - self.logger.info(f"[IMAGE_UPLOAD] WiFi upload ok: key={uploaded_key}") + self.logger.info(f"[IMAGE_UPLOAD] WiFi upload ok: key={uploaded_key}") - access_url = None - if outlink: - access_url = f"https://{outlink}/{uploaded_key}" + access_url = None + if outlink: + access_url = f"https://{outlink}/{uploaded_key}" - response_data = { - "result": "image_upload_ok", - "shootId": shoot_id, - "key": uploaded_key, - "via": "wifi", - } - if access_url: - response_data["url"] = access_url + response_data = { + "result": "image_upload_ok", + "shootId": shoot_id, + "key": uploaded_key, + "via": "wifi", + } + if access_url: + response_data["url"] = access_url - self.safe_enqueue(response_data, 2) + self.safe_enqueue(response_data, 2) + wifi_ok = True + except Exception as e: + self.logger.warning(f"[IMAGE_UPLOAD] WiFi upload failed, fallback to 4g: {e}") - else: + if not wifi_ok: # ---- 4G path: 使用 FourGUploadManager AT命令上传 ---- + if mode != "4g": + self.logger.info(f"[IMAGE_UPLOAD] Using 4g fallback path, image: {image_path}") + import importlib.util spec = importlib.util.spec_from_file_location( "four_g_upload_manager", @@ -1702,6 +1939,35 @@ class NetworkManager: ) # 立即返回已入队确认 self.safe_enqueue({"result": "image_upload_queued", "shootId": shoot_id}, 2) + elif logged_in and msg_type == 101: + self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令 {body}") + if isinstance(body, dict): + + upload_url = body.get("uploadUrl") + upload_token = body.get("token") + key = body.get("key") + outlink = body.get("outlink", "") + include_rotated = body.get("includeRotated", True) + max_files = body.get("maxFiles") + archive_format = body.get("archive", "tgz") + + hardware_manager.start_idle_timer() # 重新计时 + + # 验证必需字段 + if not upload_url or not upload_token or not key: + self.logger.error("[LOG_UPLOAD] 缺少必需参数: uploadUrl, token 或 key") + self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_params"}, 2) + else: + self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令,key: {key}") + # 在新线程中执行上传,避免阻塞主循环 + import _thread + _thread.start_new_thread( + self._upload_log_file_v2, + (upload_url, upload_token, key, outlink, include_rotated, max_files, archive_format) + ) + # 立即返回已入队确认 + self.safe_enqueue({"result": "log_upload_queued"}, 2) + # 处理业务指令 elif logged_in and isinstance(body, dict): inner_cmd = None @@ -1795,29 +2061,6 @@ class NetworkManager: mccid = self.get_4g_mccid() self.logger.info(f"4G MCCID: {mccid}") self.safe_enqueue({"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2) - # elif inner_cmd == 7: - # from ota_manager import ota_manager - # if ota_manager.update_thread_started: - # self.safe_enqueue({"result": "update_already_started"}, 2) - # continue - - # try: - # ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() - # except: - # ip = None - - # if not ip: - # self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2) - # else: - # # 注意:direct_ota_download 需要 ota_url 参数 - # # 如果 ota_manager.ota_url 为 None,需要从其他地方获取 - # ota_url_to_use = ota_manager.ota_url - # if not ota_url_to_use: - # self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置") - # self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2) - # else: - # ota_manager._start_update_thread() - # _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,)) elif inner_cmd == 41: self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}") self._manual_trigger_flag = True @@ -1953,10 +2196,10 @@ class NetworkManager: except Exception as e: self.logger.error(f"[OTA] 检查 pending 文件时出错: {e}") - # 心跳超时重连 - if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: - self.logger.error("十分钟无心跳ACK,重连") - break + # 服务器不再发送心跳ACK + # if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: + # self.logger.error("十分钟无心跳ACK,重连") + # break self._send_event.wait(timeout=0.05) # 0.05秒 = 50ms self._send_event.clear() diff --git a/vision.py b/vision.py index f97a34b..fdaf2fb 100644 --- a/vision.py +++ b/vision.py @@ -217,7 +217,7 @@ def check_image_sharpness(frame, threshold=100.0, save_debug_images=False): # 保存原始图像 img_orig = image.cv2image(img_cv, False, False) - orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp" + orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.jpg" img_orig.save(orig_filename) # # 保存边缘检测结果(可视化) @@ -294,7 +294,7 @@ def save_calibration_image(frame, laser_pos, photo_dir=None): img_count = 0 x, y = laser_pos - filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.bmp" + filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.jpg" logger = logger_manager.logger if logger: @@ -722,9 +722,9 @@ def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params, # 这里改为:只要 method 有值,就按 method 命名;否则才回退 no_target method_str = (method or "").strip() if method_str: - filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp" + filename = f"{photo_dir}/shot_{shot_id}_{method_str}.jpg" else: - filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp" + filename = f"{photo_dir}/shot_{shot_id}_no_target.jpg" else: try: all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] @@ -737,7 +737,7 @@ def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params, else: method_str = method or "unknown" distance_str = str(round((distance_m or 0.0) * 100)) - filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp" + filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.jpg" logger = logger_manager.logger if logger: diff --git a/vision.py.bak b/vision.py.bak deleted file mode 100644 index 7115900..0000000 --- a/vision.py.bak +++ /dev/null @@ -1,784 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -视觉检测模块 -提供靶心检测、距离估算、图像保存等功能 -""" -import cv2 -import numpy as np -import os -import math -import threading -import queue -from maix import image -import config -from logger_manager import logger_manager - -# 导入ArUco检测器(如果启用) -if config.USE_ARUCO: - from aruco_detector import detect_target_with_aruco, aruco_detector - -# 存图队列 + worker -_save_queue = queue.Queue(maxsize=16) -_save_worker_started = False -_save_worker_lock = threading.Lock() - -def check_laser_point_sharpness(frame, laser_point=None, roi_size=30, threshold=100.0, ellipse_params=None): - """ - 检测激光点本身的清晰度(不是整个靶子) - - Args: - frame: 图像帧对象 - laser_point: 激光点坐标 (x, y),如果为None则自动查找 - roi_size: ROI区域大小(像素),默认30x30 - threshold: 清晰度阈值 - ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内 - - Returns: - (is_sharp, sharpness_score, laser_pos): (是否清晰, 清晰度分数, 激光点坐标) - """ - try: - # 1. 如果没有提供激光点,先查找 - if laser_point is None: - from laser_manager import laser_manager - laser_point = laser_manager.find_red_laser(frame, ellipse_params=ellipse_params) - if laser_point is None: - logger_manager.logger.debug(f"未找到激光点") - return False, 0.0, None - - x, y = laser_point - - # 2. 转换为 OpenCV 格式 - img_cv = image.image2cv(frame, False, False) - h, w = img_cv.shape[:2] - - # 3. 提取 ROI 区域(激光点周围) - roi_half = roi_size // 2 - x_min = max(0, int(x) - roi_half) - x_max = min(w, int(x) + roi_half) - y_min = max(0, int(y) - roi_half) - y_max = min(h, int(y) + roi_half) - - roi = img_cv[y_min:y_max, x_min:x_max] - - if roi.size == 0: - return False, 0.0, laser_point - - # 4. 转换为灰度图(用于清晰度检测) - gray_roi = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY) - - # 5. 方法1:检测点的扩散程度(能量集中度) - # 计算中心区域的能量集中度 - center_x, center_y = roi.shape[1] // 2, roi.shape[0] // 2 - center_radius = min(5, roi.shape[0] // 4) # 中心区域半径 - - # 创建中心区域的掩码 - y_coords, x_coords = np.ogrid[:roi.shape[0], :roi.shape[1]] - center_mask = (x_coords - center_x)**2 + (y_coords - center_y)**2 <= center_radius**2 - - # 计算中心区域和周围区域的亮度 - center_brightness = gray_roi[center_mask].mean() - outer_mask = ~center_mask - outer_brightness = gray_roi[outer_mask].mean() if np.any(outer_mask) else 0 - - # 对比度(清晰的点对比度高) - contrast = abs(center_brightness - outer_brightness) - - # 6. 方法2:检测点的边缘锐度(使用拉普拉斯) - laplacian = cv2.Laplacian(gray_roi, cv2.CV_64F) - edge_sharpness = abs(laplacian).var() - - # 7. 方法3:检测点的能量集中度(方差) - # 清晰的点:能量集中在中心,方差小 - # 模糊的点:能量分散,方差大 - # 但我们需要的是:清晰的点中心亮度高,周围低,所以梯度大 - sobel_x = cv2.Sobel(gray_roi, cv2.CV_64F, 1, 0, ksize=3) - sobel_y = cv2.Sobel(gray_roi, cv2.CV_64F, 0, 1, ksize=3) - gradient = np.sqrt(sobel_x**2 + sobel_y**2) - gradient_sharpness = gradient.var() - - # 8. 组合多个指标 - # 对比度权重0.3,边缘锐度权重0.4,梯度权重0.3 - sharpness_score = (contrast * 0.3 + edge_sharpness * 0.4 + gradient_sharpness * 0.3) - - is_sharp = sharpness_score >= threshold - - logger = logger_manager.logger - if logger: - logger.debug(f"[VISION] 激光点清晰度: 位置=({x}, {y}), 对比度={contrast:.2f}, 边缘={edge_sharpness:.2f}, 梯度={gradient_sharpness:.2f}, 综合={sharpness_score:.2f}, 是否清晰={is_sharp}") - - return is_sharp, sharpness_score, laser_point - - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[VISION] 激光点清晰度检测失败: {e}") - import traceback - logger.error(traceback.format_exc()) - return False, 0.0, laser_point - -def check_image_sharpness(frame, threshold=100.0, save_debug_images=False): - """ - 检查图像清晰度(针对圆形靶子优化,基于圆形边缘检测) - 检测靶心的圆形边缘,计算边缘区域的梯度清晰度 - - Args: - frame: 图像帧对象 - threshold: 清晰度阈值,低于此值认为图像模糊(默认100.0) - 可以根据实际情况调整: - - 清晰图像通常 > 200 - - 模糊图像通常 < 100 - - 中等清晰度 100-200 - save_debug_images: 是否保存调试图像(原始图和边缘图),默认False - - Returns: - (is_sharp, sharpness_score): (是否清晰, 清晰度分数) - """ - try: - logger_manager.logger.debug(f"begin") - # 转换为 OpenCV 格式 - img_cv = image.image2cv(frame, False, False) - logger_manager.logger.debug(f"after image2cv") - - # 转换为 HSV 颜色空间 - hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) - h, s, v = cv2.split(hsv) - logger_manager.logger.debug(f"after HSV conversion") - - # 检测黄色区域(靶心) - # 调整饱和度策略:稍微增强,不要过度 - s_enhanced = np.clip(s * 1.1, 0, 255).astype(np.uint8) - hsv_enhanced = cv2.merge((h, s_enhanced, v)) - - # HSV 阈值范围(与 detect_circle_v3 保持一致) - lower_yellow = np.array([7, 80, 0]) - upper_yellow = np.array([32, 255, 255]) - mask_yellow = cv2.inRange(hsv_enhanced, lower_yellow, upper_yellow) - - # 形态学操作,填充小孔洞 - kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) - mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) - logger_manager.logger.debug(f"after yellow mask detection") - - # 计算边缘区域:扩展黄色区域,然后减去原始区域,得到边缘区域 - mask_dilated = cv2.dilate(mask_yellow, kernel, iterations=2) - mask_edge = cv2.subtract(mask_dilated, mask_yellow) # 边缘区域 - - # 计算边缘区域的像素数量 - edge_pixel_count = np.sum(mask_edge > 0) - logger_manager.logger.debug(f"edge pixel count: {edge_pixel_count}") - - # 如果检测不到边缘区域,使用全局梯度作为后备方案 - if edge_pixel_count < 100: - logger_manager.logger.debug(f"edge region too small, using global gradient") - # 使用 V 通道计算全局梯度 - sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3) - sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3) - gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2) - sharpness_score = gradient.var() - logger_manager.logger.debug(f"global gradient variance: {sharpness_score:.2f}") - else: - # 在边缘区域计算梯度清晰度 - # 使用 V(亮度)通道计算梯度,因为边缘在亮度上通常很明显 - sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3) - sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3) - gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2) - - # 只在边缘区域计算清晰度 - edge_gradient = gradient[mask_edge > 0] - - if len(edge_gradient) > 0: - # 计算边缘梯度的方差(清晰图像的边缘梯度变化大) - sharpness_score = edge_gradient.var() - # 也可以使用均值作为补充指标(清晰图像的边缘梯度均值也较大) - gradient_mean = edge_gradient.mean() - logger_manager.logger.debug(f"edge gradient: mean={gradient_mean:.2f}, var={sharpness_score:.2f}, pixels={len(edge_gradient)}") - else: - # 如果边缘区域没有有效梯度,使用全局梯度 - sharpness_score = gradient.var() - logger_manager.logger.debug(f"no edge gradient, using global: {sharpness_score:.2f}") - - # 保存调试图像(如果启用) - if save_debug_images: - try: - debug_dir = config.PHOTO_DIR - if debug_dir not in os.listdir("/root"): - try: - os.mkdir(debug_dir) - except: - pass - - # 生成文件名 - try: - all_images = [f for f in os.listdir(debug_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] - img_count = len(all_images) - except: - img_count = 0 - - # 保存原始图像 - img_orig = image.cv2image(img_cv, False, False) - orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp" - img_orig.save(orig_filename) - - # # 保存边缘检测结果(可视化) - # # 创建可视化图像:原始图像 + 黄色区域 + 边缘区域 - # debug_img = img_cv.copy() - # # 在黄色区域绘制绿色 - # debug_img[mask_yellow > 0] = [0, 255, 0] # RGB格式,绿色 - # # 在边缘区域绘制红色 - # debug_img[mask_edge > 0] = [255, 0, 0] # RGB格式,红色 - - # debug_img_maix = image.cv2image(debug_img, False, False) - # debug_filename = f"{debug_dir}/sharpness_debug_edge_{img_count:04d}.bmp" - # debug_img_maix.save(debug_filename) - - # logger = logger_manager.logger - # if logger: - # logger.info(f"[VISION] 保存调试图像: {orig_filename}, {debug_filename}") - except Exception as e: - logger = logger_manager.logger - if logger: - logger.warning(f"[VISION] 保存调试图像失败: {e}") - import traceback - logger.error(traceback.format_exc()) - - is_sharp = sharpness_score >= threshold - - logger = logger_manager.logger - if logger: - logger.debug(f"[VISION] 清晰度检测: 分数={sharpness_score:.2f}, 边缘像素数={edge_pixel_count}, 是否清晰={is_sharp}, 阈值={threshold}") - - return is_sharp, sharpness_score - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[VISION] 清晰度检测失败: {e}") - import traceback - logger.error(traceback.format_exc()) - # 出错时返回 False,避免使用模糊图像 - return False, 0.0 - -def save_calibration_image(frame, laser_pos, photo_dir=None): - """ - 保存激光校准图像(带标注) - 在找到的激光点位置绘制圆圈,便于检查算法是否正确 - - Args: - frame: 原始图像帧 - laser_pos: 找到的激光点坐标 (x, y) - photo_dir: 照片存储目录,如果为None则使用 config.PHOTO_DIR - - Returns: - str: 保存的文件路径,如果保存失败则返回 None - """ - # 检查是否启用图像保存 - if not config.SAVE_IMAGE_ENABLED: - return None - - if photo_dir is None: - photo_dir = config.PHOTO_DIR - - try: - # 确保照片目录存在 - try: - if photo_dir not in os.listdir("/root"): - os.mkdir(photo_dir) - except: - pass - - # 生成文件名 - try: - all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] - img_count = len(all_images) - except: - img_count = 0 - - x, y = laser_pos - filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.bmp" - - logger = logger_manager.logger - if logger: - logger.info(f"保存校准图像: {filename}, 激光点: ({x}, {y})") - - # 转换图像为 OpenCV 格式以便绘制 - img_cv = image.image2cv(frame, False, False) - - # 绘制激光点圆圈(用绿色圆圈标出找到的激光点) - cv2.circle(img_cv, (int(x), int(y)), 10, (0, 255, 0), 2) # 外圈:绿色,半径10 - cv2.circle(img_cv, (int(x), int(y)), 5, (0, 255, 0), 2) # 中圈:绿色,半径5 - cv2.circle(img_cv, (int(x), int(y)), 2, (0, 255, 0), -1) # 中心点:绿色实心 - - # 可选:绘制十字线帮助定位 - cv2.line(img_cv, - (int(x - 20), int(y)), - (int(x + 20), int(y)), - (0, 255, 0), 1) # 水平线 - cv2.line(img_cv, - (int(x), int(y - 20)), - (int(x), int(y + 20)), - (0, 255, 0), 1) # 垂直线 - - # 转换回 MaixPy 图像格式并保存 - result_img = image.cv2image(img_cv, False, False) - result_img.save(filename) - - if logger: - logger.debug(f"校准图像已保存: {filename}") - - return filename - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"保存校准图像失败: {e}") - import traceback - logger.error(traceback.format_exc()) - return None - -def detect_circle_v3(frame, laser_point=None): - """检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本 - 增加红色圆圈检测,验证黄色圆圈是否为真正的靶心 - 如果提供 laser_point,会选择最接近激光点的目标 - - Args: - frame: 图像帧 - laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择 - - Returns: - (result_img, best_center, best_radius, method, best_radius1, ellipse_params) - """ - img_cv = image.image2cv(frame, False, False) - - best_center = best_radius = best_radius1 = method = None - ellipse_params = None - - # HSV 黄色掩码检测(模糊靶心) - hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) - h, s, v = cv2.split(hsv) - - # 调整饱和度策略:稍微增强,不要过度 - s = np.clip(s * 1.1, 0, 255).astype(np.uint8) - - hsv = cv2.merge((h, s, v)) - - # 放宽 HSV 阈值范围(针对模糊图像的关键调整) - lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色 - upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满 - - mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow) - - # 调整形态学操作 - kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) - mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) - - contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - - # 存储所有有效的黄色-红色组合 - valid_targets = [] - - if contours_yellow: - for cnt_yellow in contours_yellow: - area = cv2.contourArea(cnt_yellow) - perimeter = cv2.arcLength(cnt_yellow, True) - - # 计算圆度 - if perimeter > 0: - circularity = (4 * np.pi * area) / (perimeter * perimeter) - else: - circularity = 0 - - logger = logger_manager.logger - if area > 50 and circularity > 0.7: - if logger: - logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}") - # 尝试拟合椭圆 - yellow_center = None - yellow_radius = None - yellow_ellipse = None - - if len(cnt_yellow) >= 5: - (x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow) - yellow_ellipse = ((x, y), (width, height), angle) - axes_minor = min(width, height) - radius = axes_minor / 2 - yellow_center = (int(x), int(y)) - yellow_radius = int(radius) - else: - (x, y), radius = cv2.minEnclosingCircle(cnt_yellow) - yellow_center = (int(x), int(y)) - yellow_radius = int(radius) - yellow_ellipse = None - - # 如果检测到黄色圆圈,再检测红色圆圈进行验证 - if yellow_center and yellow_radius: - # HSV 红色掩码检测(红色在HSV中跨越0度,需要两个范围) - # 红色范围1: 0-10度(接近0度的红色) - lower_red1 = np.array([0, 80, 0]) - upper_red1 = np.array([10, 255, 255]) - mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1) - - # 红色范围2: 170-180度(接近180度的红色) - lower_red2 = np.array([170, 80, 0]) - upper_red2 = np.array([180, 255, 255]) - mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2) - - # 合并两个红色掩码 - mask_red = cv2.bitwise_or(mask_red1, mask_red2) - - # 形态学操作 - kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) - mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red) - - contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) - - found_valid_red = False - - if contours_red: - # 找到所有符合条件的红色圆圈 - for cnt_red in contours_red: - area_red = cv2.contourArea(cnt_red) - perimeter_red = cv2.arcLength(cnt_red, True) - - if perimeter_red > 0: - circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red) - else: - circularity_red = 0 - - # 红色圆圈也应该有一定的圆度 - if area_red > 50 and circularity_red > 0.6: - # 计算红色圆圈的中心和半径 - if len(cnt_red) >= 5: - (x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red) - radius_red = min(w_red, h_red) / 2 - red_center = (int(x_red), int(y_red)) - red_radius = int(radius_red) - else: - (x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red) - red_center = (int(x_red), int(y_red)) - red_radius = int(radius_red) - - # 计算黄色和红色圆心的距离 - if red_center: - dx = yellow_center[0] - red_center[0] - dy = yellow_center[1] - red_center[1] - distance = np.sqrt(dx*dx + dy*dy) - - # 圆心距离阈值:应该小于黄色半径的某个倍数(比如1.5倍) - max_distance = yellow_radius * 1.5 - - # 红色圆圈应该比黄色圆圈大(外圈) - if distance < max_distance and red_radius > yellow_radius * 0.8: - found_valid_red = True - logger = logger_manager.logger - if logger: - logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}") - - # 记录这个有效目标 - valid_targets.append({ - 'center': yellow_center, - 'radius': yellow_radius, - 'ellipse': yellow_ellipse, - 'area': area - }) - break - - if not found_valid_red: - logger = logger_manager.logger - if logger: - logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别") - - # 从所有有效目标中选择最佳目标 - if valid_targets: - if laser_point: - # 如果有激光点,选择最接近激光点的目标 - best_target = None - min_distance = float('inf') - for target in valid_targets: - dx = target['center'][0] - laser_point[0] - dy = target['center'][1] - laser_point[1] - distance = np.sqrt(dx*dx + dy*dy) - if distance < min_distance: - min_distance = distance - best_target = target - if best_target: - best_center = best_target['center'] - best_radius = best_target['radius'] - ellipse_params = best_target['ellipse'] - method = "v3_ellipse_red_validated_laser_selected" - best_radius1 = best_radius * 5 - else: - # 如果没有激光点,选择面积最大的目标 - best_target = max(valid_targets, key=lambda t: t['area']) - best_center = best_target['center'] - best_radius = best_target['radius'] - ellipse_params = best_target['ellipse'] - method = "v3_ellipse_red_validated" - best_radius1 = best_radius * 5 - - result_img = image.cv2image(img_cv, False, False) - return result_img, best_center, best_radius, method, best_radius1, ellipse_params - - -def estimate_distance(pixel_radius): - """根据像素半径估算实际距离(单位:米)""" - if not pixel_radius: - return 0.0 - return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0 - -def estimate_pixel(physical_distance_cm, target_distance_m): - """ - 根据物理距离和目标距离计算对应的像素偏移 - - Args: - physical_distance_cm: 物理世界中的距离(厘米),例如激光与摄像头的距离 - target_distance_m: 目标距离(米),例如到靶心的距离 - - Returns: - float: 对应的像素偏移 - """ - if not target_distance_m or target_distance_m <= 0: - return 0.0 - # 公式:像素偏移 = (物理距离_米) * 焦距_像素 / 目标距离_米 - return (physical_distance_cm / 100.0) * config.FOCAL_LENGTH_PIX / target_distance_m - - -def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params, - laser_point, distance_m, shot_id=None, photo_dir=None): - """ - 内部实现:在 img_cv (numpy HWC RGB) 上绘制标注并保存。 - 由 save_shot_image(同步)和存图 worker(异步)调用。 - """ - if not config.SAVE_IMAGE_ENABLED: - return None - if photo_dir is None: - photo_dir = config.PHOTO_DIR - try: - try: - if photo_dir not in os.listdir("/root"): - os.mkdir(photo_dir) - except Exception: - pass - - x, y = laser_point - if shot_id: - if center is None or radius is None: - filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp" - else: - method_str = method or "unknown" - filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp" - else: - try: - all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] - img_count = len(all_images) - except Exception: - img_count = 0 - if center is None or radius is None: - method_str = "no_target" - distance_str = "000" - else: - method_str = method or "unknown" - distance_str = str(round((distance_m or 0.0) * 100)) - filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp" - - logger = logger_manager.logger - if logger: - if shot_id: - logger.info(f"[VISION] 保存射箭图像,ID: {shot_id}, 文件名: {filename}") - if center and radius: - logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}") - if ellipse_params: - (ec, (ew, eh), ea) = ellipse_params - logger.info(f"椭圆 -> 中心: ({ec[0]:.1f}, {ec[1]:.1f}), 长轴: {max(ew, eh):.1f}, 短轴: {min(ew, eh):.1f}, 角度: {ea:.1f}°") - else: - logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y}))") - - laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) - cross_thickness = int(max(getattr(config, "LASER_THICKNESS", 1), 1)) - cross_length = int(max(getattr(config, "LASER_LENGTH", 10), 10)) - cv2.line(img_cv, (int(x - cross_length), int(y)), (int(x + cross_length), int(y)), laser_color, cross_thickness) - cv2.line(img_cv, (int(x), int(y - cross_length)), (int(x), int(y + cross_length)), laser_color, cross_thickness) - cv2.circle(img_cv, (int(x), int(y)), 1, laser_color, cross_thickness) - ring_thickness = 1 - cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, ring_thickness) - cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, ring_thickness) - cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1) - - if center and radius: - cx, cy = center - if ellipse_params: - (ell_center, (width, height), angle) = ellipse_params - cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1]) - cv2.ellipse(img_cv, (cx_ell, cy_ell), (int(width / 2), int(height / 2)), angle, 0, 360, (0, 255, 0), 2) - cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1) - minor_length = min(width, height) / 2 - minor_angle = angle + 90 if width >= height else angle - minor_angle_rad = math.radians(minor_angle) - dx_minor = minor_length * math.cos(minor_angle_rad) - dy_minor = minor_length * math.sin(minor_angle_rad) - pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor)) - pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor)) - cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2) - else: - cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2) - cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1) - cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1) - - out = image.cv2image(img_cv, False, False) - out.save(filename) - if logger: - if center and radius: - logger.debug(f"图像已保存(含靶心标注): {filename}") - else: - logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}") - - # 清理旧图片:如果目录下图片超过100张,删除最老的 - try: - image_files = [] - for f in os.listdir(photo_dir): - if f.endswith(('.bmp', '.jpg', '.jpeg')): - filepath = os.path.join(photo_dir, f) - try: - mtime = os.path.getmtime(filepath) - image_files.append((mtime, filepath, f)) - except Exception: - pass - - from config import MAX_IMAGES - if len(image_files) > MAX_IMAGES: - image_files.sort(key=lambda x: x[0]) - to_delete = len(image_files) - MAX_IMAGES - deleted_count = 0 - for _, filepath, fname in image_files[:to_delete]: - try: - os.remove(filepath) - deleted_count += 1 - if logger: - logger.debug(f"[VISION] 删除旧图片: {fname}") - except Exception as e: - if logger: - logger.warning(f"[VISION] 删除旧图片失败 {fname}: {e}") - if logger and deleted_count > 0: - logger.info(f"[VISION] 已清理 {deleted_count} 张旧图片,当前剩余 {MAX_IMAGES} 张") - except Exception as e: - if logger: - logger.warning(f"[VISION] 清理旧图片时出错(可忽略): {e}") - - return filename - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"保存图像失败: {e}") - import traceback - logger.error(traceback.format_exc()) - return None - - -def _save_worker_loop(): - """存图 worker:从队列取任务并调用 _save_shot_image_impl。""" - while True: - try: - item = _save_queue.get() - if item is None: - break - _save_shot_image_impl(*item) - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[VISION] 存图 worker 异常: {e}") - import traceback - logger.error(traceback.format_exc()) - finally: - try: - _save_queue.task_done() - except Exception: - pass - - -def start_save_shot_worker(): - """启动存图 worker 线程(应在程序初始化时调用一次)。""" - global _save_worker_started - with _save_worker_lock: - if _save_worker_started: - return - _save_worker_started = True - t = threading.Thread(target=_save_worker_loop, daemon=True) - t.start() - logger = logger_manager.logger - if logger: - logger.info("[VISION] 存图 worker 线程已启动") - - -def enqueue_save_shot(result_img, center, radius, method, ellipse_params, - laser_point, distance_m, shot_id=None, photo_dir=None): - """ - 将存图任务放入队列,由 worker 异步保存。主线程传入 result_img 的复制,不阻塞。 - """ - if not config.SAVE_IMAGE_ENABLED: - return - if photo_dir is None: - photo_dir = config.PHOTO_DIR - try: - img_cv = image.image2cv(result_img, False, False) - img_copy = np.copy(img_cv) - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[VISION] enqueue_save_shot 复制图像失败: {e}") - return - task = (img_copy, center, radius, method, ellipse_params, laser_point, distance_m, shot_id, photo_dir) - try: - _save_queue.put_nowait(task) - except queue.Full: - logger = logger_manager.logger - if logger: - logger.warning("[VISION] 存图队列已满,跳过本次保存") - - -def save_shot_image(result_img, center, radius, method, ellipse_params, - laser_point, distance_m, shot_id=None, photo_dir=None): - """ - 保存射击图像(带标注)。同步调用,会阻塞。 - 主流程建议使用 enqueue_save_shot;此处保留供校准、测试等场景使用。 - """ - if not config.SAVE_IMAGE_ENABLED: - return None - if photo_dir is None: - photo_dir = config.PHOTO_DIR - try: - img_cv = image.image2cv(result_img, False, False) - return _save_shot_image_impl(img_cv, center, radius, method, ellipse_params, - laser_point, distance_m, shot_id, photo_dir) - except Exception as e: - logger = logger_manager.logger - if logger: - logger.error(f"[VISION] save_shot_image 转换图像失败: {e}") - return None - - -def detect_target(frame, laser_point=None): - """ - 统一的靶心检测接口,根据配置自动选择检测方法 - - Args: - frame: MaixPy图像帧 - laser_point: 激光点坐标(可选) - - Returns: - (result_img, center, radius, method, best_radius1, ellipse_params) - 与detect_circle_v3保持相同的返回格式 - """ - logger = logger_manager.logger - - if config.USE_ARUCO: - # 使用ArUco检测 - if logger: - logger.debug("[VISION] 使用ArUco标记检测靶心") - - # 延迟导入以避免循环依赖 - from aruco_detector import detect_target_with_aruco - return detect_target_with_aruco(frame, laser_point) - else: - # 使用传统黄色靶心检测 - if logger: - logger.debug("[VISION] 使用传统黄色靶心检测") - return detect_circle_v3(frame, laser_point) -