redis-service/redis_load_test.py

262 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
Redis Load Test Script
This script tests Redis performance by running create, update and delete operations
across concurrent worker threads (100 by default; configurable with --threads).
"""
import os
import time
import random
import string
import threading
import logging
import argparse
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
import redis
# Logging: every record is written both to redis_test.log and to the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler("redis_test.log"), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Read the .env file before the environment lookups below
load_dotenv()

# Redis connection parameters: environment values win, literals are fallbacks
# NOTE(review): the fallback password is a placeholder string - confirm that
# REDIS_PASSWORD is always supplied through the environment or .env file
REDIS_HOST = os.environ.get("REDIS_HOST", "10.10.2.15")
REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "your_strong_password_here")
REDIS_DB = int(os.environ.get("REDIS_DB", 0))

# Test parameters (NUM_THREADS and NUM_KEYS_PER_THREAD are overwritten by main())
NUM_THREADS = 100
NUM_KEYS_PER_THREAD = 100
KEY_PREFIX = "test_key_"
VALUE_SIZE = 1024  # Length of each random value, in characters

# Statistics shared by all worker threads; mutate only while holding `lock`
success_count = 0
failure_count = 0
lock = threading.Lock()
def get_redis_connection():
    """Build a Redis client from the module-level settings and verify it with a ping.

    Returns:
        The ``redis.Redis`` client, after ``ping()`` has succeeded.

    Raises:
        redis.ConnectionError: Logged and re-raised when creating or pinging
            the client fails with this error.
    """
    settings = {
        "host": REDIS_HOST,
        "port": REDIS_PORT,
        "password": REDIS_PASSWORD,
        "db": REDIS_DB,
        "socket_timeout": 5,
        "decode_responses": True,
    }
    try:
        connection = redis.Redis(**settings)
        # Fail here rather than on the first real command
        connection.ping()
    except redis.ConnectionError as exc:
        logger.error(f"Failed to connect to Redis: {exc}")
        raise
    return connection
def generate_random_string(size):
    """Return ``size`` characters picked at random from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=size)
    return ''.join(picked)
def create_keys(thread_id):
    """Write NUM_KEYS_PER_THREAD keys, each with a fresh random value, for one worker.

    Keys are named ``{KEY_PREFIX}{thread_id}_{index}``. Every failed SET is
    logged and added to ``failure_count`` immediately; the number of
    successful writes is added to ``success_count`` once, after the loop.

    Args:
        thread_id: Worker identifier embedded in every key name.

    Returns:
        int: Number of keys written successfully.
    """
    global success_count, failure_count

    client = get_redis_connection()
    created = 0
    started = time.time()

    for index in range(NUM_KEYS_PER_THREAD):
        key = f"{KEY_PREFIX}{thread_id}_{index}"
        value = generate_random_string(VALUE_SIZE)
        try:
            client.set(key, value)
        except Exception as exc:
            logger.error(f"Thread {thread_id} - Failed to create key {key}: {exc}")
            with lock:
                failure_count += 1
        else:
            created += 1

    elapsed = time.time() - started
    logger.info(f"Thread {thread_id} - Created {created} keys in {elapsed:.2f} seconds")

    with lock:
        success_count += created
    return created
def update_keys(thread_id):
    """Overwrite this worker's existing keys with new random values.

    For each of the NUM_KEYS_PER_THREAD key names
    ``{KEY_PREFIX}{thread_id}_{i}`` the key is first probed with EXISTS and,
    if present, rewritten with a fresh value. Keys that do not exist are
    skipped and counted as neither success nor failure.

    Args:
        thread_id: Worker identifier embedded in every key name.

    Returns:
        int: Number of keys updated successfully.
    """
    global success_count, failure_count

    client = get_redis_connection()
    keys_updated = 0
    start_time = time.time()

    for i in range(NUM_KEYS_PER_THREAD):
        key = f"{KEY_PREFIX}{thread_id}_{i}"
        try:
            # The EXISTS probe is a network call as well, so it is guarded
            # together with the SET: an error on either one is recorded as a
            # single failed key instead of terminating the whole worker.
            if client.exists(key):
                client.set(key, generate_random_string(VALUE_SIZE))
                keys_updated += 1
        except Exception as e:
            logger.error(f"Thread {thread_id} - Failed to update key {key}: {e}")
            with lock:
                failure_count += 1

    duration = time.time() - start_time
    logger.info(f"Thread {thread_id} - Updated {keys_updated} keys in {duration:.2f} seconds")

    with lock:
        success_count += keys_updated
    return keys_updated
def delete_keys(thread_id):
    """Delete the keys named ``{KEY_PREFIX}{thread_id}_{index}`` for one worker.

    A truthy reply from DELETE counts as one deleted key; a falsy reply is
    counted as neither success nor failure. Every DELETE that raises is
    logged and added to ``failure_count``; the deleted total is added to
    ``success_count`` once, after the loop.

    Args:
        thread_id: Worker identifier embedded in every key name.

    Returns:
        int: Number of keys for which DELETE returned a truthy reply.
    """
    global success_count, failure_count

    client = get_redis_connection()
    deleted = 0
    started = time.time()

    for index in range(NUM_KEYS_PER_THREAD):
        key = f"{KEY_PREFIX}{thread_id}_{index}"
        try:
            removed = client.delete(key)
        except Exception as exc:
            logger.error(f"Thread {thread_id} - Failed to delete key {key}: {exc}")
            with lock:
                failure_count += 1
            continue
        if removed:
            deleted += 1

    elapsed = time.time() - started
    logger.info(f"Thread {thread_id} - Deleted {deleted} keys in {elapsed:.2f} seconds")

    with lock:
        success_count += deleted
    return deleted
