events and auth updated

This commit is contained in:
berkay 2024-11-13 13:11:56 +03:00
parent 5fa183c12e
commit 934c7bc845
7 changed files with 29 additions and 26 deletions

View File

@ -19,6 +19,7 @@ class Config:
"/test/create/test/gateway",
"/test/create/test/company",
"/infos/current_date",
"/authentication/select",
"/authentication/login",
"/authentication/logout",
"/authentication/refresher",

View File

@ -102,13 +102,13 @@ class AuthenticationSelectEventMethods(MethodToEvent):
def authentication_select_company_or_occupant_type(
cls,
request: Request,
data,
data: Union[EmployeeSelection, OccupantSelection],
token_dict: Union[EmployeeTokenObject, OccupantTokenObject]
):
from api_objects.auth.token_objects import OccupantToken, CompanyToken
from api_objects import OccupantToken, CompanyToken
token_user = get_object_via_access_key(request=request)
if token_user.user_type == 1:
if data.company_uu_id not in token_user.companies_uu_id_list:
if token_dict.user_type == 1:
if data.company_uu_id not in token_dict.companies_uu_id_list:
return JSONResponse(
content={
"completed": False,
@ -143,11 +143,10 @@ class AuthenticationSelectEventMethods(MethodToEvent):
).data
]
employee = Employees.filter_one(
Employees.people_id == token_user.person_id,
Employees.people_id == token_dict.person_id,
Employees.staff_id.in_(staff_ids),
*Employees.valid_record_args(Employees),
).data
reachable_event_list_id, reachable_event_list_uu_id = (
Event2Employee.get_event_id_by_employee_id(employee_id=employee.id)
)
@ -163,7 +162,7 @@ class AuthenticationSelectEventMethods(MethodToEvent):
Departments.id == duties.department_id,
).data
bulk_id = Duty.filter_by_one(
duty_code="BULK", **Duty.valid_record_dict
system=True, duty_code="BULK"
).data
bulk_duty_id = Duties.filter_by_one(
company_id=selected_company.id,
@ -195,7 +194,7 @@ class AuthenticationSelectEventMethods(MethodToEvent):
},
status_code=status.HTTP_200_OK,
)
elif token_user.user_type == 2:
elif token_dict.user_type == 2:
occupant_type = OccupantTypes.filter_by_one(
system=True, uu_id=data.occupant_uu_id
).data
@ -232,7 +231,7 @@ class AuthenticationSelectEventMethods(MethodToEvent):
).data
if selected_occupant_type := BuildLivingSpace.filter_one(
BuildLivingSpace.occupant_type == occupant_type.id,
BuildLivingSpace.person_id == token_user.person_id,
BuildLivingSpace.person_id == token_dict.person_id,
BuildLivingSpace.build_parts_id == build_part.id,
*BuildLivingSpace.valid_record_args(BuildLivingSpace),
).data:

View File

@ -77,6 +77,12 @@ class SuperUserEventBlock(AddEventFunctionality):
{"function_code": "7b58ed84-9a65-4588-994d-30df8366b050"},
{"function_code": "5702f0a9-fe8f-4aae-922e-6e04b497ef6a"},
{"function_code": "c93a3009-65a0-498d-9191-04484d5cde81"},
{"function_code": "6798414c-6c7d-47f0-9d8b-6935a0f51c2e"},
{"function_code": "57edc8bf-8f29-4e75-b5e1-9ca0139a3fda"},
{"function_code": "b18e8e37-a62b-4a84-9972-ba17121ed393"},
{"function_code": "0bb51845-65a2-4340-8872-a3b5aad95468"},
{"function_code": "a10571fa-ac1d-4546-9272-cacb911d8004"},
{"function_code": "58178738-7489-4f8f-954e-5c8f083c1845"},
]
def __new__(cls, *args, **kwargs):

View File

@ -6,9 +6,12 @@ def parse_token_object_to_dict(request): # from requests import Request
from api_services.redis.functions import get_object_via_access_key
from databases import EndpointRestriction, Events
from api_configs.configs import Config
if valid_token := get_object_via_access_key(request=request):
endpoint_name = str(request.url).replace(str(request.base_url), "/")
if str(endpoint_name) in Config.INSECURE_PATHS:
return valid_token
endpoint_active = EndpointRestriction.filter_one(
EndpointRestriction.endpoint_name.ilike(f"%{endpoint_name}%"),
*EndpointRestriction.valid_record_args(EndpointRestriction),
@ -18,24 +21,22 @@ def parse_token_object_to_dict(request): # from requests import Request
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"This endpoint {endpoint_name} is not active for this user, please contact your responsible company for further information.",
)
if valid_token.user_type == 1:
if not valid_token.selected_company:
raise HTTPException(
status_code=status.HTTP_418_IM_A_TEAPOT,
detail="Selected company is not found in the token object.",
)
selected_event = Events.filter_all(
selected_event = Events.filter_one(
Events.endpoint_id == endpoint_active.id,
Events.id.in_(valid_token.selected_company.reachable_event_list_id),
*Events.valid_record_args(Events),
)
if not selected_event.data:
).data
if not selected_event:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="This endpoint requires event validation. Please contact your responsible company to use this event.",
)
selected_event = selected_event.data[0]
event_function_class = getattr(selected_event, "function_class", None)
event_function_code = getattr(selected_event, "function_code", None)
function_class = getattr(events, event_function_class, None)

View File

@ -1,4 +1,4 @@
from sqlalchemy import String
from sqlalchemy import String, Boolean
from databases.sql_models.core_mixin import CrudCollection
from sqlalchemy.orm import mapped_column, Mapped

View File

@ -37,8 +37,7 @@ from .company.employee.router import employee_route
from .events.events.bind_events_router import bind_events_route
from .events.modules.router import modules_route
from service_app.routers.events.modules.bind_events_router import bind_modules_route
from .events.modules.bind_events_router import bind_modules_route
from .events.services.bind_services_router import bind_services_route
from .events.services.router import services_route

View File

@ -38,20 +38,17 @@ login_route.include_router(login_route, include_in_schema=True)
def authentication_select_company_or_occupant_type(
request: Request, data: Union[EmployeeSelection, OccupantSelection]
):
active_function = getattr(
AuthenticationSelectEventMethod,
"authentication_select_company_or_occupant_type",
token_dict = parse_token_object_to_dict(request=request)
return AuthenticationSelectEventMethod.authentication_select_company_or_occupant_type(
data=data, request=request, token_dict=token_dict
)
return active_function(data=data, request=request, token_dict=None)
@login_route.post(path="/login", summary="Login user with domain and password")
def authentication_login_with_domain_and_creds(request: Request, data: Login):
active_function = getattr(
AuthenticationLoginEventMethod, "authentication_login_with_domain_and_creds"
return AuthenticationLoginEventMethod.authentication_login_with_domain_and_creds(
request=request, data=data
)
return active_function(request=request, data=data)
@login_route.get(path="/valid", summary="Check access token is valid")