import base64
import json
import time
from typing import Callable, Dict, Optional

import paho.mqtt.client as mqtt


class MQTTService:
    """Small convenience wrapper around a paho-mqtt client.

    The service connects lazily: ``publish`` and ``subscribe`` both call
    ``connect`` first. Published payloads are wrapped in a JSON envelope of
    the form ``{"timestamp": <unix seconds>, "data": <data>}``.
    """

    client: mqtt.Client

    def __init__(
        self,
        address: str,
        port: int,
        client_id: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        ca_certs: Optional[str] = None,
        insecure: bool = False,
    ):
        """Create the underlying client; no network traffic happens here.

        Args:
            address: Broker host name or IP address.
            port: Broker TCP port.
            client_id: Client identifier handed to ``mqtt.Client``.
            username: Optional user name. Credentials are set whenever a
                user name is given, even without a password.
            password: Optional password sent together with ``username``.
            ca_certs: Path to a CA bundle. TLS is enabled only when this
                is provided.
            insecure: Passed to ``tls_insecure_set`` when TLS is enabled.
        """
        self.address = address
        self.port = port
        self.client = mqtt.Client(client_id=client_id)

        if username:
            self.client.username_pw_set(username, password)

        if ca_certs:
            # Configure TLS exactly once: a second tls_set() call would
            # either be rejected by paho or discard the CA bundle.
            self.client.tls_set(ca_certs=ca_certs)
            self.client.tls_insecure_set(insecure)

        self._connected = False
        # topic filter -> handler(topic, decoded_payload)
        self._subscriptions: Dict[str, Callable[[str, str], None]] = {}
        # topic filter -> QoS, so a reconnect restores the requested level
        self._subscription_qos: Dict[str, int] = {}

        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, rc):
        """Mark the service connected and restore known subscriptions."""
        if rc != 0:
            print(f"MQTT connection failed with code {rc}")
            return

        self._connected = True
        # Snapshot: subscribe() may add entries from another thread.
        for topic, qos in list(self._subscription_qos.items()):
            self.client.subscribe(topic, qos=qos)

    def _on_disconnect(self, client, userdata, rc):
        """Record that the broker connection is gone."""
        self._connected = False

    def _on_message(self, client, userdata, msg):
        """Dispatch an incoming message to every handler whose filter matches."""
        # Iterate over a snapshot so a concurrent subscribe() cannot change
        # the dict while it is being walked.
        for sub_topic, handler in list(self._subscriptions.items()):
            if mqtt.topic_matches_sub(sub_topic, msg.topic):
                handler(msg.topic, msg.payload.decode())

    def connect(self, timeout: float = 5.0):
        """Connect to the broker and wait until the connection is confirmed.

        Args:
            timeout: Maximum number of seconds to wait for ``_on_connect``
                to report success.

        Raises:
            ConnectionError: If the connection is not confirmed in time.
        """
        if self._connected:
            print(f"Already connected to {self.address}...")
            return

        print(f"Connecting to {self.address}...")
        self.client.connect(self.address, self.port)
        self.client.loop_start()

        # Monotonic clock: the wait must not be affected by wall-clock jumps.
        deadline = time.monotonic() + timeout
        while not self._connected and time.monotonic() < deadline:
            time.sleep(0.1)

        if not self._connected:
            # Do not leave the background loop running after a failure.
            self.client.disconnect()
            self.client.loop_stop()
            raise ConnectionError(f"Cannot connect to MQTT broker at {self.address}:{self.port}")

        print(f"Successfully connected to {self.address}")

    def disconnect(self):
        """Disconnect from the broker and stop the background network loop."""
        if self._connected:
            self.client.disconnect()
            self.client.loop_stop()
            self._connected = False

    def publish(self, topic: str, data: str, qos: int = 0):
        """Publish ``data`` on ``topic`` inside a timestamped JSON envelope.

        Best effort: any failure is printed and not re-raised.

        Args:
            topic: Topic to publish on.
            data: Value stored under the ``"data"`` key of the envelope.
            qos: Quality-of-service level passed to the client.
        """
        try:
            self.connect()
            payload = {
                "timestamp": int(time.time()),
                "data": data,
            }
            result = self.client.publish(topic, json.dumps(payload), qos=qos)
            result.wait_for_publish()
        except Exception as e:
            print(f"An error occurred while publishing on {topic} on {self.address} : {e}")

    def subscribe(self, topic: str, handler: Callable[[str, str], None], qos: int = 0):
        """Subscribe to ``topic`` and route matching messages to ``handler``.

        Best effort: any failure is printed and not re-raised.

        Args:
            topic: Topic filter to subscribe to.
            handler: Called as ``handler(topic, payload)`` where ``payload``
                is the message body decoded to ``str``.
            qos: Quality-of-service level, also reused when the
                subscription is restored after a reconnect.
        """
        try:
            self.connect()
            self._subscriptions[topic] = handler
            self._subscription_qos[topic] = qos
            self.client.subscribe(topic, qos=qos)
        except Exception as e:
            print(f"An error occurred while trying to subscribe to {topic} on {self.address} : {e}")


if __name__ == "__main__":
    service = MQTTService("127.0.0.1", 1883)
    # publish() takes (topic, data, qos).
    service.publish("test", "Hello from MQTT", 1)
    service.disconnect()