wag-managment-api-service-v.../Services/MongoDb/Models/actions.py

146 lines
4.2 KiB
Python

"""
This module contains the MongoActions class, which provides methods for
performing actions on the MongoDB database.
Api Mongo functions in general retrieves 2 params which are
companyUUID and Storage Reason
"""
from typing import Optional, Dict, Any, List
from pymongo import MongoClient
from pymongo.collection import Collection
from Services.MongoDb.Models.mixins import (
MongoUpdateMixin,
MongoInsertMixin,
MongoFindMixin,
MongoDeleteMixin,
MongoAggregateMixin,
)
from Services.MongoDb.Models.exceptions import (
MongoDocumentNotFoundError,
MongoDuplicateKeyError,
MongoValidationError,
MongoConnectionError,
)
class MongoActions(
MongoUpdateMixin,
MongoInsertMixin,
MongoFindMixin,
MongoDeleteMixin,
MongoAggregateMixin
):
"""Main MongoDB actions class that inherits all CRUD operation mixins.
This class provides a unified interface for all MongoDB operations while
managing collections based on company UUID and storage reason.
"""
def __init__(
self,
client: MongoClient,
database: str,
company_uuid: str,
storage_reason: str
):
"""Initialize MongoDB actions with client and collection info.
Args:
client: MongoDB client
database: Database name to use
company_uuid: Company UUID for collection naming
storage_reason: Storage reason for collection naming
"""
self._client = client
self._database = database
self._company_uuid = company_uuid
self._storage_reason = storage_reason
self._collection = None
self.use_collection(storage_reason)
def use_collection(self, storage_reason: str) -> None:
"""Switch to a different collection.
Args:
storage_reason: New storage reason for collection naming
"""
collection_name = f"{self._company_uuid}*{storage_reason}"
self._collection = self._client[self._database][collection_name]
@property
def collection(self) -> Collection:
"""Get current MongoDB collection."""
return self._collection
def insert_one(self, document: Dict[str, Any]):
"""Insert a single document."""
return super().insert_one(self.collection, document)
def insert_many(self, documents: List[Dict[str, Any]]):
"""Insert multiple documents."""
return super().insert_many(self.collection, documents)
def find_one(self, filter_query: Dict[str, Any], projection: Optional[Dict[str, Any]] = None):
"""Find a single document."""
return super().find_one(self.collection, filter_query, projection)
def find_many(
self,
filter_query: Dict[str, Any],
projection: Optional[Dict[str, Any]] = None,
sort: Optional[List[tuple]] = None,
limit: Optional[int] = None,
skip: Optional[int] = None,
):
"""Find multiple documents."""
return super().find_many(
self.collection,
filter_query,
projection,
sort,
limit,
skip
)
def update_one(
self,
filter_query: Dict[str, Any],
update_data: Dict[str, Any],
upsert: bool = False,
):
"""Update a single document."""
return super().update_one(
self.collection,
filter_query,
update_data,
upsert
)
def update_many(
self,
filter_query: Dict[str, Any],
update_data: Dict[str, Any],
upsert: bool = False,
):
"""Update multiple documents."""
return super().update_many(
self.collection,
filter_query,
update_data,
upsert
)
def delete_one(self, filter_query: Dict[str, Any]):
"""Delete a single document."""
return super().delete_one(self.collection, filter_query)
def delete_many(self, filter_query: Dict[str, Any]):
"""Delete multiple documents."""
return super().delete_many(self.collection, filter_query)
def aggregate(self, pipeline: List[Dict[str, Any]]):
"""Execute an aggregation pipeline."""
return super().aggregate(self.collection, pipeline)