from fastapi import status, HTTPException
from fastapi.responses import JSONResponse

from api_library.date_time_actions.date_functions import system_arrow
from api_validations.validations_request import (
    InsertEmployees,
    BindEmployees2People,
    PatchRecord,
    ListOptions,
)
from databases import Employees, Staff, People, EmployeeHistory
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
from api_validations.core_response import AlchemyJsonResponse


class EmployeeListEventMethods(MethodToEvent):
    """SELECT event: list employees whose staff belongs to the caller's duties."""

    event_type = "SELECT"
    __event_keys__ = {
        "cb677c92-6b05-4122-af5c-12766fae8095": "employee_list",
    }

    @classmethod
    def employee_list(
        cls,
        list_options: ListOptions,
        token_dict: EmployeeTokenObject,
    ):
        """Return the employees attached to any staff in ``token_dict.duty_id_list``.

        Args:
            list_options: Listing options; assigned to ``Employees.filter_attr``
                before the query is issued.
            token_dict: Caller's token; its ``duty_id_list`` restricts the staff
                that are considered.

        Returns:
            An ``AlchemyJsonResponse`` wrapping the employee query result.
        """
        employees_staff = Staff.filter_all(
            Staff.duties_id.in_(token_dict.duty_id_list),
        ).data
        Employees.filter_attr = list_options
        records = Employees.filter_all(
            Employees.staff_id.in_([staff.id for staff in employees_staff]),
        )
        return AlchemyJsonResponse(
            completed=True,
            message="Employee are listed successfully",
            result=records,
        )


class EmployeeCreateEventMethods(MethodToEvent):
    """CREATE event: create (or fetch) an employee record for a staff position."""

    event_type = "CREATE"
    __event_keys__ = {
        "1e1632c3-bb0e-46a5-8e45-da3f6d88ac43": "employee_create",
    }

    @classmethod
    def employee_create(
        cls,
        data: InsertEmployees,
        token_dict: EmployeeTokenObject,
    ):
        """Find or create an employee for ``data.staff_uu_id``.

        The person is optional: when no ``People`` row matches
        ``data.people_uu_id`` the employee is created with empty people fields.

        Returns:
            ``JSONResponse`` with status 404 when the staff record is missing,
            otherwise status 200 carrying the employee as a dict.
        """
        person = People.filter_one(
            People.uu_id == data.people_uu_id,
        ).data
        staff = Staff.filter_one(
            Staff.uu_id == data.staff_uu_id,
        ).data
        if not staff:
            return JSONResponse(
                content={
                    "completed": False,
                    "message": "Staff record not found",
                    "data": {},
                },
                status_code=status.HTTP_404_NOT_FOUND,
            )
        created_employee = Employees.find_or_create(
            staff_id=staff.id,
            staff_uu_id=str(staff.uu_id),
            people_id=person.id if person else None,
            people_uu_id=str(person.uu_id) if person else None,
        )
        Employees.save()
        return JSONResponse(
            content={
                "completed": True,
                "message": "Create Employee record",
                "data": created_employee.get_dict(),
            },
            status_code=status.HTTP_200_OK,
        )


class EmployeeUpdateEventMethods(MethodToEvent):
    """UPDATE event: apply a field update to one employee record."""

    event_type = "UPDATE"
    __event_keys__ = {
        "9015a076-d78c-463d-9474-ea343a125fb8": "employee_update",
    }

    @classmethod
    def employee_update(
        cls,
        employee_uu_id: str,
        data: PatchRecord,
        token_dict: EmployeeTokenObject,
    ):
        """Update the employee identified by ``employee_uu_id``.

        Args:
            employee_uu_id: UUID of the employee to update.
            data: Payload; ``data.excluded_dump()`` supplies the fields to set.
            token_dict: Caller's token, used for the access check.

        Returns:
            ``JSONResponse`` (status 200) with ``completed`` True and the
            updated employee as a dict, or ``completed`` False when the access
            check yields nothing.

        Raises:
            HTTPException: 404 when no employee matches ``employee_uu_id``.
        """
        find_one_employee = Employees.filter_one(
            Employees.uu_id == employee_uu_id,
        ).data
        if not find_one_employee:
            # Without this guard the code below dereferences None.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Employee record not found",
            )
        # Scope the access check to the employee being updated, mirroring
        # employee_patch (the previous filter compared against a meaningless
        # token lookup).
        # NOTE(review): the fallback employee_id of 5 is hard-coded - confirm.
        access_authorized_employee = Employees.select_action(
            employee_id=getattr(token_dict, "employee_id", 5),
            filter_expr=[Employees.id == find_one_employee.id],
        )
        if access_authorized_employee.count:
            data_dict = data.excluded_dump()
            updated_employee = find_one_employee.update(**data_dict)
            Employees.save()
            return JSONResponse(
                content={
                    "completed": True,
                    "message": "Update Employee record",
                    # JSONResponse needs plain data, not the model instance.
                    "data": updated_employee.get_dict(),
                },
                status_code=status.HTTP_200_OK,
            )
        return JSONResponse(
            content={
                "completed": False,
                "message": "Update Employee record",
                "data": {},
            },
            status_code=status.HTTP_200_OK,
        )


