""" Abstract base classes for API route and event handling. This module provides core abstractions for route configuration and factory, with support for authentication and event handling. """ import uuid import inspect from typing import ( Tuple, TypeVar, Optional, Callable, Dict, Any, List, Type, ClassVar, Union, Set, ) from collections import defaultdict from dataclasses import dataclass, field from pydantic import BaseModel from fastapi import Request, Depends, APIRouter from functools import wraps from ApiLayers.ApiLibrary.common.line_number import get_line_number_for_error from ApiLayers.ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi from ApiLayers.Schemas.rules.rules import EndpointRestriction ResponseModel = TypeVar("ResponseModel", bound=BaseModel) def endpoint_wrapper(url_of_endpoint: Optional[str] = None): """Create a wrapper for endpoints that stores url_of_endpoint in closure. Args: url_of_endpoint: Optional URL path for the endpoint """ def decorator(func: Callable[..., Any]) -> Callable[..., Any]: @wraps(func) async def wrapper( *args: Any, **kwargs: Any ) -> Union[Dict[str, Any], BaseModel]: # Handle both async and sync functions if inspect.iscoroutinefunction(func): result = await func(*args, **kwargs) else: result = func(*args, **kwargs) # If result is a coroutine, await it if inspect.iscoroutine(result): result = await result # Add endpoint to the result if isinstance(result, dict): result["endpoint"] = url_of_endpoint return result elif isinstance(result, BaseModel): # Convert Pydantic model to dict and add endpoint result_dict = result.model_dump() result_dict["endpoint"] = url_of_endpoint return result_dict return result wrapper.url_of_endpoint = url_of_endpoint return wrapper return decorator @dataclass class EndpointFactoryConfig: """Configuration class for API endpoints. Attributes: url_of_endpoint: Full URL path for this endpoint endpoint: URL path for this endpoint method: HTTP method (GET, POST, etc.) summary: Short description for API documentation description: Detailed description for API documentation endpoint_function: Function to handle the endpoint is_auth_required: Whether authentication is required response_model: Optional response model for OpenAPI schema request_model: Optional request model for OpenAPI schema is_event_required: Whether event handling is required extra_options: Additional endpoint options """ url_prefix: str url_endpoint: str url_of_endpoint: str endpoint: str method: str summary: str description: str endpoint_function: Callable[..., Any] # Now accepts any parameters and return type response_model: Optional[type] = None request_model: Optional[type] = None is_auth_required: bool = True is_event_required: bool = False extra_options: Dict[str, Any] = field(default_factory=dict) def __post_init__(self): """Post initialization hook. Wraps endpoint function with appropriate middleware based on configuration: - If auth and event required -> wrap with TokenEventMiddleware - If only event required -> wrap with EventMiddleware - If only auth required -> wrap with MiddlewareModule.auth_required """ # First apply auth/event middleware if self.is_event_required: from middleware import TokenEventMiddleware self.endpoint_function = TokenEventMiddleware.event_required( self.endpoint_function ) elif self.is_auth_required: from middleware import MiddlewareModule self.endpoint_function = MiddlewareModule.auth_required( self.endpoint_function ) # Then wrap with endpoint_wrapper to store url_of_endpoint self.endpoint_function = endpoint_wrapper(self.url_of_endpoint)( self.endpoint_function ) class RouteFactoryConfig: """Configuration class for API route factories. Attributes: name: Route name tags: List of tags for API documentation prefix: URL prefix for all endpoints in this route include_in_schema: Whether to include in OpenAPI schema endpoints: List of endpoint configurations extra_options: Additional route options """ def __init__( self, name: str, tags: List[str], prefix: str, include_in_schema: bool = True, endpoints: List[EndpointFactoryConfig] = None, extra_options: Dict[str, Any] = None, ): self.name = name self.tags = tags self.prefix = prefix self.include_in_schema = include_in_schema self.endpoints = endpoints or [] self.extra_options = extra_options or {} def __post_init__(self): """Validate and normalize configuration after initialization.""" if self.endpoints is None: self.endpoints = [] if self.extra_options is None: self.extra_options = {} def as_dict(self) -> Dict[str, Any]: """Convert configuration to dictionary format.""" return { "name": self.name, "tags": self.tags, "prefix": self.prefix, "include_in_schema": self.include_in_schema, "endpoints": [endpoint.__dict__ for endpoint in self.endpoints], "extra_options": self.extra_options, } class MethodToEvent: """Base class for mapping methods to API events with type safety and endpoint configuration. This class provides a framework for handling API events with proper type checking for tokens and response models, as well as managing endpoint configurations and frontend page structure. Type Parameters: TokenType: Type of authentication token ResponseModel: Type of response model Class Variables: action_key: Unique identifier for the action event_type: Type of event (e.g., 'query', 'command') event_description: Human-readable description of the event event_category: Category for grouping related events __event_keys__: Mapping of UUIDs to event names __event_validation__: Validation rules for events __endpoint_config__: API endpoint configuration __page_info__: Frontend page configuration """ action_key: ClassVar[Optional[str]] = None event_type: ClassVar[Optional[str]] = None event_description: ClassVar[str] = "" event_category: ClassVar[str] = "" __event_keys__: ClassVar[Dict[str, str]] = {} __event_validation__: Dict[str, Tuple[Type, Union[List, tuple]]] = {} __endpoint_config__: ClassVar[Dict[str, Dict[str, Any]]] = { "endpoints": {}, # Mapping of event UUIDs to endpoint configs "router_prefix": "", # Router prefix for all endpoints in this class "tags": [], # OpenAPI tags } __page_info__: ClassVar[Dict[str, Any]] = { "name": "", # Page name (e.g., "AccountPage") "title": {"tr": "", "en": ""}, # Multi-language titles "icon": "", # Icon name "url": "", # Frontend route "component": None, # Optional component name "parent": None, # Parent page name if this is a subpage } @classmethod def register_endpoint( cls, event_uuid: str, path: str, method: str = "POST", response_model: Optional[Type] = None, **kwargs, ) -> None: """Register an API endpoint configuration for an event. Args: event_uuid: UUID of the event path: Endpoint path (will be prefixed with router_prefix) method: HTTP method (default: POST) response_model: Pydantic model for response **kwargs: Additional FastAPI endpoint parameters """ if event_uuid not in cls.__event_keys__: raise ValueError(f"Event UUID {event_uuid} not found in {cls.__name__}") cls.__endpoint_config__["endpoints"][event_uuid] = { "path": path, "method": method, "response_model": response_model, **kwargs, } @classmethod def configure_router(cls, prefix: str, tags: List[str]) -> None: """Configure the API router settings. Args: prefix: Router prefix for all endpoints tags: OpenAPI tags for documentation """ cls.__endpoint_config__["router_prefix"] = prefix cls.__endpoint_config__["tags"] = tags @classmethod def configure_page( cls, name: str, title: Dict[str, str], icon: str, url: str, component: Optional[str] = None, parent: Optional[str] = None, ) -> None: """Configure the frontend page information. Args: name: Page name title: Multi-language titles (must include 'tr' and 'en') icon: Icon name url: Frontend route component: Optional component name parent: Parent page name for subpages """ required_langs = {"tr", "en"} if not all(lang in title for lang in required_langs): raise ValueError( f"Title must contain all required languages: {required_langs}" ) cls.__page_info__.update( { "name": name, "title": title, "icon": icon, "url": url, "component": component, "parent": parent, } ) @classmethod def get_endpoint_config(cls) -> Dict[str, Any]: """Get the complete endpoint configuration.""" return cls.__endpoint_config__ @classmethod def get_page_info(cls) -> Dict[str, Any]: """Get the frontend page configuration.""" return cls.__page_info__ @classmethod def has_available_events(cls, user_permission_uuids: Set[str]) -> bool: """Check if any events are available based on user permissions.""" return bool(set(cls.__event_keys__.keys()) & user_permission_uuids) @classmethod def get_page_info_with_permissions( cls, user_permission_uuids: Set[str], include_endpoints: bool = False ) -> Optional[Dict[str, Any]]: """Get page info if user has required permissions. Args: user_permission_uuids: Set of UUIDs the user has permission for include_endpoints: Whether to include available endpoint information Returns: Dict with page info if user has permissions, None otherwise """ # Check if user has any permissions for this page's events if not cls.has_available_events(user_permission_uuids): return None # Start with basic page info page_info = { **cls.__page_info__, "category": cls.event_category, "type": cls.event_type, "description": cls.event_description, } # Optionally include available endpoints if include_endpoints: available_endpoints = {} for uuid, endpoint in cls.__endpoint_config__["endpoints"].items(): if uuid in user_permission_uuids: available_endpoints[uuid] = { "path": f"{cls.__endpoint_config__['router_prefix']}{endpoint['path']}", "method": endpoint["method"], "event_name": cls.__event_keys__[uuid], } if available_endpoints: page_info["available_endpoints"] = available_endpoints return page_info @classmethod def get_events_config(cls) -> Dict[str, Any]: """Get the complete configuration including events, endpoints, and page info.""" return { "events": cls.__event_keys__, "endpoints": cls.__endpoint_config__, "page_info": cls.__page_info__, "category": cls.event_category, "type": cls.event_type, "description": cls.event_description, } @classmethod def retrieve_event_response_model(cls, function_code: str) -> Any: """Retrieve event validation for a specific function. Args: function_code: Function identifier Returns: Tuple containing response model and language models """ event_validation_list = cls.__event_validation__.get(function_code, None) if not event_validation_list: raise HTTPExceptionApi( error_code="", lang="en", loc=get_line_number_for_error(), sys_msg="Function not found", ) return event_validation_list[0] @classmethod def retrieve_event_languages(cls, function_code: str) -> Union[List, tuple]: """Retrieve event description for a specific function. Args: function_code: Function identifier Returns: Event description """ event_keys_list = cls.__event_validation__.get(function_code, None) if not event_keys_list: raise HTTPExceptionApi( error_code="", lang="en", loc=get_line_number_for_error(), sys_msg="Function not found", ) function_language_models: Union[List, tuple] = event_keys_list[1] if not function_language_models: raise HTTPExceptionApi( error_code="", lang="en", loc=get_line_number_for_error(), sys_msg="Function not found", ) return function_language_models @staticmethod def merge_models(language_model: List) -> Dict: merged_models = {"tr": {}, "en": {}} for model in language_model: for lang in dict(model).keys(): if lang not in merged_models: merged_models[lang] = model[lang] else: merged_models[lang].update(model[lang]) return merged_models @classmethod def retrieve_event_function(cls, function_code: str) -> Dict[str, str]: """Retrieve event parameters for a specific function. Args: function_code: Function identifier Returns: Dictionary of event parameters """ function_event = cls.__event_keys__[function_code] function_itself = getattr(cls, function_event, None) if not function_itself: raise HTTPExceptionApi( error_code="", lang="en", loc=get_line_number_for_error(), sys_msg="Function not found", ) return function_itself @classmethod def retrieve_language_parameters( cls, function_code: str, language: str = "tr" ) -> Dict[str, Any]: """Retrieve language-specific parameters for an event. Args: language: Language code (e.g. 'tr', 'en') function_code: Function identifier Returns: Dictionary of language-specific field mappings """ event_language_models = cls.retrieve_event_languages(function_code) event_response_model = cls.retrieve_event_response_model(function_code) event_response_model_merged = cls.merge_models(event_language_models) event_response_model_merged_lang = event_response_model_merged[language] # Map response model fields to language-specific values only_language_dict = { field: event_response_model_merged_lang[field] for field in event_response_model.model_fields if field in event_response_model_merged_lang } """ __event_validation__ : {"key": [A, B, C]} Language Model : Language Model that is model pydatnic requires Language Models : All language_models that is included in Langugage Models Section Merged Language Models : Merged with all models in list event_validation """ return { "language_model": only_language_dict, "language_models": event_response_model_merged, } class EventMethodRegistry: """Registry for mapping event method UUIDs to categories and managing permissions.""" def __init__(self): self._uuid_map: Dict[str, Tuple[Type[MethodToEvent], str]] = ( {} ) # uuid -> (method_class, event_name) self._category_events: Dict[str, Set[str]] = defaultdict( set ) # category -> set of uuids def register_method( self, category_name: str, method_class: Type[MethodToEvent] ) -> None: """Register a method class with its category.""" # Register all UUIDs from the method for event_uuid, event_name in method_class.__event_keys__.items(): self._uuid_map[event_uuid] = (method_class, event_name) self._category_events[category_name].add(event_uuid) def get_method_by_uuid( self, event_uuid: str ) -> Optional[Tuple[Type[MethodToEvent], str]]: """Get method class and event name by UUID.""" return self._uuid_map.get(event_uuid) def get_events_for_category(self, category_name: str) -> Set[str]: """Get all event UUIDs for a category.""" return self._category_events.get(category_name, set()) class EventCategory: """Base class for defining event categories similar to frontend page structure.""" def __init__( self, name: str, title: Dict[str, str], icon: str, url: str, component: Optional[str] = None, page_info: Any = None, all_endpoints: Dict[str, Set[str]] = None, # category -> set of event UUIDs sub_categories: List = None, ): self.name = name self.title = self._validate_title(title) self.icon = icon self.url = url self.component = component self.page_info = page_info self.all_endpoints = all_endpoints or {} self.sub_categories = self._process_subcategories(sub_categories or []) def _validate_title(self, title: Dict[str, str]) -> Dict[str, str]: """Validate title has required languages.""" required_langs = {"tr", "en"} if not all(lang in title for lang in required_langs): raise ValueError( f"Title must contain all required languages: {required_langs}" ) return title def _process_subcategories( self, categories: List[Union[Dict, "EventCategory"]] ) -> List["EventCategory"]: """Process subcategories ensuring they are all EventCategory instances.""" processed = [] for category in categories: if isinstance(category, dict): processed.append(EventCategory.from_dict(category)) elif isinstance(category, EventCategory): processed.append(category) else: raise ValueError(f"Invalid subcategory type: {type(category)}") return processed def has_available_events(self, user_permission_uuids: Set[str]) -> bool: """Check if category has available events based on UUID intersection.""" # Check current category's events return any( bool(events & user_permission_uuids) for events in self.all_endpoints.values() ) def get_menu_item( self, user_permission_uuids: Set[str] ) -> Optional[Dict[str, Any]]: """Get menu item if category has available events.""" # First check if this category has available events if not self.has_available_events(user_permission_uuids): return None menu_item = { "name": self.name, "title": self.title, "icon": self.icon, "url": self.url, } if self.component: menu_item["component"] = self.component # Only process subcategories if parent has permissions sub_items = [] for subcategory in self.sub_categories: if sub_menu := subcategory.get_menu_item(user_permission_uuids): sub_items.append(sub_menu) if sub_items: menu_item["items"] = sub_items return menu_item def get_available_events( self, registry: EventMethodRegistry, user_permission_uuids: Set[str] ) -> Dict[str, List[Dict[str, Any]]]: """Get available events based on user permission UUIDs.""" available_events = defaultdict(list) # Process endpoints in current category category_events = self.all_endpoints.get(self.name, set()) for event_uuid in category_events & user_permission_uuids: method_info = registry.get_method_by_uuid(event_uuid) if method_info: method_class, event_name = method_info available_events[method_class.event_type].append( { "uuid": event_uuid, "name": event_name, "description": method_class.event_description, "category": method_class.event_category, } ) # Process subcategories recursively for subcategory in self.sub_categories: sub_events = subcategory.get_available_events( registry, user_permission_uuids ) for event_type, events in sub_events.items(): available_events[event_type].extend(events) return dict(available_events) @classmethod def from_dict(cls, data: Dict[str, Any]) -> "EventCategory": """Create category from dictionary.""" return cls( name=data["name"], title=data["title"], icon=data["icon"], url=data["url"], component=data.get("component"), page_info=data.get("pageInfo"), all_endpoints=data.get("allEndpoints", {}), sub_categories=data.get("subCategories", []), ) def to_dict( self, registry: EventMethodRegistry, user_permission_uuids: Optional[Set[str]] = None, ) -> Dict[str, Any]: """Convert category to dictionary with optional permission filtering.""" result = { "name": self.name, "title": self.title, "icon": self.icon, "url": self.url, "pageInfo": self.page_info, } if user_permission_uuids is not None: # Only include endpoints and their info if user has permissions available_events = self.get_available_events( registry, user_permission_uuids ) if available_events: result["availableEvents"] = available_events result["allEndpoints"] = self.all_endpoints else: # Include all endpoints if no permissions specified result["allEndpoints"] = self.all_endpoints # Process subcategories subcategories = [ sub.to_dict(registry, user_permission_uuids) for sub in self.sub_categories ] # Only include subcategories that have available events if user_permission_uuids is None or any( "availableEvents" in sub for sub in subcategories ): result["subCategories"] = subcategories if self.component: result["component"] = self.component return result class EventCategoryManager: """Manager class for handling event categories and their relationships.""" def __init__(self): self.categories: List[EventCategory] = [] self.registry = EventMethodRegistry() def get_menu_tree(self, user_permission_uuids: Set[str]) -> List[Dict[str, Any]]: """Get menu tree based on available events.""" return [ menu_item for category in self.categories if (menu_item := category.get_menu_item(user_permission_uuids)) ] def register_category(self, category: EventCategory) -> None: """Register a category and its endpoints in the registry.""" self.categories.append(category) def add_category(self, category: Union[EventCategory, Dict[str, Any]]) -> None: """Add a new category.""" if isinstance(category, dict): category = EventCategory.from_dict(category) self.register_category(category) def add_categories( self, categories: List[Union[EventCategory, Dict[str, Any]]] ) -> None: """Add multiple categories at once.""" for category in categories: self.add_category(category) def get_category(self, name: str) -> Optional[EventCategory]: """Get category by name.""" return next((cat for cat in self.categories if cat.name == name), None) def get_all_categories( self, user_permission_uuids: Optional[Set[str]] = None ) -> List[Dict[str, Any]]: """Get all categories as dictionary, filtered by user permissions.""" return [ cat.to_dict(self.registry, user_permission_uuids) for cat in self.categories ] def get_category_endpoints(self, category_name: str) -> Set[str]: """Get all endpoint UUIDs for a category.""" category = self.get_category(category_name) return category.all_endpoints.get(category_name, set()) if category else set() def get_subcategories( self, category_name: str, user_permission_uuids: Optional[Set[str]] = None ) -> List[Dict[str, Any]]: """Get subcategories for a category.""" category = self.get_category(category_name) if not category: return [] return [ sub.to_dict(self.registry, user_permission_uuids) for sub in category.sub_categories ] def find_category_by_url(self, url: str) -> Optional[EventCategory]: """Find a category by its URL.""" for category in self.categories: if category.url == url: return category for subcategory in category.sub_categories: if subcategory.url == url: return subcategory return None class EventMethodRegistry: """Registry for all MethodToEvent classes and menu building.""" _instance = None _method_classes: Dict[str, Type[MethodToEvent]] = {} def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance @classmethod def register_method_class(cls, method_class: Type[MethodToEvent]) -> None: """Register a MethodToEvent class.""" if not issubclass(method_class, MethodToEvent): raise ValueError( f"{method_class.__name__} must be a subclass of MethodToEvent" ) page_info = method_class.get_page_info() cls._method_classes[page_info["name"]] = method_class @classmethod def get_all_menu_items( cls, user_permission_uuids: Set[str], include_endpoints: bool = False ) -> List[Dict[str, Any]]: """Get all menu items based on user permissions. Args: user_permission_uuids: Set of UUIDs the user has permission for include_endpoints: Whether to include available endpoint information Returns: List of menu items organized in a tree structure """ # First get all page infos page_infos = {} for method_class in cls._method_classes.values(): if page_info := method_class.get_page_info_with_permissions( user_permission_uuids, include_endpoints ): page_infos[page_info["name"]] = page_info # Build tree structure menu_tree = [] child_pages = set() # First pass: identify all child pages for page_info in page_infos.values(): if page_info.get("parent"): child_pages.add(page_info["name"]) # Second pass: build tree structure for name, page_info in page_infos.items(): # Skip if this is a child page if name in child_pages: continue # Start with this page's info menu_item = page_info.copy() # Find and add children children = [] for child_info in page_infos.values(): if child_info.get("parent") == name: children.append(child_info) if children: menu_item["items"] = sorted(children, key=lambda x: x["name"]) menu_tree.append(menu_item) return sorted(menu_tree, key=lambda x: x["name"]) @classmethod def get_available_endpoints( cls, user_permission_uuids: Set[str] ) -> Dict[str, Dict[str, Any]]: """Get all available endpoints based on user permissions. Args: user_permission_uuids: Set of UUIDs the user has permission for Returns: Dict mapping event UUIDs to endpoint configurations """ available_endpoints = {} for method_class in cls._method_classes.values(): if page_info := method_class.get_page_info_with_permissions( user_permission_uuids, include_endpoints=True ): if endpoints := page_info.get("available_endpoints"): available_endpoints.update(endpoints) return available_endpoints """ Example usage # Register your MethodToEvent classes registry = EventMethodRegistry() registry.register_method_class(AccountEventMethods) registry.register_method_class(AccountDetailsEventMethods) # Get complete menu structure user_permissions = { "uuid1", "uuid2", "uuid3" } menu_items = registry.get_all_menu_items(user_permissions, include_endpoints=True) # Result: [ { "name": "AccountPage", "title": {"tr": "Hesaplar", "en": "Accounts"}, "icon": "User", "url": "/account", "category": "account", "type": "query", "description": "Account management operations", "available_endpoints": { "uuid1": {"path": "/api/account/view", "method": "GET"}, "uuid2": {"path": "/api/account/edit", "method": "POST"} }, "items": [ { "name": "AccountDetailsPage", "title": {"tr": "Hesap Detayları", "en": "Account Details"}, "icon": "FileText", "url": "/account/details", "parent": "AccountPage", "category": "account_details", "type": "query", "available_endpoints": { "uuid3": {"path": "/api/account/details/view", "method": "GET"} } } ] } ] # Get all available endpoints endpoints = registry.get_available_endpoints(user_permissions) # Result: { "uuid1": { "path": "/api/account/view", "method": "GET", "event_name": "view_account" }, "uuid2": { "path": "/api/account/edit", "method": "POST", "event_name": "edit_account" }, "uuid3": { "path": "/api/account/details/view", "method": "GET", "event_name": "view_details" } } # Get event UUIDs from MethodToEvent classes account_events = {uuid for uuid in AccountEventMethods.__event_keys__} # Define categories with event UUIDs PAGES_INFO = [ { "name": "AccountPage", "title": {"tr": "Hesaplar", "en": "Accounts"}, "icon": "User", "url": "/account", "pageInfo": AccountPageInfo, "allEndpoints": {"AccountPage": account_events}, "subCategories": [ { "name": "AccountDetailsPage", "title": {"tr": "Hesap Detayları", "en": "Account Details"}, "icon": "FileText", "url": "/account/details", "allEndpoints": {} # No direct endpoints, only shown if parent has permissions } ] } ] # Initialize manager manager = EventCategoryManager() manager.add_categories(PAGES_INFO) # Get menu tree based on available events user_permission_uuids = { "31f4f32f-0cd4-4995-8a6a-f9f56335848a", "ec98ef2c-bcd0-432d-a8f4-1822a56c33b2" } menu_tree = manager.get_menu_tree(user_permission_uuids) """