events imports are checked

This commit is contained in:
2024-11-08 15:05:12 +03:00
parent 643d6d8f65
commit a5b1e0b2f4
71 changed files with 2517 additions and 312 deletions

View File

@@ -5,7 +5,7 @@ from fastapi.responses import JSONResponse
from databases import Companies
from api_validations import (
from api_validations.validations_request import (
InsertCompany,
UpdateCompany,
ListOptions,
@@ -63,6 +63,8 @@ class CompanyCreateEventMethods(MethodToEvent):
data=data, token=token_dict.companies_list
)
created_company.related_company = token_dict.get("company_uu_id")
created_company.flush()
created_company.save()
return JSONResponse(
content={
"completed": True,
@@ -91,14 +93,14 @@ class CompanyUpdateEventMethods(MethodToEvent):
@classmethod
def company_update(cls, company_uu_id: str, data: UpdateCompany, token_dict):
find_one_company = Companies.find_one_or_abort(uu_id=company_uu_id)
find_one_company = Companies.filter_one(Companies.uu_id==company_uu_id)
access_authorized_company = Companies.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Companies.id == token_dict.get("")],
)
if access_authorized_company.count:
data_dict = data.excluded_dump()
updated_company = find_one_company.update(**data_dict)
updated_company = find_one_company.data.update(**data.excluded_dump())
Companies.save()
return JSONResponse(
content={
"completed": True,

View File

@@ -1,8 +1,8 @@
from typing import Optional
from typing import Optional, Union
from fastapi import status
from fastapi.responses import JSONResponse
from validations import (
from api_validations.validations_request import (
DepartmentsPydantic,
PatchRecord,
ListOptions,
@@ -24,7 +24,9 @@ class DepartmentListEventMethods(MethodToEvent):
}
@classmethod
def department_list(cls, list_options: ListOptions, token_dict):
def department_list(
cls, list_options: ListOptions, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
Departments.filter_attr = list_options
records = Departments.filter_active(
*Departments.get_smart_query(smart_query=list_options.query),
@@ -57,22 +59,14 @@ class DepartmentCreateEventMethods(MethodToEvent):
data_dict["company_id"] = token_dict.selected_company.company_id
data_dict["company_uu_id"] = token_dict.selected_company.company_uu_id
created_department = Departments.find_or_create(**data_dict)
if created_department.is_found:
return JSONResponse(
content={
"completed": False,
"message": "Company record already exits here is the record",
"data": created_department.get_dict(),
},
status_code=status.HTTP_202_ACCEPTED,
)
Departments.save()
return JSONResponse(
content={
"completed": True,
"message": "Create Company record",
"completed": False,
"message": "Company record already exits here is the record",
"data": created_department.get_dict(),
},
status_code=status.HTTP_200_OK,
status_code=status.HTTP_202_ACCEPTED,
)
@@ -87,7 +81,7 @@ class DepartmentUpdateEventMethods(MethodToEvent):
def department_update(
cls, company_uu_id: str, data: DepartmentsPydantic, token_dict
):
find_one_company = Departments.find_one_or_abort(uu_id=company_uu_id)
find_one_company = Departments.filter_one(Departments.uu_id==company_uu_id)
access_authorized_company = Departments.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
filter_expr=[Departments.id == token_dict.get("")],
@@ -95,6 +89,7 @@ class DepartmentUpdateEventMethods(MethodToEvent):
if access_authorized_company.count:
data_dict = data.excluded_dump()
updated_company = find_one_company.update(**data_dict)
Departments.save()
return JSONResponse(
content={
"completed": True,

View File

@@ -1,8 +1,15 @@
from typing import Union
from fastapi import status
from fastapi.responses import JSONResponse
from validations import InsertDuties, UpdateDuties, SelectDuties
from validations.root_validates import PatchRecord, ListOptions
from api_validations.validations_request import (
InsertDuties,
UpdateDuties,
SelectDuties,
PatchRecord,
ListOptions,
)
from databases import Departments, Duty, Duties
@@ -19,9 +26,11 @@ class DutiesListEventMethods(MethodToEvent):
}
@classmethod
def duties_list(cls, list_options: ListOptions, token_dict):
def duties_list(
cls, list_options: ListOptions, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
Duties.filter_attr = list_options
records = Duties.filter_active(
records = Duties.filter_all(
*Duties.get_smart_query(smart_query=list_options.query),
Duties.company_id == token_dict.selected_company.company_id
)
@@ -40,9 +49,11 @@ class DutiesGetByUUIDEventMethods(MethodToEvent):
}
@classmethod
def duties_get_by_uuid(cls, data: SelectDuties, token_dict):
def duties_get_by_uuid(
cls, data: SelectDuties, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
duty = Duty.find_one(uu_id=data.duty_uu_id)
duty = Duty.filter_one(Duty.uu_id==data.duty_uu_id).data
if not duty:
return JSONResponse(
content={
@@ -81,9 +92,11 @@ class DutiesCreateEventMethods(MethodToEvent):
}
@classmethod
def duties_create(cls, data: InsertDuties, token_dict):
duty = Duty.find_one(uu_id=data.duties_uu_id)
department = Departments.find_one(uu_id=data.department_uu_id)
def duties_create(
cls, data: InsertDuties, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
duty = Duty.filter_one(Duty.uu_id==data.duties_uu_id).data
department = Departments.filter_one(Duty.uu_id==data.department_uu_id).data
created_duties = Duties.find_or_create(
company_id=token_dict.selected_company.company_id,
@@ -98,6 +111,7 @@ class DutiesCreateEventMethods(MethodToEvent):
created_duties.update(users_default_duty=created_duties.id)
if not created_duties:
Duty.save()
return JSONResponse(
content={
"completed": False,
@@ -134,6 +148,7 @@ class DutiesUpdateEventMethods(MethodToEvent):
if access_authorized_duties.count:
data_dict = data.excluded_dump()
updated_duties = find_one_duties.update(**data_dict)
Duties.save()
return {
"completed": True,
"message": "Update Duties record",

View File

@@ -1,8 +1,11 @@
from typing import Union
from fastapi import status
from fastapi.responses import JSONResponse
from validations import InsertCompanyDuty
from validations.root_validates import PatchRecord, ListOptions
from api_validations.validations_request import (
InsertCompanyDuty, PatchRecord, ListOptions
)
from databases import Duty
@@ -19,7 +22,9 @@ class DutyListEventMethods(MethodToEvent):
}
@classmethod
def duty_list(cls, list_options: ListOptions, token_dict):
def duty_list(
cls, list_options: ListOptions, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
records = Duty.filter_active(
*Duty.get_smart_query(list_options.query),
)
@@ -36,8 +41,11 @@ class DutyCreateEventMethods(MethodToEvent):
}
@classmethod
def duty_create(cls, data: InsertCompanyDuty, token_dict):
def duty_create(
cls, data: InsertCompanyDuty, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
created_duty = Duty.find_or_create(**data.excluded_dump())
Duty.save()
return JSONResponse(
content={
"completed": True,
@@ -56,7 +64,7 @@ class DutyUpdateEventMethods(MethodToEvent):
}
@classmethod
def duty_update(cls, company_uu_id: str, data, token_dict):
def duty_update(cls, company_uu_id: str, data, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]):
find_one_company = Duty.find_one_or_abort(uu_id=company_uu_id)
access_authorized_company = Duty.select_action(
duty_id=getattr(token_dict, "duty_id", 5), # ?
@@ -65,6 +73,7 @@ class DutyUpdateEventMethods(MethodToEvent):
if access_authorized_company.count:
data_dict = data.excluded_dump()
updated_company = find_one_company.update(**data_dict)
Duty.save()
return JSONResponse(
content={
"completed": True,
@@ -95,15 +104,9 @@ class DutyPatchEventMethods(MethodToEvent):
)
if access_authorized_company.count:
action = data.excluded_dump()
find_one_company.active = bool(
action.get("active", find_one_company.active)
)
find_one_company.is_confirmed = bool(
action.get("confirm", find_one_company.is_confirmed)
)
find_one_company.deleted = bool(
action.get("delete", find_one_company.deleted)
)
find_one_company.active = bool(action.get("active", find_one_company.active))
find_one_company.is_confirmed = bool(action.get("confirm", find_one_company.is_confirmed))
find_one_company.deleted = bool(action.get("delete", find_one_company.deleted))
find_one_company.save()
return JSONResponse(
content={

View File

@@ -1,9 +1,10 @@
from datetime import datetime
from typing import Union
from fastapi import status, HTTPException
from fastapi.responses import JSONResponse
from validations import (
from api_validations.validations_request import (
InsertEmployees,
BindEmployees2People,
PatchRecord,
@@ -25,7 +26,9 @@ class EmployeeListEventMethods(MethodToEvent):
}
@classmethod
def employee_list(cls, list_options: ListOptions, token_dict):
def employee_list(
cls, list_options: ListOptions, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
Employees.filter_attr = list_options
records = Employees.filter_active(
*Employees.get_smart_query(smart_query=list_options.query),
@@ -44,7 +47,9 @@ class EmployeeCreateEventMethods(MethodToEvent):
}
@classmethod
def employee_create(cls, data: InsertEmployees, token_dict):
def employee_create(
cls, data: InsertEmployees, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
person = People.find_one(uu_id=data.people_uu_id)
staff = Staff.find_one(uu_id=data.staff_uu_id)
if not staff:
@@ -63,6 +68,7 @@ class EmployeeCreateEventMethods(MethodToEvent):
people_id=person.id if person else None,
people_uu_id=str(person.uu_id) if person else None,
)
Employees.save()
return JSONResponse(
content={
"completed": True if not created_employee.is_found else False,
@@ -90,6 +96,7 @@ class EmployeeUpdateEventMethods(MethodToEvent):
if access_authorized_employee.count:
data_dict = data.excluded_dump()
updated_employee = find_one_employee.update(**data_dict)
Employees.save()
return JSONResponse(
content={
"completed": True,
@@ -116,7 +123,9 @@ class EmployeePatchEventMethods(MethodToEvent):
}
@classmethod
def employee_patch(cls, employee_uu_id: str, data: PatchRecord, token_dict):
def employee_patch(
cls, employee_uu_id: str, data: PatchRecord, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
find_one_employee = Employees.find_one_or_abort(uu_id=employee_uu_id)
access_authorized_employee = Employees.select_action(
employee_id=getattr(token_dict, "employee_id", 5),
@@ -133,7 +142,6 @@ class EmployeePatchEventMethods(MethodToEvent):
find_one_employee.deleted = bool(
action.get("delete", find_one_employee.deleted)
)
find_one_employee.save()
return JSONResponse(
content={
"completed": True,
@@ -197,7 +205,7 @@ class Employee2PeopleEmployEventMethods(MethodToEvent):
expiry_starts=data.expiry_starts,
**token_dict.update_creds,
)
Employees.save()
return JSONResponse(
content={
"completed": True,
@@ -257,6 +265,7 @@ class Employee2PeopleFireEventMethods(MethodToEvent):
is_notification_send=find_one_employee.is_notification_send,
cryp_uu_id=find_one_employee.cryp_uu_id,
)
Employees.save()
return JSONResponse(
content={
"completed": True,

View File

@@ -2,7 +2,7 @@ from fastapi import status
from fastapi.responses import JSONResponse
from fastapi.exceptions import HTTPException
from validations import (
from api_validations.validations_request import (
InsertStaff,
SelectStaff,
PatchRecord,
@@ -48,10 +48,9 @@ class StaffCreateEventMethods(MethodToEvent):
status_code=status.HTTP_404_NOT_FOUND,
detail="Duties not found",
)
data_dict["duties_id"] = duties.id
created_duty = Staff.find_or_create(**data_dict)
Staff.save()
return JSONResponse(
content={
"completed": True,