diff --git a/api-customer/app.py b/api-customer/app.py index ab2f8eea..8aa58e2f 100644 --- a/api-customer/app.py +++ b/api-customer/app.py @@ -1,7 +1,6 @@ import uuid from flask import Flask - from src.controllers.mqtt_forwarder import MQTTForwarder from src.services.mqtt_service import MQTTService @@ -9,18 +8,26 @@ client_id = "1" app = Flask(__name__) - @app.route('/') def hello_world(): return 'Hello World!' - if __name__ == '__main__': + local_broker = MQTTService( + "127.0.0.1", + 1883, + username="Pi-1", + password="hepl", + ) - local_broker = MQTTService("127.0.0.1", 1883) - api_broker = MQTTService("192.168.15.120", 8883) + api_broker = MQTTService( + "192.168.15.120", + 1883, + username="rpi", + password="hepl", + ) forwarder = MQTTForwarder(client_id, local_broker, api_broker) - forwarder.start(f"/customer/${client_id}/#", f"/board-mate/${client_id}/telemetry") + forwarder.start(f"/customer/{client_id}/#", f"/board-mate/{client_id}/telemetry") app.run(host="0.0.0.0", port=5000, debug=False) diff --git a/api-customer/src/controllers/mqtt_forwarder.py b/api-customer/src/controllers/mqtt_forwarder.py index 1ea04742..310231cb 100644 --- a/api-customer/src/controllers/mqtt_forwarder.py +++ b/api-customer/src/controllers/mqtt_forwarder.py @@ -15,32 +15,8 @@ class MQTTForwarder: self.local_broker = local_mqtt self.central_broker = central_mqtt - def start(self, src_topic : str, dst_topic : str) -> None: - def handler(msg:str): - self.__forward(msg, dst_topic) + def start(self, src_topic: str, dst_topic: str): + def forward_handler(topic: str, msg: str): + self.central_broker.publish(dst_topic, msg) - self.local_broker.subscribe(src_topic, handler) - - def __forward(self, msg: str, dst_topic) -> None: - self.central_broker.publish(self.client_id, dst_topic, msg) - - """def start(self): - self.local.subscribe("board-mate/+/telemetry", self.handle_message) - - def handle_message(self, topic: str, payload: str): - print(f"[FORWARD] {topic} -> central") - - message = json.loads(payload) - message["source"] = "client-server" - message["forwarded_at"] = int(time.time()) - - self.central.publish( - client_id="client-forwarder", - topic=f"clients/{self.extract_client_id(topic)}/telemetry", - data=json.dumps(message), - qos=1 - ) - - def extract_client_id(self, topic: str) -> str: - # ex: board-mate/pi-123/telemetry - return topic.split("/")[1]""" \ No newline at end of file + self.local_broker.subscribe(src_topic, forward_handler) \ No newline at end of file diff --git a/api-customer/src/services/mqtt_service.py b/api-customer/src/services/mqtt_service.py index 48f71825..959300ae 100644 --- a/api-customer/src/services/mqtt_service.py +++ b/api-customer/src/services/mqtt_service.py @@ -1,55 +1,76 @@ import json import time -from typing import Callable +from typing import Callable, Optional import paho.mqtt.client as mqtt -from paho.mqtt.client import Client - class MQTTService: - - client : Client - - def __init__(self, address: str, port: int): + def __init__(self, address: str, port: int, client_id: Optional[str] = None, + username: Optional[str] = None, password: Optional[str] = None, + ca_certs: Optional[str] = None): self.address = address self.port = port - self.client = mqtt.Client() + self.client = mqtt.Client(client_id=client_id) + if username and password: + self.client.username_pw_set(username, password) + if ca_certs: + self.client.tls_set(ca_certs=ca_certs) + else: + self.client.tls_set() + self.client.tls_insecure_set(True) + self._connected = False + self.client.on_connect = self._on_connect + self.client.on_disconnect = self._on_disconnect + self.client.on_message = None + self._subscriptions = {} - def publish(self, client_id: str, topic: str, data: str, qos: int = 0): + def _on_connect(self, client, userdata, flags, rc): + if rc == 0: + self._connected = True + # Resubscribe all topics + for topic in self._subscriptions: + self.client.subscribe(topic) + else: + print(f"MQTT connection failed with code {rc}") - try: - self.__connect() + def _on_disconnect(self, client, userdata, rc): + self._connected = False - payload = { - "timestamp": int(time.time()), - "data": data - } - result = self.client.publish(topic, json.dumps(payload), qos=qos) - result.wait_for_publish() - - except Exception as e: - print("MQTT error:", e) - self.__disconnect() - - def subscribe(self, topic: str, handler: Callable[[str], None]): - def on_message(client, userdata, msg): - handler(msg.payload.decode()) - - try: - self.__connect() - self.client.message_callback_add(topic, on_message) - self.client.subscribe(topic) - except Exception as e: - print("MQTT error:", e) - self.__disconnect() - - - def __connect(self): - if not self.client.is_connected(): + def connect(self): + if not self._connected: self.client.connect(self.address, self.port) self.client.loop_start() + # Wait for connection + timeout = 5 + start = time.time() + while not self._connected and time.time() - start < timeout: + time.sleep(0.1) + if not self._connected: + raise ConnectionError(f"Cannot connect to MQTT broker at {self.address}:{self.port}") - def __disconnect(self): - if self.client.is_connected(): + def disconnect(self): + if self._connected: self.client.disconnect() - self.client.loop_stop() \ No newline at end of file + self.client.loop_stop() + self._connected = False + + def publish(self, topic: str, data: str, qos: int = 0): + self.connect() + payload = { + "timestamp": int(time.time()), + "data": data + } + result = self.client.publish(topic, json.dumps(payload), qos=qos) + result.wait_for_publish() + + def subscribe(self, topic: str, handler: Callable[[str, str], None], qos: int = 0): + self.connect() + self._subscriptions[topic] = handler + + def on_message(client, userdata, msg): + for sub_topic, h in self._subscriptions.items(): + if mqtt.topic_matches_sub(sub_topic, msg.topic): + h(msg.topic, msg.payload.decode()) + + self.client.on_message = on_message + self.client.subscribe(topic, qos=qos) \ No newline at end of file