173 lines
6.2 KiB
Python
173 lines
6.2 KiB
Python
import logging
|
|
|
|
from json import loads, dumps
|
|
from contextlib import contextmanager
|
|
from time import sleep
|
|
from redis import Redis, RedisError, ConnectionError as RedisConnectionError
|
|
from .config import RedisConfig
|
|
|
|
|
|
logger = logging.getLogger('RedisHandler')
|
|
|
|
|
|
@contextmanager
def safe_redis_operation(redis_client: Redis, operation_name: str = "Redis operation"):
    """Yield ``redis_client`` and log any exception raised by the ``with`` body.

    Nothing is suppressed: the exception is logged at error level through the
    module logger, with a message that depends on its type, and is then
    re-raised unchanged.

    Args:
        redis_client: Object handed back to the ``with`` block as-is.
        operation_name: Label placed at the start of the logged message.

    Yields:
        The same ``redis_client`` that was passed in.

    Raises:
        Exception: Whatever the ``with`` body raised, after it was logged.
    """
    try:
        yield redis_client
    except Exception as exc:
        # Classify from most to least specific type; the first match decides
        # the wording (a connection error is also a RedisError in redis-py).
        if isinstance(exc, RedisConnectionError):
            message = f"{operation_name} failed due to Redis connection error: {str(exc)}"
        elif isinstance(exc, RedisError):
            message = f"{operation_name} failed due to Redis error: {str(exc)}"
        else:
            message = f"{operation_name} failed with unexpected error: {str(exc)}"
        logger.error(message)
        raise
