api added

This commit is contained in:
2025-05-30 21:10:44 +03:00
parent c44a724a05
commit e5829f0525
72 changed files with 5576 additions and 0 deletions

View File

@@ -0,0 +1,7 @@
from .people.cluster import PeopleRouterCluster
from .user.cluster import UserRouterCluster
__all__ = [
"PeopleRouterCluster",
"UserRouterCluster",
]

View File

@@ -0,0 +1,27 @@
from api_initializer.event_clusters import EventCluster, RouterCluster
from index import endpoints_index
from .supers_events import (
SuperPeopleListEvent,
SuperPeopleCreateEvent,
SuperPeopleUpdateEvent,
SuperPeopleDeleteEvent,
)
PeopleRouterCluster = RouterCluster(name="PeopleRouterCluster")
PeopleListEventCluster = EventCluster(name="PeopleListEventCluster", endpoint_uu_id=endpoints_index["PeopleList"])
PeopleListEventCluster.add_event(SuperPeopleListEvent)
PeopleCreateEventCluster = EventCluster(name="PeopleCreateEventCluster", endpoint_uu_id=endpoints_index["PeopleCreate"])
PeopleCreateEventCluster.add_event(SuperPeopleCreateEvent)
PeopleUpdateEventCluster = EventCluster(name="PeopleUpdateEventCluster", endpoint_uu_id=endpoints_index["PeopleUpdate"])
PeopleUpdateEventCluster.add_event(SuperPeopleUpdateEvent)
PeopleDeleteEventCluster = EventCluster(name="PeopleDeleteEventCluster", endpoint_uu_id=endpoints_index["PeopleDelete"])
PeopleDeleteEventCluster.add_event(SuperPeopleDeleteEvent)
PeopleRouterCluster.set_event_cluster(PeopleListEventCluster)
PeopleRouterCluster.set_event_cluster(PeopleCreateEventCluster)
PeopleRouterCluster.set_event_cluster(PeopleUpdateEventCluster)
PeopleRouterCluster.set_event_cluster(PeopleDeleteEventCluster)

View File

@@ -0,0 +1,114 @@
from typing import Any
from api_initializer.event_clusters import Event
from api_validations.response import (
PaginateOnly,
Pagination,
PaginationResult,
PostgresResponseSingle,
PostgresResponse,
EndpointResponse
)
from api_validations.token.validations import TokenDictType
from schemas import (
Build,
BuildLivingSpace,
BuildParts,
People,
)
# List all endpoint Super User
SuperPeopleListEvent = Event(
name="super_people_list",
key="0f8a8b7f-0615-4507-916b-030d48cb5c1d",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Super People List all flat representative users endpoint",
)
# Create endpoint Super User
SuperPeopleCreateEvent = Event(
name="super_people_create",
key="e18657b7-7a5a-43b8-b43a-422cbc783326",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Super People Create flat representative users endpoint",
)
# Update endpoint Super User
SuperPeopleUpdateEvent = Event(
name="super_people_update",
key="02a774aa-1f7d-472b-98f1-7b4a58d43e31",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Super People Update flat representative users endpoint",
)
# Delete endpoint Super User
SuperPeopleDeleteEvent = Event(
name="super_people_delete",
key="b56fd146-b11a-466a-84c9-4c72fb0b9ffa",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Super People Delete flat representative users endpoint",
)
def super_people_list_callable(list_options: PaginateOnly, token: TokenDictType):
list_options = PaginateOnly(**list_options.model_dump())
if token.is_employee:
raise Exception("Forbidden for employees")
# TODO: Pydantic Model must be implemnted for list_options.query
with People.new_session() as db_session:
People.set_session(db_session)
db_session.query(
).join(BuildParts, BuildParts.id == BuildLivingSpace.build_parts_id
).join(People, People.person_id == BuildLivingSpace.people_id
).filter()
if list_options.query:
people_list = People.query.filter(*People.convert(list_options.query))
else:
people_list = People.query.filter()
pagination = Pagination(data=people_list)
pagination.change(**list_options.model_dump())
pagination_result = PaginationResult(data=people_list, pagination=pagination)
return EndpointResponse(message="MSG0003-LIST", pagination_result=pagination_result).response
SuperPeopleListEvent.event_callable = super_people_list_callable
def super_people_create_callable(data: People, token: TokenDictType):
return {
"message": "MSG0001-INSERT",
"data": None,
"completed": True,
}
SuperPeopleCreateEvent.event_callable = super_people_create_callable
def super_people_update_callable(data: People, token: TokenDictType):
return {
"message": "MSG0002-UPDATE",
"data": None,
"completed": True,
}
SuperPeopleUpdateEvent.event_callable = super_people_update_callable
def super_people_delete_callable(data: People, token: TokenDictType):
return {
"message": "MSG0004-DELETE",
"data": None,
"completed": True,
}
SuperPeopleDeleteEvent.event_callable = super_people_delete_callable

