mongo-service/load_test.py

105 lines
3.3 KiB
Python

#!/usr/bin/env python3
"""
MongoDB Load Test Script
This script tests MongoDB performance with multiple concurrent connections
by running the example script in multiple threads.
"""
import time
import threading
import concurrent.futures
from datetime import datetime
import sys
import os
# Import the main function from our example script
from python_example import main as run_mongodb_operations
# Configuration
NUM_THREADS = 100 # Number of concurrent connections
TIMEOUT = 120 # Maximum time to wait for all threads (seconds)
def run_thread(thread_id):
"""
Function to run in each thread.
Args:
thread_id: The thread identifier
Returns:
Tuple of (thread_id, success, elapsed_time)
"""
start_time = time.time()
try:
# Run the MongoDB operations
success = run_mongodb_operations(thread_id)
elapsed = time.time() - start_time
return (thread_id, success, elapsed)
except Exception as e:
elapsed = time.time() - start_time
print(f"[Thread {thread_id}] ❌ Error: {e}")
return (thread_id, False, elapsed)
def main():
"""Main function to run the load test."""
print(f"🚀 Starting MongoDB load test with {NUM_THREADS} concurrent connections")
print(f"⏱️ Timeout set to {TIMEOUT} seconds")
print("-" * 80)
start_time = time.time()
# Results tracking
successful_threads = 0
failed_threads = 0
times = []
# Use a thread pool to manage the concurrent connections
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
# Submit all threads
future_to_thread = {
executor.submit(run_thread, i): i for i in range(NUM_THREADS)
}
# Process results as they complete
for future in concurrent.futures.as_completed(future_to_thread, timeout=TIMEOUT):
thread_id = future_to_thread[future]
try:
thread_id, success, elapsed = future.result()
times.append(elapsed)
if success:
successful_threads += 1
else:
failed_threads += 1
except Exception as e:
print(f"[Thread {thread_id}] ❌ Generated an exception: {e}")
failed_threads += 1
# Calculate statistics
total_time = time.time() - start_time
avg_time = sum(times) / len(times) if times else 0
min_time = min(times) if times else 0
max_time = max(times) if times else 0
# Print results
print("\n" + "=" * 80)
print(f"📊 MongoDB Load Test Results:")
print(f" Total threads: {NUM_THREADS}")
print(f" Successful threads: {successful_threads}")
print(f" Failed threads: {failed_threads}")
print(f" Total test time: {total_time:.2f} seconds")
print(f" Average thread time: {avg_time:.2f} seconds")
print(f" Minimum thread time: {min_time:.2f} seconds")
print(f" Maximum thread time: {max_time:.2f} seconds")
print(f" Throughput: {successful_threads / total_time:.2f} operations/second")
print("=" * 80)
# Return exit code based on success
return 0 if failed_threads == 0 else 1
if __name__ == "__main__":
sys.exit(main())