redis implemntations and api setup completed

This commit is contained in:
2025-01-25 20:59:47 +03:00
parent 32022ca521
commit 3d5a43220e
138 changed files with 2888 additions and 1117 deletions

View File

@@ -0,0 +1,372 @@
"""
Response handlers for SQLAlchemy query results with pagination support.
This module provides a set of response classes for handling different types of data:
- Single PostgreSQL records
- Multiple SQLAlchemy records
- List data
- Dictionary data
Each response includes pagination information and supports data transformation
through response models.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Type, TypeVar, Protocol, Generic
from dataclasses import dataclass
from fastapi import status
from fastapi.responses import JSONResponse
from ApiLibrary.common.line_number import get_line_number_for_error
from Services.PostgresDb.Models.response import PostgresResponse
from ErrorHandlers.ErrorHandlers.api_exc_handler import HTTPExceptionApi
from Services.pagination import Pagination, PaginationConfig
T = TypeVar("T")
DataT = TypeVar("DataT")
@dataclass
class ResponseConfig(Generic[T]):
"""Configuration for response formatting.
Attributes:
status_code: HTTP status code (default: "HTTP_200_OK")
message: Response message to include in the response
completed: Operation completion status flag
cls_object: Class object for error handling context
response_model: Optional response model class for data transformation
"""
status_code: str = "HTTP_200_OK"
message: str = ""
completed: bool = True
cls_object: Optional[Any] = None
response_model: Optional[Type[T]] = None
class ResponseProtocol(Protocol):
"""Protocol defining required methods for response models."""
def dump(self) -> Dict[str, Any]:
"""Convert model to dictionary format."""
...
class BaseJsonResponse(Generic[T]):
"""Base class for JSON response handling.
Provides common functionality for all response types including:
- Response formatting with consistent structure
- Pagination handling and configuration
- Data transformation through response models
"""
def __init__(
self,
message: str,
result: Any,
response_model: Optional[Type[T]] = None,
status_code: str = "HTTP_200_OK",
completed: bool = True,
cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
) -> None:
"""Initialize response handler.
Args:
message: Response message
result: Query result or data
response_model: Optional model for data transformation
status_code: HTTP status code
completed: Operation completion status
cls_object: Class object for error context
filter_attributes: Optional pagination and filtering attributes
"""
self.status_code = getattr(status, status_code, status.HTTP_200_OK)
self.message = message
self.completed = completed
self.filter_attributes = filter_attributes
self.response_model = response_model
self.cls_object = cls_object
self.result = result
def _create_pagination(self) -> Pagination:
"""Create and configure pagination instance.
Returns:
Configured Pagination instance
"""
pagination = Pagination()
if self.filter_attributes:
pagination.change(
PaginationConfig(
page=self.filter_attributes.page,
size=self.filter_attributes.size,
order_field=self.filter_attributes.order_field,
order_type=self.filter_attributes.order_type,
)
)
return pagination
def _format_response(self, pagination: Pagination, data: Any) -> JSONResponse:
"""Format final JSON response with pagination.
Args:
pagination: Pagination instance with configuration
data: Response data to include
Returns:
Formatted JSONResponse
"""
return JSONResponse(
status_code=self.status_code,
content={
"pagination": pagination.as_dict(),
"completed": self.completed,
"message": self.message,
"data": data,
},
)
def _transform_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Transform data using response model if provided.
Args:
data: Raw data dictionary
Returns:
Transformed data dictionary
"""
if self.response_model:
return self.response_model(**data).dump()
return data
@staticmethod
def _validate_data(data: Any, expected_type: Type, cls_object: Any) -> None:
"""Validate data type and raise exception if invalid.
Args:
data: Data to validate
expected_type: Expected type of data
cls_object: Class object for error context
Raises:
HTTPExceptionApi: If data type is invalid
"""
if not isinstance(data, expected_type):
raise HTTPExceptionApi(
lang=cls_object.lang,
error_code="HTTP_400_BAD_REQUEST",
loc=get_line_number_for_error(),
sys_msg=f"Invalid data type: {type(data)}",
)
class SinglePostgresResponse(BaseJsonResponse[T]):
"""Handler for single record responses from PostgreSQL queries."""
def __new__(
cls,
message: str,
result: PostgresResponse,
response_model: Optional[Type[T]] = None,
status_code: str = "HTTP_200_OK",
completed: bool = True,
cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
) -> JSONResponse:
"""Create response for single PostgreSQL record.
Args:
message: Response message
result: PostgreSQL query result
response_model: Optional model for data transformation
status_code: HTTP status code
completed: Operation completion status
cls_object: Class object for error context
filter_attributes: Optional pagination and filtering attributes
Returns:
Formatted JSON response
Raises:
HTTPExceptionApi: If result is invalid or empty
"""
cls._validate_data(result, PostgresResponse, cls_object)
if not result.first:
raise HTTPExceptionApi(
lang=cls_object.lang,
error_code="HTTP_400_BAD_REQUEST",
loc=get_line_number_for_error(),
sys_msg="No data found",
)
instance = super().__new__(cls)
instance.__init__(
message=message,
result=result,
response_model=response_model,
status_code=status_code,
completed=completed,
cls_object=cls_object,
filter_attributes=filter_attributes,
)
pagination = instance._create_pagination()
data = instance._transform_data(result.data.get_dict())
return instance._format_response(pagination, data)
class AlchemyJsonResponse(BaseJsonResponse[T]):
"""Handler for multiple record responses from SQLAlchemy queries."""
def __new__(
cls,
message: str,
result: PostgresResponse,
response_model: Optional[Type[T]] = None,
status_code: str = "HTTP_200_OK",
completed: bool = True,
cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
) -> JSONResponse:
"""Create response for multiple SQLAlchemy records.
Args:
message: Response message
result: PostgreSQL query result
response_model: Optional model for data transformation
status_code: HTTP status code
completed: Operation completion status
cls_object: Class object for error context
filter_attributes: Optional pagination and filtering attributes
Returns:
Formatted JSON response
Raises:
HTTPExceptionApi: If result is invalid
"""
cls._validate_data(result, PostgresResponse, cls_object)
if result.first:
raise HTTPExceptionApi(
lang=cls_object.lang,
error_code="HTTP_400_BAD_REQUEST",
loc=get_line_number_for_error(),
sys_msg="No data found",
)
instance = super().__new__(cls)
instance.__init__(
message=message,
result=result,
response_model=response_model,
status_code=status_code,
completed=completed,
cls_object=cls_object,
filter_attributes=filter_attributes,
)
pagination = instance._create_pagination()
data = [instance._transform_data(item.get_dict()) for item in result.data]
pagination.feed(data)
return instance._format_response(pagination, data)
class ListJsonResponse(BaseJsonResponse[T]):
"""Handler for list data responses."""
def __new__(
cls,
message: str,
result: List[Any],
response_model: Optional[Type[T]] = None,
status_code: str = "HTTP_200_OK",
completed: bool = True,
cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
) -> JSONResponse:
"""Create response for list data.
Args:
message: Response message
result: List of data items
response_model: Optional model for data transformation
status_code: HTTP status code
completed: Operation completion status
cls_object: Class object for error context
filter_attributes: Optional pagination and filtering attributes
Returns:
Formatted JSON response
"""
cls._validate_data(result, list, cls_object)
instance = super().__new__(cls)
instance.__init__(
message=message,
result=result,
response_model=response_model,
status_code=status_code,
completed=completed,
cls_object=cls_object,
filter_attributes=filter_attributes,
)
pagination = instance._create_pagination()
data = [instance._transform_data(item) for item in result]
pagination.feed(data)
return instance._format_response(pagination, data)
class DictJsonResponse(BaseJsonResponse[T]):
"""Handler for dictionary data responses."""
def __new__(
cls,
message: str,
result: Dict[str, Any],
response_model: Optional[Type[T]] = None,
status_code: str = "HTTP_200_OK",
completed: bool = True,
cls_object: Optional[Any] = None,
filter_attributes: Optional[Any] = None,
) -> JSONResponse:
"""Create response for dictionary data.
Args:
message: Response message
result: Dictionary data
response_model: Optional model for data transformation
status_code: HTTP status code
completed: Operation completion status
cls_object: Class object for error context
filter_attributes: Optional pagination and filtering attributes
Returns:
Formatted JSON response
"""
cls._validate_data(result, dict, cls_object)
instance = super().__new__(cls)
instance.__init__(
message=message,
result=result,
response_model=response_model,
status_code=status_code,
completed=completed,
cls_object=cls_object,
filter_attributes=filter_attributes,
)
pagination = instance._create_pagination()
data = instance._transform_data(result)
return instance._format_response(pagination, data)

View File

