wag-managment-api-service-v.../ApiServices/ValidationService/middlewares/token_middleware.py

117 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from time import perf_counter

from fastapi import status
from fastapi.exceptions import HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

from api_configs import Config
class MiddlewareLogs:
    """Logging helpers used by the middleware in this module."""

    @staticmethod
    def log_error(log_message, *legacy_args):
        """Print a log message to standard output.

        The method used to be declared as ``(self, log_message)`` while being
        a ``@staticmethod``, so the natural one-argument call
        ``MiddlewareLogs.log_error(msg)`` raised ``TypeError``.

        Args:
            log_message: The message to print.
            *legacy_args: Tolerated for backward compatibility. Under the old
                signature the only working call shape was
                ``log_error(<placeholder>, msg)``; when extra positionals are
                given, the last one is treated as the message.
        """
        if legacy_args:
            # Old-style call: the first positional was a dummy "self".
            log_message = legacy_args[-1]
        print(log_message)
def log_middlewares_exception(endpoint, token_user, message, request):
    """Build an authentication log entry and pass it to MiddlewareLogs.log_error.

    The entry is a dict rendered with ``str()``; its ``log_data`` field is a
    JSON string holding the endpoint, the stringified token user and the
    stringified request headers.

    Args:
        endpoint: Value stored under ``"endpoint"`` in the JSON payload.
        token_user: Object stored (via ``str``) under ``"user"``.
        message: Value stored under ``"log_message"``.
        request: Object whose ``headers`` attribute is stringified into the payload.
    """
    # NOTE(review): the full header set is logged; it may include credentials
    # such as an access token -- confirm this is acceptable.
    details = json.dumps(
        {
            "endpoint": endpoint,
            "user": str(token_user),
            "request": str(request.headers),
        }
    )
    entry = {
        "log_type": "Authentication",
        "log_message": message,
        "log_action": "User",
        "log_data": details,
    }
    MiddlewareLogs.log_error(str(entry))
class AuthHeaderMiddleware(BaseHTTPMiddleware):
    """Reject requests to secured paths that do not carry a valid token.

    Paths listed in ``Config.INSECURE_PATHS`` skip the token check. Every
    response that reaches the downstream application gets timing headers
    added by ``prepare_response_needs``.
    """

    async def dispatch(self, request, call_next):
        """Validate the token for secured paths, then forward the request.

        Args:
            request: The incoming request.
            call_next: Callable that passes the request to the next handler.

        Returns:
            A 401 JSON response of the form ``{"detail": <reason>}`` when the
            token check fails, otherwise the downstream response with timing
            headers attached.
        """
        start_time = perf_counter()

        if check_if_path_secure(request=request, insecure_paths=Config.INSECURE_PATHS):
            endpoint = str(getattr(getattr(request, "url", None), "path", None))
            auth, _token_user = check_if_token_is_not_valid(
                request=request, endpoint_name=endpoint
            )
            if auth != "valid":
                # Return the 401 directly instead of raising HTTPException:
                # an exception raised from BaseHTTPMiddleware.dispatch is not
                # routed through the application's HTTPException handler, so
                # the client would see a 500 rather than the intended 401.
                # The body mirrors the usual {"detail": ...} error shape.
                return JSONResponse(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    content={"detail": auth},
                )

        response = await call_next(request)
        prepare_response_needs(response, start_time)
        return response
def prepare_response_needs(response, start_time):
    """Attach request timing headers to ``response``.

    Sets three headers on ``response.headers``:

    * ``request-starts``: ``start_time`` as a string,
    * ``request-ends``: the ``perf_counter()`` reading taken here,
    * ``elapsed-Time``: the difference between the two, in milliseconds,
      suffixed with ``" ms"``.

    Args:
        response: Object exposing a mutable ``headers`` mapping.
        start_time: A ``perf_counter()`` reading taken when the request began.
    """
    end_time = perf_counter()
    # perf_counter() readings are in fractional seconds; the header is
    # labelled "ms", so convert before formatting.
    elapsed_ms = (float(end_time) - float(start_time)) * 1000.0
    response.headers["request-starts"] = str(start_time)
    response.headers["request-ends"] = str(end_time)
    response.headers["elapsed-Time"] = f"{elapsed_ms} ms"
def check_if_path_secure(request, insecure_paths) -> bool:
    """Return True when the request's URL path is not in ``insecure_paths``.

    The path is read as ``request.url.path`` with missing attributes falling
    back to ``None``, and is stringified before the membership test (so a
    request without a URL is checked as the string ``"None"``).

    Args:
        request: Object that may expose ``url.path``.
        insecure_paths: Container of paths that skip the check.
    """
    url = getattr(request, "url", None)
    path = str(getattr(url, "path", None))
    return path not in insecure_paths
def check_if_token_is_not_valid(request, endpoint_name):
    """Look up the token user for ``request`` and report whether one exists.

    Args:
        request: Passed to ``get_object_via_access_key`` for the lookup.
        endpoint_name: Currently unused by the active code path.

    Returns:
        A ``(status, token_user)`` tuple. ``status`` is ``"valid"`` when the
        lookup returned a truthy value, otherwise a user-facing error message.
    """
    # Imported at call time, as in the original code.
    from api_services.redis.functions import get_object_via_access_key

    token_user = get_object_via_access_key(request)
    if token_user:
        return "valid", token_user
    return "Session geçerli değil. Lütfen tekrar giriş yapınız.", token_user
# on_token_user: Users = Users.find_one(uu_id=token_user["uu_id"])
# on_token_people: People = on_token_user.person
# #
# # if on_token_people.priority == 78:
# # return "valid", token_user
#
# if not token_user.get("duty_id", None):
# return (
# "Kullanıcı hiçbir yetki tanımlanmamıştır. Supervisor ile iletişime geçiniz.",
# token_user,
# )
#
# if endpoint_name in release_endpoint:
# return "valid", token_user
#
# if company_duty_app := CompanyDutyApp.find_one(
# endpoint_name=str("".join(endpoint_name.split("/")[:-1])),
# company_duty_id=int(token_user.get("duty_id")),
# ):
# if not company_duty_app.is_access_valid(
# endpoint_ext=endpoint_name.split("/")[-1]
# ):
# return (
# "Kullanıcı yetkili değildir. Supervisor ile iletişime geçiniz.",
# token_user,
# )
# else:
# return (
# "Kullanıcıya yetki tanımlanmamıştır. Supervisor ile iletişime geçiniz.",
# token_user,
# )
# return "valid", token_user