71 lines
3.4 KiB
Python
71 lines
3.4 KiB
Python
from typing import Any, Union
|
|
|
|
from api_validations.token.validations import (
|
|
TokenDictType,
|
|
EmployeeTokenObject,
|
|
OccupantTokenObject,
|
|
CompanyToken,
|
|
OccupantToken,
|
|
UserType
|
|
)
|
|
from api_controllers.redis.database import RedisActions
|
|
from api_modules.token.password_module import PasswordModule
|
|
|
|
from schemas import Users
|
|
|
|
|
|
class RedisHandlers:
|
|
|
|
AUTH_TOKEN: str = "AUTH_TOKEN"
|
|
|
|
@classmethod
|
|
def process_redis_object(cls, redis_object: dict[str, Any]) -> TokenDictType:
|
|
"""Process Redis object and return appropriate token object."""
|
|
if not redis_object.get("selected_company"):
|
|
redis_object["selected_company"] = None
|
|
if not redis_object.get("selected_occupant"):
|
|
redis_object["selected_occupant"] = None
|
|
if redis_object.get("user_type") == UserType.employee.value:
|
|
return EmployeeTokenObject(**redis_object)
|
|
elif redis_object.get("user_type") == UserType.occupant.value:
|
|
return OccupantTokenObject(**redis_object)
|
|
raise ValueError("Invalid user type")
|
|
|
|
@classmethod
|
|
def get_object_from_redis(cls, access_token: str) -> TokenDictType:
|
|
redis_response = RedisActions.get_json(list_keys=[cls.AUTH_TOKEN, access_token, "*", "*"])
|
|
if not redis_response.status:
|
|
raise ValueError("EYS_0001")
|
|
if redis_object := redis_response.first:
|
|
return cls.process_redis_object(redis_object)
|
|
raise ValueError("EYS_0002")
|
|
|
|
@classmethod
|
|
def set_object_to_redis(cls, user: Users, token, header_info, add_uuid: str):
|
|
result_delete = RedisActions.delete(list_keys=[cls.AUTH_TOKEN, "*", str(user.uu_id), add_uuid])
|
|
generated_access_token = PasswordModule.generate_access_token()
|
|
keys = [cls.AUTH_TOKEN, generated_access_token, str(user.uu_id)]
|
|
if add_uuid:
|
|
keys.append(add_uuid)
|
|
RedisActions.set_json(list_keys=keys, value={**token, **header_info}, expires={"hours": 1, "minutes": 30})
|
|
return generated_access_token
|
|
RedisActions.set_json(list_keys=keys, value={**token, **header_info}, expires={"hours": 1, "minutes": 30})
|
|
return generated_access_token
|
|
|
|
@classmethod
|
|
def update_token_at_redis(cls, token: str, add_payload: Union[CompanyToken, OccupantToken], add_uuid: str):
|
|
if already_token_data := RedisActions.get_json(list_keys=[cls.AUTH_TOKEN, token, '*', add_uuid]).first:
|
|
already_token = cls.process_redis_object(already_token_data)
|
|
if already_token.is_employee and isinstance(add_payload, CompanyToken):
|
|
already_token.selected_company = add_payload
|
|
list_keys = [cls.AUTH_TOKEN, token, str(already_token.user_uu_id), str(add_uuid)]
|
|
print('is_employee: ', list_keys)
|
|
elif already_token.is_occupant and isinstance(add_payload, OccupantToken):
|
|
already_token.selected_occupant = add_payload
|
|
list_keys = [cls.AUTH_TOKEN, token, str(already_token.user_uu_id), str(add_uuid)]
|
|
print('is_occupant: ', list_keys)
|
|
result = RedisActions.set_json(list_keys=list_keys, value=already_token.model_dump(), expires={"hours": 1, "minutes": 30})
|
|
RedisActions.delete(list_keys=[cls.AUTH_TOKEN, token, str(already_token.user_uu_id)])
|
|
return result.first
|
|
raise ValueError("Something went wrong")
|