"""Validation function handlers."""

from typing import Dict, Any, Optional

from fastapi import Request

from ApiLayers.AllConfigs.Redis.configs import (
    RedisValidationKeysAction,
    RedisCategoryKeys,
)
from Services.Redis.Actions.actions import RedisActions
from Events.base_request_model import BaseRouteModel
from config import ValidationsConfig


def _reachable_event_codes(token: Any) -> list:
    """Return the event codes reachable through the token's current selection.

    Employees use the selected company's codes, occupants use the selected
    occupant's codes; any other token yields an empty list.
    """
    if token.is_employee:
        return token.selected_company.reachable_event_codes
    if token.is_occupant:
        return token.selected_occupant.reachable_event_codes
    return []


class ValidateBase:
    """Resolve the single function code a user may reach for a given url.

    Attributes:
        redis_key: Key prefix of the function-code entries. Kept for backward
            compatibility; subclasses override it with their own key.
        function_codes_key: Key prefix actually used by ``function_codes``.
            It is a separate attribute so that subclasses overriding
            ``redis_key`` do not redirect the function-code lookup.
    """

    redis_key: str = f"{RedisCategoryKeys.METHOD_FUNCTION_CODES}:*:"
    function_codes_key: str = redis_key

    def __init__(self, url: str, reachable_codes: list):
        self.url = url
        self.reachable_codes = reachable_codes

    @property
    def function_codes(self):
        """Function codes stored in Redis for ``self.url``.

        Raises:
            ValueError: If the Redis lookup reports no result.
        """
        redis_function_codes = RedisActions.get_json(
            list_keys=[f"{self.function_codes_key}{self.url}"]
        )
        if redis_function_codes.status:
            return redis_function_codes.first
        raise ValueError("Function code not found")

    @property
    def intersection(self):
        """The one function code shared by the url and the user's reachable codes.

        Raises:
            ValueError: If zero or more than one code is shared.
        """
        matches = set(self.function_codes) & set(self.reachable_codes)
        if len(matches) != 1:
            raise ValueError(
                "Users reachable function codes does not match or match more than one."
            )
        return next(iter(matches))


class RedisHeaderRetrieve(ValidateBase):
    """Retrieve the header definition of the event matched by ``intersection``."""

    redis_key: str = RedisValidationKeysAction.dynamic_header_request_key
    # Dedicated attribute: ``redis_key`` is ambiguous once this class is
    # combined with RedisValidationRetrieve through multiple inheritance.
    header_key: str = redis_key

    @property
    def header(self):
        """
        Headers: Headers which is merged with response model && language models of event

        Raises:
            ValueError: If the Redis lookup reports no result.
        """
        redis_header = RedisActions.get_json(
            list_keys=[f"{self.header_key}:{self.intersection}"]
        )
        if redis_header.status:
            return redis_header.first
        raise ValueError("Header not found")


class RedisValidationRetrieve(ValidateBase):
    """Retrieve the validation definition of the event matched by ``intersection``."""

    redis_key: str = RedisValidationKeysAction.dynamic_validation_key
    # Dedicated attribute: see the note on RedisHeaderRetrieve.header_key.
    validation_key: str = redis_key

    @property
    def validation(self):
        """
        Validation: Validation of event which is merged with response model && language models of event

        Raises:
            ValueError: If the Redis lookup reports no result.
        """
        redis_validation = RedisActions.get_json(
            list_keys=[f"{self.validation_key}:{self.intersection}"]
        )
        if redis_validation.status:
            return redis_validation.first
        raise ValueError("Validation not found")


class ValidationsBoth(RedisHeaderRetrieve, RedisValidationRetrieve):
    """Retrieve the header and the validation of the same event together."""

    @property
    def both(self) -> Dict[str, Any]:
        """
        Headers: Headers which is merged with response model && language models of event
        Validation: Validation of event which is merged with response model && language models of event
        """
        return {"headers": self.header, "validation": self.validation}


