auth api tested

This commit is contained in:
2025-03-25 19:01:46 +03:00
parent db2cde2f5d
commit 637edfadd4
65 changed files with 774 additions and 673 deletions

View File

@@ -30,8 +30,9 @@ from ApiLayers.Schemas import (
Departments,
OccupantTypes,
)
from Services.Redis.Models.response import RedisResponse
from Services.Redis import RedisActions, AccessToken
from Services.RedisService.Models.response import RedisResponse
from Services.RedisService.Actions.actions import RedisActions
from Services.RedisService.Models.access import AccessToken
if TYPE_CHECKING:
@@ -63,78 +64,78 @@ class TokenService:
"""Handle employee login process and return login information."""
from ApiLayers.Schemas.identity.identity import UsersTokens, People
db_session = Employees.new_session()
list_employee = Employees.filter_all(
Employees.people_id == user.person_id, db=db_session
).data
companies_uu_id_list: List[str] = []
companies_id_list: List[int] = []
companies_list: List[Dict[str, Any]] = []
duty_uu_id_list: List[str] = []
duty_id_list: List[int] = []
for employee in list_employee:
staff = Staff.filter_one(Staff.id == employee.staff_id, db=db_session).data
if duties := Duties.filter_one(
Duties.id == staff.duties_id, db=db_session
).data:
if duty_found := Duty.filter_by_one(
id=duties.duties_id, db=db_session
).data:
duty_uu_id_list.append(str(duty_found.uu_id))
duty_id_list.append(duty_found.id)
department = Departments.filter_one(
Departments.id == duties.department_id, db=db_session
with Employees.new_session() as db_session:
list_employee = Employees.filter_all(
Employees.people_id == user.person_id, db=db_session
).data
if company := Companies.filter_one(
Companies.id == department.company_id, db=db_session
).data:
companies_uu_id_list.append(str(company.uu_id))
companies_id_list.append(company.id)
company_address = Addresses.filter_by_one(
id=company.official_address_id, db=db_session
companies_uu_id_list: List[str] = []
companies_id_list: List[int] = []
companies_list: List[Dict[str, Any]] = []
duty_uu_id_list: List[str] = []
duty_id_list: List[int] = []
for employee in list_employee:
staff = Staff.filter_one(Staff.id == employee.staff_id, db=db_session).data
if duties := Duties.filter_one(
Duties.id == staff.duties_id, db=db_session
).data:
if duty_found := Duty.filter_by_one(
id=duties.duties_id, db=db_session
).data:
duty_uu_id_list.append(str(duty_found.uu_id))
duty_id_list.append(duty_found.id)
department = Departments.filter_one(
Departments.id == duties.department_id, db=db_session
).data
companies_list.append(
{
"uu_id": str(company.uu_id),
"public_name": company.public_name,
"company_type": company.company_type,
"company_address": company_address,
}
)
person = People.filter_one(People.id == user.person_id, db=db_session).data
model_value = EmployeeTokenObject(
domain=domain,
user_type=UserType.employee.value,
user_uu_id=str(user.uu_id),
credentials=user.credentials(),
user_id=user.id,
person_id=person.id,
person_uu_id=str(person.uu_id),
full_name=person.full_name,
request=dict(request.headers),
companies_uu_id_list=companies_uu_id_list,
companies_id_list=companies_id_list,
duty_uu_id_list=duty_uu_id_list,
duty_id_list=duty_id_list,
timezone=user.local_timezone or "GMT+0",
lang="tr",
).model_dump()
if access_token := cls.set_object_to_redis(user, model_value):
return {
"access_token": access_token,
"user_type": UserType.employee.name,
"selection_list": companies_list,
}
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Creating Token failed...",
)
if company := Companies.filter_one(
Companies.id == department.company_id, db=db_session
).data:
companies_uu_id_list.append(str(company.uu_id))
companies_id_list.append(company.id)
company_address = Addresses.filter_by_one(
id=company.official_address_id, db=db_session
).data
companies_list.append(
{
"uu_id": str(company.uu_id),
"public_name": company.public_name,
"company_type": company.company_type,
"company_address": company_address,
}
)
person = People.filter_one(People.id == user.person_id, db=db_session).data
model_value = EmployeeTokenObject(
domain=domain,
user_type=UserType.employee.value,
user_uu_id=str(user.uu_id),
credentials=user.credentials(db_session=db_session),
user_id=user.id,
person_id=person.id,
person_uu_id=str(person.uu_id),
full_name=person.full_name,
request=dict(request.headers),
companies_uu_id_list=companies_uu_id_list,
companies_id_list=companies_id_list,
duty_uu_id_list=duty_uu_id_list,
duty_id_list=duty_id_list,
timezone=user.local_timezone or "GMT+0",
lang="tr",
).model_dump()
if access_token := cls.set_object_to_redis(user, model_value):
return {
"access_token": access_token,
"user_type": UserType.employee.name,
"selection_list": companies_list,
}
raise HTTPExceptionApi(
error_code="",
lang="en",
loc=get_line_number_for_error(),
sys_msg="Creating Token failed...",
)
@classmethod
def do_occupant_login(
@@ -285,14 +286,15 @@ class TokenService:
user: Users,
domain: str,
remember: bool,
db_session
) -> Dict[str, Any]:
"""Set access token to redis and handle user session."""
from ApiLayers.AllConfigs.Token.config import Auth
from ApiLayers.Schemas.identity.identity import UsersTokens, People
user_id, user_dict = user.id, user.get_dict()
cls.remove_token_with_domain(user=user, domain=domain)
# Users.client_arrow = DateTimeLocal(is_client=True, timezone=user.local_timezone)
login_dict, db_session = {}, UsersTokens.new_session()
login_dict = {}
if user.is_occupant: # Handle login based on user type
login_dict = cls.do_occupant_login(
request=request, user=user, domain=domain
@@ -309,7 +311,7 @@ class TokenService:
login_dict["refresh_token"] = users_token_created
users_token = UsersTokens.find_or_create(
db=db_session,
user_id=user.id,
user_id=user_id,
token_type="RememberMe",
domain=domain,
)
@@ -329,15 +331,15 @@ class TokenService:
login_dict["refresh_token"] = users_token.token
else:
already_refresher = UsersTokens.filter_all(
UsersTokens.user_id == user.id,
UsersTokens.user_id == user_id,
UsersTokens.token_type == "RememberMe",
UsersTokens.domain == domain,
db=db_session,
)
if already_refresher.count:
already_refresher.query.delete(synchronize_session=False)
already_refresher.core_query.delete(synchronize_session=False)
user.save(db=db_session)
return {**login_dict, "user": user.get_dict()}
return {**login_dict, "user": user_dict}
@classmethod
def update_token_at_redis(