first commit

This commit is contained in:
2024-11-07 17:44:29 +03:00
commit 643d6d8f65
247 changed files with 420800 additions and 0 deletions

View File

View File

@@ -0,0 +1,171 @@
import typing
from fastapi import status
from fastapi.responses import JSONResponse
from databases import Companies
from api_validations import (
InsertCompany,
UpdateCompany,
ListOptions,
PatchRecord,
)
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
from api_validations.core_response import return_json_response_from_alchemy
class CompanyListEventMethods(MethodToEvent):
event_type = "SELECT"
__event_keys__ = {
"f6900cb5-ac5b-478e-8e7c-fa87e65cd2e5": "company_list",
}
@classmethod
def company_list(
cls,
list_options: ListOptions,
token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
):
if isinstance(token_dict, EmployeeTokenObject):
Companies.pre_query = Companies.select_action(
duty_id_list=[
token_dict.selected_company.duty_id,
token_dict.selected_company.bulk_duties_id,
]
)
elif isinstance(token_dict, OccupantTokenObject):
Companies.pre_query = Companies.filter_active(
Companies.id == token_dict.selected_occupant.responsible_company_id
).query
Companies.filter_attr = list_options
records = Companies.filter_active(
*Companies.get_smart_query(list_options.query)
)
return return_json_response_from_alchemy(
response=records, pagination=list_options
)
class CompanyCreateEventMethods(MethodToEvent):
event_type = "CREATE"
__event_keys__ = {
"76f11a08-5f4a-4e1f-961f-aaef21699acd": "company_create",
}
@classmethod
def company_create(cls, data: InsertCompany, token_dict):
created_company = Companies.create_action(
data=data, token=token_dict.companies_list
)
created_company.related_company = token_dict.get("company_uu_id")
return JSONResponse(
content={
"completed": True,
"message": "Create Company record",
"data": created_company.get_dict(),
"password_token": {
"password_token": created_company.password_token,
"password_expires_day": str(created_company.password_expires_day),
"password_expiry_begins": str(
created_company.password_expiry_begins
),
"hash_password": created_company.hash_password,
"related_company": created_company.related_company,
},
},
status_code=status.HTTP_200_OK,
)
class CompanyUpdateEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"41ea7f29-006a-4310-b5c4-b2a0e1a504bd": "company_update",
}
@classmethod
def company_update(cls, company_uu_id: str, data: UpdateCompany, token_dict):
find_one_company = Companies.find_one_or_abort(uu_id=company_uu_id)
access_authorized_company = Companies.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Companies.id == token_dict.get("")],
)
if access_authorized_company.count:
data_dict = data.excluded_dump()
updated_company = find_one_company.update(**data_dict)
return JSONResponse(
content={
"completed": True,
"message": "Update Company record",
"data": updated_company,
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": True, "message": "Update Company record", "data": {}},
status_code=status.HTTP_200_OK,
)
class CompanyPatchEventMethods(MethodToEvent):
event_type = "PATCH"
__event_keys__ = {
"6320d696-1fd1-49f9-860a-8f22e5b8a68d": "company_patch",
}
@classmethod
def company_patch(cls, company_uu_id: str, data: PatchRecord, token_dict):
find_one_company = Companies.find_one_or_abort(uu_id=company_uu_id)
access_authorized_company = Companies.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Companies.id == find_one_company.id],
)
if access_authorized_company.count:
action = data.excluded_dump()
find_one_company.active = bool(
action.get("active", find_one_company.active)
)
find_one_company.is_confirmed = bool(
action.get("confirm", find_one_company.is_confirmed)
)
find_one_company.deleted = bool(
action.get("delete", find_one_company.deleted)
)
find_one_company.save()
return JSONResponse(
content={
"completed": True,
"message": "Patch Company record completed",
"data": find_one_company.get_dict(),
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Patch Company record failed",
"data": {},
},
status_code=status.HTTP_200_OK,
)
CompanyPatchEventMethod = CompanyListEventMethods(
action=ActionsSchema(endpoint="/company/list")
)
CompanyCreateEventMethod = CompanyCreateEventMethods(
action=ActionsSchema(endpoint="/company/create")
)
CompanyUpdateEventMethod = CompanyUpdateEventMethods(
action=ActionsSchema(endpoint="/company/update")
)
CompanyListEventMethod = CompanyPatchEventMethods(
action=ActionsSchema(endpoint="/company/patch")
)

