auth response and error codes imp

This commit is contained in:
berkay 2025-01-28 21:35:52 +03:00
parent 8550c2af71
commit a0b1b1bef9
16 changed files with 639 additions and 314 deletions

View File

@ -10,11 +10,14 @@
<change beforePath="$PROJECT_DIR$/ApiLayers/ApiValidations/Response/default_response.py" beforeDir="false" afterPath="$PROJECT_DIR$/ApiLayers/ApiValidations/Response/default_response.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/ApiLayers/LanguageModels/Request/Auth/login.py" beforeDir="false" afterPath="$PROJECT_DIR$/ApiLayers/LanguageModels/Request/Auth/login.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/ApiLayers/LanguageModels/Request/__init__.py" beforeDir="false" afterPath="$PROJECT_DIR$/ApiLayers/LanguageModels/Request/__init__.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/ApiLayers/LanguageModels/Response/authentication/auth.py" beforeDir="false" afterPath="$PROJECT_DIR$/ApiLayers/LanguageModels/Response/authentication/auth.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/ApiLayers/LanguageModels/set_defaults/language_setters.py" beforeDir="false" afterPath="$PROJECT_DIR$/ApiLayers/LanguageModels/set_defaults/language_setters.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/ApiLayers/Schemas/identity/identity.py" beforeDir="false" afterPath="$PROJECT_DIR$/ApiLayers/Schemas/identity/identity.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/DockerApiServices/InitServiceApi/create_all_dependecies.py" beforeDir="false" afterPath="$PROJECT_DIR$/DockerApiServices/InitServiceApi/create_all_dependecies.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Events/AllEvents/authentication/auth/auth.py" beforeDir="false" afterPath="$PROJECT_DIR$/Events/AllEvents/authentication/auth/auth.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Events/AllEvents/authentication/auth/api_events.py" beforeDir="false" afterPath="$PROJECT_DIR$/Events/AllEvents/authentication/auth/api_events.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Events/AllEvents/authentication/auth/function_handlers.py" beforeDir="false" afterPath="$PROJECT_DIR$/Events/AllEvents/authentication/auth/function_handlers.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Events/AllEvents/authentication/auth/models.py" beforeDir="false" afterPath="$PROJECT_DIR$/Events/AllEvents/authentication/auth/models.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Events/Engine/abstract_class.py" beforeDir="false" afterPath="$PROJECT_DIR$/Events/Engine/abstract_class.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Services/Email/send_email.py" beforeDir="false" afterPath="$PROJECT_DIR$/Services/Email/send_email.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/Services/PostgresDb/Models/filter_functions.py" beforeDir="false" afterPath="$PROJECT_DIR$/Services/PostgresDb/Models/filter_functions.py" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@ -39,14 +42,22 @@
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.git.unshallow&quot;: &quot;true&quot;,
&quot;git-widget-placeholder&quot;: &quot;development&quot;,
&quot;last_opened_file_path&quot;: &quot;/home/berkay/git-gitea-evyos/wag-managment-api-service-version-5&quot;
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.ShowReadmeOnStart": "true",
"RunOnceActivity.git.unshallow": "true",
"git-widget-placeholder": "development",
"last_opened_file_path": "/home/berkay/git-gitea-evyos/wag-managment-api-service-version-5/ApiLayers/LanguageModels/templates"
}
}</component>
}]]></component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/ApiLayers/LanguageModels/templates" />
</key>
<key name="MoveFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/ApiLayers/AllConfigs" />
</key>
</component>
<component name="RunManager">
<configuration name="events_file" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="wag-managment-api-service-version-5" />

View File

@ -51,7 +51,6 @@ class RedisCategoryKeys:
PAGE_MAPPER: str = "PAGE_MAPPER"
MENU_MAPPER: str = "MENU_MAPPER"
class RedisValidationKeysAction:
# LANGUAGE_MODELS:DYNAMIC:VALIDATIONS:
dynamic_validation_key: str = f"{RedisValidationKeys.LANGUAGE_MODELS}:{RedisValidationKeys.DYNAMIC}:{RedisValidationKeys.VALIDATIONS}"

View File

