Configured customer broker
This commit is contained in:
0
api-customer/src/services/__init__.py
Normal file
0
api-customer/src/services/__init__.py
Normal file
55
api-customer/src/services/mqtt_service.py
Normal file
55
api-customer/src/services/mqtt_service.py
Normal file
@@ -0,0 +1,55 @@
|
||||
import json
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
import paho.mqtt.client as mqtt
|
||||
from paho.mqtt.client import Client
|
||||
|
||||
|
||||
class MQTTService:
|
||||
|
||||
client : Client
|
||||
|
||||
def __init__(self, address: str, port: int):
|
||||
self.address = address
|
||||
self.port = port
|
||||
self.client = mqtt.Client()
|
||||
|
||||
def publish(self, client_id: str, topic: str, data: str, qos: int = 0):
|
||||
|
||||
try:
|
||||
self.__connect()
|
||||
|
||||
payload = {
|
||||
"timestamp": int(time.time()),
|
||||
"data": data
|
||||
}
|
||||
result = self.client.publish(topic, json.dumps(payload), qos=qos)
|
||||
result.wait_for_publish()
|
||||
|
||||
except Exception as e:
|
||||
print("MQTT error:", e)
|
||||
self.__disconnect()
|
||||
|
||||
def subscribe(self, topic: str, handler: Callable[[str], None]):
|
||||
def on_message(client, userdata, msg):
|
||||
handler(msg.payload.decode())
|
||||
|
||||
try:
|
||||
self.__connect()
|
||||
self.client.message_callback_add(topic, on_message)
|
||||
self.client.subscribe(topic)
|
||||
except Exception as e:
|
||||
print("MQTT error:", e)
|
||||
self.__disconnect()
|
||||
|
||||
|
||||
def __connect(self):
|
||||
if not self.client.is_connected():
|
||||
self.client.connect(self.address, self.port)
|
||||
self.client.loop_start()
|
||||
|
||||
def __disconnect(self):
|
||||
if self.client.is_connected():
|
||||
self.client.disconnect()
|
||||
self.client.loop_stop()
|
||||
Reference in New Issue
Block a user