updated Identity and managment service

This commit is contained in:
2025-05-01 15:25:15 +03:00
parent e815251123
commit 1920c2a25d
69 changed files with 1672 additions and 462 deletions

View File

@@ -1,11 +1,13 @@
import uuid
from fastapi import APIRouter, Request, Response, Header
from fastapi import APIRouter, Header
from typing import Any
from ApiDefaults.config import api_config
from Events.user.cluster import UserRouterCluster
from ApiControllers.providers.token_provider import TokenProvider
from ApiControllers.abstracts.default_validations import CommonHeaders
from Controllers.Postgres.pagination import PaginateOnly
user_route = APIRouter(prefix="/user", tags=["User"])
@@ -13,103 +15,58 @@ user_route = APIRouter(prefix="/user", tags=["User"])
@user_route.post(
path="/list",
description="List users endpoint",
operation_id="5bc09312-d3f2-4f47-baba-17c928706da8",
operation_id="1aca3094-fe80-4e0f-a460-1a506419082a",
)
def user_list_route(
request: Request,
response: Response,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
data: PaginateOnly,
headers: CommonHeaders,
):
"""
List users endpoint
"""
endpoint_code = "5bc09312-d3f2-4f47-baba-17c928706da8"
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
token_object = TokenProvider.get_dict_from_redis(token=token)
event_key = TokenProvider.retrieve_event_codes(
endpoint_code=endpoint_code, token=token_object
)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
event_cluster_matched = UserRouterCluster.get_event_cluster("UserList").match_event(
event_key=event_key
)
response.headers["X-Header"] = "Test Header GET"
if runner_callable := event_cluster_matched.event_callable():
return runner_callable
raise ValueError("Event key not found or multiple matches found")
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = UserRouterCluster.get_event_cluster("UserList")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@user_route.post(
path="/create",
description="Create users endpoint",
operation_id="08d4b572-1584-47bb-aa42-8d068e5514e7",
operation_id="9686211f-4260-485d-8076-186c22c053aa",
)
def user_create_route(
request: Request,
response: Response,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
data: Any,
headers: CommonHeaders,
):
"""
Create users endpoint
"""
endpoint_code = "08d4b572-1584-47bb-aa42-8d068e5514e7"
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
token_object = TokenProvider.get_dict_from_redis(token=token)
event_key = TokenProvider.retrieve_event_codes(
endpoint_code=endpoint_code, token=token_object
)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
event_cluster_matched = UserRouterCluster.get_event_cluster(
"UserCreate"
).match_event(event_key=event_key)
response.headers["X-Header"] = "Test Header POST"
if runner_callable := event_cluster_matched.event_callable():
return runner_callable
raise ValueError("Event key not found or multiple matches found")
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = UserRouterCluster.get_event_cluster("UserCreate")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@user_route.post(
path="/update",
description="Update users endpoint",
operation_id="b641236a-928d-4f19-a1d2-5edf611d1e56",
operation_id="268e887b-5ff5-4f99-b1be-7e127e28a198",
)
def user_update_route(request: Request, response: Response):
def user_update_route(
data: Any,
headers: CommonHeaders,
):
"""
Update users endpoint
"""
endpoint_code = "b641236a-928d-4f19-a1d2-5edf611d1e56"
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
token_object = TokenProvider.get_dict_from_redis(token=token)
event_key = TokenProvider.retrieve_event_codes(
endpoint_code=endpoint_code, token=token_object
)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
event_cluster_matched = UserRouterCluster.get_event_cluster(
"UserUpdate"
).match_event(event_key=event_key)
response.headers["X-Header"] = "Test Header POST"
if runner_callable := event_cluster_matched.event_callable():
return runner_callable
raise ValueError("Event key not found or multiple matches found")
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = UserRouterCluster.get_event_cluster("UserUpdate")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)

View File

