#!/usr/bin/env python3
"""Background reader that extracts UIDs from fixed-length serial frames."""

from collections.abc import Callable
import threading
from threading import Thread
import traceback

from serial import Serial


class RfidReader:
    """Read fixed-length frames from a serial port and publish parsed UIDs.

    Frames are collected on a background daemon thread. For every frame the
    ASCII hex characters at offsets ``_UID_START`` to ``_UID_END - 1`` are
    parsed as a base-16 integer and handed to every subscribed listener.

    NOTE(review): frames are delimited purely by byte count; there is no
    resynchronisation on a start/end marker, so a reader started mid-frame
    stays misaligned. Confirm against the device's protocol documentation.
    """

    # Number of bytes collected before a frame is parsed.
    _FRAME_LENGTH = 14
    # The UID is decoded from frame[_UID_START:_UID_END]; a frame shorter
    # than _UID_END bytes is reported as incomplete.
    _UID_START = 5
    _UID_END = 11

    def __init__(self, port: str, baudrate: int, timeout: float | None = 1.0):
        """Open the serial port; reading does not begin until ``start()``.

        Args:
            port: Device name passed to ``serial.Serial``.
            baudrate: Baud rate passed to ``serial.Serial``.
            timeout: Read timeout in seconds passed to ``serial.Serial``.
                A finite positive value lets the reader thread notice
                ``stop()`` even when the device is silent. ``None`` blocks
                until data arrives (so ``stop()`` only takes effect once
                more bytes are received); ``0`` makes the read loop spin.
        """
        self.__serial: Serial = Serial(port, baudrate, timeout=timeout)
        self.__thread: Thread | None = None
        self.__listeners: list[Callable[[int], object]] = []
        # Set while the reader thread should keep running.
        self._run_event = threading.Event()

    def start(self) -> None:
        """Start the reader thread unless one is already alive."""
        self._run_event.set()
        if self.__thread is None or not self.__thread.is_alive():
            self.__thread = Thread(target=self.__read, daemon=True)
            self.__thread.start()

    def stop(self) -> None:
        """Ask the reader thread to finish; returns without waiting for it."""
        # Event.clear() is a no-op when the flag is already cleared.
        self._run_event.clear()

    def join(self, timeout: float | None = None) -> None:
        """Wait for the reader thread to finish.

        Args:
            timeout: Maximum number of seconds to wait, or ``None`` to wait
                until the thread ends. Returns immediately when the reader
                was never started.
        """
        thread = self.__thread
        if thread is not None:
            thread.join(timeout)

    def subscribe(self, listener: Callable[[int], object]) -> None:
        """Register ``listener`` to be called with every parsed UID."""
        self.__listeners.append(listener)

    def __notify(self, uid: int) -> None:
        """Call every listener with ``uid``, isolating listener failures."""
        for listener in self.__listeners:
            try:
                listener(uid)
            except Exception:
                # This runs on the reader thread: report the failure and keep
                # going so one faulty listener cannot stop further reads.
                traceback.print_exc()

    def __read(self) -> None:
        """Reader-thread body: collect frames until ``stop()`` is called."""
        while self._run_event.is_set():
            data = b""
            while len(data) < self._FRAME_LENGTH and self._run_event.is_set():
                # Ask for the rest of the frame in one call; with a read
                # timeout this may return fewer bytes, or none at all.
                chunk = self.__serial.read(self._FRAME_LENGTH - len(data))
                if not chunk:
                    continue
                data += chunk

            if not data:
                # Stopped before any byte of a new frame arrived.
                continue

            if len(data) >= self._UID_END:
                hex_value = data[self._UID_START:self._UID_END].decode(
                    errors='ignore'
                )
                # Keep the try narrow so a ValueError raised by a listener
                # is not misreported as an invalid frame.
                try:
                    res = int(hex_value, 16)
                except ValueError:
                    print("Invalid hex:", hex_value)
                else:
                    self.__notify(res)
            else:
                print("Received incomplete data:", data)


if __name__ == "__main__":

    def callback(uid: int) -> None:
        print("UID:", uid)

    reader = RfidReader("/dev/serial0", 9600)
    reader.subscribe(callback)
    reader.start()
    try:
        # The reader runs on a daemon thread, which is killed as soon as the
        # main thread exits; block here so the script keeps reading.
        reader.join()
    except KeyboardInterrupt:
        reader.stop()