This commit is contained in:
gcw_4spBpAfv
2026-01-24 11:05:03 +08:00
parent 28fb62e5d6
commit 8ce8831315
9 changed files with 197 additions and 127 deletions

2
.gitignore vendored
View File

@@ -1 +1,3 @@
/cpp_ext/build/
/.cursor/
/dist/

View File

@@ -17,6 +17,7 @@ files:
- network.py
- ota_manager.py
- power.py
- shoot_manager.py
- shot_id_generator.py
- time_sync.py
- version.py

View File

@@ -39,7 +39,7 @@ BACKUP_BASE = "/maixapp/apps/t11/backups"
# ==================== 硬件配置 ====================
# WiFi模块开关True=有WiFi模块False=无WiFi模块
HAS_WIFI_MODULE = False # 根据实际硬件情况设置
HAS_WIFI_MODULE = True # 根据实际硬件情况设置
# UART配置
UART4G_DEVICE = "/dev/ttyS2"
@@ -108,6 +108,7 @@ LASER_LENGTH = 2
# ==================== 图像保存配置 ====================
SAVE_IMAGE_ENABLED = True # 是否保存图像True=保存False=不保存)
PHOTO_DIR = "/root/phot" # 照片存储目录
MAX_IMAGES = 1000
# ==================== OTA配置 ====================
MAX_BACKUPS = 5

23
main.py
View File

