395 lines
14 KiB
Python
395 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
激光管理器模块
|
||
提供激光控制、校准等功能
|
||
"""
|
||
import json
|
||
import os
|
||
import binascii
|
||
from maix import time, camera
|
||
import threading
|
||
import config
|
||
from logger_manager import logger_manager
|
||
|
||
|
||
class LaserManager:
|
||
"""激光控制管理器(单例)"""
|
||
_instance = None
|
||
|
||
def __new__(cls):
|
||
if cls._instance is None:
|
||
cls._instance = super(LaserManager, cls).__new__(cls)
|
||
cls._instance._initialized = False
|
||
return cls._instance
|
||
|
||
def __init__(self):
|
||
if self._initialized:
|
||
return
|
||
|
||
# 私有状态
|
||
self._calibration_active = False
|
||
self._calibration_result = None
|
||
self._calibration_lock = threading.Lock()
|
||
self._laser_point = None
|
||
self._laser_turned_on = False
|
||
self._initialized = True
|
||
|
||
# ==================== 状态访问(只读属性)====================
|
||
|
||
@property
|
||
def calibration_active(self):
|
||
"""是否正在校准"""
|
||
return self._calibration_active
|
||
|
||
@property
|
||
def laser_point(self):
|
||
"""当前激光点"""
|
||
return self._laser_point
|
||
|
||
# ==================== 业务方法 ====================
|
||
|
||
def load_laser_point(self):
|
||
"""从配置文件加载激光中心点,失败则使用默认值"""
|
||
try:
|
||
if "laser_config.json" in os.listdir("/root"):
|
||
with open(config.CONFIG_FILE, "r") as f:
|
||
data = json.load(f)
|
||
if isinstance(data, list) and len(data) == 2:
|
||
self._laser_point = (int(data[0]), int(data[1]))
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.debug(f"[INFO] 加载激光点: {self._laser_point}")
|
||
return self._laser_point
|
||
else:
|
||
raise ValueError
|
||
else:
|
||
self._laser_point = config.DEFAULT_LASER_POINT
|
||
except:
|
||
self._laser_point = config.DEFAULT_LASER_POINT
|
||
|
||
return self._laser_point
|
||
|
||
def save_laser_point(self, point):
|
||
"""保存激光中心点到配置文件"""
|
||
try:
|
||
with open(config.CONFIG_FILE, "w") as f:
|
||
json.dump([point[0], point[1]], f)
|
||
self._laser_point = point
|
||
return True
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[LASER] 保存激光点失败: {e}")
|
||
return False
|
||
|
||
def turn_on_laser(self):
|
||
"""发送指令开启激光,并读取回包(部分模块支持)"""
|
||
from hardware import hardware_manager
|
||
logger = logger_manager.logger
|
||
|
||
if hardware_manager.distance_serial is None:
|
||
if logger:
|
||
logger.error("[LASER] distance_serial 未初始化")
|
||
return None
|
||
|
||
# 打印调试信息
|
||
if logger:
|
||
logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}")
|
||
|
||
# 清空接收缓冲区
|
||
try:
|
||
hardware_manager.distance_serial.read(-1) # 清空缓冲区
|
||
except:
|
||
pass
|
||
|
||
# 发送命令
|
||
written = hardware_manager.distance_serial.write(config.LASER_ON_CMD)
|
||
if logger:
|
||
logger.info(f"[LASER] 写入字节数: {written}")
|
||
|
||
# return None
|
||
|
||
# TODO: 暂时去掉这个等待
|
||
# 读取回包
|
||
print("before read:", time.ticks_ms())
|
||
resp = hardware_manager.distance_serial.read(len=20,timeout=10)
|
||
print("after read:", time.ticks_ms())
|
||
if resp:
|
||
if logger:
|
||
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
|
||
if resp == config.LASER_ON_CMD:
|
||
if logger:
|
||
logger.info("✅ 激光开启指令已确认")
|
||
else:
|
||
if logger:
|
||
logger.warning("🔇 无回包(可能正常或模块不支持回包)")
|
||
return resp
|
||
|
||
def turn_off_laser(self):
|
||
"""发送指令关闭激光"""
|
||
from hardware import hardware_manager
|
||
logger = logger_manager.logger
|
||
|
||
if hardware_manager.distance_serial is None:
|
||
if logger:
|
||
logger.error("[LASER] distance_serial 未初始化")
|
||
return None
|
||
|
||
# 打印调试信息
|
||
if logger:
|
||
logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}")
|
||
|
||
# 清空接收缓冲区
|
||
try:
|
||
hardware_manager.distance_serial.read(-1)
|
||
except:
|
||
pass
|
||
|
||
# 发送命令
|
||
written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD)
|
||
if logger:
|
||
logger.info(f"[LASER] 写入字节数: {written}")
|
||
|
||
|
||
# 读取回包
|
||
resp = hardware_manager.distance_serial.read(20)
|
||
if resp:
|
||
if logger:
|
||
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
|
||
else:
|
||
if logger:
|
||
logger.warning("🔇 无回包")
|
||
return resp
|
||
# 不用读回包
|
||
# return None
|
||
|
||
def flash_laser(self, duration_ms=1000):
|
||
"""闪一下激光(用于射箭反馈)"""
|
||
try:
|
||
self.turn_on_laser()
|
||
time.sleep_ms(duration_ms)
|
||
self.turn_off_laser()
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"闪激光失败: {e}")
|
||
|
||
def find_red_laser(self, frame, threshold=150):
|
||
"""在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
|
||
w, h = frame.width(), frame.height()
|
||
img_bytes = frame.to_bytes()
|
||
max_sum = 0
|
||
best_pos = None
|
||
for y in range(0, h, 2):
|
||
for x in range(0, w, 2):
|
||
idx = (y * w + x) * 3
|
||
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
|
||
if r > threshold and r > g * 2 and r > b * 2:
|
||
rgb_sum = r + g + b
|
||
if rgb_sum > max_sum:
|
||
max_sum = rgb_sum
|
||
best_pos = (x, y)
|
||
return best_pos
|
||
|
||
def calibrate_laser_position(self):
|
||
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
|
||
time.sleep_ms(80)
|
||
cam = camera.Camera(640, 480)
|
||
frame = cam.read()
|
||
pos = self.find_red_laser(frame)
|
||
if pos:
|
||
self.save_laser_point(pos)
|
||
return pos
|
||
return None
|
||
|
||
def start_calibration(self):
|
||
"""开始校准(公共方法)"""
|
||
with self._calibration_lock:
|
||
if self._calibration_active:
|
||
return False
|
||
self._calibration_active = True
|
||
self._calibration_result = None
|
||
return True
|
||
|
||
def stop_calibration(self):
|
||
"""停止校准(公共方法)"""
|
||
with self._calibration_lock:
|
||
self._calibration_active = False
|
||
|
||
def set_calibration_result(self, result):
|
||
"""设置校准结果(内部方法)"""
|
||
with self._calibration_lock:
|
||
self._calibration_result = result
|
||
|
||
def get_calibration_result(self):
|
||
"""获取并清除校准结果(内部方法)"""
|
||
with self._calibration_lock:
|
||
result = self._calibration_result
|
||
self._calibration_result = None
|
||
return result
|
||
|
||
def parse_bcd_distance(self, bcd_bytes: bytes) -> float:
|
||
"""将 4 字节 BCD 码转换为距离(米)"""
|
||
if len(bcd_bytes) != 4:
|
||
return 0.0
|
||
try:
|
||
hex_string = binascii.hexlify(bcd_bytes).decode()
|
||
distance_int = int(hex_string)
|
||
return distance_int / 1000.0
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[LASER] BCD 解析失败: {e}")
|
||
return 0.0
|
||
|
||
def read_distance_from_laser_sensor(self):
|
||
"""发送测距指令并返回距离(米)和信号质量
|
||
返回: (distance_m, signal_quality) 元组,失败返回 (0.0, 0)
|
||
"""
|
||
from hardware import hardware_manager
|
||
logger = logger_manager.logger
|
||
|
||
if hardware_manager.distance_serial is None:
|
||
if logger:
|
||
logger.error("[LASER] distance_serial 未初始化")
|
||
return (0.0, 0)
|
||
|
||
try:
|
||
# 清空缓冲区
|
||
try:
|
||
hardware_manager.distance_serial.read(-1)
|
||
except:
|
||
pass
|
||
# 打开激光
|
||
|
||
self.turn_on_laser()
|
||
self._laser_turned_on = True
|
||
# time.sleep_ms(500) # 需要一定时间让激光稳定
|
||
# 发送测距查询命令
|
||
hardware_manager.distance_serial.write(config.DISTANCE_QUERY_CMD)
|
||
# time.sleep_ms(500) # 测试结果:这里的等待没有用!
|
||
self.turn_off_laser()
|
||
self._laser_turned_on = False
|
||
|
||
# 这里的等待才是有效的!大概350ms能读到数据
|
||
# 循环读取响应,最多等待500ms
|
||
start_time = time.ticks_ms()
|
||
max_wait_ms = 500
|
||
response = None
|
||
|
||
while True:
|
||
# 检查是否超时
|
||
# 注意:使用 time.ticks_diff(start_time, time.ticks_ms()) 避免负数问题
|
||
elapsed_ms = abs(time.ticks_diff(start_time, time.ticks_ms()))
|
||
if elapsed_ms >= max_wait_ms:
|
||
if logger:
|
||
logger.warning(f"[LASER] 读取超时 ({elapsed_ms}ms),未收到完整响应")
|
||
return (0.0, 0)
|
||
|
||
# 尝试读取数据
|
||
response = hardware_manager.distance_serial.read(config.DISTANCE_RESPONSE_LEN)
|
||
|
||
# 如果读到完整数据,立即返回
|
||
if response and len(response) == config.DISTANCE_RESPONSE_LEN:
|
||
elapsed_ms = abs(time.ticks_diff(start_time, time.ticks_ms()))
|
||
if logger:
|
||
logger.debug(f"[LASER] 收到响应 ({elapsed_ms}ms)")
|
||
break
|
||
|
||
# 如果还没超时,短暂等待后继续尝试
|
||
time.sleep_ms(10) # 每次循环等待10ms,避免CPU占用过高
|
||
|
||
# 验证响应格式
|
||
if response and len(response) == config.DISTANCE_RESPONSE_LEN:
|
||
if response[3] != 0x20:
|
||
if response[0] == 0xEE:
|
||
err_code = (response[7] << 8) | response[8]
|
||
if logger:
|
||
logger.warning(f"[LASER] 模块错误代码: {hex(err_code)}")
|
||
return (0.0, 0)
|
||
|
||
# 解析BCD码距离
|
||
bcd_bytes = response[6:10]
|
||
distance_value_m = self.parse_bcd_distance(bcd_bytes)
|
||
signal_quality = (response[10] << 8) | response[11]
|
||
|
||
if logger:
|
||
logger.debug(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}")
|
||
return (distance_value_m, signal_quality)
|
||
|
||
if logger:
|
||
logger.warning(f"[LASER] 无效响应: {response.hex() if response else 'None'}")
|
||
return (0.0, 0)
|
||
except Exception as e:
|
||
logger = logger_manager.logger
|
||
if logger:
|
||
logger.error(f"[LASER] 读取激光测距失败: {e}")
|
||
return (0.0, 0)
|
||
|
||
def quick_measure_distance(self):
|
||
"""
|
||
快速激光测距:打开激光 → 测距 → 关闭激光
|
||
激光开启时间最小化(约500-600ms),尽量不让用户觉察到
|
||
返回: (distance_m, signal_quality) 元组,失败返回 (0.0, 0)
|
||
"""
|
||
logger = logger_manager.logger
|
||
self._laser_turned_on = False
|
||
|
||
try:
|
||
|
||
|
||
# 等待激光稳定(最小延迟)
|
||
# time.sleep_ms(50)
|
||
|
||
# 读取距离和信号质量
|
||
result = self.read_distance_from_laser_sensor()
|
||
return result
|
||
except Exception as e:
|
||
if logger:
|
||
logger.error(f"[LASER] 快速测距异常: {e}")
|
||
return (0.0, 0)
|
||
finally:
|
||
# 确保激光关闭
|
||
if self._laser_turned_on:
|
||
try:
|
||
self.turn_off_laser()
|
||
except Exception as e:
|
||
if logger:
|
||
logger.error(f"[LASER] 关闭激光失败: {e}")
|
||
|
||
|
||
# 创建全局单例实例
|
||
laser_manager = LaserManager()
|
||
|
||
# ==================== 向后兼容的函数接口 ====================
|
||
|
||
def load_laser_point():
|
||
"""加载激光点(向后兼容接口)"""
|
||
return laser_manager.load_laser_point()
|
||
|
||
def save_laser_point(point):
|
||
"""保存激光点(向后兼容接口)"""
|
||
return laser_manager.save_laser_point(point)
|
||
|
||
def turn_on_laser():
|
||
"""开启激光(向后兼容接口)"""
|
||
return laser_manager.turn_on_laser()
|
||
|
||
def turn_off_laser():
|
||
"""关闭激光(向后兼容接口)"""
|
||
return laser_manager.turn_off_laser()
|
||
|
||
def flash_laser(duration_ms=1000):
|
||
"""闪激光(向后兼容接口)"""
|
||
return laser_manager.flash_laser(duration_ms)
|
||
|
||
def find_red_laser(frame, threshold=150):
|
||
"""查找红色激光点(向后兼容接口)"""
|
||
return laser_manager.find_red_laser(frame, threshold)
|
||
|
||
def calibrate_laser_position():
|
||
"""校准激光位置(向后兼容接口)"""
|
||
return laser_manager.calibrate_laser_position()
|
||
|