base context for wrappers updated

This commit is contained in:
berkay 2025-01-17 20:45:41 +03:00
parent 628f6bd483
commit d6785ed36f
20 changed files with 225 additions and 196 deletions

View File

@ -15,7 +15,9 @@ from ApiValidations.Custom.token_objects import CompanyToken
from ApiValidations.Request.authentication import (
Login,
EmployeeSelectionValidation,
OccupantSelectionValidation, OccupantSelection, EmployeeSelection,
OccupantSelectionValidation,
OccupantSelection,
EmployeeSelection,
)
from ErrorHandlers import HTTPExceptionApi
from Schemas.company.company import Companies
@ -39,10 +41,7 @@ from .models import (
if TYPE_CHECKING:
from fastapi import Request
from ApiServices.Token.token_handler import (
OccupantTokenObject,
EmployeeTokenObject
)
from ApiServices.Token.token_handler import OccupantTokenObject, EmployeeTokenObject
# Type aliases for common types
TokenDictType = Union["EmployeeTokenObject", "OccupantTokenObject"]
@ -116,7 +115,7 @@ class AuthenticationSelectEventMethods(MethodToEvent):
error_code="HTTP_400_BAD_REQUEST",
lang=token_dict.lang,
loc=get_line_number_for_error(),
sys_msg="Company not found in token"
sys_msg="Company not found in token",
)
selected_company = Companies.filter_one(
Companies.uu_id == data.company_uu_id,
@ -127,7 +126,7 @@ class AuthenticationSelectEventMethods(MethodToEvent):
error_code="HTTP_400_BAD_REQUEST",
lang=token_dict.lang,
loc=get_line_number_for_error(),
sys_msg="Company not found in token"
sys_msg="Company not found in token",
)
# Get department IDs for the company
@ -142,12 +141,17 @@ class AuthenticationSelectEventMethods(MethodToEvent):
# Get duties IDs for the company
duties_ids = [
duty.id
for duty in Duties.filter_all(Duties.company_id == selected_company.id, db=db_session).data
for duty in Duties.filter_all(
Duties.company_id == selected_company.id, db=db_session
).data
]
# Get staff IDs
staff_ids = [
staff.id for staff in Staff.filter_all(Staff.duties_id.in_(duties_ids), db=db_session).data
staff.id
for staff in Staff.filter_all(
Staff.duties_id.in_(duties_ids), db=db_session
).data
]
# Get employee
@ -162,7 +166,7 @@ class AuthenticationSelectEventMethods(MethodToEvent):
error_code="HTTP_400_BAD_REQUEST",
lang=token_dict.lang,
loc=get_line_number_for_error(),
sys_msg="Employee not found in token"
sys_msg="Employee not found in token",
)
# Get reachable events
@ -173,7 +177,9 @@ class AuthenticationSelectEventMethods(MethodToEvent):
# Get staff and duties
staff = Staff.filter_one(Staff.id == employee.staff_id, db=db_session).data
duties = Duties.filter_one(Duties.id == staff.duties_id, db=db_session).data
department = Departments.filter_one(Departments.id == duties.department_id, db=db_session).data
department = Departments.filter_one(
Departments.id == duties.department_id, db=db_session
).data
# Get bulk duty
bulk_id = Duty.filter_by_one(system=True, duty_code="BULK", db=db_session).data
@ -200,11 +206,16 @@ class AuthenticationSelectEventMethods(MethodToEvent):
reachable_event_list_id=reachable_event_list_id,
)
try: # Update Redis
update_token = TokenService.update_token_at_redis(request=request, add_payload=company_token)
update_token = TokenService.update_token_at_redis(
request=request, add_payload=company_token
)
return update_token
except Exception as e:
raise HTTPExceptionApi(
error_code="", lang="en", loc=get_line_number_for_error(), sys_msg=f"{e}"
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg=f"{e}",
)
@classmethod

View File

