updated docs

This commit is contained in:
2025-01-22 21:46:11 +03:00
parent 87e5f5ab06
commit 1ba2694a9d
50 changed files with 3342 additions and 401 deletions

View File

@@ -71,7 +71,10 @@ async def authentication_select_company_or_occupant_type(
if data.is_employee:
return {"selected_company": data.company_uu_id, "completed": True}
elif data.is_occupant:
return {"selected_occupant": data.build_living_space_uu_id, "completed": True}
return {
"selected_occupant": data.build_living_space_uu_id,
"completed": True,
}
return {"completed": False, "selected_company": None, "selected_occupant": None}

View File

@@ -46,8 +46,14 @@ class AccountListEventMethod(MethodToEvent):
"208e6273-17ef-44f0-814a-8098f816b63a": "account_records_list_flt_res",
}
__event_validation__ = {
"7192c2aa-5352-4e36-98b3-dafb7d036a3d": (AccountRecordResponse, [AccountRecords.__language_model__]),
"208e6273-17ef-44f0-814a-8098f816b63a": (AccountRecordResponse, [AccountRecords.__language_model__]),
"7192c2aa-5352-4e36-98b3-dafb7d036a3d": (
AccountRecordResponse,
[AccountRecords.__language_model__],
),
"208e6273-17ef-44f0-814a-8098f816b63a": (
AccountRecordResponse,
[AccountRecords.__language_model__],
),
}
@classmethod
@@ -226,7 +232,10 @@ class AccountCreateEventMethod(MethodToEvent):
"31f4f32f-0cd4-4995-8a6a-f9f56335848a": "account_records_create",
}
__event_validation__ = {
"31f4f32f-0cd4-4995-8a6a-f9f56335848a": (InsertAccountRecord, [AccountRecords.__language_model__]),
"31f4f32f-0cd4-4995-8a6a-f9f56335848a": (
InsertAccountRecord,
[AccountRecords.__language_model__],
),
}
@classmethod
@@ -314,7 +323,10 @@ class AccountUpdateEventMethod(MethodToEvent):
"ec98ef2c-bcd0-432d-a8f4-1822a56c33b2": "account_records_update",
}
__event_validation__ = {
"ec98ef2c-bcd0-432d-a8f4-1822a56c33b2": (UpdateAccountRecord, [AccountRecords.__language_model__]),
"ec98ef2c-bcd0-432d-a8f4-1822a56c33b2": (
UpdateAccountRecord,
[AccountRecords.__language_model__],
),
}
@classmethod

View File

@@ -39,10 +39,16 @@ class AddressListEventMethod(MethodToEvent):
"52afe375-dd95-4f4b-aaa2-4ec61bc6de52": "address_list_employee",
}
__event_validation__ = {
"9c251d7d-da70-4d63-a72c-e69c26270442": (ListAddressResponse, [Addresses.__language_model__]),
"52afe375-dd95-4f4b-aaa2-4ec61bc6de52": (ListAddressResponse, [Addresses.__language_model__]),
"9c251d7d-da70-4d63-a72c-e69c26270442": (
ListAddressResponse,
[Addresses.__language_model__],
),
"52afe375-dd95-4f4b-aaa2-4ec61bc6de52": (
ListAddressResponse,
[Addresses.__language_model__],
),
}
@classmethod
def address_list_super_user(
cls,
@@ -113,7 +119,10 @@ class AddressCreateEventMethod(MethodToEvent):
"ffdc445f-da10-4ce4-9531-d2bdb9a198ae": "create_address",
}
__event_validation__ = {
"ffdc445f-da10-4ce4-9531-d2bdb9a198ae": (InsertAddress, [Addresses.__language_model__]),
"ffdc445f-da10-4ce4-9531-d2bdb9a198ae": (
InsertAddress,
[Addresses.__language_model__],
),
}
@classmethod
@@ -161,7 +170,10 @@ class AddressSearchEventMethod(MethodToEvent):
"e0ac1269-e9a7-4806-9962-219ac224b0d0": "search_address",
}
__event_validation__ = {
"e0ac1269-e9a7-4806-9962-219ac224b0d0": (SearchAddress, [Addresses.__language_model__]),
"e0ac1269-e9a7-4806-9962-219ac224b0d0": (
SearchAddress,
[Addresses.__language_model__],
),
}
@classmethod
@@ -301,7 +313,10 @@ class AddressUpdateEventMethod(MethodToEvent):
"1f9c3a9c-e5bd-4dcd-9b9a-3742d7e03a27": "update_address",
}
__event_validation__ = {
"1f9c3a9c-e5bd-4dcd-9b9a-3742d7e03a27": (UpdateAddress, [Addresses.__language_model__]),
"1f9c3a9c-e5bd-4dcd-9b9a-3742d7e03a27": (
UpdateAddress,
[Addresses.__language_model__],
),
}
@classmethod

View File

