upload log file to qiqiu

This commit is contained in:
gcw_4spBpAfv
2026-04-23 17:53:21 +08:00
parent 12fac4ea1c
commit 8efe1ae5c5
5 changed files with 316 additions and 877 deletions

View File

@@ -9,7 +9,7 @@ from version import VERSION
# ==================== 应用配置 ==================== # ==================== 应用配置 ====================
APP_VERSION = VERSION APP_VERSION = VERSION
APP_DIR = "/maixapp/apps/t11" APP_DIR = "/maixapp/apps/t11"
LOCAL_FILENAME = "/maixapp/apps/t11/main_tmp.py" LOCAL_FILENAME = APP_DIR + "/main_tmp.py"
# ==================== 服务器配置 ==================== # ==================== 服务器配置 ====================
# SERVER_IP = "stcp.shelingxingqiu.com" # SERVER_IP = "stcp.shelingxingqiu.com"
@@ -22,7 +22,7 @@ WIFI_QUALITY_RTT_SAMPLES = 3 # 到业务服务器 TCP 建连耗时采样次数
WIFI_QUALITY_RTT_BAD_MS = 600.0 # 中位数超过此值认为延迟过高 WIFI_QUALITY_RTT_BAD_MS = 600.0 # 中位数超过此值认为延迟过高
WIFI_QUALITY_RTT_WARN_MS = 350.0 # 与 RSSI 联合:超过此值且信号弱也判为差 WIFI_QUALITY_RTT_WARN_MS = 350.0 # 与 RSSI 联合:超过此值且信号弱也判为差
WIFI_QUALITY_RSSI_BAD_DBM = -80.0 # 低于此 dBm更负更差视为信号弱 WIFI_QUALITY_RSSI_BAD_DBM = -80.0 # 低于此 dBm更负更差视为信号弱
WIFI_QUALITY_USE_RSSI = True # 是否把 RSSI 纳入综合判定False 则仅看 RTT WIFI_QUALITY_USE_RSSI = True # 是否把 RSSI 纳入综合判定
# WiFi 热点配网(手机连设备 AP浏览器提交路由器 SSID/密码;仅 GET/POST标准库 socket # WiFi 热点配网(手机连设备 AP浏览器提交路由器 SSID/密码;仅 GET/POST标准库 socket
WIFI_CONFIG_AP_FALLBACK = True # # WiFi 配网失败时,是否退回热点模式,并等待重新配网 WIFI_CONFIG_AP_FALLBACK = True # # WiFi 配网失败时,是否退回热点模式,并等待重新配网
@@ -46,14 +46,14 @@ SSL_AUTH_MODE = 1 # 1=单向认证验证服务器2=双向
SSL_VERIFY_MODE = 1 # 0=不验仅测试用1=写入并使用 CA 证书 SSL_VERIFY_MODE = 1 # 0=不验仅测试用1=写入并使用 CA 证书
SSL_CERT_FILENAME = "server.pem" # 模组里证书名MSSLCERTWR / MSSLCFG="cert" 用) SSL_CERT_FILENAME = "server.pem" # 模组里证书名MSSLCERTWR / MSSLCFG="cert" 用)
SSL_CERT_PATH = "/maixapp/apps/t11/server.pem" # 设备文件系统里 CA 证书路径(你自己放进去) SSL_CERT_PATH = APP_DIR + "/server.pem" # 设备文件系统里 CA 证书路径(你自己放进去)
# MIPOPEN 末尾的参数在不同固件里含义可能不同;按你手册例子保留 # MIPOPEN 末尾的参数在不同固件里含义可能不同;按你手册例子保留
MIPOPEN_TAIL = ",,0" MIPOPEN_TAIL = ",,0"
# ==================== 文件路径配置 ==================== # ==================== 文件路径配置 ====================
CONFIG_FILE = "/root/laser_config.json" CONFIG_FILE = "/root/laser_config.json"
LOG_FILE = "/maixapp/apps/t11/app.log" LOG_FILE = APP_DIR + "/app.log"
BACKUP_BASE = "/maixapp/apps/t11/backups" BACKUP_BASE = APP_DIR + "/backups"
# ==================== 硬件配置 ==================== # ==================== 硬件配置 ====================
# WiFi模块开关True=有WiFi模块False=无WiFi模块 # WiFi模块开关True=有WiFi模块False=无WiFi模块

View File

