wag-managment-api-service-v.../api_services/redis/old_functions.py

97 lines
3.5 KiB
Python

import json
import typing
from .conn import redis_cli
class RedisImports:
    """Container for dependencies that callers inject into the Redis helpers.

    The helper functions in this module read these attributes instead of
    importing the objects themselves:

    * ``status`` -- object exposing HTTP status constants such as
      ``HTTP_401_UNAUTHORIZED`` and ``HTTP_503_SERVICE_UNAVAILABLE``.
    * ``exceptions`` -- exception class raised by the helpers; it is called
      with ``status_code=`` and ``detail=`` keyword arguments.
    * ``ACCESS_TOKEN_TAG`` -- name of the request header carrying the token.
    * ``EmployeeTokenObject`` / ``OccupantTokenObject`` -- callables that
      build a token object from the keyword-expanded Redis payload.
    """

    def __init__(
        self,
        status,
        exceptions,
        access_token_tag,
        employee_token,
        occupant_token,
    ):
        # HTTP status constants and the exception type used for error replies.
        self.status = status
        self.exceptions = exceptions

        # Header name under which the access token is looked up.
        self.ACCESS_TOKEN_TAG = access_token_tag

        # Token object factories, one per supported user type.
        self.EmployeeTokenObject = employee_token
        self.OccupantTokenObject = occupant_token
def get_object_via_access_key(
    request,
    redis_imports: "RedisImports",
):
    """Resolve the request's access token to a token object stored in Redis.

    The token is read from the header named ``redis_imports.ACCESS_TOKEN_TAG``
    and looked up as the prefix of a ``"<token>:<suffix>"`` Redis key. The
    JSON payload of the first matching key is turned into an
    ``EmployeeTokenObject`` (``user_type == 1``) or an ``OccupantTokenObject``
    (``user_type == 2``).

    Args:
        request: Object exposing a ``headers`` mapping with a ``get`` method.
        redis_imports: Injected dependencies (exception class, status
            constants, header name and token object factories).

    Returns:
        The object built by the matching token factory.

    Raises:
        redis_imports.exceptions: with status 401 when the request has no
            headers, the token header is missing/empty, no stored entry
            matches the token, or the stored payload has no supported
            ``user_type``; with status 500 when reading from Redis, decoding
            the payload or building the token object fails.
    """
    if not hasattr(request, "headers"):
        raise redis_imports.exceptions(
            status_code=401,
            detail=dict(
                message="Headers are not found in request. Invalid request object."
            ),
        )

    access_token = request.headers.get(redis_imports.ACCESS_TOKEN_TAG)
    if not access_token:
        raise redis_imports.exceptions(
            status_code=401,
            detail=dict(message="Unauthorized user, please login..."),
        )

    # SECURITY: the token comes straight from a request header and is used in
    # a glob-style MATCH pattern. Without escaping, a header such as "*" would
    # match another user's key and hand back their session.
    glob_escapes = str.maketrans({char: "\\" + char for char in "*?[]\\"})
    escaped_token = str(access_token).translate(glob_escapes)

    matching_keys = list(redis_cli.scan_iter(match=f"{escaped_token}:*"))
    if matching_keys:
        token_key = matching_keys[0]
        # Keys are bytes unless the client was created with decode_responses.
        if isinstance(token_key, bytes):
            token_key = token_key.decode()

        # Only genuine Redis/JSON/model failures belong in this try block;
        # the 401 responses are raised outside it so that they are not
        # re-wrapped as a 500 by the broad handler below.
        try:
            raw_value = redis_cli.get(token_key)
            redis_object = json.loads(raw_value) if raw_value else None
            if redis_object:
                user_type = redis_object.get("user_type")
                if user_type == 1:
                    # Normalise a missing/empty selection to an explicit None.
                    redis_object["selected_company"] = (
                        redis_object.get("selected_company") or None
                    )
                    return redis_imports.EmployeeTokenObject(**redis_object)
                if user_type == 2:
                    redis_object["selected_occupant"] = (
                        redis_object.get("selected_occupant") or None
                    )
                    return redis_imports.OccupantTokenObject(**redis_object)
        except Exception as e:
            raise redis_imports.exceptions(
                status_code=500,
                detail={
                    "message": "Redis Service raised an exception.",
                    "error": str(e),
                },
            ) from e

        if raw_value:
            # A payload exists but carries no supported user_type.
            raise redis_imports.exceptions(
                status_code=401,
                detail=dict(
                    message="User type is not found in the token object. Please reach to your administrator."
                ),
            )
        # Otherwise the key had no value (e.g. it vanished between the scan
        # and the read): treat it like an unknown token below.

    raise redis_imports.exceptions(
        status_code=redis_imports.status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials. Please login again.",
    )
def get_object_via_user_uu_id(user_id: str) -> typing.Union[dict, None]:
    """Collect every stored token payload whose Redis key ends with ``:<user_id>``.

    Args:
        user_id: Identifier used as the key suffix in the ``"*:<user_id>"``
            scan pattern.

    Returns:
        A dict mapping each matching Redis key (as ``str``) to its
        JSON-decoded payload. The dict is empty when nothing matches.
    """
    tokens_by_key = {}
    for token_key in redis_cli.scan_iter(match="*:" + str(user_id)):
        raw_value = redis_cli.get(token_key)
        if not raw_value:
            # The key has no value (e.g. it was removed between the scan and
            # the read); json.loads cannot parse a missing value, so skip it.
            continue
        # Keys are bytes unless the client was created with decode_responses.
        key_name = token_key.decode() if isinstance(token_key, bytes) else token_key
        tokens_by_key[key_name] = json.loads(raw_value)
    return tokens_by_key
def save_object_to_redis(
    access_token, model_object, redis_imports: RedisImports
) -> bool:
    """Store a token model in Redis under ``"<access_token>:<user_uu_id>"``.

    Args:
        access_token: Token used as the first part of the Redis key.
        model_object: Object exposing ``user_uu_id`` and ``model_dump_json()``;
            the JSON dump is the stored value.
        redis_imports: Injected dependencies (exception class and status
            constants).

    Returns:
        ``access_token`` unchanged when Redis reports the write succeeded.
        NOTE(review): the ``bool`` annotation is kept for compatibility but
        does not describe the value actually returned.

    Raises:
        redis_imports.exceptions: with ``HTTP_503_SERVICE_UNAVAILABLE`` when
            the write raises or Redis reports that nothing was stored.
    """
    failure = None
    try:
        if redis_cli.set(
            name=str(access_token) + ":" + str(model_object.user_uu_id),
            value=model_object.model_dump_json(),
        ):
            return access_token
    except Exception as e:
        print("Save Object to Redis Error: ", e)
        # Keep the original error so it can be chained to the 503 below.
        failure = e

    # Reached both when the write raised and when Redis returned a falsy
    # result; the message describes the save failure rather than a header
    # problem, which this function never checks.
    raise redis_imports.exceptions(
        status_code=redis_imports.status.HTTP_503_SERVICE_UNAVAILABLE,
        detail=dict(message="Redis Error: Token is not saved."),
    ) from failure