from typing import Union from fastapi import APIRouter, Request, status, Header, Depends from fastapi.responses import JSONResponse from config import api_config from validations.request.auth.validations import ( RequestLogin, RequestResetPassword, RequestSelectLiving, RequestSelectEmployee, RequestCreatePassword, RequestChangePassword, RequestForgotPasswordPhone, RequestForgotPasswordEmail, RequestVerifyOTP, ) from events.auth.events import AuthHandlers from endpoints.index import endpoints_index from api_validations.defaults.validations import CommonHeaders from api_middlewares.token_provider import TokenProvider from events.auth.events import LoginHandler auth_route = APIRouter(prefix="/authentication", tags=["Authentication Cluster"]) auth_route_login = "AuthLoginViaDomainAndCreds" @auth_route.post( path="/login", summary="Login via domain and access key : [email] | [phone]", description="Login Route", operation_id=endpoints_index[auth_route_login] ) def login(data: RequestLogin, headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Login via domain and access key : [email] | [phone]""" return AuthHandlers.LoginHandler.authentication_login_with_domain_and_creds(headers=headers, data=data) auth_route_select_living = "AuthSelectLiving" @auth_route.post( path="/select", summary="Select token object company or occupant type", description="Selection of users company or occupant type", operation_id=endpoints_index[auth_route_select_living] ) def select_living(data: Union[RequestSelectLiving, RequestSelectEmployee], headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Select token object company or occupant type""" token_object = TokenProvider.get_dict_from_redis(token=headers.token) return AuthHandlers.LoginHandler.authentication_select_company_or_occupant_type(request=headers.request, data=data) auth_route_create_password = "AuthCreatePassword" @auth_route.post( path="/password/create", summary="Create password with access token", description="Create password", operation_id=endpoints_index[auth_route_create_password] ) def create_password(data: RequestCreatePassword, headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Create password with access token""" # token_object = TokenProvider.get_dict_from_redis(token=headers.token) return AuthHandlers.PasswordHandler.create_password(password=data.password, password_token=data.password_token) auth_route_change_password = "AuthChangePassword" @auth_route.post( path="/password/change", summary="Change password with access token", description="Change password", operation_id=endpoints_index[auth_route_change_password] ) def change_password(data: RequestChangePassword, headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Change password with access token""" token_object = TokenProvider.get_dict_from_redis(token=headers.token) return None auth_route_reset_password = "AuthResetPassword" @auth_route.post( path="/password/reset", summary="Reset password with access token", description="Reset password", operation_id=endpoints_index[auth_route_reset_password] ) def reset_password(data: RequestResetPassword, headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Reset password with access token""" token_object = TokenProvider.get_dict_from_redis(token=headers.token) return None auth_route_logout = "AuthLogout" @auth_route.get( path="/logout", summary="Logout user", description="Logout only single session of user which domain is provided", operation_id=endpoints_index[auth_route_logout] ) def logout(headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Logout user""" token_object = TokenProvider.get_dict_from_redis(token=headers.token) return None auth_route_disconnect = "AuthDisconnect" @auth_route.get( path="/disconnect", summary="Disconnect all sessions", description="Disconnect all sessions of user in access token", operation_id=endpoints_index[auth_route_disconnect] ) def disconnect(headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Disconnect all sessions""" token_object = TokenProvider.get_dict_from_redis(token=headers.token) return None auth_route_check_token = "AuthCheckToken" @auth_route.get( path="/token/check", summary="Check if token is valid", description="Check if access token is valid for user", operation_id=endpoints_index[auth_route_check_token] ) def check_token(headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Check if token is valid""" try: if token_object := LoginHandler.authentication_check_token_valid(access_token=headers.token, domain=headers.domain): return JSONResponse(status_code=status.HTTP_200_OK, content={"success": True}) except Exception as e: print(e) return JSONResponse(status_code=status.HTTP_401_UNAUTHORIZED, content={"success": False}) auth_route_refresh_token = "AuthRefreshToken" @auth_route.get( path="/token/refresh", summary="Refresh if token is valid", description="Refresh if access token is valid for user", operation_id=endpoints_index[auth_route_refresh_token] ) def refresh_token(headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Refresh if token is valid""" token_object = TokenProvider.get_dict_from_redis(token=headers.token) return None auth_route_verify_otp = "AuthVerifyOTP" @auth_route.get( path="/password/verify-otp", summary="Verify OTP for password reset", description="Verify OTP for password reset", operation_id=endpoints_index[auth_route_verify_otp] ) def verify_otp(headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): """Verify OTP for password reset""" token_object = TokenProvider.get_dict_from_redis(token=headers.token) return None