Brought some adjustment to mqtt service and forwarder

This commit is contained in:
2025-12-29 13:46:05 +01:00
parent 4110e6823b
commit 3d15e8e26a
3 changed files with 78 additions and 74 deletions

View File

@@ -1,7 +1,6 @@
import uuid import uuid
from flask import Flask from flask import Flask
from src.controllers.mqtt_forwarder import MQTTForwarder from src.controllers.mqtt_forwarder import MQTTForwarder
from src.services.mqtt_service import MQTTService from src.services.mqtt_service import MQTTService
@@ -9,18 +8,26 @@ client_id = "1"
app = Flask(__name__) app = Flask(__name__)
@app.route('/') @app.route('/')
def hello_world(): def hello_world():
return 'Hello World!' return 'Hello World!'
if __name__ == '__main__': if __name__ == '__main__':
local_broker = MQTTService(
"127.0.0.1",
1883,
username="Pi-1",
password="hepl",
)
local_broker = MQTTService("127.0.0.1", 1883) api_broker = MQTTService(
api_broker = MQTTService("192.168.15.120", 8883) "192.168.15.120",
1883,
username="rpi",
password="hepl",
)
forwarder = MQTTForwarder(client_id, local_broker, api_broker) forwarder = MQTTForwarder(client_id, local_broker, api_broker)
forwarder.start(f"/customer/${client_id}/#", f"/board-mate/${client_id}/telemetry") forwarder.start(f"/customer/{client_id}/#", f"/board-mate/{client_id}/telemetry")
app.run(host="0.0.0.0", port=5000, debug=False) app.run(host="0.0.0.0", port=5000, debug=False)

View File

@@ -15,32 +15,8 @@ class MQTTForwarder:
self.local_broker = local_mqtt self.local_broker = local_mqtt
self.central_broker = central_mqtt self.central_broker = central_mqtt
def start(self, src_topic : str, dst_topic : str) -> None: def start(self, src_topic: str, dst_topic: str):
def handler(msg:str): def forward_handler(topic: str, msg: str):
self.__forward(msg, dst_topic) self.central_broker.publish(dst_topic, msg)
self.local_broker.subscribe(src_topic, handler) self.local_broker.subscribe(src_topic, forward_handler)
def __forward(self, msg: str, dst_topic) -> None:
self.central_broker.publish(self.client_id, dst_topic, msg)
"""def start(self):
self.local.subscribe("board-mate/+/telemetry", self.handle_message)
def handle_message(self, topic: str, payload: str):
print(f"[FORWARD] {topic} -> central")
message = json.loads(payload)
message["source"] = "client-server"
message["forwarded_at"] = int(time.time())
self.central.publish(
client_id="client-forwarder",
topic=f"clients/{self.extract_client_id(topic)}/telemetry",
data=json.dumps(message),
qos=1
)
def extract_client_id(self, topic: str) -> str:
# ex: board-mate/pi-123/telemetry
return topic.split("/")[1]"""

View File

@@ -1,55 +1,76 @@
import json import json
import time import time
from typing import Callable from typing import Callable, Optional
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from paho.mqtt.client import Client
class MQTTService: class MQTTService:
def __init__(self, address: str, port: int, client_id: Optional[str] = None,
client : Client username: Optional[str] = None, password: Optional[str] = None,
ca_certs: Optional[str] = None):
def __init__(self, address: str, port: int):
self.address = address self.address = address
self.port = port self.port = port
self.client = mqtt.Client() self.client = mqtt.Client(client_id=client_id)
if username and password:
self.client.username_pw_set(username, password)
if ca_certs:
self.client.tls_set(ca_certs=ca_certs)
else:
self.client.tls_set()
self.client.tls_insecure_set(True)
self._connected = False
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = None
self._subscriptions = {}
def publish(self, client_id: str, topic: str, data: str, qos: int = 0): def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
self._connected = True
# Resubscribe all topics
for topic in self._subscriptions:
self.client.subscribe(topic)
else:
print(f"MQTT connection failed with code {rc}")
try: def _on_disconnect(self, client, userdata, rc):
self.__connect() self._connected = False
payload = { def connect(self):
"timestamp": int(time.time()), if not self._connected:
"data": data
}
result = self.client.publish(topic, json.dumps(payload), qos=qos)
result.wait_for_publish()
except Exception as e:
print("MQTT error:", e)
self.__disconnect()
def subscribe(self, topic: str, handler: Callable[[str], None]):
def on_message(client, userdata, msg):
handler(msg.payload.decode())
try:
self.__connect()
self.client.message_callback_add(topic, on_message)
self.client.subscribe(topic)
except Exception as e:
print("MQTT error:", e)
self.__disconnect()
def __connect(self):
if not self.client.is_connected():
self.client.connect(self.address, self.port) self.client.connect(self.address, self.port)
self.client.loop_start() self.client.loop_start()
# Wait for connection
timeout = 5
start = time.time()
while not self._connected and time.time() - start < timeout:
time.sleep(0.1)
if not self._connected:
raise ConnectionError(f"Cannot connect to MQTT broker at {self.address}:{self.port}")
def __disconnect(self): def disconnect(self):
if self.client.is_connected(): if self._connected:
self.client.disconnect() self.client.disconnect()
self.client.loop_stop() self.client.loop_stop()
self._connected = False
def publish(self, topic: str, data: str, qos: int = 0):
self.connect()
payload = {
"timestamp": int(time.time()),
"data": data
}
result = self.client.publish(topic, json.dumps(payload), qos=qos)
result.wait_for_publish()
def subscribe(self, topic: str, handler: Callable[[str, str], None], qos: int = 0):
self.connect()
self._subscriptions[topic] = handler
def on_message(client, userdata, msg):
for sub_topic, h in self._subscriptions.items():
if mqtt.topic_matches_sub(sub_topic, msg.topic):
h(msg.topic, msg.payload.decode())
self.client.on_message = on_message
self.client.subscribe(topic, qos=qos)