wag-managment-api-service-v.../Services/PostgresDb/Models/mixins.py

548 lines
17 KiB
Python

"""
PostgreSQL Base Models Module
This module provides base classes for PostgreSQL models with common functionality such as:
- CRUD operations with session management
- Soft delete capability
- Automatic timestamps
- User tracking (created_by, updated_by)
- Data serialization
- Multi-language support
"""
import datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional, Type, TypeVar, Union, cast
from sqlalchemy import (
TIMESTAMP,
NUMERIC,
func,
text,
UUID,
String,
Integer,
Boolean,
SmallInteger,
)
from sqlalchemy.orm import Mapped, mapped_column, Session
from sqlalchemy_mixins.serialize import SerializeMixin
from sqlalchemy_mixins.repr import ReprMixin
from sqlalchemy_mixins.smartquery import SmartQueryMixin
from ApiLibrary import DateTimeLocal, system_arrow
from Services.Postgres.database import Base
from Services.PostgresDb.Models.filter_functions import FilterAttributes
# Type variable bound to CrudMixin. Classmethods that annotate ``cls: Type[T]``
# and return ``T`` (see ``find_or_create``) are thereby typed as returning the
# concrete subclass they were called on rather than the CrudMixin base.
T = TypeVar("T", bound="CrudMixin")
class CrudMixin(Base, SmartQueryMixin, SerializeMixin):
    """
    Abstract base providing CRUD helpers and common fields for PostgreSQL models.

    Features:
        - Validity-window columns (``expiry_starts`` / ``expiry_ends``)
        - ``find_or_create`` and ``update`` helpers working on a caller-supplied
          session (both ``flush`` but never ``commit``)
        - User tracking (``created_by``, ``updated_by``, ``confirmed_by``) taken
          from the class-level ``creds``
        - Dictionary serialization via ``get_dict``

    Note:
        ``creds``, ``lang`` and ``client_arrow`` are class attributes written by
        ``set_user_define_properties``; they are therefore shared by every
        instance of the class they were set on.
    """

    __abstract__ = True

    # Fields stripped from caller input by ``extract_system_fields(create=True)``.
    __system__fields__create__ = (
        "created_at",
        "updated_at",
        "cryp_uu_id",
        "created_by",
        "created_by_id",
        "updated_by",
        "updated_by_id",
        "replication_id",
        "confirmed_by",
        "confirmed_by_id",
        "is_confirmed",
        "deleted",
        "active",
        "is_notification_send",
        "is_email_send",
    )

    # Fields stripped from caller input by ``extract_system_fields(create=False)``.
    __system__fields__update__ = (
        "cryp_uu_id",
        "created_at",
        "updated_at",
        "created_by",
        "created_by_id",
        "confirmed_by",
        "confirmed_by_id",
        "updated_by",
        "updated_by_id",
        "replication_id",
    )

    # Fields left out of ``get_dict`` output by default.
    __system_default_model__ = [
        "cryp_uu_id",
        "is_confirmed",
        "deleted",
        "is_notification_send",
        "replication_id",
        "is_email_send",
        "confirmed_by_id",
        "confirmed_by",
        "updated_by_id",
        "created_by_id",
    ]

    # User credentials and preferences (populated by set_user_define_properties).
    creds = None
    lang: str = "tr"
    # Stays None until set_user_define_properties runs; timestamp serialization
    # in iterate_over_variables calls ``client_arrow.get`` and needs it set.
    client_arrow: Optional[DateTimeLocal] = None
    # Not referenced inside this class; presumably consumed by filter helpers
    # elsewhere -- verify against callers.
    valid_record_dict: Dict[str, bool] = {"active": True, "deleted": False}
    # Class-level default; find_or_create assigns a fresh dict on the instance
    # it returns, so this shared dict is never mutated by this class.
    meta_data: Dict[str, Any] = {}

    # Validity-window columns shared by all models.
    expiry_starts: Mapped[TIMESTAMP] = mapped_column(
        TIMESTAMP(timezone=True),
        server_default=func.now(),
        nullable=False,
        comment="Record validity start timestamp",
    )
    expiry_ends: Mapped[TIMESTAMP] = mapped_column(
        TIMESTAMP(timezone=True),
        default="2099-12-31",
        server_default="2099-12-31",
        comment="Record validity end timestamp",
    )

    @classmethod
    def set_user_define_properties(cls, token: Any) -> None:
        """
        Store user-specific settings from the authentication token on the class.

        Args:
            token: Object exposing ``credentials``, ``timezone`` and ``lang``.
        """
        cls.creds = token.credentials
        cls.client_arrow = DateTimeLocal(is_client=True, timezone=token.timezone)
        cls.lang = str(token.lang).lower()

    @classmethod
    def remove_non_related_inputs(cls, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Filter out inputs that don't correspond to model fields.

        Args:
            kwargs: Dictionary of field names and values.

        Returns:
            New dictionary holding only the entries whose key is a column,
            hybrid property or settable relation of the model.
        """
        # Build the lookup once instead of concatenating three lists per key.
        allowed = set(cls.columns + cls.hybrid_properties + cls.settable_relations)
        return {key: value for key, value in kwargs.items() if key in allowed}

    @classmethod
    def extract_system_fields(
        cls, filter_kwargs: dict, create: bool = True
    ) -> Dict[str, Any]:
        """
        Remove system-managed fields from an input dictionary.

        Args:
            filter_kwargs: Input dictionary of fields (left unmodified).
            create: If True use the creation field list, else the update list.

        Returns:
            New dictionary with the system fields removed.
        """
        blocked = (
            cls.__system__fields__create__ if create else cls.__system__fields__update__
        )
        return {
            key: value for key, value in filter_kwargs.items() if key not in blocked
        }

    @classmethod
    def _format_client_timestamp(cls, val: Any) -> str:
        """Render ``val`` through ``client_arrow`` as ``DD-MM-YYYY HH:mm:ss +0``."""
        return str(cls.client_arrow.get(str(val)).format("DD-MM-YYYY HH:mm:ss +0"))

    @classmethod
    def iterate_over_variables(cls, val: Any, key: str) -> tuple[bool, Optional[Any]]:
        """
        Convert a field value into its serializable form.

        Resolution order: primary/foreign keys are skipped, ``None`` passes
        through, keys ending in ``uu_id`` are stringified, then the class
        annotation for ``key`` is used if present, otherwise the runtime type
        of ``val`` decides.

        Args:
            val: Field value.
            key: Field name; must be an attribute of the class.

        Returns:
            Tuple of (should_include, processed_value).
        """
        annotation = cls.__annotations__.get(key, None)
        is_primary = key in cls.primary_keys
        is_foreign = bool(getattr(getattr(cls, key), "foreign_keys", None))

        # Skip primary keys and foreign keys
        if is_primary or is_foreign:
            return False, None

        # Handle None values
        if val is None:
            return True, None

        # Special handling for UUID fields
        if str(key[-5:]).lower() == "uu_id":
            return True, str(val)

        # Handle fields annotated on this class
        if annotation:
            if annotation == Mapped[int]:
                return True, int(val)
            if annotation == Mapped[bool]:
                return True, bool(val)
            if annotation == Mapped[float] or annotation == Mapped[NUMERIC]:
                return True, round(float(val), 3)
            if annotation == Mapped[TIMESTAMP]:
                return True, cls._format_client_timestamp(val)
            if annotation == Mapped[str]:
                return True, str(val)
            # Annotated with a type this serializer does not handle.
            return False, None

        # No annotation on this class: decide by runtime type.
        if isinstance(val, datetime.datetime):
            return True, cls._format_client_timestamp(val)
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(val, bool):
            return True, bool(val)
        if isinstance(val, (float, Decimal)):
            return True, round(float(val), 3)
        if isinstance(val, int):
            return True, int(val)
        if isinstance(val, str):
            return True, str(val)
        return False, None

    def _stamp_actor(self, role: str) -> None:
        """Copy ``creds`` person id/name into ``<role>_by_id`` / ``<role>_by``."""
        person_id = getattr(self.creds, "person_id", None)
        person_name = getattr(self.creds, "person_name", None)
        # Only stamp when both pieces of identity are available.
        if person_id and person_name:
            setattr(self, f"{role}_by_id", person_id)
            setattr(self, f"{role}_by", person_name)

    @classmethod
    def find_or_create(cls: Type[T], db: Session, **kwargs) -> T:
        """
        Find a currently valid record matching the criteria or create a new one.

        The outcome is reported on the returned record's ``meta_data`` dict
        (``created`` flag plus ``error_case`` of ``DeletedRecord``,
        ``IsNotConfirmed``, ``AlreadyExists`` or ``None``).

        Args:
            db: Database session; the new record is added and flushed, not
                committed.
            **kwargs: Search/creation criteria; creation system fields are
                ignored.

        Returns:
            Existing or newly created record.
        """
        check_kwargs = cls.extract_system_fields(kwargs)

        # Use one timestamp so both validity bounds refer to the same instant.
        now = str(system_arrow.now())
        query = db.query(cls).filter(
            cls.expiry_ends > now,
            cls.expiry_starts <= now,
        )
        for key, value in check_kwargs.items():
            if hasattr(cls, key):
                query = query.filter(getattr(cls, key) == value)
        already_record = query.first()

        # Handle existing record
        if already_record:
            # getattr defaults keep this usable on models that do not declare
            # the soft-delete / confirmation columns.
            if getattr(already_record, "deleted", False):
                error_case = "DeletedRecord"
            elif not getattr(already_record, "is_confirmed", True):
                error_case = "IsNotConfirmed"
            else:
                error_case = "AlreadyExists"
            already_record.meta_data = {
                "created": False,
                "error_case": error_case,
                "message": "",
            }
            return already_record

        # Create new record
        create_kwargs = cls.remove_non_related_inputs(check_kwargs)
        created_record = cls()
        for key, value in create_kwargs.items():
            setattr(created_record, key, value)
        created_record._stamp_actor("created")
        db.add(created_record)
        db.flush()
        created_record.meta_data = {"created": True, "error_case": None, "message": ""}
        return created_record

    def update(self, db: Session, **kwargs) -> "CrudMixin":
        """
        Update the record with new values and flush the session.

        A truthy ``is_confirmed`` stamps ``confirmed_by*``; any other update
        stamps ``updated_by*`` (both only when ``creds`` carries a person id
        and name).

        Args:
            db: Database session (flushed, not committed).
            **kwargs: Fields to update; unknown and update-system fields are
                ignored.

        Returns:
            The updated record (``self``).

        Raises:
            ValueError: If a truthy ``is_confirmed`` is passed together with
                other fields.
        """
        check_kwargs = self.remove_non_related_inputs(kwargs)
        is_confirmed_argument = kwargs.get("is_confirmed", None)
        if is_confirmed_argument and len(kwargs) != 1:
            raise ValueError("Confirm field cannot be updated with other fields")

        check_kwargs = self.extract_system_fields(check_kwargs, create=False)
        for key, value in check_kwargs.items():
            setattr(self, key, value)

        # Update confirmation or modification tracking
        self._stamp_actor("confirmed" if is_confirmed_argument else "updated")
        db.flush()
        return self

    def _serialize_keys(self, keys: List[str]) -> Dict[str, Any]:
        """Serialize the given attributes, keeping those the converter accepts."""
        serialized: Dict[str, Any] = {}
        for key in keys:
            keep, converted = self.iterate_over_variables(getattr(self, key), key)
            if keep:
                serialized[key] = converted
        return serialized

    def get_dict(
        self,
        exclude: Optional[List[str]] = None,
        include: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Convert the model instance to a dictionary with customizable fields.

        ``uu_id`` (and ``active`` in exclude mode) are always added when the
        model declares them. The caller's ``exclude`` list is never modified.

        Args:
            exclude: Fields to exclude.
            include: Fields to include (takes precedence over ``exclude``).

        Returns:
            Dictionary representation of the model.
        """
        system_defaults = self.__system_default_model__
        model_excludes = list(getattr(self, "__exclude__fields__", []) or [])

        if include:
            # NOTE(review): because "uu_id" itself ends in "id", this only ever
            # drops system fields ending in "uu_id" (i.e. "cryp_uu_id").
            # Confirm whether other "*id" system fields were meant as well.
            dropped = {
                element
                for element in system_defaults
                if str(element)[-2:] == "id" and str(element)[-5:].lower() == "uu_id"
            }
            keys = list(set(include).difference(dropped))
            forced = ("uu_id",)
        elif exclude:
            # Work on a private set so the caller's list is left untouched.
            excluded = set(exclude)
            excluded.update(model_excludes)
            excluded.update(
                element for element in system_defaults if str(element)[-2:] == "id"
            )
            keys = list(set(self.columns).difference(excluded))
            forced = ("uu_id", "active")
        else:
            # Default selection: drop excluded and "*id" columns, then re-add
            # every "*uu_id" column that is not a system default.
            excluded = set(model_excludes).union(system_defaults)
            keys = [
                col
                for col in set(self.columns).difference(excluded)
                if str(col)[-2:] != "id"
            ]
            keys.extend(
                col for col in self.columns if str(col)[-5:].lower() == "uu_id"
            )
            keys = [col for col in keys if col not in system_defaults]
            forced = ()

        # Add the always-present fields once, and only if the model has them
        # (minimal models without uu_id/active would otherwise raise).
        for key in forced:
            if key not in keys and hasattr(self, key):
                keys.append(key)

        return self._serialize_keys(keys)
class BaseCollection(CrudMixin, ReprMixin):
    """
    Minimal abstract model: the CrudMixin fields plus an integer primary key.

    Declares only ``id``; none of the tracking, confirmation or soft-delete
    columns are defined here.
    """

    __abstract__ = True

    # Use the ReprMixin representation explicitly.
    __repr__ = ReprMixin.__repr__

    id: Mapped[int] = mapped_column(primary_key=True)
class CrudCollection(CrudMixin, SmartQueryMixin, FilterAttributes):
    """
    Full-featured abstract model class with all common fields.

    Includes:
        - Integer primary key, UUID and external reference ID
        - Creation / update timestamps
        - User tracking (created, updated, confirmed by)
        - Confirmation status
        - Soft delete and active flags
        - Notification / e-mail flags
    """

    __abstract__ = True
    # NOTE(review): ReprMixin is not among this class's visible bases, so its
    # __repr__ relies on helper attributes being supplied by another base
    # (e.g. FilterAttributes) -- confirm that repr() works on concrete models.
    __repr__ = ReprMixin.__repr__

    # Primary and reference fields
    id: Mapped[int] = mapped_column(primary_key=True)
    uu_id: Mapped[str] = mapped_column(
        UUID,
        server_default=text("gen_random_uuid()"),
        index=True,
        unique=True,
        comment="Unique identifier UUID",
    )
    ref_id: Mapped[str] = mapped_column(
        String(100), nullable=True, index=True, comment="External reference ID"
    )

    # Timestamps
    created_at: Mapped[TIMESTAMP] = mapped_column(
        TIMESTAMP(timezone=True),
        server_default=func.now(),
        nullable=False,
        index=True,
        comment="Record creation timestamp",
    )
    updated_at: Mapped[TIMESTAMP] = mapped_column(
        TIMESTAMP(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
        index=True,
        comment="Last update timestamp",
    )

    # Cryptographic and user tracking
    cryp_uu_id: Mapped[str] = mapped_column(
        String, nullable=True, index=True, comment="Cryptographic UUID"
    )
    created_by: Mapped[str] = mapped_column(
        String, nullable=True, comment="Creator name"
    )
    created_by_id: Mapped[int] = mapped_column(
        Integer, nullable=True, comment="Creator ID"
    )
    updated_by: Mapped[str] = mapped_column(
        String, nullable=True, comment="Last modifier name"
    )
    updated_by_id: Mapped[int] = mapped_column(
        Integer, nullable=True, comment="Last modifier ID"
    )
    confirmed_by: Mapped[str] = mapped_column(
        String, nullable=True, comment="Confirmer name"
    )
    confirmed_by_id: Mapped[int] = mapped_column(
        Integer, nullable=True, comment="Confirmer ID"
    )

    # Status flags
    is_confirmed: Mapped[bool] = mapped_column(
        Boolean, server_default="0", comment="Record confirmation status"
    )
    replication_id: Mapped[int] = mapped_column(
        SmallInteger, server_default="0", comment="Replication identifier"
    )
    deleted: Mapped[bool] = mapped_column(
        Boolean, server_default="0", comment="Soft delete flag"
    )
    active: Mapped[bool] = mapped_column(
        Boolean, server_default="1", comment="Record active status"
    )
    is_notification_send: Mapped[bool] = mapped_column(
        Boolean, server_default="0", comment="Notification sent flag"
    )
    is_email_send: Mapped[bool] = mapped_column(
        Boolean, server_default="0", comment="Email sent flag"
    )

    @classmethod
    def retrieve_language_model(cls, lang: str, response_model: Any) -> Dict[str, str]:
        """
        Retrieve language-specific headers for the fields of a response model.

        The class must define ``__language_model__``; the entry for ``lang`` is
        read from it as an attribute, falling back to its ``tr`` entry when the
        requested language is missing. Each header is then looked up as an
        attribute of that entry.

        Args:
            lang: Language code.
            response_model: Object whose ``__annotations__`` keys name the
                fields to translate.

        Returns:
            Dictionary of field names to localized headers; fields without a
            translation map to ``"Lang Not found"``.
        """
        language_models = cls.__language_model__
        localized = getattr(language_models, lang, None)
        if localized is None:
            # Fall back to the Turkish model itself. The previous default was
            # the *string* "tr", so lookups for fields named like str methods
            # (e.g. "title", "count") returned bound methods instead of text.
            localized = getattr(language_models, "tr", None)
        return {
            field: getattr(localized, field, "Lang Not found")
            for field in response_model.__annotations__
        }