"""Build cluster endpoints: list, create, update and delete routes under ``/builds``."""
from typing import Any
|
|
from fastapi import APIRouter, Depends
|
|
|
|
from index import endpoints_index
|
|
from events.builds.cluster import BuildRouterCluster
|
|
|
|
from Validations.defaults.validations import CommonHeaders
|
|
from Validations.response.pagination import PaginateOnly
|
|
from Extensions.Middlewares.token_provider import TokenProvider
|
|
|
|
|
|
# Router that every build endpoint in this module registers on; all paths
# declared below are served under the "/builds" prefix.
build_endpoint_route = APIRouter(prefix="/builds", tags=["Builds Cluster"])
|
|
|
|
# Key used both for the operation id lookup and for selecting the event cluster.
build_list = "BuildList"


@build_endpoint_route.post(
    path="/list",
    description="List all builds endpoint",
    operation_id=endpoints_index[build_list],
)
def build_list_route(
    data: PaginateOnly,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """Dispatch a build listing request to the event matched for the caller.

    The token carried in ``headers`` is resolved through
    ``TokenProvider.get_dict_from_redis``; together with the operation id it
    yields an event key, which selects one event from the ``BuildList``
    cluster. That event's callable produces the response.

    Args:
        data: Request payload validated as ``PaginateOnly``; forwarded to the
            event callable as ``list_options``.
        headers: Common request headers resolved by the dependency.

    Returns:
        Whatever the matched event callable returns.
    """
    token_payload = TokenProvider.get_dict_from_redis(token=headers.token)
    matched_key = TokenProvider.retrieve_event_codes(
        endpoint_code=headers.operation_id,
        token=token_payload,
    )
    cluster = BuildRouterCluster.get_event_cluster(build_list)
    event = cluster.match_event(event_key=matched_key)
    return event.event_callable(list_options=data, headers=headers)
|
|
|
|
|
|
# Key used both for the operation id lookup and for selecting the event cluster.
build_create = "BuildCreate"


@build_endpoint_route.post(
    path="/create",
    description="Create build endpoint",
    operation_id=endpoints_index[build_create],
)
def build_create_route(
    data,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """Dispatch a build creation request to the event matched for the caller.

    The token carried in ``headers`` is resolved through
    ``TokenProvider.get_dict_from_redis``; together with the operation id it
    yields an event key, which selects one event from the ``BuildCreate``
    cluster. That event's callable produces the response.

    Args:
        data: Creation payload, forwarded unchanged to the event callable.
        headers: Common request headers resolved by the dependency.

    Returns:
        Whatever the matched event callable returns.
    """
    # NOTE(review): ``data`` has no type annotation, so FastAPI will presumably
    # bind it as a query parameter rather than a JSON body - confirm intent.
    token_payload = TokenProvider.get_dict_from_redis(token=headers.token)
    matched_key = TokenProvider.retrieve_event_codes(
        endpoint_code=headers.operation_id,
        token=token_payload,
    )
    cluster = BuildRouterCluster.get_event_cluster(build_create)
    event = cluster.match_event(event_key=matched_key)
    return event.event_callable(data=data, headers=headers)
|
|
|
|
|
|
# Key used both for the operation id lookup and for selecting the event cluster.
build_update = "BuildUpdate"


@build_endpoint_route.post(
    path="/update/{uu_id}",
    description="Update build endpoint",
    operation_id=endpoints_index[build_update],
)
def build_update_route(
    uu_id: str,
    data,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """Dispatch a build update request to the event matched for the caller.

    The token carried in ``headers`` is resolved through
    ``TokenProvider.get_dict_from_redis``; together with the operation id it
    yields an event key, which selects one event from the ``BuildUpdate``
    cluster. That event's callable produces the response.

    Args:
        uu_id: Identifier taken from the ``{uu_id}`` path segment.
        data: Update payload, forwarded unchanged to the event callable.
        headers: Common request headers resolved by the dependency.

    Returns:
        Whatever the matched event callable returns.
    """
    # NOTE(review): ``data`` has no type annotation, so FastAPI will presumably
    # bind it as a query parameter rather than a JSON body - confirm intent.
    token_payload = TokenProvider.get_dict_from_redis(token=headers.token)
    matched_key = TokenProvider.retrieve_event_codes(
        endpoint_code=headers.operation_id,
        token=token_payload,
    )
    cluster = BuildRouterCluster.get_event_cluster(build_update)
    event = cluster.match_event(event_key=matched_key)
    return event.event_callable(uu_id=uu_id, data=data, headers=headers)
|
|
|
|
|
|
# Key used both for the operation id lookup and for selecting the event cluster.
build_delete = "BuildDelete"


@build_endpoint_route.post(
    path="/delete/{uu_id}",
    description="Delete build endpoint",
    operation_id=endpoints_index[build_delete],
)
def build_delete_route(
    uu_id: str,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """Dispatch a build deletion request to the event matched for the caller.

    The token carried in ``headers`` is resolved through
    ``TokenProvider.get_dict_from_redis``; together with the operation id it
    yields an event key, which selects one event from the ``BuildDelete``
    cluster. That event's callable produces the response.

    Args:
        uu_id: Identifier taken from the ``{uu_id}`` path segment.
        headers: Common request headers resolved by the dependency.

    Returns:
        Whatever the matched event callable returns.
    """
    token_payload = TokenProvider.get_dict_from_redis(token=headers.token)
    matched_key = TokenProvider.retrieve_event_codes(
        endpoint_code=headers.operation_id,
        token=token_payload,
    )
    cluster = BuildRouterCluster.get_event_cluster(build_delete)
    event = cluster.match_event(event_key=matched_key)
    return event.event_callable(uu_id=uu_id, headers=headers)
|