fix: 修改电量不固定

This commit is contained in:
2025-12-26 15:12:47 +08:00
parent 5a98bf2e85
commit 21cec260b8

84
main.py
View File

@@ -7,20 +7,21 @@
作者ZZH
最后更新2025-11-21
"""
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
import _thread
import hashlib
import hmac
import json
import os
import re
import socket
import struct
import cv2
import numpy as np
import json
import struct
import re
from maix.peripheral import adc
import _thread
import os
import hmac
import ujson
import hashlib
import requests
import socket
import ujson
from maix import camera, display, image, app, time, key, uart, pinmap, i2c, network, err
from maix.peripheral import adc
# import config
@@ -49,7 +50,7 @@ URL = "http://ws.shelingxingqiu.com"
API_PATH = "/home/shoot/device_fire/arrow/fire"
# UART 设备初始化
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信
uart4g = uart.UART("/dev/ttyS2", 115200) # 4G 模块通信
distance_serial = uart.UART("/dev/ttyS1", 9600) # 激光测距模块
# 引脚功能映射
@@ -71,14 +72,14 @@ thickness = 1
length = 2
# 全局状态变量
laser_calibration_active = False # 是否正在后台校准激光
laser_calibration_result = None # 校准结果坐标 (x, y)
laser_calibration_lock = False # 简易互斥锁,防止多线程冲突
laser_calibration_active = False # 是否正在后台校准激光
laser_calibration_result = None # 校准结果坐标 (x, y)
laser_calibration_lock = False # 简易互斥锁,防止多线程冲突
# 硬件对象初始化
laser_x, laser_y = laser_point
adc_obj = adc.ADC(0, adc.RES_BIT_12)
bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线
bus = i2c.I2C(1, i2c.Mode.MASTER) # 使用 I2C1 总线
# bus = i2c.I2C(5, i2c.Mode.MASTER) #ota升级的
# INA226 电流/电压监测芯片寄存器地址
INA226_ADDR = 0x40
@@ -94,15 +95,15 @@ LASER_OFF_CMD = bytes([0xAA, MODULE_ADDR, 0x01, 0xBE, 0x00, 0x01, 0x00, 0x00, 0x
# 相机标定参数(用于距离估算)
# FOCAL_LENGTH_PIX = 3800.0 # 焦距(像素)
FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素)
REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
FOCAL_LENGTH_PIX = 1900.0 # 焦距(像素)
REAL_RADIUS_CM = 15 # 靶心实际半径(厘米)
# TCP 连接状态
tcp_connected = False
high_send_queue = [] # 高优先级发送队列:射箭事件等
normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等
queue_lock = False # 简易互斥锁,保护队列
uart4g_lock = False # 简易互斥锁,保护 4G 串口 AT 发送流程(防并发)
high_send_queue = [] # 高优先级发送队列:射箭事件等
normal_send_queue = [] # 普通发送队列:电量/校准结果/状态等
queue_lock = False # 简易互斥锁,保护队列
uart4g_lock = False # 简易互斥锁,保护 4G 串口 AT 发送流程(防并发)
update_thread_started = False # 防止 OTA 更新线程重复启动
@@ -168,6 +169,7 @@ def direct_ota_download():
finally:
update_thread_started = False # 允许下次 OTA
def handle_wifi_and_update(ssid, password):
"""在子线程中执行 Wi-Fi 连接 + OTA 更新流程"""
try:
@@ -442,7 +444,7 @@ def find_red_laser(frame, threshold=150):
for y in range(0, h, 2):
for x in range(0, w, 2):
idx = (y * w + x) * 3
r, g, b = img_bytes[idx], img_bytes[idx+1], img_bytes[idx+2]
r, g, b = img_bytes[idx], img_bytes[idx + 1], img_bytes[idx + 2]
if r > threshold and r > g * 2 and r > b * 2:
rgb_sum = r + g + b
if rgb_sum > max_sum:
@@ -588,18 +590,20 @@ def compute_laser_position(circle_center, laser_point, radius, method):
dy = ly - cy
return dx / (circle_r / 100.0), -dy / (circle_r / 100.0)
def compute_laser_position_v2(circle_center, laser_point):
print(f"circle_center : {circle_center}")
if circle_center is None:
return 200, 200
cx, cy = circle_center
lx, ly = 320,230
# lx, ly = laser_point
lx, ly = 320, 220
dx = lx - cx
dy = ly - cy
r = 22.16 * 5
target_x = dx/r*100
target_y = dy/r*100
target_x = dx / r * 100
target_y = dy / r * 100
print(f"lx{lx} ly: {ly} cx: {cx} cy: {cy} dx: {dx} dy: {dy} result_x: {target_x} result_y: {-target_y}")
return (target_x, -target_y)
# ==================== TCP 通信线程 ====================
def connect_server():
@@ -704,7 +708,8 @@ def tcp_main():
safe_enqueue({"result": "update_already_started"}, 2)
elif inner_cmd == 6:
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = os.popen(
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = ip if ip else "no_ip"
except:
ip = "error_getting_ip"
@@ -717,12 +722,14 @@ def tcp_main():
# 实时检查是否有 IP
try:
ip = os.popen("ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
ip = os.popen(
"ifconfig wlan0 2>/dev/null | grep 'inet ' | awk '{print $2}'").read().strip()
except:
ip = None
if not ip:
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"}, MSG_TYPE_STATUS)
safe_enqueue({"result": "ota_rejected", "reason": "no_wifi_ip"},
MSG_TYPE_STATUS)
else:
# 启动纯下载线程
update_thread_started = True
@@ -778,7 +785,7 @@ def tcp_main():
print("💓 心跳已发送")
# 心跳超时重连
if logged_in and current_time - last_heartbeat_ack_time > 1000*60*10: # 十分钟
if logged_in and current_time - last_heartbeat_ack_time > 1000 * 60 * 10: # 十分钟
print("⏰ 十分钟无心跳ACK重连")
break
@@ -843,9 +850,10 @@ def cmd_str():
while not app.need_exit():
current_time = time.ticks_ms()
# print("压力传感器数值: ", adc_obj.read())
if adc_obj.read() > ADC_TRIGGER_THRESHOLD:
diff_ms = current_time-last_adc_trigger
if diff_ms<3000:
adc_val = adc_obj.read()
if adc_val > ADC_TRIGGER_THRESHOLD:
diff_ms = current_time - last_adc_trigger
if diff_ms < 3000:
continue
last_adc_trigger = current_time
time.sleep_ms(60) # 防抖
@@ -876,14 +884,14 @@ def cmd_str():
# result_img.save(filename, quality=70)
# except Exception as e:
# print(f"❌ 保存失败: {e}")
# 构造上报数据
inner_data = {
"x": float(dx) if dx is not None else 200.0,
"y": float(dy) if dy is not None else 200.0,
"r": 90.0,
"d": round((distance_m or 0.0) * 100), # 距离(厘米)
"m": method
"m": method,
"adc": adc_val,
}
report_data = {"cmd": 1, "data": inner_data}
# 射箭事件高优先级入队,由 tcp_main 统一发送
@@ -897,4 +905,4 @@ def cmd_str():
if __name__ == "__main__":
cmd_str()
cmd_str()