172 lines
5.8 KiB
Python
172 lines
5.8 KiB
Python
from typing import Optional, Union, Type, Any, Dict, TypeVar
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Query
|
|
from sqlalchemy import asc, desc
|
|
|
|
from .pagination import default_paginate_config
|
|
from .base import PostgresResponse
|
|
from .pagination import PaginationConfig
|
|
|
|
T = TypeVar("T")
|
|
|
|
class Pagination:
|
|
"""
|
|
Handles pagination logic for query results.
|
|
|
|
Manages page size, current page, ordering, and calculates total pages
|
|
and items based on the data source.
|
|
|
|
Attributes:
|
|
DEFAULT_SIZE: Default number of items per page (10)
|
|
MIN_SIZE: Minimum allowed page size (10)
|
|
MAX_SIZE: Maximum allowed page size (40)
|
|
"""
|
|
|
|
DEFAULT_SIZE = default_paginate_config.DEFAULT_SIZE
|
|
MIN_SIZE = default_paginate_config.MIN_SIZE
|
|
MAX_SIZE = default_paginate_config.MAX_SIZE
|
|
|
|
def __init__(self, data: Query, base_query: Query):
|
|
self.base_query = base_query
|
|
self.query = data
|
|
self.size: int = self.DEFAULT_SIZE
|
|
self.page: int = 1
|
|
self.orderField: Optional[Union[tuple[str], list[str]]] = ["uu_id"]
|
|
self.orderType: Optional[Union[tuple[str], list[str]]] = ["asc"]
|
|
self.page_count: int = 1
|
|
self.total_count: int = 0
|
|
self.all_count: int = 0
|
|
self.total_pages: int = 1
|
|
self._update_page_counts()
|
|
|
|
def change(self, **kwargs) -> None:
|
|
"""Update pagination settings from config."""
|
|
config = PaginationConfig(**kwargs)
|
|
self.size = (
|
|
config.size
|
|
if self.MIN_SIZE <= config.size <= self.MAX_SIZE
|
|
else self.DEFAULT_SIZE
|
|
)
|
|
self.page = config.page
|
|
self.orderField = config.orderField
|
|
self.orderType = config.orderType
|
|
self._update_page_counts()
|
|
|
|
def feed(self, data: PostgresResponse) -> None:
|
|
"""Calculate pagination based on data source."""
|
|
self.query = data
|
|
self._update_page_counts()
|
|
|
|
def _update_page_counts(self) -> None:
|
|
"""Update page counts and validate current page."""
|
|
if self.query:
|
|
self.total_count = self.query.count()
|
|
self.all_count = self.base_query.count()
|
|
|
|
self.size = (
|
|
self.size
|
|
if self.MIN_SIZE <= self.size <= self.MAX_SIZE
|
|
else self.DEFAULT_SIZE
|
|
)
|
|
self.total_pages = max(1, (self.total_count + self.size - 1) // self.size)
|
|
self.page = max(1, min(self.page, self.total_pages))
|
|
self.page_count = (
|
|
self.total_count % self.size
|
|
if self.page == self.total_pages and self.total_count % self.size
|
|
else self.size
|
|
)
|
|
|
|
def refresh(self) -> None:
|
|
"""Reset pagination state to defaults."""
|
|
self._update_page_counts()
|
|
|
|
def reset(self) -> None:
|
|
"""Reset pagination state to defaults."""
|
|
self.size = self.DEFAULT_SIZE
|
|
self.page = 1
|
|
self.orderField = "uu_id"
|
|
self.orderType = "asc"
|
|
|
|
@property
|
|
def next_available(self) -> bool:
|
|
if self.page < self.total_pages:
|
|
return True
|
|
return False
|
|
|
|
@property
|
|
def back_available(self) -> bool:
|
|
if self.page > 1:
|
|
return True
|
|
return False
|
|
|
|
@property
|
|
def as_dict(self) -> Dict[str, Any]:
|
|
"""Convert pagination state to dictionary format."""
|
|
self.refresh()
|
|
return {
|
|
"size": self.size,
|
|
"page": self.page,
|
|
"allCount": self.all_count,
|
|
"totalCount": self.total_count,
|
|
"totalPages": self.total_pages,
|
|
"pageCount": self.page_count,
|
|
"orderField": self.orderField,
|
|
"orderType": self.orderType,
|
|
"next": self.next_available,
|
|
"back": self.back_available,
|
|
}
|
|
|
|
|
|
class PaginationResult:
|
|
"""
|
|
Result of a paginated query.
|
|
|
|
Contains the query result and pagination state.
|
|
data: PostgresResponse of query results
|
|
pagination: Pagination state
|
|
|
|
Attributes:
|
|
_query: Original query object
|
|
pagination: Pagination state
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
data: PostgresResponse,
|
|
pagination: Pagination,
|
|
is_list: bool = True,
|
|
response_model: Type[T] = None,
|
|
):
|
|
self._query = data
|
|
self.pagination = pagination
|
|
self.response_type = is_list
|
|
self.limit = self.pagination.size
|
|
self.offset = self.pagination.size * (self.pagination.page - 1)
|
|
self.order_by = self.pagination.orderField
|
|
self.response_model = response_model
|
|
|
|
def dynamic_order_by(self):
|
|
"""
|
|
Dynamically order a query by multiple fields.
|
|
Returns:
|
|
Ordered query object.
|
|
"""
|
|
if not len(self.order_by) == len(self.pagination.orderType):
|
|
raise ValueError("Order by fields and order types must have the same length.")
|
|
order_criteria = zip(self.order_by, self.pagination.orderType)
|
|
for field, direction in order_criteria:
|
|
if hasattr(self._query.column_descriptions[0]["entity"], field):
|
|
if direction.lower().startswith("d"):
|
|
self._query = self._query.order_by(desc(getattr(self._query.column_descriptions[0]["entity"], field)))
|
|
else:
|
|
self._query = self._query.order_by(asc(getattr(self._query.column_descriptions[0]["entity"], field)))
|
|
return self._query
|
|
|
|
@property
|
|
def data(self) -> Union[list | dict]:
|
|
"""Get query object."""
|
|
query_paginated = self.dynamic_order_by().limit(self.limit).offset(self.offset)
|
|
queried_data = (query_paginated.all() if self.response_type else query_paginated.first())
|
|
data = ([result.get_dict() for result in queried_data] if self.response_type else queried_data.get_dict())
|
|
return [self.response_model(**item).model_dump() for item in data] if self.response_model else data
|