# prod-wag-backend-automate-s.../ApiServices/ManagementService/Endpoints/application/route.py

# 155 lines
# 5.2 KiB
# Python

from fastapi import APIRouter, Depends
from ApiControllers.abstracts.default_validations import CommonHeaders
from ApiControllers.providers.token_provider import TokenProvider
from Controllers.Postgres.pagination import PaginateOnly, Pagination, PaginationResult
from Controllers.Postgres.response import EndpointResponse
from Schemas import Applications
from Validations.application.validations import (
RequestApplication,
)
# Router for the application management endpoints; routes registered on it
# are served under the "/application" prefix and tagged "Application Management".
application_route = APIRouter(prefix="/application", tags=["Application Management"])
@application_route.post(
    path="/list",
    description="List applications endpoint",
    operation_id="application-list",
)
def application_list_route(
    list_options: PaginateOnly,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """
    Return a paginated listing of applications.

    Args:
        list_options: Pagination options from the request body; when its
            ``query`` attribute is truthy it is converted into filter arguments.
        headers: Common request headers; ``headers.token`` is looked up in Redis.

    Returns:
        The ``response`` of an ``EndpointResponse`` built with message
        "MSG0003-LIST" and the pagination result.
    """
    # The lookup result is not used here; the call itself is retained
    # (presumably it rejects unknown tokens -- TODO confirm).
    TokenProvider.get_dict_from_redis(token=headers.token)

    # Rebuild the options object from its own dump, as the endpoint has always done.
    paginate_options = PaginateOnly(**list_options.model_dump())

    with Applications.new_session() as db_session:
        # An empty tuple unpacks to "no positional filters", i.e. list everything.
        query_filters = (
            Applications.convert(paginate_options.query)
            if paginate_options.query
            else ()
        )
        records = Applications.filter_all(*query_filters, db=db_session)

        pager = Pagination(data=records)
        pager.change(**paginate_options.model_dump())

        listing = PaginationResult(data=records, pagination=pager)
        endpoint_response = EndpointResponse(
            message="MSG0003-LIST",
            pagination_result=listing,
        )
        return endpoint_response.response
@application_route.post(
    path="/create",
    description="Create application endpoint",
    operation_id="application-create",
)
def application_create_route(
    data: RequestApplication,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """
    Create a new application, or return the already existing matching one.

    Args:
        data: Validated request body describing the application.
        headers: Common request headers; ``headers.token`` is looked up in Redis.

    Returns:
        The ``response`` of an ``EndpointResponse`` carrying the record, with
        message "MSG0001-INSERT" when ``meta_data.created`` is truthy and
        "MSG0002-FOUND" otherwise.
    """
    # The lookup result is not used here; the call itself is retained
    # (presumably it rejects unknown tokens -- TODO confirm).
    TokenProvider.get_dict_from_redis(token=headers.token)

    with Applications.new_session() as db_session:
        created_application = Applications.find_or_create(
            db=db_session,
            # Columns used to decide whether a matching application exists.
            include_args=[
                Applications.application_for == data.application_for,
                Applications.application_code == data.application_code,
                Applications.site_url == data.site_url,
            ],
            # The comma above is required: without it the expression parsed as
            # ``[...] ** dict`` and raised TypeError instead of passing the
            # request fields as keyword arguments.
            **data.model_dump(),
        )
        if created_application.meta_data.created:
            return EndpointResponse(
                message="MSG0001-INSERT",
                data=created_application,
            ).response
        return EndpointResponse(
            message="MSG0002-FOUND",
            data=created_application,
        ).response
@application_route.post(
    path="/update/{application_uuid}",
    description="Update application endpoint",
    operation_id="application-update",
)
def application_update_route(
    data: RequestApplication,
    application_uuid: str,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """
    Update an existing application identified by its UUID.

    Args:
        data: Validated request body with the new field values.
        application_uuid: Path parameter matched against ``Applications.uu_id``.
        headers: Common request headers; ``headers.token`` is looked up in Redis.

    Returns:
        The ``response`` of an ``EndpointResponse``: message "MSG0002-FOUND"
        with the (falsy) lookup result when no record matches, otherwise
        message "MSG0003-UPDATE" with the updated record.
    """
    # The lookup result is not used here; the call itself is retained
    # (presumably it rejects unknown tokens -- TODO confirm).
    TokenProvider.get_dict_from_redis(token=headers.token)

    with Applications.new_session() as db_session:
        update_fields = data.model_dump()
        found_application = Applications.filter_one(
            Applications.uu_id == application_uuid, db=db_session
        ).data
        if not found_application:
            # NOTE(review): the not-found path reuses the "MSG0002-FOUND"
            # message code -- confirm this is the intended code.
            return EndpointResponse(
                message="MSG0002-FOUND",
                data=found_application,
            ).response

        updated_application = found_application.update(**update_fields)
        updated_application.save(db_session)

        # The former ``meta_data.updated`` check returned the same response on
        # both branches, so a single return is equivalent.
        return EndpointResponse(
            message="MSG0003-UPDATE",
            data=updated_application,
        ).response
@application_route.delete(
    path="/{application_uuid}",
    description="Delete application endpoint",
    operation_id="application-delete",
)
def application_delete_route(
    application_uuid: str,
    headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
    """
    Delete the application identified by its UUID.

    Args:
        application_uuid: Path parameter matched against ``Applications.uu_id``.
        headers: Common request headers; ``headers.token`` is looked up in Redis.

    Returns:
        The ``response`` of an ``EndpointResponse``: message "MSG0002-FOUND"
        with the (falsy) lookup result when no record matches, otherwise
        message "MSG0004-DELETE" with the destroyed record.
    """
    # The lookup result is not used here; the call itself is retained
    # (presumably it rejects unknown tokens -- TODO confirm).
    TokenProvider.get_dict_from_redis(token=headers.token)

    with Applications.new_session() as db_session:
        found_application = Applications.filter_one(
            Applications.uu_id == application_uuid, db=db_session
        ).data
        if not found_application:
            # NOTE(review): the not-found path reuses the "MSG0002-FOUND"
            # message code -- confirm this is the intended code.
            return EndpointResponse(
                message="MSG0002-FOUND",
                data=found_application,
            ).response

        found_application.destroy(db_session)
        # NOTE(review): save is invoked on the class here but on the instance
        # in the update route -- confirm both call styles are supported.
        Applications.save(db_session)

        # The former ``meta_data.deleted`` check returned the same response on
        # both branches, so a single return is equivalent.
        return EndpointResponse(
            message="MSG0004-DELETE",
            data=found_application,
        ).response