def run_test(operation):
    """Run one operation on NUM_THREADS worker threads and report its throughput.

    The shared success/failure counters are reset, one worker call per id in
    ``range(NUM_THREADS)`` is submitted to a thread pool, and the wall-clock
    time of the whole batch is measured.

    Args:
        operation: "create", "update" or "delete".

    Returns:
        dict: Keys "operation", "total_operations" (sum of the workers'
        return values), "successful_operations", "failed_operations",
        "duration" (seconds) and "operations_per_second".

    Raises:
        ValueError: If ``operation`` is not one of the supported names.
    """
    global success_count, failure_count

    workers = {
        "create": create_keys,
        "update": update_keys,
        "delete": delete_keys,
    }
    # Fail fast on an unknown name instead of starting a pool with nothing
    # to run and crashing afterwards on a missing result list.
    if operation not in workers:
        raise ValueError(
            f"Unknown operation {operation!r}; expected one of {sorted(workers)}"
        )
    worker = workers[operation]

    success_count = 0
    failure_count = 0
    logger.info(f"Starting {operation} test with {NUM_THREADS} threads")
    start_time = time.time()

    with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        # list() drains the iterator, so an exception raised inside a worker
        # is re-raised here
        results = list(executor.map(worker, range(NUM_THREADS)))

    duration = time.time() - start_time
    total_operations = sum(results)
    ops_per_second = total_operations / duration if duration > 0 else 0

    logger.info(f"Test completed: {operation}")
    logger.info(f"Total operations: {total_operations}")
    logger.info(f"Successful operations: {success_count}")
    logger.info(f"Failed operations: {failure_count}")
    logger.info(f"Duration: {duration:.2f} seconds")
    logger.info(f"Operations per second: {ops_per_second:.2f}")

    return {
        "operation": operation,
        "total_operations": total_operations,
        "successful_operations": success_count,
        "failed_operations": failure_count,
        "duration": duration,
        "operations_per_second": ops_per_second,
    }
def run_all_tests():
    """Run the create, update and delete tests one after another, in that order.

    Returns:
        list: The result dict of each ``run_test`` call, in execution order.
    """
    return [run_test(phase) for phase in ("create", "update", "delete")]
def print_summary(results):
    """Log a summary section for each result dict, between a header and a footer line.

    Args:
        results: Iterable of dicts with the keys "operation",
            "total_operations", "successful_operations",
            "failed_operations", "duration" and "operations_per_second".
    """
    emit = logger.info

    emit("\n===== TEST SUMMARY =====")
    for stats in results:
        emit(f"\nOperation: {stats['operation'].upper()}")
        emit(f"Total operations: {stats['total_operations']}")
        emit(f"Successful operations: {stats['successful_operations']}")
        emit(f"Failed operations: {stats['failed_operations']}")
        emit(f"Duration: {stats['duration']:.2f} seconds")
        emit(f"Operations per second: {stats['operations_per_second']:.2f}")
    emit("\n=========================")
def main():
    """Parse the command line, verify Redis is reachable, and run the selected test(s).

    ``--threads`` and ``--keys`` overwrite the module-level NUM_THREADS and
    NUM_KEYS_PER_THREAD before any test starts.

    Returns:
        int: 0 when the run completes, 1 when any exception aborts it (the
        exception is logged, not propagated).
    """
    global NUM_THREADS, NUM_KEYS_PER_THREAD

    parser = argparse.ArgumentParser(description='Redis Load Test')
    parser.add_argument('--operation', choices=['create', 'update', 'delete', 'all'],
                        default='all', help='Operation to test (default: all)')
    parser.add_argument('--threads', type=int, default=100,
                        help='Number of threads to use (default: 100)')
    parser.add_argument('--keys', type=int, default=100,
                        help='Number of keys per thread (default: 100)')
    args = parser.parse_args()

    NUM_THREADS = args.threads
    NUM_KEYS_PER_THREAD = args.keys

    try:
        # Connectivity probe only: the returned client is deliberately not
        # kept, so no reference to it is held while the workers run.
        get_redis_connection()
        logger.info(f"Successfully connected to Redis at {REDIS_HOST}:{REDIS_PORT}")

        # Run tests
        if args.operation == 'all':
            results = run_all_tests()
        else:
            results = [run_test(args.operation)]
        print_summary(results)
    except Exception as e:
        # Top-level boundary: report the failure and turn it into an exit status
        logger.error(f"Test failed: {e}")
        return 1
    return 0
if __name__ == "__main__":
    # SystemExit carries main()'s return value as the process exit status.
    # Unlike the exit() helper, it does not depend on the site module being
    # loaded (exit() is absent under `python -S`).
    raise SystemExit(main())