163 lines
5.3 KiB
Python
163 lines
5.3 KiB
Python
import typing
|
|
|
|
from databases import (
|
|
Build,
|
|
BuildSites,
|
|
)
|
|
|
|
from api_validations.validations_request import (
|
|
InsertBuildArea,
|
|
UpdateBuildArea,
|
|
ListOptions,
|
|
)
|
|
|
|
from api_validations.core_response import AlchemyJsonResponse
|
|
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
|
|
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
|
|
|
|
|
|
class BuildSitesListEventMethods(MethodToEvent):
|
|
|
|
event_type = "SELECT"
|
|
__event_keys__ = {
|
|
"6798414c-6c7d-47f0-9d8b-6935a0f51c2e": "build_sites_list",
|
|
}
|
|
|
|
@classmethod
|
|
def build_sites_list(
|
|
cls,
|
|
list_options: ListOptions,
|
|
token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
|
|
):
|
|
if isinstance(token_dict, OccupantTokenObject):
|
|
build_ids = Build.filter_all(
|
|
Build.id == token_dict.selected_occupant.build_id,
|
|
*Build.valid_record_args(Build),
|
|
).data
|
|
build_id_list = [build.id for build in build_ids]
|
|
BuildSites.pre_query = BuildSites.filter_all(
|
|
BuildSites.build_id.in_(build_id_list),
|
|
*BuildSites.valid_record_args(BuildSites),
|
|
).query
|
|
elif isinstance(token_dict, EmployeeTokenObject):
|
|
build_ids = Build.select_action(
|
|
employee_id=token_dict.selected_company.employee_id
|
|
)
|
|
build_id_list = [build.id for build in build_ids]
|
|
BuildSites.pre_query = BuildSites.filter_all(
|
|
BuildSites.build_id.in_(build_id_list),
|
|
*BuildSites.valid_record_args(BuildSites),
|
|
).query
|
|
BuildSites.filter_attr = list_options
|
|
records = BuildSites.filter_all(
|
|
*BuildSites.get_smart_query(smart_query=list_options.query),
|
|
*BuildSites.valid_record_args(BuildSites),
|
|
)
|
|
return AlchemyJsonResponse(
|
|
completed=True, message="Update Build record", result=records
|
|
)
|
|
|
|
|
|
class BuildSitesCreateEventMethods(MethodToEvent):
|
|
|
|
event_type = "CREATE"
|
|
__event_keys__ = {
|
|
"57edc8bf-8f29-4e75-b5e1-9ca0139a3fda": "build_sites_create",
|
|
}
|
|
|
|
@classmethod
|
|
def build_area_create(
|
|
cls,
|
|
data: InsertBuildArea,
|
|
token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
|
|
):
|
|
if isinstance(token_dict, OccupantTokenObject):
|
|
if not token_dict.selected_occupant.build_uuid == data.build_uu_id:
|
|
BuildSites.raise_http_exception(
|
|
status_code="HTTP_403_FORBIDDEN",
|
|
error_case="UNAUTHORIZED",
|
|
message=f"Occupant can not create build sites for {data.build_uu_id}",
|
|
data={
|
|
"build_uu_id": data.build_uu_id,
|
|
},
|
|
)
|
|
elif isinstance(token_dict, EmployeeTokenObject):
|
|
build_ids = Build.select_action(
|
|
employee_id=token_dict.selected_company.employee_id
|
|
).all()
|
|
if not str(data.build_uu_id) in [str(build.uu_id) for build in build_ids]:
|
|
BuildSites.raise_http_exception(
|
|
status_code="HTTP_403_FORBIDDEN",
|
|
error_case="UNAUTHORIZED",
|
|
message=f"Employee can not create build sites for {data.build_uu_id}",
|
|
data={
|
|
"build_uu_id": data.build_uu_id,
|
|
},
|
|
)
|
|
data_dict = data.excluded_dump()
|
|
created_build_part = BuildSites.find_or_create(**data_dict)
|
|
created_build_part.save()
|
|
created_build_part.update(is_confirmed=True)
|
|
created_build_part.save()
|
|
return AlchemyJsonResponse(
|
|
completed=True,
|
|
message="Update Build record",
|
|
result=created_build_part,
|
|
)
|
|
|
|
|
|
class BuildSitesUpdateEventMethods(MethodToEvent):
|
|
|
|
event_type = "UPDATE"
|
|
__event_keys__ = {
|
|
"b18e8e37-a62b-4a84-9972-ba17121ed393": "build_sites_update",
|
|
}
|
|
|
|
@classmethod
|
|
def build_area_update(
|
|
cls,
|
|
build_uu_id: str,
|
|
data: UpdateBuildArea,
|
|
token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
|
|
):
|
|
return AlchemyJsonResponse(
|
|
completed=False,
|
|
message="Update Build record",
|
|
result=None,
|
|
)
|
|
|
|
|
|
class BuildSitesPatchEventMethods(MethodToEvent):
|
|
|
|
event_type = "PATCH"
|
|
__event_keys__ = {
|
|
"39ba1d78-ff0d-4ec7-a363-b457cbf199a0": "build_sites_patch",
|
|
}
|
|
|
|
@classmethod
|
|
def build_area_patch(
|
|
cls,
|
|
build_uu_id: str,
|
|
data,
|
|
token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
|
|
):
|
|
return AlchemyJsonResponse(
|
|
completed=False,
|
|
message="Patch Build record",
|
|
result=None,
|
|
)
|
|
|
|
|
|
BuildSitesListEventMethod = BuildSitesListEventMethods(
|
|
action=ActionsSchema(endpoint="/building/sites/list")
|
|
)
|
|
BuildSitesCreateEventMethod = BuildSitesCreateEventMethods(
|
|
action=ActionsSchema(endpoint="/building/sites/create")
|
|
)
|
|
BuildSitesUpdateEventMethod = BuildSitesUpdateEventMethods(
|
|
action=ActionsSchema(endpoint="/building/sites/update")
|
|
)
|
|
BuildSitesPatchEventMethod = BuildSitesPatchEventMethods(
|
|
action=ActionsSchema(endpoint="/building/sites/patch")
|
|
)
|