@@ -0,0 +1,254 @@
from contextlib import contextmanager
from typing import Any, Dict, Optional, Generator
from sqlalchemy.orm import Session
from sqlalchemy import inspect
from Services.PostgresDb.database import Base
class BaseModel(Base):
"""Base model class with common utility functions and SQLAlchemy integration.
This class serves as the foundation for all database models, providing:
- SQLAlchemy ORM integration through Base
- Session management utilities
- CRUD operations (create, update)
- Bulk operation support
"""
__abstract__ = True # Marks this as a base class, won't create a table
@classmethod
def new_session(cls) -> Session:
"""Get database session."""
from Services.PostgresDb.database import get_db
with get_db() as session:
return session
def update(
self, session: Optional[Session] = None, **kwargs: Dict[str, Any]
) -> "BaseModel":
"""Update model instance with given attributes.
Args:
session: Optional existing session to use. If not provided, creates a new one.
**kwargs: Attributes to update
Returns:
Updated model instance
Example:
# Using an existing session
with get_db() as session:
model.update(session=session, name="new name")
model2.update(session=session, status="active")
# Both updates use the same transaction
# Creating a new session automatically
model.update(name="new name") # Creates and manages its own session
"""
should_close_session = session is None
if session is None:
session = self.get_session()
try:
# Remove unrelated fields
check_kwargs = self.remove_non_related_inputs(kwargs)
# Get all table columns
mapper = inspect(self.__class__)
columns = [column.key for column in mapper.columns]
# Get relationship fields
relationships = [rel.key for rel in mapper.relationships]
# Handle confirmation logic
is_confirmed_argument = kwargs.get("is_confirmed", None)
if is_confirmed_argument and not len(kwargs) == 1:
self.raise_http_exception(
status_code="HTTP_406_NOT_ACCEPTABLE",
error_case="ConfirmError",
data=kwargs,
message="Confirm field cannot be updated with other fields",
)
# Process system fields
check_kwargs = self.extract_system_fields(check_kwargs, create=False)
# Update columns
for key, value in check_kwargs.items():
if key in columns:
setattr(self, key, value)
elif key in relationships:
# Handle relationship updates
related_obj = getattr(self, key)
if isinstance(related_obj, list):
# Handle many-to-many or one-to-many relationships
if isinstance(value, list):
setattr(self, key, value)
else:
# Handle many-to-one or one-to-one relationships
setattr(self, key, value)
# Handle user tracking
if hasattr(self, "creds"):
person_id = getattr(self.creds, "person_id", None)
person_name = getattr(self.creds, "person_name", None)
if person_id and person_name:
if is_confirmed_argument:
self.confirmed_by_id = self.creds.get("person_id", "Unknown")
self.confirmed_by = self.creds.get("person_name", "Unknown")
else:
self.updated_by_id = self.creds.get("person_id", "Unknown")
self.updated_by = self.creds.get("person_name", "Unknown")
session.add(self)
session.flush()
return self
except Exception:
if should_close_session:
session.rollback()
raise
finally:
if should_close_session:
session.close()
@classmethod
def create(
cls, session: Optional[Session] = None, **kwargs: Dict[str, Any]
) -> "BaseModel":
"""Create new instance with optional session reuse.
Args:
session: Optional existing session to use. If not provided, creates a new one.
**kwargs: Attributes for the new instance
Returns:
Created model instance
Example:
# Using an existing session for multiple creates
with get_db() as session:
user1 = User.create(session=session, name="John")
user2 = User.create(session=session, name="Jane")
# Both creates use the same transaction
# Creating with auto-managed session
user = User.create(name="John") # Creates and manages its own session
"""
instance = cls()
should_close_session = session is None
if session is None:
session = instance.get_session()
try:
check_kwargs = cls.remove_non_related_inputs(instance, kwargs)
check_kwargs = cls.extract_system_fields(
instance, check_kwargs, create=True
)
# Get all table columns and relationships
mapper = inspect(cls)
columns = [column.key for column in mapper.columns]
relationships = [rel.key for rel in mapper.relationships]
# Set attributes
for key, value in check_kwargs.items():
if key in columns:
setattr(instance, key, value)
elif key in relationships:
# Handle relationship assignments
if isinstance(value, list):
# Handle many-to-many or one-to-many relationships
setattr(instance, key, value)
else:
# Handle many-to-one or one-to-one relationships
setattr(instance, key, value)
# Handle user tracking
if hasattr(instance, "creds"):
person_id = getattr(instance.creds, "person_id", None)
person_name = getattr(instance.creds, "person_name", None)
if person_id and person_name:
instance.created_by_id = instance.creds.get("person_id", "Unknown")
instance.created_by = instance.creds.get("person_name", "Unknown")
session.add(instance)
session.flush()
if should_close_session:
session.commit()
return instance
except Exception:
if should_close_session:
session.rollback()
raise
finally:
if should_close_session:
session.close()
@classmethod
@contextmanager
def bulk_create(
cls, session: Optional[Session] = None
) -> Generator[Session, None, None]:
"""Context manager for bulk creating instances.
Args:
session: Optional existing session to use. If not provided, creates a new one.
Yields:
SQLAlchemy session for creating multiple instances
Example:
# Bulk create multiple instances in one transaction
with User.bulk_create() as session:
user1 = User.create(session=session, name="John")
user2 = User.create(session=session, name="Jane")
# Both creates share the same transaction
"""
should_close_session = session is None
if session is None:
session = cls().get_session()
try:
yield session
if should_close_session:
session.commit()
except Exception:
if should_close_session:
session.rollback()
raise
finally:
if should_close_session:
session.close()
# @router.put("/users/{user_id}")
# async def update_user(
# user_id: str,
# update_data: Dict[str, Any],
# db: Session = Depends(get_db_session)
# ):
# user = db.query(User).filter(User.id == user_id).first()
# if not user:
# raise HTTPException(status_code=404, detail="User not found")
#
# updated_user = user.update(**update_data)
# return updated_user
#
#
# @router.post("/users")
# async def create_user(
# user_data: Dict[str, Any],
# db: Session = Depends(get_db_session)
# ):
# with User.create_with_session(**user_data) as new_user:
# return new_user

View File

