import json
import typing

from fastapi import status
from fastapi.exceptions import HTTPException

from api_configs import Auth
from api_objects import (
    OccupantTokenObject,
    EmployeeTokenObject,
    UserType,
)
from api_services.redis.conn import redis_cli
from api_services.redis.functions import (
    get_object_via_user_uu_id,
    get_object_via_access_key,
)
from databases.sql_models.building.build import Build
from databases.sql_models.identity.identity import Addresses, OccupantTypes


def save_object_to_redis(
    access_token,
    model_object: typing.Union[OccupantTokenObject, EmployeeTokenObject],
) -> str:
    """Store a token object in Redis under ``"<access_token>:<user_uu_id>"``.

    Args:
        access_token: Token used as the first half of the Redis key.
        model_object: Token object; its ``model_dump_json()`` output is the
            stored value and its ``user_uu_id`` is the second half of the key.

    Returns:
        The ``access_token`` that was passed in, once ``redis_cli.set``
        reports a truthy result.

    Raises:
        HTTPException: 503 when the write raises or returns a falsy result.
    """
    try:
        if redis_cli.set(
            name=str(access_token) + ":" + str(model_object.user_uu_id),
            value=model_object.model_dump_json(),
        ):
            return access_token
    except Exception as e:
        # Best-effort diagnostics; the failure is reported to the caller below.
        print("Save Object to Redis Error: ", e)
    # Reached both when Redis raised and when it answered with a falsy value.
    # NOTE(review): the message text mentions headers although this path is a
    # Redis failure; it is kept verbatim because clients may match on it.
    raise HTTPException(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        detail=dict(
            message="Headers are not found in request. Invalid request object. Redis Error: Token is not saved."
        ),
    )


def save_access_token_to_redis(
    request, found_user, domain: str, access_token: typing.Optional[str] = None
):
    """Create the Redis session for ``found_user`` and describe their choices.

    Existing sessions of the same user whose stored ``domain`` equals
    ``domain`` are deleted first. Then, depending on ``found_user.is_occupant``,
    an ``OccupantTokenObject`` or an ``EmployeeTokenObject`` is built and saved
    through ``save_object_to_redis``.

    Args:
        request: Incoming request; ``dict(request.headers)`` is stored in the
            token object.
        found_user: User record being logged in.
        domain: Domain the session belongs to.
        access_token: Token to use; when falsy a new one is obtained from
            ``found_user.generate_access_token()``.

    Returns:
        For occupants ``{"user_type": ..., "available_occupants": {...}}``,
        for employees ``{"user_type": ..., "companies_list": [...]}``.

    Raises:
        HTTPException: 400 when the user is missing, has no living space, or a
            living space has no build part; 503 (from ``save_object_to_redis``)
            when the session cannot be stored.
    """
    # Imported here rather than at module level, as in the original code.
    from databases import (
        BuildLivingSpace,
        BuildParts,
        Companies,
        Duties,
        Departments,
        Duty,
        Employees,
        Staff,
    )

    if not found_user:
        raise HTTPException(
            status_code=400,
            detail=dict(message="User is not found."),
        )

    # Drop any previous session of this user that belongs to the same domain.
    already_tokens = get_object_via_user_uu_id(user_id=found_user.uu_id)
    for key, token_user in already_tokens.items():
        if token_user.get("domain", "") == domain:
            redis_cli.delete(key)

    access_token = access_token or found_user.generate_access_token()

    # Prepare the user's details to save in Redis Session
    if found_user.is_occupant:
        # Occupant flow: collect every build part the person lives in,
        # grouped by building.
        living_spaces: list[BuildLivingSpace] = BuildLivingSpace.filter_all(
            BuildLivingSpace.person_id == found_user.person_id
        ).data
        if not living_spaces:
            raise HTTPException(
                status_code=400,
                detail=dict(
                    message="NO Living Space is found. This user has no proper account set please contact the admin."
                ),
            )

        occupants_selection_dict = {}
        for living_space in living_spaces:
            build_parts_selection = BuildParts.filter_all(
                BuildParts.id == living_space.build_parts_id,
            )
            if not build_parts_selection.data:
                raise HTTPException(
                    status_code=400,
                    detail=dict(
                        message="No build Part is found for the living space. Please contact the admin."
                    ),
                )
            build_part = build_parts_selection.get(1)
            build = build_part.buildings
            occupant_type = OccupantTypes.filter_by_one(
                id=living_space.occupant_type,
                system=True,
            ).data

            occupant_entry = dict(
                part_uu_id=str(build_part.uu_id),
                part_name=build_part.part_name,
                part_level=build_part.part_level,
                uu_id=str(occupant_type.uu_id),
                description=occupant_type.occupant_description,
                code=occupant_type.occupant_code,
            )
            build_key = str(build.uu_id)
            if build_key not in occupants_selection_dict:
                # First part seen for this building: create its group.
                occupants_selection_dict[build_key] = dict(
                    build_uu_id=build_key,
                    build_name=build.build_name,
                    build_no=build.build_no,
                    occupants=[],
                )
            occupants_selection_dict[build_key]["occupants"].append(occupant_entry)

        save_object_to_redis(
            access_token=access_token,
            model_object=OccupantTokenObject(
                domain=domain,
                user_type=UserType.occupant.value,
                user_uu_id=str(found_user.uu_id),
                credentials=found_user.credentials(),
                user_id=found_user.id,
                person_id=found_user.person_id,
                person_uu_id=str(found_user.person.uu_id),
                request=dict(request.headers),
                available_occupants=occupants_selection_dict,
            ),
        )
        return dict(
            user_type=UserType.occupant.name,
            available_occupants=occupants_selection_dict,
        )

    # Employee flow: collect the duties and companies reachable from every
    # employee record of this person.
    list_employee = Employees.filter_all(
        Employees.people_id == found_user.person_id,
    ).data
    companies_uu_id_list, companies_id_list, companies_list = [], [], []
    duty_uu_id_list, duty_id_list = [], []
    for employee in list_employee:
        staff = Staff.filter_one(Staff.id == employee.staff_id).data
        if not staff:
            # Skip instead of failing on ``staff.duties_id``; missing duties,
            # duty and company records are skipped the same way.
            continue
        duties = Duties.filter_one(Duties.id == staff.duties_id).data
        if not duties:
            continue
        if duty_found := Duty.filter_by_one(id=duties.duties_id).data:
            duty_uu_id_list.append(str(duty_found.uu_id))
            duty_id_list.append(duty_found.id)

        # NOTE(review): the company lookup is taken to depend only on
        # ``duties`` (not on ``duty_found``) — confirm against the original
        # layout.
        department = Departments.filter_one(
            Departments.id == duties.department_id,
        ).data
        if not department:
            continue
        company = Companies.filter_one(
            Companies.id == department.company_id,
        ).data
        if not company:
            continue

        companies_uu_id_list.append(str(company.uu_id))
        companies_id_list.append(company.id)
        company_address = Addresses.filter_by_one(
            id=company.official_address_id
        ).data
        companies_list.append(
            dict(
                uu_id=str(company.uu_id),
                public_name=company.public_name,
                company_type=company.company_type,
                company_address=company_address,
            )
        )

    save_object_to_redis(
        access_token=access_token,
        model_object=EmployeeTokenObject(
            domain=domain,
            user_type=UserType.employee.value,
            user_uu_id=str(found_user.uu_id),
            credentials=found_user.credentials(),
            user_id=found_user.id,
            person_id=found_user.person_id,
            person_uu_id=str(found_user.person.uu_id),
            request=dict(request.headers),
            companies_uu_id_list=companies_uu_id_list,
            companies_id_list=companies_id_list,
            duty_uu_id_list=duty_uu_id_list,
            duty_id_list=duty_id_list,
        ),
    )
    return dict(
        user_type=UserType.employee.name,
        companies_list=companies_list,
    )