@@ -0,0 +1,70 @@
from typing import Dict, List, Optional
from fastapi import APIRouter, Header
from pydantic import BaseModel
class LanguageStrings(BaseModel):
validation: Dict[str, Dict[str, str]] # validation.required.field: {tr: "...", en: "..."}
messages: Dict[str, Dict[str, str]] # messages.welcome: {tr: "...", en: "..."}
labels: Dict[str, Dict[str, str]] # labels.submit_button: {tr: "...", en: "..."}
class LanguageService:
def __init__(self):
self.strings: Dict[str, Dict[str, Dict[str, str]]] = {
"validation": {
"required": {
"tr": "Bu alan zorunludur",
"en": "This field is required"
},
"email": {
"tr": "Geçerli bir e-posta adresi giriniz",
"en": "Please enter a valid email"
},
"min_length": {
"tr": "En az {min} karakter giriniz",
"en": "Enter at least {min} characters"
},
# Add more validation messages
},
"messages": {
"welcome": {
"tr": "Hoş geldiniz",
"en": "Welcome"
},
"success": {
"tr": "İşlem başarılı",
"en": "Operation successful"
},
# Add more messages
},
"labels": {
"submit": {
"tr": "Gönder",
"en": "Submit"
},
"cancel": {
"tr": "İptal",
"en": "Cancel"
},
# Add more labels
}
}
def get_strings(self, lang: str = "tr") -> LanguageStrings:
"""Get all strings for a specific language"""
return LanguageStrings(
validation={k: v[lang] for k, v in self.strings["validation"].items()},
messages={k: v[lang] for k, v in self.strings["messages"].items()},
labels={k: v[lang] for k, v in self.strings["labels"].items()}
)
# Create FastAPI router
router = APIRouter(prefix="/api/language", tags=["Language"])
language_service = LanguageService()
@router.get("/strings")
async def get_language_strings(
accept_language: Optional[str] = Header(default="tr")
) -> LanguageStrings:
"""Get all language strings based on Accept-Language header"""
lang = accept_language.split(",")[0][:2] # Get primary language code
return language_service.get_strings(lang if lang in ["tr", "en"] else "tr")

View File

@@ -0,0 +1,81 @@
from typing import Dict
from fastapi import APIRouter, Header
from pydantic import BaseModel
from typing import Optional
class ZodMessages(BaseModel):
"""Messages that match Zod's error types"""
required_error: str
invalid_type_error: str
invalid_string: Dict[str, str] # email, url, etc
too_small: Dict[str, str] # string, array, number
too_big: Dict[str, str] # string, array, number
custom: Dict[str, str] # custom validation messages
class LanguageService:
def __init__(self):
self.messages = {
"tr": {
"required_error": "Bu alan zorunludur",
"invalid_type_error": "Geçersiz tip",
"invalid_string": {
"email": "Geçerli bir e-posta adresi giriniz",
"url": "Geçerli bir URL giriniz",
"uuid": "Geçerli bir UUID giriniz"
},
"too_small": {
"string": "{min} karakterden az olamaz",
"array": "En az {min} öğe gereklidir",
"number": "En az {min} olmalıdır"
},
"too_big": {
"string": "{max} karakterden fazla olamaz",
"array": "En fazla {max} öğe olabilir",
"number": "En fazla {max} olabilir"
},
"custom": {
"password_match": "Şifreler eşleşmiyor",
"unique_email": "Bu e-posta adresi zaten kullanılıyor",
"strong_password": "Şifre en az bir büyük harf, bir küçük harf ve bir rakam içermelidir"
}
},
"en": {
"required_error": "This field is required",
"invalid_type_error": "Invalid type",
"invalid_string": {
"email": "Please enter a valid email",
"url": "Please enter a valid URL",
"uuid": "Please enter a valid UUID"
},
"too_small": {
"string": "Must be at least {min} characters",
"array": "Must contain at least {min} items",
"number": "Must be at least {min}"
},
"too_big": {
"string": "Must be at most {max} characters",
"array": "Must contain at most {max} items",
"number": "Must be at most {max}"
},
"custom": {
"password_match": "Passwords do not match",
"unique_email": "This email is already in use",
"strong_password": "Password must contain at least one uppercase letter, one lowercase letter, and one number"
}
}
}
def get_messages(self, lang: str = "tr") -> Dict:
"""Get all Zod messages for a specific language"""
return self.messages.get(lang, self.messages["tr"])
router = APIRouter(prefix="/api/language", tags=["Language"])
language_service = LanguageService()
@router.get("/zod-messages")
async def get_zod_messages(
accept_language: Optional[str] = Header(default="tr")
) -> Dict:
"""Get Zod validation messages based on Accept-Language header"""
lang = accept_language.split(",")[0][:2] # Get primary language code
return language_service.get_messages(lang if lang in ["tr", "en"] else "tr")

View File

@@ -3,4 +3,3 @@
from .route_configs import get_route_configs
__all__ = ["get_route_configs"]

View File

