"""Event handlers that list, create, update and patch ``BuildParts`` records.

Each handler class maps an event UUID (``__event_keys__``) to the classmethod
that serves it, and is instantiated once at module level with the endpoint it
is bound to.
"""

from typing import Union

from fastapi.responses import JSONResponse
from fastapi import status

from databases import (
    Build,
    BuildParts,
)

from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
from api_validations.core_response import AlchemyJsonResponse
from api_validations.validations_request import (
    InsertBuildParts,
    ListOptions,
)


def _patch_rejected_response():
    """Build the response returned when a patch request is not applied."""
    return JSONResponse(
        content={
            "completed": False,
            "message": "Update Build Parts record",
            "data": {},
        },
        status_code=status.HTTP_200_OK,
    )


class BuildingBuildPartsListEventMethods(MethodToEvent):
    """SELECT event: list build parts of the builds visible to the caller."""

    event_type = "SELECT"
    __event_keys__ = {
        "b860e37a-e19b-4c45-9543-461241f7587c": "building_build_parts_list"
    }

    @classmethod
    def building_build_parts_list(
        cls,
        list_options: ListOptions,
        token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
    ):
        """List build parts belonging to the builds selected for the caller.

        Args:
            list_options: Listing options; ``list_options.query`` is passed to
                ``BuildParts.get_smart_query``.
            token_dict: Caller token; ``selected_company.employee_id`` is used
                to select the builds.

        Returns:
            The ``AlchemyJsonResponse`` built from the filtered records.
        """
        build_list_query = Build.select_action(
            employee_id=token_dict.selected_company.employee_id,
        )
        build_list_ids = [build.id for build in build_list_query.all()]

        # NOTE(review): this assigns a class-level attribute on BuildParts, so
        # the restriction is presumably picked up by the filter call below --
        # confirm it cannot leak between concurrent requests.
        BuildParts.pre_query = BuildParts.filter_all(
            BuildParts.build_id.in_(build_list_ids),
            *BuildParts.valid_record_args(BuildParts),
        ).query

        records = BuildParts.filter_active(
            *BuildParts.get_smart_query(list_options.query),
            *BuildParts.valid_record_args(BuildParts),
        )
        return AlchemyJsonResponse(
            completed=True,
            message="Building Parts Records are listed",
            result=records,
        )


class BuildingBuildPartsCreateEventMethods(MethodToEvent):
    """CREATE event: insert a new build part."""

    event_type = "CREATE"
    __event_keys__ = {
        "fb403f69-11ed-4f4f-ad71-5e6fb4a793d2": "building_build_parts_create"
    }

    @classmethod
    def building_build_parts_create(
        cls,
        data: InsertBuildParts,
        token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
    ):
        """Create and save a build part, returning it as a JSON payload.

        Args:
            data: Validated request body for the new record.
            token_dict: Caller token, forwarded to ``BuildParts.create_action``.

        Returns:
            ``JSONResponse`` (HTTP 200) whose ``data`` is the record's dict.
        """
        created_build = BuildParts.create_action(data=data, token=token_dict)
        created_build.save()
        return JSONResponse(
            content={
                "completed": True,
                "message": "Create Build Parts record",
                "data": created_build.get_dict(),
            },
            status_code=status.HTTP_200_OK,
        )


class BuildingBuildPartsUpdateEventMethods(MethodToEvent):
    """UPDATE event: modify an existing build part."""

    event_type = "UPDATE"
    __event_keys__ = {
        "58fdf95e-2110-4ed6-9c26-95f4be87eaee": "building_build_parts_update"
    }

    @classmethod
    def building_build_parts_update(
        cls,
        data: InsertBuildParts,
        token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
    ):
        """Update and save a build part, returning it as a JSON payload.

        Args:
            data: Validated request body carrying the new values.
            token_dict: Caller token, forwarded to ``BuildParts.update_action``.

        Returns:
            ``JSONResponse`` (HTTP 200) whose ``data`` is the record's dict.
        """
        updated_build = BuildParts.update_action(data=data, token=token_dict)
        updated_build.save()
        return JSONResponse(
            content={
                "completed": True,
                "message": "Update Build Parts record",
                # Serialize via get_dict() like the create/patch handlers do;
                # JSONResponse cannot encode the model object itself.
                "data": updated_build.get_dict(),
            },
            status_code=status.HTTP_200_OK,
        )


class BuildingBuildPartsPatchEventMethods(MethodToEvent):
    """PATCH event: toggle the active / confirmed / deleted flags of a part."""

    event_type = "PATCH"
    __event_keys__ = {
        "87a15ade-3474-4206-b574-bbf8580cbb14": "building_build_parts_patch"
    }

    @classmethod
    def building_build_parts_patch(cls, data, token_dict):
        """Apply flag changes to the build part identified by ``data.uu_id``.

        Args:
            data: Request object exposing ``uu_id`` and ``excluded_dump()``;
                the dump may carry ``active``, ``confirm`` and ``delete`` keys.
            token_dict: Caller token; ``selected_company.duty_id`` is used for
                the access check.

        Returns:
            ``JSONResponse`` (HTTP 200). ``completed`` is True with the updated
            record when the patch was applied, otherwise False with empty data.
        """
        find_one_build = BuildParts.filter_one(
            BuildParts.uu_id == data.uu_id,
            *BuildParts.valid_record_args(BuildParts),
        ).data
        # Presumably ``.data`` is None when nothing matches (TODO confirm);
        # guard so a missing record yields the rejection payload instead of an
        # AttributeError on ``find_one_build.id``.
        if find_one_build is None:
            return _patch_rejected_response()

        access_authorized_build = BuildParts.select_action(
            duty_id=token_dict.selected_company.duty_id,
            filter_expr=[BuildParts.id == find_one_build.id],
        )
        # NOTE(review): if select_action returns a query object whose ``count``
        # is a method, this truthiness test always passes -- confirm that
        # ``count`` is an attribute/property here.
        if not access_authorized_build.count:
            return _patch_rejected_response()

        action = data.excluded_dump()
        # Keys absent from the dump leave the current flag values untouched.
        find_one_build.active = bool(action.get("active", find_one_build.active))
        find_one_build.is_confirmed = bool(
            action.get("confirm", find_one_build.is_confirmed)
        )
        find_one_build.deleted = bool(action.get("delete", find_one_build.deleted))
        find_one_build.save()
        return JSONResponse(
            content={
                "completed": True,
                "message": "Update Build Parts record",
                "data": find_one_build.get_dict(),
            },
            status_code=status.HTTP_200_OK,
        )


# Module-level handler instances, each bound to the endpoint it serves.
BuildingBuildPartsListEventMethod = BuildingBuildPartsListEventMethods(
    action=ActionsSchema(endpoint="/building/parts/list")
)
BuildingBuildPartsCreateEventMethod = BuildingBuildPartsCreateEventMethods(
    action=ActionsSchema(endpoint="/building/parts/create")
)
BuildingBuildPartsUpdateEventMethod = BuildingBuildPartsUpdateEventMethods(
    action=ActionsSchema(endpoint="/building/parts/update")
)
BuildingBuildPartsPatchEventMethod = BuildingBuildPartsPatchEventMethods(
    action=ActionsSchema(endpoint="/building/parts/patch")
)