wag-managment-api-service-v.../Services/PostgresDb/Models_old/base_model.py

255 lines
8.8 KiB
Python

from contextlib import contextmanager
from typing import Any, Dict, Optional, Generator
from sqlalchemy.orm import Session
from sqlalchemy import inspect
from Services.PostgresDb.database import Base
class BaseModel(Base):
    """Base model class with common utility functions and SQLAlchemy integration.

    This class serves as the foundation for all database models, providing:
    - SQLAlchemy ORM integration through Base
    - Session management utilities
    - CRUD operations (create, update)
    - Bulk operation support

    NOTE(review): ``get_session``, ``remove_non_related_inputs``,
    ``extract_system_fields`` and ``raise_http_exception`` are called below but
    are not defined in this class body; they are presumably supplied by ``Base``
    or a mixin -- confirm.
    """

    __abstract__ = True  # Marks this as a base class, won't create a table

    @classmethod
    def new_session(cls) -> Session:
        """Get database session.

        NOTE(review): the session is returned from inside the ``with`` block,
        so the ``get_db()`` context manager has already exited by the time the
        caller receives it. Confirm that the session is still meant to be used
        after that point.
        """
        # Imported locally rather than at module level (presumably to avoid a
        # circular import -- confirm).
        from Services.PostgresDb.database import get_db

        with get_db() as session:
            return session

    @staticmethod
    def _read_creds(creds: Any) -> tuple:
        """Return ``(person_id, person_name)`` from a creds container.

        Supports both dict-style and attribute-style credentials; a missing
        value is returned as ``None``.
        """
        if isinstance(creds, dict):
            return creds.get("person_id"), creds.get("person_name")
        return (
            getattr(creds, "person_id", None),
            getattr(creds, "person_name", None),
        )

    def update(
        self, session: Optional[Session] = None, **kwargs: Any
    ) -> "BaseModel":
        """Update model instance with given attributes.

        Args:
            session: Optional existing session to use. If not provided, creates
                a new one, which is committed on success, rolled back on error
                and always closed.
            **kwargs: Attributes to update

        Returns:
            Updated model instance

        Example:
            # Using an existing session
            with get_db() as session:
                model.update(session=session, name="new name")
                model2.update(session=session, status="active")
                # Both updates use the same transaction

            # Creating a new session automatically
            model.update(name="new name")  # Creates and manages its own session
        """
        owns_session = session is None
        if session is None:
            session = self.get_session()
        try:
            # Remove unrelated fields
            check_kwargs = self.remove_non_related_inputs(kwargs)

            # Mapped column / relationship names (sets: only used for lookups)
            mapper = inspect(self.__class__)
            columns = {column.key for column in mapper.columns}
            relationships = {rel.key for rel in mapper.relationships}

            # Confirmation must be the only field in the request
            is_confirmed_argument = kwargs.get("is_confirmed", None)
            if is_confirmed_argument and len(kwargs) != 1:
                self.raise_http_exception(
                    status_code="HTTP_406_NOT_ACCEPTABLE",
                    error_case="ConfirmError",
                    data=kwargs,
                    message="Confirm field cannot be updated with other fields",
                )

            # Process system fields
            check_kwargs = self.extract_system_fields(check_kwargs, create=False)

            for key, value in check_kwargs.items():
                if key in columns:
                    setattr(self, key, value)
                elif key in relationships:
                    related_obj = getattr(self, key)
                    # A collection relationship only accepts a list; any other
                    # value is ignored, as before.
                    if isinstance(related_obj, list) and not isinstance(value, list):
                        continue
                    setattr(self, key, value)

            # Handle user tracking
            if hasattr(self, "creds"):
                person_id, person_name = self._read_creds(self.creds)
                if person_id and person_name:
                    if is_confirmed_argument:
                        self.confirmed_by_id = person_id
                        self.confirmed_by = person_name
                    else:
                        self.updated_by_id = person_id
                        self.updated_by = person_name

            session.add(self)
            session.flush()
            if owns_session:
                # Without a commit the flushed changes are discarded when the
                # self-managed session is closed in ``finally``.
                session.commit()
            return self
        except Exception:
            if owns_session:
                session.rollback()
            raise
        finally:
            if owns_session:
                session.close()

    @classmethod
    def create(
        cls, session: Optional[Session] = None, **kwargs: Any
    ) -> "BaseModel":
        """Create new instance with optional session reuse.

        Args:
            session: Optional existing session to use. If not provided, creates
                a new one, which is committed on success, rolled back on error
                and always closed.
            **kwargs: Attributes for the new instance

        Returns:
            Created model instance

        Example:
            # Using an existing session for multiple creates
            with get_db() as session:
                user1 = User.create(session=session, name="John")
                user2 = User.create(session=session, name="Jane")
                # Both creates use the same transaction

            # Creating with auto-managed session
            user = User.create(name="John")  # Creates and manages its own session
        """
        instance = cls()
        owns_session = session is None
        if session is None:
            session = instance.get_session()
        try:
            check_kwargs = cls.remove_non_related_inputs(instance, kwargs)
            check_kwargs = cls.extract_system_fields(
                instance, check_kwargs, create=True
            )

            # Mapped column / relationship names (sets: only used for lookups)
            mapper = inspect(cls)
            columns = {column.key for column in mapper.columns}
            relationships = {rel.key for rel in mapper.relationships}

            # Columns and relationships (scalar or collection) are assigned the
            # same way; keys that are neither are skipped.
            for key, value in check_kwargs.items():
                if key in columns or key in relationships:
                    setattr(instance, key, value)

            # Handle user tracking
            if hasattr(instance, "creds"):
                person_id, person_name = cls._read_creds(instance.creds)
                if person_id and person_name:
                    instance.created_by_id = person_id
                    instance.created_by = person_name

            session.add(instance)
            session.flush()
            if owns_session:
                session.commit()
            return instance
        except Exception:
            if owns_session:
                session.rollback()
            raise
        finally:
            if owns_session:
                session.close()

    @classmethod
    @contextmanager
    def bulk_create(
        cls, session: Optional[Session] = None
    ) -> Generator[Session, None, None]:
        """Context manager for bulk creating instances.

        Args:
            session: Optional existing session to use. If not provided, creates
                a new one, which is committed when the ``with`` body finishes,
                rolled back if it raises, and always closed.

        Yields:
            SQLAlchemy session for creating multiple instances

        Example:
            # Bulk create multiple instances in one transaction
            with User.bulk_create() as session:
                user1 = User.create(session=session, name="John")
                user2 = User.create(session=session, name="Jane")
                # Both creates share the same transaction
        """
        owns_session = session is None
        if session is None:
            session = cls().get_session()
        try:
            yield session
            if owns_session:
                session.commit()
        except Exception:
            if owns_session:
                session.rollback()
            raise
        finally:
            if owns_session:
                session.close()
# Example route handlers using the model helpers (kept for reference):
#
# @router.put("/users/{user_id}")
# async def update_user(
#     user_id: str,
#     update_data: Dict[str, Any],
#     db: Session = Depends(get_db_session)
# ):
#     user = db.query(User).filter(User.id == user_id).first()
#     if not user:
#         raise HTTPException(status_code=404, detail="User not found")
#
#     updated_user = user.update(**update_data)
#     return updated_user
#
#
# @router.post("/users")
# async def create_user(
#     user_data: Dict[str, Any],
#     db: Session = Depends(get_db_session)
# ):
#     with User.create_with_session(**user_data) as new_user:
#         return new_user