You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
203 lines
7.1 KiB
203 lines
7.1 KiB
import serial
|
|
import time
|
|
|
|
def calculate_crc(data):
|
|
"""计算Modbus RTU CRC16校验码"""
|
|
crc = 0xFFFF
|
|
for byte in data:
|
|
crc ^= byte
|
|
for _ in range(8):
|
|
if crc & 0x0001:
|
|
crc >>= 1
|
|
crc ^= 0xA001
|
|
else:
|
|
crc >>= 1
|
|
return crc.to_bytes(2, byteorder='little')
|
|
|
|
|
|
def create_modbus_frame(slave_address, function_code=0x03, start_address=0x0000, num_registers=1):
|
|
"""创建Modbus RTU请求帧"""
|
|
frame = bytes([
|
|
slave_address, # 设备地址
|
|
function_code, # 功能码
|
|
(start_address >> 8) & 0xFF, # 起始地址高字节
|
|
start_address & 0xFF, # 起始地址低字节
|
|
(num_registers >> 8) & 0xFF, # 寄存器数量高字节
|
|
num_registers & 0xFF # 寄存器数量低字节
|
|
])
|
|
crc = calculate_crc(frame)
|
|
return frame + crc
|
|
|
|
|
|
def bytes_to_hex(data):
|
|
"""将字节数据转换为十六进制字符串"""
|
|
return ' '.join([f"{b:02X}" for b in data]) if data else ""
|
|
|
|
|
|
def validate_response(response, slave_address, function_code):
|
|
"""验证Modbus响应有效性"""
|
|
if len(response) < 5:
|
|
return False
|
|
|
|
# 检查地址和功能码
|
|
if response[0] != slave_address or response[1] != function_code:
|
|
# 检查是否是错误响应
|
|
if response[0] == slave_address and response[1] == function_code + 0x80:
|
|
error_code = response[2]
|
|
print(f" Modbus错误响应 (异常码: {error_code:02X})")
|
|
return False
|
|
|
|
# 检查数据长度
|
|
data_length = response[2]
|
|
if len(response) != 5 + data_length:
|
|
return False
|
|
|
|
# 验证CRC校验
|
|
received_crc = response[-2:]
|
|
calculated_crc = calculate_crc(response[:-2])
|
|
return received_crc == calculated_crc
|
|
|
|
|
|
def get_stopbits(stopbits_str):
|
|
"""将字符串表示的停止位转换为serial库常量"""
|
|
if stopbits_str == '1.5':
|
|
return serial.STOPBITS_ONE_POINT_FIVE
|
|
elif stopbits_str == '2':
|
|
return serial.STOPBITS_TWO
|
|
else:
|
|
return serial.STOPBITS_ONE
|
|
|
|
|
|
def get_function_code(func_str):
|
|
"""将字符串表示的功能码转换为整数"""
|
|
try:
|
|
# 尝试解析十六进制
|
|
if func_str.lower().startswith('0x'):
|
|
return int(func_str, 16)
|
|
# 尝试解析十进制
|
|
return int(func_str)
|
|
except ValueError:
|
|
# 解析失败,返回默认值
|
|
return 0x03
|
|
|
|
|
|
def scan(port_temp,baudrate_temp,timeout_temp,retries_temp,send_interval_temp,register_address_temp,func_input_temp,bytesize_input_temp,parity_input_temp,stopbits_input_temp):
|
|
port = port_temp.strip()
|
|
baudrate = baudrate_temp.strip()
|
|
timeout = float(timeout_temp.strip())
|
|
retries = int(retries_temp.strip())
|
|
send_interval = float(send_interval_temp.strip())
|
|
register_address = int(register_address_temp.strip())
|
|
|
|
func_input = func_input_temp.strip()
|
|
function_code = get_function_code(func_input) if func_input else 0x03
|
|
|
|
# 数据位配置
|
|
bytesize_input = bytesize_input_temp.strip()
|
|
if bytesize_input == '5':
|
|
bytesize = serial.FIVEBITS
|
|
elif bytesize_input == '6':
|
|
bytesize = serial.SIXBITS
|
|
elif bytesize_input == '7':
|
|
bytesize = serial.SEVENBITS
|
|
else:
|
|
bytesize = serial.EIGHTBITS # 默认
|
|
|
|
# 校验位配置
|
|
parity_input = parity_input_temp.strip().upper()
|
|
if parity_input == 'E':
|
|
parity = serial.PARITY_EVEN
|
|
elif parity_input == 'O':
|
|
parity = serial.PARITY_ODD
|
|
elif parity_input == 'M':
|
|
parity = serial.PARITY_MARK
|
|
elif parity_input == 'S':
|
|
parity = serial.PARITY_SPACE
|
|
else:
|
|
parity = serial.PARITY_NONE # 默认
|
|
|
|
# 停止位配置
|
|
stopbits_input = stopbits_input_temp.strip()
|
|
if stopbits_input == '1.5':
|
|
stopbits = serial.STOPBITS_ONE_POINT_FIVE
|
|
elif stopbits_input == '2':
|
|
stopbits = serial.STOPBITS_TWO
|
|
else:
|
|
stopbits = serial.STOPBITS_ONE # 默认
|
|
|
|
# 停止位描述转换
|
|
stopbits_desc = {
|
|
serial.STOPBITS_ONE: "1位",
|
|
serial.STOPBITS_ONE_POINT_FIVE: "1.5位",
|
|
serial.STOPBITS_TWO: "2位"
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
def scan_modbus_devices(port='COM3', baudrate=9600, addr=1,
|
|
timeout=0.2, retries=1, send_interval=0.05, register_address=0,
|
|
bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
|
|
function_code=0x03):
|
|
try:
|
|
# 配置串口参数
|
|
ser = serial.Serial(
|
|
port=port,
|
|
baudrate=baudrate,
|
|
bytesize=bytesize,
|
|
parity=parity,
|
|
stopbits=stopbits,
|
|
timeout=timeout
|
|
)
|
|
# 使用用户设置的寄存器地址和功能码创建请求帧
|
|
frame = create_modbus_frame(addr, function_code=function_code, start_address=register_address)
|
|
|
|
# 打印发送的发码
|
|
# print(f"地址 {addr:3d}: 发送 -> {bytes_to_hex(frame)}")
|
|
sendding_frame=f"地址 {addr:3d}: 发送 -> {bytes_to_hex(frame)}"
|
|
for attempt in range(retries):
|
|
try:
|
|
ser.write(frame)
|
|
time.sleep(send_interval) # 发送间隔时间
|
|
response = ser.read_all()
|
|
recvied_frame=''
|
|
register_value=''
|
|
if response:
|
|
# 打印接收的收码
|
|
# print(f"地址 {addr:3d}: 接收 <- {bytes_to_hex(response)}")
|
|
recvied_frame=f"地址 {addr:3d}: 接收 <- {bytes_to_hex(response)}"
|
|
if response and validate_response(response, addr, function_code):
|
|
# 解析寄存器值
|
|
data_length = response[2]
|
|
if data_length >= 2: # 至少2字节数据
|
|
# 提取第一个寄存器的值
|
|
reg_value = (response[3] << 8) | response[4]
|
|
# print(f"地址 {addr:3d} - 设备响应有效 | 寄存器 {register_address} 值: {reg_value}")
|
|
register_value=f"地址 {addr:3d} - 设备响应有效 | 寄存器 {register_address} 值: {reg_value}"
|
|
else:
|
|
# print(f"地址 {addr:3d} - 设备响应有效 | 无效数据长度")
|
|
register_value =f"地址 {addr:3d} - 设备响应有效 | 无效数据长度"
|
|
break
|
|
elif attempt == retries - 1:
|
|
# print(f"地址 {addr:3d} - 无响应")
|
|
recvied_frame =f"地址 {addr:3d} - 无响应"
|
|
except Exception as e:
|
|
print(f"\n地址 {addr} 通信错误: {str(e)}")
|
|
|
|
ser.close()
|
|
status={
|
|
'status':200,
|
|
'sending_code':f"发码:{sendding_frame}",
|
|
'receving_code':f"回码:{recvied_frame}",
|
|
'register_value':f"寄存器值: {register_value}"
|
|
}
|
|
except serial.SerialException as e:
|
|
print(f"\n串口错误: {str(e)}")
|
|
status={
|
|
'status':403,
|
|
'error':f"\n串口错误: {str(e)}"
|
|
}
|
|
return status
|
|
print(status)
|
|
return status
|