diff --git a/rpi/hardware/sound/__init__.py b/rpi/hardware/sound/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/rpi/hardware/sound/sound.py b/rpi/hardware/sound/sound.py new file mode 100644 index 00000000..8bb5e663 --- /dev/null +++ b/rpi/hardware/sound/sound.py @@ -0,0 +1,68 @@ +import time +import threading +from typing import Callable + +import RPi.GPIO as GPIO + +class SoundReader: + + _pin :int + _running :bool + _thread :threading.Thread | None + _subscriber :Callable[[int], None] + + def __init__(self, pin): + self._pin = pin + self._sound_level = 0 + self._running = False + self._thread = None + + GPIO.setmode(GPIO.BCM) + GPIO.setup(self._pin, GPIO.IN) + + def start(self): + if not self._running: + self._running = True + self._thread = threading.Thread(target=self._read, daemon=True) + self._thread.start() + + def stop(self): + self._running = False + if self._thread: + self._thread.join() + GPIO.cleanup() + + def subscribe(self, handler:Callable[[int], None]): + self._subscriber = handler + + def get_level(self): + return self._sound_level + + def _notify(self): + if self._subscriber: + self._subscriber(self._sound_level) + + def _read(self): + measure_duration = 0.2 + interval = 2 + while self._running: + level = 0 + start = time.time() + while time.time() - start < measure_duration: + if GPIO.input(self._pin): + level += 1 + self._sound_level = level + time.sleep(interval - measure_duration) + +if __name__ == "__main__": + sensor = SoundReader(pin=17) + sensor.start() + + try: + while True: + print(f"Approx. Sound Level: {sensor.get_level()}") + time.sleep(0.2) + except KeyboardInterrupt: + print("Exiting...") + finally: + sensor.stop() diff --git a/rpi/main.py b/rpi/main.py index 52383813..bad0bdb9 100644 --- a/rpi/main.py +++ b/rpi/main.py @@ -9,6 +9,7 @@ from flask import Flask, jsonify, request from controllers.GameController import GameController from hardware.light.lora_light_sensor_reader import LoraLightSensorReader from hardware.rfid.reader import RfidReader +from hardware.sound.sound import SoundReader from services.clock_service import ClockService from services.detection_service import DetectionService from services.forwarder_service import ForwarderService @@ -37,6 +38,7 @@ if __name__ == "__main__": try : rfid_reader = RfidReader("/dev/serial0", 9600) light_sensor_reader = LoraLightSensorReader("/dev/ttyUSB1", 9600) + sound_reader = SoundReader(17) local_broker = MQTTService( local_broker_address, @@ -59,15 +61,18 @@ if __name__ == "__main__": forwarder_service.register_forwarder(client_id, "/system/sensor/rfid", f"/customer/telemetry/rfid") forwarder_service.register_forwarder(client_id, "/system/sensor/light", f"/customer/telemetry/light") + forwarder_service.register_forwarder(client_id, "/system/sensor/sound", f"/customer/telemetry/sound") forwarder_service.register_forwarder(client_id, "/system/sensor/gps", f"/customer/telemetry/gps") - rfid_reader.subscribe(lambda uid: local_broker.publish("/system/sensor/rfid", str(uid), 1)) - light_sensor_reader.subscribe(lambda light_value: local_broker.publish("/system/sensor/light", str(light_value), 0)) + rfid_reader.subscribe(lambda uid: local_broker.publish("/system/sensor/rfid", str(uid), 2)) + light_sensor_reader.subscribe(lambda light_value: local_broker.publish("/system/sensor/light", str(light_value), 1)) + sound_reader.subscribe(lambda light_value: local_broker.publish("/system/sensor/sound", str(light_value), 1)) forwarder_service.start_all() rfid_reader.start() light_sensor_reader.start() + sound_reader.start() print("App started...") app.run(host="0.0.0.0", port=5000, debug=False)