You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

119 lines
4.1 KiB

import asyncio
import serial
import websocket
import websockets
import json
import sys
import time
from eventlet.green.http.client import responses
import modscan
async def handle_connection(websocket):
"""处理 WebSocket 连接"""
client_ip = websocket.remote_address[0]
print(f"客户端连接成功: {client_ip}")
async for message in websocket:
try:
data=json.loads(message)
port = data.get('port').strip()
baudrate = data.get('baudrate').strip()
register_address = int(data.get('registerAddress').strip())
timeout = float(data.get('timeout').strip())
retries = int(data.get('retries').strip())
send_interval = float(data.get('sendInterval').strip())
func_input = data.get('functionCode').strip()
function_code = modscan.get_function_code(func_input) if func_input else 0x03
# 数据位配置
bytesize_input = data.get('bytesize').strip()
if bytesize_input == '5':
bytesize = serial.FIVEBITS
elif bytesize_input == '6':
bytesize = serial.SIXBITS
elif bytesize_input == '7':
bytesize = serial.SEVENBITS
else:
bytesize = serial.EIGHTBITS # 默认
# 校验位配置
parity_input = data.get('parity').strip().upper()
if parity_input == 'E':
parity = serial.PARITY_EVEN
elif parity_input == 'O':
parity = serial.PARITY_ODD
elif parity_input == 'M':
parity = serial.PARITY_MARK
elif parity_input == 'S':
parity = serial.PARITY_SPACE
else:
parity = serial.PARITY_NONE # 默认
# 停止位配置
stopbits_input = data.get('stopbits').strip()
if stopbits_input == '1.5':
stopbits = serial.STOPBITS_ONE_POINT_FIVE
elif stopbits_input == '2':
stopbits = serial.STOPBITS_TWO
else:
stopbits = serial.STOPBITS_ONE # 默认
if (data.get('type')=='sole'):
addr = int(data.get('current_addr'))
status = modscan.scan_modbus_devices(port, baudrate, addr, timeout, retries, send_interval, register_address, bytesize,
parity, stopbits, function_code)
print(data)
await websocket.send(json.dumps(status))
print(status)
elif(data.get('type')=='list'):
startaddr=int(data.get('startAddr'))
endaddr=int(data.get('endAddr'))
for addr in range(startaddr,endaddr+1):
status = modscan.scan_modbus_devices(port, baudrate, addr, timeout, retries, send_interval,
register_address, bytesize,
parity, stopbits, function_code)
print(data)
await websocket.send(json.dumps(status))
print(status)
time.sleep(send_interval)
except json.JSONDecodeError:
await websocket.send(json.dumps({
"status": "error",
"message": "无效的JSON格式"
}))
async def main():
"""主异步函数"""
print("启动 WebSocket 服务器...")
# 正确创建服务器实例
server = await websockets.serve(
handle_connection,
"localhost",
8765
)
print(f"WebSocket 服务器已启动: ws://localhost:8765")
print(f"监听地址: {server.sockets[0].getsockname()}")
# 永久运行
await server.wait_closed()
if __name__ == "__main__":
# 设置 Windows 事件循环策略
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# 运行主程序
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n服务器已停止")