141 lines
5.1 KiB
Python
141 lines
5.1 KiB
Python
from typing import Union
|
|
|
|
from fastapi.routing import APIRouter
|
|
from fastapi.requests import Request
|
|
|
|
from api_validations.validations_request import (
|
|
Login,
|
|
Logout,
|
|
ChangePassword,
|
|
Remember,
|
|
Forgot,
|
|
CreatePassword,
|
|
OccupantSelection,
|
|
EmployeeSelection,
|
|
)
|
|
|
|
from api_services.redis.auth_actions.token import parse_token_object_to_dict
|
|
from api_events.events import (
|
|
AuthenticationLoginEventMethod,
|
|
AuthenticationSelectEventMethod,
|
|
AuthenticationCheckTokenEventMethod,
|
|
AuthenticationRefreshEventMethod,
|
|
AuthenticationChangePasswordEventMethod,
|
|
AuthenticationCreatePasswordEventMethod,
|
|
AuthenticationDisconnectUserEventMethod,
|
|
AuthenticationLogoutEventMethod,
|
|
AuthenticationRefreshTokenEventMethod,
|
|
AuthenticationForgotPasswordEventMethod,
|
|
AuthenticationDownloadAvatarEventMethod,
|
|
)
|
|
|
|
|
|
login_route = APIRouter(prefix="/authentication", tags=["Authentication"])
|
|
login_route.include_router(login_route, include_in_schema=True)
|
|
|
|
|
|
@login_route.post(path="/select", summary="Select company or occupant type")
|
|
def authentication_select_company_or_occupant_type(
|
|
request: Request, data: Union[EmployeeSelection, OccupantSelection]
|
|
):
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
return AuthenticationSelectEventMethod.authentication_select_company_or_occupant_type(
|
|
data=data, request=request, token_dict=token_dict
|
|
)
|
|
|
|
|
|
@login_route.post(path="/login", summary="Login user with domain and password")
|
|
def authentication_login_with_domain_and_creds(request: Request, data: Login):
|
|
return AuthenticationLoginEventMethod.authentication_login_with_domain_and_creds(
|
|
request=request, data=data
|
|
)
|
|
|
|
|
|
@login_route.get(path="/valid", summary="Check access token is valid")
|
|
def authentication_check_token_is_valid(request: Request):
|
|
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
active_function = getattr(
|
|
AuthenticationCheckTokenEventMethod, "authentication_check_token_is_valid"
|
|
)
|
|
return active_function(request=request, token_dict=token_dict)
|
|
|
|
|
|
@login_route.get(path="/refresh", summary="Refresh credentials with access token")
|
|
def authentication_refresh_user_info(request: Request):
|
|
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
active_function = getattr(
|
|
AuthenticationRefreshEventMethod, "authentication_refresh_user_info"
|
|
)
|
|
return active_function(request=request, token_dict=token_dict)
|
|
|
|
|
|
@login_route.post(path="/change_password", summary="Change password with access token")
|
|
def authentication_change_password(request: Request, data: ChangePassword):
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
active_function = getattr(
|
|
AuthenticationChangePasswordEventMethod, "authentication_change_password"
|
|
)
|
|
return active_function(data=data, token_dict=token_dict)
|
|
|
|
|
|
@login_route.post(
|
|
path="/create_password", summary="Create password with password token"
|
|
)
|
|
def authentication_create_password(data: CreatePassword):
|
|
active_function = getattr(
|
|
AuthenticationCreatePasswordEventMethod, "authentication_create_password"
|
|
)
|
|
return active_function(data=data)
|
|
|
|
|
|
@login_route.post(path="/disconnect", summary="Disconnect user with access token")
|
|
def authentication_disconnect_user(request: Request, data: Logout):
|
|
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
active_function = getattr(
|
|
AuthenticationDisconnectUserEventMethod, "authentication_disconnect_user"
|
|
)
|
|
return active_function(data=data, request=request, token_dict=token_dict)
|
|
|
|
|
|
@login_route.post(path="/logout", summary="Logout user with access token")
|
|
def authentication_logout_user(request: Request, data: Logout):
|
|
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
active_function = getattr(
|
|
AuthenticationLogoutEventMethod, "authentication_logout_user"
|
|
)
|
|
return active_function(data=data, request=request, token_dict=token_dict)
|
|
|
|
|
|
@login_route.post(path="/refresher", summary="Refresh token with refresh token")
|
|
def authentication_refresher_token(request: Request, data: Remember):
|
|
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
active_function = getattr(
|
|
AuthenticationRefreshTokenEventMethod, "authentication_refresher_token"
|
|
)
|
|
return active_function(data=data, request=request, token_dict=token_dict)
|
|
|
|
|
|
@login_route.post(path="/forgot", summary="Forgot password with email or phone number")
|
|
def authentication_forgot_password(request: Request, data: Forgot):
|
|
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
active_function = getattr(
|
|
AuthenticationForgotPasswordEventMethod, "authentication_forgot_password"
|
|
)
|
|
return active_function(data=data, request=request, token_dict=token_dict)
|
|
|
|
|
|
@login_route.post(path="/avatar", summary="Get link of avatar with credentials")
|
|
def authentication_download_avatar(request: Request, data: Forgot):
|
|
|
|
token_dict = parse_token_object_to_dict(request=request)
|
|
active_function = getattr(
|
|
AuthenticationDownloadAvatarEventMethod, "authentication_download_avatar"
|
|
)
|
|
return active_function(data=data, request=request, token_dict=token_dict)
|