import datetime
from typing import Optional

from databases.no_sql_models.validations import (
    PasswordHistoryViaUser,
    DomainViaUser,
    AccessHistoryViaUser,
)
from databases.no_sql_models.mongo_database import MongoQuery
from api_library.date_time_actions.date_functions import system_arrow


# Document keys that are expected to hold numeric timestamps.
_TIMESTAMP_FIELDS = ("modified_at", "created_at", "accessed_at", "timestamp")

# Database name handed to every MongoQuery created by this module.
_DATABASE_NAME = "mongo_database"


def _now_timestamp():
    """Return the current time converted through ``system_arrow.to_timestamp``."""
    return system_arrow.to_timestamp(system_arrow.now())


def validate_timestamp(doc):
    """Normalise the timestamp fields of a MongoDB document in place.

    Every key listed in ``_TIMESTAMP_FIELDS`` that is present in ``doc`` and
    whose value is not already an ``int``/``float`` is replaced by
    ``system_arrow.to_timestamp(value)``.

    Args:
        doc: A mapping-like document, or a falsy value (``None``, ``{}``).

    Returns:
        The same ``doc`` object (mutated), or ``doc`` unchanged when falsy.
    """
    if not doc:
        return doc
    for field in _TIMESTAMP_FIELDS:
        if field in doc and not isinstance(doc[field], (int, float)):
            doc[field] = system_arrow.to_timestamp(doc[field])
    return doc


