"""
Abstract base classes for API route and event handling.

This module provides core abstractions for route configuration and factory,
with support for authentication and event handling.
"""

from collections import defaultdict
from dataclasses import dataclass, field
from functools import wraps
import inspect
from typing import (
    Tuple,
    TypeVar,
    Optional,
    Callable,
    Dict,
    Any,
    List,
    Type,
    ClassVar,
    Union,
    Set,
)
import uuid

from fastapi import Request, Depends, APIRouter
from pydantic import BaseModel

from ApiLibrary.common.line_number import get_line_number_for_error
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
from Schemas.rules.rules import EndpointRestriction

ResponseModel = TypeVar("ResponseModel", bound=BaseModel)


def endpoint_wrapper(url_of_endpoint: Optional[str] = None):
    """Create a decorator that tags endpoint results with ``url_of_endpoint``.

    The decorated callable is always exposed as an ``async`` function. Dict
    results get an ``"endpoint"`` key added in place; pydantic models are
    dumped to a dict first; any other result is returned untouched.

    Args:
        url_of_endpoint: Optional URL path for the endpoint

    Returns:
        A decorator producing an async wrapper that also carries the URL as
        its ``url_of_endpoint`` attribute.
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(
            *args: Any, **kwargs: Any
        ) -> Union[Dict[str, Any], BaseModel]:
            # Handle both async and sync functions
            if inspect.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)

            # A sync callable (e.g. a middleware wrapper) may still hand back
            # a coroutine object; await it so callers get the final value.
            if inspect.iscoroutine(result):
                result = await result

            # Add endpoint to the result
            if isinstance(result, dict):
                result["endpoint"] = url_of_endpoint
                return result
            elif isinstance(result, BaseModel):
                # Convert Pydantic model to dict and add endpoint
                result_dict = result.model_dump()
                result_dict["endpoint"] = url_of_endpoint
                return result_dict
            return result

        wrapper.url_of_endpoint = url_of_endpoint
        return wrapper

    return decorator


@dataclass
class EndpointFactoryConfig:
    """Configuration class for API endpoints.

    Attributes:
        url_prefix: URL prefix component (stored only; not used in this class)
        url_endpoint: URL endpoint component (stored only; not used in this class)
        url_of_endpoint: Full URL path for this endpoint; passed to
            ``endpoint_wrapper`` so results are tagged with it
        endpoint: URL path for this endpoint
        method: HTTP method (GET, POST, etc.)
        summary: Short description for API documentation
        description: Detailed description for API documentation
        endpoint_function: Function to handle the endpoint
        is_auth_required: Whether authentication is required
        response_model: Optional response model for OpenAPI schema
        request_model: Optional request model for OpenAPI schema
        is_event_required: Whether event handling is required
        extra_options: Additional endpoint options
    """

    url_prefix: str
    url_endpoint: str
    url_of_endpoint: str
    endpoint: str
    method: str
    summary: str
    description: str
    endpoint_function: Callable[..., Any]  # Now accepts any parameters and return type
    response_model: Optional[type] = None
    request_model: Optional[type] = None
    is_auth_required: bool = True
    is_event_required: bool = False
    extra_options: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Wrap ``endpoint_function`` with middleware after initialization.

        Wrapping order, as implemented:
        - If ``is_event_required`` -> ``TokenEventMiddleware.event_required``
          (takes precedence; ``is_auth_required`` is not consulted)
        - Else if ``is_auth_required`` -> ``MiddlewareModule.auth_required``
        - Finally, always wrap with ``endpoint_wrapper(url_of_endpoint)``
        """
        # The middleware imports are local to this method in the original
        # code; presumably to avoid an import cycle - TODO confirm.
        if self.is_event_required:
            from middleware import TokenEventMiddleware

            self.endpoint_function = TokenEventMiddleware.event_required(
                self.endpoint_function
            )
        elif self.is_auth_required:
            from middleware import MiddlewareModule

            self.endpoint_function = MiddlewareModule.auth_required(
                self.endpoint_function
            )

        # Then wrap with endpoint_wrapper to store url_of_endpoint
        self.endpoint_function = endpoint_wrapper(self.url_of_endpoint)(
            self.endpoint_function
        )


