Move inference onto the API
This commit is contained in:
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -12,12 +12,14 @@ from services.mqtt_service import MQTTService
|
||||
class GameController:
|
||||
|
||||
_game_service : GameService
|
||||
_api_url : str
|
||||
_broker_service : MQTTService
|
||||
_has_started : bool
|
||||
_auth_token : str
|
||||
|
||||
def __init__(self, app : Flask, broker_service : MQTTService):
|
||||
def __init__(self, app : Flask, api_url : str, broker_service : MQTTService):
|
||||
self._game_service = GameService()
|
||||
self._api_url = api_url
|
||||
self._game_service.set_on_terminated(self._stop_event)
|
||||
self._broker_service = broker_service
|
||||
self._register_routes(app)
|
||||
@@ -61,11 +63,17 @@ class GameController:
|
||||
if auth_token != "Bearer " + self._auth_token:
|
||||
return jsonify({"status": "error", "message": "Invalid authorization token"}), 401
|
||||
|
||||
threading.Thread(
|
||||
target=self._analyze_move(),
|
||||
daemon=True
|
||||
).start()
|
||||
img = self._game_service.make_move()
|
||||
b64_img = base64.b64encode(img).decode('utf-8')
|
||||
payload = {
|
||||
"image": f"data:image/jpeg;base64,{b64_img}"
|
||||
}
|
||||
response = requests.post(self._api_url, json=payload, verify=False)
|
||||
print(response.status_code)
|
||||
|
||||
data = response.json()
|
||||
fen = data.get("fen")
|
||||
self._game_service.add_move(fen)
|
||||
return jsonify({"status": "ok"}), 200
|
||||
|
||||
except ServiceException as ex:
|
||||
@@ -74,22 +82,6 @@ class GameController:
|
||||
print(ex)
|
||||
return jsonify({"status": "error", "message": f"An error occurred : {ex}"}), 500
|
||||
|
||||
def _analyze_move(self):
|
||||
img, fen = self._game_service.make_move()
|
||||
self._send_detection_result("https://192.168.15.125:1880/party/image", img, fen)
|
||||
|
||||
def _send_detection_result(self, url, img, fen):
|
||||
try:
|
||||
b64_img = base64.b64encode(img).decode('utf-8')
|
||||
payload = {
|
||||
"fen": fen,
|
||||
"image": f"data:image/jpeg;base64,{b64_img}"
|
||||
}
|
||||
response = requests.post(url, json=payload, verify=False)
|
||||
print(response.status_code)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
def _stop_event(self, game_data : str):
|
||||
try :
|
||||
print(f"Exporting game data : {game_data}")
|
||||
|
||||
@@ -23,13 +23,13 @@ class Camera:
|
||||
self.cap.release()
|
||||
self.cap = None
|
||||
|
||||
def take_photo(self) -> np.ndarray:
|
||||
def take_photo(self) -> bytes:
|
||||
self.open()
|
||||
try:
|
||||
ret, frame = self.cap.read()
|
||||
if not ret:
|
||||
raise RuntimeError("Failed to capture image")
|
||||
return frame
|
||||
return cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])[1].tobytes()
|
||||
finally:
|
||||
self.close()
|
||||
|
||||
|
||||
@@ -44,7 +44,7 @@ api_broker = MQTTService(
|
||||
password=api_password,
|
||||
)
|
||||
|
||||
game_controller = GameController(app, api_broker)
|
||||
game_controller = GameController(app, "https://192.168.15.125:1880/party/image", api_broker)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
|
||||
@@ -1,74 +0,0 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
from typing import Tuple, Any
|
||||
|
||||
from numpy import ndarray
|
||||
|
||||
class BoardManager:
|
||||
|
||||
def process_frame(self, prediction: object, image : np.ndarray, scale_size: tuple[int, int]) -> tuple[ndarray, ndarray] | None:
|
||||
try :
|
||||
mask = self.__get_mask(prediction)
|
||||
contour = self.__get_largest_contour(mask)
|
||||
corners = self.__approx_corners(contour)
|
||||
scaled_corners = self.__scale_corners(corners, mask.shape, image.shape)
|
||||
ordered_corners = self.__order_corners(scaled_corners)
|
||||
transformation_matrix = self.__calculte_transformation_matrix(ordered_corners, scale_size)
|
||||
warped_corners = cv2.perspectiveTransform(
|
||||
np.array(ordered_corners, np.float32).reshape(-1, 1, 2),
|
||||
transformation_matrix
|
||||
).reshape(-1, 2)
|
||||
return warped_corners, transformation_matrix
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
def __calculte_transformation_matrix(self, corners: np.ndarray, output_size : tuple[int, int]) -> np.ndarray:
|
||||
width = output_size[0]
|
||||
height = output_size[1]
|
||||
|
||||
dst = np.array([
|
||||
[0, 0], # top-left
|
||||
[width - 1, 0], # top-right
|
||||
[width - 1, height - 1], # bottom-right
|
||||
[0, height - 1] # bottom-left
|
||||
], dtype=np.float32)
|
||||
return cv2.getPerspectiveTransform(corners, dst)
|
||||
|
||||
def __get_mask(self, pred: object) -> Any:
|
||||
if pred.masks is None:
|
||||
raise ValueError("Board contour is not 4 corners")
|
||||
mask = pred.masks.data[0].cpu().numpy()
|
||||
mask = (mask * 255).astype(np.uint8)
|
||||
return mask
|
||||
|
||||
def __get_largest_contour(self, mask: np.ndarray) -> np.ndarray:
|
||||
contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
if not contours:
|
||||
raise ValueError("No contours found")
|
||||
return max(contours, key=cv2.contourArea)
|
||||
|
||||
def __approx_corners(self, contour: np.ndarray) -> np.ndarray:
|
||||
epsilon = 0.02 * cv2.arcLength(contour, True)
|
||||
approx = cv2.approxPolyDP(contour, epsilon, True)
|
||||
if len(approx) != 4:
|
||||
raise ValueError("Board contour is not 4 corners")
|
||||
return approx.reshape(4, 2)
|
||||
|
||||
def __scale_corners(self, pts: np.ndarray, mask_shape: Tuple[int, int], image_shape: Tuple[int, int, int]) -> np.ndarray:
|
||||
mask_h, mask_w = mask_shape
|
||||
img_h, img_w = image_shape[:2]
|
||||
scale_x = img_w / mask_w
|
||||
scale_y = img_h / mask_h
|
||||
scaled_pts = [(int(p[0] * scale_x), int(p[1] * scale_y)) for p in pts]
|
||||
return np.array(scaled_pts, dtype=np.float32)
|
||||
|
||||
def __order_corners(self, pts: np.ndarray) -> np.ndarray:
|
||||
rect = np.zeros((4, 2), dtype="float32")
|
||||
s = pts.sum(axis=1)
|
||||
rect[0] = pts[np.argmin(s)] # top-left
|
||||
rect[2] = pts[np.argmax(s)] # bottom-right
|
||||
diff = np.diff(pts, axis=1)
|
||||
rect[1] = pts[np.argmin(diff)] # top-right
|
||||
rect[3] = pts[np.argmax(diff)] # bottom-left
|
||||
return rect
|
||||
@@ -1,56 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import random
|
||||
|
||||
import numpy as np
|
||||
from ultralytics import YOLO
|
||||
import cv2
|
||||
from ultralytics.engine.results import Results
|
||||
|
||||
|
||||
class Detector :
|
||||
|
||||
def __init__(self, model_path):
|
||||
self.model = YOLO(model_path)
|
||||
self.used_height = 640
|
||||
self.used_width = 640
|
||||
|
||||
def make_prediction(self, image : str | np.ndarray) -> list[Results]:
|
||||
return self.model.predict(source=image, conf=0.6)
|
||||
|
||||
if __name__ == "__main__":
|
||||
corner_model_path = "../../assets/models/edges.pt"
|
||||
pieces_model_path = "../../assets/models/unified-nano-refined.pt"
|
||||
|
||||
corner_model = YOLO(corner_model_path)
|
||||
pieces_model = YOLO(pieces_model_path)
|
||||
|
||||
img_folder = "../training/datasets/pieces/unified/test/images/"
|
||||
save_folder = "./results"
|
||||
os.makedirs(save_folder, exist_ok=True)
|
||||
|
||||
test_images = os.listdir(img_folder)
|
||||
|
||||
for i in range(0, 10):
|
||||
rnd = random.randint(0, len(test_images) - 1)
|
||||
img_path = os.path.join(img_folder, test_images[rnd])
|
||||
save_path = os.path.join(save_folder, test_images[rnd])
|
||||
|
||||
image = cv2.imread(img_path)
|
||||
height, width = image.shape[:2]
|
||||
|
||||
#fen = prediction_to_fen(results, height, width)
|
||||
#print("Predicted FEN:", fen)
|
||||
|
||||
corner_result = corner_model.predict(source=image, conf=0.6)
|
||||
pieces_result = pieces_model.predict(source=image, conf=0.6)
|
||||
|
||||
corner_annotated_image = corner_result[0].plot()
|
||||
pieces_annotated_image = pieces_result[0].plot(img=corner_annotated_image)
|
||||
|
||||
cv2.imwrite(save_path, pieces_annotated_image)
|
||||
#cv2.namedWindow("YOLO Predictions", cv2.WINDOW_NORMAL)
|
||||
#cv2.imshow("YOLO Predictions", annotated_image)
|
||||
|
||||
cv2.waitKey(0)
|
||||
cv2.destroyAllWindows()
|
||||
@@ -1,112 +0,0 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
from typing import Any
|
||||
|
||||
from numpy import ndarray
|
||||
|
||||
|
||||
class PiecesManager:
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def extract_pieces(self, pieces_pred) -> list[Any]:
|
||||
result = pieces_pred[0]
|
||||
detections = []
|
||||
|
||||
for box in result.boxes:
|
||||
# xywh en pixels de l'image originale
|
||||
x, y, w, h = box.xywh[0].cpu().numpy()
|
||||
label = result.names[int(box.cls[0])]
|
||||
detections.append({"label": label, "bbox": (int(x), int(y), int(w), int(h))})
|
||||
|
||||
return detections
|
||||
|
||||
def pieces_to_board(self, detected_boxes: list, warped_corners: ndarray, matrix: np.ndarray, board_size: tuple[int, int]) -> list[list[str | None]]:
|
||||
|
||||
board_array = [[None for _ in range(8)] for _ in range(8)]
|
||||
board_width, board_height = board_size
|
||||
|
||||
tl, tr, br, bl = warped_corners
|
||||
square_centers = self.__compute_square_centers(tl, tr, br, bl)
|
||||
|
||||
for d in detected_boxes:
|
||||
x, y, w, h = d["bbox"]
|
||||
|
||||
points = np.array([
|
||||
[x + w / 2, y + h * 0.2],
|
||||
[x + w / 2, y + h / 2],
|
||||
[x + w / 2, y + h * 0.8]
|
||||
], dtype=np.float32).reshape(-1, 1, 2)
|
||||
|
||||
warped_points = cv2.perspectiveTransform(points, matrix)
|
||||
|
||||
wx = np.mean(warped_points[:, 0, 0])
|
||||
wy = np.percentile(warped_points[:, 0, 1], 25)
|
||||
|
||||
best_rank = 0
|
||||
best_file = 0
|
||||
min_dist = float("inf")
|
||||
|
||||
for r, c, cx, cy in square_centers:
|
||||
dist = (wx - cx) ** 2 + (wy - cy) ** 2
|
||||
if dist < min_dist:
|
||||
min_dist = dist
|
||||
best_rank = r
|
||||
best_file = c
|
||||
|
||||
max_reasonable_dist = (board_width / 8) ** 2
|
||||
if min_dist > max_reasonable_dist:
|
||||
continue
|
||||
|
||||
board_array[best_rank][best_file] = d["label"]
|
||||
|
||||
return board_array
|
||||
|
||||
def board_to_fen(self, board : list[list[str | None]]) -> str:
|
||||
map_fen = {
|
||||
"w_pawn": "P", "w_knight": "N", "w_bishop": "B",
|
||||
"w_rook": "R", "w_queen": "Q", "w_king": "K",
|
||||
"b_pawn": "p", "b_knight": "n", "b_bishop": "b",
|
||||
"b_rook": "r", "b_queen": "q", "b_king": "k",
|
||||
}
|
||||
rows = []
|
||||
for rank in board:
|
||||
empty = 0
|
||||
row = ""
|
||||
for sq in rank:
|
||||
if sq is None:
|
||||
empty += 1
|
||||
else:
|
||||
if empty:
|
||||
row += str(empty)
|
||||
empty = 0
|
||||
row += map_fen[sq]
|
||||
if empty:
|
||||
row += str(empty)
|
||||
rows.append(row)
|
||||
return "/".join(rows)
|
||||
|
||||
def __compute_square_centers(self, tl, tr, br, bl):
|
||||
centers = []
|
||||
for line in range(8):
|
||||
for file in range(8):
|
||||
u = (file + 0.5) / 8
|
||||
v = (line + 0.5) / 8
|
||||
|
||||
# interpolation bilinéaire
|
||||
x = (
|
||||
(1 - u) * (1 - v) * tl[0] +
|
||||
u * (1 - v) * tr[0] +
|
||||
u * v * br[0] +
|
||||
(1 - u) * v * bl[0]
|
||||
)
|
||||
y = (
|
||||
(1 - u) * (1 - v) * tl[1] +
|
||||
u * (1 - v) * tr[1] +
|
||||
u * v * br[1] +
|
||||
(1 - u) * v * bl[1]
|
||||
)
|
||||
|
||||
centers.append((line, file, x, y))
|
||||
return centers
|
||||
@@ -1,95 +0,0 @@
|
||||
import cv2
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
|
||||
from ultralytics.engine.results import Results
|
||||
|
||||
from hardware.camera.camera import Camera
|
||||
from models.detection.detector import Detector
|
||||
from models.detection.board_manager import BoardManager
|
||||
from models.detection.pieces_manager import PiecesManager
|
||||
|
||||
|
||||
class DetectionService:
|
||||
|
||||
edges_detector : Detector
|
||||
pieces_detector : Detector
|
||||
|
||||
board_manager : BoardManager
|
||||
pieces_manager : PiecesManager
|
||||
|
||||
scale_size : tuple[int, int]
|
||||
|
||||
camera : Camera
|
||||
|
||||
def __init__(self):
|
||||
current_file = Path(__file__).resolve()
|
||||
project_root = current_file.parent.parent
|
||||
|
||||
self.edges_detector = Detector(project_root / "assets" / "models" / "edges.pt")
|
||||
self.pieces_detector = Detector(project_root / "assets" / "models" / "unified-nano-refined.pt")
|
||||
|
||||
self.pieces_manager = PiecesManager()
|
||||
self.board_manager = BoardManager()
|
||||
self.scale_size = (800, 800)
|
||||
self.camera = Camera()
|
||||
|
||||
def start(self):
|
||||
self.camera.open()
|
||||
|
||||
def stop(self):
|
||||
self.camera.close()
|
||||
|
||||
def analyze_single_frame(self) -> tuple[bytes, str | None]:
|
||||
frame = self.camera.take_photo()
|
||||
encoded_frame = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])[1].tobytes()
|
||||
|
||||
result = self.__run_complete_detection(frame)
|
||||
|
||||
edges_prediction = result["edges"]
|
||||
pieces_prediction = result["pieces"]
|
||||
|
||||
processed_frame = self.board_manager.process_frame(edges_prediction[0], frame, self.scale_size)
|
||||
if processed_frame is None:
|
||||
return encoded_frame, None
|
||||
|
||||
warped_corners, matrix = processed_frame
|
||||
|
||||
detections = self.pieces_manager.extract_pieces(pieces_prediction)
|
||||
|
||||
board = self.pieces_manager.pieces_to_board(detections, warped_corners, matrix, self.scale_size)
|
||||
|
||||
return encoded_frame, self.pieces_manager.board_to_fen(board)
|
||||
|
||||
def __run_complete_detection(self, frame : np.ndarray, display=False) -> dict[str, list[Results]] :
|
||||
pieces_prediction = self.__run_pieces_detection(frame)
|
||||
edges_prediction = self.__run_edges_detection(frame)
|
||||
|
||||
if display:
|
||||
edges_annotated_frame = edges_prediction[0].plot()
|
||||
pieces_annotated_frame = pieces_prediction[0].plot(img=edges_annotated_frame)
|
||||
self.__display_frame(pieces_annotated_frame)
|
||||
|
||||
return { "edges" : edges_prediction, "pieces" : pieces_prediction}
|
||||
|
||||
|
||||
def __run_pieces_detection(self, frame : np.ndarray, display=False) -> list[Results]:
|
||||
prediction = self.pieces_detector.make_prediction(frame)
|
||||
if display:
|
||||
self.__display_frame(prediction[0].plot())
|
||||
return prediction
|
||||
|
||||
|
||||
def __run_edges_detection(self, frame : np.ndarray, display=False) -> list[Results]:
|
||||
prediction = self.edges_detector.make_prediction(frame)
|
||||
if display:
|
||||
self.__display_frame(prediction[0].plot())
|
||||
return prediction
|
||||
|
||||
def __display_frame(self, frame : np.ndarray):
|
||||
cv2.namedWindow("Frame", cv2.WINDOW_NORMAL)
|
||||
cv2.resizeWindow("Frame", self.scale_size[0], self.scale_size[1])
|
||||
cv2.imshow("Frame", frame)
|
||||
cv2.waitKey(0)
|
||||
cv2.destroyAllWindows()
|
||||
return
|
||||
@@ -2,29 +2,29 @@ import json
|
||||
from typing import Callable
|
||||
|
||||
from hardware.buzzer.buzzer import Buzzer
|
||||
from hardware.camera.camera import Camera
|
||||
from hardware.led.led import Led
|
||||
from models.exceptions.ServiceException import ServiceException
|
||||
from models.game import Game
|
||||
from services.clock_service import ClockService
|
||||
from services.detection_service import DetectionService
|
||||
|
||||
|
||||
class GameService:
|
||||
|
||||
_game : Game
|
||||
_detection_service : DetectionService
|
||||
_camera : Camera
|
||||
_clock_service : ClockService
|
||||
_has_started : bool
|
||||
_led : Led
|
||||
_buzzer : Buzzer
|
||||
_on_terminated : Callable[[str], None]
|
||||
_has_started : bool
|
||||
|
||||
def __init__(self):
|
||||
self._detection_service = DetectionService()
|
||||
self._camera = Camera()
|
||||
self._clock_service = ClockService()
|
||||
self._has_started = False
|
||||
self._led = Led(7)
|
||||
self._buzzer = Buzzer(8)
|
||||
self._has_started = False
|
||||
|
||||
def start(self, white_name, back_name, time_control : int, increment : int, timestamp : int) -> None:
|
||||
if self._has_started :
|
||||
@@ -46,18 +46,20 @@ class GameService:
|
||||
self._notify()
|
||||
self._has_started = False
|
||||
|
||||
def make_move(self) -> tuple[bytes, str] | None:
|
||||
def make_move(self) -> bytes:
|
||||
try :
|
||||
if not self._has_started :
|
||||
raise Exception("Game hasn't started yet.")
|
||||
self._clock_service.switch()
|
||||
img, fen = self._detection_service.analyze_single_frame()
|
||||
self._game.add_move(fen)
|
||||
return img, fen
|
||||
img = self._camera.take_photo()
|
||||
return img
|
||||
except Exception as e:
|
||||
print(e)
|
||||
raise ServiceException(e)
|
||||
|
||||
def add_move(self, fen):
|
||||
self._game.add_move(fen)
|
||||
|
||||
def set_on_terminated(self, callback: Callable[[str], None]):
|
||||
self._on_terminated = callback
|
||||
|
||||
|
||||
Reference in New Issue
Block a user