latest version of apis event and cahce ablitites added

This commit is contained in:
2025-02-10 11:41:38 +03:00
parent e832ec7603
commit 26f601f01a
396 changed files with 34981 additions and 2 deletions

View File

@@ -0,0 +1,166 @@
"""
Account related API endpoints.
"""
from typing import Any, Dict
from fastapi import Request
from Events.Engine.abstract_class import MethodToEvent
from Events.base_request_model import EndpointBaseRequestModel, ContextRetrievers
from ApiLayers.Middleware.token_event_middleware import TokenEventMiddleware
from ApiLayers.ApiValidations.Response.default_response import (
EndpointSuccessListResponse,
)
from .function_handlers import (
AddressListFunctions,
AddressUpdateFunctions,
AddressSearchFunctions,
AddressCreateFunctions,
)
from .api_events import AddressSuperUserEvents
AddressListEventMethods = MethodToEvent(
name="AddressListEventMethods",
events={
AddressSuperUserEvents.AddressListEvents.key: AddressSuperUserEvents.AddressListEvents,
},
headers=[],
errors=[],
decorators_list=[TokenEventMiddleware.event_required],
url="/list",
method="POST",
summary="List all accounts by given previligous",
description="List all accounts by given previligous",
)
def account_list_event_endpoint(
request: Request, data: EndpointBaseRequestModel
) -> Dict[str, Any]:
context_retriever = ContextRetrievers(func=account_list_event_endpoint)
event_2_catch = AddressListEventMethods.retrieve_event(
event_function_code=f"{AddressSuperUserEvents.AddressListEvents.key}"
)
context_retriever.RESPONSE_VALIDATOR = event_2_catch.RESPONSE_VALIDATOR
data = event_2_catch.REQUEST_VALIDATOR(**data.data)
AddressListFunctions.context_retriever = context_retriever
pagination_result = event_2_catch.endpoint_callable(data=data)
return EndpointSuccessListResponse(
code=event_2_catch.static_key, lang=context_retriever.token.lang
).as_dict(
data=pagination_result.data, pagination=pagination_result.pagination.as_dict()
)
AddressListEventMethods.endpoint_callable = account_list_event_endpoint
AddressCreateEventMethods = MethodToEvent(
name="AddressCreateEventMethods",
events={
AddressSuperUserEvents.AddressCreateEvents.key: AddressSuperUserEvents.AddressCreateEvents,
},
headers=[],
errors=[],
decorators_list=[TokenEventMiddleware.event_required],
url="/create",
method="POST",
summary="Create Address via given data and previligous",
description="Create Address via given data and previligous",
)
def account_create_event_endpoint(
request: Request, data: EndpointBaseRequestModel
) -> Dict[str, Any]:
context_retriever = ContextRetrievers(func=account_create_event_endpoint)
event_2_catch = AddressCreateEventMethods.retrieve_event(
event_function_code=f"{AddressSuperUserEvents.AddressCreateEvents.key}"
)
context_retriever.RESPONSE_VALIDATOR = event_2_catch.RESPONSE_VALIDATOR
data = event_2_catch.REQUEST_VALIDATOR(**data.data)
AddressCreateFunctions.context_retriever = context_retriever
pagination_result = event_2_catch.endpoint_callable(data=data)
return EndpointSuccessListResponse(
code=event_2_catch.static_key, lang=context_retriever.token.lang
).as_dict(
data=pagination_result.data, pagination=pagination_result.pagination.as_dict()
)
AddressCreateEventMethods.endpoint_callable = account_create_event_endpoint
AddressUpdateEventMethods = MethodToEvent(
name="AddressUpdateEventMethods",
events={
AddressSuperUserEvents.AddressUpdateEvents.key: AddressSuperUserEvents.AddressUpdateEvents,
},
headers=[],
errors=[],
decorators_list=[TokenEventMiddleware.event_required],
url="/update",
method="POST",
summary="Update Address via given data and previligous",
description="Update Address via given data and previligous",
)
def account_update_event_endpoint(
request: Request, data: EndpointBaseRequestModel
) -> Dict[str, Any]:
context_retriever = ContextRetrievers(func=account_update_event_endpoint)
event_2_catch = AddressUpdateEventMethods.retrieve_event(
event_function_code=f"{AddressSuperUserEvents.AddressUpdateEvents.key}"
)
context_retriever.RESPONSE_VALIDATOR = event_2_catch.RESPONSE_VALIDATOR
data = event_2_catch.REQUEST_VALIDATOR(**data.data)
AddressUpdateFunctions.context_retriever = context_retriever
pagination_result = event_2_catch.endpoint_callable(data=data)
return EndpointSuccessListResponse(
code=event_2_catch.static_key, lang=context_retriever.token.lang
).as_dict(
data=pagination_result.data, pagination=pagination_result.pagination.as_dict()
)
AddressUpdateEventMethods.endpoint_callable = account_update_event_endpoint
AddressSearchEventMethods = MethodToEvent(
name="AddressSearchEventMethods",
events={
AddressSuperUserEvents.AddressSearchEvents.key: AddressSuperUserEvents.AddressSearchEvents,
},
headers=[],
errors=[],
decorators_list=[TokenEventMiddleware.event_required],
url="/search",
method="POST",
summary="Search Address via given data and previligous",
description="Search Address via given data and previligous",
)
def address_search_event_endpoint(
request: Request, data: EndpointBaseRequestModel
) -> Dict[str, Any]:
context_retriever = ContextRetrievers(func=account_update_event_endpoint)
event_2_catch = AddressUpdateEventMethods.retrieve_event(
event_function_code=f"{AddressSuperUserEvents.AddressSearchEvents.key}"
)
context_retriever.RESPONSE_VALIDATOR = event_2_catch.RESPONSE_VALIDATOR
data = event_2_catch.REQUEST_VALIDATOR(**data.data)
AddressSearchFunctions.context_retriever = context_retriever
pagination_result = event_2_catch.endpoint_callable(data=data)
return EndpointSuccessListResponse(
code=event_2_catch.static_key, lang=context_retriever.token.lang
).as_dict(
data=pagination_result.data, pagination=pagination_result.pagination.as_dict()
)
AddressSearchEventMethods.endpoint_callable = address_search_event_endpoint