class RouteFactoryConfig:
    """Configuration class for API route factories.

    Attributes:
        name: Route name
        tags: List of tags for API documentation
        prefix: URL prefix for all endpoints in this route
        include_in_schema: Whether to include in OpenAPI schema
        endpoints: List of endpoint configurations
        extra_options: Additional route options
    """

    def __init__(
        self,
        name: str,
        tags: List[str],
        prefix: str,
        include_in_schema: bool = True,
        endpoints: Optional[List[EndpointFactoryConfig]] = None,
        extra_options: Optional[Dict[str, Any]] = None,
    ):
        self.name = name
        self.tags = tags
        self.prefix = prefix
        self.include_in_schema = include_in_schema
        # ``None`` (or any falsy value) is normalized to a fresh container.
        self.endpoints = endpoints or []
        self.extra_options = extra_options or {}

    def __post_init__(self):
        """Normalize ``endpoints`` / ``extra_options`` when they are ``None``.

        This class is not a dataclass, so nothing in this module calls this
        hook automatically; ``__init__`` already performs the same
        normalization. It is kept so explicit callers keep working.
        """
        if self.endpoints is None:
            self.endpoints = []
        if self.extra_options is None:
            self.extra_options = {}

    def as_dict(self) -> Dict[str, Any]:
        """Convert configuration to dictionary format.

        Each endpoint is represented by its instance ``__dict__`` (a live
        reference, not a copy).
        """
        return {
            "name": self.name,
            "tags": self.tags,
            "prefix": self.prefix,
            "include_in_schema": self.include_in_schema,
            "endpoints": [endpoint.__dict__ for endpoint in self.endpoints],
            "extra_options": self.extra_options,
        }


