751 lines
29 KiB
Python
751 lines
29 KiB
Python
import arrow
|
|
|
|
from typing import Any, Dict, Optional, Union
|
|
from ApiServices.AuthService.events.auth.model import PasswordHistoryViaUser
|
|
from ApiServices.AuthService.validations.custom.token import (
|
|
EmployeeTokenObject,
|
|
OccupantTokenObject,
|
|
CompanyToken,
|
|
OccupantToken,
|
|
UserType,
|
|
)
|
|
from ApiServices.AuthService.config import api_config
|
|
from Schemas import (
|
|
Users,
|
|
People,
|
|
UsersTokens,
|
|
Credentials,
|
|
BuildLivingSpace,
|
|
BuildParts,
|
|
OccupantTypes,
|
|
Employees,
|
|
Addresses,
|
|
Companies,
|
|
Staff,
|
|
Duty,
|
|
Duties,
|
|
Departments,
|
|
Event2Employee,
|
|
)
|
|
from Modules.Token.password_module import PasswordModule
|
|
from Schemas.building.build import RelationshipEmployee2Build
|
|
from Schemas.event.event import Event2Occupant, Application2Employee
|
|
from Controllers.Redis.database import RedisActions
|
|
from Controllers.Mongo.database import mongo_handler
|
|
|
|
|
|
# Either of the two token models a stored session is rebuilt into
# (see RedisHandlers.process_redis_object).
TokenDictType = Union[EmployeeTokenObject, OccupantTokenObject]
|
|
|
|
|
|
class RedisHandlers:
    """Read and write authentication token objects in Redis.

    Entries are addressed by the key parts
    ``[AUTH_TOKEN, <access token>, <user uu_id>]``; lookups pass ``"*"`` for
    the part that is not known at call time.
    """

    AUTH_TOKEN: str = "AUTH_TOKEN"

    @classmethod
    def process_redis_object(cls, redis_object: Dict[str, Any]) -> TokenDictType:
        """Build the token model matching ``redis_object["user_type"]``.

        Missing or falsy ``selected_company`` / ``selected_occupant`` entries
        are normalised to ``None`` in place before the model is constructed.

        Raises:
            ValueError: "Invalid user type" when ``user_type`` is neither the
                employee nor the occupant value.
        """
        for selection_key in ("selected_company", "selected_occupant"):
            if not redis_object.get(selection_key):
                redis_object[selection_key] = None

        user_type = redis_object.get("user_type")
        if user_type == UserType.employee.value:
            return EmployeeTokenObject(**redis_object)
        if user_type == UserType.occupant.value:
            return OccupantTokenObject(**redis_object)
        raise ValueError("Invalid user type")

    @classmethod
    def get_object_from_redis(cls, access_token: str) -> TokenDictType:
        """Return the token object stored for ``access_token``.

        Raises:
            ValueError: "EYS_0001" when the Redis response reports no success,
                "EYS_0002" when the response holds no first item.
        """
        redis_response = RedisActions.get_json(
            list_keys=[cls.AUTH_TOKEN, access_token, "*"]
        )
        if not redis_response.status:
            raise ValueError("EYS_0001")
        if redis_object := redis_response.first:
            return cls.process_redis_object(redis_object)
        raise ValueError("EYS_0002")

    @classmethod
    def set_object_to_redis(cls, user: Users, token, header_info):
        """Replace the user's stored sessions with a freshly generated one.

        Args:
            user: User whose ``uu_id`` becomes the last key part.
            token: Mapping with the dumped token model.
            header_info: Mapping merged over ``token`` before storing.

        Returns:
            The newly generated access token.
        """
        # Drop every entry already stored for this user before issuing a new one.
        # The delete result is intentionally not printed: session bookkeeping
        # does not belong on stdout.
        RedisActions.delete(list_keys=[cls.AUTH_TOKEN, "*", str(user.uu_id)])

        generated_access_token = PasswordModule.generate_access_token()
        RedisActions.set_json(
            list_keys=[cls.AUTH_TOKEN, generated_access_token, str(user.uu_id)],
            value={**token, **header_info},
            expires={"hours": 1, "minutes": 30},
        )
        return generated_access_token

    @classmethod
    def update_token_at_redis(
        cls, token: str, add_payload: Union[CompanyToken, OccupantToken]
    ):
        """Attach a company/occupant selection to a stored token and re-save it.

        The selection is applied only when its type matches the token kind
        (``CompanyToken`` for employees, ``OccupantToken`` for occupants); the
        token is written back with a new expiry in either case.

        Returns:
            ``.first`` of the Redis write result.

        Raises:
            ValueError: "Something went wrong" when no entry exists for ``token``.
        """
        stored_payload = RedisActions.get_json(
            list_keys=[cls.AUTH_TOKEN, token, "*"]
        ).first
        if not stored_payload:
            raise ValueError("Something went wrong")

        already_token = cls.process_redis_object(stored_payload)
        if already_token.is_employee and isinstance(add_payload, CompanyToken):
            already_token.selected_company = add_payload
        elif already_token.is_occupant and isinstance(add_payload, OccupantToken):
            already_token.selected_occupant = add_payload

        result = RedisActions.set_json(
            list_keys=[cls.AUTH_TOKEN, token, str(already_token.user_uu_id)],
            value=already_token.model_dump(),
            expires={"hours": 1, "minutes": 30},
        )
        # The stored payload (request headers, ids) is no longer echoed to stdout.
        return result.first
