Add telemetry database insertion
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
import requests
|
||||
from board_mate.message import Configuration, ApiClient, MessageDto, MessageApi, MessagePostRequestDto
|
||||
from flask import jsonify, request
|
||||
from pydantic import StrictStr
|
||||
|
||||
from src.models.AuthData import AuthData
|
||||
from src.services.mongo_service import MongoService
|
||||
from src.services.mqtt_service import MQTTService
|
||||
import json
|
||||
|
||||
@@ -11,12 +13,14 @@ class MessageController:
|
||||
|
||||
_client_id : MQTTService = None
|
||||
_auth_data : AuthData = None
|
||||
_database_service : MongoService = None
|
||||
|
||||
def __init__(self, app, auth_data : AuthData, host : str):
|
||||
def __init__(self, app, auth_data : AuthData, host : str, database_service : MongoService):
|
||||
self._register_routes(app)
|
||||
self.config = Configuration(host=host)
|
||||
self.config.verify_ssl=False
|
||||
self._auth_data = auth_data
|
||||
self._database_service = database_service
|
||||
|
||||
def _register_routes(self, app):
|
||||
app.add_url_rule("/message/send", view_func=self.send, methods=['POST'])
|
||||
@@ -40,8 +44,33 @@ class MessageController:
|
||||
)
|
||||
|
||||
message_api.post_message(new_message)
|
||||
|
||||
return jsonify({"success" : True, "message": None}), 200
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return jsonify({"success" : False, "message" : f"An error occurred : {self._auth_data.get_token()} {self._auth_data.get_client_id()} \n {e}"}), 500
|
||||
return jsonify({"success" : False, "message" : f"An error occurred : {self._auth_data.get_token()} {self._auth_data.get_client_id()} \n {e}"}), 500
|
||||
|
||||
def handle_message_received(self, topic: str, payload: str) -> None :
|
||||
try:
|
||||
|
||||
data = json.loads(payload)
|
||||
print("Parsed payload:", data, flush=True)
|
||||
|
||||
url = "https://192.168.15.125:1880/message/receive"
|
||||
|
||||
response = requests.post(
|
||||
url,
|
||||
json=data,
|
||||
verify=False,
|
||||
timeout=5
|
||||
)
|
||||
|
||||
self._database_service.insert("messages", data)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
print("Incoming payload is NOT valid JSON:", e, flush=True)
|
||||
|
||||
except requests.RequestException as e:
|
||||
print("HTTP request to Node-RED failed:", e, flush=True)
|
||||
|
||||
except Exception as e:
|
||||
print("Unexpected error:", e, flush=True)
|
||||
|
||||
@@ -1,21 +1,60 @@
|
||||
import json
|
||||
from typing import Callable
|
||||
|
||||
from pymongo import MongoClient
|
||||
|
||||
from src.models.AuthData import AuthData
|
||||
from src.services.mongo_service import MongoService
|
||||
from src.services.mqtt_service import MQTTService
|
||||
|
||||
class MQTTForwarder:
|
||||
|
||||
client_id : str
|
||||
local_broker : MQTTService
|
||||
central_broker : MQTTService
|
||||
_client_id : str
|
||||
_local_broker : MQTTService
|
||||
_central_broker : MQTTService
|
||||
_db_service : MongoService
|
||||
|
||||
def __init__(self, client_id : str, local_mqtt: MQTTService, central_mqtt: MQTTService):
|
||||
self.client_id = client_id
|
||||
self.local_broker = local_mqtt
|
||||
self.central_broker = central_mqtt
|
||||
def __init__(self, client_id : str, local_mqtt: MQTTService, central_mqtt: MQTTService, db_service : MongoService):
|
||||
self._client_id = client_id
|
||||
self._local_broker = local_mqtt
|
||||
self._central_broker = central_mqtt
|
||||
self._db_service = db_service
|
||||
|
||||
def start(self, src_topic: str, dst_topic: str):
|
||||
def start(self, src_topic: str, dst_topic: str, handler : Callable[[str, str], None]):
|
||||
try:
|
||||
def forward_handler(topic: str, msg: str):
|
||||
self.central_broker.publish(dst_topic, msg)
|
||||
#self.central_broker.publish(dst_topic, msg)
|
||||
data = json.loads(msg)
|
||||
self._db_service.insert("telemetry", data)
|
||||
handler(dst_topic, msg)
|
||||
|
||||
self.local_broker.subscribe(src_topic, forward_handler)
|
||||
self._local_broker.subscribe(src_topic, forward_handler)
|
||||
except Exception as e:
|
||||
print(f"An error occurred while forwarding from {src_topic} to {dst_topic}: {e}")
|
||||
print(f"An error occurred while forwarding from {src_topic} to {dst_topic}: {e}")
|
||||
|
||||
|
||||
def create_forwarder(auth_data : AuthData,
|
||||
local_broker_address : str, local_broker_port : int,
|
||||
api_broker_address : str, api_broker_port : int) -> tuple[MQTTService, MQTTService, MQTTForwarder]:
|
||||
client_id = auth_data.get_client_id()
|
||||
|
||||
local_broker = MQTTService(
|
||||
local_broker_address,
|
||||
local_broker_port,
|
||||
client_id=client_id,
|
||||
username="main",
|
||||
password="hepl",
|
||||
)
|
||||
|
||||
api_broker = MQTTService(
|
||||
api_broker_address,
|
||||
api_broker_port,
|
||||
client_id=client_id,
|
||||
username="customer",
|
||||
password="hepl",
|
||||
)
|
||||
|
||||
forwarder = MQTTForwarder(client_id, local_broker, api_broker)
|
||||
|
||||
return local_broker, api_broker, forwarder
|
||||
|
||||
|
||||
44
api-customer/src/services/mongo_service.py
Normal file
44
api-customer/src/services/mongo_service.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import json
|
||||
|
||||
from pymongo import MongoClient
|
||||
from pymongo.synchronous.database import Database
|
||||
|
||||
|
||||
class MongoService:
|
||||
|
||||
_client : MongoClient
|
||||
_db : Database
|
||||
|
||||
def __init__(self, uri : str, database : str):
|
||||
self._client = MongoClient(uri)
|
||||
self._db = self._client[database]
|
||||
|
||||
|
||||
def insert(self, collection : str, data : object):
|
||||
collection = self._db[collection]
|
||||
payload = self._to_document(data)
|
||||
result = collection.insert_one(payload)
|
||||
return result.inserted_id
|
||||
|
||||
def find(self, collection: str, field: str, value):
|
||||
col = self._db[collection]
|
||||
return list(col.find({field: value}))
|
||||
|
||||
def _to_document(self, obj):
|
||||
if obj is None or isinstance(obj, (str, int, float, bool)):
|
||||
return obj
|
||||
|
||||
if isinstance(obj, list):
|
||||
return [self._to_document(i) for i in obj]
|
||||
|
||||
if isinstance(obj, dict):
|
||||
return {k: self._to_document(v) for k, v in obj.items()}
|
||||
|
||||
if hasattr(obj, "__dict__"):
|
||||
return {
|
||||
k: self._to_document(v)
|
||||
for k, v in vars(obj).items()
|
||||
if not k.startswith("_")
|
||||
}
|
||||
|
||||
return str(obj)
|
||||
@@ -41,19 +41,18 @@ class MQTTService:
|
||||
self._connected = False
|
||||
|
||||
def connect(self):
|
||||
if self._connected: return
|
||||
|
||||
print(f"Connecting to {self.address}...")
|
||||
self.client.connect(self.address, self.port)
|
||||
self.client.loop_start()
|
||||
timeout = 5
|
||||
start = time.time()
|
||||
while not self._connected and time.time() - start < timeout:
|
||||
time.sleep(0.1)
|
||||
if not self._connected:
|
||||
print(f"Connecting to {self.address}...")
|
||||
self.client.connect(self.address, self.port)
|
||||
self.client.loop_start()
|
||||
timeout = 5
|
||||
start = time.time()
|
||||
while not self._connected and time.time() - start < timeout:
|
||||
time.sleep(0.1)
|
||||
if not self._connected:
|
||||
raise ConnectionError(f"Cannot connect to MQTT broker at {self.address}:{self.port}")
|
||||
print(f"Successfully connected to {self.address}")
|
||||
else :
|
||||
print(f"Already connected to {self.address}...")
|
||||
raise ConnectionError(f"Cannot connect to MQTT broker at {self.address}:{self.port}")
|
||||
print(f"Successfully connected to {self.address}")
|
||||
|
||||
def disconnect(self):
|
||||
if self._connected:
|
||||
|
||||
Reference in New Issue
Block a user