@@ -1,17 +1,6 @@
你现在要防的是“别人拿到设备/拿到代码包后,能伪造请求、刷接口、下发恶意 OTA、甚至劫持通信”。单靠隐藏 Python 源码只能提高门槛,真正的安全要靠协议和密钥设计。结合你仓库里实际内容,建议你重点隐藏/整改这些点(按风险排序)。 你现在要防的是“别人拿到设备/拿到代码包后,能伪造请求、刷接口、下发恶意 OTA、甚至劫持通信”。单靠隐藏 Python 源码只能提高门槛,真正的安全要靠协议和密钥设计。结合你仓库里实际内容,建议你重点隐藏/整改这些点(按风险排序)。
1. 必须隐藏/必须整改(高风险)
1.1 登录口令规则太弱(几乎等于明文)
你现在的登录是 password = device_id + "."(见 network.py 读取设备 ID 后直接拼出来),这意味着只要攻击者知道/猜到 device_id就能直接登录伪装设备。
相关位置:
with open("/device_key", "r") as f: device_id = f.read().strip() ... self._device_id = device_id self._password = device_id + "."
1.2 HTTP 鉴权 token 的盐值是硬编码常量(泄露后可离线伪造)
你 token 是 HMAC-SHA256((SALT+device_id), SALT2),而 SALT/SALT2 是固定字符串:"shootMessageFire" / "shoot"。这类“硬编码盐值 + 可猜/可读的 device_id”意味着攻击者只要拿到代码包/逆向 .so就能在自己电脑上批量算 token伪造 HTTP 请求。
相关位置:
SALT = "shootMessageFire"SALT2 = "shoot"return "Arrow_" + hmac.new((SALT + device_id).encode(), SALT2.encode(), hashlib.sha256).hexdigest()
1.3 TLS 配置目前看起来没有做证书校验(容易被中间人攻击)
config.py 虽然 USE_TCP_SSL=True但你在 network.py 里实际把 MSSLCFG="auth" 固定成 0不验且写证书分支被 if False 禁用了。这样“看起来是 TLS”但仍可能被抓包/篡改/假服务器接入。
相关位置:
r = hardware_manager.at_client.send(f'AT+MSSLCFG="auth",{ssl_id},0', "OK", 3000)...if False: # 写证书/校验被禁用 ...r = hardware_manager.at_client.send(f'AT+MIPCFG="ssl",{link_id},{ssl_id},1', "OK", 3000)
1.4 OTA 下发“url”如果缺少强校验就是远程代码执行入口 1.4 OTA 下发“url”如果缺少强校验就是远程代码执行入口
你 OTA 逻辑里从服务器指令拿到 url 就去下载并替换文件/重启(这是正常 OTA但安全性取决于 你 OTA 逻辑里从服务器指令拿到 url 就去下载并替换文件/重启(这是正常 OTA但安全性取决于
是否只允许白名单域名/路径 是否只允许白名单域名/路径
@@ -26,20 +15,11 @@ SERVER_IP = "stcp.shelingxingqiu.com"SERVER_PORT = 50005HTTP_URL = "http://ws.sh
注意:即使你把它们藏进 .so攻击者仍可能通过抓包/观察 DNS/SNI/流量拿到域名或 IP所以“隐藏”只能降低静态分析风险不能替代鉴权/签名。 注意:即使你把它们藏进 .so攻击者仍可能通过抓包/观察 DNS/SNI/流量拿到域名或 IP所以“隐藏”只能降低静态分析风险不能替代鉴权/签名。
2.3 日志/调试信息泄露
你仓库里 .cursor/debug.log* 已经记录了完整的 AT+MIPOPEN ... host:port 等信息;如果这些被打进包或被用户导出,也会泄露通信细节。
你发布包里“真正会被带走的敏感内容”
你现在 app.yaml 的打包清单只包含核心运行文件(不包含 test/、backup_code/、.cursor/ 等),这一点很好:
files: - app.yaml - at_client.py - camera_manager.py - config.py - hardware.py - laser_manager.py - logger_manager.py - main.py - network.py - ota_manager.py - power.py - shot_id_generator.py - time_sync.py - version.py - vision.py
但要注意:你当前最敏感的“盐值/口令规则/HTTP header 逻辑/OTA 策略”都在这些会打包的文件里(尤其是 network.py、ota_manager.py、config.py
3. 最有效的“防黑”措施(比隐藏更关键) 3. 最有效的“防黑”措施(比隐藏更关键)
把“password=device_id+.” 改为真正的 per-device secret设备出厂烧录服务端保存或服务端下发公钥设备用私钥签名
所有上报/控制命令加签名 + nonce/timestamp + 服务端防重放(别人抓到一次包也不能复用) 所有上报/控制命令加签名 + nonce/timestamp + 服务端防重放(别人抓到一次包也不能复用)
OTA 包必须做签名校验(设备端内置公钥,下载后验签通过才应用) OTA 包必须做签名校验(设备端内置公钥,下载后验签通过才应用)
如果你告诉我:你们服务端目前能不能改协议(例如新增签名字段、下发 challenge、做 OTA 签名),我可以按“最小改动但提升最大安全”的顺序,帮你规划一套从现状平滑升级的方案。 如果你告诉我:你们服务端目前能不能改协议(例如新增签名字段、下发 challenge、做 OTA 签名),我可以按“最小改动但提升最大安全”的顺序,帮你规划一套从现状平滑升级的方案。
https://wiki.sipeed.com/maixpy/doc/zh/pro/compile_os.html

View File

@@ -1358,6 +1358,237 @@ class NetworkManager:
self.logger.error(f"[LOG_UPLOAD] 上传异常: {e}") self.logger.error(f"[LOG_UPLOAD] 上传异常: {e}")
self.safe_enqueue({"result": "log_upload_failed", "reason": str(e)[:100]}, 2) self.safe_enqueue({"result": "log_upload_failed", "reason": str(e)[:100]}, 2)
def _prepare_log_archive(self, include_rotated=True, max_files=None, archive_format="tgz"):
"""准备日志归档压缩包,返回 (archive_path, archive_filename) 或 (None, error_msg)
Args:
include_rotated: 是否包含轮转日志
max_files: 最多打包多少个日志文件
archive_format: tgz 或 zip
"""
import shutil
from datetime import datetime
import glob
try:
log_file_path = config.LOG_FILE
if not os.path.exists(log_file_path):
return None, "log_file_not_found"
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
device_id = self._device_id or "unknown"
base_name = f"logs_{timestamp}_{device_id}"
archive_format = (archive_format or "tgz").strip().lower()
if archive_format not in ("tgz", "zip"):
archive_format = "tgz"
candidates = [log_file_path]
if include_rotated:
candidates = sorted(set(glob.glob(log_file_path + "*")))
candidates = [p for p in candidates if os.path.isfile(p)]
def _log_sort_key(p):
if p == log_file_path:
return (0, 0, p)
suffix = p[len(log_file_path):]
if suffix.startswith("."):
try:
return (1, int(suffix[1:]), p)
except:
return (2, 999999, p)
return (3, 999999, p)
candidates.sort(key=_log_sort_key)
if max_files is None:
try:
max_files = 1 + int(getattr(config, "LOG_BACKUP_COUNT", 5))
except:
max_files = 6
try:
max_files = int(max_files)
except:
max_files = 6
max_files = max(1, min(max_files, 20))
selected = candidates[:max_files]
if not selected:
return None, "no_log_files"
os.system("sync")
temp_dir = "/tmp"
staging_dir = os.path.join(temp_dir, f"log_upload_{base_name}")
os.makedirs(staging_dir, exist_ok=True)
staged_paths = []
try:
for p in selected:
dst = os.path.join(staging_dir, os.path.basename(p))
shutil.copy2(p, dst)
staged_paths.append(dst)
except Exception as e:
try:
shutil.rmtree(staging_dir)
except:
pass
return None, f"snapshot_failed: {e}"
if archive_format == "zip":
archive_filename = f"{base_name}.zip"
else:
archive_filename = f"{base_name}.tar.gz"
archive_path = os.path.join(temp_dir, archive_filename)
try:
if archive_format == "zip":
import zipfile
with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for p in staged_paths:
zf.write(p, arcname=os.path.basename(p))
else:
import tarfile
with tarfile.open(archive_path, "w:gz") as tf:
for p in staged_paths:
tf.add(p, arcname=os.path.basename(p))
except Exception as e:
try:
shutil.rmtree(staging_dir)
except:
pass
try:
if os.path.exists(archive_path):
os.remove(archive_path)
except:
pass
return None, f"archive_failed: {e}"
finally:
try:
shutil.rmtree(staging_dir)
except:
pass
return archive_path, archive_filename
except Exception as e:
return None, f"prepare_exception: {e}"
def _upload_log_file_v2(self, upload_url, upload_token, key, outlink="", include_rotated=True, max_files=None, archive_format="tgz"):
"""上传日志到 Qiniu支持 WiFi 和 4G 双路径)
流程:准备日志归档 -> 自动检测网络 -> WiFi(requests) 或 4G(AT命令) 上传
"""
import shutil
# 1) 准备日志归档
archive_path, info = self._prepare_log_archive(include_rotated, max_files, archive_format)
if archive_path is None:
self.logger.error(f"[LOG_UPLOAD] 准备归档失败: {info}")
self.safe_enqueue({"result": "log_upload_failed", "reason": info}, 2)
return
archive_filename = info
# key 是服务器下发的目录前缀,最终 key = prefix/filename
qiniu_key = key.rstrip("/") + "/" + archive_filename
self.logger.info(f"[LOG_UPLOAD] 日志归档已生成: {archive_path}, qiniu_key: {qiniu_key}")
try:
# 2) WiFi 优先:只要 WiFi 已连接就先尝试 WiFi失败再回落到 4G
wifi_tried = False
wifi_ok = False
if self.is_wifi_connected():
wifi_tried = True
self.logger.info(f"[LOG_UPLOAD] Using wifi path (preferred), archive: {archive_path}")
try:
# ---- WiFi path: 使用 requests 库上传 ----
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
with open(archive_path, 'rb') as f:
files = {'file': (archive_filename, f, 'application/octet-stream')}
data = {'token': upload_token, 'key': qiniu_key}
wifi_upload_url = upload_url.replace('https://', 'http://', 1)
self.logger.info(f"[LOG_UPLOAD] WiFi upload URL: {wifi_upload_url}")
response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False)
response.raise_for_status()
result_json = response.json()
uploaded_key = result_json.get('key', qiniu_key)
self.logger.info(f"[LOG_UPLOAD] WiFi upload ok: key={uploaded_key}")
access_url = None
if outlink:
access_url = f"https://{outlink}/{uploaded_key}"
response_data = {
"result": "log_upload_ok",
"key": uploaded_key,
"via": "wifi",
}
if access_url:
response_data["url"] = access_url
self.safe_enqueue(response_data, 2)
wifi_ok = True
except Exception as e:
# WiFi 上传失败不影响主链路:记录原因并回落 4G
self.logger.warning(f"[LOG_UPLOAD] WiFi upload failed, fallback to 4g: {e}")
if not wifi_ok:
if not wifi_tried:
self.logger.info(f"[LOG_UPLOAD] WiFi not connected, using 4g path, archive: {archive_path}")
else:
self.logger.info(f"[LOG_UPLOAD] Using 4g fallback path, archive: {archive_path}")
# ---- 4G path: 使用 FourGUploadManager AT命令上传 ----
import importlib.util
spec = importlib.util.spec_from_file_location(
"four_g_upload_manager",
os.path.join(os.path.dirname(__file__), "4g_upload_manager.py")
)
upload_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(upload_module)
FourGUploadManager = upload_module.FourGUploadManager
uploader = FourGUploadManager(hardware_manager.at_client)
result = uploader.upload_file(archive_path, upload_url, upload_token, qiniu_key)
if result.get("success"):
uploaded_key = result.get("key", qiniu_key)
self.logger.info(f"[LOG_UPLOAD] 4G upload ok: key={uploaded_key}")
access_url = None
if outlink:
access_url = f"https://{outlink}/{uploaded_key}"
response_data = {
"result": "log_upload_ok",
"key": uploaded_key,
"via": "4g",
}
if access_url:
response_data["url"] = access_url
self.safe_enqueue(response_data, 2)
else:
error_msg = result.get("error", "unknown_error")
self.logger.error(f"[LOG_UPLOAD] 4G upload failed: {error_msg}")
self.safe_enqueue({
"result": "log_upload_failed",
"reason": error_msg[:100]
}, 2)
except Exception as e:
self.logger.error(f"[LOG_UPLOAD] upload exception: {e}")
self.safe_enqueue({"result": "log_upload_failed", "reason": str(e)[:100]}, 2)
finally:
# 清理临时归档文件
try:
if archive_path and os.path.exists(archive_path):
os.remove(archive_path)
self.logger.debug(f"[LOG_UPLOAD] 临时归档已删除: {archive_path}")
except Exception as e:
self.logger.warning(f"[LOG_UPLOAD] 删除临时归档失败: {e}")
def _upload_image_file(self, image_path, upload_url, upload_token, key, shoot_id, outlink): def _upload_image_file(self, image_path, upload_url, upload_token, key, shoot_id, outlink):
"""上传图片文件到指定URL自动检测网络类型WiFi使用requests4G使用AT HTTP命令 """上传图片文件到指定URL自动检测网络类型WiFi使用requests4G使用AT HTTP命令
@@ -1369,16 +1600,16 @@ class NetworkManager:
shoot_id: 射击ID shoot_id: 射击ID
outlink: 外链域名可选用于构建访问URL outlink: 外链域名可选用于构建访问URL
""" """
# 自动检测网络类型,选择上传路径 # WiFi 优先(独立于 TCP 主链路):只要 WiFi 已连接就先走 WiFi失败再回落 4G
if self._network_type == "wifi" and self.is_wifi_connected(): mode = "wifi" if self.is_wifi_connected() else "4g"
mode = "wifi"
else:
mode = "4g"
self.logger.info(f"[IMAGE_UPLOAD] Using {mode} path, image: {image_path}") self.logger.info(f"[IMAGE_UPLOAD] Using {mode} path, image: {image_path}")
try: try:
wifi_ok = False
if mode == "wifi": if mode == "wifi":
try:
# ---- WiFi path: 使用 requests 库上传 ---- # ---- WiFi path: 使用 requests 库上传 ----
import requests import requests
import urllib3 import urllib3
@@ -1387,7 +1618,7 @@ class NetworkManager:
with open(image_path, 'rb') as f: with open(image_path, 'rb') as f:
files = {'file': (os.path.basename(image_path), f, 'application/octet-stream')} files = {'file': (os.path.basename(image_path), f, 'application/octet-stream')}
data = {'token': upload_token, 'key': key} data = {'token': upload_token, 'key': key}
# 测试:将HTTPS转为HTTP # 将 HTTPS 转为 HTTP(设备端 SSL 兼容性)
wifi_upload_url = upload_url.replace('https://', 'http://', 1) wifi_upload_url = upload_url.replace('https://', 'http://', 1)
self.logger.info(f"[IMAGE_UPLOAD] WiFi upload URL: {wifi_upload_url}") self.logger.info(f"[IMAGE_UPLOAD] WiFi upload URL: {wifi_upload_url}")
response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False) response = requests.post(wifi_upload_url, files=files, data=data, timeout=120, verify=False)
@@ -1411,9 +1642,15 @@ class NetworkManager:
response_data["url"] = access_url response_data["url"] = access_url
self.safe_enqueue(response_data, 2) self.safe_enqueue(response_data, 2)
wifi_ok = True
except Exception as e:
self.logger.warning(f"[IMAGE_UPLOAD] WiFi upload failed, fallback to 4g: {e}")
else: if not wifi_ok:
# ---- 4G path: 使用 FourGUploadManager AT命令上传 ---- # ---- 4G path: 使用 FourGUploadManager AT命令上传 ----
if mode != "4g":
self.logger.info(f"[IMAGE_UPLOAD] Using 4g fallback path, image: {image_path}")
import importlib.util import importlib.util
spec = importlib.util.spec_from_file_location( spec = importlib.util.spec_from_file_location(
"four_g_upload_manager", "four_g_upload_manager",
@@ -1702,6 +1939,35 @@ class NetworkManager:
) )
# 立即返回已入队确认 # 立即返回已入队确认
self.safe_enqueue({"result": "image_upload_queued", "shootId": shoot_id}, 2) self.safe_enqueue({"result": "image_upload_queued", "shootId": shoot_id}, 2)
elif logged_in and msg_type == 101:
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令 {body}")
if isinstance(body, dict):
upload_url = body.get("uploadUrl")
upload_token = body.get("token")
key = body.get("key")
outlink = body.get("outlink", "")
include_rotated = body.get("includeRotated", True)
max_files = body.get("maxFiles")
archive_format = body.get("archive", "tgz")
hardware_manager.start_idle_timer() # 重新计时
# 验证必需字段
if not upload_url or not upload_token or not key:
self.logger.error("[LOG_UPLOAD] 缺少必需参数: uploadUrl, token 或 key")
self.safe_enqueue({"result": "log_upload_failed", "reason": "missing_params"}, 2)
else:
self.logger.info(f"[LOG_UPLOAD] 收到日志上传命令key: {key}")
# 在新线程中执行上传,避免阻塞主循环
import _thread
_thread.start_new_thread(
self._upload_log_file_v2,
(upload_url, upload_token, key, outlink, include_rotated, max_files, archive_format)
)
# 立即返回已入队确认
self.safe_enqueue({"result": "log_upload_queued"}, 2)
# 处理业务指令 # 处理业务指令
elif logged_in and isinstance(body, dict): elif logged_in and isinstance(body, dict):
inner_cmd = None inner_cmd = None
@@ -1795,29 +2061,6 @@ class NetworkManager:
mccid = self.get_4g_mccid() mccid = self.get_4g_mccid()
self.logger.info(f"4G MCCID: {mccid}") self.logger.info(f"4G MCCID: {mccid}")
self.safe_enqueue({"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2) self.safe_enqueue({"result": "mccid", "mccid": mccid if mccid is not None else ""}, 2)
# elif inner_cmd == 7:
# from ota_manager import ota_manager
# if ota_manager.update_thread_started:
# self.safe_enqueue({"result": "update_already_started"}, 2)
# continue
# try:
# ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
# except:
# ip = None
# if not ip:
# self.safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2)
# else:
# # 注意direct_ota_download 需要 ota_url 参数
# # 如果 ota_manager.ota_url 为 None需要从其他地方获取
# ota_url_to_use = ota_manager.ota_url
# if not ota_url_to_use:
# self.logger.error("[OTA] cmd=7 但 OTA_URL 未设置")
# self.safe_enqueue({"result": "ota_failed", "reason": "ota_url_not_set"}, 2)
# else:
# ota_manager._start_update_thread()
# _thread.start_new_thread(ota_manager.direct_ota_download, (ota_url_to_use,))
elif inner_cmd == 41: elif inner_cmd == 41:
self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}") self.logger.info(f"[TEST] 收到TCP射箭触发命令, {time.time()}")
self._manual_trigger_flag = True self._manual_trigger_flag = True
@@ -1953,10 +2196,10 @@ class NetworkManager:
except Exception as e: except Exception as e:
self.logger.error(f"[OTA] 检查 pending 文件时出错: {e}") self.logger.error(f"[OTA] 检查 pending 文件时出错: {e}")
# 心跳超时重连 # 服务器不再发送心跳ACK
if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10:
self.logger.error("十分钟无心跳ACK重连") # self.logger.error("十分钟无心跳ACK重连")
break # break
self._send_event.wait(timeout=0.05) # 0.05秒 = 50ms self._send_event.wait(timeout=0.05) # 0.05秒 = 50ms
self._send_event.clear() self._send_event.clear()

View File

@@ -217,7 +217,7 @@ def check_image_sharpness(frame, threshold=100.0, save_debug_images=False):
# 保存原始图像 # 保存原始图像
img_orig = image.cv2image(img_cv, False, False) img_orig = image.cv2image(img_cv, False, False)
orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp" orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.jpg"
img_orig.save(orig_filename) img_orig.save(orig_filename)
# # 保存边缘检测结果(可视化) # # 保存边缘检测结果(可视化)
@@ -294,7 +294,7 @@ def save_calibration_image(frame, laser_pos, photo_dir=None):
img_count = 0 img_count = 0
x, y = laser_pos x, y = laser_pos
filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.bmp" filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.jpg"
logger = logger_manager.logger logger = logger_manager.logger
if logger: if logger:
@@ -722,9 +722,9 @@ def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
# 这里改为:只要 method 有值,就按 method 命名;否则才回退 no_target # 这里改为:只要 method 有值,就按 method 命名;否则才回退 no_target
method_str = (method or "").strip() method_str = (method or "").strip()
if method_str: if method_str:
filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp" filename = f"{photo_dir}/shot_{shot_id}_{method_str}.jpg"
else: else:
filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp" filename = f"{photo_dir}/shot_{shot_id}_no_target.jpg"
else: else:
try: try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))] all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
@@ -737,7 +737,7 @@ def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
else: else:
method_str = method or "unknown" method_str = method or "unknown"
distance_str = str(round((distance_m or 0.0) * 100)) distance_str = str(round((distance_m or 0.0) * 100))
filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp" filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.jpg"
logger = logger_manager.logger logger = logger_manager.logger
if logger: if logger:

