updated Query options

This commit is contained in:
2025-01-30 14:24:42 +03:00
parent 822e4155a1
commit b664f64eb4
35 changed files with 852 additions and 589 deletions

View File

@@ -140,7 +140,9 @@ class QueryModel(ArgumentModel):
@classmethod
def filter_all_system(
cls: Type[T], *args: Union[BinaryExpression, ColumnExpressionArgument], db: Session
cls: Type[T],
*args: Union[BinaryExpression, ColumnExpressionArgument],
db: Session,
) -> PostgresResponse:
"""
Filter multiple records by expressions without status filtering.
@@ -158,7 +160,11 @@ class QueryModel(ArgumentModel):
return PostgresResponse(pre_query=cls._query(db), query=query, is_array=True)
@classmethod
def filter_all(cls: Type[T], *args: Union[BinaryExpression, ColumnExpressionArgument], db: Session) -> PostgresResponse:
def filter_all(
cls: Type[T],
*args: Union[BinaryExpression, ColumnExpressionArgument],
db: Session,
) -> PostgresResponse:
"""
Filter multiple records by expressions.

View File

@@ -2,7 +2,8 @@ from __future__ import annotations
from typing import Any, Dict, Optional, Union
from sqlalchemy import desc, asc
from pydantic import BaseModel
from AllConfigs.SqlDatabase.configs import PaginateConfig
from ApiLayers.AllConfigs.SqlDatabase.configs import PaginateConfig
from ApiLayers.ApiValidations.Request import ListOptions
from Services.PostgresDb.Models.response import PostgresResponse
@@ -183,3 +184,44 @@ class PaginationResult:
if self.response_type
else queried_data.get_dict()
)
class QueryOptions:
def __init__(self, table, data: Union[dict, ListOptions] = None, model_query: Optional[Any] = None):
self.table = table
self.data = data
self.model_query = model_query
if isinstance(data, dict):
self.data = ListOptions(**data)
self.validate_query()
if not self.data.order_type:
self.data.order_type = ["created_at"]
if not self.data.order_field:
self.data.order_field = ["uu_id"]
def validate_query(self):
if not self.data.query or not self.model_query:
return ()
cleaned_query, cleaned_query_by_model, last_dict = {}, {}, {}
for key, value in self.data.query.items():
cleaned_query[str(str(key).split("__")[0])] = value
cleaned_query_by_model[str(str(key).split("__")[0])] = (key, value)
cleaned_model = self.model_query(**cleaned_query)
for i in cleaned_query:
if hasattr(cleaned_model, i):
last_dict[str(cleaned_query_by_model[i][0])] = str(cleaned_query_by_model[i][1])
self.data.query = last_dict
def convert(self) -> tuple:
"""
self.table.convert(query)
(<sqlalchemy.sql.elements.BinaryExpression object at 0x7caaeacf0080>, <sqlalchemy.sql.elements.BinaryExpression object at 0x7caaea729b80>)
"""
if not self.data:
return ()
if not self.data.query:
return ()
print('query', self.data)
print('query', self.data.query)
return tuple(self.table.convert(self.data.query))