updated event and router stacks
This commit is contained in:
120
ApiControllers/abstracts/event_clusters.py
Normal file
120
ApiControllers/abstracts/event_clusters.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from typing import Optional, Type
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class EventCluster:
|
||||
"""
|
||||
EventCluster
|
||||
"""
|
||||
def __init__(self, endpoint_uu_id: str, name: str):
|
||||
self.endpoint_uu_id = endpoint_uu_id
|
||||
self.name = name
|
||||
self.events: list["Event"] = []
|
||||
|
||||
def add_event(self, event: "Event"):
|
||||
"""
|
||||
Add an event to the cluster
|
||||
"""
|
||||
if event.key not in [e.key for e in self.events]:
|
||||
self.events.append(event)
|
||||
|
||||
def get_event(self, event_key: str):
|
||||
"""
|
||||
Get an event by its key
|
||||
"""
|
||||
|
||||
for event in self.events:
|
||||
if event.key == event_key:
|
||||
return event
|
||||
return None
|
||||
|
||||
def set_events_to_database(self):
|
||||
from Schemas import Events, EndpointRestriction
|
||||
|
||||
with Events.new_session() as db_session:
|
||||
print(f"Endpoint UUID: {self.endpoint_uu_id}")
|
||||
if to_save_endpoint := EndpointRestriction.filter_one(
|
||||
EndpointRestriction.operation_uu_id == self.endpoint_uu_id,
|
||||
db=db_session,
|
||||
).data:
|
||||
for event in self.events:
|
||||
print(f"event:", event.name)
|
||||
event_to_save_database = Events.find_or_create(
|
||||
function_code=event.key,
|
||||
function_class=event.name,
|
||||
description=event.description,
|
||||
endpoint_code=self.endpoint_uu_id,
|
||||
endpoint_id=to_save_endpoint.id,
|
||||
endpoint_uu_id=str(to_save_endpoint.uu_id),
|
||||
is_confirmed=True,
|
||||
db=db_session,
|
||||
include_args=[
|
||||
Events.function_code,
|
||||
Events.function_class,
|
||||
Events.endpoint_code,
|
||||
Events.endpoint_uu_id,
|
||||
],
|
||||
)
|
||||
if event_to_save_database.meta_data.created:
|
||||
event_to_save_database.save(db=db_session)
|
||||
print(
|
||||
f"UUID: {event_to_save_database.uu_id} event is saved to {to_save_endpoint.uu_id}"
|
||||
)
|
||||
|
||||
def match_event(self, event_key: str) -> "Event":
|
||||
"""
|
||||
Match an event by its key
|
||||
"""
|
||||
if event := self.get_event(event_key=event_key):
|
||||
return event
|
||||
raise ValueError("Event key not found")
|
||||
|
||||
|
||||
class Event:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
key: str,
|
||||
request_validator: Optional[Type[BaseModel]] = None,
|
||||
response_validator: Optional[Type[BaseModel]] = None,
|
||||
description: str = "",
|
||||
):
|
||||
self.name = name
|
||||
self.key = key
|
||||
self.request_validator = request_validator
|
||||
self.response_validator = response_validator
|
||||
self.description = description
|
||||
|
||||
def event_callable(self):
|
||||
"""
|
||||
Example callable method
|
||||
"""
|
||||
print(self.name)
|
||||
return {}
|
||||
|
||||
|
||||
class RouterCluster:
|
||||
"""
|
||||
RouterCluster
|
||||
"""
|
||||
event_clusters: dict[str, EventCluster] = {}
|
||||
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
|
||||
def set_event_cluster(self, event_cluster: EventCluster):
|
||||
"""
|
||||
Add an event cluster to the set
|
||||
"""
|
||||
if event_cluster.name not in self.event_clusters:
|
||||
self.event_clusters[event_cluster.name] = event_cluster
|
||||
|
||||
def get_event_cluster(self, event_cluster_name: str) -> EventCluster:
|
||||
"""
|
||||
Get an event cluster by its name
|
||||
"""
|
||||
if event_cluster_name not in self.event_clusters:
|
||||
raise ValueError("Event cluster not found")
|
||||
return self.event_clusters[event_cluster_name]
|
||||
|
||||
0
ApiControllers/handlers/__init__.py
Normal file
0
ApiControllers/handlers/__init__.py
Normal file
0
ApiControllers/initializer/__init__.py
Normal file
0
ApiControllers/initializer/__init__.py
Normal file
42
ApiControllers/initializer/create_route.py
Normal file
42
ApiControllers/initializer/create_route.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from typing import List
|
||||
from fastapi import APIRouter, FastAPI
|
||||
|
||||
|
||||
class RouteRegisterController:
|
||||
|
||||
def __init__(self, app: FastAPI, router_list: List[APIRouter]):
|
||||
self.router_list = router_list
|
||||
self.app = app
|
||||
|
||||
@staticmethod
|
||||
def add_router_with_event_to_database(router: APIRouter):
|
||||
from Schemas import EndpointRestriction
|
||||
|
||||
with EndpointRestriction.new_session() as db_session:
|
||||
for route in router.routes:
|
||||
route_path = str(getattr(route, "path"))
|
||||
route_summary = str(getattr(route, "name"))
|
||||
operation_id = getattr(route, "operation_id", None)
|
||||
if not operation_id:
|
||||
continue
|
||||
|
||||
for route_method in [
|
||||
method.lower() for method in getattr(route, "methods")
|
||||
]:
|
||||
restriction = EndpointRestriction.find_or_create(
|
||||
endpoint_method=route_method,
|
||||
endpoint_name=route_path,
|
||||
endpoint_desc=route_summary.replace("_", " "),
|
||||
endpoint_function=route_summary,
|
||||
operation_uu_id=operation_id, # UUID of the endpoint
|
||||
is_confirmed=True,
|
||||
db=db_session,
|
||||
)
|
||||
if restriction.meta_data.created:
|
||||
restriction.save(db=db_session)
|
||||
|
||||
def register_routes(self):
|
||||
for router in self.router_list:
|
||||
self.app.include_router(router)
|
||||
self.add_router_with_event_to_database(router)
|
||||
return self.app
|
||||
0
ApiControllers/middlewares/__init__.py
Normal file
0
ApiControllers/middlewares/__init__.py
Normal file
25
ApiControllers/middlewares/token_middleware.py
Normal file
25
ApiControllers/middlewares/token_middleware.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from fastapi import Request, status
|
||||
from fastapi.responses import JSONResponse
|
||||
from ApiDefaults.config import api_config
|
||||
|
||||
from Endpoints.routes import get_safe_endpoint_urls
|
||||
|
||||
|
||||
async def token_middleware(request: Request, call_next):
|
||||
|
||||
base_url = request.url.path
|
||||
safe_endpoints = [_[0] for _ in get_safe_endpoint_urls()]
|
||||
if base_url in safe_endpoints:
|
||||
return await call_next(request)
|
||||
|
||||
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
|
||||
if not token:
|
||||
return JSONResponse(
|
||||
content={
|
||||
"error": "EYS_0002",
|
||||
},
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
)
|
||||
|
||||
response = await call_next(request)
|
||||
return response
|
||||
0
ApiControllers/providers/__init__.py
Normal file
0
ApiControllers/providers/__init__.py
Normal file
206
ApiControllers/providers/token_provider.py
Normal file
206
ApiControllers/providers/token_provider.py
Normal file
@@ -0,0 +1,206 @@
|
||||
import enum
|
||||
|
||||
from typing import Optional, Union, Dict, Any, List
|
||||
from pydantic import BaseModel
|
||||
from Controllers.Redis.database import RedisActions
|
||||
|
||||
|
||||
class UserType(enum.Enum):
|
||||
|
||||
employee = 1
|
||||
occupant = 2
|
||||
|
||||
|
||||
class Credentials(BaseModel):
|
||||
|
||||
person_id: int
|
||||
person_name: str
|
||||
|
||||
|
||||
class ApplicationToken(BaseModel):
|
||||
# Application Token Object -> is the main object for the user
|
||||
|
||||
user_type: int = UserType.occupant.value
|
||||
credential_token: str = ""
|
||||
|
||||
user_uu_id: str
|
||||
user_id: int
|
||||
|
||||
person_id: int
|
||||
person_uu_id: str
|
||||
|
||||
request: Optional[dict] = None # Request Info of Client
|
||||
expires_at: Optional[float] = None # Expiry timestamp
|
||||
|
||||
|
||||
class OccupantToken(BaseModel):
|
||||
|
||||
# Selection of the occupant type for a build part is made by the user
|
||||
|
||||
living_space_id: int # Internal use
|
||||
living_space_uu_id: str # Outer use
|
||||
|
||||
occupant_type_id: int
|
||||
occupant_type_uu_id: str
|
||||
occupant_type: str
|
||||
|
||||
build_id: int
|
||||
build_uuid: str
|
||||
build_part_id: int
|
||||
build_part_uuid: str
|
||||
|
||||
responsible_company_id: Optional[int] = None
|
||||
responsible_company_uuid: Optional[str] = None
|
||||
responsible_employee_id: Optional[int] = None
|
||||
responsible_employee_uuid: Optional[str] = None
|
||||
|
||||
# ID list of reachable event codes as "endpoint_code": ["UUID", "UUID"]
|
||||
reachable_event_codes: Optional[dict[str, str]] = None
|
||||
|
||||
# ID list of reachable applications as "page_url": ["UUID", "UUID"]
|
||||
reachable_app_codes: Optional[dict[str, str]] = None
|
||||
|
||||
|
||||
class CompanyToken(BaseModel):
|
||||
|
||||
# Selection of the company for an employee is made by the user
|
||||
company_id: int
|
||||
company_uu_id: str
|
||||
|
||||
department_id: int # ID list of departments
|
||||
department_uu_id: str # ID list of departments
|
||||
|
||||
duty_id: int
|
||||
duty_uu_id: str
|
||||
|
||||
staff_id: int
|
||||
staff_uu_id: str
|
||||
|
||||
employee_id: int
|
||||
employee_uu_id: str
|
||||
bulk_duties_id: int
|
||||
|
||||
# ID list of reachable event codes as "endpoint_code": ["UUID", "UUID"]
|
||||
reachable_event_codes: Optional[dict[str, str]] = None
|
||||
|
||||
# ID list of reachable applications as "page_url": ["UUID", "UUID"]
|
||||
reachable_app_codes: Optional[dict[str, str]] = None
|
||||
|
||||
|
||||
class OccupantTokenObject(ApplicationToken):
|
||||
# Occupant Token Object -> Requires selection of the occupant type for a specific build part
|
||||
|
||||
available_occupants: dict = None
|
||||
selected_occupant: Optional[OccupantToken] = None # Selected Occupant Type
|
||||
|
||||
@property
|
||||
def is_employee(self) -> bool:
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_occupant(self) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
class EmployeeTokenObject(ApplicationToken):
|
||||
# Full hierarchy Employee[staff_id] -> Staff -> Duty -> Department -> Company
|
||||
|
||||
companies_id_list: List[int] # List of company objects
|
||||
companies_uu_id_list: List[str] # List of company objects
|
||||
|
||||
duty_id_list: List[int] # List of duty objects
|
||||
duty_uu_id_list: List[str] # List of duty objects
|
||||
|
||||
selected_company: Optional[CompanyToken] = None # Selected Company Object
|
||||
|
||||
@property
|
||||
def is_employee(self) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def is_occupant(self) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
TokenDictType = Union[EmployeeTokenObject, OccupantTokenObject]
|
||||
|
||||
|
||||
class TokenProvider:
|
||||
|
||||
AUTH_TOKEN: str = "AUTH_TOKEN"
|
||||
|
||||
@classmethod
|
||||
def convert_redis_object_to_token(
|
||||
cls, redis_object: Dict[str, Any]
|
||||
) -> TokenDictType:
|
||||
"""
|
||||
Process Redis object and return appropriate token object.
|
||||
"""
|
||||
if redis_object.get("user_type") == UserType.employee.value:
|
||||
return EmployeeTokenObject(**redis_object)
|
||||
elif redis_object.get("user_type") == UserType.occupant.value:
|
||||
return OccupantTokenObject(**redis_object)
|
||||
raise ValueError("Invalid user type")
|
||||
|
||||
@classmethod
|
||||
def get_dict_from_redis(
|
||||
cls, token: Optional[str] = None, user_uu_id: Optional[str] = None
|
||||
) -> Union[TokenDictType, List[TokenDictType]]:
|
||||
"""
|
||||
Retrieve token object from Redis using token and user_uu_id
|
||||
"""
|
||||
token_to_use, user_uu_id_to_use = token or "*", user_uu_id or "*"
|
||||
list_of_token_dict, auth_key_list = [], [
|
||||
cls.AUTH_TOKEN,
|
||||
token_to_use,
|
||||
user_uu_id_to_use,
|
||||
]
|
||||
if token:
|
||||
result = RedisActions.get_json(list_keys=auth_key_list, limit=1)
|
||||
if first_record := result.first:
|
||||
return cls.convert_redis_object_to_token(first_record)
|
||||
elif user_uu_id:
|
||||
result = RedisActions.get_json(list_keys=auth_key_list)
|
||||
if all_records := result.all:
|
||||
for all_record in all_records:
|
||||
list_of_token_dict.append(
|
||||
cls.convert_redis_object_to_token(all_record)
|
||||
)
|
||||
return list_of_token_dict
|
||||
raise ValueError(
|
||||
"Token not found in Redis. Please check the token or user_uu_id."
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def retrieve_application_codes(cls, page_url: str, token: TokenDictType):
|
||||
"""
|
||||
Retrieve application code from the token object or list of token objects.
|
||||
"""
|
||||
if isinstance(token, EmployeeTokenObject):
|
||||
if application_codes := token.selected_company.reachable_app_codes.get(
|
||||
page_url, None
|
||||
):
|
||||
return application_codes
|
||||
elif isinstance(token, OccupantTokenObject):
|
||||
if application_codes := token.selected_occupant.reachable_app_codes.get(
|
||||
page_url, None
|
||||
):
|
||||
return application_codes
|
||||
raise ValueError("Invalid token type or no application code found.")
|
||||
|
||||
@classmethod
|
||||
def retrieve_event_codes(cls, endpoint_code: str, token: TokenDictType) -> str:
|
||||
"""
|
||||
Retrieve event code from the token object or list of token objects.
|
||||
"""
|
||||
if isinstance(token, EmployeeTokenObject):
|
||||
if event_codes := token.selected_company.reachable_event_codes.get(
|
||||
endpoint_code, None
|
||||
):
|
||||
return event_codes
|
||||
elif isinstance(token, OccupantTokenObject):
|
||||
if event_codes := token.selected_occupant.reachable_event_codes.get(
|
||||
endpoint_code, None
|
||||
):
|
||||
return event_codes
|
||||
raise ValueError("Invalid token type or no event code found.")
|
||||
Reference in New Issue
Block a user