View File

@@ -1,784 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视觉检测模块
提供靶心检测、距离估算、图像保存等功能
"""
import cv2
import numpy as np
import os
import math
import threading
import queue
from maix import image
import config
from logger_manager import logger_manager
# 导入ArUco检测器如果启用
if config.USE_ARUCO:
from aruco_detector import detect_target_with_aruco, aruco_detector
# 存图队列 + worker
_save_queue = queue.Queue(maxsize=16)
_save_worker_started = False
_save_worker_lock = threading.Lock()
def check_laser_point_sharpness(frame, laser_point=None, roi_size=30, threshold=100.0, ellipse_params=None):
"""
检测激光点本身的清晰度(不是整个靶子)
Args:
frame: 图像帧对象
laser_point: 激光点坐标 (x, y)如果为None则自动查找
roi_size: ROI区域大小像素默认30x30
threshold: 清晰度阈值
ellipse_params: 椭圆参数 ((center_x, center_y), (width, height), angle),用于限制激光点必须在椭圆内
Returns:
(is_sharp, sharpness_score, laser_pos): (是否清晰, 清晰度分数, 激光点坐标)
"""
try:
# 1. 如果没有提供激光点,先查找
if laser_point is None:
from laser_manager import laser_manager
laser_point = laser_manager.find_red_laser(frame, ellipse_params=ellipse_params)
if laser_point is None:
logger_manager.logger.debug(f"未找到激光点")
return False, 0.0, None
x, y = laser_point
# 2. 转换为 OpenCV 格式
img_cv = image.image2cv(frame, False, False)
h, w = img_cv.shape[:2]
# 3. 提取 ROI 区域(激光点周围)
roi_half = roi_size // 2
x_min = max(0, int(x) - roi_half)
x_max = min(w, int(x) + roi_half)
y_min = max(0, int(y) - roi_half)
y_max = min(h, int(y) + roi_half)
roi = img_cv[y_min:y_max, x_min:x_max]
if roi.size == 0:
return False, 0.0, laser_point
# 4. 转换为灰度图(用于清晰度检测)
gray_roi = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY)
# 5. 方法1检测点的扩散程度能量集中度
# 计算中心区域的能量集中度
center_x, center_y = roi.shape[1] // 2, roi.shape[0] // 2
center_radius = min(5, roi.shape[0] // 4) # 中心区域半径
# 创建中心区域的掩码
y_coords, x_coords = np.ogrid[:roi.shape[0], :roi.shape[1]]
center_mask = (x_coords - center_x)**2 + (y_coords - center_y)**2 <= center_radius**2
# 计算中心区域和周围区域的亮度
center_brightness = gray_roi[center_mask].mean()
outer_mask = ~center_mask
outer_brightness = gray_roi[outer_mask].mean() if np.any(outer_mask) else 0
# 对比度(清晰的点对比度高)
contrast = abs(center_brightness - outer_brightness)
# 6. 方法2检测点的边缘锐度使用拉普拉斯
laplacian = cv2.Laplacian(gray_roi, cv2.CV_64F)
edge_sharpness = abs(laplacian).var()
# 7. 方法3检测点的能量集中度方差
# 清晰的点:能量集中在中心,方差小
# 模糊的点:能量分散,方差大
# 但我们需要的是:清晰的点中心亮度高,周围低,所以梯度大
sobel_x = cv2.Sobel(gray_roi, cv2.CV_64F, 1, 0, ksize=3)
sobel_y = cv2.Sobel(gray_roi, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_x**2 + sobel_y**2)
gradient_sharpness = gradient.var()
# 8. 组合多个指标
# 对比度权重0.3边缘锐度权重0.4梯度权重0.3
sharpness_score = (contrast * 0.3 + edge_sharpness * 0.4 + gradient_sharpness * 0.3)
is_sharp = sharpness_score >= threshold
logger = logger_manager.logger
if logger:
logger.debug(f"[VISION] 激光点清晰度: 位置=({x}, {y}), 对比度={contrast:.2f}, 边缘={edge_sharpness:.2f}, 梯度={gradient_sharpness:.2f}, 综合={sharpness_score:.2f}, 是否清晰={is_sharp}")
return is_sharp, sharpness_score, laser_point
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 激光点清晰度检测失败: {e}")
import traceback
logger.error(traceback.format_exc())
return False, 0.0, laser_point
def check_image_sharpness(frame, threshold=100.0, save_debug_images=False):
"""
检查图像清晰度(针对圆形靶子优化,基于圆形边缘检测)
检测靶心的圆形边缘,计算边缘区域的梯度清晰度
Args:
frame: 图像帧对象
threshold: 清晰度阈值低于此值认为图像模糊默认100.0
可以根据实际情况调整:
- 清晰图像通常 > 200
- 模糊图像通常 < 100
- 中等清晰度 100-200
save_debug_images: 是否保存调试图像原始图和边缘图默认False
Returns:
(is_sharp, sharpness_score): (是否清晰, 清晰度分数)
"""
try:
logger_manager.logger.debug(f"begin")
# 转换为 OpenCV 格式
img_cv = image.image2cv(frame, False, False)
logger_manager.logger.debug(f"after image2cv")
# 转换为 HSV 颜色空间
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
logger_manager.logger.debug(f"after HSV conversion")
# 检测黄色区域(靶心)
# 调整饱和度策略:稍微增强,不要过度
s_enhanced = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv_enhanced = cv2.merge((h, s_enhanced, v))
# HSV 阈值范围(与 detect_circle_v3 保持一致)
lower_yellow = np.array([7, 80, 0])
upper_yellow = np.array([32, 255, 255])
mask_yellow = cv2.inRange(hsv_enhanced, lower_yellow, upper_yellow)
# 形态学操作,填充小孔洞
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
logger_manager.logger.debug(f"after yellow mask detection")
# 计算边缘区域:扩展黄色区域,然后减去原始区域,得到边缘区域
mask_dilated = cv2.dilate(mask_yellow, kernel, iterations=2)
mask_edge = cv2.subtract(mask_dilated, mask_yellow) # 边缘区域
# 计算边缘区域的像素数量
edge_pixel_count = np.sum(mask_edge > 0)
logger_manager.logger.debug(f"edge pixel count: {edge_pixel_count}")
# 如果检测不到边缘区域,使用全局梯度作为后备方案
if edge_pixel_count < 100:
logger_manager.logger.debug(f"edge region too small, using global gradient")
# 使用 V 通道计算全局梯度
sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3)
sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2)
sharpness_score = gradient.var()
logger_manager.logger.debug(f"global gradient variance: {sharpness_score:.2f}")
else:
# 在边缘区域计算梯度清晰度
# 使用 V亮度通道计算梯度因为边缘在亮度上通常很明显
sobel_v_x = cv2.Sobel(v, cv2.CV_64F, 1, 0, ksize=3)
sobel_v_y = cv2.Sobel(v, cv2.CV_64F, 0, 1, ksize=3)
gradient = np.sqrt(sobel_v_x**2 + sobel_v_y**2)
# 只在边缘区域计算清晰度
edge_gradient = gradient[mask_edge > 0]
if len(edge_gradient) > 0:
# 计算边缘梯度的方差(清晰图像的边缘梯度变化大)
sharpness_score = edge_gradient.var()
# 也可以使用均值作为补充指标(清晰图像的边缘梯度均值也较大)
gradient_mean = edge_gradient.mean()
logger_manager.logger.debug(f"edge gradient: mean={gradient_mean:.2f}, var={sharpness_score:.2f}, pixels={len(edge_gradient)}")
else:
# 如果边缘区域没有有效梯度,使用全局梯度
sharpness_score = gradient.var()
logger_manager.logger.debug(f"no edge gradient, using global: {sharpness_score:.2f}")
# 保存调试图像(如果启用)
if save_debug_images:
try:
debug_dir = config.PHOTO_DIR
if debug_dir not in os.listdir("/root"):
try:
os.mkdir(debug_dir)
except:
pass
# 生成文件名
try:
all_images = [f for f in os.listdir(debug_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
img_count = 0
# 保存原始图像
img_orig = image.cv2image(img_cv, False, False)
orig_filename = f"{debug_dir}/sharpness_debug_orig_{img_count:04d}.bmp"
img_orig.save(orig_filename)
# # 保存边缘检测结果(可视化)
# # 创建可视化图像:原始图像 + 黄色区域 + 边缘区域
# debug_img = img_cv.copy()
# # 在黄色区域绘制绿色
# debug_img[mask_yellow > 0] = [0, 255, 0] # RGB格式绿色
# # 在边缘区域绘制红色
# debug_img[mask_edge > 0] = [255, 0, 0] # RGB格式红色
# debug_img_maix = image.cv2image(debug_img, False, False)
# debug_filename = f"{debug_dir}/sharpness_debug_edge_{img_count:04d}.bmp"
# debug_img_maix.save(debug_filename)
# logger = logger_manager.logger
# if logger:
# logger.info(f"[VISION] 保存调试图像: {orig_filename}, {debug_filename}")
except Exception as e:
logger = logger_manager.logger
if logger:
logger.warning(f"[VISION] 保存调试图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
is_sharp = sharpness_score >= threshold
logger = logger_manager.logger
if logger:
logger.debug(f"[VISION] 清晰度检测: 分数={sharpness_score:.2f}, 边缘像素数={edge_pixel_count}, 是否清晰={is_sharp}, 阈值={threshold}")
return is_sharp, sharpness_score
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 清晰度检测失败: {e}")
import traceback
logger.error(traceback.format_exc())
# 出错时返回 False避免使用模糊图像
return False, 0.0
def save_calibration_image(frame, laser_pos, photo_dir=None):
"""
保存激光校准图像(带标注)
在找到的激光点位置绘制圆圈,便于检查算法是否正确
Args:
frame: 原始图像帧
laser_pos: 找到的激光点坐标 (x, y)
photo_dir: 照片存储目录如果为None则使用 config.PHOTO_DIR
Returns:
str: 保存的文件路径,如果保存失败则返回 None
"""
# 检查是否启用图像保存
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
# 确保照片目录存在
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except:
pass
# 生成文件名
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
img_count = 0
x, y = laser_pos
filename = f"{photo_dir}/calibration_{int(x)}_{int(y)}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
logger.info(f"保存校准图像: {filename}, 激光点: ({x}, {y})")
# 转换图像为 OpenCV 格式以便绘制
img_cv = image.image2cv(frame, False, False)
# 绘制激光点圆圈(用绿色圆圈标出找到的激光点)
cv2.circle(img_cv, (int(x), int(y)), 10, (0, 255, 0), 2) # 外圈绿色半径10
cv2.circle(img_cv, (int(x), int(y)), 5, (0, 255, 0), 2) # 中圈绿色半径5
cv2.circle(img_cv, (int(x), int(y)), 2, (0, 255, 0), -1) # 中心点:绿色实心
# 可选:绘制十字线帮助定位
cv2.line(img_cv,
(int(x - 20), int(y)),
(int(x + 20), int(y)),
(0, 255, 0), 1) # 水平线
cv2.line(img_cv,
(int(x), int(y - 20)),
(int(x), int(y + 20)),
(0, 255, 0), 1) # 垂直线
# 转换回 MaixPy 图像格式并保存
result_img = image.cv2image(img_cv, False, False)
result_img.save(filename)
if logger:
logger.debug(f"校准图像已保存: {filename}")
return filename
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"保存校准图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def detect_circle_v3(frame, laser_point=None):
"""检测图像中的靶心(优先清晰轮廓,其次黄色区域)- 返回椭圆参数版本
增加红色圆圈检测,验证黄色圆圈是否为真正的靶心
如果提供 laser_point会选择最接近激光点的目标
Args:
frame: 图像帧
laser_point: 激光点坐标 (x, y),用于多目标场景下的目标选择
Returns:
(result_img, best_center, best_radius, method, best_radius1, ellipse_params)
"""
img_cv = image.image2cv(frame, False, False)
best_center = best_radius = best_radius1 = method = None
ellipse_params = None
# HSV 黄色掩码检测(模糊靶心)
hsv = cv2.cvtColor(img_cv, cv2.COLOR_RGB2HSV)
h, s, v = cv2.split(hsv)
# 调整饱和度策略:稍微增强,不要过度
s = np.clip(s * 1.1, 0, 255).astype(np.uint8)
hsv = cv2.merge((h, s, v))
# 放宽 HSV 阈值范围(针对模糊图像的关键调整)
lower_yellow = np.array([7, 80, 0]) # 饱和度下限降低,捕捉淡黄色
upper_yellow = np.array([32, 255, 255]) # 亮度上限拉满
mask_yellow = cv2.inRange(hsv, lower_yellow, upper_yellow)
# 调整形态学操作
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_yellow = cv2.morphologyEx(mask_yellow, cv2.MORPH_CLOSE, kernel)
contours_yellow, _ = cv2.findContours(mask_yellow, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# 存储所有有效的黄色-红色组合
valid_targets = []
if contours_yellow:
for cnt_yellow in contours_yellow:
area = cv2.contourArea(cnt_yellow)
perimeter = cv2.arcLength(cnt_yellow, True)
# 计算圆度
if perimeter > 0:
circularity = (4 * np.pi * area) / (perimeter * perimeter)
else:
circularity = 0
logger = logger_manager.logger
if area > 50 and circularity > 0.7:
if logger:
logger.info(f"[target] -> 面积:{area}, 圆度:{circularity:.2f}")
# 尝试拟合椭圆
yellow_center = None
yellow_radius = None
yellow_ellipse = None
if len(cnt_yellow) >= 5:
(x, y), (width, height), angle = cv2.fitEllipse(cnt_yellow)
yellow_ellipse = ((x, y), (width, height), angle)
axes_minor = min(width, height)
radius = axes_minor / 2
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
else:
(x, y), radius = cv2.minEnclosingCircle(cnt_yellow)
yellow_center = (int(x), int(y))
yellow_radius = int(radius)
yellow_ellipse = None
# 如果检测到黄色圆圈,再检测红色圆圈进行验证
if yellow_center and yellow_radius:
# HSV 红色掩码检测红色在HSV中跨越0度需要两个范围
# 红色范围1: 0-10度接近0度的红色
lower_red1 = np.array([0, 80, 0])
upper_red1 = np.array([10, 255, 255])
mask_red1 = cv2.inRange(hsv, lower_red1, upper_red1)
# 红色范围2: 170-180度接近180度的红色
lower_red2 = np.array([170, 80, 0])
upper_red2 = np.array([180, 255, 255])
mask_red2 = cv2.inRange(hsv, lower_red2, upper_red2)
# 合并两个红色掩码
mask_red = cv2.bitwise_or(mask_red1, mask_red2)
# 形态学操作
kernel_red = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
mask_red = cv2.morphologyEx(mask_red, cv2.MORPH_CLOSE, kernel_red)
contours_red, _ = cv2.findContours(mask_red, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
found_valid_red = False
if contours_red:
# 找到所有符合条件的红色圆圈
for cnt_red in contours_red:
area_red = cv2.contourArea(cnt_red)
perimeter_red = cv2.arcLength(cnt_red, True)
if perimeter_red > 0:
circularity_red = (4 * np.pi * area_red) / (perimeter_red * perimeter_red)
else:
circularity_red = 0
# 红色圆圈也应该有一定的圆度
if area_red > 50 and circularity_red > 0.6:
# 计算红色圆圈的中心和半径
if len(cnt_red) >= 5:
(x_red, y_red), (w_red, h_red), angle_red = cv2.fitEllipse(cnt_red)
radius_red = min(w_red, h_red) / 2
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
else:
(x_red, y_red), radius_red = cv2.minEnclosingCircle(cnt_red)
red_center = (int(x_red), int(y_red))
red_radius = int(radius_red)
# 计算黄色和红色圆心的距离
if red_center:
dx = yellow_center[0] - red_center[0]
dy = yellow_center[1] - red_center[1]
distance = np.sqrt(dx*dx + dy*dy)
# 圆心距离阈值应该小于黄色半径的某个倍数比如1.5倍)
max_distance = yellow_radius * 1.5
# 红色圆圈应该比黄色圆圈大(外圈)
if distance < max_distance and red_radius > yellow_radius * 0.8:
found_valid_red = True
logger = logger_manager.logger
if logger:
logger.info(f"[target] -> 找到匹配的红圈: 黄心({yellow_center}), 红心({red_center}), 距离:{distance:.1f}, 黄半径:{yellow_radius}, 红半径:{red_radius}")
# 记录这个有效目标
valid_targets.append({
'center': yellow_center,
'radius': yellow_radius,
'ellipse': yellow_ellipse,
'area': area
})
break
if not found_valid_red:
logger = logger_manager.logger
if logger:
logger.debug("Debug -> 未找到匹配的红色圆圈,可能是误识别")
# 从所有有效目标中选择最佳目标
if valid_targets:
if laser_point:
# 如果有激光点,选择最接近激光点的目标
best_target = None
min_distance = float('inf')
for target in valid_targets:
dx = target['center'][0] - laser_point[0]
dy = target['center'][1] - laser_point[1]
distance = np.sqrt(dx*dx + dy*dy)
if distance < min_distance:
min_distance = distance
best_target = target
if best_target:
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated_laser_selected"
best_radius1 = best_radius * 5
else:
# 如果没有激光点,选择面积最大的目标
best_target = max(valid_targets, key=lambda t: t['area'])
best_center = best_target['center']
best_radius = best_target['radius']
ellipse_params = best_target['ellipse']
method = "v3_ellipse_red_validated"
best_radius1 = best_radius * 5
result_img = image.cv2image(img_cv, False, False)
return result_img, best_center, best_radius, method, best_radius1, ellipse_params
def estimate_distance(pixel_radius):
"""根据像素半径估算实际距离(单位:米)"""
if not pixel_radius:
return 0.0
return (config.REAL_RADIUS_CM * config.FOCAL_LENGTH_PIX) / pixel_radius / 100.0
def estimate_pixel(physical_distance_cm, target_distance_m):
"""
根据物理距离和目标距离计算对应的像素偏移
Args:
physical_distance_cm: 物理世界中的距离(厘米),例如激光与摄像头的距离
target_distance_m: 目标距离(米),例如到靶心的距离
Returns:
float: 对应的像素偏移
"""
if not target_distance_m or target_distance_m <= 0:
return 0.0
# 公式:像素偏移 = (物理距离_米) * 焦距_像素 / 目标距离_米
return (physical_distance_cm / 100.0) * config.FOCAL_LENGTH_PIX / target_distance_m
def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
内部实现:在 img_cv (numpy HWC RGB) 上绘制标注并保存。
由 save_shot_image同步和存图 worker异步调用。
"""
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except Exception:
pass
x, y = laser_point
if shot_id:
if center is None or radius is None:
filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp"
else:
method_str = method or "unknown"
filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp"
else:
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except Exception:
img_count = 0
if center is None or radius is None:
method_str = "no_target"
distance_str = "000"
else:
method_str = method or "unknown"
distance_str = str(round((distance_m or 0.0) * 100))
filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
if shot_id:
logger.info(f"[VISION] 保存射箭图像ID: {shot_id}, 文件名: {filename}")
if center and radius:
logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
if ellipse_params:
(ec, (ew, eh), ea) = ellipse_params
logger.info(f"椭圆 -> 中心: ({ec[0]:.1f}, {ec[1]:.1f}), 长轴: {max(ew, eh):.1f}, 短轴: {min(ew, eh):.1f}, 角度: {ea:.1f}°")
else:
logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y})")
laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
cross_thickness = int(max(getattr(config, "LASER_THICKNESS", 1), 1))
cross_length = int(max(getattr(config, "LASER_LENGTH", 10), 10))
cv2.line(img_cv, (int(x - cross_length), int(y)), (int(x + cross_length), int(y)), laser_color, cross_thickness)
cv2.line(img_cv, (int(x), int(y - cross_length)), (int(x), int(y + cross_length)), laser_color, cross_thickness)
cv2.circle(img_cv, (int(x), int(y)), 1, laser_color, cross_thickness)
ring_thickness = 1
cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, ring_thickness)
cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, ring_thickness)
cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1)
if center and radius:
cx, cy = center
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
cv2.ellipse(img_cv, (cx_ell, cy_ell), (int(width / 2), int(height / 2)), angle, 0, 360, (0, 255, 0), 2)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
minor_length = min(width, height) / 2
minor_angle = angle + 90 if width >= height else angle
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2)
else:
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1)
out = image.cv2image(img_cv, False, False)
out.save(filename)
if logger:
if center and radius:
logger.debug(f"图像已保存(含靶心标注): {filename}")
else:
logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}")
# 清理旧图片如果目录下图片超过100张删除最老的
try:
image_files = []
for f in os.listdir(photo_dir):
if f.endswith(('.bmp', '.jpg', '.jpeg')):
filepath = os.path.join(photo_dir, f)
try:
mtime = os.path.getmtime(filepath)
image_files.append((mtime, filepath, f))
except Exception:
pass
from config import MAX_IMAGES
if len(image_files) > MAX_IMAGES:
image_files.sort(key=lambda x: x[0])
to_delete = len(image_files) - MAX_IMAGES
deleted_count = 0
for _, filepath, fname in image_files[:to_delete]:
try:
os.remove(filepath)
deleted_count += 1
if logger:
logger.debug(f"[VISION] 删除旧图片: {fname}")
except Exception as e:
if logger:
logger.warning(f"[VISION] 删除旧图片失败 {fname}: {e}")
if logger and deleted_count > 0:
logger.info(f"[VISION] 已清理 {deleted_count} 张旧图片,当前剩余 {MAX_IMAGES}")
except Exception as e:
if logger:
logger.warning(f"[VISION] 清理旧图片时出错(可忽略): {e}")
return filename
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"保存图像失败: {e}")
import traceback
logger.error(traceback.format_exc())
return None
def _save_worker_loop():
"""存图 worker从队列取任务并调用 _save_shot_image_impl。"""
while True:
try:
item = _save_queue.get()
if item is None:
break
_save_shot_image_impl(*item)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 存图 worker 异常: {e}")
import traceback
logger.error(traceback.format_exc())
finally:
try:
_save_queue.task_done()
except Exception:
pass
def start_save_shot_worker():
"""启动存图 worker 线程(应在程序初始化时调用一次)。"""
global _save_worker_started
with _save_worker_lock:
if _save_worker_started:
return
_save_worker_started = True
t = threading.Thread(target=_save_worker_loop, daemon=True)
t.start()
logger = logger_manager.logger
if logger:
logger.info("[VISION] 存图 worker 线程已启动")
def enqueue_save_shot(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
将存图任务放入队列,由 worker 异步保存。主线程传入 result_img 的复制,不阻塞。
"""
if not config.SAVE_IMAGE_ENABLED:
return
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(result_img, False, False)
img_copy = np.copy(img_cv)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] enqueue_save_shot 复制图像失败: {e}")
return
task = (img_copy, center, radius, method, ellipse_params, laser_point, distance_m, shot_id, photo_dir)
try:
_save_queue.put_nowait(task)
except queue.Full:
logger = logger_manager.logger
if logger:
logger.warning("[VISION] 存图队列已满,跳过本次保存")
def save_shot_image(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
保存射击图像(带标注)。同步调用,会阻塞。
主流程建议使用 enqueue_save_shot此处保留供校准、测试等场景使用。
"""
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(result_img, False, False)
return _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id, photo_dir)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] save_shot_image 转换图像失败: {e}")
return None
def detect_target(frame, laser_point=None):
"""
统一的靶心检测接口,根据配置自动选择检测方法
Args:
frame: MaixPy图像帧
laser_point: 激光点坐标(可选)
Returns:
(result_img, center, radius, method, best_radius1, ellipse_params)
与detect_circle_v3保持相同的返回格式
"""
logger = logger_manager.logger
if config.USE_ARUCO:
# 使用ArUco检测
if logger:
logger.debug("[VISION] 使用ArUco标记检测靶心")
# 延迟导入以避免循环依赖
from aruco_detector import detect_target_with_aruco
return detect_target_with_aruco(frame, laser_point)
else:
# 使用传统黄色靶心检测
if logger:
logger.debug("[VISION] 使用传统黄色靶心检测")
return detect_circle_v3(frame, laser_point)