Files
board-mate/rpi/controllers/GameController.py
2026-01-01 03:19:55 +01:00

78 lines
2.8 KiB
Python

import base64
import threading
import requests
from flask import jsonify, request
from models.exceptions.ServiceException import ServiceException
from services.game_service import GameService
class GameController:
_game_service : GameService
_has_started : bool
_auth_token : str
def __init__(self, app):
self._game_service = GameService()
self._register_routes(app)
self._auth_token = "0eed89e8-7625-4f8d-bf2a-0872aede0efb"
def _register_routes(self, app):
app.add_url_rule("/command/party/start", view_func=self.start_party, methods=['POST'])
app.add_url_rule("/command/party/play", view_func=self.make_move, methods=['POST'])
def start_party(self):
try:
data = request.get_json()
if data is None:
raise Exception("Data must be provided")
white_name = data["white_name"]
black_name = data["black_name"]
time_control = int(data["time_control"])
increment = int(data["increment"])
self._game_service.start(white_name, black_name, time_control, increment)
self._has_started = True
return jsonify({"status": "ok", "message": "Game started"}), 200
except ServiceException as ex:
return jsonify({"status": "error", "message": f"An error occurred : {ex}"}), 400
except Exception as ex:
print(ex)
return jsonify({"status": "error", "message": f"An error occurred : {ex}"}), 500
def make_move(self):
try:
auth_token = request.headers.get("Authorization")
if auth_token != "Bearer " + self._auth_token:
return jsonify({"status": "error", "message": "Invalid authorization token"}), 401
threading.Thread(
target=self._analyze_move(),
daemon=True
).start()
return jsonify({"status": "ok"}), 200
except ServiceException as ex:
return jsonify({"status": "error", "message": f"An error occurred : {ex}"}), 400
except Exception as ex:
print(ex)
return jsonify({"status": "error", "message": f"An error occurred : {ex}"}), 500
def _analyze_move(self):
img, fen = self._game_service.make_move()
self._send_detection_result("https://192.168.15.125:1880/party/image", img, fen)
def _send_detection_result(self, url, img, fen):
try:
b64_img = base64.b64encode(img).decode('utf-8')
payload = {
"fen": fen,
"image": f"data:image/jpeg;base64,{b64_img}"
}
response = requests.post(url, json=payload, verify=False)
print(response.status_code)
except Exception as e:
print(e)