|
|
|
|
|
|
class RedisHandler:
    """Singleton Redis handler class for centralized Redis operations.

    Every ``RedisHandler()`` call returns the same object; the underlying
    client is created (with connection retries) only on the first call.

    Attributes:
        redis_client: The ``Redis`` client all wrapper methods delegate to.
        redis_connected: Outcome of the most recent connectivity check.
    """

    # The one shared instance, created lazily by __new__.
    _instance = None
    # Exception types treated as Redis failures while connecting.
    REDIS_EXCEPTIONS = (RedisConnectionError, RedisError)

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # __init__ runs on every RedisHandler() call; this flag lets it
            # skip re-initialization after the first one.
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Create the client and record connectivity (first call only)."""
        if self._initialized:
            return
        self.redis_client = self._create_redis_client()
        self.redis_connected = self._check_redis_connection()
        self._initialized = True

    def _create_redis_client(self):
        """Create a Redis client, retrying the initial ping on failure.

        Makes up to 5 attempts, sleeping 5, 10, 20 and 40 seconds between
        them (the delay doubles after each failure).

        Returns:
            A client whose ping succeeded, or, if every attempt failed, a
            freshly constructed client that has NOT been verified.
        """
        max_retries, retry_delay = 5, 5
        for attempt in range(max_retries):
            try:
                client = Redis(**RedisConfig.as_dict())
                client.ping()
                logger.info("Redis connection established successfully")
                return client
            except self.REDIS_EXCEPTIONS as e:
                if attempt < max_retries - 1:
                    logger.warning(f"Redis connection attempt {attempt + 1} failed: {str(e)}. Retrying in {retry_delay} seconds...")
                    sleep(retry_delay)
                    retry_delay *= 2
                else:
                    logger.error(f"Failed to connect to Redis after {max_retries} attempts: {str(e)}")
        # All attempts failed: hand back an unverified client so the handler
        # still has an object to delegate to; callers see errors on use.
        return Redis(**RedisConfig.as_dict())

    def _check_redis_connection(self) -> bool:
        """Check if Redis connection is alive.

        Returns:
            True when a ping succeeds, False when it raises anything.
        """
        try:
            self.ping()
            return True
        except Exception as e:
            # Best-effort probe: report failure as False, but leave a trace
            # instead of discarding the reason entirely.
            logger.debug(f"Redis ping failed: {e}")
            return False

    def ping(self):
        """Ping Redis server to check connection."""
        return self.redis_client.ping()

    def sadd(self, key: str, value):
        """Add a value to a Redis set."""
        return self.redis_client.sadd(key, value)

    def ismember(self, key: str, value):
        """Check if a value is a member of a Redis set."""
        return self.redis_client.sismember(key, value)

    def get(self, key: str):
        """Get a value from Redis by key."""
        return self.redis_client.get(key)

    def get_json(self, key: str) -> dict:
        """Get a value from Redis by key and decode it as JSON.

        Returns:
            The decoded object, or None when the stored value is missing or
            empty (falsy).
        """
        obj = self.redis_client.get(key)
        if obj:
            return loads(obj)
        return None

    def set_json(self, key: str, value):
        """Serialize ``value`` to JSON and store it under ``key``."""
        return self.redis_client.set(key, dumps(value))

    def set(self, key: str, value):
        """Set a key-value pair in Redis."""
        return self.redis_client.set(key, value)

    def delete_value(self, key: str, value):
        """Remove entry ``value`` from the JSON object stored under ``key``.

        Does nothing when ``key`` holds no (or an empty) value. Otherwise the
        stored JSON is decoded, ``value`` is popped from it, and the result
        is written back.

        Raises:
            KeyError: If the decoded object is a dict without ``value``.
        """
        get_redis = self.get(key)
        if get_redis:
            get_redis: dict = loads(get_redis)
            get_redis.pop(value)
            self.set(key, dumps(get_redis))

    def rpush(self, key: str, value):
        """Append a value to a Redis list."""
        return self.redis_client.rpush(key, value)

    def lindex(self, key: str, index: int):
        """Get an element from a Redis list by its index."""
        return self.redis_client.lindex(key, index)

    def spop(self, key: str, count=1):
        """Remove and return random members from a Redis set."""
        return self.redis_client.spop(key, count)

    def srem(self, key: str, value):
        """Remove a specific member from a Redis set."""
        return self.redis_client.srem(key, value)

    def ensure_connection(self) -> bool:
        """Check if Redis connection is alive and reconnect if needed.

        Returns:
            True when Redis answers a ping (possibly after the client was
            recreated), False otherwise.
        """
        # Refresh the cached flag with a real ping. It is otherwise only set
        # at construction/reconnection, so a connection dropped since then
        # would keep reporting True and never be repaired.
        self.redis_connected = self._check_redis_connection()
        if self.redis_connected:
            return True
        try:
            self.redis_client = self._create_redis_client()
            self.redis_connected = self._check_redis_connection()
            if self.redis_connected:
                logger.info("Redis connection re-established successfully")
            return self.redis_connected
        except Exception as e:
            logger.error(f"Failed to re-establish Redis connection: {str(e)}")
            return False

    @classmethod
    def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
        """Fetch the shared handler, refresh its connectivity flag, and
        report whether the caller should pause longer.

        This method does not sleep and does not recreate the client itself;
        it only re-pings through the singleton and compares the error count
        with the threshold.

        Args:
            consecutive_errors: Number of consecutive errors encountered
            max_consecutive_errors: Threshold for extended sleep time

        Returns:
            tuple: (RedisHandler instance, or None if obtaining it raised;
            bool indicating if extended sleep is needed)
        """
        try:
            instance = cls()
            instance.redis_connected = instance._check_redis_connection()
            logger.info("Recreated Redis handler using singleton pattern")
            need_extended_sleep = consecutive_errors >= max_consecutive_errors
            if need_extended_sleep:
                logger.warning(f"Hit {max_consecutive_errors} consecutive Redis errors, taking longer pause")
            return instance, need_extended_sleep
        except Exception as redis_retry_error:
            logger.error(f"Failed to recreate Redis handler: {str(redis_retry_error)}")
            return None, consecutive_errors >= max_consecutive_errors
|
|
|
|
|
|
class RedisSaveModels:
    """Namespace of string constants, presumably Redis key names.

    NOTE(review): how these are used is not visible in this file; confirm
    against the callers.
    """

    COMMENT_BUILDING_CLUSTER = "COMMENT:PARSER:BUILDING:CLUSTER"
    COMMENT_BUILDING_INFO = "COMMENT:PARSER:BUILDING:INFO"
|