@@ -0,0 +1,535 @@
"""
Advanced filtering functionality for SQLAlchemy models.
This module provides a comprehensive set of filtering capabilities for SQLAlchemy models,
including pagination, ordering, and complex query building.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional, Type, TypeVar, Union, Tuple, Protocol
from dataclasses import dataclass
from json import dumps
from sqlalchemy import BinaryExpression, desc, asc
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.elements import BinaryExpression
from ApiLibrary import system_arrow
from ApiLibrary.common.line_number import get_line_number_for_error
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
from Services.PostgresDb.Models.response import PostgresResponse
# Type variable for class methods returning self
T = TypeVar("T", bound="FilterAttributes")
class HTTPException(Exception):
"""Base exception for HTTP errors."""
def __init__(self, status_code: str, detail: str):
self.status_code = status_code
self.detail = detail
super().__init__(detail)
class HTTPStatus(Protocol):
"""Protocol defining required HTTP status codes."""
HTTP_400_BAD_REQUEST: str
HTTP_404_NOT_FOUND: str
HTTP_304_NOT_MODIFIED: str
@dataclass
class FilterConfig:
"""Configuration for filtering and pagination."""
page: int = 1
size: int = 10
order_field: str = "id"
order_type: str = "asc"
include_joins: List[str] = None
query: Dict[str, Any] = None
def __post_init__(self):
"""Initialize default values for None fields."""
self.include_joins = self.include_joins or []
self.query = self.query or {}
class QueryConfig:
"""Configuration for query building and execution."""
def __init__(
self,
pre_query: Optional[Query] = None,
filter_config: Optional[FilterConfig] = None,
http_exception: Optional[Type[HTTPException]] = HTTPException,
status: Optional[Type[HTTPStatus]] = None,
):
self.pre_query = pre_query
self.filter_config = filter_config or FilterConfig()
self.http_exception = http_exception
self.status = status
self.total_count: Optional[int] = None
def update_filter_config(self, **kwargs) -> None:
"""Update filter configuration parameters."""
for key, value in kwargs.items():
if hasattr(self.filter_config, key):
setattr(self.filter_config, key, value)
def set_total_count(self, count: int) -> None:
"""Set the total count of records."""
self.total_count = count
class FilterAttributes:
"""
Advanced filtering capabilities for SQLAlchemy models.
Features:
- Pagination and ordering
- Complex query building
- Active/deleted/confirmed status filtering
- Expiry date handling
- Transaction management
Usage:
# Initialize configuration
config = QueryConfig(filter_config=FilterConfig(page=1, size=10))
# Create model with configuration
class User(FilterAttributes):
query_config = config
# Filter multiple records
users = User.filter_by_all(db, name="John").data
# Update configuration
User.query_config.update_filter_config(page=2, size=20)
next_users = User.filter_all(db).data
"""
__abstract__ = True
# Class-level configuration
query_config: QueryConfig = QueryConfig()
@classmethod
def flush(cls: Type[T], db: Session) -> T:
"""
Flush the current session to the database.
Args:
db: Database session
Returns:
Self instance
Raises:
HTTPException: If database operation fails
"""
try:
db.flush()
return cls
except SQLAlchemyError as e:
raise HTTPExceptionApi(
error_code="HTTP_304_NOT_MODIFIED",
lang=cls.lang or "tr",
loc=get_line_number_for_error(),
sys_msg=str(e),
)
@classmethod
def destroy(cls: Type[T], db: Session) -> None:
"""
Delete the record from the database.
Args:
db: Database session
"""
db.delete(cls)
db.commit()
@classmethod
def save_via_metadata(cls: Type[T], db: Session) -> None:
"""
Save or rollback based on metadata.
Args:
db: Database session
Raises:
HTTPException: If save operation fails
"""
try:
meta_data = getattr(cls, "meta_data", {})
if meta_data.get("created", False):
db.commit()
db.rollback()
except SQLAlchemyError as e:
raise HTTPExceptionApi(
error_code="HTTP_304_NOT_MODIFIED",
lang=cls.lang or "tr",
loc=get_line_number_for_error(),
sys_msg=str(e),
)
@classmethod
def save(cls: Type[T], db: Session) -> None:
"""
Commit changes to database.
Args:
db: Database session
Raises:
HTTPException: If commit fails
"""
try:
db.commit()
except SQLAlchemyError as e:
raise HTTPExceptionApi(
error_code="HTTP_304_NOT_MODIFIED",
lang=cls.lang or "tr",
loc=get_line_number_for_error(),
sys_msg=str(e),
)
@classmethod
def rollback(cls: Type[T], db: Session) -> None:
"""
Rollback current transaction.
Args:
db: Database session
"""
db.rollback()
@classmethod
def save_and_confirm(cls: Type[T], db: Session) -> None:
"""
Save changes and mark record as confirmed.
Args:
db: Database session
Raises:
HTTPException: If operation fails
"""
try:
cls.save(db)
cls.update(db, is_confirmed=True)
cls.save(db)
except SQLAlchemyError as e:
raise HTTPExceptionApi(
error_code="HTTP_304_NOT_MODIFIED",
lang=cls.lang or "tr",
loc=get_line_number_for_error(),
sys_msg=str(e),
)
@classmethod
def _query(cls: Type[T], db: Session) -> Query:
"""
Get base query for model.
Args:
db: Database session
Returns:
SQLAlchemy Query object
"""
return (
cls.query_config.pre_query if cls.query_config.pre_query else db.query(cls)
)
@classmethod
def add_query_to_filter(
cls: Type[T], query: Query, filter_list: Dict[str, Any]
) -> Query:
"""
Add pagination and ordering to query.
Args:
query: Base query
filter_list: Dictionary containing pagination and ordering parameters
Returns:
Modified query with pagination and ordering
"""
order_field = getattr(cls, filter_list.get("order_field"))
order_func = desc if str(filter_list.get("order_type"))[0] == "d" else asc
return (
query.order_by(order_func(order_field))
.limit(filter_list.get("size"))
.offset((filter_list.get("page") - 1) * filter_list.get("size"))
.populate_existing()
)
@classmethod
def get_filter_attributes(cls) -> Dict[str, Any]:
"""
Get filter configuration from attributes.
Returns:
Dictionary containing pagination and filtering parameters
"""
return {
"page": getattr(cls.query_config.filter_config, "page", 1),
"size": getattr(cls.query_config.filter_config, "size", 10),
"order_field": getattr(cls.query_config.filter_config, "order_field", "id"),
"order_type": getattr(cls.query_config.filter_config, "order_type", "asc"),
"include_joins": getattr(
cls.query_config.filter_config, "include_joins", []
),
"query": getattr(cls.query_config.filter_config, "query", {}),
}
@classmethod
def add_new_arg_to_args(
cls,
args_list: Tuple[BinaryExpression, ...],
argument: str,
value: BinaryExpression,
) -> Tuple[BinaryExpression, ...]:
"""
Add new argument to filter arguments if not exists.
Args:
args_list: Current filter arguments
argument: Argument name to check
value: New argument to add
Returns:
Updated argument tuple
"""
new_args = [arg for arg in args_list if isinstance(arg, BinaryExpression)]
arg_left = lambda arg_obj: getattr(getattr(arg_obj, "left", None), "key", None)
if not any(arg_left(arg) == argument for arg in new_args):
new_args.append(value)
return tuple(new_args)
@classmethod
def get_not_expired_query_arg(
cls, args: Tuple[BinaryExpression, ...]
) -> Tuple[BinaryExpression, ...]:
"""
Add expiry date conditions to query.
Args:
args: Current query arguments
Returns:
Updated arguments with expiry conditions
"""
current_time = str(system_arrow.now())
args = cls.add_new_arg_to_args(
args, "expiry_ends", cls.expiry_ends > current_time
)
args = cls.add_new_arg_to_args(
args, "expiry_starts", cls.expiry_starts <= current_time
)
return args
@classmethod
def get_active_and_confirmed_query_arg(
cls, args: Tuple[BinaryExpression, ...]
) -> Tuple[BinaryExpression, ...]:
"""
Add status conditions to query.
Args:
args: Current query arguments
Returns:
Updated arguments with status conditions
"""
args = cls.add_new_arg_to_args(args, "is_confirmed", cls.is_confirmed == True)
args = cls.add_new_arg_to_args(args, "active", cls.active == True)
args = cls.add_new_arg_to_args(args, "deleted", cls.deleted == False)
return args
@classmethod
def select_only(
cls: Type[T],
db: Session,
*args: BinaryExpression,
select_args: List[Any],
order_by: Optional[Any] = None,
limit: Optional[int] = None,
system: bool = False,
) -> PostgresResponse:
"""
Select specific columns from filtered query.
Args:
db: Database session
args: Filter conditions
select_args: Columns to select
order_by: Optional ordering
limit: Optional result limit
system: If True, skip status filtering
Returns:
Query response with selected columns
"""
if not system:
args = cls.get_active_and_confirmed_query_arg(args)
args = cls.get_not_expired_query_arg(args)
query = cls._query(db).filter(*args).with_entities(*select_args)
cls.query_config.set_total_count(query.count())
if order_by is not None:
query = query.order_by(order_by)
if limit:
query = query.limit(limit)
return PostgresResponse(query=query, first=False)
@classmethod
def filter_by_all(
cls: Type[T], db: Session, system: bool = False, **kwargs
) -> PostgresResponse:
"""
Filter multiple records by keyword arguments.
Args:
db: Database session
system: If True, skip status filtering
**kwargs: Filter criteria
Returns:
Query response with matching records
"""
if "is_confirmed" not in kwargs and not system:
kwargs["is_confirmed"] = True
kwargs.pop("system", None)
query = cls._query(db).filter_by(**kwargs)
cls.query_config.set_total_count(query.count())
if cls.query_config.filter_config:
filter_list = cls.get_filter_attributes()
query = cls.add_query_to_filter(query, filter_list)
return PostgresResponse(query=query, first=False)
@classmethod
def filter_by_one(
cls: Type[T], db: Session, system: bool = False, **kwargs
) -> PostgresResponse:
"""
Filter single record by keyword arguments.
Args:
db: Database session
system: If True, skip status filtering
**kwargs: Filter criteria
Returns:
Query response with single record
"""
if "is_confirmed" not in kwargs and not system:
kwargs["is_confirmed"] = True
kwargs.pop("system", None)
query = cls._query(db).filter_by(**kwargs)
cls.query_config.set_total_count(1)
return PostgresResponse(query=query, first=True)
@classmethod
def filter_all(
cls: Type[T], *args: Any, db: Session, system: bool = False
) -> PostgresResponse:
"""
Filter multiple records by expressions.
Args:
db: Database session
args: Filter expressions
system: If True, skip status filtering
Returns:
Query response with matching records
"""
if not system:
args = cls.get_active_and_confirmed_query_arg(args)
args = cls.get_not_expired_query_arg(args)
filter_list = cls.get_filter_attributes()
if filter_list.get("query"):
for smart_iter in cls.filter_expr(**filter_list["query"]):
if key := getattr(getattr(smart_iter, "left", None), "key", None):
args = cls.add_new_arg_to_args(args, key, smart_iter)
query = cls._query(db)
cls.query_config.set_total_count(query.count())
query = query.filter(*args)
if cls.query_config.filter_config:
query = cls.add_query_to_filter(query, filter_list)
return PostgresResponse(query=query, first=False)
@classmethod
def filter_one(
cls: Type[T],
*args: Any,
db: Session,
system: bool = False,
expired: bool = False,
) -> PostgresResponse:
"""
Filter single record by expressions.
Args:
db: Database session
args: Filter expressions
system: If True, skip status filtering
expired: If True, include expired records
Returns:
Query response with single record
"""
if not system:
args = cls.get_active_and_confirmed_query_arg(args)
if not expired:
args = cls.get_not_expired_query_arg(args)
query = cls._query(db).filter(*args)
cls.query_config.set_total_count(1)
return PostgresResponse(query=query, first=True)
# @classmethod
# def raise_http_exception(
# cls,
# status_code: str,
# error_case: str,
# data: Dict[str, Any],
# message: str,
# ) -> None:
# """
# Raise HTTP exception with formatted error details.
# Args:
# status_code: HTTP status code string
# error_case: Error type
# data: Additional error data
# message: Error message
# Raises:
# HTTPException: With formatted error details
# """
# raise HTTPExceptionApi(
# error_code="HTTP_304_NOT_MODIFIED",
# lang=cls.lang or "tr", loc=get_line_number_for_error()
# )