class EmployeePatchEventMethods(MethodToEvent):
    """PATCH event: toggle the active / confirmed / deleted flags of an employee."""

    event_type = "PATCH"
    __event_keys__ = {
        "8446ce0b-9310-4b9f-93e2-61f56a9dacd1": "employee_patch",
    }

    @classmethod
    def employee_patch(
        cls,
        employee_uu_id: str,
        data: PatchRecord,
        token_dict: EmployeeTokenObject,
    ):
        """Patch the status flags of the employee identified by ``employee_uu_id``.

        The keys ``active``, ``confirm`` and ``delete`` of
        ``data.excluded_dump()`` are read; a missing key leaves the
        corresponding flag at its current value.

        Returns:
            ``JSONResponse`` (status 200) with ``completed`` True and the
            patched employee as a dict, or ``completed`` False when the access
            check yields nothing.
        """
        find_one_employee = Employees.find_one_or_abort(uu_id=employee_uu_id)
        # NOTE(review): the fallback employee_id of 5 is hard-coded - confirm.
        access_authorized_employee = Employees.select_action(
            employee_id=getattr(token_dict, "employee_id", 5),
            filter_expr=[Employees.id == find_one_employee.id],
        )
        if access_authorized_employee.count:
            action = data.excluded_dump()
            find_one_employee.active = bool(
                action.get("active", find_one_employee.active)
            )
            find_one_employee.is_confirmed = bool(
                action.get("confirm", find_one_employee.is_confirmed)
            )
            find_one_employee.deleted = bool(
                action.get("delete", find_one_employee.deleted)
            )
            # Save like every other mutating handler in this module; the
            # flag changes were previously never saved.
            Employees.save()
            return JSONResponse(
                content={
                    "completed": True,
                    "message": "Patch Employee record completed",
                    "data": find_one_employee.get_dict(),
                },
                status_code=status.HTTP_200_OK,
            )
        return JSONResponse(
            content={
                "completed": False,
                "message": "Patch Employee record failed",
                "data": {},
            },
            status_code=status.HTTP_200_OK,
        )


class Employee2PeopleEmployEventMethods(MethodToEvent):
    """UPDATE event: bind a person to an employee record of a staff position."""

    event_type = "UPDATE"
    __event_keys__ = {
        "5eb04057-7a74-4555-b2c6-14eda32dae89": "company_employee_employ",
    }

    @classmethod
    def company_employee_employ(
        cls,
        data: BindEmployees2People,
        token_dict: EmployeeTokenObject,
    ):
        """Assign ``data.people_uu_id`` to an employee of ``data.staff_uu_id``.

        ``data.expiry_starts`` defaults to the current ``system_arrow`` time
        and is normalised through ``system_arrow.get`` before being stored.

        Returns:
            ``JSONResponse`` with status 406 when the staff has no employee
            record, otherwise status 200 carrying the updated employee.

        Raises:
            HTTPException: 404 when the staff or the people record is missing.
        """
        selected_staff = Staff.filter_one(
            Staff.uu_id == data.staff_uu_id,
        ).data
        selected_people = People.filter_one(People.uu_id == data.people_uu_id).data
        if not selected_staff:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Staff record not found",
            )
        if not selected_people:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="People record not found",
            )
        # filter_all(...).data is a collection of records (employee_list
        # iterates it), so take a single record before calling .update();
        # calling .update() on the collection itself cannot work.
        # NOTE(review): this picks the first matching employee - confirm
        # whether only vacant records (people_id empty) should be eligible.
        staff_employees = Employees.filter_all(
            Employees.staff_id == selected_staff.id,
        ).data
        find_one_employee = next(iter(staff_employees or ()), None)
        staff_name_upper = str(selected_staff.staff_name).upper()
        if not find_one_employee:
            return JSONResponse(
                content={
                    "completed": False,
                    "message": f"There is no space for new Employee for given staff : {staff_name_upper}",
                    "data": {},
                },
                status_code=status.HTTP_406_NOT_ACCEPTABLE,
            )
        if not data.expiry_starts:
            data.expiry_starts = str(system_arrow.now())
        data.expiry_starts = str(system_arrow.get(str(data.expiry_starts)))
        find_one_employee = find_one_employee.update(
            people_id=selected_people.id,
            expiry_starts=data.expiry_starts,
            **token_dict.update_creds,
        )
        Employees.save()
        return JSONResponse(
            content={
                "completed": True,
                "message": "Get Employee to People record",
                "data": find_one_employee.get_dict(),
            },
            status_code=status.HTTP_200_OK,
        )


