updated Service providers

This commit is contained in:
2025-04-05 22:54:05 +03:00
parent fa4df11323
commit 60507c87e0
21 changed files with 1134 additions and 183 deletions

View File

@@ -0,0 +1,30 @@
ToDo List:
Redis Items:
Internal serves from Class inside API:
- [ ] Save Events activities as [
{"endpoint_code-UUID1": ["event-UUID1", "event-UUID2", "event-UUID3"]},
{"endpoint_code-UUID2": ["event-UUID4", "event-UUID5", "event-UUID6"]}
]
- [ ] Save Application activities as [
{"app1": ["appCode-UUID1", "appCode-UUID2", "appCode-UUID3"]},
{"app2": ["appCode-UUID4", "appCode-UUID5", "appCode-UUID6"]}
]
Serve Only from Redis:
- [] Endpoint: "authentication/page/valid"
send data: {"page": "app1"}
result: {"page": "appCode-UUID4"}
Client side Frontend:
On Redirect [send=> authentication/page/valid]
Retrieve available application UUIDs from API
Redirect to related page with UUID
{
"appCode-UUID1": App1.tsx
}

View File

@@ -14,6 +14,7 @@ from ApiServices.AuthService.validations.request.authentication.login_post impor
RequestForgotPasswordPhone,
RequestForgotPasswordEmail,
RequestVerifyOTP,
RequestApplication,
)
from ApiServices.AuthService.events.auth.auth import AuthHandlers
@@ -367,7 +368,45 @@ def authentication_password_verify_otp(
"tz": tz or "GMT+3",
"token": token,
}
print('Token&OTP : ', data.otp, data.token)
print("Token&OTP : ", data.otp, data.token)
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/page/valid",
summary="Verify if page is valid returns application avaliable",
description="Verify if page is valid returns application avaliable",
)
def authentication_page_valid(
request: Request,
data: RequestApplication,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Verify if page is valid returns application that can user reach
page: { url = /building/create}
result: { "application": "4c11f5ef-0bbd-41ac-925e-f79d9aac2b0e" }
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},

View File

@@ -14,6 +14,10 @@ class RequestVerifyOTP(BaseModel):
otp: str
class RequestApplication(BaseModel):
page: str # /building/create
class RequestSelectOccupant(BaseModel):
company_uu_id: str

View File