549
trash/Models_old/mixins.py Normal file
View File

@@ -0,0 +1,549 @@
"""
PostgreSQL Base Models Module
This module provides base classes for PostgreSQL models with common functionality such as:
- CRUD operations with session management
- Soft delete capability
- Automatic timestamps
- User tracking (created_by, updated_by)
- Data serialization
- Multi-language support
"""
import datetime
from decimal import Decimal
from typing import Any, Dict, List, Optional, Type, TypeVar, Union, cast
from sqlalchemy import (
TIMESTAMP,
NUMERIC,
func,
text,
UUID,
String,
Integer,
Boolean,
SmallInteger,
)
from sqlalchemy.orm import Mapped, mapped_column, Session
from sqlalchemy_mixins.serialize import SerializeMixin
from sqlalchemy_mixins.repr import ReprMixin
from sqlalchemy_mixins.smartquery import SmartQueryMixin
from ApiLibrary import DateTimeLocal, system_arrow
from Services.PostgresDb.Models.base_model import BaseModel
from Services.PostgresDb.Models.filter_functions import FilterAttributes
# Type variable for class methods returning self
T = TypeVar("T", bound="CrudMixin")
class CrudMixin(
BaseModel, SmartQueryMixin, SerializeMixin, ReprMixin, FilterAttributes
):
"""
Base mixin providing CRUD operations and common fields for PostgreSQL models.
Features:
- Automatic timestamps (created_at, updated_at)
- Soft delete capability
- User tracking (created_by, updated_by)
- Data serialization
- Multi-language support
"""
__abstract__ = True
# System fields that should be handled automatically during creation
__system__fields__create__ = (
"created_at",
"updated_at",
"cryp_uu_id",
"created_by",
"created_by_id",
"updated_by",
"updated_by_id",
"replication_id",
"confirmed_by",
"confirmed_by_id",
"is_confirmed",
"deleted",
"active",
"is_notification_send",
"is_email_send",
)
# System fields that should be handled automatically during updates
__system__fields__update__ = (
"cryp_uu_id",
"created_at",
"updated_at",
"created_by",
"created_by_id",
"confirmed_by",
"confirmed_by_id",
"updated_by",
"updated_by_id",
"replication_id",
)
# Default fields to exclude from serialization
__system_default_model__ = [
"cryp_uu_id",
"is_confirmed",
"deleted",
"is_notification_send",
"replication_id",
"is_email_send",
"confirmed_by_id",
"confirmed_by",
"updated_by_id",
"created_by_id",
]
# User credentials and preferences
creds = None
lang: str = "tr"
client_arrow: Optional[DateTimeLocal] = None
valid_record_dict: Dict[str, bool] = {"active": True, "deleted": False}
meta_data: Dict[str, Any] = {}
# Common timestamp fields for all models
expiry_starts: Mapped[TIMESTAMP] = mapped_column(
type_=TIMESTAMP(timezone=True),
server_default=func.now(),
nullable=False,
comment="Record validity start timestamp",
)
expiry_ends: Mapped[TIMESTAMP] = mapped_column(
type_=TIMESTAMP(timezone=True),
default="2099-12-31",
server_default="2099-12-31",
comment="Record validity end timestamp",
)
@classmethod
def set_user_define_properties(cls, token: Any) -> None:
"""
Set user-specific properties from the authentication token.
Args:
token: Authentication token containing user preferences
"""
cls.creds = token.credentials
cls.client_arrow = DateTimeLocal(is_client=True, timezone=token.timezone)
cls.lang = str(token.lang).lower()
@classmethod
def remove_non_related_inputs(cls, kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""
Filter out inputs that don't correspond to model fields.
Args:
kwargs: Dictionary of field names and values
Returns:
Dictionary containing only valid model fields
"""
return {
key: value
for key, value in kwargs.items()
if key in cls.columns + cls.hybrid_properties + cls.settable_relations
}
@classmethod
def extract_system_fields(
cls, filter_kwargs: dict, create: bool = True
) -> Dict[str, Any]:
"""
Remove system-managed fields from input dictionary.
Args:
filter_kwargs: Input dictionary of fields
create: If True, use creation field list, else use update field list
Returns:
Dictionary with system fields removed
"""
system_fields = filter_kwargs.copy()
extract_fields = (
cls.__system__fields__create__ if create else cls.__system__fields__update__
)
for field in extract_fields:
system_fields.pop(field, None)
return system_fields
@classmethod
def iterate_over_variables(cls, val: Any, key: str) -> tuple[bool, Optional[Any]]:
"""
Process a field value based on its type and convert it to the appropriate format.
Args:
val: Field value
key: Field name
Returns:
Tuple of (should_include, processed_value)
"""
key_ = cls.__annotations__.get(key, None)
is_primary = key in cls.primary_keys
row_attr = bool(getattr(getattr(cls, key), "foreign_keys", None))
# Skip primary keys and foreign keys
if is_primary or row_attr:
return False, None
# Handle None values
if val is None:
return True, None
# Special handling for UUID fields
if str(key[-5:]).lower() == "uu_id":
return True, str(val)
# Handle typed fields
if key_:
if key_ == Mapped[int]:
return True, int(val)
elif key_ == Mapped[bool]:
return True, bool(val)
elif key_ == Mapped[float] or key_ == Mapped[NUMERIC]:
return True, round(float(val), 3)
elif key_ == Mapped[TIMESTAMP]:
return True, str(
cls.client_arrow.get(str(val)).format("DD-MM-YYYY HH:mm:ss +0")
)
elif key_ == Mapped[str]:
return True, str(val)
# Handle based on Python types
else:
if isinstance(val, datetime.datetime):
return True, str(
cls.client_arrow.get(str(val)).format("DD-MM-YYYY HH:mm:ss +0")
)
elif isinstance(val, bool):
return True, bool(val)
elif isinstance(val, (float, Decimal)):
return True, round(float(val), 3)
elif isinstance(val, int):
return True, int(val)
elif isinstance(val, str):
return True, str(val)
elif val is None:
return True, None
return False, None
@classmethod
def find_or_create(cls: Type[T], db: Session, **kwargs) -> T:
"""
Find an existing record matching the criteria or create a new one.
Args:
db: Database session
**kwargs: Search/creation criteria
Returns:
Existing or newly created record
"""
check_kwargs = cls.extract_system_fields(kwargs)
# Search for existing record
query = db.query(cls).filter(
cls.expiry_ends > str(system_arrow.now()),
cls.expiry_starts <= str(system_arrow.now()),
)
for key, value in check_kwargs.items():
if hasattr(cls, key):
query = query.filter(getattr(cls, key) == value)
already_record = query.first()
# Handle existing record
if already_record:
if already_record.deleted:
already_record.meta_data = {
"created": False,
"error_case": "DeletedRecord",
"message": "",
}
return already_record
elif not already_record.is_confirmed:
already_record.meta_data = {
"created": False,
"error_case": "IsNotConfirmed",
"message": "",
}
return already_record
already_record.meta_data = {
"created": False,
"error_case": "AlreadyExists",
"message": "",
}
return already_record
# Create new record
check_kwargs = cls.remove_non_related_inputs(check_kwargs)
created_record = cls()
for key, value in check_kwargs.items():
setattr(created_record, key, value)
if getattr(cls.creds, "person_id", None) and getattr(
cls.creds, "person_name", None
):
created_record.created_by_id = cls.creds.person_id
created_record.created_by = cls.creds.person_name
db.add(created_record)
db.flush()
created_record.meta_data = {"created": True, "error_case": None, "message": ""}
return created_record
def update(self, db: Session, **kwargs) -> "CrudMixin":
"""
Update the record with new values.
Args:
db: Database session
**kwargs: Fields to update
Returns:
Updated record
Raises:
ValueError: If attempting to update is_confirmed with other fields
"""
check_kwargs = self.remove_non_related_inputs(kwargs)
is_confirmed_argument = kwargs.get("is_confirmed", None)
if is_confirmed_argument and not len(kwargs) == 1:
raise ValueError("Confirm field cannot be updated with other fields")
check_kwargs = self.extract_system_fields(check_kwargs, create=False)
for key, value in check_kwargs.items():
setattr(self, key, value)
# Update confirmation or modification tracking
if is_confirmed_argument:
if getattr(self.creds, "person_id", None) and getattr(
self.creds, "person_name", None
):
self.confirmed_by_id = self.creds.person_id
self.confirmed_by = self.creds.person_name
else:
if getattr(self.creds, "person_id", None) and getattr(
self.creds, "person_name", None
):
self.updated_by_id = self.creds.person_id
self.updated_by = self.creds.person_name
db.flush()
return self
def get_dict(
self,
exclude: Optional[List[str]] = None,
include: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Convert model instance to dictionary with customizable fields.
Args:
exclude: List of fields to exclude
include: List of fields to include (takes precedence over exclude)
Returns:
Dictionary representation of the model
"""
return_dict: Dict[str, Any] = {}
if include:
# Handle explicitly included fields
exclude_list = [
element
for element in self.__system_default_model__
if str(element)[-2:] == "id" and str(element)[-5:].lower() == "uu_id"
]
columns_include_list = list(set(include).difference(set(exclude_list)))
columns_include_list.extend(["uu_id"])
for key in columns_include_list:
val = getattr(self, key)
correct, value_of_database = self.iterate_over_variables(val, key)
if correct:
return_dict[key] = value_of_database
elif exclude:
# Handle explicitly excluded fields
exclude.extend(
list(
set(getattr(self, "__exclude__fields__", []) or []).difference(
exclude
)
)
)
exclude.extend(
[
element
for element in self.__system_default_model__
if str(element)[-2:] == "id"
]
)
columns_excluded_list = list(set(self.columns).difference(set(exclude)))
columns_excluded_list.extend(["uu_id", "active"])
for key in columns_excluded_list:
val = getattr(self, key)
correct, value_of_database = self.iterate_over_variables(val, key)
if correct:
return_dict[key] = value_of_database
else:
# Handle default field selection
exclude_list = (
getattr(self, "__exclude__fields__", []) or []
) + self.__system_default_model__
columns_list = list(set(self.columns).difference(set(exclude_list)))
columns_list = [col for col in columns_list if str(col)[-2:] != "id"]
columns_list.extend(
[col for col in self.columns if str(col)[-5:].lower() == "uu_id"]
)
for remove_field in self.__system_default_model__:
if remove_field in columns_list:
columns_list.remove(remove_field)
for key in columns_list:
val = getattr(self, key)
correct, value_of_database = self.iterate_over_variables(val, key)
if correct:
return_dict[key] = value_of_database
return return_dict
class BaseCollection(CrudMixin):
"""Base model class with minimal fields."""
__abstract__ = True
__repr__ = ReprMixin.__repr__
id: Mapped[int] = mapped_column(Integer, primary_key=True)
class CrudCollection(CrudMixin):
"""
Full-featured model class with all common fields.
Includes:
- UUID and reference ID
- Timestamps
- User tracking
- Confirmation status
- Soft delete
- Notification flags
"""
__abstract__ = True
__repr__ = ReprMixin.__repr__
# Primary and reference fields
id: Mapped[int] = mapped_column(Integer, primary_key=True)
uu_id: Mapped[str] = mapped_column(
UUID,
server_default=text("gen_random_uuid()"),
index=True,
unique=True,
comment="Unique identifier UUID",
)
ref_id: Mapped[str] = mapped_column(
String(100), nullable=True, index=True, comment="External reference ID"
)
# Timestamps
created_at: Mapped[TIMESTAMP] = mapped_column(
TIMESTAMP(timezone=True),
server_default=func.now(),
nullable=False,
index=True,
comment="Record creation timestamp",
)
updated_at: Mapped[TIMESTAMP] = mapped_column(
TIMESTAMP(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
index=True,
comment="Last update timestamp",
)
# Cryptographic and user tracking
cryp_uu_id: Mapped[str] = mapped_column(
String, nullable=True, index=True, comment="Cryptographic UUID"
)
created_by: Mapped[str] = mapped_column(
String, nullable=True, comment="Creator name"
)
created_by_id: Mapped[int] = mapped_column(
Integer, nullable=True, comment="Creator ID"
)
updated_by: Mapped[str] = mapped_column(
String, nullable=True, comment="Last modifier name"
)
updated_by_id: Mapped[int] = mapped_column(
Integer, nullable=True, comment="Last modifier ID"
)
confirmed_by: Mapped[str] = mapped_column(
String, nullable=True, comment="Confirmer name"
)
confirmed_by_id: Mapped[int] = mapped_column(
Integer, nullable=True, comment="Confirmer ID"
)
# Status flags
is_confirmed: Mapped[bool] = mapped_column(
Boolean, server_default="0", comment="Record confirmation status"
)
replication_id: Mapped[int] = mapped_column(
SmallInteger, server_default="0", comment="Replication identifier"
)
deleted: Mapped[bool] = mapped_column(
Boolean, server_default="0", comment="Soft delete flag"
)
active: Mapped[bool] = mapped_column(
Boolean, server_default="1", comment="Record active status"
)
is_notification_send: Mapped[bool] = mapped_column(
Boolean, server_default="0", comment="Notification sent flag"
)
is_email_send: Mapped[bool] = mapped_column(
Boolean, server_default="0", comment="Email sent flag"
)
@classmethod
def retrieve_language_model(cls, lang: str, response_model: Any) -> Dict[str, str]:
"""
Retrieve language-specific model headers and validation messages.
Args:
lang: Language code
response_model: Model containing language annotations
Returns:
Dictionary of field names to localized headers
"""
headers_and_validation = {}
__language_model__ = getattr(cls.__language_model__, lang, "tr")
for field in response_model.__annotations__.keys():
headers_and_validation[field] = getattr(
__language_model__, field, "Lang Not found"
)
return headers_and_validation

44
trash/Models_old/query.py Normal file
View File

@@ -0,0 +1,44 @@
from typing import Any, List, Optional, TypeVar, Union
from sqlalchemy.orm import Query
from sqlalchemy.orm.session import Session
T = TypeVar("T")
class QueryResponse:
"""Handler for SQLAlchemy query results with error handling."""
def __init__(self, db: Session, query: Query, first: bool = False):
self.db = db
self.first = first
self.__query = query
def get(self, index: int) -> Optional[T]:
"""Get item at specific index if it exists."""
count = self.count
if count and not index > count:
return self.data[index - 1]
return None
@property
def data(self) -> Union[Optional[T], List[T]]:
"""Get query results with error handling."""
try:
if self.first:
return self.__query.first()
return self.__query.all()
except Exception as e:
# Handle any database errors by rolling back
self.db.rollback()
return None if self.first else []
@property
def count(self) -> int:
"""Get total count of query results."""
return self.__query.count()
@property
def query(self) -> Query:
"""Get the underlying SQLAlchemy query."""
return self.__query

View File

@@ -0,0 +1,90 @@
"""
Response handler for PostgreSQL query results.
This module provides a wrapper class for SQLAlchemy query results,
adding convenience methods for accessing data and managing query state.
"""
from typing import Any, Dict, List, Optional, TypeVar, Generic, Union
from sqlalchemy.orm import Query
T = TypeVar("T")
class PostgresResponse(Generic[T]):
"""
Wrapper for PostgreSQL/SQLAlchemy query results.
Attributes:
query: SQLAlchemy query object
first: Whether to return first result only
data: Query results (lazy loaded)
count: Total count of results
Properties:
all: All results as list
first_item: First result only
"""
def __init__(
self,
query: Query,
first: bool = False,
status: bool = True,
message: str = "",
error: Optional[str] = None,
):
self._query = query
self._first = first
self.status = status
self.message = message
self.error = error
self._data: Optional[Union[List[T], T]] = None
self._count: Optional[int] = None
@property
def query(self) -> Query:
"""Get query object."""
return self._query
@property
def data(self) -> Union[List[T], T, None]:
"""
Lazy load and return query results.
Returns first item if first=True, otherwise returns all results.
"""
if self._data is None:
results = self._query.all()
self._data = results[0] if self._first and results else results
return self._data
@property
def count(self) -> int:
"""Lazy load and return total count of results."""
if self._count is None:
self._count = self._query.count()
return self._count
@property
def all(self) -> List[T]:
"""Get all results as list."""
return (
self.data
if isinstance(self.data, list)
else [self.data] if self.data else []
)
@property
def first(self) -> Optional[T]:
"""Get first result only."""
return self.data if self._first else (self.data[0] if self.data else None)
def as_dict(self) -> Dict[str, Any]:
"""Convert response to dictionary format."""
return {
"status": self.status,
"message": self.message,
"data": self.data,
"count": self.count,
"error": self.error,
}

33
trash/abstract_class.py Normal file
View File

@@ -0,0 +1,33 @@
class ClusterToMethod:
TAGS: list = ["Tag or Router"]
PREFIX: str = "/..."
PAGEINFO: PageInfo
ENDPOINTS: list # [MethodEvent, ...]
SUBCATEGORY: List[ClassVar[Any]] = [ClusterToMethod, ...]
def retrieve_all_function_codes():
"""
[FUNCTION_CODE, ...]
self.ENDPOINTS -> iter()
"""
pass
def retrieve_page_info():
"""
PAGE_INFO:ClusterToMethod = {
"PageInfo": {...}
"subCategory": PAGE_INFO:ClusterToMethod
}
PAGE_INFO:ClusterToMethod = {
"PageInfo": {...}
"subCategory": PAGE_INFO:ClusterToMethod
}
"""
pass
def retrieve_redis_value() -> Dict:
"""
Key(CLUSTER_FUNCTION_CODES:ClusterToMethod) : Value(PAGE_INFO, [FUNCTION_CODE, ...])
"""
pass

957
trash/abstract_class_old.py Normal file
View File

@@ -0,0 +1,957 @@
"""
Abstract base classes for API route and event handling.
This module provides core abstractions for route configuration and factory,
with support for authentication and event handling.
"""
import uuid
import inspect
from typing import (
Tuple,
TypeVar,
Optional,
Callable,
Dict,
Any,
List,
Type,
ClassVar,
Union,
Set,
)
from collections import defaultdict
from dataclasses import dataclass, field
from pydantic import BaseModel
from fastapi import Request, Depends, APIRouter
from functools import wraps
from ApiLayers.ApiLibrary.common.line_number import get_line_number_for_error
from ApiLayers.ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
from ApiLayers.Schemas.rules.rules import EndpointRestriction
ResponseModel = TypeVar("ResponseModel", bound=BaseModel)
def endpoint_wrapper(url_of_endpoint: Optional[str] = None):
"""Create a wrapper for endpoints that stores url_of_endpoint in closure.
Args:
url_of_endpoint: Optional URL path for the endpoint
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
@wraps(func)
async def wrapper(
*args: Any, **kwargs: Any
) -> Union[Dict[str, Any], BaseModel]:
# Handle both async and sync functions
if inspect.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = func(*args, **kwargs)
# If result is a coroutine, await it
if inspect.iscoroutine(result):
result = await result
# Add endpoint to the result
if isinstance(result, dict):
result["endpoint"] = url_of_endpoint
return result
elif isinstance(result, BaseModel):
# Convert Pydantic model to dict and add endpoint
result_dict = result.model_dump()
result_dict["endpoint"] = url_of_endpoint
return result_dict
return result
wrapper.url_of_endpoint = url_of_endpoint
return wrapper
return decorator
@dataclass
class EndpointFactoryConfig:
"""Configuration class for API endpoints.
Attributes:
url_of_endpoint: Full URL path for this endpoint
endpoint: URL path for this endpoint
method: HTTP method (GET, POST, etc.)
summary: Short description for API documentation
description: Detailed description for API documentation
endpoint_function: Function to handle the endpoint
is_auth_required: Whether authentication is required
response_model: Optional response model for OpenAPI schema
request_model: Optional request model for OpenAPI schema
is_event_required: Whether event handling is required
extra_options: Additional endpoint options
"""
url_prefix: str
url_endpoint: str
url_of_endpoint: str
endpoint: str
method: str
summary: str
description: str
endpoint_function: Callable[..., Any] # Now accepts any parameters and return type
response_model: Optional[type] = None
request_model: Optional[type] = None
is_auth_required: bool = True
is_event_required: bool = False
extra_options: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
"""Post initialization hook.
Wraps endpoint function with appropriate middleware based on configuration:
- If auth and event required -> wrap with TokenEventMiddleware
- If only event required -> wrap with EventMiddleware
- If only auth required -> wrap with MiddlewareModule.auth_required
"""
# First apply auth/event middleware
if self.is_event_required:
from middleware import TokenEventMiddleware
self.endpoint_function = TokenEventMiddleware.event_required(
self.endpoint_function
)
elif self.is_auth_required:
from middleware import MiddlewareModule
self.endpoint_function = MiddlewareModule.auth_required(
self.endpoint_function
)
# Then wrap with endpoint_wrapper to store url_of_endpoint
self.endpoint_function = endpoint_wrapper(self.url_of_endpoint)(
self.endpoint_function
)
class RouteFactoryConfig:
"""Configuration class for API route factories.
Attributes:
name: Route name
tags: List of tags for API documentation
prefix: URL prefix for all endpoints in this route
include_in_schema: Whether to include in OpenAPI schema
endpoints: List of endpoint configurations
extra_options: Additional route options
"""
def __init__(
self,
name: str,
tags: List[str],
prefix: str,
include_in_schema: bool = True,
endpoints: List[EndpointFactoryConfig] = None,
extra_options: Dict[str, Any] = None,
):
self.name = name
self.tags = tags
self.prefix = prefix
self.include_in_schema = include_in_schema
self.endpoints = endpoints or []
self.extra_options = extra_options or {}
def __post_init__(self):
"""Validate and normalize configuration after initialization."""
if self.endpoints is None:
self.endpoints = []
if self.extra_options is None:
self.extra_options = {}
def as_dict(self) -> Dict[str, Any]:
"""Convert configuration to dictionary format."""
return {
"name": self.name,
"tags": self.tags,
"prefix": self.prefix,
"include_in_schema": self.include_in_schema,
"endpoints": [endpoint.__dict__ for endpoint in self.endpoints],
"extra_options": self.extra_options,
}
class MethodToEvent:
"""Base class for mapping methods to API events with type safety and endpoint configuration.
This class provides a framework for handling API events with proper
type checking for tokens and response models, as well as managing
endpoint configurations and frontend page structure.
Type Parameters:
TokenType: Type of authentication token
ResponseModel: Type of response model
Class Variables:
action_key: Unique identifier for the action
event_type: Type of event (e.g., 'query', 'command')
event_description: Human-readable description of the event
event_category: Category for grouping related events
__event_keys__: Mapping of UUIDs to event names
__event_validation__: Validation rules for events
__endpoint_config__: API endpoint configuration
__page_info__: Frontend page configuration
"""
action_key: ClassVar[Optional[str]] = None
event_type: ClassVar[Optional[str]] = None
event_description: ClassVar[str] = ""
event_category: ClassVar[str] = ""
__event_keys__: ClassVar[Dict[str, str]] = {}
__event_validation__: Dict[str, Tuple[Type, Union[List, tuple]]] = {}
__endpoint_config__: ClassVar[Dict[str, Dict[str, Any]]] = {
"endpoints": {}, # Mapping of event UUIDs to endpoint configs
"router_prefix": "", # Router prefix for all endpoints in this class
"tags": [], # OpenAPI tags
}
__page_info__: ClassVar[Dict[str, Any]] = {
"name": "", # Page name (e.g., "AccountPage")
"title": {"tr": "", "en": ""}, # Multi-language titles
"icon": "", # Icon name
"url": "", # Frontend route
"component": None, # Optional component name
"parent": None, # Parent page name if this is a subpage
}
@classmethod
def register_endpoint(
cls,
event_uuid: str,
path: str,
method: str = "POST",
response_model: Optional[Type] = None,
**kwargs,
) -> None:
"""Register an API endpoint configuration for an event.
Args:
event_uuid: UUID of the event
path: Endpoint path (will be prefixed with router_prefix)
method: HTTP method (default: POST)
response_model: Pydantic model for response
**kwargs: Additional FastAPI endpoint parameters
"""
if event_uuid not in cls.__event_keys__:
raise ValueError(f"Event UUID {event_uuid} not found in {cls.__name__}")
cls.__endpoint_config__["endpoints"][event_uuid] = {
"path": path,
"method": method,
"response_model": response_model,
**kwargs,
}
@classmethod
def configure_router(cls, prefix: str, tags: List[str]) -> None:
"""Configure the API router settings.
Args:
prefix: Router prefix for all endpoints
tags: OpenAPI tags for documentation
"""
cls.__endpoint_config__["router_prefix"] = prefix
cls.__endpoint_config__["tags"] = tags
@classmethod
def configure_page(
cls,
name: str,
title: Dict[str, str],
icon: str,
url: str,
component: Optional[str] = None,
parent: Optional[str] = None,
) -> None:
"""Configure the frontend page information.
Args:
name: Page name
title: Multi-language titles (must include 'tr' and 'en')
icon: Icon name
url: Frontend route
component: Optional component name
parent: Parent page name for subpages
"""
required_langs = {"tr", "en"}
if not all(lang in title for lang in required_langs):
raise ValueError(
f"Title must contain all required languages: {required_langs}"
)
cls.__page_info__.update(
{
"name": name,
"title": title,
"icon": icon,
"url": url,
"component": component,
"parent": parent,
}
)
@classmethod
def get_endpoint_config(cls) -> Dict[str, Any]:
"""Get the complete endpoint configuration."""
return cls.__endpoint_config__
@classmethod
def get_page_info(cls) -> Dict[str, Any]:
"""Get the frontend page configuration."""
return cls.__page_info__
@classmethod
def has_available_events(cls, user_permission_uuids: Set[str]) -> bool:
"""Check if any events are available based on user permissions."""
return bool(set(cls.__event_keys__.keys()) & user_permission_uuids)
@classmethod
def get_page_info_with_permissions(
cls, user_permission_uuids: Set[str], include_endpoints: bool = False
) -> Optional[Dict[str, Any]]:
"""Get page info if user has required permissions.
Args:
user_permission_uuids: Set of UUIDs the user has permission for
include_endpoints: Whether to include available endpoint information
Returns:
Dict with page info if user has permissions, None otherwise
"""
# Check if user has any permissions for this page's events
if not cls.has_available_events(user_permission_uuids):
return None
# Start with basic page info
page_info = {
**cls.__page_info__,
"category": cls.event_category,
"type": cls.event_type,
"description": cls.event_description,
}
# Optionally include available endpoints
if include_endpoints:
available_endpoints = {}
for uuid, endpoint in cls.__endpoint_config__["endpoints"].items():
if uuid in user_permission_uuids:
available_endpoints[uuid] = {
"path": f"{cls.__endpoint_config__['router_prefix']}{endpoint['path']}",
"method": endpoint["method"],
"event_name": cls.__event_keys__[uuid],
}
if available_endpoints:
page_info["available_endpoints"] = available_endpoints
return page_info
@classmethod
def get_events_config(cls) -> Dict[str, Any]:
"""Get the complete configuration including events, endpoints, and page info."""
return {
"events": cls.__event_keys__,
"endpoints": cls.__endpoint_config__,
"page_info": cls.__page_info__,
"category": cls.event_category,
"type": cls.event_type,
"description": cls.event_description,
}
@classmethod
def retrieve_event_response_model(cls, function_code: str) -> Any:
"""Retrieve event validation for a specific function.
Args:
function_code: Function identifier
Returns:
Tuple containing response model and language models
"""
event_validation_list = cls.__event_validation__.get(function_code, None)
if not event_validation_list:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Function not found",
)
return event_validation_list[0]
@classmethod
def retrieve_event_languages(cls, function_code: str) -> Union[List, tuple]:
"""Retrieve event description for a specific function.
Args:
function_code: Function identifier
Returns:
Event description
"""
event_keys_list = cls.__event_validation__.get(function_code, None)
if not event_keys_list:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Function not found",
)
function_language_models: Union[List, tuple] = event_keys_list[1]
if not function_language_models:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Function not found",
)
return function_language_models
@staticmethod
def merge_models(language_model: List) -> Dict:
merged_models = {"tr": {}, "en": {}}
for model in language_model:
for lang in dict(model).keys():
if lang not in merged_models:
merged_models[lang] = model[lang]
else:
merged_models[lang].update(model[lang])
return merged_models
@classmethod
def retrieve_event_function(cls, function_code: str) -> Dict[str, str]:
"""Retrieve event parameters for a specific function.
Args:
function_code: Function identifier
Returns:
Dictionary of event parameters
"""
function_event = cls.__event_keys__[function_code]
function_itself = getattr(cls, function_event, None)
if not function_itself:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Function not found",
)
return function_itself
@classmethod
def retrieve_language_parameters(
cls, function_code: str, language: str = "tr"
) -> Dict[str, Any]:
"""Retrieve language-specific parameters for an event.
Args:
language: Language code (e.g. 'tr', 'en')
function_code: Function identifier
Returns:
Dictionary of language-specific field mappings
"""
event_language_models = cls.retrieve_event_languages(function_code)
event_response_model = cls.retrieve_event_response_model(function_code)
event_response_model_merged = cls.merge_models(event_language_models)
event_response_model_merged_lang = event_response_model_merged[language]
# Map response model fields to language-specific values
only_language_dict = {
field: event_response_model_merged_lang[field]
for field in event_response_model.model_fields
if field in event_response_model_merged_lang
}
"""
__event_validation__ : {"key": [A, B, C]}
Language Model : Language Model that is model pydatnic requires
Language Models : All language_models that is included in Langugage Models Section
Merged Language Models : Merged with all models in list event_validation
"""
return {
"language_model": only_language_dict,
"language_models": event_response_model_merged,
}
class EventMethodRegistry:
"""Registry for mapping event method UUIDs to categories and managing permissions."""
def __init__(self):
self._uuid_map: Dict[str, Tuple[Type[MethodToEvent], str]] = (
{}
) # uuid -> (method_class, event_name)
self._category_events: Dict[str, Set[str]] = defaultdict(
set
) # category -> set of uuids
def register_method(
self, category_name: str, method_class: Type[MethodToEvent]
) -> None:
"""Register a method class with its category."""
# Register all UUIDs from the method
for event_uuid, event_name in method_class.__event_keys__.items():
self._uuid_map[event_uuid] = (method_class, event_name)
self._category_events[category_name].add(event_uuid)
def get_method_by_uuid(
self, event_uuid: str
) -> Optional[Tuple[Type[MethodToEvent], str]]:
"""Get method class and event name by UUID."""
return self._uuid_map.get(event_uuid)
def get_events_for_category(self, category_name: str) -> Set[str]:
"""Get all event UUIDs for a category."""
return self._category_events.get(category_name, set())
class EventCategory:
"""Base class for defining event categories similar to frontend page structure."""
def __init__(
self,
name: str,
title: Dict[str, str],
icon: str,
url: str,
component: Optional[str] = None,
page_info: Any = None,
all_endpoints: Dict[str, Set[str]] = None, # category -> set of event UUIDs
sub_categories: List = None,
):
self.name = name
self.title = self._validate_title(title)
self.icon = icon
self.url = url
self.component = component
self.page_info = page_info
self.all_endpoints = all_endpoints or {}
self.sub_categories = self._process_subcategories(sub_categories or [])
def _validate_title(self, title: Dict[str, str]) -> Dict[str, str]:
"""Validate title has required languages."""
required_langs = {"tr", "en"}
if not all(lang in title for lang in required_langs):
raise ValueError(
f"Title must contain all required languages: {required_langs}"
)
return title
def _process_subcategories(
self, categories: List[Union[Dict, "EventCategory"]]
) -> List["EventCategory"]:
"""Process subcategories ensuring they are all EventCategory instances."""
processed = []
for category in categories:
if isinstance(category, dict):
processed.append(EventCategory.from_dict(category))
elif isinstance(category, EventCategory):
processed.append(category)
else:
raise ValueError(f"Invalid subcategory type: {type(category)}")
return processed
def has_available_events(self, user_permission_uuids: Set[str]) -> bool:
"""Check if category has available events based on UUID intersection."""
# Check current category's events
return any(
bool(events & user_permission_uuids)
for events in self.all_endpoints.values()
)
def get_menu_item(
self, user_permission_uuids: Set[str]
) -> Optional[Dict[str, Any]]:
"""Get menu item if category has available events."""
# First check if this category has available events
if not self.has_available_events(user_permission_uuids):
return None
menu_item = {
"name": self.name,
"title": self.title,
"icon": self.icon,
"url": self.url,
}
if self.component:
menu_item["component"] = self.component
# Only process subcategories if parent has permissions
sub_items = []
for subcategory in self.sub_categories:
if sub_menu := subcategory.get_menu_item(user_permission_uuids):
sub_items.append(sub_menu)
if sub_items:
menu_item["items"] = sub_items
return menu_item
def get_available_events(
self, registry: EventMethodRegistry, user_permission_uuids: Set[str]
) -> Dict[str, List[Dict[str, Any]]]:
"""Get available events based on user permission UUIDs."""
available_events = defaultdict(list)
# Process endpoints in current category
category_events = self.all_endpoints.get(self.name, set())
for event_uuid in category_events & user_permission_uuids:
method_info = registry.get_method_by_uuid(event_uuid)
if method_info:
method_class, event_name = method_info
available_events[method_class.event_type].append(
{
"uuid": event_uuid,
"name": event_name,
"description": method_class.event_description,
"category": method_class.event_category,
}
)
# Process subcategories recursively
for subcategory in self.sub_categories:
sub_events = subcategory.get_available_events(
registry, user_permission_uuids
)
for event_type, events in sub_events.items():
available_events[event_type].extend(events)
return dict(available_events)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "EventCategory":
"""Create category from dictionary."""
return cls(
name=data["name"],
title=data["title"],
icon=data["icon"],
url=data["url"],
component=data.get("component"),
page_info=data.get("pageInfo"),
all_endpoints=data.get("allEndpoints", {}),
sub_categories=data.get("subCategories", []),
)
def to_dict(
self,
registry: EventMethodRegistry,
user_permission_uuids: Optional[Set[str]] = None,
) -> Dict[str, Any]:
"""Convert category to dictionary with optional permission filtering."""
result = {
"name": self.name,
"title": self.title,
"icon": self.icon,
"url": self.url,
"pageInfo": self.page_info,
}
if user_permission_uuids is not None:
# Only include endpoints and their info if user has permissions
available_events = self.get_available_events(
registry, user_permission_uuids
)
if available_events:
result["availableEvents"] = available_events
result["allEndpoints"] = self.all_endpoints
else:
# Include all endpoints if no permissions specified
result["allEndpoints"] = self.all_endpoints
# Process subcategories
subcategories = [
sub.to_dict(registry, user_permission_uuids) for sub in self.sub_categories
]
# Only include subcategories that have available events
if user_permission_uuids is None or any(
"availableEvents" in sub for sub in subcategories
):
result["subCategories"] = subcategories
if self.component:
result["component"] = self.component
return result
class EventCategoryManager:
"""Manager class for handling event categories and their relationships."""
def __init__(self):
self.categories: List[EventCategory] = []
self.registry = EventMethodRegistry()
def get_menu_tree(self, user_permission_uuids: Set[str]) -> List[Dict[str, Any]]:
"""Get menu tree based on available events."""
return [
menu_item
for category in self.categories
if (menu_item := category.get_menu_item(user_permission_uuids))
]
def register_category(self, category: EventCategory) -> None:
"""Register a category and its endpoints in the registry."""
self.categories.append(category)
def add_category(self, category: Union[EventCategory, Dict[str, Any]]) -> None:
"""Add a new category."""
if isinstance(category, dict):
category = EventCategory.from_dict(category)
self.register_category(category)
def add_categories(
self, categories: List[Union[EventCategory, Dict[str, Any]]]
) -> None:
"""Add multiple categories at once."""
for category in categories:
self.add_category(category)
def get_category(self, name: str) -> Optional[EventCategory]:
"""Get category by name."""
return next((cat for cat in self.categories if cat.name == name), None)
def get_all_categories(
self, user_permission_uuids: Optional[Set[str]] = None
) -> List[Dict[str, Any]]:
"""Get all categories as dictionary, filtered by user permissions."""
return [
cat.to_dict(self.registry, user_permission_uuids) for cat in self.categories
]
def get_category_endpoints(self, category_name: str) -> Set[str]:
"""Get all endpoint UUIDs for a category."""
category = self.get_category(category_name)
return category.all_endpoints.get(category_name, set()) if category else set()
def get_subcategories(
self, category_name: str, user_permission_uuids: Optional[Set[str]] = None
) -> List[Dict[str, Any]]:
"""Get subcategories for a category."""
category = self.get_category(category_name)
if not category:
return []
return [
sub.to_dict(self.registry, user_permission_uuids)
for sub in category.sub_categories
]
def find_category_by_url(self, url: str) -> Optional[EventCategory]:
"""Find a category by its URL."""
for category in self.categories:
if category.url == url:
return category
for subcategory in category.sub_categories:
if subcategory.url == url:
return subcategory
return None
class EventMethodRegistry:
"""Registry for all MethodToEvent classes and menu building."""
_instance = None
_method_classes: Dict[str, Type[MethodToEvent]] = {}
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
def register_method_class(cls, method_class: Type[MethodToEvent]) -> None:
"""Register a MethodToEvent class."""
if not issubclass(method_class, MethodToEvent):
raise ValueError(
f"{method_class.__name__} must be a subclass of MethodToEvent"
)
page_info = method_class.get_page_info()
cls._method_classes[page_info["name"]] = method_class
@classmethod
def get_all_menu_items(
cls, user_permission_uuids: Set[str], include_endpoints: bool = False
) -> List[Dict[str, Any]]:
"""Get all menu items based on user permissions.
Args:
user_permission_uuids: Set of UUIDs the user has permission for
include_endpoints: Whether to include available endpoint information
Returns:
List of menu items organized in a tree structure
"""
# First get all page infos
page_infos = {}
for method_class in cls._method_classes.values():
if page_info := method_class.get_page_info_with_permissions(
user_permission_uuids, include_endpoints
):
page_infos[page_info["name"]] = page_info
# Build tree structure
menu_tree = []
child_pages = set()
# First pass: identify all child pages
for page_info in page_infos.values():
if page_info.get("parent"):
child_pages.add(page_info["name"])
# Second pass: build tree structure
for name, page_info in page_infos.items():
# Skip if this is a child page
if name in child_pages:
continue
# Start with this page's info
menu_item = page_info.copy()
# Find and add children
children = []
for child_info in page_infos.values():
if child_info.get("parent") == name:
children.append(child_info)
if children:
menu_item["items"] = sorted(children, key=lambda x: x["name"])
menu_tree.append(menu_item)
return sorted(menu_tree, key=lambda x: x["name"])
@classmethod
def get_available_endpoints(
cls, user_permission_uuids: Set[str]
) -> Dict[str, Dict[str, Any]]:
"""Get all available endpoints based on user permissions.
Args:
user_permission_uuids: Set of UUIDs the user has permission for
Returns:
Dict mapping event UUIDs to endpoint configurations
"""
available_endpoints = {}
for method_class in cls._method_classes.values():
if page_info := method_class.get_page_info_with_permissions(
user_permission_uuids, include_endpoints=True
):
if endpoints := page_info.get("available_endpoints"):
available_endpoints.update(endpoints)
return available_endpoints
"""
Example usage
# Register your MethodToEvent classes
registry = EventMethodRegistry()
registry.register_method_class(AccountEventMethods)
registry.register_method_class(AccountDetailsEventMethods)
# Get complete menu structure
user_permissions = {
"uuid1",
"uuid2",
"uuid3"
}
menu_items = registry.get_all_menu_items(user_permissions, include_endpoints=True)
# Result:
[
{
"name": "AccountPage",
"title": {"tr": "Hesaplar", "en": "Accounts"},
"icon": "User",
"url": "/account",
"category": "account",
"type": "query",
"description": "Account management operations",
"available_endpoints": {
"uuid1": {"path": "/api/account/view", "method": "GET"},
"uuid2": {"path": "/api/account/edit", "method": "POST"}
},
"items": [
{
"name": "AccountDetailsPage",
"title": {"tr": "Hesap Detayları", "en": "Account Details"},
"icon": "FileText",
"url": "/account/details",
"parent": "AccountPage",
"category": "account_details",
"type": "query",
"available_endpoints": {
"uuid3": {"path": "/api/account/details/view", "method": "GET"}
}
}
]
}
]
# Get all available endpoints
endpoints = registry.get_available_endpoints(user_permissions)
# Result:
{
"uuid1": {
"path": "/api/account/view",
"method": "GET",
"event_name": "view_account"
},
"uuid2": {
"path": "/api/account/edit",
"method": "POST",
"event_name": "edit_account"
},
"uuid3": {
"path": "/api/account/details/view",
"method": "GET",
"event_name": "view_details"
}
}
# Get event UUIDs from MethodToEvent classes
account_events = {uuid for uuid in AccountEventMethods.__event_keys__}
# Define categories with event UUIDs
PAGES_INFO = [
{
"name": "AccountPage",
"title": {"tr": "Hesaplar", "en": "Accounts"},
"icon": "User",
"url": "/account",
"pageInfo": AccountPageInfo,
"allEndpoints": {"AccountPage": account_events},
"subCategories": [
{
"name": "AccountDetailsPage",
"title": {"tr": "Hesap Detayları", "en": "Account Details"},
"icon": "FileText",
"url": "/account/details",
"allEndpoints": {} # No direct endpoints, only shown if parent has permissions
}
]
}
]
# Initialize manager
manager = EventCategoryManager()
manager.add_categories(PAGES_INFO)
# Get menu tree based on available events
user_permission_uuids = {
"31f4f32f-0cd4-4995-8a6a-f9f56335848a",
"ec98ef2c-bcd0-432d-a8f4-1822a56c33b2"
}
menu_tree = manager.get_menu_tree(user_permission_uuids)
"""

View File

@@ -25,17 +25,20 @@ from ApiLayers.Schemas import (
BuildParts,
RelationshipEmployee2Build,
Companies,
Departments,
Duties,
Departments,
Duties,
Duty,
Staff,
Employees,
Event2Employee,
Event2Occupant,
OccupantTypes,
Users
Users,
)
from ApiLayers.ApiServices.Token.token_handler import (
OccupantTokenObject,
EmployeeTokenObject,
)
from ApiLayers.ApiServices.Token.token_handler import OccupantTokenObject, EmployeeTokenObject
from .api_events import (
authentication_login_super_user_event,