Files
archery/laser_manager.py

561 lines
21 KiB
Python
Raw Normal View History

2026-01-12 11:39:27 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光管理器模块
提供激光控制校准等功能
"""
import json
import os
import binascii
2026-01-12 11:39:27 +08:00
from maix import time, camera
import threading
import config
from logger_manager import logger_manager
class LaserManager:
"""激光控制管理器(单例)"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(LaserManager, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if self._initialized:
return
# 私有状态
self._calibration_active = False
self._calibration_result = None
self._calibration_lock = threading.Lock()
self._laser_point = None
self._laser_turned_on = False
2026-01-12 11:39:27 +08:00
self._initialized = True
# ==================== 状态访问(只读属性)====================
@property
def calibration_active(self):
"""是否正在校准"""
return self._calibration_active
@property
def laser_point(self):
"""当前激光点"""
return self._laser_point
# ==================== 业务方法 ====================
def load_laser_point(self):
"""从配置文件加载激光中心点,失败则使用默认值"""
try:
if "laser_config.json" in os.listdir("/root"):
with open(config.CONFIG_FILE, "r") as f:
data = json.load(f)
if isinstance(data, list) and len(data) == 2:
self._laser_point = (int(data[0]), int(data[1]))
logger = logger_manager.logger
if logger:
logger.debug(f"[INFO] 加载激光点: {self._laser_point}")
return self._laser_point
else:
raise ValueError
else:
self._laser_point = config.DEFAULT_LASER_POINT
except:
self._laser_point = config.DEFAULT_LASER_POINT
return self._laser_point
def save_laser_point(self, point):
"""保存激光中心点到配置文件"""
try:
with open(config.CONFIG_FILE, "w") as f:
json.dump([point[0], point[1]], f)
self._laser_point = point
return True
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] 保存激光点失败: {e}")
return False
def turn_on_laser(self):
"""发送指令开启激光,并读取回包(部分模块支持)"""
from hardware import hardware_manager
logger = logger_manager.logger
if hardware_manager.distance_serial is None:
if logger:
logger.error("[LASER] distance_serial 未初始化")
return None
# 打印调试信息
if logger:
logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}")
# 清空接收缓冲区
try:
hardware_manager.distance_serial.read(-1) # 清空缓冲区
except:
pass
# 发送命令
written = hardware_manager.distance_serial.write(config.LASER_ON_CMD)
if logger:
logger.info(f"[LASER] 写入字节数: {written}")
2026-01-12 18:53:01 +08:00
# return None
# TODO: 暂时去掉这个等待
2026-01-12 11:39:27 +08:00
# 读取回包
2026-01-12 18:53:01 +08:00
print("before read:", time.ticks_ms())
resp = hardware_manager.distance_serial.read(len=20,timeout=10)
print("after read:", time.ticks_ms())
if resp:
if logger:
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
if resp == config.LASER_ON_CMD:
if logger:
logger.info("✅ 激光开启指令已确认")
else:
if logger:
logger.warning("🔇 无回包(可能正常或模块不支持回包)")
return resp
2026-01-12 11:39:27 +08:00
def turn_off_laser(self):
"""发送指令关闭激光"""
from hardware import hardware_manager
logger = logger_manager.logger
if hardware_manager.distance_serial is None:
if logger:
logger.error("[LASER] distance_serial 未初始化")
return None
# 打印调试信息
if logger:
logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}")
# 清空接收缓冲区
try:
hardware_manager.distance_serial.read(-1)
except:
pass
# 发送命令
written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD)
if logger:
logger.info(f"[LASER] 写入字节数: {written}")
# 读取回包
2026-01-12 18:53:01 +08:00
resp = hardware_manager.distance_serial.read(20)
if resp:
if logger:
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
else:
if logger:
logger.warning("🔇 无回包")
return resp
# 不用读回包
2026-01-12 18:53:01 +08:00
# return None
2026-01-12 11:39:27 +08:00
def flash_laser(self, duration_ms=1000):
"""闪一下激光(用于射箭反馈)"""
try:
self.turn_on_laser()
time.sleep_ms(duration_ms)
self.turn_off_laser()
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"闪激光失败: {e}")
2026-01-13 00:01:39 +08:00
# def find_red_laser(self, frame, threshold=150):
# """在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
# w, h = frame.width(), frame.height()
# img_bytes = frame.to_bytes()
# max_sum = 0
# best_pos = None
# for y in range(0, h, 2):
# for x in range(0, w, 2):
# idx = (y * w + x) * 3
# r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
# if r > threshold and r > g * 2 and r > b * 2:
# rgb_sum = r + g + b
# if rgb_sum > max_sum:
# max_sum = rgb_sum
# best_pos = (x, y)
# return best_pos
# def find_red_laser(self, frame, threshold=150, search_radius=50):
# """
# 在图像中心附近查找最亮的红色激光点(基于 RGB 阈值)
# Args:
# frame: 图像帧
# threshold: 红色通道阈值默认150
# search_radius: 搜索半径像素从图像中心开始搜索默认150
# Returns:
# (x, y) 坐标,如果未找到则返回 None
# """
# w, h = frame.width(), frame.height()
# center_x, center_y = w // 2, h // 2
# # 只在中心区域搜索
# x_min = max(0, center_x - search_radius)
# x_max = min(w, center_x + search_radius)
# y_min = max(0, center_y - search_radius)
# y_max = min(h, center_y + search_radius)
# img_bytes = frame.to_bytes()
# max_score = 0
# best_pos = None
# for y in range(y_min, y_max, 2):
# for x in range(x_min, x_max, 2):
# idx = (y * w + x) * 3
# r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
# # 判断是否为红色或过曝的红色(发白)
# is_red = False
# is_overexposed_red = False
# # 情况1正常红色r 明显大于 g 和 b
# if r > threshold and r > g * 2 and r > b * 2:
# is_red = True
# # 情况2过曝的红色发白r, g, b 都接近255但 r 仍然最大)
# # 过曝时r, g, b 都接近 255但 r 应该仍然是最高的
# elif r > 200 and g > 200 and b > 200: # 接近白色
# if r >= g and r >= b and (r - g) > 10 and (r - b) > 10: # r 仍然明显最大
# is_overexposed_red = True
# if is_red or is_overexposed_red:
# # 计算得分RGB 总和 + 距离中心权重(越靠近中心得分越高)
# rgb_sum = r + g + b
# # 计算到中心的距离
# dx = x - center_x
# dy = y - center_y
# distance_from_center = (dx * dx + dy * dy) ** 0.5
# # 距离权重:距离越近,权重越高(最大权重为 1.0,距离为 0 时)
# # 当距离为 search_radius 时,权重为 0.5
# distance_weight = 1.0 - (distance_from_center / search_radius) * 0.5
# distance_weight = max(0.5, distance_weight) # 最小权重 0.5
# # 综合得分RGB 总和 * 距离权重
# score = rgb_sum * distance_weight
# if score > max_score:
# max_score = score
# best_pos = (x, y)
# print("best_pos:", best_pos)
# return best_pos
def find_red_laser(self, frame, threshold=150, search_radius=150):
"""
在图像中心附近查找最亮的红色激光点基于 RGB 阈值
使用两阶段搜索先粗搜索找到候选区域再精细搜索找到最亮点
Args:
frame: 图像帧
threshold: 红色通道阈值默认150
search_radius: 搜索半径像素从图像中心开始搜索默认150
Returns:
(x, y) 坐标如果未找到则返回 None
"""
2026-01-12 11:39:27 +08:00
w, h = frame.width(), frame.height()
2026-01-13 00:01:39 +08:00
center_x, center_y = w // 2, h // 2
# 只在中心区域搜索
x_min = max(0, center_x - search_radius)
x_max = min(w, center_x + search_radius)
y_min = max(0, center_y - search_radius)
y_max = min(h, center_y + search_radius)
2026-01-12 11:39:27 +08:00
img_bytes = frame.to_bytes()
2026-01-13 00:01:39 +08:00
max_score = 0
candidate_pos = None
# 第一阶段粗搜索每2像素采样找到候选点
for y in range(y_min, y_max, 2):
for x in range(x_min, x_max, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
# 判断是否为红色或过曝的红色(发白)
is_red = False
is_overexposed_red = False
# 情况1正常红色r 明显大于 g 和 b
if r > threshold and r > g * 2 and r > b * 2:
is_red = True
# 情况2过曝的红色发白r, g, b 都接近255但 r 仍然最大)
elif r > 200 and g > 200 and b > 200: # 接近白色
if r >= g and r >= b and (r - g) > 10 and (r - b) > 10: # r 仍然明显最大
is_overexposed_red = True
if is_red or is_overexposed_red:
# 计算得分RGB 总和 + 距离中心权重
rgb_sum = r + g + b
dx = x - center_x
dy = y - center_y
distance_from_center = (dx * dx + dy * dy) ** 0.5
distance_weight = 1.0 - (distance_from_center / search_radius) * 0.5
distance_weight = max(0.5, distance_weight)
score = rgb_sum * distance_weight
if score > max_score:
max_score = score
candidate_pos = (x, y)
# 如果没有找到候选点,直接返回
if candidate_pos is None:
return None
# 第二阶段在候选点周围进行精细搜索1像素间隔
# 在候选点周围 5x5 或 7x7 区域内找最亮的点
refine_radius = 3 # 精细搜索半径(像素)
cx, cy = candidate_pos
x_min_fine = max(0, cx - refine_radius)
x_max_fine = min(w, cx + refine_radius + 1)
y_min_fine = max(0, cy - refine_radius)
y_max_fine = min(h, cy + refine_radius + 1)
max_brightness = 0
best_pos = candidate_pos
# 精细搜索1像素间隔只考虑亮度RGB总和
for y in range(y_min_fine, y_max_fine, 1):
for x in range(x_min_fine, x_max_fine, 1):
2026-01-12 11:39:27 +08:00
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
2026-01-13 00:01:39 +08:00
# 判断是否为红色或过曝的红色
is_red = False
is_overexposed_red = False
2026-01-12 11:39:27 +08:00
if r > threshold and r > g * 2 and r > b * 2:
2026-01-13 00:01:39 +08:00
is_red = True
elif r > 200 and g > 200 and b > 200:
if r >= g and r >= b and (r - g) > 10 and (r - b) > 10:
is_overexposed_red = True
if is_red or is_overexposed_red:
2026-01-12 11:39:27 +08:00
rgb_sum = r + g + b
2026-01-13 00:01:39 +08:00
# 精细搜索阶段只考虑亮度,不考虑距离权重
if rgb_sum > max_brightness:
max_brightness = rgb_sum
2026-01-12 11:39:27 +08:00
best_pos = (x, y)
2026-01-13 00:01:39 +08:00
2026-01-12 11:39:27 +08:00
return best_pos
2026-01-13 00:01:39 +08:00
2026-01-12 11:39:27 +08:00
def calibrate_laser_position(self):
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
time.sleep_ms(80)
cam = camera.Camera(640, 480)
frame = cam.read()
pos = self.find_red_laser(frame)
if pos:
self.save_laser_point(pos)
return pos
return None
def start_calibration(self):
"""开始校准(公共方法)"""
with self._calibration_lock:
if self._calibration_active:
return False
self._calibration_active = True
self._calibration_result = None
return True
def stop_calibration(self):
"""停止校准(公共方法)"""
with self._calibration_lock:
self._calibration_active = False
def set_calibration_result(self, result):
"""设置校准结果(内部方法)"""
with self._calibration_lock:
self._calibration_result = result
def get_calibration_result(self):
"""获取并清除校准结果(内部方法)"""
with self._calibration_lock:
result = self._calibration_result
self._calibration_result = None
return result
def parse_bcd_distance(self, bcd_bytes: bytes) -> float:
"""将 4 字节 BCD 码转换为距离(米)"""
if len(bcd_bytes) != 4:
return 0.0
try:
hex_string = binascii.hexlify(bcd_bytes).decode()
distance_int = int(hex_string)
return distance_int / 1000.0
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] BCD 解析失败: {e}")
return 0.0
2026-01-12 20:53:23 +08:00
def read_distance_from_laser_sensor(self):
"""发送测距指令并返回距离(米)和信号质量
返回: (distance_m, signal_quality) 元组失败返回 (0.0, 0)
"""
from hardware import hardware_manager
logger = logger_manager.logger
if hardware_manager.distance_serial is None:
if logger:
logger.error("[LASER] distance_serial 未初始化")
2026-01-12 20:53:23 +08:00
return (0.0, 0)
try:
# 清空缓冲区
try:
hardware_manager.distance_serial.read(-1)
except:
pass
# 打开激光
self.turn_on_laser()
self._laser_turned_on = True
2026-01-12 18:53:01 +08:00
# time.sleep_ms(500) # 需要一定时间让激光稳定
# 发送测距查询命令
hardware_manager.distance_serial.write(config.DISTANCE_QUERY_CMD)
# time.sleep_ms(500) # 测试结果:这里的等待没有用!
self.turn_off_laser()
self._laser_turned_on = False
# 这里的等待才是有效的大概350ms能读到数据
# 循环读取响应最多等待500ms
start_time = time.ticks_ms()
max_wait_ms = 500
response = None
while True:
# 检查是否超时
2026-01-12 20:53:23 +08:00
# 注意:使用 time.ticks_diff(start_time, time.ticks_ms()) 避免负数问题
elapsed_ms = abs(time.ticks_diff(start_time, time.ticks_ms()))
if elapsed_ms >= max_wait_ms:
if logger:
logger.warning(f"[LASER] 读取超时 ({elapsed_ms}ms),未收到完整响应")
2026-01-12 20:53:23 +08:00
return (0.0, 0)
# 尝试读取数据
response = hardware_manager.distance_serial.read(config.DISTANCE_RESPONSE_LEN)
# 如果读到完整数据,立即返回
if response and len(response) == config.DISTANCE_RESPONSE_LEN:
2026-01-12 20:53:23 +08:00
elapsed_ms = abs(time.ticks_diff(start_time, time.ticks_ms()))
if logger:
logger.debug(f"[LASER] 收到响应 ({elapsed_ms}ms)")
break
# 如果还没超时,短暂等待后继续尝试
time.sleep_ms(10) # 每次循环等待10ms避免CPU占用过高
# 验证响应格式
if response and len(response) == config.DISTANCE_RESPONSE_LEN:
if response[3] != 0x20:
if response[0] == 0xEE:
err_code = (response[7] << 8) | response[8]
if logger:
logger.warning(f"[LASER] 模块错误代码: {hex(err_code)}")
2026-01-12 20:53:23 +08:00
return (0.0, 0)
# 解析BCD码距离
bcd_bytes = response[6:10]
distance_value_m = self.parse_bcd_distance(bcd_bytes)
signal_quality = (response[10] << 8) | response[11]
if logger:
logger.debug(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}")
2026-01-12 20:53:23 +08:00
return (distance_value_m, signal_quality)
if logger:
logger.warning(f"[LASER] 无效响应: {response.hex() if response else 'None'}")
2026-01-12 20:53:23 +08:00
return (0.0, 0)
except Exception as e:
logger = logger_manager.logger
if logger:
logger.error(f"[LASER] 读取激光测距失败: {e}")
2026-01-12 20:53:23 +08:00
return (0.0, 0)
2026-01-12 20:53:23 +08:00
def quick_measure_distance(self):
"""
快速激光测距打开激光 测距 关闭激光
激光开启时间最小化约500-600ms尽量不让用户觉察到
2026-01-12 20:53:23 +08:00
返回: (distance_m, signal_quality) 元组失败返回 (0.0, 0)
"""
logger = logger_manager.logger
self._laser_turned_on = False
try:
# 等待激光稳定(最小延迟)
# time.sleep_ms(50)
2026-01-12 20:53:23 +08:00
# 读取距离和信号质量
result = self.read_distance_from_laser_sensor()
return result
except Exception as e:
if logger:
logger.error(f"[LASER] 快速测距异常: {e}")
2026-01-12 20:53:23 +08:00
return (0.0, 0)
finally:
# 确保激光关闭
if self._laser_turned_on:
try:
self.turn_off_laser()
except Exception as e:
if logger:
logger.error(f"[LASER] 关闭激光失败: {e}")
2026-01-12 11:39:27 +08:00
# 创建全局单例实例
laser_manager = LaserManager()
# ==================== 向后兼容的函数接口 ====================
def load_laser_point():
"""加载激光点(向后兼容接口)"""
return laser_manager.load_laser_point()
def save_laser_point(point):
"""保存激光点(向后兼容接口)"""
return laser_manager.save_laser_point(point)
def turn_on_laser():
"""开启激光(向后兼容接口)"""
return laser_manager.turn_on_laser()
def turn_off_laser():
"""关闭激光(向后兼容接口)"""
return laser_manager.turn_off_laser()
def flash_laser(duration_ms=1000):
"""闪激光(向后兼容接口)"""
return laser_manager.flash_laser(duration_ms)
def find_red_laser(frame, threshold=150):
"""查找红色激光点(向后兼容接口)"""
return laser_manager.find_red_laser(frame, threshold)
def calibrate_laser_position():
"""校准激光位置(向后兼容接口)"""
return laser_manager.calibrate_laser_position()