wag-managment-api-service-v.../ApiEvents/EventServiceApi/utils.py

52 lines
1.5 KiB
Python

"""
Utility functions for API event handling.
"""
from functools import wraps
from inspect import signature
from typing import Callable, TypeVar, ParamSpec, Any
from fastapi import Request
from Services.PostgresDb.Models.token_models import parse_token_object_to_dict
P = ParamSpec('P')
R = TypeVar('R')
def with_token_event(func: Callable[P, R]) -> Callable[P, R]:
"""
Decorator that handles token parsing and event execution.
This decorator:
1. Parses the token from the request
2. Calls the appropriate event with the token and other arguments
Args:
func: The endpoint function to wrap
Returns:
Wrapped function that handles token parsing and event execution
"""
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
# Extract request from args or kwargs
request = next(
(arg for arg in args if isinstance(arg, Request)),
kwargs.get('request')
)
if not request:
raise ValueError("Request object not found in arguments")
# Parse token
token_dict = parse_token_object_to_dict(request=request)
# Add token_dict to kwargs
kwargs['token_dict'] = token_dict
# Call the original function
return token_dict.available_event(**{
k: v for k, v in kwargs.items()
if k in signature(token_dict.available_event).parameters
})
return wrapper