From 079ea42d9618a93a9be9694c400ad6703428b36c Mon Sep 17 00:00:00 2001 From: Laurent Date: Tue, 23 Dec 2025 14:31:54 +0100 Subject: [PATCH] Minor fixes --- rpi/board-detector/debug/realtime_detect.py | 46 ------- rpi/board-detector/main.py | 107 ----------------- rpi/board-detector/realtime_detect.py | 89 ++++++++++++++ rpi/models/detection/__init__.py | 0 .../detection}/board_manager.py | 50 ++++---- .../detection}/detector.py | 11 +- rpi/models/detection/pieces_manager.py | 112 ++++++++++++++++++ rpi/services/__init__.py | 0 rpi/services/detection_service.py | 99 ++++++++++++++++ 9 files changed, 328 insertions(+), 186 deletions(-) delete mode 100644 rpi/board-detector/debug/realtime_detect.py delete mode 100644 rpi/board-detector/main.py create mode 100644 rpi/board-detector/realtime_detect.py create mode 100644 rpi/models/detection/__init__.py rename rpi/{board-detector => models/detection}/board_manager.py (61%) rename rpi/{board-detector => models/detection}/detector.py (81%) create mode 100644 rpi/models/detection/pieces_manager.py create mode 100644 rpi/services/__init__.py create mode 100644 rpi/services/detection_service.py diff --git a/rpi/board-detector/debug/realtime_detect.py b/rpi/board-detector/debug/realtime_detect.py deleted file mode 100644 index 8e6f7982..00000000 --- a/rpi/board-detector/debug/realtime_detect.py +++ /dev/null @@ -1,46 +0,0 @@ -from ultralytics import YOLO -import cv2 - -if __name__ == "__main__": - - corner_model_path = "../../assets/models/edges.pt" - pieces_model_path = "../../assets/models/unified-nano-refined.pt" - - print("Initializing model...") - corner_model = YOLO(corner_model_path) - pieces_model = YOLO(pieces_model_path) - - print("Initializing camera...") - cap = cv2.VideoCapture(0) - cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) - cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) - cap.set(cv2.CAP_PROP_FPS, 30) - - print("Initialized") - if not cap.isOpened(): - print("Error: Could not open camera") - exit() - - 
cv2.namedWindow("Predictions", cv2.WINDOW_NORMAL) - - while True: - ret, frame = cap.read() - if not ret: - print("Error: Failed to grab frame") - break - - # Optional: resize frame to improve YOLO performance - # frame = cv2.resize(frame, (416, 416)) - - corner_result = corner_model.predict(source=frame, conf=0.6) - pieces_result = pieces_model.predict(source=frame, conf=0.6) - - corner_annotated_frame = corner_result[0].plot() - pieces_annotated_frame = pieces_result[0].plot(img=corner_annotated_frame) - - cv2.imshow("Predictions", pieces_annotated_frame) - cv2.resizeWindow("Predictions", 640, 640) - if cv2.waitKey(1) & 0xFF == ord('q'): - break - cap.release() - cv2.destroyAllWindows() diff --git a/rpi/board-detector/main.py b/rpi/board-detector/main.py deleted file mode 100644 index 06531260..00000000 --- a/rpi/board-detector/main.py +++ /dev/null @@ -1,107 +0,0 @@ -import cv2 -import numpy as np -from detector import Detector -from board_manager import BoardManager - -# -------------------- Pièces -------------------- -def extract_pieces(pieces_pred): - """Extrait les pièces avec leur bbox, sans remapping inutile""" - result = pieces_pred[0] - detections = [] - - for box in result.boxes: - # xywh en pixels de l'image originale - x, y, w, h = box.xywh[0].cpu().numpy() - label = result.names[int(box.cls[0])] - detections.append({"label": label, "bbox": (int(x), int(y), int(w), int(h))}) - - return detections - -import numpy as np -import cv2 - -def pieces_to_board(detected_boxes, matrix, board_size=800): - board_array = [[None for _ in range(8)] for _ in range(8)] - - for d in detected_boxes: - x, y, w, h = d["bbox"] - - # Points multiples sur la pièce pour stabilité - points = np.array([ - [x + w/2, y + h*0.2], # haut - [x + w/2, y + h/2], # centre - [x + w/2, y + h*0.8] # bas - ], dtype=np.float32).reshape(-1,1,2) - - # Transformation perspective - warped_points = cv2.perspectiveTransform(points, matrix) - wy_values = warped_points[:,0,1] # coordonnées y après 
warp - - # Prendre le percentile haut (25%) pour éviter décalage - wy_percentile = np.percentile(wy_values, 25) - - # Normaliser et calculer rank/file - nx = np.clip(np.mean(warped_points[:,0,0]) / board_size, 0, 0.999) - ny = np.clip(wy_percentile / board_size, 0, 0.999) - - file = min(max(int(nx * 8), 0), 7) - rank = min(max(int(ny * 8), 0), 7) - - board_array[rank][file] = d["label"] - - return board_array - - -def board_to_fen(board): - map_fen = { - "w_pawn": "P", "w_knight": "N", "w_bishop": "B", - "w_rook": "R", "w_queen": "Q", "w_king": "K", - "b_pawn": "p", "b_knight": "n", "b_bishop": "b", - "b_rook": "r", "b_queen": "q", "b_king": "k", - } - rows = [] - for rank in board: - empty = 0 - row = "" - for sq in rank: - if sq is None: - empty += 1 - else: - if empty: - row += str(empty) - empty = 0 - row += map_fen[sq] - if empty: - row += str(empty) - rows.append(row) - return "/".join(rows) - -if __name__ == "__main__": - edges_detector = Detector("../assets/models/edges.pt") - pieces_detector = Detector("../assets/models/unified-nano-refined.pt") - #image_path = "./test/1.png" - image_path = "../training/datasets/pieces/unified/test/images/659_jpg.rf.0009cadea8df487a76d6960a28b9d811.jpg" - image = cv2.imread(image_path) - - edges_pred = edges_detector.make_prediction(image_path) - pieces_pred = pieces_detector.make_prediction(image_path) - - remap_width = 800 - remap_height = 800 - - board_manager = BoardManager(image) - corners, matrix = board_manager.extract_corners(edges_pred[0], (remap_width, remap_height)) - - detections = extract_pieces(pieces_pred) - - board = pieces_to_board(detections, matrix, remap_width) - - # FEN - fen = board_to_fen(board) - print("FEN:", fen) - - frame = pieces_pred[0].plot() - cv2.namedWindow("Pred", cv2.WINDOW_NORMAL) - cv2.imshow("Pred", frame) - cv2.waitKey(0) - cv2.destroyAllWindows() \ No newline at end of file diff --git a/rpi/board-detector/realtime_detect.py b/rpi/board-detector/realtime_detect.py new file mode 100644 
index 00000000..91fe055a --- /dev/null +++ b/rpi/board-detector/realtime_detect.py @@ -0,0 +1,89 @@ +import time + +from models.detection.detector import * +from models.detection.board_manager import * +from models.detection.pieces_manager import * + +def print_board_console(board_array): + files = "abcdefgh" + print(" " + " ".join(files)) + print(" +-----------------+") + + for r in range(8): + row_str = str(8 - r) + "|" + for f in range(8): + piece = board_array[r][f] + if piece is None: + row_str += ". " + else: + if piece[0] == "w": + piece = piece.upper() + row_str += piece[2] + " " + row_str += "|" + print(row_str) + print(" +-----------------+") + +if __name__ == "__main__": + + print("Initializing models...") + edges_detector = Detector("../assets/models/edges.pt") + pieces_detector = Detector("../assets/models/unified-nano-refined.pt") + + pieces_manager = PiecesManager() + board_manager = BoardManager() + + print("Initializing camera...") + cap = cv2.VideoCapture(0) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) + cap.set(cv2.CAP_PROP_FPS, 30) + + print("Initialized") + if not cap.isOpened(): + print("Error: Could not open camera") + exit() + + cv2.namedWindow("Predictions", cv2.WINDOW_NORMAL) + + while True: + time.sleep(1) + + ret, frame = cap.read() + if not ret: + print("Error: Failed to grab frame") + break + + edges_pred = edges_detector.make_prediction(frame) + pieces_pred = pieces_detector.make_prediction(frame) + + edges_annotated_frame = edges_pred[0].plot() + pieces_annotated_frame = pieces_pred[0].plot(img=edges_annotated_frame) + + cv2.imshow("Predictions", pieces_annotated_frame) + cv2.resizeWindow("Predictions", 640, 640) + + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + remap_width = 800 + remap_height = 800 + + board_manager = BoardManager() + result = board_manager.process_frame(edges_pred[0], frame, (remap_width, remap_height)) + if result is None: + continue + + corners = result[0] + matrix = 
result[1] + + detections = pieces_manager.extract_pieces(pieces_pred) + + board = pieces_manager.pieces_to_board(detections, corners, matrix, (remap_width, remap_height)) + + fen = pieces_manager.board_to_fen(board) + print("FEN:", fen) + + print_board_console(board) + + cap.release() + cv2.destroyAllWindows() diff --git a/rpi/models/detection/__init__.py b/rpi/models/detection/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/rpi/board-detector/board_manager.py b/rpi/models/detection/board_manager.py similarity index 61% rename from rpi/board-detector/board_manager.py rename to rpi/models/detection/board_manager.py index 4cf0f1f3..a9d2c04f 100644 --- a/rpi/board-detector/board_manager.py +++ b/rpi/models/detection/board_manager.py @@ -1,27 +1,29 @@ import cv2 import numpy as np -from typing import Tuple +from typing import Tuple, Any + +from numpy import ndarray class BoardManager: - image: np.ndarray - def __init__(self, image : np.ndarray) -> None: - self.image = image - - def extract_corners(self, prediction: object, scale_size: tuple[int, int]) -> tuple[np.ndarray, np.ndarray]: - """Extrait le plateau warpé à partir de la prédiction et de l'image de base""" - - mask = self.__get_mask(prediction) - contour = self.__get_largest_contour(mask) - corners = self.__approx_corners(contour) - scaled_corners = self.__scale_corners(corners, mask.shape, self.image.shape) - ordered_corners = self.__order_corners(scaled_corners) - transformation_matrix = self.__calculte_transformation_matrix(ordered_corners, scale_size) - return ordered_corners, transformation_matrix + def process_frame(self, prediction: object, image : np.ndarray, scale_size: tuple[int, int]) -> tuple[ndarray, ndarray] | None: + try : + mask = self.__get_mask(prediction) + contour = self.__get_largest_contour(mask) + corners = self.__approx_corners(contour) + scaled_corners = self.__scale_corners(corners, mask.shape, image.shape) + ordered_corners = self.__order_corners(scaled_corners) + 
transformation_matrix = self.__calculte_transformation_matrix(ordered_corners, scale_size) + warped_corners = cv2.perspectiveTransform( + np.array(ordered_corners, np.float32).reshape(-1, 1, 2), + transformation_matrix + ).reshape(-1, 2) + return warped_corners, transformation_matrix + except Exception as e: + print(e) + return None def __calculte_transformation_matrix(self, corners: np.ndarray, output_size : tuple[int, int]) -> np.ndarray: - """Calcule la matrice de perspective""" - width = output_size[0] height = output_size[1] @@ -33,26 +35,20 @@ class BoardManager: ], dtype=np.float32) return cv2.getPerspectiveTransform(corners, dst) - def __get_mask(self, pred: object) -> np.ndarray: - """Retourne le masque du plateau en uint8""" - + def __get_mask(self, pred: object) -> Any: if pred.masks is None: - raise ValueError("No board mask detected") + raise ValueError("No board mask detected") mask = pred.masks.data[0].cpu().numpy() mask = (mask * 255).astype(np.uint8) return mask def __get_largest_contour(self, mask: np.ndarray) -> np.ndarray: - """Trouve le plus grand contour dans le masque""" - contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: raise ValueError("No contours found") return max(contours, key=cv2.contourArea) def __approx_corners(self, contour: np.ndarray) -> np.ndarray: - """Approxime les coins du plateau à 4 points""" - epsilon = 0.02 * cv2.arcLength(contour, True) approx = cv2.approxPolyDP(contour, epsilon, True) if len(approx) != 4: @@ -60,8 +56,6 @@ class BoardManager: return approx.reshape(4, 2) def __scale_corners(self, pts: np.ndarray, mask_shape: Tuple[int, int], image_shape: Tuple[int, int, int]) -> np.ndarray: - """Remappe les coins de la taille du masque vers la taille originale""" - mask_h, mask_w = mask_shape img_h, img_w = image_shape[:2] scale_x = img_w / mask_w @@ -70,8 +64,6 @@ class BoardManager: return np.array(scaled_pts, dtype=np.float32) def __order_corners(self, pts: 
np.ndarray) -> np.ndarray: - """Ordonne les coins top-left, top-right, bottom-right, bottom-left""" - rect = np.zeros((4, 2), dtype="float32") s = pts.sum(axis=1) rect[0] = pts[np.argmin(s)] # top-left diff --git a/rpi/board-detector/detector.py b/rpi/models/detection/detector.py similarity index 81% rename from rpi/board-detector/detector.py rename to rpi/models/detection/detector.py index ea4a119b..212ae527 100644 --- a/rpi/board-detector/detector.py +++ b/rpi/models/detection/detector.py @@ -2,8 +2,11 @@ import os import random +import numpy as np from ultralytics import YOLO import cv2 +from ultralytics.engine.results import Results + class Detector : @@ -12,12 +15,12 @@ class Detector : self.used_height = 640 self.used_width = 640 - def make_prediction(self, image_path): - return self.model.predict(source=image_path, conf=0.6) + def make_prediction(self, image : str | np.ndarray) -> list[Results]: + return self.model.predict(source=image, conf=0.6) if __name__ == "__main__": - corner_model_path = "../assets/models/edges.pt" - pieces_model_path = "../assets/models/unified-nano-refined.pt" + corner_model_path = "../../assets/models/edges.pt" + pieces_model_path = "../../assets/models/unified-nano-refined.pt" corner_model = YOLO(corner_model_path) pieces_model = YOLO(pieces_model_path) diff --git a/rpi/models/detection/pieces_manager.py b/rpi/models/detection/pieces_manager.py new file mode 100644 index 00000000..4a683642 --- /dev/null +++ b/rpi/models/detection/pieces_manager.py @@ -0,0 +1,112 @@ +import cv2 +import numpy as np +from typing import Any + +from numpy import ndarray + + +class PiecesManager: + + def __init__(self): + pass + + def extract_pieces(self, pieces_pred) -> list[Any]: + result = pieces_pred[0] + detections = [] + + for box in result.boxes: + # xywh en pixels de l'image originale + x, y, w, h = box.xywh[0].cpu().numpy() + label = result.names[int(box.cls[0])] + detections.append({"label": label, "bbox": (int(x), int(y), int(w), int(h))}) + 
+ return detections + + def pieces_to_board(self, detected_boxes: list, warped_corners: ndarray, matrix: np.ndarray, board_size: tuple[int, int]) -> list[list[str | None]]: + + board_array = [[None for _ in range(8)] for _ in range(8)] + board_width, board_height = board_size + + tl, tr, br, bl = warped_corners + square_centers = self.__compute_square_centers(tl, tr, br, bl) + + for d in detected_boxes: + x, y, w, h = d["bbox"] + + points = np.array([ + [x + w / 2, y + h * 0.2], + [x + w / 2, y + h / 2], + [x + w / 2, y + h * 0.8] + ], dtype=np.float32).reshape(-1, 1, 2) + + warped_points = cv2.perspectiveTransform(points, matrix) + + wx = np.mean(warped_points[:, 0, 0]) + wy = np.percentile(warped_points[:, 0, 1], 25) + + best_rank = 0 + best_file = 0 + min_dist = float("inf") + + for r, c, cx, cy in square_centers: + dist = (wx - cx) ** 2 + (wy - cy) ** 2 + if dist < min_dist: + min_dist = dist + best_rank = r + best_file = c + + max_reasonable_dist = (board_width / 8) ** 2 + if min_dist > max_reasonable_dist: + continue + + board_array[best_rank][best_file] = d["label"] + + return board_array + + def board_to_fen(self, board : list[list[str | None]]) -> str: + map_fen = { + "w_pawn": "P", "w_knight": "N", "w_bishop": "B", + "w_rook": "R", "w_queen": "Q", "w_king": "K", + "b_pawn": "p", "b_knight": "n", "b_bishop": "b", + "b_rook": "r", "b_queen": "q", "b_king": "k", + } + rows = [] + for rank in board: + empty = 0 + row = "" + for sq in rank: + if sq is None: + empty += 1 + else: + if empty: + row += str(empty) + empty = 0 + row += map_fen[sq] + if empty: + row += str(empty) + rows.append(row) + return "/".join(rows) + + def __compute_square_centers(self, tl, tr, br, bl): + centers = [] + for line in range(8): + for file in range(8): + u = (file + 0.5) / 8 + v = (line + 0.5) / 8 + + # interpolation bilinéaire + x = ( + (1 - u) * (1 - v) * tl[0] + + u * (1 - v) * tr[0] + + u * v * br[0] + + (1 - u) * v * bl[0] + ) + y = ( + (1 - u) * (1 - v) * tl[1] + + u * (1 - 
v) * tr[1] + + u * v * br[1] + + (1 - u) * v * bl[1] + ) + + centers.append((line, file, x, y)) + return centers diff --git a/rpi/services/__init__.py b/rpi/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/rpi/services/detection_service.py b/rpi/services/detection_service.py new file mode 100644 index 00000000..e6071dbe --- /dev/null +++ b/rpi/services/detection_service.py @@ -0,0 +1,99 @@ +import cv2 +import numpy as np +from ultralytics.engine.results import Results + +from models.detection.detector import Detector +from models.detection.board_manager import BoardManager +from models.detection.pieces_manager import PiecesManager + + +class DetectionService: + + edges_detector : Detector + pieces_detector : Detector + + board_manager : BoardManager + pieces_manager : PiecesManager + + scale_size : tuple[int, int] + + def __init__(self): + self.edges_detector = Detector("../assets/models/edges.pt") + self.pieces_detector = Detector("../assets/models/unified-nano-refined.pt") + + self.pieces_manager = PiecesManager() + self.board_manager = BoardManager() + self.scale_size = (800, 800) + + + def run_complete_detection(self, frame : np.ndarray, display=False) -> dict[str, list[Results]] : + pieces_prediction = self.run_pieces_detection(frame) + edges_prediction = self.run_edges_detection(frame) + + if display: + edges_annotated_frame = edges_prediction[0].plot() + pieces_annotated_frame = pieces_prediction[0].plot(img=edges_annotated_frame) + self.__display_frame(pieces_annotated_frame) + + return { "edges" : edges_prediction, "pieces" : pieces_prediction} + + + def run_pieces_detection(self, frame : np.ndarray, display=False) -> list[Results]: + prediction = self.pieces_detector.make_prediction(frame) + if display: + self.__display_frame(prediction[0].plot()) + return prediction + + + def run_edges_detection(self, frame : np.ndarray, display=False) -> list[Results]: + prediction = self.edges_detector.make_prediction(frame) + if display: 
+ self.__display_frame(prediction[0].plot()) + return prediction + + + def get_fen(self, frame : np.ndarray) -> str | None: + result = self.run_complete_detection(frame) + + edges_prediction = result["edges"] + pieces_prediction = result["pieces"] + + warped_corners, matrix = self.board_manager.process_frame(edges_prediction[0], frame, self.scale_size) or (None, None) + if matrix is None: + return None + + detections = self.pieces_manager.extract_pieces(pieces_prediction) + + board = self.pieces_manager.pieces_to_board(detections, warped_corners, matrix, self.scale_size) + + return self.pieces_manager.board_to_fen(board) + + + def __display_frame(self, frame : np.ndarray): + cv2.namedWindow("Frame", cv2.WINDOW_NORMAL) + cv2.resizeWindow("Frame", self.scale_size[0], self.scale_size[1]) + cv2.imshow("Frame", frame) + cv2.waitKey(0) + cv2.destroyAllWindows() + return + + +if __name__ == "__main__" : + import os + import random + + service = DetectionService() + + img_folder = "../training/datasets/pieces/unified/test/images/" + + test_images = os.listdir(img_folder) + + rnd = random.randint(0, len(test_images) - 1) + img_path = os.path.join(img_folder, test_images[rnd]) + + image = cv2.imread(img_path) + + fen = service.get_fen(image) + print(fen) + + service.run_complete_detection(image, display=True) \ No newline at end of file