from typing import Any, Union

from fastapi import Request

from ApiLayers.ApiLibrary.common.line_number import get_line_number_for_error
from ApiLayers.ApiServices.Login.user_login_handler import UserLoginModule
from ApiLayers.ApiServices.Token.token_handler import TokenService
from ApiLayers.ApiValidations.Custom.token_objects import CompanyToken, OccupantToken
from ApiLayers.ApiValidations.Response.default_response import (
    EndpointSuccessResponse,
    EndpointNotAcceptableResponse,
    EndpointBadRequestResponse,
)
from ApiLayers.ErrorHandlers import HTTPExceptionApi
from ApiLayers.Schemas import (
    BuildLivingSpace,
    BuildParts,
    RelationshipEmployee2Build,
    Companies,
    Departments,
    Duties,
    Duty,
    Staff,
    Employees,
    Event2Employee,
    Event2Occupant,
    OccupantTypes,
    Users,
    UsersTokens,
)
from Events.base_request_model import TokenDictType, BaseRouteModel
from Services.Redis.Actions.actions import RedisActions
from ApiLayers.AllConfigs.Redis.configs import RedisAuthKeys


class Handlers:
    """Selection handlers that enrich the cached token with a company or occupant context."""

    @classmethod  # Requires no auth context
    def handle_employee_selection(
        cls, request: Request, data: Any, token_dict: TokenDictType
    ):
        """
        Build a CompanyToken for the company chosen by an employee and store it in Redis.

        Args:
            request: FastAPI request object, forwarded to the token service.
            data: Request body; ``data.company_uu_id`` is the selected company.
            token_dict: Token of the caller; provides ``companies_uu_id_list``,
                ``person_id`` and ``lang``.

        Returns:
            Whatever ``TokenService.update_token_at_redis`` returns.

        Raises:
            HTTPExceptionApi: when the company is not listed in the token, the
                company or employee record is not found, or the Redis update fails.
        """
        db = Users.new_session()

        # The selected company must be one of the companies carried by the token.
        if data.company_uu_id not in token_dict.companies_uu_id_list:
            raise HTTPExceptionApi(
                error_code="HTTP_400_BAD_REQUEST",
                lang=token_dict.lang,
                loc=get_line_number_for_error(),
                sys_msg="Company not found in token",
            )

        selected_company: Companies = Companies.filter_one(
            Companies.uu_id == data.company_uu_id, db=db
        ).data
        if not selected_company:
            raise HTTPExceptionApi(
                error_code="HTTP_400_BAD_REQUEST",
                lang=token_dict.lang,
                loc=get_line_number_for_error(),
                sys_msg="Company not found in token",
            )

        # Get duties IDs for the company
        duties_ids = [
            duty.id
            for duty in Duties.filter_all(
                Duties.company_id == selected_company.id, db=db
            ).data
        ]

        # Get staff IDs
        staff_ids = [
            staff.id
            for staff in Staff.filter_all(Staff.duties_id.in_(duties_ids), db=db).data
        ]

        # Get employee
        employee: Employees = Employees.filter_one(
            Employees.people_id == token_dict.person_id,
            Employees.staff_id.in_(staff_ids),
            db=db,
        ).data
        if not employee:
            raise HTTPExceptionApi(
                error_code="HTTP_400_BAD_REQUEST",
                lang=token_dict.lang,
                loc=get_line_number_for_error(),
                sys_msg="Employee not found in token",
            )

        # Get reachable events
        reachable_event_codes = Event2Employee.get_event_codes(employee_id=employee.id)

        # Get staff and duties
        # NOTE(review): none of the lookups below is checked for a missing row;
        # a None result would surface as AttributeError - confirm these rows
        # are guaranteed to exist.
        staff = Staff.filter_one(Staff.id == employee.staff_id, db=db).data
        duties = Duties.filter_one(Duties.id == staff.duties_id, db=db).data
        department = Departments.filter_one(
            Departments.id == duties.department_id, db=db
        ).data

        # Get bulk duty
        bulk_id = Duty.filter_by_one(system=True, duty_code="BULK", db=db).data
        bulk_duty_id = Duties.filter_by_one(
            company_id=selected_company.id,
            duties_id=bulk_id.id,
            db=db,
        ).data

        # Create company token
        company_token = CompanyToken(
            company_uu_id=str(selected_company.uu_id),
            company_id=selected_company.id,
            department_id=department.id,
            department_uu_id=str(department.uu_id),
            duty_id=duties.id,
            duty_uu_id=str(duties.uu_id),
            bulk_duties_id=bulk_duty_id.id,
            staff_id=staff.id,
            staff_uu_id=str(staff.uu_id),
            employee_id=employee.id,
            employee_uu_id=str(employee.uu_id),
            reachable_event_codes=reachable_event_codes,
        )

        try:  # Update Redis
            return TokenService.update_token_at_redis(
                request=request, add_payload=company_token
            )
        except Exception as e:
            # Re-raise as an API error while keeping the original cause chained.
            raise HTTPExceptionApi(
                error_code="",
                lang="en",
                loc=get_line_number_for_error(),
                sys_msg=f"{e}",
            ) from e

    @classmethod  # Requires no auth context
    def handle_occupant_selection(
        cls, request: Request, data: Any, token_dict: TokenDictType
    ):
        """
        Build an OccupantToken for the selected living space and store it in Redis.

        Args:
            request: FastAPI request object, forwarded to the token service.
            data: Request body; ``data.build_living_space_uu_id`` is the selection.
            token_dict: Token of the caller; only ``lang`` is read here.

        Returns:
            Whatever ``TokenService.update_token_at_redis`` returns.

        Raises:
            HTTPExceptionApi: when the living space is not found or the Redis
                update fails.
        """
        db = BuildLivingSpace.new_session()

        # Get selected occupant type
        selected_build_living_space: BuildLivingSpace = BuildLivingSpace.filter_one(
            BuildLivingSpace.uu_id == data.build_living_space_uu_id,
            db=db,
        ).data
        if not selected_build_living_space:
            raise HTTPExceptionApi(
                error_code="HTTP_400_BAD_REQUEST",
                lang=token_dict.lang,
                loc=get_line_number_for_error(),
                sys_msg="Selected occupant type not found",
            )

        # Get reachable events
        reachable_event_codes = Event2Occupant.get_event_codes(
            build_living_space_id=selected_build_living_space.id
        )

        # NOTE(review): none of the lookups below is checked for a missing row;
        # a None result would surface as AttributeError.
        occupant_type = OccupantTypes.filter_one_system(
            OccupantTypes.id == selected_build_living_space.occupant_type_id,
            db=db,
        ).data
        build_part = BuildParts.filter_one(
            BuildParts.id == selected_build_living_space.build_parts_id,
            db=db,
        ).data
        # NOTE(review): the building is looked up through BuildParts using
        # build_part.build_id - looks like this may have been meant to query a
        # building model instead; confirm before changing.
        build = BuildParts.filter_one(
            BuildParts.id == build_part.build_id,
            db=db,
        ).data
        responsible_employee = Employees.filter_one(
            Employees.id == build_part.responsible_employee_id,
            db=db,
        ).data
        related_company = RelationshipEmployee2Build.filter_one(
            RelationshipEmployee2Build.member_id == build.id,
            db=db,
        ).data

        # Get company
        company_related = Companies.filter_one(
            Companies.id == related_company.company_id,
            db=db,
        ).data

        # Create occupant token
        occupant_token = OccupantToken(
            living_space_id=selected_build_living_space.id,
            living_space_uu_id=str(selected_build_living_space.uu_id),
            occupant_type_id=occupant_type.id,
            occupant_type_uu_id=str(occupant_type.uu_id),
            occupant_type=occupant_type.occupant_type,
            build_id=build.id,
            build_uuid=str(build.uu_id),
            build_part_id=build_part.id,
            build_part_uuid=str(build_part.uu_id),
            responsible_employee_id=responsible_employee.id,
            responsible_employee_uuid=str(responsible_employee.uu_id),
            responsible_company_id=company_related.id,
            responsible_company_uuid=str(company_related.uu_id),
            reachable_event_codes=reachable_event_codes,
        )

        try:  # Update Redis
            return TokenService.update_token_at_redis(
                request=request, add_payload=occupant_token
            )
        except Exception as e:
            # Re-raise as an API error while keeping the original cause chained.
            raise HTTPExceptionApi(
                error_code="",
                lang="en",
                loc=get_line_number_for_error(),
                sys_msg=f"{e}",
            ) from e


