Files
board-mate/rpi/hardware/generic/serial_reader.py
2025-12-27 15:17:03 +01:00

52 lines
1.3 KiB
Python

import threading
from threading import Thread
from typing import Callable
from serial import Serial
class SerialReader:
serial : Serial = None
__thread : Thread | None = None
__listeners : list[Callable] = None
def __init__(self, port, baudrate):
self.serial = Serial(port, baudrate)
self._run_event = threading.Event()
self.__listeners = []
def start(self) -> None:
self._run_event.set()
if self.__thread is None or not self.__thread.is_alive():
self.__thread = Thread(target=self._read, daemon=True)
self.__thread.start()
def stop(self) -> None:
if self._run_event.is_set():
self._run_event.clear()
def subscribe(self, listener : Callable) -> None:
self.__listeners.append(listener)
def _notify(self, data : str):
for listener in self.__listeners:
listener(data)
def _read(self):
while self._run_event.is_set():
line = self.serial.readline()
if line:
print("Generic method")
data = line.decode('utf-8', errors='ignore')
self._notify(data)
if __name__ == "__main__":
def callback(data):
print(data)
reader = SerialReader("/dev/ttyUSB1", 57600)
reader.subscribe(callback)
reader.start()