class MongoQueryIdentity:
    """Per-company access to the identity-related Mongo collections.

    Collection names are built as ``"<company_uuid>*<storage_reasoning>"``,
    e.g. ``str(company_uuid) + "*" + "PasswordHistory"``.

    Every public method first switches ``self.mongo_engine`` to the
    collection it needs, so one instance can serve all three collections.
    """

    DOMAIN_COLLECTION = "Domain"
    PASSWORD_HISTORY_COLLECTION = "PasswordHistory"
    ACCESS_HISTORY_COLLECTION = "AccessHistory"

    # Number of most recent passwords that are stored and may not be reused;
    # matches the "last 3 times" wording of the rejection message.
    PASSWORD_HISTORY_LIMIT = 3
    # Maximum number of access-history entries kept per user.
    ACCESS_HISTORY_LIMIT = 60

    def __init__(self, company_uuid, storage_reasoning: Optional[str] = None):
        """Bind the instance to a company and select an initial collection.

        Args:
            company_uuid: Identifier of the company; stringified for naming.
            storage_reasoning: Collection suffix. Falsy values fall back to
                the ``"Domain"`` collection.
        """
        self.company_uuid = company_uuid
        self.mongo_collection_base = str(company_uuid)
        self.use_collection(storage_reasoning or self.DOMAIN_COLLECTION)

    def use_collection(self, storage_reasoning):
        """Point ``self.mongo_engine`` at ``"<company_uuid>*<storage_reasoning>"``."""
        self.mongo_collection_name = (
            str(self.company_uuid) + "*" + str(storage_reasoning)
        )
        self.mongo_engine = MongoQuery(
            table_name=self.mongo_collection_name, database_name=_DATABASE_NAME
        )

    def _find_by_user(self, storage_reasoning, user_uu_id):
        """Find a user's documents in a collection and normalise timestamps.

        Returns:
            A list of documents passed through ``validate_timestamp``, or
            ``None`` when the query result is falsy.
        """
        self.use_collection(storage_reasoning)
        result = self.mongo_engine.find(match=user_uu_id, field="user_uu_id")
        return [validate_timestamp(doc) for doc in result] if result else None

    def create_domain_via_user(self, payload: DomainViaUser):
        """Call ``find_or_insert`` on the Domain collection for the user.

        The payload seeds ``other_domains_list`` with the user's main domain.

        Returns:
            Whatever ``MongoQuery.find_or_insert`` returns.
        """
        self.use_collection(self.DOMAIN_COLLECTION)
        return self.mongo_engine.find_or_insert(
            field="user_uu_id",
            payload={
                "user_uu_id": payload.user_uu_id,
                "other_domains_list": [payload.main_domain],
                "main_domain": payload.main_domain,
                "modified_at": _now_timestamp(),
            },
        )

    def update_domain_via_user(self, payload: DomainViaUser):
        """Replace the user's ``other_domains_list`` and bump ``modified_at``.

        Returns:
            Whatever ``MongoQuery.update`` returns.
        """
        self.use_collection(self.DOMAIN_COLLECTION)
        return self.mongo_engine.update(
            match=payload.user_uu_id,
            payload={
                "other_domains_list": payload.other_domains_list,
                "modified_at": _now_timestamp(),
            },
            field="user_uu_id",
        )

    def get_domain_via_user(self, user_uu_id):
        """Return the user's Domain documents, or ``None`` if nothing matched."""
        return self._find_by_user(self.DOMAIN_COLLECTION, user_uu_id)

    def refresh_password_history_via_user(self, payload: PasswordHistoryViaUser):
        """Record a new password, rejecting reuse of a recent one.

        The stored history is limited to ``PASSWORD_HISTORY_LIMIT`` entries.
        A record is inserted first when the user has none yet.

        Args:
            payload: Carries ``user_uu_id``, ``password_add`` (a mapping with
                a ``"password"`` key) and ``access_history_detail``.

        Returns:
            Whatever ``MongoQuery.update`` returns.

        Raises:
            Exception: With a ``dict(status_code=400, detail=...)`` argument
                when the password matches one kept in the history.
        """
        self.use_collection(self.PASSWORD_HISTORY_COLLECTION)
        limit = self.PASSWORD_HISTORY_LIMIT

        history_item = self.mongo_engine.get_one(
            match=payload.user_uu_id, field="user_uu_id"
        )
        if history_item:
            # Tolerate a stored None and drop entries beyond the limit that
            # earlier versions may have left behind.
            history = list(history_item.get("password_history") or [])[-limit:]
        else:
            self.mongo_engine.insert(
                payload={
                    "user_uu_id": str(payload.user_uu_id),
                    "password_history": [],
                }
            )
            # A freshly inserted record has no history; no re-fetch needed.
            history = []

        hashed_password = str(payload.password_add.get("password"))
        if any(str(used.get("password")) == hashed_password for used in history):
            # NOTE(review): the dict argument is presumably read by an error
            # handler elsewhere; exception type and payload are kept as-is.
            raise Exception(
                dict(
                    status_code=400,
                    detail="Password already used. Please enter a new password that you have not used last 3 times.",
                )
            )

        history.append(payload.password_add)
        history = history[-limit:]

        return self.mongo_engine.update(
            match=payload.user_uu_id,
            payload={
                "password_history": history,
                "access_history_detail": payload.access_history_detail,
                "modified_at": _now_timestamp(),
            },
            field="user_uu_id",
        )

    def get_password_history_via_user(self, user_uu_id):
        """Return the user's password-history documents, or ``None``.

        Reads the same collection that
        ``refresh_password_history_via_user`` writes to.
        """
        return self._find_by_user(self.PASSWORD_HISTORY_COLLECTION, user_uu_id)

    def update_access_history_via_user(self, payload: AccessHistoryViaUser):
        """Append an access entry for the user, creating the record if needed.

        At most ``ACCESS_HISTORY_LIMIT`` entries are kept; once the list
        exceeds the limit its oldest entry is dropped.

        Returns:
            Whatever ``MongoQuery.update`` (existing record) or
            ``MongoQuery.insert`` (new record) returns.
        """
        self.use_collection(self.ACCESS_HISTORY_COLLECTION)
        if already_dict := self.get_access_history_via_user(
            user_uu_id=payload.user_uu_id
        ):
            access_history = already_dict[0].get("access_history") or []
            access_history.append(payload.access_history)
            if len(access_history) > self.ACCESS_HISTORY_LIMIT:
                access_history.pop(0)
            return self.mongo_engine.update(
                match=payload.user_uu_id,
                payload={
                    "user_uu_id": payload.user_uu_id,
                    "access_history": access_history,
                    "modified_at": _now_timestamp(),
                },
                field="user_uu_id",
            )
        return self.mongo_engine.insert(
            payload={
                "user_uu_id": payload.user_uu_id,
                "access_history": [payload.access_history],
                "modified_at": _now_timestamp(),
            }
        )

    def get_access_history_via_user(self, user_uu_id):
        """Return the user's access-history documents, or ``None``."""
        return self._find_by_user(self.ACCESS_HISTORY_COLLECTION, user_uu_id)