550 lines
17 KiB
Python
550 lines
17 KiB
Python
"""
|
|
PostgreSQL Base Models Module
|
|
|
|
This module provides base classes for PostgreSQL models with common functionality such as:
|
|
- CRUD operations with session management
|
|
- Soft delete capability
|
|
- Automatic timestamps
|
|
- User tracking (created_by, updated_by)
|
|
- Data serialization
|
|
- Multi-language support
|
|
"""
|
|
|
|
import datetime
|
|
from decimal import Decimal
|
|
from typing import Any, Dict, List, Optional, Type, TypeVar, Union, cast
|
|
|
|
from sqlalchemy import (
|
|
TIMESTAMP,
|
|
NUMERIC,
|
|
func,
|
|
text,
|
|
UUID,
|
|
String,
|
|
Integer,
|
|
Boolean,
|
|
SmallInteger,
|
|
)
|
|
from sqlalchemy.orm import Mapped, mapped_column, Session
|
|
from sqlalchemy_mixins.serialize import SerializeMixin
|
|
from sqlalchemy_mixins.repr import ReprMixin
|
|
from sqlalchemy_mixins.smartquery import SmartQueryMixin
|
|
|
|
from ApiLibrary import DateTimeLocal, system_arrow
|
|
from Services.PostgresDb.Models.base_model import BaseModel
|
|
from Services.PostgresDb.Models.filter_functions import FilterAttributes
|
|
|
|
# Type variable for class methods returning self
|
|
T = TypeVar("T", bound="CrudMixin")  # lets `find_or_create(cls: Type[T]) -> T` be typed as returning the calling subclass
|
|
|
|
|
|
class CrudMixin(
    BaseModel, SmartQueryMixin, SerializeMixin, ReprMixin, FilterAttributes
):
    """
    Abstract mixin providing CRUD helpers and shared fields for PostgreSQL models.

    What this class itself provides:
    - Validity-window columns (``expiry_starts`` / ``expiry_ends``)
    - ``find_or_create`` / ``update`` helpers that flush through a session
    - Stripping of system-managed fields from caller input
    - ``get_dict`` serialization with include / exclude lists
    - Class-level user context (credentials, language, client timezone)

    Several helpers read attributes this mixin does not declare
    (``deleted``, ``is_confirmed``, ``created_by`` ...); concrete models are
    expected to supply them.
    """

    __abstract__ = True

    # Fields dropped from caller input by ``extract_system_fields(create=True)``.
    __system__fields__create__ = (
        "created_at",
        "updated_at",
        "cryp_uu_id",
        "created_by",
        "created_by_id",
        "updated_by",
        "updated_by_id",
        "replication_id",
        "confirmed_by",
        "confirmed_by_id",
        "is_confirmed",
        "deleted",
        "active",
        "is_notification_send",
        "is_email_send",
    )

    # Fields dropped from caller input by ``extract_system_fields(create=False)``.
    __system__fields__update__ = (
        "cryp_uu_id",
        "created_at",
        "updated_at",
        "created_by",
        "created_by_id",
        "confirmed_by",
        "confirmed_by_id",
        "updated_by",
        "updated_by_id",
        "replication_id",
    )

    # Fields that ``get_dict`` leaves out of its default output.
    __system_default_model__ = [
        "cryp_uu_id",
        "is_confirmed",
        "deleted",
        "is_notification_send",
        "replication_id",
        "is_email_send",
        "confirmed_by_id",
        "confirmed_by",
        "updated_by_id",
        "created_by_id",
    ]

    # User context. ``set_user_define_properties`` writes these on the class,
    # so the values are shared by every instance of that class.
    creds = None
    lang: str = "tr"
    client_arrow: Optional[DateTimeLocal] = None
    valid_record_dict: Dict[str, bool] = {"active": True, "deleted": False}
    # Class-level default; ``find_or_create`` assigns a fresh dict per record.
    meta_data: Dict[str, Any] = {}

    # Validity window shared by all models.
    expiry_starts: Mapped[TIMESTAMP] = mapped_column(
        type_=TIMESTAMP(timezone=True),
        server_default=func.now(),
        nullable=False,
        comment="Record validity start timestamp",
    )
    expiry_ends: Mapped[TIMESTAMP] = mapped_column(
        type_=TIMESTAMP(timezone=True),
        default="2099-12-31",
        server_default="2099-12-31",
        comment="Record validity end timestamp",
    )

    @classmethod
    def set_user_define_properties(cls, token: Any) -> None:
        """
        Store user-specific settings from the authentication token on the class.

        Reads ``token.credentials``, ``token.timezone`` and ``token.lang``.

        Args:
            token: Authentication token carrying credentials and preferences
        """
        # NOTE(review): these are class attributes, not per-request state;
        # confirm this is safe if requests for different users can overlap.
        cls.creds = token.credentials
        cls.client_arrow = DateTimeLocal(is_client=True, timezone=token.timezone)
        cls.lang = str(token.lang).lower()

    @classmethod
    def remove_non_related_inputs(cls, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        """
        Filter out inputs that don't correspond to model fields.

        A key is kept when it is listed in ``cls.columns``,
        ``cls.hybrid_properties`` or ``cls.settable_relations``.

        Args:
            kwargs: Dictionary of field names and values

        Returns:
            New dictionary containing only the recognised keys
        """
        # Build the lookup once instead of re-concatenating three lists per key.
        allowed = set(cls.columns + cls.hybrid_properties + cls.settable_relations)
        return {key: value for key, value in kwargs.items() if key in allowed}

    @classmethod
    def extract_system_fields(
        cls, filter_kwargs: dict, create: bool = True
    ) -> Dict[str, Any]:
        """
        Return a copy of the input with system-managed fields removed.

        Args:
            filter_kwargs: Input dictionary of fields (left unmodified)
            create: If True, use the creation field list, else the update list

        Returns:
            New dictionary without the system fields
        """
        system_fields = (
            cls.__system__fields__create__ if create else cls.__system__fields__update__
        )
        return {
            key: value
            for key, value in filter_kwargs.items()
            if key not in system_fields
        }

    @classmethod
    def iterate_over_variables(cls, val: Any, key: str) -> tuple[bool, Optional[Any]]:
        """
        Convert a field value into its serialised form.

        Primary keys and columns carrying foreign keys are skipped. Fields
        whose name ends in ``uu_id`` are stringified. Otherwise the value is
        converted according to the annotation declared directly on ``cls``
        or, when there is none, according to its Python type.

        Args:
            val: Field value
            key: Field name (must be an attribute of ``cls``)

        Returns:
            Tuple of (should_include, processed_value)
        """
        # ``cls.__annotations__`` is looked up on the class object, so fields
        # annotated only on a parent may not be found and then take the
        # value-type branch below.
        annotation = cls.__annotations__.get(key, None)
        is_primary = key in cls.primary_keys
        has_foreign_keys = bool(getattr(getattr(cls, key), "foreign_keys", None))

        # Skip primary keys and foreign keys
        if is_primary or has_foreign_keys:
            return False, None

        # None is always included as-is
        if val is None:
            return True, None

        # Special handling for UUID fields
        if str(key[-5:]).lower() == "uu_id":
            return True, str(val)

        if annotation:
            # Convert according to the declared annotation; an annotation
            # not listed here falls through to the final "skip" return.
            if annotation == Mapped[int]:
                return True, int(val)
            if annotation == Mapped[bool]:
                return True, bool(val)
            if annotation == Mapped[float] or annotation == Mapped[NUMERIC]:
                return True, round(float(val), 3)
            if annotation == Mapped[TIMESTAMP]:
                # NOTE(review): requires ``client_arrow`` to be set (see
                # ``set_user_define_properties``); it is None by default.
                return True, str(
                    cls.client_arrow.get(str(val)).format("DD-MM-YYYY HH:mm:ss +0")
                )
            if annotation == Mapped[str]:
                return True, str(val)
        else:
            # No annotation found: decide from the runtime type.
            if isinstance(val, datetime.datetime):
                return True, str(
                    cls.client_arrow.get(str(val)).format("DD-MM-YYYY HH:mm:ss +0")
                )
            # bool must be tested before int because bool is an int subclass.
            if isinstance(val, bool):
                return True, bool(val)
            if isinstance(val, (float, Decimal)):
                return True, round(float(val), 3)
            if isinstance(val, int):
                return True, int(val)
            if isinstance(val, str):
                return True, str(val)

        return False, None

    @classmethod
    def find_or_create(cls: Type[T], db: Session, **kwargs) -> T:
        """
        Find a currently valid record matching the criteria or create a new one.

        The outcome is reported on the returned record's ``meta_data`` dict:
        ``created`` is True for a new record; otherwise ``error_case`` is one
        of ``"DeletedRecord"``, ``"IsNotConfirmed"`` or ``"AlreadyExists"``.

        Args:
            db: Database session
            **kwargs: Search/creation criteria (system fields are ignored)

        Returns:
            Existing or newly created (flushed, not committed) record
        """
        check_kwargs = cls.extract_system_fields(kwargs)

        # Use one instant for both bounds of the validity window.
        now = str(system_arrow.now())
        query = db.query(cls).filter(
            cls.expiry_ends > now,
            cls.expiry_starts <= now,
        )
        for key, value in check_kwargs.items():
            if hasattr(cls, key):
                query = query.filter(getattr(cls, key) == value)

        already_record = query.first()

        # Handle existing record
        if already_record:
            if already_record.deleted:
                error_case = "DeletedRecord"
            elif not already_record.is_confirmed:
                error_case = "IsNotConfirmed"
            else:
                error_case = "AlreadyExists"
            already_record.meta_data = {
                "created": False,
                "error_case": error_case,
                "message": "",
            }
            return already_record

        # Create new record
        check_kwargs = cls.remove_non_related_inputs(check_kwargs)
        created_record = cls()
        for key, value in check_kwargs.items():
            setattr(created_record, key, value)

        # Creator is recorded only when both id and name are available.
        person_id = getattr(cls.creds, "person_id", None)
        person_name = getattr(cls.creds, "person_name", None)
        if person_id and person_name:
            created_record.created_by_id = person_id
            created_record.created_by = person_name

        db.add(created_record)
        db.flush()

        created_record.meta_data = {"created": True, "error_case": None, "message": ""}
        return created_record

    def update(self, db: Session, **kwargs) -> "CrudMixin":
        """
        Update the record with new values and flush the session.

        A truthy ``is_confirmed`` must be the only argument; in that case the
        confirmer is recorded, otherwise the last modifier is recorded. Either
        is written only when the stored credentials have both a person id and
        a person name.

        Args:
            db: Database session
            **kwargs: Fields to update (unknown and system fields are ignored)

        Returns:
            Updated record

        Raises:
            ValueError: If attempting to update is_confirmed with other fields
        """
        check_kwargs = self.remove_non_related_inputs(kwargs)
        is_confirmed_argument = kwargs.get("is_confirmed", None)

        if is_confirmed_argument and len(kwargs) != 1:
            raise ValueError("Confirm field cannot be updated with other fields")

        check_kwargs = self.extract_system_fields(check_kwargs, create=False)
        for key, value in check_kwargs.items():
            setattr(self, key, value)

        # Update confirmation or modification tracking
        person_id = getattr(self.creds, "person_id", None)
        person_name = getattr(self.creds, "person_name", None)
        if person_id and person_name:
            if is_confirmed_argument:
                self.confirmed_by_id = person_id
                self.confirmed_by = person_name
            else:
                self.updated_by_id = person_id
                self.updated_by = person_name

        db.flush()
        return self

    def get_dict(
        self,
        exclude: Optional[List[str]] = None,
        include: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Convert model instance to dictionary with customizable fields.

        Every selected field goes through ``iterate_over_variables``, which
        may skip it (for example primary and foreign keys).

        Args:
            exclude: List of fields to exclude (never modified by this call)
            include: List of fields to include (takes precedence over exclude)

        Returns:
            Dictionary representation of the model
        """
        system_defaults = self.__system_default_model__
        model_excludes = getattr(self, "__exclude__fields__", []) or []

        if include:
            # Handle explicitly included fields.
            # NOTE(review): this condition only matches names ending in
            # "uu_id", unlike the plain "id" suffix test in the exclude
            # branch — confirm that is intended.
            blocked = {
                element
                for element in system_defaults
                if str(element)[-2:] == "id" and str(element)[-5:].lower() == "uu_id"
            }
            keys = [key for key in include if key not in blocked]
            keys.append("uu_id")
        elif exclude:
            # Handle explicitly excluded fields. Work on a private set so the
            # caller's ``exclude`` list is left untouched.
            blocked = set(exclude)
            blocked.update(model_excludes)
            blocked.update(
                element for element in system_defaults if str(element)[-2:] == "id"
            )
            keys = [column for column in self.columns if column not in blocked]
            # These two are always reported, even when excluded.
            keys.extend(["uu_id", "active"])
        else:
            # Handle default field selection: drop excluded and "*id" columns,
            # re-add "*uu_id" columns, then drop system default fields again.
            blocked = set(model_excludes) | set(system_defaults)
            keys = [
                column
                for column in self.columns
                if column not in blocked and str(column)[-2:] != "id"
            ]
            keys.extend(
                column for column in self.columns if str(column)[-5:].lower() == "uu_id"
            )
            keys = [key for key in keys if key not in system_defaults]

        return_dict: Dict[str, Any] = {}
        # dict.fromkeys removes duplicates while keeping first-seen order.
        for key in dict.fromkeys(keys):
            val = getattr(self, key)
            correct, value_of_database = self.iterate_over_variables(val, key)
            if correct:
                return_dict[key] = value_of_database

        return return_dict
|
|
|
|
|
|
class BaseCollection(CrudMixin):
    """Abstract model that adds only an integer primary key to CrudMixin."""

    __abstract__ = True
    __repr__ = ReprMixin.__repr__

    # Surrogate primary key
    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
    )
|
|
|
|
|
|
class CrudCollection(CrudMixin):
    """
    Full-featured abstract model class with all common fields.

    Includes:
    - UUID and reference ID
    - Timestamps
    - User tracking
    - Confirmation status
    - Soft delete
    - Notification flags
    """

    __abstract__ = True
    __repr__ = ReprMixin.__repr__

    # Primary and reference fields
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    uu_id: Mapped[str] = mapped_column(
        UUID,
        server_default=text("gen_random_uuid()"),
        index=True,
        unique=True,
        comment="Unique identifier UUID",
    )
    ref_id: Mapped[str] = mapped_column(
        String(100), nullable=True, index=True, comment="External reference ID"
    )

    # Timestamps
    created_at: Mapped[TIMESTAMP] = mapped_column(
        TIMESTAMP(timezone=True),
        server_default=func.now(),
        nullable=False,
        index=True,
        comment="Record creation timestamp",
    )
    updated_at: Mapped[TIMESTAMP] = mapped_column(
        TIMESTAMP(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
        index=True,
        comment="Last update timestamp",
    )

    # Cryptographic and user tracking
    cryp_uu_id: Mapped[str] = mapped_column(
        String, nullable=True, index=True, comment="Cryptographic UUID"
    )
    created_by: Mapped[str] = mapped_column(
        String, nullable=True, comment="Creator name"
    )
    created_by_id: Mapped[int] = mapped_column(
        Integer, nullable=True, comment="Creator ID"
    )
    updated_by: Mapped[str] = mapped_column(
        String, nullable=True, comment="Last modifier name"
    )
    updated_by_id: Mapped[int] = mapped_column(
        Integer, nullable=True, comment="Last modifier ID"
    )
    confirmed_by: Mapped[str] = mapped_column(
        String, nullable=True, comment="Confirmer name"
    )
    confirmed_by_id: Mapped[int] = mapped_column(
        Integer, nullable=True, comment="Confirmer ID"
    )

    # Status flags
    is_confirmed: Mapped[bool] = mapped_column(
        Boolean, server_default="0", comment="Record confirmation status"
    )
    replication_id: Mapped[int] = mapped_column(
        SmallInteger, server_default="0", comment="Replication identifier"
    )
    deleted: Mapped[bool] = mapped_column(
        Boolean, server_default="0", comment="Soft delete flag"
    )
    active: Mapped[bool] = mapped_column(
        Boolean, server_default="1", comment="Record active status"
    )
    is_notification_send: Mapped[bool] = mapped_column(
        Boolean, server_default="0", comment="Notification sent flag"
    )
    is_email_send: Mapped[bool] = mapped_column(
        Boolean, server_default="0", comment="Email sent flag"
    )

    @classmethod
    def retrieve_language_model(cls, lang: str, response_model: Any) -> Dict[str, str]:
        """
        Retrieve language-specific headers for the fields of a response model.

        The language entry is read as the attribute ``lang`` of
        ``cls.__language_model__``; when it is missing, the ``tr`` entry is
        used instead. Each field name annotated on ``response_model`` is then
        looked up as an attribute of that entry.

        Args:
            lang: Language code
            response_model: Model whose ``__annotations__`` name the fields

        Returns:
            Dictionary of field names to localized headers, with
            ``"Lang Not found"`` for fields the language entry lacks
        """
        language_models = cls.__language_model__

        # Fall back to the "tr" entry itself. Falling back to the literal
        # string "tr" would make field names such as "title" or "count"
        # resolve to bound ``str`` methods instead of a header.
        language_model = getattr(language_models, lang, None)
        if language_model is None:
            language_model = getattr(language_models, "tr", None)

        return {
            field: getattr(language_model, field, "Lang Not found")
            for field in response_model.__annotations__
        }
|