View File

@@ -0,0 +1,85 @@
from Events.Engine.abstract_class import Event
from ApiLayers.LanguageModels.Request import (
LoginRequestLanguageModel,
)
# from models import TemplateResponseModels, TemplateRequestModels
from .function_handlers import AddressSuperUserFunctions
# Address List for super_user event
address_list_super_user_event = Event(
name="account_insert_super_user_event",
key="7ce855ce-db79-4397-b0ec-f5e408ea6447",
# request_validator=AccountRequestValidators.ListAccountRecord,
# response_validator=SelectResponseAccount,
# language_models=[AccountRecords.__language_model__],
language_models=[],
statics="",
description="List address by validation list options and queries.",
)
address_list_super_user_event.endpoint_callable = (
AddressSuperUserFunctions.AddressListFunctions.template_example_function_list
)
# Address Create for super_user event
address_create_super_user_event = Event(
name="account_insert_super_user_event",
key="d638a6b2-cf2e-4361-99a4-021183b75ec1",
# request_validator=AccountRequestValidators.ListAccountRecord,
# response_validator=SelectResponseAccount,
# language_models=[AccountRecords.__language_model__],
language_models=[],
statics="",
description="Create address by validation list options and queries.",
)
address_create_super_user_event.endpoint_callable = (
AddressSuperUserFunctions.AddressCreateFunctions.template_example_function_list
)
# Address Update for super_user event
address_update_super_user_event = Event(
name="account_insert_super_user_event",
key="455b8bf5-52e4-47fa-9338-102bfcd364e5",
# request_validator=AccountRequestValidators.ListAccountRecord,
# response_validator=SelectResponseAccount,
# language_models=[AccountRecords.__language_model__],
language_models=[],
statics="",
description="Update address by validation list options and queries.",
)
address_update_super_user_event.endpoint_callable = (
AddressSuperUserFunctions.AddressUpdateFunctions.template_example_function_list
)
# Address Update for super_user event
address_search_super_user_event = Event(
name="account_insert_super_user_event",
key="7dd8c122-fae5-4a6d-a439-068312bb4df3",
# request_validator=AccountRequestValidators.ListAccountRecord,
# response_validator=SelectResponseAccount,
# language_models=[AccountRecords.__language_model__],
language_models=[],
statics="",
description="Search address by validation list options and queries.",
)
address_search_super_user_event.endpoint_callable = (
AddressSuperUserFunctions.AddressSearchFunctions.template_example_function_list
)
class AddressSuperUserEvents:
AddressListEvents = address_list_super_user_event
AddressCreateEvents = address_create_super_user_event
AddressUpdateEvents = address_update_super_user_event
AddressSearchEvents = address_search_super_user_event