@@ -19,9 +19,7 @@ if TYPE_CHECKING:
prefix = "/available"
async def check_endpoints_available(
request: "Request"
) -> Dict[str, Any]:
async def check_endpoints_available(request: "Request") -> Dict[str, Any]:
"""
Check if endpoints are available.
"""
@@ -52,7 +50,7 @@ async def check_endpoint_available(
print("data", data)
data_dict = data.data
endpoint_asked = data_dict.get("endpoint", None)
if not endpoint_asked:
raise HTTPExceptionApi(
error_code="",
@@ -81,10 +79,7 @@ async def check_endpoint_available(
loc=get_line_number_for_error(),
sys_msg="Endpoint not found",
)
return {
"endpoint": endpoint_asked,
"status": "OK"
}
return {"endpoint": endpoint_asked, "status": "OK"}
AVAILABLE_CONFIG = RouteFactoryConfig(

View File

@@ -22,13 +22,17 @@ prefix = "/validation"
@TokenEventMiddleware.validation_required
async def validations_validations_select(request: Request, data: EndpointBaseRequestModel) -> Dict[str, Any]:
async def validations_validations_select(
request: Request, data: EndpointBaseRequestModel
) -> Dict[str, Any]:
"""
Select validations.
"""
wrapped_context = getattr(validations_validations_select, "__wrapped__", None)
auth_context = getattr(wrapped_context, "auth", None)
validation_code = getattr(validations_validations_select, "validation_code", {"validation_code": None})
validation_code = getattr(
validations_validations_select, "validation_code", {"validation_code": None}
)
if not validation_code:
raise HTTPExceptionApi(
error_code="",
@@ -41,12 +45,16 @@ async def validations_validations_select(request: Request, data: EndpointBaseReq
reachable_event_code=validation_code.get("reachable_event_code", None),
lang=getattr(auth_context, "lang", None),
)
validations_both = ValidationsBoth.retrieve_both_validations_and_headers(validations_pydantic)
return {"status": "OK", "validation_code": validation_code, **validations_both }
validations_both = ValidationsBoth.retrieve_both_validations_and_headers(
validations_pydantic
)
return {"status": "OK", "validation_code": validation_code, **validations_both}
@TokenEventMiddleware.validation_required
async def validations_headers_select(request: Request, data: EndpointBaseRequestModel) -> Dict[str, Any]:
async def validations_headers_select(
request: Request, data: EndpointBaseRequestModel
) -> Dict[str, Any]:
"""
Select headers.
"""
@@ -57,7 +65,9 @@ async def validations_headers_select(request: Request, data: EndpointBaseRequest
@TokenEventMiddleware.validation_required
async def validations_validations_and_headers_select(request: Request, data: EndpointBaseRequestModel) -> Dict[str, Any]:
async def validations_validations_and_headers_select(
request: Request, data: EndpointBaseRequestModel
) -> Dict[str, Any]:
"""
Select validations and headers.
"""
@@ -67,7 +77,7 @@ async def validations_validations_and_headers_select(request: Request, data: End
}
VALIDATION_CONFIG_MAIN =RouteFactoryConfig(
VALIDATION_CONFIG_MAIN = RouteFactoryConfig(
name="validations",
prefix=prefix,
tags=["Validation"],
@@ -113,4 +123,6 @@ VALIDATION_CONFIG_MAIN =RouteFactoryConfig(
)
VALIDATION_CONFIG = VALIDATION_CONFIG_MAIN.as_dict()
VALIDATION_ENDPOINTS = [endpoint.url_of_endpoint for endpoint in VALIDATION_CONFIG_MAIN.endpoints]
VALIDATION_ENDPOINTS = [
endpoint.url_of_endpoint for endpoint in VALIDATION_CONFIG_MAIN.endpoints
]

View File

@@ -11,6 +11,7 @@ if TYPE_CHECKING:
ListOptions,
)
class ValidationsPydantic(BaseModel):
class_model: str
reachable_event_code: str

View File

@@ -2,26 +2,16 @@
Validation request models.
"""
from typing import TYPE_CHECKING, Dict, Any, Literal, Optional, TypedDict, Union
from pydantic import BaseModel, Field, model_validator, RootModel, ConfigDict
from typing import TYPE_CHECKING, Dict, Any
from ApiEvents.abstract_class import MethodToEvent
from ApiLibrary.common.line_number import get_line_number_for_error
from ApiValidations.Custom.validation_response import ValidationModel, ValidationParser
from ApiEvents.abstract_class import MethodToEvent
from ApiEvents.base_request_model import BaseRequestModel, DictRequestModel
from ApiValidations.Custom.token_objects import EmployeeTokenObject, OccupantTokenObject
from ApiValidations.Request.base_validations import ListOptions
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
from .models import ValidationsPydantic
if TYPE_CHECKING:
from fastapi import Request
class AllModelsImport:
@classmethod
@@ -46,18 +36,14 @@ class AllModelsImport:
AddressCreateEventMethod=AddressCreateEventMethod,
AddressSearchEventMethod=AddressSearchEventMethod,
)
class ValidationsBoth(MethodToEvent):
@classmethod
def retrieve_both_validations_and_headers(
cls, event: ValidationsPydantic
) -> Dict[str, Any]:
def retrieve_both_validations_and_headers(cls, event: ValidationsPydantic) -> Dict[str, Any]:
EVENT_MODELS = AllModelsImport.import_all_models()
return_single_model = EVENT_MODELS.get(event.class_model, None)
print("return_single_model", return_single_model, type(return_single_model))
# event_class_validation = getattr(return_single_model, "__event_validation__", None)
if not return_single_model:
raise HTTPExceptionApi(
@@ -67,11 +53,17 @@ class ValidationsBoth(MethodToEvent):
sys_msg="Validation code not found",
)
response_model = return_single_model.retrieve_event_response_model(event.reachable_event_code)
language_model_all = return_single_model.retrieve_language_parameters(function_code=event.reachable_event_code, language=event.lang)
language_model_all = return_single_model.retrieve_language_parameters(
function_code=event.reachable_event_code, language=event.lang
)
language_model = language_model_all.get("language_model", None)
language_models = language_model_all.get("language_models", None)
validation = ValidationModel(response_model, language_model, language_models)
"""
Headers: Headers which is merged with response model && language models of event
Validation: Validation of event which is merged with response model && language models of event
"""
return {
"headers": validation.headers,
"validation": validation.validation,
@@ -82,16 +74,65 @@ class ValidationsBoth(MethodToEvent):
class ValidationsValidations(MethodToEvent):
@classmethod
def retrieve_validations(
cls, event: ValidationsPydantic
) -> Dict[str, Any]:
return {}
def retrieve_validations(cls, event: ValidationsPydantic) -> Dict[str, Any]:
EVENT_MODELS = AllModelsImport.import_all_models()
return_single_model = EVENT_MODELS.get(event.class_model, None)
# event_class_validation = getattr(return_single_model, "__event_validation__", None)
if not return_single_model:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Validation code not found",
)
response_model = return_single_model.retrieve_event_response_model(event.reachable_event_code)
language_model_all = return_single_model.retrieve_language_parameters(
function_code=event.reachable_event_code, language=event.lang
)
language_model = language_model_all.get("language_model", None)
language_models = language_model_all.get("language_models", None)
validation = ValidationModel(response_model, language_model, language_models)
"""
Headers: Headers which is merged with response model && language models of event
Validation: Validation of event which is merged with response model && language models of event
"""
return {
"validation": validation.validation,
# "headers": validation.headers,
# "language_models": language_model_all,
}
class ValidationsHeaders(MethodToEvent):
@classmethod
def retrieve_headers(
cls, event: ValidationsPydantic
) -> Dict[str, Any]:
return {}
def retrieve_headers(cls, event: ValidationsPydantic
) -> Dict[str, Any]:
EVENT_MODELS = AllModelsImport.import_all_models()
return_single_model = EVENT_MODELS.get(event.class_model, None)
# event_class_validation = getattr(return_single_model, "__event_validation__", None)
if not return_single_model:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Validation code not found",
)
response_model = return_single_model.retrieve_event_response_model(event.reachable_event_code)
language_model_all = return_single_model.retrieve_language_parameters(
function_code=event.reachable_event_code, language=event.lang
)
language_model = language_model_all.get("language_model", None)
language_models = language_model_all.get("language_models", None)
validation = ValidationModel(response_model, language_model, language_models)
"""
Headers: Headers which is merged with response model && language models of event
Validation: Validation of event which is merged with response model && language models of event
"""
return {
"headers": validation.headers,
# "validation": validation.validation,
# "language_models": language_model_all,
}

View File

@@ -0,0 +1,158 @@
from typing import Dict, Any, Type, get_type_hints, get_args, get_origin
from pydantic import BaseModel, Field, EmailStr
from enum import Enum
import inspect
from fastapi import APIRouter
from datetime import datetime
class SchemaConverter:
"""Converts Pydantic models to Zod schema definitions"""
TYPE_MAPPINGS = {
str: "string",
int: "number",
float: "number",
bool: "boolean",
list: "array",
dict: "object",
datetime: "date",
EmailStr: "string.email()",
}
def __init__(self):
self.processed_models = set()
def convert_model(self, model: Type[BaseModel]) -> Dict[str, Any]:
"""Convert a Pydantic model to a Zod schema definition"""
if model.__name__ in self.processed_models:
return {"$ref": model.__name__}
self.processed_models.add(model.__name__)
schema = {
"name": model.__name__,
"type": "object",
"fields": {},
"validations": {}
}
for field_name, field in model.__fields__.items():
field_info = self._convert_field(field)
schema["fields"][field_name] = field_info
# Get validations from field
validations = self._get_field_validations(field)
if validations:
schema["validations"][field_name] = validations
return schema
def _convert_field(self, field) -> Dict[str, Any]:
"""Convert a Pydantic field to Zod field definition"""
field_type = field.outer_type_
origin = get_origin(field_type)
if origin is not None:
# Handle generic types (List, Dict, etc)
args = get_args(field_type)
if origin == list:
return {
"type": "array",
"items": self._get_type_name(args[0])
}
elif origin == dict:
return {
"type": "object",
"additionalProperties": self._get_type_name(args[1])
}
if inspect.isclass(field_type) and issubclass(field_type, BaseModel):
# Nested model
return self.convert_model(field_type)
if inspect.isclass(field_type) and issubclass(field_type, Enum):
# Enum type
return {
"type": "enum",
"values": [e.value for e in field_type]
}
return {
"type": self._get_type_name(field_type)
}
def _get_field_validations(self, field) -> Dict[str, Any]:
"""Extract validations from field"""
validations = {}
if field.field_info.min_length is not None:
validations["min_length"] = field.field_info.min_length
if field.field_info.max_length is not None:
validations["max_length"] = field.field_info.max_length
if field.field_info.regex is not None:
validations["pattern"] = field.field_info.regex.pattern
if field.field_info.gt is not None:
validations["gt"] = field.field_info.gt
if field.field_info.lt is not None:
validations["lt"] = field.field_info.lt
return validations
def _get_type_name(self, type_: Type) -> str:
"""Get Zod type name for Python type"""
return self.TYPE_MAPPINGS.get(type_, "any")
# FastAPI router
router = APIRouter(prefix="/api/validation", tags=["Validation"])
converter = SchemaConverter()
@router.get("/schema/{model_name}")
async def get_schema(model_name: str) -> Dict[str, Any]:
"""Get Zod schema for a specific model"""
# This is just an example - you'd need to implement model lookup
models = {
"User": UserModel,
"Product": ProductModel,
# Add your models here
}
if model_name not in models:
raise ValueError(f"Model {model_name} not found")
return converter.convert_model(models[model_name])
# Example usage:
"""
class UserModel(BaseModel):
email: EmailStr
username: str = Field(min_length=3, max_length=50)
age: int = Field(gt=0, lt=150)
is_active: bool = True
roles: List[str] = []
# GET /api/validation/schema/User would return:
{
"name": "User",
"type": "object",
"fields": {
"email": {"type": "string.email()"},
"username": {"type": "string"},
"age": {"type": "number"},
"is_active": {"type": "boolean"},
"roles": {
"type": "array",
"items": "string"
}
},
"validations": {
"username": {
"min_length": 3,
"max_length": 50
},
"age": {
"gt": 0,
"lt": 150
}
}
}
"""

View File

@@ -16,8 +16,10 @@ from typing import (
Type,
ClassVar,
Union,
Awaitable,
Set,
)
from collections import defaultdict
import uuid
from dataclasses import dataclass, field
from pydantic import BaseModel
from fastapi import Request, Depends, APIRouter
@@ -178,64 +180,26 @@ class RouteFactoryConfig:
}
class ActionsSchema:
"""Base class for defining API action schemas.
This class handles endpoint registration and validation in the database.
Subclasses should implement specific validation logic.
"""
def __init__(self, endpoint: str):
"""Initialize with an API endpoint path.
Args:
endpoint: The API endpoint path (e.g. "/users/create")
"""
self.endpoint = endpoint
def retrieve_action_from_endpoint(self) -> Dict[str, Any]:
"""Retrieve the endpoint registration from the database.
Returns:
Dict containing the endpoint registration data
Raises:
HTTPException: If endpoint is not found in database
"""
raise NotImplementedError(
"Subclasses must implement retrieve_action_from_endpoint"
)
class ActionsSchemaFactory:
"""Factory class for creating and validating action schemas.
This class ensures proper initialization and validation of API endpoints
through their action schemas.
"""
def __init__(self, action: ActionsSchema):
"""Initialize with an action schema.
Args:
action: The action schema to initialize
Raises:
HTTPException: If action initialization fails
"""
self.action = action
self.action_match = self.action.retrieve_action_from_endpoint()
class MethodToEvent:
"""Base class for mapping methods to API events with type safety.
"""Base class for mapping methods to API events with type safety and endpoint configuration.
This class provides a framework for handling API events with proper
type checking for tokens and response models.
type checking for tokens and response models, as well as managing
endpoint configurations and frontend page structure.
Type Parameters:
TokenType: Type of authentication token
ResponseModel: Type of response model
Class Variables:
action_key: Unique identifier for the action
event_type: Type of event (e.g., 'query', 'command')
event_description: Human-readable description of the event
event_category: Category for grouping related events
__event_keys__: Mapping of UUIDs to event names
__event_validation__: Validation rules for events
__endpoint_config__: API endpoint configuration
__page_info__: Frontend page configuration
"""
action_key: ClassVar[Optional[str]] = None
@@ -244,9 +208,163 @@ class MethodToEvent:
event_category: ClassVar[str] = ""
__event_keys__: ClassVar[Dict[str, str]] = {}
__event_validation__: Dict[str, Tuple[Type, Union[List, tuple]]] = {}
__endpoint_config__: ClassVar[Dict[str, Dict[str, Any]]] = {
"endpoints": {}, # Mapping of event UUIDs to endpoint configs
"router_prefix": "", # Router prefix for all endpoints in this class
"tags": [], # OpenAPI tags
}
__page_info__: ClassVar[Dict[str, Any]] = {
"name": "", # Page name (e.g., "AccountPage")
"title": {"tr": "", "en": ""}, # Multi-language titles
"icon": "", # Icon name
"url": "", # Frontend route
"component": None, # Optional component name
"parent": None, # Parent page name if this is a subpage
}
@classmethod
def retrieve_event_response_model(cls, function_code: str) -> Tuple:
def register_endpoint(
cls,
event_uuid: str,
path: str,
method: str = "POST",
response_model: Optional[Type] = None,
**kwargs
) -> None:
"""Register an API endpoint configuration for an event.
Args:
event_uuid: UUID of the event
path: Endpoint path (will be prefixed with router_prefix)
method: HTTP method (default: POST)
response_model: Pydantic model for response
**kwargs: Additional FastAPI endpoint parameters
"""
if event_uuid not in cls.__event_keys__:
raise ValueError(f"Event UUID {event_uuid} not found in {cls.__name__}")
cls.__endpoint_config__["endpoints"][event_uuid] = {
"path": path,
"method": method,
"response_model": response_model,
**kwargs
}
@classmethod
def configure_router(cls, prefix: str, tags: List[str]) -> None:
"""Configure the API router settings.
Args:
prefix: Router prefix for all endpoints
tags: OpenAPI tags for documentation
"""
cls.__endpoint_config__["router_prefix"] = prefix
cls.__endpoint_config__["tags"] = tags
@classmethod
def configure_page(
cls,
name: str,
title: Dict[str, str],
icon: str,
url: str,
component: Optional[str] = None,
parent: Optional[str] = None
) -> None:
"""Configure the frontend page information.
Args:
name: Page name
title: Multi-language titles (must include 'tr' and 'en')
icon: Icon name
url: Frontend route
component: Optional component name
parent: Parent page name for subpages
"""
required_langs = {"tr", "en"}
if not all(lang in title for lang in required_langs):
raise ValueError(f"Title must contain all required languages: {required_langs}")
cls.__page_info__.update({
"name": name,
"title": title,
"icon": icon,
"url": url,
"component": component,
"parent": parent
})
@classmethod
def get_endpoint_config(cls) -> Dict[str, Any]:
"""Get the complete endpoint configuration."""
return cls.__endpoint_config__
@classmethod
def get_page_info(cls) -> Dict[str, Any]:
"""Get the frontend page configuration."""
return cls.__page_info__
@classmethod
def has_available_events(cls, user_permission_uuids: Set[str]) -> bool:
"""Check if any events are available based on user permissions."""
return bool(set(cls.__event_keys__.keys()) & user_permission_uuids)
@classmethod
def get_page_info_with_permissions(
cls,
user_permission_uuids: Set[str],
include_endpoints: bool = False
) -> Optional[Dict[str, Any]]:
"""Get page info if user has required permissions.
Args:
user_permission_uuids: Set of UUIDs the user has permission for
include_endpoints: Whether to include available endpoint information
Returns:
Dict with page info if user has permissions, None otherwise
"""
# Check if user has any permissions for this page's events
if not cls.has_available_events(user_permission_uuids):
return None
# Start with basic page info
page_info = {
**cls.__page_info__,
"category": cls.event_category,
"type": cls.event_type,
"description": cls.event_description
}
# Optionally include available endpoints
if include_endpoints:
available_endpoints = {}
for uuid, endpoint in cls.__endpoint_config__["endpoints"].items():
if uuid in user_permission_uuids:
available_endpoints[uuid] = {
"path": f"{cls.__endpoint_config__['router_prefix']}{endpoint['path']}",
"method": endpoint["method"],
"event_name": cls.__event_keys__[uuid]
}
if available_endpoints:
page_info["available_endpoints"] = available_endpoints
return page_info
@classmethod
def get_events_config(cls) -> Dict[str, Any]:
"""Get the complete configuration including events, endpoints, and page info."""
return {
"events": cls.__event_keys__,
"endpoints": cls.__endpoint_config__,
"page_info": cls.__page_info__,
"category": cls.event_category,
"type": cls.event_type,
"description": cls.event_description
}
@classmethod
def retrieve_event_response_model(cls, function_code: str) -> Any:
"""Retrieve event validation for a specific function.
Args:
@@ -264,7 +382,7 @@ class MethodToEvent:
sys_msg="Function not found",
)
return event_validation_list[0]
@classmethod
def retrieve_event_languages(cls, function_code: str) -> Union[List, tuple]:
"""Retrieve event description for a specific function.
@@ -276,7 +394,6 @@ class MethodToEvent:
Event description
"""
event_keys_list = cls.__event_validation__.get(function_code, None)
print('event_keys_list', event_keys_list)
if not event_keys_list:
raise HTTPExceptionApi(
error_code="",
@@ -295,7 +412,7 @@ class MethodToEvent:
return function_language_models
@staticmethod
def merge_models(language_model: List) -> Tuple:
def merge_models(language_model: List) -> Dict:
merged_models = {"tr": {}, "en": {}}
for model in language_model:
for lang in dict(model).keys():
@@ -327,7 +444,9 @@ class MethodToEvent:
return function_itself
@classmethod
def retrieve_language_parameters(cls, function_code: str, language: str = "tr") -> Dict[str, str]:
def retrieve_language_parameters(
cls, function_code: str, language: str = "tr"
) -> Dict[str, Any]:
"""Retrieve language-specific parameters for an event.
Args:
@@ -342,19 +461,459 @@ class MethodToEvent:
event_response_model_merged = cls.merge_models(event_language_models)
event_response_model_merged_lang = event_response_model_merged[language]
# Map response model fields to language-specific values
print('event_response_model', dict(
event_response_model=event_response_model,
event_response_model_merged_lang=event_response_model_merged_lang,
event_response_model_merged=event_response_model_merged,
language=language,
function_code=function_code,
))
only_language_dict = {
field: event_response_model_merged_lang[field]
for field in event_response_model.model_fields
if field in event_response_model_merged_lang
}
"""
__event_validation__ : {"key": [A, B, C]}
Language Model : Language Model that is model pydatnic requires
Language Models : All language_models that is included in Langugage Models Section
Merged Language Models : Merged with all models in list event_validation
"""
return {
"language_model": only_language_dict,
"language_models": event_response_model_merged,
}
class EventMethodRegistry:
"""Registry for mapping event method UUIDs to categories and managing permissions."""
def __init__(self):
self._uuid_map: Dict[str, Tuple[Type[MethodToEvent], str]] = {} # uuid -> (method_class, event_name)
self._category_events: Dict[str, Set[str]] = defaultdict(set) # category -> set of uuids
def register_method(self, category_name: str, method_class: Type[MethodToEvent]) -> None:
"""Register a method class with its category."""
# Register all UUIDs from the method
for event_uuid, event_name in method_class.__event_keys__.items():
self._uuid_map[event_uuid] = (method_class, event_name)
self._category_events[category_name].add(event_uuid)
def get_method_by_uuid(self, event_uuid: str) -> Optional[Tuple[Type[MethodToEvent], str]]:
"""Get method class and event name by UUID."""
return self._uuid_map.get(event_uuid)
def get_events_for_category(self, category_name: str) -> Set[str]:
"""Get all event UUIDs for a category."""
return self._category_events.get(category_name, set())
class EventCategory:
"""Base class for defining event categories similar to frontend page structure."""
def __init__(
self,
name: str,
title: Dict[str, str],
icon: str,
url: str,
component: Optional[str] = None,
page_info: Any = None,
all_endpoints: Dict[str, Set[str]] = None, # category -> set of event UUIDs
sub_categories: List = None,
):
self.name = name
self.title = self._validate_title(title)
self.icon = icon
self.url = url
self.component = component
self.page_info = page_info
self.all_endpoints = all_endpoints or {}
self.sub_categories = self._process_subcategories(sub_categories or [])
def _validate_title(self, title: Dict[str, str]) -> Dict[str, str]:
"""Validate title has required languages."""
required_langs = {"tr", "en"}
if not all(lang in title for lang in required_langs):
raise ValueError(f"Title must contain all required languages: {required_langs}")
return title
def _process_subcategories(self, categories: List[Union[Dict, "EventCategory"]]) -> List["EventCategory"]:
"""Process subcategories ensuring they are all EventCategory instances."""
processed = []
for category in categories:
if isinstance(category, dict):
processed.append(EventCategory.from_dict(category))
elif isinstance(category, EventCategory):
processed.append(category)
else:
raise ValueError(f"Invalid subcategory type: {type(category)}")
return processed
def has_available_events(self, user_permission_uuids: Set[str]) -> bool:
"""Check if category has available events based on UUID intersection."""
# Check current category's events
return any(
bool(events & user_permission_uuids)
for events in self.all_endpoints.values()
)
def get_menu_item(self, user_permission_uuids: Set[str]) -> Optional[Dict[str, Any]]:
"""Get menu item if category has available events."""
# First check if this category has available events
if not self.has_available_events(user_permission_uuids):
return None
menu_item = {
"name": self.name,
"title": self.title,
"icon": self.icon,
"url": self.url
}
if self.component:
menu_item["component"] = self.component
# Only process subcategories if parent has permissions
sub_items = []
for subcategory in self.sub_categories:
if sub_menu := subcategory.get_menu_item(user_permission_uuids):
sub_items.append(sub_menu)
if sub_items:
menu_item["items"] = sub_items
return menu_item
def get_available_events(self, registry: EventMethodRegistry, user_permission_uuids: Set[str]) -> Dict[str, List[Dict[str, Any]]]:
"""Get available events based on user permission UUIDs."""
available_events = defaultdict(list)
# Process endpoints in current category
category_events = self.all_endpoints.get(self.name, set())
for event_uuid in category_events & user_permission_uuids:
method_info = registry.get_method_by_uuid(event_uuid)
if method_info:
method_class, event_name = method_info
available_events[method_class.event_type].append({
"uuid": event_uuid,
"name": event_name,
"description": method_class.event_description,
"category": method_class.event_category
})
# Process subcategories recursively
for subcategory in self.sub_categories:
sub_events = subcategory.get_available_events(registry, user_permission_uuids)
for event_type, events in sub_events.items():
available_events[event_type].extend(events)
return dict(available_events)
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "EventCategory":
"""Create category from dictionary."""
return cls(
name=data["name"],
title=data["title"],
icon=data["icon"],
url=data["url"],
component=data.get("component"),
page_info=data.get("pageInfo"),
all_endpoints=data.get("allEndpoints", {}),
sub_categories=data.get("subCategories", [])
)
def to_dict(self, registry: EventMethodRegistry, user_permission_uuids: Optional[Set[str]] = None) -> Dict[str, Any]:
"""Convert category to dictionary with optional permission filtering."""
result = {
"name": self.name,
"title": self.title,
"icon": self.icon,
"url": self.url,
"pageInfo": self.page_info,
}
if user_permission_uuids is not None:
# Only include endpoints and their info if user has permissions
available_events = self.get_available_events(registry, user_permission_uuids)
if available_events:
result["availableEvents"] = available_events
result["allEndpoints"] = self.all_endpoints
else:
# Include all endpoints if no permissions specified
result["allEndpoints"] = self.all_endpoints
# Process subcategories
subcategories = [
sub.to_dict(registry, user_permission_uuids) for sub in self.sub_categories
]
# Only include subcategories that have available events
if user_permission_uuids is None or any(
"availableEvents" in sub for sub in subcategories
):
result["subCategories"] = subcategories
if self.component:
result["component"] = self.component
return result
class EventCategoryManager:
"""Manager class for handling event categories and their relationships."""
def __init__(self):
self.categories: List[EventCategory] = []
self.registry = EventMethodRegistry()
def get_menu_tree(self, user_permission_uuids: Set[str]) -> List[Dict[str, Any]]:
"""Get menu tree based on available events."""
return [
menu_item for category in self.categories
if (menu_item := category.get_menu_item(user_permission_uuids))
]
def register_category(self, category: EventCategory) -> None:
"""Register a category and its endpoints in the registry."""
self.categories.append(category)
def add_category(self, category: Union[EventCategory, Dict[str, Any]]) -> None:
"""Add a new category."""
if isinstance(category, dict):
category = EventCategory.from_dict(category)
self.register_category(category)
def add_categories(self, categories: List[Union[EventCategory, Dict[str, Any]]]) -> None:
"""Add multiple categories at once."""
for category in categories:
self.add_category(category)
def get_category(self, name: str) -> Optional[EventCategory]:
"""Get category by name."""
return next((cat for cat in self.categories if cat.name == name), None)
def get_all_categories(self, user_permission_uuids: Optional[Set[str]] = None) -> List[Dict[str, Any]]:
"""Get all categories as dictionary, filtered by user permissions."""
return [cat.to_dict(self.registry, user_permission_uuids) for cat in self.categories]
def get_category_endpoints(self, category_name: str) -> Set[str]:
"""Get all endpoint UUIDs for a category."""
category = self.get_category(category_name)
return category.all_endpoints.get(category_name, set()) if category else set()
def get_subcategories(self, category_name: str, user_permission_uuids: Optional[Set[str]] = None) -> List[Dict[str, Any]]:
"""Get subcategories for a category."""
category = self.get_category(category_name)
if not category:
return []
return [sub.to_dict(self.registry, user_permission_uuids) for sub in category.sub_categories]
def find_category_by_url(self, url: str) -> Optional[EventCategory]:
"""Find a category by its URL."""
for category in self.categories:
if category.url == url:
return category
for subcategory in category.sub_categories:
if subcategory.url == url:
return subcategory
return None
class EventMethodRegistry:
"""Registry for all MethodToEvent classes and menu building."""
_instance = None
_method_classes: Dict[str, Type[MethodToEvent]] = {}
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
def register_method_class(cls, method_class: Type[MethodToEvent]) -> None:
"""Register a MethodToEvent class."""
if not issubclass(method_class, MethodToEvent):
raise ValueError(f"{method_class.__name__} must be a subclass of MethodToEvent")
page_info = method_class.get_page_info()
cls._method_classes[page_info["name"]] = method_class
@classmethod
def get_all_menu_items(
cls,
user_permission_uuids: Set[str],
include_endpoints: bool = False
) -> List[Dict[str, Any]]:
"""Get all menu items based on user permissions.
Args:
user_permission_uuids: Set of UUIDs the user has permission for
include_endpoints: Whether to include available endpoint information
Returns:
List of menu items organized in a tree structure
"""
# First get all page infos
page_infos = {}
for method_class in cls._method_classes.values():
if page_info := method_class.get_page_info_with_permissions(user_permission_uuids, include_endpoints):
page_infos[page_info["name"]] = page_info
# Build tree structure
menu_tree = []
child_pages = set()
# First pass: identify all child pages
for page_info in page_infos.values():
if page_info.get("parent"):
child_pages.add(page_info["name"])
# Second pass: build tree structure
for name, page_info in page_infos.items():
# Skip if this is a child page
if name in child_pages:
continue
# Start with this page's info
menu_item = page_info.copy()
# Find and add children
children = []
for child_info in page_infos.values():
if child_info.get("parent") == name:
children.append(child_info)
if children:
menu_item["items"] = sorted(
children,
key=lambda x: x["name"]
)
menu_tree.append(menu_item)
return sorted(menu_tree, key=lambda x: x["name"])
@classmethod
def get_available_endpoints(
cls,
user_permission_uuids: Set[str]
) -> Dict[str, Dict[str, Any]]:
"""Get all available endpoints based on user permissions.
Args:
user_permission_uuids: Set of UUIDs the user has permission for
Returns:
Dict mapping event UUIDs to endpoint configurations
"""
available_endpoints = {}
for method_class in cls._method_classes.values():
if page_info := method_class.get_page_info_with_permissions(
user_permission_uuids,
include_endpoints=True
):
if endpoints := page_info.get("available_endpoints"):
available_endpoints.update(endpoints)
return available_endpoints
"""
Example usage
# Register your MethodToEvent classes
registry = EventMethodRegistry()
registry.register_method_class(AccountEventMethods)
registry.register_method_class(AccountDetailsEventMethods)
# Get complete menu structure
user_permissions = {
"uuid1",
"uuid2",
"uuid3"
}
menu_items = registry.get_all_menu_items(user_permissions, include_endpoints=True)
# Result:
[
{
"name": "AccountPage",
"title": {"tr": "Hesaplar", "en": "Accounts"},
"icon": "User",
"url": "/account",
"category": "account",
"type": "query",
"description": "Account management operations",
"available_endpoints": {
"uuid1": {"path": "/api/account/view", "method": "GET"},
"uuid2": {"path": "/api/account/edit", "method": "POST"}
},
"items": [
{
"name": "AccountDetailsPage",
"title": {"tr": "Hesap Detayları", "en": "Account Details"},
"icon": "FileText",
"url": "/account/details",
"parent": "AccountPage",
"category": "account_details",
"type": "query",
"available_endpoints": {
"uuid3": {"path": "/api/account/details/view", "method": "GET"}
}
}
]
}
]
# Get all available endpoints
endpoints = registry.get_available_endpoints(user_permissions)
# Result:
{
"uuid1": {
"path": "/api/account/view",
"method": "GET",
"event_name": "view_account"
},
"uuid2": {
"path": "/api/account/edit",
"method": "POST",
"event_name": "edit_account"
},
"uuid3": {
"path": "/api/account/details/view",
"method": "GET",
"event_name": "view_details"
}
}
# Get event UUIDs from MethodToEvent classes
account_events = {uuid for uuid in AccountEventMethods.__event_keys__}
# Define categories with event UUIDs
PAGES_INFO = [
{
"name": "AccountPage",
"title": {"tr": "Hesaplar", "en": "Accounts"},
"icon": "User",
"url": "/account",
"pageInfo": AccountPageInfo,
"allEndpoints": {"AccountPage": account_events},
"subCategories": [
{
"name": "AccountDetailsPage",
"title": {"tr": "Hesap Detayları", "en": "Account Details"},
"icon": "FileText",
"url": "/account/details",
"allEndpoints": {} # No direct endpoints, only shown if parent has permissions
}
]
}
]
# Initialize manager
manager = EventCategoryManager()
manager.add_categories(PAGES_INFO)
# Get menu tree based on available events
user_permission_uuids = {
"31f4f32f-0cd4-4995-8a6a-f9f56335848a",
"ec98ef2c-bcd0-432d-a8f4-1822a56c33b2"
}
menu_tree = manager.get_menu_tree(user_permission_uuids)
"""