View File

@@ -0,0 +1,167 @@
from typing import Optional
from fastapi import status
from fastapi.responses import JSONResponse
from validations import (
DepartmentsPydantic,
PatchRecord,
ListOptions,
BaseModelRegular,
)
from databases import Departments
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
from api_validations.core_response import return_json_response_from_alchemy
class DepartmentListEventMethods(MethodToEvent):
event_type = "SELECT"
__event_keys__ = {
"2cb90331-c1b4-4923-8314-8111326b621a": "department_list",
}
@classmethod
def department_list(cls, list_options: ListOptions, token_dict):
Departments.filter_attr = list_options
records = Departments.filter_active(
*Departments.get_smart_query(smart_query=list_options.query),
Departments.company_id == token_dict.selected_company.company_id
)
return return_json_response_from_alchemy(
response=records, pagination=list_options
)
class AdminUserInsertDepartments(BaseModelRegular):
department_code: str
department_name: str
company_uu_id: Optional[str] = None
department_description: str
parent_department_uu_id: Optional[int] = None
class DepartmentCreateEventMethods(MethodToEvent):
event_type = "CREATE"
__event_keys__ = {
"d8bd3985-7f3b-4267-a74e-d5017e4ea9f8": "super_user_department_create",
}
@classmethod
def super_user_department_create(cls, data: DepartmentsPydantic, token_dict):
data_dict = data.excluded_dump()
data_dict["company_id"] = token_dict.selected_company.company_id
data_dict["company_uu_id"] = token_dict.selected_company.company_uu_id
created_department = Departments.find_or_create(**data_dict)
if created_department.is_found:
return JSONResponse(
content={
"completed": False,
"message": "Company record already exits here is the record",
"data": created_department.get_dict(),
},
status_code=status.HTTP_202_ACCEPTED,
)
return JSONResponse(
content={
"completed": True,
"message": "Create Company record",
"data": created_department.get_dict(),
},
status_code=status.HTTP_200_OK,
)
class DepartmentUpdateEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"4172706f-06c9-4c38-9ac8-59085a72f80a": "department_update",
}
@classmethod
def department_update(
cls, company_uu_id: str, data: DepartmentsPydantic, token_dict
):
find_one_company = Departments.find_one_or_abort(uu_id=company_uu_id)
access_authorized_company = Departments.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Departments.id == token_dict.get("")],
)
if access_authorized_company.count:
data_dict = data.excluded_dump()
updated_company = find_one_company.update(**data_dict)
return JSONResponse(
content={
"completed": True,
"message": "Update Company record",
"data": updated_company,
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": True, "message": "Update Company record", "data": {}},
status_code=status.HTTP_200_OK,
)
class DepartmentPatchEventMethods(MethodToEvent):
event_type = "PATCH"
__event_keys__ = {
"1e272e4f-6c1e-418b-91a7-be8b06c875da": "department_patch",
}
@classmethod
def department_patch(cls, company_uu_id: str, data: PatchRecord, token_dict):
find_one_company = Departments.find_one_or_abort(uu_id=company_uu_id)
access_authorized_company = Departments.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Departments.id == find_one_company.id],
)
if access_authorized_company.count:
action = data.excluded_dump()
find_one_company.active = bool(
action.get("active", find_one_company.active)
)
find_one_company.is_confirmed = bool(
action.get("confirm", find_one_company.is_confirmed)
)
find_one_company.deleted = bool(
action.get("delete", find_one_company.deleted)
)
find_one_company.save()
return JSONResponse(
content={
"completed": True,
"message": "Patch Company record completed",
"data": find_one_company.get_dict(),
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Patch Company record failed",
"data": {},
},
status_code=status.HTTP_200_OK,
)
DepartmentListEventMethod = DepartmentListEventMethods(
action=ActionsSchema(endpoint="/department/list")
)
DepartmentCreateEventMethod = DepartmentCreateEventMethods(
action=ActionsSchema(endpoint="/department/create")
)
DepartmentUpdateEventMethod = DepartmentUpdateEventMethods(
action=ActionsSchema(endpoint="/department/update")
)
DepartmentPatchEventMethod = DepartmentPatchEventMethods(
action=ActionsSchema(endpoint="/department/patch")
)

