alchemy functions updated

This commit is contained in:
berkay 2024-11-10 12:14:10 +03:00
parent 1f75e49a07
commit e01a2c8afb
24 changed files with 543 additions and 389 deletions

View File

@ -113,7 +113,7 @@ class AddressCreateEventMethods(MethodToEvent):
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
post_code = AddressPostcode.filter_one(
AddressPostcode.uu_id==data.post_code_uu_id,
AddressPostcode.uu_id == data.post_code_uu_id,
AddressPostcode.active == True,
).data
if not post_code:
@ -232,7 +232,7 @@ class AddressUpdateEventMethods(MethodToEvent):
):
address = Addresses.find_one_or_abort(uu_id=address_uu_id)
post_code = RelationshipEmployee2PostCode.filter_one(
RelationshipEmployee2PostCode.member_id==address.post_code_id
RelationshipEmployee2PostCode.member_id == address.post_code_id
).data
if not post_code:
raise HTTPException(
@ -309,7 +309,7 @@ class AddressPostCodeCreateEventMethods(MethodToEvent):
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
data_dump = data.excluded_dump()
street = AddressStreet.filter_one(AddressStreet.uu_id==data.street_uu_id).data
street = AddressStreet.filter_one(AddressStreet.uu_id == data.street_uu_id).data
if not street:
raise HTTPException(
status_code=404,
@ -351,8 +351,12 @@ class AddressPostCodeUpdateEventMethods(MethodToEvent):
data: InsertPostCode,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
post_code = AddressPostcode.filter_one(AddressPostcode.uu_id==post_code_uu_id).data
street = AddressStreet.filter_one(AddressPostcode.uu_id==data.street_uu_id).data
post_code = AddressPostcode.filter_one(
AddressPostcode.uu_id == post_code_uu_id
).data
street = AddressStreet.filter_one(
AddressPostcode.uu_id == data.street_uu_id
).data
if not street:
raise HTTPException(
status_code=404,

View File

@ -118,7 +118,7 @@ class AuthenticationSelectEventMethods(MethodToEvent):
department_ids = [
department.id
for department in Departments.filter_all(
Departments.company_id==selected_company.id
Departments.company_id == selected_company.id
).data
]
duties_ids = [
@ -130,9 +130,7 @@ class AuthenticationSelectEventMethods(MethodToEvent):
]
staff_ids = [
staff.id
for staff in Staff.filter_all(
Staff.duties_id.in_(duties_ids)
).data
for staff in Staff.filter_all(Staff.duties_id.in_(duties_ids)).data
]
employee = Employees.filter_one(
Employees.people_id == token_user.person_id,
@ -142,13 +140,15 @@ class AuthenticationSelectEventMethods(MethodToEvent):
reachable_event_list_id, reachable_event_list_uu_id = (
Event2Employee.get_event_id_by_employee_id(employee_id=employee.id)
)
staff = Staff.filter_one(Staff.id==employee.staff_id).data
duties = Duties.find_one(Duties.id==staff.duties_id).data
department = Departments.find_one(Departments.id==duties.department_id).data
bulk_id = Duty.filter_one(Duty.duty_code=="BULK").data
staff = Staff.filter_one(Staff.id == employee.staff_id).data
duties = Duties.find_one(Duties.id == staff.duties_id).data
department = Departments.find_one(
Departments.id == duties.department_id
).data
bulk_id = Duty.filter_one(Duty.duty_code == "BULK").data
bulk_duty_id = Duties.filter_one(
Duties.company_id==selected_company.id,
Duties.duties_id==bulk_id.id
Duties.company_id == selected_company.id,
Duties.duties_id == bulk_id.id,
).data
update_selected_to_redis(
request=request,
@ -176,36 +176,41 @@ class AuthenticationSelectEventMethods(MethodToEvent):
status_code=status.HTTP_200_OK,
)
elif token_user.user_type == 2:
occupant_type = OccupantTypes.filter_one(OccupantTypes.uu_id==data.occupant_uu_id).data
occupant_type = OccupantTypes.filter_one(
OccupantTypes.uu_id == data.occupant_uu_id
).data
if not occupant_type:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Occupant Type is not found",
)
build_part = BuildParts.filter_one(BuildParts.uu_id==data.build_part_uu_id).data
build_part = BuildParts.filter_one(
BuildParts.uu_id == data.build_part_uu_id
).data
if not build_part:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Build Part is not found",
)
build = Build.filter_one(Build.id==build_part.build_id).data
build = Build.filter_one(Build.id == build_part.build_id).data
related_company = RelationshipEmployee2Build.filter_one(
RelationshipEmployee2Build.member_id==build.id
RelationshipEmployee2Build.member_id == build.id
).data
company_related = Companies.filter_one(
Companies.id==related_company.company_id
Companies.id == related_company.company_id
).data
responsible_employee = Employees.filter_one(
Employees.id==related_company.employee_id
Employees.id == related_company.employee_id
).data
if selected_occupant_type := BuildLivingSpace.filter_one(
BuildLivingSpace.occupant_type==occupant_type.id,
BuildLivingSpace.person_id==token_user.person_id,
BuildLivingSpace.build_parts_id==build_part.id,
BuildLivingSpace.occupant_type == occupant_type.id,
BuildLivingSpace.person_id == token_user.person_id,
BuildLivingSpace.build_parts_id == build_part.id,
).data:
reachable_event_list_id, reachable_event_list_uu_id = (
Event2Occupant.get_event_id_by_build_living_space_id(
Event2Occupant.build_living_space_id==selected_occupant_type.id
Event2Occupant.build_living_space_id
== selected_occupant_type.id
)
)
update_selected_to_redis(
@ -250,7 +255,9 @@ class AuthenticationCheckTokenEventMethods(MethodToEvent):
@classmethod
def authentication_login_with_domain_and_creds(
cls, request, token_dict: typing.Union[EmployeeSelection, OccupantSelection],
cls,
request,
token_dict: typing.Union[EmployeeSelection, OccupantSelection],
):
if get_object_via_access_key(request=request):
return JSONResponse(
@ -272,15 +279,19 @@ class AuthenticationRefreshEventMethods(MethodToEvent):
@classmethod
def authentication_refresh_user_info(
cls, request, token_dict: typing.Union[EmployeeSelection, OccupantSelection],
cls,
request,
token_dict: typing.Union[EmployeeSelection, OccupantSelection],
):
access_token = str(request.headers.get(Auth.ACCESS_TOKEN_TAG))
if token_user := get_object_via_access_key(request=request):
if found_user := Users.filter_one(Users.uu_id==token_user.get("uu_id")).data:
if found_user := Users.filter_one(
Users.uu_id == token_user.get("uu_id")
).data:
user_token = UsersTokens.filter_one(
UsersTokens.domain==found_user.domain_name,
UsersTokens.user_id==found_user.id,
UsersTokens.token_type=="RememberMe",
UsersTokens.domain == found_user.domain_name,
UsersTokens.user_id == found_user.id,
UsersTokens.token_type == "RememberMe",
).data
access_dict = {
"access_token": access_token,
@ -309,11 +320,14 @@ class AuthenticationChangePasswordEventMethods(MethodToEvent):
@classmethod
def authentication_change_password(
cls, request, data: ChangePassword, token_dict: typing.Union[EmployeeSelection, OccupantSelection],
cls,
request,
data: ChangePassword,
token_dict: typing.Union[EmployeeSelection, OccupantSelection],
):
token_user = get_object_via_access_key(request=request)
if token_user.user_type == 1:
if found_user := Users.filter_one(Users.uu_id==token_user.uu_id).data:
if found_user := Users.filter_one(Users.uu_id == token_user.uu_id).data:
if found_user.check_password(data.old_password):
found_user.set_password(data.new_password)
return JSONResponse(
@ -353,7 +367,7 @@ class AuthenticationCreatePasswordEventMethods(MethodToEvent):
status_code=status.HTTP_406_NOT_ACCEPTABLE, detail="Password must match"
)
if found_user := Users.filter_one(
Users.password_token==data.password_token
Users.password_token == data.password_token
).data:
found_user.create_password(password=data.password)
found_user.password_token = None
@ -398,9 +412,7 @@ class AuthenticationDisconnectUserEventMethods(MethodToEvent):
):
if token_user := get_object_via_access_key(request=request):
found_user = Users.filter_one(
Users.uu_id==token_user.get("uu_id")
).data
found_user = Users.filter_one(Users.uu_id == token_user.get("uu_id")).data
if not found_user:
return JSONResponse(
content={
@ -415,7 +427,7 @@ class AuthenticationDisconnectUserEventMethods(MethodToEvent):
token_user = json.loads(redis_cli.get(key) or {})
redis_cli.delete(key)
selected_user = Users.filter_one(
Users.uu_id==token_user.get("uu_id")
Users.uu_id == token_user.get("uu_id")
).data
selected_user.remove_refresher_token(
domain=data.domain, disconnect=True
@ -466,7 +478,7 @@ class AuthenticationLogoutEventMethods(MethodToEvent):
if token_user.get("domain") == data.domain:
redis_cli.delete(key)
selected_user = Users.filter_one(
Users.uu_id==token_user.get("uu_id")
Users.uu_id == token_user.get("uu_id")
).data
selected_user.remove_refresher_token(domain=data.domain)
# UserLogger.log_error(
@ -521,9 +533,7 @@ class AuthenticationRefreshTokenEventMethods(MethodToEvent):
content={"completed": False, "message": "Invalid data", "data": {}},
status_code=status.HTTP_202_ACCEPTED,
)
if found_user := Users.filter_one(
Users.id==token_refresher.user_id
).data:
if found_user := Users.filter_one(Users.id == token_refresher.user_id).data:
found_user: Users = found_user
access_key = save_access_token_to_redis(
request=request, found_user=found_user, domain=data.domain

View File

@ -1,5 +1,6 @@
import json
import typing
from typing import Union
from fastapi import status
from fastapi.requests import Request
@ -96,8 +97,8 @@ class AuthenticationSelectEventMethods(MethodToEvent):
def authentication_select_company_or_occupant_type(
cls,
request: Request,
data: typing.Union[EmployeeSelection, OccupantSelection],
token_dict: dict = None,
data,
token_dict: typing.Union[EmployeeSelection, OccupantSelection],
):
from api_objects.auth.token_objects import OccupantToken, CompanyToken
@ -120,32 +121,45 @@ class AuthenticationSelectEventMethods(MethodToEvent):
]
duties_ids = [
duties.id
for duties in Duties.filter_active(
for duties in Duties.filter_all(
Duties.company_id == selected_company.id,
Duties.department_id.in_(department_ids),
*Duties.valid_record_args(Duties),
).data
]
staff_ids = [
staff.id
for staff in Staff.filter_active(
Staff.duties_id.in_(duties_ids)
for staff in Staff.filter_all(
Staff.duties_id.in_(duties_ids),
*Staff.valid_record_args(Staff),
).data
]
employee = Employees.filter_active(
employee = Employees.filter_one(
Employees.people_id == token_user.person_id,
Employees.staff_id.in_(staff_ids),
).data[0]
*Employees.valid_record_args(Employees),
).data
reachable_event_list_id, reachable_event_list_uu_id = (
Event2Employee.get_event_id_by_employee_id(employee_id=employee.id)
)
staff = Staff.find_one(id=employee.staff_id)
duties = Duties.find_one(id=staff.duties_id)
department = Departments.find_one(id=duties.department_id)
bulk_id = Duty.find_one(duty_code="BULK")
bulk_duty_id = Duties.find_one(
company_id=selected_company.id, duties_id=bulk_id.id
)
staff = Staff.filter_one(
Staff.id==employee.staff_id,
*Staff.valid_record_args(Staff),
).data
duties = Duties.filter_one(
Duties.id==staff.duties_id,
*Duties.valid_record_args(Duties),
).data
department = Departments.filter_one(
Departments.id==duties.department_id,
).data
bulk_id = Duty.filter_by_one(
duty_code="BULK", *Duty.valid_record_args(Duty)
).data
bulk_duty_id = Duties.filter_by_one(
company_id=selected_company.id, duties_id=bulk_id.id, *Duties.valid_record_dict
).data
update_selected_to_redis(
request=request,
add_payload=CompanyToken(
@ -172,27 +186,40 @@ class AuthenticationSelectEventMethods(MethodToEvent):
status_code=status.HTTP_200_OK,
)
elif token_user.user_type == 2:
occupant_type = OccupantTypes.find_one(uu_id=data.occupant_uu_id)
occupant_type = OccupantTypes.filter_by_one(uu_id=data.occupant_uu_id)
if not occupant_type:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Occupant Type is not found",
)
build_part = BuildParts.find_one(uu_id=data.build_part_uu_id)
build_part = BuildParts.filter_by_one(uu_id=data.build_part_uu_id)
if not build_part:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Build Part is not found",
)
build = Build.find_one(id=build_part.build_id)
related_company = RelationshipEmployee2Build.find_one(member_id=build.id)
company_related = Companies.find_one(id=related_company.company_id)
responsible_employee = Employees.find_one(id=related_company.employee_id)
if selected_occupant_type := BuildLivingSpace.find_one(
occupant_type=occupant_type.id,
person_id=token_user.person_id,
build_parts_id=build_part.id,
):
build = Build.filter_one(
Build.id==build_part.build_id,
*Build.valid_record_args(Build),
).data
related_company = RelationshipEmployee2Build.filter_one(
RelationshipEmployee2Build.member_id==build.id,
*RelationshipEmployee2Build.valid_record_args(RelationshipEmployee2Build),
).data
company_related = Companies.filter_one(
Companies.id==related_company.company_id,
*Companies.valid_record_args(Companies),
).data
responsible_employee = Employees.find_one(
Employees.id==related_company.employee_id,
*Employees.valid_record_args(Employees),
).data
if selected_occupant_type := BuildLivingSpace.filter_one(
BuildLivingSpace.occupant_type==occupant_type.id,
BuildLivingSpace.person_id==token_user.person_id,
BuildLivingSpace.build_parts_id==build_part.id,
*BuildLivingSpace.valid_record_args(BuildLivingSpace),
).data:
reachable_event_list_id, reachable_event_list_uu_id = (
Event2Occupant.get_event_id_by_build_living_space_id(
build_living_space_id=selected_occupant_type.id
@ -265,12 +292,16 @@ class AuthenticationRefreshEventMethods(MethodToEvent):
access_token = str(request.headers.get(Auth.ACCESS_TOKEN_TAG))
if token_user := get_object_via_access_key(request=request):
if found_user := Users.find_one(uu_id=token_user.get("uu_id")):
user_token = UsersTokens.find_one(
domain=found_user.domain_name,
user_id=found_user.id,
token_type="RememberMe",
)
if found_user := Users.filter_one(
Users.uu_id==token_user.get("uu_id"),
*Users.valid_record_args(Users),
).data:
user_token = UsersTokens.filter_one(
UsersTokens.domain==found_user.domain_name,
UsersTokens.user_id==found_user.id,
UsersTokens.token_type=="RememberMe",
*UsersTokens.valid_record_args(UsersTokens),
).data
access_dict = {
"access_token": access_token,
"refresh_token": getattr(user_token, "token", None),
@ -298,11 +329,13 @@ class AuthenticationChangePasswordEventMethods(MethodToEvent):
@classmethod
def authentication_change_password(
cls, request, data: ChangePassword, token_dict: dict = None
cls, data: ChangePassword, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
token_user = get_object_via_access_key(request=request)
if token_user.user_type == 1:
if found_user := Users.find_one(uu_id=token_user.uu_id):
if token_dict.user_type == 1:
if found_user := Users.filter_one(
Users.uu_id==token_dict.person_uu_id,
*Users.valid_record_args(Users),
).data:
if found_user.check_password(data.old_password):
found_user.set_password(data.new_password)
return JSONResponse(
@ -334,13 +367,16 @@ class AuthenticationCreatePasswordEventMethods(MethodToEvent):
@classmethod
def authentication_create_password(
cls, request, data: CreatePassword, token_dict: dict = None
cls, data: CreatePassword
):
if not data.re_password == data.password:
raise HTTPException(
status_code=status.HTTP_406_NOT_ACCEPTABLE, detail="Password must match"
)
if found_user := Users.find_one(password_token=data.password_token):
if found_user := Users.filter_one(
Users.password_token==data.password_token,
*Users.valid_record_args(Users),
).data:
found_user.create_password(password=data.password)
found_user.password_token = None
found_user.save()
@ -380,50 +416,52 @@ class AuthenticationDisconnectUserEventMethods(MethodToEvent):
@classmethod
def authentication_disconnect_user(
cls, request: Request, data: Logout, token_dict: dict = None
cls, request: Request, data: Logout, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
if token_user := get_object_via_access_key(request=request):
found_user = Users.find_one(uu_id=token_user.get("uu_id"))
if not found_user:
return JSONResponse(
content={
"completed": False,
"message": "Invalid data",
"data": None,
},
status_code=status.HTTP_202_ACCEPTED,
)
if already_tokens := get_object_via_user_uu_id(user_id=found_user.uu_id):
for key in already_tokens:
token_user = json.loads(redis_cli.get(key) or {})
redis_cli.delete(key)
selected_user = Users.find_one(uu_id=token_user.get("uu_id"))
selected_user.remove_refresher_token(
domain=data.domain, disconnect=True
)
# UserLogger.log_error(
# str(
# dict(
# user_id=found_user.id,
# domain=data.domain,
# access_key=token_user.get("access_input"),
# agent=request.headers.get("User-Agent", None),
# ip=getattr(request, "remote_addr", None)
# or request.headers.get("X-Forwarded-For", None),
# platform=request.headers.get("Origin", None),
# login_date=datetime.datetime.utcnow().__str__(),
# is_login=False,
# )
# )
# )
return JSONResponse(
content={
"completed": True,
"message": "All sessions are disconnected",
"data": token_user,
},
status_code=status.HTTP_200_OK,
found_user = Users.filter_one(
Users.uu_id==token_dict.person_uu_id,
*Users.valid_record_args(Users),
).data
if not found_user:
return JSONResponse(
content={
"completed": False,
"message": "Invalid data",
"data": None,
},
status_code=status.HTTP_202_ACCEPTED,
)
if already_tokens := get_object_via_user_uu_id(user_id=found_user.uu_id):
for key in already_tokens:
token_user = json.loads(redis_cli.get(key) or {})
redis_cli.delete(key)
selected_user = Users.find_one(uu_id=token_user.get("uu_id"))
selected_user.remove_refresher_token(
domain=data.domain, disconnect=True
)
# UserLogger.log_error(
# str(
# dict(
# user_id=found_user.id,
# domain=data.domain,
# access_key=token_user.get("access_input"),
# agent=request.headers.get("User-Agent", None),
# ip=getattr(request, "remote_addr", None)
# or request.headers.get("X-Forwarded-For", None),
# platform=request.headers.get("Origin", None),
# login_date=datetime.datetime.utcnow().__str__(),
# is_login=False,
# )
# )
# )
return JSONResponse(
content={
"completed": True,
"message": "All sessions are disconnected",
"data": token_user,
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": False, "message": "Invalid data", "data": None},
status_code=status.HTTP_202_ACCEPTED,
@ -438,47 +476,48 @@ class AuthenticationLogoutEventMethods(MethodToEvent):
@classmethod
def authentication_logout_user(
cls, request: Request, data: Logout, token_dict: dict = None
cls, data: Logout, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
token_user = None
if already_tokens := get_object_via_access_key(request=request):
for key in already_tokens:
token_user = json.loads(redis_cli.get(key) or {})
if token_user.get("domain") == data.domain:
redis_cli.delete(key)
selected_user = Users.find_one(uu_id=token_user.get("uu_id"))
selected_user.remove_refresher_token(domain=data.domain)
# UserLogger.log_error(
# str(
# dict(
# user_id=selected_user.id,
# domain=data.domain,
# access_key=token_user.get("access_input"),
# agent=request.headers.get("User-Agent", None),
# ip=getattr(request, "remote_addr", None)
# or request.headers.get("X-Forwarded-For", None),
# platform=request.headers.get("Origin", None),
# login_date=datetime.datetime.utcnow().__str__(),
# is_login=False,
# )
# )
# )
from api_services.redis.functions import get_object_via_user_uu_id
if not token_dict:
return JSONResponse(
content={
"completed": True,
"message": "Session is logged out",
"data": token_user,
"completed": False,
"message": "Logout is not successfully completed",
"data": None,
},
status_code=status.HTTP_200_OK,
status_code=status.HTTP_202_ACCEPTED,
)
return JSONResponse(
content={
"completed": False,
"message": "Logout is not successfully completed",
"data": None,
},
status_code=status.HTTP_202_ACCEPTED,
)
token_users = get_object_via_user_uu_id(token_dict.user_uu_id)
for token_user in token_users:
if token_dict.domain == data.domain:
redis_cli.delete(token_user)
selected_user = Users.find_one(uu_id=token_user.get("uu_id"))
selected_user.remove_refresher_token(domain=data.domain)
# UserLogger.log_error(
# str(
# dict(
# user_id=selected_user.id,
# domain=data.domain,
# access_key=token_user.get("access_input"),
# agent=request.headers.get("User-Agent", None),
# ip=getattr(request, "remote_addr", None)
# or request.headers.get("X-Forwarded-For", None),
# platform=request.headers.get("Origin", None),
# login_date=datetime.datetime.utcnow().__str__(),
# is_login=False,
# )
# )
# )
return JSONResponse(
content={
"completed": True,
"message": "Session is logged out",
"data": token_user,
},
status_code=status.HTTP_200_OK,
)
class AuthenticationRefreshTokenEventMethods(MethodToEvent):
@ -500,7 +539,10 @@ class AuthenticationRefreshTokenEventMethods(MethodToEvent):
content={"completed": False, "message": "Invalid data", "data": {}},
status_code=status.HTTP_202_ACCEPTED,
)
if found_user := Users.find_one(id=token_refresher.user_id):
if found_user := Users.filter_one(
Users.id==token_refresher.user_id,
*Users.valid_record_args(Users),
):
found_user: Users = found_user
access_key = save_access_token_to_redis(
request=request, found_user=found_user, domain=data.domain

View File

@ -83,13 +83,15 @@ class BuildCreateEventMethods(MethodToEvent):
created_build = Build.create_action(data=data, token=token_dict)
if not created_build.is_found:
build_type = BuildTypes.filter_by_one(type_code="APT_YNT").data
build_type = BuildTypes.filter_by_one(type_code="APT_YNT", *BuildTypes.valid_record_dict).data
if not build_type:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Build type APT_YNT is not found. Please contact with your system administrator.",
)
api_enum = ApiEnumDropdown.filter_by_one(enum_class="Directions", key="NN").data
api_enum = ApiEnumDropdown.filter_by_one(
enum_class="Directions", key="NN"
).data
if not api_enum:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,

View File

@ -26,7 +26,9 @@ class BuildingBuildPartsListEventMethods(MethodToEvent):
@classmethod
def building_build_parts_list(
cls, list_options: ListOptions, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
cls,
list_options: ListOptions,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
build_list_query = Build.select_action(
employee_id=token_dict.selected_company.employee_id,
@ -54,7 +56,9 @@ class BuildingBuildPartsCreateEventMethods(MethodToEvent):
@classmethod
def building_build_parts_create(
cls, data: InsertBuildParts, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
cls,
data: InsertBuildParts,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
created_build = BuildParts.create_action(data=data, token=token_dict)
if not created_build:
@ -86,7 +90,9 @@ class BuildingBuildPartsUpdateEventMethods(MethodToEvent):
@classmethod
def building_build_parts_update(
cls, data: InsertBuildParts, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
cls,
data: InsertBuildParts,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
if updated_build := BuildParts.update_action(data=data, token=token_dict):
updated_build.save()
@ -117,7 +123,7 @@ class BuildingBuildPartsPatchEventMethods(MethodToEvent):
@classmethod
def building_build_parts_patch(cls, data, token_dict):
find_one_build = BuildParts.filter_one(BuildParts.uu_id==data.uu_id).data
find_one_build = BuildParts.filter_one(BuildParts.uu_id == data.uu_id).data
access_authorized_build = BuildParts.select_action(
duty_id=token_dict.selected_company.duty_id,
filter_expr=[BuildParts.id == find_one_build.id],

View File

@ -112,7 +112,7 @@ class BuildingLivingSpacesPartsCreateEventMethods(MethodToEvent):
detail=f"{data.build_parts_uu_id} - Build Part is not found in database. Check build part uu_id",
)
life_person = People.filter_one(People.uu_id==data.person_uu_id or "").data
life_person = People.filter_one(People.uu_id == data.person_uu_id or "").data
if not life_person:
raise HTTPException(
status_code=status.HTTP_418_IM_A_TEAPOT,
@ -145,7 +145,7 @@ class BuildingLivingSpacesPartsCreateEventMethods(MethodToEvent):
str(system_arrow.now()) >= BuildLivingSpace.expiry_starts,
select_args=[BuildLivingSpace.id],
order_by=BuildLivingSpace.expiry_starts.desc(),
limit=1
limit=1,
).data
last_living_space = BuildLivingSpace.filter_one(
@ -162,7 +162,9 @@ class BuildingLivingSpacesPartsCreateEventMethods(MethodToEvent):
last_living_space.expiry_ends = str(system_arrow.shift(minutes=-10))
last_living_space.save()
user_module = Modules.filter_one(Modules.module_code == "USR-PUB", system=True).data
user_module = Modules.filter_one(
Modules.module_code == "USR-PUB", system=True
).data
ModulesBindOccupantEventMethods.modules_bind_occupant_system(
build_living_space_id=created_living_space.id,
modules_id=user_module.id,
@ -198,7 +200,9 @@ class BuildingLivingSpacesPartsUpdateEventMethods(MethodToEvent):
detail=f"{data.build_parts_uu_id} - Build Part is not found in database. Check build part uu_id",
)
life_person = People.filter_one(People.uu_id==data.life_person_uu_id or "").data
life_person = People.filter_one(
People.uu_id == data.life_person_uu_id or ""
).data
if not life_person:
raise HTTPException(
status_code=status.HTTP_418_IM_A_TEAPOT,
@ -209,7 +213,7 @@ class BuildingLivingSpacesPartsUpdateEventMethods(MethodToEvent):
living_space_id = BuildLivingSpace.select_only(
select_args=[BuildLivingSpace.id],
order_by=BuildLivingSpace.expiry_starts.desc(),
limit=1
limit=1,
).get(1)
last_living_space = BuildLivingSpace.filter_one(

View File

@ -111,7 +111,6 @@ class DecisionBookCreateEventMethods(MethodToEvent):
data_dict["resp_company_id"] = company.id
data_dict["resp_company_uu_id"] = str(company.uu_id)
decision_period_date = DateTimeLocal.get(build.decision_period_date)
data_dict["expiry_starts"] = DateTimeLocal.get(
system_arrow.now().date().year,
@ -143,11 +142,11 @@ class DecisionBookCreateEventMethods(MethodToEvent):
)
occupant_build = Build.filter_one(
Build.id==token_dict.selected_occupant.build_id,
Build.id == token_dict.selected_occupant.build_id,
Build.active == True,
).get(1)
occupant_company = Companies.find_one(
Companies.id==token_dict.selected_occupant.responsible_company_id,
Companies.id == token_dict.selected_occupant.responsible_company_id,
Companies.active == True,
).get(1)
data_dict["build_id"] = occupant_build.id
@ -155,9 +154,7 @@ class DecisionBookCreateEventMethods(MethodToEvent):
data_dict["resp_company_id"] = occupant_company.id
data_dict["resp_company_uu_id"] = str(occupant_company.uu_id)
decision_period_date = system_arrow.get(
occupant_build.decision_period_date
)
decision_period_date = system_arrow.get(occupant_build.decision_period_date)
data_dict["expiry_starts"] = DateTimeLocal.get(
system_arrow.now().date().year,
int(decision_period_date.date().month),

View File

@ -92,10 +92,11 @@ class DecisionBookPersonAddEventMethods(MethodToEvent):
)
manger_book_person = BuildDecisionBookPerson.filter_one(
BuildDecisionBookPerson.token==data.token,
BuildDecisionBookPerson.build_decision_book_uu_id==data.build_decision_book_uu_id,
BuildDecisionBookPerson.is_confirmed==True,
BuildDecisionBookPerson.active==True,
BuildDecisionBookPerson.token == data.token,
BuildDecisionBookPerson.build_decision_book_uu_id
== data.build_decision_book_uu_id,
BuildDecisionBookPerson.is_confirmed == True,
BuildDecisionBookPerson.active == True,
).get(1)
if not manger_book_person:
raise HTTPException(
@ -103,8 +104,9 @@ class DecisionBookPersonAddEventMethods(MethodToEvent):
detail="Manager person not found. Please check token",
)
book_invite = BuildDecisionBookInvitations.filter_one(
BuildDecisionBookInvitations.id==manger_book_person.invite_id,
BuildDecisionBookInvitations.build_id==token_dict.selected_occupant.build_id,
BuildDecisionBookInvitations.id == manger_book_person.invite_id,
BuildDecisionBookInvitations.build_id
== token_dict.selected_occupant.build_id,
).get(1)
if not book_invite:
raise HTTPException(
@ -112,10 +114,10 @@ class DecisionBookPersonAddEventMethods(MethodToEvent):
detail="Invitation not found. Please check token",
)
selected_book_person = BuildDecisionBookPerson.filter_one(
BuildDecisionBookPerson.invite_id==book_invite.id,
BuildDecisionBookPerson.person_uu_id==data.person_uu_id,
BuildDecisionBookPerson.is_confirmed==True,
BuildDecisionBookPerson.active==True,
BuildDecisionBookPerson.invite_id == book_invite.id,
BuildDecisionBookPerson.person_uu_id == data.person_uu_id,
BuildDecisionBookPerson.is_confirmed == True,
BuildDecisionBookPerson.active == True,
).get(1)
if not selected_book_person:
raise HTTPException(
@ -171,11 +173,11 @@ class DecisionBookPersonAttendEventMethods(MethodToEvent):
detail="Employee cannot create decision book invitations",
)
token_user = Users.filter_one(Users.id==token_dict.user_id).get(1)
token_user = Users.filter_one(Users.id == token_dict.user_id).get(1)
invitation_person = BuildDecisionBookPerson.filter_one(
BuildDecisionBookPerson.token==data.token,
BuildDecisionBookPerson.active==True,
BuildDecisionBookPerson.is_confirmed==True
BuildDecisionBookPerson.token == data.token,
BuildDecisionBookPerson.active == True,
BuildDecisionBookPerson.is_confirmed == True,
).get(1)
if not invitation_person:
raise HTTPException(
@ -192,9 +194,10 @@ class DecisionBookPersonAttendEventMethods(MethodToEvent):
# )
# todo check if vicarious person is valid
invitation = BuildDecisionBookInvitations.filter_one(
BuildDecisionBookInvitations.id==invitation_person.invite_id,
BuildDecisionBookInvitations.build_id==token_dict.selected_occupant.build_id,
BuildDecisionBookInvitations.active==True,
BuildDecisionBookInvitations.id == invitation_person.invite_id,
BuildDecisionBookInvitations.build_id
== token_dict.selected_occupant.build_id,
BuildDecisionBookInvitations.active == True,
).get(1)
if not invitation:
raise HTTPException(
@ -240,10 +243,11 @@ class DecisionBookPersonAssignOccupantEventMethods(MethodToEvent):
)
book_person_manager = BuildDecisionBookPerson.filter_one(
BuildDecisionBookPerson.token==data.token,
BuildDecisionBookPerson.build_living_space_id==token_dict.selected_occupant.living_space_id,
BuildDecisionBookPerson.active==True,
BuildDecisionBookPerson.is_confirmed==True,
BuildDecisionBookPerson.token == data.token,
BuildDecisionBookPerson.build_living_space_id
== token_dict.selected_occupant.living_space_id,
BuildDecisionBookPerson.active == True,
BuildDecisionBookPerson.is_confirmed == True,
).get(1)
manager_occupant_type = OccupantTypes.filter_by_one(
occupant_code="BU-MNG", occupant_category_type="BU"
@ -254,9 +258,10 @@ class DecisionBookPersonAssignOccupantEventMethods(MethodToEvent):
# book_person_supervisor.check_occupant_type(supervisor_occupant_type)
invitation = BuildDecisionBookInvitations.filter_one(
BuildDecisionBookInvitations.id==book_person_manager.invite_id,
BuildDecisionBookInvitations.build_id==token_dict.selected_occupant.build_id,
BuildDecisionBookInvitations.active==True,
BuildDecisionBookInvitations.id == book_person_manager.invite_id,
BuildDecisionBookInvitations.build_id
== token_dict.selected_occupant.build_id,
BuildDecisionBookInvitations.active == True,
).get(1)
if not invitation:
raise HTTPException(
@ -265,7 +270,7 @@ class DecisionBookPersonAssignOccupantEventMethods(MethodToEvent):
)
assign_occupant_type = OccupantTypes.filter_by_one(
uu_id=data.occupant_type_uu_id, active=True
uu_id=data.occupant_type_uu_id
).get(1)
if not assign_occupant_type:
raise HTTPException(
@ -288,11 +293,14 @@ class DecisionBookPersonAssignOccupantEventMethods(MethodToEvent):
detail=f"Person not found. Please check person uuid",
)
book_person_to_assign: BuildDecisionBookPerson = BuildDecisionBookPerson.filter_one(
BuildDecisionBookPerson.build_living_space_id==selected_living_space.id,
BuildDecisionBookPerson.invite_id==invitation.id,
BuildDecisionBookPerson.active==True
).get(1)
book_person_to_assign: BuildDecisionBookPerson = (
BuildDecisionBookPerson.filter_one(
BuildDecisionBookPerson.build_living_space_id
== selected_living_space.id,
BuildDecisionBookPerson.invite_id == invitation.id,
BuildDecisionBookPerson.active == True,
).get(1)
)
if not book_person_to_assign:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
@ -311,11 +319,14 @@ class DecisionBookPersonAssignOccupantEventMethods(MethodToEvent):
occupant_category_type="MT",
).get(1)
if assigned_book_person_occupant := BuildDecisionBookPersonOccupants.filter_one(
BuildDecisionBookPersonOccupants.invite_id==invitation.id,
BuildDecisionBookPersonOccupants.occupant_type_id==occupant_type_unique.id,
BuildDecisionBookPersonOccupants.active==True,
BuildDecisionBookPersonOccupants.is_confirmed==True,
).get(1):
BuildDecisionBookPersonOccupants.invite_id == invitation.id,
BuildDecisionBookPersonOccupants.occupant_type_id
== occupant_type_unique.id,
BuildDecisionBookPersonOccupants.active == True,
BuildDecisionBookPersonOccupants.is_confirmed == True,
).get(
1
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Only one person can be assigned to {assign_occupant_type.occupant_code} type"
@ -324,10 +335,11 @@ class DecisionBookPersonAssignOccupantEventMethods(MethodToEvent):
if assign_occupant_type.occupant_code == "BU-MNG":
person_occupant_manager = BuildDecisionBookPersonOccupants.filter_one(
BuildDecisionBookPersonOccupants.invite_id==invitation.id,
BuildDecisionBookPersonOccupants.occupant_type_id==manager_occupant_type.id,
BuildDecisionBookPersonOccupants.active==True,
BuildDecisionBookPersonOccupants.is_confirmed==True,
BuildDecisionBookPersonOccupants.invite_id == invitation.id,
BuildDecisionBookPersonOccupants.occupant_type_id
== manager_occupant_type.id,
BuildDecisionBookPersonOccupants.active == True,
BuildDecisionBookPersonOccupants.is_confirmed == True,
)
person_occupant_manager.query.delete()

View File

@ -73,9 +73,9 @@ class BuildDecisionBookInvitationsCreateEventMethods(MethodToEvent):
# Check decision book is valid for this token and building
decision_book = BuildDecisionBook.filter_one(
BuildDecisionBook.uu_id==data.build_decision_book_uu_id,
BuildDecisionBook.build_id==token_dict.selected_occupant.build_id,
BuildDecisionBook.active==True,
BuildDecisionBook.uu_id == data.build_decision_book_uu_id,
BuildDecisionBook.build_id == token_dict.selected_occupant.build_id,
BuildDecisionBook.active == True,
).get(1)
if not decision_book:
raise HTTPException(
@ -83,7 +83,7 @@ class BuildDecisionBookInvitationsCreateEventMethods(MethodToEvent):
detail="Decision book not found. Please create decision book first",
)
occupant_building = Build.filter_one(
Build.id==token_dict.selected_occupant.build_id
Build.id == token_dict.selected_occupant.build_id
).get(1)
# Check meeting type is valid
@ -107,9 +107,9 @@ class BuildDecisionBookInvitationsCreateEventMethods(MethodToEvent):
)
# Create an invitation for specific invitation type to start invite sending process
planned_date_expires = str(
system_arrow.get(data.planned_date).shift(days=15).date()
),
planned_date_expires = (
str(system_arrow.get(data.planned_date).shift(days=15).date()),
)
book_invitation = BuildDecisionBookInvitations.find_or_create(
build_id=token_dict.selected_occupant.build_id,
build_uu_id=token_dict.selected_occupant.build_uuid,
@ -136,21 +136,24 @@ class BuildDecisionBookInvitationsCreateEventMethods(MethodToEvent):
# Get all the parts of the building that is occupant in token
build_parts = BuildParts.filter_all(
BuildParts.build_id == occupant_building.id,
BuildParts.active == True
BuildParts.build_id == occupant_building.id, BuildParts.active == True
).data
# Get all build living spaces that is found in building with distinct person id
occupants = OccupantTypes.filter_all(system=True)
BuildLivingSpace.filter_attr = None
build_living_spaces_people = BuildLivingSpace.filter_all(
BuildLivingSpace.build_parts_id.in_(
[build_part.id for build_part in build_parts.data]
),
BuildLivingSpace.occupant_type.in_(
[occupant.id for occupant in occupants.data]
),
).query.distinct(BuildLivingSpace.person_id).all()
build_living_spaces_people = (
BuildLivingSpace.filter_all(
BuildLivingSpace.build_parts_id.in_(
[build_part.id for build_part in build_parts.data]
),
BuildLivingSpace.occupant_type.in_(
[occupant.id for occupant in occupants.data]
),
)
.query.distinct(BuildLivingSpace.person_id)
.all()
)
if not build_living_spaces_people:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
@ -201,9 +204,9 @@ class BuildDecisionBookInvitationsCreateEventMethods(MethodToEvent):
if invitations_person and not invitations_person.is_found:
print(f'"{invitations_person.token}",')
spaces_user = Users.filter_one(
Users.active==True,
Users.is_confirmed==True,
Users.person_id==build_living_spaces_user.person_id,
Users.active == True,
Users.is_confirmed == True,
Users.person_id == build_living_spaces_user.person_id,
).data
# print(
# f"Invitation is send : {spaces_user.email} "

View File

@ -37,7 +37,10 @@ class EventBindOccupantEventMethods(MethodToEvent):
detail="This employee is not authorized to add event to this occupant",
)
occupants_build_part = BuildParts.find_one(uu_id=data.build_part_uu_id)
occupants_build_part = BuildParts.filter_one(
BuildParts.uu_id == data.build_part_uu_id,
BuildParts.active == True,
).data
if not occupants_build_part:
return JSONResponse(
content={
@ -48,7 +51,9 @@ class EventBindOccupantEventMethods(MethodToEvent):
status_code=status.HTTP_404_NOT_FOUND,
)
occupant_occupant_type = OccupantTypes.find_one(uu_id=data.occupant_uu_id)
occupant_occupant_type = OccupantTypes.filter_by_one(
uu_id=data.occupant_uu_id
).data
if not occupant_occupant_type:
return JSONResponse(
content={
@ -59,8 +64,9 @@ class EventBindOccupantEventMethods(MethodToEvent):
status_code=status.HTTP_404_NOT_FOUND,
)
events_to_add_to_occupant = Events.filter_active(
Events.uu_id.in_(list(data.event_uu_id_list))
events_to_add_to_occupant = Events.filter_all(
Events.uu_id.in_(list(data.event_uu_id_list)),
Events.active == True,
)
if not events_to_add_to_occupant.data:
return JSONResponse(
@ -85,11 +91,11 @@ class EventBindOccupantEventMethods(MethodToEvent):
status_code=status.HTTP_401_UNAUTHORIZED,
)
occupant_to_add_event = BuildLivingSpace.find_one(
build_parts_id=occupants_build_part.id,
occupant_type=occupant_occupant_type.id,
)
occupant_to_add_event = BuildLivingSpace.filter_one(
BuildLivingSpace.build_parts_id == occupants_build_part.id,
BuildLivingSpace.occupant_type == occupant_occupant_type.id,
BuildLivingSpace.active == True,
).data
if not occupant_to_add_event:
return JSONResponse(
content={

View File

@ -30,9 +30,12 @@ class ModulesBindOccupantEventMethods(MethodToEvent):
):
living_space = BuildLivingSpace.filter_one(
Modules.id == build_living_space_id
Modules.id == build_living_space_id,
Modules.active == True,
).data
modules = Modules.filter_one(
Modules.id == modules_id, Modules.active == True
).data
modules = Modules.filter_one(Modules.id == modules_id).data
service_build_dict = dict(build_living_space_id=living_space.id)
service_build_dict["expires_at"] = str(

View File

@ -37,11 +37,16 @@ class ServiceBindOccupantEventMethods(MethodToEvent):
from sqlalchemy.dialects.postgresql import insert
living_space = BuildLivingSpace.filter_one(
BuildLivingSpace.id == build_living_space_id
)
service = Services.filter_one(Services.id == service_id)
BuildLivingSpace.id == build_living_space_id,
BuildLivingSpace.active == True,
).data
service = Services.filter_one(
Services.id == service_id,
Services.active == True,
).data
add_events_list = Service2Events.filter_all(
Service2Events.service_id == service.id
Service2Events.service_id == service.id,
Service2Events.active == True,
).data
if not add_events_list:
raise Exception(
@ -85,10 +90,11 @@ class ServiceBindOccupantEventMethods(MethodToEvent):
detail="Employee is not authorized to add service to any occupant",
)
occupants_build_part = BuildParts.find_one(
uu_id=data.build_part_uu_id,
build_id=token_dict.selected_occupant.build_id,
)
occupants_build_part = BuildParts.filter_one(
BuildParts.uu_id == data.build_part_uu_id,
BuildParts.build_id == token_dict.selected_occupant.build_id,
BuildParts.active == True,
).data
print("occupants_build_part", occupants_build_part)
if not occupants_build_part:
return JSONResponse(
@ -100,7 +106,7 @@ class ServiceBindOccupantEventMethods(MethodToEvent):
status_code=status.HTTP_404_NOT_FOUND,
)
occupant_occupant_type = OccupantTypes.find_one(uu_id=data.occupant_uu_id)
occupant_occupant_type = OccupantTypes.filter_by_one(uu_id=data.occupant_uu_id)
if not occupant_occupant_type:
return JSONResponse(
content={
@ -111,7 +117,7 @@ class ServiceBindOccupantEventMethods(MethodToEvent):
status_code=status.HTTP_404_NOT_FOUND,
)
service = Services.find_one(uu_id=data.service_uu_id)
service = Services.filter_one(Services.uu_id == data.service_uu_id).data
if not service:
return JSONResponse(
content={
@ -122,26 +128,22 @@ class ServiceBindOccupantEventMethods(MethodToEvent):
status_code=status.HTTP_404_NOT_FOUND,
)
default_module = Modules.find_one(module_code="USR-PUB")
add_default_service = Services.find_one(module_id=default_module.id)
service_events = Service2Events.filter_all(
Service2Events.service_id == service.id,
)
default_service_events = Service2Events.filter_all(
Service2Events.service_id == add_default_service.id,
)
add_events_list = service_events.data + default_service_events.data
if not add_events_list:
Service2Events.active == True,
).data
if not service_events:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Service has no events registered. Please contact with your manager",
)
living_space = BuildLivingSpace.find_one(
build_parts_id=occupants_build_part.id,
occupant_types_id=occupant_occupant_type.id,
person_id=token_dict.person_id,
)
living_space = BuildLivingSpace.filter_one(
BuildLivingSpace.build_parts_id == occupants_build_part.id,
BuildLivingSpace.occupant_types_id == occupant_occupant_type.id,
BuildLivingSpace.person_id == token_dict.person_id,
BuildLivingSpace.active == True,
).data
if not living_space:
return JSONResponse(
content={
@ -160,7 +162,7 @@ class ServiceBindOccupantEventMethods(MethodToEvent):
"event_uu_id": str(service_event.event_uu_id),
"is_confirmed": True,
}
for service_event in add_events_list
for service_event in service_events
]
session_execute = Services.session.execute(
@ -186,22 +188,13 @@ class ServiceBindEmployeeEventMethods(MethodToEvent):
def bind_services_employee(cls, service_id: int, employee_id: int):
from sqlalchemy.dialects.postgresql import insert
employee = Employees.find_or_abort(
id=employee_id,
)
service = Services.find_or_abort(
id=service_id,
)
default_module = Modules.find_one(module_code="USR-PUB")
add_default_service = Services.find_one(module_id=default_module.id)
employee = Employees.filter_by_one(id=employee_id, *Employees.valid_record_dict)
service = Services.filter_by_one(id=service_id, *Services.valid_record_dict)
service_events = Service2Events.filter_all(
Service2Events.service_id == service.id,
)
default_service_events = Service2Events.filter_all(
Service2Events.service_id == add_default_service.id,
)
add_events_list = service_events.data + default_service_events.data
if not add_events_list:
Service2Events.active == True,
).data
if not service_events:
raise Exception(
"Service has no events registered. Please contact with your manager"
)
@ -214,7 +207,7 @@ class ServiceBindEmployeeEventMethods(MethodToEvent):
"event_uu_id": str(service_event.event_uu_id),
"is_confirmed": True,
}
for service_event in add_events_list
for service_event in service_events
]
session_execute = Services.session.execute(
@ -242,7 +235,7 @@ class ServiceBindEmployeeEventMethods(MethodToEvent):
detail="Occupant is not authorized to add service to any employee",
)
employee = Employees.find_one(uu_id=data.employee_uu_id)
employee = Employees.filter_by_one(uu_id=data.employee_uu_id, *Employees.valid_record_dict).data
if not employee:
return JSONResponse(
content={
@ -253,7 +246,7 @@ class ServiceBindEmployeeEventMethods(MethodToEvent):
status_code=status.HTTP_404_NOT_FOUND,
)
service = Services.find_one(uu_id=data.service_uu_id)
service = Services.filter_by_one(uu_id=data.service_uu_id, *Services.valid_record_dict).data
if not service:
return JSONResponse(
content={
@ -266,16 +259,9 @@ class ServiceBindEmployeeEventMethods(MethodToEvent):
service_events = Service2Events.filter_all(
Service2Events.service_id == service.id,
)
default_module = Modules.find_one(module_code="USR-PUB")
add_default_service = Services.find_one(module_id=default_module.id)
default_service_events = Service2Events.filter_all(
Service2Events.service_id == add_default_service.id,
)
add_events_list = service_events.data + default_service_events.data
if not add_events_list:
Service2Events.active == True,
).data
if not service_events:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Service has no events registered. Please contact with your manager",
@ -289,7 +275,7 @@ class ServiceBindEmployeeEventMethods(MethodToEvent):
"event_uu_id": service_event.event_uu_id,
"is_confirmed": True,
}
for service_event in add_events_list
for service_event in service_events
]
session_execute = Services.session.execute(

View File

@ -39,6 +39,7 @@ class EventsListEventMethods(MethodToEvent):
Events.filter_attr = list_options
records = Events.filter_active(
*Events.get_smart_query(list_options.query),
Events.active == True,
)
return AlchemyJsonResponse(
completed=True,
@ -82,7 +83,7 @@ class EventsUpdateEventMethods(MethodToEvent):
@classmethod
def events_update(cls, data: CreateEvents, token_dict):
event = Events.find_one(uu_id=data.uu_id)
event = Events.filter_by_one(uu_id=data.uu_id, *Events.valid_record_dict)
if not event:
raise HTTPException(
status_code=404,
@ -112,7 +113,7 @@ class EventsPatchEventMethods(MethodToEvent):
@classmethod
def events_patch(cls, data: CreateEvents, token_dict):
event = Events.find_one(uu_id=data.uu_id)
event = Events.filter_by_one(uu_id=data.uu_id, *Events.valid_record_dict)
if not event:
raise HTTPException(
status_code=404,
@ -141,18 +142,29 @@ class EventsBindEventToOccupantMethods(MethodToEvent):
@classmethod
def bind_events_employee(cls, data: RegisterEvents2Employee, token_dict):
events = Events.filter_active(Events.uu_id.in_(data.event_id))
events = Events.filter_all(
Events.uu_id.in_(data.event_id),
*Events.valid_record_args(Events),
).data
if not events:
raise HTTPException(
status_code=401,
detail="No event found. Please contact your super user.",
)
employee = Employees.find_one(employee_uu_id=data.employee_uu_id)
employee_is_not_valid = False
employee = Employees.filter_one(
Employees.employee_uu_id == data.employee_uu_id,
*Employees.valid_record_args(Employees),
).data
if employee:
staff = Staff.find_one(id=employee.staff_id)
duties = Duties.find_one(id=staff.duties_id)
staff = Staff.filter_one(
Staff.id == employee.staff_id,
*Staff.valid_record_args(Staff),
).data
duties = Duties.filter_one(
Duties.id == staff.duties_id,
*Duties.valid_record_args(Duties),
).data
if duties.company_id not in token_dict.companies_id_list:
employee_is_not_valid = True
@ -162,7 +174,7 @@ class EventsBindEventToOccupantMethods(MethodToEvent):
detail="This employee can not be reached by this user. Please contact your super user.",
)
for event in events.data:
for event in events:
employee = Event2Employee.find_or_create(
**token_dict.user_creds, employee_id=employee.id, event_id=event.id
)
@ -183,21 +195,25 @@ class EventsBindEventToEmployeeMethods(MethodToEvent):
@classmethod
def bind_events_occupant(cls, data: RegisterEvents2Occupant, token_dict):
events = Events.filter_active(Events.uu_id.in_(data.event_id))
events = Events.filter_all(
Events.uu_id.in_(data.event_id),
*Events.valid_record_args(Events),
).data
if not events:
raise HTTPException(
status_code=401,
detail="No event found. Please contact your super user.",
)
occupant = BuildLivingSpace.find_one(uu_id=data.build_living_space_uu_id)
occupant_is_not_valid = False
if occupant_is_not_valid:
occupant = BuildLivingSpace.filter_one(
BuildLivingSpace.uu_id == data.build_living_space_uu_id,
*BuildLivingSpace.valid_record_args(BuildLivingSpace),
).data
if not occupant:
raise HTTPException(
status_code=401,
detail="This occupant can not be reached by this user. Please contact your super user.",
)
for event in events.data:
for event in events:
occupant = Event2Occupant.find_or_create(
**token_dict.user_creds,
build_living_space_id=occupant.id,

View File

@ -1,3 +1,5 @@
from typing import Union
from api_validations.validations_request import (
DepartmentsPydantic,
PatchRecord,
@ -12,27 +14,49 @@ from api_validations.core_response import AlchemyJsonResponse
class ServicesEvents(MethodToEvent):
@classmethod
def services_list(cls, list_options: ListOptions):
def services_list(
cls,
list_options: ListOptions,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
return
@classmethod
def services_create(cls, data: DepartmentsPydantic, token_dict):
def services_create(
cls,
data: DepartmentsPydantic,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
return
@classmethod
def services_update(cls, service_uu_id: str, data: DepartmentsPydantic, token_dict):
def services_update(
cls,
service_uu_id: str,
data: DepartmentsPydantic,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
return
@classmethod
def services_patch(cls, service_uu_id: str, data: PatchRecord, token_dict):
def services_patch(
cls,
service_uu_id: str,
data: PatchRecord,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
return
@classmethod
def bind_service_to_action(cls, data, token_dict):
def bind_service_to_action(
cls, data, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
return
@classmethod
def bind_module_to_service(cls, data, token_dict):
def bind_module_to_service(
cls, data, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
return
@classmethod

View File

@ -27,10 +27,13 @@ class PeopleListEventMethods(MethodToEvent):
}
@classmethod
def super_users_people_list(cls, list_options, token_dict):
records = People.filter_active(
*People.get_smart_query(smart_query=list_options.query)
)
def super_users_people_list(
cls, list_options, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
records = People.filter_all(
*People.get_smart_query(smart_query=list_options.query),
*People.valid_record_args(People),
).data
return AlchemyJsonResponse(
completed=True,
message="People are listed successfully",
@ -38,9 +41,13 @@ class PeopleListEventMethods(MethodToEvent):
)
@classmethod
def sales_users_people_list(cls, data, token_dict):
records = People.filter_active(*People.get_smart_query(smart_query=data.query))
def sales_users_people_list(
cls, list_options, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
records = People.filter_all(
*People.get_smart_query(smart_query=list_options.query),
*People.valid_record_args(People),
).data
return AlchemyJsonResponse(
completed=True,
message="People are listed successfully",
@ -48,9 +55,13 @@ class PeopleListEventMethods(MethodToEvent):
)
@classmethod
def human_resources_users_people_list(cls, data, token_dict):
records = People.filter_active(*People.get_smart_query(smart_query=data.query))
def human_resources_users_people_list(
cls, list_options, token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
records = People.filter_all(
*People.get_smart_query(smart_query=list_options.query),
*People.valid_record_args(People),
).data
return AlchemyJsonResponse(
completed=True,
message="People are listed successfully",
@ -97,7 +108,10 @@ class PeopleUpdateEventMethods(MethodToEvent):
user_uu_id: str,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject],
):
find_one_user = Users.find_one_or_abort(uu_id=user_uu_id)
find_one_user = Users.filter_one(
Users.uu_id == user_uu_id,
*Users.valid_record_args(Users),
).data
access_authorized_company = Companies.select_action(
duty_id_list=[getattr(token_dict, "duty_id")],
filter_expr=[Companies.id == find_one_user.id],

View File

@ -40,19 +40,27 @@ class UserListEventMethods(MethodToEvent):
# ])
if "user_uu_id_list" in list_options.query:
people_ids = list_options.query.pop("user_uu_id_list")
people_id_list = [
people_id_list = (
user.person_id
for user in Users.filter_active(Users.uu_id.in_(people_ids)).data
]
for user in Users.filter_all(
Users.uu_id.in_(people_ids), *Users.valid_record_args(Users)
).data
)
People.filter_attr = list_options
records = People.filter_active(People.id.in_(people_id_list))
records = People.filter_all(
People.id.in_(people_id_list),
*People.valid_record_args(People),
).data
return AlchemyJsonResponse(
completed=True,
message="Users are listed successfully",
result=records,
)
People.filter_attr = list_options
records = Users.filter_active(*Users.get_smart_query(list_options.query))
Users.filter_attr = list_options
records = Users.filter_all(
*Users.get_smart_query(list_options.query),
*Users.valid_record_args(Users),
)
return AlchemyJsonResponse(
completed=True,
message="Users are listed successfully",
@ -114,9 +122,12 @@ class UserUpdateEventMethods(MethodToEvent):
user_uu_id: str,
token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
):
find_one_user = Users.find_one_or_abort(uu_id=user_uu_id)
find_one_user = Users.filter_one(
Users.uu_id==user_uu_id,
*Users.valid_record_args(Users),
).data
access_authorized_company = Companies.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
duty_id_list=[getattr(token_dict, "duty_id", 5)],
filter_expr=[Companies.id == token_dict.get("")],
)
if access_authorized_company.count:
@ -146,9 +157,12 @@ class UserPatchEventMethods(MethodToEvent):
@classmethod
def user_patch(cls, data: PatchRecord, user_uu_id: str, token_dict):
find_one_user = Users.find_one_or_abort(uu_id=user_uu_id)
find_one_user = Users.filter_one(
Users.uu_id==user_uu_id,
*Users.valid_record_args(Users),
).data
access_authorized_company = Companies.select_action(
duty_id=getattr(token_dict, "duty_id", 5),
duty_id_list=[getattr(token_dict, "duty_id", 5)],
filter_expr=[Companies.id == find_one_user.id],
)
if access_authorized_company.count:

View File

@ -74,11 +74,13 @@ class AuthModule(PasswordModule):
def remove_refresher_token(self, domain, disconnect: bool = False):
if disconnect:
registered_tokens = UsersTokens.filter_all(
UsersTokens.user_id==self.id, system=True
UsersTokens.user_id == self.id, system=True
)
else:
registered_tokens = UsersTokens.filter_all(
UsersTokens.domain==domain, UsersTokens.user_id==self.id, system=True
UsersTokens.domain == domain,
UsersTokens.user_id == self.id,
system=True,
)
registered_tokens.query.delete()
UsersTokens.save()

View File

@ -341,15 +341,16 @@ class Build(CrudCollection, SelectActionWithEmployee):
@classmethod
def update_action(cls, data: UpdateBuild, build_uu_id: str, token):
from databases import Addresses
data_dict = data.excluded_dump()
if data.official_address_uu_id:
official_address = Addresses.filter_one(
Addresses.uu_id==data.address_uu_id
Addresses.uu_id == data.address_uu_id
).data
data_dict["address_id"] = official_address.id if official_address else None
del data_dict["address_uu_id"]
if build_to_update := cls.filter_one(
cls.uu_id==build_uu_id, cls.person_id==token.id
cls.uu_id == build_uu_id, cls.person_id == token.id
).data:
return build_to_update.update(**data_dict)
@ -491,7 +492,7 @@ class BuildParts(CrudCollection):
)
if build_types := BuildTypes.filter_one(
BuildTypes.uu_id==data.build_part_type_uu_id
BuildTypes.uu_id == data.build_part_type_uu_id
).data:
part_direction = ApiEnumDropdown.get_by_uuid(
uuid=str(data.part_direction_uu_id)

View File

@ -406,9 +406,9 @@ class BuildDecisionBookPerson(CrudCollection):
**book_dict
):
decision_book = BuildDecisionBook.filter_one(
BuildDecisionBook.id==self.build_decision_book_id,
BuildDecisionBook.active==True,
BuildDecisionBook.is_confirmed==True
BuildDecisionBook.id == self.build_decision_book_id,
BuildDecisionBook.active == True,
BuildDecisionBook.is_confirmed == True,
).data
person_occupants.update(
expiry_starts=decision_book.expiry_starts,
@ -417,8 +417,7 @@ class BuildDecisionBookPerson(CrudCollection):
if build_living_space_id:
related_service = Services.filter_by_one(
related_responsibility=str(occupant_type.occupant_code),
active=True,
is_confirmed=True,
*Services.valid_record_dict
)
if not related_service:
raise HTTPException(
@ -427,9 +426,9 @@ class BuildDecisionBookPerson(CrudCollection):
)
decision_build = Build.filter_one(
Build.id==decision_book.build_id,
Build.active==True,
Build.is_confirmed==True
Build.id == decision_book.build_id,
Build.active == True,
Build.is_confirmed == True,
).data
management_room = decision_build.management_room
if not management_room:
@ -439,16 +438,14 @@ class BuildDecisionBookPerson(CrudCollection):
)
living_space = BuildLivingSpace.filter_one(
BuildLivingSpace.id==build_living_space_id,
BuildLivingSpace.active==True,
BuildLivingSpace.is_confirmed==True
BuildLivingSpace.id == build_living_space_id,
BuildLivingSpace.active == True,
BuildLivingSpace.is_confirmed == True,
).data
expiry_ends = str(
system_arrow.get(decision_book.meeting_date).shift(hours=23)
)
expiry_starts = str(
system_arrow.get(decision_book.meeting_date)
)
expiry_starts = str(system_arrow.get(decision_book.meeting_date))
related_living_space = BuildLivingSpace.find_or_create(
build_parts_id=management_room.id,
build_parts_uu_id=str(management_room.uu_id),
@ -481,10 +478,10 @@ class BuildDecisionBookPerson(CrudCollection):
def check_occupant_type(self, occupant_type):
book_person_occupant_type = BuildDecisionBookPersonOccupants.filter_one(
BuildDecisionBookPersonOccupants.build_decision_book_person_id==self.id,
BuildDecisionBookPersonOccupants.occupant_type_id==occupant_type.id,
BuildDecisionBookPersonOccupants.active==True,
BuildDecisionBookPersonOccupants.is_confirmed==True,
BuildDecisionBookPersonOccupants.build_decision_book_person_id == self.id,
BuildDecisionBookPersonOccupants.occupant_type_id == occupant_type.id,
BuildDecisionBookPersonOccupants.active == True,
BuildDecisionBookPersonOccupants.is_confirmed == True,
).data
if not book_person_occupant_type:
raise HTTPException(

View File

@ -80,6 +80,8 @@ class CrudMixin(Base, SmartQueryMixin, SessionMixin, FilterAttributes):
creds: Credentials = None # The credentials to use in the model.
client_arrow: DateTimeLocal = None # The arrow to use in the model.
valid_record_dict: dict = {"active": True, "deleted": False}
valid_record_args = lambda class_: [class_.active == True, class_.deleted == False]
expiry_starts: Mapped[TIMESTAMP] = mapped_column(
TIMESTAMP, server_default=func.now(), nullable=False

View File

@ -181,12 +181,16 @@ class Event2Occupant(CrudCollection):
def get_event_id_by_build_living_space_id(
cls, build_living_space_id
) -> (list, list):
active_events = cls.filter_by_active(
build_living_space_id=build_living_space_id
)
active_events_id = [event.event_id for event in active_events.data]
active_events = Events.filter_active(Events.id.in_(active_events_id))
active_events_uu_id = [str(event.uu_id) for event in active_events.data]
active_events = cls.filter_all(
cls.build_living_space_id==build_living_space_id,
*cls.valid_record_args(cls)
).data
active_events_id = [event.event_id for event in active_events]
active_events = Events.filter_all(
Events.id.in_(active_events_id),
*Events.valid_record_args(Events)
).data
active_events_uu_id = [str(event.uu_id) for event in active_events]
return active_events_id, active_events_uu_id

View File

@ -136,8 +136,10 @@ class Users(CrudCollection, UserLoginModule, SelectAction):
@classmethod
def create_action(cls, create_user: InsertUsers):
found_person = People.find_one(uu_id=create_user.people_uu_id)
found_person = People.filter_one(
People.uu_id==create_user.people_uu_id,
*People.valid_record_args(People),
).data
if not found_person:
raise HTTPException(status_code=400, detail="Person not found.")
if (
@ -403,12 +405,11 @@ class People(CrudCollection, SelectAction):
create_dict["surname"] = str(create_dict["surname"]).upper()
create_dict["birth_place"] = str(create_dict["birth_place"]).upper()
created_people = cls.find_or_create(**create_dict)
if not created_people.is_found:
RelationshipDutyPeople.find_or_create(
company_id=token.selected_company.company_id,
duties_id=bulk_duty.id,
member_id=created_people.id,
)
RelationshipDutyPeople.find_or_create(
company_id=token.selected_company.company_id,
duties_id=bulk_duty.id,
member_id=created_people.id,
)
return created_people
@ -495,7 +496,7 @@ class Addresses(CrudCollection):
post_code_list = RelationshipEmployee2PostCode.filter_all(
RelationshipEmployee2PostCode.employee_id
== token_dict.selected_company.employee_id,
RelationshipEmployee2PostCode.active==True,
RelationshipEmployee2PostCode.active == True,
).data
post_code_id_list = [post_code.member_id for post_code in post_code_list]
if not post_code_id_list:

View File

@ -39,14 +39,14 @@ class ApiEnumDropdown(BaseCollection):
if search := cls.query.filter(
cls.enum_class.in_(["DebitTypes"]),
cls.uu_id == search_uu_id,
cls.active == True,
cls.active == True,
).first():
return search
elif search_debit:
if search := cls.query.filter(
cls.enum_class.in_(["DebitTypes"]),
cls.key == search_debit,
cls.active == True,
cls.active == True,
).first():
return search
return cls.query.filter(
@ -57,7 +57,8 @@ class ApiEnumDropdown(BaseCollection):
@classmethod
def get_due_types(cls):
if due_list := cls.filter_all(
cls.enum_class == "BuildDuesTypes", cls.key.in_(["BDT-A", "BDT-D"]),
cls.enum_class == "BuildDuesTypes",
cls.key.in_(["BDT-A", "BDT-D"]),
cls.active == True,
).data:
return [due.uu_id.__str__() for due in due_list]
@ -72,14 +73,14 @@ class ApiEnumDropdown(BaseCollection):
if search := cls.query.filter(
cls.enum_class.in_(["BuildDuesTypes"]),
cls.uu_id == search_uu_id,
cls.active == True,
cls.active == True,
).first():
return search
elif search_management:
if search := cls.query.filter(
cls.enum_class.in_(["BuildDuesTypes"]),
cls.key == search_management,
cls.active == True,
cls.active == True,
).first():
return search
return cls.query.filter(
@ -98,9 +99,13 @@ class ApiEnumDropdown(BaseCollection):
@classmethod
def uuid_of_enum(cls, enum_class: str, key: str):
return str(getattr(cls.filter_one(
cls.enum_class==enum_class, cls.key==key
).data, "uu_id", None))
return str(
getattr(
cls.filter_one(cls.enum_class == enum_class, cls.key == key).data,
"uu_id",
None,
)
)
ApiEnumDropdown.set_session(ApiEnumDropdown.__session__)

View File

@ -76,22 +76,21 @@ def authentication_refresh_user_info(request: Request):
@login_route.post(path="/change_password", summary="Change password with access token")
def authentication_change_password(request: Request, data: ChangePassword):
token_dict = parse_token_object_to_dict(request=request)
active_function = getattr(
AuthenticationChangePasswordEventMethod, "authentication_change_password"
)
return active_function(data=data, request=request, token_dict=None)
return active_function(data=data, token_dict=token_dict)
@login_route.post(
path="/create_password", summary="Create password with password token"
)
def authentication_create_password(request: Request, data: CreatePassword):
def authentication_create_password(data: CreatePassword):
active_function = getattr(
AuthenticationCreatePasswordEventMethod, "authentication_create_password"
)
return active_function(data=data, request=request, token_dict=None)
return active_function(data=data)
@login_route.post(path="/disconnect", summary="Disconnect user with access token")