class AuthenticationFunctions(BaseRouteModel):
    """Class for handling authentication functions"""

    @classmethod  # Requires no auth context
    def authentication_login_with_domain_and_creds(cls, request: Request, data: Any):
        """
        Authenticate user with domain and credentials.

        Args:
            request: FastAPI request object
            data: Request body containing login credentials
                {
                    "data": {
                        "domain": "evyos.com.tr",
                        "access_key": "karatay.berkay.sup@evyos.com.tr",
                        "password": "string",
                        "remember_me": false
                    }
                }
        Returns:
            SuccessResponse containing authentication token and user info
        """
        # Get token from login module
        user_login_module = UserLoginModule(request=request)
        user_login_module.login_user_via_credentials(access_data=data)
        # The response language is forced to English after login.
        user_login_module.language = "en"

        # Return response with token and headers
        return EndpointSuccessResponse(
            code="LOGIN_SUCCESS", lang=user_login_module.language
        ).as_dict(data=user_login_module.as_dict)

    @classmethod  # Requires auth context
    def authentication_select_company_or_occupant_type(cls, data: Any):
        """
        Handle selection of company or occupant type
        {"data": {"build_living_space_uu_id": ""}} | {"data": {"company_uu_id": ""}}
        {
            "data": {"company_uu_id": "e9869a25-ba4d-49dc-bb0d-8286343b184b"}
        }
        {
            "data": {"build_living_space_uu_id": "e9869a25-ba4d-49dc-bb0d-8286343b184b"}
        }

        Returns:
            A LOGIN_SELECT success payload when the matching handler returns a
            truthy value; otherwise None.
        """
        token = cls.context_retriever.token
        selection_dict = dict(
            request=cls.context_retriever.request,
            token_dict=token,
            data=data,
        )
        if token.is_employee:
            if Handlers.handle_employee_selection(**selection_dict):
                return EndpointSuccessResponse(
                    code="LOGIN_SELECT", lang=token.lang
                ).as_dict(
                    data={"selected": data.company_uu_id, **cls.context_retriever.base}
                )
        elif token.is_occupant:
            if Handlers.handle_occupant_selection(**selection_dict):
                return EndpointSuccessResponse(
                    code="LOGIN_SELECT", lang=token.lang
                ).as_dict(
                    data={
                        "selected": data.build_living_space_uu_id,
                        **cls.context_retriever.base,
                    }
                )
        # NOTE(review): falls through with None when the token is neither
        # employee nor occupant, or when the handler returns a falsy value.
        return None

    @classmethod  # Requires auth context
    def authentication_check_token_is_valid(cls):
        """Check if token is valid for user"""
        if cls.context_retriever.token:
            return EndpointSuccessResponse(
                code="TOKEN_VALID", lang=cls.context_retriever.token.lang
            ).as_dict(data=cls.context_retriever.base)
        return {
            "completed": False,
            "message": "Token is not valid",
        }

    @classmethod  # Requires not auth context
    def authentication_access_token_user_info(cls):
        """
        Refresh user info using access token

        Returns:
            USER_INFO_REFRESHED payload with the access token and user dict,
            USER_NOT_FOUND when the user row is missing, or None when the
            context holds no token.
        """
        token = cls.context_retriever.token
        if not token:
            return None

        db = Users.new_session()
        found_user = Users.filter_one(Users.id == token.user_id, db=db).data
        if found_user:
            return EndpointSuccessResponse(
                code="USER_INFO_REFRESHED", lang=token.lang
            ).as_dict(
                {
                    "access_token": cls.context_retriever.get_token,
                    "user": found_user.get_dict(),
                }
            )
        return EndpointNotAcceptableResponse(
            code="USER_NOT_FOUND", lang=token.lang
        ).as_dict(data={})

    @classmethod  # Requires no auth context
    def authentication_change_password(cls, data: Any):
        """
        Change password with access token

        Returns:
            PASSWORD_CHANGED payload with the user dict, USER_NOT_FOUND when
            the user row is missing, or None when the context holds no token.
        """
        token = cls.context_retriever.token
        if not token:
            return None

        db = Users.new_session()
        found_user = Users.filter_one(Users.id == token.user_id, db=db).data
        if found_user:
            found_user.set_password(data.new_password)
            return EndpointSuccessResponse(
                code="PASSWORD_CHANGED", lang=token.lang
            ).as_dict(data={"user": found_user.get_dict()})
        return EndpointNotAcceptableResponse(
            code="USER_NOT_FOUND", lang=token.lang
        ).as_dict(data={})

    @classmethod  # Requires not auth context
    def authentication_create_password(cls, data: Any):
        """
        Create password with password reset token requested via email

        Returns:
            PASSWORD_NOT_MATCH when the two passwords differ, CREATED_PASSWORD
            when a user owning ``data.password_token`` is found, otherwise None.
        """
        # NOTE(review): this endpoint is marked as not requiring auth context
        # yet reads cls.context_retriever.token.lang - confirm a token is
        # always available here.
        db = Users.new_session()
        if data.re_password != data.password:
            # NOTE(review): the submitted passwords are echoed back in the
            # response body - consider dropping them from the payload.
            return EndpointNotAcceptableResponse(
                code="PASSWORD_NOT_MATCH", lang=cls.context_retriever.token.lang
            ).as_dict(data={"password": data.password, "re_password": data.re_password})

        if found_user := Users.filter_one(
            Users.password_token == data.password_token, db=db
        ).data:
            found_user.create_password(found_user=found_user, password=data.password)
            # Invalidate the reset token so it cannot be reused.
            found_user.password_token = ""
            found_user.save()
            return EndpointSuccessResponse(
                code="CREATED_PASSWORD", lang=cls.context_retriever.token.lang
            ).as_dict(data={"user": found_user.get_dict()})
        return None

    @classmethod  # Requires auth context
    def authentication_disconnect_user(cls):
        """Disconnect all sessions of user in access token"""
        token = cls.context_retriever.token
        db = Users.new_session()
        found_user = Users.filter_one_system(Users.id == token.user_id, db=db).data
        if not found_user:
            return EndpointNotAcceptableResponse(
                code="USER_NOT_FOUND", lang=token.lang
            ).as_dict(data={})

        registered_tokens = UsersTokens.filter_all(
            UsersTokens.user_id == token.user_id, db=db
        )
        if registered_tokens.count:
            registered_tokens.query.delete()
            UsersTokens.save(db=db)

        # Remove every cached auth entry of this user regardless of domain.
        RedisActions.delete(
            list_keys=[f"{RedisAuthKeys.AUTH}:*:{str(found_user.uu_id)}"]
        )
        return EndpointSuccessResponse(
            code="DISCONNECTED_USER", lang=token.lang
        ).as_dict(data={"user": found_user.get_dict()})

    @classmethod  # Requires auth context
    def authentication_logout_user(cls, data: Any):
        """Logout only single session of user which domain is provided"""
        token = cls.context_retriever.token
        db = Users.new_session()
        found_user = Users.filter_one_system(Users.id == token.user_id, db=db).data
        if not found_user:
            return EndpointNotAcceptableResponse(
                code="USER_NOT_FOUND", lang=token.lang
            ).as_dict(data={})

        # NOTE(review): stored tokens are filtered by the domain in the access
        # token, while the cached token below is removed by data.domain -
        # confirm both are meant to be the same domain.
        registered_tokens = UsersTokens.filter_all_system(
            UsersTokens.user_id == token.user_id,
            UsersTokens.domain == token.domain,
            db=db,
        )
        if registered_tokens.count:
            registered_tokens.query.delete()
            UsersTokens.save(db=db)

        TokenService.remove_token_with_domain(user=found_user, domain=data.domain)
        return EndpointSuccessResponse(
            code="LOGOUT_USER", lang=token.lang
        ).as_dict(data={"user": found_user.get_dict()})

    @classmethod  # Requires not auth context
    def authentication_refresher_token(cls, request: Request, data: Any):
        """
        Refresh access token with refresher token
        {
            "data": {
                "refresh_token": "string",
                "domain": "string"
            }
        }

        Returns:
            TOKEN_REFRESH payload with the new access token and the supplied
            refresh token, REFRESHER_NOT_FOUND when the refresh token is
            unknown for the domain, or USER_NOT_FOUND when its owner is missing.
        """
        import arrow

        db = UsersTokens.new_session()
        token_refresher: UsersTokens = UsersTokens.filter_by_one(
            token=data.refresh_token,
            domain=data.domain,
            db=db,
        ).data
        # Response language comes from the request header, defaulting to "tr".
        language = request.headers.get("evyos-language", "tr")
        if not token_refresher:
            return EndpointNotAcceptableResponse(
                code="REFRESHER_NOT_FOUND", lang=language
            ).as_dict(data={"refresh_token": data.refresh_token})

        if found_user := Users.filter_one(
            Users.id == token_refresher.user_id, db=db
        ).data:
            token_created = TokenService.set_access_token_to_redis(
                request=request,
                user=found_user,
                domain=data.domain,
                remember=True,
            )
            # NOTE(review): these attributes are updated on the user object but
            # no save call is visible in this method - confirm persistence.
            found_user.last_agent = request.headers.get("User-Agent", None)
            found_user.last_platform = request.headers.get("Origin", None)
            found_user.last_remote_addr = getattr(
                request, "remote_addr", None
            ) or request.headers.get("X-Forwarded-For", None)
            found_user.last_seen = str(arrow.now())
            response_data = {
                "access_token": token_created.get("access_token"),
                "refresh_token": data.refresh_token,
            }
            return EndpointSuccessResponse(code="TOKEN_REFRESH", lang=language).as_dict(
                data=response_data
            )

        # Returned, not raised: the response payload is not an exception, and
        # every other USER_NOT_FOUND path in this class returns it.
        return EndpointNotAcceptableResponse(
            code="USER_NOT_FOUND", lang=language
        ).as_dict(data={})

    @classmethod  # Requires not auth context
    def authentication_forgot_password(cls, data: Any):
        """
        Send an email to user for a valid password reset token

        Returns:
            EMAIL_NOT_SENT when the email sender reports failure, otherwise a
            FORGOT_PASSWORD payload after the reset token is stored on the user.
        """
        import arrow
        from ApiLayers.AllConfigs.Templates.password_templates import (
            change_your_password_template,
        )
        from Services.Email.send_email import email_sender
        from config import ApiStatic

        # NOTE(review): this endpoint is marked as not requiring auth context
        # yet reads cls.context_retriever.token.lang - confirm a token is
        # always available here.
        db = Users.new_session()
        found_user: Users = Users.check_user_exits(
            access_key=data.access_key, domain=data.domain
        )
        forgot_key = TokenService._create_access_token(access=False)
        forgot_link = ApiStatic.forgot_link(forgot_key=forgot_key)
        send_email_completed = email_sender.send_email(
            subject=f"Dear {found_user.user_tag}, your forgot password link has been sent.",
            receivers=[str(found_user.email)],
            html=change_your_password_template(
                user_name=found_user.user_tag, forgot_link=forgot_link
            ),
        )
        if not send_email_completed:
            return EndpointBadRequestResponse(
                code="EMAIL_NOT_SENT", lang=cls.context_retriever.token.lang
            ).as_dict(data={"email": found_user.email})

        # Persist the reset token only after the email was sent; valid for one day.
        found_user.password_token = forgot_key
        found_user.password_token_is_valid = str(arrow.now().shift(days=1))
        found_user.save(db=db)

        # NOTE(review): the reset link and token are returned to the caller in
        # addition to being emailed - confirm this exposure is intended.
        return EndpointSuccessResponse(
            code="FORGOT_PASSWORD", lang=cls.context_retriever.token.lang
        ).as_dict(
            data={
                "user": found_user.get_dict(),
                "forgot_link": forgot_link,
                "token": forgot_key,
            }
        )

    @classmethod  # Requires not auth context
    def authentication_reset_password(cls, data: Any):
        """Reset password with forgot password token"""
        # Currently only returns the base context; ``data`` is not used.
        return cls.context_retriever.base

    @classmethod  # Requires not auth context
    def authentication_download_avatar(cls):
        """
        Download avatar icon and profile info of user

        Returns:
            USER_AVATAR payload with profile and expiry details, or
            USER_NOT_FOUND when the user row is missing.
        """
        import arrow

        token = cls.context_retriever.token
        db = Users.new_session()
        if found_user := Users.filter_one(Users.id == token.user_id, db=db).data:
            # Time elapsed since expiry; negative while the expiry is still ahead.
            elapsed = arrow.now() - arrow.get(str(found_user.expiry_ends))
            user_info = {
                "lang": token.lang,
                "full_name": found_user.person.full_name,
                "avatar": found_user.avatar,
                "remember_me": found_user.remember_me,
                "expiry_ends": str(found_user.expiry_ends),
                "expired_humanized": str(elapsed),
                # Sign flipped so the value counts days remaining until expiry.
                "expired_day": int(elapsed.days) * -1,
            }
            return EndpointSuccessResponse(
                code="USER_AVATAR", lang=token.lang
            ).as_dict(data=user_info)
        return EndpointNotAcceptableResponse(
            code="USER_NOT_FOUND", lang=token.lang
        ).as_dict(data={})