diff --git a/api/src/main/java/be/naaturel/boardmateapi/configurations/configurations/MqttConfig.java b/api/src/main/java/be/naaturel/boardmateapi/configurations/configurations/MqttConfig.java index b300733f..b8c56a52 100644 --- a/api/src/main/java/be/naaturel/boardmateapi/configurations/configurations/MqttConfig.java +++ b/api/src/main/java/be/naaturel/boardmateapi/configurations/configurations/MqttConfig.java @@ -1,6 +1,5 @@ package be.naaturel.boardmateapi.configurations.configurations; -import be.naaturel.boardmateapi.common.helpers.Logger; import be.naaturel.boardmateapi.configurations.properties.MqttProperies; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; @@ -17,19 +16,23 @@ public class MqttConfig { this.properties = properties; } - @Bean("mqttPublisher") - public MqttClient mqttPublisher() throws MqttException { - MqttClient client = new MqttClient(properties.getBrokerUrl(), properties.getClientId()); + @Bean + public MqttClient mqttBroker() throws MqttException { + return new MqttClient(properties.getBrokerUrl(), properties.getClientId()); + } + + @Bean + public MqttClient mqttBrokerSecure() throws MqttException { + return new MqttClient(properties.getBrokerSecureUrl(), properties.getClientId()); + } + + @Bean + public MqttConnectOptions connectOptions(){ MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + options.setCleanSession(false); options.setUserName(properties.getUsername()); options.setPassword(properties.getPassword().toCharArray()); - return client; + return options; } - - @Bean("mqttSubscriber") - public MqttClient mqttSubscriber() throws MqttException { - String subscriberId = properties.getClientId() + "-sub"; - return new MqttClient(properties.getBrokerUrl(), subscriberId); - } - } diff --git a/api/src/main/java/be/naaturel/boardmateapi/services/MqttService.java b/api/src/main/java/be/naaturel/boardmateapi/services/MqttService.java index 793449bc..a4421c1f 100644 --- a/api/src/main/java/be/naaturel/boardmateapi/services/MqttService.java +++ b/api/src/main/java/be/naaturel/boardmateapi/services/MqttService.java @@ -4,7 +4,6 @@ import be.naaturel.boardmateapi.common.exceptions.ServiceException; import be.naaturel.boardmateapi.common.helpers.Logger; import be.naaturel.boardmateapi.common.models.MqttMessageWrapper; import org.eclipse.paho.client.mqttv3.*; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.nio.charset.StandardCharsets; @@ -13,17 +12,14 @@ import java.util.function.Consumer; @Service public class MqttService { - private final MqttClient publisher; - private final MqttClient subscriber; + private final MqttClient brokerClient; + private final MqttConnectOptions options; private Consumer onConnectionLost; private Consumer onMessageReceived; - public MqttService( - @Qualifier("mqttPublisher") MqttClient publisher, - @Qualifier("mqttSubscriber") MqttClient subscriber - ) { - this.publisher = publisher; - this.subscriber = subscriber; + public MqttService(MqttClient mqttBroker, MqttConnectOptions options) { + this.brokerClient = mqttBroker; + this.options = options; this.onConnectionLost = null; this.onMessageReceived = null; } @@ -36,21 +32,12 @@ public class MqttService { this.onMessageReceived = consumer; } - public void disconnect() { - try { - disconnect(publisher); - disconnect(subscriber); - } catch (ServiceException e) { - Logger.displayError("Failed to disconnect MQTT client" + e.getMessage()); - } - } - public void publish(String topic, String payload) { try { - connect(publisher, null); + connect(); MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); message.setQos(1); - publisher.publish(topic, message); + brokerClient.publish(topic, message); } catch (MqttException e) { throw new RuntimeException(e); } @@ -58,20 +45,16 @@ public class MqttService { public void subscribe(String topic) { try { - MqttConnectOptions options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - options.setCleanSession(false); + connect(); - connect(subscriber, options); - - subscriber.subscribe(topic, 1); + brokerClient.subscribe(topic, 1); } catch (Exception e) { Logger.displayError("An error occurred while subscribing : " + e.getMessage()); } } public void registerCallback(){ - this.subscriber.setCallback(new MqttCallback() { + this.brokerClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable cause) { onConnectionLost.accept(cause); @@ -90,25 +73,21 @@ public class MqttService { }); } - private void disconnect(MqttClient client) throws ServiceException { - if(client.isConnected()){ + public void disconnect() throws ServiceException { + if(this.brokerClient.isConnected()){ try { - client.disconnect(); + this.brokerClient.disconnect(); } catch (MqttException e) { throw new ServiceException("Failed to disconnect the broker", e); } } } - private void connect(MqttClient client, MqttConnectOptions options) throws MqttException { + public void connect() throws MqttException { try { - if (client.isConnected()) return; - if (options == null) { - client.connect(); - } else { - client.connect(options); - } - Logger.displayInfo("Connected to " + client.getCurrentServerURI()); + if (this.brokerClient.isConnected()) return; + this.brokerClient.connect(this.options); + Logger.displayInfo("Connected to " + this.brokerClient.getCurrentServerURI()); } catch (Exception e){ Logger.displayError("Unable to connect to broker : " + e.getMessage()); }