# Source listing metadata: 277 lines, 9.4 KiB, Python.
import typing
|
|
from typing import Union
|
|
|
|
from fastapi import status, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from databases import (
|
|
Build,
|
|
RelationshipEmployee2Build,
|
|
Addresses,
|
|
BuildParts,
|
|
BuildTypes,
|
|
ApiEnumDropdown,
|
|
)
|
|
|
|
from api_validations.validations_request import (
|
|
InsertBuild,
|
|
UpdateBuild,
|
|
PatchRecord,
|
|
ListOptions,
|
|
)
|
|
|
|
from api_validations.core_response import AlchemyJsonResponse
|
|
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
|
|
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
|
|
|
|
|
|
class BuildListEventMethods(MethodToEvent):
    """SELECT event handlers for listing Build records."""

    event_type = "SELECT"
    event_description = ""
    event_category = ""

    # Maps an event UUID to the name of the classmethod that serves it.
    __event_keys__ = {
        "68b3b5ed-b74c-4a27-820f-3959214e94e9": "build_list",
    }

    @classmethod
    def build_list(
        cls,
        list_options: ListOptions,
        token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
    ):
        """List Build records for the caller described by ``token_dict``.

        An occupant token sets ``Build.pre_query`` to the query filtered on
        the occupant's selected ``build_id``; an employee token sets it to
        the result of ``Build.select_action`` for the selected employee id.

        Args:
            list_options: Listing options, stored on ``Build.filter_attr``.
            token_dict: Employee or occupant token of the caller.

        Returns:
            An ``AlchemyJsonResponse`` wrapping ``Build.filter_all()``.
        """
        # NOTE(review): for any other token type ``Build.pre_query`` is left
        # untouched, so whatever an earlier call stored there stays in place
        # - confirm this is intended.
        if isinstance(token_dict, OccupantTokenObject):
            occupant_build_id = token_dict.selected_occupant.build_id
            Build.pre_query = Build.filter_all(Build.id == occupant_build_id).query
        elif isinstance(token_dict, EmployeeTokenObject):
            employee_id = token_dict.selected_company.employee_id
            Build.pre_query = Build.select_action(employee_id=employee_id)

        # presumably pre_query / filter_attr are consumed by filter_all()
        # below - verify against the Build model.
        Build.filter_attr = list_options
        return AlchemyJsonResponse(
            completed=True,
            message="Building Records are listed",
            result=Build.filter_all(),
        )
|
|
|
|
|
|
def _require_employee_and_company(token_dict: EmployeeTokenObject) -> None:
    """Raise HTTP 401 unless the selected company has employee and company ids."""
    selected = token_dict.selected_company
    if not selected.employee_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Employee id is not found for {selected.employee_uu_id}",
        )
    if not selected.company_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Company id is not found for {selected.company_uu_id}",
        )


