wag-managment-api-service-v.../databases/no_sql_models/identity.py

141 lines
5.3 KiB
Python

import datetime
from databases.no_sql_models.validations import (
PasswordHistoryViaUser,
DomainViaUser,
AccessHistoryViaUser,
)
from databases.no_sql_models.mongo_database import MongoQuery
class MongoQueryIdentity:
"""
4ex. mongo_collection_name = str(Company.uu_id()) + "*" + str('UserPasswordHistory')
"""
def __init__(self, company_uuid, storage_reasoning: str = None):
self.company_uuid = company_uuid
self.mongo_collection_base = str(company_uuid)
if storage_reasoning:
self.mongo_collection_name = (
str(company_uuid) + "*" + str(storage_reasoning)
)
else:
self.mongo_collection_name = str(company_uuid) + "*" + str("Domain")
self.mongo_engine = MongoQuery(
table_name=self.mongo_collection_name, database_name="mongo_database"
)
def use_collection(self, storage_reasoning):
self.mongo_collection_name = (
str(self.company_uuid) + "*" + str(storage_reasoning)
)
self.mongo_engine = MongoQuery(
table_name=self.mongo_collection_name, database_name="mongo_database"
)
def create_domain_via_user(self, payload: DomainViaUser):
self.use_collection("Domain")
return self.mongo_engine.find_or_insert(
field="user_uu_id",
payload={
"user_uu_id": payload.user_uu_id,
"other_domains_list": [payload.main_domain],
"main_domain": payload.main_domain,
"modified_at": datetime.datetime.now().timestamp(),
},
)
def update_domain_via_user(self, payload: DomainViaUser):
self.use_collection("Domain")
return self.mongo_engine.update(
match=payload.user_uu_id,
payload={
"other_domains_list": payload.other_domains_list,
"modified_at": datetime.datetime.now().timestamp(),
},
field="user_uu_id",
)
def get_domain_via_user(self, user_uu_id):
self.use_collection("Domain")
return self.mongo_engine.get_one(match=str(user_uu_id), field="user_uu_id")
def refresh_password_history_via_user(self, payload: PasswordHistoryViaUser):
self.use_collection("PasswordHistory")
password_history_item = self.mongo_engine.get_one(
match=payload.user_uu_id, field="user_uu_id"
)
if not password_history_item:
self.mongo_engine.insert(
payload={
"user_uu_id": str(payload.user_uu_id),
"password_history": [],
}
)
password_history_item = self.mongo_engine.get_one(
match=payload.user_uu_id, field="user_uu_id"
)
password_history_list = password_history_item.get("password_history", [])
hashed_password = payload.password_add.get("password")
for password_in_history in password_history_list:
if str(password_in_history.get("password")) == str(hashed_password):
raise Exception(
dict(
status_code=400,
detail="Password already used. Please enter a new password that you have not used last 3 times.",
)
)
if len(password_history_list) > 3:
password_history_list.pop(0)
password_history_list.append(payload.password_add)
return self.mongo_engine.update(
match=payload.user_uu_id,
payload={
"password_history": password_history_list,
"access_history_detail": payload.access_history_detail,
"modified_at": datetime.datetime.now().timestamp(),
},
field="user_uu_id",
)
def get_password_history_via_user(self, user_uu_id):
self.use_collection("PasswordHistory")
return self.mongo_engine.get_one(match=user_uu_id, field="user_uu_id")
def update_access_history_via_user(self, payload: AccessHistoryViaUser):
self.use_collection("AccessHistory")
if already_dict := self.get_access_history_via_user(
user_uu_id=payload.user_uu_id
):
access_history = already_dict[0].get("access_history") or []
access_history.append(payload.access_history)
if len(access_history) > 60:
access_history.pop(0)
return self.mongo_engine.update(
match=payload.user_uu_id,
payload={
"user_uu_id": payload.user_uu_id,
"access_history": access_history,
"modified_at": datetime.datetime.now().timestamp(),
},
field="user_uu_id",
)
return self.mongo_engine.insert(
payload={
"user_uu_id": payload.user_uu_id,
"access_history": [payload.access_history],
"modified_at": datetime.datetime.now().timestamp(),
}
)
def get_access_history_via_user(self, user_uu_id):
self.use_collection("AccessHistory")
return self.mongo_engine.filter_by(
payload={"user_uu_id": user_uu_id},
sort_by="modified_at",
sort_direction="desc",
)