new api service and logic implemented

This commit is contained in:
2025-01-23 22:27:25 +03:00
parent d91ecda9df
commit 32022ca521
245 changed files with 28004 additions and 0 deletions

View File

@@ -0,0 +1,25 @@
"""
FastAPI Application Entry Point
This module initializes and configures the FastAPI application with:
- CORS middleware for cross-origin requests
- Request timing middleware for performance monitoring
- Custom exception handlers for consistent error responses
- Prometheus instrumentation for metrics
- API routers for endpoint organization
"""
import uvicorn
from prometheus_fastapi_instrumentator import Instrumentator
from app_handler import setup_middleware, get_uvicorn_config
from create_file import create_app
app = create_app() # Initialize FastAPI application
Instrumentator().instrument(app=app).expose(app=app) # Setup Prometheus metrics
setup_middleware(app) # Configure middleware and exception handlers
if __name__ == "__main__":
uvicorn_config = get_uvicorn_config() # Run the application with Uvicorn
uvicorn.Server(uvicorn.Config(**uvicorn_config)).run()

View File

@@ -0,0 +1,94 @@
"""
FastAPI Application Handler Module
This module contains all the handler functions for configuring and setting up the FastAPI application:
- CORS middleware configuration
- Exception handlers setup
- Uvicorn server configuration
"""
from typing import Dict, Any
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.responses import JSONResponse
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
from middleware.auth_middleware import RequestTimingMiddleware, LoggerTimingMiddleware
def setup_cors_middleware(app: FastAPI) -> None:
"""
Configure CORS middleware for the application.
Args:
app: FastAPI application instance
"""
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
async def generic_exception_handler(request: Request, exc: Exception) -> JSONResponse:
"""
Handle generic exceptions and return formatted error responses.
Args:
request: FastAPI request object
exc: Exception instance
Returns:
JSONResponse: Formatted error response
"""
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"detail": "Internal server error", "error_code": "INTERNAL_ERROR"},
)
def setup_exception_handlers(app: FastAPI) -> None:
"""
Configure custom exception handlers for the application.
Args:
app: FastAPI application instance
"""
from ErrorHandlers.ErrorHandlers.api_exc_handler import HTTPExceptionApiHandler
custom_exception_handler = HTTPExceptionApiHandler(response_model=JSONResponse)
app.add_exception_handler(
HTTPExceptionApi, custom_exception_handler.handle_exception
)
app.add_exception_handler(Exception, generic_exception_handler)
def setup_middleware(app: FastAPI) -> None:
"""
Configure all middleware for the application.
Args:
app: FastAPI application instance
"""
setup_cors_middleware(app)
app.add_middleware(RequestTimingMiddleware)
app.add_middleware(LoggerTimingMiddleware)
setup_exception_handlers(app)
def get_uvicorn_config() -> Dict[str, Any]:
"""
Get Uvicorn server configuration.
Returns:
Dict[str, Any]: Uvicorn configuration dictionary
"""
return {
"app": "app:app",
"host": "0.0.0.0",
"port": 41575,
"log_level": "info",
"reload": True,
}

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,21 @@
"""
Base FastAPI application configuration.
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
def create_app() -> FastAPI:
app = FastAPI(title="API Service")
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
return app

View File

@@ -0,0 +1,94 @@
"""
FastAPI Application Factory Module
This module provides functionality to create and configure a FastAPI application with:
- Custom OpenAPI schema configuration
- Security scheme configuration for Bearer authentication
- Automatic router registration
- Response class configuration
- Security requirements for protected endpoints
"""
from typing import Any, Dict, List, Tuple
from fastapi import FastAPI, APIRouter
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.openapi.utils import get_openapi
from AllConfigs.Token.config import Auth
from AllConfigs.main import MainConfig as Config
from create_routes import get_all_routers
def setup_security_schema() -> Dict[str, Any]:
"""
Configure security schema for the OpenAPI documentation.
Returns:
Dict[str, Any]: Security schema configuration
"""
return {
"components": {
"securitySchemes": {
"Bearer Auth": {
"type": "apiKey",
"in": "header",
"name": Auth.ACCESS_TOKEN_TAG,
"description": "Enter: **'Bearer <JWT>'**, where JWT is the access token",
}
}
}
}
def configure_route_security(
path: str, method: str, schema: Dict[str, Any], protected_paths: List[str]
) -> None:
"""
Configure security requirements for a specific route.
Args:
path: Route path
method: HTTP method
schema: OpenAPI schema to modify
protected_paths: List of paths that require authentication
"""
if path in protected_paths:
if "paths" in schema and path in schema["paths"]:
if method.lower() in schema["paths"][path]:
schema["paths"][path][method.lower()]["security"] = [{"Bearer": []}]
def create_app() -> FastAPI:
"""
Create and configure a FastAPI application with dynamic route creation.
Returns:
FastAPI: Configured FastAPI application instance
"""
from open_api_creator import create_openapi_schema
# Get all routers and protected routes using the dynamic route creation
app = FastAPI(
title=Config.TITLE,
description=Config.DESCRIPTION,
default_response_class=JSONResponse,
) # Initialize FastAPI app
@app.get("/", include_in_schema=False, summary=str(Config.DESCRIPTION))
async def home() -> RedirectResponse:
"""Redirect root path to API documentation."""
return RedirectResponse(url="/docs")
# Get all routers and protected routes using the dynamic route creation
routers, protected_routes = get_all_routers()
# Include all routers
for router in routers:
app.include_router(router)
app.openapi = lambda app=app: create_openapi_schema(app)
return app

View File

@@ -0,0 +1,127 @@
"""
Route configuration and factory module.
Handles dynamic route creation based on configurations.
"""
from typing import Optional, Dict, Any, List, Callable, TypeVar, ParamSpec
P = ParamSpec("P") # For function parameters
R = TypeVar("R") # For return type
from dataclasses import dataclass
from functools import wraps
from fastapi import APIRouter, Request
from fastapi.routing import APIRoute
from middleware.auth_middleware import MiddlewareModule
from pydantic import BaseModel
from AllConfigs.main import MainConfig as Config
@dataclass
class EndpointFactoryConfig:
endpoint: str
method: str
summary: str
description: str
endpoint_function: Callable[P, R] # Now accepts any parameters and return type
is_auth_required: bool = True
is_event_required: bool = False
extra_options: Dict[str, Any] = None
def __post_init__(self):
if self.extra_options is None:
self.extra_options = {}
class EnhancedEndpointFactory:
def __init__(self, router_config: dict):
self.router = APIRouter(
prefix=router_config["prefix"],
tags=router_config["tags"],
include_in_schema=router_config.get("include_in_schema", True),
)
self.endpoints = router_config["endpoints"]
self.protected_routes: Dict[str, List[str]] = {}
def create_endpoint(self, config: EndpointFactoryConfig):
"""
Create an endpoint directly from the configuration.
Args:
config: EndpointFactoryConfig instance containing endpoint configuration
"""
endpoint_path = config.endpoint
endpoint_function = config.endpoint_function
if config.is_auth_required:
# endpoint_function = MiddlewareModule.auth_required(endpoint_function)
# Track protected routes
full_path = f"{self.router.prefix}{endpoint_path}"
if full_path not in self.protected_routes:
self.protected_routes[full_path] = []
self.protected_routes[full_path].append(config.method.lower())
# Register the endpoint with FastAPI router
getattr(self.router, config.method.lower())(
endpoint_path,
summary=config.summary,
description=config.description,
**config.extra_options,
)(endpoint_function)
def get_router(self) -> APIRouter:
"""Get the configured router."""
return self.router
def get_protected_routes(self) -> Dict[str, List[str]]:
"""Get the protected routes mapping."""
return self.protected_routes
async def health_check(request: Request):
"""Default health check endpoint."""
return {"status": "healthy", "message": "Service is running"}
async def ping_test(request: Request, service_name: str = "base-router"):
"""Default ping test endpoint."""
return {"ping": "pong", "service": service_name}
def get_all_routers() -> tuple[List[APIRouter], Dict[str, List[str]]]:
"""
Get all routers and protected routes from route configurations.
Returns:
tuple: (routers, protected_routes)
"""
from ApiEvents.route_configs import get_route_configs
routers = []
all_protected_routes = {}
# Get route configurations from the registry
route_configs = get_route_configs()
factory_all = []
for config in route_configs:
factory = EnhancedEndpointFactory(config)
# Create endpoints from configuration
for endpoint_dict in config["endpoints"]:
endpoint_config = EndpointFactoryConfig(
endpoint=endpoint_dict["endpoint"],
method=endpoint_dict["method"],
summary=endpoint_dict["summary"],
description=endpoint_dict["description"],
endpoint_function=endpoint_dict["endpoint_function"],
is_auth_required=endpoint_dict["is_auth_required"],
is_event_required=endpoint_dict["is_event_required"],
extra_options=endpoint_dict.get("extra_options", {}),
)
factory.create_endpoint(endpoint_config)
factory_all.append(endpoint_config.__dict__)
# Add router and protected routes
routers.append(factory.get_router())
all_protected_routes.update(factory.get_protected_routes())
return routers, all_protected_routes

View File

@@ -0,0 +1,14 @@
from .token_event_middleware import TokenEventMiddleware
from .auth_middleware import (
LoggerTimingMiddleware,
RequestTimingMiddleware,
MiddlewareModule,
)
__all__ = [
"TokenEventMiddleware",
"RequestTimingMiddleware",
"MiddlewareModule",
"LoggerTimingMiddleware",
]

View File

@@ -0,0 +1,162 @@
"""
Authentication and Authorization middleware for FastAPI applications.
This module provides authentication decorator for protecting endpoints
and a middleware for request timing measurements.
"""
from time import perf_counter
from typing import Callable, Optional, Dict, Any, Tuple, Union
from functools import wraps
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from ApiLibrary.common.line_number import get_line_number_for_error
from ErrorHandlers.ErrorHandlers.api_exc_handler import HTTPExceptionApi
from AllConfigs.Token.config import Auth
import inspect
class MiddlewareModule:
"""
Middleware module for handling authentication and request timing.
"""
@staticmethod
def get_user_from_request(
request: Request,
) -> dict:
"""
Get authenticated token context from request.
Args:
request: FastAPI request object
Returns:
AuthContext: Context containing the authenticated token data
Raises:
HTTPExceptionApi: If token is missing, invalid, or user not found
"""
from ApiServices.Token.token_handler import TokenService
# Get token and validate - will raise HTTPExceptionApi if invalid
redis_token = TokenService.get_access_token_from_request(request=request)
# Get token context - will validate token and raise appropriate errors
token_context = TokenService.get_object_via_access_key(access_token=redis_token)
if not token_context:
raise HTTPExceptionApi(
error_code="USER_NOT_FOUND",
lang="tr",
loc=get_line_number_for_error(),
sys_msg="TokenService: Token Context couldnt retrieved from redis",
)
return token_context
@classmethod
def auth_required(cls, func: Callable) -> Callable:
"""
Decorator for protecting FastAPI endpoints with authentication.
Usage:
@router.get("/protected")
@MiddlewareModule.auth_required
async def protected_endpoint(request: Request):
auth = protected_endpoint.auth # Access auth context
if auth.is_employee:
# Handle employee logic
employee_id = auth.token_context.employee_id
else:
# Handle occupant logic
occupant_id = auth.token_context.occupant_id
return {"user_id": auth.user_id}
Args:
func: The FastAPI route handler function to protect
Returns:
Callable: Wrapped function that checks authentication before execution
Raises:
HTTPExceptionApi: If authentication fails
"""
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
# Get and validate token context from request
# Create auth context and Attach auth context to both wrapper and original function
func.auth = cls.get_user_from_request(request)
wrapper.auth = func.auth
# Call the original endpoint function
if inspect.iscoroutinefunction(func):
return await func(request, *args, **kwargs)
return func(request, *args, **kwargs)
return wrapper
class RequestTimingMiddleware(BaseHTTPMiddleware):
"""
Middleware for measuring and logging request timing.
Only handles timing, no authentication.
"""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
"""
Process each request through the middleware.
Args:
request: FastAPI request object
call_next: Next middleware in the chain
Returns:
Response: Processed response with timing headers
"""
start_time = perf_counter()
# Process the request
response = await call_next(request)
# Add timing information to response headers
end_time = perf_counter()
elapsed = (end_time - start_time) * 1000 # Convert to milliseconds
response.headers.update(
{
"request-start": f"{start_time:.6f}",
"request-end": f"{end_time:.6f}",
"request-duration": f"{elapsed:.2f}ms",
}
)
return response
class LoggerTimingMiddleware(BaseHTTPMiddleware):
"""
Middleware for measuring and logging request timing.
Only handles timing, no authentication.
"""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
# Log the request
import arrow
headers = dict(request.headers)
response = await call_next(request)
# Log the response
print(
"Loggers :",
{
"url": request.url,
"method": request.method,
"access_token": headers.get(Auth.ACCESS_TOKEN_TAG, ""),
"referer": headers.get("referer", ""),
"origin": headers.get("origin", ""),
"user-agent": headers.get("user-agent", ""),
"datetime": arrow.now().format("YYYY-MM-DD HH:mm:ss ZZ"),
"status_code": response.status_code,
},
)
return response

View File

@@ -0,0 +1,297 @@
"""
Token event middleware for handling authentication and event tracking.
"""
import inspect
from functools import wraps
from typing import Callable, Dict, Any, Optional, Union
from fastapi import Request
from pydantic import BaseModel
from ApiLibrary.common.line_number import get_line_number_for_error
from ApiServices.Token.token_handler import TokenService
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
from Schemas.rules.rules import EndpointRestriction
from .auth_middleware import MiddlewareModule
from Schemas import Events
class EventFunctions:
def __init__(self, endpoint: str, request: Request):
self.endpoint = endpoint
self.request = request
def match_endpoint_with_accesiable_event(self) -> Optional[Dict[str, Any]]:
"""
Match an endpoint with accessible events.
Args:
endpoint: The endpoint to match
Returns:
Dict containing the endpoint registration data
None if endpoint is not found in database
"""
access_token = TokenService.get_access_token_from_request(self.request)
token_context = TokenService.get_object_via_access_key(
access_token=access_token
)
if token_context.is_employee:
reachable_event_codes: list[str] = (
token_context.selected_company.reachable_event_codes
)
elif token_context.is_occupant:
reachable_event_codes: list[str] = (
token_context.selected_occupant.reachable_event_codes
)
else:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Token not found",
)
if not access_token:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Token not found",
)
db = EndpointRestriction.new_session()
restriction = EndpointRestriction.filter_one(
EndpointRestriction.endpoint_name == self.endpoint,
db=db,
).data
if not restriction:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Function code not found",
)
event_related = Events.filter_all(
Events.endpoint_id == restriction.id,
db=db,
).data
if not event_related:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="No event is registered for this user.",
)
an_event = event_related[0]
event_related_codes: list[str] = [
event.function_code for event in event_related
]
intersected_code: set = set(reachable_event_codes).intersection(
set(event_related_codes)
)
if not len(list(intersected_code)) == 1:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="No event is registered for this user.",
)
return {
"endpoint_url": self.endpoint,
"reachable_event_code": list(intersected_code)[0],
"class": an_event.function_class,
}
def retrieve_function_dict(self) -> Optional[Dict[str, Any]]:
"""
Retrieve function dictionary for a given endpoint.
Args:
endpoint: The endpoint to retrieve the function dictionary for
Returns:
Dictionary containing the function dictionary
None if endpoint is not found
"""
access_token = TokenService.get_access_token_from_request(self.request)
token_context = TokenService.get_object_via_access_key(
access_token=access_token
)
if token_context.is_employee:
reachable_event_codes: list[str] = (
token_context.selected_company.reachable_event_codes
)
elif token_context.is_occupant:
reachable_event_codes: list[str] = (
token_context.selected_occupant.reachable_event_codes
)
else:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Token not found",
)
if not access_token:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Token not found",
)
db = EndpointRestriction.new_session()
restriction = EndpointRestriction.filter_one(
EndpointRestriction.endpoint_name == self.endpoint,
db=db,
).data
if not restriction:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Function code not found",
)
event_related = Events.filter_all(
Events.endpoint_id == restriction.id,
db=db,
).data
if not event_related:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="No event is registered for this user.",
)
an_event = event_related[0]
event_related_codes: list[str] = [
event.function_code for event in event_related
]
intersected_code: set = set(reachable_event_codes).intersection(
set(event_related_codes)
)
if not len(list(intersected_code)) == 1:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="No event is registered for this user.",
)
return {
"endpoint_url": self.endpoint,
"reachable_event_code": list(intersected_code)[0],
"class": an_event.function_class,
}
class TokenEventMiddleware:
"""
Module containing token and event handling functionality.
This class provides:
- Token and event context management
- Event validation decorator for endpoints
"""
@staticmethod
def event_required(
func: Callable[..., Dict[str, Any]]
) -> Callable[..., Dict[str, Any]]:
"""
Decorator for endpoints with token and event requirements.
This decorator:
1. First validates authentication using MiddlewareModule.auth_required
2. Then adds event tracking context
Args:
func: The function to be decorated
Returns:
Callable: The wrapped function with both auth and event handling
"""
# # First apply authentication
# authenticated_func = MiddlewareModule.auth_required(func)
authenticated_func = func
@wraps(authenticated_func)
async def wrapper(request: Request, *args, **kwargs) -> Dict[str, Any]:
# Get function code from the function's metadata
endpoint_url = getattr(authenticated_func, "url_of_endpoint", {})
if not endpoint_url:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Function code not found",
)
# Make handler available to all functions in the chain
func.func_code = EventFunctions(
endpoint_url, request
).match_endpoint_with_accesiable_event()
# Call the authenticated function
if inspect.iscoroutinefunction(authenticated_func):
return await authenticated_func(request, *args, **kwargs)
return authenticated_func(request, *args, **kwargs)
return wrapper
@staticmethod
def validation_required(
func: Callable[..., Dict[str, Any]]
) -> Callable[..., Dict[str, Any]]:
"""
Decorator for endpoints with token and event requirements.
This decorator:
1. First validates authentication using MiddlewareModule.auth_required
2. Then adds event tracking context
Args:
func: The function to be decorated
Returns:
Callable: The wrapped function with both auth and event handling
"""
# First apply authentication
authenticated_func = MiddlewareModule.auth_required(func)
@wraps(authenticated_func)
async def wrapper(
request: Request, *args: Any, **kwargs: Any
) -> Union[Dict[str, Any], BaseModel]:
# Handle both async and sync functions
endpoint_asked = getattr(kwargs.get("data", None), "data", None).get(
"endpoint", None
)
if not endpoint_asked:
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Endpoint not found",
)
wrapper.validation_code = EventFunctions(
endpoint_asked, request
).retrieve_function_dict()
if inspect.iscoroutinefunction(authenticated_func):
result = await authenticated_func(request, *args, **kwargs)
else:
result = authenticated_func(request, *args, **kwargs)
function_auth = getattr(authenticated_func, "auth", None)
wrapper.auth = function_auth
func.auth = function_auth
authenticated_func.auth = function_auth
# If result is a coroutine, await it
if inspect.iscoroutine(result):
result = await result
return result
return wrapper

View File

@@ -0,0 +1,273 @@
"""
OpenAPI Schema Creator Module
This module provides functionality to create and customize OpenAPI documentation:
- Custom security schemes (Bearer Auth, API Key)
- Response schemas and examples
- Tag management and descriptions
- Error responses and validation
- Custom documentation extensions
"""
from typing import Any, Dict, List, Optional, Set
from fastapi import FastAPI, APIRouter
from fastapi.routing import APIRoute
from fastapi.openapi.utils import get_openapi
from AllConfigs.Token.config import Auth
from AllConfigs.main import MainConfig as Config
from create_routes import get_all_routers
class OpenAPISchemaCreator:
"""
OpenAPI schema creator and customizer for FastAPI applications.
"""
def __init__(self, app: FastAPI):
"""
Initialize the OpenAPI schema creator.
Args:
app: FastAPI application instance
"""
self.app = app
_, self.protected_routes = get_all_routers()
# self.tags_metadata = self._create_tags_metadata()
@staticmethod
def _create_tags_metadata() -> List[Dict[str, str]]:
"""
Create metadata for API tags.
Returns:
List[Dict[str, str]]: List of tag metadata
"""
return [
{
"name": "Authentication",
"description": "Operations related to user authentication and authorization",
},
{
"name": "Users",
"description": "User management and profile operations",
},
# Add more tags as needed
]
def _create_security_schemes(self) -> Dict[str, Any]:
"""
Create security scheme definitions.
Returns:
Dict[str, Any]: Security scheme configurations
"""
return {
"Bearer Auth": {
"type": "apiKey",
"in": "header",
"name": Auth.ACCESS_TOKEN_TAG,
"description": "Enter: **'Bearer <JWT>'**, where JWT is the access token",
}
}
def _create_common_responses(self) -> Dict[str, Any]:
"""
Create common response schemas.
Returns:
Dict[str, Any]: Common response configurations
"""
return {
"401": {
"description": "Unauthorized - Invalid or missing credentials",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/HTTPValidationError"}
}
},
},
"403": {
"description": "Forbidden - Insufficient permissions",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/HTTPValidationError"}
}
},
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/HTTPValidationError"}
}
},
},
"500": {
"description": "Internal Server Error",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"detail": {"type": "string"},
"error_code": {"type": "string"},
},
},
"example": {
"detail": "Internal server error occurred",
"error_code": "INTERNAL_ERROR",
},
}
},
},
}
def _process_request_body(
self, path: str, method: str, schema: Dict[str, Any]
) -> None:
"""
Process request body to include examples from model config.
Args:
path: Route path
method: HTTP method
schema: OpenAPI schema to modify
"""
try:
route_schema = schema["paths"][path][method]
if "requestBody" in route_schema:
request_body = route_schema["requestBody"]
if "content" in request_body:
content = request_body["content"]
if "application/json" in content:
json_content = content["application/json"]
if (
"schema" in json_content
and "$ref" in json_content["schema"]
):
ref = json_content["schema"]["$ref"]
model_name = ref.split("/")[-1]
if model_name in schema["components"]["schemas"]:
model_schema = schema["components"]["schemas"][
model_name
]
if "example" in model_schema:
json_content["example"] = model_schema["example"]
except KeyError:
pass
def _process_response_examples(
self, path: str, method: str, schema: Dict[str, Any]
) -> None:
"""
Process response body to include examples from model config.
Args:
path: Route path
method: HTTP method
schema: OpenAPI schema to modify
"""
try:
route_schema = schema["paths"][path][method]
if "responses" in route_schema:
responses = route_schema["responses"]
if "200" in responses:
response = responses["200"]
if "content" in response:
content = response["content"]
if "application/json" in content:
json_content = content["application/json"]
if (
"schema" in json_content
and "$ref" in json_content["schema"]
):
ref = json_content["schema"]["$ref"]
model_name = ref.split("/")[-1]
if model_name in schema["components"]["schemas"]:
model_schema = schema["components"]["schemas"][
model_name
]
if "example" in model_schema:
json_content["example"] = model_schema[
"example"
]
except KeyError:
pass
def configure_route_security(
self, path: str, method: str, schema: Dict[str, Any]
) -> None:
"""
Configure security requirements for a specific route.
Args:
path: Route path
method: HTTP method
schema: OpenAPI schema to modify
"""
# Check if route is protected based on dynamic routing info
if path in self.protected_routes and method in self.protected_routes[path]:
schema["paths"][path][method]["security"] = [
{"Bearer Auth": []},
]
schema["paths"][path][method]["responses"].update(
self._create_common_responses()
)
# Process request body examples
self._process_request_body(path, method, schema)
# Process response examples
self._process_response_examples(path, method, schema)
def create_schema(self) -> Dict[str, Any]:
"""
Create the complete OpenAPI schema.
Returns:
Dict[str, Any]: Complete OpenAPI schema
"""
openapi_schema = get_openapi(
title=Config.TITLE,
description=Config.DESCRIPTION,
version="1.1.1",
routes=self.app.routes,
)
# Add security schemes
if "components" not in openapi_schema:
openapi_schema["components"] = {}
openapi_schema["components"][
"securitySchemes"
] = self._create_security_schemes()
# Configure route security and responses
for route in self.app.routes:
if isinstance(route, APIRoute) and route.include_in_schema:
path = str(route.path)
methods = [method.lower() for method in route.methods]
for method in methods:
self.configure_route_security(path, method, openapi_schema)
# # Add custom documentation extensions
openapi_schema["x-documentation"] = {
"postman_collection": "/docs/postman",
"swagger_ui": "/docs",
"redoc": "/redoc",
}
return openapi_schema
def create_openapi_schema(app: FastAPI) -> Dict[str, Any]:
"""
Create OpenAPI schema for a FastAPI application.
Args:
app: FastAPI application instance
Returns:
Dict[str, Any]: Complete OpenAPI schema
"""
creator = OpenAPISchemaCreator(app)
return creator.create_schema()