Controllers: added updated implementations; tests pending

This commit is contained in:
2025-03-31 23:51:27 +03:00
parent 2d32842782
commit 5d30bc2701
20 changed files with 2188 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class Configs(BaseSettings):
    """
    MongoDB connection settings, loaded from MONGO_-prefixed environment
    variables (e.g. MONGO_USER, MONGO_HOST).
    """

    model_config = SettingsConfigDict(env_prefix="MONGO_")

    USER: str = ""
    PASSWORD: str = ""
    HOST: str = ""
    PORT: int = 0
    DB: str = ""
    ENGINE: str = ""

    @property
    def url(self):
        """Assemble the MongoDB connection URL from the individual settings."""
        return (
            f"{self.ENGINE}://{self.USER}:{self.PASSWORD}"
            f"@{self.HOST}:{self.PORT}/{self.DB}?retryWrites=true&w=majority"
        )


mongo_configs = Configs()  # singleton instance of the MONGODB configuration settings

View File

@@ -0,0 +1,208 @@
import time
import functools
from pymongo import MongoClient
from pymongo.errors import PyMongoError
from config import mongo_configs
def retry_operation(max_attempts=3, delay=1.0, backoff=2.0, exceptions=(PyMongoError,)):
    """
    Decorator that retries MongoDB operations with exponential backoff.

    Args:
        max_attempts: Maximum number of attempts (including the final one).
        delay: Initial delay between retries in seconds.
        backoff: Multiplier applied to the delay after each failed attempt.
        exceptions: Tuple of exception types that trigger a retry.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            remaining = max_attempts
            wait = delay
            # Retry all attempts except the last inside the loop.
            while remaining > 1:
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    time.sleep(wait)
                    remaining -= 1
                    wait *= backoff
            # Final attempt: any exception now propagates to the caller.
            return func(*args, **kwargs)

        return wrapper

    return decorator
class MongoDBConfig:
    """
    Holds MongoDB connection settings: the URI plus the keyword-option
    mapping later passed to MongoClient.
    """

    def __init__(
        self,
        uri: str = "mongodb://localhost:27017/",
        max_pool_size: int = 50,
        min_pool_size: int = 10,
        max_idle_time_ms: int = 30000,
        wait_queue_timeout_ms: int = 2000,
        server_selection_timeout_ms: int = 5000,
        **additional_options,
    ):
        """
        Capture the connection URI and build the MongoClient option dict.

        Args:
            uri: MongoDB connection string.
            max_pool_size: Upper bound on pooled connections.
            min_pool_size: Lower bound on pooled connections.
            max_idle_time_ms: Idle connection lifetime in milliseconds.
            wait_queue_timeout_ms: How long a thread waits for a connection.
            server_selection_timeout_ms: Server selection timeout.
            **additional_options: Extra MongoClient keyword options; these
                override the defaults above on key collision.
        """
        self.uri = uri
        options = {
            "maxPoolSize": max_pool_size,
            "minPoolSize": min_pool_size,
            "maxIdleTimeMS": max_idle_time_ms,
            "waitQueueTimeoutMS": wait_queue_timeout_ms,
            "serverSelectionTimeoutMS": server_selection_timeout_ms,
        }
        options.update(additional_options)
        self.client_options = options
class MongoDBHandler(MongoDBConfig):
    """
    Singleton MongoDB handler that hands out per-collection context managers
    with automatic retry capability.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        """Create the single shared handler instance on first use."""
        if cls._instance is None:
            instance = super(MongoDBHandler, cls).__new__(cls)
            instance._initialized = False
            cls._instance = instance
        return cls._instance

    def __init__(
        self,
        uri: str,
        max_pool_size: int = 50,
        min_pool_size: int = 10,
        max_idle_time_ms: int = 30000,
        wait_queue_timeout_ms: int = 2000,
        server_selection_timeout_ms: int = 5000,
        **additional_options,
    ):
        """
        Initialize the handler. Runs at most once; subsequent constructor
        calls on the singleton are no-ops (their arguments are ignored).
        """
        if getattr(self, "_initialized", False):
            return
        super().__init__(
            uri=uri,
            max_pool_size=max_pool_size,
            min_pool_size=min_pool_size,
            max_idle_time_ms=max_idle_time_ms,
            wait_queue_timeout_ms=wait_queue_timeout_ms,
            server_selection_timeout_ms=server_selection_timeout_ms,
            **additional_options,
        )
        self._initialized = True

    def collection(self, collection_name: str):
        """
        Get a context manager for a specific collection.

        Args:
            collection_name: Name of the collection to access

        Returns:
            A context manager for the specified collection
        """
        return CollectionContext(self, collection_name)