|
|
|
|
|
|
class UserHandlers:
    """User lookup and password verification helpers."""

    @staticmethod
    def check_user_exists(access_key: str, db_session) -> "Users":
        """Return the user identified by ``access_key``.

        An ``access_key`` containing "@" is matched against the lower-cased
        e-mail; anything else is matched against the phone number with spaces
        removed.

        Raises:
            ValueError: "EYS_0003" when ``access_key`` is empty or no user matches.
        """
        # An empty key would otherwise be looked up as the phone number "".
        if not access_key:
            raise ValueError("EYS_0003")

        if "@" in access_key:
            lookup = Users.email == access_key.lower()
        else:
            lookup = Users.phone_number == access_key.replace(" ", "")

        found_user: Users = Users.filter_one(lookup, db=db_session).data
        if not found_user:
            # The exception used to be constructed without `raise`, so callers
            # silently received None.
            raise ValueError("EYS_0003")
        return found_user

    @staticmethod
    def check_password_valid(
        domain: str, id_: str, password: str, password_hashed: str
    ) -> bool:
        """Return True when ``PasswordModule.check_password`` accepts the password.

        Raises:
            ValueError: "EYS_0004" when the check fails.
        """
        # Credentials are deliberately not printed or logged here.
        if PasswordModule.check_password(
            domain=domain, id_=id_, password=password, password_hashed=password_hashed
        ):
            return True
        raise ValueError("EYS_0004")

    @staticmethod
    def update_password():
        """Placeholder; currently does nothing and returns None."""
        return