class MethodToEvent:
    """Base class for mapping methods to API events with type safety and endpoint configuration.

    This class provides a framework for handling API events with proper type
    checking for tokens and response models, as well as managing endpoint
    configurations and frontend page structure.

    Type Parameters:
        TokenType: Type of authentication token
        ResponseModel: Type of response model

    Class Variables:
        action_key: Unique identifier for the action
        event_type: Type of event (e.g., 'query', 'command')
        event_description: Human-readable description of the event
        event_category: Category for grouping related events
        __event_keys__: Mapping of UUIDs to event names
        __event_validation__: Validation rules for events
        __endpoint_config__: API endpoint configuration
        __page_info__: Frontend page configuration
    """

    action_key: ClassVar[Optional[str]] = None
    event_type: ClassVar[Optional[str]] = None
    event_description: ClassVar[str] = ""
    event_category: ClassVar[str] = ""
    __event_keys__: ClassVar[Dict[str, str]] = {}
    __event_validation__: ClassVar[Dict[str, Tuple[Type, Union[List, tuple]]]] = {}
    __endpoint_config__: ClassVar[Dict[str, Dict[str, Any]]] = {
        "endpoints": {},  # Mapping of event UUIDs to endpoint configs
        "router_prefix": "",  # Router prefix for all endpoints in this class
        "tags": [],  # OpenAPI tags
    }
    __page_info__: ClassVar[Dict[str, Any]] = {
        "name": "",  # Page name (e.g., "AccountPage")
        "title": {"tr": "", "en": ""},  # Multi-language titles
        "icon": "",  # Icon name
        "url": "",  # Frontend route
        "component": None,  # Optional component name
        "parent": None,  # Parent page name if this is a subpage
    }

    @classmethod
    def _own_class_config(cls, attr_name: str) -> Dict[str, Any]:
        """Return a config dict that belongs to ``cls`` itself.

        The config dicts are class attributes, so a subclass that does not
        define its own would otherwise mutate the dict inherited from its
        parent and leak its settings into every sibling class. On first
        write the inherited dict is copied (nested dict/list/set values are
        copied one level deep) and stored on ``cls``.

        Args:
            attr_name: Name of the class attribute holding the dict

        Returns:
            The dict stored directly on ``cls``
        """
        if attr_name not in cls.__dict__:
            inherited = getattr(cls, attr_name)
            setattr(
                cls,
                attr_name,
                {
                    key: value.copy() if isinstance(value, (dict, list, set)) else value
                    for key, value in inherited.items()
                },
            )
        return getattr(cls, attr_name)

    @classmethod
    def register_endpoint(
        cls,
        event_uuid: str,
        path: str,
        method: str = "POST",
        response_model: Optional[Type] = None,
        **kwargs
    ) -> None:
        """Register an API endpoint configuration for an event.

        Args:
            event_uuid: UUID of the event
            path: Endpoint path (will be prefixed with router_prefix)
            method: HTTP method (default: POST)
            response_model: Pydantic model for response
            **kwargs: Additional FastAPI endpoint parameters

        Raises:
            ValueError: If ``event_uuid`` is not a key of ``__event_keys__``
        """
        if event_uuid not in cls.__event_keys__:
            raise ValueError(f"Event UUID {event_uuid} not found in {cls.__name__}")

        endpoint_config = cls._own_class_config("__endpoint_config__")
        endpoint_config["endpoints"][event_uuid] = {
            "path": path,
            "method": method,
            "response_model": response_model,
            **kwargs
        }

    @classmethod
    def configure_router(cls, prefix: str, tags: List[str]) -> None:
        """Configure the API router settings.

        Args:
            prefix: Router prefix for all endpoints
            tags: OpenAPI tags for documentation
        """
        endpoint_config = cls._own_class_config("__endpoint_config__")
        endpoint_config["router_prefix"] = prefix
        endpoint_config["tags"] = tags

    @classmethod
    def configure_page(
        cls,
        name: str,
        title: Dict[str, str],
        icon: str,
        url: str,
        component: Optional[str] = None,
        parent: Optional[str] = None
    ) -> None:
        """Configure the frontend page information.

        Args:
            name: Page name
            title: Multi-language titles (must include 'tr' and 'en')
            icon: Icon name
            url: Frontend route
            component: Optional component name
            parent: Parent page name for subpages

        Raises:
            ValueError: If ``title`` lacks the 'tr' or 'en' key
        """
        required_langs = {"tr", "en"}
        if not all(lang in title for lang in required_langs):
            raise ValueError(f"Title must contain all required languages: {required_langs}")

        cls._own_class_config("__page_info__").update({
            "name": name,
            "title": title,
            "icon": icon,
            "url": url,
            "component": component,
            "parent": parent
        })

    @classmethod
    def get_endpoint_config(cls) -> Dict[str, Any]:
        """Get the complete endpoint configuration (the live dict, not a copy)."""
        return cls.__endpoint_config__

    @classmethod
    def get_page_info(cls) -> Dict[str, Any]:
        """Get the frontend page configuration (the live dict, not a copy)."""
        return cls.__page_info__

    @classmethod
    def has_available_events(cls, user_permission_uuids: Set[str]) -> bool:
        """Check if any events are available based on user permissions."""
        return bool(set(cls.__event_keys__.keys()) & user_permission_uuids)

    @classmethod
    def get_page_info_with_permissions(
        cls,
        user_permission_uuids: Set[str],
        include_endpoints: bool = False
    ) -> Optional[Dict[str, Any]]:
        """Get page info if user has required permissions.

        Args:
            user_permission_uuids: Set of UUIDs the user has permission for
            include_endpoints: Whether to include available endpoint information

        Returns:
            Dict with page info if user has permissions, None otherwise
        """
        # Check if user has any permissions for this page's events
        if not cls.has_available_events(user_permission_uuids):
            return None

        # Start with basic page info (shallow copy, so the class dict is
        # not modified by the keys added below)
        page_info = {
            **cls.__page_info__,
            "category": cls.event_category,
            "type": cls.event_type,
            "description": cls.event_description
        }

        # Optionally include available endpoints
        if include_endpoints:
            router_prefix = cls.__endpoint_config__["router_prefix"]
            available_endpoints = {}
            # Named ``event_uuid`` so the imported ``uuid`` module is not shadowed.
            for event_uuid, endpoint in cls.__endpoint_config__["endpoints"].items():
                if event_uuid in user_permission_uuids:
                    available_endpoints[event_uuid] = {
                        "path": f"{router_prefix}{endpoint['path']}",
                        "method": endpoint["method"],
                        "event_name": cls.__event_keys__[event_uuid]
                    }
            if available_endpoints:
                page_info["available_endpoints"] = available_endpoints

        return page_info

    @classmethod
    def get_events_config(cls) -> Dict[str, Any]:
        """Get the complete configuration including events, endpoints, and page info."""
        return {
            "events": cls.__event_keys__,
            "endpoints": cls.__endpoint_config__,
            "page_info": cls.__page_info__,
            "category": cls.event_category,
            "type": cls.event_type,
            "description": cls.event_description
        }

    @classmethod
    def retrieve_event_response_model(cls, function_code: str) -> Any:
        """Retrieve the response model registered for a specific function.

        Args:
            function_code: Function identifier (key of ``__event_validation__``)

        Returns:
            The first element of the validation entry (the response model)

        Raises:
            HTTPExceptionApi: If no non-empty entry exists for ``function_code``
        """
        event_validation_list = cls.__event_validation__.get(function_code, None)
        if not event_validation_list:
            # NOTE(review): the raise is kept inline (not in a shared helper)
            # because get_line_number_for_error() presumably reports the
            # calling line - confirm before refactoring.
            raise HTTPExceptionApi(
                error_code="",
                lang="en",
                loc=get_line_number_for_error(),
                sys_msg="Function not found",
            )
        return event_validation_list[0]

    @classmethod
    def retrieve_event_languages(cls, function_code: str) -> Union[List, tuple]:
        """Retrieve the language models registered for a specific function.

        Args:
            function_code: Function identifier (key of ``__event_validation__``)

        Returns:
            The second element of the validation entry (the language models)

        Raises:
            HTTPExceptionApi: If the entry is missing/empty, or its language
                models element is empty
        """
        event_keys_list = cls.__event_validation__.get(function_code, None)
        if not event_keys_list:
            raise HTTPExceptionApi(
                error_code="",
                lang="en",
                loc=get_line_number_for_error(),
                sys_msg="Function not found",
            )
        function_language_models: Union[List, tuple] = event_keys_list[1]
        if not function_language_models:
            raise HTTPExceptionApi(
                error_code="",
                lang="en",
                loc=get_line_number_for_error(),
                sys_msg="Function not found",
            )
        return function_language_models

    @staticmethod
    def merge_models(language_model: List) -> Dict:
        """Merge a list of per-language mappings into one mapping per language.

        Later models override earlier ones on key collisions. ``"tr"`` and
        ``"en"`` are always present in the result, even when empty.

        Args:
            language_model: Iterable of mappings ``{lang: {field: label}}``

        Returns:
            Dict mapping each language code to its merged field mapping
        """
        merged_models = {"tr": {}, "en": {}}
        for model in language_model:
            for lang in dict(model).keys():
                # Always merge into a dict owned by the result; storing the
                # model's own dict would let later updates mutate the input.
                merged_models.setdefault(lang, {}).update(model[lang])
        return merged_models

    @classmethod
    def retrieve_event_function(cls, function_code: str) -> Callable[..., Any]:
        """Retrieve the handler attribute registered for a specific function.

        Args:
            function_code: Function identifier (key of ``__event_keys__``)

        Returns:
            The attribute of ``cls`` named by ``__event_keys__[function_code]``

        Raises:
            KeyError: If ``function_code`` is not in ``__event_keys__``
            HTTPExceptionApi: If the named attribute is missing or falsy
        """
        function_event = cls.__event_keys__[function_code]
        function_itself = getattr(cls, function_event, None)
        if not function_itself:
            raise HTTPExceptionApi(
                error_code="",
                lang="en",
                loc=get_line_number_for_error(),
                sys_msg="Function not found",
            )
        return function_itself

    @classmethod
    def retrieve_language_parameters(
        cls, function_code: str, language: str = "tr"
    ) -> Dict[str, Any]:
        """Retrieve language-specific parameters for an event.

        Args:
            function_code: Function identifier
            language: Language code (e.g. 'tr', 'en')

        Returns:
            Dict with ``"language_model"`` (labels for the response model's
            fields in ``language``) and ``"language_models"`` (the merged
            mapping for all languages)

        Raises:
            HTTPExceptionApi: Propagated from the ``retrieve_event_*`` lookups
            KeyError: If ``language`` is absent from the merged models
        """
        event_language_models = cls.retrieve_event_languages(function_code)
        event_response_model = cls.retrieve_event_response_model(function_code)
        event_response_model_merged = cls.merge_models(event_language_models)
        event_response_model_merged_lang = event_response_model_merged[language]

        # Map response model fields to language-specific values
        only_language_dict = {
            field_name: event_response_model_merged_lang[field_name]
            for field_name in event_response_model.model_fields
            if field_name in event_response_model_merged_lang
        }

        # __event_validation__   : {"key": [A, B, C]}
        # Language Model         : the language mapping restricted to the
        #                          fields the pydantic model requires
        # Language Models        : all language models listed in the
        #                          validation entry
        # Merged Language Models : every model in that list merged together
        return {
            "language_model": only_language_dict,
            "language_models": event_response_model_merged,
        }


