215 lines
7.6 KiB
Python
215 lines
7.6 KiB
Python
"""
|
|
Validation function handlers
|
|
"""
|
|
|
|
from typing import Dict, Any
|
|
from fastapi import Request
|
|
|
|
from ApiLayers.AllConfigs.Redis.configs import (
|
|
RedisValidationKeysAction,
|
|
RedisCategoryKeys,
|
|
)
|
|
from Services.Redis.Actions.actions import RedisActions
|
|
from Events.base_request_model import BaseRouteModel
|
|
|
|
from config import ValidationsConfig
|
|
|
|
|
|
class ValidateBase:
    """
    Resolve the single function code a caller may use for a given URL.

    The function codes registered for ``url`` are read from Redis and
    intersected with the caller's ``reachable_codes``; exactly one code
    must remain.
    """

    # Key prefix under which the function codes of a URL are stored.
    # Kept separate from ``redis_key`` because subclasses override
    # ``redis_key`` with their own category key, which must not change
    # where the function codes are looked up.
    function_codes_key: str = f"{RedisCategoryKeys.METHOD_FUNCTION_CODES}:*:"
    # Retained for backward compatibility with code reading ``redis_key``.
    redis_key: str = function_codes_key

    def __init__(self, url: str, reachable_codes: list):
        """
        Args:
            url: Endpoint URL whose function codes are looked up.
            reachable_codes: Event codes the current user may reach.
        """
        self.url = url
        self.reachable_codes = reachable_codes

    @property
    def function_codes(self):
        """
        Function codes stored in Redis for ``self.url``.

        Raises:
            ValueError: If the Redis lookup reports no result.
        """
        redis_function_codes = RedisActions.get_json(
            list_keys=[f"{self.function_codes_key}{self.url}"]
        )
        if redis_function_codes.status:
            return redis_function_codes.first
        raise ValueError("Function code not found")

    @property
    def intersection(self):
        """
        The one function code shared by the URL and the user.

        Raises:
            ValueError: If zero or more than one code is shared.
        """
        # ``or ()`` lets a missing (None) code list fall into the
        # "does not match" error below instead of raising TypeError.
        matches = set(self.function_codes).intersection(self.reachable_codes or ())
        if len(matches) != 1:
            raise ValueError(
                "Users reachable function codes does not match or match more than one."
            )
        return next(iter(matches))
|
|
|
|
|
|
class RedisHeaderRetrieve(ValidateBase):
    """
    Fetch the header definition stored in Redis for the resolved function code.
    """

    redis_key: str = RedisValidationKeysAction.dynamic_header_request_key

    @property
    def header(self):
        """
        Headers: per the original note, headers merged with the response
        model and language models of the event.

        Raises:
            ValueError: If the Redis lookup reports no result.
        """
        lookup_key = f"{self.redis_key}:{self.intersection}"
        response = RedisActions.get_json(list_keys=[lookup_key])
        if not response.status:
            raise ValueError("Header not found")
        return response.first
|
|
|
|
|
|
class RedisValidationRetrieve(ValidateBase):
    """
    Fetch the validation definition stored in Redis for the resolved function code.
    """

    # Dedicated attribute for the validation key. ``redis_key`` alone is
    # not reliable here: in a class that also inherits from another
    # ``ValidateBase`` subclass listed earlier in its bases,
    # ``self.redis_key`` resolves to that other class's key via the MRO.
    validation_key: str = RedisValidationKeysAction.dynamic_validation_key
    # Retained for backward compatibility with code reading ``redis_key``.
    redis_key: str = validation_key

    @property
    def validation(self):
        """
        Validation: per the original note, validation of the event merged
        with the response model and language models of the event.

        Raises:
            ValueError: If the Redis lookup reports no result.
        """
        redis_validation = RedisActions.get_json(
            list_keys=[f"{self.validation_key}:{self.intersection}"]
        )
        if redis_validation.status:
            return redis_validation.first
        # The message previously said "Header not found" (copy-paste slip).
        raise ValueError("Validation not found")
|
|
|
|
|
|
class ValidationsBoth(RedisHeaderRetrieve, RedisValidationRetrieve):
    """
    Combine the header and validation lookups into one result.

    RedisHeaderRetrieve is listed first in the bases, so ``self.redis_key``
    on instances of this class resolves to its value.
    """

    @property
    def both(self) -> Dict[str, Any]:
        """
        Headers: per the original note, headers merged with the response
        model and language models of the event.
        Validation: per the original note, validation of the event merged
        with the response model and language models of the event.

        Returns:
            A dict with the keys ``"headers"`` and ``"validation"``.
        """
        headers = self.header
        validation = self.validation
        return dict(headers=headers, validation=validation)
