diff --git a/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/app.py b/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/app.py
index c424461..baa9a54 100644
--- a/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/app.py
+++ b/ServicesRunner/AccountRecordServices/Reader/Banks/IsBank/app.py
@@ -1,6 +1,7 @@
 import sys
 import socket
 import logging
+
 from time import sleep
 from config import IsBankConfig
 from Depends.mail_handler import EmailReaderService, EmailServiceRunner
diff --git a/ServicesTask/app/services/common/=0.19.0 b/ServicesTask/app/services/common/=0.19.0
deleted file mode 100644
index e69de29..0000000
diff --git a/ServicesTask/app/services/common/=6.4.0 b/ServicesTask/app/services/common/=6.4.0
deleted file mode 100644
index e69de29..0000000
diff --git a/ServicesTask/app/services/common/models.py b/ServicesTask/app/services/common/models.py
index 9fbdf36..9a4ab89 100644
--- a/ServicesTask/app/services/common/models.py
+++ b/ServicesTask/app/services/common/models.py
@@ -88,6 +88,13 @@ class BuildingCluster(BaseModel):
     build_parts: List['BuildPart'] = []
 
 
+class BuildRequirements(BaseModel):
+
+    building_count: int
+    living_space: int
+    build_parts: int
+
+
 # Update forward references for models with circular dependencies
 BuildPart.update_forward_refs()
 BuildingCluster.update_forward_refs()
diff --git a/ServicesTask/app/services/common/redis_handler.py b/ServicesTask/app/services/common/redis_handler.py
index 38b8e6d..3f80ed0 100644
--- a/ServicesTask/app/services/common/redis_handler.py
+++ b/ServicesTask/app/services/common/redis_handler.py
@@ -4,8 +4,7 @@ from json import loads, dumps
 from contextlib import contextmanager
 from time import sleep
 from redis import Redis, RedisError, ConnectionError as RedisConnectionError
-
-from config import RedisConfig
+from .config import RedisConfig
 
 
 logger = logging.getLogger('RedisHandler')
@@ -43,35 +42,28 @@ class RedisHandler:
         return cls._instance
 
     def __init__(self):
-        # Initialize only once
         if self._initialized:
             return
-
-        # Initialize Redis client with retry logic
         self.redis_client = self._create_redis_client()
         self.redis_connected = self._check_redis_connection()
         self._initialized = True
 
     def _create_redis_client(self):
         """Create a Redis client with connection retry"""
-        max_retries = 5
-        retry_delay = 5
-
+        max_retries, retry_delay = 5, 5
        for attempt in range(max_retries):
             try:
                 client = Redis(**RedisConfig.as_dict())
-                client.ping()  # Test the connection
+                client.ping()
                 logger.info("Redis connection established successfully")
                 return client
             except (RedisConnectionError, RedisError) as e:
                 if attempt < max_retries - 1:
                     logger.warning(f"Redis connection attempt {attempt + 1} failed: {str(e)}. Retrying in {retry_delay} seconds...")
                     sleep(retry_delay)
-                    retry_delay *= 2  # Exponential backoff
+                    retry_delay *= 2
                 else:
                     logger.error(f"Failed to connect to Redis after {max_retries} attempts: {str(e)}")
-                    # Continue with a new Redis client instance even if ping fails
-                    # This allows the service to start and retry connections later
                     return Redis(**RedisConfig.as_dict())
 
     def _check_redis_connection(self) -> bool:
@@ -81,11 +73,11 @@ class RedisHandler:
             return True
         except Exception as e:
             return False
-    
+
     def ping(self):
         """Ping Redis server to check connection"""
         return self.redis_client.ping()
-    
+
     def sadd(self, key: str, value):
         """Add a value to a Redis set"""
         return self.redis_client.sadd(key, value)
@@ -93,11 +85,22 @@ class RedisHandler:
     def ismember(self, key: str, value):
         """Check if a value is a member of a Redis set"""
         return self.redis_client.sismember(key, value)
-    
+
     def get(self, key: str):
         """Get a value from Redis by key"""
         return self.redis_client.get(key)
-    
+
+    def get_json(self, key: str) -> dict:
+        """Get a value from Redis by key"""
+        obj = self.redis_client.get(key)
+        if obj:
+            return loads(obj)
+        return None
+
+    def set_json(self, key: str, value):
+        """Set a key-value pair in Redis"""
+        return self.redis_client.set(key, dumps(value))
+
     def set(self, key: str, value):
         """Set a key-value pair in Redis"""
         return self.redis_client.set(key, value)
@@ -139,7 +142,7 @@ class RedisHandler:
                 logger.error(f"Failed to re-establish Redis connection: {str(e)}")
                 return False
         return True
-    
+
     @classmethod
     def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5):
         """
@@ -151,12 +154,9 @@
             tuple: (RedisHandler instance, bool indicating if extended sleep is needed)
         """
         try:
-            # Get a fresh instance (will reconnect internally)
             instance = cls()
             instance.redis_connected = instance._check_redis_connection()
             logger.info("Recreated Redis handler using singleton pattern")
-
-            # Determine if extended sleep is needed
             need_extended_sleep = consecutive_errors >= max_consecutive_errors
             if need_extended_sleep:
                 logger.warning(f"Hit {max_consecutive_errors} consecutive Redis errors, taking longer pause")
@@ -165,3 +165,8 @@ class RedisHandler:
             logger.error(f"Failed to recreate Redis handler: {str(redis_retry_error)}")
             return None, consecutive_errors >= max_consecutive_errors
 
+
+class RedisSaveModels:
+
+    COMMENT_BUILDING_CLUSTER = "COMMENT:PARSER:BUILDING:CLUSTER"
+    COMMENT_BUILDING_INFO = "COMMENT:PARSER:BUILDING:INFO"
diff --git a/ServicesTask/app/services/common/service_base_async.py b/ServicesTask/app/services/common/service_base_async.py
index e818742..80f3f45 100644
--- a/ServicesTask/app/services/common/service_base_async.py
+++ b/ServicesTask/app/services/common/service_base_async.py
@@ -6,8 +6,9 @@ import aio_pika
 from aio_pika.abc import AbstractIncomingMessage
 from typing import Any, Dict, Awaitable, Callable, Optional, List
 
-from app.services.types.task import _MsgCtx, _MSG_CTX
-from app.services.types.queue import Enqueue
+
+from services.types.task import _MsgCtx, _MSG_CTX
+from services.types.queue import Enqueue
 
 
 class ServiceBaseAsync:
diff --git a/ServicesTask/app/services/database/Dockerfile b/ServicesTask/app/services/database/Dockerfile
index 33d96bf..310bb33 100644
--- a/ServicesTask/app/services/database/Dockerfile
+++ b/ServicesTask/app/services/database/Dockerfile
@@ -14,6 +14,7 @@ COPY app/services/database/README.md ./
 COPY app/core ./app/core
 COPY app/services/common/ ./app/services/common/
 COPY app/services/database/ ./app/services/database/
+COPY app/services/types/ ./app/services/types/
 
 RUN apt-get update && apt-get install -y bash
 RUN mkdir -p /app/data
diff --git a/ServicesTask/app/services/database/comment_requirements.py b/ServicesTask/app/services/database/comment_requirements.py
index ff60253..c70d20f 100644
--- a/ServicesTask/app/services/database/comment_requirements.py
+++ b/ServicesTask/app/services/database/comment_requirements.py
@@ -1,136 +1,181 @@
 import arrow
-from app.services.common.models import BuildingCluster, BuildPart, BuildLivingSpace, Person, User, OccupantType
-from app.services.database.prisma_client import PrismaService
+from typing import Optional, Dict
+
+from services.common.redis_handler import RedisHandler, RedisSaveModels
+from services.common.models import BuildingCluster, BuildPart, BuildLivingSpace, Person, User, OccupantType, BuildRequirements
+from prisma_client import PrismaService
 
 
-prisma_service = PrismaService()
+class DefaultImportsToMemory:
+    def __init__(self, prisma_service: Optional[PrismaService] = None):
+        self.prisma_service = None
+        if prisma_service:
+            self.prisma_service = prisma_service
+        self.redis_handler = RedisHandler()
+        self.today = arrow.now().to('GMT+3').datetime
 
-async def get_count_person_data_due_to_build():
-    today = arrow.now().to('GMT+3').datetime
-    async with prisma_service._asession() as db:
-        occupant_flat_owner = await db.occupant_types.find_first(query={"occupant_code": "FL-OWN", "active": True, "is_confirmed": True}, include={"user_types": True})
-        occupant_tenant = await db.occupant_types.find_first(query={"occupant_code": "FL-TEN", "active": True, "is_confirmed": True}, include={"user_types": True})
-        possible_money_sender_occupants = [occupant_flat_owner.id, occupant_tenant.id]
-        building_count = await db.build.count(where={"active": True, "is_confirmed": True,"expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
-        build_parts_count = await db.build_parts.count(where={"active": True, "is_confirmed": True, "human_livable": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
-        living_spaces_count = await db.build_living_space.count(
-            where={"active": True, "is_confirmed": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}, "occupant_type_id": {"in": possible_money_sender_occupants}},
-        )
-        return {"building_count": building_count, "living_space": living_spaces_count, "build_parts": build_parts_count}
-        # return {"building_count": 0, "living_space": 0, "build_parts": 0}
+    # Redis Actions
+    async def get_count_person_data_due_to_build_info(self) -> Optional[BuildRequirements]:
+        """Get count of person data due to build with comprehensive inner joins"""
+        return self.redis_handler.get_json(RedisSaveModels.COMMENT_BUILDING_INFO)
+    async def set_count_person_data_due_to_build_info(self, data: BuildRequirements):
+        """Set count of person data due to build with comprehensive inner joins"""
+        return self.redis_handler.set_json(RedisSaveModels.COMMENT_BUILDING_INFO, data.dict())
 
-async def get_all_person_data_due_to_build():
-    """
-    Get all person data due to build with comprehensive inner joins
-    Returns a dictionary of buildings clustered with their build parts, people, and living spaces
-    """
-    buildings_dict, today = {}, arrow.now().to('GMT+3').datetime
-    async with prisma_service._asession() as db:
-        occupant_flat_owner = await db.occupant_types.find_first(query={"occupant_code": "FL-OWN", "active": True, "is_confirmed": True}, include={"user_types": True})
-        occupant_tenant = await db.occupant_types.find_first(query={"occupant_code": "FL-TEN", "active": True, "is_confirmed": True}, include={"user_types": True})
-        possible_money_sender_occupants = [occupant_flat_owner.id, occupant_tenant.id]
-        buildings = await db.build.find_many(where={"active": True, "is_confirmed": True,"expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
-        for build in buildings:
-            buildings_dict[str(build.id)] = BuildingCluster(
-                id=build.id,
-                uu_id=build.uu_id,
-                build_name=build.build_name,
-                build_no=build.build_no,
-                build_date=str(build.build_date),
-                decision_period_date=str(build.decision_period_date),
-                expiry_starts=str(build.expiry_starts),
-                expiry_ends=str(build.expiry_ends),
-                is_confirmed=build.is_confirmed,
-                active=build.active,
-                build_parts=[]
+    async def get_count_person_data_due_to_build_data(self):
+        """Get count of person data due to build with comprehensive inner joins"""
+        data = self.redis_handler.get_json(RedisSaveModels.COMMENT_BUILDING_CLUSTER)
+        return {i: BuildingCluster(**v) for i, v in data.items()}
+
+    async def set_count_person_data_due_to_build_data(self, data: Dict[str, BuildingCluster]):
+        """Set count of person data due to build with comprehensive inner joins"""
+        excluded_dict = {i: v.dict(exclude_none=True) for i, v in data.items()}
+        return self.redis_handler.set_json(RedisSaveModels.COMMENT_BUILDING_CLUSTER, excluded_dict)
+
+    # Database Actions
+    def check_if_database_is_available(self):
+        if not self.prisma_service:
+            raise ValueError("PrismaService is not initialized")
+
+    async def get_count_person_data_due_to_build(self) -> BuildRequirements:
+        """Get count of person data due to build with comprehensive inner joins"""
+        self.check_if_database_is_available()
+        async with self.prisma_service._asession() as db:
+            occupant_flat_owner = await db.occupant_types.find_first(where={"occupant_code": "FL-OWN", "active": True, "is_confirmed": True}, include={"user_types": True})
+            occupant_tenant = await db.occupant_types.find_first(where={"occupant_code": "FL-TEN", "active": True, "is_confirmed": True}, include={"user_types": True})
+            possible_money_sender_occupants = [occupant_flat_owner.id, occupant_tenant.id]
+            building_count = await db.build.count(where={"active": True, "is_confirmed": True,"expiry_starts": {"lte": self.today}, "expiry_ends": {"gte": self.today}})
+            build_parts_count = await db.build_parts.count(where={"active": True, "is_confirmed": True, "human_livable": True, "expiry_starts": {"lte": self.today}, "expiry_ends": {"gte": self.today}})
+            living_spaces_count = await db.build_living_space.count(
+                where={"active": True, "is_confirmed": True, "expiry_starts": {"lte": self.today}, "expiry_ends": {"gte": self.today}, "occupant_type_id": {"in": possible_money_sender_occupants}},
             )
-            build_parts = db.build_parts(where={"build_id": build.id, "active": True, "is_confirmed": True, "human_livable": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
-            for build_part in build_parts:
-                part_obj = BuildPart(
-                    id=build_part.id,
-                    uu_id=build_part.uu_id,
-                    part_no=build_part.part_no,
-                    part_level=build_part.part_level,
-                    part_code=build_part.part_code,
-                    part_gross_size=build_part.part_gross_size,
-                    part_net_size=build_part.part_net_size,
-                    human_livable=build_part.human_livable,
-                    build_id=build_part.build_id,
-                    build_uu_id=build_part.build_uu_id,
-                    is_confirmed=build_part.is_confirmed,
-                    active=build_part.active,
-                    living_spaces=[],
-                    build=None
+            return BuildRequirements(building_count=building_count, living_space=living_spaces_count, build_parts=build_parts_count)
+
+    async def retrieve_all_person_data_due_to_build(self) -> Dict[str, BuildingCluster]:
+        """
+        Get all person data due to build with comprehensive inner joins
+        Returns a dictionary of buildings clustered with their build parts, people, and living spaces
+        """
+        self.check_if_database_is_available()
+        buildings_dict = {}
+        async with self.prisma_service._asession() as db:
+            occupant_flat_owner = await db.occupant_types.find_first(where={"occupant_code": "FL-OWN", "active": True, "is_confirmed": True}, include={"user_types": True})
+            occupant_tenant = await db.occupant_types.find_first(where={"occupant_code": "FL-TEN", "active": True, "is_confirmed": True}, include={"user_types": True})
+            possible_money_sender_occupants = [occupant_flat_owner.id, occupant_tenant.id]
+            buildings = await db.build.find_many(where={"active": True, "is_confirmed": True,"expiry_starts": {"lte": self.today}, "expiry_ends": {"gte": self.today}})
+            for build in buildings:
+                buildings_dict[str(build.id)] = BuildingCluster(
+                    id=build.id, uu_id=build.uu_id, build_name=build.build_name, build_no=build.build_no, build_date=str(build.build_date),
+                    decision_period_date=str(build.decision_period_date), expiry_starts=str(build.expiry_starts), expiry_ends=str(build.expiry_ends),
+                    is_confirmed=build.is_confirmed, active=build.active, build_parts=[]
                 )
-                living_spaces = db.build_living_space.find_many(
-                    include={"occupant_types": True, "people": {"include": {"users": True}}},
-                    query={"build_parts_id": build_part.id, "active": True, "is_confirmed": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}, "occupant_type_id": {"in": possible_money_sender_occupants}},
-                )
-                for living_space in living_spaces:
-                    person = living_space.people
-                    user = db.users.find_first(where={"person_id": person.id, "active": True, "is_confirmed": True})
-                    user_of_person = None
-                    if user:
-                        user_of_person = User(
-                            id=user.id,
-                            uu_id=user.uu_id,
-                            user_tag=user.user_tag,
-                            user_type=user.user_type,
-                            email=user.email,
-                            phone_number=user.phone_number,
-                            related_company=user.related_company,
-                            is_confirmed=user.is_confirmed,
-                            active=user.active
+                build_parts = await db.build_parts.find_many(where={"build_id": build.id, "active": True, "is_confirmed": True, "human_livable": True, "expiry_starts": {"lte": self.today}, "expiry_ends": {"gte": self.today}})
+                for build_part in build_parts:
+                    part_obj = BuildPart(
+                        id=build_part.id,
+                        uu_id=build_part.uu_id,
+                        part_no=build_part.part_no,
+                        part_level=build_part.part_level,
+                        part_code=build_part.part_code,
+                        part_gross_size=build_part.part_gross_size,
+                        part_net_size=build_part.part_net_size,
+                        human_livable=build_part.human_livable,
+                        build_id=build_part.build_id,
+                        build_uu_id=build_part.build_uu_id,
+                        is_confirmed=build_part.is_confirmed,
+                        active=build_part.active,
+                        living_spaces=[],
+                        build=None
+                    )
+                    living_spaces = await db.build_living_space.find_many(
+                        include={"occupant_types": True, "people": {"include": {"users": True}}},
+                        where={"build_parts_id": build_part.id, "active": True, "is_confirmed": True,
+                               "expiry_starts": {"lte": self.today}, "expiry_ends": {"gte": self.today}, "occupant_type_id": {"in": possible_money_sender_occupants}},
+                    )
+                    for living_space in living_spaces:
+                        person = living_space.people
+                        user = await db.users.find_first(where={"person_id": person.id, "active": True, "is_confirmed": True})
+                        user_of_person = None
+                        if user:
+                            user_of_person = User(
+                                id=user.id,
+                                uu_id=user.uu_id,
+                                user_tag=user.user_tag,
+                                user_type=user.user_type,
+                                email=user.email,
+                                phone_number=user.phone_number,
+                                related_company=user.related_company,
+                                is_confirmed=user.is_confirmed,
+                                active=user.active
+                            )
+                        person_obj = Person(
+                            id=person.id,
+                            uu_id=person.uu_id,
+                            firstname=person.firstname,
+                            surname=person.surname,
+                            middle_name=person.middle_name,
+                            birthname=person.birthname,
+                            is_confirmed=person.is_confirmed,
+                            active=person.active,
+                            user=user_of_person
                         )
-                    person_obj = Person(
-                        id=person.id,
-                        uu_id=person.uu_id,
-                        firstname=person.firstname,
-                        surname=person.surname,
-                        middle_name=person.middle_name,
-                        birthname=person.birthname,
-                        is_confirmed=person.is_confirmed,
-                        active=person.active,
-                        user=user_of_person
-                    )
-                    occupant_type = living_space.occupant_types
-                    occupant_type_obj = OccupantType(
-                        id=occupant_type.id,
-                        uu_id=occupant_type.uu_id,
-                        occupant_code=occupant_type.occupant_code,
-                        occupant_type=occupant_type.occupant_type,
-                        is_confirmed=occupant_type.is_confirmed,
-                        active=occupant_type.active,
-                        user_type_uu_id=occupant_type.user_type_uu_id
-                    )
-                    living_space_obj = BuildLivingSpace(
-                        id=living_space.id,
-                        uu_id=living_space.uu_id,
-                        expiry_starts=str(living_space.expiry_starts),
-                        expiry_ends=str(living_space.expiry_ends),
-                        fix_value=float(living_space.fix_value),
-                        fix_percent=float(living_space.fix_percent),
-                        agreement_no=living_space.agreement_no,
-                        marketing_process=living_space.marketing_process,
-                        build_parts_id=living_space.build_parts_id,
-                        build_parts_uu_id=living_space.build_parts_uu_id,
-                        person_id=living_space.person_id,
-                        person_uu_id=living_space.person_uu_id,
-                        occupant_type_id=living_space.occupant_type_id,
-                        occupant_type_uu_id=living_space.occupant_type_uu_id,
-                        is_confirmed=living_space.is_confirmed,
-                        active=living_space.active,
-                        person=person_obj,
-                        occupant_types=occupant_type_obj
-                    )
-                    part_obj.living_spaces.append(living_space_obj)
-            buildings_dict[str(build.id)].build_parts.append(part_obj)
-    return {i: v.dict(exclude_none=True) for i, v in buildings_dict.items()}
+                        occupant_type = living_space.occupant_types
+                        occupant_type_obj = OccupantType(
+                            id=occupant_type.id,
+                            uu_id=occupant_type.uu_id,
+                            occupant_code=occupant_type.occupant_code,
+                            occupant_type=occupant_type.occupant_type,
+                            is_confirmed=occupant_type.is_confirmed,
+                            active=occupant_type.active,
+                            user_type_uu_id=occupant_type.user_type_uu_id
+                        )
+                        living_space_obj = BuildLivingSpace(
+                            id=living_space.id,
+                            uu_id=living_space.uu_id,
+                            expiry_starts=str(living_space.expiry_starts),
+                            expiry_ends=str(living_space.expiry_ends),
+                            fix_value=float(living_space.fix_value),
+                            fix_percent=float(living_space.fix_percent),
+                            agreement_no=living_space.agreement_no,
+                            marketing_process=living_space.marketing_process,
+                            build_parts_id=living_space.build_parts_id,
+                            build_parts_uu_id=living_space.build_parts_uu_id,
+                            person_id=living_space.person_id,
+                            person_uu_id=living_space.person_uu_id,
+                            occupant_type_id=living_space.occupant_type_id,
+                            occupant_type_uu_id=living_space.occupant_type_uu_id,
+                            is_confirmed=living_space.is_confirmed,
+                            active=living_space.active,
+                            person=person_obj,
+                            occupant_types=occupant_type_obj
+                        )
+                        part_obj.living_spaces.append(living_space_obj)
+                    buildings_dict[str(build.id)].build_parts.append(part_obj)
+        return buildings_dict
+
+    async def retrieve_all_companies_data(self):
+        self.check_if_database_is_available()
+        async with self.prisma_service._asession() as db:
+            return await db.companies.find_many(where={"active": True, "is_confirmed": True})
-def get_all_companies_data():
-    return prisma_service.find_many(table="companies", query={"active": True, "is_confirmed": True})
+    async def renew_requirements(self):
+        self.check_if_database_is_available()
+        async def set_to_redis():
+            await self.set_count_person_data_due_to_build_info(count_person_data_due_to_build_info_db)
+            all_person_data = await self.retrieve_all_person_data_due_to_build()
+            await self.set_count_person_data_due_to_build_data(all_person_data)
+            return
+
+        count_person_data_due_to_build_info_db = await self.get_count_person_data_due_to_build()
+        count_person_data_due_to_build_info_memory = await self.get_count_person_data_due_to_build_info()
+        if not count_person_data_due_to_build_info_memory:
+            return await set_to_redis()
+
+        all_counts_in_memory = [count_person_data_due_to_build_info_memory.building_count, count_person_data_due_to_build_info_memory.living_space, count_person_data_due_to_build_info_memory.build_parts]
+        all_counts_in_db = [count_person_data_due_to_build_info_db.building_count, count_person_data_due_to_build_info_db.living_space, count_person_data_due_to_build_info_db.build_parts]
+        if not all_counts_in_memory == all_counts_in_db:
+            return await set_to_redis()
diff --git a/ServicesTask/app/services/database/main.py b/ServicesTask/app/services/database/main.py
index 9c2a266..4b96610 100644
--- a/ServicesTask/app/services/database/main.py
+++ b/ServicesTask/app/services/database/main.py
@@ -1,9 +1,11 @@
 import os
 import asyncio
 
-from prisma_client import PrismaService
+from comment_requirements import DefaultImportsToMemory
 from services.common.service_base_async import ServiceBaseAsync
-from app.services.types.task import Job
+from services.types.task import Job
+
+from prisma_client import PrismaService
 
 
 PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10"))
@@ -12,13 +14,20 @@ EVENT_TYPE = os.getenv("EVENT_TYPE", "db-event")
 PROCESS_SEC = 10
 
 prisma_service = PrismaService()
+is_db_pulled = False
 
 
 async def produce(svc: ServiceBaseAsync):
+    global is_db_pulled
+
+    # Get build info to memory
+    if not is_db_pulled:
+        await default_imports()
+        is_db_pulled = True
     async with prisma_service._asession() as db:
         # Routine Email Service
         routine_email_service_result = await db.account_records.find_many(
-            where={"is_email_send": False,"is_active": True, "is_confirmed": True, "is_deleted": False}, take=3, skip=0
+            where={"is_email_send": False,"active": True, "is_confirmed": True, "deleted": False}, take=3, skip=0
         )
         if not routine_email_service_result:
             await asyncio.sleep(PROCESS_SEC)
@@ -29,8 +38,6 @@ async def produce(svc: ServiceBaseAsync):
         generate_task__uuid += str(row["uu_id"])[:4]
     await svc.enqueue(task_id=generate_task__uuid, payload=routine_email_service_result, action="routine.email.send.service")
 
     # Get Build and Company Requirements
-
-
     await asyncio.sleep(PROCESS_SEC)
 
@@ -55,7 +62,10 @@ async def consume_default(svc: ServiceBaseAsync, job: dict):
         await svc.dlq_current(job_model, error="unsupported_routing_key")
 
 
-if __name__ == "__main__":
+async def default_imports():
+    update_comment_requirements = DefaultImportsToMemory(prisma_service)
+    await update_comment_requirements.renew_requirements()
+if __name__ == "__main__":
     svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.comment.publish": handle_comment_publish, "mail.service.publish": handle_routine_email_send_service_ack})
     asyncio.run(svc.run())
 
diff --git a/ServicesTask/app/services/database/requirements.txt b/ServicesTask/app/services/database/requirements.txt
index 4ef13f8..85f771e 100644
--- a/ServicesTask/app/services/database/requirements.txt
+++ b/ServicesTask/app/services/database/requirements.txt
@@ -3,4 +3,5 @@ prometheus-client>=0.20.0
 uvloop>=0.19.0
 prisma==0.9.1
 asyncio==3.4.3
-arrow>=1.3.0
\ No newline at end of file
+arrow>=1.3.0
+redis>=6.4.0
\ No newline at end of file
diff --git a/ServicesTask/app/services/types/queue.py b/ServicesTask/app/services/types/queue.py
index b42c292..9aac597 100644
--- a/ServicesTask/app/services/types/queue.py
+++ b/ServicesTask/app/services/types/queue.py
@@ -2,7 +2,7 @@ from json import dumps
 from typing import Any, Optional, Dict
 from pydantic import BaseModel
 
-from app.core.utils import now_ms
+from core.utils import now_ms
 
 
 class Enqueue(BaseModel):
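
Note (not part of the diff): a minimal sketch of how the Redis-backed requirement cache introduced above could be exercised end to end. It assumes the services.* packages and the flat prisma_client / comment_requirements imports resolve exactly as laid out by the Dockerfile COPY lines and main.py; the entry-point name refresh_build_requirements is hypothetical.

# Hedged sketch only: mirrors what main.py's default_imports() does, then reads the
# cached counts back through the new RedisHandler JSON helpers.
import asyncio

from services.common.models import BuildRequirements
from services.common.redis_handler import RedisHandler, RedisSaveModels
from comment_requirements import DefaultImportsToMemory  # flat import, as in main.py
from prisma_client import PrismaService                  # flat import, as in main.py


async def refresh_build_requirements() -> BuildRequirements:
    importer = DefaultImportsToMemory(PrismaService())
    # renew_requirements() re-writes COMMENT:PARSER:BUILDING:INFO / :CLUSTER when the DB counts
    # differ from what is cached (or when nothing is cached yet).
    await importer.renew_requirements()
    cached = RedisHandler().get_json(RedisSaveModels.COMMENT_BUILDING_INFO)
    return BuildRequirements(**cached)


if __name__ == "__main__":
    print(asyncio.run(refresh_build_requirements()))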