wag-managment-api-service-v.../ApiServices/api_handlers/auth_actions/token.py

214 lines
8.7 KiB
Python

import typing
from fastapi import HTTPException, status
from api_objects import OccupantTokenObject, EmployeeTokenObject
from api_services.redis.functions import RedisActions
from api_configs import Auth
class AccessObjectActions:
    """Class-level helpers for storing and reading access-token objects.

    Every method delegates the actual storage work to ``RedisActions`` and
    reports failures to the caller as ``HTTPException``.
    """

    @classmethod
    def save_object_to_redis(
        cls,
        access_token,
        model_object: typing.Union[OccupantTokenObject, EmployeeTokenObject],
        # total_seconds() yields a float; coerce so the default really is the
        # annotated int number of minutes.
        expiry_minutes: int = int(Auth.TOKEN_EXPIRE_MINUTES_30.total_seconds() // 60),
    ) -> bool:
        """Save access token object to Redis with expiry.

        Args:
            access_token: The access token
            model_object: The token object to save
            expiry_minutes: Minutes until token expires (default: from Auth config)

        Returns:
            bool: True if successful

        Raises:
            HTTPException: 503 if the underlying save raises any exception
        """
        try:
            RedisActions.save_object_to_redis(
                access_token=access_token,
                model_object=model_object,
                expiry_minutes=expiry_minutes,
            )
        except Exception as e:
            print("Save Object to Redis Error: ", e)
            # Chain the cause so the original traceback is not lost.
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail=dict(message="Failed to save token to Redis", error=str(e)),
            ) from e
        return True

    @classmethod
    def get_object_via_user_uu_id(cls, user_id: str) -> typing.Union[dict, None]:
        """Look up stored token data for a user.

        Args:
            user_id: The user UUID to search for

        Returns:
            Whatever ``RedisActions.get_object_via_user_uu_id`` returns for
            this user (annotated as a dict or None).
        """
        return RedisActions.get_object_via_user_uu_id(user_id)

    @classmethod
    def access_token(cls, request) -> str:
        """Extract the access token header from a request.

        Args:
            request: The request object; must expose a ``headers`` mapping

        Returns:
            str: The value of the ``Auth.ACCESS_TOKEN_TAG`` header

        Raises:
            HTTPException: 401 if the request has no headers or the token
                header is missing/empty
        """
        if not hasattr(request, "headers"):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=dict(message="Headers not found in request"),
            )
        access_token = request.headers.get(Auth.ACCESS_TOKEN_TAG)
        if not access_token:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=dict(message="Unauthorized user, please login"),
            )
        return access_token

    @classmethod
    def get_token_object(
        cls, request
    ) -> typing.Union[OccupantTokenObject, EmployeeTokenObject]:
        """Get the token object for a request via ``RedisActions``.

        Args:
            request: The request object

        Returns:
            Union[OccupantTokenObject, EmployeeTokenObject]: The token object

        Raises:
            HTTPException: An HTTPException raised by the lookup is passed
                through unchanged; any other exception becomes a 401 whose
                message is the exception text
        """
        try:
            return RedisActions.get_object_via_access_key(request)
        except HTTPException:
            # Already a well-formed HTTP error; re-wrapping it would
            # stringify its status/detail into the message field.
            raise
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED, detail=dict(message=str(e))
            ) from e

    @classmethod
    def get_object_via_access_key(
        cls,
        request,
    ) -> typing.Union[EmployeeTokenObject, OccupantTokenObject, None]:
        """Build the typed token object stored under the request's access token.

        The token header is read through ``access_token`` so a missing header
        yields a 401 instead of a ``TypeError`` from string concatenation.

        Args:
            request: The request object carrying the access token header

        Returns:
            EmployeeTokenObject when the stored ``user_type`` is 1,
            OccupantTokenObject when it is 2.

        Raises:
            HTTPException: 401 if the header is missing, nothing is stored
                for the token, or the stored ``user_type`` is neither 1 nor 2
        """
        token = cls.access_token(request)
        access_object = RedisActions.get_with_regex(value_regex=f"{token}:*").data
        if access_object is None:
            # No entry stored for this token: treat as not logged in.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=dict(message="Unauthorized user, please login"),
            )

        user_type = access_object.get("user_type")
        if user_type == 1:
            # Normalise a missing/falsy selection to an explicit None.
            access_object["selected_company"] = (
                access_object.get("selected_company") or None
            )
            return EmployeeTokenObject(**access_object)
        if user_type == 2:
            access_object["selected_occupant"] = (
                access_object.get("selected_occupant") or None
            )
            return OccupantTokenObject(**access_object)

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=dict(
                message="User type is not found in the token object. Please reach to your administrator."
            ),
        )
