from ApiLayers.ApiValidations.Request.authentication import Login from ApiLayers.ApiLibrary.token.password_module import PasswordModule from ApiLayers.ApiLibrary.common.line_number import get_line_number_for_error from ApiLayers.ErrorHandlers import HTTPExceptionApi class UserLoginModule: def __init__(self, request: "Request"): self.request = request self.user = None self.access_object = None self.access_token = None self.refresh_token = None @property def as_dict(self) -> dict: return { "user": self.user, "access_object": self.access_object, "access_token": self.access_token, "refresh_token": self.refresh_token, } @staticmethod def check_user_exists(access_key: str): from ApiLayers.Schemas import Users """ Check if the user exists in the database. """ db_session = Users.new_session() # Check if user exists. if "@" in access_key: found_user: Users = Users.filter_one( Users.email == access_key.lower(), db=db_session ).data else: found_user: Users = Users.filter_one( Users.phone_number == access_key.replace(" ", ""), db=db_session ).data if not found_user: raise HTTPExceptionApi( error_code="HTTP_400_BAD_REQUEST", lang="en", loc=get_line_number_for_error(), sys_msg="check_user_exists: User not found", ) return found_user def login_user_via_credentials(self, access_data: "Login") -> None: from ApiLayers.ApiServices.Token.token_handler import TokenService from ApiLayers.Schemas import Users """ Login the user via the credentials. """ # Get the actual data from the BaseRequestModel if needed found_user: Users = self.check_user_exists(access_key=access_data.access_key) if len(found_user.hash_password) < 5: raise HTTPExceptionApi( error_code="HTTP_400_BAD_REQUEST", lang=found_user.lang, loc=get_line_number_for_error(), sys_msg="login_user_via_credentials: Invalid password create a password to user first", ) # Check if the password is correct if PasswordModule.check_password( domain=access_data.domain, id_=found_user.uu_id, password=access_data.password, password_hashed=found_user.hash_password, ): # Set the access token to the redis token_response = TokenService.set_access_token_to_redis( request=self.request, user=found_user, domain=access_data.domain, remember=access_data.remember_me, ) # Set the user and token information to the instance self.user = found_user.get_dict() self.access_token = token_response.get("access_token") self.refresh_token = token_response.get("refresh_token") self.access_object = { "user_type": token_response.get("user_type", None), "selection_list": token_response.get("selection_list", {}), } return None raise HTTPExceptionApi( error_code="HTTP_400_BAD_REQUEST", lang="tr", loc=get_line_number_for_error(), sys_msg="login_user_via_credentials: raised an unknown error", )