From 4963db420595da8bb6bfab7ed34bb3a6fd5e5671 Mon Sep 17 00:00:00 2001 From: Laurent Date: Sun, 4 Jan 2026 12:11:30 +0100 Subject: [PATCH] Add telemetry database insertion --- .../board_mate/client/api/client_api.py | 2 +- api-customer/app.py | 81 +++---------------- api-customer/docker-compose.yml | 3 +- api-customer/mongo-init/mongo-init.js | 6 +- api-customer/requirements.txt | 13 +-- .../src/controllers/message_controller.py | 35 +++++++- .../src/controllers/mqtt_forwarder.py | 61 +++++++++++--- api-customer/src/services/mongo_service.py | 44 ++++++++++ api-customer/src/services/mqtt_service.py | 23 +++--- rpi/hardware/generic/serial_reader.py | 16 ++-- 10 files changed, 173 insertions(+), 111 deletions(-) create mode 100644 api-customer/src/services/mongo_service.py diff --git a/api-customer/api-resources/client-components/board_mate/client/api/client_api.py b/api-customer/api-resources/client-components/board_mate/client/api/client_api.py index 3074b62f..83fe531c 100644 --- a/api-customer/api-resources/client-components/board_mate/client/api/client_api.py +++ b/api-customer/api-resources/client-components/board_mate/client/api/client_api.py @@ -48,7 +48,7 @@ class ClientApi: This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True - >>> thread = api.create(client_dto, async_req=True) + >>> thread = api.create_forwarder(client_dto, async_req=True) >>> result = thread.get() :param client_dto: (required) diff --git a/api-customer/app.py b/api-customer/app.py index 8dcdd029..f71c556a 100644 --- a/api-customer/app.py +++ b/api-customer/app.py @@ -1,22 +1,23 @@ import json import os -import threading - -import requests from dotenv import load_dotenv from flask import Flask from src.controllers.AuthController import AuthController from src.controllers.ClientController import ClientController from src.controllers.message_controller import MessageController -from src.controllers.mqtt_forwarder import MQTTForwarder +from src.controllers.mqtt_forwarder import create_forwarder from src.models.AuthData import AuthData +from src.services.mongo_service import MongoService from src.services.mqtt_service import MQTTService load_dotenv() app = Flask(__name__) +database_uri = os.getenv("MONGO_URI", "mongodb://localhost:27017") +database_name = os.getenv("DATABASE_NAME", "default") + local_broker_address = os.environ.get("LOCAL_BROKER_ADDRESS", "127.0.0.1") local_broker_port = int(os.environ.get("LOCAL_BROKER_PORT", 1883)) @@ -25,78 +26,22 @@ api_broker_port = int(os.environ.get("API_BROKER_PORT", 1883)) auth_data = AuthData() +database_service = MongoService(database_uri, database_name) + auth_controller = AuthController(app, auth_data, "https://192.168.15.120:8000") client_controller = ClientController(app, auth_data, "https://192.168.15.120:8000") -message_controller = MessageController(app, auth_data, "https://192.168.15.120:8000") +message_controller = MessageController(app, auth_data, "https://192.168.15.120:8000", database_service) +def handle_login(data): + local_broker, api_broker, forwarder = create_forwarder(data, local_broker_address, local_broker_port, api_broker_address, api_broker_port) -def handle_message_received(topic: str, payload: str): - try: - print("=== MQTT MESSAGE RECEIVED ===", flush=True) - print("Raw payload:", payload, flush=True) - print("Payload type:", type(payload), flush=True) - - data = json.loads(payload) - print("Parsed payload:", data, flush=True) - - url = "https://192.168.15.125:1880/message/receive" - - response = requests.post( - url, - json=data, - verify=False, - timeout=5 - ) - - print("=== NODE-RED RESPONSE ===", flush=True) - print("Status code:", response.status_code, flush=True) - print("Headers:", response.headers, flush=True) - print("Raw response:", repr(response.text), flush=True) - - if response.text.strip(): - content_type = response.headers.get("Content-Type", "") - if content_type.startswith("application/json"): - print("Response JSON:", response.json(), flush=True) - else: - print("Response is not JSON", flush=True) - else: - print("Node-RED returned an empty response body", flush=True) - - except json.JSONDecodeError as e: - print("Incoming payload is NOT valid JSON:", e, flush=True) - - except requests.RequestException as e: - print("HTTP request to Node-RED failed:", e, flush=True) - - except Exception as e: - print("Unexpected error:", e, flush=True) - -def start_mqtt(data : AuthData): client_id = data.get_client_id() + api_broker.subscribe(f"/chat/{client_id}/message", message_controller.handle_message_received) + forwarder.start(f"/customer/telemetry/#", f"/board-mate/{client_id}/telemetry", api_broker.publish) - local_broker = MQTTService( - local_broker_address, - local_broker_port, - client_id=client_id, - username="main", - password="hepl", - ) - - api_broker = MQTTService( - api_broker_address, - api_broker_port, - client_id=client_id, - username="customer", - password="hepl", - ) - - api_broker.subscribe(f"/chat/{client_id}/message", handle_message_received) - - forwarder = MQTTForwarder(client_id, local_broker, api_broker) - forwarder.start(f"/customer/telemetry/#", f"/board-mate/{client_id}/telemetry") if __name__ == '__main__': - auth_controller.set_on_login(start_mqtt) + auth_controller.set_on_login(handle_login) app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/api-customer/docker-compose.yml b/api-customer/docker-compose.yml index 35eb17e4..5d439996 100644 --- a/api-customer/docker-compose.yml +++ b/api-customer/docker-compose.yml @@ -10,7 +10,8 @@ services: - mongo - mosquitto environment: - - MONGO_URI=mongodb://mongo:27017/mydb + - MONGO_URI=mongodb://user:psk358xpg@customer-database:27017 + - DATABASE_NAME=customer-db - LOCAL_BROKER_USERNAME=main - LOCAL_BROKER_PASSWORD=hepl - LOCAL_BROKER_ADDRESS=customer-broker diff --git a/api-customer/mongo-init/mongo-init.js b/api-customer/mongo-init/mongo-init.js index 9e5341da..893262e0 100644 --- a/api-customer/mongo-init/mongo-init.js +++ b/api-customer/mongo-init/mongo-init.js @@ -1,8 +1,12 @@ db = db.getSiblingDB("customer-db"); -db.createCollection("systems"); +db.createCollection("messages"); +db.createCollection("clients"); +db.createCollection("rooms"); +db.createCollection("telemetry"); db.createCollection("games"); + db.createUser({ user: "user", pwd: "psk358xpg", diff --git a/api-customer/requirements.txt b/api-customer/requirements.txt index d856f394..f07649ea 100644 --- a/api-customer/requirements.txt +++ b/api-customer/requirements.txt @@ -1,15 +1,16 @@ +aenum flask +requests paho-mqtt python_dateutil -setuptools -urllib3 +python-dotenv pydantic -typing-extensions -requests +pymongo python-dotenv pyyaml -aenum -python-dotenv +setuptools +typing-extensions +urllib3 file:./api-resources/auth-components file:./api-resources/client-components diff --git a/api-customer/src/controllers/message_controller.py b/api-customer/src/controllers/message_controller.py index 1312c955..c59bec4e 100644 --- a/api-customer/src/controllers/message_controller.py +++ b/api-customer/src/controllers/message_controller.py @@ -1,8 +1,10 @@ +import requests from board_mate.message import Configuration, ApiClient, MessageDto, MessageApi, MessagePostRequestDto from flask import jsonify, request from pydantic import StrictStr from src.models.AuthData import AuthData +from src.services.mongo_service import MongoService from src.services.mqtt_service import MQTTService import json @@ -11,12 +13,14 @@ class MessageController: _client_id : MQTTService = None _auth_data : AuthData = None + _database_service : MongoService = None - def __init__(self, app, auth_data : AuthData, host : str): + def __init__(self, app, auth_data : AuthData, host : str, database_service : MongoService): self._register_routes(app) self.config = Configuration(host=host) self.config.verify_ssl=False self._auth_data = auth_data + self._database_service = database_service def _register_routes(self, app): app.add_url_rule("/message/send", view_func=self.send, methods=['POST']) @@ -40,8 +44,33 @@ class MessageController: ) message_api.post_message(new_message) - return jsonify({"success" : True, "message": None}), 200 except Exception as e: print(e) - return jsonify({"success" : False, "message" : f"An error occurred : {self._auth_data.get_token()} {self._auth_data.get_client_id()} \n {e}"}), 500 \ No newline at end of file + return jsonify({"success" : False, "message" : f"An error occurred : {self._auth_data.get_token()} {self._auth_data.get_client_id()} \n {e}"}), 500 + + def handle_message_received(self, topic: str, payload: str) -> None : + try: + + data = json.loads(payload) + print("Parsed payload:", data, flush=True) + + url = "https://192.168.15.125:1880/message/receive" + + response = requests.post( + url, + json=data, + verify=False, + timeout=5 + ) + + self._database_service.insert("messages", data) + + except json.JSONDecodeError as e: + print("Incoming payload is NOT valid JSON:", e, flush=True) + + except requests.RequestException as e: + print("HTTP request to Node-RED failed:", e, flush=True) + + except Exception as e: + print("Unexpected error:", e, flush=True) diff --git a/api-customer/src/controllers/mqtt_forwarder.py b/api-customer/src/controllers/mqtt_forwarder.py index 7760eb71..224dc50e 100644 --- a/api-customer/src/controllers/mqtt_forwarder.py +++ b/api-customer/src/controllers/mqtt_forwarder.py @@ -1,21 +1,60 @@ +import json +from typing import Callable + +from pymongo import MongoClient + +from src.models.AuthData import AuthData +from src.services.mongo_service import MongoService from src.services.mqtt_service import MQTTService class MQTTForwarder: - client_id : str - local_broker : MQTTService - central_broker : MQTTService + _client_id : str + _local_broker : MQTTService + _central_broker : MQTTService + _db_service : MongoService - def __init__(self, client_id : str, local_mqtt: MQTTService, central_mqtt: MQTTService): - self.client_id = client_id - self.local_broker = local_mqtt - self.central_broker = central_mqtt + def __init__(self, client_id : str, local_mqtt: MQTTService, central_mqtt: MQTTService, db_service : MongoService): + self._client_id = client_id + self._local_broker = local_mqtt + self._central_broker = central_mqtt + self._db_service = db_service - def start(self, src_topic: str, dst_topic: str): + def start(self, src_topic: str, dst_topic: str, handler : Callable[[str, str], None]): try: def forward_handler(topic: str, msg: str): - self.central_broker.publish(dst_topic, msg) + #self.central_broker.publish(dst_topic, msg) + data = json.loads(msg) + self._db_service.insert("telemetry", data) + handler(dst_topic, msg) - self.local_broker.subscribe(src_topic, forward_handler) + self._local_broker.subscribe(src_topic, forward_handler) except Exception as e: - print(f"An error occurred while forwarding from {src_topic} to {dst_topic}: {e}") \ No newline at end of file + print(f"An error occurred while forwarding from {src_topic} to {dst_topic}: {e}") + + +def create_forwarder(auth_data : AuthData, + local_broker_address : str, local_broker_port : int, + api_broker_address : str, api_broker_port : int) -> tuple[MQTTService, MQTTService, MQTTForwarder]: + client_id = auth_data.get_client_id() + + local_broker = MQTTService( + local_broker_address, + local_broker_port, + client_id=client_id, + username="main", + password="hepl", + ) + + api_broker = MQTTService( + api_broker_address, + api_broker_port, + client_id=client_id, + username="customer", + password="hepl", + ) + + forwarder = MQTTForwarder(client_id, local_broker, api_broker) + + return local_broker, api_broker, forwarder + diff --git a/api-customer/src/services/mongo_service.py b/api-customer/src/services/mongo_service.py new file mode 100644 index 00000000..702075bd --- /dev/null +++ b/api-customer/src/services/mongo_service.py @@ -0,0 +1,44 @@ +import json + +from pymongo import MongoClient +from pymongo.synchronous.database import Database + + +class MongoService: + + _client : MongoClient + _db : Database + + def __init__(self, uri : str, database : str): + self._client = MongoClient(uri) + self._db = self._client[database] + + + def insert(self, collection : str, data : object): + collection = self._db[collection] + payload = self._to_document(data) + result = collection.insert_one(payload) + return result.inserted_id + + def find(self, collection: str, field: str, value): + col = self._db[collection] + return list(col.find({field: value})) + + def _to_document(self, obj): + if obj is None or isinstance(obj, (str, int, float, bool)): + return obj + + if isinstance(obj, list): + return [self._to_document(i) for i in obj] + + if isinstance(obj, dict): + return {k: self._to_document(v) for k, v in obj.items()} + + if hasattr(obj, "__dict__"): + return { + k: self._to_document(v) + for k, v in vars(obj).items() + if not k.startswith("_") + } + + return str(obj) diff --git a/api-customer/src/services/mqtt_service.py b/api-customer/src/services/mqtt_service.py index cbd28c9b..98b6d210 100644 --- a/api-customer/src/services/mqtt_service.py +++ b/api-customer/src/services/mqtt_service.py @@ -41,19 +41,18 @@ class MQTTService: self._connected = False def connect(self): + if self._connected: return + + print(f"Connecting to {self.address}...") + self.client.connect(self.address, self.port) + self.client.loop_start() + timeout = 5 + start = time.time() + while not self._connected and time.time() - start < timeout: + time.sleep(0.1) if not self._connected: - print(f"Connecting to {self.address}...") - self.client.connect(self.address, self.port) - self.client.loop_start() - timeout = 5 - start = time.time() - while not self._connected and time.time() - start < timeout: - time.sleep(0.1) - if not self._connected: - raise ConnectionError(f"Cannot connect to MQTT broker at {self.address}:{self.port}") - print(f"Successfully connected to {self.address}") - else : - print(f"Already connected to {self.address}...") + raise ConnectionError(f"Cannot connect to MQTT broker at {self.address}:{self.port}") + print(f"Successfully connected to {self.address}") def disconnect(self): if self._connected: diff --git a/rpi/hardware/generic/serial_reader.py b/rpi/hardware/generic/serial_reader.py index fe522c2b..dbb78be1 100644 --- a/rpi/hardware/generic/serial_reader.py +++ b/rpi/hardware/generic/serial_reader.py @@ -7,29 +7,29 @@ from serial import Serial class SerialReader: serial : Serial = None - __thread : Thread | None = None - __listeners : list[Callable] = None + _thread : Thread | None = None + _listeners : list[Callable] = None def __init__(self, port, baudrate): self.serial = Serial(port, baudrate) self._run_event = threading.Event() - self.__listeners = [] + self._listeners = [] def start(self) -> None: self._run_event.set() - if self.__thread is None or not self.__thread.is_alive(): - self.__thread = Thread(target=self._read, daemon=True) - self.__thread.start() + if self._thread is None or not self._thread.is_alive(): + self._thread = Thread(target=self._read, daemon=True) + self._thread.start() def stop(self) -> None: if self._run_event.is_set(): self._run_event.clear() def subscribe(self, listener : Callable[[str], None]) -> None: - self.__listeners.append(listener) + self._listeners.append(listener) def _notify(self, data : str): - for listener in self.__listeners: + for listener in self._listeners: listener(data) def _read(self):