prod-wag-backend-automate-s.../Controllers/Mongo/implementations.py

308 lines
11 KiB
Python

# Initialize the MongoDB handler with your configuration
from Controllers.Mongo.database import mongo_handler
from datetime import datetime
def cleanup_test_data():
"""Clean up test data from all collections."""
collections = ["users", "products", "orders", "sales"]
for collection_name in collections:
with mongo_handler.collection(collection_name) as collection:
collection.delete_many({})
def test_basic_crud_operations():
"""Test basic CRUD operations on users collection."""
print("\nTesting basic CRUD operations...")
try:
with mongo_handler.collection("users") as users_collection:
# Insert multiple documents
users_collection.insert_many(
[
{"username": "john", "email": "john@example.com", "role": "user"},
{"username": "jane", "email": "jane@example.com", "role": "admin"},
{"username": "bob", "email": "bob@example.com", "role": "user"},
]
)
# Find with multiple conditions
admin_users = list(users_collection.find({"role": "admin"}))
# Update multiple documents
update_result = users_collection.update_many(
{"role": "user"}, {"$set": {"last_login": datetime.now().isoformat()}}
)
# Delete documents
delete_result = users_collection.delete_many({"username": "bob"})
success = (
len(admin_users) == 1
and admin_users[0]["username"] == "jane"
and update_result.modified_count == 2
and delete_result.deleted_count == 1
)
print(f"Test {'passed' if success else 'failed'}")
return success
except Exception as e:
print(f"Test failed with exception: {e}")
return False
def test_nested_documents():
"""Test operations with nested documents in products collection."""
print("\nTesting nested documents...")
try:
with mongo_handler.collection("products") as products_collection:
# Insert a product with nested data
products_collection.insert_one(
{
"name": "Laptop",
"price": 999.99,
"specs": {"cpu": "Intel i7", "ram": "16GB", "storage": "512GB SSD"},
"in_stock": True,
"tags": ["electronics", "computers", "laptops"],
}
)
# Find with nested field query
laptop = products_collection.find_one({"specs.cpu": "Intel i7"})
# Update nested field
update_result = products_collection.update_one(
{"name": "Laptop"}, {"$set": {"specs.ram": "32GB"}}
)
# Verify the update
updated_laptop = products_collection.find_one({"name": "Laptop"})
success = (
laptop is not None
and laptop["specs"]["ram"] == "16GB"
and update_result.modified_count == 1
and updated_laptop["specs"]["ram"] == "32GB"
)
print(f"Test {'passed' if success else 'failed'}")
return success
except Exception as e:
print(f"Test failed with exception: {e}")
return False
def test_array_operations():
"""Test operations with arrays in orders collection."""
print("\nTesting array operations...")
try:
with mongo_handler.collection("orders") as orders_collection:
# Insert an order with array of items
orders_collection.insert_one(
{
"order_id": "ORD001",
"customer": "john",
"items": [
{"product": "Laptop", "quantity": 1},
{"product": "Mouse", "quantity": 2},
],
"total": 1099.99,
"status": "pending",
}
)
# Find orders containing specific items
laptop_orders = list(orders_collection.find({"items.product": "Laptop"}))
# Update array elements
update_result = orders_collection.update_one(
{"order_id": "ORD001"},
{"$push": {"items": {"product": "Keyboard", "quantity": 1}}},
)
# Verify the update
updated_order = orders_collection.find_one({"order_id": "ORD001"})
success = (
len(laptop_orders) == 1
and update_result.modified_count == 1
and len(updated_order["items"]) == 3
and updated_order["items"][-1]["product"] == "Keyboard"
)
print(f"Test {'passed' if success else 'failed'}")
return success
except Exception as e:
print(f"Test failed with exception: {e}")
return False
def test_aggregation():
"""Test aggregation operations on sales collection."""
print("\nTesting aggregation operations...")
try:
with mongo_handler.collection("sales") as sales_collection:
# Insert sample sales data
sales_collection.insert_many(
[
{"product": "Laptop", "amount": 999.99, "date": datetime.now()},
{"product": "Mouse", "amount": 29.99, "date": datetime.now()},
{"product": "Keyboard", "amount": 59.99, "date": datetime.now()},
]
)
# Calculate total sales by product
pipeline = [{"$group": {"_id": "$product", "total": {"$sum": "$amount"}}}]
sales_summary = list(sales_collection.aggregate(pipeline))
success = (
len(sales_summary) == 3
and any(
item["_id"] == "Laptop" and item["total"] == 999.99
for item in sales_summary
)
and any(
item["_id"] == "Mouse" and item["total"] == 29.99
for item in sales_summary
)
and any(
item["_id"] == "Keyboard" and item["total"] == 59.99
for item in sales_summary
)
)
print(f"Test {'passed' if success else 'failed'}")
return success
except Exception as e:
print(f"Test failed with exception: {e}")
return False
def test_index_operations():
"""Test index creation and unique constraints."""
print("\nTesting index operations...")
try:
with mongo_handler.collection("users") as users_collection:
# Create indexes
users_collection.create_index("email", unique=True)
users_collection.create_index([("username", 1), ("role", 1)])
# Insert initial document
users_collection.insert_one(
{"username": "test_user", "email": "test@example.com"}
)
# Try to insert duplicate email (should fail)
try:
users_collection.insert_one(
{"username": "test_user2", "email": "test@example.com"}
)
success = False # Should not reach here
except Exception:
success = True
print(f"Test {'passed' if success else 'failed'}")
return success
except Exception as e:
print(f"Test failed with exception: {e}")
return False
def test_complex_queries():
"""Test complex queries with multiple conditions."""
print("\nTesting complex queries...")
try:
with mongo_handler.collection("products") as products_collection:
# Insert test data
products_collection.insert_many(
[
{
"name": "Expensive Laptop",
"price": 999.99,
"tags": ["electronics", "computers"],
"in_stock": True,
},
{
"name": "Cheap Mouse",
"price": 29.99,
"tags": ["electronics", "peripherals"],
"in_stock": True,
},
]
)
# Find products with price range and specific tags
expensive_electronics = list(
products_collection.find(
{
"price": {"$gt": 500},
"tags": {"$in": ["electronics"]},
"in_stock": True,
}
)
)
# Update with multiple conditions
update_result = products_collection.update_many(
{"price": {"$lt": 100}, "in_stock": True},
{"$set": {"discount": 0.1}, "$inc": {"price": -10}},
)
# Verify the update
updated_product = products_collection.find_one({"name": "Cheap Mouse"})
success = (
len(expensive_electronics) == 1
and expensive_electronics[0]["name"] == "Expensive Laptop"
and update_result.modified_count == 1
and updated_product["price"] == 19.99
and updated_product["discount"] == 0.1
)
print(f"Test {'passed' if success else 'failed'}")
return success
except Exception as e:
print(f"Test failed with exception: {e}")
return False
def run_all_tests():
"""Run all MongoDB tests and report results."""
print("Starting MongoDB tests...")
# Clean up any existing test data before starting
cleanup_test_data()
tests = [
test_basic_crud_operations,
test_nested_documents,
test_array_operations,
test_aggregation,
test_index_operations,
test_complex_queries,
]
passed_list, not_passed_list = [], []
passed, failed = 0, 0
for test in tests:
# Clean up test data before each test
cleanup_test_data()
try:
if test():
passed += 1
passed_list.append(f"Test {test.__name__} passed")
else:
failed += 1
not_passed_list.append(f"Test {test.__name__} failed")
except Exception as e:
print(f"Test {test.__name__} failed with exception: {e}")
failed += 1
not_passed_list.append(f"Test {test.__name__} failed")
print(f"\nTest Results: {passed} passed, {failed} failed")
print("Passed Tests:")
print("\n".join(passed_list))
print("Failed Tests:")
print("\n".join(not_passed_list))
return passed, failed
if __name__ == "__main__":
run_all_tests()