user selection compoleted tested

This commit is contained in:
2025-06-15 19:05:48 +03:00
parent a48e560ece
commit 9fb517fa15
24 changed files with 574 additions and 313 deletions

View File

@@ -6,11 +6,13 @@ from .supers_events import (
SuperAccountRecordsUpdateEvent,
SuperAccountRecordsDeleteEvent,
)
from .flat_representative import FlatRepresentativeAccountRecordsListEvent
AccountRecordsRouterCluster = RouterCluster(name="AccountRecordsRouterCluster")
AccountRecordsListEventCluster = EventCluster(name="AccountRecordsListEventCluster", endpoint_uu_id=endpoints_index["AccountRecordsList"])
AccountRecordsListEventCluster.add_event(SuperAccountRecordsListEvent)
AccountRecordsListEventCluster.add_event(FlatRepresentativeAccountRecordsListEvent)
AccountRecordsCreateEventCluster = EventCluster(name="AccountRecordsCreateEventCluster", endpoint_uu_id=endpoints_index["AccountRecordsCreate"])
AccountRecordsCreateEventCluster.add_event(SuperAccountRecordsCreateEvent)

View File

@@ -0,0 +1,60 @@
from typing import Any
from Initializer.event_clusters import Event
from Validations.response import (
PaginateOnly,
Pagination,
PaginationResult,
PostgresResponseSingle,
PostgresResponse,
EndpointResponse
)
from Validations.defaults.validations import CommonHeaders
from Schemas import AccountRecords
from Extensions.Middlewares.token_provider import TokenProvider
# List all account records endpoint Flat Representative
FlatRepresentativeAccountRecordsListEvent = Event(
name="flat_representative_account_records_list",
key="7cb248d3-d75a-4d28-8be5-f147089ce654",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Flat Representative Account Records List all flat representative users endpoint",
)
class FlatRepresentativeAccountRecordsListPydantic:
uu_id: str
bank_date: datetime.datetime
currency_value: float
process_name: str
def flat_representative_account_records_list_callable(list_options: PaginateOnly, header: CommonHeaders):
list_options = PaginateOnly(**list_options.model_dump())
token_object = TokenProvider.get_dict_from_redis(token=header.token)
if not token_object.is_occupant:
raise Exception("Forbidden for employees")
pagination_query = {}
if list_options.query:
pagination_query = FlatRepresentativeAccountRecordsListPydantic(**list_options.query)
with AccountRecords.new_session() as db_session:
AccountRecords.set_session(db_session)
if pagination_query:
account_records_list = AccountRecords.query.filter(
*AccountRecords.convert(pagination_query),
AccountRecords.build_parts_id == token_object.selected_occupant.build_id
)
else:
account_records_list = AccountRecords.query.filter(
AccountRecords.build_parts_id == token_object.selected_occupant.build_id
)
pagination = Pagination(data=account_records_list)
pagination.change(**list_options.model_dump())
pagination_result = PaginationResult(data=account_records_list, pagination=pagination, response_model=FlatRepresentativeAccountRecordsListPydantic)
return EndpointResponse(message="MSG0003-LIST", pagination_result=pagination_result).response
FlatRepresentativeAccountRecordsListEvent.event_callable = flat_representative_account_records_list_callable

View File

@@ -7,8 +7,7 @@ from config import api_config
from validations.request.auth.validations import (
RequestLogin,
RequestResetPassword,
RequestSelectLiving,
RequestSelectEmployee,
RequestSelect,
RequestCreatePassword,
RequestChangePassword,
RequestForgotPasswordPhone,
@@ -44,7 +43,7 @@ auth_route_select_living = "AuthSelectLiving"
description="Selection of users company or occupant type",
operation_id=endpoints_index[auth_route_select_living]
)
def select_living(data: Union[RequestSelectLiving, RequestSelectEmployee], headers: CommonHeaders = Depends(CommonHeaders.as_dependency)):
def select_living(data: Union[RequestSelect], headers: CommonHeaders = Depends(CommonHeaders.as_dependency)):
"""Select token object company or occupant type"""
token_object = TokenProvider.get_dict_from_redis(token=headers.token)
return AuthHandlers.LoginHandler.authentication_select_company_or_occupant_type(request=headers.request, data=data)

View File