def _resolve_event_function(events_module, selected_event):
    """Return the callable an event row points at, or None if it cannot be resolved.

    Reads ``function_class`` and ``function_code`` from ``selected_event``,
    looks the class up on ``events_module``, maps the code to an attribute
    name through the class's ``__event_keys__`` and returns that attribute.
    Every missing link yields None instead of raising.
    """
    class_name = getattr(selected_event, "function_class", None)
    function_code = getattr(selected_event, "function_code", None)
    if not isinstance(class_name, str):
        return None
    function_class = getattr(events_module, class_name, None)
    event_keys = getattr(function_class, "__event_keys__", None)
    if not event_keys:
        return None
    function_name = event_keys.get(function_code)
    if not isinstance(function_name, str):
        return None
    return getattr(function_class, function_name, None)


def parse_token_object_to_dict(request):  # from requests import Request
    """Validate the request's token and attach the event function for the endpoint.

    Steps, in order:
      1. Fetch the token object via ``AccessObjectActions.get_token_object``.
      2. Derive the endpoint name by replacing the request's base URL with
         "/" in the full URL; paths listed in ``Config.INSECURE_PATHS`` or
         ``Config.NOT_SECURE_PATHS`` return the token immediately.
      3. Require a matching system ``EndpointRestriction`` row.
      4. For user_type 1 (selected company) or 2 (selected occupant), require
         an ``Events`` row for that endpoint within the selection's
         ``reachable_event_list_id`` and resolve its function.

    Args:
        request: The incoming request (uses ``url``, ``base_url`` and headers)

    Returns:
        The token object. For user types 1 and 2 ``available_event`` is set to
        the resolved function; for any other user type it is set to None. For
        insecure paths the token is returned without touching that attribute.

    Raises:
        HTTPException: 401 when the token is falsy, the endpoint is not
            active, the company selection is missing, or no usable event can
            be resolved; 418 when the occupant selection is missing.
    """
    import api_events.events as events
    from databases import EndpointRestriction, Events
    from api_configs.configs import Config

    valid_token = AccessObjectActions.get_token_object(request=request)
    if not valid_token:
        # The token may be None here, so read user_type defensively.
        user_type = (
            "Company" if getattr(valid_token, "user_type", None) == 1 else "Occupant"
        )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Token of this user is not valid. Please login and refresh {user_type} selection.",
        )

    endpoint_name = str(request.url).replace(str(request.base_url), "/")
    if (
        endpoint_name in Config.INSECURE_PATHS
        or endpoint_name in Config.NOT_SECURE_PATHS
    ):
        return valid_token

    # Everything after the first "update" is dropped before the lookup.
    if "update" in endpoint_name:
        endpoint_name = endpoint_name.split("update")[0] + "update"

    # NOTE(review): this is a substring ILIKE match on a value taken from the
    # request URL; "%" / "_" in the path act as wildcards — confirm intended.
    endpoint_active = EndpointRestriction.filter_one(
        EndpointRestriction.endpoint_name.ilike(f"%{endpoint_name}%"),
        system=True,
    ).data
    if not endpoint_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"This endpoint {endpoint_name} is not active for this user, please contact your responsible company for further information.",
        )

    if valid_token.user_type == 1:
        if not valid_token.selected_company:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Selected company is not found in the token object.",
            )
        selected_event = Events.filter_one(
            Events.endpoint_id == endpoint_active.id,
            Events.id.in_(valid_token.selected_company.reachable_event_list_id),
        ).data
        active_function = (
            _resolve_event_function(events, selected_event) if selected_event else None
        )
        if not active_function:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="This endpoint requires event validation. Please contact your responsible company to use this event.",
            )
        valid_token.available_event = active_function
        return valid_token

    if valid_token.user_type == 2:
        if not valid_token.selected_occupant:
            # NOTE(review): 418 differs from the 401 used for the company
            # branch; kept as-is because clients may depend on it.
            raise HTTPException(
                status_code=status.HTTP_418_IM_A_TEAPOT,
                detail="Selected occupant is not found in the token object.",
            )
        matching_events = Events.filter_all(
            Events.endpoint_id == endpoint_active.id,
            Events.id.in_(valid_token.selected_occupant.reachable_event_list_id),
        )
        # Only the first matching event is used.
        active_function = (
            _resolve_event_function(events, matching_events.data[0])
            if matching_events.data
            else None
        )
        if not active_function:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"This endpoint {endpoint_name} requires event validation. Please contact your responsible company to use this event.",
            )
        valid_token.available_event = active_function
        return valid_token

    # Any other user type passes through without an event function.
    valid_token.available_event = None
    return valid_token