|
|
|
|
|
|
class LoginHandler:
    """Login, access-token header and company/occupant selection flows."""

    @staticmethod
    def _email_domain(email: str) -> Optional[str]:
        """Return the text between the first and second "@", or None without "@"."""
        parts = str(email).split("@")
        return parts[1] if len(parts) > 1 else None

    @staticmethod
    def is_occupant(email: str):
        """True when ``email`` has a domain other than ``api_config.ACCESS_EMAIL_EXT``.

        Returns False (instead of raising IndexError) when there is no "@".
        """
        domain = LoginHandler._email_domain(email)
        if domain is None:
            return False
        return not domain == api_config.ACCESS_EMAIL_EXT

    @staticmethod
    def is_employee(email: str):
        """True when the domain of ``email`` equals ``api_config.ACCESS_EMAIL_EXT``.

        Returns False (instead of raising IndexError) when there is no "@".
        """
        domain = LoginHandler._email_domain(email)
        if domain is None:
            return False
        return domain == api_config.ACCESS_EMAIL_EXT

    @classmethod
    def do_employee_login(
        cls, request: Any, data: Any, extra_dict: Optional[Dict[str, Any]] = None
    ):
        """Handle employee login.

        Args:
            request: Object exposing ``headers``; they are copied into the token.
            data: Object exposing ``access_key`` and ``password``.
            extra_dict: Optional mapping with ``language``, ``domain`` and the
                timezone under ``tz`` (``timezone`` is accepted as well).

        Returns:
            Dict with ``access_token``, ``user_type``, ``user`` and
            ``selection_list`` (one entry per company reachable by the user).

        Raises:
            ValueError: "EYS_0003" unknown user, "EYS_0005" rejected password,
                "Something went wrong" when no access token was produced.
                ``UserHandlers`` may raise its own codes first.
        """
        extra_dict = extra_dict or {}
        language = extra_dict.get("language", "tr")
        domain = extra_dict.get("domain", None)
        # Callers have used both key names; honour either before defaulting.
        timezone = extra_dict.get("tz") or extra_dict.get("timezone") or "GMT+3"

        with Users.new_session() as db_session:
            found_user = UserHandlers.check_user_exists(
                access_key=data.access_key, db_session=db_session
            )
            if not found_user:
                raise ValueError("EYS_0003")

            if not UserHandlers.check_password_valid(
                domain=domain or "",
                id_=str(found_user.uu_id),
                password=data.password,
                password_hashed=found_user.hash_password,
            ):
                raise ValueError("EYS_0005")

            list_employee = Employees.filter_all(
                Employees.people_id == found_user.person_id, db=db_session
            ).data

            companies_uu_id_list: list = []
            companies_id_list: list = []
            companies_list: list = []
            duty_uu_id_list: list = []
            duty_id_list: list = []

            for employee in list_employee:
                # Walk employee -> staff -> duties -> department -> company and
                # skip the employee record as soon as one link is missing.
                staff = Staff.filter_one(
                    Staff.id == employee.staff_id, db=db_session
                ).data
                if not staff:
                    continue

                duties = Duties.filter_one(
                    Duties.id == staff.duties_id, db=db_session
                ).data
                if not duties:
                    continue

                if duty_found := Duty.filter_by_one(
                    id=duties.duties_id, db=db_session
                ).data:
                    duty_uu_id_list.append(str(duty_found.uu_id))
                    duty_id_list.append(duty_found.id)

                department = Departments.filter_one(
                    Departments.id == duties.department_id, db=db_session
                ).data
                if not department:
                    continue

                company = Companies.filter_one(
                    Companies.id == department.company_id, db=db_session
                ).data
                if not company:
                    continue

                companies_uu_id_list.append(str(company.uu_id))
                companies_id_list.append(company.id)
                company_address = Addresses.filter_by_one(
                    id=company.official_address_id, db=db_session
                ).data
                companies_list.append(
                    {
                        "uu_id": str(company.uu_id),
                        "public_name": company.public_name,
                        "company_type": company.company_type,
                        "company_address": company_address,
                    }
                )

            person = People.filter_one(
                People.id == found_user.person_id, db=db_session
            ).data
            model_value = EmployeeTokenObject(
                user_type=UserType.employee.value,
                user_uu_id=str(found_user.uu_id),
                user_id=found_user.id,
                person_id=found_user.person_id,
                person_uu_id=str(person.uu_id),
                request=dict(request.headers),
                companies_uu_id_list=companies_uu_id_list,
                companies_id_list=companies_id_list,
                duty_uu_id_list=duty_uu_id_list,
                duty_id_list=duty_id_list,
            ).model_dump()

            access_token = RedisHandlers.set_object_to_redis(
                user=found_user,
                token=model_value,
                header_info=dict(language=language, domain=domain, timezone=timezone),
            )
            if access_token:
                return {
                    "access_token": access_token,
                    "user_type": UserType.employee.name,
                    "user": found_user.get_dict(
                        exclude_list=[
                            Users.hash_password,
                            Users.cryp_uu_id,
                            Users.password_token,
                            Users.created_credentials_token,
                            Users.updated_credentials_token,
                            Users.confirmed_credentials_token,
                            Users.is_confirmed,
                            Users.is_notification_send,
                            Users.is_email_send,
                            Users.remember_me,
                        ]
                    ),
                    "selection_list": companies_list,
                }
        raise ValueError("Something went wrong")

    @classmethod
    def do_employee_occupant(
        cls, request: Any, data: Any, extra_dict: Optional[Dict[str, Any]] = None
    ):
        """Handle occupant login.

        Args:
            request: Object exposing ``headers``; they are copied into the token.
            data: Object exposing ``access_key``, ``password`` and ``domain``.
            extra_dict: Optional mapping with ``language``, ``domain`` and the
                timezone under ``tz`` (``timezone`` is accepted as well).

        Returns:
            Dict with ``access_token``, ``user_type`` and ``selection_list``
            (living spaces grouped by building uu_id).

        Raises:
            ValueError: "EYS_0003" unknown user, "EYS_0005" rejected password,
                "EYS_0006" no living space, "EYS_0007" living space without a
                build part, "Something went wrong" when no token was produced.
        """
        extra_dict = extra_dict or {}
        language = extra_dict.get("language", "tr")
        domain = extra_dict.get("domain", None)
        # Callers have used both key names; honour either before defaulting.
        timezone = extra_dict.get("tz") or extra_dict.get("timezone") or "GMT+3"

        with Users.new_session() as db_session:
            found_user = UserHandlers.check_user_exists(
                access_key=data.access_key, db_session=db_session
            )
            if not found_user:
                raise ValueError("EYS_0003")

            if not UserHandlers.check_password_valid(
                domain=data.domain,
                id_=str(found_user.uu_id),
                password=data.password,
                password_hashed=found_user.hash_password,
            ):
                raise ValueError("EYS_0005")

            occupants_selection_dict: Dict[str, Any] = {}
            living_spaces: list[BuildLivingSpace] = BuildLivingSpace.filter_all(
                BuildLivingSpace.person_id == found_user.person_id, db=db_session
            ).data
            if not living_spaces:
                raise ValueError("EYS_0006")

            for living_space in living_spaces:
                build_parts_selection = BuildParts.filter_all(
                    BuildParts.id == living_space.build_parts_id,
                    db=db_session,
                ).data
                if not build_parts_selection:
                    raise ValueError("EYS_0007")

                build_part = build_parts_selection[0]
                build = build_part.buildings
                # NOTE(review): this reads `living_space.occupant_type`, while
                # handle_occupant_selection reads `occupant_type_id` - confirm
                # which attribute the model defines.
                occupant_type = OccupantTypes.filter_by_one(
                    id=living_space.occupant_type,
                    db=db_session,
                    system=True,
                ).data

                occupant_data = {
                    "part_uu_id": str(build_part.uu_id),
                    "part_name": build_part.part_name,
                    "part_level": build_part.part_level,
                    "uu_id": str(occupant_type.uu_id),
                    "description": occupant_type.occupant_description,
                    "code": occupant_type.occupant_code,
                }

                # Group the occupant entries under their building.
                build_key = str(build.uu_id)
                if build_key not in occupants_selection_dict:
                    occupants_selection_dict[build_key] = {
                        "build_uu_id": build_key,
                        "build_name": build.build_name,
                        "build_no": build.build_no,
                        "occupants": [],
                    }
                occupants_selection_dict[build_key]["occupants"].append(occupant_data)

            person = found_user.person
            model_value = OccupantTokenObject(
                user_type=UserType.occupant.value,
                user_uu_id=str(found_user.uu_id),
                user_id=found_user.id,
                person_id=person.id,
                person_uu_id=str(person.uu_id),
                request=dict(request.headers),
                available_occupants=occupants_selection_dict,
            ).model_dump()

            access_token = RedisHandlers.set_object_to_redis(
                user=found_user,
                token=model_value,
                header_info=dict(language=language, domain=domain, timezone=timezone),
            )
            if access_token:
                return {
                    "access_token": access_token,
                    "user_type": UserType.occupant.name,
                    "selection_list": occupants_selection_dict,
                }
        raise ValueError("Something went wrong")

    @classmethod
    def authentication_login_with_domain_and_creds(cls, request: Any, data: Any):
        """
        Authenticate user with domain and credentials.

        Args:
            request: FastAPI request object
            data: Request body containing login credentials
                {
                    "domain": "evyos.com.tr",
                    "access_key": "karatay.berkay.sup@evyos.com.tr",
                    "password": "string",
                    "remember_me": false
                }
        Returns:
            The dict produced by ``do_employee_login`` or ``do_employee_occupant``.
        Raises:
            ValueError: "Invalid email format" when ``access_key`` has no "@".
        """
        language = request.headers.get("language", "tr")
        domain = request.headers.get("domain", None)
        timezone = request.headers.get("tz", None) or "GMT+3"
        # The login methods read the timezone under "tz".
        extra_dict = dict(language=language, domain=domain, tz=timezone)

        if cls.is_employee(data.access_key):
            return cls.do_employee_login(
                request=request, data=data, extra_dict=extra_dict
            )
        elif cls.is_occupant(data.access_key):
            # Occupants previously went through the employee login by mistake.
            return cls.do_employee_occupant(
                request=request, data=data, extra_dict=extra_dict
            )
        else:
            raise ValueError("Invalid email format")

    @classmethod
    def raise_error_if_request_has_no_token(cls, request: Any) -> None:
        """Validate request has required token headers.

        Raises:
            ValueError: when ``request`` has no ``headers`` attribute or the
                ``api_config.ACCESS_TOKEN_TAG`` header is missing/empty.
        """
        if not hasattr(request, "headers"):
            raise ValueError("Request has no headers")
        if not request.headers.get(api_config.ACCESS_TOKEN_TAG):
            raise ValueError("Request has no access token")

    @classmethod
    def get_access_token_from_request(cls, request: Any) -> str:
        """Extract access token from request headers (validating it is present)."""
        cls.raise_error_if_request_has_no_token(request=request)
        return request.headers.get(api_config.ACCESS_TOKEN_TAG)

    @classmethod
    def handle_employee_selection(
        cls, access_token: str, data: Any, token_dict: "TokenDictType"
    ):
        """Store the employee's company selection on the Redis token.

        Args:
            access_token: Token whose Redis entry is updated.
            data: Object exposing ``company_uu_id``.
            token_dict: Employee token object of the caller.

        Returns:
            ``{"selected_uu_id": data.company_uu_id}``.

        Raises:
            ValueError: "EYS_0011" company not in the token's company list,
                "EYS_0009" company not found, "EYS_0010" no employee record of
                this person in that company.
        """
        with Users.new_session() as db:
            # Compare as text: the token list holds stringified uu_ids.
            if str(data.company_uu_id) not in token_dict.companies_uu_id_list:
                raise ValueError("EYS_0011")

            selected_company: Companies = Companies.filter_one(
                Companies.uu_id == data.company_uu_id, db=db
            ).data
            if not selected_company:
                raise ValueError("EYS_0009")

            # Get duties IDs for the company
            duties_ids = [
                duty.id
                for duty in Duties.filter_all(
                    Duties.company_id == selected_company.id, db=db
                ).data
            ]

            # Get staff IDs
            staff_ids = [
                staff.id
                for staff in Staff.filter_all(
                    Staff.duties_id.in_(duties_ids), db=db
                ).data
            ]

            # Get employee
            employee: Employees = Employees.filter_one(
                Employees.people_id == token_dict.person_id,
                Employees.staff_id.in_(staff_ids),
                db=db,
            ).data
            if not employee:
                raise ValueError("EYS_0010")

            # Get reachable events
            reachable_event_codes = Event2Employee.get_event_codes(
                employee_id=employee.id, db=db
            )

            # Get staff and duties
            staff = Staff.filter_one(Staff.id == employee.staff_id, db=db).data
            duties = Duties.filter_one(Duties.id == staff.duties_id, db=db).data
            department = Departments.filter_one(
                Departments.id == duties.department_id, db=db
            ).data

            # Get bulk duty
            bulk_id = Duty.filter_by_one_system(duty_code="BULK", db=db).data
            bulk_duty_id = Duties.filter_by_one(
                company_id=selected_company.id,
                duties_id=bulk_id.id,
                db=db,
            ).data

            reachable_app_codes = Application2Employee.get_application_codes(
                employee_id=employee.id, db=db
            )

            # Create company token
            company_token = CompanyToken(
                company_uu_id=str(selected_company.uu_id),
                company_id=selected_company.id,
                department_id=department.id,
                department_uu_id=str(department.uu_id),
                duty_id=duties.id,
                duty_uu_id=str(duties.uu_id),
                bulk_duties_id=bulk_duty_id.id,
                staff_id=staff.id,
                staff_uu_id=str(staff.uu_id),
                employee_id=employee.id,
                employee_uu_id=str(employee.uu_id),
                reachable_event_codes=reachable_event_codes,
                reachable_app_codes=reachable_app_codes,
            )
            RedisHandlers.update_token_at_redis(
                token=access_token, add_payload=company_token
            )
            return {
                "selected_uu_id": data.company_uu_id,
            }

    @classmethod
    def handle_occupant_selection(
        cls, access_token: str, data: Any, token_dict: "TokenDictType"
    ):
        """Handle occupant type selection.

        Args:
            access_token: Token whose Redis entry is updated.
            data: Object exposing ``build_living_space_uu_id``.
            token_dict: Occupant token object of the caller (not read here).

        Returns:
            ``{"selected_uu_id": data.build_living_space_uu_id}``.

        Raises:
            ValueError: "EYS_0012" when the living space does not exist.
        """
        with BuildLivingSpace.new_session() as db:
            # Get selected occupant type
            selected_build_living_space: BuildLivingSpace = BuildLivingSpace.filter_one(
                BuildLivingSpace.uu_id == data.build_living_space_uu_id,
                db=db,
            ).data
            if not selected_build_living_space:
                raise ValueError("EYS_0012")

            # Get reachable events
            reachable_event_codes = Event2Occupant.get_event_codes(
                build_living_space_id=selected_build_living_space.id, db=db
            )
            occupant_type = OccupantTypes.filter_one_system(
                OccupantTypes.id == selected_build_living_space.occupant_type_id,
                db=db,
            ).data
            build_part = BuildParts.filter_one(
                BuildParts.id == selected_build_living_space.build_parts_id,
                db=db,
            ).data
            # The building was previously fetched from BuildParts using the
            # build id as a part id, which yields an unrelated part (or nothing).
            # Use the same relationship do_employee_occupant relies on.
            build = build_part.buildings
            responsible_employee = Employees.filter_one(
                Employees.id == build_part.responsible_employee_id,
                db=db,
            ).data
            related_company = RelationshipEmployee2Build.filter_one(
                RelationshipEmployee2Build.member_id == build.id,
                db=db,
            ).data
            # Get company
            company_related = Companies.filter_one(
                Companies.id == related_company.company_id,
                db=db,
            ).data

            # Create occupant token
            occupant_token = OccupantToken(
                living_space_id=selected_build_living_space.id,
                living_space_uu_id=str(selected_build_living_space.uu_id),
                occupant_type_id=occupant_type.id,
                occupant_type_uu_id=str(occupant_type.uu_id),
                occupant_type=occupant_type.occupant_type,
                build_id=build.id,
                build_uuid=str(build.uu_id),
                build_part_id=build_part.id,
                build_part_uuid=str(build_part.uu_id),
                responsible_employee_id=responsible_employee.id,
                responsible_employee_uuid=str(responsible_employee.uu_id),
                responsible_company_id=company_related.id,
                responsible_company_uuid=str(company_related.uu_id),
                reachable_event_codes=reachable_event_codes,
            )
            RedisHandlers.update_token_at_redis(
                token=access_token, add_payload=occupant_token
            )
            # Echo the identifier that was actually selected in this flow.
            return {
                "selected_uu_id": data.build_living_space_uu_id,
            }

    @classmethod  # Requires auth context
    def authentication_select_company_or_occupant_type(cls, request: Any, data: Any):
        """
        Handle selection of company or occupant type
        {"data": {"build_living_space_uu_id": ""}} | {"data": {"company_uu_id": ""}}
        {
            "data": {"company_uu_id": "e9869a25-ba4d-49dc-bb0d-8286343b184b"}
        }
        {
            "data": {"build_living_space_uu_id": "e9869a25-ba4d-49dc-bb0d-8286343b184b"}
        }

        Returns the result of the matching ``handle_*_selection`` method, or
        None when the token is neither an employee nor an occupant token.

        Raises:
            ValueError: "EYS_0001" when the access-token header is missing.
        """
        access_token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
        if not access_token:
            raise ValueError("EYS_0001")

        token_object = RedisHandlers.get_object_from_redis(access_token=access_token)
        if token_object.is_employee:
            return cls.handle_employee_selection(
                access_token=access_token,
                data=data,
                token_dict=token_object,
            )
        elif token_object.is_occupant:
            return cls.handle_occupant_selection(
                access_token=access_token,
                data=data,
                token_dict=token_object,
            )

    @classmethod
    def authentication_check_token_valid(cls, access_token: str) -> bool:
        """Return True when a token object is stored for ``access_token``.

        NOTE: ``RedisHandlers.get_object_from_redis`` raises ValueError for
        unknown tokens, so in practice an invalid token surfaces as that
        exception rather than as ``False``.
        """
        return bool(RedisHandlers.get_object_from_redis(access_token=access_token))
