#!/usr/bin/env python3
"""
Example Python script to connect to MongoDB and perform basic operations.

Connection settings are read from the environment (MONGO_HOST, MONGO_PORT,
MONGO_USERNAME, MONGO_PASSWORD, MONGO_AUTH_DB); the fallbacks below are
placeholders for a local demo instance only.
"""

import os
import sys
from datetime import datetime, timezone

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure

# MongoDB connection details. Override these through the environment rather
# than editing the file; the default credentials are demo placeholders.
MONGO_HOST = os.environ.get("MONGO_HOST", "localhost")
MONGO_PORT = int(os.environ.get("MONGO_PORT", "27017"))
MONGO_USERNAME = os.environ.get("MONGO_USERNAME", "admin")
MONGO_PASSWORD = os.environ.get("MONGO_PASSWORD", "password")
MONGO_AUTH_DB = os.environ.get("MONGO_AUTH_DB", "admin")


def connect_to_mongodb():
    """Connect to MongoDB and return the client object.

    The connection is verified with a ``ping`` command. If the server cannot
    be reached or the command is rejected, an error is printed and the
    process exits with status 1.

    Returns:
        A connected ``MongoClient``.
    """
    client = None
    try:
        # Pass credentials as keyword arguments instead of interpolating them
        # into a URI: a password containing '@', ':' or '/' would otherwise
        # produce an invalid or mis-parsed connection string.
        client = MongoClient(
            host=MONGO_HOST,
            port=MONGO_PORT,
            username=MONGO_USERNAME,
            password=MONGO_PASSWORD,
            authSource=MONGO_AUTH_DB,
        )

        # MongoClient connects lazily; force a round trip to validate it.
        client.admin.command("ping")
    except ConnectionFailure as e:
        if client is not None:
            client.close()
        print(f"❌ Failed to connect to MongoDB: {e}")
        sys.exit(1)
    except OperationFailure as e:
        if client is not None:
            client.close()
        print(f"❌ Authentication failed: {e}")
        sys.exit(1)

    print(f"✅ Successfully connected to MongoDB at {MONGO_HOST}:{MONGO_PORT}")
    return client


def list_databases(client):
    """List all databases in the MongoDB instance.

    Args:
        client: A connected ``MongoClient``.

    Returns:
        The list of database names, or an empty list if listing failed.
    """
    try:
        databases = client.list_database_names()
    except Exception as e:  # best-effort demo step: report and carry on
        print(f"❌ Error listing databases: {e}")
        return []

    print("\n📚 Available databases:")
    for db in databases:
        print(f" - {db}")
    return databases


def create_sample_data(client, database_name="sample_db", collection_name="users"):
    """Create a sample database and collection with some data.

    Args:
        client: A connected ``MongoClient``.
        database_name: Database to write into.
        collection_name: Collection to insert the sample users into.

    Returns:
        True if the documents were inserted and counted, False on any error.
    """
    try:
        collection = client[database_name][collection_name]

        # Use an aware UTC timestamp: PyMongo stores naive datetimes as if
        # they were UTC, so a naive local time would be recorded incorrectly.
        created_at = datetime.now(timezone.utc)

        # Sample user data
        users = [
            {
                "name": "John Doe",
                "email": "john.doe@example.com",
                "age": 30,
                "created_at": created_at,
            },
            {
                "name": "Jane Smith",
                "email": "jane.smith@example.com",
                "age": 25,
                "created_at": created_at,
            },
            {
                "name": "Bob Johnson",
                "email": "bob.johnson@example.com",
                "age": 35,
                "created_at": created_at,
            },
        ]

        # Insert the users
        result = collection.insert_many(users)
        print(
            f"\n✅ Successfully inserted {len(result.inserted_ids)} documents "
            f"into {database_name}.{collection_name}"
        )

        # Count documents in the collection
        count = collection.count_documents({})
        print(f"📊 Total documents in {collection_name}: {count}")
        return True
    except Exception as e:  # best-effort demo step: report and carry on
        print(f"❌ Error creating sample data: {e}")
        return False


def _print_users(users):
    """Print one summary line per user document in *users*."""
    for user in users:
        print(f" - {user['name']} ({user['email']}), Age: {user['age']}")


def query_data(client, database_name="sample_db", collection_name="users"):
    """Query and display data from the collection.

    Prints every document in the collection, then only those whose ``age``
    is greater than 30.

    Args:
        client: A connected ``MongoClient``.
        database_name: Database to read from.
        collection_name: Collection to query.

    Returns:
        True if both queries completed, False on any error.
    """
    try:
        collection = client[database_name][collection_name]

        # Find all users
        print("\n🔍 All users:")
        _print_users(collection.find())

        # Find users older than 30
        print("\n🔍 Users older than 30:")
        _print_users(collection.find({"age": {"$gt": 30}}))
        return True
    except Exception as e:  # best-effort demo step: report and carry on
        print(f"❌ Error querying data: {e}")
        return False


def main(thread_id=None):
    """Main function to demonstrate MongoDB operations.

    Args:
        thread_id: Optional thread identifier for concurrent testing

    Returns:
        True if both the sample-data creation and the query step succeeded,
        False otherwise.
    """
    prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
    # Create sample data with unique collection name if threading
    collection_suffix = f"_thread_{thread_id}" if thread_id is not None else ""
    collection_name = f"users{collection_suffix}"

    print(f"{prefix}🔄 Connecting to MongoDB...")
    client = connect_to_mongodb()

    try:
        # List databases
        list_databases(client)

        print(f"\n{prefix}🔄 Creating sample data...")
        created = create_sample_data(client, collection_name=collection_name)

        # Query data
        print(f"\n{prefix}🔄 Querying data...")
        queried = query_data(client, collection_name=collection_name)
    finally:
        # Always release the connection, even if a step raised unexpectedly.
        client.close()
        print(f"\n{prefix}👋 Connection closed")

    return created and queried


if __name__ == "__main__":
    sys.exit(0 if main() else 1)