prod-wag-backend-automate-s.../ApiServices/AuthService/endpoints/auth/route.py

479 lines
14 KiB
Python

import uuid
from typing import Union
from fastapi import APIRouter, Request, status, Header
from fastapi.responses import JSONResponse
from ApiServices.AuthService.config import api_config
from ApiServices.AuthService.validations.request.authentication.login_post import (
RequestLogin,
RequestSelectLiving,
RequestSelectEmployee,
RequestCreatePassword,
RequestChangePassword,
RequestForgotPasswordPhone,
RequestForgotPasswordEmail,
RequestVerifyOTP,
RequestApplication,
)
from ApiServices.AuthService.events.auth.auth import AuthHandlers
auth_route = APIRouter(prefix="/authentication", tags=["Authentication Cluster"])
@auth_route.post(
path="/login",
summary="Login via domain and access key : [email] | [phone]",
description="Login Route",
)
def authentication_login_post(
request: Request,
data: RequestLogin,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication Login Route with Post Method
"""
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"timezone": tz or "GMT+3",
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
result = AuthHandlers.LoginHandler.authentication_login_with_domain_and_creds(
request=request,
data=data,
)
return JSONResponse(
content=result,
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/select",
summary="Select company or occupant type",
description="Selection of users company or occupant type",
)
def authentication_select_post(
request: Request,
data: Union[RequestSelectEmployee, RequestSelectLiving],
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication Select Route with Post Method
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
result = AuthHandlers.LoginHandler.authentication_select_company_or_occupant_type(
request=request,
data=data,
)
return JSONResponse(
content=result,
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/password/create",
summary="Create password with access token",
description="Create password",
)
def authentication_password_create_post(
request: Request,
data: RequestCreatePassword,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication create password Route with Post Method
"""
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
}
result = AuthHandlers.PasswordHandler.create_password(
password=data.password,
password_token=data.password_token,
)
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/password/change",
summary="Change password with access token",
description="Change password",
)
def authentication_password_change_post(
request: Request,
data: RequestChangePassword,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication change password Route with Post Method
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={**data.model_dump()},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/password/reset",
summary="Reset password with access token",
description="Reset password",
)
def authentication_password_reset_post(
request: Request,
data: Union[RequestForgotPasswordEmail, RequestForgotPasswordPhone],
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication reset password Route with Post Method
"""
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={**data.model_dump()},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/logout",
summary="Logout user",
description="Logout only single session of user which domain is provided",
)
def authentication_logout_post(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Logout user from the system
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/disconnect",
summary="Disconnect all sessions",
description="Disconnect all sessions of user in access token",
)
def authentication_disconnect_post(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Disconnect all sessions of user in access token
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/token/check",
summary="Check if token is valid",
description="Check if access token is valid for user",
)
def authentication_token_check_post(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Check if access token is valid for user
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
"tz": tz or "GMT+3",
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
if AuthHandlers.LoginHandler.authentication_check_token_valid(access_token=token):
return JSONResponse(
content={"message": "MSG_0001"},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
return JSONResponse(
content={"error": "EYS_0033"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
@auth_route.get(
path="/token/refresh",
summary="Refresh if token is valid",
description="Refresh if access token is valid for user",
)
def authentication_token_refresh_post(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Refresh if access token is valid for user
"""
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/password/verify-otp",
summary="Verify OTP for password reset",
description="Verify OTP for password reset",
)
def authentication_password_verify_otp(
request: Request,
data: RequestVerifyOTP,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Verify OTP for password reset
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/page/valid",
summary="Verify if page is valid returns application available",
description="Verify if page is valid returns application available",
)
def authentication_page_valid(
request: Request,
data: RequestApplication,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Verify if page is valid returns application that can user reach
page: { url = /building/create}
result: { "application": "4c11f5ef-0bbd-41ac-925e-f79d9aac2b0e" }
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
result = AuthHandlers.PageHandlers.retrieve_valid_page_via_token(
page_url=data.page_url, access_token=token
)
if not result:
return JSONResponse(
content={"error": "EYS_0004"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={"application": result},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/sites/list",
summary="Lists all sites that are available for user",
description="Lists all sites that are available for user",
)
def authentication_page_valid(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Verify if page is valid returns application that can user reach
result: { "sites": ['/dashboard', '/building/create'] }
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"tz": tz or "GMT+3",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
result = AuthHandlers.PageHandlers.retrieve_valid_sites_via_token(
access_token=token
)
if not result:
return JSONResponse(
content={"error": "EYS_0004"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={"sites": list(set(result))},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)