join tested auth service login/select completed
This commit is contained in:
78
api_services/api_middlewares/token_provider.py
Normal file
78
api_services/api_middlewares/token_provider.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import enum
|
||||
|
||||
from typing import Optional, Union, Dict, Any, List
|
||||
from pydantic import BaseModel
|
||||
|
||||
from api_controllers.redis.database import RedisActions
|
||||
from api_validations.token.validations import (
|
||||
TokenDictType,
|
||||
OccupantTokenObject,
|
||||
EmployeeTokenObject,
|
||||
UserType,
|
||||
)
|
||||
|
||||
class TokenProvider:
|
||||
|
||||
AUTH_TOKEN: str = "AUTH_TOKEN"
|
||||
|
||||
@classmethod
|
||||
def convert_redis_object_to_token(cls, redis_object: Dict[str, Any]) -> TokenDictType:
|
||||
"""
|
||||
Process Redis object and return appropriate token object.
|
||||
"""
|
||||
if redis_object.get("user_type") == UserType.employee.value:
|
||||
return EmployeeTokenObject(**redis_object)
|
||||
elif redis_object.get("user_type") == UserType.occupant.value:
|
||||
return OccupantTokenObject(**redis_object)
|
||||
raise ValueError("Invalid user type")
|
||||
|
||||
@classmethod
|
||||
def get_dict_from_redis(cls, token: Optional[str] = None, user_uu_id: Optional[str] = None) -> Union[TokenDictType, List[TokenDictType]]:
|
||||
"""
|
||||
Retrieve token object from Redis using token and user_uu_id
|
||||
"""
|
||||
token_to_use, user_uu_id_to_use = token or "*", user_uu_id or "*"
|
||||
list_of_token_dict, auth_key_list = [], [
|
||||
cls.AUTH_TOKEN,
|
||||
token_to_use,
|
||||
user_uu_id_to_use,
|
||||
]
|
||||
if token:
|
||||
result = RedisActions.get_json(list_keys=auth_key_list, limit=1)
|
||||
if first_record := result.first:
|
||||
return cls.convert_redis_object_to_token(first_record)
|
||||
elif user_uu_id:
|
||||
result = RedisActions.get_json(list_keys=auth_key_list)
|
||||
if all_records := result.all:
|
||||
for all_record in all_records:
|
||||
list_of_token_dict.append(cls.convert_redis_object_to_token(all_record))
|
||||
return list_of_token_dict
|
||||
raise ValueError(
|
||||
"Token not found in Redis. Please check the token or user_uu_id."
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def retrieve_application_codes(cls, page_url: str, token: TokenDictType):
|
||||
"""
|
||||
Retrieve application code from the token object or list of token objects.
|
||||
"""
|
||||
if isinstance(token, EmployeeTokenObject):
|
||||
if application_codes := token.selected_company.reachable_app_codes.get(page_url, None):
|
||||
return application_codes
|
||||
elif isinstance(token, OccupantTokenObject):
|
||||
if application_codes := token.selected_occupant.reachable_app_codes.get(page_url, None):
|
||||
return application_codes
|
||||
raise ValueError("Invalid token type or no application code found.")
|
||||
|
||||
@classmethod
|
||||
def retrieve_event_codes(cls, endpoint_code: str, token: TokenDictType) -> str:
|
||||
"""
|
||||
Retrieve event code from the token object or list of token objects.
|
||||
"""
|
||||
if isinstance(token, EmployeeTokenObject):
|
||||
if event_codes := token.selected_company.reachable_event_codes.get(endpoint_code, None):
|
||||
return event_codes
|
||||
elif isinstance(token, OccupantTokenObject):
|
||||
if event_codes := token.selected_occupant.reachable_event_codes.get(endpoint_code, None):
|
||||
return event_codes
|
||||
raise ValueError("Invalid token type or no event code found.")
|
||||
Reference in New Issue
Block a user