403 lines
12 KiB
Python
403 lines
12 KiB
Python
"""
|
|
Advanced filtering functionality for SQLAlchemy models.
|
|
|
|
This module provides a comprehensive set of filtering capabilities for SQLAlchemy models,
|
|
including pagination, ordering, and complex query building.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
import arrow
|
|
|
|
from typing import Any, TypeVar, Type, Union, Optional
|
|
|
|
from sqlalchemy import ColumnExpressionArgument
|
|
from sqlalchemy.orm import Query, Session
|
|
from sqlalchemy.sql.elements import BinaryExpression
|
|
|
|
from Controllers.Postgres.response import PostgresResponse
|
|
|
|
|
|
T = TypeVar("T", bound="QueryModel")
|
|
|
|
|
|
class QueryModel:
|
|
|
|
__abstract__ = True
|
|
pre_query = None
|
|
|
|
@classmethod
|
|
def _query(cls: Type[T], db: Session) -> Query:
|
|
"""Returns the query to use in the model."""
|
|
if cls.pre_query is not None:
|
|
# Return the pre_query directly as it's already a Query object
|
|
return cls.pre_query
|
|
return db.query(cls)
|
|
|
|
@classmethod
|
|
def add_new_arg_to_args(
|
|
cls: Type[T],
|
|
args_list: tuple[BinaryExpression, ...],
|
|
argument: str,
|
|
value: BinaryExpression,
|
|
) -> tuple[BinaryExpression, ...]:
|
|
"""
|
|
Add a new argument to the query arguments if it doesn't exist.
|
|
|
|
Args:
|
|
args_list: Existing query arguments
|
|
argument: Key of the argument to check for
|
|
value: New argument value to add
|
|
|
|
Returns:
|
|
Updated tuple of query arguments
|
|
"""
|
|
# Convert to set to remove duplicates while preserving order
|
|
new_args = list(
|
|
dict.fromkeys(arg for arg in args_list if isinstance(arg, BinaryExpression))
|
|
)
|
|
# Check if argument already exists
|
|
if not any(
|
|
getattr(getattr(arg, "left", None), "key", None) == argument
|
|
for arg in new_args
|
|
):
|
|
new_args.append(value)
|
|
|
|
return tuple(new_args)
|
|
|
|
@classmethod
|
|
def get_not_expired_query_arg(
|
|
cls: Type[T], args: tuple[BinaryExpression, ...]
|
|
) -> tuple[BinaryExpression, ...]:
|
|
"""
|
|
Add expiry date filtering to the query arguments.
|
|
|
|
Args:
|
|
args: Existing query arguments
|
|
|
|
Returns:
|
|
Updated tuple of query arguments with expiry filters
|
|
|
|
Raises:
|
|
AttributeError: If model does not have expiry_starts or expiry_ends columns
|
|
"""
|
|
try:
|
|
current_time = str(arrow.now())
|
|
# Only add expiry filters if they don't already exist
|
|
if not any(
|
|
getattr(getattr(arg, "left", None), "key", None) == "expiry_ends"
|
|
for arg in args
|
|
):
|
|
ends = cls.expiry_ends > current_time
|
|
args = cls.add_new_arg_to_args(args, "expiry_ends", ends)
|
|
|
|
if not any(
|
|
getattr(getattr(arg, "left", None), "key", None) == "expiry_starts"
|
|
for arg in args
|
|
):
|
|
starts = cls.expiry_starts <= current_time
|
|
args = cls.add_new_arg_to_args(args, "expiry_starts", starts)
|
|
return args
|
|
|
|
except AttributeError as e:
|
|
raise AttributeError(
|
|
f"Model {cls.__name__} must have expiry_starts and expiry_ends columns"
|
|
) from e
|
|
|
|
@classmethod
|
|
def produce_query_to_add(cls: Type[T], filter_list: dict, args: tuple) -> tuple:
|
|
"""
|
|
Adds query to main filter options
|
|
|
|
Args:
|
|
filter_list: Dictionary containing query parameters
|
|
args: Existing query arguments to add to
|
|
|
|
Returns:
|
|
Updated query arguments tuple
|
|
"""
|
|
try:
|
|
if not filter_list or not isinstance(filter_list, dict):
|
|
return args
|
|
|
|
query_params = filter_list.get("query")
|
|
if not query_params or not isinstance(query_params, dict):
|
|
return args
|
|
|
|
for key, value in query_params.items():
|
|
if hasattr(cls, key):
|
|
# Create a new filter expression
|
|
filter_expr = getattr(cls, key) == value
|
|
# Add it to args if it doesn't exist
|
|
args = cls.add_new_arg_to_args(args, key, filter_expr)
|
|
return args
|
|
|
|
except Exception as e:
|
|
print(f"Error in produce_query_to_add: {str(e)}")
|
|
return args
|
|
|
|
@classmethod
|
|
def convert(
|
|
cls: Type[T], smart_options: dict[str, Any], validate_model: Any = None
|
|
) -> Optional[tuple[BinaryExpression, ...]]:
|
|
"""
|
|
Convert smart options to SQLAlchemy filter expressions.
|
|
|
|
Args:
|
|
smart_options: Dictionary of filter options
|
|
validate_model: Optional model to validate against
|
|
|
|
Returns:
|
|
Tuple of SQLAlchemy filter expressions or None if validation fails
|
|
"""
|
|
if validate_model is not None:
|
|
# Add validation logic here if needed
|
|
pass
|
|
|
|
return tuple(cls.filter_expr(**smart_options))
|
|
|
|
@classmethod
|
|
def filter_by_one(
|
|
cls: Type[T], db: Session, system: bool = False, **kwargs: Any
|
|
) -> PostgresResponse[T]:
|
|
"""
|
|
Filter single record by keyword arguments.
|
|
|
|
Args:
|
|
db: Database session
|
|
system: If True, skip status filtering
|
|
**kwargs: Filter criteria
|
|
|
|
Returns:
|
|
Query response with single record
|
|
"""
|
|
# Get base query (either pre_query or new query)
|
|
base_query = cls._query(db)
|
|
|
|
# Create the final query by applying filters
|
|
query = base_query
|
|
|
|
# Add keyword filters first
|
|
query = query.filter_by(**kwargs)
|
|
|
|
# Add status filters if not system query
|
|
if not system:
|
|
query = query.filter(
|
|
cls.is_confirmed == True, cls.deleted == False, cls.active == True
|
|
)
|
|
|
|
# Add expiry filters last
|
|
args = cls.get_not_expired_query_arg(())
|
|
query = query.filter(*args)
|
|
|
|
return PostgresResponse(
|
|
model=cls,
|
|
pre_query=base_query, # Use the base query for pre_query
|
|
query=query,
|
|
is_array=False,
|
|
)
|
|
|
|
@classmethod
|
|
def filter_one(
|
|
cls: Type[T],
|
|
*args: Union[BinaryExpression, ColumnExpressionArgument],
|
|
db: Session,
|
|
) -> PostgresResponse[T]:
|
|
"""
|
|
Filter single record by expressions.
|
|
|
|
Args:
|
|
db: Database session
|
|
*args: Filter expressions
|
|
|
|
Returns:
|
|
Query response with single record
|
|
"""
|
|
# Get base query (either pre_query or new query)
|
|
base_query = cls._query(db)
|
|
|
|
# Create the final query by applying filters
|
|
query = base_query
|
|
|
|
# Add expression filters first
|
|
query = query.filter(*args)
|
|
|
|
# Add status filters
|
|
query = query.filter(
|
|
cls.is_confirmed == True, cls.deleted == False, cls.active == True
|
|
)
|
|
|
|
# Add expiry filters last
|
|
args = cls.get_not_expired_query_arg(())
|
|
query = query.filter(*args)
|
|
|
|
return PostgresResponse(
|
|
model=cls,
|
|
pre_query=base_query, # Use the base query for pre_query
|
|
query=query,
|
|
is_array=False,
|
|
)
|
|
|
|
@classmethod
|
|
def filter_one_system(
|
|
cls: Type[T],
|
|
*args: Union[BinaryExpression, ColumnExpressionArgument],
|
|
db: Session,
|
|
) -> PostgresResponse[T]:
|
|
"""
|
|
Filter single record by expressions without status filtering.
|
|
|
|
Args:
|
|
db: Database session
|
|
*args: Filter expressions
|
|
|
|
Returns:
|
|
Query response with single record
|
|
"""
|
|
# Get base query (either pre_query or new query)
|
|
base_query = cls._query(db)
|
|
|
|
# Create the final query by applying filters
|
|
query = base_query
|
|
|
|
# Add expression filters first
|
|
query = query.filter(*args)
|
|
|
|
# Add expiry filters last
|
|
args = cls.get_not_expired_query_arg(())
|
|
query = query.filter(*args)
|
|
|
|
return PostgresResponse(
|
|
model=cls,
|
|
pre_query=base_query, # Use the base query for pre_query
|
|
query=query,
|
|
is_array=False,
|
|
)
|
|
|
|
@classmethod
|
|
def filter_all_system(
|
|
cls: Type[T],
|
|
*args: Union[BinaryExpression, ColumnExpressionArgument],
|
|
db: Session,
|
|
) -> PostgresResponse[T]:
|
|
"""
|
|
Filter multiple records by expressions without status filtering.
|
|
|
|
Args:
|
|
db: Database session
|
|
*args: Filter expressions
|
|
|
|
Returns:
|
|
Query response with matching records
|
|
"""
|
|
# Get base query (either pre_query or new query)
|
|
base_query = cls._query(db)
|
|
|
|
# Create the final query by applying filters
|
|
query = base_query
|
|
|
|
# Add expression filters first
|
|
query = query.filter(*args)
|
|
|
|
# Add expiry filters last
|
|
args = cls.get_not_expired_query_arg(())
|
|
query = query.filter(*args)
|
|
|
|
return PostgresResponse(
|
|
model=cls,
|
|
pre_query=base_query, # Use the base query for pre_query
|
|
query=query,
|
|
is_array=True,
|
|
)
|
|
|
|
@classmethod
|
|
def filter_all(
|
|
cls: Type[T],
|
|
*args: Union[BinaryExpression, ColumnExpressionArgument],
|
|
db: Session,
|
|
) -> PostgresResponse[T]:
|
|
"""
|
|
Filter multiple records by expressions.
|
|
|
|
Args:
|
|
db: Database session
|
|
*args: Filter expressions
|
|
|
|
Returns:
|
|
Query response with matching records
|
|
"""
|
|
# Get base query (either pre_query or new query)
|
|
base_query = cls._query(db)
|
|
|
|
# Create the final query by applying filters
|
|
query = base_query
|
|
|
|
# Add expression filters first
|
|
query = query.filter(*args)
|
|
|
|
# Add status filters
|
|
query = query.filter(
|
|
cls.is_confirmed == True, cls.deleted == False, cls.active == True
|
|
)
|
|
|
|
# Add expiry filters last
|
|
args = cls.get_not_expired_query_arg(())
|
|
query = query.filter(*args)
|
|
|
|
return PostgresResponse(
|
|
model=cls,
|
|
pre_query=base_query, # Use the base query for pre_query
|
|
query=query,
|
|
is_array=True,
|
|
)
|
|
|
|
@classmethod
|
|
def filter_by_all_system(
|
|
cls: Type[T], db: Session, **kwargs: Any
|
|
) -> PostgresResponse[T]:
|
|
"""
|
|
Filter multiple records by keyword arguments without status filtering.
|
|
|
|
Args:
|
|
db: Database session
|
|
**kwargs: Filter criteria
|
|
|
|
Returns:
|
|
Query response with matching records
|
|
"""
|
|
# Get base query (either pre_query or new query)
|
|
base_query = cls._query(db)
|
|
|
|
# Create the final query by applying filters
|
|
query = base_query
|
|
|
|
# Add keyword filters first
|
|
query = query.filter_by(**kwargs)
|
|
|
|
# Add expiry filters last
|
|
args = cls.get_not_expired_query_arg(())
|
|
query = query.filter(*args)
|
|
|
|
return PostgresResponse(
|
|
model=cls,
|
|
pre_query=base_query, # Use the base query for pre_query
|
|
query=query,
|
|
is_array=True,
|
|
)
|
|
|
|
@classmethod
|
|
def filter_by_one_system(
|
|
cls: Type[T], db: Session, **kwargs: Any
|
|
) -> PostgresResponse[T]:
|
|
"""
|
|
Filter single record by keyword arguments without status filtering.
|
|
|
|
Args:
|
|
db: Database session
|
|
**kwargs: Filter criteria
|
|
|
|
Returns:
|
|
Query response with single record
|
|
"""
|
|
# Use filter_by_one with system=True to avoid code duplication
|
|
return cls.filter_by_one(db=db, system=True, **kwargs)
|