阿方
创作者认证
萤火工场CEM5861G-M11 在昉·星光2上读取雷达数据
安装 pyserial 串口库
可以直接使用 pip 安装:
sudo pip install pyserial --break-system-packages
或者前往 PyPI 下载 pyserial 的 whl 包进行离线安装:
- 进入 https://pypi.org/project/pyserial/#files 后下载 Built Distribution 中的 whl 包,这里我使用了最新 3.5 版本
- 使用 pip 安装:
sudo pip install pyserial-3.5-py2.py3-none-any.whl --break-system-packages
编程
对于本程序的核心思想解析,参考《萤火工场CEM5861G-M11 使用RA6E2点灯》
import sys
import serial
import time
uart_port = "/dev/ttyS0"
baud_rate = 115200
# 状态机的各个状态
STATE_WAIT_HEADER_1 = 0
STATE_WAIT_HEADER_2 = 1
STATE_READ_PAYLOAD = 2
# 定义帧头和帧数据长度
FRAME_HEADER_1 = 0x55
FRAME_HEADER_2 = 0xA5
PAYLOAD_LENGTH = 10 # Data[0]~Data[9]的长度
def calculate_checksum(data):
"""计算校验和"""
checksum = 0
for byte in data:
checksum = (checksum + byte) & 0xFF
return checksum
def parse_radar_data(data_bytes):
"""解析雷达 Data[0]~Data[9] 部分的数据"""
if len(data_bytes) < 10:
print(f"警告: 接收到的数据部分长度不足10字节: {len(data_bytes)}")
return None
target_id = data_bytes[0]
target_status = data_bytes[1]
distance_cm = (data_bytes[2] << 8) | data_bytes[3]
speed_raw = (data_bytes[4] << 8) | data_bytes[5]
speed_ms = speed_raw if speed_raw < 0x8000 else speed_raw - 0x10000
azimuth_angle = data_bytes[6] if data_bytes[6] < 0x80 else data_bytes[6] - 0x100
elevation_angle = data_bytes[7] if data_bytes[7] < 0x80 else data_bytes[7] - 0x100
signal_strength = (data_bytes[8] << 8) | data_bytes[9]
return {
"目标ID": target_id,
"目标状态": target_status,
"距离(cm)": distance_cm,
"速度(m/s)": speed_ms,
"方位角(度)": azimuth_angle,
"俯仰角(度)": elevation_angle,
"信号强度": signal_strength
}
def showUartInfo(uart):
"""
使用状态机对齐帧并读取雷达数据
帧格式: 55 A5 [数据长度-高][数据长度-低] [功能码] [命令码1] [命令码2] Data[0]...Data[9] [校验和]
其中 [数据长度-高][数据长度-低] = 00 0E (从功能码到校验和共 14 字节),整个帧长度是 18 字节
"""
global STATE_WAIT_HEADER_1
global STATE_WAIT_HEADER_2
global STATE_READ_PAYLOAD
current_state = STATE_WAIT_HEADER_1
received_frame = []
expected_payload_length = 0
while True:
byte = uart.read(1)
if not byte:
break
byte_val = ord(byte)
if current_state == STATE_WAIT_HEADER_1:
if byte_val == FRAME_HEADER_1:
received_frame = [byte_val]
current_state = STATE_WAIT_HEADER_2
elif current_state == STATE_WAIT_HEADER_2:
if byte_val == FRAME_HEADER_2:
received_frame.append(byte_val)
current_state = STATE_READ_PAYLOAD
expected_payload_length = 0
else:
current_state = STATE_WAIT_HEADER_1
received_frame = []
elif current_state == STATE_READ_PAYLOAD:
received_frame.append(byte_val)
if len(received_frame) == 4:
expected_payload_length = (received_frame[2] << 8) | received_frame[3]
if expected_payload_length != 14:
print(f"警告: 接收到非预期的payload长度: {expected_payload_length}. 重新寻找帧头. 帧: {' '.join(f'{b:02X}' for b in received_frame)}")
current_state = STATE_WAIT_HEADER_1
received_frame = []
continue
if len(received_frame) == 2 + 2 + expected_payload_length and expected_payload_length != 0:
function_code = received_frame[4]
command_code_1 = received_frame[5]
command_code_2 = received_frame[6]
if function_code == 0x03 and command_code_1 == 0x81 and command_code_2 == 0x00:
data_payload = received_frame[7 : 7 + PAYLOAD_LENGTH]
received_checksum = received_frame[-1]
calculated_checksum = calculate_checksum(received_frame[:-1])
if calculated_checksum == received_checksum:
print("----------------------------------------")
print(f"{' '.join(f'{b:02X}' for b in received_frame)}")
radar_status = parse_radar_data(data_payload)
if radar_status:
print("雷达状态:")
for key, value in radar_status.items():
print(f" {key}: {value}")
print("----------------------------------------")
else:
print(
f"校验和错误: 计算值 0x{calculated_checksum:02X}, 接收值 0x{received_checksum:02X}. 帧: {' '.join(f'{b:02X}' for b in received_frame)}")
else:
print(
f"命令码不匹配: 功能码 0x{function_code:02X}, 命令码1 0x{command_code_1:02X}, 命令码2 0x{command_code_2:02X}. 帧: {' '.join(f'{b:02X}' for b in received_frame)}")
current_state = STATE_WAIT_HEADER_1
received_frame = []
expected_payload_length = 0
elif expected_payload_length != 0 and len(received_frame) > 2 + 2 + expected_payload_length:
print(
f"警告: 帧长度超出预期,可能失去同步。重新寻找帧头. 帧: {' '.join(f'{b:02X}' for b in received_frame)}")
current_state = STATE_WAIT_HEADER_1
received_frame = []
expected_payload_length = 0
def main():
try:
uart = serial.Serial(uart_port, baudrate=baud_rate, timeout=0.1)
print(f"成功打开串口: {uart_port}@{baud_rate} bps")
print("等待雷达数据...")
while True:
showUartInfo(uart)
except serial.SerialException as e:
print(f"串口错误: {e}")
print(f"请检查串口 {uart_port} 是否存在、未被占用,并且波特率 {baud_rate} 是否正确")
return 1
except KeyboardInterrupt:
print("\n程序终止")
finally:
if 'uart' in locals() and uart.is_open:
uart.close()
print("串口已关闭")
if __name__ == "__main__":
sys.exit(main())
接线
运行
在运行程序前需要暂停服务 serial-getty@ttyS0.service
:
sudo systemctl stop serial-getty@ttyS0.service
然后运行程序即可看到不停显示的雷达状态:
----------------------------------------
55 A5 00 0E 03 81 00 00 02 00 52 00 00 00 00 05 91 76
雷达状态:
目标ID: 0
目标状态: 2
距离(cm): 82
速度(m/s): 0
方位角(度): 0
俯仰角(度): 0
信号强度: 1425
----------------------------------------
现象
版块:
萤火工场
前4天 15:40
全部评论