from typing import Any, ClassVar, Dict from sqlalchemy import or_ from ApiLibrary.common.line_number import get_line_number_for_error from Schemas import Users from ErrorHandlers import HTTPExceptionApi from ApiValidations.Request.authentication import Login from ApiLibrary.token.password_module import PasswordModule from ApiServices.Token.token_handler import TokenService class UserLoginModule: def __init__(self, request: "Request"): self.request = request @staticmethod async def check_user_exists(access_key) -> ClassVar[Users]: """Check if user exists.""" db_session = Users.new_session() if "@" in access_key: found_user = Users.filter_one( Users.email == access_key.lower(), db=db_session ).data else: found_user = Users.filter_one( Users.phone_number == access_key.replace(" ", ""), db=db_session ).data if not found_user: raise HTTPExceptionApi( error_code="HTTP_400_BAD_REQUEST", lang="en", loc=get_line_number_for_error(), sys_msg="User not found", ) return found_user async def login_user_via_credentials(self, access_data: "Login") -> Dict[str, Any]: """Login user via credentials.""" # Get the actual data from the BaseRequestModel if needed if hasattr(access_data, "data"): access_data = access_data.data found_user: Users = await self.check_user_exists( access_key=access_data.access_key ) if len(found_user.hash_password) < 5: raise HTTPExceptionApi( error_code="HTTP_400_BAD_REQUEST", lang=found_user.lang, loc=get_line_number_for_error(), sys_msg="Invalid password create a password to user first", ) if PasswordModule.check_password( domain=access_data.domain, id_=found_user.uu_id, password=access_data.password, password_hashed=found_user.hash_password, ): return TokenService.set_access_token_to_redis( request=self.request, user=found_user, domain=access_data.domain, remember=access_data.remember_me, ) raise HTTPExceptionApi( error_code="HTTP_400_BAD_REQUEST", lang=found_user.lang, loc=get_line_number_for_error(), sys_msg="login_user_via_credentials raised error", )