|
|
|
|
|
|
class PasswordHandler:
    """Password creation backed by the Users table and Mongo history collections."""

    @staticmethod
    def create_password(password, password_token=None):
        """Set a new password for the user owning ``password_token``.

        Steps: find the user by token, read the user's ``main_domain`` from the
        ``<related_company>*Domain`` collection, hash the password, reject it if
        the same hash is already in the ``<related_company>*PasswordHistory``
        entry, store the updated history, then save the user with the new hash
        and an emptied token.

        Args:
            password: New plaintext password.
            password_token: Token identifying the user; must be non-empty.

        Returns:
            The result of the history ``update_one`` call.

        Raises:
            ValueError: "EYS_0031" missing token or unknown user, "EYS_0024" no
                domain document for the user, "EYS_0032" password already in
                the history.
        """
        # A None/empty token would match users whose token column is NULL/"".
        if not password_token:
            raise ValueError("EYS_0031")

        with Users.new_session() as db_session:
            found_user = Users.filter_one(
                Users.password_token == password_token, db=db_session
            ).data
            if not found_user:
                raise ValueError("EYS_0031")

            if found_user.password_token:
                # password_expires_day looks like "<n> days, ..." or a bare number.
                try:
                    replace_day = int(
                        str(found_user.password_expires_day or 0)
                        .split(",")[0]
                        .replace(" days", "")
                    )
                except (TypeError, ValueError):
                    replace_day = 0
                token_is_expired = arrow.now() >= arrow.get(
                    str(found_user.password_expiry_begins)
                ).shift(days=replace_day)

                # NOTE(review): the user was looked up by this very token, so the
                # first operand is always False and this never raises. Behaviour
                # kept as-is; confirm whether expiry should be enforced here.
                if not password_token == found_user.password_token and token_is_expired:
                    raise ValueError("")

            user_uu_id = str(found_user.uu_id)

            with mongo_handler.collection(
                f"{found_user.related_company}*Domain"
            ) as mongo_engine:
                domain_document = mongo_engine.find_one({"user_uu_id": user_uu_id})
            if not domain_document:
                raise ValueError("EYS_0024")
            domain_via_user = domain_document.get("main_domain", None)

            new_password_dict = {
                "password": PasswordModule.create_hashed_password(
                    domain=domain_via_user,
                    id_=user_uu_id,
                    password=password,
                ),
                "date": str(arrow.now().date()),
            }
            history_dict = PasswordHistoryViaUser(
                user_uu_id=user_uu_id,
                password_add=new_password_dict,
                access_history_detail={"request": "", "ip": ""},
            )

            with mongo_handler.collection(
                f"{found_user.related_company}*PasswordHistory"
            ) as mongo_engine_sc:
                password_history_item = mongo_engine_sc.find_one(
                    {"user_uu_id": user_uu_id}
                )
                if not password_history_item:
                    # First password for this user: create the history document.
                    mongo_engine_sc.insert_one(
                        document={
                            "user_uu_id": user_uu_id,
                            "password_history": [],
                        }
                    )
                    password_history_item = mongo_engine_sc.find_one(
                        {"user_uu_id": user_uu_id}
                    )

                password_history_list = password_history_item.get(
                    "password_history", []
                )
                hashed_password = str(history_dict.password_add.get("password"))
                if any(
                    str(previous.get("password")) == hashed_password
                    for previous in password_history_list
                ):
                    raise ValueError("EYS_0032")

                # Drop the oldest entry once more than three are stored.
                if len(password_history_list) > 3:
                    password_history_list.pop(0)
                password_history_list.append(history_dict.password_add)

                update_result = mongo_engine_sc.update_one(
                    filter={"user_uu_id": user_uu_id},
                    update={
                        "$set": {
                            "password_history": password_history_list,
                            "modified_at": arrow.now().timestamp(),
                            "access_history_detail": history_dict.access_history_detail,
                        }
                    },
                    upsert=True,
                )

            # Persist the new hash only after the history check has passed. The
            # save used to sit behind an unconditional `return` and never ran.
            found_user.password_expiry_begins = str(arrow.now())
            found_user.hash_password = new_password_dict.get("password")
            found_user.password_token = ""
            found_user.save(db=db_session)
            return update_result
