Try stuff
This commit is contained in:
Binary file not shown.
15
rpi/main.py
15
rpi/main.py
@@ -1,6 +1,8 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import requests
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from flask import Flask, jsonify
|
from flask import Flask, jsonify
|
||||||
|
|
||||||
@@ -62,7 +64,18 @@ def start_party():
|
|||||||
@app.route("/command/party/play", methods=['POST'])
|
@app.route("/command/party/play", methods=['POST'])
|
||||||
def make_move():
|
def make_move():
|
||||||
try:
|
try:
|
||||||
game_service.make_move()
|
frame, fen = game_service.make_move()
|
||||||
|
print(fen)
|
||||||
|
|
||||||
|
encoded_frame = cv2.imencode('.jpg', frame)
|
||||||
|
image_bytes = encoded_frame[1].tobytes()
|
||||||
|
headers = {'Content-Type': 'image/jpeg'}
|
||||||
|
response = requests.post(
|
||||||
|
"https://192.168.15.125:1880/party/image",
|
||||||
|
data=image_bytes,
|
||||||
|
headers=headers,
|
||||||
|
verify=False)
|
||||||
|
print(response.status_code)
|
||||||
return jsonify({"status": "ok", "message": "Party started"}), 200
|
return jsonify({"status": "ok", "message": "Party started"}), 200
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print(ex)
|
print(ex)
|
||||||
|
|||||||
@@ -40,11 +40,25 @@ class DetectionService:
|
|||||||
def stop(self):
|
def stop(self):
|
||||||
self.camera.close()
|
self.camera.close()
|
||||||
|
|
||||||
def analyze_single_frame(self) -> tuple[np.ndarray, str | None]:
|
def analyze_single_frame(self) -> tuple[bytes, str | None]:
|
||||||
frame = self.camera.take_photo()
|
frame = self.camera.take_photo()
|
||||||
fen = self.__get_fen(frame)
|
result = self.__run_complete_detection(frame)
|
||||||
return frame, fen
|
|
||||||
|
|
||||||
|
edges_prediction = result["edges"]
|
||||||
|
pieces_prediction = result["pieces"]
|
||||||
|
|
||||||
|
processed_frame = self.board_manager.process_frame(edges_prediction[0], frame, self.scale_size)
|
||||||
|
if processed_frame is None:
|
||||||
|
return frame.tobytes(), None
|
||||||
|
|
||||||
|
warped_corners = processed_frame[0]
|
||||||
|
matrix = processed_frame[1]
|
||||||
|
|
||||||
|
detections = self.pieces_manager.extract_pieces(pieces_prediction)
|
||||||
|
|
||||||
|
board = self.pieces_manager.pieces_to_board(detections, warped_corners, matrix, self.scale_size)
|
||||||
|
|
||||||
|
return pieces_prediction[0].plot().tobytes(), self.pieces_manager.board_to_fen(board)
|
||||||
|
|
||||||
def __run_complete_detection(self, frame : np.ndarray, display=False) -> dict[str, list[Results]] :
|
def __run_complete_detection(self, frame : np.ndarray, display=False) -> dict[str, list[Results]] :
|
||||||
pieces_prediction = self.__run_pieces_detection(frame)
|
pieces_prediction = self.__run_pieces_detection(frame)
|
||||||
@@ -71,26 +85,6 @@ class DetectionService:
|
|||||||
self.__display_frame(prediction[0].plot())
|
self.__display_frame(prediction[0].plot())
|
||||||
return prediction
|
return prediction
|
||||||
|
|
||||||
|
|
||||||
def __get_fen(self, frame : np.ndarray) -> str | None:
|
|
||||||
result = self.__run_complete_detection(frame)
|
|
||||||
|
|
||||||
edges_prediction = result["edges"]
|
|
||||||
pieces_prediction = result["pieces"]
|
|
||||||
|
|
||||||
processed_frame = self.board_manager.process_frame(edges_prediction[0], frame, self.scale_size)
|
|
||||||
if processed_frame is None:
|
|
||||||
return None
|
|
||||||
|
|
||||||
warped_corners = processed_frame[0]
|
|
||||||
matrix = processed_frame[1]
|
|
||||||
|
|
||||||
detections = self.pieces_manager.extract_pieces(pieces_prediction)
|
|
||||||
|
|
||||||
board = self.pieces_manager.pieces_to_board(detections, warped_corners, matrix, self.scale_size)
|
|
||||||
|
|
||||||
return self.pieces_manager.board_to_fen(board)
|
|
||||||
|
|
||||||
def __display_frame(self, frame : np.ndarray):
|
def __display_frame(self, frame : np.ndarray):
|
||||||
cv2.namedWindow("Frame", cv2.WINDOW_NORMAL)
|
cv2.namedWindow("Frame", cv2.WINDOW_NORMAL)
|
||||||
cv2.resizeWindow("Frame", self.scale_size[0], self.scale_size[1])
|
cv2.resizeWindow("Frame", self.scale_size[0], self.scale_size[1])
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import cv2
|
import cv2
|
||||||
|
import numpy as np
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
from services.clock_service import ClockService
|
from services.clock_service import ClockService
|
||||||
@@ -22,16 +23,5 @@ class GameService:
|
|||||||
self.clock_service.stop()
|
self.clock_service.stop()
|
||||||
self.detection_service.stop()
|
self.detection_service.stop()
|
||||||
|
|
||||||
def make_move(self) -> None:
|
def make_move(self) -> tuple[bytes, str]:
|
||||||
img, fen = self.detection_service.analyze_single_frame()
|
return self.detection_service.analyze_single_frame()
|
||||||
print(fen)
|
|
||||||
|
|
||||||
encoded_frame = cv2.imencode('.jpg', img)
|
|
||||||
image_bytes = encoded_frame[1].tobytes()
|
|
||||||
headers = {'Content-Type': 'image/jpeg'}
|
|
||||||
response = requests.post(
|
|
||||||
"https://192.168.15.125:1880/party/image",
|
|
||||||
data=image_bytes,
|
|
||||||
headers=headers,
|
|
||||||
verify=False)
|
|
||||||
print(response.status_code)
|
|
||||||
Reference in New Issue
Block a user