import time import threading from typing import Callable import RPi.GPIO as GPIO class SoundReader: _pin :int _running :bool _thread :threading.Thread | None _subscriber :Callable[[str], None] def __init__(self, pin): self._pin = pin self._running = False self._thread = None GPIO.setmode(GPIO.BCM) GPIO.setup(self._pin, GPIO.IN) def start(self): if not self._running: self._running = True self._thread = threading.Thread(target=self._read, daemon=True) self._thread.start() def stop(self): self._running = False if self._thread: self._thread.join() GPIO.cleanup() def subscribe(self, handler:Callable[[str], None]): self._subscriber = handler def _notify(self, data : str): if self._subscriber: self._subscriber(data) def _read(self): measure_iteration = 100 level = 0 i = 0 while self._running: if GPIO.input(self._pin): level += 1 if i == measure_iteration: self._notify(f'{{"soundSensorValue": {level}}}') i = 0 level = 0 i += 1 time.sleep(0.02) if __name__ == "__main__": sensor = SoundReader(pin=17) sensor.start() try: while True: print(f"Approx. Sound Level: {sensor.get_level()}") time.sleep(0.2) except KeyboardInterrupt: print("Exiting...") finally: sensor.stop()