|
|
|
|
|
|
class PageHandlers:
    """Resolve which application pages a stored token may reach."""

    @staticmethod
    def _reachable_app_codes(token_object) -> Optional[dict]:
        """Return the app-code mapping of the token's active selection.

        Employees carry it on ``selected_company``, occupants on
        ``selected_occupant``. Returns None when nothing is selected and an
        empty dict when the selection defines no ``reachable_app_codes``.
        """
        if token_object.is_employee:
            selection = token_object.selected_company
        elif token_object.is_occupant:
            # Occupant selections are stored on selected_occupant; the company
            # slot is never filled for them.
            selection = token_object.selected_occupant
        else:
            selection = None
        if selection is None:
            return None
        return getattr(selection, "reachable_app_codes", None) or {}

    @classmethod
    def retrieve_valid_page_via_token(cls, access_token: str, page_url: str) -> str:
        """
        Retrieve valid page via token.
        {
            access_token: "string",
            page_url: "string"
        }
        Results: str(application)

        Raises:
            ValueError: "EYS_0013" when nothing is selected on the token or
                ``page_url`` is not among the reachable app codes. Errors from
                ``RedisHandlers.get_object_from_redis`` propagate.
        """
        if result := RedisHandlers.get_object_from_redis(access_token=access_token):
            app_codes = cls._reachable_app_codes(result) or {}
            if application := app_codes.get(page_url, None):
                return application
        raise ValueError("EYS_0013")

    @classmethod
    def retrieve_valid_sites_via_token(cls, access_token: str) -> list:
        """
        Retrieve valid pages via token.
        {
            "access_token": "string"
        }
        Results: list(sites)

        Raises:
            ValueError: "EYS_0013" when nothing is selected on the token.
                Errors from ``RedisHandlers.get_object_from_redis`` propagate.
        """
        if result := RedisHandlers.get_object_from_redis(access_token=access_token):
            app_codes = cls._reachable_app_codes(result)
            if app_codes is not None:
                # Materialise the keys so the declared `list` return type holds.
                return list(app_codes.keys())
        raise ValueError("EYS_0013")
|
|
|
|
|
|
class AuthHandlers:
    """Single access point exposing one instance of each handler class.

    Every attribute carries the same name as the handler class it holds, so
    callers write e.g. ``AuthHandlers.LoginHandler.<method>(...)``.
    """

    # Login, token-header checks and company/occupant selection.
    LoginHandler: LoginHandler = LoginHandler()
    # Password creation and history bookkeeping.
    PasswordHandler: PasswordHandler = PasswordHandler()
    # Page/application lookups for a stored token.
    PageHandlers: PageHandlers = PageHandlers()
|