prod-wag-backend-automate-s.../Controllers/Redis/implementations.py

269 lines
9.5 KiB
Python

from Controllers.Redis.database import RedisActions
import threading
import time
import random
import uuid
import concurrent.futures
def example_set_json() -> None:
"""Example of setting JSON data in Redis with and without expiry."""
# Example 1: Set JSON without expiry
data = {"name": "John", "age": 30, "city": "New York"}
keys = ["user", "profile", "123"]
result = RedisActions.set_json(list_keys=keys, value=data)
print("Set JSON without expiry:", result.as_dict())
# Example 2: Set JSON with expiry
expiry = {"hours": 1, "minutes": 30}
result = RedisActions.set_json(list_keys=keys, value=data, expires=expiry)
print("Set JSON with expiry:", result.as_dict())
def example_get_json() -> None:
"""Example of retrieving JSON data from Redis."""
# Example 1: Get all matching keys
keys = ["user", "profile", "*"]
result = RedisActions.get_json(list_keys=keys)
print("Get all matching JSON:", result.as_dict())
# Example 2: Get with limit
result = RedisActions.get_json(list_keys=keys, limit=5)
print("Get JSON with limit:", result.as_dict())
def example_get_json_iterator() -> None:
"""Example of using the JSON iterator for large datasets."""
keys = ["user", "profile", "*"]
for row in RedisActions.get_json_iterator(list_keys=keys):
print(
"Iterating over JSON row:",
row.as_dict if isinstance(row.as_dict, dict) else row.as_dict,
)
def example_delete_key() -> None:
"""Example of deleting a specific key."""
key = "user:profile:123"
result = RedisActions.delete_key(key)
print("Delete specific key:", result)
def example_delete() -> None:
"""Example of deleting multiple keys matching a pattern."""
keys = ["user", "profile", "*"]
result = RedisActions.delete(list_keys=keys)
print("Delete multiple keys:", result)
def example_refresh_ttl() -> None:
"""Example of refreshing TTL for a key."""
key = "user:profile:123"
new_expiry = {"hours": 2, "minutes": 0}
result = RedisActions.refresh_ttl(key=key, expires=new_expiry)
print("Refresh TTL:", result.as_dict())
def example_key_exists() -> None:
"""Example of checking if a key exists."""
key = "user:profile:123"
exists = RedisActions.key_exists(key)
print(f"Key {key} exists:", exists)
def example_resolve_expires_at() -> None:
"""Example of resolving expiry time for a key."""
from Controllers.Redis.base import RedisRow
redis_row = RedisRow()
redis_row.set_key("user:profile:123")
print(redis_row.keys)
expires_at = RedisActions.resolve_expires_at(redis_row)
print("Resolve expires at:", expires_at)
def run_all_examples() -> None:
"""Run all example functions to demonstrate RedisActions functionality."""
print("\n=== Redis Actions Examples ===\n")
print("1. Setting JSON data:")
example_set_json()
print("\n2. Getting JSON data:")
example_get_json()
print("\n3. Using JSON iterator:")
example_get_json_iterator()
# print("\n4. Deleting specific key:")
# example_delete_key()
#
# print("\n5. Deleting multiple keys:")
# example_delete()
print("\n6. Refreshing TTL:")
example_refresh_ttl()
print("\n7. Checking key existence:")
example_key_exists()
print("\n8. Resolving expiry time:")
example_resolve_expires_at()
def run_concurrent_test(num_threads=100):
"""Run a comprehensive concurrent test with multiple threads to verify Redis connection handling."""
print(f"\nStarting comprehensive Redis concurrent test with {num_threads} threads...")
# Results tracking with detailed metrics
results = {
"passed": 0,
"failed": 0,
"retried": 0,
"errors": [],
"operation_times": [],
"retry_count": 0,
"max_retries": 3,
"retry_delay": 0.1
}
results_lock = threading.Lock()
def worker(thread_id):
# Track operation timing
start_time = time.time()
retry_count = 0
success = False
error_message = None
while retry_count <= results["max_retries"] and not success:
try:
# Generate unique key for this thread
unique_id = str(uuid.uuid4())[:8]
full_key = f"test:concurrent:{thread_id}:{unique_id}"
# Simple string operations instead of JSON
test_value = f"test-value-{thread_id}-{time.time()}"
# Set data in Redis with pipeline for efficiency
from Controllers.Redis.database import redis_cli
# Use pipeline to reduce network overhead
with redis_cli.pipeline() as pipe:
pipe.set(full_key, test_value)
pipe.get(full_key)
pipe.delete(full_key)
results_list = pipe.execute()
# Check results
set_ok = results_list[0]
retrieved_value = results_list[1]
if isinstance(retrieved_value, bytes):
retrieved_value = retrieved_value.decode('utf-8')
# Verify data
success = set_ok and retrieved_value == test_value
if success:
break
else:
error_message = f"Data verification failed: set_ok={set_ok}, value_match={retrieved_value == test_value}"
retry_count += 1
with results_lock:
results["retry_count"] += 1
time.sleep(results["retry_delay"] * (2 ** retry_count)) # Exponential backoff
except Exception as e:
error_message = str(e)
retry_count += 1
with results_lock:
results["retry_count"] += 1
# Check if it's a connection error and retry
if "Too many connections" in str(e) or "Connection" in str(e):
# Exponential backoff for connection issues
backoff_time = results["retry_delay"] * (2 ** retry_count)
time.sleep(backoff_time)
else:
# For other errors, use a smaller delay
time.sleep(results["retry_delay"])
# Record operation time
operation_time = time.time() - start_time
# Update results
with results_lock:
if success:
results["passed"] += 1
results["operation_times"].append(operation_time)
if retry_count > 0:
results["retried"] += 1
else:
results["failed"] += 1
if error_message:
results["errors"].append(f"Thread {thread_id} failed after {retry_count} retries: {error_message}")
else:
results["errors"].append(f"Thread {thread_id} failed after {retry_count} retries with unknown error")
# Create and start threads using a thread pool
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [executor.submit(worker, i) for i in range(num_threads)]
concurrent.futures.wait(futures)
# Calculate execution time and performance metrics
execution_time = time.time() - start_time
ops_per_second = num_threads / execution_time if execution_time > 0 else 0
# Calculate additional metrics if we have successful operations
avg_op_time = 0
min_op_time = 0
max_op_time = 0
p95_op_time = 0
if results["operation_times"]:
avg_op_time = sum(results["operation_times"]) / len(results["operation_times"])
min_op_time = min(results["operation_times"])
max_op_time = max(results["operation_times"])
# Calculate 95th percentile
sorted_times = sorted(results["operation_times"])
p95_index = int(len(sorted_times) * 0.95)
p95_op_time = sorted_times[p95_index] if p95_index < len(sorted_times) else sorted_times[-1]
# Print detailed results
print("\nConcurrent Redis Test Results:")
print(f"Total threads: {num_threads}")
print(f"Passed: {results['passed']}")
print(f"Failed: {results['failed']}")
print(f"Operations with retries: {results['retried']}")
print(f"Total retry attempts: {results['retry_count']}")
print(f"Success rate: {(results['passed'] / num_threads) * 100:.2f}%")
print("\nPerformance Metrics:")
print(f"Total execution time: {execution_time:.2f} seconds")
print(f"Operations per second: {ops_per_second:.2f}")
if results["operation_times"]:
print(f"Average operation time: {avg_op_time * 1000:.2f} ms")
print(f"Minimum operation time: {min_op_time * 1000:.2f} ms")
print(f"Maximum operation time: {max_op_time * 1000:.2f} ms")
print(f"95th percentile operation time: {p95_op_time * 1000:.2f} ms")
# Print errors (limited to 10 for readability)
if results["errors"]:
print("\nErrors:")
for i, error in enumerate(results["errors"][:10]):
print(f"- {error}")
if len(results["errors"]) > 10:
print(f"- ... and {len(results['errors']) - 10} more errors")
# Return results for potential further analysis
return results
if __name__ == "__main__":
# Run basic examples
run_all_examples()
# Run enhanced concurrent test
run_concurrent_test(10000)