2026-01-12 11:39:27 +08:00
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
|
|
"""
|
|
|
|
|
|
激光管理器模块
|
|
|
|
|
|
提供激光控制、校准等功能
|
|
|
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
|
|
|
import os
|
2026-01-12 18:06:04 +08:00
|
|
|
|
import binascii
|
2026-01-12 11:39:27 +08:00
|
|
|
|
from maix import time, camera
|
|
|
|
|
|
import threading
|
|
|
|
|
|
import config
|
|
|
|
|
|
from logger_manager import logger_manager
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LaserManager:
|
|
|
|
|
|
"""激光控制管理器(单例)"""
|
|
|
|
|
|
_instance = None
|
|
|
|
|
|
|
|
|
|
|
|
def __new__(cls):
|
|
|
|
|
|
if cls._instance is None:
|
|
|
|
|
|
cls._instance = super(LaserManager, cls).__new__(cls)
|
|
|
|
|
|
cls._instance._initialized = False
|
|
|
|
|
|
return cls._instance
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
|
if self._initialized:
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
# 私有状态
|
|
|
|
|
|
self._calibration_active = False
|
|
|
|
|
|
self._calibration_result = None
|
|
|
|
|
|
self._calibration_lock = threading.Lock()
|
|
|
|
|
|
self._laser_point = None
|
2026-01-12 18:06:04 +08:00
|
|
|
|
self._laser_turned_on = False
|
2026-01-12 11:39:27 +08:00
|
|
|
|
self._initialized = True
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== 状态访问(只读属性)====================
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def calibration_active(self):
|
|
|
|
|
|
"""是否正在校准"""
|
|
|
|
|
|
return self._calibration_active
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
|
def laser_point(self):
|
|
|
|
|
|
"""当前激光点"""
|
|
|
|
|
|
return self._laser_point
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== 业务方法 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def load_laser_point(self):
|
|
|
|
|
|
"""从配置文件加载激光中心点,失败则使用默认值"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
if "laser_config.json" in os.listdir("/root"):
|
|
|
|
|
|
with open(config.CONFIG_FILE, "r") as f:
|
|
|
|
|
|
data = json.load(f)
|
|
|
|
|
|
if isinstance(data, list) and len(data) == 2:
|
|
|
|
|
|
self._laser_point = (int(data[0]), int(data[1]))
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.debug(f"[INFO] 加载激光点: {self._laser_point}")
|
|
|
|
|
|
return self._laser_point
|
|
|
|
|
|
else:
|
|
|
|
|
|
raise ValueError
|
|
|
|
|
|
else:
|
|
|
|
|
|
self._laser_point = config.DEFAULT_LASER_POINT
|
|
|
|
|
|
except:
|
|
|
|
|
|
self._laser_point = config.DEFAULT_LASER_POINT
|
|
|
|
|
|
|
|
|
|
|
|
return self._laser_point
|
|
|
|
|
|
|
|
|
|
|
|
def save_laser_point(self, point):
|
|
|
|
|
|
"""保存激光中心点到配置文件"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
with open(config.CONFIG_FILE, "w") as f:
|
|
|
|
|
|
json.dump([point[0], point[1]], f)
|
|
|
|
|
|
self._laser_point = point
|
|
|
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error(f"[LASER] 保存激光点失败: {e}")
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
def turn_on_laser(self):
|
|
|
|
|
|
"""发送指令开启激光,并读取回包(部分模块支持)"""
|
|
|
|
|
|
from hardware import hardware_manager
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
|
|
|
|
|
|
if hardware_manager.distance_serial is None:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error("[LASER] distance_serial 未初始化")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 打印调试信息
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.info(f"[LASER] 发送开启命令: {config.LASER_ON_CMD.hex()}")
|
|
|
|
|
|
|
|
|
|
|
|
# 清空接收缓冲区
|
|
|
|
|
|
try:
|
|
|
|
|
|
hardware_manager.distance_serial.read(-1) # 清空缓冲区
|
|
|
|
|
|
except:
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
# 发送命令
|
|
|
|
|
|
written = hardware_manager.distance_serial.write(config.LASER_ON_CMD)
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.info(f"[LASER] 写入字节数: {written}")
|
|
|
|
|
|
|
2026-01-12 18:53:01 +08:00
|
|
|
|
# return None
|
2026-01-12 18:06:04 +08:00
|
|
|
|
|
|
|
|
|
|
# TODO: 暂时去掉这个等待
|
2026-01-12 11:39:27 +08:00
|
|
|
|
# 读取回包
|
2026-01-12 18:53:01 +08:00
|
|
|
|
print("before read:", time.ticks_ms())
|
|
|
|
|
|
resp = hardware_manager.distance_serial.read(len=20,timeout=10)
|
|
|
|
|
|
print("after read:", time.ticks_ms())
|
|
|
|
|
|
if resp:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
|
|
|
|
|
|
if resp == config.LASER_ON_CMD:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.info("✅ 激光开启指令已确认")
|
|
|
|
|
|
else:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.warning("🔇 无回包(可能正常或模块不支持回包)")
|
|
|
|
|
|
return resp
|
2026-01-12 11:39:27 +08:00
|
|
|
|
|
|
|
|
|
|
def turn_off_laser(self):
|
|
|
|
|
|
"""发送指令关闭激光"""
|
|
|
|
|
|
from hardware import hardware_manager
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
|
|
|
|
|
|
if hardware_manager.distance_serial is None:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error("[LASER] distance_serial 未初始化")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
# 打印调试信息
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.info(f"[LASER] 发送关闭命令: {config.LASER_OFF_CMD.hex()}")
|
|
|
|
|
|
|
|
|
|
|
|
# 清空接收缓冲区
|
|
|
|
|
|
try:
|
|
|
|
|
|
hardware_manager.distance_serial.read(-1)
|
|
|
|
|
|
except:
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
# 发送命令
|
|
|
|
|
|
written = hardware_manager.distance_serial.write(config.LASER_OFF_CMD)
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.info(f"[LASER] 写入字节数: {written}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 读取回包
|
2026-01-12 18:53:01 +08:00
|
|
|
|
resp = hardware_manager.distance_serial.read(20)
|
|
|
|
|
|
if resp:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.info(f"[LASER] 收到回包 ({len(resp)}字节): {resp.hex()}")
|
|
|
|
|
|
else:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.warning("🔇 无回包")
|
|
|
|
|
|
return resp
|
2026-01-12 18:06:04 +08:00
|
|
|
|
# 不用读回包
|
2026-01-12 18:53:01 +08:00
|
|
|
|
# return None
|
2026-01-12 11:39:27 +08:00
|
|
|
|
|
|
|
|
|
|
def flash_laser(self, duration_ms=1000):
|
|
|
|
|
|
"""闪一下激光(用于射箭反馈)"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.turn_on_laser()
|
|
|
|
|
|
time.sleep_ms(duration_ms)
|
|
|
|
|
|
self.turn_off_laser()
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error(f"闪激光失败: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
def find_red_laser(self, frame, threshold=150):
|
|
|
|
|
|
"""在图像中查找最亮的红色激光点(基于 RGB 阈值)"""
|
|
|
|
|
|
w, h = frame.width(), frame.height()
|
|
|
|
|
|
img_bytes = frame.to_bytes()
|
|
|
|
|
|
max_sum = 0
|
|
|
|
|
|
best_pos = None
|
|
|
|
|
|
for y in range(0, h, 2):
|
|
|
|
|
|
for x in range(0, w, 2):
|
|
|
|
|
|
idx = (y * w + x) * 3
|
|
|
|
|
|
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
|
|
|
|
|
|
if r > threshold and r > g * 2 and r > b * 2:
|
|
|
|
|
|
rgb_sum = r + g + b
|
|
|
|
|
|
if rgb_sum > max_sum:
|
|
|
|
|
|
max_sum = rgb_sum
|
|
|
|
|
|
best_pos = (x, y)
|
|
|
|
|
|
return best_pos
|
|
|
|
|
|
|
|
|
|
|
|
def calibrate_laser_position(self):
|
|
|
|
|
|
"""执行一次激光校准:拍照 → 找红点 → 保存坐标"""
|
|
|
|
|
|
time.sleep_ms(80)
|
|
|
|
|
|
cam = camera.Camera(640, 480)
|
|
|
|
|
|
frame = cam.read()
|
|
|
|
|
|
pos = self.find_red_laser(frame)
|
|
|
|
|
|
if pos:
|
|
|
|
|
|
self.save_laser_point(pos)
|
|
|
|
|
|
return pos
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
def start_calibration(self):
|
|
|
|
|
|
"""开始校准(公共方法)"""
|
|
|
|
|
|
with self._calibration_lock:
|
|
|
|
|
|
if self._calibration_active:
|
|
|
|
|
|
return False
|
|
|
|
|
|
self._calibration_active = True
|
|
|
|
|
|
self._calibration_result = None
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
def stop_calibration(self):
|
|
|
|
|
|
"""停止校准(公共方法)"""
|
|
|
|
|
|
with self._calibration_lock:
|
|
|
|
|
|
self._calibration_active = False
|
|
|
|
|
|
|
|
|
|
|
|
def set_calibration_result(self, result):
|
|
|
|
|
|
"""设置校准结果(内部方法)"""
|
|
|
|
|
|
with self._calibration_lock:
|
|
|
|
|
|
self._calibration_result = result
|
|
|
|
|
|
|
|
|
|
|
|
def get_calibration_result(self):
|
|
|
|
|
|
"""获取并清除校准结果(内部方法)"""
|
|
|
|
|
|
with self._calibration_lock:
|
|
|
|
|
|
result = self._calibration_result
|
|
|
|
|
|
self._calibration_result = None
|
|
|
|
|
|
return result
|
2026-01-12 18:06:04 +08:00
|
|
|
|
|
|
|
|
|
|
def parse_bcd_distance(self, bcd_bytes: bytes) -> float:
|
|
|
|
|
|
"""将 4 字节 BCD 码转换为距离(米)"""
|
|
|
|
|
|
if len(bcd_bytes) != 4:
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
try:
|
|
|
|
|
|
hex_string = binascii.hexlify(bcd_bytes).decode()
|
|
|
|
|
|
distance_int = int(hex_string)
|
|
|
|
|
|
return distance_int / 1000.0
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error(f"[LASER] BCD 解析失败: {e}")
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
|
|
def read_distance_from_laser_sensor(self) -> float:
|
|
|
|
|
|
"""发送测距指令并返回距离(米)"""
|
|
|
|
|
|
from hardware import hardware_manager
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
|
|
|
|
|
|
if hardware_manager.distance_serial is None:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error("[LASER] distance_serial 未初始化")
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
# 清空缓冲区
|
|
|
|
|
|
try:
|
|
|
|
|
|
hardware_manager.distance_serial.read(-1)
|
|
|
|
|
|
except:
|
|
|
|
|
|
pass
|
|
|
|
|
|
# 打开激光
|
|
|
|
|
|
|
|
|
|
|
|
self.turn_on_laser()
|
|
|
|
|
|
self._laser_turned_on = True
|
2026-01-12 18:53:01 +08:00
|
|
|
|
# time.sleep_ms(500) # 需要一定时间让激光稳定
|
2026-01-12 18:06:04 +08:00
|
|
|
|
# 发送测距查询命令
|
|
|
|
|
|
hardware_manager.distance_serial.write(config.DISTANCE_QUERY_CMD)
|
|
|
|
|
|
# time.sleep_ms(500) # 测试结果:这里的等待没有用!
|
|
|
|
|
|
self.turn_off_laser()
|
|
|
|
|
|
self._laser_turned_on = False
|
|
|
|
|
|
|
|
|
|
|
|
# 这里的等待才是有效的!大概350ms能读到数据
|
|
|
|
|
|
# 循环读取响应,最多等待500ms
|
|
|
|
|
|
start_time = time.ticks_ms()
|
|
|
|
|
|
max_wait_ms = 500
|
|
|
|
|
|
response = None
|
|
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
|
# 检查是否超时
|
|
|
|
|
|
elapsed_ms = time.ticks_diff(start_time,time.ticks_ms())
|
|
|
|
|
|
print("elapsed_ms:", elapsed_ms)
|
|
|
|
|
|
if elapsed_ms >= max_wait_ms:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.warning(f"[LASER] 读取超时 ({elapsed_ms}ms),未收到完整响应")
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
|
|
# 尝试读取数据
|
|
|
|
|
|
response = hardware_manager.distance_serial.read(config.DISTANCE_RESPONSE_LEN)
|
|
|
|
|
|
|
|
|
|
|
|
# 如果读到完整数据,立即返回
|
|
|
|
|
|
if response and len(response) == config.DISTANCE_RESPONSE_LEN:
|
|
|
|
|
|
elapsed_ms = time.ticks_diff(start_time,time.ticks_ms())
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.debug(f"[LASER] 收到响应 ({elapsed_ms}ms)")
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
# 如果还没超时,短暂等待后继续尝试
|
|
|
|
|
|
time.sleep_ms(10) # 每次循环等待10ms,避免CPU占用过高
|
|
|
|
|
|
|
|
|
|
|
|
# 验证响应格式
|
|
|
|
|
|
if response and len(response) == config.DISTANCE_RESPONSE_LEN:
|
|
|
|
|
|
if response[3] != 0x20:
|
|
|
|
|
|
if response[0] == 0xEE:
|
|
|
|
|
|
err_code = (response[7] << 8) | response[8]
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.warning(f"[LASER] 模块错误代码: {hex(err_code)}")
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
|
|
# 解析BCD码距离
|
|
|
|
|
|
bcd_bytes = response[6:10]
|
|
|
|
|
|
distance_value_m = self.parse_bcd_distance(bcd_bytes)
|
|
|
|
|
|
signal_quality = (response[10] << 8) | response[11]
|
|
|
|
|
|
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.debug(f"[LASER] 测距成功: {distance_value_m:.3f} m, 信号质量: {signal_quality}")
|
|
|
|
|
|
return distance_value_m
|
|
|
|
|
|
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.warning(f"[LASER] 无效响应: {response.hex() if response else 'None'}")
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error(f"[LASER] 读取激光测距失败: {e}")
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
|
|
|
|
|
|
def quick_measure_distance(self) -> float:
|
|
|
|
|
|
"""
|
|
|
|
|
|
快速激光测距:打开激光 → 测距 → 关闭激光
|
|
|
|
|
|
激光开启时间最小化(约500-600ms),尽量不让用户觉察到
|
|
|
|
|
|
返回距离(米),失败返回0.0
|
|
|
|
|
|
"""
|
|
|
|
|
|
logger = logger_manager.logger
|
|
|
|
|
|
self._laser_turned_on = False
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 等待激光稳定(最小延迟)
|
|
|
|
|
|
# time.sleep_ms(50)
|
|
|
|
|
|
|
|
|
|
|
|
# 读取距离
|
|
|
|
|
|
distance_m = self.read_distance_from_laser_sensor()
|
|
|
|
|
|
|
|
|
|
|
|
return distance_m
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error(f"[LASER] 快速测距异常: {e}")
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
finally:
|
|
|
|
|
|
# 确保激光关闭
|
|
|
|
|
|
if self._laser_turned_on:
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.turn_off_laser()
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
if logger:
|
|
|
|
|
|
logger.error(f"[LASER] 关闭激光失败: {e}")
|
2026-01-12 11:39:27 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 创建全局单例实例
|
|
|
|
|
|
laser_manager = LaserManager()
|
|
|
|
|
|
|
|
|
|
|
|
# ==================== 向后兼容的函数接口 ====================
|
|
|
|
|
|
|
|
|
|
|
|
def load_laser_point():
|
|
|
|
|
|
"""加载激光点(向后兼容接口)"""
|
|
|
|
|
|
return laser_manager.load_laser_point()
|
|
|
|
|
|
|
|
|
|
|
|
def save_laser_point(point):
|
|
|
|
|
|
"""保存激光点(向后兼容接口)"""
|
|
|
|
|
|
return laser_manager.save_laser_point(point)
|
|
|
|
|
|
|
|
|
|
|
|
def turn_on_laser():
|
|
|
|
|
|
"""开启激光(向后兼容接口)"""
|
|
|
|
|
|
return laser_manager.turn_on_laser()
|
|
|
|
|
|
|
|
|
|
|
|
def turn_off_laser():
|
|
|
|
|
|
"""关闭激光(向后兼容接口)"""
|
|
|
|
|
|
return laser_manager.turn_off_laser()
|
|
|
|
|
|
|
|
|
|
|
|
def flash_laser(duration_ms=1000):
|
|
|
|
|
|
"""闪激光(向后兼容接口)"""
|
|
|
|
|
|
return laser_manager.flash_laser(duration_ms)
|
|
|
|
|
|
|
|
|
|
|
|
def find_red_laser(frame, threshold=150):
|
|
|
|
|
|
"""查找红色激光点(向后兼容接口)"""
|
|
|
|
|
|
return laser_manager.find_red_laser(frame, threshold)
|
|
|
|
|
|
|
|
|
|
|
|
def calibrate_laser_position():
|
|
|
|
|
|
"""校准激光位置(向后兼容接口)"""
|
|
|
|
|
|
return laser_manager.calibrate_laser_position()
|
|
|
|
|
|
|