Files
archery/main.py

1451 lines
52 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
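Laser shooting system main program (vision-based ranging edition).
Features: target detection, laser calibration, 4G TCP communication, OTA upgrade, monocular distance estimation, INA226 battery monitoring.
Platform: MaixPy (Sipeed MAIX)
Author: ZZH
Last updated: 2025-11-21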
"""
2025-12-28 16:22:41 +08:00
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import cv2
import numpy as np
2025-12-26 15:12:47 +08:00
import json
2025-12-28 16:22:41 +08:00
import struct
2025-12-26 15:12:47 +08:00
import re
2025-12-28 16:22:41 +08:00
from maix.peripheral import adc
import _thread
import os
import hmac
import ujson
import hashlib
import requests
import socket
2025-12-28 16:22:41 +08:00
import re
import binascii
2025-12-26 15:12:47 +08:00
2025-12-28 16:22:41 +08:00
try:
import hashlib
except:
hashlib = None
2025-12-26 11:47:33 +08:00
# import config
2025-12-28 16:22:41 +08:00
# ==================== Locks ====================
class _Mutex:
    """
    Thin mutual-exclusion wrapper around ``_thread.allocate_lock()``.

    - Usable as a context manager (``with lock:``).
    - ``try_acquire()`` attempts a non-blocking acquire. If the underlying
      lock rejects the ``blocking`` argument (TypeError), it degrades to a
      plain blocking acquire and reports success.
    """

    def __init__(self):
        self._lk = _thread.allocate_lock()

    def acquire(self, blocking=True):
        """Take the lock; return the lock's own result, or True after the blocking fallback."""
        try:
            acquired = self._lk.acquire(blocking)
        except TypeError:
            # This lock implementation takes no argument: wait for it instead.
            self._lk.acquire()
            acquired = True
        return acquired

    def try_acquire(self):
        """Attempt to take the lock without waiting."""
        return self.acquire(False)

    def release(self):
        """Give the lock back."""
        self._lk.release()

    def __enter__(self):
        self.acquire(True)
        return self

    def __exit__(self, exc_type, exc, tb):
        self.release()
        # False: never suppress an exception raised inside the with-body.
        return False
# ==================== 全局配置 ====================
# OTA 升级地址与本地路径
2025-12-28 16:22:41 +08:00
# url = "https://static.shelingxingqiu.com/shoot/202511031031/main.py"
# Path the downloaded OTA file is written to before it is applied.
local_filename = "/maixapp/apps/t11/main_tmp.py"
app_version = '1.0.0'
# OTA parameters delivered by the backend (per the original note: written by a backend command)
OTA_URL = None
OTA_MODE = None  # "4g" / "wifi" / None
def is_wifi_connected():
    """
    Best-effort check for an active Wi-Fi link.

    Used to choose the OTA transport: Wi-Fi when available, otherwise 4G.

    Returns:
        bool: True if the MaixPy WLAN object reports a connection, or if the
        system reports an IPv4 address on ``wlan0``; False otherwise
        (including any failure while probing).
    """
    # Preferred: ask MaixPy's network module, when it is usable.
    try:
        wlan = network.WLAN(network.TYPE_WIFI)
        if wlan.isconnected():
            return True
    except Exception:
        # Probe unavailable or failed: fall through to the system-level check.
        pass
    # Fallback: does wlan0 have an IP? (No wlan0 at all gives empty output -> False.)
    try:
        with os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'") as pipe:
            ip = pipe.read().strip()
        return bool(ip)
    except Exception:
        return False
# Device credentials (filled in at run time)
DEVICE_ID = None
PASSWORD = None
# Server connection parameters
SERVER_IP = "www.shelingxingqiu.com"
SERVER_PORT = 50005
2025-12-28 16:22:41 +08:00
HEARTBEAT_INTERVAL = 60  # heartbeat interval (seconds)
# Laser calibration configuration
CONFIG_FILE = "/root/laser_config.json"
# Default laser centre point. NOTE(review): the original comment calls this the
# image centre, but cameras in this file are opened at 640x480, whose centre
# would be (320, 240) - confirm the intended value.
DEFAULT_POINT = (640, 480)
laser_point = DEFAULT_POINT
# HTTP reporting endpoint
URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"
# UART 设备初始化
2025-12-28 16:22:41 +08:00
uart4g = uart.UART("/dev/ttyS2", 115200)  # link to the 4G module
distance_serial = uart.UART("/dev/ttyS1", 9600)  # laser ranging module
# Pin function mapping
pinmap.set_pin_function("A18", "UART1_RX")
pinmap.set_pin_function("A19", "UART1_TX")
pinmap.set_pin_function("A29", "UART2_RX")
pinmap.set_pin_function("A28", "UART2_TX")
pinmap.set_pin_function("P18", "I2C1_SCL")
pinmap.set_pin_function("P21", "I2C1_SDA")
# pinmap.set_pin_function("A15", "I2C5_SCL")
# pinmap.set_pin_function("A27", "I2C5_SDA")  # (original note) to be changed for the OTA upgrade
# ADC 触发阈值(用于检测扳机/激光触发)
2025-12-26 11:47:33 +08:00
# ADC thresholds (per the original note: used to detect the trigger / laser firing)
ADC_TRIGGER_THRESHOLD = 3000
ADC_LASER_THRESHOLD = 3000
# Display parameters: style of the laser cross-hair
color = image.Color(255, 100, 0)
thickness = 1
length = 2
# 全局状态变量
2025-12-28 16:22:41 +08:00
laser_calibration_active = False  # True while a background laser calibration is requested
laser_calibration_result = None  # calibration result coordinates (x, y)
laser_calibration_lock = _Mutex()  # guards the calibration result across threads
# Hardware object initialisation
laser_x, laser_y = laser_point  # snapshot of laser_point taken at import time
adc_obj = adc.ADC(0, adc.RES_BIT_12)
2025-12-28 16:22:41 +08:00
bus = i2c.I2C(1, i2c.Mode.MASTER)  # use the I2C1 bus
# bus = i2c.I2C(5, i2c.Mode.MASTER)  # (original note) the one for the OTA upgrade
# INA226 current/voltage monitor: device address and register addresses
INA226_ADDR = 0x40
REG_CONFIGURATION = 0x00
REG_BUS_VOLTAGE = 0x02
REG_CALIBRATION = 0x05
CALIBRATION_VALUE = 0x1400
# Laser control commands (custom protocol). In both frames the last byte
# equals the low byte of the sum of the bytes between the 0xAA lead byte and
# the last byte.
MODULE_ADDR = 0x00
LASER_ON_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x01, 0xC1])
LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0xC0])
# 相机标定参数(用于距离估算)
2025-11-24 16:52:53 +08:00
# FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素)
2025-12-28 16:22:41 +08:00
FOCAL_LENGTH_PIX = 1900.0  # focal length (pixels)
REAL_RADIUS_CM = 15  # real radius of the target centre (centimetres)
# TCP connection state
tcp_connected = False
2025-12-28 16:22:41 +08:00
high_send_queue = []  # high-priority send queue: shooting events etc.
normal_send_queue = []  # normal send queue: battery / calibration result / status etc.
queue_lock = _Mutex()  # guards both queues
uart4g_lock = _Mutex()  # guards the AT send sequence on the 4G UART against concurrent use
update_thread_started = False  # prevents starting a second OTA update thread
2025-12-28 16:22:41 +08:00
ota_in_progress = False  # while True (4G HTTP OTA), tcp_main stops reading uart4g so it cannot swallow +MHTTPURC data
# ==================== 工具函数 ====================
def download_file(url, filename, timeout=30):
    """
    Download ``url`` and store the response body in ``filename``.

    Args:
        url: HTTP(S) address of the file.
        filename: Destination path; overwritten if it exists.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the OTA thread forever.

    Returns:
        str: A human-readable status message. Callers detect success by the
        substring "成功", so the message texts must stay stable.
    """
    try:
        print(f"正在从 {url} 下载文件...")
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        # Store the raw bytes: decoding to text and re-encoding would silently
        # replace any byte sequence that is not valid UTF-8.
        with open(filename, 'wb') as file:
            file.write(response.content)
        return f"下载成功!文件已保存为: {filename}"
    except requests.exceptions.RequestException as e:
        return f"下载失败!网络请求错误: {e}"
    except OSError as e:
        return f"下载失败!文件写入错误: {e}"
    except Exception as e:
        return f"下载失败!发生未知错误: {e}"
def is_server_reachable(host, port=80, timeout=5):
    """
    Check whether a TCP connection to ``host:port`` can be opened
    (network pre-check before an OTA download).

    Args:
        host: Host name or IP address.
        port: TCP port.
        timeout: Connect timeout in seconds.

    Returns:
        bool: True if the connection succeeded, False on any error.
    """
    sock = None
    try:
        family, socktype, proto, _canonname, sockaddr = socket.getaddrinfo(host, port)[0]
        sock = socket.socket(family, socktype, proto)
        sock.settimeout(timeout)
        sock.connect(sockaddr)
        return True
    except Exception as e:
        print(f"[NET] 无法连接 {host}:{port} - {e}")
        return False
    finally:
        # Close on every path; previously a failed connect leaked the socket.
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass
2025-12-28 16:22:41 +08:00
def apply_ota_and_reboot(ota_url=None):
    """
    Apply a downloaded OTA image and reboot.

    Steps: validate main_tmp.py -> back up main.py -> replace main.py ->
    write the pending marker -> queue a notification -> reboot.

    Args:
        ota_url: URL the image came from; stored in the pending marker so the
            post-reboot success report can reference it.

    Returns:
        bool: True once the reboot command has been issued. False if the image
        is missing or invalid, or the replacement failed; main.py is left
        untouched in those cases.
    """
    import shutil
    main_py = "/maixapp/apps/t11/main.py"
    main_tmp = "/maixapp/apps/t11/main_tmp.py"
    main_bak = "/maixapp/apps/t11/main.py.bak"
    main_new = "/maixapp/apps/t11/main.py.new"
    ota_pending = "/maixapp/apps/t11/ota_pending.json"

    def _sync():
        # os.sync() may be unavailable on some builds; flushing is best-effort.
        try:
            os.sync()
        except Exception:
            pass

    try:
        # 1. The downloaded file must exist.
        if not os.path.exists(main_tmp):
            print(f"[OTA] 错误:{main_tmp} 不存在")
            return False
        # 1.5 Refuse an image that is empty or not valid Python (for example
        #     an HTML error page): installing it would leave the device
        #     unable to start its application.
        try:
            with open(main_tmp, "rb") as f:
                new_source = f.read()
            if not new_source.strip():
                raise ValueError("empty file")
            compile(new_source, main_tmp, "exec")
        except Exception as e:
            print(f"[OTA] 升级文件校验失败: {e}")
            return False
        # 2. Back up the current main.py (if there is one).
        if os.path.exists(main_py):
            try:
                shutil.copy2(main_py, main_bak)
                print(f"[OTA] 已备份 {main_py} -> {main_bak}")
            except Exception as e:
                print(f"[OTA] 备份失败: {e}")
                # A failed backup does not stop the upgrade.
        # 3. Replace main.py. Stage a complete copy beside it and rename it
        #    into place, so an interruption mid-copy cannot leave a truncated
        #    main.py.
        try:
            shutil.copy2(main_tmp, main_new)
            _sync()
            os.replace(main_new, main_py)
            print(f"[OTA] 已替换 {main_tmp} -> {main_py}")
            _sync()
            time.sleep_ms(500)  # extra wait for the write to settle
        except Exception as e:
            print(f"[OTA] 替换失败: {e}")
            try:
                os.remove(main_new)
            except OSError:
                pass
            return False
        # 3.5 Write the pending marker; after reboot it is used to confirm
        #     success and report it.
        try:
            pending_obj = {
                "ts": int(time.time()) if hasattr(time, "time") else 0,
                "url": ota_url or "",
                "tmp": main_tmp,
                "main": main_py,
                "bak": main_bak,
            }
            with open(ota_pending, "w", encoding="utf-8") as f:
                json.dump(pending_obj, f)
            _sync()
        except Exception as e:
            print(f"[OTA] 写入 ota_pending 失败: {e}")
        # 4. Notify the server once before rebooting (best effort).
        safe_enqueue({"result": "ota_applied_rebooting"}, 2)
        time.sleep_ms(1000)  # give the message a moment to go out
        # 5. Reboot the device.
        print("[OTA] 准备重启设备...")
        os.system("reboot")
        return True
    except Exception as e:
        print(f"[OTA] apply_ota_and_reboot 异常: {e}")
        return False
def direct_ota_download(ota_url):
    """
    Download and apply an OTA image over an already-working network link
    (used for cmd=7 / Wi-Fi mode).

    Every failure is reported to the server through the send queue. On
    success the device reboots from apply_ota_and_reboot().

    Args:
        ota_url: Full HTTP(S) URL of the new main.py.
    """
    global update_thread_started
    try:
        if not ota_url:
            safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
            return
        from urllib.parse import urlparse
        parsed_url = urlparse(ota_url)
        host = parsed_url.hostname
        if not host:
            safe_enqueue({"result": "ota_failed", "reason": "invalid_url"}, 2)
            return
        port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
        if not is_server_reachable(host, port, timeout=8):
            safe_enqueue({"result": "ota_failed", "reason": f"无法连接 {host}:{port}"}, 2)
            return
        print(f"[OTA] 开始下载: {ota_url}")
        result_msg = download_file(ota_url, local_filename)
        print(f"[OTA] {result_msg}")
        # download_file() signals success only through its message text.
        if "成功" not in result_msg:
            safe_enqueue({"result": result_msg}, 2)
            return
        # Download succeeded: back up + replace + reboot.
        if not apply_ota_and_reboot(ota_url):
            # Previously a failed apply ended silently; tell the server.
            safe_enqueue({"result": "ota_failed", "reason": "apply_failed"}, 2)
    except Exception as e:
        error_msg = f"OTA 异常: {str(e)}"
        print(error_msg)
        safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
    finally:
        # Runs on every exit path (early returns included) so a later OTA
        # command can start a new update thread.
        update_thread_started = False
2025-12-26 15:12:47 +08:00
2025-12-28 16:22:41 +08:00
def handle_wifi_and_update(ssid, password, ota_url):
    """
    Worker for the OTA thread: join the Wi-Fi network, then download and
    apply the OTA image.

    Progress and failures are reported to the server through the send queue.

    Args:
        ssid: Wi-Fi network name.
        password: Wi-Fi passphrase.
        ota_url: Full HTTP(S) URL of the new main.py.
    """
    global update_thread_started
    try:
        ip, error = connect_wifi(ssid, password)
        if error:
            safe_enqueue({"result": "wifi_failed", "error": error}, 2)
            return
        safe_enqueue({"result": "wifi_connected", "ip": ip}, 2)
        # Download
        if not ota_url:
            safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
            return
        from urllib.parse import urlparse
        parsed_url = urlparse(ota_url)
        host = parsed_url.hostname
        if not host:
            safe_enqueue({"result": "ota_failed", "reason": "invalid_url"}, 2)
            return
        port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
        if not is_server_reachable(host, port, timeout=8):
            err_msg = f"网络不通:无法连接 {host}:{port}"
            safe_enqueue({"result": err_msg}, 2)
            return
        print(f"[NET] 已确认可访问 {host}:{port},开始下载...")
        result = download_file(ota_url, local_filename)
        print(result)
        # download_file() signals success only through its message text.
        if "成功" not in result:
            safe_enqueue({"result": result}, 2)
            return
        # Download succeeded: back up + replace + reboot.
        if not apply_ota_and_reboot(ota_url):
            # Previously a failed apply ended silently; tell the server.
            safe_enqueue({"result": "ota_failed", "reason": "apply_failed"}, 2)
    except Exception as e:
        # Without this handler an unexpected error ended the thread with no
        # report to the server.
        error_msg = f"OTA 异常: {str(e)}"
        print(error_msg)
        safe_enqueue({"result": "ota_failed", "reason": error_msg}, 2)
    finally:
        # Runs on every exit path so a later OTA command can start again.
        update_thread_started = False
        print("[UPDATE] 更新线程执行完毕,即将退出。")
def connect_wifi(ssid, password):
    """
    Connect to a Wi-Fi network and persist the credentials under /boot/.

    Args:
        ssid: Network name.
        password: Passphrase.

    Returns:
        tuple: ``(ip, None)`` on success, ``(None, error_message)`` on failure.
    """
    import shlex
    conf_path = "/etc/wpa_supplicant.conf"
    ssid_file = "/boot/wifi.ssid"
    pass_file = "/boot/wifi.pass"
    try:
        # Generate the wpa_supplicant network block. ssid/password come from a
        # server command, so quote them: inside the previous double quotes,
        # `$(...)` or backticks in either value would have been run by the shell.
        cmd = f"wpa_passphrase {shlex.quote(str(ssid))} {shlex.quote(str(password))}"
        with os.popen(cmd) as pipe:
            net_conf = pipe.read()
        if "network={" not in net_conf:
            return None, "Failed to generate wpa config"
        # Write the runtime configuration.
        with open(conf_path, "w") as f:
            f.write("ctrl_interface=/var/run/wpa_supplicant\n")
            f.write("update_config=1\n\n")
            f.write(net_conf)
        # Persist SSID/PASS (the original marks this step as essential).
        with open(ssid_file, "w") as f:
            f.write(ssid.strip())
        with open(pass_file, "w") as f:
            f.write(password.strip())
        # Restart the Wi-Fi service.
        os.system("/etc/init.d/S30wifi restart")
        # Wait (up to 20 polls, one second apart) for wlan0 to get an IP.
        for _ in range(20):
            with os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'") as pipe:
                ip = pipe.read().strip()
            if ip:
                return ip, None
            time.sleep(1)
        return None, "Timeout: No IP obtained"
    except Exception as e:
        return None, f"Exception: {str(e)}"
def read_device_id():
    """Return the device ID stored in /device_key; fall back to a fixed placeholder when it is unreadable or empty."""
    fallback_id = "DEFAULT_DEVICE_ID"
    try:
        with open("/device_key", "r") as key_file:
            stored_id = key_file.read().strip()
        if not stored_id:
            return fallback_id
        print(f"[INFO] 从 /device_key 读取到 DEVICE_ID: {stored_id}")
        return stored_id
    except Exception as e:
        print(f"[ERROR] 无法读取 /device_key: {e}")
    return fallback_id
2025-12-26 11:47:33 +08:00
def safe_enqueue(data_dict, msg_type=2, high=False):
    """
    Append a message to one of the TCP send queues under the queue lock.

    Args:
        data_dict: Message body.
        msg_type: Packet type number stored with the body.
        high: True selects the high-priority queue, otherwise the normal one.
    """
    queue_lock.acquire(True)
    try:
        target_queue = high_send_queue if high else normal_send_queue
        target_queue.append((msg_type, data_dict))
    finally:
        queue_lock.release()
2025-12-26 11:47:33 +08:00
def _uart4g_lock_acquire():
    """Blocking acquire of the 4G UART lock. Kept for old call sites; prefer ``with uart4g_lock:``."""
    uart4g_lock.acquire(blocking=True)
2025-12-26 11:47:33 +08:00
def _uart4g_lock_release():
    """Release the 4G UART lock. Kept for old call sites; prefer ``with uart4g_lock:``."""
    lock = uart4g_lock
    lock.release()
2025-12-26 11:47:33 +08:00
def at(cmd, wait="OK", timeout=2000):
    """
    Send an AT command to the 4G module and collect the reply.

    Args:
        cmd: Command text without line ending; a falsy value sends nothing
            and only listens.
        wait: Marker text; reading stops as soon as it appears in the reply.
        timeout: Maximum listening time in milliseconds.

    Returns:
        str: Everything received (undecodable bytes dropped), whether or not
        the marker was seen.
    """
    if cmd:
        uart4g.write((cmd + "\r\n").encode())
    started = time.ticks_ms()
    received = b""
    while time.ticks_ms() - started < timeout:
        chunk = uart4g.read()
        if not chunk:
            continue
        received += chunk
        if wait.encode() in received:
            break
    return received.decode(errors="ignore")
def make_packet(msg_type: int, body_dict: dict) -> bytes:
    """
    Build a TCP packet: a 12-byte header followed by the JSON body.

    The header is three big-endian unsigned 32-bit integers: body length,
    message type, and a checksum equal to their sum.
    """
    payload = json.dumps(body_dict).encode()
    size = len(payload)
    return struct.pack(">III", size, msg_type, size + msg_type) + payload
def parse_packet(data: bytes):
    """
    Split a TCP packet into its message type and decoded JSON body.

    Args:
        data: Raw packet: a 12-byte header (three big-endian uint32: body
            length, message type, checksum) followed by the body. Bytes
            beyond the declared body length are ignored.

    Returns:
        tuple: ``(None, None)`` if ``data`` is shorter than the header;
        ``(msg_type, body)`` when the body is valid JSON; otherwise
        ``(msg_type, {"raw": text})`` with the body decoded leniently.
        The checksum field is read but not verified.
    """
    if len(data) < 12:
        return None, None
    body_len, msg_type, _checksum = struct.unpack(">III", data[:12])
    body = data[12:12 + body_len]
    try:
        return msg_type, json.loads(body.decode())
    except ValueError:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError;
        # anything else is a real bug and is no longer swallowed.
        return msg_type, {"raw": body.decode(errors="ignore")}
def tcp_send_raw(data: bytes, max_retries=2) -> bool:
    """
    Send one raw TCP packet through the 4G module's AT interface.

    Holds the UART lock for the whole exchange: request the send prompt,
    write the payload followed by 0x1A, then wait for "OK".

    Args:
        data: Packet bytes.
        max_retries: Number of attempts.

    Returns:
        bool: True when the module confirmed the send; False when not
        connected or every attempt failed.
    """
    if not tcp_connected:
        return False
    with uart4g_lock:
        for _ in range(max_retries):
            prompt = at(f'AT+MIPSEND=0,{len(data)}', ">", 1500)
            if ">" in prompt:
                time.sleep_ms(10)
                framed = data + b"\x1A"  # 0x1A ends the data phase of the AT command
                try:
                    written = uart4g.write(framed)
                except:
                    continue
                if written != len(framed):
                    continue
                if "OK" in at("", "OK", 1000):
                    return True
            # No prompt, or no confirmation: short pause before the next try.
            time.sleep_ms(100)
    return False
def generate_token(device_id):
    """
    Build the HTTP authorisation token for a device.

    The token is "Arrow_" followed by the hex HMAC-SHA256 of the message
    "shoot" keyed with "shootMessageFire" + device_id.
    """
    signing_key = ("shootMessageFire" + device_id).encode()
    mac = hmac.new(signing_key, "shoot".encode(), hashlib.sha256)
    return "Arrow_" + mac.hexdigest()
def send_http_cmd(cmd_str, timeout_ms=3000):
    """Log an HTTP-related AT command, send it, and return the module's reply text (waits for "OK")."""
    print("[HTTP AT] =>", cmd_str)
    reply = at(cmd_str, "OK", timeout_ms)
    return reply
def read_http_response(timeout_ms=5000):
    """Print whatever the 4G UART delivers during the next ``timeout_ms`` milliseconds (debug aid)."""
    began = time.ticks_ms()
    while True:
        elapsed = time.ticks_ms() - began
        if not elapsed < timeout_ms:
            break
        chunk = uart4g.read(128)
        if chunk:
            try:
                text = chunk.decode("utf-8", "ignore").strip()
                print("📡 HTTP 响应:", text)
            except:
                print("📡 响应(raw):", chunk)
        time.sleep_ms(100)
def upload_shoot_event(json_data):
    """
    Report a shooting event to the HTTP endpoint through the 4G module
    (backup channel).

    Args:
        json_data: JSON-serialisable event body.

    Returns:
        bool: True if every AT step was acknowledged with "OK", False otherwise.
    """
    token = generate_token(DEVICE_ID)
    create_reply = send_http_cmd(f'AT+MHTTPCREATE="{URL}"')
    # at() returns whatever was received, so an "ERROR" reply is a non-empty
    # (truthy) string. Success has to be judged by the presence of "OK".
    if "OK" not in create_reply:
        return False
    # Use the instance id reported by the module; fall back to 0 (the value
    # previously hard-coded) when it is not in the captured reply.
    created = re.search(r"\+MHTTPCREATE:\s*(\d+)", create_reply)
    instance_id = int(created.group(1)) if created else 0
    try:
        send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Content-Type: application/json"')
        send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"Authorization: {token}"')
        send_http_cmd(f'AT+MHTTPCFG="header",{instance_id},"DeviceId: {DEVICE_ID}"')
        # NOTE(review): the JSON text contains double quotes and is embedded
        # unescaped inside a quoted AT parameter - confirm the module accepts this.
        json_str = ujson.dumps(json_data)
        if "OK" not in send_http_cmd(f'AT+MHTTPCONTENT={instance_id},0,0,"{json_str}"'):
            return False
        if "OK" not in send_http_cmd(f'AT+MHTTPREQUEST={instance_id},2,0,"{API_PATH}"'):
            return False
        read_http_response()
        return True
    finally:
        # Release the HTTP instance so repeated uploads do not accumulate instances.
        send_http_cmd(f"AT+MHTTPDEL={instance_id}")
def load_laser_point():
    """
    Load the laser centre point from CONFIG_FILE into the global ``laser_point``.

    The file must contain a JSON list of two numbers. If it is missing,
    unreadable or malformed, ``laser_point`` is set to DEFAULT_POINT.
    """
    global laser_point
    try:
        with open(CONFIG_FILE, "r") as f:
            data = json.load(f)
        if not (isinstance(data, list) and len(data) == 2):
            raise ValueError("expected a JSON list of two numbers")
        laser_point = (int(data[0]), int(data[1]))
        print(f"[INFO] 加载激光点: {laser_point}")
    except FileNotFoundError:
        # No saved calibration yet: the default applies silently.
        laser_point = DEFAULT_POINT
    except Exception as e:
        # A corrupt or unreadable file is worth a log line before falling back.
        print(f"[WARN] 激光点配置无效,使用默认值: {e}")
        laser_point = DEFAULT_POINT
def save_laser_point(point):
    """
    Persist the laser centre point to CONFIG_FILE and update ``laser_point``.

    Args:
        point: ``(x, y)`` coordinates.

    A write failure is logged and otherwise ignored. In that case the
    in-memory ``laser_point`` keeps its previous value.
    """
    global laser_point
    try:
        with open(CONFIG_FILE, "w") as f:
            json.dump([point[0], point[1]], f)
        laser_point = point
    except Exception as e:
        # Still best-effort, but no longer silent.
        print(f"[ERROR] 保存激光点失败: {e}")
def turn_on_laser():
    """
    Send the laser-on command and read back up to 20 bytes of reply.

    Returns:
        The bytes read from the ranging module (may be empty/None).
    """
    distance_serial.write(LASER_ON_CMD)
    time.sleep_ms(10)
    reply = distance_serial.read(20)
    if not reply:
        print("🔇 无回包(正常或模块不支持)")
    elif reply == LASER_ON_CMD:
        # The reply is identical to the command that was sent.
        print("✅ 激光指令已确认")
    return reply
def find_red_laser(frame, threshold=150):
    """
    Locate the brightest strongly-red pixel in an image.

    Every second pixel in both directions is examined. The frame bytes are
    read as 3 bytes per pixel in R, G, B order. A pixel qualifies when
    R > threshold, R > 2*G and R > 2*B. Among qualifying pixels the one with
    the largest R+G+B wins, and the first one found wins ties.

    Returns:
        tuple | None: ``(x, y)`` of the winning pixel, or None if none qualifies.
    """
    w, h = frame.width(), frame.height()
    pixels = frame.to_bytes()
    brightest = 0
    location = None
    for row in range(0, h, 2):
        row_start = row * w
        for col in range(0, w, 2):
            offset = (row_start + col) * 3
            red = pixels[offset]
            green = pixels[offset + 1]
            blue = pixels[offset + 2]
            if red <= threshold or red <= green * 2 or red <= blue * 2:
                continue
            total = red + green + blue
            if total > brightest:
                brightest = total
                location = (col, row)
    return location
def calibrate_laser_position():
    """
    Run one laser calibration: grab a frame, look for the red dot, save it.

    Returns:
        tuple | None: The detected ``(x, y)`` (also stored in the globals
        ``laser_x``/``laser_y`` and saved via save_laser_point), or None when
        no dot was found.
    """
    global laser_x, laser_y
    time.sleep_ms(80)
    # NOTE(review): a new Camera object is created on every call and never
    # explicitly closed - confirm this is safe alongside the main loop's camera.
    snapshot = camera.Camera(640, 480).read()
    dot = find_red_laser(snapshot)
    if not dot:
        return None
    laser_x, laser_y = dot
    save_laser_point(dot)
    return dot
# ==================== 电源管理INA226 ====================
def write_register(reg, value):
    """Write the low 16 bits of ``value`` to INA226 register ``reg``, high byte first."""
    high_byte = (value >> 8) & 0xFF
    low_byte = value & 0xFF
    bus.writeto_mem(INA226_ADDR, reg, bytes([high_byte, low_byte]))
def read_register(reg):
    """Read two bytes from INA226 register ``reg`` and combine them, first byte as the high byte."""
    raw = bus.readfrom_mem(INA226_ADDR, reg, 2)
    msb, lsb = raw[0], raw[1]
    return (msb << 8) | lsb
def init_ina226():
    """Initialise the INA226: write 0x4527 to the configuration register, then the calibration value."""
    for register, setting in (
        (REG_CONFIGURATION, 0x4527),
        (REG_CALIBRATION, CALIBRATION_VALUE),
    ):
        write_register(register, setting)
def get_bus_voltage():
    """Read the bus-voltage register and convert it to volts (raw count x 1.25 / 1000)."""
    counts = read_register(REG_BUS_VOLTAGE)
    volts = counts * 1.25 / 1000
    return volts
def voltage_to_percent(voltage):
    """
    Estimate the battery percentage from its voltage.

    Uses a lookup table with linear interpolation between neighbouring
    entries; values at or above 4.20 V map to 100 and values at or below
    3.60 V map to 0.

    Returns:
        int: Percentage in the range 0..100.
    """
    curve = (
        (4.20, 100), (4.10, 95), (4.05, 85), (4.00, 75), (3.95, 65),
        (3.90, 55), (3.85, 45), (3.80, 35), (3.75, 25), (3.70, 15),
        (3.65, 5), (3.60, 0),
    )
    if voltage >= curve[0][0]:
        return 100
    if voltage <= curve[-1][0]:
        return 0
    # Walk the table from high to low; the first segment whose lower end is
    # not above the voltage contains it.
    for (v_high, p_high), (v_low, p_low) in zip(curve, curve[1:]):
        if voltage >= v_low:
            fraction = (voltage - v_high) / (v_low - v_high)
            estimate = p_high + (p_low - p_high) * fraction
            return max(0, min(100, int(round(estimate))))
    return 0
# ==================== 靶心检测与距离计算 ====================
def detect_circle(frame):
    """
    Detect the target centre in a frame.

    Method 1 ("清晰"): Canny edges -> closed contours -> the first contour
    that is large and round enough is fitted with an ellipse.
    Method 2 ("模糊"), used only when method 1 finds nothing: mask in HSV
    space (bounds named "yellow" in the code) and take the minimum enclosing
    circle of the largest region.

    Args:
        frame: Image as accepted by ``image.image2cv``.

    Returns:
        tuple: ``(result_img, center, radius, method, radius)`` where
        ``center`` is ``(x, y)`` in pixels; center/radius/method are None
        when nothing was detected. The radius is returned twice.
    """
    global REAL_RADIUS_CM
    img_cv = image.image2cv(frame, False, False)
    gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edged = cv2.Canny(blurred, 50, 150)
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    # Dilate then erode to close small gaps in the edges.
    ceroded = cv2.erode(cv2.dilate(edged, kernel), kernel)
    contours, _ = cv2.findContours(ceroded, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    best_center = best_radius = best_radius1 = method = None
    # Method 1: ellipse fit on a contour.
    for cnt in contours:
        area = cv2.contourArea(cnt)
        perimeter = cv2.arcLength(cnt, True)
        if perimeter < 100 or area < 100:
            continue
        circularity = 4 * np.pi * area / (perimeter ** 2)
        # fitEllipse needs at least 5 points.
        if circularity > 0.75 and len(cnt) >= 5:
            center, axes, angle = cv2.fitEllipse(cnt)
            radius = (axes[0] + axes[1]) / 4  # mean of the two semi-axes
            best_center = (int(center[0]), int(center[1]))
            best_radius = int(radius)
            best_radius1 = best_radius
            REAL_RADIUS_CM = 15
            method = "清晰"
            break
    # Method 2: HSV mask.
    if not best_center:
        hsv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2HSV)
        h, s, v = cv2.split(hsv)
        # Double the saturation with clamping. The multiplication must be
        # done in a wider type: uint8 * 2 wraps modulo 256 before np.clip
        # sees it, which turned saturations >= 128 into small values instead
        # of 255.
        s = np.clip(s.astype(np.uint16) * 2, 0, 255).astype(np.uint8)
        hsv = cv2.merge((h, s, v))
        lower_yellow = np.array([7, 80, 0])
        upper_yellow = np.array([32, 255, 182])
        mask = cv2.inRange(hsv, lower_yellow, upper_yellow)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_DILATE, kernel)
        contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if contours:
            largest = max(contours, key=cv2.contourArea)
            if cv2.contourArea(largest) > 50:
                (x, y), radius = cv2.minEnclosingCircle(largest)
                best_center = (int(x), int(y))
                best_radius = int(radius)
                best_radius1 = best_radius
                REAL_RADIUS_CM = 15
                method = "模糊"
    result_img = image.cv2image(img_cv, False, False)
    return result_img, best_center, best_radius, method, best_radius1
def estimate_distance(pixel_radius):
    """
    Estimate the distance to the target in metres from its radius in pixels.

    Computes REAL_RADIUS_CM * FOCAL_LENGTH_PIX / pixel_radius and converts
    centimetres to metres. Returns 0.0 for a missing or zero radius.
    """
    if pixel_radius:
        return (REAL_RADIUS_CM * FOCAL_LENGTH_PIX) / pixel_radius / 100.0
    return 0.0
def compute_laser_position(circle_center, laser_point, radius, method):
    """
    Compute the laser point's offset from the target centre.

    The pixel offsets are divided by ``circle_r / 100``, where ``circle_r``
    is ``radius / 4 * 20`` for method "模糊" and the constant
    ``68 / 16 * 20`` otherwise. The vertical component is negated, so a
    laser point above the centre (smaller y) gives a positive value.

    Returns:
        tuple: ``(x_offset, y_offset)``, or ``(None, None)`` when the centre,
        radius or method is missing.
    """
    if not (circle_center and radius and method):
        return None, None
    cx, cy = circle_center
    lx, ly = laser_point
    # Simplified model: the scale depends on how the circle was detected.
    if method == "模糊":
        circle_r = (radius / 4.0) * 20.0
    else:
        circle_r = (68 / 16.0) * 20.0
    horizontal = lx - cx
    vertical = ly - cy
    return horizontal / (circle_r / 100.0), -vertical / (circle_r / 100.0)
2025-12-26 15:12:47 +08:00
# ==================== TCP 通信线程 ====================
def connect_server():
    """
    Open the TCP connection to the server through the 4G module.

    Closes socket 0 first, then opens it again and looks for
    "+MIPOPEN: 0,0" in the reply.

    Returns:
        bool: True if already connected or the open succeeded, else False.
    """
    global tcp_connected
    if tcp_connected:
        return True
    print("连接到服务器...")
    open_cmd = f'AT+MIPOPEN=0,"TCP","{SERVER_IP}",{SERVER_PORT}'
    with uart4g_lock:
        at("AT+MIPCLOSE=0", "OK", 1000)
        reply = at(open_cmd, "+MIPOPEN", 8000)
        opened = "+MIPOPEN: 0,0" in reply
        if opened:
            tcp_connected = True
    return opened
def _tcp_wlan0_ip():
    """Return the IPv4 address reported for wlan0, or "" when there is none."""
    with os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'") as pipe:
        return pipe.read().strip()


def _tcp_report_pending_ota():
    """Queue a one-time "ota_ok" report if ota_pending.json exists, then delete the marker."""
    pending_path = "/maixapp/apps/t11/ota_pending.json"
    try:
        if not os.path.exists(pending_path):
            return
        try:
            with open(pending_path, "r", encoding="utf-8") as f:
                pending_obj = json.load(f)
        except Exception:
            pending_obj = {}
        safe_enqueue({"result": "ota_ok", "url": pending_obj.get("url", "")}, 2)
        try:
            os.remove(pending_path)
        except OSError:
            pass
    except Exception as e:
        print(f"[OTA] ota_ok 上报失败: {e}")


def _tcp_start_update_thread(worker, args):
    """Mark an update as running and start its worker thread; undo the mark if the thread cannot start."""
    global update_thread_started
    update_thread_started = True
    try:
        _thread.start_new_thread(worker, args)
    except Exception as e:
        update_thread_started = False
        safe_enqueue({"result": "ota_failed", "reason": f"thread_start_failed: {e}"}, 2)


def _tcp_handle_command(cmd_data):
    """
    Execute one business command received from the server.

    Args:
        cmd_data: The packet's inner "data" dict; must contain "cmd".
            2 = laser on + calibrate, 3 = laser off, 4 = report battery,
            5 = OTA (4G, or Wi-Fi with credentials), 6 = report wlan0 IP,
            7 = OTA over the existing Wi-Fi link.
    """
    global laser_calibration_active
    inner_cmd = cmd_data["cmd"]
    if inner_cmd == 2:  # laser on, then calibrate in the background
        turn_on_laser()
        time.sleep_ms(100)
        laser_calibration_active = True
        safe_enqueue({"result": "calibrating"}, 2)
    elif inner_cmd == 3:  # laser off
        distance_serial.write(LASER_OFF_CMD)
        laser_calibration_active = False
        safe_enqueue({"result": "laser_off"}, 2)
    elif inner_cmd == 4:  # report battery
        voltage = get_bus_voltage()
        battery_percent = voltage_to_percent(voltage)
        battery_data = {"battery": battery_percent, "voltage": round(voltage, 3)}
        safe_enqueue(battery_data, 2)
        print(f"🔋 电量上报: {battery_percent}%")
    elif inner_cmd == 5:  # OTA upgrade (Wi-Fi with credentials, or 4G)
        inner_data = cmd_data.get("data", {})
        if not isinstance(inner_data, dict):
            inner_data = {}
        ssid = inner_data.get("ssid")
        password = inner_data.get("password")
        ota_url = inner_data.get("url")
        mode = (inner_data.get("mode") or "").strip().lower()  # "4g"/"wifi"/""
        if not ota_url:
            print("ota missing_url")
            safe_enqueue({"result": "missing_url"}, 2)
            return
        # Missing or invalid mode: prefer Wi-Fi when already connected, else 4G.
        if mode not in ("4g", "wifi"):
            print("ota missing mode")
            mode = "wifi" if is_wifi_connected() else "4g"
        if update_thread_started:
            safe_enqueue({"result": "update_already_started"}, 2)
            return
        if mode == "4g":
            _tcp_start_update_thread(direct_ota_download_via_4g, (ota_url,))
        elif not ssid or not password:
            # Wi-Fi mode needs credentials.
            safe_enqueue({"result": "missing_ssid_or_password"}, 2)
        else:
            _tcp_start_update_thread(handle_wifi_and_update, (ssid, password, ota_url))
    elif inner_cmd == 6:  # report the current wlan0 IP
        try:
            ip = _tcp_wlan0_ip() or "no_ip"
        except Exception:
            ip = "error_getting_ip"
        safe_enqueue({"result": "current_ip", "ip": ip}, 2)
    elif inner_cmd == 7:  # OTA over the existing Wi-Fi link
        if update_thread_started:
            safe_enqueue({"result": "update_already_started"}, 2)
            return
        try:
            ip = _tcp_wlan0_ip()
        except Exception:
            ip = None
        if not ip:
            # Was sent with MSG_TYPE_STATUS, a name not defined in the visible
            # file (NameError at run time); every other status message uses
            # type 2.
            safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, 2)
            return
        # direct_ota_download() requires the URL; it used to be started with
        # no arguments. Take it from the command, else from OTA_URL. A missing
        # URL is reported by the worker itself.
        inner_data = cmd_data.get("data")
        ota_url = inner_data.get("url") if isinstance(inner_data, dict) else None
        _tcp_start_update_thread(direct_ota_download, (ota_url or OTA_URL,))


def tcp_main():
    """
    TCP communication loop: connect, log in, then service incoming packets,
    the send queues and heartbeats until the application asks to exit.

    Incoming data arrives on the 4G UART as
    ``+MIPURC: "rtcp",0,<len>,<payload>``. Each payload is cut out using its
    declared length, so back-to-back packets are all processed and a payload
    that has not fully arrived yet is left in the buffer until it has.

    The connection is dropped and re-established when a send fails, when
    login is rejected, after three consecutive heartbeat send failures, or
    after ten minutes without a heartbeat acknowledgement.
    """
    global tcp_connected, laser_calibration_result
    rtcp_header = re.compile(rb'\+MIPURC: "rtcp",0,(\d+),')
    send_hartbeat_fail_count = 0
    while not app.need_exit():
        if not connect_server():
            time.sleep_ms(5000)
            continue
        # Send the login packet.
        login_data = {"deviceId": DEVICE_ID, "password": PASSWORD}
        if not tcp_send_raw(make_packet(1, login_data)):
            tcp_connected = False
            time.sleep_ms(2000)
            continue
        print("➡️ 登录包已发送,等待确认...")
        logged_in = False
        last_heartbeat_ack_time = time.ticks_ms()
        last_heartbeat_send_time = time.ticks_ms()
        rx_buf = b""
        while True:
            # During a 4G HTTP OTA the module emits +MHTTPURC data on this
            # UART. Reading here would swallow it, so stay off the UART.
            if ota_in_progress:
                time.sleep_ms(20)
                continue
            data = uart4g.read()
            if data:
                rx_buf += data
            # Process every complete "rtcp" URC currently in the buffer.
            login_rejected = False
            while True:
                header = rtcp_header.search(rx_buf)
                if not header:
                    break
                payload_end = header.end() + int(header.group(1))
                if len(rx_buf) < payload_end:
                    break  # the rest of this payload has not arrived yet
                payload = rx_buf[header.end():payload_end]
                # Consume the packet before acting on it, so no code path can
                # process the same packet twice.
                rx_buf = rx_buf[payload_end:]
                try:
                    msg_type, body = parse_packet(payload)
                    if not logged_in and msg_type == 1:
                        # Login response.
                        if body and body.get("cmd") == 1 and body.get("data") == "登录成功":
                            logged_in = True
                            last_heartbeat_ack_time = time.ticks_ms()
                            print("✅ 登录成功")
                            # A successful login after an OTA reboot is taken
                            # as proof the new firmware works.
                            _tcp_report_pending_ota()
                        else:
                            login_rejected = True
                            break
                    elif logged_in and msg_type == 4:
                        # Heartbeat acknowledgement.
                        last_heartbeat_ack_time = time.ticks_ms()
                        print("✅ 收到心跳确认")
                    elif logged_in and isinstance(body, dict):
                        # Business command.
                        cmd_data = body.get("data")
                        if isinstance(cmd_data, dict) and "cmd" in cmd_data:
                            _tcp_handle_command(cmd_data)
                except Exception as e:
                    # The faulty packet is already consumed; keep the rest.
                    print(f"[TCP] 处理服务器消息异常: {e}")
            if login_rejected:
                # Previously this left the loop stuck on the same packet
                # forever; reconnect and log in again instead.
                print("[TCP] 登录未确认,重连")
                break
            # Send queued business data: take one packet under the lock, send
            # it outside the lock so the queue lock is held only briefly.
            if logged_in and (high_send_queue or normal_send_queue):
                msg_type = None
                data_dict = None
                if queue_lock.try_acquire():
                    try:
                        if high_send_queue:
                            msg_type, data_dict = high_send_queue.pop(0)
                        elif normal_send_queue:
                            msg_type, data_dict = normal_send_queue.pop(0)
                    finally:
                        queue_lock.release()
                if msg_type is not None and data_dict is not None:
                    pkt = make_packet(msg_type, data_dict)
                    if not tcp_send_raw(pkt):
                        tcp_connected = False
                        break
            # Queue the laser calibration result, if one is waiting.
            if logged_in and laser_calibration_result is not None:
                x = y = None
                with laser_calibration_lock:
                    if laser_calibration_result is not None:
                        x, y = laser_calibration_result
                        laser_calibration_result = None
                if x is not None and y is not None:
                    safe_enqueue({"result": "ok", "x": x, "y": y}, 2)
            # Periodic heartbeat.
            current_time = time.ticks_ms()
            if logged_in and current_time - last_heartbeat_send_time > HEARTBEAT_INTERVAL * 1000:
                if not tcp_send_raw(make_packet(4, {"t": int(time.time())})):
                    print("💔 心跳发送失败")
                    send_hartbeat_fail_count += 1
                    if send_hartbeat_fail_count >= 3:
                        send_hartbeat_fail_count = 0
                        print("连续3次发送心跳失败重连")
                        break
                    else:
                        continue
                else:
                    send_hartbeat_fail_count = 0
                    last_heartbeat_send_time = current_time
                    print("💓 心跳已发送")
            # Reconnect after ten minutes without a heartbeat acknowledgement.
            if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10:
                print("⏰ 十分钟无心跳ACK重连")
                break
            time.sleep_ms(50)
        tcp_connected = False
        print("🔌 连接异常2秒后重连...")
        time.sleep_ms(2000)
def laser_calibration_worker():
    """
    Background loop that performs laser calibration whenever it is requested.

    While ``laser_calibration_active`` is set, calibration is retried until
    it yields a point. The point is then published in
    ``laser_calibration_result`` and the request flag is cleared.
    """
    global laser_calibration_active, laser_calibration_result
    while True:
        if not laser_calibration_active:
            time.sleep_ms(50)
            continue
        found = calibrate_laser_position()
        if not (found and len(found) == 2):
            # Nothing detected this time; try again shortly.
            time.sleep_ms(80)
            continue
        with laser_calibration_lock:
            laser_calibration_result = found
            laser_calibration_active = False
        print(f"✅ 后台校准成功: {found}")
2025-12-28 16:22:41 +08:00
def download_file_via_4g(url, filename,
                         total_timeout_ms=30000,
                         retries=3,
                         debug=False):
    """
    Download a file through the ML307R 4G module's HTTP client (URC
    "content" chunk mode) and write it to ``filename`` as raw bytes.

    - Retries: any failure (empty/incomplete body, AT error, bad status) is retried.
    - Timeout: ``total_timeout_ms`` per attempt for receiving the response.
    - Validation: a 2xx status (when the header URC was seen), a completely
      filled body, matching Content-Length, and matching Content-Md5 when
      the header is present and hashlib is available.
    - Logging: quiet by default; ``debug=True`` prints URC progress.

    The request is always made over plain HTTP, even for an https URL
    (HTTPS would need separate SSL configuration on the module).

    Args:
        url: Source URL; path and query string are sent to the server.
        filename: Destination path.
        total_timeout_ms: Receive timeout per attempt, in milliseconds.
        retries: Number of attempts.
        debug: Print progress details.

    Returns:
        tuple[bool, str]: ``(True, "OK size=... ip=... code=...")`` on
        success, otherwise ``(False, reason)``.
    """
    from urllib.parse import urlparse
    global ota_in_progress

    parsed = urlparse(url)
    host = parsed.hostname
    if not host:
        return False, "invalid_url (no host)"
    path = parsed.path or "/"
    if parsed.query:
        # The query string is part of the request target (e.g. signed URLs).
        path = f"{path}?{parsed.query}"
    try:
        port = parsed.port
    except ValueError:
        port = None
    # Keep an explicit port for http URLs. For https URLs the request is
    # downgraded to plain HTTP on the default port, as before.
    if port and parsed.scheme != "https":
        base_url = f"http://{host}:{port}"
    else:
        base_url = f"http://{host}"

    header_tag = b'+MHTTPURC: "header",'
    content_tag = b'+MHTTPURC: "content",'

    def _log(*args):
        if debug:
            print(*args)

    def _get_ip():
        r = at("AT+CGPADDR=1", "OK", 3000)
        m = re.search(r'\+CGPADDR:\s*1,"([^"]+)"', r)
        return m.group(1) if m else ""

    def _drain_uart(max_ms=300):
        """Discard whatever is pending on the UART so stale URCs cannot interfere."""
        t0 = time.ticks_ms()
        while time.ticks_ms() - t0 < max_ms:
            d = uart4g.read(4096)
            if not d:
                time.sleep_ms(10)

    def _read_more(buf: bytes, ms=50) -> bytes:
        """Append everything received during the next ``ms`` milliseconds to ``buf``."""
        t0 = time.ticks_ms()
        while time.ticks_ms() - t0 < ms:
            d = uart4g.read(4096)
            if d:
                buf += d
            time.sleep_ms(1)
        return buf

    def _parse_httpid(raw: bytes):
        m = re.search(rb"\+MHTTPCREATE:\s*(\d+)", raw)
        return int(m.group(1)) if m else None

    def _end_of_prefix(buf: bytes, start: int, n_commas: int):
        """Index just past the n-th comma at or after ``start``, or None if not received yet."""
        k = start
        seen = 0
        while k < len(buf) and seen < n_commas:
            if buf[k:k + 1] == b",":
                seen += 1
            k += 1
        return k if seen == n_commas else None

    def _try_parse_header(buf: bytes):
        """
        Parse ``+MHTTPURC: "header",<id>,<code>,<hdr_len>,<hdr_text...>`` at
        the start of ``buf``.

        Returns None (more bytes needed), ("drop", rest) for a malformed
        prefix, or ("ok", urc_id, status_code, header_text, rest).
        """
        # The header text contains \r\n, so it is cut by hdr_len, not by line.
        k = _end_of_prefix(buf, len(header_tag), 3)
        if k is None:
            return None
        m = re.match(rb'\+MHTTPURC: "header",\s*(\d+),\s*(\d+),\s*(\d+),', buf[:k])
        if not m:
            return ("drop", buf[1:])
        urc_id = int(m.group(1))
        code = int(m.group(2))
        text_end = k + int(m.group(3))
        if len(buf) < text_end:
            return None
        hdr_text = buf[k:text_end].decode("utf-8", "ignore")
        return ("ok", urc_id, code, hdr_text, buf[text_end:])

    def _try_parse_one_content(buf: bytes):
        """
        Parse ``+MHTTPURC: "content",<id>,<total_len>,<sum_len>,<cur_len>,<payload...>``
        at the start of ``buf``.

        Returns None (more bytes needed), ("drop", rest) for a malformed
        prefix, or ("ok", urc_id, total_len, sum_len, cur_len, payload, rest).
        """
        k = _end_of_prefix(buf, len(content_tag), 4)
        if k is None:
            return None
        m = re.match(rb'\+MHTTPURC: "content",\s*(\d+),\s*(\d+),\s*(\d+),\s*(\d+),', buf[:k])
        if not m:
            return ("drop", buf[1:])
        urc_id = int(m.group(1))
        total_len = int(m.group(2))
        sum_len = int(m.group(3))
        cur_len = int(m.group(4))
        payload_end = k + cur_len
        if len(buf) < payload_end:
            return None
        return ("ok", urc_id, total_len, sum_len, cur_len, buf[k:payload_end], buf[payload_end:])

    def _try_parse_next(buf: bytes):
        """
        Parse whichever URC starts first in ``buf``.

        URCs must be consumed strictly in order of appearance. A payload may
        itself contain the tag text (this program's own source does), and
        scanning the whole buffer for one tag would then mistake payload
        bytes for a URC and discard real data.

        Returns None when no tag is present, else ``(kind, result)`` with
        kind "header" or "content" and result from the matching parser.
        """
        ih = buf.find(header_tag)
        ic = buf.find(content_tag)
        if ih < 0 and ic < 0:
            return None
        if ic < 0 or 0 <= ih < ic:
            return "header", _try_parse_header(buf[ih:])
        return "content", _try_parse_one_content(buf[ic:])

    def _extract_hdr_fields(hdr_text: str):
        # Content-Length
        mlen = re.search(r"Content-Length:\s*(\d+)", hdr_text, re.IGNORECASE)
        clen = int(mlen.group(1)) if mlen else None
        # Content-Md5 (base64)
        mmd5 = re.search(r"Content-Md5:\s*([A-Za-z0-9+/=]+)", hdr_text, re.IGNORECASE)
        md5_b64 = mmd5.group(1).strip() if mmd5 else None
        return clen, md5_b64

    def _md5_base64(data: bytes) -> str:
        if hashlib is None:
            return ""
        digest = hashlib.md5(data).digest()
        return binascii.b2a_base64(digest).decode().strip()

    def _one_attempt():
        # 0) PDP context: make sure there is a real IP (an "OK" alone is not enough).
        ip = _get_ip()
        if not ip or ip == "0.0.0.0":
            at("AT+MIPCALL=1,1", "OK", 15000)
            for _ in range(10):
                ip = _get_ip()
                if ip and ip != "0.0.0.0":
                    break
                time.sleep(1)
        if not ip or ip == "0.0.0.0":
            return False, "PDP not ready (no_ip)"
        # 1) Delete old HTTP instances and discard stale UART data.
        for i in range(0, 6):
            at(f"AT+MHTTPDEL={i}", "OK", 1500)
        _drain_uart(300)
        # 2) Create the instance (the id may arrive with a delay).
        uart4g.write((f'AT+MHTTPCREATE="{base_url}"\r\n').encode())
        raw = _read_more(b"", 300)
        raw = _read_more(raw, 2000)
        httpid = _parse_httpid(raw)
        if httpid is None:
            return False, "MHTTPCREATE failed (no httpid)"
        # 3) Send the GET directly, not through at(), which would swallow URCs.
        _drain_uart(100)
        uart4g.write((f'AT+MHTTPREQUEST={httpid},1,0,"{path}"\r\n').encode())
        # 4) Receive URCs: header (code/length/md5), then the content chunks.
        buf = b""
        urc_id = None
        status_code = None
        total_len = None
        expect_len = None
        expect_md5_b64 = None
        body_buf = None
        got_ranges = set()
        filled_bytes = 0
        t0 = time.ticks_ms()
        while time.ticks_ms() - t0 < total_timeout_ms:
            buf = _read_more(buf, 50)
            while True:
                nxt = _try_parse_next(buf)
                if nxt is None:
                    break
                kind, res = nxt
                if res is None:
                    break  # URC incomplete: wait for more bytes
                if res[0] == "drop":
                    buf = res[1]
                    continue
                if kind == "header":
                    _, hid, code, hdr_text, buf = res
                    if urc_id is None:
                        urc_id = hid
                    status_code = code
                    expect_len, expect_md5_b64 = _extract_hdr_fields(hdr_text)
                    _log(f"[HDR] id={hid} code={code} len={expect_len} md5={expect_md5_b64}")
                    continue
                _, cid, _total, _sum, _cur, payload, buf = res
                if urc_id is None:
                    urc_id = cid
                if cid != urc_id:
                    continue
                if total_len is None:
                    total_len = _total
                    body_buf = bytearray(total_len)
                # Place the chunk by its offsets; the same range is stored only once.
                start = _sum - _cur
                end = _sum
                if start < 0 or end > total_len:
                    continue
                chunk_range = (start, end)
                if chunk_range not in got_ranges:
                    got_ranges.add(chunk_range)
                    body_buf[start:end] = payload
                    filled_bytes += (end - start)
                _log(f"[URC] {start}:{end} sum={_sum}/{total_len} filled={filled_bytes}")
                if filled_bytes == total_len:
                    break
            if status_code is not None and not (200 <= status_code < 300):
                break  # an error response: its body is not the requested file
            if total_len is not None and filled_bytes == total_len:
                break
        # 5) Delete the instance.
        at(f"AT+MHTTPDEL={httpid}", "OK", 3000)
        if status_code is not None and not (200 <= status_code < 300):
            return False, f"http_status {status_code}"
        if body_buf is None:
            return False, "empty_body"
        if total_len is None:
            return False, "no_total_len"
        if filled_bytes != total_len:
            return False, f"incomplete_body got={filled_bytes} expected={total_len}"
        data = bytes(body_buf)
        # 6) Check Content-Length.
        if expect_len is not None and len(data) != expect_len:
            return False, f"length_mismatch got={len(data)} expected={expect_len}"
        # 7) Check Content-Md5 (base64).
        if expect_md5_b64 and hashlib is not None:
            md5_b64 = _md5_base64(data)
            if md5_b64 != expect_md5_b64:
                return False, f"md5_mismatch got={md5_b64} expected={expect_md5_b64}"
        # 8) Write the file (bytes as received).
        with open(filename, "wb") as f:
            f.write(data)
        return True, f"OK size={len(data)} ip={ip} code={status_code}"

    # Tell tcp_main to stay off the UART, then take the UART lock. The flag
    # is cleared on every exit path, including a failure to take the lock.
    ota_in_progress = True
    try:
        with uart4g_lock:
            last_err = "unknown"
            for attempt in range(1, retries + 1):
                ok, msg = _one_attempt()
                if ok:
                    return True, msg
                last_err = msg
                # Before retrying: wait and discard pending UART data.
                _log(f"[RETRY] attempt={attempt} failed={msg}")
                _drain_uart(500)
                time.sleep_ms(300)
            return False, f"FAILED after {retries} retries: {last_err}"
    finally:
        ota_in_progress = False
def direct_ota_download_via_4g(ota_url):
    """Download an OTA file over the 4G module and hand it to the updater.

    The file at ``ota_url`` is fetched with ``download_file_via_4g`` into
    ``local_filename``. When the download reports success (and its message
    contains "OK"), ``apply_ota_and_reboot`` is called; otherwise the
    download message is reported through ``safe_enqueue``. A missing URL or
    any exception is reported as ``ota_failed``.

    Args:
        ota_url: URL of the OTA file; a falsy value is rejected.

    Returns:
        None. ``update_thread_started`` is reset to False on every exit
        path, because the ``finally`` clause also runs on ``return``.
    """
    global update_thread_started
    try:
        # Guard: nothing to download without a URL.
        if not ota_url:
            safe_enqueue({"result": "ota_failed", "reason": "missing_url"}, 2)
            return

        print(f"[OTA-4G] 开始通过 4G 下载: {ota_url}")
        downloaded, detail = download_file_via_4g(ota_url, local_filename)
        print(f"[OTA-4G] {detail}")

        # Guard: download failed -> report the downloader's message as-is.
        if not (downloaded and "OK" in detail):
            safe_enqueue({"result": detail}, 2)
            return

        # Download succeeded: apply the update. Its return value does not
        # change what happens next here; either way the function ends.
        apply_ota_and_reboot(ota_url)
    except Exception as exc:
        failure = f"OTA-4G 异常: {str(exc)}"
        print(failure)
        safe_enqueue({"result": "ota_failed", "reason": failure}, 2)
    finally:
        update_thread_started = False
# ==================== 主程序入口 ====================
def cmd_str():
    """Run the main program: initialise the device, then poll the trigger.

    Startup sequence:
      * read the device id via ``read_device_id()``; the password is the id
        followed by ".";
      * make sure the photo directory exists (best effort);
      * call ``init_ina226()`` and ``load_laser_point()``, open the display
        and a 640x480 camera;
      * start ``tcp_main`` and ``laser_calibration_worker`` in new threads.

    Main loop (runs until ``app.need_exit()`` is true): when the ADC reading
    exceeds ``ADC_TRIGGER_THRESHOLD`` and at least 3000 ms have passed since
    the last accepted trigger, one frame is captured, annotated with the
    laser cross-hair, passed to ``detect_circle``, saved as a JPEG and
    reported through ``safe_enqueue``. When the ADC reading is at or below
    the threshold the live camera image is shown instead.

    Returns:
        None.
    """
    global DEVICE_ID, PASSWORD

    DEVICE_ID = read_device_id()
    PASSWORD = DEVICE_ID + "."

    # Photo storage directory. makedirs(exist_ok=True) is idempotent, so no
    # separate existence check is needed. A failure is logged but not fatal:
    # saving photos further down is already best-effort.
    photo_dir = "/root/phot"
    try:
        os.makedirs(photo_dir, exist_ok=True)
    except OSError as e:
        print(f"❌ 创建照片目录失败: {e}")

    # Hardware initialisation.
    init_ina226()
    load_laser_point()
    disp = display.Display()
    cam = camera.Camera(640, 480)

    # Communication and calibration run in their own threads.
    _thread.start_new_thread(tcp_main, ())
    _thread.start_new_thread(laser_calibration_worker, ())
    print("系统准备完成...")

    last_adc_trigger = 0

    # Main loop: trigger detected -> capture -> analyse -> report.
    while not app.need_exit():
        current_time = time.ticks_ms()
        # Debug aid: print adc_obj.read() here to watch the raw sensor value.
        if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
            if current_time - last_adc_trigger < 3000:
                # Still inside the 3 s cooldown after the previous shot.
                # Sleep briefly so a held trigger does not turn this loop
                # into a busy-wait with no pause between ADC reads.
                time.sleep_ms(10)
                continue
            last_adc_trigger = current_time
            time.sleep_ms(60)  # debounce

            frame = cam.read()
            x, y = laser_point

            # Draw the laser cross-hair at the stored laser point.
            frame.draw_line(int(x - length), int(y), int(x + length), int(y), color, thickness)
            frame.draw_line(int(x), int(y - length), int(x), int(y + length), color, thickness)
            frame.draw_circle(int(x), int(y), 1, color, thickness)

            # Detect the target and show the annotated result.
            result_img, center, radius, method, best_radius1 = detect_circle(frame)
            disp.show(result_img)

            # Offset of the laser point and estimated distance.
            dx, dy = compute_laser_position(center, (x, y), radius, method)
            distance_m = estimate_distance(best_radius1)
            # Distance in centimetres; a falsy distance is reported as 0.
            distance_cm = round((distance_m or 0.0) * 100)

            # Battery reading.
            # NOTE(review): battery_percent is computed but is not part of
            # the report below - confirm whether it should be sent.
            voltage = get_bus_voltage()
            battery_percent = voltage_to_percent(voltage)

            # Save the annotated image; the running count of existing .jpg
            # files is used as the sequence number in the file name.
            try:
                jpg_count = len([f for f in os.listdir(photo_dir) if f.endswith('.jpg')])
                filename = f"{photo_dir}/{int(x)}_{int(y)}_{distance_cm}_{method}_{jpg_count:04d}.jpg"
                result_img.save(filename, quality=70)
            except Exception as e:
                print(f"❌ 保存失败: {e}")

            # Build the report. 200.0 is sent for an axis whose offset is
            # None; "r" is a fixed 90.0 (its meaning is not visible here).
            inner_data = {
                "x": float(dx) if dx is not None else 200.0,
                "y": float(dy) if dy is not None else 200.0,
                "r": 90.0,
                "d": distance_cm,
                "m": method,
            }
            report_data = {"cmd": 1, "data": inner_data}

            # Queue the shot event with high priority; per the original
            # note, tcp_main is the single place that sends queued messages.
            safe_enqueue(report_data, msg_type=2, high=True)
            print("📤 射箭事件已加入发送队列")
            time.sleep_ms(100)
        else:
            # No trigger: show the live preview.
            disp.show(cam.read())
            time.sleep_ms(50)
2025-12-28 16:22:41 +08:00
def dump_system_info():
    """Print the output of a fixed list of shell commands, one by one.

    Each command is run through ``os.popen``. Its stripped output is printed
    under a ``$ <command>`` heading, with ``<empty>`` standing in for blank
    output. If running or printing a command raises, the exception text is
    printed after ``<error>`` and the next command is tried.

    Returns:
        None.
    """
    probes = (
        "uname -a",
        "cat /etc/os-release 2>/dev/null",
        "cat /proc/version 2>/dev/null",
        "cat /proc/1/comm 2>/dev/null",
        "ps 2>/dev/null | head -n 5",
        "ls -l /sbin/init 2>/dev/null",
        "ls -l /etc/init.d 2>/dev/null | head -n 10",
        "which systemctl 2>/dev/null",
        "which rc-service 2>/dev/null",
        "which busybox 2>/dev/null && busybox | head -n 1",
        "ls /dev/watchdog* 2>/dev/null",
    )
    for command in probes:
        # Heading shared by the normal and the error output.
        heading = "\n$ " + command + "\n"
        try:
            captured = os.popen(command).read().strip()
            print(heading + (captured if captured else "<empty>"))
        except Exception as exc:
            print(heading + "<error> " + str(exc))
if __name__ == "__main__":
    # Optional diagnostics: call dump_system_info() here to print details
    # about the host system before starting.

    # Startup probe: report whether the `threading` module (and its Lock)
    # is available. This is purely informational, so a missing module is
    # logged instead of stopping the program before cmd_str() can run.
    try:
        import threading
    except ImportError as e:
        print("threading module unavailable:", e)
    else:
        print("threading module:", threading)
        has_lock = hasattr(threading, "Lock")
        print("has Lock:", has_lock)
        if has_lock:
            print("has lock")

    cmd_str()