285 lines
9.7 KiB
Python
285 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MongoDB Load Test - All-in-One Script
|
|
|
|
This script combines MongoDB operations and load testing in a single file.
|
|
It creates multiple threads to test MongoDB's performance under concurrent load.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import threading
|
|
import concurrent.futures
|
|
from datetime import datetime
|
|
from pymongo import MongoClient
|
|
from pymongo.errors import ConnectionFailure, OperationFailure
|
|
|
|
# ===== Configuration =====
|
|
# MongoDB connection details
|
|
MONGO_HOST = os.environ.get("MONGO_HOST", "localhost")
|
|
MONGO_PORT = int(os.environ.get("MONGO_PORT", "27017"))
|
|
MONGO_USERNAME = os.environ.get("MONGO_USERNAME", "admin")
|
|
MONGO_PASSWORD = os.environ.get("MONGO_PASSWORD", "password")
|
|
MONGO_AUTH_DB = os.environ.get("MONGO_AUTH_DB", "admin")
|
|
|
|
# Load test settings
|
|
NUM_THREADS = 100 # Number of concurrent connections
|
|
TIMEOUT = 120 # Maximum time to wait for all threads (seconds)
|
|
|
|
# ===== MongoDB Operations =====
|
|
def connect_to_mongodb():
|
|
"""Connect to MongoDB and return the client object."""
|
|
try:
|
|
# Create a MongoDB client with authentication
|
|
connection_string = f"mongodb://{MONGO_USERNAME}:{MONGO_PASSWORD}@{MONGO_HOST}:{MONGO_PORT}/?authSource={MONGO_AUTH_DB}"
|
|
client = MongoClient(connection_string)
|
|
|
|
# Check if the connection is successful
|
|
client.admin.command("ping")
|
|
return client
|
|
except ConnectionFailure as e:
|
|
print(f"❌ Failed to connect to MongoDB: {e}")
|
|
raise
|
|
except OperationFailure as e:
|
|
print(f"❌ Authentication failed: {e}")
|
|
raise
|
|
|
|
|
|
def list_databases(client, thread_id=None):
|
|
"""List all databases in the MongoDB instance."""
|
|
prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
|
|
try:
|
|
databases = client.list_database_names()
|
|
if thread_id is None: # Only print for main thread or when no threading
|
|
print(f"\n{prefix}📚 Available databases:")
|
|
for db in databases:
|
|
print(f" - {db}")
|
|
return databases
|
|
except Exception as e:
|
|
print(f"{prefix}❌ Error listing databases: {e}")
|
|
return []
|
|
|
|
|
|
def create_sample_data(client, database_name="sample_db", collection_name="users", thread_id=None):
|
|
"""Create a sample database and collection with some data."""
|
|
prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
|
|
try:
|
|
db = client[database_name]
|
|
collection = db[collection_name]
|
|
|
|
# Sample user data
|
|
users = [
|
|
{
|
|
"name": f"John Doe {thread_id or ''}",
|
|
"email": f"john.doe{thread_id or ''}@example.com",
|
|
"age": 30,
|
|
"created_at": datetime.now(),
|
|
},
|
|
{
|
|
"name": f"Jane Smith {thread_id or ''}",
|
|
"email": f"jane.smith{thread_id or ''}@example.com",
|
|
"age": 25,
|
|
"created_at": datetime.now(),
|
|
},
|
|
{
|
|
"name": f"Bob Johnson {thread_id or ''}",
|
|
"email": f"bob.johnson{thread_id or ''}@example.com",
|
|
"age": 35,
|
|
"created_at": datetime.now(),
|
|
},
|
|
]
|
|
|
|
# Insert the users
|
|
result = collection.insert_many(users)
|
|
print(
|
|
f"{prefix}✅ Inserted {len(result.inserted_ids)} documents into {database_name}.{collection_name}"
|
|
)
|
|
|
|
# Count documents in the collection
|
|
count = collection.count_documents({})
|
|
print(f"{prefix}📊 Total documents in {collection_name}: {count}")
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"{prefix}❌ Error creating sample data: {e}")
|
|
return False
|
|
|
|
|
|
def query_data(client, database_name="sample_db", collection_name="users", thread_id=None):
|
|
"""Query and display data from the collection."""
|
|
prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
|
|
try:
|
|
db = client[database_name]
|
|
collection = db[collection_name]
|
|
|
|
# Find all users
|
|
if thread_id is None: # Only print details for main thread
|
|
print(f"{prefix}🔍 All users:")
|
|
for user in collection.find():
|
|
print(f" - {user['name']} ({user['email']}), Age: {user['age']}")
|
|
|
|
# Find users older than 30
|
|
print(f"{prefix}🔍 Users older than 30:")
|
|
for user in collection.find({"age": {"$gt": 30}}):
|
|
print(f" - {user['name']} ({user['email']}), Age: {user['age']}")
|
|
else:
|
|
# For threads, just count to reduce output noise
|
|
all_users = collection.count_documents({})
|
|
older_users = collection.count_documents({"age": {"$gt": 30}})
|
|
print(f"{prefix}🔍 Found {all_users} users, {older_users} older than 30")
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"{prefix}❌ Error querying data: {e}")
|
|
return False
|
|
|
|
|
|
def run_mongodb_operations(thread_id=None):
|
|
"""Run a complete set of MongoDB operations."""
|
|
prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
|
|
try:
|
|
print(f"{prefix}🔄 Connecting to MongoDB...")
|
|
client = connect_to_mongodb()
|
|
|
|
# List databases (only in main thread to reduce output)
|
|
if thread_id is None or thread_id == 0:
|
|
list_databases(client, thread_id)
|
|
|
|
# Create sample data with unique collection name if threading
|
|
collection_suffix = f"_thread_{thread_id}" if thread_id is not None else ""
|
|
create_sample_data(
|
|
client,
|
|
database_name="sample_db",
|
|
collection_name=f"users{collection_suffix}",
|
|
thread_id=thread_id
|
|
)
|
|
|
|
# Query data
|
|
query_data(
|
|
client,
|
|
database_name="sample_db",
|
|
collection_name=f"users{collection_suffix}",
|
|
thread_id=thread_id
|
|
)
|
|
|
|
# Close the connection
|
|
client.close()
|
|
print(f"{prefix}👋 Connection closed")
|
|
return True
|
|
except Exception as e:
|
|
print(f"{prefix}❌ Error in MongoDB operations: {e}")
|
|
return False
|
|
|
|
|
|
# ===== Load Testing =====
|
|
def run_thread(thread_id):
|
|
"""
|
|
Function to run in each thread.
|
|
|
|
Args:
|
|
thread_id: The thread identifier
|
|
|
|
Returns:
|
|
Tuple of (thread_id, success, elapsed_time)
|
|
"""
|
|
start_time = time.time()
|
|
try:
|
|
# Run the MongoDB operations
|
|
success = run_mongodb_operations(thread_id)
|
|
elapsed = time.time() - start_time
|
|
return (thread_id, success, elapsed)
|
|
except Exception as e:
|
|
elapsed = time.time() - start_time
|
|
print(f"[Thread {thread_id}] ❌ Error: {e}")
|
|
return (thread_id, False, elapsed)
|
|
|
|
|
|
def run_load_test(num_threads=NUM_THREADS, timeout=TIMEOUT):
|
|
"""Run the load test with multiple threads."""
|
|
print(f"🚀 Starting MongoDB load test with {num_threads} concurrent connections")
|
|
print(f"⏱️ Timeout set to {timeout} seconds")
|
|
print("-" * 80)
|
|
|
|
start_time = time.time()
|
|
|
|
# Results tracking
|
|
successful_threads = 0
|
|
failed_threads = 0
|
|
times = []
|
|
|
|
# Use a thread pool to manage the concurrent connections
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
|
|
# Submit all threads
|
|
future_to_thread = {
|
|
executor.submit(run_thread, i): i for i in range(num_threads)
|
|
}
|
|
|
|
# Process results as they complete
|
|
for future in concurrent.futures.as_completed(future_to_thread, timeout=timeout):
|
|
thread_id = future_to_thread[future]
|
|
try:
|
|
thread_id, success, elapsed = future.result()
|
|
times.append(elapsed)
|
|
if success:
|
|
successful_threads += 1
|
|
else:
|
|
failed_threads += 1
|
|
except Exception as e:
|
|
print(f"[Thread {thread_id}] ❌ Generated an exception: {e}")
|
|
failed_threads += 1
|
|
|
|
# Calculate statistics
|
|
total_time = time.time() - start_time
|
|
avg_time = sum(times) / len(times) if times else 0
|
|
min_time = min(times) if times else 0
|
|
max_time = max(times) if times else 0
|
|
|
|
# Print results
|
|
print("\n" + "=" * 80)
|
|
print(f"📊 MongoDB Load Test Results:")
|
|
print(f" Total threads: {num_threads}")
|
|
print(f" Successful threads: {successful_threads}")
|
|
print(f" Failed threads: {failed_threads}")
|
|
print(f" Total test time: {total_time:.2f} seconds")
|
|
print(f" Average thread time: {avg_time:.2f} seconds")
|
|
print(f" Minimum thread time: {min_time:.2f} seconds")
|
|
print(f" Maximum thread time: {max_time:.2f} seconds")
|
|
print(f" Throughput: {successful_threads / total_time:.2f} operations/second")
|
|
print("=" * 80)
|
|
|
|
# Return exit code based on success
|
|
return 0 if failed_threads == 0 else 1
|
|
|
|
|
|
def main():
|
|
"""Main function to run either a single test or load test."""
|
|
if len(sys.argv) > 1 and sys.argv[1] == "--single":
|
|
# Run a single MongoDB operation test
|
|
print("🔄 Running single MongoDB test...")
|
|
success = run_mongodb_operations()
|
|
return 0 if success else 1
|
|
else:
|
|
# Run the load test
|
|
return run_load_test()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|
|
|
|
|
|
|
|
"""
|
|
================================================================================
|
|
📊 MongoDB Load Test Results:
|
|
Total threads: 100
|
|
Successful threads: 100
|
|
Failed threads: 0
|
|
Total test time: 0.99 seconds
|
|
Average thread time: 0.62 seconds
|
|
Minimum thread time: 0.13 seconds
|
|
Maximum thread time: 0.74 seconds
|
|
Throughput: 100.71 operations/second
|
|
================================================================================ Enough for a simple mongo server that handles maximum connection of 100
|
|
"""
|