From 6ab4410a82aebe8d638b19ccd97d145299224a11 Mon Sep 17 00:00:00 2001 From: berkay Date: Sun, 20 Apr 2025 14:58:55 +0300 Subject: [PATCH] updated redis impl --- Controllers/Redis/README.md | 85 ++++++++++++++ Controllers/Redis/config.py | 8 +- Controllers/Redis/connection.py | 19 +++- Controllers/Redis/implementations.py | 158 +++++++++++++++++++++++++++ 4 files changed, 264 insertions(+), 6 deletions(-) create mode 100644 Controllers/Redis/README.md diff --git a/Controllers/Redis/README.md b/Controllers/Redis/README.md new file mode 100644 index 0000000..80bda34 --- /dev/null +++ b/Controllers/Redis/README.md @@ -0,0 +1,85 @@ +# Redis Controller + +## Overview +This module provides a robust, thread-safe Redis connection handler with comprehensive concurrent operation testing. The Redis controller is designed for high-performance, resilient database connection management that can handle multiple simultaneous operations efficiently. + +## Features +- Singleton pattern for efficient connection management +- Connection pooling with configurable settings +- Automatic retry capabilities for Redis operations +- Thread-safe operations with proper error handling +- Comprehensive JSON data handling +- TTL management and expiry time resolution +- Efficient batch operations using Redis pipelines + +## Configuration +The Redis controller is configured with the following default settings: +- Host: 10.10.2.15 +- Port: 6379 +- DB: 0 +- Connection pool size: 50 connections +- Health check interval: 30 seconds +- Socket timeout: 5.0 seconds +- Retry on timeout: Enabled +- Socket keepalive: Enabled + +## Usage Examples +The controller provides several high-level methods for Redis operations: +- `set_json`: Store JSON data with optional expiry +- `get_json`: Retrieve JSON data with pattern matching +- `get_json_iterator`: Memory-efficient iterator for large datasets +- `delete`: Remove keys matching a pattern +- `refresh_ttl`: Update expiry time for existing keys +- `key_exists`: Check if a key exists without retrieving it +- `resolve_expires_at`: Get human-readable expiry time + +## Concurrent Performance Testing +The Redis controller has been thoroughly tested for concurrent operations with impressive results: + +### Test Configuration +- 10,000 concurrent threads +- Each thread performs a set, get, and delete operation +- Pipeline used for efficient batching +- Exponential backoff for connection errors +- Comprehensive error tracking and reporting + +### Test Results +``` +Concurrent Redis Test Results: +Total threads: 10000 +Passed: 10000 +Failed: 0 +Operations with retries: 0 +Total retry attempts: 0 +Success rate: 100.00% + +Performance Metrics: +Total execution time: 4.30 seconds +Operations per second: 2324.35 +Average operation time: 1.92 ms +Minimum operation time: 0.43 ms +Maximum operation time: 40.45 ms +95th percentile operation time: 4.14 ms +``` + +## Thread Safety +The Redis controller is designed to be thread-safe with the following mechanisms: +- Connection pooling to manage concurrent connections efficiently +- Thread-local storage for operation-specific data +- Atomic operations using Redis pipelines +- Proper error handling and retry logic for connection issues +- Exponential backoff for handling connection limits + +## Error Handling +The controller implements comprehensive error handling: +- Connection errors are automatically retried with exponential backoff +- Detailed error reporting with context-specific information +- Graceful degradation under high load +- Connection health monitoring and automatic reconnection + +## Best Practices +- Use pipelines for batching multiple operations +- Implement proper key naming conventions +- Set appropriate TTL values for cached data +- Monitor connection pool usage in production +- Use the JSON iterator for large datasets to minimize memory usage diff --git a/Controllers/Redis/config.py b/Controllers/Redis/config.py index f9f10b4..337bff3 100644 --- a/Controllers/Redis/config.py +++ b/Controllers/Redis/config.py @@ -3,12 +3,12 @@ from pydantic_settings import BaseSettings, SettingsConfigDict class Configs(BaseSettings): """ - MongoDB configuration settings. + Redis configuration settings. """ - HOST: str = "" - PASSWORD: str = "" - PORT: int = 0 + HOST: str = "10.10.2.15" + PASSWORD: str = "your_strong_password_here" + PORT: int = 6379 DB: int = 0 def as_dict(self): diff --git a/Controllers/Redis/connection.py b/Controllers/Redis/connection.py index 2b189f8..73b974b 100644 --- a/Controllers/Redis/connection.py +++ b/Controllers/Redis/connection.py @@ -40,7 +40,19 @@ class RedisConn: # Add connection pooling settings if not provided if "max_connections" not in self.config: - self.config["max_connections"] = 10 + self.config["max_connections"] = 50 # Increased for better concurrency + + # Add connection timeout settings + if "health_check_interval" not in self.config: + self.config["health_check_interval"] = 30 # Health check every 30 seconds + + # Add retry settings for operations + if "retry_on_timeout" not in self.config: + self.config["retry_on_timeout"] = True + + # Add connection pool settings for better performance + if "socket_keepalive" not in self.config: + self.config["socket_keepalive"] = True # Initialize the connection with retry logic self._connect_with_retry() @@ -124,7 +136,10 @@ class RedisConn: "socket_connect_timeout", self.DEFAULT_TIMEOUT ), "decode_responses": kwargs.get("decode_responses", True), - "max_connections": kwargs.get("max_connections", 10), + "max_connections": kwargs.get("max_connections", 50), + "health_check_interval": kwargs.get("health_check_interval", 30), + "retry_on_timeout": kwargs.get("retry_on_timeout", True), + "socket_keepalive": kwargs.get("socket_keepalive", True), } # Add any additional parameters diff --git a/Controllers/Redis/implementations.py b/Controllers/Redis/implementations.py index 87fecda..9d108d4 100644 --- a/Controllers/Redis/implementations.py +++ b/Controllers/Redis/implementations.py @@ -1,4 +1,9 @@ from Controllers.Redis.database import RedisActions +import threading +import time +import random +import uuid +import concurrent.futures def example_set_json() -> None: @@ -106,5 +111,158 @@ def run_all_examples() -> None: example_resolve_expires_at() +def run_concurrent_test(num_threads=100): + """Run a comprehensive concurrent test with multiple threads to verify Redis connection handling.""" + print(f"\nStarting comprehensive Redis concurrent test with {num_threads} threads...") + + # Results tracking with detailed metrics + results = { + "passed": 0, + "failed": 0, + "retried": 0, + "errors": [], + "operation_times": [], + "retry_count": 0, + "max_retries": 3, + "retry_delay": 0.1 + } + results_lock = threading.Lock() + + def worker(thread_id): + # Track operation timing + start_time = time.time() + retry_count = 0 + success = False + error_message = None + + while retry_count <= results["max_retries"] and not success: + try: + # Generate unique key for this thread + unique_id = str(uuid.uuid4())[:8] + full_key = f"test:concurrent:{thread_id}:{unique_id}" + + # Simple string operations instead of JSON + test_value = f"test-value-{thread_id}-{time.time()}" + + # Set data in Redis with pipeline for efficiency + from Controllers.Redis.database import redis_cli + + # Use pipeline to reduce network overhead + with redis_cli.pipeline() as pipe: + pipe.set(full_key, test_value) + pipe.get(full_key) + pipe.delete(full_key) + results_list = pipe.execute() + + # Check results + set_ok = results_list[0] + retrieved_value = results_list[1] + if isinstance(retrieved_value, bytes): + retrieved_value = retrieved_value.decode('utf-8') + + # Verify data + success = set_ok and retrieved_value == test_value + + if success: + break + else: + error_message = f"Data verification failed: set_ok={set_ok}, value_match={retrieved_value == test_value}" + retry_count += 1 + with results_lock: + results["retry_count"] += 1 + time.sleep(results["retry_delay"] * (2 ** retry_count)) # Exponential backoff + + except Exception as e: + error_message = str(e) + retry_count += 1 + with results_lock: + results["retry_count"] += 1 + + # Check if it's a connection error and retry + if "Too many connections" in str(e) or "Connection" in str(e): + # Exponential backoff for connection issues + backoff_time = results["retry_delay"] * (2 ** retry_count) + time.sleep(backoff_time) + else: + # For other errors, use a smaller delay + time.sleep(results["retry_delay"]) + + # Record operation time + operation_time = time.time() - start_time + + # Update results + with results_lock: + if success: + results["passed"] += 1 + results["operation_times"].append(operation_time) + if retry_count > 0: + results["retried"] += 1 + else: + results["failed"] += 1 + if error_message: + results["errors"].append(f"Thread {thread_id} failed after {retry_count} retries: {error_message}") + else: + results["errors"].append(f"Thread {thread_id} failed after {retry_count} retries with unknown error") + + # Create and start threads using a thread pool + start_time = time.time() + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [executor.submit(worker, i) for i in range(num_threads)] + concurrent.futures.wait(futures) + + # Calculate execution time and performance metrics + execution_time = time.time() - start_time + ops_per_second = num_threads / execution_time if execution_time > 0 else 0 + + # Calculate additional metrics if we have successful operations + avg_op_time = 0 + min_op_time = 0 + max_op_time = 0 + p95_op_time = 0 + + if results["operation_times"]: + avg_op_time = sum(results["operation_times"]) / len(results["operation_times"]) + min_op_time = min(results["operation_times"]) + max_op_time = max(results["operation_times"]) + # Calculate 95th percentile + sorted_times = sorted(results["operation_times"]) + p95_index = int(len(sorted_times) * 0.95) + p95_op_time = sorted_times[p95_index] if p95_index < len(sorted_times) else sorted_times[-1] + + # Print detailed results + print("\nConcurrent Redis Test Results:") + print(f"Total threads: {num_threads}") + print(f"Passed: {results['passed']}") + print(f"Failed: {results['failed']}") + print(f"Operations with retries: {results['retried']}") + print(f"Total retry attempts: {results['retry_count']}") + print(f"Success rate: {(results['passed'] / num_threads) * 100:.2f}%") + + print("\nPerformance Metrics:") + print(f"Total execution time: {execution_time:.2f} seconds") + print(f"Operations per second: {ops_per_second:.2f}") + + if results["operation_times"]: + print(f"Average operation time: {avg_op_time * 1000:.2f} ms") + print(f"Minimum operation time: {min_op_time * 1000:.2f} ms") + print(f"Maximum operation time: {max_op_time * 1000:.2f} ms") + print(f"95th percentile operation time: {p95_op_time * 1000:.2f} ms") + + # Print errors (limited to 10 for readability) + if results["errors"]: + print("\nErrors:") + for i, error in enumerate(results["errors"][:10]): + print(f"- {error}") + if len(results["errors"]) > 10: + print(f"- ... and {len(results['errors']) - 10} more errors") + + # Return results for potential further analysis + return results + + if __name__ == "__main__": + # Run basic examples run_all_examples() + + # Run enhanced concurrent test + run_concurrent_test(10000)