View File

@@ -0,0 +1,199 @@
from fastapi import status
from fastapi.responses import JSONResponse
from validations import InsertDuties, UpdateDuties, SelectDuties
from validations.root_validates import PatchRecord, ListOptions
from databases import Departments, Duty, Duties
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
class DutiesListEventMethods(MethodToEvent):
event_type = "SELECT"
__event_keys__ = {
"44b72beb-53a8-407b-a12a-76e74b65794d": "duties_list",
}
@classmethod
def duties_list(cls, list_options: ListOptions, token_dict):
Duties.filter_attr = list_options
records = Duties.filter_active(
*Duties.get_smart_query(smart_query=list_options.query),
Duties.company_id == token_dict.selected_company.company_id
)
return {
"completed": True if records.count else False,
"status": "success",
"data": records.get_dict(),
}
class DutiesGetByUUIDEventMethods(MethodToEvent):
event_type = "GET"
__event_keys__ = {
"30c54cce-3303-4d36-959a-b64e383ae177": "duties_get_by_uuid",
}
@classmethod
def duties_get_by_uuid(cls, data: SelectDuties, token_dict):
duty = Duty.find_one(uu_id=data.duty_uu_id)
if not duty:
return JSONResponse(
content={
"completed": False,
"message": "Duty record is not found",
"data": {},
},
status_code=status.HTTP_404_NOT_FOUND,
)
records = Duties.filter_active(
Duties.duties_id == duty.id,
Duties.company_id == token_dict.selected_company.company_id,
)
if not records.data:
return JSONResponse(
content={
"completed": False,
"message": "Duties record is not found",
"data": {},
},
status_code=status.HTTP_404_NOT_FOUND,
)
return {
"completed": True,
"status": "success",
"data": [record.get_dict() for record in records.data],
}
class DutiesCreateEventMethods(MethodToEvent):
event_type = "CREATE"
__event_keys__ = {
"3524ae42-0825-4af7-be85-7c890a4f65d3": "duties_create",
}
@classmethod
def duties_create(cls, data: InsertDuties, token_dict):
duty = Duty.find_one(uu_id=data.duties_uu_id)
department = Departments.find_one(uu_id=data.department_uu_id)
created_duties = Duties.find_or_create(
company_id=token_dict.selected_company.company_id,
company_uu_id=token_dict.selected_company.company_uu_id,
duties_id=duty.id,
duties_uu_id=str(duty.uu_id),
department_id=department.id,
department_uu_id=str(department.uu_id),
is_confirmed=data.is_confirmed,
)
if data.is_default_duty:
created_duties.update(users_default_duty=created_duties.id)
if not created_duties:
return JSONResponse(
content={
"completed": False,
"message": "Failed to create Duties record",
"data": {},
},
status_code=status.HTTP_400_BAD_REQUEST,
)
return {
"completed": created_duties.is_found,
"message": (
"Create Duties record"
if created_duties.is_found
else "This record is already created"
),
"data": created_duties.get_dict(),
}
class DutiesUpdateEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"3fc77829-f1ee-4511-a2ca-582daa03125b": "duties_update",
}
@classmethod
def duties_update(cls, duties_uu_id: str, data: UpdateDuties, token_dict):
find_one_duties = Duties.find_one_or_abort(uu_id=duties_uu_id)
access_authorized_duties = Duties.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Duties.id == find_one_duties.id],
)
if access_authorized_duties.count:
data_dict = data.excluded_dump()
updated_duties = find_one_duties.update(**data_dict)
return {
"completed": True,
"message": "Update Duties record",
"data": updated_duties,
}
return {
"completed": False,
"message": "Update Duties record failed",
"data": {},
}
class DutiesPatchEventMethods(MethodToEvent):
event_type = "PATCH"
__event_keys__ = {
"ca81c6d1-975a-4288-a27b-1069aea84afe": "duties_patch",
}
@classmethod
def duties_patch(cls, duties_uu_id: str, data: PatchRecord, token_dict):
find_one_duties = Duties.find_one_or_abort(uu_id=duties_uu_id)
access_authorized_duties = Duties.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Duties.id == find_one_duties.id],
)
if access_authorized_duties.count:
action = data.excluded_dump()
find_one_duties.active = bool(action.get("active", find_one_duties.active))
find_one_duties.is_confirmed = bool(
action.get("confirm", find_one_duties.is_confirmed)
)
find_one_duties.deleted = bool(
action.get("delete", find_one_duties.deleted)
)
find_one_duties.save()
return {
"completed": True,
"message": "Patch Duties record completed",
"data": find_one_duties.get_dict(),
}
return {
"completed": False,
"message": "Patch Duties record failed",
"data": {},
}
DutiesListEventMethod = DutiesListEventMethods(
action=ActionsSchema(endpoint="/duties/list")
)
DutiesGetByUUIDEventMethod = DutiesGetByUUIDEventMethods(
action=ActionsSchema(endpoint="/duties/get_by_duty_uuid")
)
DutiesCreateEventMethod = DutiesCreateEventMethods(
action=ActionsSchema(endpoint="/duties/create")
)
DutiesUpdateEventMethod = DutiesUpdateEventMethods(
action=ActionsSchema(endpoint="/duties/update")
)
DutiesPatchEventMethod = DutiesPatchEventMethods(
action=ActionsSchema(endpoint="/duties/patch")
)