View File

@@ -0,0 +1,27 @@
from Events.Engine.abstract_class import CategoryCluster
from .address import (
AddressListEventMethods,
AddressCreateEventMethods,
AddressUpdateEventMethods,
AddressSearchEventMethods,
)
from .info import address_page_info
AddressCluster = CategoryCluster(
name="AddressCluster",
tags=["Address"],
prefix="/address",
description="Address Cluster",
pageinfo=address_page_info,
endpoints={
"AddressListEventMethods": AddressListEventMethods,
"AddressCreateEventMethods": AddressCreateEventMethods,
"AddressUpdateEventMethods": AddressUpdateEventMethods,
"AddressSearchEventMethods": AddressSearchEventMethods,
},
include_in_schema=True,
sub_category=[],
is_client=True,
)

View File

@@ -0,0 +1,157 @@
from typing import Union, Optional
from ApiLayers.ApiValidations.Request import ListOptions
from Events.base_request_model import BaseRouteModel, ListOptionsBase
from Services.PostgresDb.Models.pagination import PaginationResult
from ApiLayers.Schemas import AddressNeighborhood
class Handlers:
"""Class for handling authentication functions"""
@classmethod # Requires no auth context
def handle_function(cls, **kwargs):
"""Handle function with kwargs"""
return
class AddressListFunctions(BaseRouteModel):
@classmethod
def template_example_function_list(
cls, data: Optional[Union[dict, ListOptions]]
) -> PaginationResult:
list_options_base = ListOptionsBase(
table=AddressNeighborhood,
list_options=data,
model_query=None,
)
db_session, query_options = list_options_base.init_list_options()
if cls.context_retriever.token.is_occupant:
AddressNeighborhood.pre_query = AddressNeighborhood.filter_all(
AddressNeighborhood.neighborhood_code.icontains("10"),
db=db_session,
).query
elif cls.context_retriever.token.is_employee:
AddressNeighborhood.pre_query = AddressNeighborhood.filter_all(
AddressNeighborhood.neighborhood_code.icontains("9"),
db=db_session,
).query
records = AddressNeighborhood.filter_all(
*query_options.convert(), db=db_session
)
return list_options_base.paginated_result(
records=records,
response_model=getattr(cls.context_retriever, "RESPONSE_VALIDATOR", None),
)
class AddressCreateFunctions(BaseRouteModel):
@classmethod
def template_example_function_list(
cls, data: Optional[Union[dict, ListOptions]]
) -> PaginationResult:
from ApiLayers.Schemas import AddressNeighborhood
list_options_base = ListOptionsBase(
table=AddressNeighborhood,
list_options=data,
model_query=None,
)
db_session, query_options = list_options_base.init_list_options()
if cls.context_retriever.token.is_occupant:
AddressNeighborhood.pre_query = AddressNeighborhood.filter_all(
AddressNeighborhood.neighborhood_code.icontains("10"),
db=db_session,
).query
elif cls.context_retriever.token.is_employee:
AddressNeighborhood.pre_query = AddressNeighborhood.filter_all(
AddressNeighborhood.neighborhood_code.icontains("9"),
db=db_session,
).query
records = AddressNeighborhood.filter_all(
*query_options.convert(), db=db_session
)
return list_options_base.paginated_result(
records=records,
response_model=getattr(cls.context_retriever, "RESPONSE_VALIDATOR", None),
)
class AddressSearchFunctions(BaseRouteModel):
"""Event methods for searching addresses.
This class handles address search functionality including text search
and filtering.
"""
@classmethod
def template_example_function_list(
cls, data: Optional[Union[dict, ListOptions]]
) -> PaginationResult:
from ApiLayers.Schemas import AddressNeighborhood
list_options_base = ListOptionsBase(
table=AddressNeighborhood,
list_options=data,
model_query=None,
)
db_session, query_options = list_options_base.init_list_options()
if cls.context_retriever.token.is_occupant:
AddressNeighborhood.pre_query = AddressNeighborhood.filter_all(
AddressNeighborhood.neighborhood_code.icontains("10"),
db=db_session,
).query
elif cls.context_retriever.token.is_employee:
AddressNeighborhood.pre_query = AddressNeighborhood.filter_all(
AddressNeighborhood.neighborhood_code.icontains("9"),
db=db_session,
).query
records = AddressNeighborhood.filter_all(
*query_options.convert(), db=db_session
)
return list_options_base.paginated_result(
records=records,
response_model=getattr(cls.context_retriever, "RESPONSE_VALIDATOR", None),
)
class AddressUpdateFunctions(BaseRouteModel):
@classmethod
def template_example_function_list(
cls, data: Optional[Union[dict, ListOptions]]
) -> PaginationResult:
from ApiLayers.Schemas import AddressNeighborhood
list_options_base = ListOptionsBase(
table=AddressNeighborhood,
list_options=data,
model_query=None,
)
db_session, query_options = list_options_base.init_list_options()
if cls.context_retriever.token.is_occupant:
AddressNeighborhood.pre_query = AddressNeighborhood.filter_all(
AddressNeighborhood.neighborhood_code.icontains("10"),
db=db_session,
).query
elif cls.context_retriever.token.is_employee:
AddressNeighborhood.pre_query = AddressNeighborhood.filter_all(
AddressNeighborhood.neighborhood_code.icontains("9"),
db=db_session,
).query
records = AddressNeighborhood.filter_all(
*query_options.convert(), db=db_session
)
return list_options_base.paginated_result(
records=records,
response_model=getattr(cls.context_retriever, "RESPONSE_VALIDATOR", None),
)
class AddressSuperUserFunctions:
AddressListFunctions = AddressListFunctions
AddressCreateFunctions = AddressCreateFunctions
AddressSearchFunctions = AddressSearchFunctions
AddressUpdateFunctions = AddressUpdateFunctions

