You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

203 lines
7.1 KiB

import serial
import time
def calculate_crc(data):
"""计算Modbus RTU CRC16校验码"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return crc.to_bytes(2, byteorder='little')
def create_modbus_frame(slave_address, function_code=0x03, start_address=0x0000, num_registers=1):
"""创建Modbus RTU请求帧"""
frame = bytes([
slave_address, # 设备地址
function_code, # 功能码
(start_address >> 8) & 0xFF, # 起始地址高字节
start_address & 0xFF, # 起始地址低字节
(num_registers >> 8) & 0xFF, # 寄存器数量高字节
num_registers & 0xFF # 寄存器数量低字节
])
crc = calculate_crc(frame)
return frame + crc
def bytes_to_hex(data):
"""将字节数据转换为十六进制字符串"""
return ' '.join([f"{b:02X}" for b in data]) if data else ""
def validate_response(response, slave_address, function_code):
"""验证Modbus响应有效性"""
if len(response) < 5:
return False
# 检查地址和功能码
if response[0] != slave_address or response[1] != function_code:
# 检查是否是错误响应
if response[0] == slave_address and response[1] == function_code + 0x80:
error_code = response[2]
print(f" Modbus错误响应 (异常码: {error_code:02X})")
return False
# 检查数据长度
data_length = response[2]
if len(response) != 5 + data_length:
return False
# 验证CRC校验
received_crc = response[-2:]
calculated_crc = calculate_crc(response[:-2])
return received_crc == calculated_crc
def get_stopbits(stopbits_str):
"""将字符串表示的停止位转换为serial库常量"""
if stopbits_str == '1.5':
return serial.STOPBITS_ONE_POINT_FIVE
elif stopbits_str == '2':
return serial.STOPBITS_TWO
else:
return serial.STOPBITS_ONE
def get_function_code(func_str):
"""将字符串表示的功能码转换为整数"""
try:
# 尝试解析十六进制
if func_str.lower().startswith('0x'):
return int(func_str, 16)
# 尝试解析十进制
return int(func_str)
except ValueError:
# 解析失败,返回默认值
return 0x03
def scan(port_temp,baudrate_temp,timeout_temp,retries_temp,send_interval_temp,register_address_temp,func_input_temp,bytesize_input_temp,parity_input_temp,stopbits_input_temp):
port = port_temp.strip()
baudrate = baudrate_temp.strip()
timeout = float(timeout_temp.strip())
retries = int(retries_temp.strip())
send_interval = float(send_interval_temp.strip())
register_address = int(register_address_temp.strip())
func_input = func_input_temp.strip()
function_code = get_function_code(func_input) if func_input else 0x03
# 数据位配置
bytesize_input = bytesize_input_temp.strip()
if bytesize_input == '5':
bytesize = serial.FIVEBITS
elif bytesize_input == '6':
bytesize = serial.SIXBITS
elif bytesize_input == '7':
bytesize = serial.SEVENBITS
else:
bytesize = serial.EIGHTBITS # 默认
# 校验位配置
parity_input = parity_input_temp.strip().upper()
if parity_input == 'E':
parity = serial.PARITY_EVEN
elif parity_input == 'O':
parity = serial.PARITY_ODD
elif parity_input == 'M':
parity = serial.PARITY_MARK
elif parity_input == 'S':
parity = serial.PARITY_SPACE
else:
parity = serial.PARITY_NONE # 默认
# 停止位配置
stopbits_input = stopbits_input_temp.strip()
if stopbits_input == '1.5':
stopbits = serial.STOPBITS_ONE_POINT_FIVE
elif stopbits_input == '2':
stopbits = serial.STOPBITS_TWO
else:
stopbits = serial.STOPBITS_ONE # 默认
# 停止位描述转换
stopbits_desc = {
serial.STOPBITS_ONE: "1位",
serial.STOPBITS_ONE_POINT_FIVE: "1.5位",
serial.STOPBITS_TWO: "2位"
}
def scan_modbus_devices(port='COM3', baudrate=9600, addr=1,
timeout=0.2, retries=1, send_interval=0.05, register_address=0,
bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE,
function_code=0x03):
try:
# 配置串口参数
ser = serial.Serial(
port=port,
baudrate=baudrate,
bytesize=bytesize,
parity=parity,
stopbits=stopbits,
timeout=timeout
)
# 使用用户设置的寄存器地址和功能码创建请求帧
frame = create_modbus_frame(addr, function_code=function_code, start_address=register_address)
# 打印发送的发码
# print(f"地址 {addr:3d}: 发送 -> {bytes_to_hex(frame)}")
sendding_frame=f"地址 {addr:3d}: 发送 -> {bytes_to_hex(frame)}"
for attempt in range(retries):
try:
ser.write(frame)
time.sleep(send_interval) # 发送间隔时间
response = ser.read_all()
recvied_frame=''
register_value=''
if response:
# 打印接收的收码
# print(f"地址 {addr:3d}: 接收 <- {bytes_to_hex(response)}")
recvied_frame=f"地址 {addr:3d}: 接收 <- {bytes_to_hex(response)}"
if response and validate_response(response, addr, function_code):
# 解析寄存器值
data_length = response[2]
if data_length >= 2: # 至少2字节数据
# 提取第一个寄存器的值
reg_value = (response[3] << 8) | response[4]
# print(f"地址 {addr:3d} - 设备响应有效 | 寄存器 {register_address} 值: {reg_value}")
register_value=f"地址 {addr:3d} - 设备响应有效 | 寄存器 {register_address} 值: {reg_value}"
else:
# print(f"地址 {addr:3d} - 设备响应有效 | 无效数据长度")
register_value =f"地址 {addr:3d} - 设备响应有效 | 无效数据长度"
break
elif attempt == retries - 1:
# print(f"地址 {addr:3d} - 无响应")
recvied_frame =f"地址 {addr:3d} - 无响应"
except Exception as e:
print(f"\n地址 {addr} 通信错误: {str(e)}")
ser.close()
status={
'status':200,
'sending_code':f"发码:{sendding_frame}",
'receving_code':f"回码:{recvied_frame}",
'register_value':f"寄存器值: {register_value}"
}
except serial.SerialException as e:
print(f"\n串口错误: {str(e)}")
status={
'status':403,
'error':f"\n串口错误: {str(e)}"
}
return status
print(status)
return status