@@ -5,8 +5,9 @@ from config import api_config
from Schemas import (
Users,
People,
BuildLivingSpace,
Build,
BuildParts,
BuildLivingSpace,
OccupantTypes,
Employees,
Addresses,
@@ -78,6 +79,8 @@ class LoginHandler:
user_handler, other_domains_list, main_domain = UserHandlers(), [], ""
found_user = user_handler.check_user_exists(access_key=data.access_key, db_session=db_session)
redis_handler = RedisHandlers()
with mongo_handler.collection(f"{str(found_user.related_company)}*Domain") as collection:
result = collection.find_one({"user_uu_id": str(found_user.uu_id)})
if not result:
@@ -133,15 +136,13 @@ class LoginHandler:
person_uu_id=str(list_employees[0]["People.uu_id"]), request=dict(headers.request.headers), domain_list=other_domains_list,
companies_uu_id_list=companies_uu_id_list, companies_id_list=companies_id_list, duty_uu_id_list=duty_uu_id_list, duty_id_list=duty_id_list,
).model_dump()
employee_uuid = str(companies_list[0]["uu_id"])
print('employee_uuid', employee_uuid)
set_to_redis_dict = dict(
user=found_user, token=model_value, add_uuid=employee_uuid,
user=found_user, token=model_value,
header_info=dict(language=headers.language, domain=headers.domain, timezone=headers.timezone),
)
user_dict = found_user.get_dict()
person_dict = found_user.person.get_dict()
if access_token := RedisHandlers().set_object_to_redis(**set_to_redis_dict):
if access_token := redis_handler.set_object_to_redis(**set_to_redis_dict):
return {
"access_token": access_token,
"user_type": UserType.employee.name,
@@ -166,12 +167,15 @@ class LoginHandler:
language = headers.request.headers.get("language", "tr")
domain = headers.request.headers.get("domain", None)
timezone = headers.request.headers.get("tz", None) or "GMT+3"
BuildLivingSpace.set_session(db_session)
BuildParts.set_session(db_session)
OccupantTypes.set_session(db_session)
user_handler = UserHandlers()
found_user = user_handler.check_user_exists(access_key=data.access_key, db_session=db_session)
other_domains_list, main_domain = [], ""
redis_handler = RedisHandlers()
with mongo_handler.collection(f"{str(found_user.related_company)}*Domain") as collection:
result = collection.find_one({"user_uu_id": str(found_user.uu_id)})
if not result:
@@ -189,6 +193,10 @@ class LoginHandler:
if not living_spaces:
raise ValueError("EYS_0006")
user_dict = found_user.get_dict()
person_dict = found_user.person.get_dict()
for living_space in living_spaces:
build_part = BuildParts.query.filter(BuildParts.id == living_space.build_parts_id).first()
if not build_part:
@@ -197,28 +205,40 @@ class LoginHandler:
build = build_part.buildings
occupant_type = OccupantTypes.query.filter(OccupantTypes.id == living_space.occupant_type_id).first()
occupant_data = {
"build_living_space_uu_id": str(living_space.uu_id), "part_uu_id": str(build_part.uu_id), "part_name": build_part.part_name(), "part_level": build_part.part_level,
"occupant_uu_id": str(occupant_type.uu_id), "description": occupant_type.occupant_description, "code": occupant_type.occupant_code,
"build_living_space_uu_id": str(living_space.uu_id), "part_uu_id": str(build_part.uu_id), "part_name": build_part.part_name(db=db_session),
"part_level": build_part.part_level, "occupant_uu_id": str(occupant_type.uu_id), "description": occupant_type.occupant_description,
"code": occupant_type.occupant_code,
}
build_key = str(build.uu_id)
if build_key not in occupants_selection_dict:
occupants_selection_dict[build_key] = {"build_uu_id": build_key, "build_name": build.build_name, "build_no": build.build_no, "occupants": [occupant_data],}
occupants_selection_dict[build_key] = {"build_uu_id": build_key, "build_name": build.build_name, "build_no": build.build_no, "occupants": [occupant_data]}
else:
occupants_selection_dict[build_key]["occupants"].append(occupant_data)
person = found_user.person
model_value = OccupantTokenObject(
user_type=UserType.occupant.value, user_uu_id=str(found_user.uu_id), user_id=found_user.id, person_id=person.id,
person_uu_id=str(person.uu_id), domain_list=other_domains_list, request=dict(request.headers), available_occupants=occupants_selection_dict,
person_uu_id=str(person.uu_id), domain_list=other_domains_list, request=dict(headers.request.headers), available_occupants=occupants_selection_dict,
).model_dump()
redis_handler = RedisHandlers()
if access_token := redis_handler.set_object_to_redis(
user=found_user, token=model_value, add_uuid=living_space.uu_id,
header_info=dict(language=language, domain=domain, timezone=timezone)
):
return {"access_token": access_token, "user_type": UserType.occupant.name, "selection_list": occupants_selection_dict}
raise ValueError("Something went wrong")
set_redis_dict = dict(user=found_user, token=model_value, header_info=dict(language=language, domain=domain, timezone=timezone))
if access_token := redis_handler.set_object_to_redis(**set_redis_dict):
return {
"access_token": access_token,
"user_type": UserType.occupant.name,
"user": {
"uuid": user_dict["uu_id"], "avatar": user_dict["avatar"], "email": user_dict["email"], "phone_number": user_dict["phone_number"], "user_tag": user_dict["user_tag"],
"password_expiry_begins": str(arrow.get(user_dict["password_expiry_begins"]).shift(days=int(user_dict["password_expires_day"]))),
"person": {
"uuid": person_dict["uu_id"], "firstname": person_dict["firstname"], "surname": person_dict["surname"],
"middle_name": person_dict["middle_name"], "sex_code": person_dict["sex_code"], "person_tag": person_dict["person_tag"],
"country_code": person_dict["country_code"], "birth_date": person_dict["birth_date"],
},
},
"selection_list": occupants_selection_dict,
}
raise ValueError("Something went wrong")
@classmethod
def authentication_login_with_domain_and_creds(cls, headers: CommonHeaders, data: Any):
@@ -264,10 +284,12 @@ class LoginHandler:
with Users.new_session() as db_session:
if data.uuid not in token_dict.companies_uu_id_list:
ValueError("EYS_0011")
list_of_returns = (
Employees.id, Employees.uu_id, People.id, People.uu_id, Users.id, Users.uu_id, Companies.id, Companies.uu_id,
Departments.id, Departments.uu_id, Duty.id, Duty.uu_id, Addresses.id, Addresses.letter_address, Staff.id, Staff.uu_id,
Duties.id, Duties.uu_id,
Employees.id, Employees.uu_id, People.id, People.uu_id, Users.id, Users.uu_id, Companies.id, Companies.uu_id, Companies.public_name, Companies.company_type,
Departments.id, Departments.uu_id, Duty.id, Duty.uu_id, Addresses.id, Addresses.letter_address, Staff.id, Staff.uu_id, Duty.duty_name,
Duties.id, Duties.uu_id, BuildParts.id, BuildParts.uu_id, BuildParts.part_name, BuildParts.part_level, BuildParts.part_code,
OccupantTypes.id, OccupantTypes.uu_id, OccupantTypes.occupant_type, OccupantTypes.occupant_description, Addresses.letter_address
)
selected_company_query = db_session.query(*list_of_returns
@@ -298,82 +320,76 @@ class LoginHandler:
filter_endpoints_and_events = db_session.query(EndpointRestriction.operation_uu_id, Events.function_code
).join(EndpointRestriction, EndpointRestriction.id == Events.endpoint_id).filter().all()
reachable_event_codes = {endpoint_name: function_code for endpoint_name, function_code in filter_endpoints_and_events}
# Get reachable applications
reachable_app_codes = Application2Employee.get_application_codes(employee_id=int(result_with_keys_dict['Employees.id']), db=db_session)
company_token = CompanyToken(
company_uu_id=str(result_with_keys_dict['Companies.uu_id']),
company_id=int(result_with_keys_dict['Companies.id']),
department_id=int(result_with_keys_dict['Departments.id']),
department_uu_id=str(result_with_keys_dict['Departments.uu_id']),
duty_id=int(result_with_keys_dict['Duty.id']),
duty_uu_id=str(result_with_keys_dict['Duty.uu_id']),
bulk_duties_id=int(result_with_keys_dict['Duties.id']),
staff_id=int(result_with_keys_dict['Staff.id']),
staff_uu_id=str(result_with_keys_dict['Staff.uu_id']),
employee_id=int(result_with_keys_dict['Employees.id']),
employee_uu_id=str(result_with_keys_dict['Employees.uu_id']),
reachable_event_codes=reachable_event_codes,
reachable_app_codes=reachable_app_codes,
)
add_company = {
"uu_id": str(result_with_keys_dict["Employees.uu_id"]),
"public_name": result_with_keys_dict["Companies.public_name"],
"company_type": result_with_keys_dict["Companies.company_type"],
"company_address": result_with_keys_dict["Addresses.letter_address"],
"duty": result_with_keys_dict["Duty.duty_name"],
"company_uuid": str(result_with_keys_dict["Companies.uu_id"])
}
redis_handler = RedisHandlers()
redis_result = redis_handler.update_token_at_redis(
token=access_token, add_payload=company_token, add_uuid=str(result_with_keys_dict['Employees.uu_id'])
)
user_uu_id = Users.query.filter(Users.person_id == result_with_keys_dict['People.id']).first().uu_id
redis_result = redis_handler.update_token_at_redis(token=access_token, add_payload=add_company, user_uuid=user_uu_id)
return {"selected_uu_id": data.uuid}
@classmethod
def handle_occupant_selection(cls, access_token: str, data: Any, token_dict: TokenDictType):
"""Handle occupant type selection"""
with BuildLivingSpace.new_session() as db:
# Get selected occupant type
selected_build_living_space: BuildLivingSpace = BuildLivingSpace.filter_one(BuildLivingSpace.uu_id == data.build_living_space_uu_id, db=db).data
if not selected_build_living_space:
BuildLivingSpace.set_session(db)
BuildParts.set_session(db)
Users.set_session(db)
list_of_returns = (
BuildLivingSpace.id, BuildLivingSpace.uu_id, OccupantTypes.id, OccupantTypes.uu_id, OccupantTypes.occupant_type, BuildParts.id, BuildParts.uu_id, Build.id, Build.uu_id,
BuildParts.id, BuildParts.part_level, OccupantTypes.occupant_description, OccupantTypes.occupant_code, People.id, People.uu_id
)
selected_build_living_space_query = db.query(*list_of_returns
).join(OccupantTypes, OccupantTypes.id == BuildLivingSpace.occupant_type_id
).join(BuildParts, BuildParts.id == BuildLivingSpace.build_parts_id
).join(Build, Build.id == BuildParts.build_id
).join(People, People.id == BuildLivingSpace.person_id
).filter(BuildLivingSpace.uu_id == data.uuid)
selected_build_living_space_first = selected_build_living_space_query.first()
if not selected_build_living_space_first:
raise ValueError("EYS_0012")
result_with_keys_dict = {}
for ix, selected_build_living_space_item in enumerate(selected_build_living_space_first):
result_with_keys_dict[str(list_of_returns[ix])] = selected_build_living_space_item
# Get reachable events
reachable_event_codes = Event2Occupant.get_event_codes(build_living_space_id=selected_build_living_space.id, db=db)
occupant_type = OccupantTypes.filter_one_system(OccupantTypes.id == selected_build_living_space.occupant_type_id, db=db).data
build_part = BuildParts.filter_one(BuildParts.id == selected_build_living_space.build_parts_id, db=db)
build = build_part.buildings
reachable_app_codes = Application2Occupant.get_application_codes(build_living_space_id=selected_build_living_space.id, db=db)
# responsible_employee = Employees.filter_one(
# Employees.id == build_part.responsible_employee_id,
# db=db,
# ).data
# related_company = RelationshipEmployee2Build.filter_one(
# RelationshipEmployee2Build.member_id == build.id,
# db=db,
# ).data
# Get company
# company_related = Companies.filter_one(
# Companies.id == related_company.company_id,
# db=db,
# ).data
filter_endpoints_and_events = db.query(EndpointRestriction.operation_uu_id, Events.function_code
).join(EndpointRestriction, EndpointRestriction.id == Events.endpoint_id).filter().all()
reachable_event_codes = {endpoint_name: function_code for endpoint_name, function_code in filter_endpoints_and_events}
# Create occupant token
occupant_token = OccupantToken(
living_space_id=selected_build_living_space.id,
living_space_uu_id=selected_build_living_space.uu_id.__str__(),
occupant_type_id=occupant_type.id,
occupant_type_uu_id=occupant_type.uu_id.__str__(),
occupant_type=occupant_type.occupant_type,
build_id=build.id,
build_uuid=build.uu_id.__str__(),
build_part_id=build_part.id,
build_part_uuid=build_part.uu_id.__str__(),
# responsible_employee_id=responsible_employee.id,
# responsible_employee_uuid=responsible_employee.uu_id.__str__(),
# responsible_company_id=company_related.id,
# responsible_company_uuid=company_related.uu_id.__str__(),
reachable_event_codes=reachable_event_codes,
reachable_app_codes=reachable_app_codes,
)
# Get reachable applications
reachable_app_codes = Application2Occupant.get_application_codes(build_living_space_id=selected_build_living_space_first.id, db=db)
build_part = BuildParts.query.filter(BuildParts.id == result_with_keys_dict['BuildParts.id']).first()
if not build_part:
raise ValueError("EYS_0013")
add_build_living_space = {
"build_living_space_uu_id": str(data.uuid),
"part_uu_id": str(result_with_keys_dict['BuildParts.uu_id']),
"part_name": build_part.part_name(db=db),
"part_level": result_with_keys_dict['BuildParts.part_level'],
"occupant_uu_id": str(result_with_keys_dict['OccupantTypes.uu_id']),
"description": result_with_keys_dict['OccupantTypes.occupant_description'],
"code": result_with_keys_dict['OccupantTypes.occupant_code']
}
redis_handler = RedisHandlers()
redis_handler.update_token_at_redis(
token=access_token, add_payload=occupant_token, add_uuid=occupant_token.living_space_uu_id
)
return {"selected_uu_id": occupant_token.living_space_uu_id}
user_uu_id = Users.query.filter(Users.person_id == result_with_keys_dict['People.id']).first().uu_id
redis_handler.update_token_at_redis(token=access_token, add_payload=add_build_living_space, user_uuid=user_uu_id)
return {"selected_uu_id": data.uuid}
@classmethod
def authentication_select_company_or_occupant_type(cls, request: Any, data: Any):
@@ -469,3 +485,30 @@ class AuthHandlers:
LoginHandler: LoginHandler = LoginHandler()
PasswordHandler: PasswordHandler = PasswordHandler()
# Get selected occupant type
# selected_build_living_space: BuildLivingSpace = BuildLivingSpace.filter_one(BuildLivingSpace.uu_id == data.build_living_space_uu_id, db=db).data
# if not selected_build_living_space:
# raise ValueError("EYS_0012")
# occupant_type = OccupantTypes.filter_one_system(OccupantTypes.id == selected_build_living_space.occupant_type_id, db=db).data
# build_part = BuildParts.filter_one(BuildParts.id == selected_build_living_space.build_parts_id, db=db)
# build = build_part.buildings
# reachable_app_codes = Application2Occupant.get_application_codes(build_living_space_id=selected_build_living_space.id, db=db)
# responsible_employee = Employees.filter_one(
# Employees.id == build_part.responsible_employee_id,
# db=db,
# ).data
# related_company = RelationshipEmployee2Build.filter_one(
# RelationshipEmployee2Build.member_id == build.id,
# db=db,
# ).data
# Get company
# company_related = Companies.filter_one(
# Companies.id == related_company.company_id,
# db=db,
# ).data
# Get reachable events
# reachable_event_codes = Event2Occupant.get_event_codes(build_living_space_id=selected_build_living_space.id, db=db)

View File

@@ -14,7 +14,7 @@ class RequestVerifyOTP(BaseModel):
otp: str
class RequestSelectEmployee(BaseModel):
class RequestSelect(BaseModel):
uuid: str
@@ -23,10 +23,6 @@ class RequestResetPassword(BaseModel):
password: str
re_password: str
class RequestSelectLiving(BaseModel):
uuid: str
class RequestCreatePassword(BaseModel):
password_token: str
password: str

View File

@@ -0,0 +1,58 @@
from typing import Any
from Initializer.event_clusters import Event
from Validations.response import (
PaginateOnly,
Pagination,
PaginationResult,
PostgresResponseSingle,
PostgresResponse,
EndpointResponse
)
from Validations.defaults.validations import CommonHeaders
from Schemas import People
from Extensions.Middlewares.token_provider import TokenProvider
# List all people endpoint Flat Tenant
FlatTenantPeopleListEvent = Event(
name="flat_tenant_people_list",
key="7cb248d3-d75a-4d28-8be5-f147089ce654",
request_validator=None, # TODO: Add request validator
response_validator=None, # TODO: Add response validator
description="Flat Tenant People List all flat tenant users endpoint",
)
class FlatTenantPeopleListPydantic:
...
def flat_tenant_people_list_callable(list_options: PaginateOnly, header: CommonHeaders):
list_options = PaginateOnly(**list_options.model_dump())
token_object = TokenProvider.get_dict_from_redis(token=header.token)
if not token_object.is_occupant:
raise Exception("Forbidden for employees")
pagination_query = {}
if list_options.query:
pagination_query = FlatTenantPeopleListPydantic(**list_options.query)
with People.new_session() as db_session:
People.set_session(db_session)
if pagination_query:
people_list = People.query.filter(
*People.convert(pagination_query),
People.build_parts_id == token_object.selected_occupant.build_id
)
else:
people_list = People.query.filter(
People.build_parts_id == token_object.selected_occupant.build_id
)
pagination = Pagination(data=people_list)
pagination.change(**list_options.model_dump())
pagination_result = PaginationResult(data=people_list, pagination=pagination, response_model=FlatTenantPeopleListPydantic)
return EndpointResponse(message="MSG0003-LIST", pagination_result=pagination_result).response
FlatTenantPeopleListEvent.event_callable = flat_tenant_people_list_callable

View File

@@ -36,6 +36,7 @@ class TokenProvider:
"""
token_to_use, user_uu_id_to_use = token or "*", user_uu_id or "*"
list_of_token_dict, auth_key_list = [], [cls.AUTH_TOKEN, token_to_use, user_uu_id_to_use]
print("auth_key_list", auth_key_list)
if token:
result = RedisActions.get_json(list_keys=auth_key_list, limit=1)
if first_record := result.first:
@@ -56,7 +57,8 @@ class TokenProvider:
Retrieve token object from Redis using token and user_uu_id
"""
token_to_use, user_uu_id_to_use = token or "*", user_uu_id or "*"
list_of_token_dict, auth_key_list = [], [cls.AUTH_TOKEN, token_to_use, user_uu_id_to_use, "*"]
list_of_token_dict, auth_key_list = [], [cls.AUTH_TOKEN, token_to_use, user_uu_id_to_use]
print("auth_key_list", auth_key_list)
if token:
result = RedisActions.get_json(list_keys=auth_key_list, limit=1)
if first_record := result.first:

View File

@@ -33,7 +33,7 @@ class RedisHandlers:
@classmethod
def get_object_from_redis(cls, access_token: str) -> TokenDictType:
redis_response = RedisActions.get_json(list_keys=[cls.AUTH_TOKEN, access_token, "*", "*"])
redis_response = RedisActions.get_json(list_keys=[cls.AUTH_TOKEN, access_token, "*"])
if not redis_response.status:
raise ValueError("EYS_0001")
if redis_object := redis_response.first:
@@ -41,30 +41,27 @@ class RedisHandlers:
raise ValueError("EYS_0002")
@classmethod
def set_object_to_redis(cls, user: Users, token, header_info, add_uuid: str):
result_delete = RedisActions.delete(list_keys=[cls.AUTH_TOKEN, "*", str(user.uu_id), add_uuid])
def set_object_to_redis(cls, user: Users, token, header_info):
result_delete = RedisActions.delete(list_keys=[cls.AUTH_TOKEN, "*", str(user.uu_id)])
generated_access_token = PasswordModule.generate_access_token()
keys = [cls.AUTH_TOKEN, generated_access_token, str(user.uu_id)]
if add_uuid:
keys.append(add_uuid)
RedisActions.set_json(list_keys=keys, value={**token, **header_info}, expires={"hours": 1, "minutes": 30})
return generated_access_token
RedisActions.set_json(list_keys=keys, value={**token, **header_info}, expires={"hours": 1, "minutes": 30})
return generated_access_token
@classmethod
def update_token_at_redis(cls, token: str, add_payload: Union[CompanyToken, OccupantToken], add_uuid: str):
if already_token_data := RedisActions.get_json(list_keys=[cls.AUTH_TOKEN, token, '*', add_uuid]).first:
def update_token_at_redis(cls, token: str, add_payload: Union[dict], user_uuid: str):
list_keys = [cls.AUTH_TOKEN, token, "*"]
print("user_uuid", user_uuid, "add_payload", add_payload)
if not user_uuid:
raise ValueError("User UUID not found")
if already_token_data := RedisActions.get_json(list_keys=list_keys).first:
already_token = cls.process_redis_object(already_token_data)
if already_token.is_employee and isinstance(add_payload, CompanyToken):
if already_token.is_employee:
already_token.selected_company = add_payload
list_keys = [cls.AUTH_TOKEN, token, str(already_token.user_uu_id), str(add_uuid)]
print('is_employee: ', list_keys)
elif already_token.is_occupant and isinstance(add_payload, OccupantToken):
elif already_token.is_occupant:
already_token.selected_occupant = add_payload
list_keys = [cls.AUTH_TOKEN, token, str(already_token.user_uu_id), str(add_uuid)]
print('is_occupant: ', list_keys)
list_keys = [cls.AUTH_TOKEN, token, str(user_uuid)]
print("already_token", already_token)
result = RedisActions.set_json(list_keys=list_keys, value=already_token.model_dump(), expires={"hours": 1, "minutes": 30})
RedisActions.delete(list_keys=[cls.AUTH_TOKEN, token, str(already_token.user_uu_id)])
return result.first
raise ValueError("Something went wrong")

View File

@@ -238,9 +238,14 @@ class Build(CrudCollection):
building_types = None
for part in self.parts:
building_types = {}
build_type = BuildTypes.filter_by_one(
system=True, id=part.build_part_type_id, db=db_session
).data
build_type = BuildTypes.query.filter(
BuildTypes.id == part.build_part_type_id
).first()
if not build_type:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="There is no building type in this building.",
)
if build_type.type_code in building_types:
building_types[build_type.type_code]["list"].append(part.part_no)
else:
@@ -327,9 +332,10 @@ class BuildParts(CrudCollection):
)
def part_name(self, db):
if build_type := BuildTypes.filter_by_one(
system=True, id=self.part_type_id, db=db
).data:
BuildTypes.set_session(db)
if build_type := BuildTypes.query.filter(
BuildTypes.id == self.part_type_id
).first():
return f"{str(build_type.type_name).upper()} : {str(self.part_no).upper()}"
return f"Undefined:{str(build_type.type_name).upper()}"