class BuildCreateEventMethods(MethodToEvent):
    """CREATE event handlers for Build records."""

    event_type = "CREATE"
    event_description = ""
    event_category = ""

    # Maps an event UUID to the name of the classmethod that serves it.
    __event_keys__ = {
        "a2271854-6b90-43da-a440-a62b70d90528": "build_create",
        "b67ee709-0992-4604-9f90-fb1da10d5cf9": "create_building_employee",
    }

    @classmethod
    def build_create(cls, data: InsertBuild, token_dict: EmployeeTokenObject):
        """Create a Build together with its ``MAN-ROOM`` build part.

        After ``Build.create_action`` the method finds or creates the build
        part numbered ``"0"`` (``part_code="MAN-ROOM"``), stores that part's
        id on the build as ``management_room_id`` and marks the part as
        confirmed.

        Args:
            data: Validated insert payload; ``gov_address_code`` is used to
                derive the part's ``address_gov_code``.
            token_dict: Employee token of the caller.

        Returns:
            ``JSONResponse`` (HTTP 200) carrying ``created_build.get_dict()``.

        Raises:
            HTTPException: 401 when the token has no employee or company id;
                404 when the ``APT_YNT`` build type or the ``Directions``/``NN``
                enum record is missing.
        """
        _require_employee_and_company(token_dict)

        created_build = Build.create_action(data=data, token=token_dict)

        build_type = BuildTypes.filter_by_one(
            **BuildTypes.valid_record_dict, type_code="APT_YNT"
        ).data
        if not build_type:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Build type APT_YNT is not found. Please contact with your system administrator.",
            )

        api_enum = ApiEnumDropdown.filter_by_one(enum_class="Directions", key="NN").data
        if not api_enum:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Api Enum NN is not found. Please contact with your system administrator.",
            )

        build_parts = dict(
            address_gov_code=f"{data.gov_address_code}-M",
            build_id=int(created_build.id),
            build_uu_id=str(created_build.uu_id),
            part_no="0",
            part_type_id=int(build_type.id),
            part_type_uu_id=str(build_type.uu_id),
            part_direction_id=int(api_enum.id),
            part_direction_uu_id=str(api_enum.uu_id),
            part_code="MAN-ROOM",
            human_livable=False,
        )
        man_build_part = BuildParts.find_or_create(**build_parts)

        # NOTE(review): save() runs both before and after the update; kept
        # as-is because the first call may be what makes man_build_part.id
        # available - confirm against the model's save() semantics.
        created_build.save()
        created_build.update(management_room_id=man_build_part.id)
        created_build.save()
        man_build_part.update(is_confirmed=True)
        man_build_part.save()

        # NOTE(review): creating the RelationshipEmployee2Build link
        # (company_id / member_id / employee_id, then is_confirmed=True) is
        # disabled in this method, yet the message below still says the build
        # is assigned to the caller. Use create_building_employee when the
        # link is required, or confirm the wording.
        return JSONResponse(
            content={
                "completed": True,
                "message": "Create Build record completed. This build is assigned to you.",
                "data": created_build.get_dict(),
            },
            status_code=status.HTTP_200_OK,
        )

    @classmethod
    def create_building_employee(
        cls, data: InsertBuild, token_dict: EmployeeTokenObject
    ):
        """Create a Build and link it to the calling employee.

        The address referenced by ``data.address_uu_id`` must be returned by
        ``Addresses.list_via_employee`` for this token before anything is
        created.

        Args:
            data: Validated insert payload; ``address_uu_id`` is checked.
            token_dict: Employee token of the caller.

        Returns:
            ``JSONResponse`` (HTTP 200) carrying ``created_build.get_dict()``.

        Raises:
            HTTPException: 404 when the address is not found for the caller;
                401 when the token has no employee or company id.
        """
        records = Addresses.list_via_employee(
            token_dict=token_dict, filter_expr=[Addresses.uu_id == data.address_uu_id]
        ).data
        if not records:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"This address {data.address_uu_id} is not found in the user's address list.",
            )

        _require_employee_and_company(token_dict)

        created_build = Build.create_action(data=data, token=token_dict)

        # Only the side effect is needed here; the relation object itself is
        # not used afterwards.
        RelationshipEmployee2Build.find_or_create(
            company_id=token_dict.selected_company.company_id,
            member_id=created_build.id,
            employee_id=token_dict.selected_company.employee_id,
        )
        created_build.save()
        return JSONResponse(
            content={
                "completed": True,
                "message": "Create Build record completed. This build is assigned to you.",
                "data": created_build.get_dict(),
            },
            status_code=status.HTTP_200_OK,
        )
|
|
|
|
|
|
class BuildUpdateEventMethods(MethodToEvent):
    """UPDATE event handlers for Build records."""

    event_type = "UPDATE"
    event_description = ""
    event_category = ""

    __class_key__ = ""
    # Maps an event UUID to the name of the classmethod that serves it.
    __event_keys__ = {
        "5ad38a66-1189-451e-babb-77de2d63d757": "build_update",
    }

    @classmethod
    def build_update(
        cls,
        build_uu_id: str,
        data: UpdateBuild,
        token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
    ):
        """Update the Build identified by ``build_uu_id``.

        Args:
            build_uu_id: UUID of the build to update.
            data: Validated update payload.
            token_dict: Token of the caller.

        Returns:
            ``JSONResponse`` (HTTP 200) with the result of
            ``Build.update_action`` under ``"data"``.

        Raises:
            HTTPException: 401 when the filtered lookup on the caller's
                ``person_id`` yields no data.
        """
        # NOTE(review): selected_company is read unconditionally although the
        # annotation also admits OccupantTokenObject - confirm occupants
        # expose it.
        employee_id = token_dict.selected_company.employee_id
        Build.pre_query = Build.select_action(employee_id=employee_id)

        matching_builds = Build.filter_all(
            Build.person_id == token_dict.person_id,
        ).data
        if not matching_builds:
            # NOTE(review): Build.pre_query stays set on this path.
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"This user can not modify {build_uu_id} - building.",
            )

        Build.pre_query = None
        updated_build = Build.update_action(
            data=data, token=token_dict, build_uu_id=build_uu_id
        )
        Build.save()

        # NOTE(review): other handlers in this file serialise with get_dict();
        # verify update_action already returns JSON-serialisable data.
        payload = {
            "completed": True,
            "message": "Update Build record",
            "data": updated_build,
        }
        return JSONResponse(content=payload, status_code=status.HTTP_200_OK)