class CollectionContext:
    """
    Context manager for MongoDB collections with automatic retry capability.

    Entering opens a fresh MongoClient, resolves the target collection on the
    URI's default database, and wraps its common methods with
    retry_operation(); exiting closes the client.
    """

    # Collection methods that get wrapped with retry logic on __enter__.
    _RETRYABLE_METHODS = (
        "insert_one",
        "insert_many",
        "find_one",
        "find",
        "update_one",
        "update_many",
        "delete_one",
        "delete_many",
        "replace_one",
        "count_documents",
    )

    def __init__(self, db_handler: MongoDBHandler, collection_name: str):
        """
        Initialize collection context.

        Args:
            db_handler: Reference to the MongoDB handler (supplies URI/options)
            collection_name: Name of the collection to access
        """
        self.db_handler = db_handler
        self.collection_name = collection_name
        self.client = None
        self.collection = None

    def __enter__(self):
        """
        Enter context, establishing a new connection.

        Returns:
            The MongoDB collection object with retry-wrapped methods
        """
        try:
            self.client = MongoClient(
                self.db_handler.uri, **self.db_handler.client_options
            )
            # get_database() with no name uses the default database embedded
            # in the connection URI.
            self.collection = self.client.get_database()[self.collection_name]
            self._add_retry_capabilities()
            return self.collection
        except Exception:
            # Don't leak a client when setup fails part-way through.
            if self.client:
                self.client.close()
            raise

    def _add_retry_capabilities(self):
        """
        Wrap each retryable collection method with retry_operation().

        Replaces ten hand-written monkey-patch assignments with a single loop
        over _RETRYABLE_METHODS — same methods, same wrapping.
        """
        for method_name in self._RETRYABLE_METHODS:
            original = getattr(self.collection, method_name)
            setattr(self.collection, method_name, retry_operation()(original))

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context, closing the connection and clearing cached references.
        """
        if self.client:
            self.client.close()
            self.client = None
            self.collection = None
# Module-level singleton handler.
# Fixed: the previous call passed ``db_name=mongo_configs.DB`` — a parameter
# MongoDBHandler.__init__ does not declare — so it would have been forwarded
# into MongoClient's options as an unknown keyword. The database name is
# already embedded in the URI (see Configs.url), and CollectionContext uses
# client.get_database() to pick it up from there.
mongo_handler = MongoDBHandler(
    uri=mongo_configs.url,
    max_pool_size=100,
    min_pool_size=20,
    max_idle_time_ms=60000,
)

View File

@@ -0,0 +1,17 @@
# Initialize the MongoDB handler with your configuration
from Controllers.Mongo.database import mongo_handler
# Use the collection with automatic retry capabilities. Entering the context
# opens a dedicated MongoClient; it is closed again when the block exits.
with mongo_handler.collection("users") as users_collection:
    # Write operations are wrapped by retry_operation() and retry on failure
    users_collection.insert_one({"username": "john", "email": "john@example.com"})
    # Read operations have the same retry wrapping
    user = users_collection.find_one({"username": "john"})
    # Updates will retry if they encounter transient errors
    users_collection.update_one(
        {"username": "john"}, {"$set": {"last_login": "2025-03-31"}}
    )

View File

@@ -0,0 +1,41 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
"""
pool_pre_ping=True, # Verify connection before using
pool_size=20, # Maximum number of permanent connections
max_overflow=10, # Maximum number of additional connections
pool_recycle=600, # Recycle connections after 1 hour
pool_timeout=30, # Wait up to 30 seconds for a connection
echo=True, # Set to True for debugging SQL queries
"""
class Configs(BaseSettings):
    """
    PostgreSQL configuration settings, loaded from POSTGRES_-prefixed
    environment variables.
    """

    DB: str = ""
    USER: str = ""
    PASSWORD: str = ""
    HOST: str = ""
    # Fixed: was annotated ``str`` with an int default (type mismatch);
    # use int, consistent with the MongoDB Configs.PORT field.
    PORT: int = 0
    ENGINE: str = ""
    POOL_PRE_PING: bool = True
    POOL_SIZE: int = 20
    MAX_OVERFLOW: int = 10
    POOL_RECYCLE: int = 600
    POOL_TIMEOUT: int = 30
    ECHO: bool = True

    @property
    def url(self):
        """Generate the database URL."""
        return f"{self.ENGINE}://{self.USER}:{self.PASSWORD}@{self.HOST}:{self.PORT}/{self.DB}"

    model_config = SettingsConfigDict(env_prefix="POSTGRES_")


postgres_configs = (
    Configs()
)  # singleton instance of the POSTGRESQL configuration settings

View File

@@ -0,0 +1,61 @@
from contextlib import contextmanager
from functools import lru_cache
from typing import Generator
from config import postgres_configs
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session, Session
# Configure the database engine with proper pooling.
# Fixed: pooling values were hard-coded even though Configs declares
# POOL_PRE_PING / POOL_SIZE / MAX_OVERFLOW / POOL_RECYCLE / POOL_TIMEOUT /
# ECHO fields — drive the engine from the settings so env vars take effect.
engine = create_engine(
    postgres_configs.url,
    pool_pre_ping=postgres_configs.POOL_PRE_PING,  # Verify connection before using
    pool_size=postgres_configs.POOL_SIZE,  # Maximum number of permanent connections
    max_overflow=postgres_configs.MAX_OVERFLOW,  # Maximum number of additional connections
    pool_recycle=postgres_configs.POOL_RECYCLE,  # Recycle connections after N seconds (default 600)
    pool_timeout=postgres_configs.POOL_TIMEOUT,  # Wait up to N seconds for a connection
    echo=postgres_configs.ECHO,  # Log SQL statements (debugging)
)

# Declarative base class that ORM models inherit from.
Base = declarative_base()
# Create a cached session factory (built once per process thanks to lru_cache)
@lru_cache()
def get_session_factory() -> scoped_session:
    """Create a thread-safe session factory.

    The scoped_session registry gives each thread its own Session; lru_cache
    ensures the sessionmaker and registry are constructed exactly once.
    """
    session_local = sessionmaker(
        bind=engine,
        autocommit=False,
        autoflush=False,
        expire_on_commit=True,  # Expire instances after commit so the next access reloads fresh DB state
    )
    return scoped_session(session_local)
# Get database session with proper connection management
@contextmanager
def get_db() -> Generator[Session, None, None]:
    """Yield a SQLAlchemy session, committing on success, rolling back on error.

    Guarantees:
    - proper connection pooling (sessions come from the cached factory)
    - the session is always closed and its connection returned to the pool
    - the scoped-session registry entry is removed afterwards (thread safety)

    Yields:
        Session: SQLAlchemy session object
    """
    factory = get_session_factory()
    db = factory()
    try:
        yield db
        # Commit inside the try so a failing commit still triggers rollback.
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
        factory.remove()  # Drop this thread's session from the registry

View File

310
Controllers/Redis/base.py Normal file
View File

@@ -0,0 +1,310 @@
"""
Redis key-value operations with structured data handling.
This module provides a class for managing Redis key-value operations with support for:
- Structured data storage and retrieval
- Key pattern generation for searches
- JSON serialization/deserialization
- Type-safe value handling
"""
import arrow
import json
from connection import redis_cli
from typing import Union, Dict, List, Optional, Any, ClassVar
class RedisKeyError(Exception):
    """Raised when a Redis key is missing, empty, or malformed."""
class RedisValueError(Exception):
    """Raised when a Redis value is missing, malformed, or of the wrong type."""
class RedisRow:
"""
Handles Redis key-value operations with structured data.
This class provides methods for:
- Managing compound keys with delimiters
- Converting between bytes and string formats
- JSON serialization/deserialization of values
- Pattern generation for Redis key searches
Attributes:
key: The Redis key in bytes or string format
value: The stored value (will be JSON serialized)
delimiter: Character used to separate compound key parts
expires_at: Optional expiration timestamp
"""
key: Union[str, bytes]
value: Optional[str] = None
delimiter: str = ":"
expires_at: Optional[dict] = {"seconds": 60 * 60 * 30}
expires_at_string: Optional[str]
def get_expiry_time(self) -> int | None:
"""Calculate expiry time in seconds from kwargs."""
time_multipliers = {"days": 86400, "hours": 3600, "minutes": 60, "seconds": 1}
if self.expires_at:
return sum(
int(self.expires_at.get(unit, 0)) * multiplier
for unit, multiplier in time_multipliers.items()
)
return None
def merge(self, set_values: List[Union[str, bytes]]) -> None:
"""
Merge list of values into a single delimited key.
Args:
set_values: List of values to merge into key
Example:
>>> RedisRow.merge(["users", "123", "profile"])
>>> print(RedisRow.key)
b'users:123:profile'
"""
if not set_values:
raise RedisKeyError("Cannot merge empty list of values")
merged = []
for value in set_values:
if value is None:
continue
if isinstance(value, bytes):
value = value.decode()
merged.append(str(value))
self.key = self.delimiter.join(merged).encode()
@classmethod
def regex(cls, list_keys: List[Union[str, bytes, None]]) -> str:
"""
Generate Redis search pattern from list of keys.
Args:
list_keys: List of key parts, can include None for wildcards
Returns:
str: Redis key pattern with wildcards
Example:
>>> RedisRow.regex([None, "users", "active"])
'*:users:active'
"""
if not list_keys:
return ""
# Filter and convert valid keys
valid_keys = []
for key in list_keys:
if key is None or str(key) == "None":
continue
if isinstance(key, bytes):
key = key.decode()
valid_keys.append(str(key))
# Build pattern
pattern = cls.delimiter.join(valid_keys)
if not pattern:
return ""
# Add wildcard if first key was None
if list_keys[0] is None:
pattern = f"*{cls.delimiter}{pattern}"
if "*" not in pattern and any([list_key is None for list_key in list_keys]):
pattern = f"{pattern}:*"
return pattern
def parse(self) -> List[str]:
"""
Parse the key into its component parts.
Returns:
List[str]: Key parts split by delimiter
Example:
>>> RedisRow.key = b'users:123:profile'
>>> RedisRow.parse()
['users', '123', 'profile']
"""
if not self.key:
return []
key_str = self.key.decode() if isinstance(self.key, bytes) else self.key
return key_str.split(self.delimiter)
def feed(self, value: Union[bytes, Dict, List, str]) -> None:
"""
Convert and store value in JSON format.
Args:
value: Value to store (bytes, dict, or list)
Raises:
RedisValueError: If value type is not supported
Example:
>>> RedisRow.feed({"name": "John", "age": 30})
>>> print(RedisRow.value)
'{"name": "John", "age": 30}'
"""
try:
if isinstance(value, (dict, list)):
self.value = json.dumps(value)
elif isinstance(value, bytes):
self.value = json.dumps(json.loads(value.decode()))
elif isinstance(value, str):
self.value = value
else:
raise RedisValueError(f"Unsupported value type: {type(value)}")
except json.JSONDecodeError as e:
raise RedisValueError(f"Invalid JSON format: {str(e)}")
def modify(self, add_dict: Dict) -> None:
"""
Modify existing data by merging with new dictionary.
Args:
add_dict: Dictionary to merge with existing data
Example:
>>> RedisRow.feed({"name": "John"})
>>> RedisRow.modify({"age": 30})
>>> print(RedisRow.data)
{"name": "John", "age": 30}
"""
if not isinstance(add_dict, dict):
raise RedisValueError("modify() requires a dictionary argument")
current_data = self.row if self.row else {}
if not isinstance(current_data, dict):
raise RedisValueError("Cannot modify non-dictionary data")
current_data = {
**current_data,
**add_dict,
}
self.feed(current_data)
self.save()
def save(self):
"""
Save the data to Redis with optional expiration.
Raises:
RedisKeyError: If key is not set
RedisValueError: If value is not set
"""
if not self.key:
raise RedisKeyError("Cannot save data without a key")
if not self.value:
raise RedisValueError("Cannot save empty data")
if self.expires_at:
redis_cli.setex(
name=self.redis_key, time=self.get_expiry_time(), value=self.value
)
self.expires_at_string = str(
arrow.now()
.shift(seconds=self.get_expiry_time())
.format("YYYY-MM-DD HH:mm:ss")
)
return self.value
redis_cli.set(name=self.redis_key, value=self.value)
self.expires_at = None
self.expires_at_string = None
return self.value
def remove(self, key: str) -> None:
"""
Remove a key from the stored dictionary.
Args:
key: Key to remove from stored dictionary
Raises:
KeyError: If key doesn't exist
RedisValueError: If stored value is not a dictionary
"""
current_data = self.row
if not isinstance(current_data, dict):
raise RedisValueError("Cannot remove key from non-dictionary data")
try:
current_data.pop(key)
self.feed(current_data)
self.save()
except KeyError:
raise KeyError(f"Key '{key}' not found in stored data")
def delete(self) -> None:
"""Delete the key from Redis."""
try:
redis_cli.delete(self.redis_key)
except Exception as e:
print(f"Error deleting key: {str(e)}")
@property
def keys(self) -> str:
"""
Get key as string.
Returns:
str: Key in string format
"""
return self.key.decode() if isinstance(self.key, bytes) else self.key
def set_key(self, key: Union[str, bytes]) -> None:
"""
Set key ensuring bytes format.
Args:
key: Key in string or bytes format
"""
if not key:
raise RedisKeyError("Cannot set empty key")
self.key = key if isinstance(key, bytes) else str(key).encode()
@property
def redis_key(self) -> bytes:
"""
Get key in bytes format for Redis operations.
Returns:
bytes: Key in bytes format
"""
return self.key if isinstance(self.key, bytes) else str(self.key).encode()
@property
def row(self) -> Union[Dict, List]:
"""
Get stored value as Python object.
Returns:
Union[Dict, List]: Deserialized JSON data
"""
try:
return json.loads(self.value)
except json.JSONDecodeError as e:
raise RedisValueError(f"Invalid JSON format in stored value: {str(e)}")
@property
def as_dict(self) -> Dict[str, Any]:
"""
Get row data as dictionary.
Returns:
Dict[str, Any]: Dictionary with keys and value
"""
return {
"keys": self.keys,
"value": self.row,
}

View File

View File

@@ -0,0 +1,184 @@
import time
from typing import Dict, Any, Optional
from redis import Redis, ConnectionError, TimeoutError
class RedisConn:
    """
    Redis connection manager with connection pooling, retry logic,
    and health check capabilities.
    """

    CONNECTION_RETRIES = 3  # Number of connection retries before failing
    RETRY_DELAY = 0.5  # Delay between retries in seconds
    DEFAULT_TIMEOUT = 5.0  # Default connection timeout in seconds

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        max_retries: int = CONNECTION_RETRIES,
    ):
        """
        Initialize Redis connection with configuration.

        Args:
            config: Redis connection configuration dictionary. When None an
                empty dict is used and the redis-py client defaults apply.
                (Fixed: ``config=None`` previously crashed on the membership
                checks below — and the module-level ``RedisConn()`` call uses
                exactly that default.)
            max_retries: Maximum number of connection attempts.
        """
        self.max_retries = max_retries
        # Copy so the caller's dict is never mutated; fall back to {} for None.
        self.config = dict(config) if config else {}
        self._redis = None
        # Fill in defaults only where the caller did not specify a value.
        self.config.setdefault("socket_timeout", self.DEFAULT_TIMEOUT)
        self.config.setdefault("socket_connect_timeout", self.DEFAULT_TIMEOUT)
        self.config.setdefault("decode_responses", True)
        self.config.setdefault("max_connections", 10)
        # Establish the initial connection (raises after max_retries failures).
        self._connect_with_retry()

    def _connect_with_retry(self) -> None:
        """
        Attempt to establish a Redis connection with retry logic.

        Raises:
            Exception: If all connection attempts fail with a connection or
                timeout error. Other exceptions propagate unchanged.
        """
        for attempt in range(1, self.max_retries + 1):
            try:
                self._redis = Redis(**self.config)
                if self.check_connection():
                    return
            except (ConnectionError, TimeoutError) as e:
                if attempt < self.max_retries:
                    time.sleep(self.RETRY_DELAY)
                else:
                    raise Exception(
                        f"Redis connection error after {self.max_retries} attempts: {str(e)}"
                    )

    def check_connection(self) -> bool:
        """
        Check if the Redis connection is alive with a PING command.

        Returns:
            bool: True if connection is healthy, False otherwise.
        """
        try:
            return self._redis.ping()
        except Exception:
            # Any failure (no client, network error, auth error) counts as
            # an unhealthy connection.
            return False

    def set_connection(
        self, host: str, password: str, port: int, db: int, **kwargs
    ) -> Redis:
        """
        Recreate Redis connection with new parameters.

        Args:
            host: Redis server hostname or IP
            password: Redis authentication password
            port: Redis server port
            db: Redis database number
            **kwargs: Additional Redis connection parameters

        Returns:
            Redis: The new Redis connection object

        Raises:
            ConnectionError: If the new connection fails its health check.
        """
        # Update configuration
        self.config = {
            "host": host,
            "password": password,
            "port": port,
            "db": db,
            "socket_timeout": kwargs.get("socket_timeout", self.DEFAULT_TIMEOUT),
            "socket_connect_timeout": kwargs.get(
                "socket_connect_timeout", self.DEFAULT_TIMEOUT
            ),
            "decode_responses": kwargs.get("decode_responses", True),
            "max_connections": kwargs.get("max_connections", 10),
        }
        # Merge any extra parameters not covered above
        for key, value in kwargs.items():
            if key not in self.config:
                self.config[key] = value
        # Create new connection
        self._redis = Redis(**self.config)
        if not self.check_connection():
            raise ConnectionError(
                "Failed to establish connection with new parameters"
            )
        return self._redis

    def get_connection_info(self) -> Dict[str, Any]:
        """
        Get current connection configuration details.

        Returns:
            Dict: Current connection configuration with the password masked.
        """
        # Return a copy so callers cannot mutate the live config.
        info = self.config.copy()
        if "password" in info:
            info["password"] = "********" if info["password"] else None
        return info

    def get_stats(self) -> Dict[str, Any]:
        """
        Get Redis server statistics.

        Returns:
            Dict: Redis server info, or {"error": ...} on failure.
        """
        try:
            return self._redis.info()
        except Exception as e:
            return {"error": str(e)}

    @property
    def redis(self) -> Redis:
        """
        Property to access the Redis client.

        Returns:
            Redis: The Redis client instance

        Raises:
            Exception: If Redis connection is not available
        """
        if not self._redis:
            raise Exception("Redis client is not initialized")
        # Check connection health and reconnect if necessary
        if not self.check_connection():
            self._connect_with_retry()
        return self._redis
# Create the module-level singleton connection. Fail fast at import time if
# Redis is unreachable so misconfiguration is visible immediately rather than
# on first use.
try:
    redis_conn = RedisConn()
    redis_cli = redis_conn.redis
except Exception:
    # Optionally set a dummy/mock Redis client here for testing or fallback
    # behavior (e.g. redis_cli = MockRedis()). For now, fail fast.
    # (Fixed: dropped the unused ``as t`` exception binding.)
    raise

View File

@@ -0,0 +1,350 @@
import arrow
from typing import Optional, List, Dict, Union, Iterator
from response import RedisResponse
from connection import redis_cli
from base import RedisRow
class MainConfig:
    """Module-wide constants for the Redis action helpers."""

    # Arrow format string used when rendering expiry timestamps.
    DATETIME_FORMAT: str = "YYYY-MM-DD HH:mm:ss"
class RedisActions:
    """Class for handling Redis operations with JSON data.

    All methods are classmethods that operate through the module-level
    ``redis_cli`` client. Failures are reported via ``RedisResponse``
    objects rather than raised, except in the iterator variant, which
    silently skips failing rows.
    """

    @classmethod
    def get_expiry_time(cls, expiry_kwargs: Dict[str, int]) -> int:
        """
        Calculate expiry time in seconds from kwargs.

        Args:
            expiry_kwargs: Dictionary with time units as keys (days, hours, minutes, seconds)
                and their respective values.

        Returns:
            Total expiry time in seconds.
        """
        time_multipliers = {"days": 86400, "hours": 3600, "minutes": 60, "seconds": 1}
        return sum(
            int(expiry_kwargs.get(unit, 0)) * multiplier
            for unit, multiplier in time_multipliers.items()
        )

    @classmethod
    def set_expiry_time(cls, expiry_seconds: int) -> Dict[str, int]:
        """
        Convert total seconds back into a dictionary of time units.

        Args:
            expiry_seconds: Total expiry time in seconds.

        Returns:
            Dictionary with time units and their values (empty for
            negative inputs, e.g. the Redis TTL sentinels -1/-2).
        """
        time_multipliers = {"days": 86400, "hours": 3600, "minutes": 60, "seconds": 1}
        result = {}
        remaining_seconds = expiry_seconds
        if expiry_seconds < 0:
            return {}
        # Greedily carve the total into days/hours/minutes/seconds.
        for unit, multiplier in time_multipliers.items():
            if remaining_seconds >= multiplier:
                result[unit], remaining_seconds = divmod(remaining_seconds, multiplier)
        return result

    @classmethod
    def resolve_expires_at(cls, redis_row: RedisRow) -> str:
        """
        Resolve expiry time for Redis key.

        Args:
            redis_row: RedisRow object containing the redis_key.

        Returns:
            Formatted expiry time string or message indicating no expiry.
        """
        # Redis TTL sentinels: -1 = key exists without TTL, -2 = no such key.
        expiry_time = redis_cli.ttl(redis_row.redis_key)
        if expiry_time == -1:
            return "Key has no expiry time."
        if expiry_time == -2:
            return "Key does not exist."
        return arrow.now().shift(seconds=expiry_time).format(MainConfig.DATETIME_FORMAT)

    @classmethod
    def key_exists(cls, key: Union[str, bytes]) -> bool:
        """
        Check if a key exists in Redis without retrieving its value.

        Args:
            key: Redis key to check.

        Returns:
            Boolean indicating if key exists.
        """
        return bool(redis_cli.exists(key))

    @classmethod
    def refresh_ttl(
        cls, key: Union[str, bytes], expires: Dict[str, int]
    ) -> RedisResponse:
        """
        Refresh TTL for an existing key.

        Args:
            key: Redis key to refresh TTL.
            expires: Dictionary with time units to set new expiry.

        Returns:
            RedisResponse with operation result.
        """
        try:
            if not cls.key_exists(key):
                return RedisResponse(
                    status=False,
                    message="Cannot refresh TTL: Key does not exist.",
                )
            expiry_time = cls.get_expiry_time(expiry_kwargs=expires)
            redis_cli.expire(name=key, time=expiry_time)
            expires_at_string = (
                arrow.now()
                .shift(seconds=expiry_time)
                .format(MainConfig.DATETIME_FORMAT)
            )
            return RedisResponse(
                status=True,
                message="TTL refreshed successfully.",
                data={"key": key, "expires_at": expires_at_string},
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Failed to refresh TTL.",
                error=str(e),
            )

    @classmethod
    def delete_key(cls, key: Union[Optional[str], Optional[bytes]]) -> RedisResponse:
        """
        Delete a specific key from Redis.

        Args:
            key: Redis key to delete.

        Returns:
            RedisResponse with operation result.
        """
        try:
            # DELETE returns the number of keys removed (0 when absent).
            deleted_count = redis_cli.delete(key)
            if deleted_count > 0:
                return RedisResponse(
                    status=True,
                    message="Key deleted successfully.",
                    data={"deleted_count": deleted_count},
                )
            return RedisResponse(
                status=False,
                message="Key not found or already deleted.",
                data={"deleted_count": 0},
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Failed to delete key.",
                error=str(e),
            )

    @classmethod
    def delete(
        cls, list_keys: List[Union[Optional[str], Optional[bytes]]]
    ) -> RedisResponse:
        """
        Delete multiple keys matching a pattern.

        Args:
            list_keys: List of key components to form pattern for deletion.

        Returns:
            RedisResponse with operation result.
        """
        try:
            # NOTE(review): regex() is a classmethod but is called on a
            # throwaway instance here, while get_json() calls it on the
            # class — works either way, but inconsistent.
            regex = RedisRow().regex(list_keys=list_keys)
            json_get = redis_cli.scan_iter(match=regex)
            deleted_keys, deleted_count = [], 0
            # Use pipeline for batch deletion
            with redis_cli.pipeline() as pipe:
                for row in json_get:
                    pipe.delete(row)
                    deleted_keys.append(row)
                results = pipe.execute()
                deleted_count = sum(results)
            return RedisResponse(
                status=True,
                message="Keys deleted successfully.",
                data={"deleted_count": deleted_count, "deleted_keys": deleted_keys},
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Failed to delete keys.",
                error=str(e),
            )

    @classmethod
    def set_json(
        cls,
        list_keys: List[Union[str, bytes]],
        value: Optional[Union[Dict, List]],
        expires: Optional[Dict[str, int]] = None,
    ) -> RedisResponse:
        """
        Set JSON value in Redis with optional expiry.

        Args:
            list_keys: List of key components to form Redis key.
            value: JSON-serializable data to store.
            expires: Optional dictionary with time units for expiry.

        Returns:
            RedisResponse with operation result (data holds the RedisRow).
        """
        redis_row = RedisRow()
        redis_row.merge(set_values=list_keys)
        redis_row.feed(value)
        # Reset expiry bookkeeping; it is filled in below only when a TTL
        # is requested.
        redis_row.expires_at_string = None
        redis_row.expires_at = None
        try:
            if expires:
                redis_row.expires_at = expires
                expiry_time = cls.get_expiry_time(expiry_kwargs=expires)
                redis_cli.setex(
                    name=redis_row.redis_key,
                    time=expiry_time,
                    value=redis_row.value,
                )
                redis_row.expires_at_string = str(
                    arrow.now()
                    .shift(seconds=expiry_time)
                    .format(MainConfig.DATETIME_FORMAT)
                )
            else:
                redis_cli.set(name=redis_row.redis_key, value=redis_row.value)
            return RedisResponse(
                status=True,
                message="Value set successfully.",
                data=redis_row,
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Failed to set value.",
                error=str(e),
            )

    @classmethod
    def get_json(
        cls,
        list_keys: List[Union[Optional[str], Optional[bytes]]],
        limit: Optional[int] = None,
    ) -> RedisResponse:
        """
        Get JSON values from Redis using pattern matching.

        Args:
            list_keys: List of key components to form pattern for retrieval.
            limit: Optional limit on number of results to return.

        Returns:
            RedisResponse with operation result (data is a list of RedisRow).
        """
        try:
            list_of_rows, count = [], 0
            regex = RedisRow.regex(list_keys=list_keys)
            json_get = redis_cli.scan_iter(match=regex)
            for row in json_get:
                if limit is not None and count >= limit:
                    break
                redis_row = RedisRow()
                redis_row.set_key(key=row)
                # Use pipeline for batch retrieval (GET + TTL in one round
                # trip per key; a key expiring between SCAN and GET would
                # surface here as a None value — TODO confirm handling).
                with redis_cli.pipeline() as pipe:
                    pipe.get(row)
                    pipe.ttl(row)
                    redis_value, redis_value_expire = pipe.execute()
                redis_row.expires_at = cls.set_expiry_time(
                    expiry_seconds=int(redis_value_expire)
                )
                redis_row.expires_at_string = cls.resolve_expires_at(
                    redis_row=redis_row
                )
                redis_row.feed(redis_value)
                list_of_rows.append(redis_row)
                count += 1
            if list_of_rows:
                return RedisResponse(
                    status=True,
                    message="Values retrieved successfully.",
                    data=list_of_rows,
                )
            return RedisResponse(
                status=False,
                message="No matching keys found.",
                data=list_of_rows,
            )
        except Exception as e:
            return RedisResponse(
                status=False,
                message="Failed to retrieve values.",
                error=str(e),
            )

    @classmethod
    def get_json_iterator(
        cls, list_keys: List[Union[Optional[str], Optional[bytes]]]
    ) -> Iterator[RedisRow]:
        """
        Get JSON values from Redis as an iterator for memory-efficient processing of large datasets.

        Args:
            list_keys: List of key components to form pattern for retrieval.

        Returns:
            Iterator yielding RedisRow objects.
        """
        regex = RedisRow.regex(list_keys=list_keys)
        json_get = redis_cli.scan_iter(match=regex)
        for row in json_get:
            try:
                redis_row = RedisRow()
                redis_row.set_key(key=row)
                # Use pipeline for batch retrieval
                with redis_cli.pipeline() as pipe:
                    pipe.get(row)
                    pipe.ttl(row)
                    redis_value, redis_value_expire = pipe.execute()
                redis_row.expires_at = cls.set_expiry_time(
                    expiry_seconds=int(redis_value_expire)
                )
                redis_row.expires_at_string = cls.resolve_expires_at(
                    redis_row=redis_row
                )
                redis_row.feed(redis_value)
                yield redis_row
            except Exception as e:
                # Best-effort iteration: skip rows that fail to load.
                # NOTE(review): the error is captured but never logged.
                err = e
                continue

View File

View File

@@ -0,0 +1,71 @@
from typing import Union, Dict, List, Optional, Any
from base import RedisRow
class RedisResponse:
"""Base class for Redis response handling."""
def __init__(
self,
status: bool,
message: str,
data: Any = None,
error: Optional[str] = None,
):
self.status = status
self.message = message
self.data = data
if isinstance(data, Dict):
self.data_type = "dict"
elif isinstance(data, list):
self.data_type = "list"
elif isinstance(data, RedisRow):
self.data_type = "row"
elif data is None:
self.data_type = None
self.error = error
def as_dict(self) -> Dict:
    """Serialize the response to a flat dictionary.

    Row payloads are flattened as {key: row} entries alongside the
    status/message/count/dataType metadata.
    """
    payload = self.all
    meta = {
        "status": self.status, "message": self.message,
        "count": self.count, "dataType": getattr(self, "data_type", None),
    }
    if isinstance(payload, RedisRow):
        result = {payload.keys: payload.row}
        result.update(meta)
        return result
    if isinstance(payload, list):
        result = {}
        for item in payload:
            if isinstance(item, RedisRow):
                result[item.keys] = item.row
        result.update(meta)
        return result
    return meta
@property
def all(self) -> Union[Optional[List[RedisRow]]]:
return self.data or []
@property
def count(self) -> int:
row = self.all
if isinstance(row, list):
return len(row)
elif isinstance(row, RedisRow):
return 1
return 0
@property
def first(self) -> Union[RedisRow, Dict, None]:
    """
    Return the first row's payload, or None when the response is empty.

    NOTE(review): flips self.status to False as a side effect when no data
    is present — a property getter mutating state is surprising; confirm
    callers rely on this before changing it.
    """
    if self.data:
        if isinstance(self.data, list):
            # Unwrap RedisRow entries to their deserialized payload.
            if isinstance(self.data[0], RedisRow):
                return self.data[0].row
            return self.data[0]
        elif isinstance(self.data, RedisRow):
            return self.data.row
    self.status = False
    return None