from typing import Any

from fastapi import Request

from ApiLayers.ApiLibrary.common.line_number import get_line_number_for_error
from ApiLayers.ApiServices.Login.user_login_handler import UserLoginModule
from ApiLayers.ApiServices.Token.token_handler import TokenService
from ApiLayers.ApiValidations.Custom.token_objects import CompanyToken, OccupantToken
from ApiLayers.ApiValidations.Response.default_response import EndpointSuccessResponse
from ApiLayers.ErrorHandlers import HTTPExceptionApi
from ApiLayers.Schemas import (
    BuildLivingSpace,
    BuildParts,
    RelationshipEmployee2Build,
    Companies,
    Departments,
    Duties,
    Duty,
    Staff,
    Employees,
    Event2Employee,
    Event2Occupant,
    OccupantTypes,
    Users,
)
from Events.base_request_model import ContextRetrievers, TokenDictType


class Handlers:
    """Selection handlers that build a company/occupant token and store it via TokenService."""

    @staticmethod
    def _bad_request(lang: Any, loc: Any, sys_msg: str) -> HTTPExceptionApi:
        """Build (not raise) the HTTP_400_BAD_REQUEST error used by the handlers.

        ``loc`` is passed in rather than computed here so that
        ``get_line_number_for_error()`` is still evaluated at the call site.
        """
        return HTTPExceptionApi(
            error_code="HTTP_400_BAD_REQUEST",
            lang=lang,
            loc=loc,
            sys_msg=sys_msg,
        )

    @classmethod  # Requires no auth context
    def handle_employee_selection(
        cls, request: Request, data: Any, token_dict: TokenDictType
    ):
        """Handle company selection for an employee token.

        Resolves the selected company, the caller's employee/staff/duties/
        department records and the company's BULK duties record, packs them
        into a ``CompanyToken`` and hands it to
        ``TokenService.update_token_at_redis``.

        Args:
            request: Incoming FastAPI request, forwarded to the token service.
            data: Request body; ``data.company_uu_id`` is read.
            token_dict: Current token; ``companies_uu_id_list``, ``person_id``
                and ``lang`` are read.

        Returns:
            Whatever ``TokenService.update_token_at_redis`` returns.

        Raises:
            HTTPExceptionApi: If the company is not listed in the token, if any
                required record is missing, or if the token update fails.
        """
        Users.set_user_define_properties(token=token_dict)
        db = Users.new_session()

        if data.company_uu_id not in token_dict.companies_uu_id_list:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Company not found in token",
            )

        selected_company: Companies = Companies.filter_one(
            Companies.uu_id == data.company_uu_id,
            db=db,
        ).data
        if not selected_company:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Company not found in token",
            )

        # Get duties IDs for the company
        duties_ids = [
            duty.id
            for duty in Duties.filter_all(
                Duties.company_id == selected_company.id, db=db
            ).data
        ]

        # Get staff IDs attached to those duties
        staff_ids = [
            staff.id
            for staff in Staff.filter_all(Staff.duties_id.in_(duties_ids), db=db).data
        ]

        # Get the employee record of the token's person within this company
        employee: Employees = Employees.filter_one(
            Employees.people_id == token_dict.person_id,
            Employees.staff_id.in_(staff_ids),
            db=db,
        ).data
        if not employee:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Employee not found in token",
            )

        # Get reachable events
        reachable_event_codes = Event2Employee.get_event_codes(employee_id=employee.id)
        reachable_event_endpoints = Event2Employee.get_event_endpoints(
            employee_id=employee.id
        )

        # Get staff, duties and department; each lookup is guarded so a missing
        # row is reported as an API error instead of an AttributeError on None.
        staff = Staff.filter_one(Staff.id == employee.staff_id, db=db).data
        if not staff:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Staff not found for employee",
            )
        duties = Duties.filter_one(Duties.id == staff.duties_id, db=db).data
        if not duties:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Duties not found for staff",
            )
        department = Departments.filter_one(
            Departments.id == duties.department_id, db=db
        ).data
        if not department:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Department not found for duties",
            )

        # Get bulk duty: the system-level "BULK" duty definition, then the
        # company's duties row that points at it.
        bulk_duty = Duty.filter_by_one(system=True, duty_code="BULK", db=db).data
        if not bulk_duty:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Bulk duty not found",
            )
        bulk_duties = Duties.filter_by_one(
            company_id=selected_company.id,
            duties_id=bulk_duty.id,
            **Duties.valid_record_dict,
            db=db,
        ).data
        if not bulk_duties:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Bulk duties not found for company",
            )

        # Create company token
        company_token = CompanyToken(
            company_uu_id=str(selected_company.uu_id),
            company_id=selected_company.id,
            department_id=department.id,
            department_uu_id=str(department.uu_id),
            duty_id=duties.id,
            duty_uu_id=str(duties.uu_id),
            bulk_duties_id=bulk_duties.id,
            staff_id=staff.id,
            staff_uu_id=str(staff.uu_id),
            employee_id=employee.id,
            employee_uu_id=str(employee.uu_id),
            reachable_event_codes=reachable_event_codes,
            reachable_event_endpoints=reachable_event_endpoints,
        )
        try:  # Update Redis
            return TokenService.update_token_at_redis(
                request=request, add_payload=company_token
            )
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise HTTPExceptionApi(
                error_code="",
                lang="en",
                loc=get_line_number_for_error(),
                sys_msg=f"{e}",
            ) from e

    @classmethod  # Requires no auth context
    def handle_occupant_selection(
        cls, request: Request, data: Any, token_dict: TokenDictType
    ):
        """Handle occupant type (living space) selection for an occupant token.

        Resolves the selected living space and its related occupant type,
        build part, build, responsible employee and company, packs them into
        an ``OccupantToken`` and hands it to
        ``TokenService.update_token_at_redis``.

        Args:
            request: Incoming FastAPI request, forwarded to the token service.
            data: Request body; ``data.build_living_space_uu_id`` is read.
            token_dict: Current token; ``lang`` is read for error messages.

        Returns:
            Whatever ``TokenService.update_token_at_redis`` returns.

        Raises:
            HTTPExceptionApi: If any required record is missing or the token
                update fails.
        """
        Users.set_user_define_properties(token=token_dict)
        db = BuildLivingSpace.new_session()

        # Get selected occupant type
        selected_build_living_space: BuildLivingSpace = BuildLivingSpace.filter_one(
            BuildLivingSpace.uu_id == data.build_living_space_uu_id,
            db=db,
        ).data
        if not selected_build_living_space:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Selected occupant type not found",
            )

        # Get reachable events
        reachable_event_codes = Event2Occupant.get_event_codes(
            build_living_space_id=selected_build_living_space.id
        )
        reachable_event_endpoints = Event2Occupant.get_event_endpoints(
            build_living_space_id=selected_build_living_space.id
        )

        # Each lookup below is guarded so a missing row is reported as an API
        # error instead of an AttributeError on None.
        occupant_type = OccupantTypes.filter_one(
            OccupantTypes.id == selected_build_living_space.occupant_type_id,
            db=db,
            system=True,
        ).data
        if not occupant_type:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Occupant type not found",
            )

        build_part = BuildParts.filter_one(
            BuildParts.id == selected_build_living_space.build_parts_id,
            db=db,
        ).data
        if not build_part:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Build part not found",
            )

        # NOTE(review): this queries BuildParts by ``build_part.build_id``, i.e.
        # it matches a build *part* whose id equals a build id. This looks like
        # it may have been meant to query the build model instead -- confirm.
        build = BuildParts.filter_one(
            BuildParts.id == build_part.build_id,
            db=db,
        ).data
        if not build:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Build not found",
            )

        responsible_employee = Employees.filter_one(
            Employees.id == build_part.responsible_employee_id,
            db=db,
        ).data
        if not responsible_employee:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Responsible employee not found",
            )

        related_company = RelationshipEmployee2Build.filter_one(
            RelationshipEmployee2Build.member_id == build.id,
            db=db,
        ).data
        if not related_company:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Related company not found for build",
            )

        # Get company
        company_related = Companies.filter_one(
            Companies.id == related_company.company_id,
            db=db,
        ).data
        if not company_related:
            raise cls._bad_request(
                token_dict.lang,
                get_line_number_for_error(),
                "Responsible company not found",
            )

        # Create occupant token
        occupant_token = OccupantToken(
            living_space_id=selected_build_living_space.id,
            living_space_uu_id=str(selected_build_living_space.uu_id),
            occupant_type_id=occupant_type.id,
            occupant_type_uu_id=str(occupant_type.uu_id),
            occupant_type=occupant_type.occupant_type,
            build_id=build.id,
            build_uuid=str(build.uu_id),
            build_part_id=build_part.id,
            build_part_uuid=str(build_part.uu_id),
            responsible_employee_id=responsible_employee.id,
            responsible_employee_uuid=str(responsible_employee.uu_id),
            responsible_company_id=company_related.id,
            responsible_company_uuid=str(company_related.uu_id),
            reachable_event_codes=reachable_event_codes,
            reachable_event_endpoints=reachable_event_endpoints,
        )
        try:  # Update Redis
            return TokenService.update_token_at_redis(
                request=request, add_payload=occupant_token
            )
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise HTTPExceptionApi(
                error_code="",
                lang="en",
                loc=get_line_number_for_error(),
                sys_msg=f"{e}",
            ) from e