def _read_access_token(request):
    """Return the access-token header of ``request``; 401 if it has no headers."""
    if not hasattr(request, "headers"):
        raise HTTPException(
            status_code=401,
            detail=dict(
                message="Headers are not found in request. Invalid request object."
            ),
        )
    return request.headers.get(Auth.ACCESS_TOKEN_TAG)


def _as_plain_dict(value):
    """Return ``value`` as a dict, calling ``model_dump()`` when it has one."""
    if hasattr(value, "model_dump"):
        return value.model_dump()
    return dict(value)


def update_selected_to_redis(request, add_payload):
    """Record the selected occupant/company on the caller's stored session.

    Args:
        request: Incoming request carrying the access-token header.
        add_payload: Selection object; its ``model_dump()`` result is stored as
            ``selected_occupant`` or ``selected_company``.

    Returns:
        The access token, as returned by ``save_object_to_redis``.

    Raises:
        HTTPException: 401 when the request has no headers or the stored
            ``user_type`` is neither occupant nor employee; 503 when the
            session cannot be saved.
    """
    # Validate the request before anything else reads from it.
    access_token = _read_access_token(request)
    already_tokens = get_object_via_access_key(request=request)

    if already_tokens.user_type == UserType.occupant.value:
        already_tokens.selected_occupant = add_payload.model_dump()
        return save_object_to_redis(
            access_token=access_token,
            model_object=OccupantTokenObject(**already_tokens.model_dump()),
        )
    if already_tokens.user_type == UserType.employee.value:
        already_tokens.selected_company = add_payload.model_dump()
        return save_object_to_redis(
            access_token=access_token,
            model_object=EmployeeTokenObject(**already_tokens.model_dump()),
        )
    raise HTTPException(
        status_code=401,
        detail=dict(
            message="User type is not found in the token object. Please reach to your administrator."
        ),
    )


def update_access_token_to_redis(request, add_payload):
    """Merge ``add_payload`` into the caller's stored session and save it.

    Args:
        request: Incoming request carrying the access-token header.
        add_payload: Extra fields for the session (mapping, or an object with
            ``model_dump()``).

    Returns:
        The access token, as returned by ``save_object_to_redis``.

    Raises:
        HTTPException: 401 when the request has no headers; 503 when the
            session cannot be saved.
    """
    # Validate the request before anything else reads from it.
    access_token = _read_access_token(request)
    already_tokens = get_object_via_access_key(request=request)

    # ``update_selected_to_redis`` treats the stored session as a model object
    # (``.user_type``, ``.model_dump()``), which cannot be ``**``-unpacked
    # directly, so both sides are normalized to plain dicts first.
    # NOTE(review): stored values still win over ``add_payload`` on key
    # clashes, exactly as before — confirm this precedence is intended.
    payload = {**_as_plain_dict(add_payload), **_as_plain_dict(already_tokens)}

    # Accept the raw enum value (as the sibling function compares) as well as
    # its string form (the only form matched previously).
    stored_user_type = payload.get("user_type")
    occupant_value = UserType.occupant.value
    if stored_user_type == occupant_value or stored_user_type == str(occupant_value):
        return save_object_to_redis(
            access_token=access_token,
            model_object=OccupantTokenObject(**payload),
        )
    return save_object_to_redis(
        access_token=access_token,
        model_object=EmployeeTokenObject(**payload),
    )