class EventMethodRegistry:
    """Registry for MethodToEvent classes, event UUID lookup and menu building.

    The original module defined ``EventMethodRegistry`` twice; the later
    definition replaced the earlier one at module level, so instances lacked
    ``get_method_by_uuid`` although ``EventCategory`` calls it. Both APIs now
    live in this single class:

    * instance API: ``register_method``, ``get_method_by_uuid``,
      ``get_events_for_category``
    * class API: ``register_method_class``, ``get_all_menu_items``,
      ``get_available_endpoints``

    ``EventMethodRegistry()`` always returns the same instance.
    """

    _instance: ClassVar[Optional["EventMethodRegistry"]] = None
    _method_classes: ClassVar[Dict[str, Type[MethodToEvent]]] = {}

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every EventMethodRegistry() call, even though
        # __new__ returns the existing instance; keep already-registered data.
        if "_uuid_map" in self.__dict__:
            return
        # uuid -> (method_class, event_name)
        self._uuid_map: Dict[str, Tuple[Type[MethodToEvent], str]] = {}
        # category -> set of uuids
        self._category_events: Dict[str, Set[str]] = defaultdict(set)

    def register_method(self, category_name: str, method_class: Type[MethodToEvent]) -> None:
        """Register a method class with its category."""
        # Register all UUIDs from the method
        for event_uuid, event_name in method_class.__event_keys__.items():
            self._uuid_map[event_uuid] = (method_class, event_name)
            self._category_events[category_name].add(event_uuid)

    def get_method_by_uuid(self, event_uuid: str) -> Optional[Tuple[Type[MethodToEvent], str]]:
        """Get method class and event name by UUID, or None if unknown."""
        return self._uuid_map.get(event_uuid)

    def get_events_for_category(self, category_name: str) -> Set[str]:
        """Get all event UUIDs for a category (empty set if unknown)."""
        # .get() avoids creating an entry in the defaultdict on lookup.
        return self._category_events.get(category_name, set())

    @classmethod
    def register_method_class(cls, method_class: Type[MethodToEvent]) -> None:
        """Register a MethodToEvent class under its page name.

        Raises:
            ValueError: If ``method_class`` is not a subclass of MethodToEvent
        """
        if not issubclass(method_class, MethodToEvent):
            raise ValueError(f"{method_class.__name__} must be a subclass of MethodToEvent")

        page_info = method_class.get_page_info()
        cls._method_classes[page_info["name"]] = method_class

    @classmethod
    def get_all_menu_items(
        cls,
        user_permission_uuids: Set[str],
        include_endpoints: bool = False
    ) -> List[Dict[str, Any]]:
        """Get all menu items based on user permissions.

        Pages with a ``parent`` are listed under that parent's ``"items"``;
        only one level of nesting is built, and a child whose parent is not
        visible to the user does not appear at all.

        Args:
            user_permission_uuids: Set of UUIDs the user has permission for
            include_endpoints: Whether to include available endpoint information

        Returns:
            List of menu items organized in a tree structure, sorted by name
        """
        # First get all page infos
        page_infos = {}
        for method_class in cls._method_classes.values():
            if page_info := method_class.get_page_info_with_permissions(
                user_permission_uuids, include_endpoints
            ):
                page_infos[page_info["name"]] = page_info

        # Build tree structure
        menu_tree = []

        # First pass: identify all child pages
        child_pages = {
            page_info["name"]
            for page_info in page_infos.values()
            if page_info.get("parent")
        }

        # Second pass: build tree structure
        for name, page_info in page_infos.items():
            # Skip if this is a child page
            if name in child_pages:
                continue

            # Start with this page's info
            menu_item = page_info.copy()

            # Find and add children
            children = [
                child_info
                for child_info in page_infos.values()
                if child_info.get("parent") == name
            ]
            if children:
                menu_item["items"] = sorted(children, key=lambda x: x["name"])

            menu_tree.append(menu_item)

        return sorted(menu_tree, key=lambda x: x["name"])

    @classmethod
    def get_available_endpoints(
        cls,
        user_permission_uuids: Set[str]
    ) -> Dict[str, Dict[str, Any]]:
        """Get all available endpoints based on user permissions.

        Args:
            user_permission_uuids: Set of UUIDs the user has permission for

        Returns:
            Dict mapping event UUIDs to endpoint configurations
        """
        available_endpoints = {}
        for method_class in cls._method_classes.values():
            if page_info := method_class.get_page_info_with_permissions(
                user_permission_uuids,
                include_endpoints=True
            ):
                if endpoints := page_info.get("available_endpoints"):
                    available_endpoints.update(endpoints)
        return available_endpoints


