alchemy flush and save functions updated

This commit is contained in:
2024-11-11 18:55:53 +03:00
parent c42a19c262
commit 1f1222c32d
163 changed files with 6296 additions and 476 deletions

View File

@@ -32,6 +32,11 @@ class PasswordModule:
@classmethod
def check_hashed_password(cls, domain, id_, password, password_hashed):
if not password_hashed:
raise HTTPException(
status_code=401,
detail="Password is not changed yet user has no password.",
)
return cls.create_hashed_password(domain, id_, password) == password_hashed
@@ -83,12 +88,19 @@ class AuthModule(PasswordModule):
def check_password(self, password):
main_domain = self.get_main_domain_and_other_domains(get_main_domain=True)
print('check_password', dict(
domain=main_domain,
id_=str(self.uu_id),
password_hashed=self.hash_password,
password=password,
))
if check_password := self.check_hashed_password(
domain=main_domain,
id_=self.uu_id,
id_=str(self.uu_id),
password_hashed=self.hash_password,
password=password,
):
print('check_password', check_password)
return check_password
raise HTTPException(
status_code=401,
@@ -105,59 +117,59 @@ class AuthModule(PasswordModule):
detail="New password is same with old password.",
)
def create_password(self, password, password_token=None):
from databases import (
MongoQueryIdentity,
)
if self.password_token:
@staticmethod
def create_password(found_user, password, password_token=None):
from databases import MongoQueryIdentity
if found_user.password_token:
replace_day = 0
try:
replace_day = int(
str(self.password_expires_day or 0)
str(found_user.password_expires_day or 0)
.split(",")[0]
.replace(" days", "")
)
except Exception as e:
err = e
token_is_expired = system_arrow.now() >= system_arrow.get(
self.password_expiry_begins
found_user.password_expiry_begins
).shift(days=replace_day)
if not password_token == self.password_token and token_is_expired:
if not password_token == found_user.password_token and token_is_expired:
raise HTTPException(
status_code=401,
detail="Password token is not valid. Please request a new password token.",
)
query_engine = MongoQueryIdentity(company_uuid=found_user.related_company)
query_engine = MongoQueryIdentity(company_uuid=self.related_company)
domain_via_user = query_engine.get_domain_via_user(user_uu_id=str(self.uu_id))[
domain_via_user = query_engine.get_domain_via_user(user_uu_id=str(found_user.uu_id))[
"main_domain"
]
new_password_dict = {
"password": self.create_hashed_password(
domain=domain_via_user, id_=self.uu_id, password=password
"password": found_user.create_hashed_password(
domain=domain_via_user, id_=str(found_user.uu_id), password=password
),
"date": str(system_arrow.now()),
"date": str(system_arrow.now().date()),
}
history_dict = PasswordHistoryViaUser(
user_uu_id=str(self.uu_id),
user_uu_id=str(found_user.uu_id),
password_add=new_password_dict,
access_history_detail={
"request": "",
"ip": "",
},
)
found_user.password_expiry_begins = str(system_arrow.now())
found_user.hash_password = new_password_dict.get("password")
found_user.password_token = "" if found_user.password_token else ""
found_user.save()
query_engine.refresh_password_history_via_user(payload=history_dict)
self.password_expiry_begins = str(system_arrow.now())
self.hash_password = new_password_dict.get("password")
if self.password_token:
self.password_token = None
self.save()
return
def reset_password_token(self):
self.password_expiry_begins = str(system_arrow.now())
self.password_token = self.generate_token(127)
self.save()
@staticmethod
def reset_password_token(found_user):
found_user.password_expiry_begins = str(system_arrow.now())
found_user.password_token = found_user.generate_token(127)
found_user.save()
def generate_refresher_token(self, domain: str, remember_me=False):
from databases import (
@@ -212,8 +224,8 @@ class UserLoginModule(AuthModule):
)
access_token = found_user.generate_access_token()
query_engine = MongoQueryIdentity(company_uuid=found_user.related_company)
if found_user.check_password(password=data.password):
print('before access_object_to_redis')
access_object_to_redis = save_access_token_to_redis(
request=request,
found_user=found_user,