View File

@@ -0,0 +1,135 @@
from fastapi import status
from fastapi.responses import JSONResponse
from validations import InsertCompanyDuty
from validations.root_validates import PatchRecord, ListOptions
from databases import Duty
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
from api_validations.core_response import return_json_response_from_alchemy
class DutyListEventMethods(MethodToEvent):
event_type = "SELECT"
__event_keys__ = {
"23231c7d-4ff2-4b39-b71b-ea350d31fadf": "duty_list",
}
@classmethod
def duty_list(cls, list_options: ListOptions, token_dict):
records = Duty.filter_active(
*Duty.get_smart_query(list_options.query),
)
return return_json_response_from_alchemy(
response=records, pagination=list_options
)
class DutyCreateEventMethods(MethodToEvent):
event_type = "CREATE"
__event_keys__ = {
"c6ea200e-fa17-4393-b390-37f5337c9c65": "duty_create",
}
@classmethod
def duty_create(cls, data: InsertCompanyDuty, token_dict):
created_duty = Duty.find_or_create(**data.excluded_dump())
return JSONResponse(
content={
"completed": True,
"message": "Create Company record",
"data": created_duty.get_dict(),
},
status_code=status.HTTP_200_OK,
)
class DutyUpdateEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"ad952647-bcf8-482d-9e05-b2ee8086483f": "duty_update",
}
@classmethod
def duty_update(cls, company_uu_id: str, data, token_dict):
find_one_company = Duty.find_one_or_abort(uu_id=company_uu_id)
access_authorized_company = Duty.select_action(
duty_id=getattr(token_dict, "duty_id", 5), # ?
filter_expr=[Duty.id == token_dict.get("")],
)
if access_authorized_company.count:
data_dict = data.excluded_dump()
updated_company = find_one_company.update(**data_dict)
return JSONResponse(
content={
"completed": True,
"message": "Update Company record",
"data": updated_company,
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": True, "message": "Update Company record", "data": {}},
status_code=status.HTTP_200_OK,
)
class DutyPatchEventMethods(MethodToEvent):
event_type = "PATCH"
__event_keys__ = {
"d5c7b5c4-7b4e-4d5b-8e3b-2b9c5f5d0c0b": "duty_patch",
}
@classmethod
def duty_patch(cls, company_uu_id: str, data: PatchRecord, token_dict):
find_one_company = Duty.find_one_or_abort(uu_id=company_uu_id)
access_authorized_company = Duty.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Duty.id == find_one_company.id],
)
if access_authorized_company.count:
action = data.excluded_dump()
find_one_company.active = bool(
action.get("active", find_one_company.active)
)
find_one_company.is_confirmed = bool(
action.get("confirm", find_one_company.is_confirmed)
)
find_one_company.deleted = bool(
action.get("delete", find_one_company.deleted)
)
find_one_company.save()
return JSONResponse(
content={
"completed": True,
"message": "Patch Company record completed",
"data": find_one_company.get_dict(),
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Patch Company record failed",
"data": {},
},
status_code=status.HTTP_200_OK,
)
DutyListEventMethod = DutyListEventMethods(action=ActionsSchema(endpoint="/duty/list"))
DutyCreateEventMethod = DutyCreateEventMethods(
action=ActionsSchema(endpoint="/duty/create")
)
DutyUpdateEventMethod = DutyUpdateEventMethods(
action=ActionsSchema(endpoint="/duty/update")
)
DutyPatchEventMethod = DutyPatchEventMethods(
action=ActionsSchema(endpoint="/duty/patch")
)

