wag-managment-api-service-v.../api_services/redis/auth_actions/token.py

92 lines
4.5 KiB
Python

from fastapi import HTTPException, status
def parse_token_object_to_dict(request): # from requests import Request
import api_events.events as events
from api_services.redis.functions import get_object_via_access_key
from databases import EndpointRestriction, Events
from api_configs.configs import Config
if valid_token := get_object_via_access_key(request=request):
endpoint_name = str(request.url).replace(str(request.base_url), "/")
if str(endpoint_name) in Config.INSECURE_PATHS or str(endpoint_name) in Config.NOT_SECURE_PATHS:
return valid_token
endpoint_active = EndpointRestriction.filter_one(
EndpointRestriction.endpoint_name.ilike(f"%{endpoint_name}%"),
system=True,
).data
if not endpoint_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"This endpoint {endpoint_name} is not active for this user, please contact your responsible company for further information.",
)
if valid_token.user_type == 1:
if not valid_token.selected_company:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Selected company is not found in the token object.",
)
selected_event = Events.filter_one(
Events.endpoint_id == endpoint_active.id,
Events.id.in_(valid_token.selected_company.reachable_event_list_id),
).data
if not selected_event:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="This endpoint requires event validation. Please contact your responsible company to use this event.",
)
event_function_class = getattr(selected_event, "function_class", None)
event_function_code = getattr(selected_event, "function_code", None)
function_class = getattr(events, event_function_class, None)
active_function = getattr(
function_class,
function_class.__event_keys__.get(event_function_code, None),
None,
)
if not active_function:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="This endpoint requires event validation. Please contact your responsible company to use this event.",
)
valid_token.available_event = active_function
return valid_token
elif valid_token.user_type == 2:
if not valid_token.selected_occupant:
raise HTTPException(
status_code=status.HTTP_418_IM_A_TEAPOT,
detail="Selected occupant is not found in the token object.",
)
selected_event = Events.filter_all(
Events.endpoint_id == endpoint_active.id,
Events.id.in_(valid_token.selected_occupant.reachable_event_list_id),
)
if not selected_event.data:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"This endpoint {endpoint_name} requires event validation. Please contact your responsible company to use this event.",
)
selected_event = selected_event.data[0]
event_function_class = getattr(selected_event, "function_class", None)
event_function_code = getattr(selected_event, "function_code", None)
function_class = getattr(events, event_function_class, None)
active_function = getattr(
function_class,
function_class.__event_keys__.get(event_function_code, None),
None,
)
if not active_function:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"This endpoint {endpoint_name} requires event validation. Please contact your responsible company to use this event.",
)
valid_token.available_event = active_function
return valid_token
valid_token.available_event = None
return valid_token
user_type = "Company" if valid_token.user_type == 1 else "Occupant"
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token of this user is not valid. Please login and refresh {user_type} selection.",
)