@@ -23,7 +23,7 @@ from logger_manager import logger_manager
from time_sync import sync_system_time_from_4g
from power import init_ina226, get_bus_voltage, voltage_to_percent
from laser_manager import laser_manager
from vision import detect_circle_v3, estimate_distance, save_shot_image, save_calibration_image
from vision import detect_circle_v3, estimate_distance, enqueue_save_shot, start_save_shot_worker
from network import network_manager
from ota_manager import ota_manager
from hardware import hardware_manager
@@ -111,6 +111,9 @@ def cmd_str():
# 2. 从4G模块同步系统时间需要 at_client 已初始化)
sync_system_time_from_4g()
# 2.5. 启动存图 worker 线程(队列 + worker避免主循环阻塞
start_save_shot_worker()
# 3. 启动时检查:是否需要恢复备份
pending_path = f"{config.APP_DIR}/ota_pending.json"
@@ -327,19 +330,17 @@ def cmd_str():
if logger:
logger.info(f"[MAIN] 射箭ID: {shot_id}")
# 保存图像(无论是否检测到靶心都保存)
# save_shot_image 函数会确保绘制激光十字线和检测标注(如果有)
# 如果未检测到靶心,文件名会包含 "no_target" 标识
save_shot_image(
result_img,
center,
radius,
method,
# 保存图像(无论是否检测到靶心都保存):放入队列由 worker 异步保存,不阻塞主循环
enqueue_save_shot(
result_img,
center,
radius,
method,
ellipse_params,
(x, y),
(x, y),
distance_m,
shot_id=shot_id,
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None
photo_dir=config.PHOTO_DIR if config.SAVE_IMAGE_ENABLED else None,
)
# 构造上报数据

View File

@@ -232,47 +232,49 @@ class NetworkManager:
Returns:
(ip, error): IP地址和错误信息成功时error为None
"""
conf_path = "/etc/wpa_supplicant.conf"
ssid_file = "/boot/wifi.ssid"
pass_file = "/boot/wifi.pass"
# 配置文件路径定义
conf_path = "/etc/wpa_supplicant.conf" # wpa_supplicant配置文件路径
ssid_file = "/boot/wifi.ssid" # 用于保存SSID的文件路径
pass_file = "/boot/wifi.pass" # 用于保存密码的文件路径
try:
# 生成 wpa_supplicant 配置
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read()
if "network={" not in net_conf:
net_conf = os.popen(f'wpa_passphrase "{ssid}" "{password}"').read() # 调用系统命令生成配置
if "network={" not in net_conf: # 检查配置是否生成成功
return None, "Failed to generate wpa config"
# 写入运行时配置
with open(conf_path, "w") as f:
f.write("ctrl_interface=/var/run/wpa_supplicant\n")
f.write("update_config=1\n\n")
f.write(net_conf)
with open(conf_path, "w") as f: # 打开配置文件准备写入
f.write("ctrl_interface=/var/run/wpa_supplicant\n") # 设置控制接口路径
f.write("update_config=1\n\n") # 允许更新配置
f.write(net_conf) # 写入网络配置
# 持久化保存 SSID/PASS
with open(ssid_file, "w") as f:
f.write(ssid.strip())
with open(pass_file, "w") as f:
f.write(password.strip())
with open(ssid_file, "w") as f: # 打开SSID文件准备写入
f.write(ssid.strip()) # 写入SSID去除首尾空格
with open(pass_file, "w") as f: # 打开密码文件准备写入
f.write(password.strip()) # 写入密码(去除首尾空格)
# 重启 Wi-Fi 服务
os.system("/etc/init.d/S30wifi restart")
os.system("/etc/init.d/S30wifi restart") # 执行WiFi服务重启命令
# 等待获取 IP
import time as std_time
for _ in range(20):
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
if ip:
self._wifi_connected = True
self._wifi_ip = ip
self.logger.info(f"[WIFI] 已连接IP: {ip}")
import time as std_time # 导入time模块并重命名为std_time
for _ in range(50): # 最多等待50秒
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip() # 获取wlan0的IP地址
if ip: # 如果获取到IP地址
self._wifi_connected = True # 设置WiFi连接状态为已连接
self._wifi_ip = ip # 保存IP地址
self.logger.info(f"[WIFI] 已连接IP: {ip}") # 记录连接成功日志
return ip, None
std_time.sleep(1)
std_time.sleep(1) # 每次循环等待1秒
return None, "Timeout: No IP obtained"
return None, "Timeout: No IP obtained" # 超时未获取到IP
except Exception as e:
self.logger.error(f"[WIFI] 连接失败: {e}")
return None, f"Exception: {str(e)}"
except Exception as e: # 捕获所有异常
self.logger.error(f"[WIFI] 连接失败: {e}") # 记录错误日志
return None, f"Exception: {str(e)}" # 返回异常信息
def is_server_reachable(self, host, port=80, timeout=5):
"""检查目标主机端口是否可达(用于网络检测)"""
@@ -308,6 +310,9 @@ class NetworkManager:
if self.is_server_reachable(self._server_ip, self._server_port, timeout=3):
self._network_type = "wifi"
self.logger.info(f"[NET] 选择WiFi网络IP: {self._wifi_ip}")
import os
os.environ["TZ"] = "Asia/Shanghai"
os.system("ntpdate pool.ntp.org")
return "wifi"
else:
self.logger.warning("[NET] WiFi已连接但无法访问服务器尝试4G")

View File

@@ -1225,7 +1225,10 @@ class OTAManager:
return
else:
safe_enqueue({"result": result}, 2)
except Exception as e:
err_msg = f"下载失败: {str(e)}"
safe_enqueue({"result": err_msg}, 2)
self.logger.error(err_msg)
finally:
self._stop_update_thread()
print("[UPDATE] 更新线程执行完毕,即将退出。")

View File

@@ -47,6 +47,7 @@ def set_autostart_app(app_id):
if __name__ == "__main__":
new_autostart_app_id = "t11" # change to app_id you want to set
# new_autostart_app_id = None # remove autostart
# new_autostart_app_id = "z1222" # change to app_id you want to set
list_apps()
print("Before set autostart appid:", get_curr_autostart_app())

View File

@@ -8,7 +8,7 @@ VERSION = '1.2.1'
# 1.2.0 开始使用C++编译成.so替换部分代码
# 1.2.1 ota使用加密包
# 1.2.2 支持wifi ota并且设定时区并使用单独线程保存图片

230
vision.py
View File

@@ -8,10 +8,17 @@ import cv2
import numpy as np
import os
import math
import threading
import queue
from maix import image
import config
from logger_manager import logger_manager
# 存图队列 + worker
_save_queue = queue.Queue(maxsize=16)
_save_worker_started = False
_save_worker_lock = threading.Lock()
def check_laser_point_sharpness(frame, laser_point=None, roi_size=30, threshold=100.0, ellipse_params=None):
"""
检测激光点本身的清晰度(不是整个靶子)
@@ -529,71 +536,45 @@ def estimate_pixel(physical_distance_cm, target_distance_m):
# 公式:像素偏移 = (物理距离_米) * 焦距_像素 / 目标距离_米
return (physical_distance_cm / 100.0) * config.FOCAL_LENGTH_PIX / target_distance_m
def save_shot_image(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
def _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
保存射击图像(带标注)
即使没有检测到靶心也会保存图像,文件名会标注 "no_target"
确保保存的图像总是包含激光十字线
Args:
result_img: 处理后的图像对象(可能已经包含激光十字线或检测标注)
center: 靶心中心坐标 (x, y),可能为 None未检测到靶心
radius: 靶心半径,可能为 None未检测到靶心
method: 检测方法,可能为 None未检测到靶心
ellipse_params: 椭圆参数 ((center, (width, height), angle)) 或 None
laser_point: 激光点坐标 (x, y)
distance_m: 距离(米),可能为 None未检测到靶心
shot_id: 射箭ID如果提供则用作文件名否则使用旧的文件名格式
photo_dir: 照片存储目录如果为None则使用 config.PHOTO_DIR
Returns:
str: 保存的文件路径,如果保存失败或未启用则返回 None
内部实现:在 img_cv (numpy HWC RGB) 上绘制标注并保存。
由 save_shot_image同步和存图 worker异步调用。
"""
# 检查是否启用图像保存
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
# 确保照片目录存在
try:
if photo_dir not in os.listdir("/root"):
os.mkdir(photo_dir)
except:
except Exception:
pass
x, y = laser_point
# 生成文件名:优先使用 shot_id否则使用旧格式
if shot_id:
# 使用射箭ID作为文件名
# 如果未检测到靶心,在文件名中标注
if center is None or radius is None:
filename = f"{photo_dir}/shot_{shot_id}_no_target.bmp"
else:
method_str = method or "unknown"
filename = f"{photo_dir}/shot_{shot_id}_{method_str}.bmp"
else:
# 旧的文件名格式(向后兼容)
try:
all_images = [f for f in os.listdir(photo_dir) if f.endswith(('.bmp', '.jpg', '.jpeg'))]
img_count = len(all_images)
except:
except Exception:
img_count = 0
# 如果未检测到靶心,在文件名中标注
if center is None or radius is None:
method_str = "no_target"
distance_str = "000"
else:
method_str = method or "unknown"
distance_str = str(round((distance_m or 0.0) * 100))
filename = f"{photo_dir}/{method_str}_{int(x)}_{int(y)}_{distance_str}_{img_count:04d}.bmp"
logger = logger_manager.logger
if logger:
if shot_id:
@@ -601,89 +582,82 @@ def save_shot_image(result_img, center, radius, method, ellipse_params,
if center and radius:
logger.info(f"结果 -> 圆心: {center}, 半径: {radius}, 方法: {method}")
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
logger.info(f"椭圆 -> 中心: ({ell_center[0]:.1f}, {ell_center[1]:.1f}), 长轴: {max(width, height):.1f}, 短轴: {min(width, height):.1f}, 角度: {angle:.1f}°")
(ec, (ew, eh), ea) = ellipse_params
logger.info(f"椭圆 -> 中心: ({ec[0]:.1f}, {ec[1]:.1f}), 长轴: {max(ew, eh):.1f}, 短轴: {min(ew, eh):.1f}, 角度: {ea:.1f}°")
else:
logger.info(f"结果 -> 未检测到靶心,保存原始图像(激光点: ({x}, {y})")
# 转换图像为 OpenCV 格式以便绘制
img_cv = image.image2cv(result_img, False, False)
# 绘制激光十字线(保存图像时统一绘制,避免影响检测)
laser_color = (config.LASER_COLOR[0], config.LASER_COLOR[1], config.LASER_COLOR[2])
cross_thickness = int(max(getattr(config, "LASER_THICKNESS", 1), 1))
cross_length = int(max(getattr(config, "LASER_LENGTH", 10), 10))
# 水平线
cv2.line(
img_cv,
(int(x - cross_length), int(y)),
(int(x + cross_length), int(y)),
laser_color,
cross_thickness,
)
# 垂直线
cv2.line(
img_cv,
(int(x), int(y - cross_length)),
(int(x), int(y + cross_length)),
laser_color,
cross_thickness,
)
# 小点(与原 main.py 行为一致)
cv2.line(img_cv, (int(x - cross_length), int(y)), (int(x + cross_length), int(y)), laser_color, cross_thickness)
cv2.line(img_cv, (int(x), int(y - cross_length)), (int(x), int(y + cross_length)), laser_color, cross_thickness)
cv2.circle(img_cv, (int(x), int(y)), 1, laser_color, cross_thickness)
# 额外的激光点标注(空心圆圈,便于肉眼查看)
ring_thickness = 1
cv2.circle(img_cv, (int(x), int(y)), 10, laser_color, ring_thickness)
cv2.circle(img_cv, (int(x), int(y)), 5, laser_color, ring_thickness)
cv2.circle(img_cv, (int(x), int(y)), 2, laser_color, -1)
# 如果检测到靶心,绘制靶心标注
if center and radius:
cx, cy = center
if ellipse_params:
(ell_center, (width, height), angle) = ellipse_params
cx_ell, cy_ell = int(ell_center[0]), int(ell_center[1])
# 绘制椭圆
cv2.ellipse(img_cv,
(cx_ell, cy_ell),
(int(width/2), int(height/2)),
angle,
0, 360,
(0, 255, 0),
2)
cv2.ellipse(img_cv, (cx_ell, cy_ell), (int(width / 2), int(height / 2)), angle, 0, 360, (0, 255, 0), 2)
cv2.circle(img_cv, (cx_ell, cy_ell), 3, (255, 0, 0), -1)
# 绘制短轴
minor_length = min(width, height) / 2
minor_angle = angle + 90 if width >= height else angle
minor_angle_rad = math.radians(minor_angle)
dx_minor = minor_length * math.cos(minor_angle_rad)
dy_minor = minor_length * math.sin(minor_angle_rad)
pt1_minor = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2_minor = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1_minor, pt2_minor, (0, 0, 255), 2)
pt1 = (int(cx_ell - dx_minor), int(cy_ell - dy_minor))
pt2 = (int(cx_ell + dx_minor), int(cy_ell + dy_minor))
cv2.line(img_cv, pt1, pt2, (0, 0, 255), 2)
else:
# 绘制圆形靶心
cv2.circle(img_cv, (cx, cy), radius, (0, 0, 255), 2)
cv2.circle(img_cv, (cx, cy), 2, (0, 0, 255), -1)
# 如果检测到靶心,绘制从激光点到靶心的连线(可选,用于可视化偏移)
cv2.line(img_cv, (int(x), int(y)), (cx, cy), (255, 255, 0), 1)
# 转换回 MaixPy 图像格式并保存
result_img = image.cv2image(img_cv, False, False)
result_img.save(filename)
out = image.cv2image(img_cv, False, False)
out.save(filename)
if logger:
if center and radius:
logger.debug(f"图像已保存(含靶心标注): {filename}")
else:
logger.debug(f"图像已保存(无靶心,含激光十字线): {filename}")
# 清理旧图片如果目录下图片超过100张删除最老的
try:
image_files = []
for f in os.listdir(photo_dir):
if f.endswith(('.bmp', '.jpg', '.jpeg')):
filepath = os.path.join(photo_dir, f)
try:
mtime = os.path.getmtime(filepath)
image_files.append((mtime, filepath, f))
except Exception:
pass
from config import MAX_IMAGES
if len(image_files) > MAX_IMAGES:
image_files.sort(key=lambda x: x[0])
to_delete = len(image_files) - MAX_IMAGES
deleted_count = 0
for _, filepath, fname in image_files[:to_delete]:
try:
os.remove(filepath)
deleted_count += 1
if logger:
logger.debug(f"[VISION] 删除旧图片: {fname}")
except Exception as e:
if logger:
logger.warning(f"[VISION] 删除旧图片失败 {fname}: {e}")
if logger and deleted_count > 0:
logger.info(f"[VISION] 已清理 {deleted_count} 张旧图片,当前剩余 {MAX_IMAGES}")
except Exception as e:
if logger:
logger.warning(f"[VISION] 清理旧图片时出错(可忽略): {e}")
return filename
except Exception as e:
logger = logger_manager.logger
@@ -693,3 +667,85 @@ def save_shot_image(result_img, center, radius, method, ellipse_params,
logger.error(traceback.format_exc())
return None
def _save_worker_loop():
"""存图 worker从队列取任务并调用 _save_shot_image_impl。"""
while True:
try:
item = _save_queue.get()
if item is None:
break
_save_shot_image_impl(*item)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] 存图 worker 异常: {e}")
import traceback
logger.error(traceback.format_exc())
finally:
try:
_save_queue.task_done()
except Exception:
pass
def start_save_shot_worker():
"""启动存图 worker 线程(应在程序初始化时调用一次)。"""
global _save_worker_started
with _save_worker_lock:
if _save_worker_started:
return
_save_worker_started = True
t = threading.Thread(target=_save_worker_loop, daemon=True)
t.start()
logger = logger_manager.logger
if logger:
logger.info("[VISION] 存图 worker 线程已启动")
def enqueue_save_shot(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
将存图任务放入队列,由 worker 异步保存。主线程传入 result_img 的复制,不阻塞。
"""
if not config.SAVE_IMAGE_ENABLED:
return
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(result_img, False, False)
img_copy = np.copy(img_cv)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] enqueue_save_shot 复制图像失败: {e}")
return
task = (img_copy, center, radius, method, ellipse_params, laser_point, distance_m, shot_id, photo_dir)
try:
_save_queue.put_nowait(task)
except queue.Full:
logger = logger_manager.logger
if logger:
logger.warning("[VISION] 存图队列已满,跳过本次保存")
def save_shot_image(result_img, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id=None, photo_dir=None):
"""
保存射击图像(带标注)。同步调用,会阻塞。
主流程建议使用 enqueue_save_shot此处保留供校准、测试等场景使用。
"""
if not config.SAVE_IMAGE_ENABLED:
return None
if photo_dir is None:
photo_dir = config.PHOTO_DIR
try:
img_cv = image.image2cv(result_img, False, False)
return _save_shot_image_impl(img_cv, center, radius, method, ellipse_params,
laser_point, distance_m, shot_id, photo_dir)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[VISION] save_shot_image 转换图像失败: {e}")
return None