|
|
|
|
|
|
class BuildPatchEventMethods(MethodToEvent):
    """PATCH event handlers toggling status flags on Build records."""

    event_type = "PATCH"
    event_description = ""
    event_category = ""

    # Maps an event UUID to the name of the classmethod that serves it.
    __event_keys__ = {
        "e3876bfe-8847-4dea-ae36-e709f7431930": "build_patch",
    }

    @classmethod
    def build_patch(cls, build_uu_id: str, data: PatchRecord, token_dict):
        """Patch ``active`` / ``is_confirmed`` / ``deleted`` on one Build.

        The flags are read from ``data.excluded_dump()`` under the keys
        ``"active"``, ``"confirm"`` and ``"delete"``; a missing key keeps the
        record's current value.

        Args:
            build_uu_id: UUID of the build to patch.
            data: Patch payload.
            token_dict: Token of the caller; ``selected_company.employee_id``
                is used for the access check.

        Returns:
            ``JSONResponse`` with HTTP 200 in both cases: ``completed`` is
            ``True`` with the patched record on success, ``False`` with empty
            data when the access check finds no matching build.
        """
        # NOTE(review): no handling for an unknown build_uu_id - the attribute
        # access on find_one_build below would fail; confirm what filter_one
        # returns when nothing matches.
        find_one_build = Build.filter_one(
            Build.uu_id == build_uu_id,
        )
        access_authorized_build = Build.select_action(
            employee_id=token_dict.selected_company.employee_id,
            filter_expr=[Build.id == find_one_build.id],
        )

        # On a SQLAlchemy-style query ``count`` is a method, and a bound
        # method is always truthy, so testing the bare attribute would let
        # every caller through. Call it when callable; otherwise use the
        # value as-is so a plain integer/property keeps working.
        authorized_count = access_authorized_build.count
        if callable(authorized_count):
            authorized_count = authorized_count()

        if authorized_count:
            action = data.excluded_dump()
            find_one_build.active = bool(action.get("active", find_one_build.active))
            find_one_build.is_confirmed = bool(
                action.get("confirm", find_one_build.is_confirmed)
            )
            find_one_build.deleted = bool(action.get("delete", find_one_build.deleted))
            find_one_build.save()
            return JSONResponse(
                content={
                    "completed": True,
                    "message": "Patch Build record completed",
                    "data": find_one_build.get_dict(),
                },
                status_code=status.HTTP_200_OK,
            )

        # Failure is reported in the payload, not the status code; callers
        # may rely on the 200, so it is preserved.
        return JSONResponse(
            content={
                "completed": False,
                "message": "Patch Build record failed",
                "data": {},
            },
            status_code=status.HTTP_200_OK,
        )
|
|
|
|
|
|
# Module-level handler instances: each event class is instantiated once with
# an ActionsSchema naming the endpoint it is associated with.
BuildListEventMethod = BuildListEventMethods(
    action=ActionsSchema(endpoint="/building/build/list"),
)
BuildCreateEventMethod = BuildCreateEventMethods(
    action=ActionsSchema(endpoint="/building/build/create"),
)
BuildUpdateEventMethod = BuildUpdateEventMethods(
    action=ActionsSchema(endpoint="/building/build/update"),
)
BuildPatchEventMethod = BuildPatchEventMethods(
    action=ActionsSchema(endpoint="/building/build/patch"),
)
|