@@ -8,17 +8,17 @@ from .supers_events import (
UserRouterCluster = RouterCluster(name="UserRouterCluster")
UserEventClusterList = EventCluster(
name="UserList", endpoint_uu_id="5bc09312-d3f2-4f47-baba-17c928706da8"
name="UserList", endpoint_uu_id="1aca3094-fe80-4e0f-a460-1a506419082a"
)
UserEventClusterList.add_event(SuperUsersListEvent)
UserEventClusterCreate = EventCluster(
name="UserCreate", endpoint_uu_id="08d4b572-1584-47bb-aa42-8d068e5514e7"
name="UserCreate", endpoint_uu_id="9686211f-4260-485d-8076-186c22c053aa"
)
UserEventClusterCreate.add_event(SuperUsersCreateEvent)
UserEventClusterUpdate = EventCluster(
name="UserUpdate", endpoint_uu_id="b641236a-928d-4f19-a1d2-5edf611d1e56"
name="UserUpdate", endpoint_uu_id="268e887b-5ff5-4f99-b1be-7e127e28a198"
)
UserEventClusterUpdate.add_event(SuperUsersUpdateEvent)

View File

@@ -1,4 +1,5 @@
from fastapi import APIRouter, Depends
from typing import Any
from ApiControllers.abstracts.default_validations import CommonHeaders
from ApiControllers.providers.token_provider import TokenProvider
@@ -17,7 +18,7 @@ application_route = APIRouter(prefix="/application", tags=["Application Manageme
@application_route.post(
path="/list",
description="List applications endpoint",
operation_id="application-list",
operation_id="3189a049-bdb0-49f3-83ff-feb8cb4cb57a",
)
def application_list_route(
list_options: PaginateOnly,
@@ -50,7 +51,7 @@ def application_list_route(
@application_route.post(
path="/create",
description="Create application endpoint",
operation_id="application-create",
operation_id="5570be78-030a-438e-8674-7e751447608b",
)
def application_create_route(
data: RequestApplication,
@@ -85,7 +86,7 @@ def application_create_route(
@application_route.post(
path="/update/{application_uuid}",
description="Update application endpoint",
operation_id="application-update",
operation_id="87cd4515-73dd-4d11-a01f-562e221d973c",
)
def application_update_route(
data: RequestApplication,
@@ -119,36 +120,42 @@ def application_update_route(
).response
@application_route.delete(
path="/{application_uuid}",
description="Delete application endpoint",
operation_id="application-delete",
@application_route.post(
path="/bind/employee",
description="Bind application to employee endpoint",
operation_id="2bab94fa-becb-4d8e-80f1-f4631119a521",
)
def application_delete_route(
application_uuid: str,
headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
def application_bind_employee_route(
data: Any,
headers: CommonHeaders,
):
"""
Delete application by ID
Bind application to employee endpoint
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
with Applications.new_session() as db_session:
found_application = Applications.filter_one(
Applications.uu_id == application_uuid, db=db_session
).data
if not found_application:
return EndpointResponse(
message="MSG0002-FOUND",
data=found_application,
).response
found_application.destroy(db_session)
Applications.save(db_session)
if found_application.meta_data.deleted:
return EndpointResponse(
message="MSG0004-DELETE",
data=found_application,
).response
return EndpointResponse(
message="MSG0004-DELETE",
data=found_application,
).response
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = ApplicationRouterCluster.get_event_cluster("ApplicationBindEmployee")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@application_route.post(
path="/bind/occupant",
description="Bind application to occupant endpoint",
operation_id="fccf1a59-0650-4e5c-ba8d-f389dadce01c",
)
def application_bind_occupant_route(
data: Any,
headers: CommonHeaders,
):
"""
Bind application to occupant endpoint
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = ApplicationRouterCluster.get_event_cluster("ApplicationBindOccupant")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)

View File

@@ -1,8 +1,8 @@
from fastapi import APIRouter, Depends
from typing import Any
from ApiControllers.abstracts.default_validations import CommonHeaders
from ApiControllers.providers.token_provider import TokenProvider
from Controllers.Postgres.pagination import PaginateOnly, Pagination, PaginationResult
from Controllers.Postgres.response import EndpointResponse
from Validations.service_endpoints.validations import Event2Employee, Event2Occupant
@@ -26,46 +26,66 @@ def event_list_route(
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = EventEndpointRouterCluster.get_event_cluster("EventList")
FoundCluster = EventsEndpointRouterCluster.get_event_cluster("EventsList")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@event_endpoint_route.post(
path="/bind/employee",
description="Bind event to employee endpoint",
operation_id="",
path="/register/service",
description="Register event to service endpoint",
operation_id="c89a2150-db4d-4a8f-b6ec-9e0f09625f76",
)
def event_bind_employee_route(
data: Event2Employee,
def event_register_service_route(
data: Any,
headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
"""
Bind event to employee
Register event to service
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = EventEndpointRouterCluster.get_event_cluster("EventBindEmployee")
FoundCluster = EventEndpointRouterCluster.get_event_cluster("EventRegisterService")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@event_endpoint_route.post(
path="/bind/occupant",
description="Bind event to occupant endpoint",
operation_id="",
path="/bind/extra/employee",
description="Bind event to employee extra endpoint",
operation_id="58ef3640-04ec-43f9-8f3e-f86be3ce4a24",
)
def event_bind_occupant_route(
data: Event2Occupant,
def event_bind_employee_extra_route(
data: Any,
headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
"""
Bind event to occupant
Bind event to employee extra
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = EventEndpointRouterCluster.get_event_cluster("EventBindOccupant")
FoundCluster = EventEndpointRouterCluster.get_event_cluster("EventBindEmployeeExtra")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@event_endpoint_route.post(
path="/bind/extra/occupant",
description="Bind event to occupant extra endpoint",
operation_id="7794a550-3073-43e3-b0c5-80128f8d3e4b",
)
def event_bind_occupant_extra_route(
data: Any,
headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
"""
Bind event to occupant extra
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = EventEndpointRouterCluster.get_event_cluster("EventBindOccupantExtra")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)

View File

@@ -3,8 +3,11 @@ from fastapi import APIRouter
def get_routes() -> list[APIRouter]:
from .application.route import application_route
from .service_endpoints.route import service_endpoint_route
from .service_managements.route import service_management_route
from .event_endpoints.route import event_endpoint_route
return [application_route]
return [application_route, service_endpoint_route, service_management_route, event_endpoint_route]
def get_safe_endpoint_urls() -> list[tuple[str, str]]:

View File

@@ -1 +1,5 @@
__all__ = []
from .service_endpoints.cluster import ServiceEndpointRouterCluster
from .event_endpoints.cluster import EventsEndpointRouterCluster
from .application.cluster import ApplicationRouterCluster
__all__ = ["ServiceEndpointRouterCluster", "EventsEndpointRouterCluster", "ApplicationRouterCluster"]

View File

@@ -1,27 +1,41 @@
from ApiControllers.abstracts.event_clusters import EventCluster, RouterCluster
from .supers_events import (
SuperUsersListEvent,
SuperUsersCreateEvent,
SuperUsersUpdateEvent,
ApplicationListEvent,
ApplicationCreateEvent,
ApplicationUpdateEvent,
ApplicationBindEmployeeEvent,
ApplicationBindOccupantEvent,
)
UserRouterCluster = RouterCluster(name="UserRouterCluster")
ApplicationRouterCluster = RouterCluster(name="ApplicationRouterCluster")
UserEventClusterList = EventCluster(
name="UserList", endpoint_uu_id="5bc09312-d3f2-4f47-baba-17c928706da8"
ApplicationEventClusterList = EventCluster(
name="ApplicationList", endpoint_uu_id="3189a049-bdb0-49f3-83ff-feb8cb4cb57a"
)
UserEventClusterList.add_event(SuperUsersListEvent)
ApplicationEventClusterList.add_event(ApplicationListEvent)
UserEventClusterCreate = EventCluster(
name="UserCreate", endpoint_uu_id="08d4b572-1584-47bb-aa42-8d068e5514e7"
ApplicationEventClusterCreate = EventCluster(
name="ApplicationCreate", endpoint_uu_id="5570be78-030a-438e-8674-7e751447608b"
)
UserEventClusterCreate.add_event(SuperUsersCreateEvent)
ApplicationEventClusterCreate.add_event(ApplicationCreateEvent)
UserEventClusterUpdate = EventCluster(
name="UserUpdate", endpoint_uu_id="b641236a-928d-4f19-a1d2-5edf611d1e56"
ApplicationEventClusterUpdate = EventCluster(
name="ApplicationUpdate", endpoint_uu_id="87cd4515-73dd-4d11-a01f-562e221d973c"
)
UserEventClusterUpdate.add_event(SuperUsersUpdateEvent)
ApplicationEventClusterUpdate.add_event(ApplicationUpdateEvent)
UserRouterCluster.set_event_cluster(UserEventClusterList)
UserRouterCluster.set_event_cluster(UserEventClusterCreate)
UserRouterCluster.set_event_cluster(UserEventClusterUpdate)
ApplicationEventClusterBindEmployee = EventCluster(
name="ApplicationBindEmployee", endpoint_uu_id="2bab94fa-becb-4d8e-80f1-f4631119a521"
)
ApplicationEventClusterBindEmployee.add_event(ApplicationBindEmployeeEvent)
ApplicationEventClusterBindOccupant = EventCluster(
name="ApplicationBindOccupant", endpoint_uu_id="fccf1a59-0650-4e5c-ba8d-f389dadce01c"
)
ApplicationEventClusterBindOccupant.add_event(ApplicationBindOccupantEvent)
ApplicationRouterCluster.set_event_cluster(ApplicationEventClusterList)
ApplicationRouterCluster.set_event_cluster(ApplicationEventClusterCreate)
ApplicationRouterCluster.set_event_cluster(ApplicationEventClusterUpdate)
ApplicationRouterCluster.set_event_cluster(ApplicationEventClusterBindEmployee)
ApplicationRouterCluster.set_event_cluster(ApplicationEventClusterBindOccupant)

View File

@@ -1,53 +1,75 @@
from ApiControllers.abstracts.event_clusters import Event
from Controllers.Postgres.pagination import Pagination, PaginationResult, PaginateOnly
from Controllers.Postgres.response import EndpointResponse
from Schemas import Users
from Schemas import (
Applications,
Application2Employee,
Application2Occupant,
)
# List endpoint
SuperUsersListEvent = Event(
name="supers_users_list",
key="341b394f-9f11-4abb-99e7-4b27fa6bf012",
ApplicationListEvent = Event(
name="application_list",
key="b4efda1e-bde7-4659-ab1a-ef74c0fd88b6",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="List events of users endpoint",
)
# Create endpoint
SuperUsersCreateEvent = Event(
name="supers_users_create",
key="4e7e189e-e015-4ff8-902d-60138cbc77a6",
ApplicationCreateEvent = Event(
name="application_create",
key="f53ca9aa-5536-4d77-9129-78d67e61db4a",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Create events of users endpoint",
)
# Update endpoint
SuperUsersUpdateEvent = Event(
name="supers_users_update",
key="efa4aa4a-d414-4391-91ee-97eb617b7755",
ApplicationUpdateEvent = Event(
name="application_update",
key="0e9a855e-4e69-44b5-8ac2-825daa32840c",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Update events of users endpoint",
)
#Bind Application to employee
ApplicationBindEmployeeEvent = Event(
name="application_bind_employee",
key="26a96c2d-bca8-41cb-8ac1-f3ca8124434b",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Bind events of users endpoint",
)
def supers_users_list_callable(list_options: PaginateOnly):
#Bind Application to occupant
ApplicationBindOccupantEvent = Event(
name="application_bind_occupant",
key="4eaf2bb0-2a42-4d21-ae65-a9259ebee189",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Bind events of users endpoint",
)
def application_list_callable(list_options: PaginateOnly):
"""
Example callable method
"""
list_options = PaginateOnly(**list_options.model_dump())
with Users.new_session() as db_session:
with Applications.new_session() as db_session:
if list_options.query:
users_list = Users.filter_all(
*Users.convert(list_options.query), db=db_session
applications_list = Applications.filter_all(
*Applications.convert(list_options.query), db=db_session
)
else:
users_list = Users.filter_all(db=db_session)
pagination = Pagination(data=users_list)
applications_list = Applications.filter_all(db=db_session)
pagination = Pagination(data=applications_list)
pagination.change(**list_options.model_dump())
pagination_result = PaginationResult(
data=users_list,
data=applications_list,
pagination=pagination,
# response_model="",
).pagination.as_dict
@@ -57,10 +79,10 @@ def supers_users_list_callable(list_options: PaginateOnly):
).response
SuperUsersListEvent.event_callable = supers_users_list_callable
ApplicationListEvent.event_callable = application_list_callable
def supers_users_create_callable():
def application_create_callable():
"""
Example callable method
"""
@@ -74,10 +96,10 @@ def supers_users_create_callable():
}
SuperUsersCreateEvent.event_callable = supers_users_create_callable
ApplicationCreateEvent.event_callable = application_create_callable
def supers_users_update_callable():
def application_update_callable():
"""
Example callable method
"""
@@ -91,4 +113,36 @@ def supers_users_update_callable():
}
SuperUsersUpdateEvent.event_callable = supers_users_update_callable
ApplicationUpdateEvent.event_callable = application_update_callable
def application_bind_employee_callable():
"""
Example callable method
"""
return {
"completed": True,
"message": "Example callable method 2",
"info": {
"host": "example_host",
"user_agent": "example_user_agent",
},
}
ApplicationBindEmployeeEvent.event_callable = application_bind_employee_callable
def application_bind_occupant_callable():
"""
Example callable method
"""
return {
"completed": True,
"message": "Example callable method 2",
"info": {
"host": "example_host",
"user_agent": "example_user_agent",
},
}
ApplicationBindOccupantEvent.event_callable = application_bind_occupant_callable

View File

@@ -0,0 +1,35 @@
from ApiControllers.abstracts.event_clusters import EventCluster, RouterCluster
from .supers_events import (
EventsListEvent,
EventRegisterServiceEvent,
EventBindEmployeeExtraEvent,
EventBindOccupantExtraEvent,
)
EventsEndpointRouterCluster = RouterCluster(name="EventsEndpointRouterCluster")
EventsEndpointEventClusterList = EventCluster(
name="EventsList", endpoint_uu_id="0659d5e4-671f-466c-a84f-47a1290a6f0d"
)
EventsEndpointEventClusterList.add_event(EventsListEvent)
EventsEndpointEventClusterRegisterService = EventCluster(
name="EventRegisterService", endpoint_uu_id="c89a2150-db4d-4a8f-b6ec-9e0f09625f76"
)
EventsEndpointEventClusterRegisterService.add_event(EventRegisterServiceEvent)
EventsEndpointEventClusterBindEmployeeExtra = EventCluster(
name="EventBindEmployeeExtra", endpoint_uu_id="58ef3640-04ec-43f9-8f3e-f86be3ce4a24"
)
EventsEndpointEventClusterBindEmployeeExtra.add_event(EventBindEmployeeExtraEvent)
EventsEndpointEventClusterBindOccupantExtra = EventCluster(
name="EventBindOccupantExtra", endpoint_uu_id="7794a550-3073-43e3-b0c5-80128f8d3e4b"
)
EventsEndpointEventClusterBindOccupantExtra.add_event(EventBindOccupantExtraEvent)
EventsEndpointRouterCluster.set_event_cluster(EventsEndpointEventClusterList)
EventsEndpointRouterCluster.set_event_cluster(EventsEndpointEventClusterRegisterService)
EventsEndpointRouterCluster.set_event_cluster(EventsEndpointEventClusterBindEmployeeExtra)
EventsEndpointRouterCluster.set_event_cluster(EventsEndpointEventClusterBindOccupantExtra)

View File

@@ -0,0 +1,107 @@
from ApiControllers.abstracts.event_clusters import Event
from Controllers.Postgres.pagination import Pagination, PaginationResult, PaginateOnly
from Controllers.Postgres.response import EndpointResponse
from typing import Any
from Schemas import (
Events,
Event2Employee,
Event2Occupant,
Event2EmployeeExtra,
Event2OccupantExtra,
Service2Events,
)
# List endpoint
EventsListEvent = Event(
name="service_endpoint_list",
key="0a08c64b-ce20-4791-b1e9-014db6b75ea7",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="List services endpoint",
)
# Event Register endpoint
EventRegisterServiceEvent = Event(
name="service_endpoint_register_service",
key="e18e7f89-5708-4a15-9258-99b0903ed43d",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Register service endpoint",
)
# Bind employee extra endpoint
EventBindEmployeeExtraEvent = Event(
name="service_endpoint_bind_employee_extra",
key="cd452928-4256-4fb4-b81e-0ca41d723616",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Bind service to employee extra endpoint",
)
# Bind occupant extra endpoint
EventBindOccupantExtraEvent = Event(
name="service_endpoint_bind_occupant_extra",
key="cb11a150-8049-45c9-8cf3-d5290ffd2e4a",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Bind service to occupant extra endpoint",
)
def events_list_callable(list_options: PaginateOnly):
"""
Example callable method
"""
list_options = PaginateOnly(**list_options.model_dump())
with Services.new_session() as db_session:
if list_options.query:
services_list = Services.filter_all(
*Services.convert(list_options.query), db=db_session
)
else:
services_list = Services.filter_all(db=db_session)
pagination = Pagination(data=services_list)
pagination.change(**list_options.model_dump())
pagination_result = PaginationResult(
data=services_list,
pagination=pagination,
# response_model="",
).pagination.as_dict
return EndpointResponse(
message="MSG0003-LIST",
pagination_result=pagination_result,
).response
EventsListEvent.event_callable = events_list_callable
def event_register_service_callable(data: Any):
"""
Example callable method
"""
return EndpointResponse(
message="MSG0003-REGISTER",
).response
EventRegisterServiceEvent.event_callable = event_register_service_callable
def event_bind_employee_extra_callable(data: Any):
"""
Example callable method
"""
return EndpointResponse(
message="MSG0003-BIND",
).response
EventBindEmployeeExtraEvent.event_callable = event_bind_employee_extra_callable
def event_bind_occupant_extra_callable(data: Any):
"""
Example callable method
"""
return EndpointResponse(
message="MSG0003-BIND",
).response
EventBindOccupantExtraEvent.event_callable = event_bind_occupant_extra_callable

View File

@@ -5,7 +5,7 @@ from .supers_events import (
ServiceEndpointRouterCluster = RouterCluster(name="ServiceEndpointRouterCluster")
ServiceEndpointEventClusterList = EventCluster(
name="ServiceList", endpoint_uu_id="82ef3444-a26a-499d-8c77-ca2e95d6ceb9"
name="ServiceList", endpoint_uu_id="f4e4d332-70b1-4121-9fcc-a08850b72aaa"
)
ServiceEndpointEventClusterList.add_event(ServiceEndpointListEvent)
ServiceEndpointRouterCluster.set_event_cluster(ServiceEndpointEventClusterList)

View File

@@ -3,24 +3,27 @@ FROM python:3.12-slim
WORKDIR /
# Install system dependencies and Poetry
RUN apt-get update && apt-get install -y --no-install-recommends gcc \
&& rm -rf /var/lib/apt/lists/* && pip install --no-cache-dir poetry
RUN apt-get update && apt-get install -y --no-install-recommends gcc && rm -rf /var/lib/apt/lists/* && pip install --no-cache-dir poetry
# Copy Poetry configuration
COPY /pyproject.toml ./pyproject.toml
# Configure Poetry and install dependencies with optimizations
RUN poetry config virtualenvs.create false \
&& poetry install --no-interaction --no-ansi --no-root --only main \
&& pip cache purge && rm -rf ~/.cache/pypoetry
RUN poetry config virtualenvs.create false && poetry install --no-interaction --no-ansi --no-root --only main \
&& pip cache purge && rm -rf ~/.cache/pypoetry
# Copy application code
COPY /ApiServices/TemplateService /ApiServices/TemplateService
COPY /ApiControllers /ApiControllers
COPY /ApiDefaults /ApiDefaults
COPY /Controllers /Controllers
COPY /Schemas /Schemas
COPY /ApiServices/IdentityService/Endpoints /ApiDefaults/Endpoints
COPY /ApiServices/IdentityService/Events /ApiDefaults/Events
COPY /ApiServices/IdentityService/Validations /ApiDefaults/Validations
# Set Python path to include app directory
ENV PYTHONPATH=/ PYTHONUNBUFFERED=1 PYTHONDONTWRITEBYTECODE=1
# Run the application using the configured uvicorn server
CMD ["poetry", "run", "python", "ApiServices/TemplateService/app.py"]
CMD ["poetry", "run", "python", "ApiDefaults/app.py"]

View File

@@ -0,0 +1,67 @@
from fastapi import APIRouter, Depends
from ApiControllers.abstracts.default_validations import CommonHeaders
from ApiControllers.providers.token_provider import TokenProvider
from Controllers.Postgres.pagination import PaginateOnly
from Events.people.cluster import PeopleRouterCluster
people_route = APIRouter(prefix="/people", tags=["People"])
@people_route.post(
path="/list",
description="List people endpoint",
operation_id="f102db46-031a-43e4-966a-dae6896f985b",
)
def people_route_list(
data: PaginateOnly,
headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
"""
List people endpoint
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = PeopleRouterCluster.get_event_cluster("PeopleList")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@people_route.post(
path="/create",
description="Create people endpoint",
operation_id="eb465fde-337f-4b81-94cf-28c6d4f2b1b6",
)
def people_route_create(
headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
"""
Create people endpoint
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = PeopleRouterCluster.get_event_cluster("PeopleCreate")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable()
@people_route.post(
path="/update",
description="Update people endpoint",
operation_id="c9e5ba69-6915-43f5-8f9c-a5c2aa865b89",
)
def people_route_update(
headers: CommonHeaders = Depends(CommonHeaders.as_dependency),
):
"""
Update people endpoint
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = PeopleRouterCluster.get_event_cluster("PeopleUpdate")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable()

View File

@@ -1,9 +1,11 @@
from fastapi import APIRouter
from .test_template.route import test_template_route
def get_routes() -> list[APIRouter]:
return [test_template_route]
from .people.route import people_route
from .user.route import user_route
return [user_route, people_route]
def get_safe_endpoint_urls() -> list[tuple[str, str]]:
@@ -12,9 +14,5 @@ def get_safe_endpoint_urls() -> list[tuple[str, str]]:
("/docs", "GET"),
("/redoc", "GET"),
("/openapi.json", "GET"),
("/auth/register", "POST"),
("/auth/login", "POST"),
("/metrics", "GET"),
("/test/template", "GET"),
("/test/template", "POST"),
]

View File

@@ -0,0 +1,72 @@
import uuid
from fastapi import APIRouter, Header
from typing import Any
from ApiDefaults.config import api_config
from Events.user.cluster import UserRouterCluster
from ApiControllers.providers.token_provider import TokenProvider
from ApiControllers.abstracts.default_validations import CommonHeaders
from Controllers.Postgres.pagination import PaginateOnly
user_route = APIRouter(prefix="/user", tags=["User"])
@user_route.post(
path="/list",
description="List users endpoint",
operation_id="1aca3094-fe80-4e0f-a460-1a506419082a",
)
def user_list_route(
data: PaginateOnly,
headers: CommonHeaders,
):
"""
List users endpoint
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = UserRouterCluster.get_event_cluster("UserList")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@user_route.post(
path="/create",
description="Create users endpoint",
operation_id="9686211f-4260-485d-8076-186c22c053aa",
)
def user_create_route(
data: Any,
headers: CommonHeaders,
):
"""
Create users endpoint
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = UserRouterCluster.get_event_cluster("UserCreate")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)
@user_route.post(
path="/update",
description="Update users endpoint",
operation_id="268e887b-5ff5-4f99-b1be-7e127e28a198",
)
def user_update_route(
data: Any,
headers: CommonHeaders,
):
"""
Update users endpoint
"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object)
event_key = TokenProvider.retrieve_event_codes(**event_founder_dict)
FoundCluster = UserRouterCluster.get_event_cluster("UserUpdate")
event_cluster_matched = FoundCluster.match_event(event_key=event_key)
return event_cluster_matched.event_callable(data=data)

View File

@@ -0,0 +1,8 @@
from .people.cluster import PeopleRouterCluster
from .user.cluster import UserRouterCluster
__all__ = [
"PeopleRouterCluster",
"UserRouterCluster",
]

View File

@@ -0,0 +1,25 @@
from ApiControllers.abstracts.event_clusters import EventCluster, RouterCluster
from .supers_events import (
SupersPeopleCreateEvent,
SupersPeopleUpdateEvent,
SupersPeopleListEvent,
)
PeopleRouterCluster = RouterCluster(name="PeopleRouterCluster")
PeopleEventClusterList = EventCluster(
name="PeopleList", endpoint_uu_id="f102db46-031a-43e4-966a-dae6896f985b"
)
PeopleEventClusterList.add_event(SupersPeopleListEvent)
PeopleEventClusterCreate = EventCluster(
name="PeopleCreate", endpoint_uu_id="eb465fde-337f-4b81-94cf-28c6d4f2b1b6"
)
PeopleEventClusterCreate.add_event(SupersPeopleCreateEvent)
PeopleEventClusterUpdate = EventCluster(
name="PeopleUpdate", endpoint_uu_id="c9e5ba69-6915-43f5-8f9c-a5c2aa865b89"
)
PeopleEventClusterUpdate.add_event(SupersPeopleUpdateEvent)
PeopleRouterCluster.set_event_cluster(PeopleEventClusterList)
PeopleRouterCluster.set_event_cluster(PeopleEventClusterCreate)
PeopleRouterCluster.set_event_cluster(PeopleEventClusterUpdate)

View File

@@ -0,0 +1,103 @@
from ApiControllers.abstracts.event_clusters import Event
from Validations.people.validations import (
REQUESTAWMXNTKMGPPOJWRCTZUBADNFLQDBDYVQAORFAVCSXUUHEBQHCEPCSKFBADBODFDBPYKOVINV,
)
from Controllers.Postgres.pagination import (
ListOptions,
Pagination,
PaginationResult,
PaginateOnly,
)
from Controllers.Postgres.response import EndpointResponse
from Schemas import People
SupersPeopleCreateEvent = Event(
name="supers_people_create",
key="ec4c2404-a61b-46c7-bbdf-ce3357e6cf41",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Create events of people endpoint",
)
# Update endpoint
SupersPeopleUpdateEvent = Event(
name="supers_people_update",
key="91e77de4-9f29-4309-b121-4aad256d440c",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Update events of people endpoint",
)
# List endpoint
SupersPeopleListEvent = Event(
name="supers_people_list",
key="6828d280-e587-400d-a622-c318277386c3",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="List events of people endpoint",
)
def supers_people_create_callable():
"""
Example callable method
"""
return {
"completed": True,
"message": "Example callable method 2",
"info": {
"host": "example_host",
"user_agent": "example_user_agent",
},
}
SupersPeopleCreateEvent.event_callable = supers_people_create_callable
def supers_people_update_callable():
"""
Example callable method
"""
return {
"completed": True,
"message": "Example callable method 2",
"info": {
"host": "example_host",
"user_agent": "example_user_agent",
},
}
SupersPeopleUpdateEvent.event_callable = supers_people_update_callable
def supers_people_list_callable(data: PaginateOnly):
"""
Example callable method
"""
list_options = PaginateOnly(**data.model_dump())
with People.new_session() as db_session:
if list_options.query:
people_list = People.filter_all(
*People.convert(list_options.query), db=db_session
)
else:
people_list = People.filter_all(db=db_session)
pagination = Pagination(data=people_list)
pagination.change(**list_options.model_dump())
pagination_result = PaginationResult(
data=people_list,
pagination=pagination,
response_model=REQUESTAWMXNTKMGPPOJWRCTZUBADNFLQDBDYVQAORFAVCSXUUHEBQHCEPCSKFBADBODFDBPYKOVINV,
)
return EndpointResponse(
message="MSG0002-LIST",
pagination_result=pagination_result,
).response
SupersPeopleListEvent.event_callable = supers_people_list_callable

View File

@@ -0,0 +1,27 @@
from ApiControllers.abstracts.event_clusters import EventCluster, RouterCluster
from .supers_events import (
SuperUsersListEvent,
SuperUsersCreateEvent,
SuperUsersUpdateEvent,
)
UserRouterCluster = RouterCluster(name="UserRouterCluster")
UserEventClusterList = EventCluster(
name="UserList", endpoint_uu_id="1aca3094-fe80-4e0f-a460-1a506419082a"
)
UserEventClusterList.add_event(SuperUsersListEvent)
UserEventClusterCreate = EventCluster(
name="UserCreate", endpoint_uu_id="9686211f-4260-485d-8076-186c22c053aa"
)
UserEventClusterCreate.add_event(SuperUsersCreateEvent)
UserEventClusterUpdate = EventCluster(
name="UserUpdate", endpoint_uu_id="268e887b-5ff5-4f99-b1be-7e127e28a198"
)
UserEventClusterUpdate.add_event(SuperUsersUpdateEvent)
UserRouterCluster.set_event_cluster(UserEventClusterList)
UserRouterCluster.set_event_cluster(UserEventClusterCreate)
UserRouterCluster.set_event_cluster(UserEventClusterUpdate)

View File

@@ -0,0 +1,94 @@
from ApiControllers.abstracts.event_clusters import Event
from Controllers.Postgres.pagination import Pagination, PaginationResult, PaginateOnly
from Controllers.Postgres.response import EndpointResponse
from Schemas import Users
# List endpoint
SuperUsersListEvent = Event(
name="supers_users_list",
key="341b394f-9f11-4abb-99e7-4b27fa6bf012",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="List events of users endpoint",
)
# Create endpoint
SuperUsersCreateEvent = Event(
name="supers_users_create",
key="4e7e189e-e015-4ff8-902d-60138cbc77a6",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Create events of users endpoint",
)
# Update endpoint
SuperUsersUpdateEvent = Event(
name="supers_users_update",
key="efa4aa4a-d414-4391-91ee-97eb617b7755",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Update events of users endpoint",
)
def supers_users_list_callable(list_options: PaginateOnly):
"""
Example callable method
"""
list_options = PaginateOnly(**list_options.model_dump())
with Users.new_session() as db_session:
if list_options.query:
users_list = Users.filter_all(
*Users.convert(list_options.query), db=db_session
)
else:
users_list = Users.filter_all(db=db_session)
pagination = Pagination(data=users_list)
pagination.change(**list_options.model_dump())
pagination_result = PaginationResult(
data=users_list,
pagination=pagination,
# response_model="",
).pagination.as_dict
return EndpointResponse(
message="MSG0003-LIST",
pagination_result=pagination_result,
).response
SuperUsersListEvent.event_callable = supers_users_list_callable
def supers_users_create_callable():
"""
Example callable method
"""
return {
"completed": True,
"message": "Example callable method 2",
"info": {
"host": "example_host",
"user_agent": "example_user_agent",
},
}
SuperUsersCreateEvent.event_callable = supers_users_create_callable
def supers_users_update_callable():
"""
Example callable method
"""
return {
"completed": True,
"message": "Example callable method 2",
"info": {
"host": "example_host",
"user_agent": "example_user_agent",
},
}
SuperUsersUpdateEvent.event_callable = supers_users_update_callable

View File

@@ -11,8 +11,8 @@ class ListOptions(BaseModel):
size: Optional[int] = 10
order_field: Optional[str] = "id"
order_type: Optional[str] = "asc"
query: Optional[dict] = None
# include_joins: Optional[list] = None
query: Optional[dict] = None
class PaginateOnly(BaseModel):

View File

@@ -0,0 +1,24 @@
from pydantic import BaseModel
class REQUESTAWMXNTKMGPPOJWRCTZUBADNFLQDBDYVQAORFAVCSXUUHEBQHCEPCSKFBADBODFDBPYKOVINV(
BaseModel
):
uu_id: str
created_at: str
updated_at: str
person_tag: str
expiry_starts: str
expiry_ends: str
firstname: str
middle_name: str
surname: str
birth_date: str
birth_place: str
sex_code: str
country_code: str
tax_no: str
active: bool
deleted: bool
is_confirmed: bool
is_notification_send: bool

View File

@@ -0,0 +1,33 @@
from pydantic import BaseModel
class FF7C859A068EB4583A47AB5924EC8C19A033881823FBA42EAAD089BF7AC8059CE(BaseModel):
"""
a13143ade48954c2ba3f86869e027de5b28c8a9b619bf4ef28264a8e375371601
"""
pass
class F1B82565925FF4F5DAD2A36370788305A0A362E031EAC4A9E8BDFF4F35E265A6C(BaseModel):
"""
aa487ab3bfd9e4e6abc2db714ac6197f60bbc9068ac6541e7a815d5b1e969796b
"""
pass
class F3117E7D66FE6471C8452B97AB504EF0C29822B6395CA4D65A18FDD563F0EC8D7(BaseModel):
"""
a1bf55a214b684438a97a47e4f097ac7ae27b0dff03c4475cbd4301e24a032aac
"""
pass
class F33B4DE316B8A456480DD6ED5B5E2D35A2E6FCAF74BAC40D7A2970D318B153F85(BaseModel):
"""
F33B4DE316B8A456480DD6ED5B5E2D35A2E6FCAF74BAC40D7A2970D318B153F85
"""
pass

View File

@@ -1,16 +0,0 @@
import uvicorn
from config import api_config
from .create_app import create_app
# from prometheus_fastapi_instrumentator import Instrumentator
app = create_app() # Create FastAPI application
# Instrumentator().instrument(app=app).expose(app=app) # Setup Prometheus metrics
if __name__ == "__main__":
# Run the application with Uvicorn Server
uvicorn_config = uvicorn.Config(**api_config.app_as_dict)
uvicorn.Server(uvicorn_config).run()

View File

@@ -1,64 +0,0 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
from fastapi.responses import JSONResponse
class Configs(BaseSettings):
"""
ApiTemplate configuration settings.
"""
PATH: str = ""
HOST: str = ""
PORT: int = 0
LOG_LEVEL: str = "info"
RELOAD: int = 0
ACCESS_TOKEN_TAG: str = ""
ACCESS_EMAIL_EXT: str = ""
TITLE: str = ""
ALGORITHM: str = ""
ACCESS_TOKEN_LENGTH: int = 90
REFRESHER_TOKEN_LENGTH: int = 144
EMAIL_HOST: str = ""
DATETIME_FORMAT: str = ""
FORGOT_LINK: str = ""
ALLOW_ORIGINS: list = ["http://localhost:3000"]
VERSION: str = "0.1.001"
DESCRIPTION: str = ""
@property
def app_as_dict(self) -> dict:
"""
Convert the settings to a dictionary.
"""
return {
"app": self.PATH,
"host": self.HOST,
"port": int(self.PORT),
"log_level": self.LOG_LEVEL,
"reload": bool(self.RELOAD),
}
@property
def api_info(self):
"""
Returns a dictionary with application information.
"""
return {
"title": self.TITLE,
"description": self.DESCRIPTION,
"default_response_class": JSONResponse,
"version": self.VERSION,
}
@classmethod
def forgot_link(cls, forgot_key):
"""
Generate a forgot password link.
"""
return cls.FORGOT_LINK + forgot_key
model_config = SettingsConfigDict(env_prefix="API_")
api_config = Configs()

View File

@@ -1,49 +0,0 @@
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from endpoints.routes import get_routes
from open_api_creator import create_openapi_schema
from middlewares.token_middleware import token_middleware
from initializer.create_route import RouteRegisterController
from config import api_config
def create_events_if_any_cluster_set():
from events import retrieve_all_clusters
for event_cluster in retrieve_all_clusters():
for event in event_cluster.retrieve_all_event_clusters:
event.set_events_to_database()
def create_app():
application = FastAPI(**api_config.api_info)
# application.mount(
# "/application/static",
# StaticFiles(directory="application/static"),
# name="static",
# )
application.add_middleware(
CORSMiddleware,
allow_origins=api_config.ALLOW_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@application.middleware("http")
async def add_token_middleware(request: Request, call_next):
return await token_middleware(request, call_next)
@application.get("/", description="Redirect Route", include_in_schema=False)
async def redirect_to_docs():
return RedirectResponse(url="/docs")
route_register = RouteRegisterController(app=application, router_list=get_routes())
application = route_register.register_routes()
create_events_if_any_cluster_set()
application.openapi = lambda _=application: create_openapi_schema(_)
return application

View File

@@ -1,67 +0,0 @@
import uuid
from fastapi import APIRouter, Request, Response, Header
from ApiServices.TemplateService.config import api_config
from ApiServices.TemplateService.events.template.event import template_event_cluster
test_template_route = APIRouter(prefix="/test", tags=["Test"])
@test_template_route.get(
path="/template",
description="Test Template Route",
operation_id="bb20c8c6-a289-4cab-9da7-34ca8a36c8e5",
)
def test_template(
request: Request,
response: Response,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Test Template Route
"""
event_code = "bb20c8c6-a289-4cab-9da7-34ca8a36c8e5"
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
event_cluster_matched = template_event_cluster.match_event(
event_keys=[
"3f510dcf-9f84-4eb9-b919-f582f30adab1",
"9f403034-deba-4e1f-b43e-b25d3c808d39",
"b8ec6e64-286a-4f60-8554-7a3865454944",
]
)
response.headers["X-Header"] = "Test Header GET"
if runner_callable := event_cluster_matched.example_callable():
return runner_callable
raise ValueError("Event key not found or multiple matches found")
@test_template_route.post(
path="/template",
description="Test Template Route with Post Method",
)
def test_template_post(request: Request, response: Response):
"""
Test Template Route with Post Method
"""
event_cluster_matched = template_event_cluster.match_event(
event_keys=[
"3f510dcf-9f84-4eb9-b919-f582f30adab1",
"9f403034-deba-4e1f-b43e-b25d3c808d39",
"b8ec6e64-286a-4f60-8554-7a3865454944",
]
)
response.headers["X-Header"] = "Test Header POST"
if runner_callable := event_cluster_matched.example_callable():
return runner_callable
raise ValueError("Event key not found or multiple matches found")

View File

@@ -1,9 +0,0 @@
from .template.cluster import TemplateEventClusterSet
__all__ = [
"TemplateEventClusterSet",
]
def retrieve_all_clusters():
return [TemplateEventClusterSet]

View File

@@ -1,17 +0,0 @@
from ApiServices.TemplateService.initializer.event_clusters import (
EventCluster,
SetEventCluster,
)
TemplateEventCluster = EventCluster(
endpoint_uu_id="bb20c8c6-a289-4cab-9da7-34ca8a36c8e5"
)
OtherTemplateEventCluster = EventCluster(
endpoint_uu_id="ecb82b7a-317f-469d-a682-ff431f152453"
)
TemplateEventClusterSet = SetEventCluster()
TemplateEventClusterSet.add_event_cluster(TemplateEventCluster)
TemplateEventClusterSet.add_event_cluster(OtherTemplateEventCluster)

View File

@@ -1,57 +0,0 @@
from ApiServices.TemplateService.initializer.event_clusters import Event
from .cluster import (
template_event_cluster,
other_template_event_cluster,
)
single_event = Event(
name="example_event",
key="176b829c-7622-4cf2-b474-411e5acb637c",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Example event description",
)
def example_callable():
"""
Example callable method
"""
return {
"completed": True,
"message": "Example callable method 2",
"info": {
"host": "example_host",
"user_agent": "example_user_agent",
},
}
single_event.event_callable = example_callable
template_event_cluster.add_event([single_event])
other_event = Event(
name="example_event-2",
key="36b26d7c-2a9e-4006-a213-f54bc66e5455",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Example event 2 description",
)
def example_callable_other():
"""
Example callable method
"""
return {
"completed": True,
"message": "Example callable method 1",
"info": {
"host": "example_host",
"user_agent": "example_user_agent",
},
}
other_event.event_callable = example_callable_other
other_template_event_cluster.add_event([other_event])

View File

@@ -1,42 +0,0 @@
from typing import List
from fastapi import APIRouter, FastAPI
class RouteRegisterController:
def __init__(self, app: FastAPI, router_list: List[APIRouter]):
self.router_list = router_list
self.app = app
@staticmethod
def add_router_with_event_to_database(router: APIRouter):
from Schemas import EndpointRestriction
with EndpointRestriction.new_session() as db_session:
for route in router.routes:
route_path = str(getattr(route, "path"))
route_summary = str(getattr(route, "name"))
operation_id = getattr(route, "operation_id", None)
if not operation_id:
continue
for route_method in [
method.lower() for method in getattr(route, "methods")
]:
restriction = EndpointRestriction.find_or_create(
endpoint_method=route_method,
endpoint_name=route_path,
endpoint_desc=route_summary.replace("_", " "),
endpoint_function=route_summary,
operation_uu_id=operation_id, # UUID of the endpoint
is_confirmed=True,
db=db_session,
)
if restriction.meta_data.created:
restriction.save(db=db_session)
def register_routes(self):
for router in self.router_list:
self.app.include_router(router)
self.add_router_with_event_to_database(router)
return self.app

View File

@@ -1,119 +0,0 @@
from typing import Optional, Type
from pydantic import BaseModel
class EventCluster:
"""
EventCluster
"""
def __init__(self, endpoint_uu_id: str):
self.endpoint_uu_id = endpoint_uu_id
self.events = []
def add_event(self, list_of_events: list["Event"]):
"""
Add an event to the cluster
"""
for event in list_of_events:
self.events.append(event)
self.events = list(set(self.events))
def get_event(self, event_key: str):
"""
Get an event by its key
"""
for event in self.events:
if event.key == event_key:
return event
return None
def set_events_to_database(self):
from Schemas import Events, EndpointRestriction
with Events.new_session() as db_session:
if to_save_endpoint := EndpointRestriction.filter_one(
EndpointRestriction.operation_uu_id == self.endpoint_uu_id,
db=db_session,
).data:
for event in self.events:
event_to_save_database = Events.find_or_create(
function_code=event.key,
function_class=event.name,
description=event.description,
endpoint_code=self.endpoint_uu_id,
endpoint_id=to_save_endpoint.id,
endpoint_uu_id=str(to_save_endpoint.uu_id),
is_confirmed=True,
active=True,
db=db_session,
)
if event_to_save_database.meta_data.created:
event_to_save_database.save(db=db_session)
print(
f"UUID: {event_to_save_database.uu_id} event is saved to {to_save_endpoint.uu_id}"
)
def match_event(self, event_key: str) -> "Event":
"""
Match an event by its key
"""
# print('set(event_keys)', set(event_keys))
# print('event.keys', set([event.key for event in self.events]))
# intersection_of_key: set[str] = set(event_key) & set([event.key for event in self.events])
# if not len(intersection_of_key) == 1:
# raise ValueError(
# f"Event key not found or multiple matches found: {intersection_of_key}"
# )
return self.get_event(event_key=event_key)
class Event:
def __init__(
self,
name: str,
key: str,
request_validator: Optional[Type[BaseModel]] = None,
response_validator: Optional[Type[BaseModel]] = None,
description: str = "",
):
self.name = name
self.key = key
self.request_validator = request_validator
self.response_validator = response_validator
self.description = description
def event_callable(self):
"""
Example callable method
"""
print(self.name)
return {}
class SetEventCluster:
"""
SetEventCluster
"""
list_of_event_clusters: list[EventCluster] = []
def add_event_cluster(self, event_cluster: EventCluster):
"""
Add an event cluster to the set
"""
endpoint_uu_id_list = [
event_cluster_uuid.endpoint_uu_id
for event_cluster_uuid in self.list_of_event_clusters
]
if event_cluster.endpoint_uu_id not in endpoint_uu_id_list:
self.list_of_event_clusters.append(event_cluster)
@property
def retrieve_all_event_clusters(self) -> list[EventCluster]:
"""
Retrieve all event clusters
"""
return self.list_of_event_clusters

View File

@@ -1,24 +0,0 @@
from fastapi import Request, status
from fastapi.responses import JSONResponse
from ApiServices.TemplateService.endpoints.routes import get_safe_endpoint_urls
from ApiServices.TemplateService.config import api_config
async def token_middleware(request: Request, call_next):
base_url = request.url.path
safe_endpoints = [_[0] for _ in get_safe_endpoint_urls()]
if base_url in safe_endpoints:
return await call_next(request)
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
if not token:
return JSONResponse(
content={
"error": "EYS_0002",
},
status_code=status.HTTP_401_UNAUTHORIZED,
)
response = await call_next(request)
return response

View File

@@ -1,116 +0,0 @@
from typing import Any, Dict
from fastapi import FastAPI
from fastapi.routing import APIRoute
from fastapi.openapi.utils import get_openapi
from config import api_config as template_api_config
from endpoints.routes import get_safe_endpoint_urls
class OpenAPISchemaCreator:
"""
OpenAPI schema creator and customizer for FastAPI applications.
"""
def __init__(self, app: FastAPI):
"""
Initialize the OpenAPI schema creator.
Args:
app: FastAPI application instance
"""
self.app = app
self.safe_endpoint_list: list[tuple[str, str]] = get_safe_endpoint_urls()
self.routers_list = self.app.routes
@staticmethod
def create_security_schemes() -> Dict[str, Any]:
"""
Create security scheme definitions.
Returns:
Dict[str, Any]: Security scheme configurations
"""
return {
"BearerAuth": {
"type": "apiKey",
"in": "header",
"name": template_api_config.ACCESS_TOKEN_TAG,
"description": "Enter: **'Bearer <JWT>'**, where JWT is the access token",
}
}
def configure_route_security(
self, path: str, method: str, schema: Dict[str, Any]
) -> None:
"""
Configure security requirements for a specific route.
Args:
path: Route path
method: HTTP method
schema: OpenAPI schema to modify
"""
if not schema.get("paths", {}).get(path, {}).get(method):
return
# Check if endpoint is in safe list
endpoint_path = f"{path}:{method}"
list_of_safe_endpoints = [
f"{e[0]}:{str(e[1]).lower()}" for e in self.safe_endpoint_list
]
if endpoint_path not in list_of_safe_endpoints:
if "security" not in schema["paths"][path][method]:
schema["paths"][path][method]["security"] = []
schema["paths"][path][method]["security"].append({"BearerAuth": []})
def create_schema(self) -> Dict[str, Any]:
"""
Create the complete OpenAPI schema.
Returns:
Dict[str, Any]: Complete OpenAPI schema
"""
openapi_schema = get_openapi(
title=template_api_config.TITLE,
description=template_api_config.DESCRIPTION,
version=template_api_config.VERSION,
routes=self.app.routes,
)
# Add security schemes
if "components" not in openapi_schema:
openapi_schema["components"] = {}
openapi_schema["components"]["securitySchemes"] = self.create_security_schemes()
# Configure route security and responses
for route in self.app.routes:
if isinstance(route, APIRoute) and route.include_in_schema:
path = str(route.path)
methods = [method.lower() for method in route.methods]
for method in methods:
self.configure_route_security(path, method, openapi_schema)
# Add custom documentation extensions
openapi_schema["x-documentation"] = {
"postman_collection": "/docs/postman",
"swagger_ui": "/docs",
"redoc": "/redoc",
}
return openapi_schema
def create_openapi_schema(app: FastAPI) -> Dict[str, Any]:
"""
Create OpenAPI schema for a FastAPI application.
Args:
app: FastAPI application instance
Returns:
Dict[str, Any]: Complete OpenAPI schema
"""
creator = OpenAPISchemaCreator(app)
return creator.create_schema()

View File

@@ -1,206 +0,0 @@
import enum
from typing import Optional, Union, Dict, Any, List
from pydantic import BaseModel
from Controllers.Redis.database import RedisActions
class UserType(enum.Enum):
employee = 1
occupant = 2
class Credentials(BaseModel):
person_id: int
person_name: str
class ApplicationToken(BaseModel):
# Application Token Object -> is the main object for the user
user_type: int = UserType.occupant.value
credential_token: str = ""
user_uu_id: str
user_id: int
person_id: int
person_uu_id: str
request: Optional[dict] = None # Request Info of Client
expires_at: Optional[float] = None # Expiry timestamp
class OccupantToken(BaseModel):
# Selection of the occupant type for a build part is made by the user
living_space_id: int # Internal use
living_space_uu_id: str # Outer use
occupant_type_id: int
occupant_type_uu_id: str
occupant_type: str
build_id: int
build_uuid: str
build_part_id: int
build_part_uuid: str
responsible_company_id: Optional[int] = None
responsible_company_uuid: Optional[str] = None
responsible_employee_id: Optional[int] = None
responsible_employee_uuid: Optional[str] = None
# ID list of reachable event codes as "endpoint_code": ["UUID", "UUID"]
reachable_event_codes: Optional[dict[str, list[str]]] = None
# ID list of reachable applications as "page_url": ["UUID", "UUID"]
reachable_app_codes: Optional[dict[str, list[str]]] = None
class CompanyToken(BaseModel):
# Selection of the company for an employee is made by the user
company_id: int
company_uu_id: str
department_id: int # ID list of departments
department_uu_id: str # ID list of departments
duty_id: int
duty_uu_id: str
staff_id: int
staff_uu_id: str
employee_id: int
employee_uu_id: str
bulk_duties_id: int
# ID list of reachable event codes as "endpoint_code": ["UUID", "UUID"]
reachable_event_codes: Optional[dict[str, list[str]]] = None
# ID list of reachable applications as "page_url": ["UUID", "UUID"]
reachable_app_codes: Optional[dict[str, list[str]]] = None
class OccupantTokenObject(ApplicationToken):
# Occupant Token Object -> Requires selection of the occupant type for a specific build part
available_occupants: dict = None
selected_occupant: Optional[OccupantToken] = None # Selected Occupant Type
@property
def is_employee(self) -> bool:
return False
@property
def is_occupant(self) -> bool:
return True
class EmployeeTokenObject(ApplicationToken):
# Full hierarchy Employee[staff_id] -> Staff -> Duty -> Department -> Company
companies_id_list: List[int] # List of company objects
companies_uu_id_list: List[str] # List of company objects
duty_id_list: List[int] # List of duty objects
duty_uu_id_list: List[str] # List of duty objects
selected_company: Optional[CompanyToken] = None # Selected Company Object
@property
def is_employee(self) -> bool:
return True
@property
def is_occupant(self) -> bool:
return False
TokenDictType = Union[EmployeeTokenObject, OccupantTokenObject]
class TokenProvider:
AUTH_TOKEN: str = "AUTH_TOKEN"
@classmethod
def convert_redis_object_to_token(
cls, redis_object: Dict[str, Any]
) -> TokenDictType:
"""
Process Redis object and return appropriate token object.
"""
if redis_object.get("user_type") == UserType.employee.value:
return EmployeeTokenObject(**redis_object)
elif redis_object.get("user_type") == UserType.occupant.value:
return OccupantTokenObject(**redis_object)
raise ValueError("Invalid user type")
@classmethod
def get_dict_from_redis(
cls, token: Optional[str] = None, user_uu_id: Optional[str] = None
) -> Union[TokenDictType, List[TokenDictType]]:
"""
Retrieve token object from Redis using token and user_uu_id
"""
token_to_use, user_uu_id_to_use = token or "*", user_uu_id or "*"
list_of_token_dict, auth_key_list = [], [
cls.AUTH_TOKEN,
token_to_use,
user_uu_id_to_use,
]
if token:
result = RedisActions.get_json(list_keys=auth_key_list, limit=1)
if first_record := result.first:
return cls.convert_redis_object_to_token(first_record)
elif user_uu_id:
result = RedisActions.get_json(list_keys=auth_key_list)
if all_records := result.all:
for all_record in all_records:
list_of_token_dict.append(
cls.convert_redis_object_to_token(all_record)
)
return list_of_token_dict
raise ValueError(
"Token not found in Redis. Please check the token or user_uu_id."
)
@classmethod
def retrieve_application_codes(
cls, page_url: str, tokens: Union[TokenDictType, List[TokenDictType]]
):
if isinstance(tokens, TokenDictType):
if application_codes := tokens.reachable_app_codes.get(page_url, None):
return application_codes
elif isinstance(tokens, list):
retrieved_event_apps = []
for token in tokens:
if not isinstance(token, TokenDictType):
continue
if application_codes := token.reachable_app_codes.get(page_url, None):
retrieved_event_apps.append(application_codes)
return retrieved_event_apps
raise ValueError("Invalid token type or no application codes found.")
@classmethod
def retrieve_event_codes(
cls, endpoint_code: str, tokens: Union[TokenDictType, List[TokenDictType]]
):
if isinstance(tokens, TokenDictType):
if event_codes := tokens.reachable_event_codes.get(endpoint_code, None):
return event_codes
elif isinstance(tokens, List):
retrieved_event_codes = []
for token in tokens:
if not isinstance(token, TokenDictType):
continue
if event_codes := token.reachable_event_codes.get(endpoint_code, None):
retrieved_event_codes.append(event_codes)
return retrieved_event_codes
raise ValueError("Invalid token type or no event codes found.")