wag-managment-api-service-l.../Services/MongoDb/database.py

193 lines
6.0 KiB
Python

"""
MongoDB database connection and operations.
This module provides MongoDB connection management with:
1. Connection pooling
2. Lifecycle management
3. Error handling
"""
import logging
from contextlib import contextmanager
from functools import wraps
from typing import Optional, Dict, Any, List, Union, Callable

from pymongo import MongoClient
from pymongo.command_cursor import CommandCursor
from pymongo.cursor import Cursor
from pymongo.results import (
    DeleteResult,
    InsertManyResult,
    InsertOneResult,
    UpdateResult,
)

from AllConfigs.NoSqlDatabase.configs import MongoConfig
class MongoInsertMixin:
    """Mixin for MongoDB insert operations.

    Every method forwards to ``self.collection``; the class that mixes this
    in is responsible for providing that attribute (presumably a pymongo
    ``Collection`` -- confirm against the host class).
    """

    def insert_one(self, document: Dict[str, Any]) -> InsertOneResult:
        """Insert a single document.

        Args:
            document: The document to store.

        Returns:
            Whatever ``self.collection.insert_one`` returns.
        """
        return self.collection.insert_one(document)

    def insert_many(self, documents: List[Dict[str, Any]]) -> InsertManyResult:
        """Insert multiple documents in one call.

        Args:
            documents: The documents to store.

        Returns:
            Whatever ``self.collection.insert_many`` returns. With pymongo
            this is a single ``InsertManyResult`` covering the whole batch,
            not a list of per-document ``InsertOneResult`` objects.
        """
        return self.collection.insert_many(documents)
