import asyncio import serial_asyncio class SerialProtocol(asyncio.Protocol): def __init__(self, message, loop): self.message = message self.loop = loop self.transport = None def connection_made(self, transport): self.transport = transport print("Połączenie nawiązane") self.transport.serial.rts = False self.transport.write(self.message) print("Wysłano bajty:", ' '.join(format(x, '02X') for x in self.message)) def data_received(self, data): print("Odebrano bajty:", ' '.join(format(x, '02X') for x in data)) self.transport.close() def connection_lost(self, exc): print("Połączenie zamknięte") self.loop.stop() async def main(): port = 'COM1' # Nazwa portu szeregowego baudrate = 9600 # Szybkość transmisji message = bytearray.fromhex('02 00 04 1E 03 00 25') loop = asyncio.get_running_loop() coro = serial_asyncio.create_serial_connection(loop, lambda: SerialProtocol(message, loop), port, baudrate=baudrate) await coro # Uruchomienie pętli asynchronicznej asyncio.run(main())