View File

@@ -0,0 +1,291 @@
from datetime import datetime
from fastapi import status, HTTPException
from fastapi.responses import JSONResponse
from validations import (
InsertEmployees,
BindEmployees2People,
PatchRecord,
ListOptions,
)
from databases import Employees, Staff, People, EmployeeHistory
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
from api_validations.core_response import return_json_response_from_alchemy
class EmployeeListEventMethods(MethodToEvent):
event_type = "SELECT"
__event_keys__ = {
"cb677c92-6b05-4122-af5c-12766fae8095": "employee_list",
}
@classmethod
def employee_list(cls, list_options: ListOptions, token_dict):
Employees.filter_attr = list_options
records = Employees.filter_active(
*Employees.get_smart_query(smart_query=list_options.query),
Employees.company_id == token_dict.selected_company.company_id,
)
return return_json_response_from_alchemy(
response=records, pagination=list_options
)
class EmployeeCreateEventMethods(MethodToEvent):
event_type = "CREATE"
__event_keys__ = {
"1e1632c3-bb0e-46a5-8e45-da3f6d88ac43": "employee_create",
}
@classmethod
def employee_create(cls, data: InsertEmployees, token_dict):
person = People.find_one(uu_id=data.people_uu_id)
staff = Staff.find_one(uu_id=data.staff_uu_id)
if not staff:
return JSONResponse(
content={
"completed": False,
"message": "Staff record not found",
"data": {},
},
status_code=status.HTTP_404_NOT_FOUND,
)
created_employee = Employees.create(
staff_id=staff.id,
staff_uu_id=str(staff.uu_id),
people_id=person.id if person else None,
people_uu_id=str(person.uu_id) if person else None,
)
return JSONResponse(
content={
"completed": True if not created_employee.is_found else False,
"message": "Create Employee record",
"data": created_employee.get_dict(),
},
status_code=status.HTTP_200_OK,
)
class EmployeeUpdateEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"9015a076-d78c-463d-9474-ea343a125fb8": "employee_update",
}
@classmethod
def employee_update(cls, employee_uu_id: str, data: PatchRecord, token_dict):
find_one_employee = Employees.find_one_or_abort(uu_id=employee_uu_id)
access_authorized_employee = Employees.select_action(
employee_id=getattr(token_dict, "employee_id", 5),
filter_expr=[Employees.id == token_dict.get("")],
)
if access_authorized_employee.count:
data_dict = data.excluded_dump()
updated_employee = find_one_employee.update(**data_dict)
return JSONResponse(
content={
"completed": True,
"message": "Update Employee record",
"data": updated_employee,
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Update Employee record",
"data": {},
},
status_code=status.HTTP_200_OK,
)
class EmployeePatchEventMethods(MethodToEvent):
event_type = "PATCH"
__event_keys__ = {
"8446ce0b-9310-4b9f-93e2-61f56a9dacd1": "employee_patch",
}
@classmethod
def employee_patch(cls, employee_uu_id: str, data: PatchRecord, token_dict):
find_one_employee = Employees.find_one_or_abort(uu_id=employee_uu_id)
access_authorized_employee = Employees.select_action(
employee_id=getattr(token_dict, "employee_id", 5),
filter_expr=[Employees.id == find_one_employee.id],
)
if access_authorized_employee.count:
action = data.excluded_dump()
find_one_employee.active = bool(
action.get("active", find_one_employee.active)
)
find_one_employee.is_confirmed = bool(
action.get("confirm", find_one_employee.is_confirmed)
)
find_one_employee.deleted = bool(
action.get("delete", find_one_employee.deleted)
)
find_one_employee.save()
return JSONResponse(
content={
"completed": True,
"message": "Patch Employee record completed",
"data": find_one_employee.get_dict(),
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Patch Employee record failed",
"data": {},
},
status_code=status.HTTP_200_OK,
)
class Employee2PeopleEmployEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"5eb04057-7a74-4555-b2c6-14eda32dae89": "company_employee_employ",
}
@classmethod
def company_employee_employ(cls, data: BindEmployees2People, token_dict):
selected_staff = Staff.find_one(uu_id=data.staff_uu_id)
selected_people = People.find_one(uu_id=data.people_uu_id)
if not selected_staff:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Staff record not found",
)
if not selected_people:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="People record not found",
)
find_one_employee = Employees.filter_active(
Employees.staff_id == selected_staff.id,
Employees.people_id == None,
).data
staff_name_upper = str(selected_staff.staff_name).upper()
if not find_one_employee:
return JSONResponse(
content={
"completed": False,
"message": f"There is no space for new Employee for given staff : {staff_name_upper}",
"data": {},
},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
)
if not data.expiry_starts:
data.expiry_starts = datetime.now()
find_one_employee = find_one_employee[0].update(
people_id=selected_people.id,
expiry_starts=data.expiry_starts,
**token_dict.update_creds,
)
return JSONResponse(
content={
"completed": True,
"message": "Get Employee to People record",
"data": find_one_employee.get_dict(),
},
status_code=status.HTTP_200_OK,
)
class Employee2PeopleFireEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"caf914fa-0899-4b0b-a85a-3d40fdaa06a5": "company_employee_fire",
}
@classmethod
def company_employee_fire(cls, data: BindEmployees2People, token_dict):
selected_people = People.find_one(uu_id=data.people_uu_id)
if not selected_people:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="People record not found",
)
find_one_employee: Employees = Employees.find_one(people_id=selected_people.id)
if not find_one_employee:
return JSONResponse(
content={
"completed": False,
"message": "Employee record not found for given People",
"data": {},
},
status_code=status.HTTP_404_NOT_FOUND,
)
find_one_employee = find_one_employee.update(
people_id=None, **token_dict.update_creds
)
if not find_one_employee.people_id:
employee_history = EmployeeHistory.find_or_create(
staff_id=find_one_employee.staff_id,
expiry_ends=data.expiry_ends,
people_id=selected_people.id,
created_by_id=find_one_employee.created_by_id,
updated_by_id=find_one_employee.updated_by_id,
confirmed_by_id=find_one_employee.confirmed_by_id,
replication_id=find_one_employee.replication_id,
created_by=find_one_employee.created_by,
updated_by=find_one_employee.updated_by,
confirmed_by=find_one_employee.confirmed_by,
active=find_one_employee.active,
is_confirmed=find_one_employee.is_confirmed,
deleted=find_one_employee.deleted,
expiry_starts=find_one_employee.expiry_starts,
is_notification_send=find_one_employee.is_notification_send,
cryp_uu_id=find_one_employee.cryp_uu_id,
)
return JSONResponse(
content={
"completed": True,
"message": "Employee is fired from Staff",
"data": find_one_employee.get_dict(),
},
status_code=status.HTTP_200_OK,
)
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Employee record not found for given People",
)
EmployeeListEventMethod = EmployeeListEventMethods(
action=ActionsSchema(endpoint="/employee/list")
)
EmployeeCreateEventMethod = EmployeeCreateEventMethods(
action=ActionsSchema(endpoint="/employee/create")
)
EmployeeUpdateEventMethod = EmployeeUpdateEventMethods(
action=ActionsSchema(endpoint="/employee/update")
)
EmployeePatchEventMethod = EmployeePatchEventMethods(
action=ActionsSchema(endpoint="/employee/patch")
)
Employee2PeopleEmployEventMethod = Employee2PeopleEmployEventMethods(
action=ActionsSchema(endpoint="/employee/employ")
)
Employee2PeopleFireEventMethod = Employee2PeopleFireEventMethods(
action=ActionsSchema(endpoint="/employee/fire")
)

