Rework connect options

This commit is contained in:
2025-12-29 21:20:23 +01:00
parent 0f3d0c60b6
commit d7b853ef41
2 changed files with 32 additions and 50 deletions

View File

@@ -1,6 +1,5 @@
package be.naaturel.boardmateapi.configurations.configurations;
import be.naaturel.boardmateapi.common.helpers.Logger;
import be.naaturel.boardmateapi.configurations.properties.MqttProperies;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
@@ -17,19 +16,23 @@ public class MqttConfig {
this.properties = properties;
}
@Bean("mqttPublisher")
public MqttClient mqttPublisher() throws MqttException {
MqttClient client = new MqttClient(properties.getBrokerUrl(), properties.getClientId());
@Bean
public MqttClient mqttBroker() throws MqttException {
return new MqttClient(properties.getBrokerUrl(), properties.getClientId());
}
@Bean
public MqttClient mqttBrokerSecure() throws MqttException {
return new MqttClient(properties.getBrokerSecureUrl(), properties.getClientId());
}
@Bean
public MqttConnectOptions connectOptions(){
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(false);
options.setUserName(properties.getUsername());
options.setPassword(properties.getPassword().toCharArray());
return client;
return options;
}
@Bean("mqttSubscriber")
public MqttClient mqttSubscriber() throws MqttException {
String subscriberId = properties.getClientId() + "-sub";
return new MqttClient(properties.getBrokerUrl(), subscriberId);
}
}

View File

@@ -4,7 +4,6 @@ import be.naaturel.boardmateapi.common.exceptions.ServiceException;
import be.naaturel.boardmateapi.common.helpers.Logger;
import be.naaturel.boardmateapi.common.models.MqttMessageWrapper;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
@@ -13,17 +12,14 @@ import java.util.function.Consumer;
@Service
public class MqttService {
private final MqttClient publisher;
private final MqttClient subscriber;
private final MqttClient brokerClient;
private final MqttConnectOptions options;
private Consumer<Throwable> onConnectionLost;
private Consumer<MqttMessageWrapper> onMessageReceived;
public MqttService(
@Qualifier("mqttPublisher") MqttClient publisher,
@Qualifier("mqttSubscriber") MqttClient subscriber
) {
this.publisher = publisher;
this.subscriber = subscriber;
public MqttService(MqttClient mqttBroker, MqttConnectOptions options) {
this.brokerClient = mqttBroker;
this.options = options;
this.onConnectionLost = null;
this.onMessageReceived = null;
}
@@ -36,21 +32,12 @@ public class MqttService {
this.onMessageReceived = consumer;
}
public void disconnect() {
try {
disconnect(publisher);
disconnect(subscriber);
} catch (ServiceException e) {
Logger.displayError("Failed to disconnect MQTT client" + e.getMessage());
}
}
public void publish(String topic, String payload) {
try {
connect(publisher, null);
connect();
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
message.setQos(1);
publisher.publish(topic, message);
brokerClient.publish(topic, message);
} catch (MqttException e) {
throw new RuntimeException(e);
}
@@ -58,20 +45,16 @@ public class MqttService {
public void subscribe(String topic) {
try {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(false);
connect();
connect(subscriber, options);
subscriber.subscribe(topic, 1);
brokerClient.subscribe(topic, 1);
} catch (Exception e) {
Logger.displayError("An error occurred while subscribing : " + e.getMessage());
}
}
public void registerCallback(){
this.subscriber.setCallback(new MqttCallback() {
this.brokerClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
onConnectionLost.accept(cause);
@@ -90,25 +73,21 @@ public class MqttService {
});
}
private void disconnect(MqttClient client) throws ServiceException {
if(client.isConnected()){
public void disconnect() throws ServiceException {
if(this.brokerClient.isConnected()){
try {
client.disconnect();
this.brokerClient.disconnect();
} catch (MqttException e) {
throw new ServiceException("Failed to disconnect the broker", e);
}
}
}
private void connect(MqttClient client, MqttConnectOptions options) throws MqttException {
public void connect() throws MqttException {
try {
if (client.isConnected()) return;
if (options == null) {
client.connect();
} else {
client.connect(options);
}
Logger.displayInfo("Connected to " + client.getCurrentServerURI());
if (this.brokerClient.isConnected()) return;
this.brokerClient.connect(this.options);
Logger.displayInfo("Connected to " + this.brokerClient.getCurrentServerURI());
} catch (Exception e){
Logger.displayError("Unable to connect to broker : " + e.getMessage());
}