class Employee2PeopleFireEventMethods(MethodToEvent):
    """UPDATE event: detach a person from their employee record."""

    event_type = "UPDATE"
    __event_keys__ = {
        "caf914fa-0899-4b0b-a85a-3d40fdaa06a5": "company_employee_fire",
    }

    @classmethod
    def company_employee_fire(
        cls,
        data: BindEmployees2People,
        token_dict: EmployeeTokenObject,
    ):
        """Clear ``people_id`` on the person's employee record and log a history row.

        Once the employee no longer references a person, an
        ``EmployeeHistory`` row is found or created from the employee's
        fields with ``expiry_ends`` taken from ``data``.

        Returns:
            ``JSONResponse`` with status 404 when the person has no employee
            record, otherwise status 200 carrying the updated employee.

        Raises:
            HTTPException: 404 when the people record is missing, or when the
                employee still references a person after the update.
        """
        selected_people = People.filter_one(
            People.uu_id == data.people_uu_id,
        ).data
        if not selected_people:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="People record not found",
            )
        find_one_employee: Employees = Employees.filter_one(
            Employees.people_id == selected_people.id,
        ).data
        if not find_one_employee:
            return JSONResponse(
                content={
                    "completed": False,
                    "message": "Employee record not found for given People",
                    "data": {},
                },
                status_code=status.HTTP_404_NOT_FOUND,
            )
        find_one_employee = find_one_employee.update(
            people_id=None, **token_dict.update_creds
        )
        if not find_one_employee.people_id:
            # The history row is only needed for its side effect; the
            # returned object is not used.
            EmployeeHistory.find_or_create(
                staff_id=find_one_employee.staff_id,
                expiry_ends=data.expiry_ends,
                people_id=selected_people.id,
                created_by_id=find_one_employee.created_by_id,
                updated_by_id=find_one_employee.updated_by_id,
                confirmed_by_id=find_one_employee.confirmed_by_id,
                replication_id=find_one_employee.replication_id,
                created_by=find_one_employee.created_by,
                updated_by=find_one_employee.updated_by,
                confirmed_by=find_one_employee.confirmed_by,
                active=find_one_employee.active,
                is_confirmed=find_one_employee.is_confirmed,
                deleted=find_one_employee.deleted,
                expiry_starts=find_one_employee.expiry_starts,
                is_notification_send=find_one_employee.is_notification_send,
                cryp_uu_id=find_one_employee.cryp_uu_id,
            )
            Employees.save()
            return JSONResponse(
                content={
                    "completed": True,
                    "message": "Employee is fired from Staff",
                    "data": find_one_employee.get_dict(),
                },
                status_code=status.HTTP_200_OK,
            )
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Employee record not found for given People",
        )


# Module-level event instances, each bound to its endpoint path.
EmployeeListEventMethod = EmployeeListEventMethods(
    action=ActionsSchema(endpoint="/employee/list")
)
EmployeeCreateEventMethod = EmployeeCreateEventMethods(
    action=ActionsSchema(endpoint="/employee/create")
)
EmployeeUpdateEventMethod = EmployeeUpdateEventMethods(
    action=ActionsSchema(endpoint="/employee/update")
)
EmployeePatchEventMethod = EmployeePatchEventMethods(
    action=ActionsSchema(endpoint="/employee/patch")
)
# Event instance served at /employee/employ.
Employee2PeopleEmployEventMethod = Employee2PeopleEmployEventMethods(
    action=ActionsSchema(
        endpoint="/employee/employ",
    ),
)

# Event instance served at /employee/fire.
Employee2PeopleFireEventMethod = Employee2PeopleFireEventMethods(
    action=ActionsSchema(
        endpoint="/employee/fire",
    ),
)