class EventCategory:
    """Base class for defining event categories similar to frontend page structure."""

    def __init__(
        self,
        name: str,
        title: Dict[str, str],
        icon: str,
        url: str,
        component: Optional[str] = None,
        page_info: Any = None,
        all_endpoints: Optional[Dict[str, Set[str]]] = None,  # category -> set of event UUIDs
        sub_categories: Optional[List] = None,
    ):
        self.name = name
        self.title = self._validate_title(title)
        self.icon = icon
        self.url = url
        self.component = component
        self.page_info = page_info
        self.all_endpoints = all_endpoints or {}
        self.sub_categories = self._process_subcategories(sub_categories or [])

    def _validate_title(self, title: Dict[str, str]) -> Dict[str, str]:
        """Validate title has required languages ('tr' and 'en').

        Raises:
            ValueError: If either language key is missing
        """
        required_langs = {"tr", "en"}
        if not all(lang in title for lang in required_langs):
            raise ValueError(f"Title must contain all required languages: {required_langs}")
        return title

    def _process_subcategories(self, categories: List[Union[Dict, "EventCategory"]]) -> List["EventCategory"]:
        """Process subcategories ensuring they are all EventCategory instances.

        Raises:
            ValueError: If an item is neither a dict nor an EventCategory
        """
        processed = []
        for category in categories:
            if isinstance(category, dict):
                processed.append(EventCategory.from_dict(category))
            elif isinstance(category, EventCategory):
                processed.append(category)
            else:
                raise ValueError(f"Invalid subcategory type: {type(category)}")
        return processed

    def has_available_events(self, user_permission_uuids: Set[str]) -> bool:
        """Check if category has available events based on UUID intersection.

        Only this category's own ``all_endpoints`` are inspected;
        subcategories are not consulted.
        """
        return any(
            bool(events & user_permission_uuids)
            for events in self.all_endpoints.values()
        )

    def get_menu_item(self, user_permission_uuids: Set[str]) -> Optional[Dict[str, Any]]:
        """Get menu item if category has available events.

        Returns:
            Menu dict (with ``"items"`` for visible subcategories), or None
            when this category has no event the user may access.
        """
        # First check if this category has available events
        if not self.has_available_events(user_permission_uuids):
            return None

        menu_item = {
            "name": self.name,
            "title": self.title,
            "icon": self.icon,
            "url": self.url
        }
        if self.component:
            menu_item["component"] = self.component

        # Only process subcategories if parent has permissions; each
        # subcategory applies the same availability check to itself.
        sub_items = []
        for subcategory in self.sub_categories:
            if sub_menu := subcategory.get_menu_item(user_permission_uuids):
                sub_items.append(sub_menu)
        if sub_items:
            menu_item["items"] = sub_items

        return menu_item

    def get_available_events(self, registry: EventMethodRegistry, user_permission_uuids: Set[str]) -> Dict[str, List[Dict[str, Any]]]:
        """Get available events based on user permission UUIDs.

        Returns:
            Dict mapping event type to a list of event descriptors, collected
            from this category (``all_endpoints[self.name]``) and,
            recursively, from its subcategories.
        """
        available_events = defaultdict(list)

        # Process endpoints in current category
        category_events = self.all_endpoints.get(self.name, set())
        for event_uuid in category_events & user_permission_uuids:
            method_info = registry.get_method_by_uuid(event_uuid)
            if method_info:
                method_class, event_name = method_info
                available_events[method_class.event_type].append({
                    "uuid": event_uuid,
                    "name": event_name,
                    "description": method_class.event_description,
                    "category": method_class.event_category
                })

        # Process subcategories recursively
        for subcategory in self.sub_categories:
            sub_events = subcategory.get_available_events(registry, user_permission_uuids)
            for event_type, events in sub_events.items():
                available_events[event_type].extend(events)

        return dict(available_events)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "EventCategory":
        """Create category from dictionary (camelCase optional keys)."""
        return cls(
            name=data["name"],
            title=data["title"],
            icon=data["icon"],
            url=data["url"],
            component=data.get("component"),
            page_info=data.get("pageInfo"),
            all_endpoints=data.get("allEndpoints", {}),
            sub_categories=data.get("subCategories", [])
        )

    def to_dict(self, registry: EventMethodRegistry, user_permission_uuids: Optional[Set[str]] = None) -> Dict[str, Any]:
        """Convert category to dictionary with optional permission filtering.

        Args:
            registry: Registry used to resolve event UUIDs
            user_permission_uuids: When given, endpoint info is included only
                if the user has at least one available event; when None,
                everything is included.
        """
        result = {
            "name": self.name,
            "title": self.title,
            "icon": self.icon,
            "url": self.url,
            "pageInfo": self.page_info,
        }

        if user_permission_uuids is not None:
            # Only include endpoints and their info if user has permissions
            available_events = self.get_available_events(registry, user_permission_uuids)
            if available_events:
                result["availableEvents"] = available_events
                result["allEndpoints"] = self.all_endpoints
        else:
            # Include all endpoints if no permissions specified
            result["allEndpoints"] = self.all_endpoints

        # Process subcategories
        subcategories = [
            sub.to_dict(registry, user_permission_uuids) for sub in self.sub_categories
        ]
        # Only include subcategories that have available events
        if user_permission_uuids is None or any(
            "availableEvents" in sub for sub in subcategories
        ):
            result["subCategories"] = subcategories

        if self.component:
            result["component"] = self.component

        return result


