diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 33f261a..94a25f7 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -2,6 +2,5 @@
-
\ No newline at end of file
diff --git a/AllConfigs/main.py b/AllConfigs/main.py
index a1cf139..1c68cc4 100644
--- a/AllConfigs/main.py
+++ b/AllConfigs/main.py
@@ -1,4 +1,3 @@
-
class HostConfig:
MAIN_HOST = "10.10.2.36" # http://10.10.2.36
EMAIL_HOST = "10.10.2.34" # http://10.10.2.34
diff --git a/ApiLibrary/__init__.py b/ApiLibrary/__init__.py
index 7165020..b85b70c 100644
--- a/ApiLibrary/__init__.py
+++ b/ApiLibrary/__init__.py
@@ -1,3 +1,9 @@
from ApiLibrary.date_time_actions.date_functions import DateTimeLocal, system_arrow
+from ApiLibrary.extensions.select import SelectActionWithEmployee, SelectAction
-__all__ = ["DateTimeLocal", "system_arrow"]
+__all__ = [
+ "DateTimeLocal",
+ "system_arrow",
+ "SelectActionWithEmployee",
+ "SelectAction",
+]
diff --git a/ApiLibrary/extensions/select.py b/ApiLibrary/extensions/select.py
new file mode 100644
index 0000000..b94da17
--- /dev/null
+++ b/ApiLibrary/extensions/select.py
@@ -0,0 +1,76 @@
+class SelectorsBase:
+ @classmethod
+ def add_confirmed_filter(cls, first_table, second_table) -> tuple:
+ return (
+ first_table.active == True,
+ first_table.is_confirmed == True,
+ first_table.deleted == False,
+ second_table.active == True,
+ second_table.is_confirmed == True,
+ second_table.deleted == False,
+ )
+
+
+class SelectActionWithEmployee:
+
+ @classmethod
+ def select_action(cls, employee_id, filter_expr: list = None):
+ if filter_expr is not None:
+ filter_expr = (cls.__many__table__.employee_id == employee_id, *filter_expr)
+ data = (
+ cls.session.query(cls.id)
+ .select_from(cls)
+ .join(cls.__many__table__, cls.__many__table__.member_id == cls.id)
+ .filter(
+ *filter_expr,
+ *SelectorsBase.add_confirmed_filter(
+ first_table=cls, second_table=cls.__many__table__
+ ),
+ )
+ )
+ return cls.query.filter(cls.id.in_([comp[0] for comp in data.all()]))
+ data = (
+ cls.session.query(cls.id)
+ .select_from(cls)
+ .join(cls.__many__table__, cls.__many__table__.member_id == cls.id)
+ .filter(
+ cls.__many__table__.employee_id == employee_id,
+ *SelectorsBase.add_confirmed_filter(
+ first_table=cls, second_table=cls.__many__table__
+ ),
+ )
+ )
+ return cls.query.filter(cls.id.in_([comp[0] for comp in data.all()]))
+
+
+class SelectAction:
+
+ @classmethod
+ def select_action(cls, duty_id_list: list, filter_expr: list = None):
+ if filter_expr is not None:
+ data = (
+ cls.session.query(cls.id)
+ .select_from(cls)
+ .join(cls.__many__table__, cls.__many__table__.member_id == cls.id)
+ .filter(
+ cls.__many__table__.duties_id.in_(duty_id_list),
+ *SelectorsBase.add_confirmed_filter(
+ first_table=cls, second_table=cls.__many__table__
+ ),
+ *filter_expr,
+ )
+ )
+ return cls.query.filter(cls.id.in_([comp[0] for comp in data.all()]))
+
+ data = (
+ cls.session.query(cls.id)
+ .select_from(cls)
+ .join(cls.__many__table__, cls.__many__table__.member_id == cls.id)
+ .filter(
+ cls.__many__table__.duties_id.in_(duty_id_list),
+ *SelectorsBase.add_confirmed_filter(
+ first_table=cls, second_table=cls.__many__table__
+ ),
+ )
+ )
+ return cls.query.filter(cls.id.in_([comp[0] for comp in data.all()]))
diff --git a/ApiLibrary/token/password_module.py b/ApiLibrary/token/password_module.py
new file mode 100644
index 0000000..d2ebbf0
--- /dev/null
+++ b/ApiLibrary/token/password_module.py
@@ -0,0 +1,43 @@
+import hashlib
+import uuid
+import secrets
+import random
+
+from AllConfigs.Token.config import Auth
+
+
+class PasswordModule:
+
+ @staticmethod
+ def generate_random_uu_id(str_std: bool = True):
+ return str(uuid.uuid4()) if str_std else uuid.uuid4()
+
+ @staticmethod
+ def generate_token(length=32):
+ letters = "abcdefghijklmnopqrstuvwxyz"
+ merged_letters = [letter for letter in letters] + [
+ letter.upper() for letter in letters
+ ]
+ token_generated = secrets.token_urlsafe(length)
+ for i in str(token_generated):
+ if i not in merged_letters:
+ token_generated = token_generated.replace(
+ i, random.choice(merged_letters), 1
+ )
+ return token_generated
+
+ @staticmethod
+ def generate_access_token():
+ return secrets.token_urlsafe(Auth.ACCESS_TOKEN_LENGTH)
+
+ @staticmethod
+ def generate_refresher_token():
+ return secrets.token_urlsafe(Auth.REFRESHER_TOKEN_LENGTH)
+
+ @staticmethod
+ def create_hashed_password(domain: str, id_: str, password: str):
+ return hashlib.sha256(f"{domain}:{id_}:{password}".encode("utf-8")).hexdigest()
+
+ @classmethod
+ def check_password(cls, domain, id_, password, password_hashed):
+ return cls.create_hashed_password(domain, id_, password) == password_hashed
diff --git a/DockerApiServices/steps.txt b/DockerApiServices/steps.txt
new file mode 100644
index 0000000..5f88adf
--- /dev/null
+++ b/DockerApiServices/steps.txt
@@ -0,0 +1,12 @@
+What to do with services?
+
+*
+
+*
+
+*
+
+*
+
+*
+
diff --git a/ErrorHandlers/Exceptions/api_exc.py b/ErrorHandlers/Exceptions/api_exc.py
index 251e1e2..2373dd9 100644
--- a/ErrorHandlers/Exceptions/api_exc.py
+++ b/ErrorHandlers/Exceptions/api_exc.py
@@ -1,5 +1,3 @@
-
-
class HTTPExceptionApi(Exception):
def __init__(self, error_code: str, lang: str):
diff --git a/Schemas/building/build.py b/Schemas/building/build.py
index 4b63095..2ee022c 100644
--- a/Schemas/building/build.py
+++ b/Schemas/building/build.py
@@ -10,13 +10,12 @@ from sqlalchemy import (
ForeignKey,
Index,
TIMESTAMP,
- func,
Text,
Numeric,
or_,
)
-from ApiLibrary.date_time_actions.date_functions import system_arrow
+from ApiLibrary import system_arrow, SelectActionWithEmployee
from Services.PostgresDb import CrudCollection
from ApiValidations.Request import (
InsertBuild,
@@ -24,8 +23,8 @@ from ApiValidations.Request import (
InsertBuildLivingSpace,
UpdateBuild,
)
-# from databases.extensions.selector_classes import SelectActionWithEmployee
-# from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
+
+from ApiValidations.Custom.token_objects import EmployeeTokenObject, OccupantTokenObject
from LanguageModels.Database.building.build import (
BuildTypesLanguageModel,
Part2EmployeeLanguageModel,
diff --git a/Schemas/identity/identity.py b/Schemas/identity/identity.py
index 365782b..8446bbc 100644
--- a/Schemas/identity/identity.py
+++ b/Schemas/identity/identity.py
@@ -20,6 +20,7 @@ from ApiLibrary.date_time_actions.date_functions import system_arrow
from AllConfigs.Token.config import Auth, ApiStatic
from Services.PostgresDb import CrudCollection
+
# from databases.extensions import SelectAction, SelectActionWithEmployee
# from databases.extensions.auth import UserLoginModule
from ApiValidations.Request import InsertUsers, InsertPerson
diff --git a/Services/MongoDb/Models/action_models/base.py b/Services/MongoDb/Models/action_models/base.py
new file mode 100644
index 0000000..566afde
--- /dev/null
+++ b/Services/MongoDb/Models/action_models/base.py
@@ -0,0 +1,111 @@
+"""Base models for MongoDB documents."""
+
+from typing import Any, Dict, Optional, Union
+from bson import ObjectId
+from pydantic import BaseModel, ConfigDict, Field, model_validator
+from pydantic.json_schema import JsonSchemaValue
+from pydantic_core import CoreSchema, core_schema
+
+from ApiLibrary import system_arrow
+
+
+class PyObjectId(ObjectId):
+ """Custom type for handling MongoDB ObjectId in Pydantic models."""
+
+ @classmethod
+ def __get_pydantic_core_schema__(
+ cls,
+ _source_type: Any,
+ _handler: Any,
+ ) -> CoreSchema:
+ """Define the core schema for PyObjectId."""
+ return core_schema.json_or_python_schema(
+ json_schema=core_schema.str_schema(),
+ python_schema=core_schema.union_schema([
+ core_schema.is_instance_schema(ObjectId),
+ core_schema.chain_schema([
+ core_schema.str_schema(),
+ core_schema.no_info_plain_validator_function(cls.validate),
+ ]),
+ ]),
+ serialization=core_schema.plain_serializer_function_ser_schema(
+ lambda x: str(x),
+ return_schema=core_schema.str_schema(),
+ when_used='json',
+ ),
+ )
+
+ @classmethod
+ def validate(cls, value: Any) -> ObjectId:
+ """Validate and convert the value to ObjectId."""
+ if not ObjectId.is_valid(value):
+ raise ValueError("Invalid ObjectId")
+ return ObjectId(value)
+
+ @classmethod
+ def __get_pydantic_json_schema__(
+ cls,
+ _core_schema: CoreSchema,
+ _handler: Any,
+ ) -> JsonSchemaValue:
+ """Define the JSON schema for PyObjectId."""
+ return {"type": "string"}
+
+
+class MongoBaseModel(BaseModel):
+ """Base model for all MongoDB documents."""
+
+ model_config = ConfigDict(
+ arbitrary_types_allowed=True,
+ json_encoders={ObjectId: str},
+ populate_by_name=True,
+ from_attributes=True,
+ validate_assignment=True,
+ extra='allow'
+ )
+
+ # Optional _id field that will be ignored in create operations
+ id: Optional[PyObjectId] = Field(None, alias="_id")
+
+ def get_extra(self, field_name: str, default: Any = None) -> Any:
+ """Safely get extra field value.
+
+ Args:
+ field_name: Name of the extra field to retrieve
+ default: Default value to return if field doesn't exist
+
+ Returns:
+ Value of the extra field if it exists, otherwise the default value
+ """
+ return getattr(self, field_name, default)
+
+ def as_dict(self) -> Dict[str, Any]:
+ """Convert model to dictionary including all fields and extra fields.
+
+ Returns:
+ Dict containing all model fields and extra fields with proper type conversion
+ """
+ return self.model_dump(by_alias=True)
+
+
+class MongoDocument(MongoBaseModel):
+ """Base document model with timestamps."""
+
+ created_at: float = Field(default_factory=lambda: system_arrow.now().timestamp())
+ updated_at: float = Field(default_factory=lambda: system_arrow.now().timestamp())
+
+ @model_validator(mode='before')
+ @classmethod
+ def prevent_protected_fields(cls, data: Any) -> Any:
+ """Prevent user from setting protected fields like _id and timestamps."""
+ if isinstance(data, dict):
+ # Remove protected fields from input
+ data.pop('_id', None)
+ data.pop('created_at', None)
+ data.pop('updated_at', None)
+
+ # Set timestamps
+ data['created_at'] = system_arrow.now().timestamp()
+ data['updated_at'] = system_arrow.now().timestamp()
+
+ return data
diff --git a/Services/MongoDb/Models/action_models/domain.py b/Services/MongoDb/Models/action_models/domain.py
new file mode 100644
index 0000000..2f5e58c
--- /dev/null
+++ b/Services/MongoDb/Models/action_models/domain.py
@@ -0,0 +1,78 @@
+"""
+MongoDB Domain Models.
+
+This module provides Pydantic models for domain management,
+including domain history and access details.
+"""
+
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+from pydantic import Field, model_validator
+
+from ApiLibrary import system_arrow
+from Services.MongoDb.Models.action_models.base import MongoBaseModel, MongoDocument
+
+
+class DomainData(MongoBaseModel):
+ """Model for domain data.
+
+ Attributes:
+ user_uu_id: Unique identifier of the user
+ main_domain: Primary domain
+ other_domains_list: List of additional domains
+ extra_data: Additional domain-related data
+ """
+
+ user_uu_id: str = Field(..., description="User's unique identifier")
+ main_domain: str = Field(..., description="Primary domain")
+ other_domains_list: List[str] = Field(
+ default_factory=list,
+ description="List of additional domains"
+ )
+ extra_data: Optional[Dict[str, Any]] = Field(
+ default_factory=dict,
+ alias="extraData",
+ description="Additional domain-related data"
+ )
+
+ class Config:
+ from_attributes = True
+ populate_by_name = True
+ validate_assignment = True
+
+
+class DomainDocument(MongoDocument):
+ """Model for domain-related documents."""
+
+ data: DomainData = Field(..., description="Domain data")
+
+ def update_main_domain(self, new_domain: str) -> None:
+ """Update the main domain and move current to history.
+
+ Args:
+ new_domain: New main domain to set
+ """
+ if self.data.main_domain and self.data.main_domain != new_domain:
+ if self.data.main_domain not in self.data.other_domains_list:
+ self.data.other_domains_list.append(self.data.main_domain)
+ self.data.main_domain = new_domain
+
+
+class DomainDocumentCreate(MongoDocument):
+ """Model for creating new domain documents."""
+ data: DomainData = Field(..., description="Initial domain data")
+
+ class Config:
+ from_attributes = True
+ populate_by_name = True
+ validate_assignment = True
+
+
+class DomainDocumentUpdate(MongoDocument):
+ """Model for updating existing domain documents."""
+ data: DomainData = Field(..., description="Updated domain data")
+
+ class Config:
+ from_attributes = True
+ populate_by_name = True
+ validate_assignment = True
diff --git a/Services/MongoDb/Models/action_models/password.py b/Services/MongoDb/Models/action_models/password.py
new file mode 100644
index 0000000..5f7609c
--- /dev/null
+++ b/Services/MongoDb/Models/action_models/password.py
@@ -0,0 +1,50 @@
+"""
+MongoDB Password Models.
+
+This module provides Pydantic models for password management,
+including password history and access details.
+"""
+
+from datetime import datetime
+from typing import Any, Dict, List, Optional
+from pydantic import Field
+
+from ApiLibrary import system_arrow
+from Services.MongoDb.Models.action_models.base import MongoBaseModel, MongoDocument
+
+
+class PasswordHistoryDetail(MongoBaseModel):
+ """Model for password history details."""
+
+ timestamp: datetime
+ ip_address: Optional[str] = Field(None, alias="ipAddress")
+ user_agent: Optional[str] = Field(None, alias="userAgent")
+ location: Optional[Dict[str, Any]] = None
+
+
+class PasswordHistoryData(MongoBaseModel):
+ """Model for password history data."""
+
+ password_history: List[str] = Field([], alias="passwordHistory")
+ access_history_detail: Dict[str, PasswordHistoryDetail] = Field(
+ default_factory=dict,
+ alias="accessHistoryDetail"
+ )
+
+
+class PasswordDocument(MongoDocument):
+ """Model for password-related documents."""
+
+ data: PasswordHistoryData
+
+
+class PasswordDocumentCreate(MongoBaseModel):
+ """Model for creating new password documents."""
+
+ data: PasswordHistoryData = Field(..., description="Initial password data")
+
+
+class PasswordDocumentUpdate(MongoBaseModel):
+ """Model for updating existing password documents."""
+
+ data: PasswordHistoryData
diff --git a/Services/MongoDb/Models/actions.py b/Services/MongoDb/Models/actions.py
new file mode 100644
index 0000000..b7be0c1
--- /dev/null
+++ b/Services/MongoDb/Models/actions.py
@@ -0,0 +1,145 @@
+"""
+This module contains the MongoActions class, which provides methods for
+performing actions on the MongoDB database.
+Api Mongo functions in general retrieves 2 params which are
+companyUUID and Storage Reason
+"""
+
+from typing import Optional, Dict, Any, List
+
+from pymongo import MongoClient
+from pymongo.collection import Collection
+
+from Services.MongoDb.Models.mixins import (
+ MongoUpdateMixin,
+ MongoInsertMixin,
+ MongoFindMixin,
+ MongoDeleteMixin,
+ MongoAggregateMixin,
+)
+from Services.MongoDb.Models.exceptions import (
+ MongoDocumentNotFoundError,
+ MongoDuplicateKeyError,
+ MongoValidationError,
+ MongoConnectionError,
+)
+
+
+class MongoActions(
+ MongoUpdateMixin,
+ MongoInsertMixin,
+ MongoFindMixin,
+ MongoDeleteMixin,
+ MongoAggregateMixin
+):
+ """Main MongoDB actions class that inherits all CRUD operation mixins.
+
+ This class provides a unified interface for all MongoDB operations while
+ managing collections based on company UUID and storage reason.
+ """
+
+ def __init__(
+ self,
+ client: MongoClient,
+ database: str,
+ company_uuid: str,
+ storage_reason: str
+ ):
+ """Initialize MongoDB actions with client and collection info.
+
+ Args:
+ client: MongoDB client
+ database: Database name to use
+ company_uuid: Company UUID for collection naming
+ storage_reason: Storage reason for collection naming
+ """
+ self._client = client
+ self._database = database
+ self._company_uuid = company_uuid
+ self._storage_reason = storage_reason
+ self._collection = None
+ self.use_collection(storage_reason)
+
+ def use_collection(self, storage_reason: str) -> None:
+ """Switch to a different collection.
+
+ Args:
+ storage_reason: New storage reason for collection naming
+ """
+ collection_name = f"{self._company_uuid}*{storage_reason}"
+ self._collection = self._client[self._database][collection_name]
+
+ @property
+ def collection(self) -> Collection:
+ """Get current MongoDB collection."""
+ return self._collection
+
+ def insert_one(self, document: Dict[str, Any]):
+ """Insert a single document."""
+ return super().insert_one(self.collection, document)
+
+ def insert_many(self, documents: List[Dict[str, Any]]):
+ """Insert multiple documents."""
+ return super().insert_many(self.collection, documents)
+
+ def find_one(self, filter_query: Dict[str, Any], projection: Optional[Dict[str, Any]] = None):
+ """Find a single document."""
+ return super().find_one(self.collection, filter_query, projection)
+
+ def find_many(
+ self,
+ filter_query: Dict[str, Any],
+ projection: Optional[Dict[str, Any]] = None,
+ sort: Optional[List[tuple]] = None,
+ limit: Optional[int] = None,
+ skip: Optional[int] = None,
+ ):
+ """Find multiple documents."""
+ return super().find_many(
+ self.collection,
+ filter_query,
+ projection,
+ sort,
+ limit,
+ skip
+ )
+
+ def update_one(
+ self,
+ filter_query: Dict[str, Any],
+ update_data: Dict[str, Any],
+ upsert: bool = False,
+ ):
+ """Update a single document."""
+ return super().update_one(
+ self.collection,
+ filter_query,
+ update_data,
+ upsert
+ )
+
+ def update_many(
+ self,
+ filter_query: Dict[str, Any],
+ update_data: Dict[str, Any],
+ upsert: bool = False,
+ ):
+ """Update multiple documents."""
+ return super().update_many(
+ self.collection,
+ filter_query,
+ update_data,
+ upsert
+ )
+
+ def delete_one(self, filter_query: Dict[str, Any]):
+ """Delete a single document."""
+ return super().delete_one(self.collection, filter_query)
+
+ def delete_many(self, filter_query: Dict[str, Any]):
+ """Delete multiple documents."""
+ return super().delete_many(self.collection, filter_query)
+
+ def aggregate(self, pipeline: List[Dict[str, Any]]):
+ """Execute an aggregation pipeline."""
+ return super().aggregate(self.collection, pipeline)
diff --git a/Services/MongoDb/Models/exception_handlers.py b/Services/MongoDb/Models/exception_handlers.py
new file mode 100644
index 0000000..a0dd469
--- /dev/null
+++ b/Services/MongoDb/Models/exception_handlers.py
@@ -0,0 +1,190 @@
+"""
+Exception handlers for MongoDB operations.
+
+This module provides exception handlers for MongoDB-related errors,
+converting them to appropriate HTTP responses.
+"""
+
+from typing import Callable, Any
+from fastapi import Request, status
+from fastapi.responses import JSONResponse
+from pymongo.errors import PyMongoError, DuplicateKeyError, ConnectionFailure
+
+from Services.MongoDb.Models.exceptions import (
+ MongoBaseException,
+ MongoConnectionError,
+ MongoDocumentNotFoundError,
+ MongoValidationError,
+ MongoDuplicateKeyError,
+ PasswordHistoryError,
+ PasswordReuseError,
+ PasswordHistoryLimitError,
+ InvalidPasswordDetailError
+)
+from ErrorHandlers.ErrorHandlers.api_exc_handler import HTTPExceptionApi
+
+
+def handle_mongo_errors(func: Callable) -> Callable:
+ """Decorator to handle MongoDB operation errors.
+
+ Args:
+ func: Function to wrap with error handling
+
+ Returns:
+ Wrapped function with error handling
+ """
+ async def wrapper(*args, **kwargs) -> Any:
+ try:
+ return await func(*args, **kwargs)
+ except ConnectionFailure as e:
+ raise MongoConnectionError(
+ message=str(e),
+ details={"error_type": "connection_failure"}
+ ).to_http_exception()
+ except DuplicateKeyError as e:
+ raise MongoDuplicateKeyError(
+ collection=e.details.get("namespace", "unknown"),
+ key_pattern=e.details.get("keyPattern", {}),
+ ).to_http_exception()
+ except PyMongoError as e:
+ raise MongoBaseException(
+ message=str(e),
+ details={"error_type": "pymongo_error"}
+ ).to_http_exception()
+ except Exception as e:
+ raise HTTPExceptionApi(
+ lang="en",
+ error_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ )
+ return wrapper
+
+
+async def mongo_base_exception_handler(
+ request: Request,
+ exc: MongoBaseException
+) -> JSONResponse:
+ """Handle base MongoDB exceptions.
+
+ Args:
+ request: FastAPI request
+ exc: MongoDB base exception
+
+ Returns:
+ JSON response with error details
+ """
+ return JSONResponse(
+ status_code=exc.status_code,
+ content={"error": exc.to_http_exception()}
+ )
+
+
+async def mongo_connection_error_handler(
+ request: Request,
+ exc: MongoConnectionError
+) -> JSONResponse:
+ """Handle MongoDB connection errors.
+
+ Args:
+ request: FastAPI request
+ exc: MongoDB connection error
+
+ Returns:
+ JSON response with connection error details
+ """
+ return JSONResponse(
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ content={"error": exc.to_http_exception()}
+ )
+
+
+async def mongo_document_not_found_handler(
+ request: Request,
+ exc: MongoDocumentNotFoundError
+) -> JSONResponse:
+ """Handle document not found errors.
+
+ Args:
+ request: FastAPI request
+ exc: Document not found error
+
+ Returns:
+ JSON response with not found error details
+ """
+ return JSONResponse(
+ status_code=status.HTTP_404_NOT_FOUND,
+ content={"error": exc.to_http_exception()}
+ )
+
+
+async def mongo_validation_error_handler(
+ request: Request,
+ exc: MongoValidationError
+) -> JSONResponse:
+ """Handle validation errors.
+
+ Args:
+ request: FastAPI request
+ exc: Validation error
+
+ Returns:
+ JSON response with validation error details
+ """
+ return JSONResponse(
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ content={"error": exc.to_http_exception()}
+ )
+
+
+async def mongo_duplicate_key_error_handler(
+ request: Request,
+ exc: MongoDuplicateKeyError
+) -> JSONResponse:
+ """Handle duplicate key errors.
+
+ Args:
+ request: FastAPI request
+ exc: Duplicate key error
+
+ Returns:
+ JSON response with duplicate key error details
+ """
+ return JSONResponse(
+ status_code=status.HTTP_409_CONFLICT,
+ content={"error": exc.to_http_exception()}
+ )
+
+
+async def password_history_error_handler(
+ request: Request,
+ exc: PasswordHistoryError
+) -> JSONResponse:
+ """Handle password history errors.
+
+ Args:
+ request: FastAPI request
+ exc: Password history error
+
+ Returns:
+ JSON response with password history error details
+ """
+ return JSONResponse(
+ status_code=exc.status_code,
+ content={"error": exc.to_http_exception()}
+ )
+
+
+def register_exception_handlers(app: Any) -> None:
+ """Register all MongoDB exception handlers with FastAPI app.
+
+ Args:
+ app: FastAPI application instance
+ """
+ app.add_exception_handler(MongoBaseException, mongo_base_exception_handler)
+ app.add_exception_handler(MongoConnectionError, mongo_connection_error_handler)
+ app.add_exception_handler(MongoDocumentNotFoundError, mongo_document_not_found_handler)
+ app.add_exception_handler(MongoValidationError, mongo_validation_error_handler)
+ app.add_exception_handler(MongoDuplicateKeyError, mongo_duplicate_key_error_handler)
+ app.add_exception_handler(PasswordHistoryError, password_history_error_handler)
+ app.add_exception_handler(PasswordReuseError, password_history_error_handler)
+ app.add_exception_handler(PasswordHistoryLimitError, password_history_error_handler)
+ app.add_exception_handler(InvalidPasswordDetailError, password_history_error_handler)
\ No newline at end of file
diff --git a/Services/MongoDb/Models/exceptions.py b/Services/MongoDb/Models/exceptions.py
new file mode 100644
index 0000000..773d868
--- /dev/null
+++ b/Services/MongoDb/Models/exceptions.py
@@ -0,0 +1,161 @@
+"""
+Custom exceptions for MongoDB operations and password management.
+
+This module defines custom exceptions for handling various error cases in MongoDB
+operations and password-related functionality.
+"""
+
+from typing import Any, Dict, Optional
+from fastapi import HTTPException, status
+from ErrorHandlers.ErrorHandlers.api_exc_handler import HTTPExceptionApi
+
+
+class MongoBaseException(Exception):
+ """Base exception for MongoDB-related errors."""
+
+ def __init__(
+ self,
+ message: str,
+ status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR,
+ details: Optional[Dict[str, Any]] = None
+ ):
+ self.message = message
+ self.status_code = status_code
+ self.details = details or {}
+ super().__init__(self.message)
+
+ def to_http_exception(self) -> HTTPException:
+ """Convert to FastAPI HTTPException."""
+ return HTTPExceptionApi(
+ lang="en",
+ error_code=self.status_code,
+ )
+
+
+class MongoConnectionError(MongoBaseException):
+ """Raised when there's an error connecting to MongoDB."""
+
+ def __init__(
+ self,
+ message: str = "Failed to connect to MongoDB",
+ details: Optional[Dict[str, Any]] = None
+ ):
+ super().__init__(
+ message=message,
+ status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
+ details=details
+ )
+
+
+class MongoDocumentNotFoundError(MongoBaseException):
+ """Raised when a document is not found in MongoDB."""
+
+ def __init__(
+ self,
+ collection: str,
+ filter_query: Dict[str, Any],
+ message: Optional[str] = None
+ ):
+ message = message or f"Document not found in collection '{collection}'"
+ super().__init__(
+ message=message,
+ status_code=status.HTTP_404_NOT_FOUND,
+ details={
+ "collection": collection,
+ "filter": filter_query
+ }
+ )
+
+
+class MongoValidationError(MongoBaseException):
+ """Raised when document validation fails."""
+
+ def __init__(
+ self,
+ message: str,
+ field_errors: Optional[Dict[str, str]] = None
+ ):
+ super().__init__(
+ message=message,
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ details={"field_errors": field_errors or {}}
+ )
+
+
+class MongoDuplicateKeyError(MongoBaseException):
+ """Raised when trying to insert a document with a duplicate key."""
+
+ def __init__(
+ self,
+ collection: str,
+ key_pattern: Dict[str, Any],
+ message: Optional[str] = None
+ ):
+ message = message or f"Duplicate key error in collection '{collection}'"
+ super().__init__(
+ message=message,
+ status_code=status.HTTP_409_CONFLICT,
+ details={
+ "collection": collection,
+ "key_pattern": key_pattern
+ }
+ )
+
+
+class PasswordHistoryError(MongoBaseException):
+ """Base exception for password history-related errors."""
+
+ def __init__(
+ self,
+ message: str,
+ status_code: int = status.HTTP_400_BAD_REQUEST,
+ details: Optional[Dict[str, Any]] = None
+ ):
+ super().__init__(message, status_code, details)
+
+
+class PasswordReuseError(PasswordHistoryError):
+ """Raised when attempting to reuse a recent password."""
+
+ def __init__(
+ self,
+ message: str = "Password was used recently",
+ history_limit: Optional[int] = None
+ ):
+ details = {"history_limit": history_limit} if history_limit else None
+ super().__init__(
+ message=message,
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ details=details
+ )
+
+
+class PasswordHistoryLimitError(PasswordHistoryError):
+ """Raised when password history limit is reached."""
+
+ def __init__(
+ self,
+ limit: int,
+ message: Optional[str] = None
+ ):
+ message = message or f"Password history limit of {limit} reached"
+ super().__init__(
+ message=message,
+ status_code=status.HTTP_409_CONFLICT,
+ details={"limit": limit}
+ )
+
+
+class InvalidPasswordDetailError(PasswordHistoryError):
+ """Raised when password history detail is invalid."""
+
+ def __init__(
+ self,
+ message: str,
+ field_errors: Optional[Dict[str, str]] = None
+ ):
+ super().__init__(
+ message=message,
+ status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
+ details={"field_errors": field_errors or {}}
+ )
diff --git a/Services/MongoDb/Models/mixins.py b/Services/MongoDb/Models/mixins.py
new file mode 100644
index 0000000..7a67d9a
--- /dev/null
+++ b/Services/MongoDb/Models/mixins.py
@@ -0,0 +1,161 @@
+"""
+MongoDB CRUD Operation Mixins.
+
+This module provides mixins for common MongoDB operations:
+1. Document creation (insert)
+2. Document retrieval (find)
+3. Document updates
+4. Document deletion
+5. Aggregation operations
+"""
+
+from typing import Any, Dict, List, Optional
+from functools import wraps
+
+from pymongo.collection import Collection
+from pymongo.errors import (
+ ConnectionFailure,
+ OperationFailure,
+ ServerSelectionTimeoutError,
+ PyMongoError,
+)
+
+from ErrorHandlers.ErrorHandlers.api_exc_handler import HTTPExceptionApi
+
+
+def handle_mongo_errors(func):
+ """Decorator to handle MongoDB operation errors.
+
+ Catches MongoDB-specific errors and converts them to HTTPExceptionApi.
+ """
+
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ try:
+ return func(*args, **kwargs)
+ except ConnectionFailure:
+ raise HTTPExceptionApi(
+ error_code="HTTP_503_SERVICE_UNAVAILABLE",
+ lang="en"
+ )
+ except ServerSelectionTimeoutError:
+ raise HTTPExceptionApi(
+ error_code="HTTP_504_GATEWAY_TIMEOUT",
+ lang="en"
+ )
+ except OperationFailure as e:
+ raise HTTPExceptionApi(
+ error_code="HTTP_400_BAD_REQUEST",
+ lang="en"
+ )
+ except PyMongoError as e:
+ raise HTTPExceptionApi(
+ error_code="HTTP_500_INTERNAL_SERVER_ERROR",
+ lang="en"
+ )
+ return wrapper
+
+
+class MongoInsertMixin:
+ """Mixin for MongoDB insert operations."""
+
+ @handle_mongo_errors
+ def insert_one(self, collection: Collection, document: Dict[str, Any]):
+ """Insert a single document into the collection."""
+ result = collection.insert_one(document)
+ return result
+
+ @handle_mongo_errors
+ def insert_many(self, collection: Collection, documents: List[Dict[str, Any]]):
+ """Insert multiple documents into the collection."""
+ result = collection.insert_many(documents)
+ return result
+
+
+class MongoFindMixin:
+ """Mixin for MongoDB find operations."""
+
+ @handle_mongo_errors
+ def find_one(
+ self,
+ collection: Collection,
+ filter_query: Dict[str, Any],
+ projection: Optional[Dict[str, Any]] = None,
+ ):
+ """Find a single document in the collection."""
+ result = collection.find_one(filter_query, projection)
+ return result
+
+ @handle_mongo_errors
+ def find_many(
+ self,
+ collection: Collection,
+ filter_query: Dict[str, Any],
+ projection: Optional[Dict[str, Any]] = None,
+ sort: Optional[List[tuple]] = None,
+ limit: Optional[int] = None,
+ skip: Optional[int] = None,
+ ):
+ """Find multiple documents in the collection with pagination support."""
+ cursor = collection.find(filter_query, projection)
+ if sort:
+ cursor = cursor.sort(sort)
+ if skip:
+ cursor = cursor.skip(skip)
+ if limit:
+ cursor = cursor.limit(limit)
+ return list(cursor)
+
+
+class MongoUpdateMixin:
+ """Mixin for MongoDB update operations."""
+
+ @handle_mongo_errors
+ def update_one(
+ self,
+ collection: Collection,
+ filter_query: Dict[str, Any],
+ update_data: Dict[str, Any],
+ upsert: bool = False,
+ ):
+ """Update a single document in the collection."""
+ result = collection.update_one(filter_query, update_data, upsert=upsert)
+ return result
+
+ @handle_mongo_errors
+ def update_many(
+ self,
+ collection: Collection,
+ filter_query: Dict[str, Any],
+ update_data: Dict[str, Any],
+ upsert: bool = False,
+ ):
+ """Update multiple documents in the collection."""
+ result = collection.update_many(filter_query, update_data, upsert=upsert)
+ return result
+
+
+class MongoDeleteMixin:
+ """Mixin for MongoDB delete operations."""
+
+ @handle_mongo_errors
+ def delete_one(self, collection: Collection, filter_query: Dict[str, Any]):
+ """Delete a single document from the collection."""
+ result = collection.delete_one(filter_query)
+ return result
+
+ @handle_mongo_errors
+ def delete_many(self, collection: Collection, filter_query: Dict[str, Any]):
+ """Delete multiple documents from the collection."""
+ result = collection.delete_many(filter_query)
+ return result
+
+
+class MongoAggregateMixin:
+ """Mixin for MongoDB aggregation operations."""
+
+ @handle_mongo_errors
+ def aggregate(self, collection: Collection, pipeline: List[Dict[str, Any]]):
+ """Execute an aggregation pipeline on the collection."""
+ result = collection.aggregate(pipeline)
+ return result
diff --git a/Services/MongoDb/Models/response.py b/Services/MongoDb/Models/response.py
new file mode 100644
index 0000000..68666fe
--- /dev/null
+++ b/Services/MongoDb/Models/response.py
@@ -0,0 +1,85 @@
+"""
+Response handler for MongoDB query results.
+
+This module provides a wrapper class for MongoDB query results,
+adding convenience methods for accessing data and managing query state.
+"""
+
+from typing import Any, Dict, List, Optional, TypeVar, Generic, Union
+from pymongo.cursor import Cursor
+
+T = TypeVar("T")
+
+
+class MongoResponse(Generic[T]):
+ """
+ Wrapper for MongoDB query results.
+
+ Attributes:
+ cursor: MongoDB cursor object
+ first: Whether to return first result only
+ data: Query results (lazy loaded)
+ count: Total count of results
+ """
+
+ def __init__(
+ self,
+ cursor: Optional[Cursor] = None,
+ first: bool = False,
+ status: bool = True,
+ message: str = "",
+ error: Optional[str] = None,
+ data: Optional[Union[List[T], T]] = None,
+ ):
+ self._cursor = cursor
+ self._first = first
+ self.status = status
+ self.message = message
+ self.error = error
+ self._data: Optional[Union[List[T], T]] = data
+ self._count: Optional[int] = None
+
+ @property
+ def data(self) -> Union[List[T], T, None]:
+ """
+ Lazy load and return query results.
+ Returns first item if first=True, otherwise returns all results.
+ """
+ if self._data is None and self._cursor is not None:
+ results = list(self._cursor)
+ self._data = results[0] if self._first and results else results
+ return self._data
+
+ @property
+ def count(self) -> int:
+ """Lazy load and return total count of results."""
+ if self._count is None:
+ if self._cursor is not None:
+ self._count = self._cursor.count()
+ else:
+ self._count = len(self.all)
+ return self._count
+
+ @property
+ def all(self) -> List[T]:
+ """Get all results as list."""
+ return (
+ self.data
+ if isinstance(self.data, list)
+ else [self.data] if self.data else []
+ )
+
+ @property
+ def first(self) -> Optional[T]:
+ """Get first result only."""
+ return self.data if self._first else (self.data[0] if self.data else None)
+
+ def as_dict(self) -> Dict[str, Any]:
+ """Convert response to dictionary format."""
+ return {
+ "status": self.status,
+ "message": self.message,
+ "data": self.data,
+ "count": self.count,
+ "error": self.error,
+ }
diff --git a/Services/MongoDb/database.py b/Services/MongoDb/database.py
new file mode 100644
index 0000000..b0f5089
--- /dev/null
+++ b/Services/MongoDb/database.py
@@ -0,0 +1,146 @@
+"""
+MongoDB database connection and operations.
+
+This module provides MongoDB connection management with:
+1. Connection pooling
+2. Lifecycle management
+3. Error handling
+"""
+
+from typing import Optional, Dict, Any, List, Union
+from pymongo import MongoClient
+from pymongo.results import InsertOneResult, DeleteResult, UpdateResult
+from pymongo.cursor import Cursor
+
+from AllConfigs.NoSqlDatabase.configs import MongoConfig
+
+
+class MongoInsertMixin:
+ """Mixin for MongoDB insert operations."""
+
+ def insert_one(self, document: Dict[str, Any]) -> InsertOneResult:
+ """Insert a single document."""
+ return self.collection.insert_one(document)
+
+ def insert_many(self, documents: List[Dict[str, Any]]) -> List[InsertOneResult]:
+ """Insert multiple documents."""
+ return self.collection.insert_many(documents)
+
+
+class MongoFindMixin:
+ """Mixin for MongoDB find operations."""
+
+ def find_one(self, filter_query: Dict[str, Any]) -> Optional[Dict[str, Any]]:
+ """Find a single document."""
+ return self.collection.find_one(filter_query)
+
+ def find_many(self, filter_query: Dict[str, Any]) -> Cursor:
+ """Find multiple documents."""
+ return self.collection.find(filter_query)
+
+
+class MongoUpdateMixin:
+ """Mixin for MongoDB update operations."""
+
+ def update_one(self, filter_query: Dict[str, Any], update: Dict[str, Any]) -> UpdateResult:
+ """Update a single document."""
+ return self.collection.update_one(filter_query, update)
+
+ def update_many(self, filter_query: Dict[str, Any], update: Dict[str, Any]) -> UpdateResult:
+ """Update multiple documents."""
+ return self.collection.update_many(filter_query, update)
+
+
+class MongoDeleteMixin:
+ """Mixin for MongoDB delete operations."""
+
+ def delete_one(self, filter_query: Dict[str, Any]) -> DeleteResult:
+ """Delete a single document."""
+ return self.collection.delete_one(filter_query)
+
+ def delete_many(self, filter_query: Dict[str, Any]) -> DeleteResult:
+ """Delete multiple documents."""
+ return self.collection.delete_many(filter_query)
+
+
+class MongoAggregateMixin:
+ """Mixin for MongoDB aggregate operations."""
+
+ def aggregate(self, pipeline: List[Dict[str, Any]]) -> Cursor:
+ """Execute an aggregation pipeline."""
+ return self.collection.aggregate(pipeline)
+
+
+class MongoDBHandler(
+ MongoInsertMixin,
+ MongoFindMixin,
+ MongoUpdateMixin,
+ MongoDeleteMixin,
+ MongoAggregateMixin,
+):
+ """Handler for MongoDB operations with connection management."""
+
+ _instance = None
+ _client: Optional[MongoClient] = None
+
+ def __new__(cls):
+ """Implement singleton pattern for database connection."""
+ if cls._instance is None:
+ cls._instance = super().__new__(cls)
+ return cls._instance
+
+ def __init__(self):
+ """Initialize MongoDB connection if not already initialized."""
+ if not self._client:
+ # Build connection URL based on whether credentials are provided
+ if MongoConfig.USER_NAME and MongoConfig.PASSWORD:
+ connection_url = (
+ f"mongodb://{MongoConfig.USER_NAME}:{MongoConfig.PASSWORD}"
+ f"@{MongoConfig.HOST}:{MongoConfig.PORT}"
+ )
+ else:
+ connection_url = f"mongodb://{MongoConfig.HOST}:{MongoConfig.PORT}"
+
+ # Build connection options
+ connection_kwargs = {
+ "host": connection_url,
+ "maxPoolSize": 50, # Maximum number of connections in the pool
+ "minPoolSize": 10, # Minimum number of connections in the pool
+ "maxIdleTimeMS": 30000, # Maximum time a connection can be idle (30 seconds)
+ "waitQueueTimeoutMS": 2000, # How long a thread will wait for a connection
+ "serverSelectionTimeoutMS": 5000, # How long to wait for server selection
+ }
+
+ self._client = MongoClient(**connection_kwargs)
+
+ # Test connection
+ self._client.admin.command('ping')
+
+ def close(self):
+ """Close MongoDB connection."""
+ if self._client:
+ self._client.close()
+ self._client = None
+
+ def __del__(self):
+ """Ensure connection is closed on deletion."""
+ self.close()
+
+ @property
+ def client(self) -> MongoClient:
+ """Get MongoDB client."""
+ return self._client
+
+ def get_database(self, database_name: str = None):
+ """Get MongoDB database."""
+ db_name = database_name or MongoConfig.DATABASE_NAME
+ return self._client[db_name]
+
+ def get_collection(self, collection_name: str, database_name: str = None):
+ """Get MongoDB collection."""
+ database = self.get_database(database_name)
+ return database[collection_name]
+
+
+# Create a singleton instance
+mongodb = MongoDBHandler()
diff --git a/Services/MongoDb/how_to.py b/Services/MongoDb/how_to.py
new file mode 100644
index 0000000..1eb12b8
--- /dev/null
+++ b/Services/MongoDb/how_to.py
@@ -0,0 +1,168 @@
+"""
+MongoDB Operations Examples
+
+This module provides practical examples of using MongoDB operations through our mixins.
+Each example demonstrates different aspects of CRUD operations and aggregation.
+"""
+
+from typing import Dict, List, Any
+from datetime import datetime
+
+from Services.MongoDb.database import mongodb
+
+
+def insert_examples() -> None:
+ """Examples of insert operations."""
+ # Single document insert
+ user_doc = {
+ "username": "john_doe",
+ "email": "john@example.com",
+ "age": 30,
+ "created_at": datetime.utcnow(),
+ }
+ user_id = mongodb.insert_one(
+ database="user_db", collection="users", document=user_doc
+ )
+ print(f"Inserted user with ID: {user_id}")
+
+ # Multiple documents insert
+ products = [
+ {"name": "Laptop", "price": 999.99, "stock": 50},
+ {"name": "Mouse", "price": 29.99, "stock": 100},
+ {"name": "Keyboard", "price": 59.99, "stock": 75},
+ ]
+ product_ids = mongodb.insert_many(
+ database="store_db", collection="products", documents=products
+ )
+ print(f"Inserted {len(product_ids)} products")
+
+
+def find_examples() -> None:
+ """Examples of find operations."""
+ # Find one document
+ user = mongodb.find_one(
+ database="user_db",
+ collection="users",
+ filter_query={"email": "john@example.com"},
+ projection={"username": 1, "email": 1, "_id": 0},
+ )
+ print(f"Found user: {user}")
+
+ # Find many with pagination
+ page_size = 10
+ page_number = 1
+ products = mongodb.find_many(
+ database="store_db",
+ collection="products",
+ filter_query={"price": {"$lt": 100}},
+ projection={"name": 1, "price": 1},
+ sort=[("price", 1)], # Sort by price ascending
+ limit=page_size,
+ skip=(page_number - 1) * page_size,
+ )
+ print(f"Found {len(products)} products under $100")
+
+
+def update_examples() -> None:
+ """Examples of update operations."""
+ # Update single document
+ result = mongodb.update_one(
+ database="store_db",
+ collection="products",
+ filter_query={"name": "Laptop"},
+ update_data={"price": 899.99, "stock": 45},
+ upsert=False,
+ )
+ print(f"Updated {result['modified_count']} laptop(s)")
+
+ # Update multiple documents
+ result = mongodb.update_many(
+ database="store_db",
+ collection="products",
+ filter_query={"stock": {"$lt": 10}},
+ update_data={"status": "low_stock"},
+ upsert=True,
+ )
+ print(f"Updated {result['modified_count']} low stock products")
+
+
+def delete_examples() -> None:
+ """Examples of delete operations."""
+ # Delete single document
+ count = mongodb.delete_one(
+ database="user_db",
+ collection="users",
+ filter_query={"email": "john@example.com"},
+ )
+ print(f"Deleted {count} user")
+
+ # Delete multiple documents
+ count = mongodb.delete_many(
+ database="store_db", collection="products", filter_query={"stock": 0}
+ )
+ print(f"Deleted {count} out-of-stock products")
+
+
+def aggregate_examples() -> None:
+ """Examples of aggregation operations."""
+ # Calculate average price by category
+ pipeline = [
+ {
+ "$group": {
+ "_id": "$category",
+ "avg_price": {"$avg": "$price"},
+ "total_products": {"$sum": 1},
+ }
+ },
+ {"$sort": {"avg_price": -1}},
+ ]
+ results = mongodb.aggregate(
+ database="store_db", collection="products", pipeline=pipeline
+ )
+ print("Category statistics:", list(results))
+
+
+def complex_query_example() -> None:
+ """Example of a complex query combining multiple operations."""
+ # Find active users who made purchases in last 30 days
+ pipeline = [
+ {
+ "$match": {
+ "status": "active",
+ "last_purchase": {
+ "$gte": datetime.utcnow().replace(day=datetime.utcnow().day - 30)
+ },
+ }
+ },
+ {
+ "$lookup": {
+ "from": "orders",
+ "localField": "_id",
+ "foreignField": "user_id",
+ "as": "recent_orders",
+ }
+ },
+ {
+ "$project": {
+ "username": 1,
+ "email": 1,
+ "total_orders": {"$size": "$recent_orders"},
+ "total_spent": {"$sum": "$recent_orders.amount"},
+ }
+ },
+ {"$sort": {"total_spent": -1}},
+ ]
+ results = mongodb.aggregate(
+ database="user_db", collection="users", pipeline=pipeline
+ )
+ print("Active users with recent purchases:", list(results))
+
+
+if __name__ == "__main__":
+ # Example usage of all operations
+ insert_examples()
+ find_examples()
+ update_examples()
+ delete_examples()
+ aggregate_examples()
+ complex_query_example()
diff --git a/Services/PostgresDb/Models/alchemy_response.py b/Services/PostgresDb/Models/alchemy_response.py
index 9fe2640..67fa64c 100644
--- a/Services/PostgresDb/Models/alchemy_response.py
+++ b/Services/PostgresDb/Models/alchemy_response.py
@@ -12,161 +12,92 @@ through response models.
"""
from __future__ import annotations
-from typing import Any, Dict, List, Optional, Type, Union, TypeVar, Protocol
+from typing import Any, Dict, List, Optional, Type, TypeVar, Protocol, Generic
from dataclasses import dataclass
from fastapi import status
from fastapi.responses import JSONResponse
-from sqlalchemy.orm import Query
-from ApiValidations.Request.base_validations import PydanticBaseModel
-from ApiValidations.handler import BaseModelRegular
from Services.PostgresDb.Models.response import PostgresResponse
from ErrorHandlers.ErrorHandlers.api_exc_handler import HTTPExceptionApi
+from Services.pagination import Pagination, PaginationConfig
+
T = TypeVar("T")
-
-
-class DataValidator(Protocol):
- """Protocol for data validation methods."""
-
- @staticmethod
- def validate_data(data: Any, cls_object: Any) -> None:
- """Validate data and raise HTTPExceptionApi if invalid."""
- ...
+DataT = TypeVar("DataT")
@dataclass
-class PaginationConfig:
- """
- Configuration for pagination settings.
-
- Attributes:
- page: Current page number (default: 1)
- size: Items per page (default: 10)
- order_field: Field to order by (default: "id")
- order_type: Order direction (default: "asc")
- """
-
- page: int = 1
- size: int = 10
- order_field: str = "id"
- order_type: str = "asc"
-
-
-class Pagination:
- """
- Handles pagination logic for query results.
-
- Manages page size, current page, ordering, and calculates total pages
- and items based on the data source.
-
- Attributes:
- DEFAULT_SIZE: Default number of items per page (10)
- MIN_SIZE: Minimum allowed page size (10)
- MAX_SIZE: Maximum allowed page size (40)
- """
-
- DEFAULT_SIZE = 10
- MIN_SIZE = 10
- MAX_SIZE = 40
-
- def __init__(self):
- self.size: int = self.DEFAULT_SIZE
- self.page: int = 1
- self.order_field: str = "id"
- self.order_type: str = "asc"
- self.page_count: int = 1
- self.total_count: int = 0
- self.total_pages: int = 1
-
- def change(self, config: PaginationConfig) -> None:
- """Update pagination settings from config."""
- self.size = (
- config.size
- if self.MIN_SIZE <= config.size <= self.MAX_SIZE
- else self.DEFAULT_SIZE
- )
- self.page = config.page
- self.order_field = config.order_field
- self.order_type = config.order_type
- self._update_page_counts()
-
- def feed(self, data: Union[List[Any], PostgresResponse, Query]) -> None:
- """Calculate pagination based on data source."""
- self.total_count = (
- len(data)
- if isinstance(data, list)
- else data.count if isinstance(data, PostgresResponse) else data.count()
- )
- self._update_page_counts()
-
- def _update_page_counts(self) -> None:
- """Update page counts and validate current page."""
- self.total_pages = max(1, (self.total_count + self.size - 1) // self.size)
- self.page = max(1, min(self.page, self.total_pages))
- self.page_count = (
- self.total_count % self.size
- if self.page == self.total_pages and self.total_count % self.size
- else self.size
- )
-
- def as_dict(self) -> Dict[str, Any]:
- """Convert pagination state to dictionary format."""
- return {
- "size": self.size,
- "page": self.page,
- "totalCount": self.total_count,
- "totalPages": self.total_pages,
- "pageCount": self.page_count,
- "orderField": self.order_field,
- "orderType": self.order_type,
- }
-
-
-@dataclass
-class ResponseConfig:
- """
- Configuration for response formatting.
+class ResponseConfig(Generic[T]):
+ """Configuration for response formatting.
Attributes:
status_code: HTTP status code (default: "HTTP_200_OK")
- message: Response message
- completed: Operation completion status
- cls_object: Class object for error handling
+ message: Response message to include in the response
+ completed: Operation completion status flag
+ cls_object: Class object for error handling context
+ response_model: Optional response model class for data transformation
"""
status_code: str = "HTTP_200_OK"
message: str = ""
completed: bool = True
cls_object: Optional[Any] = None
+ response_model: Optional[Type[T]] = None
-class BaseJsonResponse:
- """
- Base class for JSON response handling.
+class ResponseProtocol(Protocol):
+ """Protocol defining required methods for response models."""
- Provides common functionality for all response types:
- - Response formatting
- - Pagination handling
- - Data transformation
+ def dump(self) -> Dict[str, Any]:
+ """Convert model to dictionary format."""
+ ...
+
+
+class BaseJsonResponse(Generic[T]):
+ """Base class for JSON response handling.
+
+ Provides common functionality for all response types including:
+ - Response formatting with consistent structure
+ - Pagination handling and configuration
+ - Data transformation through response models
"""
def __init__(
self,
- config: ResponseConfig,
+ message: str,
+ result: Any,
response_model: Optional[Type[T]] = None,
+ status_code: str = "HTTP_200_OK",
+ completed: bool = True,
+ cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
- ):
- self.status_code = getattr(status, config.status_code, status.HTTP_200_OK)
- self.message = config.message
- self.completed = config.completed
+ ) -> None:
+ """Initialize response handler.
+
+ Args:
+ message: Response message
+ result: Query result or data
+ response_model: Optional model for data transformation
+ status_code: HTTP status code
+ completed: Operation completion status
+ cls_object: Class object for error context
+ filter_attributes: Optional pagination and filtering attributes
+ """
+ self.status_code = getattr(status, status_code, status.HTTP_200_OK)
+ self.message = message
+ self.completed = completed
self.filter_attributes = filter_attributes
self.response_model = response_model
- self.cls_object = config.cls_object
+ self.cls_object = cls_object
+ self.result = result
def _create_pagination(self) -> Pagination:
- """Create and configure pagination instance."""
+ """Create and configure pagination instance.
+
+ Returns:
+ Configured Pagination instance
+ """
pagination = Pagination()
if self.filter_attributes:
pagination.change(
@@ -180,7 +111,15 @@ class BaseJsonResponse:
return pagination
def _format_response(self, pagination: Pagination, data: Any) -> JSONResponse:
- """Format final JSON response with pagination."""
+ """Format final JSON response with pagination.
+
+ Args:
+ pagination: Pagination instance with configuration
+ data: Response data to include
+
+ Returns:
+ Formatted JSONResponse
+ """
return JSONResponse(
status_code=self.status_code,
content={
@@ -191,9 +130,31 @@ class BaseJsonResponse:
},
)
+ def _transform_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """Transform data using response model if provided.
+
+ Args:
+ data: Raw data dictionary
+
+ Returns:
+ Transformed data dictionary
+ """
+ if self.response_model:
+ return self.response_model(**data).dump()
+ return data
+
@staticmethod
def _validate_data(data: Any, expected_type: Type, cls_object: Any) -> None:
- """Validate data type and raise exception if invalid."""
+ """Validate data type and raise exception if invalid.
+
+ Args:
+ data: Data to validate
+ expected_type: Expected type of data
+ cls_object: Class object for error context
+
+ Raises:
+ HTTPExceptionApi: If data type is invalid
+ """
if not isinstance(data, expected_type):
raise HTTPExceptionApi(
lang=cls_object.lang,
@@ -201,58 +162,8 @@ class BaseJsonResponse:
)
-class SinglePostgresResponse(BaseJsonResponse):
- """
- Handles single record responses from PostgreSQL queries.
-
- Used when expecting a single record from a database query.
- Validates that the result is a PostgresResponse and contains exactly one record.
- """
-
- def __new__(
- cls,
- message: str,
- result: PostgresResponse,
- response_model: Optional[Type[T]] = None,
- status_code: str = "HTTP_200_OK",
- completed: bool = True,
- cls_object: Optional[Any] = None,
- ) -> JSONResponse:
- cls._validate_data(result, PostgresResponse, cls_object)
-
- if not result.first:
- raise HTTPExceptionApi(
- lang=cls_object.lang,
- error_code="HTTP_400_BAD_REQUEST",
- )
-
- instance = cls()
- instance.__init__(
- ResponseConfig(
- status_code=status_code,
- message=message,
- completed=completed,
- cls_object=cls_object,
- ),
- response_model=response_model,
- )
-
- pagination = instance._create_pagination()
- data = result.data.get_dict()
- if response_model:
- data = response_model(**data).dump()
-
- return instance._format_response(pagination, data)
-
-
-class AlchemyJsonResponse(BaseJsonResponse):
- """
- Handles multiple record responses from SQLAlchemy queries.
-
- Used for database queries returning multiple records.
- Validates that the result is a PostgresResponse and contains multiple records.
- Supports pagination and data transformation through response models.
- """
+class SinglePostgresResponse(BaseJsonResponse[T]):
+ """Handler for single record responses from PostgreSQL queries."""
def __new__(
cls,
@@ -264,6 +175,78 @@ class AlchemyJsonResponse(BaseJsonResponse):
cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
) -> JSONResponse:
+ """Create response for single PostgreSQL record.
+
+ Args:
+ message: Response message
+ result: PostgreSQL query result
+ response_model: Optional model for data transformation
+ status_code: HTTP status code
+ completed: Operation completion status
+ cls_object: Class object for error context
+ filter_attributes: Optional pagination and filtering attributes
+
+ Returns:
+ Formatted JSON response
+
+ Raises:
+ HTTPExceptionApi: If result is invalid or empty
+ """
+ cls._validate_data(result, PostgresResponse, cls_object)
+
+ if not result.first:
+ raise HTTPExceptionApi(
+ lang=cls_object.lang,
+ error_code="HTTP_400_BAD_REQUEST",
+ )
+
+ instance = super().__new__(cls)
+ instance.__init__(
+ message=message,
+ result=result,
+ response_model=response_model,
+ status_code=status_code,
+ completed=completed,
+ cls_object=cls_object,
+ filter_attributes=filter_attributes,
+ )
+
+ pagination = instance._create_pagination()
+ data = instance._transform_data(result.data.get_dict())
+
+ return instance._format_response(pagination, data)
+
+
+class AlchemyJsonResponse(BaseJsonResponse[T]):
+ """Handler for multiple record responses from SQLAlchemy queries."""
+
+ def __new__(
+ cls,
+ message: str,
+ result: PostgresResponse,
+ response_model: Optional[Type[T]] = None,
+ status_code: str = "HTTP_200_OK",
+ completed: bool = True,
+ cls_object: Optional[Any] = None,
+ filter_attributes: Optional[Any] = None,
+ ) -> JSONResponse:
+ """Create response for multiple SQLAlchemy records.
+
+ Args:
+ message: Response message
+ result: PostgreSQL query result
+ response_model: Optional model for data transformation
+ status_code: HTTP status code
+ completed: Operation completion status
+ cls_object: Class object for error context
+ filter_attributes: Optional pagination and filtering attributes
+
+ Returns:
+ Formatted JSON response
+
+ Raises:
+ HTTPExceptionApi: If result is invalid
+ """
cls._validate_data(result, PostgresResponse, cls_object)
if result.first:
@@ -272,40 +255,26 @@ class AlchemyJsonResponse(BaseJsonResponse):
error_code="HTTP_400_BAD_REQUEST",
)
- instance = cls()
+ instance = super().__new__(cls)
instance.__init__(
- ResponseConfig(
- status_code=status_code,
- message=message,
- completed=completed,
- cls_object=cls_object,
- ),
+ message=message,
+ result=result,
response_model=response_model,
+ status_code=status_code,
+ completed=completed,
+ cls_object=cls_object,
filter_attributes=filter_attributes,
)
pagination = instance._create_pagination()
- data = [
- (
- response_model(**item.get_dict()).dump()
- if response_model
- else item.get_dict()
- )
- for item in result.data
- ]
+ data = [instance._transform_data(item.get_dict()) for item in result.data]
pagination.feed(data)
return instance._format_response(pagination, data)
-class ListJsonResponse(BaseJsonResponse):
- """
- Handles responses for list data.
-
- Used when working with Python lists that need to be paginated
- and optionally transformed through a response model.
- Validates that the input is a list.
- """
+class ListJsonResponse(BaseJsonResponse[T]):
+ """Handler for list data responses."""
def __new__(
cls,
@@ -317,37 +286,42 @@ class ListJsonResponse(BaseJsonResponse):
cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
) -> JSONResponse:
+ """Create response for list data.
+
+ Args:
+ message: Response message
+ result: List of data items
+ response_model: Optional model for data transformation
+ status_code: HTTP status code
+ completed: Operation completion status
+ cls_object: Class object for error context
+ filter_attributes: Optional pagination and filtering attributes
+
+ Returns:
+ Formatted JSON response
+ """
cls._validate_data(result, list, cls_object)
- instance = cls()
+ instance = super().__new__(cls)
instance.__init__(
- ResponseConfig(
- status_code=status_code,
- message=message,
- completed=completed,
- cls_object=cls_object,
- ),
+ message=message,
+ result=result,
response_model=response_model,
+ status_code=status_code,
+ completed=completed,
+ cls_object=cls_object,
filter_attributes=filter_attributes,
)
pagination = instance._create_pagination()
- data = [
- response_model(**item).dump() if response_model else item for item in result
- ]
+ data = [instance._transform_data(item) for item in result]
pagination.feed(data)
return instance._format_response(pagination, data)
-class DictJsonResponse(BaseJsonResponse):
- """
- Handles responses for dictionary data.
-
- Used when working with single dictionary objects that need to be
- transformed through a response model. Validates that the input
- is a dictionary.
- """
+class DictJsonResponse(BaseJsonResponse[T]):
+ """Handler for dictionary data responses."""
def __new__(
cls,
@@ -359,21 +333,34 @@ class DictJsonResponse(BaseJsonResponse):
cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
) -> JSONResponse:
+ """Create response for dictionary data.
+
+ Args:
+ message: Response message
+ result: Dictionary data
+ response_model: Optional model for data transformation
+ status_code: HTTP status code
+ completed: Operation completion status
+ cls_object: Class object for error context
+ filter_attributes: Optional pagination and filtering attributes
+
+ Returns:
+ Formatted JSON response
+ """
cls._validate_data(result, dict, cls_object)
- instance = cls()
+ instance = super().__new__(cls)
instance.__init__(
- ResponseConfig(
- status_code=status_code,
- message=message,
- completed=completed,
- cls_object=cls_object,
- ),
+ message=message,
+ result=result,
response_model=response_model,
+ status_code=status_code,
+ completed=completed,
+ cls_object=cls_object,
filter_attributes=filter_attributes,
)
pagination = instance._create_pagination()
- data = response_model(**result).dump() if response_model else result
+ data = instance._transform_data(result)
return instance._format_response(pagination, data)
diff --git a/Services/PostgresDb/Models/base_model.py b/Services/PostgresDb/Models/base_model.py
index b5f5558..fa95209 100644
--- a/Services/PostgresDb/Models/base_model.py
+++ b/Services/PostgresDb/Models/base_model.py
@@ -1,30 +1,66 @@
from contextlib import contextmanager
from typing import Any, Dict, Optional, Generator
from sqlalchemy.orm import Session
+from sqlalchemy import inspect
+from Services.PostgresDb.database import Base
-class BaseModel:
- """Base model class with common utility functions."""
+class BaseModel(Base):
+ """Base model class with common utility functions and SQLAlchemy integration.
+
+ This class serves as the foundation for all database models, providing:
+ - SQLAlchemy ORM integration through Base
+ - Session management utilities
+ - CRUD operations (create, update)
+ - Bulk operation support
+ """
+
+ __abstract__ = True # Marks this as a base class, won't create a table
+
+ def get_session(self) -> Session:
+ """Get database session."""
+ from Services.PostgresDb.database import get_db
+
+ with get_db() as session:
+ return session
+
+ def update(
+ self, session: Optional[Session] = None, **kwargs: Dict[str, Any]
+ ) -> "BaseModel":
+ """Update model instance with given attributes.
+
+ Args:
+ session: Optional existing session to use. If not provided, creates a new one.
+ **kwargs: Attributes to update
+
+ Returns:
+ Updated model instance
+
+ Example:
+ # Using an existing session
+ with get_db() as session:
+ model.update(session=session, name="new name")
+ model2.update(session=session, status="active")
+ # Both updates use the same transaction
+
+ # Creating a new session automatically
+ model.update(name="new name") # Creates and manages its own session
+ """
+ should_close_session = session is None
+ if session is None:
+ session = self.get_session()
- @contextmanager
- def db_session(self) -> Generator[Session, None, None]:
- """Context manager for database session."""
- session = self.get_session()
try:
- yield session
- session.commit()
- except Exception:
- session.rollback()
- raise
- finally:
- session.close()
-
- def update(self, **kwargs: Dict[str, Any]) -> "BaseModel":
- """Update model instance with given attributes."""
- with self.db_session() as session:
# Remove unrelated fields
check_kwargs = self.remove_non_related_inputs(kwargs)
+ # Get all table columns
+ mapper = inspect(self.__class__)
+ columns = [column.key for column in mapper.columns]
+
+ # Get relationship fields
+ relationships = [rel.key for rel in mapper.relationships]
+
# Handle confirmation logic
is_confirmed_argument = kwargs.get("is_confirmed", None)
if is_confirmed_argument and not len(kwargs) == 1:
@@ -38,9 +74,20 @@ class BaseModel:
# Process system fields
check_kwargs = self.extract_system_fields(check_kwargs, create=False)
- # Update attributes
+ # Update columns
for key, value in check_kwargs.items():
- setattr(self, key, value)
+ if key in columns:
+ setattr(self, key, value)
+ elif key in relationships:
+ # Handle relationship updates
+ related_obj = getattr(self, key)
+ if isinstance(related_obj, list):
+ # Handle many-to-many or one-to-many relationships
+ if isinstance(value, list):
+ setattr(self, key, value)
+ else:
+ # Handle many-to-one or one-to-one relationships
+ setattr(self, key, value)
# Handle user tracking
if hasattr(self, "creds"):
@@ -59,23 +106,68 @@ class BaseModel:
session.flush()
return self
+ except Exception:
+ if should_close_session:
+ session.rollback()
+ raise
+ finally:
+ if should_close_session:
+ session.close()
+
@classmethod
- @contextmanager
- def create_with_session(
- cls, **kwargs: Dict[str, Any]
- ) -> Generator["BaseModel", None, None]:
- """Create new instance with session management."""
+ def create(
+ cls, session: Optional[Session] = None, **kwargs: Dict[str, Any]
+ ) -> "BaseModel":
+ """Create new instance with optional session reuse.
+
+ Args:
+ session: Optional existing session to use. If not provided, creates a new one.
+ **kwargs: Attributes for the new instance
+
+ Returns:
+ Created model instance
+
+ Example:
+ # Using an existing session for multiple creates
+ with get_db() as session:
+ user1 = User.create(session=session, name="John")
+ user2 = User.create(session=session, name="Jane")
+ # Both creates use the same transaction
+
+ # Creating with auto-managed session
+ user = User.create(name="John") # Creates and manages its own session
+ """
instance = cls()
- session = instance.get_session()
+ should_close_session = session is None
+
+ if session is None:
+ session = instance.get_session()
+
try:
check_kwargs = cls.remove_non_related_inputs(instance, kwargs)
check_kwargs = cls.extract_system_fields(
instance, check_kwargs, create=True
)
- for key, value in check_kwargs.items():
- setattr(instance, key, value)
+ # Get all table columns and relationships
+ mapper = inspect(cls)
+ columns = [column.key for column in mapper.columns]
+ relationships = [rel.key for rel in mapper.relationships]
+ # Set attributes
+ for key, value in check_kwargs.items():
+ if key in columns:
+ setattr(instance, key, value)
+ elif key in relationships:
+ # Handle relationship assignments
+ if isinstance(value, list):
+ # Handle many-to-many or one-to-many relationships
+ setattr(instance, key, value)
+ else:
+ # Handle many-to-one or one-to-one relationships
+ setattr(instance, key, value)
+
+ # Handle user tracking
if hasattr(instance, "creds"):
person_id = getattr(instance.creds, "person_id", None)
person_name = getattr(instance.creds, "person_name", None)
@@ -86,13 +178,55 @@ class BaseModel:
session.add(instance)
session.flush()
- yield instance
- session.commit()
+
+ if should_close_session:
+ session.commit()
+
+ return instance
+
except Exception:
- session.rollback()
+ if should_close_session:
+ session.rollback()
raise
finally:
- session.close()
+ if should_close_session:
+ session.close()
+
+ @classmethod
+ @contextmanager
+ def bulk_create(
+ cls, session: Optional[Session] = None
+ ) -> Generator[Session, None, None]:
+ """Context manager for bulk creating instances.
+
+ Args:
+ session: Optional existing session to use. If not provided, creates a new one.
+
+ Yields:
+ SQLAlchemy session for creating multiple instances
+
+ Example:
+ # Bulk create multiple instances in one transaction
+ with User.bulk_create() as session:
+ user1 = User.create(session=session, name="John")
+ user2 = User.create(session=session, name="Jane")
+ # Both creates share the same transaction
+ """
+ should_close_session = session is None
+ if session is None:
+ session = cls().get_session()
+
+ try:
+ yield session
+ if should_close_session:
+ session.commit()
+ except Exception:
+ if should_close_session:
+ session.rollback()
+ raise
+ finally:
+ if should_close_session:
+ session.close()
# @router.put("/users/{user_id}")
@@ -101,13 +235,12 @@ class BaseModel:
# update_data: Dict[str, Any],
# db: Session = Depends(get_db_session)
# ):
-# with db_session() as session:
-# user = session.query(User).filter(User.id == user_id).first()
-# if not user:
-# raise HTTPException(status_code=404, detail="User not found")
+# user = db.query(User).filter(User.id == user_id).first()
+# if not user:
+# raise HTTPException(status_code=404, detail="User not found")
#
-# updated_user = user.update(**update_data)
-# return updated_user
+# updated_user = user.update(**update_data)
+# return updated_user
#
#
# @router.post("/users")
diff --git a/Services/PostgresDb/Models/filter_functions.py b/Services/PostgresDb/Models/filter_functions.py
index 898a512..fd78621 100644
--- a/Services/PostgresDb/Models/filter_functions.py
+++ b/Services/PostgresDb/Models/filter_functions.py
@@ -500,27 +500,27 @@ class FilterAttributes:
return PostgresResponse(query=query, first=True)
- @classmethod
- def raise_http_exception(
- cls,
- status_code: str,
- error_case: str,
- data: Dict[str, Any],
- message: str,
- ) -> None:
- """
- Raise HTTP exception with formatted error details.
+ # @classmethod
+ # def raise_http_exception(
+ # cls,
+ # status_code: str,
+ # error_case: str,
+ # data: Dict[str, Any],
+ # message: str,
+ # ) -> None:
+ # """
+ # Raise HTTP exception with formatted error details.
- Args:
- status_code: HTTP status code string
- error_case: Error type
- data: Additional error data
- message: Error message
+ # Args:
+ # status_code: HTTP status code string
+ # error_case: Error type
+ # data: Additional error data
+ # message: Error message
- Raises:
- HTTPException: With formatted error details
- """
- raise HTTPExceptionApi(
- error_code="HTTP_304_NOT_MODIFIED",
- lang=cls.lang or "tr",
- )
+ # Raises:
+ # HTTPException: With formatted error details
+ # """
+ # raise HTTPExceptionApi(
+ # error_code="HTTP_304_NOT_MODIFIED",
+ # lang=cls.lang or "tr",
+ # )
diff --git a/Services/PostgresDb/Models/how_to.py b/Services/PostgresDb/Models/how_to.py
deleted file mode 100644
index 746dfa6..0000000
--- a/Services/PostgresDb/Models/how_to.py
+++ /dev/null
@@ -1,190 +0,0 @@
-"""
-How to Use PostgreSQL Models
-
-This module provides examples of how to use the base models and database sessions
-effectively in your application.
-"""
-
-from datetime import datetime
-from typing import Optional, List
-from sqlalchemy import String, Integer
-from sqlalchemy.orm import Mapped, mapped_column
-
-from Services.PostgresDb import CrudCollection
-from Services.PostgresDb.database import get_db
-
-
-# Example Model Definition
-class User(CrudCollection):
- """Example user model demonstrating CrudCollection usage."""
-
- __tablename__ = "users"
-
- # Additional fields (id and other common fields come from CrudCollection)
- username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
- email: Mapped[str] = mapped_column(String(100), unique=True)
- age: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
-
-
-# Example Usage
-
-
-def example_create():
- """Example of creating a new record."""
- with get_db() as db:
- # Create a new user
- user = User.find_or_create(
- db, username="john_doe", email="john@example.com", age=30
- )
- db.commit()
- return user
-
-
-def example_batch_create():
- """Example of creating multiple records in a single transaction."""
- with get_db() as db:
- try:
- # Create multiple users in one transaction
- users = []
- for i in range(3):
- user = User.find_or_create(
- db, username=f"user_{i}", email=f"user_{i}@example.com"
- )
- users.append(user)
-
- db.commit()
- return users
- except Exception:
- db.rollback()
- raise
-
-
-def example_update():
- """Example of updating a record."""
- with get_db() as db:
- # Find user and update
- user = db.query(User).filter(User.username == "john_doe").first()
- if user:
- user.update(db, email="john.doe@newdomain.com", age=31)
- db.commit()
- return user
-
-
-def example_soft_delete():
- """Example of soft deleting a record."""
- with get_db() as db:
- user = db.query(User).filter(User.username == "john_doe").first()
- if user:
- # This will set deleted=True instead of actually deleting the record
- user.update(db, deleted=True)
- db.commit()
-
-
-def example_query():
- """Example of querying records."""
- with get_db() as db:
- # Get active (non-deleted) users
- active_users = (
- db.query(User)
- .filter(User.active == True, User.deleted == False, User.age >= 18)
- .order_by(User.created_at.desc())
- .all()
- )
- return active_users
-
-
-def example_complex_transaction():
- """Example of a complex transaction with multiple operations."""
- with get_db() as db:
- try:
- # Multiple operations in single transaction
- user = User.find_or_create(db, username="new_user", email="new@example.com")
-
- # Update existing user
- other_user = db.query(User).filter(User.username == "old_user").first()
- if other_user:
- other_user.update(db, email="updated@example.com")
-
- # Soft delete another user
- deleted_user = db.query(User).filter(User.username == "to_delete").first()
- if deleted_user:
- deleted_user.update(db, deleted=True)
-
- # Commit all changes at once
- db.commit()
-
- except Exception:
- # Rollback all changes if any operation fails
- db.rollback()
- raise
-
-
-def example_serialization():
- """Example of serializing records to dictionaries."""
- with get_db() as db:
- user = db.query(User).first()
- if user:
- # Get all fields except specified ones
- dict_with_excludes = user.get_dict(exclude=["created_at", "updated_at"])
-
- # Get only specified fields
- dict_with_includes = user.get_dict(include=["id", "username", "email"])
-
- return {"excluded": dict_with_excludes, "included": dict_with_includes}
-
-
-def example_confirmation():
- """Example of confirming a record."""
- with get_db() as db:
- user = db.query(User).filter(User.username == "pending_user").first()
- if user:
- # Only update confirmation status
- user.update(db, is_confirmed=True)
- db.commit()
- return user
-
-
-# Example of error handling
-def example_error_handling():
- """Example of proper error handling."""
- with get_db() as db:
- try:
- # Attempt to create user
- user = User.find_or_create(
- db,
- username="existing_user", # This might cause unique constraint violation
- email="exists@example.com",
- )
- db.commit()
- return {"status": "success", "user": user.get_dict()}
-
- except Exception as e:
- db.rollback()
- return {
- "status": "error",
- "message": str(e),
- "error_type": e.__class__.__name__,
- }
-
-
-# Example of working with dates
-def example_date_handling():
- """Example of working with dates and expiry."""
- with get_db() as db:
- # Find records valid at current time
- current_users = (
- db.query(User)
- .filter(
- User.expiry_starts <= datetime.utcnow(),
- User.expiry_ends > datetime.utcnow(),
- )
- .all()
- )
-
- # Set expiry for a user
- user = db.query(User).first()
- if user:
- user.update(db, expiry_ends=datetime(2024, 12, 31, 23, 59, 59))
- db.commit()
-
- return current_users
diff --git a/Services/PostgresDb/Models/mixins.py b/Services/PostgresDb/Models/mixins.py
index 83ba31d..72c9d12 100644
--- a/Services/PostgresDb/Models/mixins.py
+++ b/Services/PostgresDb/Models/mixins.py
@@ -31,14 +31,16 @@ from sqlalchemy_mixins.repr import ReprMixin
from sqlalchemy_mixins.smartquery import SmartQueryMixin
from ApiLibrary import DateTimeLocal, system_arrow
-from Services.PostgresDb.database import Base
+from Services.PostgresDb.Models.base_model import BaseModel
from Services.PostgresDb.Models.filter_functions import FilterAttributes
# Type variable for class methods returning self
T = TypeVar("T", bound="CrudMixin")
-class CrudMixin(Base, SmartQueryMixin, SerializeMixin):
+class CrudMixin(
+ BaseModel, SmartQueryMixin, SerializeMixin, ReprMixin, FilterAttributes
+):
"""
Base mixin providing CRUD operations and common fields for PostgreSQL models.
@@ -426,7 +428,7 @@ class CrudMixin(Base, SmartQueryMixin, SerializeMixin):
return return_dict
-class BaseCollection(CrudMixin, ReprMixin):
+class BaseCollection(CrudMixin):
"""Base model class with minimal fields."""
__abstract__ = True
@@ -435,7 +437,7 @@ class BaseCollection(CrudMixin, ReprMixin):
id: Mapped[int] = mapped_column(primary_key=True)
-class CrudCollection(CrudMixin, SmartQueryMixin, FilterAttributes):
+class CrudCollection(CrudMixin):
"""
Full-featured model class with all common fields.
diff --git a/Services/PostgresDb/Models/response.py b/Services/PostgresDb/Models/response.py
index 00dbf68..001e320 100644
--- a/Services/PostgresDb/Models/response.py
+++ b/Services/PostgresDb/Models/response.py
@@ -70,7 +70,7 @@ class PostgresResponse(Generic[T]):
)
@property
- def first_item(self) -> Optional[T]:
+ def first(self) -> Optional[T]:
"""Get first result only."""
return self.data if self._first else (self.data[0] if self.data else None)
diff --git a/Services/PostgresDb/database.py b/Services/PostgresDb/database.py
index 8da967a..6f76499 100644
--- a/Services/PostgresDb/database.py
+++ b/Services/PostgresDb/database.py
@@ -1,33 +1,61 @@
from contextlib import contextmanager
+from functools import lru_cache
from typing import Generator
-from AllConfigs.SqlDatabase.configs import WagDatabase
from sqlalchemy import create_engine
-from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import sessionmaker, scoped_session, Session
+from AllConfigs.SqlDatabase.configs import WagDatabase
-engine_config: dict[str, object] = {
- "url": WagDatabase.DATABASE_URL,
- "pool_size": 20,
- "max_overflow": 10,
- "echo": True,
- "echo_pool": True,
- "isolation_level": "READ COMMITTED",
- "pool_pre_ping": True,
-}
-
-engine = create_engine(**engine_config)
-SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
+# Configure the database engine with proper pooling
+engine = create_engine(
+ WagDatabase.DATABASE_URL,
+ pool_pre_ping=True, # Verify connection before using
+ pool_size=20, # Maximum number of permanent connections
+ max_overflow=10, # Maximum number of additional connections
+ pool_recycle=3600, # Recycle connections after 1 hour
+ pool_timeout=30, # Wait up to 30 seconds for a connection
+ echo=False, # Set to True for debugging SQL queries
+)
Base = declarative_base()
+# Create a cached session factory
+@lru_cache()
+def get_session_factory() -> scoped_session:
+ """Create a thread-safe session factory."""
+ SessionLocal = sessionmaker(
+ bind=engine,
+ autocommit=False,
+ autoflush=False,
+ expire_on_commit=False, # Prevent expired object issues
+ )
+ return scoped_session(SessionLocal)
+
+
@contextmanager
def get_db() -> Generator[Session, None, None]:
- """Get database session with context management."""
- db = SessionLocal()
+ """Get database session with proper connection management.
+
+ This context manager ensures:
+ - Proper connection pooling
+ - Session cleanup
+ - Connection return to pool
+ - Thread safety
+
+ Yields:
+ Session: SQLAlchemy session object
+ """
+ session_factory = get_session_factory()
+ session = session_factory()
try:
- yield db
+ yield session
+ session.commit()
+ except Exception:
+ session.rollback()
+ raise
finally:
- db.close()
+ session.close()
+ session_factory.remove() # Clean up the session from the registry
diff --git a/Services/PostgresDb/how_to.py b/Services/PostgresDb/how_to.py
new file mode 100644
index 0000000..e69de29
diff --git a/Services/Redis/Models/row.py b/Services/Redis/Models/row.py
index 9aad9a1..7fc83a8 100644
--- a/Services/Redis/Models/row.py
+++ b/Services/Redis/Models/row.py
@@ -16,4 +16,3 @@ class AccessToken(BaseModel):
@property
def delimiter(self):
return "*"
-
diff --git a/Services/pagination.py b/Services/pagination.py
new file mode 100644
index 0000000..fe94e3f
--- /dev/null
+++ b/Services/pagination.py
@@ -0,0 +1,102 @@
+from typing import Any, Dict, List, Union, Protocol
+from dataclasses import dataclass
+from sqlalchemy.orm import Query
+
+from Services.PostgresDb.Models.response import PostgresResponse
+
+
+class DataValidator(Protocol):
+ """Protocol for data validation methods."""
+
+ @staticmethod
+ def validate_data(data: Any, cls_object: Any) -> None:
+ """Validate data and raise HTTPExceptionApi if invalid."""
+ ...
+
+
+@dataclass
+class PaginationConfig:
+ """
+ Configuration for pagination settings.
+
+ Attributes:
+ page: Current page number (default: 1)
+ size: Items per page (default: 10)
+ order_field: Field to order by (default: "id")
+ order_type: Order direction (default: "asc")
+ """
+
+ page: int = 1
+ size: int = 10
+ order_field: str = "id"
+ order_type: str = "asc"
+
+
+class Pagination:
+ """
+ Handles pagination logic for query results.
+
+ Manages page size, current page, ordering, and calculates total pages
+ and items based on the data source.
+
+ Attributes:
+ DEFAULT_SIZE: Default number of items per page (10)
+ MIN_SIZE: Minimum allowed page size (10)
+ MAX_SIZE: Maximum allowed page size (40)
+ """
+
+ DEFAULT_SIZE = 10
+ MIN_SIZE = 10
+ MAX_SIZE = 40
+
+ def __init__(self):
+ self.size: int = self.DEFAULT_SIZE
+ self.page: int = 1
+ self.order_field: str = "id"
+ self.order_type: str = "asc"
+ self.page_count: int = 1
+ self.total_count: int = 0
+ self.total_pages: int = 1
+
+ def change(self, config: PaginationConfig) -> None:
+ """Update pagination settings from config."""
+ self.size = (
+ config.size
+ if self.MIN_SIZE <= config.size <= self.MAX_SIZE
+ else self.DEFAULT_SIZE
+ )
+ self.page = config.page
+ self.order_field = config.order_field
+ self.order_type = config.order_type
+ self._update_page_counts()
+
+ def feed(self, data: Union[List[Any], PostgresResponse, Query]) -> None:
+ """Calculate pagination based on data source."""
+ self.total_count = (
+ len(data)
+ if isinstance(data, list)
+ else data.count if isinstance(data, PostgresResponse) else data.count()
+ )
+ self._update_page_counts()
+
+ def _update_page_counts(self) -> None:
+ """Update page counts and validate current page."""
+ self.total_pages = max(1, (self.total_count + self.size - 1) // self.size)
+ self.page = max(1, min(self.page, self.total_pages))
+ self.page_count = (
+ self.total_count % self.size
+ if self.page == self.total_pages and self.total_count % self.size
+ else self.size
+ )
+
+ def as_dict(self) -> Dict[str, Any]:
+ """Convert pagination state to dictionary format."""
+ return {
+ "size": self.size,
+ "page": self.page,
+ "totalCount": self.total_count,
+ "totalPages": self.total_pages,
+ "pageCount": self.page_count,
+ "orderField": self.order_field,
+ "orderType": self.order_type,
+ }
diff --git a/Ztest/Dockerfile b/Ztest/Dockerfile
new file mode 100644
index 0000000..470e2b2
--- /dev/null
+++ b/Ztest/Dockerfile
@@ -0,0 +1,46 @@
+# Use Python 3.9 as base image
+FROM python:3.9-slim
+
+# Set environment variables
+ENV PYTHONUNBUFFERED=1 \
+ PYTHONDONTWRITEBYTECODE=1 \
+ PIP_NO_CACHE_DIR=1 \
+ POETRY_VERSION=1.7.1 \
+ POETRY_HOME="/opt/poetry" \
+ POETRY_VIRTUALENVS_CREATE=false \
+ PYTHONPATH=/app
+
+# Add Poetry to PATH
+ENV PATH="$POETRY_HOME/bin:$PATH"
+
+# Install system dependencies
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends \
+ curl \
+ build-essential \
+ libpq-dev \
+ postgresql-client \
+ && rm -rf /var/lib/apt/lists/*
+
+# Install Poetry
+RUN curl -sSL https://install.python-poetry.org | python3 -
+
+# Set working directory
+WORKDIR /app
+
+# Copy project files
+COPY pyproject.toml poetry.lock* ./
+
+# Install dependencies
+RUN poetry install --no-root --no-interaction --no-ansi
+
+# Copy required directories
+COPY Ztest/ ./Ztest/
+COPY Services/ ./Services/
+COPY AllConfigs/ ./AllConfigs/
+COPY ApiLibrary/ ./ApiLibrary/
+COPY ErrorHandlers/ ./ErrorHandlers/
+
+# Set entrypoint for running tests
+ENTRYPOINT ["poetry", "run", "pytest"]
+CMD ["-v", "--cov=Services", "Ztest/"]
diff --git a/test.py b/Ztest/test.py
similarity index 100%
rename from test.py
rename to Ztest/test.py
diff --git a/Ztest/test_mongo.py b/Ztest/test_mongo.py
new file mode 100644
index 0000000..71c733c
--- /dev/null
+++ b/Ztest/test_mongo.py
@@ -0,0 +1,110 @@
+"""Test MongoDB actions and models."""
+
+import pytest
+from pymongo import MongoClient
+
+from Services.MongoDb.Models.actions import MongoActions
+from Services.MongoDb.Models.action_models.domain import (
+ DomainData,
+ DomainDocumentCreate,
+ DomainDocumentUpdate
+)
+from AllConfigs.NoSqlDatabase.configs import MongoConfig
+
+
+@pytest.fixture
+def mongo_client():
+ """Create MongoDB test client."""
+ # Connect using configured credentials
+ client = MongoClient(MongoConfig.URL)
+ client.admin.command('ping') # Test connection
+ yield client
+ client.close()
+
+
+@pytest.fixture
+def mongo_actions(mongo_client):
+ """Create MongoActions instance for testing."""
+ if not mongo_client:
+ pytest.skip("MongoDB connection not available")
+
+ actions = MongoActions(
+ client=mongo_client,
+ database=MongoConfig.DATABASE_NAME,
+ company_uuid="test_company",
+ storage_reason="domains"
+ )
+ yield actions
+ try:
+ # Cleanup after tests
+ if actions.collection is not None:
+ actions.collection.drop()
+ except Exception as e:
+ print(f"Failed to cleanup test collection: {e}")
+
+
+def test_mongo_crud_operations(mongo_actions: MongoActions):
+ """Test CRUD operations with MongoActions."""
+
+ # Create test data
+ domain_data = DomainData(
+ user_uu_id="test_user",
+ main_domain="example.com",
+ other_domains_list=["old.com"]
+ )
+ create_doc = DomainDocumentCreate(data=domain_data)
+
+ # Test create
+ result = mongo_actions.insert_one(create_doc.model_dump())
+ assert result.inserted_id is not None
+
+ # Test read
+ doc = mongo_actions.find_one({"data.main_domain": "example.com"})
+ assert doc is not None
+ assert doc["data"]["main_domain"] == "example.com"
+
+ # Test update
+ update_data = DomainData(
+ user_uu_id="test_user",
+ main_domain="new.com",
+ other_domains_list=["example.com", "old.com"]
+ )
+ update_doc = DomainDocumentUpdate(data=update_data)
+ result = mongo_actions.update_one(
+ {"_id": doc["_id"]},
+ {"$set": update_doc.model_dump()}
+ )
+ assert result.modified_count == 1
+
+ # Test delete
+ result = mongo_actions.delete_one({"_id": doc["_id"]})
+ assert result.deleted_count == 1
+
+
+def test_mongo_aggregate(mongo_actions: MongoActions):
+ """Test aggregate operations with MongoActions."""
+
+ # Insert test documents
+ docs = [
+ DomainDocumentCreate(
+ data=DomainData(
+ user_uu_id="user1",
+ main_domain=f"domain{i}.com"
+ )
+ ).model_dump()
+ for i in range(3)
+ ]
+ mongo_actions.insert_many(docs)
+
+ # Test aggregation
+ pipeline = [
+ {"$group": {"_id": "$data.user_uu_id", "count": {"$sum": 1}}}
+ ]
+ result = mongo_actions.aggregate(pipeline)
+ result_list = list(result)
+ assert len(result_list) == 1
+ assert result_list[0]["count"] == 3
+
+
+if __name__ == "__main__":
+ pytest.main([__file__, "-v"])
\ No newline at end of file
diff --git a/Ztest/test_postgres.py b/Ztest/test_postgres.py
new file mode 100644
index 0000000..b6fa7c0
--- /dev/null
+++ b/Ztest/test_postgres.py
@@ -0,0 +1,108 @@
+"""Test PostgreSQL database operations."""
+
+import pytest
+from sqlalchemy import Column, String, create_engine, text
+from sqlalchemy.orm import Session
+
+from Services.PostgresDb.database import Base, get_db
+from Services.PostgresDb.Models.mixins import CrudCollection
+from AllConfigs.SqlDatabase.configs import WagDatabase
+
+
+class TestUser(CrudCollection):
+ """Test user model for PostgreSQL tests."""
+
+ __tablename__ = "test_users"
+
+ username = Column(String, unique=True, index=True)
+ email = Column(String, unique=True, index=True)
+
+
+@pytest.fixture(scope="session")
+def db_engine():
+ """Create a test database engine."""
+ # Use the same database URL but with test database
+ test_db_url = WagDatabase.DATABASE_URL
+ engine = create_engine(test_db_url, echo=True)
+
+ # Create all tables
+ Base.metadata.create_all(bind=engine)
+
+ yield engine
+
+ # Drop all tables after tests
+ Base.metadata.drop_all(bind=engine)
+
+
+@pytest.fixture
+def db_session(db_engine):
+ """Create a test database session."""
+ connection = db_engine.connect()
+ transaction = connection.begin()
+ session = Session(bind=connection)
+
+ yield session
+
+ # Rollback the transaction after each test
+ transaction.rollback()
+ connection.close()
+ session.close()
+
+
+def test_create_user(db_session):
+ """Test creating a user in the database."""
+ # Create user using CrudMixin methods
+ user = TestUser(username="testuser", email="test@example.com")
+ db_session.add(user)
+ db_session.commit()
+
+ # Verify user was created
+ db_user = db_session.query(TestUser).filter_by(username="testuser").first()
+ assert db_user is not None
+ assert db_user.email == "test@example.com"
+ assert db_user.created_at is not None
+ assert not db_user.deleted
+ assert db_user.active
+
+
+def test_update_user(db_session):
+ """Test updating a user in the database."""
+ # Create user
+ user = TestUser(username="updateuser", email="update@example.com")
+ db_session.add(user)
+ db_session.commit()
+
+ # Update user using CrudMixin methods
+ user.update(session=db_session, email="newemail@example.com")
+ db_session.commit()
+
+ # Verify update
+ updated_user = db_session.query(TestUser).filter_by(username="updateuser").first()
+ assert updated_user.email == "newemail@example.com"
+ assert updated_user.updated_at is not None
+
+
+def test_soft_delete_user(db_session):
+ """Test soft deleting a user from the database."""
+ # Create user
+ user = TestUser(username="deleteuser", email="delete@example.com")
+ db_session.add(user)
+ db_session.commit()
+
+ # Soft delete by updating deleted and active flags
+ user.update(session=db_session, deleted=True, active=False)
+ db_session.commit()
+
+ # Verify soft deletion
+ deleted_user = db_session.query(TestUser).filter_by(username="deleteuser").first()
+ assert deleted_user is not None
+ assert deleted_user.deleted
+ assert not deleted_user.active
+
+
+def test_get_db_context_manager():
+ """Test the get_db context manager."""
+ with get_db() as session:
+ # Verify we can execute a simple query
+ result = session.execute(text("SELECT 1"))
+ assert result.scalar() == 1
\ No newline at end of file
diff --git a/docker-compose.test.yml b/docker-compose.test.yml
new file mode 100644
index 0000000..c3e696f
--- /dev/null
+++ b/docker-compose.test.yml
@@ -0,0 +1,8 @@
+services:
+ test:
+ build:
+ context: .
+ dockerfile: Ztest/Dockerfile
+ volumes:
+ - .:/app
+ network_mode: "host"
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..228fa42
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,76 @@
+[tool.poetry]
+name = "wag-management-api-services"
+version = "0.1.1"
+description = "WAG Management API Service"
+authors = ["Karatay Berkay "]
+
+[tool.poetry.dependencies]
+python = "^3.9"
+# FastAPI and Web
+fastapi = "^0.104.1"
+uvicorn = "^0.24.0"
+pydantic = "^2.5.2"
+
+# MongoDB
+motor = "3.3.2" # Pinned version
+pymongo = "4.5.0" # Pinned version to match motor
+
+# PostgreSQL
+sqlalchemy = "^2.0.23"
+sqlalchemy-mixins = "^2.0.5"
+psycopg2-binary = "^2.9.9"
+
+# Redis
+redis = "^5.0.1"
+arrow = "^1.3.0"
+
+# Email
+redmail = "^0.6.0"
+
+# Testing
+pytest = "^7.4.3"
+pytest-asyncio = "^0.21.1"
+pytest-cov = "^4.1.0"
+
+# Utilities
+python-dateutil = "^2.8.2"
+typing-extensions = "^4.8.0"
+
+[tool.poetry.group.dev.dependencies]
+black = "^23.11.0"
+isort = "^5.12.0"
+mypy = "^1.7.1"
+flake8 = "^6.1.0"
+
+[build-system]
+requires = ["poetry-core>=1.0.0"]
+build-backend = "poetry.core.masonry.api"
+
+[tool.black]
+line-length = 88
+target-version = ['py39']
+include = '\.pyi?$'
+
+[tool.isort]
+profile = "black"
+multi_line_output = 3
+include_trailing_comma = true
+force_grid_wrap = 0
+use_parentheses = true
+line_length = 88
+
+[tool.mypy]
+python_version = "3.9"
+warn_return_any = true
+warn_unused_configs = true
+disallow_untyped_defs = true
+check_untyped_defs = true
+
+[tool.pytest.ini_options]
+minversion = "6.0"
+addopts = "-ra -q --cov=Services"
+testpaths = [
+ "Ztest",
+]
+python_files = ["test_*.py"]
+asyncio_mode = "auto"
diff --git a/scripts/dev.sh b/scripts/dev.sh
new file mode 100755
index 0000000..b2af983
--- /dev/null
+++ b/scripts/dev.sh
@@ -0,0 +1,154 @@
+#!/bin/bash
+
+# Colors for pretty output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+NC='\033[0m' # No Color
+
+# Error handling
+set -e
+trap 'last_command=$current_command; current_command=$BASH_COMMAND' DEBUG
+trap 'echo -e "${RED}\"${last_command}\" command failed with exit code $?.${NC}"' EXIT
+
+# Logging function
+log() {
+ echo -e "${GREEN}[$(date +'%Y-%m-%d %H:%M:%S')]${NC} $1"
+}
+
+# Check if a command exists
+check_command() {
+ if ! command -v "$1" &> /dev/null; then
+ echo -e "${YELLOW}$1 not found${NC}"
+ return 1
+ fi
+ return 0
+}
+
+# Check system requirements
+check_requirements() {
+ log "Checking system requirements..."
+
+ # Check Python version
+ if ! check_command python3; then
+ log "${RED}Python 3 is required but not found${NC}"
+ exit 1
+ fi
+
+ # Check if Docker Compose is installed
+ if ! check_command docker-compose; then
+ log "${YELLOW}Warning: Docker Compose not found. You'll need it later for running the application${NC}"
+ fi
+
+ log "${GREEN}System requirements check completed${NC}"
+}
+
+# Check if poetry is installed
+check_poetry() {
+ if ! command -v poetry &> /dev/null; then
+ log "${YELLOW}Poetry not found. Installing...${NC}"
+ curl -sSL https://install.python-poetry.org | python3 -
+ fi
+}
+
+# Setup development environment
+setup_dev() {
+ log "Setting up development environment..."
+ check_requirements
+ check_poetry
+ poetry install
+ log "${GREEN}Development environment setup complete!${NC}"
+}
+
+# Format code
+format_code() {
+ log "Formatting code..."
+ poetry run black Services/
+ poetry run isort Services/
+}
+
+# Run type checking
+check_types() {
+ log "Running type checks..."
+ poetry run mypy Services/
+}
+
+# Run linting
+lint_code() {
+ log "Running linter..."
+ poetry run flake8 Services/
+}
+
+# Run all checks
+check_all() {
+ format_code
+ lint_code
+ check_types
+}
+
+# Clean up pyc files and cache
+clean() {
+ log "Cleaning up python cache files..."
+ find . -type f -name "*.pyc" -delete
+ find . -type d -name "__pycache__" -delete
+ find . -type d -name ".pytest_cache" -delete
+ find . -type d -name ".mypy_cache" -delete
+}
+
+# Update dependencies
+update_deps() {
+ log "Updating dependencies..."
+ poetry update
+}
+
+# Show help
+show_help() {
+ echo -e "${GREEN}Available commands:${NC}"
+ echo "setup - Setup development environment"
+ echo "check-req - Check system requirements"
+ echo "format - Format code with black and isort"
+ echo "lint - Run flake8 linter"
+ echo "types - Run mypy type checker"
+ echo "check - Run all checks (format, lint, types)"
+ echo "clean - Clean up cache files"
+ echo "update - Update dependencies"
+}
+
+# Main command handler
+case "$1" in
+ "setup")
+ setup_dev
+ ;;
+ "check-req")
+ check_requirements
+ ;;
+ "format")
+ format_code
+ ;;
+ "lint")
+ lint_code
+ ;;
+ "types")
+ check_types
+ ;;
+ "check")
+ check_all
+ ;;
+ "clean")
+ clean
+ ;;
+ "update")
+ update_deps
+ ;;
+ "help"|"")
+ show_help
+ ;;
+ *)
+ echo -e "${RED}Unknown command: $1${NC}"
+ show_help
+ exit 1
+ ;;
+esac
+
+# Remove error handling trap
+trap - EXIT