import hashlib import uuid import requests import secrets from datetime import timedelta from api_services.redis.functions import RedisActions from databases.no_sql_models.validations import ( PasswordHistoryViaUser, AccessHistoryViaUser, ) from api_library.date_time_actions.date_functions import system_arrow from api_configs import ApiStatic, Auth class PasswordModule: @staticmethod def generate_token(length=32): return secrets.token_urlsafe(length) @staticmethod def create_hashed_password(domain: str, id_: str, password: str): return hashlib.sha256(f"{domain}:{id_}:{password}".encode("utf-8")).hexdigest() @classmethod def check_password(cls, domain, id_, password, password_hashed): return cls.create_hashed_password(domain, id_, password) == password_hashed @classmethod def generate_access_token(cls): return secrets.token_urlsafe(Auth.ACCESS_TOKEN_LENGTH) class AuthModule(PasswordModule): @classmethod def check_user_exits(cls, access_key, domain): from databases import Users from sqlalchemy import or_ from fastapi import status from fastapi.exceptions import HTTPException found_user: Users = Users.query.filter( or_( Users.email == str(access_key).lower(), Users.phone_number == str(access_key).replace(" ", ""), ), ).first() if not found_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Given access key or domain is not matching with the any user record.", ) other_domains_list = found_user.get_main_domain_and_other_domains( get_main_domain=False ) if domain not in other_domains_list: raise HTTPException( status_code=401, detail=dict(message="Unauthorized User attempts to connect api"), ) return found_user def remove_refresher_token(self, domain, disconnect: bool = False): from databases import UsersTokens if disconnect: registered_tokens = UsersTokens.filter_all( UsersTokens.user_id == self.id, system=True ) else: registered_tokens = UsersTokens.filter_all( UsersTokens.domain == domain, UsersTokens.user_id == self.id, system=True, ) registered_tokens.query.delete() UsersTokens.save() def check_password_is_different(self, password): from fastapi.exceptions import HTTPException main_domain = self.get_main_domain_and_other_domains(get_main_domain=True) if self.hash_password == self.create_hashed_password( domain=main_domain, id_=self.uu_id, password=password ): raise HTTPException( status_code=401, detail="New password is same with old password.", ) @staticmethod def create_password(found_user, password, password_token=None): from databases import MongoQueryIdentity, Users from fastapi.exceptions import HTTPException found_user: Users = found_user if found_user.password_token: replace_day = 0 try: replace_day = int( str(found_user.password_expires_day or 0) .split(",")[0] .replace(" days", "") ) except Exception as e: err = e token_is_expired = system_arrow.now() >= system_arrow.get( str(found_user.password_expiry_begins) ).shift(days=replace_day) if not password_token == found_user.password_token and token_is_expired: raise HTTPException( status_code=401, detail="Password token is not valid. Please request a new password token.", ) query_engine = MongoQueryIdentity(company_uuid=found_user.related_company) domain_via_user = query_engine.get_domain_via_user( user_uu_id=str(found_user.uu_id) )[0]["main_domain"] new_password_dict = { "password": found_user.create_hashed_password( domain=domain_via_user, id_=str(found_user.uu_id), password=password ), "date": str(system_arrow.now().date()), } history_dict = PasswordHistoryViaUser( user_uu_id=str(found_user.uu_id), password_add=new_password_dict, access_history_detail={ "request": "", "ip": "", }, ) found_user.password_expiry_begins = str(system_arrow.now()) found_user.hash_password = new_password_dict.get("password") found_user.password_token = "" if found_user.password_token else "" query_engine.refresh_password_history_via_user(payload=history_dict) found_user.save() return found_user @staticmethod def reset_password_token(found_user): found_user.password_expiry_begins = str(system_arrow.now()) found_user.password_token = found_user.generate_token( Auth.REFRESHER_TOKEN_LENGTH ) found_user.save() return found_user.password_token def generate_refresher_token(self, domain: str, remember_me=False): from databases import UsersTokens if remember_me: refresh_token = self.generate_token(Auth.REFRESHER_TOKEN_LENGTH) if already_token := UsersTokens.filter_by_one( system=True, user_id=self.id, token_type="RememberMe", domain=domain ).data: already_token.update(token=refresh_token) already_token.expires_at = system_arrow.shift(days=3) already_token.save() return refresh_token users_tokens = UsersTokens.filter_by_all( user_id=self.id, token_type="RememberMe", domain=domain, system=True ).data if users_tokens: users_tokens.query.delete() UsersTokens.save() users_token = UsersTokens.find_or_create( user_id=self.id, token_type="RememberMe", token=refresh_token, domain=domain, ) users_token.save_and_confirm() return refresh_token return None def remainder_day(self): join_list = [ _ for _ in str(self.password_expires_day).split(",")[0] if _.isdigit() ] return float(timedelta(days=int("".join(join_list))).seconds) class UserLoginModule(AuthModule): @classmethod def set_login_details_to_mongo_database( cls, found_user, headers_request, access_token, record_id ): from databases.no_sql_models.identity import MongoQueryIdentity agent = headers_request.get("evyos-user-agent", "") platform = headers_request.get("evyos-platform", "") ext_ip = headers_request.get("evyos-ip-ext") address = requests.get(f"http://ip-api.com/json/{ext_ip}").json() address_package = { "city": address["city"], "zip": address["zip"], "country": address["country"], "countryCode": address["countryCode"], "region": address["region"], "regionName": address["regionName"], } mongo_db = MongoQueryIdentity( company_uuid=found_user.related_company, storage_reasoning="AccessHistory", ) query_engine = MongoQueryIdentity(company_uuid=found_user.related_company) filter_query = { "agent": agent, "platform": platform, "address": address_package, "user_id": found_user.id, } already_exits = mongo_db.mongo_engine.filter_by(filter_query).data no_address_validates = mongo_db.mongo_engine.get_all().data access_via_user = query_engine.update_access_history_via_user( AccessHistoryViaUser( **{ "user_uu_id": found_user.uu_id.__str__(), "access_history": { "record_id": record_id, "agent": agent, "platform": platform, "address": address_package, "ip": ext_ip, "access_token": access_token, "created_at": system_arrow.now().timestamp(), }, } ) ) if already_exits: update_mongo = mongo_db.mongo_engine.table.update_one( filter=filter_query, update={ "$set": { "ip": ext_ip, "access_token": access_token, "created_at": system_arrow.now().timestamp(), } }, ) else: insert_mongo = mongo_db.mongo_engine.insert( payload={ "user_id": found_user.id, "record_id": record_id, "agent": agent, "platform": platform, "address": address_package, "ip": ext_ip, "access_token": access_token, "created_at": system_arrow.now().timestamp(), "is_confirmed": True if no_address_validates else False, "is_first": True if no_address_validates else False, } ) @classmethod def login_user_with_credentials(cls, data, request): from databases.sql_models.identity.identity import Users from ApiServices.api_handlers.auth_actions.auth import AuthActions from fastapi.exceptions import HTTPException found_user: Users = Users.check_user_exits( access_key=data.access_key, domain=data.domain ) if not found_user: raise HTTPException( status_code=401, detail="Login is not successful. Please check your credentials.", ) if len(found_user.hash_password) < 5: raise HTTPException( status_code=401, detail="This user has not set up password. Please contact support.", ) if found_user.check_password( domain=data.domain, id_=found_user.uu_id, password_hashed=found_user.hash_password, password=data.password, ): access_object_to_redis = AuthActions.save_access_token_to_redis( request=request, found_user=found_user, domain=data.domain, # remember_me=data.remember_me, ) access_token = access_object_to_redis.get("access_token") access_object_to_redis["user"] = found_user headers_request = dict(request.headers) headers_request["evyos-user-agent"] = headers_request.get("user-agent") headers_request["evyos-platform"] = headers_request.get("user-agent") headers_request["evyos-ip-ext"] = "94.54.68.158" if headers_request.get("evyos-ip-ext"): record_id = uuid.uuid4().__str__() cls.set_login_details_to_mongo_database( found_user=found_user, headers_request=headers_request, access_token=access_token, record_id=record_id, ) notice_link = ApiStatic.blacklist_login(record_id=record_id) found_user.remember_me = bool(data.remember_me) found_user.save() return access_object_to_redis raise HTTPException( status_code=401, detail="Login is not successful. Please check your credentials.", )