validation services added
This commit is contained in:
@@ -27,7 +27,7 @@ authentication_login_super_user_event.endpoint_callable = (
|
||||
# Auth Select Company or Occupant Type
|
||||
authentication_select_super_user_event = Event(
|
||||
name="authentication_select_super_user_event",
|
||||
key="a5d2d0d1-3e9b-4b0f-8c7d-6d4a4b4c4d4e",
|
||||
key="f951ae1a-7950-4eab-ae2d-5bd9c2d21173",
|
||||
request_validator=AuthenticationRequestModels.SelectCompanyOrOccupantTypeSuperUserRequestModel,
|
||||
language_models=[SelectRequestLanguageModel],
|
||||
response_validation_static="LOGIN_SELECT",
|
||||
|
||||
@@ -2,4 +2,10 @@
|
||||
Validations package initialization.
|
||||
"""
|
||||
|
||||
__all__ = []
|
||||
|
||||
from .validation.cluster import ValidationsCluster
|
||||
|
||||
|
||||
__all__ = [
|
||||
"ValidationsCluster",
|
||||
]
|
||||
|
||||
@@ -1,116 +0,0 @@
|
||||
from typing import TYPE_CHECKING, Dict, Any, Union
|
||||
|
||||
from ApiEvents.base_request_model import DictRequestModel, EndpointBaseRequestModel
|
||||
from ApiEvents.abstract_class import (
|
||||
RouteFactoryConfig,
|
||||
EndpointFactoryConfig,
|
||||
endpoint_wrapper,
|
||||
)
|
||||
from ApiValidations.Custom.token_objects import EmployeeTokenObject, OccupantTokenObject
|
||||
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
|
||||
from ApiLibrary.common.line_number import get_line_number_for_error
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fastapi import Request, HTTPException, status, Body
|
||||
|
||||
|
||||
# Type aliases for common types
|
||||
prefix = "/available"
|
||||
|
||||
|
||||
async def check_endpoints_available(request: "Request") -> Dict[str, Any]:
|
||||
"""
|
||||
Check if endpoints are available.
|
||||
"""
|
||||
auth_dict = check_endpoints_available.auth
|
||||
selection_of_user = None
|
||||
if auth_dict.is_occupant:
|
||||
selection_of_user = auth_dict.selected_occupant
|
||||
else:
|
||||
selection_of_user = auth_dict.selected_company
|
||||
if not selection_of_user:
|
||||
raise HTTPExceptionApi(
|
||||
error_code="",
|
||||
lang=auth_dict.lang,
|
||||
loc=get_line_number_for_error(),
|
||||
sys_msg="User selection not found",
|
||||
)
|
||||
return {"reachable_event_endpoints": selection_of_user.reachable_event_endpoints}
|
||||
|
||||
|
||||
async def check_endpoint_available(
|
||||
request: "Request",
|
||||
data: EndpointBaseRequestModel,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Check if endpoints are available.
|
||||
"""
|
||||
auth_dict = check_endpoint_available.auth
|
||||
print("data", data)
|
||||
data_dict = data.data
|
||||
endpoint_asked = data_dict.get("endpoint", None)
|
||||
|
||||
if not endpoint_asked:
|
||||
raise HTTPExceptionApi(
|
||||
error_code="",
|
||||
lang=auth_dict.lang,
|
||||
loc=get_line_number_for_error(),
|
||||
sys_msg="Endpoint not found",
|
||||
)
|
||||
|
||||
selection_of_user = None
|
||||
if auth_dict.is_occupant:
|
||||
selection_of_user = auth_dict.selected_occupant
|
||||
else:
|
||||
selection_of_user = auth_dict.selected_company
|
||||
if not selection_of_user:
|
||||
raise HTTPExceptionApi(
|
||||
error_code="",
|
||||
lang=auth_dict.lang,
|
||||
loc=get_line_number_for_error(),
|
||||
sys_msg="User selection not found",
|
||||
)
|
||||
|
||||
if endpoint_asked not in selection_of_user.reachable_event_endpoints:
|
||||
raise HTTPExceptionApi(
|
||||
error_code="",
|
||||
lang=auth_dict.lang,
|
||||
loc=get_line_number_for_error(),
|
||||
sys_msg="Endpoint not found",
|
||||
)
|
||||
return {"endpoint": endpoint_asked, "status": "OK"}
|
||||
|
||||
|
||||
AVAILABLE_CONFIG = RouteFactoryConfig(
|
||||
name="available_endpoints",
|
||||
prefix=prefix,
|
||||
tags=["Available Endpoints"],
|
||||
include_in_schema=True,
|
||||
endpoints=[
|
||||
EndpointFactoryConfig(
|
||||
url_prefix=prefix,
|
||||
url_endpoint="/endpoints",
|
||||
url_of_endpoint=f"{prefix}/endpoints",
|
||||
endpoint="/endpoints",
|
||||
method="POST",
|
||||
summary="Retrieve all endpoints available for user",
|
||||
description="",
|
||||
is_auth_required=True, # Needs token_dict
|
||||
is_event_required=False,
|
||||
endpoint_function=check_endpoints_available,
|
||||
),
|
||||
EndpointFactoryConfig(
|
||||
url_prefix=prefix,
|
||||
url_endpoint="/endpoint",
|
||||
url_of_endpoint=f"{prefix}/endpoint",
|
||||
endpoint="/endpoint",
|
||||
method="POST",
|
||||
summary="Retrieve an endpoint available for user",
|
||||
description="",
|
||||
is_auth_required=True, # Needs token_dict
|
||||
is_event_required=False,
|
||||
endpoint_function=check_endpoint_available,
|
||||
),
|
||||
],
|
||||
).as_dict()
|
||||
@@ -1,325 +0,0 @@
|
||||
"""
|
||||
request models.
|
||||
"""
|
||||
|
||||
from typing import TYPE_CHECKING, Dict, Any, Literal, Optional, TypedDict, Union
|
||||
from pydantic import BaseModel, Field, model_validator, RootModel, ConfigDict
|
||||
from ApiEvents.base_request_model import BaseRequestModel, DictRequestModel
|
||||
from ApiValidations.Custom.token_objects import EmployeeTokenObject, OccupantTokenObject
|
||||
from ApiValidations.Request.base_validations import ListOptions
|
||||
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
|
||||
from Schemas.identity.identity import (
|
||||
AddressPostcode,
|
||||
Addresses,
|
||||
RelationshipEmployee2PostCode,
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fastapi import Request
|
||||
|
||||
|
||||
class AddressListEventMethods(MethodToEvent):
|
||||
|
||||
event_type = "SELECT"
|
||||
event_description = "List Address records"
|
||||
event_category = "Address"
|
||||
|
||||
__event_keys__ = {
|
||||
"9c251d7d-da70-4d63-a72c-e69c26270442": "address_list_super_user",
|
||||
"52afe375-dd95-4f4b-aaa2-4ec61bc6de52": "address_list_employee",
|
||||
}
|
||||
__event_validation__ = {
|
||||
"9c251d7d-da70-4d63-a72c-e69c26270442": ListAddressResponse,
|
||||
"52afe375-dd95-4f4b-aaa2-4ec61bc6de52": ListAddressResponse,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def address_list_super_user(
|
||||
cls,
|
||||
list_options: ListOptions,
|
||||
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
|
||||
):
|
||||
db = RelationshipEmployee2PostCode.new_session()
|
||||
post_code_list = RelationshipEmployee2PostCode.filter_all(
|
||||
RelationshipEmployee2PostCode.company_id
|
||||
== token_dict.selected_company.company_id,
|
||||
db=db,
|
||||
).data
|
||||
post_code_id_list = [post_code.member_id for post_code in post_code_list]
|
||||
if not post_code_id_list:
|
||||
raise HTTPExceptionApi(
|
||||
status_code=404,
|
||||
detail="User has no post code registered. User can not list addresses.",
|
||||
)
|
||||
get_street_ids = [
|
||||
street_id[0]
|
||||
for street_id in AddressPostcode.select_only(
|
||||
AddressPostcode.id.in_(post_code_id_list),
|
||||
select_args=[AddressPostcode.street_id],
|
||||
order_by=AddressPostcode.street_id.desc(),
|
||||
).data
|
||||
]
|
||||
if not get_street_ids:
|
||||
raise HTTPExceptionApi(
|
||||
status_code=404,
|
||||
detail="User has no street registered. User can not list addresses.",
|
||||
)
|
||||
Addresses.pre_query = Addresses.filter_all(
|
||||
Addresses.street_id.in_(get_street_ids),
|
||||
).query
|
||||
Addresses.filter_attr = list_options
|
||||
records = Addresses.filter_all().data
|
||||
return
|
||||
# return AlchemyJsonResponse(
|
||||
# completed=True, message="List Address records", result=records
|
||||
# )
|
||||
|
||||
@classmethod
|
||||
def address_list_employee(
|
||||
cls,
|
||||
list_options: ListOptions,
|
||||
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
|
||||
):
|
||||
Addresses.filter_attr = list_options
|
||||
Addresses.pre_query = Addresses.filter_all(
|
||||
Addresses.street_id.in_(get_street_ids),
|
||||
)
|
||||
records = Addresses.filter_all().data
|
||||
return
|
||||
# return AlchemyJsonResponse(
|
||||
# completed=True, message="List Address records", result=records
|
||||
# )
|
||||
|
||||
|
||||
class AddressCreateEventMethods(MethodToEvent):
|
||||
|
||||
event_type = "CREATE"
|
||||
event_description = ""
|
||||
event_category = ""
|
||||
|
||||
__event_keys__ = {
|
||||
"ffdc445f-da10-4ce4-9531-d2bdb9a198ae": "create_address",
|
||||
}
|
||||
__event_validation__ = {
|
||||
"ffdc445f-da10-4ce4-9531-d2bdb9a198ae": InsertAddress,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def create_address(
|
||||
cls,
|
||||
data: InsertAddress,
|
||||
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
|
||||
):
|
||||
post_code = AddressPostcode.filter_one(
|
||||
AddressPostcode.uu_id == data.post_code_uu_id,
|
||||
).data
|
||||
if not post_code:
|
||||
raise HTTPExceptionApi(
|
||||
status_code=404,
|
||||
detail="Post code not found. User can not create address without post code.",
|
||||
)
|
||||
|
||||
data_dict = data.excluded_dump()
|
||||
data_dict["street_id"] = post_code.street_id
|
||||
data_dict["street_uu_id"] = str(post_code.street_uu_id)
|
||||
del data_dict["post_code_uu_id"]
|
||||
address = Addresses.find_or_create(**data_dict)
|
||||
address.save()
|
||||
address.update(is_confirmed=True)
|
||||
address.save()
|
||||
return AlchemyJsonResponse(
|
||||
completed=True,
|
||||
message="Address created successfully",
|
||||
result=address.get_dict(),
|
||||
)
|
||||
|
||||
|
||||
class AddressSearchEventMethods(MethodToEvent):
|
||||
"""Event methods for searching addresses.
|
||||
|
||||
This class handles address search functionality including text search
|
||||
and filtering.
|
||||
"""
|
||||
|
||||
event_type = "SEARCH"
|
||||
event_description = "Search for addresses using text and filters"
|
||||
event_category = "Address"
|
||||
|
||||
__event_keys__ = {
|
||||
"e0ac1269-e9a7-4806-9962-219ac224b0d0": "search_address",
|
||||
}
|
||||
__event_validation__ = {
|
||||
"e0ac1269-e9a7-4806-9962-219ac224b0d0": SearchAddress,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def _build_order_clause(
|
||||
cls, filter_list: Dict[str, Any], schemas: List[str], filter_table: Any
|
||||
) -> Any:
|
||||
"""Build the ORDER BY clause for the query.
|
||||
|
||||
Args:
|
||||
filter_list: Dictionary of filter options
|
||||
schemas: List of available schema fields
|
||||
filter_table: SQLAlchemy table to query
|
||||
|
||||
Returns:
|
||||
SQLAlchemy order_by clause
|
||||
"""
|
||||
# Default to ordering by UUID if field not in schema
|
||||
if filter_list.get("order_field") not in schemas:
|
||||
filter_list["order_field"] = "uu_id"
|
||||
else:
|
||||
# Extract table and field from order field
|
||||
table_name, field_name = str(filter_list.get("order_field")).split(".")
|
||||
filter_table = getattr(databases.sql_models, table_name)
|
||||
filter_list["order_field"] = field_name
|
||||
|
||||
# Build order clause
|
||||
field = getattr(filter_table, filter_list.get("order_field"))
|
||||
return (
|
||||
field.desc()
|
||||
if str(filter_list.get("order_type"))[0] == "d"
|
||||
else field.asc()
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _format_record(cls, record: Any, schemas: List[str]) -> Dict[str, str]:
|
||||
"""Format a database record into a dictionary.
|
||||
|
||||
Args:
|
||||
record: Database record to format
|
||||
schemas: List of schema fields
|
||||
|
||||
Returns:
|
||||
Formatted record dictionary
|
||||
"""
|
||||
result = {}
|
||||
for index, schema in enumerate(schemas):
|
||||
value = str(record[index])
|
||||
# Special handling for UUID fields
|
||||
if "uu_id" in value:
|
||||
value = str(value)
|
||||
result[schema] = value
|
||||
return result
|
||||
|
||||
@classmethod
|
||||
def search_address(
|
||||
cls,
|
||||
data: SearchAddress,
|
||||
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
|
||||
) -> JSONResponse:
|
||||
"""Search for addresses using text search and filters.
|
||||
|
||||
Args:
|
||||
data: Search parameters including text and filters
|
||||
token_dict: Authentication token
|
||||
|
||||
Returns:
|
||||
JSON response with search results
|
||||
|
||||
Raises:
|
||||
HTTPExceptionApi: If search fails
|
||||
"""
|
||||
try:
|
||||
# Start performance measurement
|
||||
start_time = perf_counter()
|
||||
|
||||
# Get initial query
|
||||
search_result = AddressStreet.search_address_text(search_text=data.search)
|
||||
if not search_result:
|
||||
raise HTTPExceptionApi(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="No addresses found matching search criteria",
|
||||
)
|
||||
|
||||
query = search_result.get("query")
|
||||
schemas = search_result.get("schema")
|
||||
|
||||
# Apply filters
|
||||
filter_list = data.list_options.dump()
|
||||
filter_table = AddressStreet
|
||||
|
||||
# Build and apply order clause
|
||||
order = cls._build_order_clause(filter_list, schemas, filter_table)
|
||||
|
||||
# Apply pagination
|
||||
page_size = int(filter_list.get("size"))
|
||||
offset = (int(filter_list.get("page")) - 1) * page_size
|
||||
|
||||
# Execute query
|
||||
query = (
|
||||
query.order_by(order)
|
||||
.limit(page_size)
|
||||
.offset(offset)
|
||||
.populate_existing()
|
||||
)
|
||||
records = list(query.all())
|
||||
|
||||
# Format results
|
||||
results = [cls._format_record(record, schemas) for record in records]
|
||||
|
||||
# Log performance
|
||||
duration = perf_counter() - start_time
|
||||
print(f"Address search completed in {duration:.3f}s")
|
||||
|
||||
return AlchemyJsonResponse(
|
||||
completed=True, message="Address search results", result=results
|
||||
)
|
||||
|
||||
except HTTPExceptionApi as e:
|
||||
# Re-raise HTTP exceptions
|
||||
raise e
|
||||
except Exception as e:
|
||||
# Log and wrap other errors
|
||||
print(f"Address search error: {str(e)}")
|
||||
raise HTTPExceptionApi(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail="Failed to search addresses",
|
||||
) from e
|
||||
|
||||
|
||||
class AddressUpdateEventMethods(MethodToEvent):
|
||||
|
||||
event_type = "UPDATE"
|
||||
event_description = ""
|
||||
event_category = ""
|
||||
|
||||
__event_keys__ = {
|
||||
"1f9c3a9c-e5bd-4dcd-9b9a-3742d7e03a27": "update_address",
|
||||
}
|
||||
__event_validation__ = {
|
||||
"1f9c3a9c-e5bd-4dcd-9b9a-3742d7e03a27": UpdateAddress,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def update_address(
|
||||
cls,
|
||||
address_uu_id: str,
|
||||
data: UpdateAddress,
|
||||
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
|
||||
):
|
||||
if isinstance(token_dict, EmployeeTokenObject):
|
||||
address = Addresses.filter_one(
|
||||
Addresses.uu_id == address_uu_id,
|
||||
).data
|
||||
if not address:
|
||||
raise HTTPExceptionApi(
|
||||
status_code=404,
|
||||
detail=f"Address not found. User can not update with given address uuid : {address_uu_id}",
|
||||
)
|
||||
|
||||
data_dict = data.excluded_dump()
|
||||
updated_address = address.update(**data_dict)
|
||||
updated_address.save()
|
||||
return AlchemyJsonResponse(
|
||||
completed=True,
|
||||
message="Address updated successfully",
|
||||
result=updated_address.get_dict(),
|
||||
)
|
||||
elif isinstance(token_dict, OccupantTokenObject):
|
||||
raise HTTPExceptionApi(
|
||||
status_code=403,
|
||||
detail="Occupant can not update address.",
|
||||
)
|
||||
@@ -1,23 +1,25 @@
|
||||
from Events.Engine.abstract_class import Event
|
||||
from ApiLayers.LanguageModels.Request import (
|
||||
LoginRequestLanguageModel,
|
||||
)
|
||||
from typing import Any
|
||||
from fastapi import Request
|
||||
|
||||
from models import TemplateResponseModels, TemplateRequestModels
|
||||
from function_handlers import TemplateFunctions
|
||||
from Events.Engine.abstract_class import Event
|
||||
|
||||
from .models import ValidationsPydantic
|
||||
from .function_handlers import RetrieveValidation
|
||||
|
||||
|
||||
# Auth Login
|
||||
template_event = Event(
|
||||
name="authentication_login_super_user_event",
|
||||
key="a5d2d0d1-3e9b-4b0f-8c7d-6d4a4b4c4d4e",
|
||||
request_validator=TemplateRequestModels.TemplateRequestModelX,
|
||||
language_models=[LoginRequestLanguageModel],
|
||||
response_validation_static="LOGIN_SUCCESS",
|
||||
description="Login super user",
|
||||
validation_event = Event(
|
||||
name="validation_event",
|
||||
key="02b5a596-14ba-4361-90d7-c6755727c63f",
|
||||
request_validator=ValidationsPydantic,
|
||||
language_models=[],
|
||||
response_validation_static=None,
|
||||
description="Get Validations by event function code",
|
||||
)
|
||||
|
||||
|
||||
template_event.endpoint_callable = (
|
||||
TemplateFunctions.template_example_function()
|
||||
)
|
||||
def get_validation_by_event_function_code(request: Request, data: Any):
|
||||
return RetrieveValidation.retrieve_validation(data=data)
|
||||
|
||||
|
||||
validation_event.endpoint_callable = get_validation_by_event_function_code
|
||||
|
||||
@@ -1,14 +1,16 @@
|
||||
from Events.Engine.abstract_class import CategoryCluster
|
||||
from info import template_page_info
|
||||
|
||||
from .validation import ValidationEventMethods
|
||||
|
||||
|
||||
TemplateCluster = CategoryCluster(
|
||||
name="TemplateCluster",
|
||||
tags=["template"],
|
||||
prefix="/template",
|
||||
description="Template cluster",
|
||||
pageinfo=template_page_info,
|
||||
endpoints={},
|
||||
ValidationsCluster = CategoryCluster(
|
||||
name="ValidationsCluster",
|
||||
tags=["Validations"],
|
||||
prefix="/validations",
|
||||
description="Validations cluster",
|
||||
endpoints={
|
||||
"ValidationEventMethods": ValidationEventMethods,
|
||||
},
|
||||
include_in_schema=True,
|
||||
sub_category=[],
|
||||
)
|
||||
|
||||
@@ -1,21 +1,103 @@
|
||||
from typing import Any, Union
|
||||
"""
|
||||
Validation function handlers
|
||||
"""
|
||||
|
||||
from typing import Dict, Any
|
||||
from fastapi import Request
|
||||
|
||||
from Events.base_request_model import TokenDictType, BaseRouteModel
|
||||
from ApiLayers.AllConfigs.Redis.configs import RedisValidationKeysAction, RedisCategoryKeys
|
||||
from Services.Redis.Actions.actions import RedisActions
|
||||
from Events.base_request_model import BaseRouteModel
|
||||
|
||||
from config import ValidationsConfig
|
||||
|
||||
|
||||
class Handlers:
|
||||
"""Class for handling authentication functions"""
|
||||
class ValidateBase:
|
||||
|
||||
@classmethod # Requires no auth context
|
||||
def handle_function(cls, **kwargs):
|
||||
"""Handle function with kwargs"""
|
||||
return
|
||||
redis_key: str = f"{RedisCategoryKeys.METHOD_FUNCTION_CODES}:*:"
|
||||
|
||||
def __init__(self, url: str, reachable_codes: list):
|
||||
self.url = url
|
||||
self.reachable_codes = reachable_codes
|
||||
|
||||
@property
|
||||
def function_codes(self):
|
||||
redis_function_codes= RedisActions.get_json(
|
||||
list_keys=[f"{self.redis_key}{self.url}"]
|
||||
)
|
||||
if redis_function_codes.status:
|
||||
return redis_function_codes.first
|
||||
raise ValueError("Function code not found")
|
||||
|
||||
@property
|
||||
def intersection(self):
|
||||
intersection = list(set(self.function_codes).intersection(set(self.reachable_codes)))
|
||||
if not len(intersection) == 1:
|
||||
raise ValueError("Users reachable function codes does not match or match more than one.")
|
||||
return intersection[0]
|
||||
|
||||
|
||||
class TemplateFunctions(BaseRouteModel):
|
||||
"""Class for handling authentication functions"""
|
||||
class RedisHeaderRetrieve(ValidateBase):
|
||||
|
||||
redis_key: str = RedisValidationKeysAction.dynamic_header_request_key
|
||||
|
||||
@property
|
||||
def header(self):
|
||||
"""
|
||||
Headers: Headers which is merged with response model && language models of event
|
||||
"""
|
||||
redis_header= RedisActions.get_json(list_keys=[f"{self.redis_key}:{self.intersection}"])
|
||||
if redis_header.status:
|
||||
return redis_header.first
|
||||
raise ValueError("Header not found")
|
||||
|
||||
|
||||
class RedisValidationRetrieve(ValidateBase):
|
||||
|
||||
redis_key: str = RedisValidationKeysAction.dynamic_validation_key
|
||||
|
||||
@property
|
||||
def validation(self):
|
||||
"""
|
||||
Validation: Validation of event which is merged with response model && language models of event
|
||||
"""
|
||||
redis_validation = RedisActions.get_json(list_keys=[f"{self.redis_key}:{self.intersection}"])
|
||||
if redis_validation.status:
|
||||
return redis_validation.first
|
||||
raise ValueError("Header not found")
|
||||
|
||||
|
||||
class ValidationsBoth(RedisHeaderRetrieve, RedisValidationRetrieve):
|
||||
|
||||
@property
|
||||
def both(self) -> Dict[str, Any]:
|
||||
"""
|
||||
Headers: Headers which is merged with response model && language models of event
|
||||
Validation: Validation of event which is merged with response model && language models of event
|
||||
"""
|
||||
return {"headers": self.header, "validation": self.validation}
|
||||
|
||||
|
||||
class RetrieveValidation(BaseRouteModel):
|
||||
|
||||
@classmethod
|
||||
def template_example_function(cls):
|
||||
return
|
||||
def retrieve_validation(cls, data: Any):
|
||||
"""
|
||||
Retrieve validation by event function code
|
||||
"""
|
||||
if getattr(data, 'asked_field', "") not in ValidationsConfig.SUPPORTED_VALIDATIONS:
|
||||
raise ValueError(f"Invalid asked field please retry with valid fields {ValidationsConfig.SUPPORTED_VALIDATIONS}")
|
||||
|
||||
reachable_codes = []
|
||||
if cls.context_retriever.token.is_employee:
|
||||
reachable_codes = cls.context_retriever.token.selected_company.reachable_event_codes
|
||||
elif cls.context_retriever.token.is_occupant:
|
||||
reachable_codes = cls.context_retriever.token.selected_occupant.reachable_event_codes
|
||||
|
||||
validate_dict = dict(url=data.url, reachable_code=reachable_codes)
|
||||
if data.asked_field == "all":
|
||||
return ValidationsBoth(**validate_dict).both
|
||||
elif data.asked_field == "headers":
|
||||
return RedisHeaderRetrieve(**validate_dict).header
|
||||
elif data.asked_field == "validation":
|
||||
return RedisValidationRetrieve(**validate_dict).validation
|
||||
|
||||
@@ -1,25 +1,10 @@
|
||||
"""
|
||||
Validation records request and response models.
|
||||
"""
|
||||
|
||||
from typing import TYPE_CHECKING, Dict, Any
|
||||
from pydantic import BaseModel, Field, RootModel
|
||||
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class ValidationsPydantic(BaseModel):
|
||||
class_model: str
|
||||
reachable_event_code: str
|
||||
lang: str
|
||||
|
||||
|
||||
class InsertValidationRecordRequestModel:
|
||||
pass
|
||||
|
||||
|
||||
class UpdateValidationRecordRequestModel:
|
||||
pass
|
||||
|
||||
|
||||
class ListOptionsValidationRecordRequestModel:
|
||||
pass
|
||||
event_code: str
|
||||
asked_field: Optional[str] = "all"
|
||||
|
||||
@@ -1,146 +1,40 @@
|
||||
"""
|
||||
Validation request models.
|
||||
template related API endpoints.
|
||||
"""
|
||||
|
||||
from typing import TYPE_CHECKING, Dict, Any
|
||||
from typing import Any, Dict
|
||||
from fastapi import Request
|
||||
|
||||
from ApiEvents.abstract_class import MethodToEvent
|
||||
from ApiLibrary.common.line_number import get_line_number_for_error
|
||||
from ApiValidations.Custom.validation_response import ValidationModel, ValidationParser
|
||||
from Events.Engine.abstract_class import MethodToEvent
|
||||
from Events.base_request_model import EndpointBaseRequestModel, ContextRetrievers
|
||||
from ApiLayers.Middleware.auth_middleware import MiddlewareModule
|
||||
|
||||
from ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
|
||||
from .models import ValidationsPydantic
|
||||
from .api_events import validation_event
|
||||
from .function_handlers import RetrieveValidation
|
||||
|
||||
|
||||
class AllModelsImport:
|
||||
|
||||
@classmethod
|
||||
def import_all_models(cls):
|
||||
from ApiEvents.events.account.account_records import (
|
||||
AccountListEventMethod,
|
||||
AccountUpdateEventMethod,
|
||||
AccountCreateEventMethod,
|
||||
)
|
||||
from ApiEvents.events.address.address import (
|
||||
AddressListEventMethod,
|
||||
AddressUpdateEventMethod,
|
||||
AddressCreateEventMethod,
|
||||
AddressSearchEventMethod,
|
||||
)
|
||||
|
||||
return dict(
|
||||
AccountListEventMethod=AccountListEventMethod,
|
||||
AccountUpdateEventMethod=AccountUpdateEventMethod,
|
||||
AccountCreateEventMethod=AccountCreateEventMethod,
|
||||
AddressListEventMethod=AddressListEventMethod,
|
||||
AddressUpdateEventMethod=AddressUpdateEventMethod,
|
||||
AddressCreateEventMethod=AddressCreateEventMethod,
|
||||
AddressSearchEventMethod=AddressSearchEventMethod,
|
||||
)
|
||||
ValidationEventMethods = MethodToEvent(
|
||||
name="ValidationEventMethods",
|
||||
events={validation_event.key: validation_event},
|
||||
headers=[],
|
||||
errors=[],
|
||||
url="/validations",
|
||||
method="POST",
|
||||
decorators_list=[MiddlewareModule.auth_required],
|
||||
summary="Get Validations by event function code",
|
||||
description="Get Validations by event function code by All, Header, Validation & request url",
|
||||
)
|
||||
|
||||
|
||||
class ValidationsBoth(MethodToEvent):
|
||||
|
||||
@classmethod
|
||||
def retrieve_both_validations_and_headers(
|
||||
cls, event: ValidationsPydantic
|
||||
) -> Dict[str, Any]:
|
||||
EVENT_MODELS = AllModelsImport.import_all_models()
|
||||
return_single_model = EVENT_MODELS.get(event.class_model, None)
|
||||
# event_class_validation = getattr(return_single_model, "__event_validation__", None)
|
||||
if not return_single_model:
|
||||
raise HTTPExceptionApi(
|
||||
error_code="",
|
||||
lang="en",
|
||||
loc=get_line_number_for_error(),
|
||||
sys_msg="Validation code not found",
|
||||
)
|
||||
response_model = return_single_model.retrieve_event_response_model(
|
||||
event.reachable_event_code
|
||||
)
|
||||
language_model_all = return_single_model.retrieve_language_parameters(
|
||||
function_code=event.reachable_event_code, language=event.lang
|
||||
)
|
||||
language_model = language_model_all.get("language_model", None)
|
||||
language_models = language_model_all.get("language_models", None)
|
||||
|
||||
validation = ValidationModel(response_model, language_model, language_models)
|
||||
"""
|
||||
Headers: Headers which is merged with response model && language models of event
|
||||
Validation: Validation of event which is merged with response model && language models of event
|
||||
"""
|
||||
return {
|
||||
"headers": validation.headers,
|
||||
"validation": validation.validation,
|
||||
# "language_models": language_model_all,
|
||||
}
|
||||
def authentication_login_with_domain_and_creds_endpoint(
|
||||
request: Request, data: EndpointBaseRequestModel
|
||||
) -> Dict[str, Any]:
|
||||
function = ValidationEventMethods.retrieve_event(event_function_code=f"{validation_event.key}")
|
||||
data = function.REQUEST_VALIDATOR(**data.data)
|
||||
RetrieveValidation.context_retriever = ContextRetrievers(func=authentication_login_with_domain_and_creds_endpoint)
|
||||
return function.endpoint_callable(request=request, data=data)
|
||||
|
||||
|
||||
class ValidationsValidations(MethodToEvent):
|
||||
|
||||
@classmethod
|
||||
def retrieve_validations(cls, event: ValidationsPydantic) -> Dict[str, Any]:
|
||||
EVENT_MODELS = AllModelsImport.import_all_models()
|
||||
return_single_model = EVENT_MODELS.get(event.class_model, None)
|
||||
# event_class_validation = getattr(return_single_model, "__event_validation__", None)
|
||||
if not return_single_model:
|
||||
raise HTTPExceptionApi(
|
||||
error_code="",
|
||||
lang="en",
|
||||
loc=get_line_number_for_error(),
|
||||
sys_msg="Validation code not found",
|
||||
)
|
||||
response_model = return_single_model.retrieve_event_response_model(
|
||||
event.reachable_event_code
|
||||
)
|
||||
language_model_all = return_single_model.retrieve_language_parameters(
|
||||
function_code=event.reachable_event_code, language=event.lang
|
||||
)
|
||||
language_model = language_model_all.get("language_model", None)
|
||||
language_models = language_model_all.get("language_models", None)
|
||||
|
||||
validation = ValidationModel(response_model, language_model, language_models)
|
||||
"""
|
||||
Headers: Headers which is merged with response model && language models of event
|
||||
Validation: Validation of event which is merged with response model && language models of event
|
||||
"""
|
||||
return {
|
||||
"validation": validation.validation,
|
||||
# "headers": validation.headers,
|
||||
# "language_models": language_model_all,
|
||||
}
|
||||
|
||||
|
||||
class ValidationsHeaders(MethodToEvent):
|
||||
|
||||
@classmethod
|
||||
def retrieve_headers(cls, event: ValidationsPydantic) -> Dict[str, Any]:
|
||||
EVENT_MODELS = AllModelsImport.import_all_models()
|
||||
return_single_model = EVENT_MODELS.get(event.class_model, None)
|
||||
# event_class_validation = getattr(return_single_model, "__event_validation__", None)
|
||||
if not return_single_model:
|
||||
raise HTTPExceptionApi(
|
||||
error_code="",
|
||||
lang="en",
|
||||
loc=get_line_number_for_error(),
|
||||
sys_msg="Validation code not found",
|
||||
)
|
||||
response_model = return_single_model.retrieve_event_response_model(
|
||||
event.reachable_event_code
|
||||
)
|
||||
language_model_all = return_single_model.retrieve_language_parameters(
|
||||
function_code=event.reachable_event_code, language=event.lang
|
||||
)
|
||||
language_model = language_model_all.get("language_model", None)
|
||||
language_models = language_model_all.get("language_models", None)
|
||||
|
||||
validation = ValidationModel(response_model, language_model, language_models)
|
||||
"""
|
||||
Headers: Headers which is merged with response model && language models of event
|
||||
Validation: Validation of event which is merged with response model && language models of event
|
||||
"""
|
||||
return {
|
||||
"headers": validation.headers,
|
||||
# "validation": validation.validation,
|
||||
# "language_models": language_model_all,
|
||||
}
|
||||
ValidationEventMethods.endpoint_callable = (
|
||||
authentication_login_with_domain_and_creds_endpoint
|
||||
)
|
||||
|
||||
@@ -181,22 +181,22 @@ class CategoryCluster:
|
||||
|
||||
TAGS: list
|
||||
PREFIX: str
|
||||
PAGEINFO: PageInfo
|
||||
PAGEINFO: Optional[PageInfo]
|
||||
DESCRIPTION: str
|
||||
ENDPOINTS: dict[str, MethodToEvent] # {"MethodToEvent": MethodToEvent, ...}
|
||||
SUBCATEGORY: Optional[List["CategoryCluster"]] # [CategoryCluster, ...]
|
||||
INCLUDE_IN_SCHEMA: Optional[bool] = True
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
tags: list,
|
||||
prefix: str,
|
||||
description: str,
|
||||
pageinfo: PageInfo,
|
||||
endpoints: dict[str, MethodToEvent],
|
||||
sub_category: list,
|
||||
include_in_schema: Optional[bool] = True,
|
||||
self,
|
||||
name: str,
|
||||
tags: list,
|
||||
prefix: str,
|
||||
description: str,
|
||||
endpoints: dict[str, MethodToEvent],
|
||||
sub_category: list,
|
||||
pageinfo: Optional[PageInfo] = None,
|
||||
include_in_schema: Optional[bool] = True,
|
||||
):
|
||||
self.NAME = name
|
||||
self.TAGS = tags
|
||||
|
||||
@@ -88,11 +88,11 @@ class PrepareEvents(DecoratorModule):
|
||||
cluster.get_redis_cluster_index_value()
|
||||
)
|
||||
# [SAVE]REDIS => CLUSTER_FUNCTION_CODES = {"ClusterToMethod"} : [FUNCTION_CODE, ...]}
|
||||
self.valid_redis_items.CLUSTER_FUNCTION_CODES_VALUE = {
|
||||
self.valid_redis_items.CLUSTER_FUNCTION_CODES_VALUE.update({
|
||||
f"{self.valid_redis_items.CLUSTER_FUNCTION_CODES_KEY}:{cluster.name}": tuple(
|
||||
cluster.retrieve_all_function_codes()
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
for method_endpoint in list(cluster.ENDPOINTS.values()):
|
||||
# [SAVE]REDIS => ENDPOINT2CLASS = {MethodEvent: Endpoint("/.../.../..."), ...}
|
||||
|
||||
Reference in New Issue
Block a user