View File

@@ -0,0 +1,27 @@
from api_initializer.event_clusters import EventCluster, RouterCluster
from index import endpoints_index
from .supers_events import (
SuperUserListEvent,
SuperUserCreateEvent,
SuperUserUpdateEvent,
SuperUserDeleteEvent,
)
UserRouterCluster = RouterCluster(name="UserRouterCluster")
UserListAllEventCluster = EventCluster(name="UserListAllEventCluster", endpoint_uu_id=endpoints_index["UserList"])
UserListAllEventCluster.add_event(SuperUserListEvent)
UserCreateEventCluster = EventCluster(name="UserCreateEventCluster", endpoint_uu_id=endpoints_index["UserCreate"])
UserCreateEventCluster.add_event(SuperUserCreateEvent)
UserUpdateEventCluster = EventCluster(name="UserUpdateEventCluster", endpoint_uu_id=endpoints_index["UserUpdate"])
UserUpdateEventCluster.add_event(SuperUserUpdateEvent)
UserDeleteEventCluster = EventCluster(name="UserDeleteEventCluster", endpoint_uu_id=endpoints_index["UserDelete"])
UserDeleteEventCluster.add_event(SuperUserDeleteEvent)
UserRouterCluster.set_event_cluster(UserListAllEventCluster)
UserRouterCluster.set_event_cluster(UserCreateEventCluster)
UserRouterCluster.set_event_cluster(UserUpdateEventCluster)
UserRouterCluster.set_event_cluster(UserDeleteEventCluster)

View File

@@ -0,0 +1,100 @@
from typing import Any
from api_initializer.event_clusters import Event
from api_validations.response import (
PaginateOnly,
Pagination,
PaginationResult,
PostgresResponseSingle,
PostgresResponse,
EndpointResponse
)
from api_validations.token.validations import TokenDictType
from schemas import (
Build,
BuildLivingSpace,
BuildParts,
Users,
UsersTokens,
People,
)
# List all endpoint Super User
SuperUserListEvent = Event(
name="super_user_list",
key="202eec81-b382-4623-911b-709f1b841f3f",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Super Users List all flat representative users endpoint",
)
# Create endpoint Super User
SuperUserCreateEvent = Event(
name="super_user_create",
key="2f0a3691-114d-48b7-b166-9572fc889695",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Super Users Create flat representative users endpoint",
)
# Update endpoint Super User
SuperUserUpdateEvent = Event(
name="super_user_update",
key="8a8c8dd6-43ad-40df-86bd-345488273f52",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Super Users Update flat representative users endpoint",
)
# Delete endpoint Super User
SuperUserDeleteEvent = Event(
name="super_user_delete",
key="e8c77554-4b0e-491f-aab5-67a5ef670999",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Super Users Delete flat representative users endpoint",
)
def super_user_list_callable(list_options: PaginateOnly, token: TokenDictType):
return {
"message": "MSG0003-LIST",
"data": None,
"completed": True,
}
SuperUserListEvent.event_callable = super_user_list_callable
def super_user_create_callable(data: dict, token: TokenDictType):
return {
"message": "MSG0001-INSERT",
"data": None,
"completed": True,
}
SuperUserCreateEvent.event_callable = super_user_create_callable
def super_user_update_callable(data: dict, token: TokenDictType):
return {
"message": "MSG0002-UPDATE",
"data": None,
"completed": True,
}
SuperUserUpdateEvent.event_callable = super_user_update_callable
def super_user_delete_callable(data: dict, token: TokenDictType):
return {
"message": "MSG0004-DELETE",
"data": None,
"completed": True,
}
SuperUserDeleteEvent.event_callable = super_user_delete_callable