class MongoFindMixin:
    """Read-side helpers that delegate to ``self.collection``.

    The host object must expose a ``collection`` attribute; each method
    simply forwards the filter to it.
    """

    def find_one(self, filter_query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Forward ``filter_query`` to ``collection.find_one`` and return its result."""
        target = self.collection
        return target.find_one(filter_query)

    def find_many(self, filter_query: Dict[str, Any]) -> Cursor:
        """Forward ``filter_query`` to ``collection.find`` and return its result."""
        matches = self.collection.find(filter_query)
        return matches
class MongoUpdateMixin:
    """Write-side helpers that delegate updates to ``self.collection``.

    The host object must expose a ``collection`` attribute; both methods pass
    the filter and the update document through unchanged.
    """

    def update_one(
        self, filter_query: Dict[str, Any], update: Dict[str, Any]
    ) -> UpdateResult:
        """Forward to ``collection.update_one`` and return its result."""
        outcome = self.collection.update_one(filter_query, update)
        return outcome

    def update_many(
        self, filter_query: Dict[str, Any], update: Dict[str, Any]
    ) -> UpdateResult:
        """Forward to ``collection.update_many`` and return its result."""
        outcome = self.collection.update_many(filter_query, update)
        return outcome
class MongoDeleteMixin:
    """Removal helpers that delegate to ``self.collection``.

    The host object must expose a ``collection`` attribute; the filter is
    handed to it untouched.
    """

    def delete_one(self, filter_query: Dict[str, Any]) -> DeleteResult:
        """Forward ``filter_query`` to ``collection.delete_one`` and return its result."""
        target = self.collection
        return target.delete_one(filter_query)

    def delete_many(self, filter_query: Dict[str, Any]) -> DeleteResult:
        """Forward ``filter_query`` to ``collection.delete_many`` and return its result."""
        target = self.collection
        return target.delete_many(filter_query)
class MongoAggregateMixin:
    """Mixin for MongoDB aggregate operations.

    Forwards to ``self.collection``; the class that mixes this in is
    responsible for providing that attribute.
    """

    def aggregate(self, pipeline: List[Dict[str, Any]]) -> CommandCursor:
        """Execute an aggregation pipeline.

        Args:
            pipeline: The list of aggregation stages to run.

        Returns:
            Whatever ``self.collection.aggregate`` returns. With pymongo this
            is a ``CommandCursor`` rather than the ``Cursor`` type that
            ``find`` produces.
        """
        return self.collection.aggregate(pipeline)
class MongoDBHandler(
    MongoInsertMixin,
    MongoFindMixin,
    MongoUpdateMixin,
    MongoDeleteMixin,
    MongoAggregateMixin,
):
    """Handler for MongoDB operations with connection management.

    The class is a singleton: every ``MongoDBHandler()`` call returns the same
    object, which owns at most one ``MongoClient``. Closing the handler (via
    ``close()``, a ``with`` block, ``get_mongodb()`` or ``with_mongodb``)
    drops that client; the next access through ``client``/``get_database``
    opens a fresh one, so a close triggered by one caller does not leave the
    shared instance unusable for others.

    NOTE(review): the CRUD mixins read ``self.collection``, but nothing in
    this class assigns it. Confirm where ``collection`` is meant to come from
    before calling the mixin methods on this handler.
    """

    _instance = None
    _client: Optional[MongoClient] = None

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Open the MongoDB connection unless one is already held.

        ``__init__`` runs on every ``MongoDBHandler()`` call because of the
        singleton ``__new__``, so it must be a no-op once connected.
        """
        # Compare with None instead of relying on truthiness of the client.
        if self._client is None:
            self._connect()

    def _connect(self) -> None:
        """Create the pooled client, verify it with a ping, then store it.

        Raises:
            Exception: Whatever ``MongoClient`` or the ``ping`` command raises.
                In that case the half-built client is closed and nothing is
                stored, so a later call retries from scratch.
        """
        connection_kwargs = {
            "host": MongoConfig.URL,
            "maxPoolSize": 50,  # Maximum number of connections in the pool
            "minPoolSize": 10,  # Minimum number of connections in the pool
            "maxIdleTimeMS": 30000,  # Maximum time a connection can be idle (30 seconds)
            "waitQueueTimeoutMS": 2000,  # How long a thread will wait for a connection
            "serverSelectionTimeoutMS": 5000,  # How long to wait for server selection
        }
        client = MongoClient(**connection_kwargs)
        try:
            # Test connection before publishing the client on the singleton.
            client.admin.command("ping")
        except Exception:
            client.close()
            raise
        self._client = client

    def __enter__(self):
        """Context manager entry point; returns the handler itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit point - closes the connection.

        Returns:
            False, so an exception raised inside the ``with`` body propagates.
        """
        self.close()
        return False  # Don't suppress any exceptions

    def close(self):
        """Close the MongoDB connection if one is open.

        Best-effort: errors raised by the driver while closing are logged and
        not propagated. The stored client is cleared first, so the handler
        never keeps a reference to a client whose ``close()`` failed.
        """
        client, self._client = self._client, None
        if client is None:
            return
        try:
            client.close()
        except Exception:
            # Shutdown must not raise, but leave a trace instead of hiding it.
            logging.getLogger(__name__).warning(
                "Error while closing MongoDB client", exc_info=True
            )

    @property
    def client(self) -> MongoClient:
        """Get the MongoDB client, reconnecting if it was closed earlier."""
        if self._client is None:
            self._connect()
        return self._client

    def get_database(self, database_name: Optional[str] = None):
        """Get a MongoDB database.

        Args:
            database_name: Database to open; falls back to
                ``MongoConfig.DATABASE_NAME`` when omitted or empty.

        Returns:
            The result of indexing the client with the database name.
        """
        db_name = database_name or MongoConfig.DATABASE_NAME
        return self.client[db_name]

    def get_collection(
        self, collection_name: str, database_name: Optional[str] = None
    ):
        """Get a MongoDB collection.

        Args:
            collection_name: Name of the collection to open.
            database_name: Database holding the collection; see
                ``get_database`` for the default.

        Returns:
            The result of indexing the database with the collection name.
        """
        database = self.get_database(database_name)
        return database[collection_name]

    @classmethod
    @contextmanager
    def get_mongodb(cls):
        """Yield the singleton instance and close its connection afterwards.

        ``cls()`` re-runs ``__init__``, so a previously closed handler is
        reconnected before it is yielded.
        """
        instance = cls()
        try:
            yield instance
        finally:
            instance.close()

    @classmethod
    def with_mongodb(cls, func: Callable):
        """Decorator to automatically handle MongoDB connection context.

        The wrapped function receives the handler as its first positional
        argument, followed by the caller's own arguments.

        Usage:
            @MongoDBHandler.with_mongodb
            def my_function(db, *args, **kwargs):
                # db is the MongoDB instance
                pass
        """

        @wraps(func)
        def wrapper(*args, **kwargs):
            with cls.get_mongodb() as db:
                return func(db, *args, **kwargs)

        return wrapper
# Module-level singleton kept for backward compatibility. Instantiating the
# handler here runs MongoDBHandler.__init__ at import time, which opens the
# client and pings the server when no client is held yet.
mongodb = MongoDBHandler()