From 1c0bb741a096b3a5e95816e68fe09c74dfd8e9a0 Mon Sep 17 00:00:00 2001 From: Berkay Date: Tue, 3 Jun 2025 21:43:39 +0300 Subject: [PATCH] updated services api --- ServicesApi/Builds/Auth/events/auth/events.py | 7 +- .../Builds/Building/events/areas/cluster.py | 2 +- .../Building/events/areas/supers_events.py | 8 +- .../Building/events/building_parts/cluster.py | 2 +- .../events/building_parts/supers_events.py | 8 +- .../Builds/Building/events/builds/cluster.py | 2 +- .../Building/events/builds/supers_events.py | 8 +- .../Building/events/living_space/cluster.py | 2 +- .../events/living_space/supers_events.py | 8 +- ServicesApi/Builds/Identity/Dockerfile | 2 +- .../Builds/Identity/events/people/cluster.py | 2 +- .../Identity/events/people/supers_events.py | 8 +- .../Builds/Identity/events/user/cluster.py | 2 +- .../Identity/events/user/supers_events.py | 8 +- ServicesApi/Builds/Initial/alembic/env.py | 4 +- ServicesApi/Builds/Initial/init_address.py | 2 +- .../Builds/Initial/init_app_defaults.py | 6 +- ServicesApi/Builds/Initial/init_enums.py | 2 +- .../Builds/Initial/init_occ_defaults.py | 6 +- .../Builds/Initial/init_occupant_types.py | 2 +- ServicesApi/Builds/Initial/init_services.py | 2 +- ServicesApi/Builds/Management/Dockerfile | 2 +- .../Builds/Management/endpoints/routes.py | 4 +- .../Management/endpoints/services/router.py | 8 +- .../Management/endpoints/tests/router.py | 2 +- .../Management/events/application/cluster.py | 2 +- .../events/application/supers_events.py | 6 +- .../Management/events/events/cluster.py | 2 +- .../Management/events/events/supers_events.py | 6 +- .../Management/events/services/cluster.py | 2 +- .../events/services/supers_events.py | 8 +- ServicesApi/Builds/Restriction/Dockerfile | 2 +- .../Restriction/endpoints/pages/router.py | 3 +- .../Builds/Restriction/events/pages/events.py | 2 +- .../Builds/TestApi/endpoints/tester/router.py | 1 + .../Controllers/Redis/Broadcast/README.md | 67 ++++ .../Controllers/Redis/Broadcast/actions.py | 248 +++++++++++++ .../Redis/Broadcast/implementations.py | 205 +++++++++++ ServicesApi/Controllers/Redis/README.md | 85 +++++ ServicesApi/Controllers/Redis/base.py | 328 +++++++++++++++++ ServicesApi/Controllers/Redis/config.py | 25 ++ ServicesApi/Controllers/Redis/connection.py | 215 +++++++++++ ServicesApi/Controllers/Redis/database.py | 340 ++++++++++++++++++ .../Controllers/Redis/implementations.py | 281 +++++++++++++++ ServicesApi/Controllers/Redis/response.py | 194 ++++++++++ .../Extensions/Middlewares/token_provider.py | 4 +- .../Extensions/OnMemory/redis_handlers.py | 70 ++++ ServicesApi/Initializer/event_clusters.py | 2 +- docker-compose.yml | 118 +++--- 49 files changed, 2192 insertions(+), 133 deletions(-) create mode 100644 ServicesApi/Controllers/Redis/Broadcast/README.md create mode 100644 ServicesApi/Controllers/Redis/Broadcast/actions.py create mode 100644 ServicesApi/Controllers/Redis/Broadcast/implementations.py create mode 100644 ServicesApi/Controllers/Redis/README.md create mode 100644 ServicesApi/Controllers/Redis/base.py create mode 100644 ServicesApi/Controllers/Redis/config.py create mode 100644 ServicesApi/Controllers/Redis/connection.py create mode 100644 ServicesApi/Controllers/Redis/database.py create mode 100644 ServicesApi/Controllers/Redis/implementations.py create mode 100644 ServicesApi/Controllers/Redis/response.py create mode 100644 ServicesApi/Extensions/OnMemory/redis_handlers.py diff --git a/ServicesApi/Builds/Auth/events/auth/events.py b/ServicesApi/Builds/Auth/events/auth/events.py index e5c48ab..fc270b9 100644 --- a/ServicesApi/Builds/Auth/events/auth/events.py +++ b/ServicesApi/Builds/Auth/events/auth/events.py @@ -23,13 +23,14 @@ from Schemas import ( Events, EndpointRestriction, ) -from Controllers.mongo.database import mongo_handler +from Controllers.Mongo.database import mongo_handler from Validations.token.validations import TokenDictType, EmployeeTokenObject, OccupantTokenObject, CompanyToken, OccupantToken, UserType from Validations.defaults.validations import CommonHeaders -from Extends.redis.redis_handlers import RedisHandlers -from Extends.token.password_module import PasswordModule from validations.password.validations import PasswordHistoryViaUser +from Extensions.OnMemory.redis_handlers import RedisHandlers +from Extensions.Token.password_module import PasswordModule + class UserHandlers: diff --git a/ServicesApi/Builds/Building/events/areas/cluster.py b/ServicesApi/Builds/Building/events/areas/cluster.py index a4af6d1..4d22b9f 100644 --- a/ServicesApi/Builds/Building/events/areas/cluster.py +++ b/ServicesApi/Builds/Building/events/areas/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( SuperAreaListEvent, diff --git a/ServicesApi/Builds/Building/events/areas/supers_events.py b/ServicesApi/Builds/Building/events/areas/supers_events.py index 65401eb..c296024 100644 --- a/ServicesApi/Builds/Building/events/areas/supers_events.py +++ b/ServicesApi/Builds/Building/events/areas/supers_events.py @@ -1,7 +1,7 @@ from typing import Any -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -9,13 +9,13 @@ from api_validations.response import ( PostgresResponse, EndpointResponse ) -from schemas import ( +from Validations.defaults.validations import CommonHeaders +from Schemas import ( BuildArea, Build, BuildParts, AccountRecords, ) -from api_validations.defaults.validations import CommonHeaders # List all area Super User diff --git a/ServicesApi/Builds/Building/events/building_parts/cluster.py b/ServicesApi/Builds/Building/events/building_parts/cluster.py index 8589f84..e60fb8a 100644 --- a/ServicesApi/Builds/Building/events/building_parts/cluster.py +++ b/ServicesApi/Builds/Building/events/building_parts/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( SuperPartsListEvent, diff --git a/ServicesApi/Builds/Building/events/building_parts/supers_events.py b/ServicesApi/Builds/Building/events/building_parts/supers_events.py index 887cbfb..14fdfd3 100644 --- a/ServicesApi/Builds/Building/events/building_parts/supers_events.py +++ b/ServicesApi/Builds/Building/events/building_parts/supers_events.py @@ -1,7 +1,7 @@ from typing import Any -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -9,12 +9,12 @@ from api_validations.response import ( PostgresResponse, EndpointResponse ) -from schemas import ( +from Validations.defaults.validations import CommonHeaders +from Schemas import ( Build, BuildParts, AccountRecords, ) -from api_validations.defaults.validations import CommonHeaders # List all endpoint Super Parts diff --git a/ServicesApi/Builds/Building/events/builds/cluster.py b/ServicesApi/Builds/Building/events/builds/cluster.py index 20a8aec..d9cfcd6 100644 --- a/ServicesApi/Builds/Building/events/builds/cluster.py +++ b/ServicesApi/Builds/Building/events/builds/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( SuperBuildListEvent, diff --git a/ServicesApi/Builds/Building/events/builds/supers_events.py b/ServicesApi/Builds/Building/events/builds/supers_events.py index aece126..d6829dc 100644 --- a/ServicesApi/Builds/Building/events/builds/supers_events.py +++ b/ServicesApi/Builds/Building/events/builds/supers_events.py @@ -1,7 +1,7 @@ from typing import Any -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -9,12 +9,12 @@ from api_validations.response import ( PostgresResponse, EndpointResponse ) -from schemas import ( +from Validations.defaults.validations import CommonHeaders +from Schemas import ( Build, BuildParts, AccountRecords, ) -from api_validations.defaults.validations import CommonHeaders # List all endpoint FL-REP diff --git a/ServicesApi/Builds/Building/events/living_space/cluster.py b/ServicesApi/Builds/Building/events/living_space/cluster.py index 9eeae5e..9b47e28 100644 --- a/ServicesApi/Builds/Building/events/living_space/cluster.py +++ b/ServicesApi/Builds/Building/events/living_space/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( SuperLivingSpaceListEvent, diff --git a/ServicesApi/Builds/Building/events/living_space/supers_events.py b/ServicesApi/Builds/Building/events/living_space/supers_events.py index 72e39bc..e765f30 100644 --- a/ServicesApi/Builds/Building/events/living_space/supers_events.py +++ b/ServicesApi/Builds/Building/events/living_space/supers_events.py @@ -1,7 +1,7 @@ from typing import Any -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -9,12 +9,12 @@ from api_validations.response import ( PostgresResponse, EndpointResponse ) -from schemas import ( +from Validations.defaults.validations import CommonHeaders +from Schemas import ( Build, BuildParts, AccountRecords, ) -from api_validations.defaults.validations import CommonHeaders # List all endpoint FL-REP diff --git a/ServicesApi/Builds/Identity/Dockerfile b/ServicesApi/Builds/Identity/Dockerfile index 9caa184..581078e 100644 --- a/ServicesApi/Builds/Identity/Dockerfile +++ b/ServicesApi/Builds/Identity/Dockerfile @@ -18,11 +18,11 @@ COPY /ServicesApi/Validations /Validations COPY /ServicesApi/Schemas /Schemas COPY /ServicesApi/Extensions /Extensions -COPY /ServicesApi/api_middlewares /api_middlewares COPY /ServicesApi/Builds/Identity/endpoints /Initializer/endpoints COPY /ServicesApi/Builds/Identity/events /Initializer/events COPY /ServicesApi/Builds/Identity/validations /Initializer/validations COPY /ServicesApi/Builds/Identity/index.py /Initializer/index.py +# COPY /ServicesApi/api_middlewares /api_middlewares # Set Python path to include app directory ENV PYTHONPATH=/ PYTHONUNBUFFERED=1 PYTHONDONTWRITEBYTECODE=1 diff --git a/ServicesApi/Builds/Identity/events/people/cluster.py b/ServicesApi/Builds/Identity/events/people/cluster.py index 941ff1c..2358386 100644 --- a/ServicesApi/Builds/Identity/events/people/cluster.py +++ b/ServicesApi/Builds/Identity/events/people/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( SuperPeopleListEvent, diff --git a/ServicesApi/Builds/Identity/events/people/supers_events.py b/ServicesApi/Builds/Identity/events/people/supers_events.py index 1925bbb..0c08210 100644 --- a/ServicesApi/Builds/Identity/events/people/supers_events.py +++ b/ServicesApi/Builds/Identity/events/people/supers_events.py @@ -1,7 +1,7 @@ from typing import Any -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -9,8 +9,8 @@ from api_validations.response import ( PostgresResponse, EndpointResponse ) -from api_validations.token.validations import TokenDictType -from schemas import ( +from Validations.token.validations import TokenDictType +from Schemas import ( Build, BuildLivingSpace, BuildParts, diff --git a/ServicesApi/Builds/Identity/events/user/cluster.py b/ServicesApi/Builds/Identity/events/user/cluster.py index 269d2de..799adfd 100644 --- a/ServicesApi/Builds/Identity/events/user/cluster.py +++ b/ServicesApi/Builds/Identity/events/user/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( SuperUserListEvent, diff --git a/ServicesApi/Builds/Identity/events/user/supers_events.py b/ServicesApi/Builds/Identity/events/user/supers_events.py index d3bdd59..8e93ed5 100644 --- a/ServicesApi/Builds/Identity/events/user/supers_events.py +++ b/ServicesApi/Builds/Identity/events/user/supers_events.py @@ -1,7 +1,7 @@ from typing import Any -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -9,8 +9,8 @@ from api_validations.response import ( PostgresResponse, EndpointResponse ) -from api_validations.token.validations import TokenDictType -from schemas import ( +from Validations.token.validations import TokenDictType +from Schemas import ( Build, BuildLivingSpace, BuildParts, diff --git a/ServicesApi/Builds/Initial/alembic/env.py b/ServicesApi/Builds/Initial/alembic/env.py index c007425..9057564 100644 --- a/ServicesApi/Builds/Initial/alembic/env.py +++ b/ServicesApi/Builds/Initial/alembic/env.py @@ -6,8 +6,8 @@ from sqlalchemy import pool from sqlalchemy import create_engine from alembic import context -from schemas import * -from api_controllers.postgres.engine import Base +from Schemas import * +from Controllers.Postgres.engine import Base # this is the Alembic Config object, which provides # access to the values within the .ini file in use. diff --git a/ServicesApi/Builds/Initial/init_address.py b/ServicesApi/Builds/Initial/init_address.py index caeca40..763faaa 100644 --- a/ServicesApi/Builds/Initial/init_address.py +++ b/ServicesApi/Builds/Initial/init_address.py @@ -1,4 +1,4 @@ -from schemas import ( +from Schemas import ( Addresses, AddressCity, AddressStreet, diff --git a/ServicesApi/Builds/Initial/init_app_defaults.py b/ServicesApi/Builds/Initial/init_app_defaults.py index e102943..fd5e6e2 100644 --- a/ServicesApi/Builds/Initial/init_app_defaults.py +++ b/ServicesApi/Builds/Initial/init_app_defaults.py @@ -1,8 +1,8 @@ import arrow -from api_modules.token.password_module import PasswordModule -from api_controllers.mongo.database import mongo_handler -from schemas import ( +from Extensions.Token.password_module import PasswordModule +from Controllers.Mongo.database import mongo_handler +from Schemas import ( Companies, Departments, Duty, diff --git a/ServicesApi/Builds/Initial/init_enums.py b/ServicesApi/Builds/Initial/init_enums.py index ab7ecfb..abc181c 100644 --- a/ServicesApi/Builds/Initial/init_enums.py +++ b/ServicesApi/Builds/Initial/init_enums.py @@ -1,5 +1,5 @@ from pydantic import BaseModel -from schemas import BuildTypes, ApiEnumDropdown +from Schemas import BuildTypes, ApiEnumDropdown class InsertBuildTypes(BaseModel): diff --git a/ServicesApi/Builds/Initial/init_occ_defaults.py b/ServicesApi/Builds/Initial/init_occ_defaults.py index 7a75cce..8b6c784 100644 --- a/ServicesApi/Builds/Initial/init_occ_defaults.py +++ b/ServicesApi/Builds/Initial/init_occ_defaults.py @@ -1,7 +1,7 @@ import arrow -from api_modules.token.password_module import PasswordModule -from api_controllers.mongo.database import mongo_handler -from schemas import ( +from Extensions.Token.password_module import PasswordModule +from Controllers.Mongo.database import mongo_handler +from Schemas import ( Addresses, BuildLivingSpace, Users, diff --git a/ServicesApi/Builds/Initial/init_occupant_types.py b/ServicesApi/Builds/Initial/init_occupant_types.py index 20e0206..5270e93 100644 --- a/ServicesApi/Builds/Initial/init_occupant_types.py +++ b/ServicesApi/Builds/Initial/init_occupant_types.py @@ -1,4 +1,4 @@ -from schemas import OccupantTypes +from Schemas import OccupantTypes def create_occupant_types_defaults(db_session): diff --git a/ServicesApi/Builds/Initial/init_services.py b/ServicesApi/Builds/Initial/init_services.py index 510028b..b4edcec 100644 --- a/ServicesApi/Builds/Initial/init_services.py +++ b/ServicesApi/Builds/Initial/init_services.py @@ -1,4 +1,4 @@ -from schemas import ( +from Schemas import ( Duty, OccupantTypes, Modules, diff --git a/ServicesApi/Builds/Management/Dockerfile b/ServicesApi/Builds/Management/Dockerfile index 5597a17..c6bd763 100644 --- a/ServicesApi/Builds/Management/Dockerfile +++ b/ServicesApi/Builds/Management/Dockerfile @@ -18,11 +18,11 @@ COPY /ServicesApi/Validations /Validations COPY /ServicesApi/Schemas /Schemas COPY /ServicesApi/Extensions /Extensions -COPY /ServicesApi/api_middlewares /api_middlewares COPY /ServicesApi/Builds/Management/endpoints /Initializer/endpoints COPY /ServicesApi/Builds/Management/events /Initializer/events COPY /ServicesApi/Builds/Management/validations /Initializer/validations COPY /ServicesApi/Builds/Management/index.py /Initializer/index.py +# COPY /ServicesApi/api_middlewares /api_middlewares # Set Python path to include app directory ENV PYTHONPATH=/ PYTHONUNBUFFERED=1 PYTHONDONTWRITEBYTECODE=1 diff --git a/ServicesApi/Builds/Management/endpoints/routes.py b/ServicesApi/Builds/Management/endpoints/routes.py index 8976978..fa95bc5 100644 --- a/ServicesApi/Builds/Management/endpoints/routes.py +++ b/ServicesApi/Builds/Management/endpoints/routes.py @@ -1,10 +1,10 @@ from fastapi import APIRouter from .events.router import event_endpoint_route -from .services.router import services_endpoint_route +from .services.router import services_route from .application.router import application_endpoint_route def get_routes() -> list[APIRouter]: - return [event_endpoint_route, application_endpoint_route, services_endpoint_route] + return [event_endpoint_route, application_endpoint_route, services_route] def get_safe_endpoint_urls() -> list[tuple[str, str]]: diff --git a/ServicesApi/Builds/Management/endpoints/services/router.py b/ServicesApi/Builds/Management/endpoints/services/router.py index 8b8eae5..b869508 100644 --- a/ServicesApi/Builds/Management/endpoints/services/router.py +++ b/ServicesApi/Builds/Management/endpoints/services/router.py @@ -19,7 +19,7 @@ services_list = "ServicesList" description="List all services endpoint", operation_id=endpoints_index[services_list], ) -def services_list(data: PaginateOnly, headers: CommonHeaders = Depends(CommonHeaders)): +def services_list(data: PaginateOnly, headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): token_object = TokenProvider.get_dict_from_redis(token=headers.token) event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) event_key = TokenProvider.retrieve_event_codes(**event_founder_dict) @@ -34,7 +34,7 @@ services_create = "ServicesCreate" description="Create service endpoint", operation_id=endpoints_index[services_create], ) -def services_create(data, headers: CommonHeaders = Depends(CommonHeaders)): +def services_create(data: dict, headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): token_object = TokenProvider.get_dict_from_redis(token=headers.token) event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) event_key = TokenProvider.retrieve_event_codes(**event_founder_dict) @@ -49,7 +49,7 @@ services_update = "ServicesUpdate" description="Update service endpoint", operation_id=endpoints_index[services_update], ) -def services_update(uu_id: str, data, headers: CommonHeaders = Depends(CommonHeaders)): +def services_update(uu_id: str, data: dict, headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): token_object = TokenProvider.get_dict_from_redis(token=headers.token) event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) event_key = TokenProvider.retrieve_event_codes(**event_founder_dict) @@ -64,7 +64,7 @@ services_delete = "ServicesDelete" description="Delete service endpoint", operation_id=endpoints_index[services_delete], ) -def services_delete(uu_id: str, headers: CommonHeaders = Depends(CommonHeaders)): +def services_delete(uu_id: str, headers: CommonHeaders = Depends(CommonHeaders.as_dependency)): token_object = TokenProvider.get_dict_from_redis(token=headers.token) event_founder_dict = dict(endpoint_code=headers.operation_id, token=token_object) event_key = TokenProvider.retrieve_event_codes(**event_founder_dict) diff --git a/ServicesApi/Builds/Management/endpoints/tests/router.py b/ServicesApi/Builds/Management/endpoints/tests/router.py index ab5c75b..b7aae13 100644 --- a/ServicesApi/Builds/Management/endpoints/tests/router.py +++ b/ServicesApi/Builds/Management/endpoints/tests/router.py @@ -1,7 +1,7 @@ from typing import Any from fastapi import APIRouter, Depends from sqlalchemy import func -from schemas import AccountRecords +from Schemas import AccountRecords from endpoints.index import endpoints_index from events.event_endpoints.cluster import EventsEndpointRouterCluster diff --git a/ServicesApi/Builds/Management/events/application/cluster.py b/ServicesApi/Builds/Management/events/application/cluster.py index 1250c97..41201b4 100644 --- a/ServicesApi/Builds/Management/events/application/cluster.py +++ b/ServicesApi/Builds/Management/events/application/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( ApplicationListAllEvent, diff --git a/ServicesApi/Builds/Management/events/application/supers_events.py b/ServicesApi/Builds/Management/events/application/supers_events.py index fa62565..46949a7 100644 --- a/ServicesApi/Builds/Management/events/application/supers_events.py +++ b/ServicesApi/Builds/Management/events/application/supers_events.py @@ -1,7 +1,7 @@ from typing import Any -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -10,7 +10,7 @@ from api_validations.response import ( EndpointResponse ) -from schemas import ( +from Schemas import ( Applications, Application2Employee, Application2Occupant, diff --git a/ServicesApi/Builds/Management/events/events/cluster.py b/ServicesApi/Builds/Management/events/events/cluster.py index 3930e1d..1d535ad 100644 --- a/ServicesApi/Builds/Management/events/events/cluster.py +++ b/ServicesApi/Builds/Management/events/events/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( EventsListAvailableEvent, diff --git a/ServicesApi/Builds/Management/events/events/supers_events.py b/ServicesApi/Builds/Management/events/events/supers_events.py index 06ee0a4..f453da5 100644 --- a/ServicesApi/Builds/Management/events/events/supers_events.py +++ b/ServicesApi/Builds/Management/events/events/supers_events.py @@ -1,7 +1,7 @@ from typing import Any -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -10,7 +10,7 @@ from api_validations.response import ( EndpointResponse ) -from schemas import ( +from Schemas import ( Events, Event2Employee, Event2Occupant, diff --git a/ServicesApi/Builds/Management/events/services/cluster.py b/ServicesApi/Builds/Management/events/services/cluster.py index 68fbc11..82298d0 100644 --- a/ServicesApi/Builds/Management/events/services/cluster.py +++ b/ServicesApi/Builds/Management/events/services/cluster.py @@ -1,4 +1,4 @@ -from api_initializer.event_clusters import EventCluster, RouterCluster +from Initializer.event_clusters import EventCluster, RouterCluster from index import endpoints_index from .supers_events import ( SuperServiceListEvent, diff --git a/ServicesApi/Builds/Management/events/services/supers_events.py b/ServicesApi/Builds/Management/events/services/supers_events.py index 4981afe..741044e 100644 --- a/ServicesApi/Builds/Management/events/services/supers_events.py +++ b/ServicesApi/Builds/Management/events/services/supers_events.py @@ -1,8 +1,8 @@ from typing import Any -from api_validations.defaults.validations import CommonHeaders -from api_initializer.event_clusters import Event -from api_validations.response import ( +from Validations.defaults.validations import CommonHeaders +from Initializer.event_clusters import Event +from Validations.response import ( PaginateOnly, Pagination, PaginationResult, @@ -10,7 +10,7 @@ from api_validations.response import ( PostgresResponse, EndpointResponse ) -from schemas import ( +from Schemas import ( Events, Event2Employee, Event2Occupant, diff --git a/ServicesApi/Builds/Restriction/Dockerfile b/ServicesApi/Builds/Restriction/Dockerfile index a554751..895fe57 100644 --- a/ServicesApi/Builds/Restriction/Dockerfile +++ b/ServicesApi/Builds/Restriction/Dockerfile @@ -18,11 +18,11 @@ COPY /ServicesApi/Validations /Validations COPY /ServicesApi/Schemas /Schemas COPY /ServicesApi/Extensions /Extensions -COPY /ServicesApi/api_middlewares /api_middlewares COPY /ServicesApi/Builds/Restriction/endpoints /Initializer/endpoints COPY /ServicesApi/Builds/Restriction/events /Initializer/events COPY /ServicesApi/Builds/Restriction/validations /Initializer/validations COPY /ServicesApi/Builds/Restriction/index.py /Initializer/index.py +# COPY /ServicesApi/api_middlewares /api_middlewares # Set Python path to include app directory ENV PYTHONPATH=/ PYTHONUNBUFFERED=1 PYTHONDONTWRITEBYTECODE=1 diff --git a/ServicesApi/Builds/Restriction/endpoints/pages/router.py b/ServicesApi/Builds/Restriction/endpoints/pages/router.py index 64963a1..fdba8ca 100644 --- a/ServicesApi/Builds/Restriction/endpoints/pages/router.py +++ b/ServicesApi/Builds/Restriction/endpoints/pages/router.py @@ -2,9 +2,8 @@ from fastapi import APIRouter, Depends from events.pages.events import PageHandlers from index import endpoints_index - +from Validations.defaults.validations import CommonHeaders from validations.request.restrictions.validations import RequestApplication -from api_validations.defaults.validations import CommonHeaders pages_route = APIRouter(prefix="/restrictions", tags=["Restrictions Cluster"]) diff --git a/ServicesApi/Builds/Restriction/events/pages/events.py b/ServicesApi/Builds/Restriction/events/pages/events.py index 31b0398..1c73eed 100644 --- a/ServicesApi/Builds/Restriction/events/pages/events.py +++ b/ServicesApi/Builds/Restriction/events/pages/events.py @@ -1,4 +1,4 @@ -from api_modules.redis.redis_handlers import RedisHandlers +from Extensions.OnMemory.redis_handlers import RedisHandlers class PageHandlers: diff --git a/ServicesApi/Builds/TestApi/endpoints/tester/router.py b/ServicesApi/Builds/TestApi/endpoints/tester/router.py index 8b275b9..abcf0b3 100644 --- a/ServicesApi/Builds/TestApi/endpoints/tester/router.py +++ b/ServicesApi/Builds/TestApi/endpoints/tester/router.py @@ -3,6 +3,7 @@ import datetime from typing import Any from fastapi import APIRouter, Depends from pydantic import BaseModel + from Validations.response import PaginateOnly, Pagination, PaginationResult, EndpointResponse from Validations.defaults.validations import CommonHeaders from Schemas import AccountRecords diff --git a/ServicesApi/Controllers/Redis/Broadcast/README.md b/ServicesApi/Controllers/Redis/Broadcast/README.md new file mode 100644 index 0000000..3fdf588 --- /dev/null +++ b/ServicesApi/Controllers/Redis/Broadcast/README.md @@ -0,0 +1,67 @@ +# Redis Pub/Sub Chain Implementation + +This module implements a chain of services communicating through Redis Pub/Sub channels. Each service in the chain subscribes to the previous service's channel and publishes to its own channel, creating a processing pipeline. + +## Architecture + +The implementation follows a simple chain pattern: + +``` +READER → PROCESSOR → WRITER +``` + +- **READER**: Generates mock data with a "red" stage and publishes to `chain:reader` +- **PROCESSOR**: Subscribes to `chain:reader`, processes messages with "red" stage, updates stage to "processed", and publishes to `chain:processor` +- **WRITER**: Subscribes to `chain:processor`, processes messages with "processed" stage, updates stage to "completed", and publishes to `chain:writer` + +## Message Flow + +Each message flows through the chain with a stage attribute that determines how it's processed: + +1. READER generates a message with `stage="red"` +2. PROCESSOR receives the message, checks if `stage="red"`, processes it, and sets `stage="processed"` +3. WRITER receives the message, checks if `stage="processed"`, processes it, and sets `stage="completed"` + +## Performance + +The implementation includes timing information to track how long messages take to flow through the entire chain. Sample output: + +``` +[READER] 1745176466.132082 | Published UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 +[PROCESSOR] 1745176466.132918 | Received UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 | Published UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 +[WRITER] 1745176466.133097 | Received UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 | Published UUID: 74cf2312-25ec-4da8-bc0a-521b6ccd5206 | Elapsed: 1.83ms +[READER] 1745176468.133018 | Published UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a +[PROCESSOR] 1745176468.133792 | Received UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a | Published UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a +[WRITER] 1745176468.134001 | Received UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a | Published UUID: 2ffd217f-650f-4e10-bc16-317adcf7a59a | Elapsed: 1.76ms +[READER] 1745176470.133841 | Published UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 +[PROCESSOR] 1745176470.134623 | Received UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 | Published UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 +[WRITER] 1745176470.134861 | Received UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 | Published UUID: 87e1f3af-c6c2-4fa5-9a65-57e7327d3989 | Elapsed: 1.68ms +``` + +The elapsed time shows the total time from when the READER publishes a message until the WRITER completes processing it. In the samples above, the end-to-end processing time ranges from 1.68ms to 1.83ms. + +## Usage + +To run the demonstration: + +```bash +python -m Controllers.Redis.Broadcast.implementations +``` + +This will start all three services in the chain and begin processing messages. Press Ctrl+C to stop the demonstration. + +## Implementation Details + +The implementation uses: + +1. A singleton Redis Pub/Sub handler with publisher and subscriber capabilities +2. Thread-based message processing +3. JSON serialization for message passing +4. Stage-based message processing to track progress through the chain +5. Timing information to measure performance + +Each service in the chain follows these steps: +1. Subscribe to the appropriate channel +2. Define a message handler function +3. Process incoming messages based on their stage +4. Publish processed messages to the next channel in the chain diff --git a/ServicesApi/Controllers/Redis/Broadcast/actions.py b/ServicesApi/Controllers/Redis/Broadcast/actions.py new file mode 100644 index 0000000..756e9f3 --- /dev/null +++ b/ServicesApi/Controllers/Redis/Broadcast/actions.py @@ -0,0 +1,248 @@ +import json +from typing import Optional, Dict, Any, List, Callable, Union +from threading import Thread + +from Controllers.Redis.connection import redis_cli +from Controllers.Redis.response import RedisResponse + + +class RedisPublisher: + """Redis Publisher class for broadcasting messages to channels.""" + + def __init__(self, redis_client=redis_cli): + self.redis_client = redis_client + + def publish(self, channel: str, message: Union[Dict, List, str]) -> RedisResponse: + """Publish a message to a Redis channel. + + Args: + channel: The channel to publish to + message: The message to publish (will be JSON serialized if dict or list) + + Returns: + RedisResponse with status and message + """ + try: + # Convert dict/list to JSON string if needed + if isinstance(message, (dict, list)): + message = json.dumps(message) + + # Publish the message + recipient_count = self.redis_client.publish(channel, message) + + return RedisResponse( + status=True, + message=f"Message published successfully to {channel}.", + data={"recipients": recipient_count}, + ) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to publish message to {channel}.", + error=str(e), + ) + + +class RedisSubscriber: + """Redis Subscriber class for listening to channels.""" + + def __init__(self, redis_client=redis_cli): + self.redis_client = redis_client + self.pubsub = self.redis_client.pubsub() + self.active_threads = {} + + def subscribe( + self, channel: str, callback: Callable[[Dict], None] + ) -> RedisResponse: + """Subscribe to a Redis channel with a callback function. + + Args: + channel: The channel to subscribe to + callback: Function to call when a message is received + + Returns: + RedisResponse with status and message + """ + try: + # Subscribe to the channel + self.pubsub.subscribe(**{channel: self._message_handler(callback)}) + + return RedisResponse( + status=True, message=f"Successfully subscribed to {channel}." + ) + except Exception as e: + return RedisResponse( + status=False, message=f"Failed to subscribe to {channel}.", error=str(e) + ) + + def psubscribe( + self, pattern: str, callback: Callable[[Dict], None] + ) -> RedisResponse: + """Subscribe to Redis channels matching a pattern. + + Args: + pattern: The pattern to subscribe to (e.g., 'user.*') + callback: Function to call when a message is received + + Returns: + RedisResponse with status and message + """ + try: + # Subscribe to the pattern + self.pubsub.psubscribe(**{pattern: self._message_handler(callback)}) + + return RedisResponse( + status=True, message=f"Successfully pattern-subscribed to {pattern}." + ) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to pattern-subscribe to {pattern}.", + error=str(e), + ) + + def _message_handler(self, callback: Callable[[Dict], None]): + """Create a message handler function for the subscription.""" + + def handler(message): + # Skip subscription confirmation messages + if message["type"] in ("subscribe", "psubscribe"): + return + + # Parse JSON if the message is a JSON string + data = message["data"] + if isinstance(data, bytes): + data = data.decode("utf-8") + try: + data = json.loads(data) + except json.JSONDecodeError: + # Not JSON, keep as is + pass + + # Call the callback with the message data + callback( + { + "channel": ( + message.get("channel", b"").decode("utf-8") + if isinstance(message.get("channel", b""), bytes) + else message.get("channel", "") + ), + "pattern": ( + message.get("pattern", b"").decode("utf-8") + if isinstance(message.get("pattern", b""), bytes) + else message.get("pattern", "") + ), + "data": data, + } + ) + + return handler + + def start_listening(self, in_thread: bool = True) -> RedisResponse: + """Start listening for messages on subscribed channels. + + Args: + in_thread: If True, start listening in a separate thread + + Returns: + RedisResponse with status and message + """ + try: + if in_thread: + thread = Thread(target=self._listen_thread, daemon=True) + thread.start() + self.active_threads["listener"] = thread + return RedisResponse( + status=True, message="Listening thread started successfully." + ) + else: + # This will block the current thread + self._listen_thread() + return RedisResponse( + status=True, message="Listening started successfully (blocking)." + ) + except Exception as e: + return RedisResponse( + status=False, message="Failed to start listening.", error=str(e) + ) + + def _listen_thread(self): + """Thread function for listening to messages.""" + self.pubsub.run_in_thread(sleep_time=0.01) + + def stop_listening(self) -> RedisResponse: + """Stop listening for messages.""" + try: + self.pubsub.close() + return RedisResponse(status=True, message="Successfully stopped listening.") + except Exception as e: + return RedisResponse( + status=False, message="Failed to stop listening.", error=str(e) + ) + + def unsubscribe(self, channel: Optional[str] = None) -> RedisResponse: + """Unsubscribe from a channel or all channels. + + Args: + channel: The channel to unsubscribe from, or None for all channels + + Returns: + RedisResponse with status and message + """ + try: + if channel: + self.pubsub.unsubscribe(channel) + message = f"Successfully unsubscribed from {channel}." + else: + self.pubsub.unsubscribe() + message = "Successfully unsubscribed from all channels." + + return RedisResponse(status=True, message=message) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to unsubscribe from {'channel' if channel else 'all channels'}.", + error=str(e), + ) + + def punsubscribe(self, pattern: Optional[str] = None) -> RedisResponse: + """Unsubscribe from a pattern or all patterns. + + Args: + pattern: The pattern to unsubscribe from, or None for all patterns + + Returns: + RedisResponse with status and message + """ + try: + if pattern: + self.pubsub.punsubscribe(pattern) + message = f"Successfully unsubscribed from pattern {pattern}." + else: + self.pubsub.punsubscribe() + message = "Successfully unsubscribed from all patterns." + + return RedisResponse(status=True, message=message) + except Exception as e: + return RedisResponse( + status=False, + message=f"Failed to unsubscribe from {'pattern' if pattern else 'all patterns'}.", + error=str(e), + ) + + +class RedisPubSub: + """Singleton class that provides both publisher and subscriber functionality.""" + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(RedisPubSub, cls).__new__(cls) + cls._instance.publisher = RedisPublisher() + cls._instance.subscriber = RedisSubscriber() + return cls._instance + + +# Create a singleton instance +redis_pubsub = RedisPubSub() diff --git a/ServicesApi/Controllers/Redis/Broadcast/implementations.py b/ServicesApi/Controllers/Redis/Broadcast/implementations.py new file mode 100644 index 0000000..34a336f --- /dev/null +++ b/ServicesApi/Controllers/Redis/Broadcast/implementations.py @@ -0,0 +1,205 @@ +import json +import time +import uuid +from datetime import datetime +from threading import Thread + +from Controllers.Redis.Broadcast.actions import redis_pubsub + + +# Define the channels for our chain +CHANNEL_READER = "chain:reader" +CHANNEL_PROCESSOR = "chain:processor" +CHANNEL_WRITER = "chain:writer" + +# Flag to control the demo +running = True + + +def generate_mock_data(): + """Generate a mock message with UUID, timestamp, and sample data.""" + return { + "uuid": str(uuid.uuid4()), + "timestamp": datetime.now().isoformat(), + "stage": "red", # Initial stage is 'red' + "data": { + "value": f"Sample data {int(time.time())}", + "status": "new", + "counter": 0, + }, + } + + +def reader_function(): + """ + First function in the chain. + Generates mock data and publishes to the reader channel. + """ + print("[READER] Function started") + + while running: + # Generate mock data + message = generate_mock_data() + start_time = time.time() + message["start_time"] = start_time + + # Publish to reader channel + result = redis_pubsub.publisher.publish(CHANNEL_READER, message) + + if result.status: + print(f"[READER] {time.time():.6f} | Published UUID: {message['uuid']}") + else: + print(f"[READER] Publish error: {result.error}") + + # Wait before generating next message + time.sleep(2) + + +def processor_function(): + """ + Second function in the chain. + Subscribes to reader channel, processes messages, and publishes to processor channel. + """ + print("[PROCESSOR] Function started") + + def on_reader_message(message): + # The message structure from the subscriber has 'data' containing our actual message + # If data is a string, parse it as JSON + data = message["data"] + if isinstance(data, str): + try: + data = json.loads(data) + except json.JSONDecodeError as e: + print(f"[PROCESSOR] Error parsing message data: {e}") + return + + # Check if stage is 'red' before processing + if data.get("stage") == "red": + # Process the message + data["processor_timestamp"] = datetime.now().isoformat() + data["data"]["status"] = "processed" + data["data"]["counter"] += 1 + + # Update stage to 'processed' + data["stage"] = "processed" + + # Add some processing metadata + data["processing"] = { + "duration_ms": 150, # Mock processing time + "processor_id": "main-processor", + } + + # Publish to processor channel + result = redis_pubsub.publisher.publish(CHANNEL_PROCESSOR, data) + + if result.status: + print( + f"[PROCESSOR] {time.time():.6f} | Received UUID: {data['uuid']} | Published UUID: {data['uuid']}" + ) + else: + print(f"[PROCESSOR] Publish error: {result.error}") + else: + print(f"[PROCESSOR] Skipped message: {data['uuid']} (stage is not 'red')") + + # Subscribe to reader channel + result = redis_pubsub.subscriber.subscribe(CHANNEL_READER, on_reader_message) + + if result.status: + print(f"[PROCESSOR] Subscribed to channel: {CHANNEL_READER}") + else: + print(f"[PROCESSOR] Subscribe error: {result.error}") + + +def writer_function(): + """ + Third function in the chain. + Subscribes to processor channel and performs final processing. + """ + print("[WRITER] Function started") + + def on_processor_message(message): + # The message structure from the subscriber has 'data' containing our actual message + # If data is a string, parse it as JSON + data = message["data"] + if isinstance(data, str): + try: + data = json.loads(data) + except json.JSONDecodeError as e: + print(f"[WRITER] Error parsing message data: {e}") + return + + # Check if stage is 'processed' before processing + if data.get("stage") == "processed": + # Process the message + data["writer_timestamp"] = datetime.now().isoformat() + data["data"]["status"] = "completed" + data["data"]["counter"] += 1 + + # Update stage to 'completed' + data["stage"] = "completed" + + # Add some writer metadata + data["storage"] = {"location": "main-db", "partition": "events-2025-04"} + + # Calculate elapsed time if start_time is available + current_time = time.time() + elapsed_ms = "" + if "start_time" in data: + elapsed_ms = ( + f" | Elapsed: {(current_time - data['start_time']) * 1000:.2f}ms" + ) + + # Optionally publish to writer channel for any downstream listeners + result = redis_pubsub.publisher.publish(CHANNEL_WRITER, data) + + if result.status: + print( + f"[WRITER] {current_time:.6f} | Received UUID: {data['uuid']} | Published UUID: {data['uuid']}{elapsed_ms}" + ) + else: + print(f"[WRITER] Publish error: {result.error}") + else: + print( + f"[WRITER] Skipped message: {data['uuid']} (stage is not 'processed')" + ) + + # Subscribe to processor channel + result = redis_pubsub.subscriber.subscribe(CHANNEL_PROCESSOR, on_processor_message) + + if result.status: + print(f"[WRITER] Subscribed to channel: {CHANNEL_PROCESSOR}") + else: + print(f"[WRITER] Subscribe error: {result.error}") + + +def run_demo(): + """Run a demonstration of the simple chain of functions.""" + print("=== Starting Redis Pub/Sub Chain Demonstration ===") + print("Chain: READER → PROCESSOR → WRITER") + print(f"Channels: {CHANNEL_READER} → {CHANNEL_PROCESSOR} → {CHANNEL_WRITER}") + print("Format: [SERVICE] TIMESTAMP | Received/Published UUID | [Elapsed time]") + + # Start the Redis subscriber listening thread + redis_pubsub.subscriber.start_listening() + + # Start processor and writer functions (these subscribe to channels) + processor_function() + writer_function() + + # Create a thread for the reader function (this publishes messages) + reader_thread = Thread(target=reader_function, daemon=True) + reader_thread.start() + + # Keep the main thread alive + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + print("\nStopping demonstration...") + global running + running = False + redis_pubsub.subscriber.stop_listening() + + +if __name__ == "__main__": + run_demo() diff --git a/ServicesApi/Controllers/Redis/README.md b/ServicesApi/Controllers/Redis/README.md new file mode 100644 index 0000000..80bda34 --- /dev/null +++ b/ServicesApi/Controllers/Redis/README.md @@ -0,0 +1,85 @@ +# Redis Controller + +## Overview +This module provides a robust, thread-safe Redis connection handler with comprehensive concurrent operation testing. The Redis controller is designed for high-performance, resilient database connection management that can handle multiple simultaneous operations efficiently. + +## Features +- Singleton pattern for efficient connection management +- Connection pooling with configurable settings +- Automatic retry capabilities for Redis operations +- Thread-safe operations with proper error handling +- Comprehensive JSON data handling +- TTL management and expiry time resolution +- Efficient batch operations using Redis pipelines + +## Configuration +The Redis controller is configured with the following default settings: +- Host: 10.10.2.15 +- Port: 6379 +- DB: 0 +- Connection pool size: 50 connections +- Health check interval: 30 seconds +- Socket timeout: 5.0 seconds +- Retry on timeout: Enabled +- Socket keepalive: Enabled + +## Usage Examples +The controller provides several high-level methods for Redis operations: +- `set_json`: Store JSON data with optional expiry +- `get_json`: Retrieve JSON data with pattern matching +- `get_json_iterator`: Memory-efficient iterator for large datasets +- `delete`: Remove keys matching a pattern +- `refresh_ttl`: Update expiry time for existing keys +- `key_exists`: Check if a key exists without retrieving it +- `resolve_expires_at`: Get human-readable expiry time + +## Concurrent Performance Testing +The Redis controller has been thoroughly tested for concurrent operations with impressive results: + +### Test Configuration +- 10,000 concurrent threads +- Each thread performs a set, get, and delete operation +- Pipeline used for efficient batching +- Exponential backoff for connection errors +- Comprehensive error tracking and reporting + +### Test Results +``` +Concurrent Redis Test Results: +Total threads: 10000 +Passed: 10000 +Failed: 0 +Operations with retries: 0 +Total retry attempts: 0 +Success rate: 100.00% + +Performance Metrics: +Total execution time: 4.30 seconds +Operations per second: 2324.35 +Average operation time: 1.92 ms +Minimum operation time: 0.43 ms +Maximum operation time: 40.45 ms +95th percentile operation time: 4.14 ms +``` + +## Thread Safety +The Redis controller is designed to be thread-safe with the following mechanisms: +- Connection pooling to manage concurrent connections efficiently +- Thread-local storage for operation-specific data +- Atomic operations using Redis pipelines +- Proper error handling and retry logic for connection issues +- Exponential backoff for handling connection limits + +## Error Handling +The controller implements comprehensive error handling: +- Connection errors are automatically retried with exponential backoff +- Detailed error reporting with context-specific information +- Graceful degradation under high load +- Connection health monitoring and automatic reconnection + +## Best Practices +- Use pipelines for batching multiple operations +- Implement proper key naming conventions +- Set appropriate TTL values for cached data +- Monitor connection pool usage in production +- Use the JSON iterator for large datasets to minimize memory usage diff --git a/ServicesApi/Controllers/Redis/base.py b/ServicesApi/Controllers/Redis/base.py new file mode 100644 index 0000000..c792e76 --- /dev/null +++ b/ServicesApi/Controllers/Redis/base.py @@ -0,0 +1,328 @@ +""" +Redis key-value operations with structured data handling. + +This module provides a class for managing Redis key-value operations with support for: +- Structured data storage and retrieval +- Key pattern generation for searches +- JSON serialization/deserialization +- Type-safe value handling +""" + +import arrow +import json + +from typing import Union, Dict, List, Optional, Any, TypeVar +from .connection import redis_cli + + +T = TypeVar("T", Dict[str, Any], List[Any]) + + +class RedisKeyError(Exception): + """Exception raised for Redis key-related errors.""" + + pass + + +class RedisValueError(Exception): + """Exception raised for Redis value-related errors.""" + + pass + + +class RedisRow: + """ + Handles Redis key-value operations with structured data. + + This class provides methods for: + - Managing compound keys with delimiters + - Converting between bytes and string formats + - JSON serialization/deserialization of values + - Pattern generation for Redis key searches + + Attributes: + key: The Redis key in bytes or string format + value: The stored value (will be JSON serialized) + delimiter: Character used to separate compound key parts + expires_at: Optional expiration timestamp + """ + + key: Union[str, bytes] + value: Optional[str] = None + delimiter: str = ":" + expires_at: Optional[dict] = {"seconds": 60 * 60 * 30} + expires_at_string: Optional[str] + + def get_expiry_time(self) -> int | None: + """Calculate expiry time in seconds from kwargs.""" + time_multipliers = {"days": 86400, "hours": 3600, "minutes": 60, "seconds": 1} + if self.expires_at: + return sum( + int(self.expires_at.get(unit, 0)) * multiplier + for unit, multiplier in time_multipliers.items() + ) + return None + + def merge(self, set_values: List[Union[str, bytes]]) -> None: + """ + Merge list of values into a single delimited key. + + Args: + set_values: List of values to merge into key + + Example: + >>> RedisRow.merge(["users", "123", "profile"]) + >>> print(RedisRow.key) + b'users:123:profile' + """ + if not set_values: + raise RedisKeyError("Cannot merge empty list of values") + + merged = [] + for value in set_values: + if value is None: + continue + if isinstance(value, bytes): + value = value.decode() + merged.append(str(value)) + + self.key = self.delimiter.join(merged).encode() + + @classmethod + def regex(cls, list_keys: List[Union[Optional[str], Optional[bytes]]]) -> str: + """ + Generate Redis search pattern from list of keys. + + Args: + list_keys: List of key parts, can include None for wildcards + + Returns: + str: Redis key pattern with wildcards + + Example: + >>> RedisRow.regex([None, "users", "active"]) + '*:users:active' + """ + if not list_keys: + return "" + + # Filter and convert valid keys + valid_keys = [] + for key in list_keys: + if key is None or str(key) == "None": + continue + if isinstance(key, bytes): + key = key.decode() + valid_keys.append(str(key)) + + # Build pattern + pattern = cls.delimiter.join(valid_keys) + if not pattern: + return "" + + # Add wildcard if first key was None + if list_keys[0] is None: + pattern = f"*{cls.delimiter}{pattern}" + if "*" not in pattern and any([list_key is None for list_key in list_keys]): + pattern = f"{pattern}:*" + return pattern + + def parse(self) -> List[str]: + """ + Parse the key into its component parts. + + Returns: + List[str]: Key parts split by delimiter + + Example: + >>> RedisRow.key = b'users:123:profile' + >>> RedisRow.parse() + ['users', '123', 'profile'] + """ + if not self.key: + return [] + + key_str = self.key.decode() if isinstance(self.key, bytes) else self.key + return key_str.split(self.delimiter) + + def feed(self, value: Union[bytes, Dict, List, str]) -> None: + """ + Convert and store value in JSON format. + + Args: + value: Value to store (bytes, dict, or list) + + Raises: + RedisValueError: If value type is not supported + + Example: + >>> RedisRow.feed({"name": "John", "age": 30}) + >>> print(RedisRow.value) + '{"name": "John", "age": 30}' + """ + try: + if isinstance(value, (dict, list)): + self.value = json.dumps(value) + elif isinstance(value, bytes): + self.value = json.dumps(json.loads(value.decode())) + elif isinstance(value, str): + self.value = value + else: + raise RedisValueError(f"Unsupported value type: {type(value)}") + except json.JSONDecodeError as e: + raise RedisValueError(f"Invalid JSON format: {str(e)}") + + def modify(self, add_dict: Dict) -> None: + """ + Modify existing data by merging with new dictionary. + + Args: + add_dict: Dictionary to merge with existing data + + Example: + >>> RedisRow.feed({"name": "John"}) + >>> RedisRow.modify({"age": 30}) + >>> print(RedisRow.data) + {"name": "John", "age": 30} + """ + if not isinstance(add_dict, dict): + raise RedisValueError("modify() requires a dictionary argument") + current_data = self.row if self.row else {} + if not isinstance(current_data, dict): + raise RedisValueError("Cannot modify non-dictionary data") + current_data = { + **current_data, + **add_dict, + } + self.feed(current_data) + self.save() + + def save(self): + """ + Save the data to Redis with optional expiration. + + Raises: + RedisKeyError: If key is not set + RedisValueError: If value is not set + """ + + if not self.key: + raise RedisKeyError("Cannot save data without a key") + if not self.value: + raise RedisValueError("Cannot save empty data") + + if self.expires_at: + redis_cli.setex( + name=self.redis_key, time=self.get_expiry_time(), value=self.value + ) + self.expires_at_string = str( + arrow.now() + .shift(seconds=self.get_expiry_time()) + .format("YYYY-MM-DD HH:mm:ss") + ) + return self.value + redis_cli.set(name=self.redis_key, value=self.value) + self.expires_at = None + self.expires_at_string = None + return self.value + + def remove(self, key: str) -> None: + """ + Remove a key from the stored dictionary. + + Args: + key: Key to remove from stored dictionary + + Raises: + KeyError: If key doesn't exist + RedisValueError: If stored value is not a dictionary + """ + current_data = self.row + if not isinstance(current_data, dict): + raise RedisValueError("Cannot remove key from non-dictionary data") + + try: + current_data.pop(key) + self.feed(current_data) + self.save() + except KeyError: + raise KeyError(f"Key '{key}' not found in stored data") + + def delete(self) -> None: + """Delete the key from Redis.""" + try: + redis_cli.delete(self.redis_key) + except Exception as e: + raise RedisKeyError(f"Failed to delete key: {str(e)}") + + @property + def keys(self) -> str: + """ + Get key as string. + + Returns: + str: Key in string format + """ + return self.key.decode() if isinstance(self.key, bytes) else self.key + + def set_key(self, key: Union[str, bytes]) -> None: + """ + Set key ensuring bytes format. + + Args: + key: Key in string or bytes format + + Raises: + RedisKeyError: If key is empty or invalid + """ + if not key: + raise RedisKeyError("Cannot set empty key") + + # Convert to string for validation + key_str = key.decode() if isinstance(key, bytes) else str(key) + + # Validate key length (Redis has a 512MB limit for keys) + if len(key_str) > 512 * 1024 * 1024: + raise RedisKeyError("Key exceeds maximum length of 512MB") + + # Validate key format (basic check for invalid characters) + if any(c in key_str for c in ["\n", "\r", "\t", "\0"]): + raise RedisKeyError("Key contains invalid characters") + + self.key = key if isinstance(key, bytes) else str(key).encode() + + @property + def redis_key(self) -> bytes: + """ + Get key in bytes format for Redis operations. + + Returns: + bytes: Key in bytes format + """ + return self.key if isinstance(self.key, bytes) else str(self.key).encode() + + @property + def row(self) -> Union[Dict, List]: + """ + Get stored value as Python object. + + Returns: + Union[Dict, List]: Deserialized JSON data + """ + try: + return json.loads(self.value) + except json.JSONDecodeError as e: + raise RedisValueError(f"Invalid JSON format in stored value: {str(e)}") + + @property + def as_dict(self) -> Dict[str, Any]: + """ + Get row data as dictionary. + + Returns: + Dict[str, Any]: Dictionary with keys and value + """ + return { + "keys": self.keys, + "value": self.row, + } diff --git a/ServicesApi/Controllers/Redis/config.py b/ServicesApi/Controllers/Redis/config.py new file mode 100644 index 0000000..337bff3 --- /dev/null +++ b/ServicesApi/Controllers/Redis/config.py @@ -0,0 +1,25 @@ +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Configs(BaseSettings): + """ + Redis configuration settings. + """ + + HOST: str = "10.10.2.15" + PASSWORD: str = "your_strong_password_here" + PORT: int = 6379 + DB: int = 0 + + def as_dict(self): + return dict( + host=self.HOST, + password=self.PASSWORD, + port=int(self.PORT), + db=self.DB, + ) + + model_config = SettingsConfigDict(env_prefix="REDIS_") + + +redis_configs = Configs() # singleton instance of the REDIS configuration settings diff --git a/ServicesApi/Controllers/Redis/connection.py b/ServicesApi/Controllers/Redis/connection.py new file mode 100644 index 0000000..5c1422c --- /dev/null +++ b/ServicesApi/Controllers/Redis/connection.py @@ -0,0 +1,215 @@ +import time + +from typing import Dict, Any +from redis import Redis, ConnectionError, TimeoutError, ConnectionPool +from .config import redis_configs + + +class RedisConn: + """ + Redis connection manager with connection pooling, retry logic, + and health check capabilities. + """ + + CONNECTION_RETRIES = 3 # Number of connection retries before failing + RETRY_DELAY = 0.5 # Delay between retries in seconds + DEFAULT_TIMEOUT = 5.0 # Default connection timeout in seconds + + def __init__( + self, + max_retries: int = CONNECTION_RETRIES, + ): + """ + Initialize Redis connection with configuration. + + Args: + max_retries: Maximum number of connection attempts. + """ + self.max_retries = max_retries + self.config = redis_configs.as_dict() + self._redis = None + self._pool = None + + # Add default parameters if not provided + if "socket_timeout" not in self.config: + self.config["socket_timeout"] = self.DEFAULT_TIMEOUT + if "socket_connect_timeout" not in self.config: + self.config["socket_connect_timeout"] = self.DEFAULT_TIMEOUT + if "decode_responses" not in self.config: + self.config["decode_responses"] = True + + # Add connection pooling settings if not provided + if "max_connections" not in self.config: + self.config["max_connections"] = 50 # Increased for better concurrency + + # Add connection timeout settings + if "health_check_interval" not in self.config: + self.config["health_check_interval"] = 30 # Health check every 30 seconds + + # Add retry settings for operations + if "retry_on_timeout" not in self.config: + self.config["retry_on_timeout"] = True + + # Add connection pool settings for better performance + if "socket_keepalive" not in self.config: + self.config["socket_keepalive"] = True + + # Initialize the connection with retry logic + self._connect_with_retry() + + def __del__(self): + """Cleanup Redis connection and pool on object destruction.""" + self.close() + + def close(self) -> None: + """Close Redis connection and connection pool.""" + try: + if self._redis: + self._redis.close() + self._redis = None + if self._pool: + self._pool.disconnect() + self._pool = None + except Exception as e: + print(f"Error closing Redis connection: {str(e)}") + + def _connect_with_retry(self) -> None: + """ + Attempt to establish a Redis connection with retry logic. + + Raises: + Exception: If all connection attempts fail. + """ + for attempt in range(1, self.max_retries + 1): + try: + if self._pool is None: + self._pool = ConnectionPool(**self.config) + self._redis = Redis(connection_pool=self._pool) + if self.check_connection(): + return + except (ConnectionError, TimeoutError) as e: + if attempt < self.max_retries: + time.sleep(self.RETRY_DELAY) + else: + raise Exception( + f"Redis connection error after {self.max_retries} attempts: {str(e)}" + ) + except Exception as e: + raise + + def check_connection(self) -> bool: + """ + Check if the Redis connection is alive with a PING command. + + Returns: + bool: True if connection is healthy, False otherwise. + """ + try: + return self._redis.ping() + except Exception as e: + err = e + return False + + def set_connection(self, **kwargs) -> Redis: + """ + Recreate Redis connection with new parameters. + + Args: + host: Redis server hostname or IP + password: Redis authentication password + port: Redis server port + db: Redis database number + **kwargs: Additional Redis connection parameters + + Returns: + Redis: The new Redis connection object + """ + try: + # Update configuration + self.config = { + "host": redis_configs.HOST, + "password": redis_configs.PASSWORD, + "port": redis_configs.PORT, + "db": redis_configs.PORT, + "socket_timeout": kwargs.get("socket_timeout", self.DEFAULT_TIMEOUT), + "socket_connect_timeout": kwargs.get( + "socket_connect_timeout", self.DEFAULT_TIMEOUT + ), + "decode_responses": kwargs.get("decode_responses", True), + "max_connections": kwargs.get("max_connections", 50), + "health_check_interval": kwargs.get("health_check_interval", 30), + "retry_on_timeout": kwargs.get("retry_on_timeout", True), + "socket_keepalive": kwargs.get("socket_keepalive", True), + } + + # Add any additional parameters + for key, value in kwargs.items(): + if key not in self.config: + self.config[key] = value + + # Create new connection + self._redis = Redis(**self.config) + if not self.check_connection(): + raise ConnectionError( + "Failed to establish connection with new parameters" + ) + + return self._redis + except Exception as e: + raise + + def get_connection_info(self) -> Dict[str, Any]: + """ + Get current connection configuration details. + + Returns: + Dict: Current connection configuration + """ + # Create a copy without password for security + info = self.config.copy() + if "password" in info: + info["password"] = "********" if info["password"] else None + return info + + def get_stats(self) -> Dict[str, Any]: + """ + Get Redis server statistics. + + Returns: + Dict: Redis server info + """ + try: + return self._redis.info() + except Exception as e: + return {"error": str(e)} + + @property + def redis(self) -> Redis: + """ + Property to access the Redis client. + + Returns: + Redis: The Redis client instance + + Raises: + Exception: If Redis connection is not available + """ + if not self._redis: + raise Exception("Redis client is not initialized") + + # Check connection health and reconnect if necessary + if not self.check_connection(): + self._connect_with_retry() + + return self._redis + + +# Create singleton instance with error handling +try: + redis_conn = RedisConn() + redis_cli = redis_conn.redis +except Exception as t: + # Optionally set a dummy/mock Redis client for testing or fallback behavior + # redis_cli = MockRedis() # If you have a mock implementation + # Or raise the exception to fail fast + raise diff --git a/ServicesApi/Controllers/Redis/database.py b/ServicesApi/Controllers/Redis/database.py new file mode 100644 index 0000000..062fe4c --- /dev/null +++ b/ServicesApi/Controllers/Redis/database.py @@ -0,0 +1,340 @@ +import arrow + +from typing import Optional, List, Dict, Union, Iterator + +from .response import RedisResponse +from .connection import redis_cli +from .base import RedisRow + + +class MainConfig: + DATETIME_FORMAT: str = "YYYY-MM-DD HH:mm:ss" + + +class RedisActions: + """Class for handling Redis operations with JSON data.""" + + @classmethod + def get_expiry_time(cls, expiry_kwargs: Dict[str, int]) -> int: + """ + Calculate expiry time in seconds from kwargs. + + Args: + expiry_kwargs: Dictionary with time units as keys (days, hours, minutes, seconds) + and their respective values. + + Returns: + Total expiry time in seconds. + """ + time_multipliers = {"days": 86400, "hours": 3600, "minutes": 60, "seconds": 1} + return sum( + int(expiry_kwargs.get(unit, 0)) * multiplier + for unit, multiplier in time_multipliers.items() + ) + + @classmethod + def set_expiry_time(cls, expiry_seconds: int) -> Dict[str, int]: + """ + Convert total seconds back into a dictionary of time units. + + Args: + expiry_seconds: Total expiry time in seconds. + + Returns: + Dictionary with time units and their values. + """ + time_multipliers = {"days": 86400, "hours": 3600, "minutes": 60, "seconds": 1} + result = {} + remaining_seconds = expiry_seconds + + if expiry_seconds < 0: + return {} + + for unit, multiplier in time_multipliers.items(): + if remaining_seconds >= multiplier: + result[unit], remaining_seconds = divmod(remaining_seconds, multiplier) + return result + + @classmethod + def resolve_expires_at(cls, redis_row: RedisRow) -> str: + """ + Resolve expiry time for Redis key. + + Args: + redis_row: RedisRow object containing the redis_key. + + Returns: + Formatted expiry time string or message indicating no expiry. + """ + expiry_time = redis_cli.ttl(redis_row.redis_key) + if expiry_time == -1: + return "Key has no expiry time." + if expiry_time == -2: + return "Key does not exist." + return arrow.now().shift(seconds=expiry_time).format(MainConfig.DATETIME_FORMAT) + + @classmethod + def key_exists(cls, key: Union[str, bytes]) -> bool: + """ + Check if a key exists in Redis without retrieving its value. + + Args: + key: Redis key to check. + + Returns: + Boolean indicating if key exists. + """ + return bool(redis_cli.exists(key)) + + @classmethod + def refresh_ttl(cls, key: Union[str, bytes], expires: Dict[str, int]) -> RedisResponse: + """ + Refresh TTL for an existing key. + + Args: + key: Redis key to refresh TTL. + expires: Dictionary with time units to set new expiry. + + Returns: + RedisResponse with operation result. + """ + try: + if not cls.key_exists(key): + return RedisResponse( + status=False, + message="Cannot refresh TTL: Key does not exist.", + ) + + expiry_time = cls.get_expiry_time(expiry_kwargs=expires) + redis_cli.expire(name=key, time=expiry_time) + + expires_at_string = ( + arrow.now() + .shift(seconds=expiry_time) + .format(MainConfig.DATETIME_FORMAT) + ) + + return RedisResponse( + status=True, + message="TTL refreshed successfully.", + data={"key": key, "expires_at": expires_at_string}, + ) + except Exception as e: + return RedisResponse( + status=False, + message="Failed to refresh TTL.", + error=str(e), + ) + + @classmethod + def delete_key(cls, key: Union[Optional[str], Optional[bytes]]) -> RedisResponse: + """ + Delete a specific key from Redis. + + Args: + key: Redis key to delete. + + Returns: + RedisResponse with operation result. + """ + try: + deleted_count = redis_cli.delete(key) + if deleted_count > 0: + return RedisResponse( + status=True, + message="Key deleted successfully.", + data={"deleted_count": deleted_count}, + ) + return RedisResponse( + status=False, + message="Key not found or already deleted.", + data={"deleted_count": 0}, + ) + except Exception as e: + return RedisResponse( + status=False, + message="Failed to delete key.", + error=str(e), + ) + + @classmethod + def delete(cls, list_keys: List[Union[Optional[str], Optional[bytes]]]) -> RedisResponse: + """ + Delete multiple keys matching a pattern. + + Args: + list_keys: List of key components to form pattern for deletion. + + Returns: + RedisResponse with operation result. + """ + try: + regex = RedisRow().regex(list_keys=list_keys) + json_get = redis_cli.scan_iter(match=regex) + + deleted_keys, deleted_count = [], 0 + + # Use pipeline for batch deletion + with redis_cli.pipeline() as pipe: + for row in json_get: + pipe.delete(row) + deleted_keys.append(row) + results = pipe.execute() + deleted_count = sum(results) + + return RedisResponse( + status=True, + message="Keys deleted successfully.", + data={"deleted_count": deleted_count, "deleted_keys": deleted_keys}, + ) + except Exception as e: + return RedisResponse( + status=False, + message="Failed to delete keys.", + error=str(e), + ) + + @classmethod + def set_json(cls, list_keys: List[Union[str, bytes]], value: Optional[Union[Dict, List]], expires: Optional[Dict[str, int]] = None) -> RedisResponse: + """ + Set JSON value in Redis with optional expiry. + + Args: + list_keys: List of key components to form Redis key. + value: JSON-serializable data to store. + expires: Optional dictionary with time units for expiry. + + Returns: + RedisResponse with operation result. + """ + redis_row = RedisRow() + redis_row.merge(set_values=list_keys) + redis_row.feed(value) + redis_row.expires_at_string = None + redis_row.expires_at = None + + try: + if expires: + redis_row.expires_at = expires + expiry_time = cls.get_expiry_time(expiry_kwargs=expires) + redis_cli.setex( + name=redis_row.redis_key, + time=expiry_time, + value=redis_row.value, + ) + redis_row.expires_at_string = str( + arrow.now() + .shift(seconds=expiry_time) + .format(MainConfig.DATETIME_FORMAT) + ) + else: + redis_cli.set(name=redis_row.redis_key, value=redis_row.value) + + return RedisResponse( + status=True, + message="Value set successfully.", + data=redis_row, + ) + except Exception as e: + return RedisResponse( + status=False, + message="Failed to set value.", + error=str(e), + ) + + @classmethod + def get_json(cls, list_keys: List[Union[Optional[str], Optional[bytes]]], limit: Optional[int] = None) -> RedisResponse: + """ + Get JSON values from Redis using pattern matching. + + Args: + list_keys: List of key components to form pattern for retrieval. + limit: Optional limit on number of results to return. + + Returns: + RedisResponse with operation result. + """ + try: + list_of_rows, count = [], 0 + regex = RedisRow.regex(list_keys=list_keys) + json_get = redis_cli.scan_iter(match=regex) + + for row in json_get: + if limit is not None and count >= limit: + break + + redis_row = RedisRow() + redis_row.set_key(key=row) + + # Use pipeline for batch retrieval + with redis_cli.pipeline() as pipe: + pipe.get(row) + pipe.ttl(row) + redis_value, redis_value_expire = pipe.execute() + redis_row.expires_at = cls.set_expiry_time( + expiry_seconds=int(redis_value_expire) + ) + redis_row.expires_at_string = cls.resolve_expires_at( + redis_row=redis_row + ) + redis_row.feed(redis_value) + list_of_rows.append(redis_row) + count += 1 + + if list_of_rows: + return RedisResponse( + status=True, + message="Values retrieved successfully.", + data=list_of_rows, + ) + return RedisResponse( + status=False, + message="No matching keys found.", + data=list_of_rows, + ) + except Exception as e: + return RedisResponse( + status=False, + message="Failed to retrieve values.", + error=str(e), + ) + + @classmethod + def get_json_iterator(cls, list_keys: List[Union[Optional[str], Optional[bytes]]]) -> Iterator[RedisRow]: + """ + Get JSON values from Redis as an iterator for memory-efficient processing of large datasets. + + Args: + list_keys: List of key components to form pattern for retrieval. + + Returns: + Iterator yielding RedisRow objects. + + Raises: + RedisValueError: If there's an error processing a row + """ + regex = RedisRow.regex(list_keys=list_keys) + json_get = redis_cli.scan_iter(match=regex) + + for row in json_get: + try: + redis_row = RedisRow() + redis_row.set_key(key=row) + + # Use pipeline for batch retrieval + with redis_cli.pipeline() as pipe: + pipe.get(row) + pipe.ttl(row) + redis_value, redis_value_expire = pipe.execute() + redis_row.expires_at = cls.set_expiry_time( + expiry_seconds=int(redis_value_expire) + ) + redis_row.expires_at_string = cls.resolve_expires_at( + redis_row=redis_row + ) + redis_row.feed(redis_value) + yield redis_row + except Exception as e: + # Log the error and continue with next row + print(f"Error processing row {row}: {str(e)}") + continue diff --git a/ServicesApi/Controllers/Redis/implementations.py b/ServicesApi/Controllers/Redis/implementations.py new file mode 100644 index 0000000..c7b5500 --- /dev/null +++ b/ServicesApi/Controllers/Redis/implementations.py @@ -0,0 +1,281 @@ +import threading +import time +import random +import uuid +import concurrent.futures + +from .database import RedisActions + + +def example_set_json() -> None: + """Example of setting JSON data in Redis with and without expiry.""" + # Example 1: Set JSON without expiry + data = {"name": "John", "age": 30, "city": "New York"} + keys = ["user", "profile", "123"] + result = RedisActions.set_json(list_keys=keys, value=data) + print("Set JSON without expiry:", result.as_dict()) + + # Example 2: Set JSON with expiry + expiry = {"hours": 1, "minutes": 30} + result = RedisActions.set_json(list_keys=keys, value=data, expires=expiry) + print("Set JSON with expiry:", result.as_dict()) + + +def example_get_json() -> None: + """Example of retrieving JSON data from Redis.""" + # Example 1: Get all matching keys + keys = ["user", "profile", "*"] + result = RedisActions.get_json(list_keys=keys) + print("Get all matching JSON:", result.as_dict()) + + # Example 2: Get with limit + result = RedisActions.get_json(list_keys=keys, limit=5) + print("Get JSON with limit:", result.as_dict()) + + +def example_get_json_iterator() -> None: + """Example of using the JSON iterator for large datasets.""" + keys = ["user", "profile", "*"] + for row in RedisActions.get_json_iterator(list_keys=keys): + print( + "Iterating over JSON row:", + row.as_dict if isinstance(row.as_dict, dict) else row.as_dict, + ) + + +def example_delete_key() -> None: + """Example of deleting a specific key.""" + key = "user:profile:123" + result = RedisActions.delete_key(key) + print("Delete specific key:", result) + + +def example_delete() -> None: + """Example of deleting multiple keys matching a pattern.""" + keys = ["user", "profile", "*"] + result = RedisActions.delete(list_keys=keys) + print("Delete multiple keys:", result) + + +def example_refresh_ttl() -> None: + """Example of refreshing TTL for a key.""" + key = "user:profile:123" + new_expiry = {"hours": 2, "minutes": 0} + result = RedisActions.refresh_ttl(key=key, expires=new_expiry) + print("Refresh TTL:", result.as_dict()) + + +def example_key_exists() -> None: + """Example of checking if a key exists.""" + key = "user:profile:123" + exists = RedisActions.key_exists(key) + print(f"Key {key} exists:", exists) + + +def example_resolve_expires_at() -> None: + """Example of resolving expiry time for a key.""" + from Controllers.Redis.base import RedisRow + + redis_row = RedisRow() + redis_row.set_key("user:profile:123") + print(redis_row.keys) + expires_at = RedisActions.resolve_expires_at(redis_row) + print("Resolve expires at:", expires_at) + + +def run_all_examples() -> None: + """Run all example functions to demonstrate RedisActions functionality.""" + print("\n=== Redis Actions Examples ===\n") + + print("1. Setting JSON data:") + example_set_json() + + print("\n2. Getting JSON data:") + example_get_json() + + print("\n3. Using JSON iterator:") + example_get_json_iterator() + + # print("\n4. Deleting specific key:") + # example_delete_key() + # + # print("\n5. Deleting multiple keys:") + # example_delete() + + print("\n6. Refreshing TTL:") + example_refresh_ttl() + + print("\n7. Checking key existence:") + example_key_exists() + + print("\n8. Resolving expiry time:") + example_resolve_expires_at() + + +def run_concurrent_test(num_threads=100): + """Run a comprehensive concurrent test with multiple threads to verify Redis connection handling.""" + print( + f"\nStarting comprehensive Redis concurrent test with {num_threads} threads..." + ) + + # Results tracking with detailed metrics + results = { + "passed": 0, + "failed": 0, + "retried": 0, + "errors": [], + "operation_times": [], + "retry_count": 0, + "max_retries": 3, + "retry_delay": 0.1, + } + results_lock = threading.Lock() + + def worker(thread_id): + # Track operation timing + start_time = time.time() + retry_count = 0 + success = False + error_message = None + + while retry_count <= results["max_retries"] and not success: + try: + # Generate unique key for this thread + unique_id = str(uuid.uuid4())[:8] + full_key = f"test:concurrent:{thread_id}:{unique_id}" + + # Simple string operations instead of JSON + test_value = f"test-value-{thread_id}-{time.time()}" + + # Set data in Redis with pipeline for efficiency + from Controllers.Redis.database import redis_cli + + # Use pipeline to reduce network overhead + with redis_cli.pipeline() as pipe: + pipe.set(full_key, test_value) + pipe.get(full_key) + pipe.delete(full_key) + results_list = pipe.execute() + + # Check results + set_ok = results_list[0] + retrieved_value = results_list[1] + if isinstance(retrieved_value, bytes): + retrieved_value = retrieved_value.decode("utf-8") + + # Verify data + success = set_ok and retrieved_value == test_value + + if success: + break + else: + error_message = f"Data verification failed: set_ok={set_ok}, value_match={retrieved_value == test_value}" + retry_count += 1 + with results_lock: + results["retry_count"] += 1 + time.sleep( + results["retry_delay"] * (2**retry_count) + ) # Exponential backoff + + except Exception as e: + error_message = str(e) + retry_count += 1 + with results_lock: + results["retry_count"] += 1 + + # Check if it's a connection error and retry + if "Too many connections" in str(e) or "Connection" in str(e): + # Exponential backoff for connection issues + backoff_time = results["retry_delay"] * (2**retry_count) + time.sleep(backoff_time) + else: + # For other errors, use a smaller delay + time.sleep(results["retry_delay"]) + + # Record operation time + operation_time = time.time() - start_time + + # Update results + with results_lock: + if success: + results["passed"] += 1 + results["operation_times"].append(operation_time) + if retry_count > 0: + results["retried"] += 1 + else: + results["failed"] += 1 + if error_message: + results["errors"].append( + f"Thread {thread_id} failed after {retry_count} retries: {error_message}" + ) + else: + results["errors"].append( + f"Thread {thread_id} failed after {retry_count} retries with unknown error" + ) + + # Create and start threads using a thread pool + start_time = time.time() + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [executor.submit(worker, i) for i in range(num_threads)] + concurrent.futures.wait(futures) + + # Calculate execution time and performance metrics + execution_time = time.time() - start_time + ops_per_second = num_threads / execution_time if execution_time > 0 else 0 + + # Calculate additional metrics if we have successful operations + avg_op_time = 0 + min_op_time = 0 + max_op_time = 0 + p95_op_time = 0 + + if results["operation_times"]: + avg_op_time = sum(results["operation_times"]) / len(results["operation_times"]) + min_op_time = min(results["operation_times"]) + max_op_time = max(results["operation_times"]) + # Calculate 95th percentile + sorted_times = sorted(results["operation_times"]) + p95_index = int(len(sorted_times) * 0.95) + p95_op_time = ( + sorted_times[p95_index] + if p95_index < len(sorted_times) + else sorted_times[-1] + ) + + # Print detailed results + print("\nConcurrent Redis Test Results:") + print(f"Total threads: {num_threads}") + print(f"Passed: {results['passed']}") + print(f"Failed: {results['failed']}") + print(f"Operations with retries: {results['retried']}") + print(f"Total retry attempts: {results['retry_count']}") + print(f"Success rate: {(results['passed'] / num_threads) * 100:.2f}%") + + print("\nPerformance Metrics:") + print(f"Total execution time: {execution_time:.2f} seconds") + print(f"Operations per second: {ops_per_second:.2f}") + + if results["operation_times"]: + print(f"Average operation time: {avg_op_time * 1000:.2f} ms") + print(f"Minimum operation time: {min_op_time * 1000:.2f} ms") + print(f"Maximum operation time: {max_op_time * 1000:.2f} ms") + print(f"95th percentile operation time: {p95_op_time * 1000:.2f} ms") + + # Print errors (limited to 10 for readability) + if results["errors"]: + print("\nErrors:") + for i, error in enumerate(results["errors"][:10]): + print(f"- {error}") + if len(results["errors"]) > 10: + print(f"- ... and {len(results['errors']) - 10} more errors") + + # Return results for potential further analysis + return results + + +if __name__ == "__main__": + # Run basic examples + run_all_examples() + + # Run enhanced concurrent test + run_concurrent_test(10000) diff --git a/ServicesApi/Controllers/Redis/response.py b/ServicesApi/Controllers/Redis/response.py new file mode 100644 index 0000000..8af3f1a --- /dev/null +++ b/ServicesApi/Controllers/Redis/response.py @@ -0,0 +1,194 @@ +from typing import Union, Dict, Optional, Any +from .base import RedisRow + + +class RedisResponse: + """ + Base class for Redis response handling. + + Provides a standardized way to return and process Redis operation results, + with tools to convert between different data representations. + """ + + def __init__(self, status: bool, message: str, data: Any = None, error: Optional[str] = None): + """ + Initialize a Redis response. + + Args: + status: Operation success status + message: Human-readable message about the operation + data: Response data (can be None, RedisRow, list, or dict) + error: Optional error message if operation failed + """ + self.status = status + self.message = message + self.data = data + self.error = error + + # Determine the data type + if isinstance(data, dict): + self.data_type = "dict" + elif isinstance(data, list): + self.data_type = "list" + elif isinstance(data, RedisRow): + self.data_type = "row" + elif isinstance(data, (int, float, str, bool)): + self.data_type = "primitive" + else: + self.data_type = None + + def as_dict(self) -> Dict: + """ + Convert the response to a dictionary format suitable for serialization. + + Returns: + Dictionary representation of the response + """ + # Base response fields + main_dict = { + "status": self.status, + "message": self.message, + "count": self.count, + "dataType": self.data_type, + } + + # Add error if present + if self.error: + main_dict["error"] = self.error + + data = self.all + + # Process single RedisRow + if isinstance(data, RedisRow): + result = {**main_dict} + if hasattr(data, "keys") and hasattr(data, "row"): + if not isinstance(data.keys, str): + raise ValueError("RedisRow keys must be string type") + result[data.keys] = data.row + return result + + # Process list of RedisRows + elif isinstance(data, list): + result = {**main_dict} + + # Handle list of RedisRow objects + rows_dict = {} + for row in data: + if ( + isinstance(row, RedisRow) + and hasattr(row, "keys") + and hasattr(row, "row") + ): + if not isinstance(row.keys, str): + raise ValueError("RedisRow keys must be string type") + rows_dict[row.keys] = row.row + + if rows_dict: + result["data"] = rows_dict + elif data: # If it's just a regular list with items + result["data"] = data + + return result + + # Process dictionary + elif isinstance(data, dict): + return {**main_dict, "data": data} + + return main_dict + + @property + def all(self) -> Any: + """ + Get all data from the response. + + Returns: + All data or empty list if None + """ + return self.data if self.data is not None else [] + + @property + def count(self) -> int: + """ + Count the number of items in the response data. + + Returns: + Number of items (0 if no data) + """ + data = self.all + + if isinstance(data, list): + return len(data) + elif isinstance(data, (RedisRow, dict)): + return 1 + return 0 + + @property + def first(self) -> Union[Dict, None]: + """ + Get the first item from the response data. + + Returns: + First item as a dictionary or None if no data + """ + if not self.data: + return None + + if isinstance(self.data, list) and self.data: + item = self.data[0] + if isinstance(item, RedisRow) and hasattr(item, "row"): + return item.row + return item + elif isinstance(self.data, RedisRow) and hasattr(self.data, "row"): + return self.data.row + elif isinstance(self.data, dict): + return self.data + + return None + + def is_successful(self) -> bool: + """ + Check if the operation was successful. + + Returns: + Boolean indicating success status + """ + return self.status + + def to_api_response(self) -> Dict: + """ + Format the response for API consumption. + + Returns: + API-friendly response dictionary + """ + try: + response = { + "success": self.status, + "message": self.message, + } + + if self.error: + response["error"] = self.error + + if self.data is not None: + if self.data_type == "row" and hasattr(self.data, "to_dict"): + response["data"] = self.data.to_dict() + elif self.data_type == "list": + try: + if all(hasattr(item, "to_dict") for item in self.data): + response["data"] = [item.to_dict() for item in self.data] + else: + response["data"] = self.data + except Exception as e: + response["error"] = f"Error converting list items: {str(e)}" + else: + response["data"] = self.data + + response["count"] = self.count + return response + except Exception as e: + return { + "success": False, + "message": "Error formatting response", + "error": str(e), + } diff --git a/ServicesApi/Extensions/Middlewares/token_provider.py b/ServicesApi/Extensions/Middlewares/token_provider.py index 5e16c08..e35fed1 100644 --- a/ServicesApi/Extensions/Middlewares/token_provider.py +++ b/ServicesApi/Extensions/Middlewares/token_provider.py @@ -3,8 +3,8 @@ import enum from typing import Optional, Union, Dict, Any, List from pydantic import BaseModel -from api_controllers.redis.database import RedisActions -from api_validations.token.validations import ( +from Controllers.Redis.database import RedisActions +from Validations.token.validations import ( TokenDictType, OccupantTokenObject, EmployeeTokenObject, diff --git a/ServicesApi/Extensions/OnMemory/redis_handlers.py b/ServicesApi/Extensions/OnMemory/redis_handlers.py new file mode 100644 index 0000000..e2a7156 --- /dev/null +++ b/ServicesApi/Extensions/OnMemory/redis_handlers.py @@ -0,0 +1,70 @@ +from typing import Any, Union + +from Validations.token.validations import ( + TokenDictType, + EmployeeTokenObject, + OccupantTokenObject, + CompanyToken, + OccupantToken, + UserType +) +from Controllers.Redis.database import RedisActions +from Extensions.Token.password_module import PasswordModule + +from Schemas import Users + + +class RedisHandlers: + + AUTH_TOKEN: str = "AUTH_TOKEN" + + @classmethod + def process_redis_object(cls, redis_object: dict[str, Any]) -> TokenDictType: + """Process Redis object and return appropriate token object.""" + if not redis_object.get("selected_company"): + redis_object["selected_company"] = None + if not redis_object.get("selected_occupant"): + redis_object["selected_occupant"] = None + if redis_object.get("user_type") == UserType.employee.value: + return EmployeeTokenObject(**redis_object) + elif redis_object.get("user_type") == UserType.occupant.value: + return OccupantTokenObject(**redis_object) + raise ValueError("Invalid user type") + + @classmethod + def get_object_from_redis(cls, access_token: str) -> TokenDictType: + redis_response = RedisActions.get_json(list_keys=[cls.AUTH_TOKEN, access_token, "*", "*"]) + if not redis_response.status: + raise ValueError("EYS_0001") + if redis_object := redis_response.first: + return cls.process_redis_object(redis_object) + raise ValueError("EYS_0002") + + @classmethod + def set_object_to_redis(cls, user: Users, token, header_info, add_uuid: str): + result_delete = RedisActions.delete(list_keys=[cls.AUTH_TOKEN, "*", str(user.uu_id), add_uuid]) + generated_access_token = PasswordModule.generate_access_token() + keys = [cls.AUTH_TOKEN, generated_access_token, str(user.uu_id)] + if add_uuid: + keys.append(add_uuid) + RedisActions.set_json(list_keys=keys, value={**token, **header_info}, expires={"hours": 1, "minutes": 30}) + return generated_access_token + RedisActions.set_json(list_keys=keys, value={**token, **header_info}, expires={"hours": 1, "minutes": 30}) + return generated_access_token + + @classmethod + def update_token_at_redis(cls, token: str, add_payload: Union[CompanyToken, OccupantToken], add_uuid: str): + if already_token_data := RedisActions.get_json(list_keys=[cls.AUTH_TOKEN, token, '*', add_uuid]).first: + already_token = cls.process_redis_object(already_token_data) + if already_token.is_employee and isinstance(add_payload, CompanyToken): + already_token.selected_company = add_payload + list_keys = [cls.AUTH_TOKEN, token, str(already_token.user_uu_id), str(add_uuid)] + print('is_employee: ', list_keys) + elif already_token.is_occupant and isinstance(add_payload, OccupantToken): + already_token.selected_occupant = add_payload + list_keys = [cls.AUTH_TOKEN, token, str(already_token.user_uu_id), str(add_uuid)] + print('is_occupant: ', list_keys) + result = RedisActions.set_json(list_keys=list_keys, value=already_token.model_dump(), expires={"hours": 1, "minutes": 30}) + RedisActions.delete(list_keys=[cls.AUTH_TOKEN, token, str(already_token.user_uu_id)]) + return result.first + raise ValueError("Something went wrong") diff --git a/ServicesApi/Initializer/event_clusters.py b/ServicesApi/Initializer/event_clusters.py index 110fcb5..8fb64af 100644 --- a/ServicesApi/Initializer/event_clusters.py +++ b/ServicesApi/Initializer/event_clusters.py @@ -32,7 +32,7 @@ class EventCluster: return None def set_events_to_database(self): - from schemas import Events, EndpointRestriction + from Schemas import Events, EndpointRestriction with Events.new_session() as db_session: Events.set_session(db_session) diff --git a/docker-compose.yml b/docker-compose.yml index 117de7a..b035a48 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,19 +1,19 @@ services: - client_frontend: - container_name: client_frontend - build: - context: . - dockerfile: web_services/client_frontend/Dockerfile - networks: - - wag-services - ports: - - "3000:3000" - environment: - - NODE_ENV=development - - WEB_BASE_URL=http://localhost:3000 - - API_BASE_URL=http://localhost:3000/api - cpus: 1.5 - mem_limit: 2048m + # client_frontend: + # container_name: client_frontend + # build: + # context: . + # dockerfile: web_services/client_frontend/Dockerfile + # networks: + # - wag-services + # ports: + # - "3000:3000" + # environment: + # - NODE_ENV=development + # - WEB_BASE_URL=http://localhost:3000 + # - API_BASE_URL=http://localhost:3000/api + # cpus: 1.5 + # mem_limit: 2048m # management_frontend: # container_name: management_frontend @@ -57,7 +57,7 @@ services: # container_name: account_service # build: # context: . - # dockerfile: api_services/api_builds/account_service/Dockerfile + # dockerfile: ServicesApi/Builds/Account/Dockerfile # env_file: # - api_env.env # networks: @@ -75,33 +75,33 @@ services: # ports: # - "8004:8004" - # building_service: - # container_name: building_service - # build: - # context: . - # dockerfile: api_services/api_builds/building_service/Dockerfile - # env_file: - # - api_env.env - # networks: - # - wag-services - # environment: - # - API_PATH=app:app - # - API_HOST=0.0.0.0 - # - API_PORT=8006 - # - API_LOG_LEVEL=info - # - API_RELOAD=1 - # - API_APP_NAME=evyos-building-api-gateway - # - API_TITLE=WAG API Building Api Gateway - # - API_DESCRIPTION=This api is serves as web building api gateway only to evyos web services. - # - API_APP_URL=https://building_service - # ports: - # - "8006:8006" + building_service: + container_name: building_service + build: + context: . + dockerfile: ServicesApi/Builds/Building/Dockerfile + env_file: + - api_env.env + networks: + - wag-services + environment: + - API_PATH=app:app + - API_HOST=0.0.0.0 + - API_PORT=8006 + - API_LOG_LEVEL=info + - API_RELOAD=1 + - API_APP_NAME=evyos-building-api-gateway + - API_TITLE=WAG API Building Api Gateway + - API_DESCRIPTION=This api is serves as web building api gateway only to evyos web services. + - API_APP_URL=https://building_service + ports: + - "8006:8006" identity_service: container_name: identity_service build: context: . - dockerfile: api_services/api_builds/identity_service/Dockerfile + dockerfile: ServicesApi/Builds/Identity/Dockerfile env_file: - api_env.env networks: @@ -123,7 +123,7 @@ services: container_name: auth_service build: context: . - dockerfile: api_services/api_builds/auth_service/Dockerfile + dockerfile: ServicesApi/Builds/Auth/Dockerfile env_file: - api_env.env networks: @@ -147,7 +147,7 @@ services: container_name: restriction_service build: context: . - dockerfile: api_services/api_builds/restriction_service/Dockerfile + dockerfile: ServicesApi/Builds/Restriction/Dockerfile env_file: - api_env.env networks: @@ -170,7 +170,7 @@ services: container_name: management_service build: context: . - dockerfile: api_services/api_builds/management_service/Dockerfile + dockerfile: ServicesApi/Builds/Management/Dockerfile env_file: - api_env.env networks: @@ -189,25 +189,11 @@ services: - "8003:8003" # restart: unless-stopped - # initializer_service: - # container_name: initializer_service - # build: - # context: . - # dockerfile: api_services/api_builds/initial_service/Dockerfile - # environment: - # - SET_ALEMBIC=0 - # networks: - # - wag-services - # env_file: - # - api_env.env - # mem_limit: 512m - # cpus: 0.5 - # address_service: # container_name: address_service # build: # context: . - # dockerfile: api_services/api_builds/address_service/Dockerfile + # dockerfile: ServicesApi/Builds/Address/Dockerfile # env_file: # - api_env.env # networks: @@ -229,7 +215,7 @@ services: # container_name: decision_book_service # build: # context: . - # dockerfile: api_services/api_builds/decision_book_service/Dockerfile + # dockerfile: ServicesApi/Builds/DecisionBook/Dockerfile # env_file: # - api_env.env # networks: @@ -251,7 +237,7 @@ services: # container_name: project_decision_book_service # build: # context: . - # dockerfile: api_services/api_builds/project_decision_book_service/Dockerfile + # dockerfile: ServicesApi/Builds/ProjectDecisionBook/Dockerfile # env_file: # - api_env.env # networks: @@ -273,7 +259,7 @@ services: # container_name: company_service # build: # context: . - # dockerfile: api_services/api_builds/company_service/Dockerfile + # dockerfile: ServicesApi/Builds/Company/Dockerfile # env_file: # - api_env.env # networks: @@ -291,6 +277,20 @@ services: # ports: # - "8005:8005" + # initializer_service: + # container_name: initializer_service + # build: + # context: . + # dockerfile: ServicesApi/Builds/initial_service/Dockerfile + # environment: + # - SET_ALEMBIC=0 + # networks: + # - wag-services + # env_file: + # - api_env.env + # mem_limit: 512m + # cpus: 0.5 + networks: wag-services: driver: bridge