Rework Mqtt Implementation

This commit is contained in:
2025-12-29 17:56:17 +01:00
parent b1bec24bc3
commit 6cdff81296
12 changed files with 231 additions and 59 deletions

View File

@@ -2,59 +2,110 @@ package be.naaturel.boardmateapi.services;
import be.naaturel.boardmateapi.common.exceptions.ServiceException;
import be.naaturel.boardmateapi.common.helpers.Logger;
import be.naaturel.boardmateapi.common.models.MqttMessageWrapper;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
@Service
public class MqttService {
private final MqttClient publisher;
private final MqttClient subscriber;
private Consumer<Throwable> onConnectionLost;
private Consumer<MqttMessageWrapper> onMessageReceived;
public MqttService(
@Qualifier("mqttPublisher") MqttClient publisher,
@Qualifier("mqttSubscriber") MqttClient subscriber
) {
) {
this.publisher = publisher;
this.subscriber = subscriber;
this.onConnectionLost = null;
this.onMessageReceived = null;
}
public void publish(String topic, String payload) throws ServiceException {
public void onConnectionLost(Consumer<Throwable> consumer){
this.onConnectionLost = consumer;
}
public void onMessageReceived(Consumer<MqttMessageWrapper> consumer){
this.onMessageReceived = consumer;
}
public void disconnect() {
try {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(1);
if(!publisher.isConnected()){
publisher.connect();
}
publisher.publish(topic, message);
Logger.displayInfo("Published message: " + payload);
} catch (MqttException e) {
throw new ServiceException("Unable to publish message", e);
} finally {
try{
if (publisher.isConnected()) publisher.disconnect();
} catch (MqttException e){
Logger.displayError("Failed to disconnect MQTT client: " + e.getMessage());
}
disconnect(publisher);
disconnect(subscriber);
} catch (ServiceException e) {
Logger.displayError("Failed to disconnect MQTT client" + e.getMessage());
}
}
public void subscribe(String topic) throws ServiceException {
public void publish(String topic, String payload) {
try {
connect(publisher, null);
MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
message.setQos(1);
publisher.publish(topic, message);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
public void subscribe(String topic) {
try {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setCleanSession(false);
connect(subscriber, options);
if(!subscriber.isConnected()){
subscriber.connect(options);
}
subscriber.subscribe(topic, 1);
Logger.displayInfo("Subscribed to topic: " + topic);
} catch (MqttException e) {
throw new ServiceException("Unable to subscribe", e);
throw new RuntimeException(e);
}
}
public void registerCallback(){
this.subscriber.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
onConnectionLost.accept(cause);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
MqttMessageWrapper msg = new MqttMessageWrapper(topic, message);
onMessageReceived.accept(msg);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Not needed for subscriber
}
});
}
private void disconnect(MqttClient client) throws ServiceException {
if(client.isConnected()){
try {
client.disconnect();
} catch (MqttException e) {
throw new ServiceException("Failed to disconnect the broker", e);
}
}
}
private void connect(MqttClient client, MqttConnectOptions options) throws MqttException {
if (client.isConnected()) return;
if (options == null) {
client.connect();
} else {
client.connect(options);
}
}
}