From 01594d02ed7da1c5f38dd80cf880d42f838e81e8 Mon Sep 17 00:00:00 2001 From: Laurent Date: Wed, 31 Dec 2025 12:15:59 +0100 Subject: [PATCH] Integrate detection service --- api/mosquitto/data/mosquitto.db | Bin 164 -> 164 bytes rpi/hardware/camera/__init__.py | 0 rpi/hardware/camera/camera.py | 60 ++++++++++++++++++++++++++++++ rpi/main.py | 10 ++--- rpi/services/detection_service.py | 51 +++++++++++-------------- rpi/services/game_service.py | 23 ++++++++++++ 6 files changed, 109 insertions(+), 35 deletions(-) create mode 100644 rpi/hardware/camera/__init__.py create mode 100644 rpi/hardware/camera/camera.py create mode 100644 rpi/services/game_service.py diff --git a/api/mosquitto/data/mosquitto.db b/api/mosquitto/data/mosquitto.db index 6c634b9963c6cdf2c37d29f670dccd6fdc8864fb..1b72d16b07fa7d0a5b0f54c571172483865af184 100644 GIT binary patch delta 12 TcmZ3&xP) np.ndarray: + self.open() + try: + ret, frame = self.cap.read() + if not ret: + raise RuntimeError("Failed to capture image") + return frame + finally: + self.close() + + def start_capture(self, callback: Callable[[np.ndarray], None]): + if self.thread and self.thread.is_alive(): + return + + self.open() + self.stop_event.clear() + + def loop(): + while not self.stop_event.is_set(): + ret, frame = self.cap.read() + if not ret: + break + callback(frame) + + self.thread = threading.Thread(target=loop, daemon=True) + self.thread.start() + + def stop_capture(self): + self.stop_event.set() + + self.close() + + if self.thread: + self.thread.join(timeout=2) + self.thread = None diff --git a/rpi/main.py b/rpi/main.py index 8acb3a55..71a81262 100644 --- a/rpi/main.py +++ b/rpi/main.py @@ -9,6 +9,7 @@ from hardware.rfid.reader import RfidReader from services.clock_service import ClockService from services.detection_service import DetectionService from services.forwarder_service import ForwarderService +from services.game_service import GameService from services.mqtt_service import MQTTService load_dotenv() @@ -30,9 +31,6 @@ app = Flask(__name__) rfid_reader = RfidReader("/dev/serial0", 9600) light_sensor_reader = LoraLightSensorReader("/dev/ttyUSB1", 9600) -detection_service = DetectionService() -clock_service = ClockService() - local_broker = MQTTService( local_broker_address, local_broker_port, @@ -50,6 +48,7 @@ api_broker = MQTTService( ) forward_service = ForwarderService(local_broker, api_broker) +game_service = GameService() @app.route("/command/party/start", methods=['POST']) def start_party(): @@ -61,9 +60,9 @@ def start_party(): return jsonify({"status": "error", "message": f"An error occurred : {e}"}), 500 @app.route("/command/party/play", methods=['POST']) -def party_play(): +def make_move(): try: - print("Move received started!") + game_service.make_move() return jsonify({"status": "ok", "message": "Party started"}), 200 except Exception as ex: print(e) @@ -84,7 +83,6 @@ if __name__ == "__main__": rfid_reader.start() light_sensor_reader.start() - clock_service.start(600, 0) print("App started...") app.run(host="0.0.0.0", port=5000, debug=False) diff --git a/rpi/services/detection_service.py b/rpi/services/detection_service.py index d1374206..bddf4286 100644 --- a/rpi/services/detection_service.py +++ b/rpi/services/detection_service.py @@ -3,6 +3,7 @@ import numpy as np from pathlib import Path from ultralytics.engine.results import Results +from hardware.camera.camera import Camera from models.detection.detector import Detector from models.detection.board_manager import BoardManager from models.detection.pieces_manager import PiecesManager @@ -18,6 +19,8 @@ class DetectionService: scale_size : tuple[int, int] + camera : Camera + def __init__(self): current_file = Path(__file__).resolve() project_root = current_file.parent.parent @@ -28,11 +31,23 @@ class DetectionService: self.pieces_manager = PiecesManager() self.board_manager = BoardManager() self.scale_size = (800, 800) + self.camera = Camera() + + def start(self): + self.camera.open() + + def stop(self): + self.camera.close() + + def analyze_single_frame(self) -> str | None: + frame = self.camera.take_photo() + fen = self.__get_fen(frame) + return fen - def run_complete_detection(self, frame : np.ndarray, display=False) -> dict[str, list[Results]] : - pieces_prediction = self.run_pieces_detection(frame) - edges_prediction = self.run_edges_detection(frame) + def __run_complete_detection(self, frame : np.ndarray, display=False) -> dict[str, list[Results]] : + pieces_prediction = self.__run_pieces_detection(frame) + edges_prediction = self.__run_edges_detection(frame) if display: edges_annotated_frame = edges_prediction[0].plot() @@ -42,22 +57,22 @@ class DetectionService: return { "edges" : edges_prediction, "pieces" : pieces_prediction} - def run_pieces_detection(self, frame : np.ndarray, display=False) -> list[Results]: + def __run_pieces_detection(self, frame : np.ndarray, display=False) -> list[Results]: prediction = self.pieces_detector.make_prediction(frame) if display: self.__display_frame(prediction[0].plot()) return prediction - def run_edges_detection(self, frame : np.ndarray, display=False) -> list[Results]: + def __run_edges_detection(self, frame : np.ndarray, display=False) -> list[Results]: prediction = self.edges_detector.make_prediction(frame) if display: self.__display_frame(prediction[0].plot()) return prediction - def get_fen(self, frame : np.ndarray) -> str | None: - result = self.run_complete_detection(frame) + def __get_fen(self, frame : np.ndarray) -> str | None: + result = self.__run_complete_detection(frame) edges_prediction = result["edges"] pieces_prediction = result["pieces"] @@ -75,7 +90,6 @@ class DetectionService: return self.pieces_manager.board_to_fen(board) - def __display_frame(self, frame : np.ndarray): cv2.namedWindow("Frame", cv2.WINDOW_NORMAL) cv2.resizeWindow("Frame", self.scale_size[0], self.scale_size[1]) @@ -83,24 +97,3 @@ class DetectionService: cv2.waitKey(0) cv2.destroyAllWindows() return - - -if __name__ == "__main__" : - import os - import random - - service = DetectionService() - - img_folder = "../training/datasets/pieces/unified/test/images/" - - test_images = os.listdir(img_folder) - - rnd = random.randint(0, len(test_images) - 1) - img_path = os.path.join(img_folder, test_images[rnd]) - - image = cv2.imread(img_path) - - fen = service.get_fen(image) - print(fen) - - service.run_complete_detection(image, display=True) diff --git a/rpi/services/game_service.py b/rpi/services/game_service.py new file mode 100644 index 00000000..eaabd75c --- /dev/null +++ b/rpi/services/game_service.py @@ -0,0 +1,23 @@ +from services.clock_service import ClockService +from services.detection_service import DetectionService + + +class GameService: + + detection_service : DetectionService + clock_service : ClockService + + def __init__(self): + self.detection_service = DetectionService() + self.clock_service = ClockService() + + def start(self, time_control : int, increment : int ) -> None: + self.clock_service.start(time_control, increment) + self.clock_service.set_on_terminated(self.stop) + + def stop(self): + self.clock_service.stop() + + def make_move(self) -> None: + fen = self.detection_service.analyze_single_frame() + print(fen) \ No newline at end of file