diff --git a/app.yaml b/app.yaml index 40b272a..16bdd0a 100644 --- a/app.yaml +++ b/app.yaml @@ -5,10 +5,13 @@ author: t11 icon: '' desc: t11 files: + - 4g_download_manager.py - app.yaml - archery_netcore.cpython-311-riscv64-linux-gnu.so + - aruco_detector.py - at_client.py - camera_manager.py + - cameraParameters.xml - config.py - hardware.py - laser_manager.py @@ -17,9 +20,13 @@ files: - network.py - ota_manager.py - power.py + - server.pem - shoot_manager.py - shot_id_generator.py - time_sync.py + - triangle_positions.json + - triangle_target.py - version.py - - vision.cpython-311-riscv64-linux-gnu.so + - vision.py + - wifi_config_httpd.py - wifi.py diff --git a/archery_netcore.cpython-311-riscv64-linux-gnu.so b/archery_netcore.cpython-311-riscv64-linux-gnu.so new file mode 100644 index 0000000..1eb8d81 Binary files /dev/null and b/archery_netcore.cpython-311-riscv64-linux-gnu.so differ diff --git a/cameraParameters.xml b/cameraParameters.xml new file mode 100644 index 0000000..b41a66e --- /dev/null +++ b/cameraParameters.xml @@ -0,0 +1,33 @@ + + +"Sat Apr 11 12:05:27 2026" +29 + + 640 480 + + 3 + 3 +
d
+ + 2207.9058323074869 0. 328.90661220953149 0. 2207.9058323074869 + 205.49515894111076 0. 0. 1.
+ + 4 + 1 +
d
+ + 0. 11.687428265309892 3.6908895632668468 3.597571733110271
+ + 1 + 5 +
d
+ + -0.63036604771649651 3.3832710000807449 0. 0. -0.45113389267675552
+ + 5 + 1 +
d
+ + 0.025002349846111244 1.0651877135605927 0. 0. 0.04021252864120229
+0.28992233810828955 +
diff --git a/config.py b/config.py index 00bb188..62d2c41 100644 --- a/config.py +++ b/config.py @@ -36,17 +36,17 @@ WIFI_CONFIG_HTTP_PORT = 8080 # 默认 8080,避免占用 80 需 r WIFI_CONFIG_AP_IP = "192.168.66.1" # 与 MaixPy Wifi.start_ap 默认一致,手机访问 http://192.168.66.1:8080/ # ===== TCP over SSL(TLS) 配置 ===== -USE_TCP_SSL = False # True=按手册走 MSSLCFG/MIPCFG 绑定 SSL +USE_TCP_SSL = True # True=按手册走 MSSLCFG/MIPCFG 绑定 SSL TCP_LINK_ID = 2 # -TCP_SSL_PORT = 443 # TLS 端口(不一定必须 443,以服务器为准) +TCP_SSL_PORT = 50006 # TLS 端口(不一定必须 443,以服务器为准) # SSL profile SSL_ID = 1 # ssl_id=1 -SSL_AUTH_MODE = 0 # 1=单向认证(验证服务器),2=双向 +SSL_AUTH_MODE = 1 # 1=单向认证(验证服务器),2=双向 SSL_VERIFY_MODE = 1 # 0=不验(仅测试用);1=写入并使用 CA 证书 -SSL_CERT_FILENAME = "www.shelingxingqiu.com.crt" # 模组里证书名(MSSLCERTWR / MSSLCFG="cert" 用) -SSL_CERT_PATH = "/root/www.shelingxingqiu.com.crt" # 设备文件系统里 CA 证书路径(你自己放进去) +SSL_CERT_FILENAME = "server.pem" # 模组里证书名(MSSLCERTWR / MSSLCFG="cert" 用) +SSL_CERT_PATH = "/maixapp/apps/t11/server.pem" # 设备文件系统里 CA 证书路径(你自己放进去) # MIPOPEN 末尾的参数在不同固件里含义可能不同;按你手册例子保留 MIPOPEN_TAIL = ",,0" @@ -95,7 +95,7 @@ DEFAULT_LASER_POINT = (320, 245) # 默认激光中心点 # 硬编码激光点配置 HARDCODE_LASER_POINT = True # 是否使用硬编码的激光点(True=使用硬编码值,False=使用校准值) -HARDCODE_LASER_POINT_VALUE = (320, 245) # 硬编码的激光点坐标(315, 245) # # 硬编码的激光点坐标 (x, y) +HARDCODE_LASER_POINT_VALUE = (320, 296) # 硬编码的激光点坐标(315, 245) # # 硬编码的激光点坐标 (x, y) # 激光点检测配置 LASER_DETECTION_THRESHOLD = 140 # 红色通道阈值(默认120,可调整,范围建议:100-150) @@ -122,6 +122,27 @@ LASER_CAMERA_OFFSET_CM = 1.4 # 激光在摄像头下方的物理距离(厘米 IMAGE_CENTER_X = 320 # 图像中心 X 坐标 IMAGE_CENTER_Y = 240 # 图像中心 Y 坐标 +# ==================== 三角形四角标记:单应性偏移 + PnP 估距 ==================== +# 依赖 cameraParameters.xml(相机内参)与 triangle_positions.json(四角物方坐标,厘米或毫米见 JSON 约定)。 +# 部署时请把这两个文件放到 APP_DIR(与 main 同应用目录),或改下面路径为设备上的实际绝对路径。 +USE_TRIANGLE_OFFSET = True # False 时仅走黄心圆/椭圆 + 半径估距,不使用三角形路径 +CAMERA_CALIB_XML = APP_DIR + "/cameraParameters.xml" +TRIANGLE_POSITIONS_JSON = APP_DIR + "/triangle_positions.json" +# 检测到的三角形边长在图像中的像素范围,分辨率或靶纸占比变化时可微调 +TRIANGLE_SIZE_RANGE = (8, 500) +# 三角形检测兜底增强:CLAHE(更鲁棒但更慢)。默认关闭以优先速度。 +TRIANGLE_ENABLE_CLAHE_FALLBACK = False +# 三角形检测超时(毫秒)。超过该时间直接判失败,回退圆心算法(并行时不再等待)。 +TRIANGLE_TIMEOUT_MS = 1000 + +# 三角形检测性能/鲁棒性参数(偏向速度的默认值) +# 说明: +# - Otsu 是最快的全局阈值;adaptiveThreshold 更鲁棒但更慢 +# - filtered 候选过多时,枚举 C(n,4) 会变慢,需限幅 +TRIANGLE_EARLY_EXIT_CANDIDATES = 4 # 找到多少个候选就提前停止二值化尝试 +TRIANGLE_ADAPTIVE_BLOCK_SIZES = (11, 21) # 自适应阈值 blockSize 尝试列表;置空 () 可完全关闭 adaptiveThreshold +TRIANGLE_MAX_FILTERED_FOR_COMBO = 10 # 参与四点组合评分的最大候选数(超过则截断到最可能的一部分) + FLASH_LASER_WHILE_SHOOTING = True # 是否在拍摄时闪一下激光(True=闪,False=不闪) FLASH_LASER_DURATION_MS = 1000 # 闪一下激光的持续时间(毫秒) diff --git a/design_doc/command_record.md b/design_doc/command_record.md index 1156f54..f4f0527 100644 --- a/design_doc/command_record.md +++ b/design_doc/command_record.md @@ -36,3 +36,50 @@ printf 'AT+MHTTPDLFILE="http://static.shelingxingqiu.com/shoot/v1/main.py","down 4. wifi的启动条件,在 /boot 目录下,看看是否有 wifi.sta 和 wifi.ssid, wifi.pass 这些文件。其中 wifi.sta 是开关文件。 如果没有了它就不会启动wifi流程。具体的wifi流程 由 /etc/init.d/S30wifi 控制。它会判断 wifi.sta 是否存在,然后是否启动wifi,还是启动热点。 +5. 给自己的程序打包到基础镜像中,参考:https://wiki.sipeed.com/maixpy/doc/zh/pro/compile_os.html + 5.1. 按照链接中的步骤,去github上获取了基础镜像,这次使用的是 v4.12.4,把Assets中的下面几样东西下载下来,我是在windows的wsl中执行的,注意, + 假如是在windows中下载的文件,在wsl中编译会很慢,所以我采用的是直接在wsl中下载,放到wsl的自己的文件系统中。 + 1)maixcam-2025-12-31-maixpy-v4.12.4.img.xz + 2)maixcam_builtin_files.tar.xz + 3)MaixPy-4.12.4-py3-none-any.whl + 4)Source code(zip) + 5.2. 把自己的文件放到 buildtin_files中: + 1)我把项目文件目录 t11 放到了 maixcam_builtin_files\maixapp\apps 这个目录下。 + 2)为了能让它自启动,我把 auto_start.txt 放到了 maixcam_builtin_files\maixapp 这个目录下。 + + 5.3. 然后在解压后的源码中找到tools/os目录下 /home/saga/maixcam/MaixPy-4.12.4/tools/os/maixcam + 执行 + export MAIXCDK_PATH=/home/saga/maixcam/MaixCDK + 编译: + ./gen_os.sh ../../../../../maixcam/maixcam-2025-12-31-maixpy-v4.12.4.img ../../../../../maixcam/MaixPy-4.12.4-py3-none-any.whl ../../../../../maixcam/maixcam_builtin_files 0 maixcam + 注意,在编译过程中,也会去 github 下载内容,所以需要打开梯子。 + 5.4. 等待编译完成,会编译成镜像文件,然后根据 https://wiki.sipeed.com/hardware/zh/maixcam/os.html 这个指引来烧录系统。 + 5.5. 烧录完系统后,需要安装 runtime, 可以按照 https://wiki.sipeed.com/maixpy/doc/zh/README_no_screen.html 这个来升级运行库,或者直接在 Maixvision 中链接的时候安装 runtime。 + 5.6. 安装 runtime 之后,重启,我们的系统就会自己启动起来了。 + + 遇到问题: + /mnt/d/code/shooting/compile_maixcam/MaixPy-4.12.4/MaixPy-4.12.4/tools/os/maixcam/fuse2fs: error while loading shared libraries: libfuse.so.2: cannot open shared object file: No such file or directory + 解决办法: + 安装 libfuse2 + sudo apt update + sudo apt install libfuse2 + + 遇到问题: + python 缺少 yaml + 解决办法: + pip install pyyaml + + 遇到问题: + ./build_all.sh: line 56: maixtool: command not found + 解决办法: + export PATH="/mnt/d/code/MaixCDK/.venv/bin:$PATH" + + 遇到问题: + ./update_img.sh: line 80: mcopy: command not found + 解决办法: + sudo apt update + sudo apt install mtools + +6. 相机标定: +set OPENCV_FFMPEG_CAPTURE_OPTIONS="rtsp_transport;tcp" +opencv_interactive-calibration -t=chessboard -w=9 -h=6 -sz=0.025 -v="http://192.168.1.55:8000/stream" \ No newline at end of file diff --git a/design_doc/solution_record.md b/design_doc/solution_record.md index f76bdb6..0ca7831 100644 --- a/design_doc/solution_record.md +++ b/design_doc/solution_record.md @@ -102,4 +102,95 @@ WiFi 连接成功 尝试切换到 4G ↓ 上层检测到连接断开: - 重新 connect_server() → 自动选择 4G \ No newline at end of file + 重新 connect_server() → 自动选择 4G + +10. 现在使用的相机,其实是支持更大的分辨率的,比如说1920*1280,但是由于我们的图像处理,拍照处理之后很容易触发OOM。 + +11. 环数计算流程: +现在设备侧的目标是:算出箭点相对靶心的偏移(dx,dy),单位是物理厘米(cm),然后把它作为 x,y 上报给后端;后端再去算环。 +设备侧本身不直接算环数,它算的是偏移与距离,并上报。 + +算法流程(一次射箭从触发到上报) +1) 触发后取一帧图 + 在 process_shot() 里读取相机帧并调用 analyze_shot(frame) +2) 确定激光点(laser_point) + + analyze_shot() 第一步先确定激光点 (x,y)(像素坐标): + + 硬编码:config.HARDCODE_LASER_POINT=True → 用 laser_manager.laser_point + 已校准:laser_manager.has_calibrated_point() → 用校准值 + 动态模式:先 detect_circle_v3(frame, None) 粗估距离,再根据距离反推激光点 + 代码在: + + if config.HARDCODE_LASER_POINT: + ... + elif laser_manager.has_calibrated_point(): + ... + else: + _, _, _, _, best_radius1_temp, _ = detect_circle_v3(frame, None) + distance_m_first = estimate_distance(best_radius1_temp) ... + laser_point = laser_manager.calculate_laser_point_from_distance(distance_m_first) +3) 优先走三角形路径(成功就直接用于上报 x/y) +如果 config.USE_TRIANGLE_OFFSET=True,先尝试识别靶面四角三角形标记: + +if getattr(config, "USE_TRIANGLE_OFFSET", False): + K, dist_coef, pos = _get_triangle_calib() + img_rgb = image.image2cv(frame, False, False) + tri = try_triangle_scoring(img_rgb, (x, y), pos, K, dist_coef, ...) + if tri.get("ok"): + return {... "dx": tri["dx_cm"], "dy": tri["dy_cm"], "distance_m": tri.get("distance_m"), ...} +这一步里 try_triangle_scoring() 做了两件事(都在 triangle_target.py): + +单应性(homography):把激光点从图像坐标映射到靶面坐标系,得到(dx,dy)(cm) +PnP:用识别到的角点与相机标定,估算 相机到靶的距离 distance_m +关键代码: + +ok_h, tx, ty, _H = homography_calibration(...) +out["dx_cm"] = tx +out["dy_cm"] = -ty +out["distance_m"] = dist_m +out["distance_method"] = "pnp_triangle" +注意:这里 dy_cm 取了负号,是为了和现网约定一致(laser_manager.compute_laser_position 的坐标方向)。 + +4) 三角形失败 → 回退圆形/椭圆靶心检测(兜底) +如果三角形不可用或识别失败,就走传统靶心检测: + +detect_circle_v3(frame, laser_point) 找黄心/红心、半径、椭圆参数 +用 laser_manager.compute_laser_position() 把像素偏移换算成厘米偏移(dx,dy) +在 shoot_manager.py: + +result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point) +if center and radius: + dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method) + distance_m = estimate_distance(best_radius1) ... +在 laser_manager.compute_laser_position()(核心换算逻辑): + +r = radius * 5 +target_x = (lx-cx)/r*100 +target_y = (ly-cy)/r*100 +return (target_x, -target_y) +这里 (像素差)/(radius*5)*100 是你们旧约定下的“像素→厘米”比例模型(并且 y 方向同样取负号)。 + +5) 上报数据:把(dx,dy) 作为 x/y 发给后端 +最终上报发生在 process_shot(),直接把 dx,dy 填到 inner_data["x"],["y"]: + +srv_x = round(float(dx), 4) if dx is not None else 200.0 +srv_y = round(float(dy), 4) if dy is not None else 200.0 +inner_data = { + "x": srv_x, + "y": srv_y, + "d": round((distance_m or 0.0) * 100), + "m": method if method else "no_target", + "offset_method": offset_method, + "distance_method": distance_method, + ... +} +network_manager.safe_enqueue(...) +x,y:物理厘米(cm) +d:相机到靶距离(m→cm,乘 100;三角形成功时来自 PnP) +m/offset_method/distance_method:标记本次用的算法路径(triangle / yellow / pnp 等) +后端收到 x,y 后,再用你之前给的 Go 公式 CalculateRingNumber(x,y,tenRingRadius) 计算环数。 + +你现在的“环数计算”实际依赖关系 +最好路径(快+稳):三角形 → dx,dy(单应性) + distance_m(PnP) +兜底路径:圆/椭圆靶心 → dx,dy(基于黄心半径比例/透视校正) + distance_m(黄心半径估距) diff --git a/server.pem b/server.pem new file mode 100644 index 0000000..f0f14b9 --- /dev/null +++ b/server.pem @@ -0,0 +1,33 @@ +-----BEGIN CERTIFICATE----- +MIIFwjCCA6qgAwIBAgIUAZIGjFLTekYI+IIquQ/87qLDuNAwDQYJKoZIhvcNAQEL +BQAwXjELMAkGA1UEBhMCQ04xDjAMBgNVBAgMBUxvY2FsMQ4wDAYDVQQHDAVMb2Nh +bDEOMAwGA1UECgwFTG9jYWwxHzAdBgNVBAMMFnd3dy5zaGVsaW5neGluZ3FpdS5j +b20wIBcNMjYwNDA3MDc0NDI2WhgPMjEyNjAzMTQwNzQ0MjZaMF4xCzAJBgNVBAYT +AkNOMQ4wDAYDVQQIDAVMb2NhbDEOMAwGA1UEBwwFTG9jYWwxDjAMBgNVBAoMBUxv +Y2FsMR8wHQYDVQQDDBZ3d3cuc2hlbGluZ3hpbmdxaXUuY29tMIICIjANBgkqhkiG +9w0BAQEFAAOCAg8AMIICCgKCAgEAvKRcWr8QeT1OzhMbWlHmqxmduE+e7r2Oet9I +mU4O888U1X1YKaIDnq+zqRCNteid3jrOWucDLReZzNnrZ4l3Jq9nbWuTwj9Y9vCq +ahW3K3BOhnuJ+qvqX2Izn1Z9iNCFhXnUaFy8+iP0nJNNIRXwg7ioKbY6+SaTbBzI +vfG33MjOmwnQlqZzdGyNpvieO9XzqVyRxeDen/LJf4Z1NocP2rOjqQC3dIDXOfBt +/ZOZymb4XwQ9b/t+6WJn9Zfycw0tp/7GqI+vqLDUMpipO4ahmybJPO02IhokZ09t +BnCXe0enLnMAshIipTxSaJEick9HnQVSUzF+9A1F0cCFAhS8cM/04aksfYsJD2xj +riiVHVoVo6tb0GJSCM+b0j9ObH9bDx3DKfy9EcqP25mJxWQTuT8G0oiyuxE5knjA +HL7yjwd5gVSuig+ACnxE3vITeVKtvyep7sD4tJqkN93t7OMeBRFMGsYpJ8w+8u6X ++9/RmMcOnuNcT/4HrOuAtlAnM1D44MSI1RLaOCJJ9evqhpWdktfn2Uv4gCnaTjUr +OiEU/G+lquST2kggjbcReLqkk+7yN3XkaR9dun4iV35WfEo1ENThVhLPGV61LaJq +PwbjltQlkcAFPJ1GJyE9FVO79bB51d0w/rlI/CcDUpTRMaXR35EmTjxvXOr/a/XI +56GUNaUCAwEAAaN2MHQwHQYDVR0OBBYEFH1HCDm4N7LMhIX2Fb2FXAfdyhwQMB8G +A1UdIwQYMBaAFH1HCDm4N7LMhIX2Fb2FXAfdyhwQMA8GA1UdEwEB/wQFMAMBAf8w +IQYDVR0RBBowGIIWd3d3LnNoZWxpbmd4aW5ncWl1LmNvbTANBgkqhkiG9w0BAQsF +AAOCAgEAG/PMwXCXJOaqCpU/LaY6w04ue6wk95RbPXf4JH4CrrLUfgyUmFlNNQPA +LuZSBRI6KUGkTvzuz/3ofZHVEin3CyE5NadB3UItpfA4Wl4r3jMPifIgnA/NT8xo +GE1gYaDbcfJNE8jy6GebjZekbVrPvCY9YgcUT2AmW5fcbnCTy+/iC7lf9MvvqHTJ +H5zvOp5nyWJYWYsvvif3Y7dp00ytg9I8/LSgUspKwB8qSWPWV8z4WsV6sc1mNqVS +nFBDkgzZxr4ZYlhVLzbSoab8D4A/z6riEMqv4S+oF5VkaJLhsN8vgHh9aPspCC3Q +zhcosH8XmNmJmT/X64FhhRqxAqX65WanVQABtBS/vsC+FAQDGMb3RkZSbLEnIlgj +bx/6bSkhHl+J2xIqA7tLvYhRSvM3H12X7VSVc+tkVzI5JoUSugZLxxRDGpYgkvRz +SPFCqb9eTn5ES5gnQX6+E+f/E/WQTmadolSbEppdxNZW7AaIUdQo0aFxFwctwhA2 +YNUG9oW2TXAZjSECyTo28NFkFfwBhpHWigFCANNCd8Nrn0k0YMuJOkqW5e4w3/24 +/IxM/C9K7aAx4S1XZ16Nvh5pZQduEGKTSUYMJ/uV26Mf4ZGroUfGB9tBguK5rYbL +UlRvtU9mkZPK04GbLsoo+8tZTDRtkuCiC19xk33XiitZrmavc24= +-----END CERTIFICATE----- diff --git a/shoot_manager.py b/shoot_manager.py index d80c75b..89c8fad 100644 --- a/shoot_manager.py +++ b/shoot_manager.py @@ -1,11 +1,44 @@ +import os +import threading + import config from camera_manager import camera_manager from laser_manager import laser_manager from logger_manager import logger_manager from network import network_manager -from power import get_bus_voltage, voltage_to_percent -from vision import estimate_distance, detect_circle_v3, save_shot_image -from maix import camera, display, image, app, time, uart, pinmap, i2c +from triangle_target import load_camera_from_xml, load_triangle_positions, try_triangle_scoring +from vision import estimate_distance, detect_circle_v3, enqueue_save_shot +from maix import image, time + +# 缓存相机标定与三角形位置,避免每次射箭重复读磁盘 +_tri_calib_cache = None + +def _get_triangle_calib(): + """返回 (K, dist, marker_positions);首次调用时从磁盘加载并缓存。""" + global _tri_calib_cache + if _tri_calib_cache is not None: + return _tri_calib_cache + calib_path = getattr(config, "CAMERA_CALIB_XML", "") + tri_json = getattr(config, "TRIANGLE_POSITIONS_JSON", "") + if not (os.path.isfile(calib_path) and os.path.isfile(tri_json)): + _tri_calib_cache = (None, None, None) + return _tri_calib_cache + K, dist = load_camera_from_xml(calib_path) + pos = load_triangle_positions(tri_json) + _tri_calib_cache = (K, dist, pos) + return _tri_calib_cache + + +def preload_triangle_calib(): + """ + 启动阶段预加载三角形标定与坐标文件,避免首次射箭触发时的读盘/解析开销。 + """ + try: + _get_triangle_calib() + except Exception: + # 预加载失败不影响主流程;射箭时会再次按需尝试 + pass + def analyze_shot(frame, laser_point=None): """ @@ -13,18 +46,18 @@ def analyze_shot(frame, laser_point=None): :param frame: 图像帧 :param laser_point: 激光点坐标 (x, y) :return: 包含分析结果的字典 + + 优先级: + 1. 三角形单应性(USE_TRIANGLE_OFFSET=True 时)— 成功则直接返回,跳过圆形检测 + 2. 圆形检测(三角形不可用或识别失败时兜底) """ logger = logger_manager.logger + from datetime import datetime - # 先检测靶心以获取距离(用于计算激光点) - result_img_temp, center_temp, radius_temp, method_temp, best_radius1_temp, ellipse_params_temp = detect_circle_v3( - frame, None) - - # 计算距离 - distance_m = estimate_distance(best_radius1_temp) if best_radius1_temp else None - - # 根据距离动态计算激光点坐标 + # ── Step 1: 确定激光点 ──────────────────────────────────────────────────── laser_point_method = None + distance_m_first = None + if config.HARDCODE_LASER_POINT: laser_point = laser_manager.laser_point laser_point_method = "hardcode" @@ -33,65 +66,128 @@ def analyze_shot(frame, laser_point=None): laser_point_method = "calibrated" if logger: logger.info(f"[算法] 使用校准值: {laser_manager.laser_point}") - elif distance_m and distance_m > 0: - laser_point = laser_manager.calculate_laser_point_from_distance(distance_m) - laser_point_method = "dynamic" - if logger: - logger.info(f"[算法] 使用比例尺: {laser_point}") else: - laser_point = laser_manager.laser_point - laser_point_method = "default" - if logger: - logger.info(f"[算法] 使用默认值: {laser_point}") + # 动态模式:先做一次无激光点检测以估算距离,再推算激光点 + _, _, _, _, best_radius1_temp, _ = detect_circle_v3(frame, None) + distance_m_first = estimate_distance(best_radius1_temp) if best_radius1_temp else None + if distance_m_first and distance_m_first > 0: + laser_point = laser_manager.calculate_laser_point_from_distance(distance_m_first) + laser_point_method = "dynamic" + if logger: + logger.info(f"[算法] 使用比例尺: {laser_point}") + else: + laser_point = laser_manager.laser_point + laser_point_method = "default" + if logger: + logger.info(f"[算法] 使用默认值: {laser_point}") if laser_point is None: - return { - "success": False, - "reason": "laser_point_not_initialized" - } + return {"success": False, "reason": "laser_point_not_initialized"} x, y = laser_point - # 绘制激光十字线 - color = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) - frame.draw_line( - int(x - config.LASER_LENGTH), int(y), - int(x + config.LASER_LENGTH), int(y), - color, config.LASER_THICKNESS - ) - frame.draw_line( - int(x), int(y - config.LASER_LENGTH), - int(x), int(y + config.LASER_LENGTH), - color, config.LASER_THICKNESS - ) - frame.draw_circle(int(x), int(y), 1, color, config.LASER_THICKNESS) + # ── Step 2: 提前转换一次图像,两个检测线程共享(只读)──────────────────────── + img_cv = image.image2cv(frame, False, False) - # 重新检测靶心(使用计算出的激光点) - result_img, center, radius, method, best_radius1, ellipse_params = detect_circle_v3(frame, laser_point) + # ── Step 3: 检查三角形是否可用 ──────────────────────────────────────────────── + use_tri = getattr(config, "USE_TRIANGLE_OFFSET", False) + K = dist_coef = pos = None + if use_tri: + K, dist_coef, pos = _get_triangle_calib() + use_tri = K is not None and dist_coef is not None and pos - # 计算偏移与距离 - if center and radius: - dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method) - distance_m = estimate_distance(best_radius1) - else: + def _build_circle_result(cdata): + """从圆形检测结果构建 analyze_shot 返回值。""" + r_img, center, radius, method, best_radius1, ellipse_params = cdata dx, dy = None, None - distance_m = None + d_m = distance_m_first + if center and radius: + dx, dy = laser_manager.compute_laser_position(center, (x, y), radius, method) + d_m = estimate_distance(best_radius1) if best_radius1 else distance_m_first + return { + "success": True, + "result_img": r_img, + "center": center, "radius": radius, "method": method, + "best_radius1": best_radius1, "ellipse_params": ellipse_params, + "dx": dx, "dy": dy, "distance_m": d_m, + "laser_point": laser_point, "laser_point_method": laser_point_method, + "offset_method": "yellow_ellipse" if ellipse_params else "yellow_circle", + "distance_method": "yellow_radius", + } - # 返回分析结果 - return { - "success": True, - "result_img": result_img, - "center": center, - "radius": radius, - "method": method, - "best_radius1": best_radius1, - "ellipse_params": ellipse_params, - "dx": dx, - "dy": dy, - "distance_m": distance_m, - "laser_point": laser_point, - "laser_point_method": laser_point_method - } + if not use_tri: + # 三角形未配置,直接跑圆形检测 + return _build_circle_result( + detect_circle_v3(frame, laser_point, img_cv=img_cv) + ) + + # ── Step 4: 三角形 + 圆形并行检测 ───────────────────────────────────────────── + # 两个线程共享只读的 img_cv,互不干扰 + tri_result = {} + circle_result = {} + + def _run_triangle(): + try: + logger.info(f"[TRI] begin {datetime.now()}") + logger.info(f"[TRI] K: {K}, dist: {dist_coef}, pos: {pos}, {datetime.now()}") + tri = try_triangle_scoring( + img_cv, (x, y), pos, K, dist_coef, + size_range=getattr(config, "TRIANGLE_SIZE_RANGE", (8, 500)), + ) + logger.info(f"[TRI] tri: {tri}, {datetime.now()}") + tri_result['data'] = tri + except Exception as e: + logger.error(f"[TRI] 三角形路径异常: {e}") + tri_result['data'] = {'ok': False} + + def _run_circle(): + try: + circle_result['data'] = detect_circle_v3(frame, laser_point, img_cv=img_cv) + except Exception as e: + logger.error(f"[CIRCLE] 圆形检测异常: {e}") + circle_result['data'] = (frame, None, None, None, None, None) + + t_tri = threading.Thread(target=_run_triangle, daemon=True) + t_cir = threading.Thread(target=_run_circle, daemon=True) + t_tri.start() + t_cir.start() + + # 最多等待三角形 TRIANGLE_TIMEOUT_MS(默认 1000ms) + tri_timeout_s = float(getattr(config, "TRIANGLE_TIMEOUT_MS", 1000)) / 1000.0 + t_tri.join(timeout=tri_timeout_s) + if t_tri.is_alive(): + # 超时:直接放弃三角形结果,回退圆心(圆心线程通常已跑完) + logger.warning(f"[TRI] timeout>{tri_timeout_s:.2f}s,回退圆心算法") + t_cir.join() + return _build_circle_result( + circle_result.get('data') or (frame, None, None, None, None, None) + ) + + tri = tri_result.get('data', {}) + + if tri.get('ok'): + logger.info(f"[TRI] end {datetime.now()}") + return { + "success": True, + "result_img": frame, + "center": None, "radius": None, + "method": tri.get("offset_method") or "triangle_homography", + "best_radius1": None, "ellipse_params": None, + "dx": tri["dx_cm"], "dy": tri["dy_cm"], + "distance_m": tri.get("distance_m") or distance_m_first, + "laser_point": laser_point, "laser_point_method": laser_point_method, + "offset_method": tri.get("offset_method") or "triangle_homography", + "distance_method": tri.get("distance_method") or "pnp_triangle", + "tri_markers": tri.get("markers", []), + "tri_homography": tri.get("homography"), + } + + # 三角形失败,等圆形结果(已并行跑完,几乎无额外等待) + t_cir.join() + logger.info(f"[TRI] end(fallback) {datetime.now()}") + return _build_circle_result( + circle_result.get('data') or (frame, None, None, None, None, None) + ) def process_shot(adc_val): @@ -103,6 +199,7 @@ def process_shot(adc_val): logger = logger_manager.logger try: + network_manager.safe_enqueue({"shoot_event": "start"}, msg_type=2, high=True) frame = camera_manager.read_frame() # 调用算法分析 @@ -126,16 +223,21 @@ def process_shot(adc_val): distance_m = analysis_result["distance_m"] laser_point = analysis_result["laser_point"] laser_point_method = analysis_result["laser_point_method"] + offset_method = analysis_result.get("offset_method", "yellow_circle") + distance_method = analysis_result.get("distance_method", "yellow_radius") + tri_markers = analysis_result.get("tri_markers", []) + tri_homography = analysis_result.get("tri_homography") x, y = laser_point - camera_manager.show(result_img) + # 三角形路径成功时 center/radius 为空是正常的;此时用 triangle 方法名用于保存文件名与上报字段 m + if (not method) and tri_markers: + method = offset_method or "triangle_homography" - if not (center and radius) and logger: - logger.warning("[MAIN] 未检测到靶心,但会保存图像") + if config.SHOW_CAMERA_PHOTO_WHILE_SHOOTING: + camera_manager.show(result_img) - # 读取电量 - voltage = get_bus_voltage() - battery_percent = voltage_to_percent(voltage) + if dx is None and dy is None and logger: + logger.warning("[MAIN] 未检测到偏移量(三角形与圆形均失败),但会保存图像") # 生成射箭ID from shot_id_generator import shot_id_generator @@ -144,33 +246,30 @@ def process_shot(adc_val): if logger: logger.info(f"[MAIN] 射箭ID: {shot_id}") - # 保存图像 - save_shot_image( - result_img, - center, - radius, - method, - ellipse_params, - (x, y), - distance_m, - shot_id=shot_id, - photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None - ) + laser_distance_m = None + laser_signal_quality = 0 + + # x,y 单位:物理厘米(compute_laser_position 与三角形单应性均输出物理 cm) + # 未检测到靶心时 x/y 用 200.0(脱靶标志) + srv_x = round(float(dx), 4) if dx is not None else 200.0 + srv_y = round(float(dy), 4) if dy is not None else 200.0 # 构造上报数据 inner_data = { "shot_id": shot_id, - "x": float(dx) if dx is not None else 200.0, - "y": float(dy) if dy is not None else 200.0, - "r": 90.0, + "x": srv_x, + "y": srv_y, + "r": 20.0, # 保留字段(服务端当前忽略,物理外环半径 cm) "d": round((distance_m or 0.0) * 100), - "d_laser": 0.0, - "d_laser_quality": 0, + "d_laser": round((laser_distance_m or 0.0) * 100), + "d_laser_quality": laser_signal_quality, "m": method if method else "no_target", "adc": adc_val, "laser_method": laser_point_method, "target_x": float(x), "target_y": float(y), + "offset_method": offset_method, + "distance_method": distance_method, } if ellipse_params: @@ -190,14 +289,99 @@ def process_shot(adc_val): report_data = {"cmd": 1, "data": inner_data} network_manager.safe_enqueue(report_data, msg_type=2, high=True) - if logger: - if center and radius: - logger.info(f"射箭事件已加入发送队列(已检测到靶心),ID: {shot_id}") - else: - logger.info(f"射箭事件已加入发送队列(未检测到靶心,已保存图像),ID: {shot_id}") + # 数据上报后再画标注,不干扰检测阶段的原始画面 + if result_img is not None: + # 1. 若有三角形标记,先用 cv2 画轮廓 / 顶点 / ID,再反推靶心位置 + if tri_markers: + import cv2 as _cv2 + import numpy as _np + _img_cv = image.image2cv(result_img, False, False) + + # 三角形轮廓 + 直角顶点 + ID + for _m in tri_markers: + _corners = _np.array(_m["corners"], dtype=_np.int32) + _cv2.polylines(_img_cv, [_corners], True, (0, 255, 0), 2) + _cx, _cy = int(_m["center"][0]), int(_m["center"][1]) + _cv2.circle(_img_cv, (_cx, _cy), 4, (0, 0, 255), -1) + _cv2.putText(_img_cv, f"T{_m['id']}", + (_cx - 18, _cy - 12), + _cv2.FONT_HERSHEY_SIMPLEX, 0.55, (0, 255, 0), 1) + + # 靶心(H_inv @ [0,0]):小红圆 + _center_px = None + if tri_homography is not None: + try: + _H_inv = _np.linalg.inv(tri_homography) + _c_img = _cv2.perspectiveTransform( + _np.array([[[0.0, 0.0]]], dtype=_np.float32), _H_inv)[0][0] + _ocx, _ocy = int(_c_img[0]), int(_c_img[1]) + _cv2.circle(_img_cv, (_ocx, _ocy), 5, (0, 0, 255), -1) # 实心 + _cv2.circle(_img_cv, (_ocx, _ocy), 9, (0, 0, 255), 1) # 外框 + _center_px = (_ocx, _ocy) + logger.info(f"[算法] 靶心: {_center_px}") + except Exception: + pass + + # 叠加信息:落点-圆心距离 / 相机-靶距离等 + try: + import math as _math + _lines = [] + if dx is not None and dy is not None: + _r_cm = _math.hypot(float(dx), float(dy)) + _lines.append(f"offset=({float(dx):.2f},{float(dy):.2f})cm |r|={_r_cm:.2f}cm") + if distance_m is not None: + _lines.append(f"cam_dist={float(distance_m):.2f}m ({distance_method})") + if method: + _lines.append(f"method={method}") + if _lines: + _y0 = 22 + for i, _t in enumerate(_lines): + _cv2.putText( + _img_cv, + _t, + (10, _y0 + i * 18), + _cv2.FONT_HERSHEY_SIMPLEX, + 0.5, + (0, 255, 0), + 1, + ) + except Exception: + pass + + result_img = image.cv2image(_img_cv, False, False) + + # 2. 激光十字线 + _lc = image.Color(config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) + result_img.draw_line(int(x - config.LASER_LENGTH), int(y), + int(x + config.LASER_LENGTH), int(y), + _lc, config.LASER_THICKNESS) + result_img.draw_line(int(x), int(y - config.LASER_LENGTH), + int(x), int(y + config.LASER_LENGTH), + _lc, config.LASER_THICKNESS) + result_img.draw_circle(int(x), int(y), 1, _lc, config.LASER_THICKNESS) # 闪一下激光(射箭反馈) - laser_manager.flash_laser(1000) + if config.FLASH_LASER_WHILE_SHOOTING: + laser_manager.flash_laser(config.FLASH_LASER_DURATION_MS) + + # 保存图像(异步队列,与 main.py 一致) + enqueue_save_shot( + result_img, + center, + radius, + method, + ellipse_params, + (x, y), + distance_m, + shot_id=shot_id, + photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None, + ) + + if logger: + if dx is not None and dy is not None: + logger.info(f"射箭事件已加入发送队列(偏移=({dx:.2f},{dy:.2f})cm),ID: {shot_id}") + else: + logger.info(f"射箭事件已加入发送队列(未检测到偏移,已保存图像),ID: {shot_id}") time.sleep_ms(100) except Exception as e: diff --git a/triangle_positions.json b/triangle_positions.json new file mode 100644 index 0000000..655e966 --- /dev/null +++ b/triangle_positions.json @@ -0,0 +1,6 @@ +{ + "0": [-20.0, -20.0, 0.0], + "1": [-20.0, 20.0, 0.0], + "2": [ 20.0, 20.0, 0.0], + "3": [ 20.0, -20.0, 0.0] +} diff --git a/triangle_target.py b/triangle_target.py new file mode 100644 index 0000000..8778750 --- /dev/null +++ b/triangle_target.py @@ -0,0 +1,513 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +靶纸四角等腰直角三角形:检测、单应性落点、PnP 估距。 +从 test/aruco_deteck.py 抽出,供主流程 shoot_manager 使用。 +""" +import json +import os +from itertools import combinations + +import cv2 +import numpy as np + + +def _log(msg): + try: + from logger_manager import logger_manager + if logger_manager.logger: + logger_manager.logger.info(msg) + except Exception: + pass + + +def load_camera_from_xml(path): + """读取 OpenCV FileStorage XML,返回 (camera_matrix, dist_coeffs) 或 (None, None)。""" + if not path or not os.path.isfile(path): + _log(f"[TRI] 标定文件不存在: {path}") + return None, None + try: + fs = cv2.FileStorage(path, cv2.FILE_STORAGE_READ) + K = fs.getNode("camera_matrix").mat() + dist = fs.getNode("distortion_coefficients").mat() + fs.release() + if K is None or K.size == 0: + return None, None + if dist is None or dist.size == 0: + dist = np.zeros((5, 1), dtype=np.float64) + return K, dist + except Exception as e: + _log(f"[TRI] 读取标定失败: {e}") + return None, None + + +def load_triangle_positions(path): + """加载 triangle_positions.json,返回 dict[int, [x,y,z]]。""" + if not path or not os.path.isfile(path): + _log(f"[TRI] 三角形位置文件不存在: {path}") + return None + with open(path, "r", encoding="utf-8") as f: + raw = json.load(f) + return {int(k): v for k, v in raw.items()} + + +def homography_calibration(marker_centers, marker_ids, marker_positions, impact_point_pixel): + target_points = [] + for mid in marker_ids: + pos = marker_positions.get(mid) + if pos is None: + return False, None, None, None + target_points.append([pos[0], pos[1]]) + + src_pts = np.array(marker_centers, dtype=np.float32) + dst_pts = np.array(target_points, dtype=np.float32) + H, _ = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, ransacReprojThreshold=1.0) + if H is None: + return False, None, None, None + + pt = np.array([[impact_point_pixel]], dtype=np.float32) + transformed = cv2.perspectiveTransform(pt, H) + target_x = float(transformed[0][0][0]) + target_y = float(transformed[0][0][1]) + return True, target_x, target_y, H + + +def complete_fourth_point(detected_ids, detected_centers, marker_positions): + target_order = [0, 1, 2, 3] + target_coords = {mid: marker_positions[mid][:2] for mid in target_order} + all_ids = set(target_coords.keys()) + missing_id = (all_ids - set(detected_ids)).pop() + + known_src = [] + known_dst = [] + for mid, pt in zip(detected_ids, detected_centers): + known_src.append(pt) + known_dst.append(target_coords[mid]) + + M_inv, _ = cv2.estimateAffine2D( + np.array(known_dst, dtype=np.float32), + np.array(known_src, dtype=np.float32), + ) + if M_inv is None: + return None + + missing_target = target_coords[missing_id] + missing_src_h = M_inv @ np.array([missing_target[0], missing_target[1], 1.0]) + missing_src = missing_src_h[:2] + + complete_centers = [] + for mid in target_order: + if mid == missing_id: + complete_centers.append(missing_src) + else: + idx = detected_ids.index(mid) + complete_centers.append(detected_centers[idx]) + + return complete_centers, target_order + + +def pnp_distance_meters(marker_ids, marker_centers_px, marker_positions, K, dist): + """ + 靶面原点 (0,0,0) 到相机光心的距离:||tvec||,object 单位为 cm 时 tvec 为 cm,返回米。 + """ + obj = [] + for mid in marker_ids: + p = marker_positions[mid] + obj.append([float(p[0]), float(p[1]), float(p[2])]) + obj_pts = np.array(obj, dtype=np.float64) + img_pts = np.array(marker_centers_px, dtype=np.float64) + + ok, rvec, tvec = cv2.solvePnP( + obj_pts, img_pts, K, dist, flags=cv2.SOLVEPNP_ITERATIVE + ) + if not ok: + return None + tvec = tvec.reshape(-1) + dist_cm = float(np.linalg.norm(tvec)) + return dist_cm / 100.0 + + +def detect_triangle_markers( + gray_image, + orig_gray=None, + size_range=(8, 500), + max_interior_gray=90, + dark_pixel_gray=80, + min_dark_ratio=0.70, + verbose=True, +): + # 读取可调参数(缺省值与 config.py 保持一致) + try: + import config as _cfg + early_exit = int(getattr(_cfg, "TRIANGLE_EARLY_EXIT_CANDIDATES", 4)) + block_sizes = tuple(getattr(_cfg, "TRIANGLE_ADAPTIVE_BLOCK_SIZES", (11, 21, 35))) + max_combo_n = int(getattr(_cfg, "TRIANGLE_MAX_FILTERED_FOR_COMBO", 10)) + except Exception: + early_exit = 4 + block_sizes = (11, 21, 35) + max_combo_n = 10 + + min_leg, max_leg = size_range + min_area = 0.5 * (min_leg ** 2) * 0.1 + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)) + + def _check_shape(approx): + pts = approx.reshape(3, 2).astype(np.float32) + sides = [ + np.linalg.norm(pts[1] - pts[0]), + np.linalg.norm(pts[2] - pts[1]), + np.linalg.norm(pts[0] - pts[2]), + ] + order = sorted(range(3), key=lambda i: sides[i]) + leg1, leg2, hyp = sides[order[0]], sides[order[1]], sides[order[2]] + avg_leg = (leg1 + leg2) / 2 + + if not (min_leg <= avg_leg <= max_leg): + return None + if abs(leg1 - leg2) / (avg_leg + 1e-6) > 0.20: + return None + if abs(hyp - avg_leg * np.sqrt(2)) / (avg_leg * np.sqrt(2) + 1e-6) > 0.20: + return None + + edge_verts = [(0, 1), (1, 2), (2, 0)] + hv0, hv1 = edge_verts[order[2]] + right_v = 3 - hv0 - hv1 + right_pt = pts[right_v] + + v0 = pts[hv0] - right_pt + v1_vec = pts[hv1] - right_pt + cos_a = np.dot(v0, v1_vec) / ( + np.linalg.norm(v0) * np.linalg.norm(v1_vec) + 1e-6 + ) + if abs(cos_a) > 0.20: + return None + + return right_pt, avg_leg, pts + + def _color_ok(approx): + if orig_gray is None: + return True + mask = np.zeros(orig_gray.shape[:2], dtype=np.uint8) + cv2.fillPoly(mask, [approx], 255) + erode_k = max(1, int(min(orig_gray.shape[:2]) * 0.002)) + erode_k = min(erode_k, 5) + k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (2 * erode_k + 1, 2 * erode_k + 1)) + mask_in = cv2.erode(mask, k, iterations=1) + if cv2.countNonZero(mask_in) < 20: + mask_in = mask + + mean_val = cv2.mean(orig_gray, mask=mask_in)[0] + ys, xs = np.where(mask_in > 0) + if len(xs) == 0: + return False + interior = orig_gray[ys, xs] + dark_ratio = float(np.mean(interior <= dark_pixel_gray)) + return (mean_val <= max_interior_gray) and (dark_ratio >= min_dark_ratio) + + def _extract_candidates(binary_img): + contours, _ = cv2.findContours(binary_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + found = [] + for cnt in contours: + if cv2.contourArea(cnt) < min_area: + continue + peri = cv2.arcLength(cnt, True) + approx = cv2.approxPolyDP(cnt, 0.05 * peri, True) + if len(approx) != 3: + continue + shape = _check_shape(approx) + if shape is None: + continue + if not _color_ok(approx): + continue + right_pt, avg_leg, pts = shape + center_px = np.mean(pts, axis=0).tolist() + dedup_key = f"{int(center_px[0] // 10)},{int(center_px[1] // 10)}" + found.append({ + "center_px": center_px, + "right_pt": right_pt.tolist(), + "corners": pts.tolist(), + "avg_leg": avg_leg, + "dedup_key": dedup_key, + }) + return found + + all_candidates = [] + seen_keys = set() + # 早退条件:不仅要数量够,还要候选分布足够分散(覆盖多个象限),避免误检集中导致提前退出 + h0, w0 = gray_image.shape[:2] + cx0, cy0 = w0 / 2.0, h0 / 2.0 + seen_quadrants = set() + # 4 个候选就够 4 角检测;3 个够 3 点补全,加 1 裕量 + _EARLY_EXIT = max(3, early_exit) + + def _add_from_binary(b): + b = cv2.morphologyEx(b, cv2.MORPH_CLOSE, kernel) + for c in _extract_candidates(b): + if c["dedup_key"] not in seen_keys: + seen_keys.add(c["dedup_key"]) + all_candidates.append(c) + # 象限统计:按图像中心划分 + tx, ty = c["center_px"] + if tx < cx0 and ty < cy0: + q = 0 + elif tx < cx0: + q = 1 + elif ty >= cy0: + q = 2 + else: + q = 3 + seen_quadrants.add(q) + + def _should_early_exit(): + # 至少覆盖 3 个象限 + 数量达到阈值,才认为“足够像四角”可停止更多尝试 + return (len(all_candidates) >= _EARLY_EXIT) and (len(seen_quadrants) >= 3) + + # 1. 最快:全局 Otsu(无需逐像素邻域计算,~10ms) + _, b_otsu = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) + _add_from_binary(b_otsu) + + # 2. 只在 Otsu 不够时才跑自适应阈值(每次 ~100ms,尽早退出) + for block_size in block_sizes: + if _should_early_exit(): + break + if block_size is None: + continue + b = cv2.adaptiveThreshold( + gray_image, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, block_size, 4 + ) + _add_from_binary(b) + + if verbose: + _log(f"[TRI] 候选三角形共 {len(all_candidates)} 个(预过滤前)") + + if len(all_candidates) < 2: + return [] + + all_legs = [c["avg_leg"] for c in all_candidates] + med_leg = float(np.median(all_legs)) + filtered = [] + for c in all_candidates: + leg = c["avg_leg"] + if med_leg > 1e-6 and not (0.40 * med_leg <= leg <= 2.0 * med_leg): + continue + filtered.append(c) + + if len(filtered) < 2: + return [] + + # 候选过多时,四点组合枚举会变慢:截断到更可能的 max_combo_n 个候选 + if max_combo_n > 0 and len(filtered) > max_combo_n: + # 以 avg_leg 接近中位数优先(更符合四角同尺度) + med_leg = float(np.median([c["avg_leg"] for c in filtered])) + filtered = sorted(filtered, key=lambda c: abs(c["avg_leg"] - med_leg))[:max_combo_n] + + def _order_quad(pts_4): + by_y = sorted(range(4), key=lambda i: pts_4[i][1]) + top_pair = sorted(by_y[:2], key=lambda i: pts_4[i][0]) + bot_pair = sorted(by_y[2:], key=lambda i: pts_4[i][0]) + return top_pair[0], bot_pair[0], bot_pair[1], top_pair[1] + + def _score_quad(cands_4): + pts = [np.array(c["center_px"]) for c in cands_4] + legs = [c["avg_leg"] for c in cands_4] + tl, bl, br, tr = _order_quad(pts) + + diag1 = np.linalg.norm(pts[tl] - pts[br]) + diag2 = np.linalg.norm(pts[bl] - pts[tr]) + diag_ratio = max(diag1, diag2) / (min(diag1, diag2) + 1e-6) + + s_top = np.linalg.norm(pts[tl] - pts[tr]) + s_bot = np.linalg.norm(pts[bl] - pts[br]) + s_left = np.linalg.norm(pts[tl] - pts[bl]) + s_right = np.linalg.norm(pts[tr] - pts[br]) + h_ratio = max(s_top, s_bot) / (min(s_top, s_bot) + 1e-6) + v_ratio = max(s_left, s_right) / (min(s_left, s_right) + 1e-6) + + med_l = float(np.median(legs)) + leg_dev = max(abs(l - med_l) / (med_l + 1e-6) for l in legs) + + score = (diag_ratio - 1.0) * 3.0 + (h_ratio - 1.0) + (v_ratio - 1.0) + leg_dev * 2.0 + return score, (tl, bl, br, tr) + + assigned = None + if len(filtered) >= 4: + best_score = float("inf") + best_combo = None + best_order = None + + for combo in combinations(range(len(filtered)), 4): + cands = [filtered[i] for i in combo] + score, order = _score_quad(cands) + if score < best_score: + best_score = score + best_combo = combo + best_order = order + + if verbose: + _log(f"[TRI] 最优四边形: score={best_score:.3f}") + + if best_score < 3.0: + cands = [filtered[i] for i in best_combo] + tl, bl, br, tr = best_order + assigned = { + 0: cands[tl], + 1: cands[bl], + 2: cands[br], + 3: cands[tr], + } + + if assigned is None: + cx = np.mean([c["center_px"][0] for c in filtered]) + cy = np.mean([c["center_px"][1] for c in filtered]) + quadrant_map = {} + for c in filtered: + tx, ty = c["center_px"] + if tx < cx and ty < cy: + q = 0 + elif tx < cx: + q = 1 + elif ty >= cy: + q = 2 + else: + q = 3 + if q not in quadrant_map or c["avg_leg"] > quadrant_map[q]["avg_leg"]: + quadrant_map[q] = c + assigned = quadrant_map + + result = [] + for tid in sorted(assigned.keys()): + c = assigned[tid] + result.append({ + "id": tid, + "center": c["right_pt"], + "corners": c["corners"], + }) + return result + + +def try_triangle_scoring( + img_rgb, + laser_xy, + marker_positions, + camera_matrix, + dist_coeffs, + size_range=(8, 500), +): + """ + 尝试三角形单应性 + PnP 估距。 + img_rgb: RGB,与 laser_xy 同一像素坐标系。 + 返回 dict: + ok, dx_cm, dy_cm, distance_m, offset_method, distance_method + """ + out = { + "ok": False, + "dx_cm": None, + "dy_cm": None, + "distance_m": None, + "offset_method": None, + "distance_method": None, + } + if marker_positions is None or camera_matrix is None or dist_coeffs is None: + return out + + h_orig, w_orig = img_rgb.shape[:2] + + # 缩图加速:嵌入式 CPU 上图像处理耗时与面积成正比,缩到最长边 320px 可获得 ~4× 加速 + # 检测完后把像素坐标乘以 inv_scale 还原到原始分辨率,再送入单应性/PnP(与 K 标定分辨率一致) + MAX_DETECT_DIM = 320 + long_side = max(h_orig, w_orig) + if long_side > MAX_DETECT_DIM: + det_scale = MAX_DETECT_DIM / long_side + det_w = int(w_orig * det_scale) + det_h = int(h_orig * det_scale) + img_det = cv2.resize(img_rgb, (det_w, det_h), interpolation=cv2.INTER_LINEAR) + inv_scale = 1.0 / det_scale + size_range_det = (max(4, int(size_range[0] * det_scale)), + max(8, int(size_range[1] * det_scale))) + else: + img_det = img_rgb + inv_scale = 1.0 + size_range_det = size_range + + gray = cv2.cvtColor(img_det, cv2.COLOR_RGB2GRAY) + + # 快速路径:直接在原始灰度图上跑(内部先走 Otsu,几乎不耗时) + # 光照均匀时通常在这一步就找到 ≥3 个三角形,完全跳过 CLAHE + tri_markers = detect_triangle_markers( + gray, orig_gray=gray, size_range=size_range_det, verbose=True + ) + + if len(tri_markers) < 3: + # 慢速兜底:CLAHE 增强对比度后再试(光线不均 / 局部过暗时有效) + # 默认关闭以优先速度;由 config.TRIANGLE_ENABLE_CLAHE_FALLBACK 控制。 + try: + import config as _cfg + enable_clahe = bool(getattr(_cfg, "TRIANGLE_ENABLE_CLAHE_FALLBACK", False)) + except Exception: + enable_clahe = False + + if enable_clahe: + _log(f"[TRI] 快速路径不足{len(tri_markers)}个,启用CLAHE增强") + clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) + gray_clahe = clahe.apply(gray) + tri_markers = detect_triangle_markers( + gray_clahe, orig_gray=gray, size_range=size_range_det, verbose=True + ) + else: + _log(f"[TRI] 快速路径不足{len(tri_markers)}个,跳过CLAHE兜底(已关闭)") + + if len(tri_markers) < 3: + _log(f"[TRI] 三角形不足3个: {len(tri_markers)}") + return out + + # 将缩图坐标还原为原始分辨率(K 矩阵在原始分辨率下标定) + if inv_scale != 1.0: + for m in tri_markers: + m["center"] = [m["center"][0] * inv_scale, m["center"][1] * inv_scale] + m["corners"] = [[c[0] * inv_scale, c[1] * inv_scale] for c in m["corners"]] + + lx = float(np.clip(laser_xy[0], 0, w_orig - 1)) + ly = float(np.clip(laser_xy[1], 0, h_orig - 1)) + + if len(tri_markers) == 4: + tri_sorted = sorted(tri_markers, key=lambda m: m["id"]) + marker_ids = [m["id"] for m in tri_sorted] + marker_centers = [[float(m["center"][0]), float(m["center"][1])] for m in tri_sorted] + offset_tag = "triangle_homography" + else: + marker_ids_list = [m["id"] for m in tri_markers] + marker_centers_orig = [[float(m["center"][0]), float(m["center"][1])] for m in tri_markers] + comp = complete_fourth_point(marker_ids_list, marker_centers_orig, marker_positions) + if comp is None: + _log("[TRI] 3点补全第4点失败") + return out + marker_centers, marker_ids = comp + marker_centers = [[float(c[0]), float(c[1])] for c in marker_centers] + offset_tag = "triangle_homography_3pt" + + ok_h, tx, ty, _H = homography_calibration( + marker_centers, marker_ids, marker_positions, [lx, ly] + ) + if not ok_h: + _log("[TRI] 单应性失败") + return out + + # 与 laser_manager.compute_laser_position 现网约定一致:(x_cm, -y_cm_target) + out["dx_cm"] = tx + out["dy_cm"] = -ty + out["ok"] = True + out["offset_method"] = offset_tag + out["markers"] = tri_markers # 供上层绘制标注用 + out["homography"] = _H # 供上层反推靶心像素位置用 + + dist_m = pnp_distance_meters(marker_ids, marker_centers, marker_positions, camera_matrix, dist_coeffs) + if dist_m is not None and 0.3 < dist_m < 50.0: + out["distance_m"] = dist_m + out["distance_method"] = "pnp_triangle" + _log(f"[TRI] PnP 距离={dist_m:.2f}m, 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm") + else: + out["distance_m"] = None + out["distance_method"] = None + _log(f"[TRI] PnP 距离无效,回退黄心估距; 偏移=({out['dx_cm']:.2f},{out['dy_cm']:.2f})cm") + + return out diff --git a/vision.py b/vision.py new file mode 100644 index 0000000..f97a34b --- /dev/null +++ b/vision.py @@ -0,0 +1,944 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +视觉检测模块 +提供靶心检测、距离估算、图像保存等功能 +""" +import cv2 +import numpy as np +import os +import math +import threading +import queue +from maix import image +import config +from logger_manager import logger_manager + +# 导入ArUco检测器(如果启用) +if config.USE_ARUCO: + from aruco_detector import detect_target_with_aruco, aruco_detector + +# 存图队列 + worker +_save_queue = queue.Queue(maxsize=16) +_save_worker_started = False +_save_worker_lock = threading.Lock() + +def check_laser_point_sharpness(frame, laser_point=None, roi_size=30, threshold=100.0, ellipse_params=None): + """ + 检测激光点本身的清晰度(不是整个靶子) + + Args: + frame: 图像帧对象 + laser_point: 激光点坐标 (x, y),如果为None则自动查找 + roi_size: ROI区域大小(像素),默认30x30 + threshold: 清晰度阈值 + ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内 + + Returns: + (is_sharp, sharpness_score, laser_pos): (是否清晰, 清晰度分数, 激光点坐标) + """ + try: + # 1. 如果没有提供激光点,先查找 + if laser_point is None: + from laser_manager import laser_manager + laser_point = laser_manager.find_red_laser(frame, ellipse_params=ellipse_params) + if laser_point is None: + logger_manager.logger.debug(f"未找到激光点") + return False, 0.0, None + + x, y = laser_point + + # 2. 转换为 OpenCV 格式 + img_cv = image.image2cv(frame, False, False) + h, w = img_cv.shape[:2] + + # 3. 提取 ROI 区域(激光点周围) + roi_half = roi_size // 2 + x_min = max(0, int(x) - roi_half) + x_max = min(w, int(x) + roi_half) + y_min = max(0, int(y) - roi_half) + y_max = min(h, int(y) + roi_half) + + roi = img_cv[y_min:y_max, x_min:x_max] + + if roi.size == 0: + return False, 0.0, laser_point + + # 4. 转换为灰度图(用于清晰度检测) + gray_roi = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY) + + # 5. 方法1:检测点的扩散程度(能量集中度) + # 计算中心区域的能量集中度 + center_x, center_y = roi.shape[1] // 2, roi.shape[0] // 2 + center_radius = min(5, roi.shape[0] // 4) # 中心区域半径 + + # 创建中心区域的掩码 + y_coords, x_coords = np.ogrid[:roi.shape[0], :roi.shape[1]] + center_mask = (x_coords - center_x)**2 + (y_coords - center_y)**2 <= center_radius**2 + + # 计算中心区域和周围区域的亮度 + center_brightness = gray_roi[center_mask].mean() + outer_mask = ~center_mask + outer_brightness = gray_roi[outer_mask].mean() if np.any(outer_mask) else 0 + + # 对比度(清晰的点对比度高) + contrast = abs(center_brightness - outer_brightness) + + # 6. 方法2:检测点的边缘锐度(使用拉普拉斯) + laplacian = cv2.Laplacian(gray_roi, cv2.CV_64F) + edge_sharpness = abs(laplacian).var() + + # 7. 方法3:检测点的能量集中度(方差) + # 清晰的点:能量集中在中心,方差小 + # 模糊的点:能量分散,方差大 + # 但我们需要的是:清晰的点中心亮度高,周围低,所以梯度大 + sobel_x = cv2.Sobel(gray_roi, cv2.CV_64F, 1, 0, ksize=3) + sobel_y = cv2.Sobel(gray_roi, cv2.CV_64F, 0, 1, ksize=3) + gradient = np.sqrt(sobel_x**2 + sobel_y**2) + gradient_sharpness = gradient.var() + + # 8. 组合多个指标 + # 对比度权重0.3,边缘锐度权重0.4,梯度权重0.3 + sharpness_score = (contrast * 0.3 + edge_sharpness * 0.4 + gradient_sharpness * 0.3) + + is_sharp = sharpness_score >= threshold + + logger = logger_manager.logger + if logger: + logger.debug(f"[VISION] 激光点清晰度: 位置=({x}, {y}), 对比度={contrast:.2f}, 边缘={edge_sharpness:.2f}, 梯度={gradient_sharpness:.2f}, 综合={sharpness_score:.2f}, 是否清晰={is_sharp}") + + return is_sharp, sharpness_score, laser_point + + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] 激光点清晰度检测失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return False, 0.0, laser_point + +def check_image_sharpness(frame, threshold=100.0, save_debug_images=False): + """ + 检查图像清晰度(针对圆形靶子优化,基于圆形边缘检测) + 检测靶心的圆形边缘,计算边缘区域的梯度清晰度 + + Args: + frame: 图像帧对象 + threshold: 清晰度阈值,低于此值认为图像模糊(默认100.0) + 可以根据实际情况调整: + - 清晰图像通常 > 200 + - 模糊图像通常 < 100 + - 中等清晰度 100-200 + save_debug_images: 是否保存调试图像(原始图和边缘图),默认False + + Returns: + (is_sharp, sharpness_score): (是否清晰, 清晰度分数) + """ + try: + logger_manager.logger.debug(f"begin") + # 转换为 OpenCV 格式 + img_cv = image.image2cv(frame, False, False) + logger_manager.logger.debug(f"after image2cv") + + # 转换为 HSV 颜色空间 + hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) + h, s, v = cv2.split(hsv) + logger_manager.logger.debug(f"after HSV conversion") + + # 检测黄色区域(靶心) + # 调整饱和度策略:稍微增强,不要过度 + s_enhanced = np.clip(s * 1.1, 0, 255).astype(np.uint8) + hsv_enhanced = cv2.merge((h, s_enhanced, v)) + + # HSV 阈值范围(与 detect_circle_v3 保持一致) + lower_yellow = np.array([7, 80, 0]) + upper_yellow = np.array([32, 255, 255]) + mask_yellow = cv2.inRange(hsv_enhanced, lower_yellow, upper_yellow) + + # 形态学操作,填充小孔洞 + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) + logger_manager.logger.debug(f"after yellow mask detection") + + # 计算边缘区域:扩展黄色区域,然后减去原始区域,得到边缘区域 + mask_dilated = cv2.dilate(mask_yellow, kernel, iterations=2) + mask_edge = cv2.subtract(mask_dilated, mask_yellow) # 边缘区域 + + # 计算边缘区域的像素数量 + edge_pixel_count = np.sum(mask_edge > 0) + logger_manager.logger.debug(f"edge pixel count: {edge_pixel_count}") + + # 如果检测不到边缘区域,使用全局梯度作为后备方案 + if edge_pixel_count < 100: + logger_manager.logger.debug(f"edge region too small, using global gradient") + # 使用 V 通道计算全局梯度 + sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3) + sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3) + gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2) + sharpness_score = gradient.var() + logger_manager.logger.debug(f"global gradient variance: {sharpness_score:.2f}") + else: + # 在边缘区域计算梯度清晰度 + # 使用 V(亮度)通道计算梯度,因为边缘在亮度上通常很明显 + sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3) + sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3) + gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2) + + # 只在边缘区域计算清晰度 + edge_gradient = gradient[mask_edge > 0] + + if len(edge_gradient) > 0: + # 计算边缘梯度的方差(清晰图像的边缘梯度变化大) + sharpness_score = edge_gradient.var() + # 也可以使用均值作为补充指标(清晰图像的边缘梯度均值也较大) + gradient_mean = edge_gradient.mean() + logger_manager.logger.debug(f"edge gradient: mean={gradient_mean:.2f}, var={sharpness_score:.2f}, pixels={len(edge_gradient)}") + else: + # 如果边缘区域没有有效梯度,使用全局梯度 + sharpness_score = gradient.var() + logger_manager.logger.debug(f"no edge gradient, using global: {sharpness_score:.2f}") + + # 保存调试图像(如果启用) + if save_debug_images: + try: + debug_dir = config.PHOTO_DIR + if debug_dir not in os.listdir("/root"): + try: + os.mkdir(debug_dir) + except: + pass + + # 生成文件名 + try: + all_images = [f for f in os.listdir(debug_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] + img_count = len(all_images) + except: + img_count = 0 + + # 保存原始图像 + img_orig = image.cv2image(img_cv, False, False) + orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp" + img_orig.save(orig_filename) + + # # 保存边缘检测结果(可视化) + # # 创建可视化图像:原始图像 + 黄色区域 + 边缘区域 + # debug_img = img_cv.copy() + # # 在黄色区域绘制绿色 + # debug_img[mask_yellow > 0] = [0, 255, 0] # RGB格式,绿色 + # # 在边缘区域绘制红色 + # debug_img[mask_edge > 0] = [255, 0, 0] # RGB格式,红色 + + # debug_img_maix = image.cv2image(debug_img, False, False) + # debug_filename = f"{debug_dir}/sharpness_debug_edge_{img_count:04d}.bmp" + # debug_img_maix.save(debug_filename) + + # logger = logger_manager.logger + # if logger: + # logger.info(f"[VISION] 保存调试图像: {orig_filename}, {debug_filename}") + except Exception as e: + logger = logger_manager.logger + if logger: + logger.warning(f"[VISION] 保存调试图像失败: {e}") + import traceback + logger.error(traceback.format_exc()) + + is_sharp = sharpness_score >= threshold + + logger = logger_manager.logger + if logger: + logger.debug(f"[VISION] 清晰度检测: 分数={sharpness_score:.2f}, 边缘像素数={edge_pixel_count}, 是否清晰={is_sharp}, 阈值={threshold}") + + return is_sharp, sharpness_score + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] 清晰度检测失败: {e}") + import traceback + logger.error(traceback.format_exc()) + # 出错时返回 False,避免使用模糊图像 + return False, 0.0 + +def save_calibration_image(frame, laser_pos, photo_dir=None): + """ + 保存激光校准图像(带标注) + 在找到的激光点位置绘制圆圈,便于检查算法是否正确 + + Args: + frame: 原始图像帧 + laser_pos: 找到的激光点坐标 (x, y) + photo_dir: 照片存储目录,如果为None则使用 config.PHOTO_DIR + + Returns: + str: 保存的文件路径,如果保存失败则返回 None + """ + # 检查是否启用图像保存 + if not config.SAVE_IMAGE_ENABLED: + return None + + if photo_dir is None: + photo_dir = config.PHOTO_DIR + + try: + # 确保照片目录存在 + try: + if photo_dir not in os.listdir("/root"): + os.mkdir(photo_dir) + except: + pass + + # 生成文件名 + try: + all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] + img_count = len(all_images) + except: + img_count = 0 + + x, y = laser_pos + filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.bmp" + + logger = logger_manager.logger + if logger: + logger.info(f"保存校准图像: {filename}, 激光点: ({x}, {y})") + + # 转换图像为 OpenCV 格式以便绘制 + img_cv = image.image2cv(frame, False, False) + + # 绘制激光点圆圈(用绿色圆圈标出找到的激光点) + cv2.circle(img_cv, (int(x), int(y)), 10, (0, 255, 0), 2) # 外圈:绿色,半径10 + cv2.circle(img_cv, (int(x), int(y)), 5, (0, 255, 0), 2) # 中圈:绿色,半径5 + cv2.circle(img_cv, (int(x), int(y)), 2, (0, 255, 0), -1) # 中心点:绿色实心 + + # 可选:绘制十字线帮助定位 + cv2.line(img_cv, + (int(x - 20), int(y)), + (int(x + 20), int(y)), + (0, 255, 0), 1) # 水平线 + cv2.line(img_cv, + (int(x), int(y - 20)), + (int(x), int(y + 20)), + (0, 255, 0), 1) # 垂直线 + + # 转换回 MaixPy 图像格式并保存 + result_img = image.cv2image(img_cv, False, False) + result_img.save(filename) + + if logger: + logger.debug(f"校准图像已保存: {filename}") + + return filename + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"保存校准图像失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return None + +# def detect_circle_v3(frame, laser_point=None): +# """检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本 +# 增加红色圆圈检测,验证黄色圆圈是否为真正的靶心 +# 如果提供 laser_point,会选择最接近激光点的目标 + +# Args: +# frame: 图像帧 +# laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择 + +# Returns: +# (result_img, best_center, best_radius, method, best_radius1, ellipse_params) +# """ +# img_cv = image.image2cv(frame, False, False) + +# best_center = best_radius = best_radius1 = method = None +# ellipse_params = None + +# # HSV 黄色掩码检测(模糊靶心) +# hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV) +# h, s, v = cv2.split(hsv) + +# # 调整饱和度策略:稍微增强,不要过度 +# s = np.clip(s * 1.1, 0, 255).astype(np.uint8) + +# hsv = cv2.merge((h, s, v)) + +# # 放宽 HSV 阈值范围(针对模糊图像的关键调整) +# lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色 +# upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满 + +# mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow) + +# # 调整形态学操作 +# kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) +# mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) + +# contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + +# # 存储所有有效的黄色-红色组合 +# valid_targets = [] + +# if contours_yellow: +# for cnt_yellow in contours_yellow: +# area = cv2.contourArea(cnt_yellow) +# perimeter = cv2.arcLength(cnt_yellow, True) + +# # 计算圆度 +# if perimeter > 0: +# circularity = (4 * np.pi * area) / (perimeter * perimeter) +# else: +# circularity = 0 + +# logger = logger_manager.logger +# if area > 50 and circularity > 0.7: +# if logger: +# logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}") +# # 尝试拟合椭圆 +# yellow_center = None +# yellow_radius = None +# yellow_ellipse = None + +# if len(cnt_yellow) >= 5: +# (x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow) +# yellow_ellipse = ((x, y), (width, height), angle) +# axes_minor = min(width, height) +# radius = axes_minor / 2 +# yellow_center = (int(x), int(y)) +# yellow_radius = int(radius) +# else: +# (x, y), radius = cv2.minEnclosingCircle(cnt_yellow) +# yellow_center = (int(x), int(y)) +# yellow_radius = int(radius) +# yellow_ellipse = None + +# # 如果检测到黄色圆圈,再检测红色圆圈进行验证 +# if yellow_center and yellow_radius: +# # HSV 红色掩码检测(红色在HSV中跨越0度,需要两个范围) +# # 红色范围1: 0-10度(接近0度的红色) +# lower_red1 = np.array([0, 80, 0]) +# upper_red1 = np.array([10, 255, 255]) +# mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1) + +# # 红色范围2: 170-180度(接近180度的红色) +# lower_red2 = np.array([170, 80, 0]) +# upper_red2 = np.array([180, 255, 255]) +# mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2) + +# # 合并两个红色掩码 +# mask_red = cv2.bitwise_or(mask_red1, mask_red2) + +# # 形态学操作 +# kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) +# mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red) + +# contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + +# found_valid_red = False + +# if contours_red: +# # 找到所有符合条件的红色圆圈 +# for cnt_red in contours_red: +# area_red = cv2.contourArea(cnt_red) +# perimeter_red = cv2.arcLength(cnt_red, True) + +# if perimeter_red > 0: +# circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red) +# else: +# circularity_red = 0 + +# # 红色圆圈也应该有一定的圆度 +# if area_red > 50 and circularity_red > 0.6: +# # 计算红色圆圈的中心和半径 +# if len(cnt_red) >= 5: +# (x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red) +# radius_red = min(w_red, h_red) / 2 +# red_center = (int(x_red), int(y_red)) +# red_radius = int(radius_red) +# else: +# (x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red) +# red_center = (int(x_red), int(y_red)) +# red_radius = int(radius_red) + +# # 计算黄色和红色圆心的距离 +# if red_center: +# dx = yellow_center[0] - red_center[0] +# dy = yellow_center[1] - red_center[1] +# distance = np.sqrt(dx*dx + dy*dy) + +# # 圆心距离阈值:应该小于黄色半径的某个倍数(比如1.5倍) +# max_distance = yellow_radius * 1.5 + +# # 红色圆圈应该比黄色圆圈大(外圈) +# if distance < max_distance and red_radius > yellow_radius * 0.8: +# found_valid_red = True +# logger = logger_manager.logger +# if logger: +# logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}") + +# # 记录这个有效目标 +# valid_targets.append({ +# 'center': yellow_center, +# 'radius': yellow_radius, +# 'ellipse': yellow_ellipse, +# 'area': area +# }) +# break + +# if not found_valid_red: +# logger = logger_manager.logger +# if logger: +# logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别") + +# # 从所有有效目标中选择最佳目标 +# if valid_targets: +# if laser_point: +# # 如果有激光点,选择最接近激光点的目标 +# best_target = None +# min_distance = float('inf') +# for target in valid_targets: +# dx = target['center'][0] - laser_point[0] +# dy = target['center'][1] - laser_point[1] +# distance = np.sqrt(dx*dx + dy*dy) +# if distance < min_distance: +# min_distance = distance +# best_target = target +# if best_target: +# best_center = best_target['center'] +# best_radius = best_target['radius'] +# ellipse_params = best_target['ellipse'] +# method = "v3_ellipse_red_validated_laser_selected" +# best_radius1 = best_radius * 5 +# else: +# # 如果没有激光点,选择面积最大的目标 +# best_target = max(valid_targets, key=lambda t: t['area']) +# best_center = best_target['center'] +# best_radius = best_target['radius'] +# ellipse_params = best_target['ellipse'] +# method = "v3_ellipse_red_validated" +# best_radius1 = best_radius * 5 + +# result_img = image.cv2image(img_cv, False, False) +# return result_img, best_center, best_radius, method, best_radius1, ellipse_params + +def detect_circle_v3(frame, laser_point=None, img_cv=None): + """检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本 + 增加红色圆圈检测,验证黄色圆圈是否为真正的靶心 + 如果提供 laser_point,会选择最接近激光点的目标 + 优化: + 1. 缩图到 MAX_DET_DIM 后再做 HSV/形态学,最长边 640->320 可获得 ~4x 加速 + 2. 红色掩码在黄色轮廓循环外只计算一次,避免 N 次重复计算 + 3. img_cv 可由外部传入(与其他线程共享转换结果),为 None 时自动转换 + Args: + frame: 图像帧(img_cv 为 None 时使用) + laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择 + img_cv: 已转换的 numpy BGR/RGB 图像;不为 None 时跳过 image2cv 转换 + Returns: + (result_img, best_center, best_radius, method, best_radius1, ellipse_params) + """ + if img_cv is None: + img_cv = image.image2cv(frame, False, False) + logger = logger_manager.logger + from datetime import datetime + logger.debug(f"[detect_circle_v3] begin {datetime.now()}") + # -- 1. 缩图加速(与三角形路径保持一致) + h_orig, w_orig = img_cv.shape[:2] + MAX_DET_DIM = 320 + long_side = max(h_orig, w_orig) + if long_side > MAX_DET_DIM: + det_scale = MAX_DET_DIM / long_side + img_det = cv2.resize(img_cv, (int(w_orig * det_scale), int(h_orig * det_scale)), + interpolation=cv2.INTER_LINEAR) + inv_scale = 1.0 / det_scale # 检测坐标 -> 原始坐标的倍率 + else: + img_det = img_cv + inv_scale = 1.0 + + # 激光点映射到检测分辨率 + lp_det = None + if laser_point is not None: + lp_det = (laser_point[0] / inv_scale, laser_point[1] / inv_scale) + best_center = best_radius = best_radius1 = method = None + ellipse_params = None + + logger.debug(f"[detect_circle_v3] step 1 fin {datetime.now()}") + + # -- 2. HSV + 黄色掩码 + hsv = cv2.cvtColor(img_det, cv2.COLOR_RGB2HSV) + h, s, v = cv2.split(hsv) + s = np.clip(s * 1.1, 0, 255).astype(np.uint8) + hsv = cv2.merge((h, s, v)) + lower_yellow = np.array([7, 80, 0]) + upper_yellow = np.array([32, 255, 255]) + mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow) + kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel) + + logger.debug(f"[detect_circle_v3] step 2 fin {datetime.now()}") + + # -- 3. 红色掩码:在循环外只算一次 + mask_red = cv2.bitwise_or( + cv2.inRange(hsv, np.array([0, 80, 0]), np.array([10, 255, 255])), + cv2.inRange(hsv, np.array([170, 80, 0]), np.array([180, 255, 255])), + ) + kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) + mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red) + contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + # 预先把红色轮廓筛选成 (center, radius) 列表,后续直接查表 + red_candidates = [] + for cnt_r in contours_red: + ar = cv2.contourArea(cnt_r) + if ar <= 50: + continue + pr = cv2.arcLength(cnt_r, True) + if pr <= 0 or (4 * np.pi * ar) / (pr * pr) <= 0.6: + continue + if len(cnt_r) >= 5: + (xr, yr), (wr, hr), _ = cv2.fitEllipse(cnt_r) + red_candidates.append({"center": (int(xr), int(yr)), "radius": int(min(wr, hr) / 2)}) + else: + (xr, yr), rr = cv2.minEnclosingCircle(cnt_r) + red_candidates.append({"center": (int(xr), int(yr)), "radius": int(rr)}) + + logger.debug(f"[detect_circle_v3] step 3 fin {datetime.now()}") + + # -- 4. 黄色轮廓循环(复用上面的红色候选列表) + contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + valid_targets = [] + for cnt_yellow in contours_yellow: + area = cv2.contourArea(cnt_yellow) + if area <= 50: + continue + perimeter = cv2.arcLength(cnt_yellow, True) + if perimeter <= 0: + continue + circularity = (4 * np.pi * area) / (perimeter * perimeter) + if circularity <= 0.7: + continue + if logger: + logger.info(f"[target] -> 面积:{area:.1f}, 圆度:{circularity:.2f}") + if len(cnt_yellow) >= 5: + (x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow) + yellow_ellipse = ((x, y), (width, height), angle) + yellow_center = (int(x), int(y)) + yellow_radius = int(min(width, height) / 2) + else: + (x, y), radius = cv2.minEnclosingCircle(cnt_yellow) + yellow_center = (int(x), int(y)) + yellow_radius = int(radius) + yellow_ellipse = None + # 在预筛好的红色候选中匹配 + matched = False + for rc in red_candidates: + ddx = yellow_center[0] - rc["center"][0] + ddy = yellow_center[1] - rc["center"][1] + dist_centers = math.hypot(ddx, ddy) + if dist_centers < yellow_radius * 1.5 and rc["radius"] > yellow_radius * 0.8: + if logger: + logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), " + f"红心({rc['center']}), 距离:{dist_centers:.1f}, " + f"黄半径:{yellow_radius}, 红半径:{rc['radius']}") + valid_targets.append({ + "center": yellow_center, + "radius": yellow_radius, + "ellipse": yellow_ellipse, + "area": area, + }) + matched = True + break + if not matched and logger: + logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别") + + logger.debug(f"[detect_circle_v3] step 4 fin {datetime.now()}") + + # -- 5. 选最佳目标,坐标还原到原始分辨率 + if valid_targets: + if lp_det: + best_target = min(valid_targets, + key=lambda t: (t["center"][0] - lp_det[0]) ** 2 + + (t["center"][1] - lp_det[1]) ** 2) + method = "v3_ellipse_red_validated_laser_selected" + else: + best_target = max(valid_targets, key=lambda t: t["area"]) + method = "v3_ellipse_red_validated" + bc = best_target["center"] + br = best_target["radius"] + be = best_target["ellipse"] + if inv_scale != 1.0: + best_center = (int(bc[0] * inv_scale), int(bc[1] * inv_scale)) + best_radius = int(br * inv_scale) + if be is not None: + (ex, ey), (ew, eh), ea = be + be = ((ex * inv_scale, ey * inv_scale), + (ew * inv_scale, eh * inv_scale), ea) + else: + best_center = bc + best_radius = br + ellipse_params = be + best_radius1 = best_radius * 5 + result_img = image.cv2image(img_cv, False, False) + logger.debug(f"[detect_circle_v3] step 5 fin {datetime.now()}") + return result_img, best_center, best_radius, method, best_radius1, ellipse_params + +def estimate_distance(pixel_radius): + """根据像素半径估算实际距离(单位:米)""" + if not pixel_radius: + return 0.0 + return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0 + +def estimate_pixel(physical_distance_cm, target_distance_m): + """ + 根据物理距离和目标距离计算对应的像素偏移 + + Args: + physical_distance_cm: 物理世界中的距离(厘米),例如激光与摄像头的距离 + target_distance_m: 目标距离(米),例如到靶心的距离 + + Returns: + float: 对应的像素偏移 + """ + if not target_distance_m or target_distance_m <= 0: + return 0.0 + # 公式:像素偏移 = (物理距离_米) * 焦距_像素 / 目标距离_米 + return (physical_distance_cm / 100.0) * config.FOCAL_LENGTH_PIX / target_distance_m + + +def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params, + laser_point, distance_m, shot_id=None, photo_dir=None): + """ + 内部实现:在 img_cv (numpy HWC RGB) 上绘制标注并保存。 + 由 save_shot_image(同步)和存图 worker(异步)调用。 + """ + if not config.SAVE_IMAGE_ENABLED: + return None + if photo_dir is None: + photo_dir = config.PHOTO_DIR + try: + try: + if photo_dir not in os.listdir("/root"): + os.mkdir(photo_dir) + except Exception: + pass + + x, y = laser_point + if shot_id: + # 之前是用 center/radius 判定 no_target;但三角形路径会返回 center=None(正常) + # 这里改为:只要 method 有值,就按 method 命名;否则才回退 no_target + method_str = (method or "").strip() + if method_str: + filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp" + else: + filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp" + else: + try: + all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] + img_count = len(all_images) + except Exception: + img_count = 0 + if center is None or radius is None: + method_str = "no_target" + distance_str = "000" + else: + method_str = method or "unknown" + distance_str = str(round((distance_m or 0.0) * 100)) + filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp" + + logger = logger_manager.logger + if logger: + if shot_id: + logger.info(f"[VISION] 保存射箭图像,ID: {shot_id}, 文件名: {filename}") + if center and radius: + logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}") + if ellipse_params: + (ec, (ew, eh), ea) = ellipse_params + logger.info(f"椭圆 -> 中心: ({ec[0]:.1f}, {ec[1]:.1f}), 长轴: {max(ew, eh):.1f}, 短轴: {min(ew, eh):.1f}, 角度: {ea:.1f}°") + else: + logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y}))") + + # laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2]) + # cross_thickness = int(max(getattr(config, "LASER_THICKNESS", 1), 1)) + # cross_length = int(max(getattr(config, "LASER_LENGTH", 10), 10)) + # cv2.line(img_cv, (int(x - cross_length), int(y)), (int(x + cross_length), int(y)), laser_color, cross_thickness) + # cv2.line(img_cv, (int(x), int(y - cross_length)), (int(x), int(y + cross_length)), laser_color, cross_thickness) + # cv2.circle(img_cv, (int(x), int(y)), 1, laser_color, cross_thickness) + # ring_thickness = 1 + # cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, ring_thickness) + # cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, ring_thickness) + # cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1) + + if center and radius: + cx, cy = center + if ellipse_params: + (ell_center, (width, height), angle) = ellipse_params + cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1]) + cv2.ellipse(img_cv, (cx_ell, cy_ell), (int(width / 2), int(height / 2)), angle, 0, 360, (0, 255, 0), 2) + cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1) + minor_length = min(width, height) / 2 + minor_angle = angle + 90 if width >= height else angle + minor_angle_rad = math.radians(minor_angle) + dx_minor = minor_length * math.cos(minor_angle_rad) + dy_minor = minor_length * math.sin(minor_angle_rad) + pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor)) + pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor)) + cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2) + else: + cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2) + cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1) + cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1) + + out = image.cv2image(img_cv, False, False) + out.save(filename) + if logger: + if center and radius: + logger.debug(f"图像已保存(含靶心标注): {filename}") + else: + logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}") + + # 清理旧图片:如果目录下图片超过100张,删除最老的 + try: + image_files = [] + for f in os.listdir(photo_dir): + if f.endswith(('.bmp', '.jpg', '.jpeg')): + filepath = os.path.join(photo_dir, f) + try: + mtime = os.path.getmtime(filepath) + image_files.append((mtime, filepath, f)) + except Exception: + pass + + from config import MAX_IMAGES + if len(image_files) > MAX_IMAGES: + image_files.sort(key=lambda x: x[0]) + to_delete = len(image_files) - MAX_IMAGES + deleted_count = 0 + for _, filepath, fname in image_files[:to_delete]: + try: + os.remove(filepath) + deleted_count += 1 + if logger: + logger.debug(f"[VISION] 删除旧图片: {fname}") + except Exception as e: + if logger: + logger.warning(f"[VISION] 删除旧图片失败 {fname}: {e}") + if logger and deleted_count > 0: + logger.info(f"[VISION] 已清理 {deleted_count} 张旧图片,当前剩余 {MAX_IMAGES} 张") + except Exception as e: + if logger: + logger.warning(f"[VISION] 清理旧图片时出错(可忽略): {e}") + + return filename + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"保存图像失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return None + + +def _save_worker_loop(): + """存图 worker:从队列取任务并调用 _save_shot_image_impl。""" + while True: + try: + item = _save_queue.get() + if item is None: + break + _save_shot_image_impl(*item) + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] 存图 worker 异常: {e}") + import traceback + logger.error(traceback.format_exc()) + finally: + try: + _save_queue.task_done() + except Exception: + pass + + +def start_save_shot_worker(): + """启动存图 worker 线程(应在程序初始化时调用一次)。""" + global _save_worker_started + with _save_worker_lock: + if _save_worker_started: + return + _save_worker_started = True + t = threading.Thread(target=_save_worker_loop, daemon=True) + t.start() + logger = logger_manager.logger + if logger: + logger.info("[VISION] 存图 worker 线程已启动") + + +def enqueue_save_shot(result_img, center, radius, method, ellipse_params, + laser_point, distance_m, shot_id=None, photo_dir=None): + """ + 将存图任务放入队列,由 worker 异步保存。主线程传入 result_img 的复制,不阻塞。 + """ + if not config.SAVE_IMAGE_ENABLED: + return + if photo_dir is None: + photo_dir = config.PHOTO_DIR + try: + img_cv = image.image2cv(result_img, False, False) + img_copy = np.copy(img_cv) + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] enqueue_save_shot 复制图像失败: {e}") + return + task = (img_copy, center, radius, method, ellipse_params, laser_point, distance_m, shot_id, photo_dir) + try: + _save_queue.put_nowait(task) + except queue.Full: + logger = logger_manager.logger + if logger: + logger.warning("[VISION] 存图队列已满,跳过本次保存") + + +def save_shot_image(result_img, center, radius, method, ellipse_params, + laser_point, distance_m, shot_id=None, photo_dir=None): + """ + 保存射击图像(带标注)。同步调用,会阻塞。 + 主流程建议使用 enqueue_save_shot;此处保留供校准、测试等场景使用。 + """ + if not config.SAVE_IMAGE_ENABLED: + return None + if photo_dir is None: + photo_dir = config.PHOTO_DIR + try: + img_cv = image.image2cv(result_img, False, False) + return _save_shot_image_impl(img_cv, center, radius, method, ellipse_params, + laser_point, distance_m, shot_id, photo_dir) + except Exception as e: + logger = logger_manager.logger + if logger: + logger.error(f"[VISION] save_shot_image 转换图像失败: {e}") + return None + + +def detect_target(frame, laser_point=None): + """ + 统一的靶心检测接口,根据配置自动选择检测方法 + + Args: + frame: MaixPy图像帧 + laser_point: 激光点坐标(可选) + + Returns: + (result_img, center, radius, method, best_radius1, ellipse_params) + 与detect_circle_v3保持相同的返回格式 + """ + logger = logger_manager.logger + + if config.USE_ARUCO: + # 使用ArUco检测 + if logger: + logger.debug("[VISION] 使用ArUco标记检测靶心") + + # 延迟导入以避免循环依赖 + from aruco_detector import detect_target_with_aruco + return detect_target_with_aruco(frame, laser_point) + else: + # 使用传统黄色靶心检测 + if logger: + logger.debug("[VISION] 使用传统黄色靶心检测") + return detect_circle_v3(frame, laser_point) +