wag-managment-api-service-v.../ApiServices/api_handlers/auth_actions/auth.py

246 lines
9.9 KiB
Python

from fastapi.exceptions import HTTPException
from api_configs import Auth
from api_objects import (
OccupantTokenObject,
EmployeeTokenObject,
UserType,
)
from api_services import RedisActions
from ApiServices.api_handlers.auth_actions.token import AccessObjectActions
class AuthActions:
    """Build per-user token objects at login and update them afterwards.

    The class holds no state; every public method is a classmethod. Token
    objects are stored and fetched through ``AccessObjectActions``.
    """

    @staticmethod
    def _require_headers(request) -> None:
        """Raise HTTP 401 unless *request* exposes a ``headers`` attribute."""
        if not hasattr(request, "headers"):
            raise HTTPException(
                status_code=401,
                detail=dict(
                    message="Headers are not found in request. Invalid request object."
                ),
            )

    @staticmethod
    def _as_mapping(obj) -> dict:
        """Return *obj* as a plain dict.

        Objects exposing a callable ``model_dump`` are dumped through it;
        anything else is handed to ``dict()``, so mappings and key/value pairs
        keep working and unsupported values still raise ``TypeError``.
        """
        dump = getattr(obj, "model_dump", None)
        if callable(dump):
            return dump()
        return dict(obj)

    @staticmethod
    def _occupant_entry(build_part, occupant_type) -> dict:
        """Describe one build part / occupant type pair for the selection list."""
        return dict(
            part_uu_id=str(build_part.uu_id),
            part_name=build_part.part_name,
            part_level=build_part.part_level,
            uu_id=str(occupant_type.uu_id),
            description=occupant_type.occupant_description,
            code=occupant_type.occupant_code,
        )

    @classmethod
    def do_occupant_login_token(
        cls, request, found_user, domain, access_token: str = None
    ):
        """Collect the occupant's selectable living spaces and save the token.

        Args:
            request: Incoming request; its ``headers`` are copied into the token.
            found_user: User record the token is created for.
            domain: Domain stored on the token object.
            access_token: Key passed to ``save_object_to_redis``.

        Returns:
            dict with ``user_type`` (the enum member name) and
            ``available_occupants`` keyed by building ``uu_id``.

        Raises:
            HTTPException: 400 when the user has no living space, or when a
                living space has no build part or no occupant type.
        """
        from databases import BuildLivingSpace, BuildParts, OccupantTypes

        living_spaces: list[BuildLivingSpace] = BuildLivingSpace.filter_all(
            BuildLivingSpace.person_id == found_user.person_id
        ).data
        if not living_spaces:
            raise HTTPException(
                status_code=400,
                detail=dict(
                    message="NO Living Space is found. This user has no proper account set please contact the admin."
                ),
            )

        occupants_selection_dict = {}
        for living_space in living_spaces:
            build_parts_selection = BuildParts.filter_all(
                BuildParts.id == living_space.build_parts_id,
            )
            if not build_parts_selection.data:
                raise HTTPException(
                    status_code=400,
                    detail=dict(
                        message="No build Part is found for the living space. Please contact the admin."
                    ),
                )
            # NOTE(review): .get(1) is a project helper; presumably it returns
            # the first matching row -- confirm against the query wrapper.
            build_part = build_parts_selection.get(1)
            build = build_part.buildings
            occupant_type = OccupantTypes.filter_by_one(
                id=living_space.occupant_type,
                system=True,
            ).data
            if not occupant_type:
                # Fail with the same 400 shape as the other missing-record
                # cases instead of an AttributeError further down.
                raise HTTPException(
                    status_code=400,
                    detail=dict(
                        message="No occupant type is found for the living space. Please contact the admin."
                    ),
                )

            build_key = str(build.uu_id)
            if build_key not in occupants_selection_dict:
                occupants_selection_dict[build_key] = dict(
                    build_uu_id=build_key,
                    build_name=build.build_name,
                    build_no=build.build_no,
                    occupants=[],
                )
            occupants_selection_dict[build_key]["occupants"].append(
                cls._occupant_entry(build_part, occupant_type)
            )

        AccessObjectActions.save_object_to_redis(
            access_token=access_token,
            model_object=OccupantTokenObject(
                domain=domain,
                user_type=UserType.occupant.value,
                user_uu_id=str(found_user.uu_id),
                credentials=found_user.credentials(),
                user_id=found_user.id,
                person_id=found_user.person_id,
                person_uu_id=str(found_user.person.uu_id),
                request=dict(request.headers),
                available_occupants=occupants_selection_dict,
            ),
        )
        return dict(
            user_type=UserType.occupant.name,
            available_occupants=occupants_selection_dict,
        )

    @classmethod
    def do_employee_login_token(
        cls, request, found_user, domain, access_token: str = None
    ):
        """Collect the employee's duties and companies and save the token.

        Employee rows whose staff, duties, department or company record is
        missing are skipped rather than aborting the login.

        Args:
            request: Incoming request; its ``headers`` are copied into the token.
            found_user: User record the token is created for.
            domain: Domain stored on the token object.
            access_token: Key passed to ``save_object_to_redis``.

        Returns:
            dict with ``user_type`` (the enum member name) and
            ``companies_list`` (one dict per company found).
        """
        from databases import (
            Companies,
            Duties,
            Departments,
            Duty,
            Employees,
            Staff,
            Addresses,
        )

        list_employee = Employees.filter_all(
            Employees.people_id == found_user.person_id,
        ).data
        companies_uu_id_list, companies_id_list, companies_list = [], [], []
        duty_uu_id_list, duty_id_list = [], []

        for employee in list_employee:
            staff = Staff.filter_one(Staff.id == employee.staff_id).data
            if not staff:
                continue
            duties = Duties.filter_one(Duties.id == staff.duties_id).data
            if not duties:
                continue

            if duty_found := Duty.filter_by_one(id=duties.duties_id).data:
                duty_uu_id_list.append(str(duty_found.uu_id))
                duty_id_list.append(duty_found.id)

            # NOTE(review): the company lookup is kept independent of
            # `duty_found` because it only reads `duties`; confirm this matches
            # the intended nesting.
            department = Departments.filter_one(
                Departments.id == duties.department_id,
            ).data
            if not department:
                continue
            company = Companies.filter_one(
                Companies.id == department.company_id,
            ).data
            if not company:
                continue

            companies_uu_id_list.append(str(company.uu_id))
            companies_id_list.append(company.id)
            company_address = Addresses.filter_by_one(
                id=company.official_address_id
            ).data
            companies_list.append(
                dict(
                    uu_id=str(company.uu_id),
                    public_name=company.public_name,
                    company_type=company.company_type,
                    company_address=company_address,
                )
            )

        AccessObjectActions.save_object_to_redis(
            access_token=access_token,
            model_object=EmployeeTokenObject(
                domain=domain,
                user_type=UserType.employee.value,
                user_uu_id=str(found_user.uu_id),
                credentials=found_user.credentials(),
                user_id=found_user.id,
                person_id=found_user.person_id,
                person_uu_id=str(found_user.person.uu_id),
                request=dict(request.headers),
                companies_uu_id_list=companies_uu_id_list,
                companies_id_list=companies_id_list,
                duty_uu_id_list=duty_uu_id_list,
                duty_id_list=duty_id_list,
            ),
        )
        return dict(
            user_type=UserType.employee.name,
            companies_list=companies_list,
        )

    @classmethod
    def check_user_and_delete_tokens_from_redis(cls, domain: str, found_user=None):
        """Delete the user's stored tokens that belong to *domain*.

        Raises:
            HTTPException: 400 when *found_user* is falsy.
        """
        if not found_user:
            raise HTTPException(
                status_code=400,
                detail=dict(message="User is not found."),
            )
        already_tokens = AccessObjectActions.get_object_via_user_uu_id(
            user_id=found_user.uu_id
        )
        # Only sessions opened for this domain are removed; tokens stored for
        # other domains are left in place.
        for key, token_user in already_tokens.items():
            if token_user.get("domain", "") == domain:
                RedisActions.delete(key)

    @classmethod
    def save_access_token_to_redis(
        cls, request, found_user, domain: str, access_token: str = None
    ):
        """Replace the user's tokens for *domain* with a fresh login token.

        Existing tokens for the domain are deleted first. When *access_token*
        is falsy a new one is generated from the user. The work is then
        delegated to the occupant or employee login method.

        Returns:
            The dict returned by the selected login method.

        Raises:
            HTTPException: 400 when *found_user* is falsy.
        """
        from databases.sql_models.identity.identity import Users

        found_user: Users = found_user  # annotation only, for editor support
        cls.check_user_and_delete_tokens_from_redis(domain, found_user)
        access_token = access_token or found_user.generate_access_token()

        # Occupants and employees carry different session details.
        if found_user.is_occupant:
            return cls.do_occupant_login_token(
                request, found_user, domain, access_token
            )
        return cls.do_employee_login_token(request, found_user, domain, access_token)

    @classmethod
    def update_selected_to_redis(cls, request, add_payload):
        """Store the user's selection on the current token object.

        Occupant tokens receive ``selected_occupant`` and employee tokens
        receive ``selected_company``; both are set to
        ``add_payload.model_dump()``.

        Returns:
            Whatever ``AccessObjectActions.save_object_to_redis`` returns.

        Raises:
            HTTPException: 401 when *request* has no ``headers`` attribute or
                the token's ``user_type`` is neither occupant nor employee.
        """
        # Validate the request before it is used to look the token up.
        cls._require_headers(request)
        already_tokens = AccessObjectActions.get_object_via_access_key(request=request)
        access_token = request.headers.get(Auth.ACCESS_TOKEN_TAG)

        if already_tokens.user_type == UserType.occupant.value:
            already_tokens.selected_occupant = add_payload.model_dump()
            return AccessObjectActions.save_object_to_redis(
                access_token=access_token,
                model_object=OccupantTokenObject(**already_tokens.model_dump()),
            )
        elif already_tokens.user_type == UserType.employee.value:
            already_tokens.selected_company = add_payload.model_dump()
            return AccessObjectActions.save_object_to_redis(
                access_token=access_token,
                model_object=EmployeeTokenObject(**already_tokens.model_dump()),
            )
        raise HTTPException(
            status_code=401,
            detail=dict(
                message="User type is not found in the token object. Please reach to your administrator."
            ),
        )

    @classmethod
    def update_access_token_to_redis(cls, request, add_payload):
        """Merge *add_payload* into the current token object and save it again.

        Both *add_payload* and the stored token may be mappings or objects
        exposing ``model_dump()``. Keys already present on the stored token
        take precedence over *add_payload*.

        Returns:
            Whatever ``AccessObjectActions.save_object_to_redis`` returns.

        Raises:
            HTTPException: 401 when *request* has no ``headers`` attribute.
        """
        # Validate the request before it is used to look the token up.
        cls._require_headers(request)
        already_tokens = AccessObjectActions.get_object_via_access_key(request=request)

        # NOTE(review): stored token values override the added payload; confirm
        # that "add only" is the intended merge direction.
        payload = {**cls._as_mapping(add_payload), **cls._as_mapping(already_tokens)}
        access_token = request.headers.get(Auth.ACCESS_TOKEN_TAG)

        # Compare string forms so a non-string user type still matches.
        if str(payload.get("user_type")) == str(UserType.occupant.value):
            return AccessObjectActions.save_object_to_redis(
                access_token=access_token,
                model_object=OccupantTokenObject(**payload),
            )
        return AccessObjectActions.save_object_to_redis(
            access_token=access_token,
            model_object=EmployeeTokenObject(**payload),
        )