update events via wrapper routers

This commit is contained in:
2025-01-16 19:32:59 +03:00
parent 049a7c1e11
commit 426b69b33c
42 changed files with 2344 additions and 460 deletions

View File

@@ -0,0 +1,5 @@
"""Auth Service API initialization"""
from .route_configs import get_route_configs
__all__ = ["get_route_configs"]

View File

@@ -0,0 +1,977 @@
"""
Authentication related API endpoints.
"""
from typing import TYPE_CHECKING, Union, Optional, Dict, Any
# Regular imports (non-TYPE_CHECKING)
from ApiEvents.abstract_class import (
MethodToEvent,
RouteFactoryConfig,
EndpointFactoryConfig,
)
from ApiEvents.base_request_model import (
BaseRequestModel,
DictRequestModel,
SuccessResponse,
)
if TYPE_CHECKING:
from fastapi import Request
from ApiServices.Token.token_handler import OccupantTokenObject, EmployeeTokenObject
from Schemas import (
Login,
ChangePassword,
CreatePassword,
Forgot,
Logout,
Remember,
EmployeeSelection,
OccupantSelection,
)
from ApiLibrary.common.line_number import get_line_number_for_error
from ApiServices.Login.user_login_handler import UserLoginModule
from ApiValidations.Request import (
Login,
Logout,
Remember,
Forgot,
ChangePassword,
CreatePassword,
EmployeeSelection,
OccupantSelection,
)
from Services.PostgresDb.Models.alchemy_response import AlchemyJsonResponse
from ApiValidations.Response import AccountRecordResponse
from ApiServices.Token.token_handler import AccessToken, TokenService
from ErrorHandlers import HTTPExceptionApi
# Type aliases for common types
TokenDictType = Union["EmployeeTokenObject", "OccupantTokenObject"]
class LoginRequestModel(BaseRequestModel[Login]):
"""Request model for login endpoint."""
pass
class LogoutRequestModel(BaseRequestModel[Logout]):
"""Request model for logout endpoint."""
pass
class RememberRequestModel(BaseRequestModel[Remember]):
"""Request model for remember token endpoint."""
pass
class ForgotRequestModel(BaseRequestModel[Forgot]):
"""Request model for forgot password endpoint."""
pass
class ChangePasswordRequestModel(BaseRequestModel[ChangePassword]):
"""Request model for change password endpoint."""
pass
class CreatePasswordRequestModel(BaseRequestModel[CreatePassword]):
"""Request model for create password endpoint."""
pass
class EmployeeSelectionRequestModel(BaseRequestModel[EmployeeSelection]):
"""Request model for employee selection endpoint."""
pass
class OccupantSelectionRequestModel(BaseRequestModel[OccupantSelection]):
"""Request model for occupant selection endpoint."""
pass
class AuthenticationLoginEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Login via domain and access key : [email] | [phone]"
event_category = "AUTHENTICATION"
__event_keys__ = {
"e672846d-cc45-4d97-85d5-6f96747fac67": "authentication_login_with_domain_and_creds",
}
__event_validation__ = {
"e672846d-cc45-4d97-85d5-6f96747fac67": SuccessResponse,
}
@classmethod
async def authentication_login_with_domain_and_creds(
cls, request: "Request", data: LoginRequestModel
):
"""
Authenticate user with domain and credentials.
Args:
request: FastAPI request object
data: Request body containing login credentials
{
"domain": "evyos.com.tr",
"access_key": "karatay.berkay.sup@evyos.com.tr",
"password": "string",
"remember_me": false
}
Returns:
SuccessResponse containing authentication token and user info
"""
# Create login module instance
login_module = UserLoginModule(request=request)
# Get token from login module
token = await login_module.login_user_via_credentials(access_data=data)
# Return response with token and headers
return {
"token": token,
"headers": dict(request.headers),
}
class AuthenticationSelectEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Select Employee Duty or Occupant Type"
event_category = "AUTHENTICATION"
__event_keys__ = {
"cee96b9b-8487-4e9f-aaed-2e8c79687bf9": "authentication_select_company_or_occupant_type",
}
# __event_validation__ = {
# "cee96b9b-8487-4e9f-aaed-2e8c79687bf9": "authentication_select_company_or_occupant_type",
# }
@classmethod
def _handle_employee_selection(
cls,
data: EmployeeSelectionRequestModel,
token_dict: TokenDictType,
request: "Request",
):
return
# """Handle employee company selection"""
# Users.client_arrow = DateTimeLocal(is_client=True, timezone=token_dict.timezone)
# if data.company_uu_id not in token_dict.companies_uu_id_list:
# return ResponseHandler.unauthorized(
# "Company not found in user's company list"
# )
# selected_company = Companies.filter_one(
# Companies.uu_id == data.company_uu_id
# ).data
# if not selected_company:
# return ResponseHandler.not_found("Company not found")
# # Get department IDs for the company
# department_ids = [
# dept.id
# for dept in Departments.filter_all(
# Departments.company_id == selected_company.id
# ).data
# ]
# # Get duties IDs for the company
# duties_ids = [
# duty.id
# for duty in Duties.filter_all(Duties.company_id == selected_company.id).data
# ]
# # Get staff IDs
# staff_ids = [
# staff.id for staff in Staff.filter_all(Staff.duties_id.in_(duties_ids)).data
# ]
# # Get employee
# employee = Employees.filter_one(
# Employees.people_id == token_dict.person_id,
# Employees.staff_id.in_(staff_ids),
# ).data
# if not employee:
# return ResponseHandler.not_found("Employee not found")
# # Get reachable events
# reachable_event_list_id = Event2Employee.get_event_id_by_employee_id(
# employee_id=employee.id
# )
# # Get staff and duties
# staff = Staff.filter_one(Staff.id == employee.staff_id).data
# duties = Duties.filter_one(Duties.id == staff.duties_id).data
# department = Departments.filter_one(Departments.id == duties.department_id).data
# # Get bulk duty
# bulk_id = Duty.filter_by_one(system=True, duty_code="BULK").data
# bulk_duty_id = Duties.filter_by_one(
# company_id=selected_company.id,
# duties_id=bulk_id.id,
# **Duties.valid_record_dict,
# ).data
# # Create company token
# company_token = CompanyToken(
# company_uu_id=selected_company.uu_id.__str__(),
# company_id=selected_company.id,
# department_id=department.id,
# department_uu_id=department.uu_id.__str__(),
# duty_id=duties.id,
# duty_uu_id=duties.uu_id.__str__(),
# bulk_duties_id=bulk_duty_id.id,
# staff_id=staff.id,
# staff_uu_id=staff.uu_id.__str__(),
# employee_id=employee.id,
# employee_uu_id=employee.uu_id.__str__(),
# reachable_event_list_id=reachable_event_list_id,
# )
# # Update Redis
# AuthActions.update_selected_to_redis(request=request, add_payload=company_token)
# return ResponseHandler.success("Company selected successfully")
@classmethod
def _handle_occupant_selection(
cls,
data: OccupantSelectionRequestModel,
token_dict: TokenDictType,
request: "Request",
):
"""Handle selection of company or occupant type"""
try:
if isinstance(token_dict, EmployeeTokenObject):
return cls._handle_employee_selection(data, token_dict, request)
elif isinstance(token_dict, OccupantTokenObject):
return cls._handle_occupant_selection(data, token_dict, request)
raise HTTPExceptionApi(
error_code="HTTP_400_BAD_REQUEST",
lang=token_dict.lang,
loc=get_line_number_for_error(),
)
except Exception as e:
return ResponseHandler.error(
str(e), status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
)
@classmethod
def authentication_select_company_or_occupant_type(
cls,
request: "Request",
data: Union[EmployeeSelectionRequestModel, OccupantSelectionRequestModel],
token_dict: TokenDictType,
):
"""Handle selection of company or occupant type"""
# try:
# if isinstance(token_dict, EmployeeTokenObject):
# return cls._handle_employee_selection(data, token_dict, request)
# elif isinstance(token_dict, OccupantTokenObject):
# return cls._handle_occupant_selection(data, token_dict, request)
# return ResponseHandler.error(
# "Invalid token type", status_code=status.HTTP_400_BAD_REQUEST
# )
# except Exception as e:
# return ResponseHandler.error(
# str(e), status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
# )
return
class AuthenticationCheckTokenEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Check Token is valid for user"
event_category = "AUTHENTICATION"
__event_keys__ = {
"73d77e45-a33f-4f12-909e-3b56f00d8a12": "authentication_check_token_is_valid",
}
# __event_validation__ = {
# "73d77e45-a33f-4f12-909e-3b56f00d8a12": "authentication_check_token_is_valid",
# }
@classmethod
def authentication_check_token_is_valid(
cls, request: "Request", data: DictRequestModel
):
# try:
# if RedisActions.get_object_via_access_key(request=request):
# return ResponseHandler.success("Access Token is valid")
# except HTTPException:
# return ResponseHandler.unauthorized("Access Token is NOT valid")
return
class AuthenticationRefreshEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Refresh user info using access token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"48379bb2-ba81-4d8e-a9dd-58837cfcbf67": "authentication_refresh_user_info",
}
# __event_validation__ = {
# "48379bb2-ba81-4d8e-a9dd-58837cfcbf67": AuthenticationRefreshResponse,
# }
@classmethod
def authentication_refresh_user_info(
cls,
request: "Request",
token_dict: TokenDictType,
data: DictRequestModel,
):
# try:
# access_token = request.headers.get(Auth.ACCESS_TOKEN_TAG)
# if not access_token:
# return ResponseHandler.unauthorized()
# # Get user and token info
# found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
# if not found_user:
# return ResponseHandler.not_found("User not found")
# user_token = UsersTokens.filter_one(
# UsersTokens.domain == found_user.domain_name,
# UsersTokens.user_id == found_user.id,
# UsersTokens.token_type == "RememberMe",
# ).data
# response_data = {
# "access_token": access_token,
# "refresh_token": getattr(user_token, "token", None),
# "user": found_user.get_dict(),
# }
# return ResponseHandler.success(
# "User info refreshed successfully",
# data=response_data,
# )
# except Exception as e:
# return ResponseHandler.error(str(e))
return
class AuthenticationChangePasswordEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Change password with access token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"f09f7c1a-bee6-4e32-8444-962ec8f39091": "authentication_change_password",
}
# __event_validation__ = {
# "f09f7c1a-bee6-4e32-8444-962ec8f39091": "authentication_change_password",
# }
@classmethod
def authentication_change_password(
cls,
request: "Request",
data: ChangePasswordRequestModel,
token_dict: TokenDictType,
):
# try:
# if not isinstance(token_dict, EmployeeTokenObject):
# return ResponseHandler.unauthorized(
# "Only employees can change password"
# )
# found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
# if not found_user:
# return ResponseHandler.not_found("User not found")
# if not found_user.check_password(data.old_password):
# # UserLogger.log_password_change(
# # request,
# # found_user.id,
# # "change",
# # success=False,
# # error="Invalid old password",
# # )
# return ResponseHandler.unauthorized("Old password is incorrect")
# found_user.set_password(data.new_password)
# # UserLogger.log_password_change(
# # request, found_user.id, "change", success=True
# # )
# return ResponseHandler.success("Password changed successfully")
# except Exception as e:
# # UserLogger.log_password_change(
# # request,
# # found_user.id if found_user else None,
# # "change",
# # success=False,
# # error=str(e),
# # )
# return ResponseHandler.error(str(e))
return
class AuthenticationCreatePasswordEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Create password with password reset token requested via email"
event_category = "AUTHENTICATION"
__event_keys__ = {
"c519f9af-92e1-47b2-abf7-5a3316d075f7": "authentication_create_password",
}
# __event_validation__ = {
# "c519f9af-92e1-47b2-abf7-5a3316d075f7": "authentication_create_password",
# }
@classmethod
def authentication_create_password(
cls, request: "Request", data: CreatePasswordRequestModel
):
# if not data.re_password == data.password:
# raise HTTPException(
# status_code=status.HTTP_406_NOT_ACCEPTABLE, detail="Password must match"
# )
# if found_user := Users.filter_one(
# Users.password_token == data.password_token
# ).data:
# found_user: Users = found_user
# found_user.create_password(found_user=found_user, password=data.password)
# found_user.password_token = ""
# found_user.save()
# # send_email_completed = send_email(
# # subject=f"Dear {found_user.user_tag}, your password has been changed.",
# # receivers=[str(found_user.email)],
# # html=password_is_changed_template(user_name=found_user.user_tag),
# # )
# # if not send_email_completed:
# # raise HTTPException(
# # status_code=400, detail="Email can not be sent. Try again later"
# # )
# return ResponseHandler.success(
# "Password is created successfully",
# data=found_user.get_dict(),
# )
# return ResponseHandler.not_found("Record not found")
return
class AuthenticationDisconnectUserEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Disconnect all sessions of user in access token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"8b586848-2fb3-4161-abbe-642157eec7ce": "authentication_disconnect_user",
}
# __event_validation__ = {
# "8b586848-2fb3-4161-abbe-642157eec7ce": "authentication_disconnect_user",
# }
@classmethod
def authentication_disconnect_user(
cls, request: "Request", data: LogoutRequestModel, token_dict: TokenDictType
):
# found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
# if not found_user:
# return ResponseHandler.not_found("User not found")
# if already_tokens := RedisActions.get_object_via_user_uu_id(
# user_id=str(found_user.uu_id)
# ):
# for key, token_user in already_tokens.items():
# RedisActions.delete(key)
# selected_user = Users.filter_one(
# Users.uu_id == token_user.get("uu_id"),
# ).data
# selected_user.remove_refresher_token(
# domain=data.domain, disconnect=True
# )
# return ResponseHandler.success(
# "All sessions are disconnected",
# data=selected_user.get_dict(),
# )
# return ResponseHandler.not_found("Invalid data")
return
class AuthenticationLogoutEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Logout only single session of user which domain is provided"
event_category = "AUTHENTICATION"
__event_keys__ = {
"5cc22e4e-a0f7-4077-be41-1871feb3dfd1": "authentication_logout_user",
}
# __event_validation__ = {
# "5cc22e4e-a0f7-4077-be41-1871feb3dfd1": "authentication_logout_user",
# }
@classmethod
def authentication_logout_user(
cls,
request: "Request",
data: LogoutRequestModel,
token_dict: TokenDictType = None,
):
# token_user = None
# if already_tokens := RedisActions.get_object_via_access_key(request=request):
# for key in already_tokens:
# token_user = RedisActions.get_json(key)
# if token_user.get("domain") == data.domain:
# RedisActions.delete(key)
# selected_user = Users.filter_one(
# Users.uu_id == token_user.get("uu_id"),
# ).data
# selected_user.remove_refresher_token(domain=data.domain)
# return ResponseHandler.success(
# "Session is logged out",
# data=token_user,
# )
# return ResponseHandler.not_found("Logout is not successfully completed")
return
class AuthenticationRefreshTokenEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Refresh access token with refresher token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"c90f3334-10c9-4181-b5ff-90d98a0287b2": "authentication_refresher_token",
}
# __event_validation__ = {
# "c90f3334-10c9-4181-b5ff-90d98a0287b2": AuthenticationRefreshResponse,
# }
@classmethod
def authentication_refresher_token(
cls, request: "Request", data: RememberRequestModel, token_dict: TokenDictType
):
# token_refresher = UsersTokens.filter_by_one(
# token=data.refresh_token,
# domain=data.domain,
# **UsersTokens.valid_record_dict,
# ).data
# if not token_refresher:
# return ResponseHandler.not_found("Invalid data")
# if found_user := Users.filter_one(
# Users.id == token_refresher.user_id,
# ).data:
# found_user: Users = found_user
# access_key = AuthActions.save_access_token_to_redis(
# request=request, found_user=found_user, domain=data.domain
# )
# found_user.last_agent = request.headers.get("User-Agent", None)
# found_user.last_platform = request.headers.get("Origin", None)
# found_user.last_remote_addr = getattr(
# request, "remote_addr", None
# ) or request.headers.get("X-Forwarded-For", None)
# found_user.last_seen = str(system_arrow.now())
# response_data = {
# "access_token": access_key,
# "refresh_token": data.refresh_token,
# }
# return ResponseHandler.success(
# "User is logged in successfully via refresher token",
# data=response_data,
# )
# return ResponseHandler.not_found("Invalid data")
return
class AuthenticationForgotPasswordEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Send an email to user for a valid password reset token"
event_category = "AUTHENTICATION"
__event_keys__ = {
"e3ca6e24-b9f8-4127-949c-3bfa364e3513": "authentication_forgot_password",
}
# __event_validation__ = {
# "e3ca6e24-b9f8-4127-949c-3bfa364e3513": "authentication_forgot_password",
# }
@classmethod
def authentication_forgot_password(
cls,
request: "Request",
data: ForgotRequestModel,
):
# found_user: Users = Users.check_user_exits(
# access_key=data.access_key, domain=data.domain
# )
# forgot_key = AuthActions.save_access_token_to_redis(
# request=request, found_user=found_user, domain=data.domain
# )
# forgot_link = ApiStatic.forgot_link(forgot_key=forgot_key)
# send_email_completed = send_email(
# subject=f"Dear {found_user.user_tag}, your forgot password link has been sent.",
# receivers=[str(found_user.email)],
# html=change_your_password_template(
# user_name=found_user.user_tag, forgot_link=forgot_link
# ),
# )
# if not send_email_completed:
# raise HTTPException(
# status_code=400, detail="Email can not be sent. Try again later"
# )
# found_user.password_token = forgot_key
# found_user.password_token_is_valid = str(system_arrow.shift(days=1))
# found_user.save()
# return ResponseHandler.success(
# "Password is change link is sent to your email or phone",
# data={},
# )
return
class AuthenticationResetPasswordEventMethods(MethodToEvent):
event_type = "UPDATE"
__event_keys__ = {
"af9e121e-24bb-44ac-a616-471d5754360e": "authentication_reset_password",
}
@classmethod
def authentication_reset_password(
cls, request: "Request", data: ForgotRequestModel
):
# from sqlalchemy import or_
# found_user = Users.query.filter(
# or_(
# Users.email == str(data.access_key).lower(),
# Users.phone_number == str(data.access_key).replace(" ", ""),
# ),
# ).first()
# if not found_user:
# raise HTTPException(
# status_code=status.HTTP_400_BAD_REQUEST,
# detail="Given access key or domain is not matching with the any user record.",
# )
# reset_password_token = found_user.reset_password_token(found_user=found_user)
# send_email_completed = send_email(
# subject=f"Dear {found_user.user_tag}, a password reset request has been received.",
# receivers=[str(found_user.email)],
# html=change_your_password_template(
# user_name=found_user.user_tag,
# forgot_link=ApiStatic.forgot_link(forgot_key=reset_password_token),
# ),
# )
# if not send_email_completed:
# raise found_user.raise_http_exception(
# status_code=400, message="Email can not be sent. Try again later"
# )
# return ResponseHandler.success(
# "Password change link is sent to your email or phone",
# data=found_user.get_dict(),
# )
return
class AuthenticationDownloadAvatarEventMethods(MethodToEvent):
event_type = "LOGIN"
event_description = "Download avatar icon and profile info of user"
event_category = "AUTHENTICATION"
__event_keys__ = {
"c140cd5f-307f-4046-a93e-3ade032a57a7": "authentication_download_avatar",
}
# __event_validation__ = {
# "c140cd5f-307f-4046-a93e-3ade032a57a7": AuthenticationUserInfoResponse,
# }
@classmethod
def authentication_download_avatar(
cls,
token_dict: TokenDictType,
request: "Request",
data: DictRequestModel,
):
# if found_user := Users.filter_one(Users.id == token_dict.user_id).data:
# expired_starts = str(
# system_arrow.now() - system_arrow.get(str(found_user.expiry_ends))
# )
# expired_int = (
# system_arrow.now() - system_arrow.get(str(found_user.expiry_ends))
# ).days
# user_info = {
# "lang": token_dict.lang,
# "full_name": found_user.person.full_name,
# "avatar": found_user.avatar,
# "remember_me": found_user.remember_me,
# "expiry_ends": str(found_user.expiry_ends),
# "expired_str": expired_starts,
# "expired_int": int(expired_int),
# }
# return ResponseHandler.success(
# "Avatar and profile is shared via user credentials",
# data=user_info,
# )
# return ResponseHandler.not_found("Invalid data")
return
async def authentication_select_company_or_occupant_type(
request: "Request",
data: Union[EmployeeSelectionRequestModel, OccupantSelectionRequestModel],
) -> Dict[str, Any]:
return await AuthenticationSelectEventMethods.authentication_select_company_or_occupant_type(
request=request, data=data
)
async def authentication_login_with_domain_and_creds(
request: "Request", data: LoginRequestModel
) -> SuccessResponse:
"""
Authenticate user with domain and credentials.
Args:
request: FastAPI request object
data: Request body containing login credentials
{
"domain": str,
"username": str,
"password": str
}
Returns:
SuccessResponse containing authentication token and user info
"""
return await AuthenticationLoginEventMethods.authentication_login_with_domain_and_creds(
request=request, data=data
)
async def authentication_check_token_is_valid(
request: "Request", data: DictRequestModel
) -> Dict[str, Any]:
return (
await AuthenticationCheckTokenEventMethods.authentication_check_token_is_valid(
request=request, data=data
)
)
def authentication_refresh_user_info(
request: "Request", data: DictRequestModel
) -> Dict[str, Any]:
return
def authentication_change_password(
request: "Request", data: ChangePasswordRequestModel
) -> Dict[str, Any]:
return
def authentication_create_password(
request: "Request", data: CreatePasswordRequestModel
) -> Dict[str, Any]:
return
def authentication_forgot_password(
request: "Request", data: ForgotRequestModel
) -> Dict[str, Any]:
return
def authentication_reset_password(
request: "Request", data: ForgotRequestModel
) -> Dict[str, Any]:
return
def authentication_disconnect_user(
request: "Request", data: LogoutRequestModel
) -> Dict[str, Any]:
return
def authentication_logout_user(
request: "Request", data: LogoutRequestModel
) -> Dict[str, Any]:
return
def authentication_refresher_token(
request: "Request", data: RememberRequestModel
) -> Dict[str, Any]:
return
def authentication_download_avatar(
request: "Request", data: DictRequestModel
) -> Dict[str, Any]:
return
AUTH_CONFIG = RouteFactoryConfig(
name="authentication",
prefix="/authentication",
tags=["Authentication"],
include_in_schema=True,
endpoints=[
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/select",
url_of_endpoint="/authentication/select",
endpoint="/select",
method="POST",
summary="Select company or occupant type",
description="Select company or occupant type",
is_auth_required=True, # Needs token_dict
is_event_required=False,
endpoint_function=authentication_select_company_or_occupant_type,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/login",
url_of_endpoint="/authentication/login",
endpoint="/login",
method="POST",
summary="Login user with domain and password",
description="Login user with domain and password",
is_auth_required=False, # Public endpoint
is_event_required=False,
endpoint_function=authentication_login_with_domain_and_creds,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/valid",
url_of_endpoint="/authentication/valid",
endpoint="/valid",
method="GET",
summary="Check access token is valid",
description="Check access token is valid",
is_auth_required=True, # Needs token validation
is_event_required=False,
endpoint_function=authentication_check_token_is_valid,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/refresh",
url_of_endpoint="/authentication/refresh",
endpoint="/refresh",
method="GET",
summary="Refresh credentials with access token",
description="Refresh credentials with access token",
is_auth_required=True, # Needs token_dict
is_event_required=False,
endpoint_function=authentication_refresh_user_info,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/change_password",
url_of_endpoint="/authentication/change_password",
endpoint="/change_password",
method="POST",
summary="Change password with access token",
description="Change password with access token",
is_auth_required=True, # Needs token_dict
is_event_required=False,
endpoint_function=authentication_change_password,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/create_password",
url_of_endpoint="/authentication/create_password",
endpoint="/create_password",
method="POST",
summary="Create password with password token",
description="Create password with password token",
is_auth_required=False, # Public endpoint
is_event_required=False,
endpoint_function=authentication_create_password,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/reset_password",
url_of_endpoint="/authentication/reset_password",
endpoint="/reset_password",
method="POST",
summary="Create password with password token",
description="Create password with password token",
is_auth_required=False, # Public endpoint
is_event_required=False,
endpoint_function=authentication_reset_password,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/disconnect",
url_of_endpoint="/authentication/disconnect",
endpoint="/disconnect",
method="POST",
summary="Disconnect user with access token",
description="Disconnect user with access token",
is_auth_required=True, # Needs token_dict
is_event_required=False,
endpoint_function=authentication_disconnect_user,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/logout",
url_of_endpoint="/authentication/logout",
endpoint="/logout",
method="POST",
summary="Logout user with access token",
description="Logout user with access token",
is_auth_required=True, # Needs token_dict
is_event_required=False,
endpoint_function=authentication_logout_user,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/refresher",
url_of_endpoint="/authentication/refresher",
endpoint="/refresher",
method="POST",
summary="Refresh token with refresh token",
description="Refresh token with refresh token",
is_auth_required=True, # Needs token_dict
is_event_required=False,
endpoint_function=authentication_refresher_token,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/forgot",
url_of_endpoint="/authentication/forgot",
endpoint="/forgot",
method="POST",
summary="Forgot password with email or phone number",
description="Forgot password with email or phone number",
is_auth_required=False, # Public endpoint
is_event_required=False,
endpoint_function=authentication_forgot_password,
),
EndpointFactoryConfig(
url_prefix="/authentication",
url_endpoint="/avatar",
url_of_endpoint="/authentication/avatar",
endpoint="/avatar",
method="POST",
summary="Get link of avatar with credentials",
description="Get link of avatar with credentials",
is_auth_required=True, # Needs token_dict
is_event_required=False,
endpoint_function=authentication_download_avatar,
),
],
).as_dict()

View File

@@ -0,0 +1,20 @@
"""
Route configuration registry.
This module collects and registers all route configurations from different modules
to be used by the dynamic route creation system.
"""
from typing import Dict, List, Any
from .auth.auth import AUTH_CONFIG
# Registry of all route configurations
ROUTE_CONFIGS = [
AUTH_CONFIG,
]
def get_route_configs() -> List[Dict[str, Any]]:
"""Get all registered route configurations."""
return [AUTH_CONFIG]