class AuthenticationFunctions:
    """Endpoint-level authentication functions (login, selection, logout, ...).

    Several methods are currently stubs: their previous implementations are
    kept as commented-out code and the methods return ``None``.
    """

    @classmethod  # Requires auth context
    def authentication_select_company_or_occupant_type(cls, data: Any):
        """Handle selection of company or occupant type.

        Dispatches to ``Handlers.handle_employee_selection`` or
        ``Handlers.handle_occupant_selection`` depending on the token kind.

        Returns:
            A dict with ``completed``/``selected`` keys merged with
            ``context_retriever.base``; ``completed`` is ``False`` when the
            token is neither employee nor occupant or the handler returned a
            falsy value.
        """
        context_retriever = ContextRetrievers(
            func=cls.authentication_select_company_or_occupant_type
        )
        if context_retriever.token.is_employee:
            if Handlers.handle_employee_selection(
                request=context_retriever.request,
                data=data,
                token_dict=context_retriever.token,
            ):
                return {
                    "completed": True,
                    "selected": data.company_uu_id,
                    **context_retriever.base,
                }
        elif context_retriever.token.is_occupant:
            if Handlers.handle_occupant_selection(
                request=context_retriever.request,
                data=data,
                token_dict=context_retriever.token,
            ):
                return {
                    "completed": True,
                    "selected": data.build_living_space_uu_id,
                    **context_retriever.base,
                }
        return {"completed": False, "selected": None, **context_retriever.base}

    @classmethod  # Requires no auth context
    def authentication_login_with_domain_and_creds(cls, request: Request, data: Any):
        """
        Authenticate user with domain and credentials.

        Args:
            request: FastAPI request object
            data: Request body containing login credentials
                {
                    "domain": "evyos.com.tr",
                    "access_key": "user@example.com",
                    "password": "string",
                    "remember_me": false
                }
        Returns:
            The dict produced by ``EndpointSuccessResponse.as_dict`` for the
            ``LOGIN_SUCCESS`` code, carrying the login module's data.
        """
        # Get token from login module
        user_login_module = UserLoginModule(request=request)
        user_login_module.login_user_via_credentials(access_data=data)
        # The response language is hard-coded to "tr" after login.
        user_login_module.language = "tr"

        # Return response with token and headers
        return EndpointSuccessResponse(
            code="LOGIN_SUCCESS", lang=user_login_module.language
        ).as_dict(data=user_login_module.as_dict)

    @classmethod  # Requires no auth context
    def authentication_check_token_is_valid(cls, data: Any) -> None:
        """Check if token is valid for user.

        Currently a stub: the implementation is commented out and the method
        returns ``None``.
        """
        # try:
        #     if RedisActions.get_object_via_access_key(request=request):
        #         return ResponseHandler.success("Access Token is valid")
        # except HTTPException:
        #     return ResponseHandler.unauthorized("Access Token is NOT valid")
        return

    @classmethod  # Requires no auth context
    def authentication_refresh_user_info(cls, data: Any) -> None:
        """Refresh user info using access token.

        Currently a stub: the implementation is commented out and the method
        returns ``None``.
        """
        # try:
        #     access_token = request.headers.get(Auth.ACCESS_TOKEN_TAG)
        #     if not access_token:
        #         return ResponseHandler.unauthorized()
        #     found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
        #     if not found_user:
        #         return ResponseHandler.not_found("User not found")
        #     user_token = UsersTokens.filter_one(
        #         UsersTokens.domain == found_user.domain_name,
        #         UsersTokens.user_id == found_user.id,
        #         UsersTokens.token_type == "RememberMe",
        #     ).data
        #     response_data = {
        #         "access_token": access_token,
        #         "refresh_token": getattr(user_token, "token", None),
        #         "user": found_user.get_dict(),
        #     }
        #     return ResponseHandler.success(
        #         "User info refreshed successfully",
        #         data=response_data,
        #     )
        # except Exception as e:
        #     return ResponseHandler.error(str(e))
        return

    @classmethod  # Requires no auth context
    def authentication_change_password(cls, data: Any) -> None:
        """Change password with access token.

        Currently a stub: the implementation is commented out and the method
        returns ``None``.
        """
        # try:
        #     if not isinstance(token_dict, EmployeeTokenObject):
        #         return ResponseHandler.unauthorized("Only employees can change password")
        #     found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
        #     if not found_user:
        #         return ResponseHandler.not_found("User not found")
        #     if not found_user.check_password(data.old_password):
        #         return ResponseHandler.unauthorized("Old password is incorrect")
        #     found_user.set_password(data.new_password)
        #     return ResponseHandler.success("Password changed successfully")
        # except Exception as e:
        #     return ResponseHandler.error(str(e))
        return

    @classmethod  # Requires no auth context
    def authentication_create_password(cls, data: Any) -> None:
        """Create password with password reset token requested via email.

        Currently a stub: the implementation is commented out and the method
        returns ``None``.
        """
        # if not data.re_password == data.password:
        #     raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE, detail="Password must match")
        # if found_user := Users.filter_one(Users.password_token == data.password_token).data:
        #     found_user.create_password(found_user=found_user, password=data.password)
        #     found_user.password_token = ""
        #     found_user.save()
        #     return ResponseHandler.success("Password is created successfully", data=found_user.get_dict())
        # return ResponseHandler.not_found("Record not found")
        return

    @classmethod  # Requires auth context
    def authentication_disconnect_user(cls, data: Any) -> None:
        """Disconnect all sessions of user in access token.

        Currently a stub: the implementation is commented out and the method
        returns ``None``.
        """
        # found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
        # if not found_user:
        #     return ResponseHandler.not_found("User not found")
        # if already_tokens := RedisActions.get_object_via_user_uu_id(user_id=str(found_user.uu_id)):
        #     for key, token_user in already_tokens.items():
        #         RedisActions.delete(key)
        #         selected_user = Users.filter_one(Users.uu_id == token_user.get("uu_id")).data
        #         selected_user.remove_refresher_token(domain=data.domain, disconnect=True)
        #     return ResponseHandler.success("All sessions are disconnected", data=selected_user.get_dict())
        # return ResponseHandler.not_found("Invalid data")
        return

    @classmethod  # Requires auth context
    def authentication_logout_user(cls, data: Any):
        """Logout only single session of user which domain is provided.

        The logout logic itself is commented out; the method currently only
        builds the context retriever and returns its ``base``.
        """
        # token_user = None
        # if already_tokens := RedisActions.get_object_via_access_key(request=request):
        #     for key in already_tokens:
        #         token_user = RedisActions.get_json(key)
        #         if token_user.get("domain") == data.domain:
        #             RedisActions.delete(key)
        #             selected_user = Users.filter_one(Users.uu_id == token_user.get("uu_id")).data
        #             selected_user.remove_refresher_token(domain=data.domain)
        #             return ResponseHandler.success("Session is logged out", data=token_user)
        # return ResponseHandler.not_found("Logout is not successfully completed")
        context_retriever = ContextRetrievers(func=cls.authentication_logout_user)
        return context_retriever.base

    @classmethod  # Requires no auth context
    def authentication_refresher_token(cls, data: Any):
        """Refresh access token with refresher token.

        The refresh logic itself is commented out; the method currently only
        builds the context retriever and returns its ``base``.
        """
        # token_refresher = UsersTokens.filter_by_one(
        #     token=data.refresh_token,
        #     domain=data.domain,
        #     **UsersTokens.valid_record_dict,
        # ).data
        # if not token_refresher:
        #     return ResponseHandler.not_found("Invalid data")
        # if found_user := Users.filter_one(Users.id == token_refresher.user_id).data:
        #     access_key = AuthActions.save_access_token_to_redis(
        #         request=request, found_user=found_user, domain=data.domain
        #     )
        #     found_user.last_agent = request.headers.get("User-Agent", None)
        #     found_user.last_platform = request.headers.get("Origin", None)
        #     found_user.last_remote_addr = getattr(request, "remote_addr", None) or request.headers.get("X-Forwarded-For", None)
        #     found_user.last_seen = str(system_arrow.now())
        #     response_data = {
        #         "access_token": access_key,
        #         "refresh_token": data.refresh_token,
        #     }
        #     return ResponseHandler.success("User is logged in successfully via refresher token", data=response_data)
        # return ResponseHandler.not_found("Invalid data")
        context_retriever = ContextRetrievers(func=cls.authentication_refresher_token)
        return context_retriever.base

    @classmethod  # Requires no auth context
    def authentication_forgot_password(cls, data: Any) -> None:
        """Send an email to user for a valid password reset token.

        Currently a stub: the implementation is commented out and the method
        returns ``None``.
        """
        # found_user: Users = Users.check_user_exits(access_key=data.access_key, domain=data.domain)
        # forgot_key = AuthActions.save_access_token_to_redis(request=request, found_user=found_user, domain=data.domain)
        # forgot_link = ApiStatic.forgot_link(forgot_key=forgot_key)
        # send_email_completed = send_email(
        #     subject=f"Dear {found_user.user_tag}, your forgot password link has been sent.",
        #     receivers=[str(found_user.email)],
        #     html=change_your_password_template(user_name=found_user.user_tag, forgot_link=forgot_link),
        # )
        # if not send_email_completed:
        #     raise HTTPException(status_code=400, detail="Email can not be sent. Try again later")
        # found_user.password_token = forgot_key
        # found_user.password_token_is_valid = str(system_arrow.shift(days=1))
        # found_user.save()
        # return ResponseHandler.success("Password is change link is sent to your email or phone", data={})
        return

    @classmethod  # Requires no auth context
    def authentication_reset_password(cls, data: Any) -> None:
        """Reset password with forgot password token.

        Currently a stub: the implementation is commented out and the method
        returns ``None``.
        """
        # from sqlalchemy import or_
        # found_user = Users.query.filter(
        #     or_(
        #         Users.email == str(data.access_key).lower(),
        #         Users.phone_number == str(data.access_key).replace(" ", ""),
        #     ),
        # ).first()
        # if not found_user:
        #     raise HTTPException(
        #         status_code=status.HTTP_400_BAD_REQUEST,
        #         detail="Given access key or domain is not matching with the any user record.",
        #     )
        # reset_password_token = found_user.reset_password_token(found_user=found_user)
        # send_email_completed = send_email(
        #     subject=f"Dear {found_user.user_tag}, a password reset request has been received.",
        #     receivers=[str(found_user.email)],
        #     html=change_your_password_template(
        #         user_name=found_user.user_tag,
        #         forgot_link=ApiStatic.forgot_link(forgot_key=reset_password_token),
        #     ),
        # )
        # if not send_email_completed:
        #     raise found_user.raise_http_exception(status_code=400, message="Email can not be sent. Try again later")
        # return ResponseHandler.success("Password change link is sent to your email or phone", data=found_user.get_dict())
        return

    @classmethod  # Requires no auth context
    def authentication_download_avatar(cls, data: Any) -> None:
        """Download avatar icon and profile info of user.

        Currently a stub: the implementation is commented out and the method
        returns ``None``.
        """
        # if found_user := Users.filter_one(Users.id == token_dict.user_id).data:
        #     expired_starts = str(system_arrow.now() - system_arrow.get(str(found_user.expiry_ends)))
        #     expired_int = (system_arrow.now() - system_arrow.get(str(found_user.expiry_ends))).days
        #     user_info = {
        #         "lang": token_dict.lang,
        #         "full_name": found_user.person.full_name,
        #         "avatar": found_user.avatar,
        #         "remember_me": found_user.remember_me,
        #         "expiry_ends": str(found_user.expiry_ends),
        #         "expired_str": expired_starts,
        #         "expired_int": int(expired_int),
        #     }
        #     return ResponseHandler.success("Avatar and profile is shared via user credentials", data=user_info)
        # return ResponseHandler.not_found("Invalid data")
        return