from __future__ import annotations from typing import Any, Dict, Optional, Union from sqlalchemy import desc, asc from pydantic import BaseModel from ApiLayers.AllConfigs.SqlDatabase.configs import PaginateConfig from ApiLayers.ApiValidations.Request import ListOptions from Services.PostgresDb.Models.response import PostgresResponse class PaginationConfig(BaseModel): """ Configuration for pagination settings. Attributes: page: Current page number (default: 1) size: Items per page (default: 10) order_field: Field to order by (default: "id") order_type: Order direction (default: "asc") """ page: int = 1 size: int = 10 order_field: Optional[Union[tuple[str], list[str]]] = None order_type: Optional[Union[tuple[str], list[str]]] = None def __init__(self, **data): super().__init__(**data) if self.order_field is None: self.order_field = ["uu_id"] if self.order_type is None: self.order_type = ["asc"] class Pagination: """ Handles pagination logic for query results. Manages page size, current page, ordering, and calculates total pages and items based on the data source. Attributes: DEFAULT_SIZE: Default number of items per page (10) MIN_SIZE: Minimum allowed page size (10) MAX_SIZE: Maximum allowed page size (40) """ DEFAULT_SIZE = PaginateConfig.DEFAULT_SIZE MIN_SIZE = PaginateConfig.MIN_SIZE MAX_SIZE = PaginateConfig.MAX_SIZE def __init__(self, data: PostgresResponse): self.data = data self.size: int = self.DEFAULT_SIZE self.page: int = 1 self.orderField: Optional[Union[tuple[str], list[str]]] = ["uu_id"] self.orderType: Optional[Union[tuple[str], list[str]]] = ["asc"] self.page_count: int = 1 self.total_count: int = 0 self.all_count: int = 0 self.total_pages: int = 1 self._update_page_counts() def change(self, **kwargs) -> None: """Update pagination settings from config.""" config = PaginationConfig(**kwargs) self.size = ( config.size if self.MIN_SIZE <= config.size <= self.MAX_SIZE else self.DEFAULT_SIZE ) self.page = config.page self.orderField = config.order_field self.orderType = config.order_type self._update_page_counts() def feed(self, data: PostgresResponse) -> None: """Calculate pagination based on data source.""" self.data = data self._update_page_counts() def _update_page_counts(self) -> None: """Update page counts and validate current page.""" if self.data: self.total_count = self.data.count self.all_count = self.data.total_count self.size = ( self.size if self.MIN_SIZE <= self.size <= self.MAX_SIZE else self.DEFAULT_SIZE ) self.total_pages = max(1, (self.total_count + self.size - 1) // self.size) self.page = max(1, min(self.page, self.total_pages)) self.page_count = ( self.total_count % self.size if self.page == self.total_pages and self.total_count % self.size else self.size ) def refresh(self) -> None: """Reset pagination state to defaults.""" self._update_page_counts() def reset(self) -> None: """Reset pagination state to defaults.""" self.size = self.DEFAULT_SIZE self.page = 1 self.orderField = "uu_id" self.orderType = "asc" def as_dict(self) -> Dict[str, Any]: """Convert pagination state to dictionary format.""" self.refresh() return { "size": self.size, "page": self.page, "allCount": self.all_count, "totalCount": self.total_count, "totalPages": self.total_pages, "pageCount": self.page_count, "orderField": self.orderField, "orderType": self.orderType, } class PaginationResult: """ Result of a paginated query. Contains the query result and pagination state. data: PostgresResponse of query results pagination: Pagination state Attributes: _query: Original query object pagination: Pagination state """ def __init__(self, data: PostgresResponse, pagination: Pagination, response_model: Any = None): self._query = data.query self.pagination = pagination self.response_type = data.is_list self.limit = self.pagination.size self.offset = self.pagination.size * (self.pagination.page - 1) self.order_by = self.pagination.orderField self.response_model = response_model def dynamic_order_by(self): """ Dynamically order a query by multiple fields. Returns: Ordered query object. """ if not len(self.order_by) == len(self.pagination.orderType): raise ValueError( "Order by fields and order types must have the same length." ) order_criteria = zip(self.order_by, self.pagination.orderType) for field, direction in order_criteria: if hasattr(self._query.column_descriptions[0]["entity"], field): if direction.lower().startswith("d"): self._query = self._query.order_by( desc( getattr(self._query.column_descriptions[0]["entity"], field) ) ) else: self._query = self._query.order_by( asc( getattr(self._query.column_descriptions[0]["entity"], field) ) ) return self._query @property def data(self) -> Union[list | dict]: """Get query object.""" query_ordered = self.dynamic_order_by() query_paginated = query_ordered.limit(self.limit).offset(self.offset) queried_data = ( query_paginated.all() if self.response_type else query_paginated.first() ) data = ( [result.get_dict() for result in queried_data] if self.response_type else queried_data.get_dict() ) if self.response_model: return [self.response_model(**item).model_dump() for item in data] return data class QueryOptions: def __init__(self, table, data: Union[dict, ListOptions] = None, model_query: Optional[Any] = None): self.table = table self.data = data self.model_query = model_query if isinstance(data, dict): self.data = ListOptions(**data) self.validate_query() if not self.data.order_type: self.data.order_type = ["created_at"] if not self.data.order_field: self.data.order_field = ["uu_id"] def validate_query(self): if not self.data.query or not self.model_query: return () cleaned_query, cleaned_query_by_model, last_dict = {}, {}, {} for key, value in self.data.query.items(): cleaned_query[str(str(key).split("__")[0])] = value cleaned_query_by_model[str(str(key).split("__")[0])] = (key, value) cleaned_model = self.model_query(**cleaned_query) for i in cleaned_query: if hasattr(cleaned_model, i): last_dict[str(cleaned_query_by_model[str(i)][0])] = str(cleaned_query_by_model[str(i)][1]) self.data.query = last_dict def convert(self) -> tuple: """ self.table.convert(query) (, ) """ if not self.data: return () if not self.data.query: return () return tuple(self.table.convert(self.data.query))