"""Route exposing the validation schema of the event behind an endpoint."""

import datetime
import types
import typing

from fastapi import status
from fastapi.routing import APIRouter
from fastapi.requests import Request
from fastapi.exceptions import HTTPException
from pydantic import BaseModel

from api_validations.validations_request import (
    EndpointValidation,
)

validations_route = APIRouter(prefix="/validations", tags=["Validations"])
# NOTE: this module used to call ``validations_route.include_router`` with the
# router itself as the argument. At that point the router has no routes to
# copy, so the call registered nothing, and FastAPI versions that guard
# against self-inclusion fail on it at import time. It has been removed.


class EndpointValidationResponse(BaseModel):
    """Response body of the ``/validations/endpoint`` route."""

    # Language code taken from the caller's token.
    language: str
    # Header mapping read from the validation class for that language.
    headers: dict
    # Field name -> {"type": ..., "required": ...} as built by ValidationParser.
    validation: dict


# Annotation types the parser recognises and the names it reports for them.
# Kept as pairs and matched by identity so that ``bool`` is never confused
# with ``int`` and unhashable annotations cannot break the lookup.
_FIELD_TYPE_NAMES = (
    (str, "string"),
    (int, "integer"),
    (bool, "boolean"),
    (float, "float"),
    (datetime.datetime, "datetime"),
)

# ``typing.Optional[X]`` reports ``typing.Union`` as its origin; ``X | None``
# reports ``types.UnionType`` on interpreters that have it.
_UNION_ORIGINS = tuple(
    origin
    for origin in (typing.Union, getattr(types, "UnionType", None))
    if origin is not None
)


def _describe_annotation(annotation):
    """Return ``(field_type, required)`` for a single annotation.

    A bare recognised type is required; ``Optional[<recognised type>]`` is not.
    Anything else (unknown types, wider unions, string annotations) falls back
    to ``("string", False)``, the default the parser has always used.
    """
    required = True
    if typing.get_origin(annotation) in _UNION_ORIGINS:
        members = typing.get_args(annotation)
        non_none = [member for member in members if member is not type(None)]
        if len(members) != 2 or len(non_none) != 1:
            return "string", False
        annotation = non_none[0]
        required = False

    for known_type, type_name in _FIELD_TYPE_NAMES:
        if annotation is known_type:
            return type_name, required
    return "string", False


class ValidationParser:
    """Build a simple field schema from a validation class's annotations.

    After construction ``schema`` maps every annotated field name to
    ``{"type": <name>, "required": <bool>}``.
    """

    def __init__(self, active_validation):
        """Collect the annotations of *active_validation* and parse them.

        Args:
            active_validation: Object (normally a class) whose
                ``__annotations__`` describe the expected fields. A falsy
                value, or an object without annotations, yields an empty
                schema.
        """
        self.annotations = (
            getattr(active_validation, "__annotations__", {}).items()
            if active_validation
            else None
        )
        self.schema = {}
        self.parse()

    def parse(self):
        """Fill ``self.schema`` with one entry per annotated field."""
        for key, value in self.annotations or ():
            field_type, required = _describe_annotation(value)
            self.schema[key] = {"type": field_type, "required": required}


def retrieve_validation_from_class(selected_event, events):
    """Look up the validation registered for *selected_event*.

    Args:
        selected_event: Object exposing ``function_class`` (attribute name to
            look up on *events*) and ``function_code`` (key into that class's
            ``__event_validation__`` mapping).
        events: Namespace (module or object) holding the event classes.

    Returns:
        The registered validation, or ``None`` when the event has no function
        class, the class is not found on *events*, the class has no
        ``__event_validation__`` mapping, or the code is not registered.
    """
    event_function_class = getattr(selected_event, "function_class", None)
    event_function_code = getattr(selected_event, "function_code", None)
    if not event_function_class:
        return None

    function_class = getattr(events, str(event_function_class), None)
    event_validations = getattr(function_class, "__event_validation__", None)
    if not event_validations:
        return None
    return event_validations.get(event_function_code, None)


@validations_route.post(path="/endpoint", summary="Retrieve validation of endpoint")
def user_list(request: Request, validation: EndpointValidation):
    """Return the validation schema of the event bound to an endpoint.

    Args:
        request: Incoming request; the access token is resolved from it.
        validation: Request body naming the endpoint to describe.

    Returns:
        EndpointValidationResponse with the token language, the validation's
        headers for that language and the parsed field schema.

    Raises:
        HTTPException: 401 when the token is missing, the endpoint is not
            active, the token has no selected company/occupant, no reachable
            event matches the endpoint, or the event has no validation.
    """
    # Imported here rather than at module level, as in the original code.
    import api_events.events as events
    from api_services.redis.functions import get_object_via_access_key
    from databases import (
        EndpointRestriction,
        Events,
    )

    valid_token = get_object_via_access_key(request=request)
    if not valid_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No valid token found in the request.",
        )

    # NOTE(review): the endpoint text is embedded in an ILIKE pattern, so any
    # ``%`` or ``_`` sent by the client acts as a wildcard - confirm intended.
    endpoint_active = EndpointRestriction.filter_one(
        EndpointRestriction.endpoint_name.ilike(f"%{str(validation.endpoint)}%"),
        system=True,
    ).data
    if not endpoint_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"This endpoint {str(validation.endpoint)} is not active for this user, please contact your responsible company for further information.",
        )

    if valid_token.user_type == 1 and not valid_token.selected_company:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Selected company is not found in the token object.",
        )
    elif valid_token.user_type == 2 and not valid_token.selected_occupant:
        # Was HTTP 418; aligned with the equivalent company check above.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Selected occupant is not found in the token object.",
        )

    # Prefer the selected company (the only source the original code read) and
    # fall back to the selected occupant so occupant tokens no longer crash.
    # NOTE(review): assumes the occupant selection also carries
    # ``reachable_event_list_id`` - confirm against the token model.
    active_selection = getattr(valid_token, "selected_company", None) or getattr(
        valid_token, "selected_occupant", None
    )
    reachable_event_ids = getattr(active_selection, "reachable_event_list_id", None) or []

    selected_event = Events.filter_one(
        Events.endpoint_id == endpoint_active.id,
        Events.id.in_(reachable_event_ids),
    ).data
    if not selected_event:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="This endpoint requires event validation. Please contact your responsible company to use this event.",
        )

    active_validation = retrieve_validation_from_class(selected_event, events)
    if not active_validation:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No validation found for this endpoint.",
        )

    # Use the headers attribute named after the token language, else ``tr``.
    headers = getattr(
        active_validation, str(valid_token.lang).lower(), active_validation.tr
    )
    validation_parse = ValidationParser(active_validation=active_validation)
    return EndpointValidationResponse(
        language=valid_token.lang,
        headers=headers,
        validation=validation_parse.schema,
    )