@@ -1,5 +1,7 @@
from fastapi import APIRouter, Request, Response
import uuid
from fastapi import APIRouter, Request, Response, Header
from ApiServices.TemplateService.config import api_config
from ApiServices.TemplateService.events.template.event import template_event_cluster
test_template_route = APIRouter(prefix="/test", tags=["Test"])
@@ -10,10 +12,25 @@ test_template_route = APIRouter(prefix="/test", tags=["Test"])
description="Test Template Route",
operation_id="bb20c8c6-a289-4cab-9da7-34ca8a36c8e5",
)
def test_template(request: Request, response: Response):
def test_template(
request: Request,
response: Response,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Test Template Route
"""
event_code = "bb20c8c6-a289-4cab-9da7-34ca8a36c8e5"
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
event_cluster_matched = template_event_cluster.match_event(
event_keys=[
"3f510dcf-9f84-4eb9-b919-f582f30adab1",

View File

@@ -0,0 +1,210 @@
import enum
from typing import Optional, Union, Dict, Any, List
from pydantic import BaseModel
from Controllers.Redis.database import RedisActions
from ..config import api_config
class UserType(enum.Enum):
employee = 1
occupant = 2
class Credentials(BaseModel):
person_id: int
person_name: str
class ApplicationToken(BaseModel):
# Application Token Object -> is the main object for the user
user_type: int = UserType.occupant.value
credential_token: str = ""
user_uu_id: str
user_id: int
person_id: int
person_uu_id: str
request: Optional[dict] = None # Request Info of Client
expires_at: Optional[float] = None # Expiry timestamp
class OccupantToken(BaseModel):
# Selection of the occupant type for a build part is made by the user
living_space_id: int # Internal use
living_space_uu_id: str # Outer use
occupant_type_id: int
occupant_type_uu_id: str
occupant_type: str
build_id: int
build_uuid: str
build_part_id: int
build_part_uuid: str
responsible_company_id: Optional[int] = None
responsible_company_uuid: Optional[str] = None
responsible_employee_id: Optional[int] = None
responsible_employee_uuid: Optional[str] = None
reachable_event_codes: Optional[dict[str, list[str]]] = (
None # ID list of reachable modules
)
reachable_app_codes: Optional[dict[str, list[str]]] = (
None # ID list of reachable modules
)
class CompanyToken(BaseModel):
# Selection of the company for an employee is made by the user
company_id: int
company_uu_id: str
department_id: int # ID list of departments
department_uu_id: str # ID list of departments
duty_id: int
duty_uu_id: str
staff_id: int
staff_uu_id: str
employee_id: int
employee_uu_id: str
bulk_duties_id: int
reachable_event_codes: Optional[dict[str, list[str]]] = (
None # ID list of reachable modules
)
reachable_app_codes: Optional[dict[str, list[str]]] = (
None # ID list of reachable modules
)
class OccupantTokenObject(ApplicationToken):
# Occupant Token Object -> Requires selection of the occupant type for a specific build part
available_occupants: dict = None
selected_occupant: Optional[OccupantToken] = None # Selected Occupant Type
@property
def is_employee(self) -> bool:
return False
@property
def is_occupant(self) -> bool:
return True
class EmployeeTokenObject(ApplicationToken):
# Full hierarchy Employee[staff_id] -> Staff -> Duty -> Department -> Company
companies_id_list: List[int] # List of company objects
companies_uu_id_list: List[str] # List of company objects
duty_id_list: List[int] # List of duty objects
duty_uu_id_list: List[str] # List of duty objects
selected_company: Optional[CompanyToken] = None # Selected Company Object
@property
def is_employee(self) -> bool:
return True
@property
def is_occupant(self) -> bool:
return False
TokenDictType = Union[EmployeeTokenObject, OccupantTokenObject]
class TokenProvider:
AUTH_KEY: str = api_config.ACCESS_TOKEN_TAG
AUTH_TOKEN: str = "AUTH_TOKEN"
@classmethod
def process_redis_object(cls, redis_object: Dict[str, Any]) -> TokenDictType:
"""
Process Redis object and return appropriate token object.
"""
if not redis_object.get("selected_company"):
redis_object["selected_company"] = None
if not redis_object.get("selected_occupant"):
redis_object["selected_occupant"] = None
if redis_object.get("user_type") == UserType.employee.value:
return EmployeeTokenObject(**redis_object)
elif redis_object.get("user_type") == UserType.occupant.value:
return OccupantTokenObject(**redis_object)
raise ValueError("Invalid user type")
@classmethod
def get_dict_from_redis(
cls, token: Optional[str] = None, user_uu_id: Optional[str] = None
) -> Union[TokenDictType, List[TokenDictType]]:
"""
Retrieve token object from Redis using token and user_uu_id
"""
token_to_use, user_uu_id_to_use = token or "*", user_uu_id or "*"
list_of_token_dict, auth_key_list = [], [
cls.AUTH_TOKEN,
token_to_use,
user_uu_id_to_use,
]
if token:
result = RedisActions.get_json(list_keys=auth_key_list, limit=1)
if first_record := result.first:
return cls.process_redis_object(first_record)
elif user_uu_id:
result = RedisActions.get_json(list_keys=auth_key_list)
if all_records := result.all:
for all_record in all_records:
list_of_token_dict.append(cls.process_redis_object(all_record))
return list_of_token_dict
raise ValueError(
"Token not found in Redis. Please check the token or user_uu_id."
)
@classmethod
def retrieve_application_codes(
cls, page_url: str, tokens: Union[TokenDictType, List[TokenDictType]]
):
if isinstance(tokens, TokenDictType):
if application_codes := tokens.reachable_app_codes.get(page_url, None):
return application_codes
elif isinstance(tokens, list):
retrieved_event_apps = []
for token in tokens:
if application_codes := token.reachable_app_codes.get(page_url, None):
retrieved_event_apps.append(application_codes)
return retrieved_event_apps
raise ValueError("Invalid token type or no application codes found.")
@classmethod
def retrieve_event_codes(
cls, endpoint_code: str, tokens: Union[TokenDictType, List[TokenDictType]]
):
if isinstance(tokens, TokenDictType):
if event_codes := tokens.reachable_event_codes.get(endpoint_code, None):
return event_codes
elif isinstance(tokens, List):
retrieved_event_codes = []
for token in tokens:
if isinstance(token, TokenDictType):
if event_codes := token.reachable_event_codes.get(
endpoint_code, None
):
retrieved_event_codes.append(event_codes)
return retrieved_event_codes
raise ValueError("Invalid token type or no event codes found.")