253 lines
9.2 KiB
Python
253 lines
9.2 KiB
Python
"""
|
|
Validation function handlers
|
|
"""
|
|
|
|
from typing import Dict, Any, Optional
|
|
from fastapi import Request
|
|
|
|
from ApiLayers.AllConfigs.Redis.configs import (
|
|
RedisCategoryKeys,
|
|
RedisValidationKeysAction,
|
|
RedisCategoryPageInfoKeysAction,
|
|
)
|
|
from ApiLayers.ApiLibrary.common.line_number import get_line_number_for_error
|
|
from ApiLayers.ErrorHandlers.Exceptions.api_exc import HTTPExceptionApi
|
|
from Services.Redis.Actions.actions import RedisActions
|
|
from Events.base_request_model import BaseRouteModel
|
|
|
|
from config import ValidationsConfig
|
|
|
|
|
|
class ValidateBase:
|
|
|
|
redis_key: str = f"{RedisCategoryKeys.METHOD_FUNCTION_CODES}:*:"
|
|
|
|
def __init__(self, url: str, reachable_codes: list):
|
|
self.url = url
|
|
self.reachable_codes = reachable_codes
|
|
|
|
@property
|
|
def function_codes(self):
|
|
redis_function_codes = RedisActions.get_json(
|
|
list_keys=[f"{self.redis_key}{self.url}"]
|
|
)
|
|
if redis_function_codes.status:
|
|
return redis_function_codes.first
|
|
raise ValueError("Function code not found")
|
|
|
|
@property
|
|
def intersection(self):
|
|
intersection = list(
|
|
set(self.function_codes).intersection(set(self.reachable_codes))
|
|
)
|
|
if not len(intersection) == 1:
|
|
raise ValueError(
|
|
"Users reachable function codes does not match or match more than one."
|
|
)
|
|
return intersection[0]
|
|
|
|
|
|
class RedisHeaderRetrieve(ValidateBase):
|
|
|
|
redis_key: str = RedisValidationKeysAction.dynamic_header_request_key
|
|
|
|
@property
|
|
def header(self):
|
|
"""
|
|
Headers: Headers which is merged with response model && language models of event
|
|
"""
|
|
redis_header = RedisActions.get_json(
|
|
list_keys=[f"{self.redis_key}:{self.intersection}"]
|
|
)
|
|
if redis_header.status:
|
|
return redis_header.first
|
|
raise ValueError("Header not found")
|
|
|
|
|
|
class RedisValidationRetrieve(ValidateBase):
|
|
|
|
redis_key: str = RedisValidationKeysAction.dynamic_validation_key
|
|
|
|
@property
|
|
def validation(self):
|
|
"""
|
|
Validation: Validation of event which is merged with response model && language models of event
|
|
"""
|
|
redis_validation = RedisActions.get_json(
|
|
list_keys=[f"{self.redis_key}:{self.intersection}"]
|
|
)
|
|
if redis_validation.status:
|
|
return redis_validation.first
|
|
raise ValueError("Validation not found")
|
|
|
|
|
|
class ValidationsBoth(RedisHeaderRetrieve, RedisValidationRetrieve):
|
|
|
|
@property
|
|
def both(self) -> Dict[str, Any]:
|
|
"""
|
|
Headers: Headers which is merged with response model && language models of event
|
|
Validation: Validation of event which is merged with response model && language models of event
|
|
"""
|
|
return {"headers": self.header, "validation": self.validation}
|
|
|
|
|
|
class RetrieveValidation(BaseRouteModel):
|
|
|
|
@classmethod
|
|
def retrieve_validation(cls, data: Any):
|
|
"""
|
|
Retrieve validation by event function code
|
|
"""
|
|
if (
|
|
getattr(data, "asked_field", "")
|
|
not in ValidationsConfig.SUPPORTED_VALIDATIONS
|
|
):
|
|
raise ValueError(
|
|
f"Invalid asked field please retry with valid fields {ValidationsConfig.SUPPORTED_VALIDATIONS}"
|
|
)
|
|
|
|
reachable_codes = []
|
|
if cls.context_retriever.token.is_employee:
|
|
reachable_codes = (
|
|
cls.context_retriever.token.selected_company.reachable_event_codes
|
|
)
|
|
elif cls.context_retriever.token.is_occupant:
|
|
reachable_codes = (
|
|
cls.context_retriever.token.selected_occupant.reachable_event_codes
|
|
)
|
|
|
|
validate_dict = dict(url=data.url, reachable_code=reachable_codes)
|
|
if data.asked_field == "all":
|
|
return ValidationsBoth(**validate_dict).both
|
|
elif data.asked_field == "headers":
|
|
return RedisHeaderRetrieve(**validate_dict).header
|
|
elif data.asked_field == "validation":
|
|
return RedisValidationRetrieve(**validate_dict).validation
|
|
|
|
|
|
class RetrievePage(BaseRouteModel):
|
|
|
|
@staticmethod
|
|
def get_site_cluster(page_name: str):
|
|
"""
|
|
/dashboard?site=clusterName retrieves clusterName
|
|
"""
|
|
if page_name:
|
|
return page_name.split("?")[1].split("=")[1]
|
|
raise ValueError("Page name not found")
|
|
|
|
@classmethod
|
|
def retrieve_cluster(cls, data: Any):
|
|
"""
|
|
Retrieve cluster by event function code
|
|
"""
|
|
reachable_codes = []
|
|
if cls.context_retriever.token.is_employee:
|
|
reachable_codes = (
|
|
cls.context_retriever.token.selected_company.reachable_event_codes
|
|
)
|
|
elif cls.context_retriever.token.is_occupant:
|
|
reachable_codes = (
|
|
cls.context_retriever.token.selected_occupant.reachable_event_codes
|
|
)
|
|
validate_dict = dict(url=data.url, reachable_code=reachable_codes)
|
|
print("validate_dict", validate_dict)
|
|
cluster_name = data.get("name")
|
|
print("cluster_name", cluster_name)
|
|
raise NotImplementedError("Cluster not found")
|
|
|
|
@classmethod
|
|
def retrieve_page(cls, data: Any) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Retrieve page by event function code
|
|
Check reachable codes for user to render which page and components on Frontend
|
|
Uses:
|
|
Context Token Retriever to get user information from ACCESS_TOKEN
|
|
CategoryCluster Retrieve Cluster Retrieve cluster by name
|
|
Check available instructions of page and info user with it
|
|
Args:
|
|
request: FastAPI request object
|
|
data.page: Request body containing login credentials
|
|
{
|
|
"page": "/dashboard?site=AccountCluster"
|
|
}
|
|
Returns:
|
|
NAME=cluster_name.name,
|
|
PREFIX=cluster_name.PREFIX,
|
|
URL=dict(page_info).get("URL", None),
|
|
ICON=dict(page_info).get("ICON", None),
|
|
INFO=dict(page_info).get("INFO", None),
|
|
SUB_COMPONENTS=sub_components,
|
|
"""
|
|
from Events.Engine import CategoryCluster
|
|
from Events.JustEvents.events_file import retrieve_cluster_by_name
|
|
|
|
reachable_codes, page_name = [], getattr(data, "page", None)
|
|
if cls.context_retriever.token.is_employee:
|
|
reachable_codes = (
|
|
cls.context_retriever.token.selected_company.reachable_event_codes
|
|
)
|
|
elif cls.context_retriever.token.is_occupant:
|
|
reachable_codes = (
|
|
cls.context_retriever.token.selected_occupant.reachable_event_codes
|
|
)
|
|
cluster_name = str(page_name).split("?")[1].split("=")[1]
|
|
print("page_name", f"{RedisCategoryPageInfoKeysAction.page_index}:{page_name}")
|
|
page_info_from_redis = RedisActions.get_json(
|
|
list_keys=[
|
|
RedisCategoryPageInfoKeysAction.page_index,
|
|
cluster_name,
|
|
]
|
|
)
|
|
page_info_all = page_info_from_redis.first
|
|
if not page_info_all:
|
|
return dict(
|
|
NAME=f"{cluster_name}",
|
|
PREFIX=f"NOT IMPLEMENTED",
|
|
URL=f"{page_name}",
|
|
MESSAGE="Cluster not found"
|
|
)
|
|
|
|
page_info = page_info_all.get(page_name, {})
|
|
sub_components = dict(page_info).get("SUB_COMPONENTS", {})
|
|
|
|
print("PAGE INFO : ", dict(
|
|
page_name=page_name,
|
|
page_info=page_info,
|
|
sub_components=sub_components,
|
|
NAME=cluster_name,
|
|
PREFIX=page_info_all.get("prefix", ""),
|
|
TEMPLATE_UI=dict(page_info).get("TEMPLATE_UI", ""),
|
|
URL=dict(page_info).get("URL", None),
|
|
ICON=dict(page_info).get("ICON", None),
|
|
INFO=dict(page_info).get("INFO", None),
|
|
SUB_COMPONENTS=sub_components,
|
|
# MAPPING=cluster_name.MAPPING,
|
|
))
|
|
new_page_info = dict(
|
|
NAME=cluster_name,
|
|
PREFIX=page_info_all.get("prefix", ""),
|
|
TEMPLATE_UI=dict(page_info).get("TEMPLATE_UI", ""),
|
|
URL=dict(page_info).get("URL", None),
|
|
ICON=dict(page_info).get("ICON", None),
|
|
INFO=dict(page_info).get("INFO", None),
|
|
SUB_COMPONENTS=sub_components,
|
|
# MAPPING=cluster_name.MAPPING,
|
|
)
|
|
for key, events in dict(sub_components).items():
|
|
# Meaning client can reach this endpoint [] & [] intersection in reachable_codes
|
|
endpoint_event_codes = dict(events).get("ENDPOINTS", [])
|
|
if not len(list(set(endpoint_event_codes) & set(reachable_codes))) == 1:
|
|
print(f"{endpoint_event_codes} :||: Endpoint is not available for the current user")
|
|
new_page_info["SUB_COMPONENTS"].pop(key, None)
|
|
for key in new_page_info["SUB_COMPONENTS"].keys():
|
|
new_page_info["SUB_COMPONENTS"][key].pop("ENDPOINTS", None)
|
|
if len(new_page_info["SUB_COMPONENTS"]) == 0:
|
|
return dict(
|
|
NAME=cluster_name,
|
|
PREFIX=page_info_all.get("prefix", ""),
|
|
MESSAGE="This cluster is not registered or not accessible for the current user"
|
|
)
|
|
return new_page_info
|