mongo-service/python_example.py

153 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""
Example Python script to connect to MongoDB and perform basic operations.
"""
import os
import sys
from datetime import datetime
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure
# MongoDB connection settings. Every value is read from the environment and
# falls back to the default shown here when the variable is not set.
MONGO_HOST = os.getenv("MONGO_HOST", "localhost")  # server host name
MONGO_PORT = int(os.getenv("MONGO_PORT", "27017"))  # server port, converted to int
MONGO_USERNAME = os.getenv("MONGO_USERNAME", "admin")  # account used to log in
MONGO_PASSWORD = os.getenv("MONGO_PASSWORD", "password")  # password for that account
MONGO_AUTH_DB = os.getenv("MONGO_AUTH_DB", "admin")  # used as the authSource when connecting
def connect_to_mongodb():
    """Connect to MongoDB and return the client object.

    Connection settings come from the module-level ``MONGO_*`` constants.
    The credentials are handed to ``MongoClient`` as keyword arguments
    instead of being interpolated into a ``mongodb://`` URI, so user names
    and passwords containing reserved characters (``@``, ``:``, ``/``,
    ``%``) work without percent-escaping.

    Returns:
        The ``MongoClient`` instance, once a ``ping`` command has succeeded.

    Exits:
        Prints an error message and calls ``sys.exit(1)`` when a
        ``ConnectionFailure`` or ``OperationFailure`` is raised. The client,
        if it was created, is closed first.
    """
    client = None
    try:
        client = MongoClient(
            host=MONGO_HOST,
            port=MONGO_PORT,
            username=MONGO_USERNAME,
            password=MONGO_PASSWORD,
            authSource=MONGO_AUTH_DB,
        )
        # Force a round trip to the server so connection and authentication
        # problems surface here rather than on the first real operation.
        client.admin.command("ping")
    except ConnectionFailure as e:
        print(f"❌ Failed to connect to MongoDB: {e}")
    except OperationFailure as e:
        print(f"❌ Authentication failed: {e}")
    else:
        print(f"✅ Successfully connected to MongoDB at {MONGO_HOST}:{MONGO_PORT}")
        return client

    # Only reached on failure: release the client's resources before exiting.
    if client is not None:
        client.close()
    sys.exit(1)
def list_databases(client):
    """Print every database name known to the client and return the names.

    Args:
        client: Object providing ``list_database_names()``.

    Returns:
        Whatever ``list_database_names()`` returned, or an empty list when
        any exception occurred (the error is printed, not raised).
    """
    names = []
    try:
        names = client.list_database_names()
        print("\n📚 Available databases:")
        for name in names:
            print(f" - {name}")
    except Exception as err:
        print(f"❌ Error listing databases: {err}")
        # Discard a partially handled result so failures always yield [].
        names = []
    return names
def create_sample_data(client, database_name="sample_db", collection_name="users"):
    """Create a sample database and collection with some data.

    Inserts three user documents (``name``, ``email``, ``age``,
    ``created_at``) and prints how many were inserted and how many documents
    the collection now holds.

    ``created_at`` is a timezone-aware UTC timestamp. A naive
    ``datetime.now()`` is local time, and storing it where UTC is expected
    shifts the value by the local UTC offset.

    Args:
        client: Mapping-like client; ``client[database_name][collection_name]``
            must provide ``insert_many`` and ``count_documents``.
        database_name: Database to write to.
        collection_name: Collection to write to.

    Returns:
        True on success, False when any exception occurred (the error is
        printed, not raised).
    """
    # Function-scope import: the file's import block only brings in `datetime`.
    from datetime import timezone

    try:
        collection = client[database_name][collection_name]

        # One shared timestamp so all documents of a batch agree.
        created_at = datetime.now(timezone.utc)
        profiles = (
            ("John Doe", "john.doe@example.com", 30),
            ("Jane Smith", "jane.smith@example.com", 25),
            ("Bob Johnson", "bob.johnson@example.com", 35),
        )
        users = [
            {"name": name, "email": email, "age": age, "created_at": created_at}
            for name, email, age in profiles
        ]

        # Insert the users
        result = collection.insert_many(users)
        print(
            f"\n✅ Successfully inserted {len(result.inserted_ids)} documents into {database_name}.{collection_name}"
        )

        # Count documents in the collection
        count = collection.count_documents({})
        print(f"📊 Total documents in {collection_name}: {count}")
        return True
    except Exception as e:
        print(f"❌ Error creating sample data: {e}")
        return False
def query_data(client, database_name="sample_db", collection_name="users"):
    """Print all users, then only the users whose age is greater than 30.

    Args:
        client: Mapping-like client; ``client[database_name][collection_name]``
            must provide ``find``.
        database_name: Database to read from.
        collection_name: Collection to read from.

    Returns:
        True when both listings were printed, False when any exception
        occurred (the error is printed, not raised).
    """

    def describe(doc):
        # One output line per user document.
        return f" - {doc['name']} ({doc['email']}), Age: {doc['age']}"

    # (heading, positional arguments for find) for each listing, in print order.
    listings = (
        ("\n🔍 All users:", ()),
        ("\n🔍 Users older than 30:", ({"age": {"$gt": 30}},)),
    )

    try:
        users = client[database_name][collection_name]
        for heading, find_args in listings:
            print(heading)
            for doc in users.find(*find_args):
                print(describe(doc))
    except Exception as err:
        print(f"❌ Error querying data: {err}")
        return False
    return True
def main(thread_id=None):
    """Main function to demonstrate MongoDB operations.

    Connects, lists the databases, inserts the sample users and queries them
    back. The connection is closed even when one of the steps raises.

    Args:
        thread_id: Optional thread identifier for concurrent testing. When
            given, it prefixes the progress messages and is appended to the
            collection name so each thread uses its own collection.

    Returns:
        True when both the insert step and the query step reported success,
        False otherwise.
    """
    prefix = f"[Thread {thread_id}] " if thread_id is not None else ""
    # Unique collection name per thread when threading.
    collection_suffix = f"_thread_{thread_id}" if thread_id is not None else ""
    collection_name = f"users{collection_suffix}"

    print(f"{prefix}🔄 Connecting to MongoDB...")
    client = connect_to_mongodb()
    try:
        # List databases
        list_databases(client)

        # Create sample data
        print(f"\n{prefix}🔄 Creating sample data...")
        created = create_sample_data(client, collection_name=collection_name)

        # Query data; still attempted after a failed insert because the
        # collection may already hold documents.
        print(f"\n{prefix}🔄 Querying data...")
        queried = query_data(client, collection_name=collection_name)
    finally:
        # Close the connection no matter how the steps above ended.
        client.close()
        print(f"\n{prefix}👋 Connection closed")

    return created and queried


if __name__ == "__main__":
    # Reflect the outcome in the process exit status (0 = success).
    sys.exit(0 if main() else 1)