import json
import typing

from api_services.redis.conn import redis_cli
from api_library.date_time_actions.date_functions import system_arrow
from api_objects import OccupantTokenObject, EmployeeTokenObject


class RedisActions:
    """JSON-oriented helpers built on the shared ``redis_cli`` client.

    Most methods never raise: they return a ``RedisResponse`` whose
    ``status`` flag tells success from failure. The token helpers
    (``save_object_to_redis``, ``get_object_via_access_key``) raise
    ``Exception`` on failure instead.
    """

    @staticmethod
    def _key_to_str(name):
        """Return *name* as ``str``, decoding it when it is not already one."""
        return str(name) if isinstance(name, str) else name.decode()

    @classmethod
    def set_json(cls, name, value):
        """Serialize *value* to JSON and store it under *name*.

        Args:
            name: Key as ``str`` or ``bytes``.
            value: Any JSON-serializable object.

        Returns:
            RedisResponse: ``status=True`` with ``data=value`` on success,
            otherwise ``status=False`` with the error text.
        """
        try:
            search_name = cls._key_to_str(name)
            redis_cli.set(name=search_name, value=json.dumps(value))
            return RedisResponse(
                status=True,
                message="Value is set successfully.",
                data=value,
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Value is not set successfully.",
                error=str(e),
            )

    @classmethod
    def get_json(cls, name):
        """Read the key *name* and decode its JSON payload.

        Returns:
            RedisResponse: ``status=True`` with the decoded payload, or
            ``status=False`` when the key is missing/empty or decoding fails.
        """
        try:
            search_name = cls._key_to_str(name)
            json_get = redis_cli.get(search_name)
            if not json_get:
                return RedisResponse(
                    status=False,
                    message="Value is not get successfully.",
                    error="Value is not found in the redis.",
                )
            return RedisResponse(
                status=True,
                message="Value is get successfully.",
                data=json.loads(json_get),
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Value is not get successfully.",
                error=str(e),
            )

    @classmethod
    def get_with_regex(cls, value_regex):
        """Return the decoded value of the first key matching *value_regex*.

        *value_regex* is passed to ``scan_iter(match=...)``.

        Returns:
            RedisResponse: ``status=True`` with the first readable match, or
            ``status=False`` when nothing matches or an error occurs.
        """
        try:
            for matched_key in redis_cli.scan_iter(match=str(value_regex)):
                raw_value = redis_cli.get(matched_key)
                if raw_value is None:
                    # Key disappeared between SCAN and GET; try the next one.
                    continue
                return RedisResponse(
                    status=True,
                    message="Single value is retrieved successfully.",
                    data=json.loads(raw_value),
                )
            # No key matched: answer explicitly instead of returning None.
            return RedisResponse(
                status=False,
                message="Values are not listed successfully.",
                error="Value is not found in the redis.",
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Values are not listed successfully.",
                error=str(e),
            )

    @classmethod
    def list_all(cls, value_regex):
        """Collect every key matching *value_regex* with its decoded value.

        Returns:
            RedisResponse: ``data`` is a dict mapping the decoded key name to
            its JSON-decoded value.
        """
        try:
            already_tokens_list = {}
            for matched_key in redis_cli.scan_iter(match=str(value_regex)):
                raw_value = redis_cli.get(matched_key)
                if raw_value is None:
                    # Key disappeared between SCAN and GET; skip it rather
                    # than failing the whole listing.
                    continue
                already_tokens_list[matched_key.decode()] = json.loads(raw_value)
            return RedisResponse(
                status=True,
                message="Values are listed successfully.",
                data=already_tokens_list,
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Values are not listed successfully.",
                error=str(e),
            )

    @classmethod
    def set_replace_all(cls, value, value_regex):
        """Overwrite every key matching *value_regex* with JSON-encoded *value*.

        Returns:
            RedisResponse: ``status=True`` with ``data=value`` on success.
        """
        try:
            payload = json.dumps(value)
            for matched_key in redis_cli.scan_iter(match=str(value_regex)):
                redis_cli.set(name=matched_key, value=payload)
            return RedisResponse(
                status=True,
                message="Value is set successfully.",
                data=value,
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Value is not set successfully.",
                error=str(e),
            )

    @classmethod
    def delete(cls, name):
        """Delete the key *name*.

        Returns:
            RedisResponse: ``status=False`` when the delete call reports that
            nothing was removed or an error occurs.
        """
        try:
            search_name = cls._key_to_str(name)
            json_delete = redis_cli.delete(search_name)
            if not json_delete:
                return RedisResponse(
                    status=False,
                    message="Value is not deleted successfully.",
                    error="Value is not found in the redis.",
                )
            return RedisResponse(
                status=True,
                message="Value is deleted successfully.",
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Value is not deleted successfully.",
                error=str(e),
            )

    @classmethod
    def delete_all(cls, value_regex):
        """Delete every key matching *value_regex*.

        Returns:
            RedisResponse: ``status=True`` even when no key matched.
        """
        try:
            for matched_key in redis_cli.scan_iter(match=str(value_regex)):
                redis_cli.delete(matched_key)
            return RedisResponse(
                status=True,
                message="Value is deleted successfully.",
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Value is not deleted successfully.",
                error=str(e),
            )

    @classmethod
    def update(cls, name, data):
        """Merge *data* into the JSON object stored under *name*.

        The key's remaining time-to-live is kept; a plain SET would drop it.

        Args:
            name: Key as ``str`` or ``bytes``.
            data: Mapping whose items are written over the stored object.

        Returns:
            RedisResponse: ``status=True`` with the merged object, or
            ``status=False`` when the key cannot be read or an error occurs.
        """
        try:
            json_update = cls.get_json(name=name)
            if not json_update.status:
                # Nothing to update: report it instead of returning None.
                return RedisResponse(
                    status=False,
                    message="Value is not updated successfully.",
                    error=json_update.error,
                )
            value_dict = json_update.data
            for key, value in data.items():
                value_dict[key] = value

            search_name = cls._key_to_str(name)
            payload = json.dumps(value_dict)
            # TTL is negative when the key has no expiry (or no longer exists).
            remaining_seconds = redis_cli.ttl(search_name)
            if remaining_seconds >= 0:
                redis_cli.setex(
                    name=search_name,
                    time=max(1, remaining_seconds),
                    value=payload,
                )
            else:
                redis_cli.set(name=search_name, value=payload)
            return RedisResponse(
                status=True,
                message="Value is updated successfully.",
                data=value_dict,
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Value is not updated successfully.",
                error=str(e),
            )

    @classmethod
    def update_all(cls, value_regex, data):
        """Apply :meth:`update` with *data* to every key matching *value_regex*.

        Per-key results are not inspected.

        Returns:
            RedisResponse: ``status=True`` with ``data=data`` unless the scan
            itself raises.
        """
        try:
            for matched_key in redis_cli.scan_iter(match=str(value_regex)):
                cls.update(name=matched_key, data=data)
            return RedisResponse(
                status=True,
                message="Values are updated successfully.",
                data=data,
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Value is not updated successfully.",
                error=str(e),
            )

    @classmethod
    def set_json_with_expiry(cls, name, value, **expiry_kwargs):
        """Set JSON value with expiry time.

        The stored payload is ``{"value": value, "expires_at": <timestamp>}``.

        Example:
            set_json_with_expiry('key', value, hours=1, minutes=30)

        Args:
            name: Key as ``str`` or ``bytes``.
            value: Any JSON-serializable object.
            **expiry_kwargs: Forwarded to ``system_arrow.get_expiry_time``.

        Returns:
            RedisResponse: ``data`` holds the stored payload on success.
        """
        try:
            search_name = cls._key_to_str(name)
            expiry_time = system_arrow.get_expiry_time(**expiry_kwargs)
            expires_at = expiry_time.timestamp()
            # int() truncates, so a very short expiry could become 0, which
            # SETEX rejects. Clamp to 1 like save_object_to_redis does.
            seconds_until_expiry = max(
                1, int(expires_at - system_arrow.now().timestamp())
            )
            redis_cli.setex(
                name=search_name,
                time=seconds_until_expiry,
                value=json.dumps({"value": value, "expires_at": expires_at}),
            )
            return RedisResponse(
                status=True,
                message="Value is set successfully with expiry.",
                data={"value": value, "expires_at": expires_at},
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Value is not set successfully.",
                error=str(e),
            )

    @classmethod
    def get_json_if_valid(cls, name):
        """Get JSON value if not expired.

        Expects the payload layout written by :meth:`set_json_with_expiry`.
        A payload that ``system_arrow.is_expired`` reports as expired is
        deleted before the failure response is returned.

        Returns:
            RedisResponse: ``data`` is the payload's ``"value"`` entry.
        """
        try:
            search_name = cls._key_to_str(name)
            result = redis_cli.get(name=search_name)
            if not result:
                return RedisResponse(
                    status=False,
                    message="Key not found.",
                )
            data = json.loads(result)
            if system_arrow.is_expired(data.get("expires_at")):
                redis_cli.delete(search_name)
                return RedisResponse(
                    status=False,
                    message="Cache expired.",
                )
            return RedisResponse(
                status=True,
                message="Value retrieved successfully.",
                data=data["value"],
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Error retrieving value.",
                error=str(e),
            )

    @classmethod
    def cache_with_timezone(cls, name, value, timezone=None, **expiry_kwargs):
        """Cache value with timezone-aware key.

        The key comes from ``get_cache_key(name)``; the value is stored via
        :meth:`set_json_with_expiry`.

        Returns:
            RedisResponse: Result of :meth:`set_json_with_expiry`, or a failure
            response when building the key raises.
        """
        try:
            # NOTE(review): DateTimeLocal is not imported in this file, so a
            # truthy `timezone` currently ends in the except branch below with
            # a NameError. Confirm the correct import path and add it.
            dt = DateTimeLocal(timezone=timezone) if timezone else system_arrow
            cache_key = dt.get_cache_key(name)
            return cls.set_json_with_expiry(cache_key, value, **expiry_kwargs)
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Error caching with timezone.",
                error=str(e),
            )

    @classmethod
    def save_object_to_redis(cls, access_token, model_object, expiry_minutes: int = 30):
        """Save object to Redis with expiry time.

        The key is ``"<access_token>:<model_object.user_uu_id>"`` and the
        stored JSON gets an extra ``"expires_at"`` timestamp.

        Args:
            access_token: The access token to use as key prefix.
            model_object: The model object to save (must have user_uu_id
                attribute and model_dump_json method).
            expiry_minutes: Minutes until the token expires (default: 30).

        Returns:
            str: The access token if successful.

        Raises:
            Exception: If saving fails.
        """
        try:
            key = f"{access_token}:{model_object.user_uu_id}"
            expiry_time = system_arrow.get_expiry_time(minutes=expiry_minutes)
            seconds_until_expiry = max(
                1, int(expiry_time.timestamp() - system_arrow.now().timestamp())
            )

            # Add expiry time to the model data
            model_data = json.loads(model_object.model_dump_json())
            model_data["expires_at"] = expiry_time.timestamp()

            if redis_cli.setex(
                name=key, time=seconds_until_expiry, value=json.dumps(model_data)
            ):
                return access_token
        except Exception as e:
            raise Exception(f"Failed to save object to Redis. Error: {str(e)}") from e

        # Reached only when SETEX returned a falsy result without raising.
        raise Exception("Failed to save token to Redis")

    @classmethod
    def get_object_via_access_key(cls, request):
        """Get object from Redis using access key from request headers.

        Args:
            request: The request object containing headers.

        Returns:
            Union[EmployeeTokenObject, OccupantTokenObject]: The token object.

        Raises:
            Exception: If retrieval fails or token is invalid.
        """
        from api_configs.configs import Auth

        if not hasattr(request, "headers"):
            raise Exception("Headers not found in request")

        access_token = request.headers.get(Auth.ACCESS_TOKEN_TAG)
        if not access_token:
            raise Exception("Unauthorized user, please login")

        # Scan for matching tokens
        token_pattern = f"{access_token}:*"
        matching_tokens = list(redis_cli.scan_iter(match=token_pattern))
        if not matching_tokens:
            raise Exception("Invalid credentials. Please login again")

        try:
            # Check if token has expired in Redis. A key without any TTL is
            # also treated as expired and removed.
            token_key = matching_tokens[0]
            ttl = redis_cli.ttl(token_key)
            if ttl <= 0:
                redis_cli.delete(token_key)
                raise Exception("Token expired. Please login again")

            # Get the token data
            token_data = json.loads(redis_cli.get(token_key) or "{}")

            # Return appropriate token object based on user type
            if token_data.get("user_type") == 1:  # Employee
                if not token_data.get("selected_company"):
                    token_data["selected_company"] = None
                return EmployeeTokenObject(**token_data)
            elif token_data.get("user_type") == 2:  # Occupant
                if not token_data.get("selected_occupant"):
                    token_data["selected_occupant"] = None
                return OccupantTokenObject(**token_data)

            raise Exception("Invalid user type in token")

        except Exception as e:
            raise Exception(f"Failed to retrieve token: {str(e)}") from e

    @classmethod
    def get_object_via_user_uu_id(cls, user_id: str) -> typing.Union[dict, None]:
        """Get all objects for a user by UUID.

        Scans for keys matching ``"*:<user_id>"``. Entries reported as expired
        by ``system_arrow.is_expired`` are deleted and left out.

        Args:
            user_id: The user UUID to search for.

        Returns:
            dict: Dictionary of token keys and their corresponding objects.
        """
        token_pattern = f"*:{str(user_id)}"
        matching_tokens = redis_cli.scan_iter(match=token_pattern)

        tokens_dict = {}
        for token_key in matching_tokens:
            token_data = json.loads(redis_cli.get(token_key) or "{}")

            # Skip expired tokens and clean them up
            if system_arrow.is_expired(token_data.get("expires_at")):
                redis_cli.delete(token_key)
                continue

            tokens_dict[token_key.decode()] = token_data

        return tokens_dict


class RedisResponse:
    """Uniform result object returned by the ``RedisActions`` helpers."""

    def __init__(
        self,
        status: bool,
        message: str,
        data: typing.Optional[typing.Union[dict, list]] = None,
        error: typing.Optional[str] = None,
    ):
        """Store the outcome of a Redis operation.

        Args:
            status: ``True`` on success, ``False`` on failure.
            message: Human-readable description of the outcome.
            data: Payload of the operation, if any.
            error: Error text when the operation failed.
        """
        self.status = status
        self.message = message
        self.data = data
        # data_type must always be set, otherwise as_dict() raises
        # AttributeError for payloads that are neither dict, list nor None.
        if isinstance(data, dict):
            self.data_type = "dict"
        elif isinstance(data, list):
            self.data_type = "list"
        elif data is None:
            self.data_type = None
        else:
            self.data_type = type(data).__name__
        self.error = error

    def as_dict(self):
        """Return the response fields as a plain dictionary."""
        return {
            "status": self.status,
            "message": self.message,
            "data": self.data,
            "data_type": self.data_type,
            "error": self.error,
        }