import serial import time def calculate_crc(data): """计算Modbus RTU CRC16校验码""" crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 0x0001: crc >>= 1 crc ^= 0xA001 else: crc >>= 1 return crc.to_bytes(2, byteorder='little') def create_modbus_frame(slave_address, function_code=0x03, start_address=0x0000, num_registers=1): """创建Modbus RTU请求帧""" frame = bytes([ slave_address, # 设备地址 function_code, # 功能码 (start_address >> 8) & 0xFF, # 起始地址高字节 start_address & 0xFF, # 起始地址低字节 (num_registers >> 8) & 0xFF, # 寄存器数量高字节 num_registers & 0xFF # 寄存器数量低字节 ]) crc = calculate_crc(frame) return frame + crc def bytes_to_hex(data): """将字节数据转换为十六进制字符串""" return ' '.join([f"{b:02X}" for b in data]) if data else "" def validate_response(response, slave_address, function_code): """验证Modbus响应有效性""" if len(response) < 5: return False # 检查地址和功能码 if response[0] != slave_address or response[1] != function_code: # 检查是否是错误响应 if response[0] == slave_address and response[1] == function_code + 0x80: error_code = response[2] print(f" Modbus错误响应 (异常码: {error_code:02X})") return False # 检查数据长度 data_length = response[2] if len(response) != 5 + data_length: return False # 验证CRC校验 received_crc = response[-2:] calculated_crc = calculate_crc(response[:-2]) return received_crc == calculated_crc def get_stopbits(stopbits_str): """将字符串表示的停止位转换为serial库常量""" if stopbits_str == '1.5': return serial.STOPBITS_ONE_POINT_FIVE elif stopbits_str == '2': return serial.STOPBITS_TWO else: return serial.STOPBITS_ONE def get_function_code(func_str): """将字符串表示的功能码转换为整数""" try: # 尝试解析十六进制 if func_str.lower().startswith('0x'): return int(func_str, 16) # 尝试解析十进制 return int(func_str) except ValueError: # 解析失败,返回默认值 return 0x03 def scan(port_temp,baudrate_temp,timeout_temp,retries_temp,send_interval_temp,register_address_temp,func_input_temp,bytesize_input_temp,parity_input_temp,stopbits_input_temp): port = port_temp.strip() baudrate = baudrate_temp.strip() timeout = float(timeout_temp.strip()) retries = int(retries_temp.strip()) send_interval = float(send_interval_temp.strip()) register_address = int(register_address_temp.strip()) func_input = func_input_temp.strip() function_code = get_function_code(func_input) if func_input else 0x03 # 数据位配置 bytesize_input = bytesize_input_temp.strip() if bytesize_input == '5': bytesize = serial.FIVEBITS elif bytesize_input == '6': bytesize = serial.SIXBITS elif bytesize_input == '7': bytesize = serial.SEVENBITS else: bytesize = serial.EIGHTBITS # 默认 # 校验位配置 parity_input = parity_input_temp.strip().upper() if parity_input == 'E': parity = serial.PARITY_EVEN elif parity_input == 'O': parity = serial.PARITY_ODD elif parity_input == 'M': parity = serial.PARITY_MARK elif parity_input == 'S': parity = serial.PARITY_SPACE else: parity = serial.PARITY_NONE # 默认 # 停止位配置 stopbits_input = stopbits_input_temp.strip() if stopbits_input == '1.5': stopbits = serial.STOPBITS_ONE_POINT_FIVE elif stopbits_input == '2': stopbits = serial.STOPBITS_TWO else: stopbits = serial.STOPBITS_ONE # 默认 # 停止位描述转换 stopbits_desc = { serial.STOPBITS_ONE: "1位", serial.STOPBITS_ONE_POINT_FIVE: "1.5位", serial.STOPBITS_TWO: "2位" } def scan_modbus_devices(port='COM3', baudrate=9600, addr=1, timeout=0.2, retries=1, send_interval=0.05, register_address=0, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, function_code=0x03): try: # 配置串口参数 ser = serial.Serial( port=port, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, timeout=timeout ) # 使用用户设置的寄存器地址和功能码创建请求帧 frame = create_modbus_frame(addr, function_code=function_code, start_address=register_address) # 打印发送的发码 # print(f"地址 {addr:3d}: 发送 -> {bytes_to_hex(frame)}") sendding_frame=f"地址 {addr:3d}: 发送 -> {bytes_to_hex(frame)}" for attempt in range(retries): try: ser.write(frame) time.sleep(send_interval) # 发送间隔时间 response = ser.read_all() recvied_frame='' register_value='' if response: # 打印接收的收码 # print(f"地址 {addr:3d}: 接收 <- {bytes_to_hex(response)}") recvied_frame=f"地址 {addr:3d}: 接收 <- {bytes_to_hex(response)}" if response and validate_response(response, addr, function_code): # 解析寄存器值 data_length = response[2] if data_length >= 2: # 至少2字节数据 # 提取第一个寄存器的值 reg_value = (response[3] << 8) | response[4] # print(f"地址 {addr:3d} - 设备响应有效 | 寄存器 {register_address} 值: {reg_value}") register_value=f"地址 {addr:3d} - 设备响应有效 | 寄存器 {register_address} 值: {reg_value}" else: # print(f"地址 {addr:3d} - 设备响应有效 | 无效数据长度") register_value =f"地址 {addr:3d} - 设备响应有效 | 无效数据长度" break elif attempt == retries - 1: # print(f"地址 {addr:3d} - 无响应") recvied_frame =f"地址 {addr:3d} - 无响应" except Exception as e: print(f"\n地址 {addr} 通信错误: {str(e)}") ser.close() status={ 'status':200, 'sending_code':f"发码:{sendding_frame}", 'receving_code':f"回码:{recvied_frame}", 'register_value':f"寄存器值: {register_value}" } except serial.SerialException as e: print(f"\n串口错误: {str(e)}") status={ 'status':403, 'error':f"\n串口错误: {str(e)}" } return status print(status) return status