import asyncio
import serial_asyncio
class SerialProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
print("Połączenie nawiązane")
self.transport.serial.rts = False
self.transport.write(self.message)
print("Wysłano bajty:", ' '.join(format(x, '02X') for x in self.message))
def data_received(self, data):
print("Odebrano bajty:", ' '.join(format(x, '02X') for x in data))
self.transport.close()
def connection_lost(self, exc):
print("Połączenie zamknięte")
self.loop.stop()
async def main():
port = 'COM1' # Nazwa portu szeregowego
baudrate = 9600 # Szybkość transmisji
message = bytearray.fromhex('02 00 04 1E 03 00 25')
loop = asyncio.get_running_loop()
coro = serial_asyncio.create_serial_connection(loop, lambda: SerialProtocol(message, loop), port, baudrate=baudrate)
await coro
# Uruchomienie pętli asynchronicznej
asyncio.run(main())
Paste Hosted With By Wklejamy.pl