@ -0,0 +1,243 @@
import datetime
def change_your_password_template(**kwargs):
user_name, forgot_link, current_year = (
kwargs["user_name"],
kwargs["forgot_link"],
str(datetime.datetime.now().year),
)
template = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Bootstrap demo</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}
.email-container {
max-width: 600px;
margin: 0 auto;
background-color: #ffffff;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
}
.header {
text-align: center;
padding-bottom: 20px;
border-bottom: 1px solid #dddddd;
}
.header img {
max-width: 100px;
}
.content {
padding: 20px 0;
}
.footer {
text-align: center;
padding: 10px;
font-size: 12px;
color: #777777;
}
.btn-success {
color: #fff;
background-color: #198754;
border-color: #198754;
text-align: center;
text-decoration: none;
vertical-align: middle;
width: 150px;
height: 40px;
border-radius: 5px;
font-weight: 400;
padding: .375rem .75rem;
}
</style>
</head>
<body>
<div class="email-container">
<div class="header">
<img src="" alt="Company Logo">
<h2>Reset Password</h2>
</div>
<div class="content">
<p>Dear %s,</p>
<p>We have received a request to reset your password for your account with Let's Program Blog. To complete the password reset process, please click on the button below:</p>
<p>Please note that this link is only valid for a day only. If you did not request a password reset, please disregard this message.</p>
<a href="%s"><button type="button" class="btn-success">Reset Password</button></a>
</div>
<div class="footer">
<p>&copy; %s Evyos Ltd Şti. All rights reserved.</p>
</div>
</div>
</body>
</html>
""" % (
user_name,
forgot_link,
current_year,
)
return template
def password_is_changed_template(**kwargs):
user_name, current_year = kwargs["user_name"], str(datetime.datetime.now().year)
template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Thank You for Changing Your Password</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}
.email-container {
max-width: 600px;
margin: 0 auto;
background-color: #ffffff;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
}
.header {
text-align: center;
padding-bottom: 20px;
border-bottom: 1px solid #dddddd;
}
.header img {
max-width: 100px;
}
.content {
padding: 20px 0;
}
.footer {
text-align: center;
padding: 10px;
font-size: 12px;
color: #777777;
}
</style>
</head>
<body>
<div class="email-container">
<div class="header">
<img src="" alt="Company Logo">
<h2>Your Password has changed</h2>
</div>
<div class="content">
<p>Dear %s,</p>
<p>We wanted to let you know that your password has been successfully updated.
If you did not make this change or if you believe an unauthorized person has accessed your account,
please contact our support team immediately.</p>
<p>Thank you for helping us keep your account secure.</p>
</div>
<div class="footer">
<p>&copy; %s Evyos Ltd Şti. All rights reserved.</p>
</div>
</div>
</body>
</html>
""" % (
user_name,
current_year,
)
return template
def invalid_ip_or_address_found(**kwargs):
user_name, current_year, address = (
kwargs["user_name"],
str(datetime.datetime.now().year),
kwargs.get("address"),
)
template = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Thank You for Changing Your Password</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f4f4;
margin: 0;
padding: 0;
}
.email-container {
max-width: 600px;
margin: 0 auto;
background-color: #ffffff;
padding: 20px;
border-radius: 8px;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
}
.header {
text-align: center;
padding-bottom: 20px;
border-bottom: 1px solid #dddddd;
}
.header img {
max-width: 100px;
}
.content {
padding: 20px 0;
}
.footer {
text-align: center;
padding: 10px;
font-size: 12px;
color: #777777;
}
</style>
</head>
<body>
<div class="email-container">
<div class="header">
<img src="" alt="Company Logo">
<h2>An Unknown login has been attempted</h2>
</div>
<div class="content">
<p>Dear %s,</p>
<p>We wanted to let you know that an unusual login attempt has been tried from address below.
If you have login from address below please ignore this message</p>
<p>Thank you for helping us keep your account secure.</p>
<h1>Address of ip attempt</h1>
<p>City : %s</p>
<p>Zip Code : %s</p>
<p>Country : %s</p>
<p>Region : %s</p>
<p>Region Name : %s</p>
<p>If you are not login from this address lets us now by clicking link below</p>
<a href="%s"><button type="button" class="btn-success">Reset Password</button></a>
</div>
<div class="footer">
<p>&copy; %s Evyos Ltd Şti. All rights reserved.</p>
</div>
</div>
</body>
</html>
""" % (
user_name,
address["city"],
address["zip"],
address["country"],
address["region"],
address["regionName"],
kwargs["notice_link"],
current_year,
)
return template

View File

@ -2,24 +2,11 @@ from typing import Optional
from fastapi import status
from fastapi.responses import JSONResponse
from ApiLayers.LanguageModels.set_defaults.static_validation_retriever import StaticValidationRetriever
class BaseEndpointResponse:
def __init__(self, lang: str, code: str):
self.lang = lang
self.code = code
@property
def response(self) -> Optional[dict]:
from Services.Redis import RedisActions
from ApiLayers.AllConfigs.Redis.configs import RedisValidationKeysAction
language_model = RedisActions.get_json(
list_keys=[RedisValidationKeysAction.static_response_key, self.code, self.lang]
)
if language_model.status:
return language_model.first
return {"message": f"{self.code} -> Language model not found"}
class BaseEndpointResponse(StaticValidationRetriever):
pass
class EndpointSuccessResponse(BaseEndpointResponse): # 200 OK
@ -105,10 +92,10 @@ class EndpointMethodNotAllowedResponse(BaseEndpointResponse): # 405 Method Not
class EndpointNotAcceptableResponse(BaseEndpointResponse): # 406 Not Acceptable
def as_dict(self):
def as_dict(self, data: Optional[dict] = None):
return JSONResponse(
status_code=status.HTTP_406_NOT_ACCEPTABLE,
content=dict(completed=False, **self.response, lang=self.lang),
content=dict(completed=False, data=data, **self.response, lang=self.lang),
)

View File

@ -15,3 +15,15 @@ LoginRequestLanguageModel: Dict[str, Dict[str, str]] = {
"remember_me": "Remember Me",
},
}
SelectRequestLanguageModel: Dict[str, Dict[str, str]] = {
"tr": {
"company_uu_id": "Şirket UU ID",
"build_living_space_uu_id": "Bina Konut UU ID",
},
"en": {
"company_uu_id": "Company UU ID",
"build_living_space_uu_id": "Build Living Space UU ID",
},
}

View File

@ -1,3 +1,9 @@
from .Auth.login import LoginRequestLanguageModel
from .Auth.login import (
LoginRequestLanguageModel,
SelectRequestLanguageModel,
)
__all__ = ["LoginRequestLanguageModel"]
__all__ = [
"LoginRequestLanguageModel",
"SelectRequestLanguageModel",
]

View File

@ -1,3 +1,5 @@
authResponses = {
"LOGIN_SELECT": {
"tr": {
@ -15,4 +17,60 @@ authResponses = {
"message": "Login successful. Please select an company/duty to continue.",
},
},
"TOKEN_VALID": {
"tr": {
"message": "Header'da belirtilen token geçerli.",
},
"en": {
"message": "The token specified in the header is valid.",
},
},
"USER_INFO_REFRESHED": {
"tr": {
"message": "Token aracılığıyla kullanıcı bilgileri başarılı bir şekilde güncellendi.",
},
"en": {
"message": "User information updated successfully via token.",
},
},
"CREATED_PASSWORD": {
"tr": {
"message": "Şifre başarılı bir şekilde oluşturuldu.",
},
"en": {
"message": "Password created successfully.",
},
},
"PASSWORD_CHANGED": {
"tr": {
"message": "Şifre başarılı bir şekilde değiştirildi.",
},
"en": {
"message": "Password changed successfully.",
},
},
"DISCONNECTED_USER": {
"tr": {
"message": "Kullanıcı başarılı bir şekilde çıkış yaptı.",
},
"en": {
"message": "User logged out successfully.",
},
},
"USER_NOT_FOUND": {
"tr": {
"message": "Kullanıcı bulunamadı. Lütfen tekrar deneyiniz.",
},
"en": {
"message": "User not found. Please try again.",
},
},
"FORGOT_PASSWORD": {
"tr": {
"message": "Şifre sıfırlama talebi başarılı bir şekilde oluşturuldu.",
},
"en": {
"message": "Password reset request created successfully.",
},
}
}

View File

@ -1,4 +1,4 @@
from ApiLayers.AllConfigs.Redis.configs import RedisValidationKeys
from ApiLayers.AllConfigs.Redis.configs import RedisValidationKeysAction, RedisValidationKeys
from ApiLayers.AllConfigs.main import LanguageConfig
from Events.Engine.set_defaults.category_cluster_models import CategoryClusterController
from Services.Redis.Actions.actions import RedisActions
@ -28,10 +28,7 @@ class SetDefaultLanguageModelsRedis:
for lang in list(LanguageConfig.SUPPORTED_LANGUAGES):
for code, dict_to_set in response.items():
# [SAVE]REDIS => LANGUAGE_MODELS:STATIC:RESPONSES:{ResponseCode}:tr = {...}
redis_key = f"{RedisValidationKeys.LANGUAGE_MODELS}:{RedisValidationKeys.STATIC}"
set_key = (
f"{redis_key}:{RedisValidationKeys.RESPONSES}:{code}:{lang}"
)
set_key = f"{RedisValidationKeysAction.static_response_key}:{code}:{lang}"
RedisActions.set_json(list_keys=[set_key], value=dict_to_set[lang])
self.std_out += f"Language Response Models are set to Redis\n"
@ -39,8 +36,7 @@ class SetDefaultLanguageModelsRedis:
for lang in list(LanguageConfig.SUPPORTED_LANGUAGES):
for code, dict_to_set in response.items():
# [SAVE]REDIS => LANGUAGE_MODELS:STATIC:ERRORCODES:{ErrorCode}:en = {...}
redis_key = f"{RedisValidationKeys.LANGUAGE_MODELS}:{RedisValidationKeys.STATIC}"
set_key = f"{redis_key}:{code}:{lang}"
set_key = f"{RedisValidationKeysAction.static_error_code_key}:{code}:{lang}"
RedisActions.set_json(list_keys=[set_key], value=dict_to_set[lang])
self.std_out += f"Language Error Models are set to Redis\n"
@ -80,26 +76,15 @@ class SetClusterLanguageModelsRedis:
def set_models_from_cluster(self):
"""
iterate(ClusterToMethod) to set all models by pairing function codes
"""
for cluster_control in self.cluster_controller_group.imports:
self.std_out += f"Setting models from cluster : {cluster_control.name}\n"
for endpoint in cluster_control.category_cluster.ENDPOINTS.values():
for key_event, event in endpoint.EVENTS.items():
merged_language_dict = self.merge_language_dicts(
event.LANGUAGE_MODELS
)
request_validation = getattr(
event.REQUEST_VALIDATOR, "model_fields", None
)
response_validation = getattr(
event.RESPONSE_VALIDATOR, "model_fields", None
)
objects_missing = (
bool(request_validation)
and bool(response_validation)
and bool(merged_language_dict)
)
merged_language_dict = self.merge_language_dicts(event.LANGUAGE_MODELS)
request_validation = getattr(event.REQUEST_VALIDATOR, "model_fields", None)
response_validation = getattr(event.RESPONSE_VALIDATOR, "model_fields", None)
objects_missing = bool(request_validation) and bool(merged_language_dict)
if not objects_missing:
continue
if merged_language_dict:
@ -113,7 +98,6 @@ class SetClusterLanguageModelsRedis:
def set_all(self):
"""
Set all language models from cluster list by pairing event code and models
"""
self.set_models_from_cluster()
if self.events_lm_dict and self.events_rq_dict:
@ -121,28 +105,21 @@ class SetClusterLanguageModelsRedis:
[SAVE]REDIS => LANGUAGE_MODELS:DYNAMIC:HEADERS:REQUEST:{FunctionCode}:tr = {...}
Get Request BaseModel pydantic model_fields of each event and set headers which are included in model_fields
"""
for lang in list(
LanguageConfig.SUPPORTED_LANGUAGES
): # Iterate(languages ["tr", "en"])
for lang in list(LanguageConfig.SUPPORTED_LANGUAGES): # Iterate(languages ["tr", "en"])
for key_field in self.events_rq_dict.keys(): # Iterate(function_code)
request_model = self.events_rq_dict[key_field]
if not request_model:
self.std_out += (
f"Request validation model not found for {key_field}\n"
)
self.std_out += f"Request validation model not found for {key_field}\n"
continue
if (
key_field not in self.events_rq_dict
or key_field not in self.events_lm_dict
):
self.std_out += (
f"Request language model not found for {key_field}\n"
)
self.std_out += f"Request language model are missing {key_field}\n"
continue
value_to_set = {}
redis_key = f"{RedisValidationKeys.LANGUAGE_MODELS}:{RedisValidationKeys.DYNAMIC}"
set_key = f"{redis_key}:{key_field}:{lang}"
set_key = f"{RedisValidationKeysAction.dynamic_header_request_key}:{key_field}:{lang}"
for key in request_model.keys():
value_to_set[key] = self.events_lm_dict[key_field][lang][key]
RedisActions.set_json(list_keys=[set_key], value=value_to_set)
@ -153,28 +130,21 @@ class SetClusterLanguageModelsRedis:
[SAVE]REDIS => LANGUAGE_MODELS:DYNAMIC:HEADERS:RESPONSE:{FunctionCode}:en = {...}
Get Response BaseModel pydantic model_fields of each event and set headers which are included in model_fields
"""
for lang in list(
LanguageConfig.SUPPORTED_LANGUAGES
): # Iterate(languages ["tr", "en"])
for lang in list(LanguageConfig.SUPPORTED_LANGUAGES): # Iterate(languages ["tr", "en"])
for key_field in self.events_rs_dict.keys(): # Iterate(function_code)
response_model = self.events_rs_dict[key_field]
if not response_model:
self.std_out += (
f"Response validation model not found for {key_field}\n"
)
self.std_out += f"Response validation model not found for {key_field}\n"
continue
if (
key_field not in self.events_rs_dict
or key_field not in self.events_lm_dict
):
self.std_out += (
f"Response language model not found for {key_field}\n"
)
self.std_out += f"Response language model are missing {key_field}\n"
continue
value_to_set = {}
redis_key = f"{RedisValidationKeys.LANGUAGE_MODELS}:{RedisValidationKeys.DYNAMIC}"
set_key = f"{redis_key}:{key_field}:{lang}"
set_key = f"{RedisValidationKeysAction.dynamic_header_response_key}:{key_field}:{lang}"
for key in response_model.keys():
value_to_set[key] = self.events_lm_dict[key_field][lang][key]
RedisActions.set_json(list_keys=[set_key], value=value_to_set)

View File

@ -0,0 +1,23 @@
from typing import Optional
from Services.Redis import RedisActions
from ApiLayers.AllConfigs.Redis.configs import RedisValidationKeysAction
class StaticValidationRetriever:
lang: str = "tr"
code: str = ""
def __init__(self, lang: str, code: str):
self.lang = lang
self.code = code
@property
def response(self) -> Optional[dict]:
language_model = RedisActions.get_json(
list_keys=[RedisValidationKeysAction.static_response_key, self.code, self.lang]
)
if language_model.status:
return language_model.first
return {"message": f"{self.code} -> Language model not found"}

0
ApiLayers/__init__.py Normal file
View File

View File

@ -1,17 +1,20 @@
from Events.Engine.abstract_class import Event
from ApiLayers.Schemas import Users
from ApiLayers.LanguageModels.Request import (
LoginRequestLanguageModel,
SelectRequestLanguageModel,
)
from .models import AuthenticationRequestModels, AuthenticationResponseModels
from .function_handlers import AuthenticationFunctions
from ApiLayers.LanguageModels.Request import LoginRequestLanguageModel
# Auth Login
authentication_login_super_user_event = Event(
name="authentication_login_super_user_event",
key="a5d2d0d1-3e9b-4b0f-8c7d-6d4a4b4c4d4e",
request_validator=AuthenticationRequestModels.LoginSuperUserRequestModel,
language_models=[],
response_validator=AuthenticationResponseModels.LoginSuperUserResponseModel,
language_models=[LoginRequestLanguageModel],
response_validation_static="LOGIN_SUCCESS",
description="Login super user",
)
@ -26,8 +29,8 @@ authentication_select_super_user_event = Event(
name="authentication_select_super_user_event",
key="a5d2d0d1-3e9b-4b0f-8c7d-6d4a4b4c4d4e",
request_validator=AuthenticationRequestModels.SelectCompanyOrOccupantTypeSuperUserRequestModel,
language_models=[],
response_validator=AuthenticationResponseModels.SelectCompanyOrOccupantTypeSuperUserResponseModel,
language_models=[SelectRequestLanguageModel],
response_validation_static="LOGIN_SELECT",
description="Select company or occupant type super user",
)
@ -42,7 +45,7 @@ authentication_check_token_event = Event(
name="authentication_check_token_event",
key="b6e3d1e2-4f9c-5c1g-9d8e-7e5f6f5e5d5f",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Check if token is valid",
)
@ -58,14 +61,14 @@ authentication_refresh_user_info_event = Event(
name="authentication_refresh_user_info_event",
key="c7f4e2f3-5g0d-6d2h-0e9f-8f6g7g6f6e6g",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Refresh user information",
)
authentication_refresh_user_info_event.endpoint_callable = (
AuthenticationFunctions.authentication_refresh_user_info
AuthenticationFunctions.authentication_access_token_user_info
)
@ -74,7 +77,7 @@ authentication_change_password_event = Event(
name="authentication_change_password_event",
key="d8g5f3g4-6h1e-7e3i-1f0g-9g7h8h7g7f7h",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Change user password",
)
@ -90,7 +93,7 @@ authentication_create_password_event = Event(
name="authentication_create_password_event",
key="e9h6g4h5-7i2f-8f4j-2g1h-0h8i9i8h8g8i",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Create new password",
)
@ -106,7 +109,7 @@ authentication_disconnect_user_event = Event(
name="authentication_disconnect_user_event",
key="f0i7h5i6-8j3g-9g5k-3h2i-1i9j0j9i9h9j",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Disconnect all user sessions",
)
@ -122,7 +125,7 @@ authentication_logout_user_event = Event(
name="authentication_logout_user_event",
key="g1j8i6j7-9k4h-0h6l-4i3j-2j0k1k0j0i0k",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Logout user session",
)
@ -138,7 +141,7 @@ authentication_refresher_token_event = Event(
name="authentication_refresher_token_event",
key="h2k9j7k8-0l5i-1i7m-5j4k-3k1l2l1k1j1l",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Refresh authentication token",
)
@ -154,7 +157,7 @@ authentication_forgot_password_event = Event(
name="authentication_forgot_password_event",
key="i3l0k8l9-1m6j-2j8n-6k5l-4l2m3m2l2k2m",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Request password reset",
)
@ -170,7 +173,7 @@ authentication_reset_password_event = Event(
name="authentication_reset_password_event",
key="j4m1l9m0-2n7k-3k9o-7l6m-5m3n4n3m3l3n",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Reset user password",
)
@ -186,7 +189,7 @@ authentication_download_avatar_event = Event(
name="authentication_download_avatar_event",
key="k5n2m0n1-3o8l-4l0p-8m7n-6n4o5o4n4m4o",
request_validator=None, # TODO: Add request validator
language_models=[Users.__language_model__],
language_models=[],
# response_validator=None, # TODO: Add response validator
description="Download user avatar and profile info",
)

View File

@ -5,7 +5,11 @@ from ApiLayers.ApiLibrary.common.line_number import get_line_number_for_error
from ApiLayers.ApiServices.Login.user_login_handler import UserLoginModule
from ApiLayers.ApiServices.Token.token_handler import TokenService
from ApiLayers.ApiValidations.Custom.token_objects import CompanyToken, OccupantToken
from ApiLayers.ApiValidations.Response.default_response import EndpointSuccessResponse
from ApiLayers.ApiValidations.Response.default_response import (
EndpointSuccessResponse,
EndpointNotAcceptableResponse,
EndpointBadRequestResponse,
)
from ApiLayers.ErrorHandlers import HTTPExceptionApi
from ApiLayers.Schemas import (
BuildLivingSpace,
@ -21,6 +25,7 @@ from ApiLayers.Schemas import (
Event2Occupant,
OccupantTypes,
Users,
UsersTokens,
)
from Events.base_request_model import ContextRetrievers, TokenDictType
@ -149,10 +154,9 @@ class Handlers:
reachable_event_codes = Event2Occupant.get_event_codes(
build_living_space_id=selected_build_living_space.id
)
occupant_type = OccupantTypes.filter_one(
occupant_type = OccupantTypes.filter_one_system(
OccupantTypes.id == selected_build_living_space.occupant_type_id,
db=db,
system=True,
).data
build_part = BuildParts.filter_one(
BuildParts.id == selected_build_living_space.build_parts_id,
@ -267,199 +271,219 @@ class AuthenticationFunctions:
}
)
@classmethod # Requires not auth context
def authentication_check_token_is_valid(cls, data: Any):
@classmethod # Requires auth context
def authentication_check_token_is_valid(cls):
"""Check if token is valid for user"""
# try:
# if RedisActions.get_object_via_access_key(request=request):
# return ResponseHandler.success("Access Token is valid")
# except HTTPException:
# return ResponseHandler.unauthorized("Access Token is NOT valid")
return
if cls.context_retriever.token:
return EndpointSuccessResponse(
code="TOKEN_VALID", lang=cls.context_retriever.token.lang
).as_dict(data=cls.context_retriever.base)
return {
"completed": False,
"message": "Token is not valid",
}
@classmethod # Requires not auth context
def authentication_refresh_user_info(cls, data: Any):
def authentication_access_token_user_info(cls):
"""Refresh user info using access token"""
# try:
# access_token = request.headers.get(Auth.ACCESS_TOKEN_TAG)
# if not access_token:
# return ResponseHandler.unauthorized()
# found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
# if not found_user:
# return ResponseHandler.not_found("User not found")
# user_token = UsersTokens.filter_one(
# UsersTokens.domain == found_user.domain_name,
# UsersTokens.user_id == found_user.id,
# UsersTokens.token_type == "RememberMe",
# ).data
# response_data = {
# "access_token": access_token,
# "refresh_token": getattr(user_token, "token", None),
# "user": found_user.get_dict(),
# }
# return ResponseHandler.success(
# "User info refreshed successfully",
# data=response_data,
# )
# except Exception as e:
# return ResponseHandler.error(str(e))
return
if cls.context_retriever.token:
db = Users.new_session()
if found_user := Users.filter_one(Users.id == cls.context_retriever.token.user_id, db=db).data:
return EndpointSuccessResponse(
code="USER_INFO_REFRESHED", lang=cls.context_retriever.token.lang
).as_dict({
"access_token": cls.context_retriever.token, "user": found_user.get_dict(),
})
if not found_user:
return EndpointNotAcceptableResponse(
code="USER_NOT_FOUND", lang=cls.context_retriever.token.lang
).as_dict(
data={"user": found_user.get_dict()}
)
@classmethod # Requires no auth context
def authentication_change_password(cls, data: Any):
"""Change password with access token"""
# try:
# if not isinstance(token_dict, EmployeeTokenObject):
# return ResponseHandler.unauthorized("Only employees can change password")
# found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
# if not found_user:
# return ResponseHandler.not_found("User not found")
# if not found_user.check_password(data.old_password):
# return ResponseHandler.unauthorized("Old password is incorrect")
# found_user.set_password(data.new_password)
# return ResponseHandler.success("Password changed successfully")
# except Exception as e:
# return ResponseHandler.error(str(e))
return
if cls.context_retriever.token:
db = Users.new_session()
if found_user := Users.filter_one(Users.id == cls.context_retriever.token.user_id, db=db).data:
found_user.set_password(data.new_password)
return EndpointSuccessResponse(
code="PASSWORD_CHANGED", lang=cls.context_retriever.token.lang
).as_dict(data={"user": found_user.get_dict()})
if not found_user:
return EndpointNotAcceptableResponse(
code="USER_NOT_FOUND", lang=cls.context_retriever.token.lang
).as_dict(
data={"user": found_user.get_dict()}
)
@classmethod # Requires not auth context
def authentication_create_password(cls, data: Any):
"""Create password with password reset token requested via email"""
# if not data.re_password == data.password:
# raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE, detail="Password must match")
# if found_user := Users.filter_one(Users.password_token == data.password_token).data:
# found_user.create_password(found_user=found_user, password=data.password)
# found_user.password_token = ""
# found_user.save()
# return ResponseHandler.success("Password is created successfully", data=found_user.get_dict())
# return ResponseHandler.not_found("Record not found")
return
db = Users.new_session()
if not data.re_password == data.password:
return EndpointNotAcceptableResponse(
code="PASSWORD_NOT_MATCH", lang=cls.context_retriever.token.lang
).as_dict(
data={"password": data.password, "re_password": data.re_password}
)
if found_user := Users.filter_one(
Users.password_token == data.password_token, db=db
).data:
found_user.create_password(found_user=found_user, password=data.password)
found_user.password_token = ""
found_user.save()
return EndpointSuccessResponse(
code="CREATED_PASSWORD", lang=cls.context_retriever.token.lang
).as_dict(data={"user": found_user.get_dict()})
@classmethod # Requires auth context
def authentication_disconnect_user(cls, data: Any):
def authentication_disconnect_user(cls):
"""Disconnect all sessions of user in access token"""
# found_user = Users.filter_one(Users.uu_id == token_dict.user_uu_id).data
# if not found_user:
# return ResponseHandler.not_found("User not found")
# if already_tokens := RedisActions.get_object_via_user_uu_id(user_id=str(found_user.uu_id)):
# for key, token_user in already_tokens.items():
# RedisActions.delete(key)
# selected_user = Users.filter_one(Users.uu_id == token_user.get("uu_id")).data
# selected_user.remove_refresher_token(domain=data.domain, disconnect=True)
# return ResponseHandler.success("All sessions are disconnected", data=selected_user.get_dict())
# return ResponseHandler.not_found("Invalid data")
return
db = Users.new_session()
found_user = Users.filter_one_system(
Users.id == cls.context_retriever.token.user_id, db=db
).data
if not found_user:
return EndpointNotAcceptableResponse(
code="USER_NOT_FOUND", lang=cls.context_retriever.token.lang
).as_dict(
data={"user": found_user.get_dict()}
)
registered_tokens = UsersTokens.filter_all(
UsersTokens.user_id == cls.context_retriever.token.id, db=db
)
if registered_tokens.count:
registered_tokens.query.delete()
UsersTokens.save(db=db)
return EndpointSuccessResponse(
code="DISCONNECTED_USER", lang=cls.context_retriever.token.lang
).as_dict(data={"user": found_user.get_dict()})
@classmethod # Requires auth context
def authentication_logout_user(cls, data: Any):
"""Logout only single session of user which domain is provided"""
# token_user = None
# if already_tokens := RedisActions.get_object_via_access_key(request=request):
# for key in already_tokens:
# token_user = RedisActions.get_json(key)
# if token_user.get("domain") == data.domain:
# RedisActions.delete(key)
# selected_user = Users.filter_one(Users.uu_id == token_user.get("uu_id")).data
# selected_user.remove_refresher_token(domain=data.domain)
# return ResponseHandler.success("Session is logged out", data=token_user)
# return ResponseHandler.not_found("Logout is not successfully completed")
context_retriever = ContextRetrievers(func=cls.authentication_logout_user)
return context_retriever.base
db = Users.new_session()
found_user = Users.filter_one_system(
Users.id == cls.context_retriever.token.user_id, db=db
).data
if not found_user:
return EndpointNotAcceptableResponse(
code="USER_NOT_FOUND", lang=cls.context_retriever.token.lang
).as_dict(
data={"user": found_user.get_dict()}
)
registered_tokens = UsersTokens.filter_all_system(
UsersTokens.user_id == cls.context_retriever.token.id,
UsersTokens.domain == cls.context_retriever.token.domain,
db=db,
)
if registered_tokens.count:
registered_tokens.query.delete()
UsersTokens.save(db=db)
return EndpointSuccessResponse(
code="DISCONNECTED_USER", lang=cls.context_retriever.token.lang
).as_dict(data={"user": found_user.get_dict()})
@classmethod # Requires not auth context
def authentication_refresher_token(cls, data: Any):
"""Refresh access token with refresher token"""
# token_refresher = UsersTokens.filter_by_one(
# token=data.refresh_token,
# domain=data.domain,
# **UsersTokens.valid_record_dict,
# ).data
# if not token_refresher:
# return ResponseHandler.not_found("Invalid data")
# if found_user := Users.filter_one(Users.id == token_refresher.user_id).data:
# access_key = AuthActions.save_access_token_to_redis(
# request=request, found_user=found_user, domain=data.domain
# )
# found_user.last_agent = request.headers.get("User-Agent", None)
# found_user.last_platform = request.headers.get("Origin", None)
# found_user.last_remote_addr = getattr(request, "remote_addr", None) or request.headers.get("X-Forwarded-For", None)
# found_user.last_seen = str(system_arrow.now())
# response_data = {
# "access_token": access_key,
# "refresh_token": data.refresh_token,
# }
# return ResponseHandler.success("User is logged in successfully via refresher token", data=response_data)
# return ResponseHandler.not_found("Invalid data")
context_retriever = ContextRetrievers(func=cls.authentication_refresher_token)
return context_retriever.base
import arrow
from ApiLayers.ApiServices.Token.token_handler import TokenService
db = UsersTokens.new_session()
token_refresher: UsersTokens = UsersTokens.filter_by_one(
token=data.refresh_token,
domain=data.domain,
db=db,
).data
if not token_refresher:
return EndpointNotAcceptableResponse(
code="REFRESHER_NOT_FOUND", lang=cls.context_retriever.token.lang
).as_dict(
data={"refresh_token": data.refresh_token}
)
if found_user := Users.filter_one(Users.id == token_refresher.user_id, db=db).data:
request = cls.context_retriever.request
token_created = TokenService.set_access_token_to_redis(
request=request,
user=found_user,
domain=data.domain,
remember=True,
)
found_user.last_agent = request.headers.get("User-Agent", None)
found_user.last_platform = request.headers.get("Origin", None)
found_user.last_remote_addr = getattr(request, "remote_addr", None) or request.headers.get("X-Forwarded-For", None)
found_user.last_seen = str(arrow.now())
response_data = {
"access_token": token_created.get("access_token"),
"refresh_token": data.refresh_token,
}
return EndpointSuccessResponse(
code="TOKEN_REFRESH", lang=cls.context_retriever.token.lang
).as_dict(data=response_data)
@classmethod # Requires not auth context
def authentication_forgot_password(cls, data: Any):
"""Send an email to user for a valid password reset token"""
# found_user: Users = Users.check_user_exits(access_key=data.access_key, domain=data.domain)
# forgot_key = AuthActions.save_access_token_to_redis(request=request, found_user=found_user, domain=data.domain)
# forgot_link = ApiStatic.forgot_link(forgot_key=forgot_key)
# send_email_completed = send_email(
# subject=f"Dear {found_user.user_tag}, your forgot password link has been sent.",
# receivers=[str(found_user.email)],
# html=change_your_password_template(user_name=found_user.user_tag, forgot_link=forgot_link),
# )
# if not send_email_completed:
# raise HTTPException(status_code=400, detail="Email can not be sent. Try again later")
# found_user.password_token = forgot_key
# found_user.password_token_is_valid = str(system_arrow.shift(days=1))
# found_user.save()
# return ResponseHandler.success("Password is change link is sent to your email or phone", data={})
return
import arrow
from ApiLayers.ApiServices.Token.token_handler import TokenService
from ApiLayers.AllConfigs.Templates.password_templates import change_your_password_template
from Services.Email.send_email import email_sender
from config import ApiStatic
db = Users.new_session()
request = cls.context_retriever.request
found_user: Users = Users.check_user_exits(access_key=data.access_key, domain=data.domain)
forgot_key = TokenService._create_access_token(access=False)
forgot_link = ApiStatic.forgot_link(forgot_key=forgot_key)
send_email_completed = email_sender.send_email(
subject=f"Dear {found_user.user_tag}, your forgot password link has been sent.",
receivers=[str(found_user.email)],
html=change_your_password_template(user_name=found_user.user_tag, forgot_link=forgot_link),
)
if not send_email_completed:
return EndpointBadRequestResponse(
code="EMAIL_NOT_SENT", lang=cls.context_retriever.token.lang
).as_dict(
data={"email": found_user.email}
)
found_user.password_token = forgot_key
found_user.password_token_is_valid = str(arrow.now().shift(days=1))
found_user.save(db=db)
return EndpointSuccessResponse(
code="FORGOT_PASSWORD", lang=cls.context_retriever.token.lang
).as_dict(data={"user": found_user.get_dict(), "forgot_link": forgot_link, "token": forgot_key})
@classmethod # Requires not auth context
def authentication_reset_password(cls, data: Any):
"""Reset password with forgot password token"""
# from sqlalchemy import or_
# found_user = Users.query.filter(
# or_(
# Users.email == str(data.access_key).lower(),
# Users.phone_number == str(data.access_key).replace(" ", ""),
# ),
# ).first()
# if not found_user:
# raise HTTPException(
# status_code=status.HTTP_400_BAD_REQUEST,
# detail="Given access key or domain is not matching with the any user record.",
# )
# reset_password_token = found_user.reset_password_token(found_user=found_user)
# send_email_completed = send_email(
# subject=f"Dear {found_user.user_tag}, a password reset request has been received.",
# receivers=[str(found_user.email)],
# html=change_your_password_template(
# user_name=found_user.user_tag,
# forgot_link=ApiStatic.forgot_link(forgot_key=reset_password_token),
# ),
# )
# if not send_email_completed:
# raise found_user.raise_http_exception(status_code=400, message="Email can not be sent. Try again later")
# return ResponseHandler.success("Password change link is sent to your email or phone", data=found_user.get_dict())
return
return cls.context_retriever.base
@classmethod # Requires not auth context
def authentication_download_avatar(cls, data: Any):
"""Download avatar icon and profile info of user"""
# if found_user := Users.filter_one(Users.id == token_dict.user_id).data:
# expired_starts = str(system_arrow.now() - system_arrow.get(str(found_user.expiry_ends)))
# expired_int = (system_arrow.now() - system_arrow.get(str(found_user.expiry_ends))).days
# user_info = {
# "lang": token_dict.lang,
# "full_name": found_user.person.full_name,
# "avatar": found_user.avatar,
# "remember_me": found_user.remember_me,
# "expiry_ends": str(found_user.expiry_ends),
# "expired_str": expired_starts,
# "expired_int": int(expired_int),
# }
# return ResponseHandler.success("Avatar and profile is shared via user credentials", data=user_info)
# return ResponseHandler.not_found("Invalid data")
return
import arrow
db = Users.new_session()
if found_user := Users.filter_one(
Users.id == cls.context_retriever.token.user_id, db=db
).data:
expired_starts = str(arrow.now() - arrow.get(str(found_user.expiry_ends)))
expired_int = arrow.now() - arrow.get(str(found_user.expiry_ends)).days
user_info = {
"lang": cls.context_retriever.token.lang,
"full_name": found_user.person.full_name,
"avatar": found_user.avatar,
"remember_me": found_user.remember_me,
"expiry_ends": str(found_user.expiry_ends),
"expired_str": expired_starts,
"expired_int": int(expired_int),
}
return EndpointSuccessResponse(
code="USER_AVATAR", lang=cls.context_retriever.token.lang
).as_dict(data=user_info)
return EndpointNotAcceptableResponse(
code="USER_NOT_FOUND", lang=cls.context_retriever.token.lang
).as_dict(data={"user": found_user.get_dict()})

View File

@ -1,53 +1,19 @@
from pydantic import BaseModel
from ApiLayers.ApiValidations.Request import (
Login,
EmployeeSelection,
OccupantSelection,
CreatePassword,
ChangePassword,
Forgot,
)
class LoginSuperUserResponseModel(BaseModel):
pass
class SelectCompanyOrOccupantTypeSuperUserRequestModel(BaseModel):
pass
class SelectCompanyOrOccupantTypeSuperUserResponseModel(BaseModel):
pass
class EmployeeSelectionSuperUserRequestModel(BaseModel):
pass
class EmployeeSelectionSuperUserResponseModel(BaseModel):
pass
class OccupantSelectionSuperUserRequestModel(BaseModel):
pass
class OccupantSelectionSuperUserResponseModel(BaseModel):
pass
class AuthenticationRequestModels:
LoginSuperUserRequestModel = Login
SelectCompanyOrOccupantTypeSuperUserRequestModel = {
"EmployeeSelection": EmployeeSelection,
"OccupantSelection": OccupantSelection,
"EmployeeSelection": EmployeeSelection, "OccupantSelection": OccupantSelection,
}
EmployeeSelectionSuperUserRequestModel = EmployeeSelectionSuperUserRequestModel
OccupantSelectionSuperUserRequestModel = OccupantSelectionSuperUserRequestModel
class AuthenticationResponseModels:
LoginSuperUserResponseModel = LoginSuperUserResponseModel
SelectCompanyOrOccupantTypeSuperUserResponseModel = (
SelectCompanyOrOccupantTypeSuperUserResponseModel
)
EmployeeSelectionSuperUserResponseModel = EmployeeSelectionSuperUserResponseModel
OccupantSelectionSuperUserResponseModel = OccupantSelectionSuperUserResponseModel
pass

View File

@ -18,8 +18,7 @@ class PageInfo:
title: Dict[str, Any],
description: Dict[str, Any],
icon: str,
parent: str,
url: str,
parent: str
):
self.NAME = name
self.TITLE = title
@ -35,6 +34,7 @@ class Event:
REQUEST_VALIDATOR: Optional[Any]
DESCRIPTION: str
LANGUAGE_MODELS: list
RESPONSE_VALIDATOR_STATIC: str
EXTRA_OPTIONS: Optional[Dict[str, Any]] = None
endpoint_callable: Any
@ -44,6 +44,7 @@ class Event:
key: str | UUID,
description: str,
language_models: list[Dict[str, Dict]],
response_validation_static: str = None,
request_validator: Optional[Any] = None,
response_validator: Optional[Any] = None,
extra_options: Optional[Dict[str, Any]] = None,
@ -52,17 +53,29 @@ class Event:
self.KEY_ = key
self.REQUEST_VALIDATOR = request_validator
self.RESPONSE_VALIDATOR = response_validator
self.RESPONSE_VALIDATOR_STATIC = response_validation_static
self.LANGUAGE_MODELS = language_models
self.DESCRIPTION = description
self.EXTRA_OPTIONS = extra_options
@property
def request_headers(self):
return self.LANGUAGE_MODELS
def is_static_response(self):
return bool(self.RESPONSE_VALIDATOR_STATIC)
@property
def response_headers(self):
return self.LANGUAGE_MODELS
def static_response(self):
from Services.Redis.Actions.actions import RedisActions
from ApiLayers.AllConfigs.Redis.configs import RedisValidationKeysAction
if self.is_static_response:
static_response = RedisActions.get_json(
list_keys=[
f"{RedisValidationKeysAction.static_response_key}:{self.RESPONSE_VALIDATOR_STATIC}"
]
)
if static_response.status:
return static_response.first
return None
@property
def description(self):

View File

@ -1,7 +1,7 @@
from redmail import EmailSender
from AllConfigs.Email.configs import EmailConfig
from AllConfigs.Email.email_send_model import EmailSendModel
from ApiLayers.AllConfigs.Email.configs import EmailConfig
from ApiLayers.AllConfigs.Email.email_send_model import EmailSendModel
email_sender = EmailSender(**EmailConfig.as_dict())

View File

@ -6,7 +6,9 @@ including pagination, ordering, and complex query building.
"""
from __future__ import annotations
from typing import Any, TypeVar, Type
from typing import Any, TypeVar, Type, Union
from sqlalchemy import ColumnExpressionArgument
from sqlalchemy.orm import Query, Session
from sqlalchemy.sql.elements import BinaryExpression
@ -103,10 +105,8 @@ class QueryModel(ArgumentModel):
@classmethod
def filter_one(
cls: Type[T],
*args: Any,
*args: Union[BinaryExpression, ColumnExpressionArgument],
db: Session,
system: bool = False,
expired: bool = False,
) -> PostgresResponse:
"""
Filter single record by expressions.
@ -120,10 +120,19 @@ class QueryModel(ArgumentModel):
Returns:
Query response with single record
"""
if not system:
args = cls.get_active_and_confirmed_query_arg(args)
if not expired:
args = cls.get_not_expired_query_arg(args)
args = cls.get_active_and_confirmed_query_arg(args)
args = cls.get_not_expired_query_arg(args)
query = cls._query(db=db).filter(*args)
return PostgresResponse(
pre_query=cls._query(db=db), query=query, is_array=False
)
@classmethod
def filter_one_system(
cls,
*args: Union[BinaryExpression, ColumnExpressionArgument],
db: Session,
):
query = cls._query(db=db).filter(*args)
return PostgresResponse(
pre_query=cls._query(db=db), query=query, is_array=False
@ -131,7 +140,7 @@ class QueryModel(ArgumentModel):
@classmethod
def filter_all_system(
cls: Type[T], *args: BinaryExpression, db: Session
cls: Type[T], *args: Union[BinaryExpression, ColumnExpressionArgument], db: Session
) -> PostgresResponse:
"""
Filter multiple records by expressions without status filtering.
@ -143,12 +152,13 @@ class QueryModel(ArgumentModel):
Returns:
Query response with matching records
"""
query = cls._query(db)
query = query.filter(*args)
return PostgresResponse(pre_query=cls._query(db), query=query, is_array=True)
@classmethod
def filter_all(cls: Type[T], *args: Any, db: Session) -> PostgresResponse:
def filter_all(cls: Type[T], *args: Union[BinaryExpression, ColumnExpressionArgument], db: Session) -> PostgresResponse:
"""
Filter multiple records by expressions.