import typing

from fastapi import HTTPException, status

from api_objects import OccupantTokenObject, EmployeeTokenObject
from api_services.redis.functions import RedisActions
from api_configs import Auth


class AccessObjectActions:
    """Helpers for storing and resolving access-token objects through Redis."""

    @classmethod
    def save_object_to_redis(
        cls,
        access_token,
        model_object: typing.Union[OccupantTokenObject, EmployeeTokenObject],
        # total_seconds() returns a float; cast so the default honours the
        # declared ``int`` type instead of being e.g. ``30.0``.
        expiry_minutes: int = int(Auth.TOKEN_EXPIRE_MINUTES_30.total_seconds() // 60),
    ) -> bool:
        """Save access token object to Redis with expiry

        Args:
            access_token: The access token
            model_object: The token object to save
            expiry_minutes: Minutes until token expires (default: from Auth config)

        Returns:
            bool: True if successful

        Raises:
            HTTPException: 503 if the save fails for any reason
        """
        try:
            RedisActions.save_object_to_redis(
                access_token=access_token,
                model_object=model_object,
                expiry_minutes=expiry_minutes,
            )
        except Exception as e:
            print("Save Object to Redis Error: ", e)
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=dict(
                    message="Failed to save token to Redis",
                    error=str(e),
                ),
            ) from e
        return True

    @classmethod
    def get_object_via_user_uu_id(cls, user_id: str) -> typing.Union[dict, None]:
        """Get all valid tokens for a user

        Thin delegate to ``RedisActions.get_object_via_user_uu_id``.

        Args:
            user_id: The user UUID to search for

        Returns:
            Whatever the Redis layer returns for this user (annotated as a
            dict of tokens, or None).
        """
        return RedisActions.get_object_via_user_uu_id(user_id)

    @classmethod
    def access_token(cls, request) -> str:
        """Extract and validate access token from request

        Args:
            request: The request object

        Returns:
            str: The access token read from the ``Auth.ACCESS_TOKEN_TAG`` header

        Raises:
            HTTPException: 401 if the request has no headers or the token
                header is missing/empty
        """
        if not hasattr(request, "headers"):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=dict(message="Headers not found in request"),
            )

        access_token = request.headers.get(Auth.ACCESS_TOKEN_TAG)
        if not access_token:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=dict(message="Unauthorized user, please login"),
            )
        return access_token

    @classmethod
    def get_token_object(
        cls, request
    ) -> typing.Union[OccupantTokenObject, EmployeeTokenObject]:
        """Get and validate token object from request

        Args:
            request: The request object

        Returns:
            Union[OccupantTokenObject, EmployeeTokenObject]: The token object

        Raises:
            HTTPException: Propagated unchanged when the Redis layer raises
                one; any other failure is reported as 401 with the error text.
        """
        try:
            return RedisActions.get_object_via_access_key(request)
        except HTTPException:
            # Already a well-formed HTTP error: re-wrapping would flatten its
            # structured ``detail`` into a plain string.
            raise
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=dict(message=str(e)),
            ) from e

    @classmethod
    def get_object_via_access_key(
        cls,
        request,
    ) -> typing.Union[EmployeeTokenObject, OccupantTokenObject, None]:
        """Look up the token stored in Redis for the request's access token.

        The Redis entry is searched with the pattern ``"<access_token>:*"``.
        ``user_type`` 1 yields an ``EmployeeTokenObject`` and ``user_type`` 2
        an ``OccupantTokenObject``.

        Args:
            request: The request object carrying the access-token header

        Returns:
            The token object built from the stored data.

        Raises:
            HTTPException: 401 if the header is missing, nothing is stored
                for the token, or the stored ``user_type`` is not 1 or 2.
        """
        # Validates the header up front; previously a missing header caused a
        # TypeError when concatenating None with ":*".
        token = cls.access_token(request)
        access_object = RedisActions.get_with_regex(value_regex=f"{token}:*").data
        if not access_object:
            # Nothing stored for this token (expired or never issued).
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=dict(message="Unauthorized user, please login"),
            )

        user_type = access_object.get("user_type")
        if user_type == 1:
            # Normalise a missing/falsy selection to an explicit None.
            if not access_object.get("selected_company", None):
                access_object["selected_company"] = None
            return EmployeeTokenObject(**access_object)
        if user_type == 2:
            if not access_object.get("selected_occupant", None):
                access_object["selected_occupant"] = None
            return OccupantTokenObject(**access_object)

        raise HTTPException(
            status_code=401,
            detail=dict(
                message="User type is not found in the token object. Please reach to your administrator."
            ),
        )


