import time
from typing import Any, Dict, Optional

from redis import ConnectionError, ConnectionPool, Redis, TimeoutError

from Controllers.Redis.config import redis_configs


class RedisConn:
    """
    Redis connection manager with connection pooling, retry logic,
    and health check capabilities.

    The manager owns one ``ConnectionPool`` and one ``Redis`` client bound to
    it. Both are created in ``__init__`` and released by ``close()``.
    """

    CONNECTION_RETRIES = 3  # Number of connection attempts before failing
    RETRY_DELAY = 0.5  # Delay between attempts in seconds
    DEFAULT_TIMEOUT = 5.0  # Default socket/connect timeout in seconds
    DEFAULT_MAX_CONNECTIONS = 10  # Default connection pool size

    def __init__(
        self,
        max_retries: int = CONNECTION_RETRIES,
    ):
        """
        Initialize Redis connection with configuration.

        The configuration is read from ``redis_configs.as_dict()`` and
        completed with defaults for any timeout, decoding, or pool-size key
        it does not provide.

        Args:
            max_retries: Maximum number of connection attempts.

        Raises:
            ConnectionError: If no attempt produces a working connection.
        """
        # Assigned first so close()/__del__ are safe even if a later line raises.
        self._redis: Optional[Redis] = None
        self._pool: Optional[ConnectionPool] = None

        self.max_retries = max_retries
        # Copy so the defaults added below never leak into the shared config.
        self.config: Dict[str, Any] = dict(redis_configs.as_dict())

        # Add default parameters if not provided
        self.config.setdefault("socket_timeout", self.DEFAULT_TIMEOUT)
        self.config.setdefault("socket_connect_timeout", self.DEFAULT_TIMEOUT)
        self.config.setdefault("decode_responses", True)

        # Add connection pooling settings if not provided
        self.config.setdefault("max_connections", self.DEFAULT_MAX_CONNECTIONS)

        # Initialize the connection with retry logic
        self._connect_with_retry()

    def __del__(self):
        """Cleanup Redis connection and pool on object destruction."""
        self.close()

    def close(self) -> None:
        """
        Close Redis connection and connection pool.

        Errors are reported with ``print`` and never raised. The client and
        the pool are released independently, so a failure closing one does
        not leak the other, and both references are always cleared.
        """
        # getattr keeps this safe on an instance whose __init__ never ran.
        client = getattr(self, "_redis", None)
        pool = getattr(self, "_pool", None)
        self._redis = None
        self._pool = None

        if client is not None:
            try:
                client.close()
            except Exception as e:
                print(f"Error closing Redis connection: {str(e)}")

        if pool is not None:
            try:
                pool.disconnect()
            except Exception as e:
                print(f"Error closing Redis connection: {str(e)}")

    def _connect_with_retry(self) -> None:
        """
        Attempt to establish a Redis connection with retry logic.

        Each attempt (re)binds a client to the pool and sends PING. Connection
        and timeout errors are retried after ``RETRY_DELAY`` seconds; any
        other exception propagates immediately.

        Raises:
            ConnectionError: If all connection attempts fail.
        """
        # Always try at least once, even if max_retries was given as 0 or less.
        attempts = max(1, self.max_retries)
        last_error: Optional[Exception] = None

        for attempt in range(1, attempts + 1):
            try:
                if self._pool is None:
                    self._pool = ConnectionPool(**self.config)
                self._redis = Redis(connection_pool=self._pool)
                # Ping directly instead of via check_connection(): that helper
                # swallows the error, which would hide the failure from the
                # retry handling below.
                if self._redis.ping():
                    return
                last_error = ConnectionError("PING returned an empty reply")
            except (ConnectionError, TimeoutError) as e:
                last_error = e

            if attempt < attempts:
                time.sleep(self.RETRY_DELAY)

        raise ConnectionError(
            f"Redis connection error after {attempts} attempts: {str(last_error)}"
        ) from last_error

    def check_connection(self) -> bool:
        """
        Check if the Redis connection is alive with a PING command.

        Never raises: any error while pinging is reported as ``False``.

        Returns:
            bool: True if connection is healthy, False otherwise.
        """
        if self._redis is None:
            return False
        try:
            return bool(self._redis.ping())
        except Exception:
            # Deliberately broad: this is a health probe, not an operation.
            return False

    def set_connection(self, **kwargs) -> Redis:
        """
        Recreate Redis connection with new parameters.

        Values not supplied fall back to ``redis_configs`` (host, password,
        port, db) or to this class's defaults. The new connection is verified
        with PING before it replaces the current one; if verification fails
        the existing connection and configuration are left untouched.

        Args:
            host: Redis server hostname or IP
            password: Redis authentication password
            port: Redis server port
            db: Redis database number
            **kwargs: Additional Redis connection parameters

        Returns:
            Redis: The new Redis connection object

        Raises:
            ConnectionError: If the new connection cannot be established.
        """
        # The database index comes from the base config when present; 0 is the
        # server's default database.
        default_db = redis_configs.as_dict().get("db", 0)

        new_config: Dict[str, Any] = {
            "host": redis_configs.HOST,
            "password": redis_configs.PASSWORD,
            "port": redis_configs.PORT,
            "db": default_db,
            "socket_timeout": self.DEFAULT_TIMEOUT,
            "socket_connect_timeout": self.DEFAULT_TIMEOUT,
            "decode_responses": True,
            "max_connections": self.DEFAULT_MAX_CONNECTIONS,
            # Caller-supplied values override every default above.
            **kwargs,
        }

        # Build on an explicit pool so close() and later reconnects use the
        # new settings rather than the previous pool.
        new_pool = ConnectionPool(**new_config)
        new_client = Redis(connection_pool=new_pool)

        failure: Optional[Exception] = None
        try:
            healthy = bool(new_client.ping())
        except Exception as e:
            healthy = False
            failure = e

        if not healthy:
            new_pool.disconnect()
            raise ConnectionError(
                "Failed to establish connection with new parameters"
            ) from failure

        # Only now release the old resources and switch over.
        self.close()
        self.config = new_config
        self._pool = new_pool
        self._redis = new_client
        return self._redis

    def get_connection_info(self) -> Dict[str, Any]:
        """
        Get current connection configuration details.

        Returns:
            Dict: Copy of the current configuration with the password masked
            (``"********"`` when set, ``None`` when empty).
        """
        # Create a copy without password for security
        info = self.config.copy()
        if "password" in info:
            info["password"] = "********" if info["password"] else None
        return info

    def get_stats(self) -> Dict[str, Any]:
        """
        Get Redis server statistics.

        Returns:
            Dict: Redis server info, or ``{"error": <message>}`` if the INFO
            command fails.
        """
        try:
            return self._redis.info()
        except Exception as e:
            return {"error": str(e)}

    @property
    def redis(self) -> Redis:
        """
        Property to access the Redis client.

        Every access pings the server and reconnects (with retries) if the
        ping fails.

        Returns:
            Redis: The Redis client instance

        Raises:
            Exception: If the client was never created or has been closed.
            ConnectionError: If the connection is down and reconnecting fails.
        """
        if self._redis is None:
            raise Exception("Redis client is not initialized")

        # Check connection health and reconnect if necessary
        if not self.check_connection():
            self._connect_with_retry()

        return self._redis


# Create singleton instance. Any connection failure propagates so that
# importing this module fails fast instead of handing out a dead client.
redis_conn = RedisConn()
redis_cli = redis_conn.redis