# Initialize the MongoDB handler with your configuration from Controllers.Mongo.database import mongo_handler from datetime import datetime def cleanup_test_data(): """Clean up test data from all collections.""" collections = ["users", "products", "orders", "sales"] for collection_name in collections: with mongo_handler.collection(collection_name) as collection: collection.delete_many({}) def test_basic_crud_operations(): """Test basic CRUD operations on users collection.""" print("\nTesting basic CRUD operations...") try: with mongo_handler.collection("users") as users_collection: # Insert multiple documents users_collection.insert_many([ {"username": "john", "email": "john@example.com", "role": "user"}, {"username": "jane", "email": "jane@example.com", "role": "admin"}, {"username": "bob", "email": "bob@example.com", "role": "user"} ]) # Find with multiple conditions admin_users = list(users_collection.find({"role": "admin"})) # Update multiple documents update_result = users_collection.update_many( {"role": "user"}, {"$set": {"last_login": datetime.now().isoformat()}} ) # Delete documents delete_result = users_collection.delete_many({"username": "bob"}) success = ( len(admin_users) == 1 and admin_users[0]["username"] == "jane" and update_result.modified_count == 2 and delete_result.deleted_count == 1 ) print(f"Test {'passed' if success else 'failed'}") return success except Exception as e: print(f"Test failed with exception: {e}") return False def test_nested_documents(): """Test operations with nested documents in products collection.""" print("\nTesting nested documents...") try: with mongo_handler.collection("products") as products_collection: # Insert a product with nested data products_collection.insert_one({ "name": "Laptop", "price": 999.99, "specs": { "cpu": "Intel i7", "ram": "16GB", "storage": "512GB SSD" }, "in_stock": True, "tags": ["electronics", "computers", "laptops"] }) # Find with nested field query laptop = products_collection.find_one({"specs.cpu": "Intel i7"}) # Update nested field update_result = products_collection.update_one( {"name": "Laptop"}, {"$set": {"specs.ram": "32GB"}} ) # Verify the update updated_laptop = products_collection.find_one({"name": "Laptop"}) success = ( laptop is not None and laptop["specs"]["ram"] == "16GB" and update_result.modified_count == 1 and updated_laptop["specs"]["ram"] == "32GB" ) print(f"Test {'passed' if success else 'failed'}") return success except Exception as e: print(f"Test failed with exception: {e}") return False def test_array_operations(): """Test operations with arrays in orders collection.""" print("\nTesting array operations...") try: with mongo_handler.collection("orders") as orders_collection: # Insert an order with array of items orders_collection.insert_one({ "order_id": "ORD001", "customer": "john", "items": [ {"product": "Laptop", "quantity": 1}, {"product": "Mouse", "quantity": 2} ], "total": 1099.99, "status": "pending" }) # Find orders containing specific items laptop_orders = list(orders_collection.find({"items.product": "Laptop"})) # Update array elements update_result = orders_collection.update_one( {"order_id": "ORD001"}, {"$push": {"items": {"product": "Keyboard", "quantity": 1}}} ) # Verify the update updated_order = orders_collection.find_one({"order_id": "ORD001"}) success = ( len(laptop_orders) == 1 and update_result.modified_count == 1 and len(updated_order["items"]) == 3 and updated_order["items"][-1]["product"] == "Keyboard" ) print(f"Test {'passed' if success else 'failed'}") return success except Exception as e: print(f"Test failed with exception: {e}") return False def test_aggregation(): """Test aggregation operations on sales collection.""" print("\nTesting aggregation operations...") try: with mongo_handler.collection("sales") as sales_collection: # Insert sample sales data sales_collection.insert_many([ {"product": "Laptop", "amount": 999.99, "date": datetime.now()}, {"product": "Mouse", "amount": 29.99, "date": datetime.now()}, {"product": "Keyboard", "amount": 59.99, "date": datetime.now()} ]) # Calculate total sales by product pipeline = [ {"$group": {"_id": "$product", "total": {"$sum": "$amount"}}} ] sales_summary = list(sales_collection.aggregate(pipeline)) success = ( len(sales_summary) == 3 and any(item["_id"] == "Laptop" and item["total"] == 999.99 for item in sales_summary) and any(item["_id"] == "Mouse" and item["total"] == 29.99 for item in sales_summary) and any(item["_id"] == "Keyboard" and item["total"] == 59.99 for item in sales_summary) ) print(f"Test {'passed' if success else 'failed'}") return success except Exception as e: print(f"Test failed with exception: {e}") return False def test_index_operations(): """Test index creation and unique constraints.""" print("\nTesting index operations...") try: with mongo_handler.collection("users") as users_collection: # Create indexes users_collection.create_index("email", unique=True) users_collection.create_index([("username", 1), ("role", 1)]) # Insert initial document users_collection.insert_one({"username": "test_user", "email": "test@example.com"}) # Try to insert duplicate email (should fail) try: users_collection.insert_one({"username": "test_user2", "email": "test@example.com"}) success = False # Should not reach here except Exception: success = True print(f"Test {'passed' if success else 'failed'}") return success except Exception as e: print(f"Test failed with exception: {e}") return False def test_complex_queries(): """Test complex queries with multiple conditions.""" print("\nTesting complex queries...") try: with mongo_handler.collection("products") as products_collection: # Insert test data products_collection.insert_many([ { "name": "Expensive Laptop", "price": 999.99, "tags": ["electronics", "computers"], "in_stock": True }, { "name": "Cheap Mouse", "price": 29.99, "tags": ["electronics", "peripherals"], "in_stock": True } ]) # Find products with price range and specific tags expensive_electronics = list(products_collection.find({ "price": {"$gt": 500}, "tags": {"$in": ["electronics"]}, "in_stock": True })) # Update with multiple conditions update_result = products_collection.update_many( { "price": {"$lt": 100}, "in_stock": True }, { "$set": {"discount": 0.1}, "$inc": {"price": -10} } ) # Verify the update updated_product = products_collection.find_one({"name": "Cheap Mouse"}) success = ( len(expensive_electronics) == 1 and expensive_electronics[0]["name"] == "Expensive Laptop" and update_result.modified_count == 1 and updated_product["price"] == 19.99 and updated_product["discount"] == 0.1 ) print(f"Test {'passed' if success else 'failed'}") return success except Exception as e: print(f"Test failed with exception: {e}") return False def run_all_tests(): """Run all MongoDB tests and report results.""" print("Starting MongoDB tests...") # Clean up any existing test data before starting cleanup_test_data() tests = [ test_basic_crud_operations, test_nested_documents, test_array_operations, test_aggregation, test_index_operations, test_complex_queries ] passed_list, not_passed_list = [], [] passed, failed = 0, 0 for test in tests: # Clean up test data before each test cleanup_test_data() try: if test(): passed += 1 passed_list.append(f"Test {test.__name__} passed") else: failed += 1 not_passed_list.append(f"Test {test.__name__} failed") except Exception as e: print(f"Test {test.__name__} failed with exception: {e}") failed += 1 not_passed_list.append(f"Test {test.__name__} failed") print(f"\nTest Results: {passed} passed, {failed} failed") print('Passed Tests:') print("\n".join(passed_list)) print('Failed Tests:') print("\n".join(not_passed_list)) return passed, failed if __name__ == "__main__": run_all_tests()