from typing import Optional, Union, Type, Any, Dict, TypeVar

from pydantic import BaseModel
from sqlalchemy.orm import Query
from sqlalchemy import asc, desc

from .pagination import default_paginate_config
from .base import PostgresResponse
from .pagination import PaginationConfig

T = TypeVar("T")


class Pagination:
    """
    Handles pagination logic for query results.

    Manages page size, current page, ordering, and calculates total pages
    and items based on the data source.

    Attributes:
        DEFAULT_SIZE: Default number of items per page.
        MIN_SIZE: Minimum allowed page size.
        MAX_SIZE: Maximum allowed page size.

    The three limits above are read from ``default_paginate_config``.
    """

    DEFAULT_SIZE = default_paginate_config.DEFAULT_SIZE
    MIN_SIZE = default_paginate_config.MIN_SIZE
    MAX_SIZE = default_paginate_config.MAX_SIZE

    def __init__(self, data: PostgresResponse):
        self.query = data
        self.size: int = self.DEFAULT_SIZE
        self.page: int = 1
        self.orderField: Optional[Union[tuple[str], list[str]]] = ["uu_id"]
        self.orderType: Optional[Union[tuple[str], list[str]]] = ["asc"]
        self.page_count: int = 1
        self.total_count: int = 0
        self.all_count: int = 0
        self.total_pages: int = 1
        self._update_page_counts()

    def change(self, **kwargs) -> None:
        """Update pagination settings from config.

        The keyword arguments are passed to ``PaginationConfig``. A size
        outside ``MIN_SIZE..MAX_SIZE`` falls back to ``DEFAULT_SIZE``.
        """
        config = PaginationConfig(**kwargs)
        self.size = (
            config.size
            if self.MIN_SIZE <= config.size <= self.MAX_SIZE
            else self.DEFAULT_SIZE
        )
        self.page = config.page
        self.orderField = config.orderField
        self.orderType = config.orderType
        self._update_page_counts()

    def feed(self, data: PostgresResponse) -> None:
        """Replace the data source and recalculate the page counts."""
        self.query = data
        self._update_page_counts()

    def _update_page_counts(self) -> None:
        """Update page counts and validate current page."""
        if self.query:
            # Count once; both totals are populated from the same value.
            total = self.query.count()
            self.total_count = total
            self.all_count = total

        self.size = (
            self.size
            if self.MIN_SIZE <= self.size <= self.MAX_SIZE
            else self.DEFAULT_SIZE
        )
        # Ceiling division, with at least one page even for an empty result.
        self.total_pages = max(1, (self.total_count + self.size - 1) // self.size)
        # Clamp the requested page into the valid 1..total_pages range.
        self.page = max(1, min(self.page, self.total_pages))
        # Items on the current page: a full page, the remainder on the last
        # page, or 0 when there are no items at all.
        remaining = self.total_count - (self.page - 1) * self.size
        self.page_count = max(0, min(self.size, remaining))

    def refresh(self) -> None:
        """Recalculate the page counts from the current query and settings."""
        self._update_page_counts()

    def reset(self) -> None:
        """Reset size, page and ordering to their defaults.

        The ordering values are lists, matching ``__init__``; plain strings
        would be iterated character by character when the ordering is applied.
        """
        self.size = self.DEFAULT_SIZE
        self.page = 1
        self.orderField = ["uu_id"]
        self.orderType = ["asc"]

    @property
    def next_available(self) -> bool:
        """Whether a page exists after the current one."""
        return self.page < self.total_pages

    @property
    def back_available(self) -> bool:
        """Whether a page exists before the current one."""
        return self.page > 1

    @property
    def as_dict(self) -> Dict[str, Any]:
        """Convert pagination state to dictionary format.

        The counts are refreshed before the dictionary is built.
        """
        self.refresh()
        return {
            "size": self.size,
            "page": self.page,
            "allCount": self.all_count,
            "totalCount": self.total_count,
            "totalPages": self.total_pages,
            "pageCount": self.page_count,
            "orderField": self.orderField,
            "orderType": self.orderType,
            "next": self.next_available,
            "back": self.back_available,
        }


class PaginationResult:
    """
    Result of a paginated query.

    Contains the query result and pagination state.

    Attributes:
        _query: Original query object
        pagination: Pagination state
        response_type: True to return a list of rows, False for a single row
        limit: Page size, captured from ``pagination`` at construction time
        offset: Row offset, captured from ``pagination`` at construction time
        order_by: Field names to order by, taken from ``pagination.orderField``
        response_model: Optional model class applied to each row dictionary
    """

    def __init__(
        self,
        data: PostgresResponse,
        pagination: Pagination,
        is_list: bool = True,
        response_model: Optional[Type[T]] = None,
    ):
        self._query = data
        self.pagination = pagination
        self.response_type = is_list
        self.limit = self.pagination.size
        self.offset = self.pagination.size * (self.pagination.page - 1)
        self.order_by = self.pagination.orderField
        self.response_model = response_model

    def dynamic_order_by(self):
        """
        Dynamically order a query by multiple fields.

        Fields that are not attributes of the query's first entity are
        skipped. A direction starting with "d" (case-insensitive) sorts
        descending; anything else sorts ascending. The stored query is left
        untouched, so repeated calls do not stack duplicate ORDER BY clauses.

        Returns:
            Ordered query object.

        Raises:
            ValueError: If the number of fields and directions differ.
        """
        # Both attributes are annotated Optional; treat None as "no ordering".
        order_fields = self.order_by or ()
        order_types = self.pagination.orderType or ()
        if len(order_fields) != len(order_types):
            raise ValueError(
                "Order by fields and order types must have the same length."
            )

        ordered_query = self._query
        entity = None
        for field, direction in zip(order_fields, order_types):
            if entity is None:
                # Resolved on first use so an empty ordering never touches
                # column_descriptions.
                entity = self._query.column_descriptions[0]["entity"]
            if not hasattr(entity, field):
                continue
            sort_wrapper = desc if direction.lower().startswith("d") else asc
            ordered_query = ordered_query.order_by(
                sort_wrapper(getattr(entity, field))
            )
        return ordered_query

    @property
    def data(self) -> Union[list, dict]:
        """Run the ordered, paginated query and return the rows as dictionaries.

        Returns:
            A list of row dictionaries when ``is_list`` was True, otherwise a
            single row dictionary (empty when no row matches). When a
            ``response_model`` is set, every row dictionary is passed through
            it and dumped with ``model_dump()``.
        """
        page_query = self.dynamic_order_by().limit(self.limit).offset(self.offset)

        if self.response_type:
            rows = [row.get_dict() for row in page_query.all()]
            if self.response_model:
                return [self.response_model(**row).model_dump() for row in rows]
            return rows

        record = page_query.first()
        if record is None:
            # No matching row: return an empty dict instead of failing on None.
            return {}
        payload = record.get_dict()
        if self.response_model:
            # A single row is one dict, so the model is applied to it directly.
            return self.response_model(**payload).model_dump()
        return payload