diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..8d6c9338 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/rpi/api-resources/generated/ diff --git a/rpi/.env b/rpi/.env new file mode 100644 index 00000000..e69de29b diff --git a/rpi/api-resources/board-mate.yaml b/rpi/api-resources/board-mate.yaml new file mode 100644 index 00000000..05265541 --- /dev/null +++ b/rpi/api-resources/board-mate.yaml @@ -0,0 +1,95 @@ +openapi: "3.1.0" +info: + title: "boardmate_api API" + description: "boardmate_api API" + version: "1.0.0" +servers: + - url: "https://boardmate_api" +paths: + /games: + get: + summary: "GET games" + operationId: "retrieveAllGames" + responses: + "200": + description: "OK" + content: + '*/*': + schema: + type: "object" + /games/{id}: + get: + summary: "GET games/{id}" + operationId: "retrieveGames" + parameters: + - name: "id" + in: "path" + required: true + schema: + type: "string" + responses: + "200": + description: "OK" + content: + '*/*': + schema: + $ref: "#/components/schemas/GameDto" + /create: + post: + summary: "POST create" + operationId: "CreateParty" + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/GameDto" + required: true + responses: + "200": + description: "OK" + content: + '*/*': + schema: + type: "string" + /moves/add/{gameId}: + post: + summary: "POST moves/add/{gameId}" + operationId: "AddMove" + parameters: + - name: "gameId" + in: "path" + required: true + schema: + type: "string" + requestBody: + content: + application/json: + schema: + type: "string" + required: true + responses: + "200": + description: "OK" + content: + '*/*': + schema: + type: "string" +components: + schemas: + GameDto: + type: "object" + properties: + whiteName: + type: "string" + nullable: true + blackName: + type: "string" + nullable: true + timeValue: + type: "integer" + format: "int32" + nullable: true + increment: + type: "integer" + format: "int32" + nullable: true \ No newline at end of file diff --git a/rpi/api-resources/openapi-generator-cli.jar b/rpi/api-resources/openapi-generator-cli.jar new file mode 100644 index 00000000..8ea8d073 Binary files /dev/null and b/rpi/api-resources/openapi-generator-cli.jar differ diff --git a/rpi/board_mate_client/__init__.py b/rpi/board_mate_client/__init__.py new file mode 100644 index 00000000..3d1900a0 --- /dev/null +++ b/rpi/board_mate_client/__init__.py @@ -0,0 +1,34 @@ +# coding: utf-8 + +# flake8: noqa + +""" + boardmate_api API + + boardmate_api API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +__version__ = "1.0.0" + +# import apis into sdk package +from board_mate_client.api.default_api import DefaultApi + +# import ApiClient +from board_mate_client.api_response import ApiResponse +from board_mate_client.api_client import ApiClient +from board_mate_client.configuration import Configuration +from board_mate_client.exceptions import OpenApiException +from board_mate_client.exceptions import ApiTypeError +from board_mate_client.exceptions import ApiValueError +from board_mate_client.exceptions import ApiKeyError +from board_mate_client.exceptions import ApiAttributeError +from board_mate_client.exceptions import ApiException + +# import models into sdk package +from board_mate_client.models.game_dto import GameDto diff --git a/rpi/board_mate_client/api/__init__.py b/rpi/board_mate_client/api/__init__.py new file mode 100644 index 00000000..8b0861dd --- /dev/null +++ b/rpi/board_mate_client/api/__init__.py @@ -0,0 +1,5 @@ +# flake8: noqa + +# import apis into api package +from board_mate_client.api.default_api import DefaultApi + diff --git a/rpi/board_mate_client/api/default_api.py b/rpi/board_mate_client/api/default_api.py new file mode 100644 index 00000000..b2c12647 --- /dev/null +++ b/rpi/board_mate_client/api/default_api.py @@ -0,0 +1,1080 @@ +# coding: utf-8 + +""" + boardmate_api API + + boardmate_api API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + +import warnings +from pydantic import validate_call, Field, StrictFloat, StrictStr, StrictInt +from typing import Any, Dict, List, Optional, Tuple, Union +from typing_extensions import Annotated + +from pydantic import StrictStr +from typing import Any, Dict +from board_mate_client.models.game_dto import GameDto + +from board_mate_client.api_client import ApiClient, RequestSerialized +from board_mate_client.api_response import ApiResponse +from board_mate_client.rest import RESTResponseType + + +class DefaultApi: + """NOTE: This class is auto generated by OpenAPI Generator + Ref: https://openapi-generator.tech + + Do not edit the class manually. + """ + + def __init__(self, api_client=None) -> None: + if api_client is None: + api_client = ApiClient.get_default() + self.api_client = api_client + + + @validate_call + def add_move( + self, + game_id: StrictStr, + body: StrictStr, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> str: + """POST moves/add/{gameId} + + + :param game_id: (required) + :type game_id: str + :param body: (required) + :type body: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._add_move_serialize( + game_id=game_id, + body=body, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "str", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + + @validate_call + def add_move_with_http_info( + self, + game_id: StrictStr, + body: StrictStr, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[str]: + """POST moves/add/{gameId} + + + :param game_id: (required) + :type game_id: str + :param body: (required) + :type body: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._add_move_serialize( + game_id=game_id, + body=body, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "str", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + + @validate_call + def add_move_without_preload_content( + self, + game_id: StrictStr, + body: StrictStr, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """POST moves/add/{gameId} + + + :param game_id: (required) + :type game_id: str + :param body: (required) + :type body: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._add_move_serialize( + game_id=game_id, + body=body, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "str", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + return response_data.response + + + def _add_move_serialize( + self, + game_id, + body, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = { + } + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[str, Union[str, bytes]] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + if game_id is not None: + _path_params['gameId'] = game_id + # process the query parameters + # process the header parameters + # process the form parameters + # process the body parameter + if body is not None: + _body_params = body + + + # set the HTTP header `Accept` + _header_params['Accept'] = self.api_client.select_header_accept( + [ + '*/*' + ] + ) + + # set the HTTP header `Content-Type` + if _content_type: + _header_params['Content-Type'] = _content_type + else: + _default_content_type = ( + self.api_client.select_header_content_type( + [ + 'application/json' + ] + ) + ) + if _default_content_type is not None: + _header_params['Content-Type'] = _default_content_type + + # authentication setting + _auth_settings: List[str] = [ + ] + + return self.api_client.param_serialize( + method='POST', + resource_path='/moves/add/{gameId}', + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth + ) + + + + + @validate_call + def create_party( + self, + game_dto: GameDto, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> str: + """POST create + + + :param game_dto: (required) + :type game_dto: GameDto + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._create_party_serialize( + game_dto=game_dto, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "str", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + + @validate_call + def create_party_with_http_info( + self, + game_dto: GameDto, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[str]: + """POST create + + + :param game_dto: (required) + :type game_dto: GameDto + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._create_party_serialize( + game_dto=game_dto, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "str", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + + @validate_call + def create_party_without_preload_content( + self, + game_dto: GameDto, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """POST create + + + :param game_dto: (required) + :type game_dto: GameDto + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._create_party_serialize( + game_dto=game_dto, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "str", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + return response_data.response + + + def _create_party_serialize( + self, + game_dto, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = { + } + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[str, Union[str, bytes]] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + # process the query parameters + # process the header parameters + # process the form parameters + # process the body parameter + if game_dto is not None: + _body_params = game_dto + + + # set the HTTP header `Accept` + _header_params['Accept'] = self.api_client.select_header_accept( + [ + '*/*' + ] + ) + + # set the HTTP header `Content-Type` + if _content_type: + _header_params['Content-Type'] = _content_type + else: + _default_content_type = ( + self.api_client.select_header_content_type( + [ + 'application/json' + ] + ) + ) + if _default_content_type is not None: + _header_params['Content-Type'] = _default_content_type + + # authentication setting + _auth_settings: List[str] = [ + ] + + return self.api_client.param_serialize( + method='POST', + resource_path='/create', + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth + ) + + + + + @validate_call + def retrieve_all_games( + self, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> object: + """GET games + + + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._retrieve_all_games_serialize( + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "object", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + + @validate_call + def retrieve_all_games_with_http_info( + self, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[object]: + """GET games + + + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._retrieve_all_games_serialize( + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "object", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + + @validate_call + def retrieve_all_games_without_preload_content( + self, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """GET games + + + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._retrieve_all_games_serialize( + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "object", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + return response_data.response + + + def _retrieve_all_games_serialize( + self, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = { + } + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[str, Union[str, bytes]] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + # process the query parameters + # process the header parameters + # process the form parameters + # process the body parameter + + + # set the HTTP header `Accept` + _header_params['Accept'] = self.api_client.select_header_accept( + [ + '*/*' + ] + ) + + + # authentication setting + _auth_settings: List[str] = [ + ] + + return self.api_client.param_serialize( + method='GET', + resource_path='/games', + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth + ) + + + + + @validate_call + def retrieve_games( + self, + id: StrictStr, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> GameDto: + """GET games/{id} + + + :param id: (required) + :type id: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._retrieve_games_serialize( + id=id, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "GameDto", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + + @validate_call + def retrieve_games_with_http_info( + self, + id: StrictStr, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[GameDto]: + """GET games/{id} + + + :param id: (required) + :type id: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._retrieve_games_serialize( + id=id, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "GameDto", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + + @validate_call + def retrieve_games_without_preload_content( + self, + id: StrictStr, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], + Annotated[StrictFloat, Field(gt=0)] + ] + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """GET games/{id} + + + :param id: (required) + :type id: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._retrieve_games_serialize( + id=id, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index + ) + + _response_types_map: Dict[str, Optional[str]] = { + '200': "GameDto", + } + response_data = self.api_client.call_api( + *_param, + _request_timeout=_request_timeout + ) + return response_data.response + + + def _retrieve_games_serialize( + self, + id, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = { + } + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[str, Union[str, bytes]] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + if id is not None: + _path_params['id'] = id + # process the query parameters + # process the header parameters + # process the form parameters + # process the body parameter + + + # set the HTTP header `Accept` + _header_params['Accept'] = self.api_client.select_header_accept( + [ + '*/*' + ] + ) + + + # authentication setting + _auth_settings: List[str] = [ + ] + + return self.api_client.param_serialize( + method='GET', + resource_path='/games/{id}', + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth + ) + + diff --git a/rpi/board_mate_client/api_client.py b/rpi/board_mate_client/api_client.py new file mode 100644 index 00000000..59eaa005 --- /dev/null +++ b/rpi/board_mate_client/api_client.py @@ -0,0 +1,770 @@ +# coding: utf-8 + +""" + boardmate_api API + + boardmate_api API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +import datetime +from dateutil.parser import parse +from enum import Enum +import json +import mimetypes +import os +import re +import tempfile + +from urllib.parse import quote +from typing import Tuple, Optional, List, Dict, Union +from pydantic import SecretStr + +from board_mate_client.configuration import Configuration +from board_mate_client.api_response import ApiResponse, T as ApiResponseT +import board_mate_client.models +from board_mate_client import rest +from board_mate_client.exceptions import ( + ApiValueError, + ApiException, + BadRequestException, + UnauthorizedException, + ForbiddenException, + NotFoundException, + ServiceException +) + +RequestSerialized = Tuple[str, str, Dict[str, str], Optional[str], List[str]] + +class ApiClient: + """Generic API client for OpenAPI client library builds. + + OpenAPI generic API client. This client handles the client- + server communication, and is invariant across implementations. Specifics of + the methods and models for each application are generated from the OpenAPI + templates. + + :param configuration: .Configuration object for this client + :param header_name: a header to pass when making calls to the API. + :param header_value: a header value to pass when making calls to + the API. + :param cookie: a cookie to include in the header when making calls + to the API + """ + + PRIMITIVE_TYPES = (float, bool, bytes, str, int) + NATIVE_TYPES_MAPPING = { + 'int': int, + 'long': int, # TODO remove as only py3 is supported? + 'float': float, + 'str': str, + 'bool': bool, + 'date': datetime.date, + 'datetime': datetime.datetime, + 'object': object, + } + _pool = None + + def __init__( + self, + configuration=None, + header_name=None, + header_value=None, + cookie=None + ) -> None: + # use default configuration if none is provided + if configuration is None: + configuration = Configuration.get_default() + self.configuration = configuration + + self.rest_client = rest.RESTClientObject(configuration) + self.default_headers = {} + if header_name is not None: + self.default_headers[header_name] = header_value + self.cookie = cookie + # Set default User-Agent. + self.user_agent = 'OpenAPI-Generator/1.0.0/python' + self.client_side_validation = configuration.client_side_validation + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + pass + + @property + def user_agent(self): + """User agent for this API client""" + return self.default_headers['User-Agent'] + + @user_agent.setter + def user_agent(self, value): + self.default_headers['User-Agent'] = value + + def set_default_header(self, header_name, header_value): + self.default_headers[header_name] = header_value + + + _default = None + + @classmethod + def get_default(cls): + """Return new instance of ApiClient. + + This method returns newly created, based on default constructor, + object of ApiClient class or returns a copy of default + ApiClient. + + :return: The ApiClient object. + """ + if cls._default is None: + cls._default = ApiClient() + return cls._default + + @classmethod + def set_default(cls, default): + """Set default instance of ApiClient. + + It stores default ApiClient. + + :param default: object of ApiClient. + """ + cls._default = default + + def param_serialize( + self, + method, + resource_path, + path_params=None, + query_params=None, + header_params=None, + body=None, + post_params=None, + files=None, auth_settings=None, + collection_formats=None, + _host=None, + _request_auth=None + ) -> RequestSerialized: + + """Builds the HTTP request params needed by the request. + :param method: Method to call. + :param resource_path: Path to method endpoint. + :param path_params: Path parameters in the url. + :param query_params: Query parameters in the url. + :param header_params: Header parameters to be + placed in the request header. + :param body: Request body. + :param post_params dict: Request post form parameters, + for `application/x-www-form-urlencoded`, `multipart/form-data`. + :param auth_settings list: Auth Settings names for the request. + :param files dict: key -> filename, value -> filepath, + for `multipart/form-data`. + :param collection_formats: dict of collection formats for path, query, + header, and post parameters. + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the authentication + in the spec for a single request. + :return: tuple of form (path, http_method, query_params, header_params, + body, post_params, files) + """ + + config = self.configuration + + # header parameters + header_params = header_params or {} + header_params.update(self.default_headers) + if self.cookie: + header_params['Cookie'] = self.cookie + if header_params: + header_params = self.sanitize_for_serialization(header_params) + header_params = dict( + self.parameters_to_tuples(header_params,collection_formats) + ) + + # path parameters + if path_params: + path_params = self.sanitize_for_serialization(path_params) + path_params = self.parameters_to_tuples( + path_params, + collection_formats + ) + for k, v in path_params: + # specified safe chars, encode everything + resource_path = resource_path.replace( + '{%s}' % k, + quote(str(v), safe=config.safe_chars_for_path_param) + ) + + # post parameters + if post_params or files: + post_params = post_params if post_params else [] + post_params = self.sanitize_for_serialization(post_params) + post_params = self.parameters_to_tuples( + post_params, + collection_formats + ) + if files: + post_params.extend(self.files_parameters(files)) + + # auth setting + self.update_params_for_auth( + header_params, + query_params, + auth_settings, + resource_path, + method, + body, + request_auth=_request_auth + ) + + # body + if body: + body = self.sanitize_for_serialization(body) + + # request url + if _host is None: + url = self.configuration.host + resource_path + else: + # use server/host defined in path or operation instead + url = _host + resource_path + + # query parameters + if query_params: + query_params = self.sanitize_for_serialization(query_params) + url_query = self.parameters_to_url_query( + query_params, + collection_formats + ) + url += "?" + url_query + + return method, url, header_params, body, post_params + + + def call_api( + self, + method, + url, + header_params=None, + body=None, + post_params=None, + _request_timeout=None + ) -> rest.RESTResponse: + """Makes the HTTP request (synchronous) + :param method: Method to call. + :param url: Path to method endpoint. + :param header_params: Header parameters to be + placed in the request header. + :param body: Request body. + :param post_params dict: Request post form parameters, + for `application/x-www-form-urlencoded`, `multipart/form-data`. + :param _request_timeout: timeout setting for this request. + :return: RESTResponse + """ + + try: + # perform request and return response + response_data = self.rest_client.request( + method, url, + headers=header_params, + body=body, post_params=post_params, + _request_timeout=_request_timeout + ) + + except ApiException as e: + raise e + + return response_data + + def response_deserialize( + self, + response_data: rest.RESTResponse, + response_types_map: Optional[Dict[str, ApiResponseT]]=None + ) -> ApiResponse[ApiResponseT]: + """Deserializes response into an object. + :param response_data: RESTResponse object to be deserialized. + :param response_types_map: dict of response types. + :return: ApiResponse + """ + + msg = "RESTResponse.read() must be called before passing it to response_deserialize()" + assert response_data.data is not None, msg + + response_type = response_types_map.get(str(response_data.status), None) + if not response_type and isinstance(response_data.status, int) and 100 <= response_data.status <= 599: + # if not found, look for '1XX', '2XX', etc. + response_type = response_types_map.get(str(response_data.status)[0] + "XX", None) + + # deserialize response data + response_text = None + return_data = None + try: + if response_type == "bytearray": + return_data = response_data.data + elif response_type == "file": + return_data = self.__deserialize_file(response_data) + elif response_type is not None: + match = None + content_type = response_data.getheader('content-type') + if content_type is not None: + match = re.search(r"charset=([a-zA-Z\-\d]+)[\s;]?", content_type) + encoding = match.group(1) if match else "utf-8" + response_text = response_data.data.decode(encoding) + if response_type in ["bytearray", "str"]: + return_data = self.__deserialize_primitive(response_text, response_type) + else: + return_data = self.deserialize(response_text, response_type) + finally: + if not 200 <= response_data.status <= 299: + raise ApiException.from_response( + http_resp=response_data, + body=response_text, + data=return_data, + ) + + return ApiResponse( + status_code = response_data.status, + data = return_data, + headers = response_data.getheaders(), + raw_data = response_data.data + ) + + def sanitize_for_serialization(self, obj): + """Builds a JSON POST object. + + If obj is None, return None. + If obj is SecretStr, return obj.get_secret_value() + If obj is str, int, long, float, bool, return directly. + If obj is datetime.datetime, datetime.date + convert to string in iso8601 format. + If obj is list, sanitize each element in the list. + If obj is dict, return the dict. + If obj is OpenAPI model, return the properties dict. + + :param obj: The data to serialize. + :return: The serialized form of data. + """ + if obj is None: + return None + elif isinstance(obj, Enum): + return obj.value + elif isinstance(obj, SecretStr): + return obj.get_secret_value() + elif isinstance(obj, self.PRIMITIVE_TYPES): + return obj + elif isinstance(obj, list): + return [ + self.sanitize_for_serialization(sub_obj) for sub_obj in obj + ] + elif isinstance(obj, tuple): + return tuple( + self.sanitize_for_serialization(sub_obj) for sub_obj in obj + ) + elif isinstance(obj, (datetime.datetime, datetime.date)): + return obj.isoformat() + + elif isinstance(obj, dict): + obj_dict = obj + else: + # Convert model obj to dict except + # attributes `openapi_types`, `attribute_map` + # and attributes which value is not None. + # Convert attribute name to json key in + # model definition for request. + if hasattr(obj, 'to_dict') and callable(getattr(obj, 'to_dict')): + obj_dict = obj.to_dict() + else: + obj_dict = obj.__dict__ + + return { + key: self.sanitize_for_serialization(val) + for key, val in obj_dict.items() + } + + def deserialize(self, response_text, response_type): + """Deserializes response into an object. + + :param response: RESTResponse object to be deserialized. + :param response_type: class literal for + deserialized object, or string of class name. + + :return: deserialized object. + """ + + # fetch data from response object + try: + data = json.loads(response_text) + except ValueError: + data = response_text + + return self.__deserialize(data, response_type) + + def __deserialize(self, data, klass): + """Deserializes dict, list, str into an object. + + :param data: dict, list or str. + :param klass: class literal, or string of class name. + + :return: object. + """ + if data is None: + return None + + if isinstance(klass, str): + if klass.startswith('List['): + m = re.match(r'List\[(.*)]', klass) + assert m is not None, "Malformed List type definition" + sub_kls = m.group(1) + return [self.__deserialize(sub_data, sub_kls) + for sub_data in data] + + if klass.startswith('Dict['): + m = re.match(r'Dict\[([^,]*), (.*)]', klass) + assert m is not None, "Malformed Dict type definition" + sub_kls = m.group(2) + return {k: self.__deserialize(v, sub_kls) + for k, v in data.items()} + + # convert str to class + if klass in self.NATIVE_TYPES_MAPPING: + klass = self.NATIVE_TYPES_MAPPING[klass] + else: + klass = getattr(board_mate_client.models, klass) + + if klass in self.PRIMITIVE_TYPES: + return self.__deserialize_primitive(data, klass) + elif klass == object: + return self.__deserialize_object(data) + elif klass == datetime.date: + return self.__deserialize_date(data) + elif klass == datetime.datetime: + return self.__deserialize_datetime(data) + elif issubclass(klass, Enum): + return self.__deserialize_enum(data, klass) + else: + return self.__deserialize_model(data, klass) + + def parameters_to_tuples(self, params, collection_formats): + """Get parameters as list of tuples, formatting collections. + + :param params: Parameters as dict or list of two-tuples + :param dict collection_formats: Parameter collection formats + :return: Parameters as list of tuples, collections formatted + """ + new_params: List[Tuple[str, str]] = [] + if collection_formats is None: + collection_formats = {} + for k, v in params.items() if isinstance(params, dict) else params: + if k in collection_formats: + collection_format = collection_formats[k] + if collection_format == 'multi': + new_params.extend((k, value) for value in v) + else: + if collection_format == 'ssv': + delimiter = ' ' + elif collection_format == 'tsv': + delimiter = '\t' + elif collection_format == 'pipes': + delimiter = '|' + else: # csv is the default + delimiter = ',' + new_params.append( + (k, delimiter.join(str(value) for value in v))) + else: + new_params.append((k, v)) + return new_params + + def parameters_to_url_query(self, params, collection_formats): + """Get parameters as list of tuples, formatting collections. + + :param params: Parameters as dict or list of two-tuples + :param dict collection_formats: Parameter collection formats + :return: URL query string (e.g. a=Hello%20World&b=123) + """ + new_params: List[Tuple[str, str]] = [] + if collection_formats is None: + collection_formats = {} + for k, v in params.items() if isinstance(params, dict) else params: + if isinstance(v, bool): + v = str(v).lower() + if isinstance(v, (int, float)): + v = str(v) + if isinstance(v, dict): + v = json.dumps(v) + + if k in collection_formats: + collection_format = collection_formats[k] + if collection_format == 'multi': + new_params.extend((k, str(value)) for value in v) + else: + if collection_format == 'ssv': + delimiter = ' ' + elif collection_format == 'tsv': + delimiter = '\t' + elif collection_format == 'pipes': + delimiter = '|' + else: # csv is the default + delimiter = ',' + new_params.append( + (k, delimiter.join(quote(str(value)) for value in v)) + ) + else: + new_params.append((k, quote(str(v)))) + + return "&".join(["=".join(map(str, item)) for item in new_params]) + + def files_parameters(self, files: Dict[str, Union[str, bytes]]): + """Builds form parameters. + + :param files: File parameters. + :return: Form parameters with files. + """ + params = [] + for k, v in files.items(): + if isinstance(v, str): + with open(v, 'rb') as f: + filename = os.path.basename(f.name) + filedata = f.read() + elif isinstance(v, bytes): + filename = k + filedata = v + else: + raise ValueError("Unsupported file value") + mimetype = ( + mimetypes.guess_type(filename)[0] + or 'application/octet-stream' + ) + params.append( + tuple([k, tuple([filename, filedata, mimetype])]) + ) + return params + + def select_header_accept(self, accepts: List[str]) -> Optional[str]: + """Returns `Accept` based on an array of accepts provided. + + :param accepts: List of headers. + :return: Accept (e.g. application/json). + """ + if not accepts: + return None + + for accept in accepts: + if re.search('json', accept, re.IGNORECASE): + return accept + + return accepts[0] + + def select_header_content_type(self, content_types): + """Returns `Content-Type` based on an array of content_types provided. + + :param content_types: List of content-types. + :return: Content-Type (e.g. application/json). + """ + if not content_types: + return None + + for content_type in content_types: + if re.search('json', content_type, re.IGNORECASE): + return content_type + + return content_types[0] + + def update_params_for_auth( + self, + headers, + queries, + auth_settings, + resource_path, + method, + body, + request_auth=None + ) -> None: + """Updates header and query params based on authentication setting. + + :param headers: Header parameters dict to be updated. + :param queries: Query parameters tuple list to be updated. + :param auth_settings: Authentication setting identifiers list. + :resource_path: A string representation of the HTTP request resource path. + :method: A string representation of the HTTP request method. + :body: A object representing the body of the HTTP request. + The object type is the return value of sanitize_for_serialization(). + :param request_auth: if set, the provided settings will + override the token in the configuration. + """ + if not auth_settings: + return + + if request_auth: + self._apply_auth_params( + headers, + queries, + resource_path, + method, + body, + request_auth + ) + else: + for auth in auth_settings: + auth_setting = self.configuration.auth_settings().get(auth) + if auth_setting: + self._apply_auth_params( + headers, + queries, + resource_path, + method, + body, + auth_setting + ) + + def _apply_auth_params( + self, + headers, + queries, + resource_path, + method, + body, + auth_setting + ) -> None: + """Updates the request parameters based on a single auth_setting + + :param headers: Header parameters dict to be updated. + :param queries: Query parameters tuple list to be updated. + :resource_path: A string representation of the HTTP request resource path. + :method: A string representation of the HTTP request method. + :body: A object representing the body of the HTTP request. + The object type is the return value of sanitize_for_serialization(). + :param auth_setting: auth settings for the endpoint + """ + if auth_setting['in'] == 'cookie': + headers['Cookie'] = auth_setting['value'] + elif auth_setting['in'] == 'header': + if auth_setting['type'] != 'http-signature': + headers[auth_setting['key']] = auth_setting['value'] + elif auth_setting['in'] == 'query': + queries.append((auth_setting['key'], auth_setting['value'])) + else: + raise ApiValueError( + 'Authentication token must be in `query` or `header`' + ) + + def __deserialize_file(self, response): + """Deserializes body to file + + Saves response body into a file in a temporary folder, + using the filename from the `Content-Disposition` header if provided. + + handle file downloading + save response body into a tmp file and return the instance + + :param response: RESTResponse. + :return: file path. + """ + fd, path = tempfile.mkstemp(dir=self.configuration.temp_folder_path) + os.close(fd) + os.remove(path) + + content_disposition = response.getheader("Content-Disposition") + if content_disposition: + m = re.search( + r'filename=[\'"]?([^\'"\s]+)[\'"]?', + content_disposition + ) + assert m is not None, "Unexpected 'content-disposition' header value" + filename = m.group(1) + path = os.path.join(os.path.dirname(path), filename) + + with open(path, "wb") as f: + f.write(response.data) + + return path + + def __deserialize_primitive(self, data, klass): + """Deserializes string to primitive type. + + :param data: str. + :param klass: class literal. + + :return: int, long, float, str, bool. + """ + try: + return klass(data) + except UnicodeEncodeError: + return str(data) + except TypeError: + return data + + def __deserialize_object(self, value): + """Return an original value. + + :return: object. + """ + return value + + def __deserialize_date(self, string): + """Deserializes string to date. + + :param string: str. + :return: date. + """ + try: + return parse(string).date() + except ImportError: + return string + except ValueError: + raise rest.ApiException( + status=0, + reason="Failed to parse `{0}` as date object".format(string) + ) + + def __deserialize_datetime(self, string): + """Deserializes string to datetime. + + The string should be in iso8601 datetime format. + + :param string: str. + :return: datetime. + """ + try: + return parse(string) + except ImportError: + return string + except ValueError: + raise rest.ApiException( + status=0, + reason=( + "Failed to parse `{0}` as datetime object" + .format(string) + ) + ) + + def __deserialize_enum(self, data, klass): + """Deserializes primitive type to enum. + + :param data: primitive type. + :param klass: class literal. + :return: enum value. + """ + try: + return klass(data) + except ValueError: + raise rest.ApiException( + status=0, + reason=( + "Failed to parse `{0}` as `{1}`" + .format(data, klass) + ) + ) + + def __deserialize_model(self, data, klass): + """Deserializes list or dict to model. + + :param data: dict, list. + :param klass: class literal. + :return: model object. + """ + + return klass.from_dict(data) diff --git a/rpi/board_mate_client/api_response.py b/rpi/board_mate_client/api_response.py new file mode 100644 index 00000000..9bc7c11f --- /dev/null +++ b/rpi/board_mate_client/api_response.py @@ -0,0 +1,21 @@ +"""API response object.""" + +from __future__ import annotations +from typing import Optional, Generic, Mapping, TypeVar +from pydantic import Field, StrictInt, StrictBytes, BaseModel + +T = TypeVar("T") + +class ApiResponse(BaseModel, Generic[T]): + """ + API response object + """ + + status_code: StrictInt = Field(description="HTTP status code") + headers: Optional[Mapping[str, str]] = Field(None, description="HTTP headers") + data: T = Field(description="Deserialized data given the data type") + raw_data: StrictBytes = Field(description="Raw data (HTTP response body)") + + model_config = { + "arbitrary_types_allowed": True + } diff --git a/rpi/board_mate_client/configuration.py b/rpi/board_mate_client/configuration.py new file mode 100644 index 00000000..0bb11dc9 --- /dev/null +++ b/rpi/board_mate_client/configuration.py @@ -0,0 +1,436 @@ +# coding: utf-8 + +""" + boardmate_api API + + boardmate_api API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +import copy +import logging +from logging import FileHandler +import multiprocessing +import sys +from typing import Optional +import urllib3 + +import http.client as httplib + +JSON_SCHEMA_VALIDATION_KEYWORDS = { + 'multipleOf', 'maximum', 'exclusiveMaximum', + 'minimum', 'exclusiveMinimum', 'maxLength', + 'minLength', 'pattern', 'maxItems', 'minItems' +} + +class Configuration: + """This class contains various settings of the API client. + + :param host: Base url. + :param api_key: Dict to store API key(s). + Each entry in the dict specifies an API key. + The dict key is the name of the security scheme in the OAS specification. + The dict value is the API key secret. + :param api_key_prefix: Dict to store API prefix (e.g. Bearer). + The dict key is the name of the security scheme in the OAS specification. + The dict value is an API key prefix when generating the auth data. + :param username: Username for HTTP basic authentication. + :param password: Password for HTTP basic authentication. + :param access_token: Access token. + :param server_index: Index to servers configuration. + :param server_variables: Mapping with string values to replace variables in + templated server configuration. The validation of enums is performed for + variables with defined enum values before. + :param server_operation_index: Mapping from operation ID to an index to server + configuration. + :param server_operation_variables: Mapping from operation ID to a mapping with + string values to replace variables in templated server configuration. + The validation of enums is performed for variables with defined enum + values before. + :param ssl_ca_cert: str - the path to a file of concatenated CA certificates + in PEM format. + + """ + + _default = None + + def __init__(self, host=None, + api_key=None, api_key_prefix=None, + username=None, password=None, + access_token=None, + server_index=None, server_variables=None, + server_operation_index=None, server_operation_variables=None, + ssl_ca_cert=None, + ) -> None: + """Constructor + """ + self._base_path = "https://boardmate_api" if host is None else host + """Default Base url + """ + self.server_index = 0 if server_index is None and host is None else server_index + self.server_operation_index = server_operation_index or {} + """Default server index + """ + self.server_variables = server_variables or {} + self.server_operation_variables = server_operation_variables or {} + """Default server variables + """ + self.temp_folder_path = None + """Temp file folder for downloading files + """ + # Authentication Settings + self.api_key = {} + if api_key: + self.api_key = api_key + """dict to store API key(s) + """ + self.api_key_prefix = {} + if api_key_prefix: + self.api_key_prefix = api_key_prefix + """dict to store API prefix (e.g. Bearer) + """ + self.refresh_api_key_hook = None + """function hook to refresh API key if expired + """ + self.username = username + """Username for HTTP basic authentication + """ + self.password = password + """Password for HTTP basic authentication + """ + self.access_token = access_token + """Access token + """ + self.logger = {} + """Logging Settings + """ + self.logger["package_logger"] = logging.getLogger("board_mate_client") + self.logger["urllib3_logger"] = logging.getLogger("urllib3") + self.logger_format = '%(asctime)s %(levelname)s %(message)s' + """Log format + """ + self.logger_stream_handler = None + """Log stream handler + """ + self.logger_file_handler: Optional[FileHandler] = None + """Log file handler + """ + self.logger_file = None + """Debug file location + """ + self.debug = False + """Debug switch + """ + + self.verify_ssl = True + """SSL/TLS verification + Set this to false to skip verifying SSL certificate when calling API + from https server. + """ + self.ssl_ca_cert = ssl_ca_cert + """Set this to customize the certificate file to verify the peer. + """ + self.cert_file = None + """client certificate file + """ + self.key_file = None + """client key file + """ + self.assert_hostname = None + """Set this to True/False to enable/disable SSL hostname verification. + """ + self.tls_server_name = None + """SSL/TLS Server Name Indication (SNI) + Set this to the SNI value expected by the server. + """ + + self.connection_pool_maxsize = multiprocessing.cpu_count() * 5 + """urllib3 connection pool's maximum number of connections saved + per pool. urllib3 uses 1 connection as default value, but this is + not the best value when you are making a lot of possibly parallel + requests to the same host, which is often the case here. + cpu_count * 5 is used as default value to increase performance. + """ + + self.proxy: Optional[str] = None + """Proxy URL + """ + self.proxy_headers = None + """Proxy headers + """ + self.safe_chars_for_path_param = '' + """Safe chars for path_param + """ + self.retries = None + """Adding retries to override urllib3 default value 3 + """ + # Enable client side validation + self.client_side_validation = True + + self.socket_options = None + """Options to pass down to the underlying urllib3 socket + """ + + self.datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z" + """datetime format + """ + + self.date_format = "%Y-%m-%d" + """date format + """ + + def __deepcopy__(self, memo): + cls = self.__class__ + result = cls.__new__(cls) + memo[id(self)] = result + for k, v in self.__dict__.items(): + if k not in ('logger', 'logger_file_handler'): + setattr(result, k, copy.deepcopy(v, memo)) + # shallow copy of loggers + result.logger = copy.copy(self.logger) + # use setters to configure loggers + result.logger_file = self.logger_file + result.debug = self.debug + return result + + def __setattr__(self, name, value): + object.__setattr__(self, name, value) + + @classmethod + def set_default(cls, default): + """Set default instance of configuration. + + It stores default configuration, which can be + returned by get_default_copy method. + + :param default: object of Configuration + """ + cls._default = default + + @classmethod + def get_default_copy(cls): + """Deprecated. Please use `get_default` instead. + + Deprecated. Please use `get_default` instead. + + :return: The configuration object. + """ + return cls.get_default() + + @classmethod + def get_default(cls): + """Return the default configuration. + + This method returns newly created, based on default constructor, + object of Configuration class or returns a copy of default + configuration. + + :return: The configuration object. + """ + if cls._default is None: + cls._default = Configuration() + return cls._default + + @property + def logger_file(self): + """The logger file. + + If the logger_file is None, then add stream handler and remove file + handler. Otherwise, add file handler and remove stream handler. + + :param value: The logger_file path. + :type: str + """ + return self.__logger_file + + @logger_file.setter + def logger_file(self, value): + """The logger file. + + If the logger_file is None, then add stream handler and remove file + handler. Otherwise, add file handler and remove stream handler. + + :param value: The logger_file path. + :type: str + """ + self.__logger_file = value + if self.__logger_file: + # If set logging file, + # then add file handler and remove stream handler. + self.logger_file_handler = logging.FileHandler(self.__logger_file) + self.logger_file_handler.setFormatter(self.logger_formatter) + for _, logger in self.logger.items(): + logger.addHandler(self.logger_file_handler) + + @property + def debug(self): + """Debug status + + :param value: The debug status, True or False. + :type: bool + """ + return self.__debug + + @debug.setter + def debug(self, value): + """Debug status + + :param value: The debug status, True or False. + :type: bool + """ + self.__debug = value + if self.__debug: + # if debug status is True, turn on debug logging + for _, logger in self.logger.items(): + logger.setLevel(logging.DEBUG) + # turn on httplib debug + httplib.HTTPConnection.debuglevel = 1 + else: + # if debug status is False, turn off debug logging, + # setting log level to default `logging.WARNING` + for _, logger in self.logger.items(): + logger.setLevel(logging.WARNING) + # turn off httplib debug + httplib.HTTPConnection.debuglevel = 0 + + @property + def logger_format(self): + """The logger format. + + The logger_formatter will be updated when sets logger_format. + + :param value: The format string. + :type: str + """ + return self.__logger_format + + @logger_format.setter + def logger_format(self, value): + """The logger format. + + The logger_formatter will be updated when sets logger_format. + + :param value: The format string. + :type: str + """ + self.__logger_format = value + self.logger_formatter = logging.Formatter(self.__logger_format) + + def get_api_key_with_prefix(self, identifier, alias=None): + """Gets API key (with prefix if set). + + :param identifier: The identifier of apiKey. + :param alias: The alternative identifier of apiKey. + :return: The token for api key authentication. + """ + if self.refresh_api_key_hook is not None: + self.refresh_api_key_hook(self) + key = self.api_key.get(identifier, self.api_key.get(alias) if alias is not None else None) + if key: + prefix = self.api_key_prefix.get(identifier) + if prefix: + return "%s %s" % (prefix, key) + else: + return key + + def get_basic_auth_token(self): + """Gets HTTP basic authentication header (string). + + :return: The token for basic HTTP authentication. + """ + username = "" + if self.username is not None: + username = self.username + password = "" + if self.password is not None: + password = self.password + return urllib3.util.make_headers( + basic_auth=username + ':' + password + ).get('authorization') + + def auth_settings(self): + """Gets Auth Settings dict for api client. + + :return: The Auth Settings information dict. + """ + auth = {} + return auth + + def to_debug_report(self): + """Gets the essential information for debugging. + + :return: The report for debugging. + """ + return "Python SDK Debug Report:\n"\ + "OS: {env}\n"\ + "Python Version: {pyversion}\n"\ + "Version of the API: 1.0.0\n"\ + "SDK Package Version: 1.0.0".\ + format(env=sys.platform, pyversion=sys.version) + + def get_host_settings(self): + """Gets an array of host settings + + :return: An array of host settings + """ + return [ + { + 'url': "https://boardmate_api", + 'description': "No description provided", + } + ] + + def get_host_from_settings(self, index, variables=None, servers=None): + """Gets host URL based on the index and variables + :param index: array index of the host settings + :param variables: hash of variable and the corresponding value + :param servers: an array of host settings or None + :return: URL based on host settings + """ + if index is None: + return self._base_path + + variables = {} if variables is None else variables + servers = self.get_host_settings() if servers is None else servers + + try: + server = servers[index] + except IndexError: + raise ValueError( + "Invalid index {0} when selecting the host settings. " + "Must be less than {1}".format(index, len(servers))) + + url = server['url'] + + # go through variables and replace placeholders + for variable_name, variable in server.get('variables', {}).items(): + used_value = variables.get( + variable_name, variable['default_value']) + + if 'enum_values' in variable \ + and used_value not in variable['enum_values']: + raise ValueError( + "The variable `{0}` in the host URL has invalid value " + "{1}. Must be {2}.".format( + variable_name, variables[variable_name], + variable['enum_values'])) + + url = url.replace("{" + variable_name + "}", used_value) + + return url + + @property + def host(self): + """Return generated host.""" + return self.get_host_from_settings(self.server_index, variables=self.server_variables) + + @host.setter + def host(self, value): + """Fix base path.""" + self._base_path = value + self.server_index = None diff --git a/rpi/board_mate_client/exceptions.py b/rpi/board_mate_client/exceptions.py new file mode 100644 index 00000000..e5faa6c5 --- /dev/null +++ b/rpi/board_mate_client/exceptions.py @@ -0,0 +1,199 @@ +# coding: utf-8 + +""" + boardmate_api API + + boardmate_api API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + +from typing import Any, Optional +from typing_extensions import Self + +class OpenApiException(Exception): + """The base exception class for all OpenAPIExceptions""" + + +class ApiTypeError(OpenApiException, TypeError): + def __init__(self, msg, path_to_item=None, valid_classes=None, + key_type=None) -> None: + """ Raises an exception for TypeErrors + + Args: + msg (str): the exception message + + Keyword Args: + path_to_item (list): a list of keys an indices to get to the + current_item + None if unset + valid_classes (tuple): the primitive classes that current item + should be an instance of + None if unset + key_type (bool): False if our value is a value in a dict + True if it is a key in a dict + False if our item is an item in a list + None if unset + """ + self.path_to_item = path_to_item + self.valid_classes = valid_classes + self.key_type = key_type + full_msg = msg + if path_to_item: + full_msg = "{0} at {1}".format(msg, render_path(path_to_item)) + super(ApiTypeError, self).__init__(full_msg) + + +class ApiValueError(OpenApiException, ValueError): + def __init__(self, msg, path_to_item=None) -> None: + """ + Args: + msg (str): the exception message + + Keyword Args: + path_to_item (list) the path to the exception in the + received_data dict. None if unset + """ + + self.path_to_item = path_to_item + full_msg = msg + if path_to_item: + full_msg = "{0} at {1}".format(msg, render_path(path_to_item)) + super(ApiValueError, self).__init__(full_msg) + + +class ApiAttributeError(OpenApiException, AttributeError): + def __init__(self, msg, path_to_item=None) -> None: + """ + Raised when an attribute reference or assignment fails. + + Args: + msg (str): the exception message + + Keyword Args: + path_to_item (None/list) the path to the exception in the + received_data dict + """ + self.path_to_item = path_to_item + full_msg = msg + if path_to_item: + full_msg = "{0} at {1}".format(msg, render_path(path_to_item)) + super(ApiAttributeError, self).__init__(full_msg) + + +class ApiKeyError(OpenApiException, KeyError): + def __init__(self, msg, path_to_item=None) -> None: + """ + Args: + msg (str): the exception message + + Keyword Args: + path_to_item (None/list) the path to the exception in the + received_data dict + """ + self.path_to_item = path_to_item + full_msg = msg + if path_to_item: + full_msg = "{0} at {1}".format(msg, render_path(path_to_item)) + super(ApiKeyError, self).__init__(full_msg) + + +class ApiException(OpenApiException): + + def __init__( + self, + status=None, + reason=None, + http_resp=None, + *, + body: Optional[str] = None, + data: Optional[Any] = None, + ) -> None: + self.status = status + self.reason = reason + self.body = body + self.data = data + self.headers = None + + if http_resp: + if self.status is None: + self.status = http_resp.status + if self.reason is None: + self.reason = http_resp.reason + if self.body is None: + try: + self.body = http_resp.data.decode('utf-8') + except Exception: + pass + self.headers = http_resp.getheaders() + + @classmethod + def from_response( + cls, + *, + http_resp, + body: Optional[str], + data: Optional[Any], + ) -> Self: + if http_resp.status == 400: + raise BadRequestException(http_resp=http_resp, body=body, data=data) + + if http_resp.status == 401: + raise UnauthorizedException(http_resp=http_resp, body=body, data=data) + + if http_resp.status == 403: + raise ForbiddenException(http_resp=http_resp, body=body, data=data) + + if http_resp.status == 404: + raise NotFoundException(http_resp=http_resp, body=body, data=data) + + if 500 <= http_resp.status <= 599: + raise ServiceException(http_resp=http_resp, body=body, data=data) + raise ApiException(http_resp=http_resp, body=body, data=data) + + def __str__(self): + """Custom error messages for exception""" + error_message = "({0})\n"\ + "Reason: {1}\n".format(self.status, self.reason) + if self.headers: + error_message += "HTTP response headers: {0}\n".format( + self.headers) + + if self.data or self.body: + error_message += "HTTP response body: {0}\n".format(self.data or self.body) + + return error_message + + +class BadRequestException(ApiException): + pass + + +class NotFoundException(ApiException): + pass + + +class UnauthorizedException(ApiException): + pass + + +class ForbiddenException(ApiException): + pass + + +class ServiceException(ApiException): + pass + + +def render_path(path_to_item): + """Returns a string representation of a path""" + result = "" + for pth in path_to_item: + if isinstance(pth, int): + result += "[{0}]".format(pth) + else: + result += "['{0}']".format(pth) + return result diff --git a/rpi/board_mate_client/models/__init__.py b/rpi/board_mate_client/models/__init__.py new file mode 100644 index 00000000..b1aa3425 --- /dev/null +++ b/rpi/board_mate_client/models/__init__.py @@ -0,0 +1,17 @@ +# coding: utf-8 + +# flake8: noqa +""" + boardmate_api API + + boardmate_api API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +# import models into model package +from board_mate_client.models.game_dto import GameDto diff --git a/rpi/board_mate_client/models/game_dto.py b/rpi/board_mate_client/models/game_dto.py new file mode 100644 index 00000000..04d1de84 --- /dev/null +++ b/rpi/board_mate_client/models/game_dto.py @@ -0,0 +1,93 @@ +# coding: utf-8 + +""" + boardmate_api API + + boardmate_api API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations +import pprint +import re # noqa: F401 +import json + +from pydantic import BaseModel, ConfigDict, Field, StrictInt, StrictStr +from typing import Any, ClassVar, Dict, List, Optional +from typing import Optional, Set +from typing_extensions import Self + +class GameDto(BaseModel): + """ + GameDto + """ # noqa: E501 + white_name: Optional[StrictStr] = Field(default=None, alias="whiteName") + black_name: Optional[StrictStr] = Field(default=None, alias="blackName") + time_value: Optional[StrictInt] = Field(default=None, alias="timeValue") + increment: Optional[StrictInt] = None + __properties: ClassVar[List[str]] = ["whiteName", "blackName", "timeValue", "increment"] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of GameDto from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. + """ + excluded_fields: Set[str] = set([ + ]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of GameDto from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate({ + "whiteName": obj.get("whiteName"), + "blackName": obj.get("blackName"), + "timeValue": obj.get("timeValue"), + "increment": obj.get("increment") + }) + return _obj + + diff --git a/rpi/board_mate_client/py.typed b/rpi/board_mate_client/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/rpi/board_mate_client/rest.py b/rpi/board_mate_client/rest.py new file mode 100644 index 00000000..cab3b012 --- /dev/null +++ b/rpi/board_mate_client/rest.py @@ -0,0 +1,257 @@ +# coding: utf-8 + +""" + boardmate_api API + + boardmate_api API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +import io +import json +import re +import ssl + +import urllib3 + +from board_mate_client.exceptions import ApiException, ApiValueError + +SUPPORTED_SOCKS_PROXIES = {"socks5", "socks5h", "socks4", "socks4a"} +RESTResponseType = urllib3.HTTPResponse + + +def is_socks_proxy_url(url): + if url is None: + return False + split_section = url.split("://") + if len(split_section) < 2: + return False + else: + return split_section[0].lower() in SUPPORTED_SOCKS_PROXIES + + +class RESTResponse(io.IOBase): + + def __init__(self, resp) -> None: + self.response = resp + self.status = resp.status + self.reason = resp.reason + self.data = None + + def read(self): + if self.data is None: + self.data = self.response.data + return self.data + + def getheaders(self): + """Returns a dictionary of the response headers.""" + return self.response.headers + + def getheader(self, name, default=None): + """Returns a given response header.""" + return self.response.headers.get(name, default) + + +class RESTClientObject: + + def __init__(self, configuration) -> None: + # urllib3.PoolManager will pass all kw parameters to connectionpool + # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501 + # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501 + # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501 + + # cert_reqs + if configuration.verify_ssl: + cert_reqs = ssl.CERT_REQUIRED + else: + cert_reqs = ssl.CERT_NONE + + pool_args = { + "cert_reqs": cert_reqs, + "ca_certs": configuration.ssl_ca_cert, + "cert_file": configuration.cert_file, + "key_file": configuration.key_file, + } + if configuration.assert_hostname is not None: + pool_args['assert_hostname'] = ( + configuration.assert_hostname + ) + + if configuration.retries is not None: + pool_args['retries'] = configuration.retries + + if configuration.tls_server_name: + pool_args['server_hostname'] = configuration.tls_server_name + + + if configuration.socket_options is not None: + pool_args['socket_options'] = configuration.socket_options + + if configuration.connection_pool_maxsize is not None: + pool_args['maxsize'] = configuration.connection_pool_maxsize + + # https pool manager + self.pool_manager: urllib3.PoolManager + + if configuration.proxy: + if is_socks_proxy_url(configuration.proxy): + from urllib3.contrib.socks import SOCKSProxyManager + pool_args["proxy_url"] = configuration.proxy + pool_args["headers"] = configuration.proxy_headers + self.pool_manager = SOCKSProxyManager(**pool_args) + else: + pool_args["proxy_url"] = configuration.proxy + pool_args["proxy_headers"] = configuration.proxy_headers + self.pool_manager = urllib3.ProxyManager(**pool_args) + else: + self.pool_manager = urllib3.PoolManager(**pool_args) + + def request( + self, + method, + url, + headers=None, + body=None, + post_params=None, + _request_timeout=None + ): + """Perform requests. + + :param method: http request method + :param url: http request url + :param headers: http request headers + :param body: request json body, for `application/json` + :param post_params: request post parameters, + `application/x-www-form-urlencoded` + and `multipart/form-data` + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + """ + method = method.upper() + assert method in [ + 'GET', + 'HEAD', + 'DELETE', + 'POST', + 'PUT', + 'PATCH', + 'OPTIONS' + ] + + if post_params and body: + raise ApiValueError( + "body parameter cannot be used with post_params parameter." + ) + + post_params = post_params or {} + headers = headers or {} + + timeout = None + if _request_timeout: + if isinstance(_request_timeout, (int, float)): + timeout = urllib3.Timeout(total=_request_timeout) + elif ( + isinstance(_request_timeout, tuple) + and len(_request_timeout) == 2 + ): + timeout = urllib3.Timeout( + connect=_request_timeout[0], + read=_request_timeout[1] + ) + + try: + # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` + if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: + + # no content type provided or payload is json + content_type = headers.get('Content-Type') + if ( + not content_type + or re.search('json', content_type, re.IGNORECASE) + ): + request_body = None + if body is not None: + request_body = json.dumps(body) + r = self.pool_manager.request( + method, + url, + body=request_body, + timeout=timeout, + headers=headers, + preload_content=False + ) + elif content_type == 'application/x-www-form-urlencoded': + r = self.pool_manager.request( + method, + url, + fields=post_params, + encode_multipart=False, + timeout=timeout, + headers=headers, + preload_content=False + ) + elif content_type == 'multipart/form-data': + # must del headers['Content-Type'], or the correct + # Content-Type which generated by urllib3 will be + # overwritten. + del headers['Content-Type'] + # Ensures that dict objects are serialized + post_params = [(a, json.dumps(b)) if isinstance(b, dict) else (a,b) for a, b in post_params] + r = self.pool_manager.request( + method, + url, + fields=post_params, + encode_multipart=True, + timeout=timeout, + headers=headers, + preload_content=False + ) + # Pass a `string` parameter directly in the body to support + # other content types than JSON when `body` argument is + # provided in serialized form. + elif isinstance(body, str) or isinstance(body, bytes): + r = self.pool_manager.request( + method, + url, + body=body, + timeout=timeout, + headers=headers, + preload_content=False + ) + elif headers['Content-Type'] == 'text/plain' and isinstance(body, bool): + request_body = "true" if body else "false" + r = self.pool_manager.request( + method, + url, + body=request_body, + preload_content=False, + timeout=timeout, + headers=headers) + else: + # Cannot generate the request from given parameters + msg = """Cannot prepare a request message for provided + arguments. Please check that your arguments match + declared content type.""" + raise ApiException(status=0, reason=msg) + # For `GET`, `HEAD` + else: + r = self.pool_manager.request( + method, + url, + fields={}, + timeout=timeout, + headers=headers, + preload_content=False + ) + except urllib3.exceptions.SSLError as e: + msg = "\n".join([type(e).__name__, str(e)]) + raise ApiException(status=0, reason=msg) + + return RESTResponse(r) diff --git a/rpi/dependencies.sh b/rpi/dependencies.sh new file mode 100644 index 00000000..6ff2f6c8 --- /dev/null +++ b/rpi/dependencies.sh @@ -0,0 +1,3 @@ +pip install requests +pip install python-dotenv +pip install pyyaml \ No newline at end of file diff --git a/rpi/services/__init__.py b/rpi/services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/rpi/services/board_mate_service.py b/rpi/services/board_mate_service.py new file mode 100644 index 00000000..17fc168e --- /dev/null +++ b/rpi/services/board_mate_service.py @@ -0,0 +1,22 @@ +import requests +import os +from dotenv import load_dotenv + +def query_api(url, params=None, headers=None, timeout=10): + response = requests.get( + url, + params=params, + headers=headers, + timeout=timeout + ) + response.raise_for_status() # Raises an error for 4xx/5xx responses + return response.json() + +if __name__ == "__main__": + load_dotenv() + + url = "https://api.example.com/users" + params = {"limit": 10} + + data = query_api(url, params=params) + print(data) \ No newline at end of file diff --git a/rpi/lcd-screen/grove_rgb_lcd.py b/rpi/timer/grove_rgb_lcd.py similarity index 100% rename from rpi/lcd-screen/grove_rgb_lcd.py rename to rpi/timer/grove_rgb_lcd.py diff --git a/rpi/lcd-screen/main.py b/rpi/timer/main.py similarity index 100% rename from rpi/lcd-screen/main.py rename to rpi/timer/main.py