package be.naaturel.boardmateapi.services; import be.naaturel.boardmateapi.common.exceptions.ServiceException; import be.naaturel.boardmateapi.common.helpers.Logger; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; @Service public class MqttService { private final MqttClient publisher; private final MqttClient subscriber; public MqttService( @Qualifier("mqttPublisher") MqttClient publisher, @Qualifier("mqttSubscriber") MqttClient subscriber ) { this.publisher = publisher; this.subscriber = subscriber; } public void publish(String topic, String payload) throws ServiceException { try { MqttMessage message = new MqttMessage(payload.getBytes()); message.setQos(1); if(!publisher.isConnected()){ publisher.connect(); } publisher.publish(topic, message); Logger.displayInfo("Published message: " + payload); } catch (MqttException e) { throw new ServiceException("Unable to publish message", e); } finally { try{ if (publisher.isConnected()) publisher.disconnect(); } catch (MqttException e){ Logger.displayError("Failed to disconnect MQTT client: " + e.getMessage()); } } } public void subscribe(String topic) throws ServiceException { try { MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); options.setCleanSession(true); if(!subscriber.isConnected()){ subscriber.connect(options); } subscriber.subscribe(topic, 1); Logger.displayInfo("Subscribed to topic: " + topic); } catch (MqttException e) { throw new ServiceException("Unable to subscribe", e); } } }