|
|
|
|
|
|
class RetrieveValidation(BaseRouteModel):
    """
    Route handler that returns headers and/or validation for an endpoint URL.
    """

    @classmethod
    def retrieve_validation(cls, data: Any):
        """
        Retrieve validation by event function code.

        Args:
            data: Request payload exposing ``asked_field`` and ``url``.

        Returns:
            The combined dict for ``"all"``, the header for ``"headers"``,
            the validation for ``"validation"``; ``None`` for any other
            supported field.

        Raises:
            ValueError: If ``asked_field`` is not in
                ``ValidationsConfig.SUPPORTED_VALIDATIONS``.
        """
        asked_field = getattr(data, "asked_field", "")
        if asked_field not in ValidationsConfig.SUPPORTED_VALIDATIONS:
            raise ValueError(
                f"Invalid asked field please retry with valid fields {ValidationsConfig.SUPPORTED_VALIDATIONS}"
            )

        token = cls.context_retriever.token
        reachable_codes = []
        if token.is_employee:
            reachable_codes = token.selected_company.reachable_event_codes
        elif token.is_occupant:
            reachable_codes = token.selected_occupant.reachable_event_codes

        # The keyword must match ValidateBase.__init__(url, reachable_codes);
        # the previous ``reachable_code`` spelling raised TypeError on every call.
        validate_dict = dict(url=data.url, reachable_codes=reachable_codes)
        if asked_field == "all":
            return ValidationsBoth(**validate_dict).both
        if asked_field == "headers":
            return RedisHeaderRetrieve(**validate_dict).header
        if asked_field == "validation":
            return RedisValidationRetrieve(**validate_dict).validation
        # Supported but unhandled field: keep the historical ``None`` result.
        return None
|
|
|
|
|
|
class RetrievePage(BaseRouteModel):
    """
    Route handler that returns page information filtered by the event codes
    the current user may reach.
    """

    @staticmethod
    def get_site_cluster(page_name: str):
        """
        /dashboard?site=clusterName retrieves clusterName

        Args:
            page_name: Page path including its query string.

        Returns:
            The value of the ``site`` query parameter; if there is no
            ``site`` parameter, the value of the first query parameter
            (the historical behaviour). Values are percent-decoded.

        Raises:
            ValueError: If ``page_name`` is empty or carries no query
                parameter with a value.
        """
        from urllib.parse import parse_qsl, urlsplit

        if not page_name:
            raise ValueError("Page name not found")

        # Parsing the query properly avoids the IndexError raised on a
        # page name without "?"/"=" and the wrong value returned when the
        # query holds several parameters.
        query_pairs = parse_qsl(urlsplit(page_name).query)
        for param_name, param_value in query_pairs:
            if param_name == "site":
                return param_value
        if query_pairs:
            return query_pairs[0][1]
        raise ValueError(f"Site cluster not found in page name : {page_name}")

    @classmethod
    def _reachable_event_codes(cls):
        """Return the event codes of the selected company or occupant, else an empty list."""
        token = cls.context_retriever.token
        if token.is_employee:
            return token.selected_company.reachable_event_codes
        if token.is_occupant:
            return token.selected_occupant.reachable_event_codes
        return []

    @classmethod
    def retrieve_cluster(cls, data: Any):
        """
        Retrieve cluster by event function code.

        Not implemented yet: it prints the collected inputs and always
        raises.

        Raises:
            NotImplementedError: Always.
        """
        # Keyword matches ValidateBase.__init__(url, reachable_codes).
        validate_dict = dict(
            url=data.url, reachable_codes=cls._reachable_event_codes()
        )
        # Debug output retained from the original stub.
        print("validate_dict", validate_dict)
        cluster_name = data.get("name")
        print("cluster_name", cluster_name)
        raise NotImplementedError("Cluster not found")

    @classmethod
    def retrieve_page(cls, data: Any):
        """
        Retrieve page by event function code.

        Args:
            data: Request payload exposing ``page``.

        Returns:
            A dict with ``name``, ``prefix``, ``url``, ``icon``,
            ``page_info``, ``endpoints``, ``language_models`` and
            ``instructions``; the last three only hold entries for
            endpoints sharing at least one event code with the user.

        Raises:
            ValueError: If the cluster, the page or its endpoints cannot
                be found.
        """
        from Events.Engine import CategoryCluster
        from Events.JustEvents.events_file import retrieve_cluster_by_name

        # Built once; it is tested against every endpoint below.
        reachable_codes = set(cls._reachable_event_codes() or ())

        cluster_from_all_events = cls.get_site_cluster(page_name=data.page)
        if not cluster_from_all_events:
            raise ValueError(f"Cluster not found : {data.page}")

        cluster: CategoryCluster = retrieve_cluster_by_name(cluster_from_all_events)
        if not cluster:
            raise ValueError("Cluster not found")

        page_info = cluster.retrieve_page_info().get(data.page, None)
        if not page_info:
            raise ValueError("Page not found")
        page_info = dict(page_info)

        endpoints = page_info.get("endpoints", {})
        if not endpoints:
            raise ValueError("Endpoints not found")

        language_models = page_info.get("language_models", {})
        instructions = page_info.get("instructions", {})

        new_page_info_dict = dict(
            name=cluster.name,
            prefix=cluster.PREFIX,
            url=page_info.get("url", None),
            icon=page_info.get("icon", None),
            page_info=page_info.get("page_info", None),
            endpoints={},
            language_models={},
            instructions={},
        )
        granted_endpoints = new_page_info_dict["endpoints"]
        granted_models = new_page_info_dict["language_models"]

        for key, event_codes in dict(endpoints).items():
            if reachable_codes.isdisjoint(event_codes):
                continue
            granted_endpoints[key] = True

            endpoint_models = language_models.get(key, None)
            if endpoint_models:
                if key in endpoint_models:  # key has sub key blocks inside lang model
                    # NOTE(review): only sub blocks of endpoints granted so
                    # far are copied, so the result depends on endpoint
                    # iteration order - confirm this is intended.
                    for key_model, val_model in dict(endpoint_models).items():
                        if key_model in granted_endpoints:
                            granted_models[key_model] = val_model
                else:
                    granted_models[key] = endpoint_models

            endpoint_instructions = instructions.get(key, None)
            if endpoint_instructions:
                new_page_info_dict["instructions"][key] = endpoint_instructions
        return new_page_info_dict
|