class EventCategoryManager:
    """Manager class for handling event categories and their relationships."""

    def __init__(self):
        self.categories: List[EventCategory] = []
        self.registry = EventMethodRegistry()

    def get_menu_tree(self, user_permission_uuids: Set[str]) -> List[Dict[str, Any]]:
        """Get menu tree based on available events."""
        return [
            menu_item
            for category in self.categories
            if (menu_item := category.get_menu_item(user_permission_uuids))
        ]

    def register_category(self, category: EventCategory) -> None:
        """Append a category to the managed list.

        Nothing is added to ``self.registry`` here.
        """
        self.categories.append(category)

    def add_category(self, category: Union[EventCategory, Dict[str, Any]]) -> None:
        """Add a new category, converting a dict via ``EventCategory.from_dict``."""
        if isinstance(category, dict):
            category = EventCategory.from_dict(category)
        self.register_category(category)

    def add_categories(self, categories: List[Union[EventCategory, Dict[str, Any]]]) -> None:
        """Add multiple categories at once."""
        for category in categories:
            self.add_category(category)

    def get_category(self, name: str) -> Optional[EventCategory]:
        """Get top-level category by name, or None if not found."""
        return next((cat for cat in self.categories if cat.name == name), None)

    def get_all_categories(self, user_permission_uuids: Optional[Set[str]] = None) -> List[Dict[str, Any]]:
        """Get all categories as dictionary, filtered by user permissions."""
        return [cat.to_dict(self.registry, user_permission_uuids) for cat in self.categories]

    def get_category_endpoints(self, category_name: str) -> Set[str]:
        """Get all endpoint UUIDs for a category (empty set if unknown)."""
        category = self.get_category(category_name)
        return category.all_endpoints.get(category_name, set()) if category else set()

    def get_subcategories(self, category_name: str, user_permission_uuids: Optional[Set[str]] = None) -> List[Dict[str, Any]]:
        """Get subcategories for a category (empty list if unknown)."""
        category = self.get_category(category_name)
        if not category:
            return []
        return [sub.to_dict(self.registry, user_permission_uuids) for sub in category.sub_categories]

    def find_category_by_url(self, url: str) -> Optional[EventCategory]:
        """Find a category by its URL.

        Searches top-level categories and their direct subcategories only;
        deeper nesting levels are not visited.
        """
        for category in self.categories:
            if category.url == url:
                return category
            for subcategory in category.sub_categories:
                if subcategory.url == url:
                    return subcategory
        return None