def _resolve_event_function(events_module, selected_event):
    """Return the callable registered for ``selected_event``, or None.

    Reads ``function_class`` and ``function_code`` from the event record,
    finds the class of that name in ``events_module`` and maps the code to a
    method name through the class's ``__event_keys__`` mapping. Every missing
    link yields None instead of raising, so the caller can answer with a
    proper HTTP error.
    """
    class_name = getattr(selected_event, "function_class", None)
    function_code = getattr(selected_event, "function_code", None)
    if not class_name:
        return None

    function_class = getattr(events_module, class_name, None)
    event_keys = getattr(function_class, "__event_keys__", None)
    if not event_keys:
        return None

    function_name = event_keys.get(function_code, None)
    if not function_name:
        return None
    return getattr(function_class, function_name, None)


def parse_token_object_to_dict(request):
    """Resolve the caller's token and attach the event allowed for this endpoint.

    Despite the name, the token *object* is returned, not a dict. Steps:

    1. Load the token object for the request.
    2. Return it untouched when the endpoint is listed in
       ``Config.INSECURE_PATHS`` or ``Config.NOT_SECURE_PATHS``.
    3. Otherwise require a matching ``EndpointRestriction`` row and, for
       employee (``user_type == 1``) and occupant (``user_type == 2``)
       tokens, an ``Events`` row reachable from the token's current
       selection; the resolved callable is stored on
       ``valid_token.available_event``.
    4. For any other user type, ``available_event`` is set to None.

    Args:
        request: The incoming request (needs ``headers``, ``url``, ``base_url``)

    Returns:
        The token object, with ``available_event`` set unless the path is
        exempt from checks.

    Raises:
        HTTPException: 401 when the token, endpoint or event cannot be
            validated; 418 when an occupant token has no selected occupant.
    """
    # Imported locally rather than at module level; presumably to avoid
    # import cycles between these packages -- confirm before hoisting.
    import api_events.events as events
    from databases import EndpointRestriction, Events
    from api_configs.configs import Config

    valid_token = AccessObjectActions.get_token_object(request=request)
    if not valid_token:
        # The token may be None here, so read ``user_type`` defensively.
        user_type = (
            "Company" if getattr(valid_token, "user_type", None) == 1 else "Occupant"
        )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Token of this user is not valid. Please login and refresh {user_type} selection.",
        )

    endpoint_name = str(request.url).replace(str(request.base_url), "/")
    if (
        endpoint_name in Config.INSECURE_PATHS
        or endpoint_name in Config.NOT_SECURE_PATHS
    ):
        return valid_token

    if "update" in endpoint_name:
        # Drop everything after the first "update" (e.g. a trailing id).
        endpoint_name = endpoint_name.split("update")[0] + "update"

    endpoint_active = EndpointRestriction.filter_one(
        EndpointRestriction.endpoint_name.ilike(f"%{endpoint_name}%"),
        system=True,
    ).data
    if not endpoint_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"This endpoint {endpoint_name} is not active for this user, please contact your responsible company for further information.",
        )

    if valid_token.user_type == 1:
        if not valid_token.selected_company:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Selected company is not found in the token object.",
            )
        selected_event = Events.filter_one(
            Events.endpoint_id == endpoint_active.id,
            Events.id.in_(valid_token.selected_company.reachable_event_list_id),
        ).data
        if not selected_event:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="This endpoint requires event validation. Please contact your responsible company to use this event.",
            )
        active_function = _resolve_event_function(events, selected_event)
        if not active_function:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="This endpoint requires event validation. Please contact your responsible company to use this event.",
            )
        valid_token.available_event = active_function
        return valid_token

    if valid_token.user_type == 2:
        if not valid_token.selected_occupant:
            raise HTTPException(
                status_code=status.HTTP_418_IM_A_TEAPOT,
                detail="Selected occupant is not found in the token object.",
            )
        matching_events = Events.filter_all(
            Events.endpoint_id == endpoint_active.id,
            Events.id.in_(valid_token.selected_occupant.reachable_event_list_id),
        )
        if not matching_events.data:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"This endpoint {endpoint_name} requires event validation. Please contact your responsible company to use this event.",
            )
        # Only the first matching event is used.
        active_function = _resolve_event_function(events, matching_events.data[0])
        if not active_function:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"This endpoint {endpoint_name} requires event validation. Please contact your responsible company to use this event.",
            )
        valid_token.available_event = active_function
        return valid_token

    # Any other user type passes through without an event attached.
    valid_token.available_event = None
    return valid_token