production-evyos-systems-an.../ServicesApi/Builds/Building/endpoints/building_parts/router.py

88 lines
3.7 KiB
Python

from typing import Optional, Any
from fastapi import APIRouter, Depends
from index import endpoints_index
from events.building_parts.cluster import PartsRouterCluster
from Validations.defaults.validations import CommonHeaders
from Validations.response.pagination import PaginateOnly
from Extensions.Middlewares.token_provider import TokenProvider
from pydantic import BaseModel
# Router collecting the part endpoints of this module; every path registered
# on it is served under the "/parts" prefix and tagged "Parts Cluster".
parts_endpoint_route = APIRouter(
    prefix="/parts",
    tags=["Parts Cluster"],
)
class PartsListRequest(BaseModel):
    """Request body accepted by the part create and update endpoints.

    Despite the ``List`` in its name, this model is the ``data`` payload of
    ``parts_create_route`` and ``parts_update_route`` in this module.
    Fields without a default are required; fields defaulting to ``None``
    may be omitted by the client.
    """

    address_gov_code: str
    # Annotated Optional so the type agrees with the ``None`` default and an
    # explicit null is accepted, consistent with the nullable fields below.
    # Presumably the UUID of the owning building -- confirm against callers.
    build_uu_id: Optional[str] = None
    part_no: int
    part_level: int
    part_code: str
    # NOTE(review): size units are not visible in this module -- confirm.
    part_gross_size: int
    part_net_size: int
    default_accessory: str
    human_livable: bool
    # Optional references; omitted values default to None.
    due_part_key: Optional[str] = None
    part_direction_uu_id: Optional[str] = None
    part_type_uu_id: Optional[str] = None
# Key used to look up this endpoint's operation id in ``endpoints_index``.
parts_list = "PartsList"


@parts_endpoint_route.post(
    path="/list",
    description="List all parts endpoint",
    operation_id=endpoints_index[parts_list],
)
def parts_list_route(
    data: PaginateOnly,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """Dispatch a "list parts" request to the event matched for the caller.

    The token carried in ``headers`` is resolved through
    ``TokenProvider.get_dict_from_redis``; the result, together with the
    request's operation id, is handed to ``TokenProvider.retrieve_event_codes``
    to obtain an event key. That key selects an event from the
    ``PartsListEventCluster`` cluster.

    Args:
        data: Pagination options, forwarded to the event as ``list_options``.
        headers: Common request headers injected by FastAPI.

    Returns:
        Whatever the matched event's ``event_callable`` returns.
    """
    token_payload = TokenProvider.get_dict_from_redis(token=headers.token)
    matched_key = TokenProvider.retrieve_event_codes(
        endpoint_code=headers.operation_id,
        token=token_payload,
    )
    cluster = PartsRouterCluster.get_event_cluster("PartsListEventCluster")
    handler = cluster.match_event(event_key=matched_key)
    return handler.event_callable(list_options=data, headers=headers)
# Key used to look up this endpoint's operation id in ``endpoints_index``.
parts_create = "PartsCreate"


@parts_endpoint_route.post(
    path="/create",
    description="Create part endpoint",
    operation_id=endpoints_index[parts_create],
)
def parts_create_route(
    data: PartsListRequest,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """Dispatch a "create part" request to the event matched for the caller.

    The token carried in ``headers`` is resolved through
    ``TokenProvider.get_dict_from_redis``; the result, together with the
    request's operation id, is handed to ``TokenProvider.retrieve_event_codes``
    to obtain an event key. That key selects an event from the
    ``PartsCreateEventCluster`` cluster.

    Args:
        data: Validated request body, forwarded to the event as ``data``.
        headers: Common request headers injected by FastAPI.

    Returns:
        Whatever the matched event's ``event_callable`` returns.
    """
    token_payload = TokenProvider.get_dict_from_redis(token=headers.token)
    matched_key = TokenProvider.retrieve_event_codes(
        endpoint_code=headers.operation_id,
        token=token_payload,
    )
    cluster = PartsRouterCluster.get_event_cluster("PartsCreateEventCluster")
    handler = cluster.match_event(event_key=matched_key)
    return handler.event_callable(data=data, headers=headers)
# Key used to look up this endpoint's operation id in ``endpoints_index``.
parts_update = "PartsUpdate"


@parts_endpoint_route.post(
    path="/update/{uu_id}",
    description="Update part endpoint",
    operation_id=endpoints_index[parts_update],
)
def parts_update_route(
    uu_id: str,
    data: PartsListRequest,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """Dispatch an "update part" request to the event matched for the caller.

    The token carried in ``headers`` is resolved through
    ``TokenProvider.get_dict_from_redis``; the result, together with the
    request's operation id, is handed to ``TokenProvider.retrieve_event_codes``
    to obtain an event key. That key selects an event from the
    ``PartsUpdateEventCluster`` cluster.

    Args:
        uu_id: Path parameter taken from ``/update/{uu_id}``, forwarded as is.
        data: Validated request body, forwarded to the event as ``data``.
        headers: Common request headers injected by FastAPI.

    Returns:
        Whatever the matched event's ``event_callable`` returns.
    """
    token_payload = TokenProvider.get_dict_from_redis(token=headers.token)
    matched_key = TokenProvider.retrieve_event_codes(
        endpoint_code=headers.operation_id,
        token=token_payload,
    )
    cluster = PartsRouterCluster.get_event_cluster("PartsUpdateEventCluster")
    handler = cluster.match_event(event_key=matched_key)
    return handler.event_callable(uu_id=uu_id, data=data, headers=headers)
# Key used to look up this endpoint's operation id in ``endpoints_index``.
parts_delete = "PartsDelete"


@parts_endpoint_route.post(
    path="/delete/{uu_id}",
    description="Delete part endpoint",
    operation_id=endpoints_index[parts_delete],
)
def parts_delete_route(
    uu_id: str,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """Dispatch a "delete part" request to the event matched for the caller.

    The token carried in ``headers`` is resolved through
    ``TokenProvider.get_dict_from_redis``; the result, together with the
    request's operation id, is handed to ``TokenProvider.retrieve_event_codes``
    to obtain an event key. That key selects an event from the
    ``PartsDeleteEventCluster`` cluster.

    Args:
        uu_id: Path parameter taken from ``/delete/{uu_id}``, forwarded as is.
        headers: Common request headers injected by FastAPI.

    Returns:
        Whatever the matched event's ``event_callable`` returns.
    """
    token_payload = TokenProvider.get_dict_from_redis(token=headers.token)
    matched_key = TokenProvider.retrieve_event_codes(
        endpoint_code=headers.operation_id,
        token=token_payload,
    )
    cluster = PartsRouterCluster.get_event_cluster("PartsDeleteEventCluster")
    handler = cluster.match_event(event_key=matched_key)
    return handler.event_callable(uu_id=uu_id, headers=headers)