From 6cdff81296f0f4534e78ac7891f755a1ccd7aaa9 Mon Sep 17 00:00:00 2001 From: Laurent Date: Mon, 29 Dec 2025 17:56:17 +0100 Subject: [PATCH] Rework Mqtt Implementation --- api/compose.yaml | 9 +- api/mongo-init/mongo-init.js | 2 + api/mosquitto/config/init-mosquitto.sh | 18 ++++ api/mosquitto/data/mosquitto.db | Bin 47 -> 47 bytes api/mosquitto/log/mosquitto.log | 46 ++++++++ api/src/main/java/MqttStarter.java | 45 ++++++++ .../common/models/MqttMessageWrapper.java | 24 +++++ .../configurations/MqttConfig.java | 21 +--- .../properties/MqttProperies.java | 10 +- .../controllers/BrokerController.java | 4 +- .../boardmateapi/services/MqttService.java | 101 +++++++++++++----- api/src/main/resources/application.properties | 10 +- 12 files changed, 231 insertions(+), 59 deletions(-) create mode 100644 api/mosquitto/config/init-mosquitto.sh create mode 100644 api/src/main/java/MqttStarter.java create mode 100644 api/src/main/java/be/naaturel/boardmateapi/common/models/MqttMessageWrapper.java diff --git a/api/compose.yaml b/api/compose.yaml index fdb279c7..8fe0b879 100644 --- a/api/compose.yaml +++ b/api/compose.yaml @@ -8,6 +8,10 @@ services: - "8000:8080" - "5005:5005" environment: + BROKER_URL: "tcp://api-broker:1883" + BROKER_SECURE_URL: "tcp://api-broker:8883" + BROKER_USERNAME: "board-mate-api" + BROKER_PASSWORD: "hepl" JWT_SECRET: "enY3OWU4djFyMTByNTZhcG9uY3Z0djQ5cnY0eDhhNWM0bjg5OTRjNDhidA==" SSL_KEYSTORE_PATH: "/certs/keystore.p12" SPRING_DATA_MONGODB_URI: "mongodb://board-mate-user:apx820kcng@mongodb:27017/board-mate-db" @@ -50,7 +54,7 @@ services: mongodb: image: mongo:latest - container_name: mongo-db + container_name: api-database environment: - MONGO_INITDB_DATABASE=board-mate-db - MONGO_INITDB_ROOT_PASSWORD=secret @@ -63,10 +67,11 @@ services: mosquitto: image: eclipse-mosquitto:latest - container_name: mosquitto + container_name: api-broker ports: - "1883:1883" - "8883:8883" + command: ["sh", "/mosquitto/config/init-mosquitto.sh"] volumes: - ./mosquitto/config:/mosquitto/config - ./mosquitto/data:/mosquitto/data diff --git a/api/mongo-init/mongo-init.js b/api/mongo-init/mongo-init.js index 0d45ce49..63d99613 100644 --- a/api/mongo-init/mongo-init.js +++ b/api/mongo-init/mongo-init.js @@ -1,5 +1,7 @@ db = db.getSiblingDB("board-mate-db"); +db.createCollection("clients"); +db.createCollection("telemetry"); db.createCollection("games"); db.createUser({ diff --git a/api/mosquitto/config/init-mosquitto.sh b/api/mosquitto/config/init-mosquitto.sh new file mode 100644 index 00000000..016f692d --- /dev/null +++ b/api/mosquitto/config/init-mosquitto.sh @@ -0,0 +1,18 @@ +#!/bin/sh + +PASSWORD_FILE=/mosquitto/config/passwords + +echo "Creating password file with pre-registered users..." +if [ ! -f "$PASSWORD_FILE" ]; then + touch "$PASSWORD_FILE" + chmod 600 "$PASSWORD_FILE" + + # Add pre-registered users + mosquitto_passwd -b "$PASSWORD_FILE" board-mate-api hepl + echo "Password file created !" +else + echo "Password file exists, skipping this step" +fi + +echo "Starting mosquitto..." +exec mosquitto -c /mosquitto/config/mosquitto.conf -v diff --git a/api/mosquitto/data/mosquitto.db b/api/mosquitto/data/mosquitto.db index 3e65b6bb24831363a1b44cd7281662aab15b9b5c..a5d66eda3c5f972bf06ad1111559a72d11904eb8 100644 GIT binary patch delta 21 UcmdPbpCB)=QiK5v7&#y`03n6} { + Logger.displayError("Connection lost: " + cause.getMessage()); + }); + + service.onMessageReceived((msg) -> { + Logger.displayInfo("Received message on topic " + msg.getTopic() + ": " + msg.getContent()); + }); + + service.registerCallback(); + } +} diff --git a/api/src/main/java/be/naaturel/boardmateapi/common/models/MqttMessageWrapper.java b/api/src/main/java/be/naaturel/boardmateapi/common/models/MqttMessageWrapper.java new file mode 100644 index 00000000..b71e55ee --- /dev/null +++ b/api/src/main/java/be/naaturel/boardmateapi/common/models/MqttMessageWrapper.java @@ -0,0 +1,24 @@ +package be.naaturel.boardmateapi.common.models; + +import org.eclipse.paho.client.mqttv3.MqttMessage; + +import java.nio.charset.StandardCharsets; + +public class MqttMessageWrapper { + + private final String topic; + private final MqttMessage message; + + public MqttMessageWrapper(String topic, MqttMessage message){ + this.topic = topic; + this.message = message; + } + + public String getContent() { + return new String(message.getPayload(), StandardCharsets.UTF_8); + } + + public String getTopic() { + return topic; + } +} diff --git a/api/src/main/java/be/naaturel/boardmateapi/configurations/configurations/MqttConfig.java b/api/src/main/java/be/naaturel/boardmateapi/configurations/configurations/MqttConfig.java index 6a9f39b8..b300733f 100644 --- a/api/src/main/java/be/naaturel/boardmateapi/configurations/configurations/MqttConfig.java +++ b/api/src/main/java/be/naaturel/boardmateapi/configurations/configurations/MqttConfig.java @@ -29,26 +29,7 @@ public class MqttConfig { @Bean("mqttSubscriber") public MqttClient mqttSubscriber() throws MqttException { String subscriberId = properties.getClientId() + "-sub"; - MqttClient client = new MqttClient(properties.getBrokerUrl(), subscriberId); - - client.setCallback(new MqttCallback() { - @Override - public void connectionLost(Throwable cause) { - Logger.displayError("Connection lost: " + cause.getMessage()); - } - - @Override - public void messageArrived(String topic, MqttMessage message) { - Logger.displayInfo("Received message on topic " + topic + ": " + message.toString()); - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - // Not needed for subscriber - } - }); - - return client; + return new MqttClient(properties.getBrokerUrl(), subscriberId); } } diff --git a/api/src/main/java/be/naaturel/boardmateapi/configurations/properties/MqttProperies.java b/api/src/main/java/be/naaturel/boardmateapi/configurations/properties/MqttProperies.java index 1fae37b2..d6a1b334 100644 --- a/api/src/main/java/be/naaturel/boardmateapi/configurations/properties/MqttProperies.java +++ b/api/src/main/java/be/naaturel/boardmateapi/configurations/properties/MqttProperies.java @@ -6,9 +6,12 @@ import org.springframework.context.annotation.Configuration; @Configuration public class MqttProperies { - @Value("${mqtt.broker-url}") + @Value("${mqtt.plain.broker-url}") private String brokerUrl; + @Value("${mqtt.ssl.broker-url}") + private String brokerSecureUrl; + @Value("${mqtt.client-id}") private String clientId; @@ -18,12 +21,9 @@ public class MqttProperies { @Value("${mqtt.password}") private String password; - @Value("${mqtt.topic}") - private String topic; - public String getBrokerUrl() { return brokerUrl; } + public String getBrokerSecureUrl() { return brokerSecureUrl; } public String getClientId() { return clientId; } public String getUsername() { return username; } public String getPassword() { return password; } - public String getTopic() { return topic; } } \ No newline at end of file diff --git a/api/src/main/java/be/naaturel/boardmateapi/controllers/BrokerController.java b/api/src/main/java/be/naaturel/boardmateapi/controllers/BrokerController.java index 53578b67..a7f87162 100644 --- a/api/src/main/java/be/naaturel/boardmateapi/controllers/BrokerController.java +++ b/api/src/main/java/be/naaturel/boardmateapi/controllers/BrokerController.java @@ -21,7 +21,7 @@ public class BrokerController { this.service = service; } - @PostMapping("/publish/{topic}") + /*@PostMapping("/publish/{topic}") public ResponseEntity> publish(@PathVariable String topic, @RequestBody String message){ ResponseBody body = ResponseBody.createEmpty(); try { @@ -38,6 +38,6 @@ public class BrokerController { .status(HttpStatus.INTERNAL_SERVER_ERROR) .body(body); } - } + }*/ } diff --git a/api/src/main/java/be/naaturel/boardmateapi/services/MqttService.java b/api/src/main/java/be/naaturel/boardmateapi/services/MqttService.java index 60239963..b0e42d8e 100644 --- a/api/src/main/java/be/naaturel/boardmateapi/services/MqttService.java +++ b/api/src/main/java/be/naaturel/boardmateapi/services/MqttService.java @@ -2,59 +2,110 @@ package be.naaturel.boardmateapi.services; import be.naaturel.boardmateapi.common.exceptions.ServiceException; import be.naaturel.boardmateapi.common.helpers.Logger; +import be.naaturel.boardmateapi.common.models.MqttMessageWrapper; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; +import java.nio.charset.StandardCharsets; +import java.util.function.Consumer; + @Service public class MqttService { private final MqttClient publisher; private final MqttClient subscriber; + private Consumer onConnectionLost; + private Consumer onMessageReceived; public MqttService( @Qualifier("mqttPublisher") MqttClient publisher, @Qualifier("mqttSubscriber") MqttClient subscriber - ) { + ) { this.publisher = publisher; this.subscriber = subscriber; + this.onConnectionLost = null; + this.onMessageReceived = null; } - public void publish(String topic, String payload) throws ServiceException { + public void onConnectionLost(Consumer consumer){ + this.onConnectionLost = consumer; + } + + public void onMessageReceived(Consumer consumer){ + this.onMessageReceived = consumer; + } + + public void disconnect() { try { - MqttMessage message = new MqttMessage(payload.getBytes()); - message.setQos(1); - if(!publisher.isConnected()){ - publisher.connect(); - } - publisher.publish(topic, message); - Logger.displayInfo("Published message: " + payload); - } catch (MqttException e) { - throw new ServiceException("Unable to publish message", e); - } finally { - try{ - if (publisher.isConnected()) publisher.disconnect(); - } catch (MqttException e){ - Logger.displayError("Failed to disconnect MQTT client: " + e.getMessage()); - } + disconnect(publisher); + disconnect(subscriber); + } catch (ServiceException e) { + Logger.displayError("Failed to disconnect MQTT client" + e.getMessage()); } } - public void subscribe(String topic) throws ServiceException { + public void publish(String topic, String payload) { + try { + connect(publisher, null); + MqttMessage message = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)); + message.setQos(1); + publisher.publish(topic, message); + } catch (MqttException e) { + throw new RuntimeException(e); + } + } + + public void subscribe(String topic) { try { MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); - options.setCleanSession(true); + options.setCleanSession(false); + + connect(subscriber, options); - if(!subscriber.isConnected()){ - subscriber.connect(options); - } subscriber.subscribe(topic, 1); - Logger.displayInfo("Subscribed to topic: " + topic); - } catch (MqttException e) { - throw new ServiceException("Unable to subscribe", e); + throw new RuntimeException(e); } } + public void registerCallback(){ + this.subscriber.setCallback(new MqttCallback() { + @Override + public void connectionLost(Throwable cause) { + onConnectionLost.accept(cause); + } + + @Override + public void messageArrived(String topic, MqttMessage message) { + MqttMessageWrapper msg = new MqttMessageWrapper(topic, message); + onMessageReceived.accept(msg); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + // Not needed for subscriber + } + }); + } + + private void disconnect(MqttClient client) throws ServiceException { + if(client.isConnected()){ + try { + client.disconnect(); + } catch (MqttException e) { + throw new ServiceException("Failed to disconnect the broker", e); + } + } + } + + private void connect(MqttClient client, MqttConnectOptions options) throws MqttException { + if (client.isConnected()) return; + if (options == null) { + client.connect(); + } else { + client.connect(options); + } + } } diff --git a/api/src/main/resources/application.properties b/api/src/main/resources/application.properties index f74f8c0c..8d339cff 100644 --- a/api/src/main/resources/application.properties +++ b/api/src/main/resources/application.properties @@ -19,13 +19,13 @@ server.ssl.key-store-type=PKCS12 server.ssl.key-alias=board-mate-api #=============MQTT============= -mqtt.broker-url=tcp://test.mosquitto.org:1883 -mqtt.client-id=board-mate-client +mqtt.plain.broker-url=${BROKER_URL} +mqtt.ssl.broker-url=${BROKER_SECURE_URL} +mqtt.client-id=board-mate-api -mqtt.topic=board-mate-test/topic +mqtt.username=${BROKER_USERNAME} +mqtt.password=${BROKER_PASSWORD} -mqtt.username=yourUsername -mqtt.password=yourPassword #=============METRICS============= management.endpoint.health.show-details=always management.endpoints.web.exposure.include=*