View File

@@ -415,7 +415,8 @@ class Application2Employee(CrudCollection):
applications = Applications.query.filter(Applications.id.in_([application.application_id for application in active_applications])).all()
if extra_applications := Application2EmployeeExtra.query.filter(Application2EmployeeExtra.employee_id == employee_id).all():
applications_extra = Applications.query.filter(Applications.id.in_([application.application_id for application in extra_applications])).all()
applications.extend(applications_extra)
if applications_extra:
applications.extend(applications_extra)
applications_dict = {}
for application in applications:
if not application.site_url in applications_dict:
@@ -466,13 +467,6 @@ class Application2Occupant(CrudCollection):
applications_extra = Applications.query.filter(Applications.id.in_([application.application_id for application in extra_applications])).all()
applications.extend(applications_extra)
applications_dict = {}
for application in applications:
if not application.site_url in applications_dict:
applications_dict[str(application.site_url)] = str(application.application_code)
else:
ValueError("Duplicate application code found for single endpoint")
applications.extend(applications_extra)
applications_dict = {}
for application in applications:
if not application.site_url in applications_dict:
applications_dict[str(application.site_url)] = str(application.application_code)

View File

@@ -89,7 +89,7 @@ class OccupantTokenObject(ApplicationToken):
# Occupant Token Object -> Requires selection of the occupant type for a specific build part
available_occupants: dict = None
selected_occupant: Optional[OccupantToken] = None # Selected Occupant Type
selected_occupant: Optional[dict] = None # Selected Occupant Type
@property
def is_employee(self) -> bool:
@@ -109,7 +109,7 @@ class EmployeeTokenObject(ApplicationToken):
duty_id_list: list[int] # List of duty objects
duty_uu_id_list: list[str] # List of duty objects
selected_company: Optional[CompanyToken] = None # Selected Company Object
selected_company: Optional[dict] = None # Selected Company Object
@property
def is_employee(self) -> bool: