import datetime
from typing import Optional

from fastapi import HTTPException

from .validations import PasswordHistoryViaUser, DomainViaUser, AccessHistoryViaUser
from .mongo_database import MongoQuery


class MongoQueryIdentity:
    """Company-scoped helper around ``MongoQuery`` for identity documents.

    Every collection name is built as ``"<company_uuid>*<storage_reasoning>"``,
    e.g. ``str(company_uuid) + "*" + "PasswordHistory"``. The methods of this
    class use the storage reasonings ``"Domain"``, ``"PasswordHistory"`` and
    ``"AccessHistory"`` and re-point ``self.mongo_engine`` at the matching
    collection before each operation.
    """

    DATABASE_NAME = "mongo_database"
    DEFAULT_STORAGE_REASONING = "Domain"
    # Number of previous passwords that are kept and checked against reuse.
    PASSWORD_HISTORY_LIMIT = 3
    # Maximum number of access-history entries kept per user.
    ACCESS_HISTORY_LIMIT = 60

    def __init__(self, company_uuid, storage_reasoning: Optional[str] = None):
        """Bind the helper to a company and select the initial collection.

        Args:
            company_uuid: Identifier used as the collection-name prefix.
            storage_reasoning: Collection-name suffix; a falsy value selects
                the ``"Domain"`` collection.
        """
        self.company_uuid = company_uuid
        self.mongo_collection_base = str(company_uuid)
        self.use_collection(storage_reasoning or self.DEFAULT_STORAGE_REASONING)

    @staticmethod
    def _now_timestamp() -> float:
        """Return the current time as a POSIX timestamp."""
        return datetime.datetime.now().timestamp()

    def use_collection(self, storage_reasoning):
        """Point ``self.mongo_engine`` at ``"<company_uuid>*<storage_reasoning>"``."""
        self.mongo_collection_name = (
            str(self.company_uuid) + "*" + str(storage_reasoning)
        )
        self.mongo_engine = MongoQuery(
            table_name=self.mongo_collection_name,
            database_name=self.DATABASE_NAME,
        )

    def create_domain_via_user(self, payload: DomainViaUser):
        """Insert a domain document for the user in ``payload``.

        The new document starts with ``other_domains_list`` containing only
        ``payload.main_domain``. Returns whatever ``MongoQuery.insert`` returns.
        """
        self.use_collection("Domain")
        return self.mongo_engine.insert(
            payload={
                "user_uu_id": payload.user_uu_id,
                "other_domains_list": [payload.main_domain],
                "main_domain": payload.main_domain,
                "modified_at": self._now_timestamp(),
            }
        )

    def update_domain_via_user(self, payload: DomainViaUser):
        """Replace ``other_domains_list`` on the user's domain document.

        The document is matched on ``user_uu_id``. Returns whatever
        ``MongoQuery.update`` returns.
        """
        self.use_collection("Domain")
        return self.mongo_engine.update(
            match=payload.user_uu_id,
            payload={
                "other_domains_list": payload.other_domains_list,
                "modified_at": self._now_timestamp(),
            },
            field="user_uu_id",
        )

    def get_domain_via_user(self, user_uu_id):
        """Fetch the domain document whose ``user_uu_id`` equals ``str(user_uu_id)``."""
        self.use_collection("Domain")
        return self.mongo_engine.get_one(match=str(user_uu_id), field="user_uu_id")

    def refresh_password_history_via_user(self, payload: PasswordHistoryViaUser):
        """Record a new password for the user, rejecting recently used ones.

        An empty history document is created when none exists for the user.
        The ``"password"`` value of ``payload.password_add`` is compared with
        the last ``PASSWORD_HISTORY_LIMIT`` stored entries; on a match nothing
        is written. Otherwise the new entry is appended and only the newest
        ``PASSWORD_HISTORY_LIMIT`` entries are stored.

        Returns:
            Whatever ``MongoQuery.update`` returns.

        Raises:
            HTTPException: status 400 when the password is in the recent history.
        """
        self.use_collection("PasswordHistory")
        # Normalise once: the document is stored under the string form, so
        # every lookup and update must match on that same form.
        user_uu_id = str(payload.user_uu_id)
        limit = self.PASSWORD_HISTORY_LIMIT

        record = self.mongo_engine.get_one(match=user_uu_id, field="user_uu_id")
        if not record:
            self.mongo_engine.insert(
                payload={"user_uu_id": user_uu_id, "password_history": []}
            )
            # A freshly inserted document has an empty history; no re-fetch needed.
            record = {}

        # ``or []`` also covers a stored null; the slice caps oversized legacy lists.
        recent_passwords = list(record.get("password_history") or [])[-limit:]

        new_password = str(payload.password_add.get("password"))
        if any(entry.get("password") == new_password for entry in recent_passwords):
            raise HTTPException(
                status_code=400,
                detail=(
                    "Password already used. Please enter a new password that "
                    f"you have not used last {limit} times."
                ),
            )

        recent_passwords.append(payload.password_add)
        # Trim after appending so exactly ``limit`` entries are kept.
        recent_passwords = recent_passwords[-limit:]

        return self.mongo_engine.update(
            match=user_uu_id,
            payload={
                "password_history": recent_passwords,
                "access_history_detail": payload.access_history_detail,
                "modified_at": self._now_timestamp(),
            },
            field="user_uu_id",
        )

    def get_password_history_via_user(self, user_uu_id):
        """Fetch the password-history document for ``str(user_uu_id)``."""
        self.use_collection("PasswordHistory")
        # Password-history documents are stored under the string form of the id.
        return self.mongo_engine.get_one(match=str(user_uu_id), field="user_uu_id")

    def update_access_history_via_user(self, payload: AccessHistoryViaUser):
        """Append ``payload.access_history`` to the user's access history.

        When a document already exists, the entry is appended to the first
        result of ``get_access_history_via_user`` and the list is capped at the
        newest ``ACCESS_HISTORY_LIMIT`` entries. Otherwise a new document is
        inserted.

        Returns:
            Whatever ``MongoQuery.update`` or ``MongoQuery.insert`` returns.
        """
        # The getter selects the "AccessHistory" collection for the calls below.
        existing = self.get_access_history_via_user(user_uu_id=payload.user_uu_id)
        if not existing:
            return self.mongo_engine.insert(
                payload={
                    "user_uu_id": payload.user_uu_id,
                    "access_history": [payload.access_history],
                    "modified_at": self._now_timestamp(),
                }
            )

        access_history = list(existing[0].get("access_history") or [])
        access_history.append(payload.access_history)
        access_history = access_history[-self.ACCESS_HISTORY_LIMIT:]
        return self.mongo_engine.update(
            match=payload.user_uu_id,
            payload={
                "user_uu_id": payload.user_uu_id,
                "access_history": access_history,
                "modified_at": self._now_timestamp(),
            },
            field="user_uu_id",
        )

    def get_access_history_via_user(self, user_uu_id):
        """Return access-history documents for the user, sorted by ``modified_at`` descending."""
        self.use_collection("AccessHistory")
        return self.mongo_engine.filter_by(
            payload={"user_uu_id": user_uu_id},
            sort_by="modified_at",
            sort_direction="desc",
        )