#!/usr/bin/env python3 """ Redis Load Test Script This script tests Redis performance by running multiple operations (create, update, delete) across 100 concurrent threads. """ import os import time import random import string import threading import logging import argparse from concurrent.futures import ThreadPoolExecutor from dotenv import load_dotenv import redis # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("redis_test.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Load environment variables from .env file load_dotenv() # Redis connection parameters REDIS_HOST = os.getenv("REDIS_HOST", "localhost") REDIS_PORT = int(os.getenv("REDIS_PORT", 6379)) REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", "your_strong_password_here") REDIS_DB = int(os.getenv("REDIS_DB", 0)) # Test parameters NUM_THREADS = 100 NUM_KEYS_PER_THREAD = 100 KEY_PREFIX = "test_key_" VALUE_SIZE = 1024 # Size of random string value in bytes # Statistics success_count = 0 failure_count = 0 lock = threading.Lock() def get_redis_connection(): """Create and return a Redis connection""" try: client = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=REDIS_DB, socket_timeout=5, decode_responses=True ) # Test connection client.ping() return client except redis.ConnectionError as e: logger.error(f"Failed to connect to Redis: {e}") raise def generate_random_string(size): """Generate a random string of specified size""" return ''.join(random.choices(string.ascii_letters + string.digits, k=size)) def create_keys(thread_id): """Create multiple Redis keys with random values""" client = get_redis_connection() global success_count, failure_count keys_created = 0 start_time = time.time() for i in range(NUM_KEYS_PER_THREAD): key = f"{KEY_PREFIX}{thread_id}_{i}" value = generate_random_string(VALUE_SIZE) try: client.set(key, value) keys_created += 1 except Exception as e: logger.error(f"Thread {thread_id} - Failed to create key {key}: {e}") with lock: failure_count += 1 duration = time.time() - start_time logger.info(f"Thread {thread_id} - Created {keys_created} keys in {duration:.2f} seconds") with lock: success_count += keys_created return keys_created def update_keys(thread_id): """Update existing Redis keys with new values""" client = get_redis_connection() global success_count, failure_count keys_updated = 0 start_time = time.time() for i in range(NUM_KEYS_PER_THREAD): key = f"{KEY_PREFIX}{thread_id}_{i}" # Check if key exists if client.exists(key): new_value = generate_random_string(VALUE_SIZE) try: client.set(key, new_value) keys_updated += 1 except Exception as e: logger.error(f"Thread {thread_id} - Failed to update key {key}: {e}") with lock: failure_count += 1 duration = time.time() - start_time logger.info(f"Thread {thread_id} - Updated {keys_updated} keys in {duration:.2f} seconds") with lock: success_count += keys_updated return keys_updated def delete_keys(thread_id): """Delete Redis keys created by this thread""" client = get_redis_connection() global success_count, failure_count keys_deleted = 0 start_time = time.time() for i in range(NUM_KEYS_PER_THREAD): key = f"{KEY_PREFIX}{thread_id}_{i}" try: if client.delete(key): keys_deleted += 1 except Exception as e: logger.error(f"Thread {thread_id} - Failed to delete key {key}: {e}") with lock: failure_count += 1 duration = time.time() - start_time logger.info(f"Thread {thread_id} - Deleted {keys_deleted} keys in {duration:.2f} seconds") with lock: success_count += keys_deleted return keys_deleted def run_test(operation): """Run the specified operation across multiple threads""" global success_count, failure_count success_count = 0 failure_count = 0 logger.info(f"Starting {operation} test with {NUM_THREADS} threads") start_time = time.time() with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor: if operation == "create": results = list(executor.map(create_keys, range(NUM_THREADS))) elif operation == "update": results = list(executor.map(update_keys, range(NUM_THREADS))) elif operation == "delete": results = list(executor.map(delete_keys, range(NUM_THREADS))) duration = time.time() - start_time total_operations = sum(results) ops_per_second = total_operations / duration if duration > 0 else 0 logger.info(f"Test completed: {operation}") logger.info(f"Total operations: {total_operations}") logger.info(f"Successful operations: {success_count}") logger.info(f"Failed operations: {failure_count}") logger.info(f"Duration: {duration:.2f} seconds") logger.info(f"Operations per second: {ops_per_second:.2f}") return { "operation": operation, "total_operations": total_operations, "successful_operations": success_count, "failed_operations": failure_count, "duration": duration, "operations_per_second": ops_per_second } def run_all_tests(): """Run all tests (create, update, delete) in sequence""" results = [] # Create test results.append(run_test("create")) # Update test results.append(run_test("update")) # Delete test results.append(run_test("delete")) return results def print_summary(results): """Print a summary of all test results""" logger.info("\n===== TEST SUMMARY =====") for result in results: logger.info(f"\nOperation: {result['operation'].upper()}") logger.info(f"Total operations: {result['total_operations']}") logger.info(f"Successful operations: {result['successful_operations']}") logger.info(f"Failed operations: {result['failed_operations']}") logger.info(f"Duration: {result['duration']:.2f} seconds") logger.info(f"Operations per second: {result['operations_per_second']:.2f}") logger.info("\n=========================") def main(): """Main function to parse arguments and run tests""" parser = argparse.ArgumentParser(description='Redis Load Test') parser.add_argument('--operation', choices=['create', 'update', 'delete', 'all'], default='all', help='Operation to test (default: all)') parser.add_argument('--threads', type=int, default=100, help='Number of threads to use (default: 100)') parser.add_argument('--keys', type=int, default=100, help='Number of keys per thread (default: 100)') args = parser.parse_args() global NUM_THREADS, NUM_KEYS_PER_THREAD NUM_THREADS = args.threads NUM_KEYS_PER_THREAD = args.keys try: # Test Redis connection client = get_redis_connection() logger.info(f"Successfully connected to Redis at {REDIS_HOST}:{REDIS_PORT}") # Run tests if args.operation == 'all': results = run_all_tests() print_summary(results) else: result = run_test(args.operation) print_summary([result]) except Exception as e: logger.error(f"Test failed: {e}") return 1 return 0 if __name__ == "__main__": exit(main())