import threading from threading import Thread from typing import Callable from serial import Serial class SerialReader: serial : Serial = None __thread : Thread | None = None __listeners : list[Callable] = None def __init__(self, port, baudrate): self.serial = Serial(port, baudrate) self._run_event = threading.Event() self.__listeners = [] def start(self) -> None: self._run_event.set() if self.__thread is None or not self.__thread.is_alive(): self.__thread = Thread(target=self._read, daemon=True) self.__thread.start() def stop(self) -> None: if self._run_event.is_set(): self._run_event.clear() def subscribe(self, listener : Callable) -> None: self.__listeners.append(listener) def _notify(self, data : str): for listener in self.__listeners: listener(data) def _read(self): while self._run_event.is_set(): line = self.serial.readline() if line: data = line.decode('utf-8', errors='ignore') self._notify(data) if __name__ == "__main__": def callback(data): print(data) reader = SerialReader("/dev/ttyUSB1", 57600) reader.subscribe(callback) reader.start()