class RetrieveValidation(BaseRouteModel):
    """Route handler returning headers and/or validation for a url."""

    @classmethod
    def retrieve_validation(cls, data: Any):
        """
        Retrieve validation by event function code

        Args:
            data: Request payload; ``asked_field`` and ``url`` are read from it.

        Returns:
            The ``both`` dict for "all", the header for "headers", the
            validation for "validation", and None for any other supported
            value of ``asked_field``.

        Raises:
            ValueError: If ``asked_field`` is not in
                ``ValidationsConfig.SUPPORTED_VALIDATIONS``, or if the Redis
                lookups fail.
        """
        asked_field = getattr(data, "asked_field", "")
        if asked_field not in ValidationsConfig.SUPPORTED_VALIDATIONS:
            raise ValueError(
                f"Invalid asked field please retry with valid fields {ValidationsConfig.SUPPORTED_VALIDATIONS}"
            )

        reachable_codes = _reachable_event_codes(cls.context_retriever.token)
        # Keyword names must match ValidateBase.__init__(url, reachable_codes).
        validate_dict = dict(url=data.url, reachable_codes=reachable_codes)

        if asked_field == "all":
            return ValidationsBoth(**validate_dict).both
        if asked_field == "headers":
            return RedisHeaderRetrieve(**validate_dict).header
        if asked_field == "validation":
            return RedisValidationRetrieve(**validate_dict).validation
        return None


class RetrievePage(BaseRouteModel):
    """Route handler returning page and cluster information for the frontend."""

    @staticmethod
    def get_site_cluster(page_name: str):
        """
        /dashboard?site=clusterName retrieves clusterName

        The value returned is the text between the first and second "=" of
        the part that follows the first "?".

        Raises:
            ValueError: If ``page_name`` is empty, or has no "?" or no "=" after it.
        """
        if not page_name:
            raise ValueError("Page name not found")
        url_parts = page_name.split("?")
        if len(url_parts) < 2:
            raise ValueError(f"Page name has no query string : {page_name}")
        query_parts = url_parts[1].split("=")
        if len(query_parts) < 2:
            raise ValueError(f"Page name has no cluster parameter : {page_name}")
        return query_parts[1]

    @classmethod
    def retrieve_cluster(cls, data: Any):
        """
        Retrieve cluster by event function code

        Not implemented yet: always raises NotImplementedError after printing
        debug information.
        """
        reachable_codes = _reachable_event_codes(cls.context_retriever.token)
        validate_dict = dict(url=data.url, reachable_codes=reachable_codes)
        print("validate_dict", validate_dict)
        # NOTE(review): ``data`` is read as an object (data.url) above and as a
        # mapping (data.get) here — confirm the intended payload type.
        cluster_name = data.get("name")
        print("cluster_name", cluster_name)
        raise NotImplementedError("Cluster not found")

    @classmethod
    def retrieve_page(cls, data: Any) -> Optional[Dict[str, Any]]:
        """
        Retrieve page by event function code
        Check reachable codes for user to render which page and components on Frontend
        Uses:
            Context Token Retriever to get user information from ACCESS_TOKEN
            CategoryCluster Retrieve Cluster Retrieve cluster by name
            Check available instructions of page and info user with it
        Returns:
            NAME=cluster_name.name,
            PREFIX=cluster_name.PREFIX,
            URL=dict(page_info).get("URL", None),
            ICON=dict(page_info).get("ICON", None),
            INFO=dict(page_info).get("INFO", None),
            SUB_COMPONENTS=sub_components (only those the user can reach,
            with their "ENDPOINTS" entry removed),
        Raises:
            ValueError: If the page name, cluster, page info or sub components
                cannot be found.
        """
        from Events.Engine import CategoryCluster
        from Events.JustEvents.events_file import retrieve_cluster_by_name

        page_name = getattr(data, "page", None)
        reachable_codes = set(_reachable_event_codes(cls.context_retriever.token))

        cluster_from_all_events = cls.get_site_cluster(page_name=page_name)
        if not cluster_from_all_events:
            raise ValueError(f"Cluster not found : {page_name}")

        cluster_name: CategoryCluster = retrieve_cluster_by_name(cluster_from_all_events)
        if not cluster_name:
            raise ValueError("Cluster not found")

        page_info = cluster_name.retrieve_page_info().get(page_name, None)
        if not page_info:
            raise ValueError("Page not found")
        page_info = dict(page_info)

        sub_components: dict = page_info.get("SUB_COMPONENTS", {})
        if not sub_components:
            raise ValueError("Sub components not found")

        # Build a filtered copy instead of popping from ``sub_components``:
        # that mapping comes from the cluster's page info, and mutating it in
        # place would alter what later calls read.
        reachable_components = {}
        for key, events in dict(sub_components).items():
            component = dict(events)
            endpoint_event_codes = component.pop("ENDPOINTS", [])
            # Meaning client can reach this endpoint [] & [] intersection
            if set(endpoint_event_codes) & reachable_codes:
                reachable_components[key] = component

        return dict(
            NAME=cluster_name.name,
            PREFIX=cluster_name.PREFIX,
            URL=page_info.get("URL", None),
            ICON=page_info.get("ICON", None),
            INFO=page_info.get("INFO", None),
            SUB_COMPONENTS=reachable_components,
        )