View File

@@ -0,0 +1,77 @@
from Events.Engine.abstract_class import PageInfo
from .address import (
AddressListEventMethods,
AddressCreateEventMethods,
AddressUpdateEventMethods,
AddressSearchEventMethods,
)
prefix = "/address"
cluster_name = "AddressCluster"
address_page_info = PageInfo(
name=f"{cluster_name}",
url=f"/dashboard?site={cluster_name}",
icon="Building",
endpoints={
str(
f"{prefix}{AddressUpdateEventMethods.URL}"
): AddressUpdateEventMethods.retrieve_all_event_keys(),
str(
f"{prefix}{AddressCreateEventMethods.URL}"
): AddressCreateEventMethods.retrieve_all_event_keys(),
str(
f"{prefix}{AddressSearchEventMethods.URL}"
): AddressSearchEventMethods.retrieve_all_event_keys(),
str(
f"{prefix}{AddressListEventMethods.URL}"
): AddressListEventMethods.retrieve_all_event_keys(),
},
language_models={
"page_info": {
"key": "pair", # key: pair, value: dict
"description": {
"en": "Account Records for reaching user all types account information",
"tr": "Kullanıcı tüm hesap bilgilerine ulaşmak için Hesap Kayıtları",
},
},
f"{prefix}{AddressUpdateEventMethods.URL}": {
"component": "Button",
"site_url": f"/update?site={cluster_name}",
"page_info": {
"text": {
"en": "Update Account Records",
"tr": "Hesap Kayıdı Güncelle",
},
},
},
f"{prefix}{AddressCreateEventMethods.URL}": {
"component": "Button",
"site_url": f"/create?site={cluster_name}",
"page_info": {
"text": {
"en": "Create Account Records",
"tr": "Hesap Kayıdı Oluştur",
},
},
},
f"{prefix}{AddressSearchEventMethods.URL}": {
"component": "Search",
"page_info": {
"text": {
"en": "Search Account Records",
"tr": "Hesap Kayıtlarını Ara",
},
},
},
f"{prefix}{AddressListEventMethods.URL}": {
"component": "Table",
"fetch_url": AddressListEventMethods.URL,
"page_info": {
"description": {
"en": "Account Records for reaching user all types account information",
"tr": "Kullanıcı tüm hesap bilgilerine ulaşmak için Hesap Kayıtları",
},
},
},
},
)