@ -42,7 +42,11 @@ from .models import (
RememberRequestModel,
)
from ApiEvents.base_request_model import DictRequestModel, EndpointBaseRequestModel
from ApiEvents.abstract_class import RouteFactoryConfig, EndpointFactoryConfig, endpoint_wrapper
from ApiEvents.abstract_class import (
RouteFactoryConfig,
EndpointFactoryConfig,
endpoint_wrapper,
)
if TYPE_CHECKING:
from fastapi import Request
@ -51,6 +55,7 @@ from ApiValidations.Custom.token_objects import EmployeeTokenObject, OccupantTok
# Type aliases for common types
@endpoint_wrapper("/authentication/select")
async def authentication_select_company_or_occupant_type(
request: "Request",
@ -64,7 +69,7 @@ async def authentication_select_company_or_occupant_type(
data = EmployeeSelection(**data.data)
elif data.data.get("build_living_space_uu_id"):
data = OccupantSelection(**data.data)
if r := await AuthenticationSelectEventMethods.authentication_select_company_or_occupant_type(
if await AuthenticationSelectEventMethods.authentication_select_company_or_occupant_type(
request=request, data=data, token_dict=auth_dict
):
if isinstance(data, EmployeeSelection):
@ -100,7 +105,6 @@ async def authentication_check_token_is_valid(
}
@endpoint_wrapper("/authentication/refresh")
async def authentication_refresh_user_info(
request: "Request",
@ -139,6 +143,7 @@ async def authentication_create_password(
"status": "OK",
}
@endpoint_wrapper("/authentication/forgot-password")
async def authentication_forgot_password(
request: "Request",
@ -151,6 +156,7 @@ async def authentication_forgot_password(
"status": "OK",
}
@endpoint_wrapper("/authentication/reset-password")
async def authentication_reset_password(
request: "Request",
@ -163,6 +169,7 @@ async def authentication_reset_password(
"status": "OK",
}
@endpoint_wrapper("/authentication/disconnect")
async def authentication_disconnect_user(
request: "Request",

View File

@ -12,15 +12,16 @@ if TYPE_CHECKING:
class TokenObjectBase(BaseModel):
"""Base model for token objects."""
user_type: str = Field(..., description="Type of user")
user_id: str = Field(..., description="User ID")
token: str = Field(..., description="Authentication token")
permissions: Dict[str, Any] = Field(default_factory=dict, description="User permissions")
permissions: Dict[str, Any] = Field(description="User permissions")
class LoginData(TypedDict):
"""Type for login data."""
domain: str
access_key: str
password: str
@ -29,13 +30,14 @@ class LoginData(TypedDict):
class LoginRequestModel(BaseRequestModel[LoginData]):
"""Request model for login endpoint."""
model_config = ConfigDict(
json_schema_extra={
"example": {
"domain": "example.com",
"access_key": "user@example",
"password": "password",
"remember_me": False
"remember_me": False,
}
}
)
@ -43,96 +45,90 @@ class LoginRequestModel(BaseRequestModel[LoginData]):
class LogoutData(TypedDict):
"""Type for logout data."""
token: str
class LogoutRequestModel(BaseRequestModel[LogoutData]):
"""Request model for logout endpoint."""
model_config = ConfigDict(
json_schema_extra={
"example": {
"token": "your-token-here"
}
}
json_schema_extra={"example": {"token": "your-token-here"}}
)
class RememberData(TypedDict):
"""Type for remember token data."""
remember_token: str
class RememberRequestModel(BaseRequestModel[RememberData]):
"""Request model for remember token endpoint."""
model_config = ConfigDict(
json_schema_extra={
"example": {
"remember_token": "your-remember-token-here"
}
}
json_schema_extra={"example": {"remember_token": "your-remember-token-here"}}
)
class ForgotData(TypedDict):
"""Type for forgot password data."""
email: str
domain: str
class ForgotRequestModel(BaseRequestModel[ForgotData]):
"""Request model for forgot password endpoint."""
model_config = ConfigDict(
json_schema_extra={
"example": {
"email": "user@example.com",
"domain": "example.com"
}
"example": {"email": "user@example.com", "domain": "example.com"}
}
)
class ChangePasswordData(TypedDict):
"""Type for change password data."""
old_password: str
new_password: str
class ChangePasswordRequestModel(BaseRequestModel[ChangePasswordData]):
"""Request model for change password endpoint."""
model_config = ConfigDict(
json_schema_extra={
"example": {
"old_password": "old-pass",
"new_password": "new-pass"
}
"example": {"old_password": "old-pass", "new_password": "new-pass"}
}
)
class CreatePasswordData(TypedDict):
"""Type for create password data."""
token: str
password: str
class CreatePasswordRequestModel(BaseRequestModel[CreatePasswordData]):
"""Request model for create password endpoint."""
model_config = ConfigDict(
json_schema_extra={
"example": {
"token": "password-creation-token",
"password": "new-password"
}
"example": {"token": "password-creation-token", "password": "new-password"}
}
)
class SelectionDataOccupant(BaseModel):
"""Type for selection data."""
build_living_space_uu_id: Optional[str]
class SelectionDataEmployee(BaseModel):
"""Type for selection data."""
company_uu_id: Optional[str]
company_uu_id: Optional[str]

View File

@ -3,7 +3,11 @@ Account records endpoint configurations.
"""
from ApiEvents.abstract_class import RouteFactoryConfig, EndpointFactoryConfig, endpoint_wrapper
from ApiEvents.abstract_class import (
RouteFactoryConfig,
EndpointFactoryConfig,
endpoint_wrapper,
)
from ApiEvents.base_request_model import EndpointBaseRequestModel
from Services.PostgresDb.Models.alchemy_response import DictJsonResponse
@ -37,7 +41,7 @@ async def address_create(request: "Request", data: EndpointBaseRequestModel):
async def address_search(request: "Request", data: EndpointBaseRequestModel):
"""Handle address search endpoint."""
auth_dict = address_search.auth
code_dict = getattr(address_search, 'func_code', {"function_code": None})
code_dict = getattr(address_search, "func_code", {"function_code": None})
return {"auth_dict": auth_dict, "code_dict": code_dict, "data": data}
@ -69,6 +73,7 @@ async def address_update(
}
)
prefix = "/account/records"
# Account Records Router Configuration

View File

@ -16,12 +16,13 @@ if TYPE_CHECKING:
class AddressUpdateRequest(RootModel[Dict[str, Any]]):
"""Request model for address update."""
model_config = {
"json_schema_extra": {
"example": {
"street": "123 Main St",
"city": "Example City",
"country": "Example Country"
"country": "Example Country",
}
}
}
@ -29,6 +30,7 @@ class AddressUpdateRequest(RootModel[Dict[str, Any]]):
class AddressUpdateResponse(BaseModel):
"""Response model for address update."""
address_uu_id: str = Field(..., description="UUID of the updated address")
data: Dict[str, Any] = Field(..., description="Updated address data")
function_code: str = Field(..., description="Function code for the endpoint")
@ -36,14 +38,17 @@ class AddressUpdateResponse(BaseModel):
class InsertAccountRecordRequestModel(BaseRequestModel["InsertAccountRecord"]):
"""Request model for inserting account records."""
pass
class UpdateAccountRecordRequestModel(BaseRequestModel["UpdateAccountRecord"]):
"""Request model for updating account records."""
pass
class ListOptionsRequestModel(BaseRequestModel["ListOptions"]):
"""Request model for list options."""
pass

View File

@ -49,7 +49,7 @@ def endpoint_wrapper(url_of_endpoint: Optional[str] = None):
# If result is a coroutine, await it
if inspect.iscoroutine(result):
result = await result
print('result', result)
# Add endpoint to the result
if isinstance(result, dict):
result["endpoint"] = url_of_endpoint
@ -110,11 +110,13 @@ class EndpointFactoryConfig:
# First apply auth/event middleware
if self.is_event_required:
from middleware import TokenEventMiddleware
self.endpoint_function = TokenEventMiddleware.event_required(
self.endpoint_function
)
elif self.is_auth_required:
from middleware import MiddlewareModule
self.endpoint_function = MiddlewareModule.auth_required(
self.endpoint_function
)

View File

@ -15,6 +15,7 @@ T = TypeVar("T")
class EndpointBaseRequestModel(BaseModel):
data: dict = Field(..., description="Data to be sent with the request")
class Config:
json_schema_extra = {
"data": {
@ -25,13 +26,17 @@ class EndpointBaseRequestModel(BaseModel):
class BaseRequestModel(RootModel[T], Generic[T]):
"""Base model for all API requests."""
model_config = ConfigDict(
json_schema_extra={"example": {"base": "example"}} # Will be populated by subclasses
json_schema_extra={
"example": {"base": "example"}
} # Will be populated by subclasses
)
class DictRequestModel(RootModel[Dict[str, Any]]):
"""Request model for endpoints that accept dictionary data."""
model_config = ConfigDict(
json_schema_extra={
"example": {
@ -45,6 +50,7 @@ class DictRequestModel(RootModel[Dict[str, Any]]):
class SuccessResponse(BaseModel):
"""Standard success response model."""
token: str = Field(..., example="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...")
user_info: Dict[str, Any] = Field(
...,

View File

@ -30,7 +30,6 @@ from Schemas import (
Departments,
OccupantTypes,
)
from Services.Redis.Models.base import RedisRow
from Services.Redis.Models.response import RedisResponse
if TYPE_CHECKING:
@ -39,7 +38,6 @@ if TYPE_CHECKING:
T = TypeVar("T", EmployeeTokenObject, OccupantTokenObject)
class TokenService:
"""Service class for handling authentication tokens and user sessions."""
@ -232,9 +230,7 @@ class TokenService:
"company_address": company_address,
}
)
person = People.filter_one(
People.id == user.person_id, db=db_session
).data
person = People.filter_one(People.id == user.person_id, db=db_session).data
model_value = EmployeeTokenObject(
domain=domain,
user_type=UserType.employee.value,
@ -269,7 +265,6 @@ class TokenService:
"""Remove all tokens for a user with specific domain."""
redis_rows = cls._get_user_tokens(user)
for redis_row in redis_rows.all:
print('redis_row', redis_row.data)
if redis_row.data.get("domain") == domain:
RedisActions.delete_key(redis_row.key)
@ -340,20 +335,24 @@ class TokenService:
"""Update token at Redis."""
access_token = cls.get_access_token_from_request(request=request)
token_object = cls.get_object_via_access_key(access_token=access_token)
if isinstance(token_object, EmployeeTokenObject) and isinstance(add_payload, CompanyToken):
if isinstance(token_object, EmployeeTokenObject) and isinstance(
add_payload, CompanyToken
):
token_object.selected_company = add_payload
cls.update_object_to_redis(
access_token=access_token,
user_uu_id=token_object.user_uu_id,
model=token_object.model_dump()
model=token_object.model_dump(),
)
return token_object.selected_company.model_dump()
elif isinstance(token_object, OccupantTokenObject) and isinstance(add_payload, OccupantToken):
elif isinstance(token_object, OccupantTokenObject) and isinstance(
add_payload, OccupantToken
):
token_object.selected_occupant = add_payload
cls.update_object_to_redis(
access_token=access_token,
user_uu_id=token_object.user_uu_id,
model=token_object.model_dump()
model=token_object.model_dump(),
)
return token_object.selected_occupant.model_dump()
raise HTTPExceptionApi(

View File

@ -92,7 +92,6 @@ class OccupantTokenObject(ApplicationToken):
selected_occupant: Optional[OccupantToken] = None # Selected Occupant Type
@property
def is_employee(self) -> bool:
return False
@ -113,7 +112,6 @@ class EmployeeTokenObject(ApplicationToken):
selected_company: Optional[CompanyToken] = None # Selected Company Object
@property
def is_employee(self) -> bool:
return True

View File

@ -1,5 +1,3 @@
from ApiValidations.Request import BaseModelRegular
from typing import Optional
@ -57,11 +55,15 @@ class OccupantSelectionValidation:
class OccupantSelection(BaseModel, OccupantSelectionValidation):
build_living_space_uu_id: str = Field(..., example="987fcdeb-51a2-43e7-9876-543210987654")
build_living_space_uu_id: str = Field(
..., example="987fcdeb-51a2-43e7-9876-543210987654"
)
model_config = ConfigDict(
json_schema_extra={
"example": {"build_living_space_uu_id": "987fcdeb-51a2-43e7-9876-543210987654"}
"example": {
"build_living_space_uu_id": "987fcdeb-51a2-43e7-9876-543210987654"
}
}
)
@ -108,7 +110,7 @@ class Login(BaseModelRegular, LoginValidation):
"domain": "evyos.com.tr",
"access_key": "karatay.berkay.sup@evyos.com.tr",
"password": "string",
"remember_me": False
"remember_me": False,
}
}
)

View File

@ -13,7 +13,7 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Request, HTTPException, status
from fastapi.responses import JSONResponse
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
from middleware.auth_middleware import RequestTimingMiddleware
from middleware.auth_middleware import RequestTimingMiddleware, LoggerTimingMiddleware
def setup_cors_middleware(app: FastAPI) -> None:
@ -74,6 +74,7 @@ def setup_middleware(app: FastAPI) -> None:
"""
setup_cors_middleware(app)
app.add_middleware(RequestTimingMiddleware)
app.add_middleware(LoggerTimingMiddleware)
setup_exception_handlers(app)

View File

@ -68,6 +68,7 @@ def create_app() -> FastAPI:
"""
from open_api_creator import create_openapi_schema
# Get all routers and protected routes using the dynamic route creation
app = FastAPI(
@ -89,4 +90,5 @@ def create_app() -> FastAPI:
app.include_router(router)
app.openapi = lambda app=app: create_openapi_schema(app)
return app

View File

@ -1,5 +1,14 @@
from .token_event_middleware import TokenEventMiddleware
from .auth_middleware import RequestTimingMiddleware, MiddlewareModule
from .auth_middleware import (
LoggerTimingMiddleware,
RequestTimingMiddleware,
MiddlewareModule,
)
__all__ = ["TokenEventMiddleware", "RequestTimingMiddleware", "MiddlewareModule"]
__all__ = [
"TokenEventMiddleware",
"RequestTimingMiddleware",
"MiddlewareModule",
"LoggerTimingMiddleware",
]

View File

@ -8,53 +8,15 @@ and a middleware for request timing measurements.
from time import perf_counter
from typing import Callable, Optional, Dict, Any, Tuple, Union
from functools import wraps
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, Response
from AllConfigs.Token.config import Auth
from starlette.middleware.base import BaseHTTPMiddleware
from ApiLibrary.common.line_number import get_line_number_for_error
from ErrorHandlers.ErrorHandlers.api_exc_handler import HTTPExceptionApi
from .base_context import BaseContext
from ApiServices.Token.token_handler import OccupantTokenObject, EmployeeTokenObject
import inspect
class AuthContext(BaseContext):
"""
Context class for authentication middleware.
Extends BaseContext to provide authentication-specific functionality.
"""
def __init__(
self, token_context: Union[OccupantTokenObject, EmployeeTokenObject]
) -> None:
super().__init__()
self.token_context = token_context
@property
def is_employee(self) -> bool:
"""Check if authenticated token is for an employee."""
return isinstance(self.token_context, EmployeeTokenObject)
@property
def is_occupant(self) -> bool:
"""Check if authenticated token is for an occupant."""
return isinstance(self.token_context, OccupantTokenObject)
@property
def user_id(self) -> str:
"""Get the user's UUID from token context."""
return self.token_context.user_uu_id if self.token_context else None
def as_dict(self):
if not isinstance(self.token_context, dict):
return self.token_context.model_dump()
return self.token_context
def __repr__(self) -> str:
user_type = "Employee" if self.is_employee else "Occupant"
return f"AuthContext({user_type}Token: {self.user_id})"
class MiddlewareModule:
"""
Middleware module for handling authentication and request timing.
@ -124,7 +86,9 @@ class MiddlewareModule:
async def wrapper(request: Request, *args, **kwargs):
# Get and validate token context from request
# Create auth context and Attach auth context to both wrapper and original function
func.auth = cls.get_user_from_request(request) # This ensures the context is available in both places
func.auth = cls.get_user_from_request(
request
) # This ensures the context is available in both places
# Call the original endpoint function
if inspect.iscoroutinefunction(func):
return await func(request, *args, **kwargs)
@ -167,3 +131,20 @@ class RequestTimingMiddleware(BaseHTTPMiddleware):
)
return response
class LoggerTimingMiddleware(BaseHTTPMiddleware):
"""
Middleware for measuring and logging request timing.
Only handles timing, no authentication.
"""
async def dispatch(self, request: Request, call_next: Callable) -> Response:
# Log the request
print(f"Handling request: {request.method} {request.url}")
response = await call_next(request)
# Log the response
print(
f"Completed request: {request.method} {request.url} with status {response.status_code}"
)
return response

View File

@ -1,47 +0,0 @@
"""Base context for middleware."""
from typing import Optional, Dict, Any, Union, TYPE_CHECKING
if TYPE_CHECKING:
from ApiServices.Token.token_handler import OccupantTokenObject, EmployeeTokenObject
class BaseContext:
"""Base context class for middleware."""
def __init__(self) -> None:
self._token_context: Optional[
Union["OccupantTokenObject", "EmployeeTokenObject"]
] = None
self._function_code: Optional[str] = None
@property
def token_context(
self,
) -> Optional[Union["OccupantTokenObject", "EmployeeTokenObject"]]:
"""Get token context if available."""
return self._token_context
@token_context.setter
def token_context(
self, value: Union["OccupantTokenObject", "EmployeeTokenObject"]
) -> None:
"""Set token context."""
self._token_context = value
@property
def function_code(self) -> Optional[str]:
"""Get function code if available."""
return self._function_code
@function_code.setter
def function_code(self, value: str) -> None:
"""Set function code."""
self._function_code = value
def to_dict(self) -> Dict[str, Any]:
"""Convert context to dictionary."""
return {
"token_context": self._token_context,
"function_code": self._function_code,
}

View File

@ -5,24 +5,9 @@ Token event middleware for handling authentication and event tracking.
from functools import wraps
from typing import Callable, Dict, Any
from .auth_middleware import MiddlewareModule
from .base_context import BaseContext
import inspect
class TokenEventHandler(BaseContext):
"""Handler for token events with authentication context."""
def __init__(self, func: Callable, url_of_endpoint: str):
"""Initialize the handler with function and URL."""
super().__init__()
self.func = func
self.url_of_endpoint = url_of_endpoint
def update_context(self, function_code: str):
"""Update the event context with function code."""
self.function_code = function_code
class TokenEventMiddleware:
"""
Module containing token and event handling functionality.
@ -54,7 +39,9 @@ class TokenEventMiddleware:
@wraps(authenticated_func)
async def wrapper(*args, **kwargs) -> Dict[str, Any]:
# Create handler with context
function_code = "7192c2aa-5352-4e36-98b3-dafb7d036a3d" # Keep function_code as URL
function_code = (
"7192c2aa-5352-4e36-98b3-dafb7d036a3d" # Keep function_code as URL
)
# Make handler available to all functions in the chain
func.func_code = {"function_code": function_code}
@ -62,4 +49,5 @@ class TokenEventMiddleware:
if inspect.iscoroutinefunction(authenticated_func):
return await authenticated_func(*args, **kwargs)
return authenticated_func(*args, **kwargs)
return wrapper

View File

@ -123,7 +123,9 @@ class OpenAPISchemaCreator:
},
}
def _process_request_body(self, path: str, method: str, schema: Dict[str, Any]) -> None:
def _process_request_body(
self, path: str, method: str, schema: Dict[str, Any]
) -> None:
"""
Process request body to include examples from model config.
@ -140,17 +142,24 @@ class OpenAPISchemaCreator:
content = request_body["content"]
if "application/json" in content:
json_content = content["application/json"]
if "schema" in json_content and "$ref" in json_content["schema"]:
if (
"schema" in json_content
and "$ref" in json_content["schema"]
):
ref = json_content["schema"]["$ref"]
model_name = ref.split("/")[-1]
if model_name in schema["components"]["schemas"]:
model_schema = schema["components"]["schemas"][model_name]
model_schema = schema["components"]["schemas"][
model_name
]
if "example" in model_schema:
json_content["example"] = model_schema["example"]
except KeyError:
pass
def _process_response_examples(self, path: str, method: str, schema: Dict[str, Any]) -> None:
def _process_response_examples(
self, path: str, method: str, schema: Dict[str, Any]
) -> None:
"""
Process response body to include examples from model config.
@ -169,13 +178,20 @@ class OpenAPISchemaCreator:
content = response["content"]
if "application/json" in content:
json_content = content["application/json"]
if "schema" in json_content and "$ref" in json_content["schema"]:
if (
"schema" in json_content
and "$ref" in json_content["schema"]
):
ref = json_content["schema"]["$ref"]
model_name = ref.split("/")[-1]
if model_name in schema["components"]["schemas"]:
model_schema = schema["components"]["schemas"][model_name]
model_schema = schema["components"]["schemas"][
model_name
]
if "example" in model_schema:
json_content["example"] = model_schema["example"]
json_content["example"] = model_schema[
"example"
]
except KeyError:
pass
@ -223,7 +239,9 @@ class OpenAPISchemaCreator:
if "components" not in openapi_schema:
openapi_schema["components"] = {}
openapi_schema["components"]["securitySchemes"] = self._create_security_schemes()
openapi_schema["components"][
"securitySchemes"
] = self._create_security_schemes()
# Configure route security and responses
for route in self.app.routes:
if isinstance(route, APIRoute) and route.include_in_schema:

View File

@ -281,19 +281,16 @@ class Build(CrudCollection, SelectActionWithEmployee):
def update_action(cls, data: UpdateBuild, build_uu_id: str, token):
from Schemas import Addresses
print("data_dict", data.dump())
data_dict = data.excluded_dump()
db = Addresses.new_session()
if data.address_uu_id:
official_address = Addresses.filter_one(
Addresses.uu_id == data.address_uu_id
).data
Addresses.uu_id == data.address_uu_id, db=db
).first
data_dict["address_id"] = official_address.id if official_address else None
print("data_dict", data_dict)
if build_to_update := cls.filter_one(cls.uu_id == build_uu_id).data:
print("build_to_update", build_to_update.get_dict())
if build_to_update := cls.filter_one(cls.uu_id == build_uu_id, db=db).first:
updated_build = build_to_update.update(**data_dict)
updated_build.save()
print("updated_build", updated_build.get_dict())
return updated_build
@property

View File

@ -69,15 +69,22 @@ class RedisActions:
redis_row = RedisRow()
redis_row.merge(set_values=list_keys)
redis_row.feed(value)
redis_row.expires_at_string = None
redis_row.expires_at = None
try:
if expires:
redis_row.expires_at = expires
expiry_time = cls.get_expiry_time(expiry_kwargs=expires)
redis_cli.setex(
name=redis_row.redis_key,
time=expiry_time,
value=redis_row.value,
)
redis_row.expires_at = str(arrow.now().shift(seconds=expiry_time).format(MainConfig.DATETIME_FORMAT))
redis_row.expires_at_string = str(
arrow.now()
.shift(seconds=expiry_time)
.format(MainConfig.DATETIME_FORMAT)
)
else:
redis_cli.set(name=redis_row.redis_key, value=redis_row.value)
@ -126,7 +133,7 @@ class RedisActions:
return RedisResponse(
status=False,
message="Value is not get successfully.",
data=list_of_rows
data=list_of_rows,
)
except Exception as e:
return RedisResponse(
@ -134,4 +141,3 @@ class RedisActions:
message="Value is not get successfully.",
error=str(e),
)

View File

@ -45,7 +45,19 @@ class RedisRow:
key: ClassVar[Union[str, bytes]]
value: ClassVar[Any]
delimiter: ClassVar[str] = ":"
expires_at: ClassVar[Optional[str]] = None
expires_at: Optional[dict] = {"seconds": 60 * 60 * 30}
expires_at_string: Optional[str]
@classmethod
def get_expiry_time(cls) -> int | None:
"""Calculate expiry time in seconds from kwargs."""
time_multipliers = {"days": 86400, "hours": 3600, "minutes": 60, "seconds": 1}
if cls.expires_at:
return sum(
int(cls.expires_at.get(unit, 0)) * multiplier
for unit, multiplier in time_multipliers.items()
)
return
@classmethod
def merge(cls, set_values: List[Union[str, bytes]]) -> None:
@ -108,7 +120,7 @@ class RedisRow:
# Add wildcard if first key was None
if list_keys[0] is None:
pattern = f"*{cls.delimiter}{pattern}"
if '*' not in pattern:
if "*" not in pattern:
pattern = f"{pattern}:*"
return pattern
@ -182,6 +194,37 @@ class RedisRow:
cls.feed({**current_data, **add_dict})
@classmethod
def save(cls):
"""
Save the data to Redis with optional expiration.
Raises:
RedisKeyError: If key is not set
RedisValueError: If value is not set
"""
import arrow
from Services.Redis.conn import redis_cli
if not cls.key:
raise RedisKeyError("Cannot save data without a key")
if not cls.value:
raise RedisValueError("Cannot save empty data")
if cls.expires_at:
redis_cli.setex(name=cls.redis_key, time=cls.expires_at, value=cls.value)
cls.expires_at_string = str(
arrow.now()
.shift(seconds=cls.get_expiry_time())
.format("YYYY-MM-DD HH:mm:ss")
)
return cls.value
redis_cli.set(name=cls.redis_key, value=cls.value)
cls.expires_at = None
cls.expires_at_string = None
return cls.value
@classmethod
def remove(cls, key: str) -> None:
"""