diff --git a/config.py b/config.py index 59714b9..e1a5307 100644 --- a/config.py +++ b/config.py @@ -11,6 +11,15 @@ APP_VERSION = VERSION APP_DIR = "/maixapp/apps/t11" LOCAL_FILENAME = APP_DIR + "/main_tmp.py" +# ==================== 相机配置 ==================== +# 相机初始化分辨率(CameraManager / main.py 使用) +CAMERA_WIDTH = 640 +CAMERA_HEIGHT = 480 + +# 三角形检测缩图比例:默认按相机最长边缩到 1/2(性能更稳;可按需调整) +# 取值范围建议 (0.25 ~ 1.0];1.0 表示不缩图 +TRIANGLE_DETECT_SCALE = 0.5 + # ==================== 服务器配置 ==================== # SERVER_IP = "stcp.shelingxingqiu.com" SERVER_IP = "www.shelingxingqiu.com" diff --git a/design_doc/command_record.md b/design_doc/command_record.md index 5826176..fea95d4 100644 --- a/design_doc/command_record.md +++ b/design_doc/command_record.md @@ -72,7 +72,7 @@ printf 'AT+MHTTPDLFILE="http://static.shelingxingqiu.com/shoot/v1/main.py","down 遇到问题: ./build_all.sh: line 56: maixtool: command not found 解决办法: - export PATH="/mnt/d/code/MaixCDK/.venv/bin:$PATH" + pip install maixtool 遇到问题: ./update_img.sh: line 80: mcopy: command not found diff --git a/design_doc/solution_record.md b/design_doc/solution_record.md index 25524eb..e400d47 100644 --- a/design_doc/solution_record.md +++ b/design_doc/solution_record.md @@ -221,4 +221,56 @@ Why it works: MIPSEND enters prompt mode (>) — after the >, the AT parser treats ALL bytes as data, including CR/LF We construct the complete HTTP request ourselves (headers + Content-Length + multipart body) with real CRLF bytes -Key bug found during integration: _send_chunk() wrapped calls in self.at._cmd_lock, but self.at.send() also acquires the same lock internally — threading.Lock() is not reentrant, causing deadlock. Fixed by removing the outer lock (the network_manager.get_uart_lock() already provides thread safety).Trade-off: UART is locked during the entire upload, so heartbeats pause. For small JPEG files (~2-80KB), this is 5-20 seconds — acceptable if server heartbeat timeout is generous \ No newline at end of file +Key bug found during integration: _send_chunk() wrapped calls in self.at._cmd_lock, but self.at.send() also acquires the same lock internally — threading.Lock() is not reentrant, causing deadlock. Fixed by removing the outer lock (the network_manager.get_uart_lock() already provides thread safety).Trade-off: UART is locked during the entire upload, so heartbeats pause. For small JPEG files (~2-80KB), this is 5-20 seconds — acceptable if server heartbeat timeout is generous + + +13. 算环数算法1:「黄心 + 红心」椭圆/圆:主要在 vision.py 的 detect_circle_v3() 里完成:颜色先用 HSV 做掩码,再在轮廓上做面积、圆度筛选,黄圈用椭圆拟合,红圈预先筛成候选,最后用几何关系配对。 + + 1. 黄色怎么判、范围是什么? + 图像先转 HSV(cv2.COLOR_RGB2HSV,注意输入是 RGB)。 + 饱和度 S 整体乘 1.1 并限制在 0–255(让黄色更「显」一点)。 + 黄色 inRange(OpenCV HSV,H 多为 0–179): + 通道 下限 上限 + H 7 32 + S 80 255 + V 0 255 + 在黄掩码上找轮廓后,还要满足:面积 > 50,圆度 > 0.7(circularity = 4π·面积/周长²),且点数 ≥5 才 fitEllipse 当黄心椭圆。 + + 2. 红色怎么判、范围是什么? + 红色在 HSV 里跨 0°,所以用 两段 H 做并集: + 两段分别是: + H 0–10,S 80–255,V 0–255 + H 170–180,S 80–255,V 0–255 + 红轮廓候选:面积 > 50,圆度 > 0.6(比黄略松),再拟合椭圆或最小外接圆得到圆心和半径。 + + 3. 「黄心」和「红心」怎样算一对?(几何范围) + 对每个黄圈,在红色候选里找第一个满足: + + 两圆心距离 dist_centers < yellow_radius * 1.5 + 红半径 red_radius > yellow_radius * 0.8(红在外圈、略大) + dist_centers = math.hypot(ddx, ddy) + if dist_centers < yellow_radius * 1.5 and rc["radius"] > yellow_radius * 0.8: + 小结:黄色 = HSV H∈[7,32]、S≥80(且 S 放大 1.1)+ 形态学闭运算 + 面积/圆度;红色 = 两段 H(0–10 与 170–180)、S≥80 + 闭运算 + 面积/圆度;配对用 同心/包含 的距离与半径比例阈值。若你还关心 laser_manager.py 里「激光红点」的另一套阈值(LASER_*),那是另一条链路,和靶心黄/红 HSV 可以分开看。 + +14. 算环数算法2: + 使用单应性矩阵计算:镜头中心点(照片中心像素)到虚拟平面的转换。它不需要知道相机在 3D 空间中的具体位置,直接通过单应性矩阵 H的逆运算,将 2D 像素“翻译”成虚拟平面上的 2D 坐标。 + + 一、转换的本质:2D 到 2D 的“查字典” + 单应性变换(Homography)是平面到平面的映射。它不处理 3D 空间中的“投影线”,而是直接建立图像像素 (u,v)​ 与虚拟平面坐标 (x,y)​ 的一一对应关系。 + 你可以把单应性矩阵 H想象成一本“翻译字典”: + 正变换 H:已知靶纸上的真实位置 (x,y),查字典得到它在照片上哪个像素 (u,v)。 + 逆变换 H−1:已知照片上的像素 (u,v)(如镜头中心点),查字典反推它在靶纸上的真实位置 (x,y)。 + 这个“虚拟平面”就是你的靶纸平面(Z=0 的世界坐标系)。算法没有在物理上移动任何点,只是在做坐标系的换算。 + + 二、详细步骤:镜头中心点如何“落地” + + 相机分辨率是 640x480,镜头中心点(光轴与图像的交点)通常是 (u0,v0)=(320,240)。 + 1. 输入:镜头中心点(像素) + 2. 核心运算:乘以逆矩阵 + 通过 4 个黑色三角形的角点(已知真实坐标)计算出了单应性矩阵 H。现在使用它的逆矩阵 H−1 + 3. 输出:虚拟平面上的落点(物理坐标) + 计算后,你会得到:(xhit,yhit) + 这就是镜头中心点对应的靶纸上的真实位置(单位:毫米)。 + 4. 计算环数 + 由于虚拟平面原点 (0,0)就是靶纸圆心,直接计算欧氏距离。​ + 这个 d就是箭着点偏离圆心的真实物理距离,直接用于环数判定。 \ No newline at end of file diff --git a/main.py b/main.py index d1fc277..dac68cf 100644 --- a/main.py +++ b/main.py @@ -100,8 +100,7 @@ def cmd_str(): init_ina226() # 4. 初始化显示和相机 - camera_manager.init_camera(640, 480) - # camera_manager.init_camera(1280,720) + camera_manager.init_camera(getattr(config, "CAMERA_WIDTH", 640), getattr(config, "CAMERA_HEIGHT", 480)) camera_manager.init_display() # ==================== 第二阶段:软件初始化 ==================== diff --git a/network.py b/network.py index fca076a..6c457b6 100644 --- a/network.py +++ b/network.py @@ -1489,56 +1489,48 @@ class NetworkManager: qiniu_key = key.rstrip("/") + "/" + archive_filename self.logger.info(f"[LOG_UPLOAD] 日志归档已生成: {archive_path}, qiniu_key: {qiniu_key}") + # 2) 自动检测网络类型 + if self._network_type == "wifi" and self.is_wifi_connected(): + mode = "wifi" + else: + mode = "4g" + + self.logger.info(f"[LOG_UPLOAD] Using {mode} path, archive: {archive_path}") + try: - # 2) WiFi 优先:只要 WiFi 已连接就先尝试 WiFi,失败再回落到 4G - wifi_tried = False - wifi_ok = False + if mode == "wifi": + # ---- WiFi path: 使用 requests 库上传 ---- + import requests + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - if self.is_wifi_connected(): - wifi_tried = True - self.logger.info(f"[LOG_UPLOAD] Using wifi path (preferred), archive: {archive_path}") - try: - # ---- WiFi path: 使用 requests 库上传 ---- - import requests - import urllib3 - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + with open(archive_path, 'rb') as f: + files = {'file': (archive_filename, f, 'application/octet-stream')} + data = {'token': upload_token, 'key': qiniu_key} + wifi_upload_url = upload_url.replace('https://', 'http://', 1) + self.logger.info(f"[LOG_UPLOAD] WiFi upload URL: {wifi_upload_url}") + response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False) + response.raise_for_status() + result_json = response.json() + uploaded_key = result_json.get('key', qiniu_key) - with open(archive_path, 'rb') as f: - files = {'file': (archive_filename, f, 'application/octet-stream')} - data = {'token': upload_token, 'key': qiniu_key} - wifi_upload_url = upload_url.replace('https://', 'http://', 1) - self.logger.info(f"[LOG_UPLOAD] WiFi upload URL: {wifi_upload_url}") - response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False) - response.raise_for_status() - result_json = response.json() - uploaded_key = result_json.get('key', qiniu_key) + self.logger.info(f"[LOG_UPLOAD] WiFi upload ok: key={uploaded_key}") - self.logger.info(f"[LOG_UPLOAD] WiFi upload ok: key={uploaded_key}") + access_url = None + if outlink: + access_url = f"https://{outlink}/{uploaded_key}" - access_url = None - if outlink: - access_url = f"https://{outlink}/{uploaded_key}" + response_data = { + "result": "log_upload_ok", + "key": uploaded_key, + "via": "wifi", + } + if access_url: + response_data["url"] = access_url - response_data = { - "result": "log_upload_ok", - "key": uploaded_key, - "via": "wifi", - } - if access_url: - response_data["url"] = access_url - - self.safe_enqueue(response_data, 2) - wifi_ok = True - except Exception as e: - # WiFi 上传失败不影响主链路:记录原因并回落 4G - self.logger.warning(f"[LOG_UPLOAD] WiFi upload failed, fallback to 4g: {e}") - - if not wifi_ok: - if not wifi_tried: - self.logger.info(f"[LOG_UPLOAD] WiFi not connected, using 4g path, archive: {archive_path}") - else: - self.logger.info(f"[LOG_UPLOAD] Using 4g fallback path, archive: {archive_path}") + self.safe_enqueue(response_data, 2) + else: # ---- 4G path: 使用 FourGUploadManager AT命令上传 ---- import importlib.util spec = importlib.util.spec_from_file_location( @@ -1578,8 +1570,11 @@ class NetworkManager: }, 2) except Exception as e: - self.logger.error(f"[LOG_UPLOAD] upload exception: {e}") - self.safe_enqueue({"result": "log_upload_failed", "reason": str(e)[:100]}, 2) + self.logger.error(f"[LOG_UPLOAD] upload exception ({mode}): {e}") + self.safe_enqueue({ + "result": "log_upload_failed", + "reason": str(e)[:100] + }, 2) finally: # 清理临时归档文件 try: @@ -1600,57 +1595,51 @@ class NetworkManager: shoot_id: 射击ID outlink: 外链域名(可选,用于构建访问URL) """ - # WiFi 优先(独立于 TCP 主链路):只要 WiFi 已连接就先走 WiFi,失败再回落 4G - mode = "wifi" if self.is_wifi_connected() else "4g" + # 自动检测网络类型,选择上传路径 + if self._network_type == "wifi" and self.is_wifi_connected(): + mode = "wifi" + else: + mode = "4g" self.logger.info(f"[IMAGE_UPLOAD] Using {mode} path, image: {image_path}") try: - wifi_ok = False - if mode == "wifi": - try: - # ---- WiFi path: 使用 requests 库上传 ---- - import requests - import urllib3 - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + # ---- WiFi path: 使用 requests 库上传 ---- + import requests + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - with open(image_path, 'rb') as f: - files = {'file': (os.path.basename(image_path), f, 'application/octet-stream')} - data = {'token': upload_token, 'key': key} - # 将 HTTPS 转为 HTTP(设备端 SSL 兼容性) - wifi_upload_url = upload_url.replace('https://', 'http://', 1) - self.logger.info(f"[IMAGE_UPLOAD] WiFi upload URL: {wifi_upload_url}") - response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False) - response.raise_for_status() - result_json = response.json() - uploaded_key = result_json.get('key', key) + with open(image_path, 'rb') as f: + files = {'file': (os.path.basename(image_path), f, 'application/octet-stream')} + data = {'token': upload_token, 'key': key} + # 测试:将HTTPS转为HTTP + wifi_upload_url = upload_url.replace('https://', 'http://', 1) + self.logger.info(f"[IMAGE_UPLOAD] WiFi upload URL: {wifi_upload_url}") + response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False) + response.raise_for_status() + result_json = response.json() + uploaded_key = result_json.get('key', key) - self.logger.info(f"[IMAGE_UPLOAD] WiFi upload ok: key={uploaded_key}") + self.logger.info(f"[IMAGE_UPLOAD] WiFi upload ok: key={uploaded_key}") - access_url = None - if outlink: - access_url = f"https://{outlink}/{uploaded_key}" + access_url = None + if outlink: + access_url = f"https://{outlink}/{uploaded_key}" - response_data = { - "result": "image_upload_ok", - "shootId": shoot_id, - "key": uploaded_key, - "via": "wifi", - } - if access_url: - response_data["url"] = access_url + response_data = { + "result": "image_upload_ok", + "shootId": shoot_id, + "key": uploaded_key, + "via": "wifi", + } + if access_url: + response_data["url"] = access_url - self.safe_enqueue(response_data, 2) - wifi_ok = True - except Exception as e: - self.logger.warning(f"[IMAGE_UPLOAD] WiFi upload failed, fallback to 4g: {e}") + self.safe_enqueue(response_data, 2) - if not wifi_ok: + else: # ---- 4G path: 使用 FourGUploadManager AT命令上传 ---- - if mode != "4g": - self.logger.info(f"[IMAGE_UPLOAD] Using 4g fallback path, image: {image_path}") - import importlib.util spec = importlib.util.spec_from_file_location( "four_g_upload_manager", diff --git a/shoot_manager.py b/shoot_manager.py index 9e64212..3ebf15d 100644 --- a/shoot_manager.py +++ b/shoot_manager.py @@ -165,13 +165,28 @@ def analyze_shot(frame, laser_point=None): tri = tri_result.get('data', {}) - if tri.get('ok'): + # 保险校验:避免三角形返回 nan/inf 或退化点仍被上报 + try: + import numpy as _np + tri_ok = bool(tri.get('ok')) + if tri_ok: + dxv = tri.get("dx_cm") + dyv = tri.get("dy_cm") + H = tri.get("homography") + if not _np.isfinite(dxv) or not _np.isfinite(dyv): + tri_ok = False + elif H is not None and not _np.all(_np.isfinite(H)): + tri_ok = False + except Exception: + tri_ok = bool(tri.get('ok')) + + if tri_ok: logger.info(f"[TRI] end {datetime.now()} — 使用三角形结果(dx={tri['dx_cm']:.2f},dy={tri['dy_cm']:.2f}cm)") return { "success": True, "result_img": frame, "center": None, "radius": None, - "method": tri.get("offset_method") or "triangle_homography", + "method": "triangle_homography", "best_radius1": None, "ellipse_params": None, "dx": tri["dx_cm"], "dy": tri["dy_cm"], "distance_m": tri.get("distance_m") or distance_m_first, @@ -231,7 +246,7 @@ def process_shot(adc_val): # 三角形路径成功时 center/radius 为空是正常的;此时用 triangle 方法名用于保存文件名与上报字段 m if (not method) and tri_markers: - method = offset_method or "triangle_homography" + method = "triangle_homography" if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING: camera_manager.show(result_img) diff --git a/triangle_target.py b/triangle_target.py index 72e50b5..bca17f0 100644 --- a/triangle_target.py +++ b/triangle_target.py @@ -491,9 +491,17 @@ def try_triangle_scoring( h_orig, w_orig = img_rgb.shape[:2] - # 缩图加速:嵌入式 CPU 上图像处理耗时与面积成正比,缩到最长边 320px 可获得 ~4× 加速 + # 缩图加速:嵌入式 CPU 上图像处理耗时与面积成正比。 + # 不再写死 320/640:默认按相机最长边缩到 1/2(由 config.TRIANGLE_DETECT_SCALE 控制)。 # 检测完后把像素坐标乘以 inv_scale 还原到原始分辨率,再送入单应性/PnP(与 K 标定分辨率一致) - MAX_DETECT_DIM = 640 + try: + import config as _cfg + scale = float(getattr(_cfg, "TRIANGLE_DETECT_SCALE", 0.5)) + except Exception: + scale = 0.5 + if not (0.05 <= scale <= 1.0): + scale = 0.5 + MAX_DETECT_DIM = max(64, int(max(h_orig, w_orig) * scale)) long_side = max(h_orig, w_orig) if long_side > MAX_DETECT_DIM: det_scale = MAX_DETECT_DIM / long_side @@ -564,6 +572,40 @@ def try_triangle_scoring( marker_centers = [[float(c[0]), float(c[1])] for c in marker_centers] offset_tag = "triangle_homography_3pt" + # ---------- 结果有效性校验(防 nan/inf 与退化角点) ---------- + try: + import config as _cfg + min_center_dist_px = float(getattr(_cfg, "TRIANGLE_MIN_CENTER_DIST_PX", 3.0)) + max_dist_m = float(getattr(_cfg, "TRIANGLE_MAX_DISTANCE_M", 20.0)) + except Exception: + min_center_dist_px = 3.0 + max_dist_m = 20.0 + + def _all_finite(v) -> bool: + try: + return bool(np.all(np.isfinite(v))) + except Exception: + return False + + # 1) 4 个角点中心不能退化/重复(两两距离要大于阈值) + try: + pts = np.array(marker_centers, dtype=np.float64).reshape(-1, 2) + ok_centers = True + for i in range(len(pts)): + for j in range(i + 1, len(pts)): + if float(np.linalg.norm(pts[i] - pts[j])) <= min_center_dist_px: + ok_centers = False + break + if not ok_centers: + break + if not ok_centers: + _log(f"[TRI] 角点退化/重复:center_dist <= {min_center_dist_px:.1f}px,判定三角形失败") + return out + except Exception: + # 校验异常时不信任结果,直接回退 + _log("[TRI] 角点校验异常,判定三角形失败") + return out + ok_h, tx, ty, _H = homography_calibration( marker_centers, marker_ids, marker_positions, [lx, ly] ) @@ -571,6 +613,16 @@ def try_triangle_scoring( _log("[TRI] 单应性失败") return out + # 2) 单应性矩阵必须是有限数 + if (not _all_finite(_H)): + _log("[TRI] 单应性出现 nan/inf,判定三角形失败") + return out + + # 3) dx/dy 必须是有限数 + if (not _all_finite([tx, ty])): + _log("[TRI] 偏移出现 nan/inf,判定三角形失败") + return out + # 与 laser_manager.compute_laser_position 现网约定一致:(x_cm, -y_cm_target) out["dx_cm"] = tx out["dy_cm"] = -ty @@ -580,7 +632,8 @@ def try_triangle_scoring( out["homography"] = _H # 供上层反推靶心像素位置用 dist_m = pnp_distance_meters(marker_ids, marker_centers, marker_positions, camera_matrix, dist_coeffs) - if dist_m is not None and 0.3 < dist_m < 50.0: + # 4) distance_m 若存在也必须是有限数且在合理范围(默认 <20m) + if dist_m is not None and _all_finite([dist_m]) and 0.3 < dist_m < max_dist_m: out["distance_m"] = dist_m out["distance_method"] = "pnp_triangle" _log(f"[TRI] PnP 距离={dist_m:.2f}m, 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm")