67 lines
1.5 KiB
Python
67 lines
1.5 KiB
Python
import time
|
|
import threading
|
|
from typing import Callable
|
|
|
|
import RPi.GPIO as GPIO
|
|
|
|
class SoundReader:
|
|
|
|
_pin :int
|
|
_running :bool
|
|
_thread :threading.Thread | None
|
|
_subscriber :Callable[[str], None]
|
|
|
|
def __init__(self, pin):
|
|
self._pin = pin
|
|
self._running = False
|
|
self._thread = None
|
|
|
|
GPIO.setmode(GPIO.BCM)
|
|
GPIO.setup(self._pin, GPIO.IN)
|
|
|
|
def start(self):
|
|
if not self._running:
|
|
self._running = True
|
|
self._thread = threading.Thread(target=self._read, daemon=True)
|
|
self._thread.start()
|
|
|
|
def stop(self):
|
|
self._running = False
|
|
if self._thread:
|
|
self._thread.join()
|
|
GPIO.cleanup()
|
|
|
|
def subscribe(self, handler:Callable[[str], None]):
|
|
self._subscriber = handler
|
|
|
|
def _notify(self, data : str):
|
|
if self._subscriber:
|
|
self._subscriber(data)
|
|
|
|
def _read(self):
|
|
measure_iteration = 100
|
|
level = 0
|
|
i = 0
|
|
while self._running:
|
|
if GPIO.input(self._pin):
|
|
level += 1
|
|
if i == measure_iteration:
|
|
self._notify(f'{{"soundSensorValue": {level}}}')
|
|
i = 0
|
|
level = 0
|
|
i += 1
|
|
time.sleep(0.02)
|
|
|
|
if __name__ == "__main__":
|
|
sensor = SoundReader(pin=17)
|
|
sensor.start()
|
|
|
|
try:
|
|
while True:
|
|
print(f"Approx. Sound Level: {sensor.get_level()}")
|
|
time.sleep(0.2)
|
|
except KeyboardInterrupt:
|
|
print("Exiting...")
|
|
finally:
|
|
sensor.stop()
|