from Controllers.Redis.database import RedisActions import threading import time import random import uuid import concurrent.futures def example_set_json() -> None: """Example of setting JSON data in Redis with and without expiry.""" # Example 1: Set JSON without expiry data = {"name": "John", "age": 30, "city": "New York"} keys = ["user", "profile", "123"] result = RedisActions.set_json(list_keys=keys, value=data) print("Set JSON without expiry:", result.as_dict()) # Example 2: Set JSON with expiry expiry = {"hours": 1, "minutes": 30} result = RedisActions.set_json(list_keys=keys, value=data, expires=expiry) print("Set JSON with expiry:", result.as_dict()) def example_get_json() -> None: """Example of retrieving JSON data from Redis.""" # Example 1: Get all matching keys keys = ["user", "profile", "*"] result = RedisActions.get_json(list_keys=keys) print("Get all matching JSON:", result.as_dict()) # Example 2: Get with limit result = RedisActions.get_json(list_keys=keys, limit=5) print("Get JSON with limit:", result.as_dict()) def example_get_json_iterator() -> None: """Example of using the JSON iterator for large datasets.""" keys = ["user", "profile", "*"] for row in RedisActions.get_json_iterator(list_keys=keys): print( "Iterating over JSON row:", row.as_dict if isinstance(row.as_dict, dict) else row.as_dict, ) def example_delete_key() -> None: """Example of deleting a specific key.""" key = "user:profile:123" result = RedisActions.delete_key(key) print("Delete specific key:", result) def example_delete() -> None: """Example of deleting multiple keys matching a pattern.""" keys = ["user", "profile", "*"] result = RedisActions.delete(list_keys=keys) print("Delete multiple keys:", result) def example_refresh_ttl() -> None: """Example of refreshing TTL for a key.""" key = "user:profile:123" new_expiry = {"hours": 2, "minutes": 0} result = RedisActions.refresh_ttl(key=key, expires=new_expiry) print("Refresh TTL:", result.as_dict()) def example_key_exists() -> None: """Example of checking if a key exists.""" key = "user:profile:123" exists = RedisActions.key_exists(key) print(f"Key {key} exists:", exists) def example_resolve_expires_at() -> None: """Example of resolving expiry time for a key.""" from Controllers.Redis.base import RedisRow redis_row = RedisRow() redis_row.set_key("user:profile:123") print(redis_row.keys) expires_at = RedisActions.resolve_expires_at(redis_row) print("Resolve expires at:", expires_at) def run_all_examples() -> None: """Run all example functions to demonstrate RedisActions functionality.""" print("\n=== Redis Actions Examples ===\n") print("1. Setting JSON data:") example_set_json() print("\n2. Getting JSON data:") example_get_json() print("\n3. Using JSON iterator:") example_get_json_iterator() # print("\n4. Deleting specific key:") # example_delete_key() # # print("\n5. Deleting multiple keys:") # example_delete() print("\n6. Refreshing TTL:") example_refresh_ttl() print("\n7. Checking key existence:") example_key_exists() print("\n8. Resolving expiry time:") example_resolve_expires_at() def run_concurrent_test(num_threads=100): """Run a comprehensive concurrent test with multiple threads to verify Redis connection handling.""" print( f"\nStarting comprehensive Redis concurrent test with {num_threads} threads..." ) # Results tracking with detailed metrics results = { "passed": 0, "failed": 0, "retried": 0, "errors": [], "operation_times": [], "retry_count": 0, "max_retries": 3, "retry_delay": 0.1, } results_lock = threading.Lock() def worker(thread_id): # Track operation timing start_time = time.time() retry_count = 0 success = False error_message = None while retry_count <= results["max_retries"] and not success: try: # Generate unique key for this thread unique_id = str(uuid.uuid4())[:8] full_key = f"test:concurrent:{thread_id}:{unique_id}" # Simple string operations instead of JSON test_value = f"test-value-{thread_id}-{time.time()}" # Set data in Redis with pipeline for efficiency from Controllers.Redis.database import redis_cli # Use pipeline to reduce network overhead with redis_cli.pipeline() as pipe: pipe.set(full_key, test_value) pipe.get(full_key) pipe.delete(full_key) results_list = pipe.execute() # Check results set_ok = results_list[0] retrieved_value = results_list[1] if isinstance(retrieved_value, bytes): retrieved_value = retrieved_value.decode("utf-8") # Verify data success = set_ok and retrieved_value == test_value if success: break else: error_message = f"Data verification failed: set_ok={set_ok}, value_match={retrieved_value == test_value}" retry_count += 1 with results_lock: results["retry_count"] += 1 time.sleep( results["retry_delay"] * (2**retry_count) ) # Exponential backoff except Exception as e: error_message = str(e) retry_count += 1 with results_lock: results["retry_count"] += 1 # Check if it's a connection error and retry if "Too many connections" in str(e) or "Connection" in str(e): # Exponential backoff for connection issues backoff_time = results["retry_delay"] * (2**retry_count) time.sleep(backoff_time) else: # For other errors, use a smaller delay time.sleep(results["retry_delay"]) # Record operation time operation_time = time.time() - start_time # Update results with results_lock: if success: results["passed"] += 1 results["operation_times"].append(operation_time) if retry_count > 0: results["retried"] += 1 else: results["failed"] += 1 if error_message: results["errors"].append( f"Thread {thread_id} failed after {retry_count} retries: {error_message}" ) else: results["errors"].append( f"Thread {thread_id} failed after {retry_count} retries with unknown error" ) # Create and start threads using a thread pool start_time = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [executor.submit(worker, i) for i in range(num_threads)] concurrent.futures.wait(futures) # Calculate execution time and performance metrics execution_time = time.time() - start_time ops_per_second = num_threads / execution_time if execution_time > 0 else 0 # Calculate additional metrics if we have successful operations avg_op_time = 0 min_op_time = 0 max_op_time = 0 p95_op_time = 0 if results["operation_times"]: avg_op_time = sum(results["operation_times"]) / len(results["operation_times"]) min_op_time = min(results["operation_times"]) max_op_time = max(results["operation_times"]) # Calculate 95th percentile sorted_times = sorted(results["operation_times"]) p95_index = int(len(sorted_times) * 0.95) p95_op_time = ( sorted_times[p95_index] if p95_index < len(sorted_times) else sorted_times[-1] ) # Print detailed results print("\nConcurrent Redis Test Results:") print(f"Total threads: {num_threads}") print(f"Passed: {results['passed']}") print(f"Failed: {results['failed']}") print(f"Operations with retries: {results['retried']}") print(f"Total retry attempts: {results['retry_count']}") print(f"Success rate: {(results['passed'] / num_threads) * 100:.2f}%") print("\nPerformance Metrics:") print(f"Total execution time: {execution_time:.2f} seconds") print(f"Operations per second: {ops_per_second:.2f}") if results["operation_times"]: print(f"Average operation time: {avg_op_time * 1000:.2f} ms") print(f"Minimum operation time: {min_op_time * 1000:.2f} ms") print(f"Maximum operation time: {max_op_time * 1000:.2f} ms") print(f"95th percentile operation time: {p95_op_time * 1000:.2f} ms") # Print errors (limited to 10 for readability) if results["errors"]: print("\nErrors:") for i, error in enumerate(results["errors"][:10]): print(f"- {error}") if len(results["errors"]) > 10: print(f"- ... and {len(results['errors']) - 10} more errors") # Return results for potential further analysis return results if __name__ == "__main__": # Run basic examples run_all_examples() # Run enhanced concurrent test run_concurrent_test(10000)