View File

@@ -0,0 +1,148 @@
from fastapi import status
from fastapi.responses import JSONResponse
from fastapi.exceptions import HTTPException
from validations import (
InsertStaff,
SelectStaff,
PatchRecord,
ListOptions,
)
from databases import Staff, Duties
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
from api_validations.core_response import return_json_response_from_alchemy
class StaffListEventMethods(MethodToEvent):
event_type = "SELECT"
__event_keys__ = {
"8984a519-99bf-4f25-8f34-2e1aebba468c": "staff_list",
}
@classmethod
def staff_list(cls, list_options: ListOptions, token_dict):
Staff.filter_attr = list_options
records = Staff.filter_active(
*Staff.get_smart_query(smart_query=list_options.query),
)
return return_json_response_from_alchemy(
response=records, pagination=list_options
)
class StaffCreateEventMethods(MethodToEvent):
event_type = "CREATE"
__event_keys__ = {
"8f619257-19fd-404f-b713-7392c588dc36": "staff_create",
}
@classmethod
def staff_create(cls, data: InsertStaff, token_dict: EmployeeTokenObject):
data_dict = data.excluded_dump()
duties = Duties.find_one(uu_id=data.duties_uu_id)
if not duties:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Duties not found",
)
data_dict["duties_id"] = duties.id
created_duty = Staff.find_or_create(**data_dict)
return JSONResponse(
content={
"completed": True,
"message": "Create Staff record",
"data": created_duty.get_dict(),
},
status_code=status.HTTP_200_OK,
)
class StaffGetByUUIDEventMethods(MethodToEvent):
event_type = "GET"
__event_keys__ = {
"7724cfbb-c0ee-4261-959b-61b84e88a34f": "staff_get_by_uu_id",
}
@classmethod
def staff_get_by_uu_id(cls, data: SelectStaff, token_dict):
if data.duties_uu_id:
duties_id = Duties.find_one(uu_id=data.duties_uu_id)
selected_staffs = Staff.filter_active(
Staff.duties_id == duties_id.id,
)
return JSONResponse(
content={
"completed": True,
"message": "Staff records are listed",
"data": [
selected_staff.get_dict()
for selected_staff in selected_staffs.data
],
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Get Staff record failed",
"data": {},
},
status_code=status.HTTP_202_ACCEPTED,
)
class StaffUpdateEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"5329f35d-ff9d-4656-a831-ba9c8204e483": "staff_update",
}
@classmethod
def staff_update(cls, staff_uu_id: str, data, token_dict):
return JSONResponse(
content={"completed": True, "message": "Update Staff record", "data": {}},
status_code=status.HTTP_200_OK,
)
class StaffPatchEventMethods(MethodToEvent):
event_type = "PATCH"
__event_keys__ = {
"b1cd7c0a-1458-472b-894f-3adc857c8512": "staff_patch",
}
@classmethod
def staff_patch(cls, staff_uu_id: str, data: PatchRecord, token_dict):
return JSONResponse(
content={
"completed": False,
"message": "Patch Staff record failed",
"data": {},
},
status_code=status.HTTP_200_OK,
)
StaffListEventMethod = StaffListEventMethods(
action=ActionsSchema(endpoint="/staff/list")
)
StaffCreateEventMethod = StaffCreateEventMethods(
action=ActionsSchema(endpoint="/staff/create")
)
StaffGetByUUIDEventMethod = StaffGetByUUIDEventMethods(
action=ActionsSchema(endpoint="/staff/get_by_duties_uu_id")
)
StaffUpdateEventMethod = StaffUpdateEventMethods(
action=ActionsSchema(endpoint="/staff/update")
)
StaffPatchEventMethod = StaffPatchEventMethods(
action=ActionsSchema(endpoint="/staff/patch")
)