50 lines
1.3 KiB
Python
50 lines
1.3 KiB
Python
import threading
|
|
from threading import Thread
|
|
from typing import Callable
|
|
|
|
from serial import Serial
|
|
|
|
class SerialReader:
|
|
|
|
serial : Serial = None
|
|
__thread : Thread | None = None
|
|
__listeners : list[Callable] = None
|
|
|
|
def __init__(self, port, baudrate):
|
|
self.serial = Serial(port, baudrate)
|
|
self._run_event = threading.Event()
|
|
self.__listeners = []
|
|
|
|
def start(self) -> None:
|
|
self._run_event.set()
|
|
if self.__thread is None or not self.__thread.is_alive():
|
|
self.__thread = Thread(target=self.__read, daemon=True)
|
|
self.__thread.start()
|
|
|
|
def stop(self) -> None:
|
|
if self._run_event.is_set():
|
|
self._run_event.clear()
|
|
|
|
def subscribe(self, listener : Callable) -> None:
|
|
self.__listeners.append(listener)
|
|
|
|
def __notify(self, data : str):
|
|
for listener in self.__listeners:
|
|
listener(data)
|
|
|
|
def __read(self):
|
|
while self._run_event.is_set():
|
|
line = self.serial.readline()
|
|
if line:
|
|
data = line.decode('utf-8', errors='ignore')
|
|
self.__notify(data)
|
|
|
|
|
|
if __name__ == "__main__":
    # Demo: print every line received on the serial port until Ctrl-C.
    import time

    def callback(data):
        print(data)

    reader = SerialReader("/dev/ttyUSB1", 57600)
    reader.subscribe(callback)
    reader.start()

    # The reader runs in a daemon thread, which is killed as soon as the main
    # thread returns.  Keep the main thread alive so lines actually get
    # printed, and stop the reader when the user interrupts.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        reader.stop()