production-evyos-systems-an.../ServicesApi/Initializer/event_clusters.py

123 lines
3.9 KiB
Python

from typing import Optional, Type
from pydantic import BaseModel
class EventCluster:
    """
    A named collection of events attached to a single endpoint.

    Attributes (all set in ``__init__``):
        endpoint_uu_id: identifier used to look up the endpoint row when the
            events are persisted.
        name: cluster name, shown in ``repr``.
        events: registered events in insertion order, unique by ``key``.
    """

    def __repr__(self):
        return f"EventCluster(name={self.name})"

    def __init__(self, endpoint_uu_id: str, name: str):
        self.endpoint_uu_id = endpoint_uu_id
        self.name = name
        self.events: list["Event"] = []

    def add_event(self, event: "Event"):
        """
        Add an event to the cluster.

        An event whose key is already registered is ignored, so the first
        event added under a given key wins.
        """
        if any(known.key == event.key for known in self.events):
            return
        self.events.append(event)

    def get_event(self, event_key: str):
        """
        Get an event by its key.

        Returns the first event whose ``key`` equals ``event_key``, or
        ``None`` when the cluster holds no such event.
        """
        return next(
            (event for event in self.events if event.key == event_key),
            None,
        )

    def set_events_to_database(self):
        """
        Create or update one ``Events`` row per event in this cluster.

        The endpoint row is looked up through ``EndpointRestriction`` by
        comparing its ``operation_uu_id`` with this cluster's
        ``endpoint_uu_id``. When no endpoint matches, nothing is written.

        An existing ``Events`` row is matched on the endpoint AND the event
        key (``function_code``). Matching on the endpoint alone would make
        every event of a multi-event cluster overwrite the same row.
        """
        # Kept as a function-level import, as in the original code
        # (presumably to avoid an import cycle with Schemas -- TODO confirm).
        from Schemas import Events, EndpointRestriction

        with Events.new_session() as db_session:
            # Both models share the session opened above.
            Events.set_session(db_session)
            EndpointRestriction.set_session(db_session)

            to_save_endpoint = EndpointRestriction.query.filter(
                EndpointRestriction.operation_uu_id == self.endpoint_uu_id
            ).first()
            if not to_save_endpoint:
                return
            print('to_save_endpoint', to_save_endpoint)

            for event in self.events:
                event_dict_to_save = dict(
                    function_code=event.key,
                    function_class=event.name,
                    description=event.description,
                    endpoint_code=self.endpoint_uu_id,
                    endpoint_id=to_save_endpoint.id,
                    endpoint_uu_id=str(to_save_endpoint.uu_id),
                    is_confirmed=True,
                )
                print('set_events_to_database event_dict_to_save', event_dict_to_save)

                # Identify the row by endpoint + event key so that sibling
                # events on the same endpoint do not clobber each other.
                check_event = Events.query.filter(
                    Events.endpoint_uu_id == event_dict_to_save["endpoint_uu_id"],
                    Events.function_code == event_dict_to_save["function_code"],
                ).first()
                if check_event:
                    check_event.update(**event_dict_to_save)
                    check_event.save()
                else:
                    event_created = Events.create(**event_dict_to_save)
                    print(f"UUID: {event_created.uu_id} event is saved to {to_save_endpoint.uu_id}")
                    event_created.save()

    def match_event(self, event_key: str) -> "Event":
        """
        Match an event by its key.

        Returns the event found by ``get_event``.

        Raises:
            ValueError: if no event is found for ``event_key``.
        """
        event = self.get_event(event_key=event_key)
        if not event:
            raise ValueError("Event key not found")
        return event
class Event:
    """
    A single event description: a display name, a lookup key, optional
    request/response validator classes and a free-text description.
    """

    def __init__(
        self,
        name: str,
        key: str,
        request_validator: Optional[Type[BaseModel]] = None,
        response_validator: Optional[Type[BaseModel]] = None,
        description: str = "",
    ):
        # Identity of the event.
        self.name, self.key = name, key
        # Optional validator classes; stored as given, None when omitted.
        self.request_validator, self.response_validator = (
            request_validator,
            response_validator,
        )
        self.description = description

    def event_callable(self):
        """
        Example callable method.

        Prints the event name and returns an empty dict.
        """
        print(self.name)
        return dict()
class RouterCluster:
    """
    A named registry of event clusters, keyed by cluster name.
    """

    def __init__(self, name: str):
        self.name = name
        # Maps cluster name -> cluster; filled by set_event_cluster.
        self.event_clusters: dict[str, EventCluster] = {}

    def __repr__(self):
        return f"RouterCluster(name={self.name})"

    def set_event_cluster(self, event_cluster: EventCluster):
        """
        Register an event cluster under its name.

        The cluster name is printed on every call. If a cluster with the
        same name is already registered, the existing one is kept.
        """
        cluster_name = event_cluster.name
        print("Setting event cluster:", cluster_name)
        # setdefault only stores when the name is not registered yet.
        self.event_clusters.setdefault(cluster_name, event_cluster)

    def get_event_cluster(self, event_cluster_name: str) -> EventCluster:
        """
        Get an event cluster by its name.

        Raises:
            ValueError: if no cluster is registered under that name.
        """
        if event_cluster_name in self.event_clusters:
            return self.event_clusters[event_cluster_name]
        raise ValueError("Event cluster not found")