mongo-service/mongodb_load_tester.py

285 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
MongoDB Load Test - All-in-One Script
This script combines MongoDB operations and load testing in a single file.
It creates multiple threads to test MongoDB's performance under concurrent load.
"""
import os
import sys
import time
import threading
import concurrent.futures
from datetime import datetime
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
# ===== Configuration =====
# MongoDB connection details
MONGO_HOST = os.environ.get("MONGO_HOST", "localhost")
MONGO_PORT = int(os.environ.get("MONGO_PORT", "27017"))
MONGO_USERNAME = os.environ.get("MONGO_USERNAME", "admin")
MONGO_PASSWORD = os.environ.get("MONGO_PASSWORD", "password")
MONGO_AUTH_DB = os.environ.get("MONGO_AUTH_DB", "admin")
# Load test settings
NUM_THREADS = 100 # Number of concurrent connections
TIMEOUT = 120 # Maximum time to wait for all threads (seconds)
# ===== MongoDB Operations =====
def connect_to_mongodb():
"""Connect to MongoDB and return the client object."""
try:
# Create a MongoDB client with authentication
connection_string = f"mongodb://{MONGO_USERNAME}:{MONGO_PASSWORD}@{MONGO_HOST}:{MONGO_PORT}/?authSource={MONGO_AUTH_DB}"
client = MongoClient(connection_string)
# Check if the connection is successful
client.admin.command("ping")
return client
except ConnectionFailure as e:
print(f"❌ Failed to connect to MongoDB: {e}")
raise
except OperationFailure as e:
print(f"❌ Authentication failed: {e}")
raise
def list_databases(client, thread_id=None):
"""List all databases in the MongoDB instance."""
prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
try:
databases = client.list_database_names()
if thread_id is None: # Only print for main thread or when no threading
print(f"\n{prefix}📚 Available databases:")
for db in databases:
print(f" - {db}")
return databases
except Exception as e:
print(f"{prefix}❌ Error listing databases: {e}")
return []
def create_sample_data(client, database_name="sample_db", collection_name="users", thread_id=None):
"""Create a sample database and collection with some data."""
prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
try:
db = client[database_name]
collection = db[collection_name]
# Sample user data
users = [
{
"name": f"John Doe {thread_id or ''}",
"email": f"john.doe{thread_id or ''}@example.com",
"age": 30,
"created_at": datetime.now(),
},
{
"name": f"Jane Smith {thread_id or ''}",
"email": f"jane.smith{thread_id or ''}@example.com",
"age": 25,
"created_at": datetime.now(),
},
{
"name": f"Bob Johnson {thread_id or ''}",
"email": f"bob.johnson{thread_id or ''}@example.com",
"age": 35,
"created_at": datetime.now(),
},
]
# Insert the users
result = collection.insert_many(users)
print(
f"{prefix}✅ Inserted {len(result.inserted_ids)} documents into {database_name}.{collection_name}"
)
# Count documents in the collection
count = collection.count_documents({})
print(f"{prefix}📊 Total documents in {collection_name}: {count}")
return True
except Exception as e:
print(f"{prefix}❌ Error creating sample data: {e}")
return False
def query_data(client, database_name="sample_db", collection_name="users", thread_id=None):
"""Query and display data from the collection."""
prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
try:
db = client[database_name]
collection = db[collection_name]
# Find all users
if thread_id is None: # Only print details for main thread
print(f"{prefix}🔍 All users:")
for user in collection.find():
print(f" - {user['name']} ({user['email']}), Age: {user['age']}")
# Find users older than 30
print(f"{prefix}🔍 Users older than 30:")
for user in collection.find({"age": {"$gt": 30}}):
print(f" - {user['name']} ({user['email']}), Age: {user['age']}")
else:
# For threads, just count to reduce output noise
all_users = collection.count_documents({})
older_users = collection.count_documents({"age": {"$gt": 30}})
print(f"{prefix}🔍 Found {all_users} users, {older_users} older than 30")
return True
except Exception as e:
print(f"{prefix}❌ Error querying data: {e}")
return False
def run_mongodb_operations(thread_id=None):
"""Run a complete set of MongoDB operations."""
prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
try:
print(f"{prefix}🔄 Connecting to MongoDB...")
client = connect_to_mongodb()
# List databases (only in main thread to reduce output)
if thread_id is None or thread_id == 0:
list_databases(client, thread_id)
# Create sample data with unique collection name if threading
collection_suffix = f"_thread_{thread_id}" if thread_id is not None else ""
create_sample_data(
client,
database_name="sample_db",
collection_name=f"users{collection_suffix}",
thread_id=thread_id
)
# Query data
query_data(
client,
database_name="sample_db",
collection_name=f"users{collection_suffix}",
thread_id=thread_id
)
# Close the connection
client.close()
print(f"{prefix}👋 Connection closed")
return True
except Exception as e:
print(f"{prefix}❌ Error in MongoDB operations: {e}")
return False
# ===== Load Testing =====
def run_thread(thread_id):
"""
Function to run in each thread.
Args:
thread_id: The thread identifier
Returns:
Tuple of (thread_id, success, elapsed_time)
"""
start_time = time.time()
try:
# Run the MongoDB operations
success = run_mongodb_operations(thread_id)
elapsed = time.time() - start_time
return (thread_id, success, elapsed)
except Exception as e:
elapsed = time.time() - start_time
print(f"[Thread {thread_id}] ❌ Error: {e}")
return (thread_id, False, elapsed)
def run_load_test(num_threads=NUM_THREADS, timeout=TIMEOUT):
"""Run the load test with multiple threads."""
print(f"🚀 Starting MongoDB load test with {num_threads} concurrent connections")
print(f"⏱️ Timeout set to {timeout} seconds")
print("-" * 80)
start_time = time.time()
# Results tracking
successful_threads = 0
failed_threads = 0
times = []
# Use a thread pool to manage the concurrent connections
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
# Submit all threads
future_to_thread = {
executor.submit(run_thread, i): i for i in range(num_threads)
}
# Process results as they complete
for future in concurrent.futures.as_completed(future_to_thread, timeout=timeout):
thread_id = future_to_thread[future]
try:
thread_id, success, elapsed = future.result()
times.append(elapsed)
if success:
successful_threads += 1
else:
failed_threads += 1
except Exception as e:
print(f"[Thread {thread_id}] ❌ Generated an exception: {e}")
failed_threads += 1
# Calculate statistics
total_time = time.time() - start_time
avg_time = sum(times) / len(times) if times else 0
min_time = min(times) if times else 0
max_time = max(times) if times else 0
# Print results
print("\n" + "=" * 80)
print(f"📊 MongoDB Load Test Results:")
print(f" Total threads: {num_threads}")
print(f" Successful threads: {successful_threads}")
print(f" Failed threads: {failed_threads}")
print(f" Total test time: {total_time:.2f} seconds")
print(f" Average thread time: {avg_time:.2f} seconds")
print(f" Minimum thread time: {min_time:.2f} seconds")
print(f" Maximum thread time: {max_time:.2f} seconds")
print(f" Throughput: {successful_threads / total_time:.2f} operations/second")
print("=" * 80)
# Return exit code based on success
return 0 if failed_threads == 0 else 1
def main():
"""Main function to run either a single test or load test."""
if len(sys.argv) > 1 and sys.argv[1] == "--single":
# Run a single MongoDB operation test
print("🔄 Running single MongoDB test...")
success = run_mongodb_operations()
return 0 if success else 1
else:
# Run the load test
return run_load_test()
if __name__ == "__main__":
sys.exit(main())
"""
================================================================================
📊 MongoDB Load Test Results:
Total threads: 100
Successful threads: 100
Failed threads: 0
Total test time: 0.99 seconds
Average thread time: 0.62 seconds
Minimum thread time: 0.13 seconds
Maximum thread time: 0.74 seconds
Throughput: 100.71 operations/second
================================================================================ Enough for a simple mongo server that handles maximum connection of 100
"""