prod-wag-backend-automate-s.../ApiServices/AuthService/endpoints/auth/route.py

336 lines
9.1 KiB
Python

import uuid
from typing import Union
from fastapi import APIRouter, Request, status, Header
from fastapi.responses import JSONResponse
from ApiServices.AuthService.config import api_config
from ApiServices.AuthService.validations.request.authentication.login_post import (
RequestLogin,
RequestSelectLiving,
RequestSelectOccupant,
RequestCreatePassword,
RequestChangePassword,
RequestForgotPasswordPhone,
RequestForgotPasswordEmail,
)
auth_route = APIRouter(
prefix="/authentication",
tags=["Authentication Cluster"],
)
@auth_route.post(
path="/login",
summary="Login via domain and access key : [email] | [phone]",
description="Login Route",
)
def authentication_login_post(
request: Request,
data: RequestLogin,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication Login Route with Post Method
"""
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"timezone": tz or "GMT+3",
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={**data.model_dump()},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/select",
summary="Select company or occupant type",
description="Selection of users company or occupant type",
)
def authentication_select_post(
request: Request,
data: Union[RequestSelectOccupant, RequestSelectLiving],
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication Select Route with Post Method
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content=data.model_dump(),
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/password/create",
summary="Create password with access token",
description="Create password",
)
def authentication_password_create_post(
request: Request,
data: RequestCreatePassword,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication create password Route with Post Method
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={**data.model_dump()},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/password/change",
summary="Change password with access token",
description="Change password",
)
def authentication_password_change_post(
request: Request,
data: RequestChangePassword,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication change password Route with Post Method
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={**data.model_dump()},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.post(
path="/password/reset",
summary="Reset password with access token",
description="Reset password",
)
def authentication_password_reset_post(
request: Request,
data: Union[RequestForgotPasswordEmail, RequestForgotPasswordPhone],
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Authentication reset password Route with Post Method
"""
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0001"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={**data.model_dump()},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/logout",
summary="Logout user",
description="Logout only single session of user which domain is provided",
)
def authentication_logout_post(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Logout user from the system
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/disconnect",
summary="Disconnect all sessions",
description="Disconnect all sessions of user in access token",
)
def authentication_disconnect_post(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Disconnect all sessions of user in access token
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/token/check",
summary="Check if token is valid",
description="Check if access token is valid for user",
)
def authentication_token_check_post(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Check if access token is valid for user
"""
token = request.headers.get(api_config.ACCESS_TOKEN_TAG, None)
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
"token": token,
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)
@auth_route.get(
path="/token/refresh",
summary="Refresh if token is valid",
description="Refresh if access token is valid for user",
)
def authentication_token_refresh_post(
request: Request,
language: str = Header(None, alias="language"),
domain: str = Header(None, alias="domain"),
tz: str = Header(None, alias="timezone"),
):
"""
Refresh if access token is valid for user
"""
headers = {
"language": language or "",
"domain": domain or "",
"eys-ext": f"{str(uuid.uuid4())}",
}
if not domain or not language:
return JSONResponse(
content={"error": "EYS_0003"},
status_code=status.HTTP_406_NOT_ACCEPTABLE,
headers=headers,
)
return JSONResponse(
content={},
status_code=status.HTTP_202_ACCEPTED,
headers=headers,
)