"""
Example usage

# Register your MethodToEvent classes
registry = EventMethodRegistry()
registry.register_method_class(AccountEventMethods)
registry.register_method_class(AccountDetailsEventMethods)

# Get complete menu structure
user_permissions = {
    "uuid1",
    "uuid2",
    "uuid3"
}

menu_items = registry.get_all_menu_items(user_permissions, include_endpoints=True)
# Result:
[
    {
        "name": "AccountPage",
        "title": {"tr": "Hesaplar", "en": "Accounts"},
        "icon": "User",
        "url": "/account",
        "category": "account",
        "type": "query",
        "description": "Account management operations",
        "available_endpoints": {
            "uuid1": {"path": "/api/account/view", "method": "GET"},
            "uuid2": {"path": "/api/account/edit", "method": "POST"}
        },
        "items": [
            {
                "name": "AccountDetailsPage",
                "title": {"tr": "Hesap Detayları", "en": "Account Details"},
                "icon": "FileText",
                "url": "/account/details",
                "parent": "AccountPage",
                "category": "account_details",
                "type": "query",
                "available_endpoints": {
                    "uuid3": {"path": "/api/account/details/view", "method": "GET"}
                }
            }
        ]
    }
]

# Get all available endpoints
endpoints = registry.get_available_endpoints(user_permissions)
# Result:
{
    "uuid1": {
        "path": "/api/account/view",
        "method": "GET",
        "event_name": "view_account"
    },
    "uuid2": {
        "path": "/api/account/edit",
        "method": "POST",
        "event_name": "edit_account"
    },
    "uuid3": {
        "path": "/api/account/details/view",
        "method": "GET",
        "event_name": "view_details"
    }
}

# Get event UUIDs from MethodToEvent classes
account_events = {uuid for uuid in AccountEventMethods.__event_keys__}

# Define categories with event UUIDs
PAGES_INFO = [
    {
        "name": "AccountPage",
        "title": {"tr": "Hesaplar", "en": "Accounts"},
        "icon": "User",
        "url": "/account",
        "pageInfo": AccountPageInfo,
        "allEndpoints": {"AccountPage": account_events},
        "subCategories": [
            {
                "name": "AccountDetailsPage",
                "title": {"tr": "Hesap Detayları", "en": "Account Details"},
                "icon": "FileText",
                "url": "/account/details",
                "allEndpoints": {}  # No direct endpoints: get_menu_item omits a subcategory without permitted events of its own
            }
        ]
    }
]

# Initialize manager
manager = EventCategoryManager()
manager.add_categories(PAGES_INFO)

# Get menu tree based on available events
user_permission_uuids = {
    "31f4f32f-0cd4-4995-8a6a-f9f56335848a",
    "ec98ef2c-bcd0-432d-a8f4-1822a56c33b2"
}
menu_tree = manager.get_menu_tree(user_permission_uuids)
"""