from fastapi.routing import APIRouter from fastapi.requests import Request from api_objects import OccupantTokenObject, EmployeeTokenObject from api_validations.validations_request import ( UpdateEndpointAccessList, InsertEndpointAccess, CheckEndpointAccess, ) from api_services.redis.auth_actions.token import parse_token_object_to_dict from databases import ( EndpointRestriction, Event2Occupant, Event2Employee, Events, ) from databases.sql_models.event.event import Services, Service2Events endpoint_restriction_route = APIRouter(prefix="/access", tags=["Endpoint Access"]) endpoint_restriction_route.include_router( endpoint_restriction_route, include_in_schema=True ) @endpoint_restriction_route.post( path="/endpoint/restriction/create", summary="Add extra restriction to endpoints list", ) def endpoint_restriction_create(request: Request, data: InsertEndpointAccess): token_dict = parse_token_object_to_dict(request=request) return @endpoint_restriction_route.post( path="/endpoint/update", summary="Update extra restriction to endpoints list" ) def endpoint_restriction_update(request: Request, data: UpdateEndpointAccessList): token_dict = parse_token_object_to_dict(request=request) return @endpoint_restriction_route.post( path="/endpoints/available", summary="List extra restriction to endpoints list" ) def endpoint_restriction_list(request: Request): token_dict, records = parse_token_object_to_dict(request=request), [] if isinstance(token_dict, OccupantTokenObject): occupant_events = Event2Occupant.get_event_id_by_build_living_space_id( build_living_space_id=token_dict.selected_occupant.living_space_id ) events_list = Events.filter_all(Events.id.in_(occupant_events)).data records = EndpointRestriction.filter_all( EndpointRestriction.id.in_([event.endpoint_id for event in events_list]) ).data elif isinstance(token_dict, EmployeeTokenObject): employee_events = Event2Employee.get_event_id_by_employee_id( employee_id=token_dict.selected_company.employee_id ) events_list = Events.filter_all(Events.id.in_(employee_events)).data records = EndpointRestriction.filter_all( EndpointRestriction.id.in_([event.endpoint_id for event in events_list]) ).data return dict( completed=True, message="Available endpoints are listed successfully", result=[str(record.endpoint_name) for record in records], ) @endpoint_restriction_route.post( path="/endpoint/available", summary="Check extra restriction to endpoint available" ) def endpoint_restriction_available(request: Request, data: CheckEndpointAccess): token_dict, records = parse_token_object_to_dict(request=request), [] endpoint = EndpointRestriction.filter_one( EndpointRestriction.endpoint_name.ilike(f"%{str(data.endpoint)}%") ).data if not endpoint: EndpointRestriction.raise_http_exception( status_code="HTTP_404_NOT_FOUND", error_case="UNAUTHORIZED", message="Only Occupant can see this data", data={}, ) event = Events.filter_one(Events.id == endpoint.id).data service = Service2Events.filter_one( Service2Events.event_id == event.id, ).data if isinstance(token_dict, OccupantTokenObject): event_occupant = Event2Occupant.filter_one( Event2Occupant.event_service_id == service.id, Event2Occupant.build_living_space_id == token_dict.selected_occupant.living_space_id, ).data if not event_occupant: EndpointRestriction.raise_http_exception( status_code="HTTP_404_NOT_FOUND", error_case="UNAUTHORIZED", message="This endpoint is not available for this occupant", data={}, ) return dict( completed=True, message="Endpoint is available for this occupant", ) elif isinstance(token_dict, EmployeeTokenObject): event_employee = Event2Employee.filter_one( Event2Employee.event_service_id == service.id, Event2Employee.employee_id == token_dict.selected_company.employee_id, ).data if not event_employee: EndpointRestriction.raise_http_exception( status_code="HTTP_404_NOT_FOUND", error_case="UNAUTHORIZED", message="This endpoint is not available for this employee", data={}, ) return dict( completed=True, message="Endpoint is available for this occupant", ) @endpoint_restriction_route.patch( path="/endpoint/bind/patch", summary="Patch extra restriction to endpoints list" ) def endpoint_restriction_patch(request: Request): token_dict = parse_token_object_to_dict(request=request) return