wag-managment-api-service-v.../api_events/events/application/authentication.py

758 lines
28 KiB
Python

import json
import typing
from typing import Union
from fastapi import status
from fastapi.requests import Request
from fastapi.exceptions import HTTPException
from fastapi.responses import JSONResponse
from databases import (
Companies,
Staff,
Duties,
Departments,
Employees,
BuildLivingSpace,
BuildParts,
Build,
Duty,
Event2Occupant,
Event2Employee,
Users,
UsersTokens,
OccupantTypes,
RelationshipEmployee2Build,
)
from api_services import (
redis_cli,
send_email,
get_object_via_access_key,
get_object_via_user_uu_id,
save_access_token_to_redis,
update_selected_to_redis,
password_is_changed_template,
change_your_password_template,
)
from api_configs import ApiStatic, Auth
from api_events.events.abstract_class import MethodToEvent, ActionsSchema
from api_objects.auth.token_objects import EmployeeTokenObject, OccupantTokenObject
from api_library.date_time_actions.date_functions import system_arrow
from databases.no_sql_models.login_handlers import load_user_with_erp_details
from api_validations.validations_request import (
Login,
Logout,
ChangePassword,
Remember,
Forgot,
CreatePassword,
OccupantSelection,
EmployeeSelection,
)
class AuthenticationLoginEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Login via domain and access key : [email] | [phone]"
event_category = "AUTHENTICATION"
__event_keys__ = {
"e672846d-cc45-4d97-85d5-6f96747fac67": "authentication_login_with_domain_and_creds",
}
@classmethod
def authentication_login_with_domain_and_creds(
cls,
data: Login,
request,
):
access_dict = Users.login_user_with_credentials(data=data, request=request)
found_user = access_dict.get("user", None)
if not found_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials"
)
return JSONResponse(
content={
"completed": True,
"message": "User is logged in successfully",
"access_token": access_dict.get("access_token"),
"refresh_token": access_dict.get("refresher_token"),
"access_object": access_dict.get("access_object"),
"user": found_user.get_dict(),
},
status_code=status.HTTP_200_OK,
)
class AuthenticationSelectEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Select Employee Duty or Occupant Type"
event_category = "AUTHENTICATION"
__event_keys__ = {
"cee96b9b-8487-4e9f-aaed-2e8c79687bf9": "authentication_select_company_or_occupant_type",
}
@classmethod
def authentication_select_company_or_occupant_type(
cls,
request: Request,
data,
token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
):
from api_objects import OccupantToken, CompanyToken
if isinstance(token_dict, EmployeeTokenObject):
if data.company_uu_id not in token_dict.companies_uu_id_list:
return JSONResponse(
content={
"completed": False,
"message": "Company is not found in users company list",
},
status_code=status.HTTP_401_UNAUTHORIZED,
)
if selected_company := Companies.filter_one(
Companies.uu_id == data.company_uu_id,
).data:
department_ids = [
department.id
for department in Departments.filter_all(
Departments.company_id == selected_company.id,
).data
]
duties_ids = [
duties.id
for duties in Duties.filter_all(
Duties.company_id == selected_company.id,
Duties.department_id.in_(department_ids),
).data
]
staff_ids = [
staff.id
for staff in Staff.filter_all(
Staff.duties_id.in_(duties_ids),
).data
]
employee = Employees.filter_one(
Employees.people_id == token_dict.person_id,
Employees.staff_id.in_(staff_ids),
).data
reachable_event_list_id, reachable_event_list_uu_id = (
Event2Employee.get_event_id_by_employee_id(employee_id=employee.id)
)
staff = Staff.filter_one(
Staff.id == employee.staff_id,
).data
duties = Duties.filter_one(
Duties.id == staff.duties_id,
).data
department = Departments.filter_one(
Departments.id == duties.department_id,
).data
bulk_id = Duty.filter_one(
Duty.duty_code == "BULK",
).data
bulk_duty_id = Duties.filter_one(
Duties.company_id == selected_company.id,
Duties.duties_id == bulk_id.id,
).data
update_selected_to_redis(
request=request,
add_payload=CompanyToken(
company_uu_id=selected_company.uu_id.__str__(),
company_id=selected_company.id,
department_id=department.id,
department_uu_id=department.uu_id.__str__(),
duty_id=duties.id,
duty_uu_id=duties.uu_id.__str__(),
bulk_duties_id=bulk_duty_id.id,
staff_id=staff.id,
staff_uu_id=staff.uu_id.__str__(),
employee_id=employee.id,
employee_uu_id=employee.uu_id.__str__(),
reachable_event_list_id=reachable_event_list_id,
reachable_event_list_uu_id=reachable_event_list_uu_id,
),
)
return JSONResponse(
content={
"completed": True,
"message": "Company is selected successfully",
},
status_code=status.HTTP_200_OK,
)
elif isinstance(token_dict, OccupantTokenObject):
occupant_type = OccupantTypes.filter_one(
OccupantTypes.uu_id == data.occupant_uu_id
).data
if not occupant_type:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Occupant Type is not found",
)
build_part = BuildParts.filter_one(
BuildParts.uu_id == data.build_part_uu_id,
).data
if not build_part:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Build Part is not found",
)
build = Build.filter_one(Build.id == build_part.build_id).data
related_company = RelationshipEmployee2Build.filter_one(
RelationshipEmployee2Build.member_id == build.id,
).data
company_related = Companies.filter_one(
Companies.id == related_company.company_id,
).data
responsible_employee = Employees.filter_one(
Employees.id == related_company.employee_id,
).data
if selected_occupant_type := BuildLivingSpace.filter_one(
BuildLivingSpace.occupant_type == occupant_type.id,
BuildLivingSpace.person_id == token_dict.person_id,
BuildLivingSpace.build_parts_id == build_part.id,
).data:
reachable_event_list_id, reachable_event_list_uu_id = (
Event2Occupant.get_event_id_by_build_living_space_id(
Event2Occupant.build_living_space_id
== selected_occupant_type.id
)
)
update_selected_to_redis(
request=request,
add_payload=OccupantToken(
living_space_id=selected_occupant_type.id,
living_space_uu_id=selected_occupant_type.uu_id.__str__(),
occupant_type_id=occupant_type.id,
occupant_type_uu_id=occupant_type.uu_id.__str__(),
occupant_type=occupant_type.occupant_type,
build_id=build.id,
build_uuid=build.uu_id.__str__(),
build_part_id=build_part.id,
build_part_uuid=build_part.uu_id.__str__(),
responsible_employee_id=responsible_employee.id,
responsible_employee_uuid=responsible_employee.uu_id.__str__(),
responsible_company_id=company_related.id,
responsible_company_uuid=company_related.uu_id.__str__(),
reachable_event_list_id=reachable_event_list_id,
reachable_event_list_uu_id=reachable_event_list_uu_id,
),
)
return JSONResponse(
content={
"completed": True,
"message": "Occupant is selected successfully",
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": False, "message": "Invalid data provided"},
status_code=status.HTTP_418_IM_A_TEAPOT,
)
class AuthenticationCheckTokenEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Check Token is valid for user"
event_category = "AUTHENTICATION"
__event_keys__ = {
"73d77e45-a33f-4f12-909e-3b56f00d8a12": "authentication_check_token_is_valid",
}
@classmethod
def authentication_login_with_domain_and_creds(
cls,
request,
token_dict: typing.Union[EmployeeSelection, OccupantSelection],
):
if get_object_via_access_key(request=request):
return JSONResponse(
content={"completed": True, "message": "Access Token is valid"},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": False, "message": "Access Token is NOT valid"},
status_code=status.HTTP_401_UNAUTHORIZED,
)
class AuthenticationRefreshEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = (
"Refresher Token for refreshing access token without credentials"
)
event_category = "AUTHENTICATION"
__event_keys__ = {
"48379bb2-ba81-4d8e-a9dd-58837cfcbf67": "authentication_refresh_user_info",
}
@classmethod
def authentication_refresh_user_info(
cls,
request,
token_dict: typing.Union[EmployeeSelection, OccupantSelection],
):
access_token = str(request.headers.get(Auth.ACCESS_TOKEN_TAG))
if token_user := get_object_via_access_key(request=request):
if found_user := Users.filter_one(
Users.uu_id == token_user.get("uu_id")
).data:
user_token = UsersTokens.filter_one(
UsersTokens.domain == found_user.domain_name,
UsersTokens.user_id == found_user.id,
UsersTokens.token_type == "RememberMe",
).data
access_dict = {
"access_token": access_token,
"refresh_token": getattr(user_token, "token", None),
}
return JSONResponse(
content={
"completed": True,
"message": "User is logged in successfully via refresh token",
"data": load_user_with_erp_details(found_user, access_dict),
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": False, "message": "Invalid data", "data": {}},
status_code=status.HTTP_401_UNAUTHORIZED,
)
class AuthenticationChangePasswordEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Change password with access token implemented on request headers without password reset token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"f09f7c1a-bee6-4e32-8444-962ec8f39091": "authentication_change_password",
}
@classmethod
def authentication_change_password(
cls,
data: ChangePassword,
token_dict: typing.Union[EmployeeTokenObject, OccupantTokenObject],
):
if isinstance(token_dict, EmployeeTokenObject):
if found_user := Users.filter_one(Users.uu_id == token_dict.uu_id).data:
if found_user.check_password(data.old_password):
found_user.set_password(data.new_password)
return JSONResponse(
content={
"completed": True,
"message": "Password is changed successfully",
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Old password is not correct",
},
status_code=status.HTTP_401_UNAUTHORIZED,
)
return JSONResponse(
content={"completed": False, "message": "Invalid data"},
status_code=status.HTTP_401_UNAUTHORIZED,
)
class AuthenticationCreatePasswordEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Create password with password reset token requested via email"
event_category = "AUTHENTICATION"
__event_keys__ = {
"c519f9af-92e1-47b2-abf7-5a3316d075f7": "authentication_create_password",
}
@classmethod
def authentication_create_password(cls, data: CreatePassword):
if not data.re_password == data.password:
raise HTTPException(
status_code=status.HTTP_406_NOT_ACCEPTABLE, detail="Password must match"
)
if found_user := Users.filter_one(
Users.password_token == data.password_token
).data:
found_user.create_password(password=data.password)
found_user.password_token = None
Users.save()
send_email_completed = send_email(
subject=f"Dear {found_user.user_tag}, your password has been changed.",
receivers=[str(found_user.email)],
html=password_is_changed_template(user_name=found_user.user_tag),
)
if not send_email_completed:
raise HTTPException(
status_code=400, detail="Email can not be sent. Try again later"
)
return JSONResponse(
content={
"completed": True,
"message": "Password is created successfully",
"data": found_user.get_dict(),
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Record not found",
"data": {},
},
status_code=status.HTTP_202_ACCEPTED,
)
class AuthenticationDisconnectUserEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Disconnect all sessions of user in access token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"8b586848-2fb3-4161-abbe-642157eec7ce": "authentication_disconnect_user",
}
@classmethod
def authentication_disconnect_user(
cls,
data: Logout,
token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
if not found_user:
return JSONResponse(
content={
"completed": False,
"message": "Invalid data",
"data": None,
},
status_code=status.HTTP_202_ACCEPTED,
)
if already_tokens := get_object_via_user_uu_id(user_id=str(found_user.uu_id)):
for key, token_user in already_tokens.items():
redis_cli.delete(key)
selected_user = Users.filter_one(
Users.uu_id == token_user.get("uu_id"),
).data
selected_user.remove_refresher_token(
domain=data.domain, disconnect=True
)
return JSONResponse(
content={
"completed": True,
"message": "All sessions are disconnected",
"data": selected_user.get_dict(),
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": False, "message": "Invalid data", "data": None},
status_code=status.HTTP_202_ACCEPTED,
)
class AuthenticationLogoutEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Logout only single session of user which domain is provided"
event_category = "AUTHENTICATION"
__event_keys__ = {
"5cc22e4e-a0f7-4077-be41-1871feb3dfd1": "authentication_logout_user",
}
@classmethod
def authentication_logout_user(
cls, request: Request, data: Logout, token_dict: dict = None
):
token_user = None
if already_tokens := get_object_via_access_key(request=request):
for key in already_tokens:
token_user = json.loads(redis_cli.get(key) or {})
if token_user.get("domain") == data.domain:
redis_cli.delete(key)
selected_user = Users.filter_one(
Users.uu_id == token_user.get("uu_id"),
).data
selected_user.remove_refresher_token(domain=data.domain)
return JSONResponse(
content={
"completed": True,
"message": "Session is logged out",
"data": token_user,
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={
"completed": False,
"message": "Logout is not successfully completed",
"data": None,
},
status_code=status.HTTP_202_ACCEPTED,
)
class AuthenticationRefreshTokenEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Refresh access token with refresher token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"c90f3334-10c9-4181-b5ff-90d98a0287b2": "authentication_refresher_token",
}
@classmethod
def authentication_refresher_token(
cls, request: Request, data: Remember, token_dict: dict = None
):
token_refresher = UsersTokens.filter_by_one(
token=data.refresh_token,
domain=data.domain,
**UsersTokens.valid_record_dict,
).data
if not token_refresher:
return JSONResponse(
content={"completed": False, "message": "Invalid data", "data": {}},
status_code=status.HTTP_202_ACCEPTED,
)
if found_user := Users.filter_one(
Users.id == token_refresher.user_id,
).data:
found_user: Users = found_user
access_key = save_access_token_to_redis(
request=request, found_user=found_user, domain=data.domain
)
found_user.last_agent = request.headers.get("User-Agent", None)
found_user.last_platform = request.headers.get("Origin", None)
found_user.last_remote_addr = getattr(
request, "remote_addr", None
) or request.headers.get("X-Forwarded-For", None)
found_user.last_seen = str(system_arrow.now())
return JSONResponse(
content={
"completed": True,
"message": "User is logged in successfully via refresher token",
"data": load_user_with_erp_details(
found_user,
{
"access_token": access_key,
"refresh_token": data.refresh_token,
},
),
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": False, "message": "Invalid data", "data": {}},
status_code=status.HTTP_202_ACCEPTED,
)
class AuthenticationForgotPasswordEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Send an email to user for a valid password reset token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"e3ca6e24-b9f8-4127-949c-3bfa364e3513": "authentication_forgot_password",
}
@classmethod
def authentication_forgot_password(
cls,
request: Request,
data: Forgot,
):
found_user: Users = Users.check_user_exits(
access_key=data.access_key, domain=data.domain
)
forgot_key = save_access_token_to_redis(
request=request, found_user=found_user, domain=data.domain
)
forgot_link = ApiStatic.forgot_link(forgot_key=forgot_key)
send_email_completed = send_email(
subject=f"Dear {found_user.user_tag}, your forgot password link has been sent.",
receivers=[str(found_user.email)],
html=change_your_password_template(
user_name=found_user.user_tag, forgot_link=forgot_link
),
)
if not send_email_completed:
raise HTTPException(
status_code=400, detail="Email can not be sent. Try again later"
)
found_user.password_token = forgot_key
found_user.password_token_is_valid = str(system_arrow.shift(days=1))
found_user.save()
return JSONResponse(
content={
"completed": True,
"message": "Password is change link is sent to your email or phone",
"data": {},
},
status_code=status.HTTP_200_OK,
)
class AuthenticationDownloadAvatarEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Download avatar icon and profile info of user"
event_category = "AUTHENTICATION"
__event_keys__ = {
"c140cd5f-307f-4046-a93e-3ade032a57a7": "authentication_download_avatar",
}
@classmethod
def authentication_download_avatar(
cls, data: Forgot
):
if found_user := Users.check_user_exits(
access_key=data.access_key, domain=data.domain
):
expired_starts = str(system_arrow.now() - system_arrow.get(str(found_user.expiry_ends)))
expired_int = int(system_arrow.now() - system_arrow.get(str(found_user.expiry_ends)).days)
return JSONResponse(
content={
"completed": True,
"message": "Avatar and profile is shared via user credentials",
"data": {
"last_seen": str(found_user.last_seen),
"avatar": found_user.avatar,
"remember_me": found_user.remember_me,
"expiry_ends": str(found_user.expiry_ends),
"expired_str": expired_starts,
"expired_int": expired_int,
},
},
status_code=status.HTTP_200_OK,
)
return JSONResponse(
content={"completed": False, "message": "Invalid data", "data": {}},
status_code=status.HTTP_202_ACCEPTED,
)
AuthenticationLoginEventMethod = AuthenticationLoginEventMethods(
action=ActionsSchema(endpoint="/authentication/login")
)
AuthenticationSelectEventMethod = AuthenticationSelectEventMethods(
action=ActionsSchema(endpoint="/authentication/select")
)
AuthenticationCheckTokenEventMethod = AuthenticationCheckTokenEventMethods(
action=ActionsSchema(endpoint="/authentication/valid")
)
AuthenticationRefreshEventMethod = AuthenticationRefreshEventMethods(
action=ActionsSchema(endpoint="/authentication/refresh")
)
AuthenticationChangePasswordEventMethod = AuthenticationChangePasswordEventMethods(
action=ActionsSchema(endpoint="/authentication/change_password")
)
AuthenticationCreatePasswordEventMethod = AuthenticationCreatePasswordEventMethods(
action=ActionsSchema(endpoint="/authentication/create_password")
)
AuthenticationDisconnectUserEventMethod = AuthenticationDisconnectUserEventMethods(
action=ActionsSchema(endpoint="/authentication/disconnect")
)
AuthenticationLogoutEventMethod = AuthenticationLogoutEventMethods(
action=ActionsSchema(endpoint="/authentication/logout")
)
AuthenticationRefreshTokenEventMethod = AuthenticationRefreshTokenEventMethods(
action=ActionsSchema(endpoint="/authentication/refresher")
)
AuthenticationForgotPasswordEventMethod = AuthenticationForgotPasswordEventMethods(
action=ActionsSchema(endpoint="/authentication/forgot")
)
AuthenticationDownloadAvatarEventMethod = AuthenticationDownloadAvatarEventMethods(
action=ActionsSchema(endpoint="/authentication/avatar")
)
# UserLogger.log_error(
# str(
# dict(
# user_id=found_user.id,
# domain=data.domain,
# access_key=token_user.get("access_input"),
# agent=request.headers.get("User-Agent", None),
# ip=getattr(request, "remote_addr", None)
# or request.headers.get("X-Forwarded-For", None),
# platform=request.headers.get("Origin", None),
# login_date=datetime.datetime.utcnow().__str__(),
# is_login=False,
# )
# )
# )
# UserLogger.log_error(
# str(
# dict(
# user_id=found_user.id,
# domain=data.domain,
# access_key=data.access_key,
# agent=request.headers.get("User-Agent", None),
# ip=getattr(request, "remote_addr", None)
# or request.headers.get("X-Forwarded-For", None),
# platform=request.headers.get("Origin", None),
# login_date=str(DateTimeLocal.now()),
# is_login=False,
# )
# )
# )
# UserLogger.log_error(
# str(
# dict(
# user_id=found_user.id,
# domain=data.domain,
# access_key="via_refresher",
# agent=request.headers.get("User-Agent", None),
# ip=getattr(request, "remote_addr", None)
# or request.headers.get("X-Forwarded-For", None),
# platform=request.headers.get("Origin", None),
# login_date=datetime.datetime.utcnow().__str__(),
# is_login=False,
# )
# )
# )
# UserLogger.log_error(
# str(
# dict(
# user_id=selected_user.id,
# domain=data.domain,
# access_key=token_user.get("access_input"),
# agent=request.headers.get("User-Agent", None),
# ip=getattr(request, "remote_addr", None)
# or request.headers.get("X-Forwarded-For", None),
# platform=request.headers.get("Origin", None),
# login_date=datetime.datetime.utcnow().__str__(),
# is_login=False,
# )
# )
# )