diff --git a/ServicesTask/app/services/common/=6.4.0 b/ServicesTask/app/services/common/=6.4.0 new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/common/config.py b/ServicesTask/app/services/common/config.py new file mode 100644 index 0000000..4509955 --- /dev/null +++ b/ServicesTask/app/services/common/config.py @@ -0,0 +1,14 @@ +import os + + +class RedisConfig: + + HOST: str = os.getenv("REDIS_HOST", "10.10.2.15") + PASSWORD: str = os.getenv("REDIS_PASSWORD", "your_strong_password_here") + PORT: int = int(os.getenv("REDIS_PORT", 6379)) + DB: int = int(os.getenv("REDIS_DB", 0)) + + @classmethod + def as_dict(cls): + return dict(host=RedisConfig.HOST, port=int(RedisConfig.PORT), password=RedisConfig.PASSWORD, db=int(RedisConfig.DB)) + diff --git a/ServicesTask/app/services/common/models.py b/ServicesTask/app/services/common/models.py new file mode 100644 index 0000000..9fbdf36 --- /dev/null +++ b/ServicesTask/app/services/common/models.py @@ -0,0 +1,93 @@ +from typing import Optional, List +from pydantic import BaseModel + + +class User(BaseModel): + id: int + uu_id: str + user_tag: str + user_type: str + email: str + phone_number: str + related_company: str + is_confirmed: bool + active: bool + + +class Person(BaseModel): + id: int + uu_id: str + firstname: str + surname: str + middle_name: Optional[str] = "" + birthname: Optional[str] = "" + # national_identity_id: str + is_confirmed: bool + active: bool + user: Optional[User] = None + + +class OccupantType(BaseModel): + id: int + uu_id: str + occupant_code: str + occupant_type: str + is_confirmed: bool + active: bool + user_type_uu_id: Optional[str] = None + + +class BuildPart(BaseModel): + id: int + uu_id: str + part_no: str + part_level: str + part_code: str + part_gross_size: float + part_net_size: float + human_livable: bool + build_id: int + build_uu_id: str + is_confirmed: bool + active: bool + living_spaces: Optional[List['BuildLivingSpace']] = None + + +class BuildLivingSpace(BaseModel): + id: int + uu_id: str + expiry_starts: str + expiry_ends: str + fix_value: float + fix_percent: float + agreement_no: str + marketing_process: bool + build_parts_id: int + build_parts_uu_id: str + person_id: int + person_uu_id: str + occupant_type_id: int + occupant_type_uu_id: str + is_confirmed: bool + active: bool + person: Optional[Person] = None + occupant_type: Optional[OccupantType] = None + + +class BuildingCluster(BaseModel): + id: int + uu_id: str + build_name: str + build_no: str + build_date: str + decision_period_date: str + expiry_starts: str + expiry_ends: str + is_confirmed: bool + active: bool + build_parts: List['BuildPart'] = [] + + +# Update forward references for models with circular dependencies +BuildPart.update_forward_refs() +BuildingCluster.update_forward_refs() diff --git a/ServicesTask/app/services/common/redis_handler.py b/ServicesTask/app/services/common/redis_handler.py new file mode 100644 index 0000000..38b8e6d --- /dev/null +++ b/ServicesTask/app/services/common/redis_handler.py @@ -0,0 +1,167 @@ +import logging + +from json import loads, dumps +from contextlib import contextmanager +from time import sleep +from redis import Redis, RedisError, ConnectionError as RedisConnectionError + +from config import RedisConfig + + +logger = logging.getLogger('RedisHandler') + + +@contextmanager +def safe_redis_operation(redis_client: Redis, operation_name: str = "Redis operation"): + """ + Context manager for safely executing Redis operations with error handling + """ + try: + yield 
redis_client + except RedisConnectionError as e: + logger.error(f"{operation_name} failed due to Redis connection error: {str(e)}") + raise + except RedisError as e: + logger.error(f"{operation_name} failed due to Redis error: {str(e)}") + raise + except Exception as e: + logger.error(f"{operation_name} failed with unexpected error: {str(e)}") + raise + + +class RedisHandler: + + """Singleton Redis handler class for centralized Redis operations""" + + _instance = None + REDIS_EXCEPTIONS = (RedisConnectionError, RedisError) + + def __new__(cls): + if cls._instance is None: + cls._instance = super(RedisHandler, cls).__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self): + # Initialize only once + if self._initialized: + return + + # Initialize Redis client with retry logic + self.redis_client = self._create_redis_client() + self.redis_connected = self._check_redis_connection() + self._initialized = True + + def _create_redis_client(self): + """Create a Redis client with connection retry""" + max_retries = 5 + retry_delay = 5 + + for attempt in range(max_retries): + try: + client = Redis(**RedisConfig.as_dict()) + client.ping() # Test the connection + logger.info("Redis connection established successfully") + return client + except (RedisConnectionError, RedisError) as e: + if attempt < max_retries - 1: + logger.warning(f"Redis connection attempt {attempt + 1} failed: {str(e)}. Retrying in {retry_delay} seconds...") + sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + else: + logger.error(f"Failed to connect to Redis after {max_retries} attempts: {str(e)}") + # Continue with a new Redis client instance even if ping fails + # This allows the service to start and retry connections later + return Redis(**RedisConfig.as_dict()) + + def _check_redis_connection(self) -> bool: + """Check if Redis connection is alive""" + try: + self.ping() + return True + except Exception as e: + return False + + def ping(self): + """Ping Redis server to check connection""" + return self.redis_client.ping() + + def sadd(self, key: str, value): + """Add a value to a Redis set""" + return self.redis_client.sadd(key, value) + + def ismember(self, key: str, value): + """Check if a value is a member of a Redis set""" + return self.redis_client.sismember(key, value) + + def get(self, key: str): + """Get a value from Redis by key""" + return self.redis_client.get(key) + + def set(self, key: str, value): + """Set a key-value pair in Redis""" + return self.redis_client.set(key, value) + + def delete_value(self, key: str, value): + """Delete a value from a Redis value by finding key""" + get_redis = self.get(key) + if get_redis: + get_redis: dict = loads(get_redis) + get_redis.pop(value) + self.set(key, dumps(get_redis)) + + def rpush(self, key: str, value): + """Append a value to a Redis list""" + return self.redis_client.rpush(key, value) + + def lindex(self, key: str, index: int): + """Get an element from a Redis list by its index""" + return self.redis_client.lindex(key, index) + + def spop(self, key: str, count=1): + """Remove and return random members from a Redis set""" + return self.redis_client.spop(key, count) + + def srem(self, key: str, value): + """Remove a specific member from a Redis set""" + return self.redis_client.srem(key, value) + + def ensure_connection(self) -> bool: + """Check if Redis connection is alive and reconnect if needed""" + if not self.redis_connected: + try: + self.redis_client = self._create_redis_client() + self.redis_connected = 
self._check_redis_connection() + if self.redis_connected: + logger.info("Redis connection re-established successfully") + return self.redis_connected + except Exception as e: + logger.error(f"Failed to re-establish Redis connection: {str(e)}") + return False + return True + + @classmethod + def handle_reconnection(cls, consecutive_errors=0, max_consecutive_errors=5): + """ + Handle Redis reconnection with exponential backoff based on consecutive errors + Args: + consecutive_errors: Number of consecutive errors encountered + max_consecutive_errors: Threshold for extended sleep time + Returns: + tuple: (RedisHandler instance, bool indicating if extended sleep is needed) + """ + try: + # Get a fresh instance (will reconnect internally) + instance = cls() + instance.redis_connected = instance._check_redis_connection() + logger.info("Recreated Redis handler using singleton pattern") + + # Determine if extended sleep is needed + need_extended_sleep = consecutive_errors >= max_consecutive_errors + if need_extended_sleep: + logger.warning(f"Hit {max_consecutive_errors} consecutive Redis errors, taking longer pause") + return instance, need_extended_sleep + except Exception as redis_retry_error: + logger.error(f"Failed to recreate Redis handler: {str(redis_retry_error)}") + return None, consecutive_errors >= max_consecutive_errors + diff --git a/ServicesTask/app/services/common/service_base_async.py b/ServicesTask/app/services/common/service_base_async.py index a0e48ed..e818742 100644 --- a/ServicesTask/app/services/common/service_base_async.py +++ b/ServicesTask/app/services/common/service_base_async.py @@ -4,34 +4,10 @@ import asyncio import fnmatch import aio_pika -from pydantic import BaseModel -from core.utils import now_ms -from contextvars import ContextVar from aio_pika.abc import AbstractIncomingMessage -from typing import Any, Dict, Awaitable, Callable, Optional, List, NamedTuple - - -class _MsgCtx(NamedTuple): - msg: AbstractIncomingMessage - rk: str - attempts: int - - -_MSG_CTX: ContextVar[_MsgCtx | None] = ContextVar("_MSG_CTX", default=None) - - -class Meta(BaseModel): - routing_key: str - attempts: int - exchange: str - - -class Job(BaseModel): - task_id: str - type: str - payload: dict - created_at: int - _meta: Meta +from typing import Any, Dict, Awaitable, Callable, Optional, List +from app.services.types.task import _MsgCtx, _MSG_CTX +from app.services.types.queue import Enqueue class ServiceBaseAsync: @@ -43,7 +19,7 @@ class ServiceBaseAsync: - Retry: TTL'li retry kuyruğu (RETRY_DELAY_MS), sonra main'e geri DLX - Max deneme üstünde DLQ: q..events.dlq - Handler map: routing key -> özel callback (pattern destekli) - - Geriye uyumluluk: enqueue(payload, type_, routing_key=None, message_id=None) + - Geriye uyumluluk: enqueue(payload, action, routing_key=None, message_id=None) """ def __init__( @@ -115,15 +91,16 @@ class ServiceBaseAsync: await q_main.consume(self._on_message, no_ack=False) await asyncio.gather(self._produce_loop()) - async def enqueue(self, task_id: str, payload: Dict[str, Any], type_: Optional[str] = None, routing_key: Optional[str] = None, message_id: Optional[str] = None) -> str: + async def enqueue(self, enqueue: Enqueue) -> str: assert self.ex is not None - payload.setdefault("task_id", task_id) + payload: dict = enqueue.payload + payload.setdefault("task_id", enqueue.task_id) payload.setdefault("source", self.service_name) - body = json.dumps({"task_id": task_id, "type": type_, "payload": payload, "created_at": now_ms()}).encode() - msg = 
aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=message_id or task_id, headers={"x-attempts": 0})
-        rk = routing_key or self.produce_key
-        await self.ex.publish(msg, routing_key=rk)
-        return task_id
+        enqueue.payload = payload
+        msg = aio_pika.Message(enqueue.body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=enqueue.message_id or enqueue.task_id, headers={"x-attempts": 0}, type=enqueue.action)
+        routing_key = enqueue.routing_key or self.produce_key
+        await self.ex.publish(msg, routing_key=routing_key)
+        return enqueue.task_id
 
     async def ack_current(self) -> None:
         ctx = _MSG_CTX.get()
diff --git a/ServicesTask/app/services/database/comment_requirements.py b/ServicesTask/app/services/database/comment_requirements.py
new file mode 100644
index 0000000..ff60253
--- /dev/null
+++ b/ServicesTask/app/services/database/comment_requirements.py
@@ -0,0 +1,136 @@
+import arrow
+
+from app.services.common.models import BuildingCluster, BuildPart, BuildLivingSpace, Person, User, OccupantType
+from app.services.database.prisma_client import PrismaService
+
+
+prisma_service = PrismaService()
+
+
+async def get_count_person_data_due_to_build():
+    today = arrow.now().to('GMT+3').datetime
+    async with prisma_service._asession() as db:
+        occupant_flat_owner = await db.occupant_types.find_first(where={"occupant_code": "FL-OWN", "active": True, "is_confirmed": True}, include={"user_types": True})
+        occupant_tenant = await db.occupant_types.find_first(where={"occupant_code": "FL-TEN", "active": True, "is_confirmed": True}, include={"user_types": True})
+        # occupant types that may send payments: flat owners (FL-OWN) and tenants (FL-TEN)
+        possible_money_sender_occupants = [occupant_flat_owner.id, occupant_tenant.id]
+        building_count = await db.build.count(where={"active": True, "is_confirmed": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
+        build_parts_count = await db.build_parts.count(where={"active": True, "is_confirmed": True, "human_livable": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
+        living_spaces_count = await db.build_living_space.count(
+            where={"active": True, "is_confirmed": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}, "occupant_type_id": {"in": possible_money_sender_occupants}},
+        )
+        return {"building_count": building_count, "living_space": living_spaces_count, "build_parts": build_parts_count}
+    # return {"building_count": 0, "living_space": 0, "build_parts": 0}
+
+
+async def get_all_person_data_due_to_build():
+    """
+    Collect all person data per building with the related joins.
+    Returns a dictionary of buildings together with their build parts, living spaces, people and users.
+    """
+    buildings_dict, today = {}, arrow.now().to('GMT+3').datetime
+    async with prisma_service._asession() as db:
+        occupant_flat_owner = await db.occupant_types.find_first(where={"occupant_code": "FL-OWN", "active": True, "is_confirmed": True}, include={"user_types": True})
+        occupant_tenant = await db.occupant_types.find_first(where={"occupant_code": "FL-TEN", "active": True, "is_confirmed": True}, include={"user_types": True})
+        possible_money_sender_occupants = [occupant_flat_owner.id, occupant_tenant.id]
+        buildings = await db.build.find_many(where={"active": True, "is_confirmed": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
+        for build in buildings:
+            buildings_dict[str(build.id)] = BuildingCluster(
+                id=build.id,
+                uu_id=build.uu_id,
+                build_name=build.build_name,
+                build_no=build.build_no,
+                build_date=str(build.build_date),
+                decision_period_date=str(build.decision_period_date),
+                expiry_starts=str(build.expiry_starts),
+                expiry_ends=str(build.expiry_ends),
+                is_confirmed=build.is_confirmed,
+                active=build.active,
+                build_parts=[]
+            )
+            build_parts = await db.build_parts.find_many(where={"build_id": build.id, "active": True, "is_confirmed": True, "human_livable": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}})
+            for build_part in build_parts:
+                part_obj = BuildPart(
+                    id=build_part.id,
+                    uu_id=build_part.uu_id,
+                    part_no=build_part.part_no,
+                    part_level=build_part.part_level,
+                    part_code=build_part.part_code,
+                    part_gross_size=build_part.part_gross_size,
+                    part_net_size=build_part.part_net_size,
+                    human_livable=build_part.human_livable,
+                    build_id=build_part.build_id,
+                    build_uu_id=build_part.build_uu_id,
+                    is_confirmed=build_part.is_confirmed,
+                    active=build_part.active,
+                    living_spaces=[]
+                )
+                living_spaces = await db.build_living_space.find_many(
+                    include={"occupant_types": True, "people": {"include": {"users": True}}},
+                    where={"build_parts_id": build_part.id, "active": True, "is_confirmed": True, "expiry_starts": {"lte": today}, "expiry_ends": {"gte": today}, "occupant_type_id": {"in": possible_money_sender_occupants}},
+                )
+                for living_space in living_spaces:
+                    person = living_space.people
+                    user = await db.users.find_first(where={"person_id": person.id, "active": True, "is_confirmed": True})
+                    user_of_person = None
+                    if user:
+                        user_of_person = User(
+                            id=user.id,
+                            uu_id=user.uu_id,
+                            user_tag=user.user_tag,
+                            user_type=user.user_type,
+                            email=user.email,
+                            phone_number=user.phone_number,
+                            related_company=user.related_company,
+                            is_confirmed=user.is_confirmed,
+                            active=user.active
+                        )
+                    person_obj = Person(
+                        id=person.id,
+                        uu_id=person.uu_id,
+                        firstname=person.firstname,
+                        surname=person.surname,
+                        middle_name=person.middle_name,
+                        birthname=person.birthname,
+                        is_confirmed=person.is_confirmed,
+                        active=person.active,
+                        user=user_of_person
+                    )
+                    occupant_type = living_space.occupant_types
+                    occupant_type_obj = OccupantType(
+                        id=occupant_type.id,
+                        uu_id=occupant_type.uu_id,
+                        occupant_code=occupant_type.occupant_code,
+                        occupant_type=occupant_type.occupant_type,
+                        is_confirmed=occupant_type.is_confirmed,
+                        active=occupant_type.active,
+                        user_type_uu_id=occupant_type.user_type_uu_id
+                    )
+                    living_space_obj = BuildLivingSpace(
+                        id=living_space.id,
+                        uu_id=living_space.uu_id,
+                        expiry_starts=str(living_space.expiry_starts),
+                        expiry_ends=str(living_space.expiry_ends),
+                        fix_value=float(living_space.fix_value),
+                        fix_percent=float(living_space.fix_percent),
+                        agreement_no=living_space.agreement_no,
+                        marketing_process=living_space.marketing_process,
+                        build_parts_id=living_space.build_parts_id,
+                        build_parts_uu_id=living_space.build_parts_uu_id,
+                        person_id=living_space.person_id,
+                        person_uu_id=living_space.person_uu_id,
+                        occupant_type_id=living_space.occupant_type_id,
+                        occupant_type_uu_id=living_space.occupant_type_uu_id,
+                        is_confirmed=living_space.is_confirmed,
+                        active=living_space.active,
+                        person=person_obj,
+                        occupant_type=occupant_type_obj
+                    )
+                    part_obj.living_spaces.append(living_space_obj)
+                buildings_dict[str(build.id)].build_parts.append(part_obj)
+    return {i: v.dict(exclude_none=True) for i, v in buildings_dict.items()}
+
+
+def get_all_companies_data():
+    return prisma_service.find_many(table="companies", query={"active": True, "is_confirmed": True})
+
diff --git a/ServicesTask/app/services/database/main.py b/ServicesTask/app/services/database/main.py
index 168f9b9..9c2a266 100644
--- a/ServicesTask/app/services/database/main.py
+++ b/ServicesTask/app/services/database/main.py
@@ -2,7 +2,9 @@ import os
 import asyncio
 
 from prisma_client import PrismaService
-from services.common.service_base_async import ServiceBaseAsync, Job
+from services.common.service_base_async import ServiceBaseAsync
+from app.services.types.task import Job
+from app.services.types.queue import Enqueue
 
 
 PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10"))
@@ -15,10 +17,22 @@ prisma_service = PrismaService()
 
 
 async def produce(svc: ServiceBaseAsync):
     async with prisma_service._asession() as db:
-        result = await db.account_records.find_many(take=10, skip=0)
-        result: list = prisma_service.to_dict(result, select={"id": True, "uu_id": True, "iban": True, "bank_reference_code": True, "bank_date": True, "bank_balance": True})
-        for row in result:
-            await svc.enqueue(task_id=row["uu_id"], payload=row, type_="database.account.records")
+        # Routine Email Service
+        routine_email_service_result = await db.account_records.find_many(
+            where={"is_email_send": False, "is_active": True, "is_confirmed": True, "is_deleted": False}, take=3, skip=0
+        )
+        if not routine_email_service_result:
+            await asyncio.sleep(PROCESS_SEC)
+            return
+        routine_email_service_result: list = prisma_service.to_dict(routine_email_service_result, select={"id": True, "uu_id": True, "iban": True, "bank_reference_code": True, "bank_date": True, "bank_balance": True})
+        generate_task__uuid = ""
+        for row in routine_email_service_result:
+            generate_task__uuid += str(row["uu_id"])[:4]
+        # enqueue() now takes an Enqueue model and expects a dict payload, so wrap the record list
+        await svc.enqueue(Enqueue(task_id=generate_task__uuid, payload={"account_records": routine_email_service_result}, action="routine.email.send.service"))
+        # Get Build and Company Requirements
+
+    await asyncio.sleep(PROCESS_SEC)
@@ -29,6 +43,13 @@ async def handle_comment_publish(svc: ServiceBaseAsync, job: dict):
     print("handle_comment_publish Database Consumer from comment:", job_model.task_id)
 
 
+async def handle_routine_email_send_service_ack(svc: ServiceBaseAsync, job: dict):
+    job_model = Job(**job)
+    await svc.ack_current()
+    print("handle_routine_email_send_service_ack Database Consumer from routine.email.send.service:", job_model.task_id)
+    return
+
+
 async def consume_default(svc: ServiceBaseAsync, job: dict):
     job_model = Job(**job)
     await asyncio.sleep(PROCESS_SEC)
@@ -38,5 +59,5 @@
 
 
 if __name__ == "__main__":
-    svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.comment.publish": handle_comment_publish})
+    svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.comment.publish": handle_comment_publish, "mail.service.publish": handle_routine_email_send_service_ack})
     asyncio.run(svc.run())
diff --git a/ServicesTask/app/services/database/pyproject.toml b/ServicesTask/app/services/database/pyproject.toml
index 945330f..7c95c18 100644
--- a/ServicesTask/app/services/database/pyproject.toml
+++ b/ServicesTask/app/services/database/pyproject.toml
@@ -17,7 +17,8 @@ dependencies = [
     "uvloop>=0.19.0",
     "prisma==0.9.1",
     "asyncio==3.4.3",
-    "arrow>=1.3.0"
+    "arrow>=1.3.0",
+    "redis>=6.4.0"
 ]
 
 [project.optional-dependencies]
diff --git a/ServicesTask/app/services/mail/Dockerfile b/ServicesTask/app/services/mail/Dockerfile
index c3d5064..fa5715e 100644
--- a/ServicesTask/app/services/mail/Dockerfile
+++ b/ServicesTask/app/services/mail/Dockerfile
@@ -11,6 +11,7 @@ COPY app/services/mail/README.md ./
 COPY app/core ./app/core
 COPY app/services/common/ ./app/services/common/
 COPY app/services/mail/ ./app/services/mail/
+COPY app/services/types/ ./app/services/types/
 
 RUN pip install --upgrade pip && pip install --no-cache-dir .
 RUN mkdir -p /app/data
diff --git a/ServicesTask/app/services/mail/IsBank/params.py b/ServicesTask/app/services/mail/IsBank/params.py
index c823de1..8b65a05 100644
--- a/ServicesTask/app/services/mail/IsBank/params.py
+++ b/ServicesTask/app/services/mail/IsBank/params.py
@@ -9,6 +9,7 @@ class IsBankConfig:
     NO_ATTACHMENT_FOLDER: str = "NoAttachment"
     COMPLETED_FOLDER: str = "Completed"
     SERVICE_NAME: str = "IsBankEmailService"
+    BANK_NAME: str = "IsBank"
     TASK_DATA_PREFIX: str = ConfigServices.MAIN_TASK_PREFIX
     TASK_MAILID_INDEX_PREFIX: str = ConfigServices.TASK_MAILID_INDEX_PREFIX
     TASK_UUID_INDEX_PREFIX: str = ConfigServices.TASK_UUID_INDEX_PREFIX
diff --git a/ServicesTask/app/services/mail/IsBank/runner.py b/ServicesTask/app/services/mail/IsBank/runner.py
index 70cd87e..ccf436c 100644
--- a/ServicesTask/app/services/mail/IsBank/runner.py
+++ b/ServicesTask/app/services/mail/IsBank/runner.py
@@ -13,16 +13,31 @@ basicConfig(level=INFO, format=format, handlers=handlers)
 logger = getLogger(IsBankConfig.SERVICE_NAME)
 
 
-def initialize_service():
+def drop(email_service: EmailReaderService):
+    """Clean up resources"""
+    try:
+        email_service.commit()
+    except Exception as e:
+        print(f"Error during commit on drop: {str(e)}")
+    try:
+        email_service.logout()
+    except Exception as e:
+        print(f"Error during logout on drop: {str(e)}")
+
+
+def initialize_service() -> EmailReaderService:
     """Initialize the service with proper error handling"""
+    email_service = None
     try:
         logger.info("Creating EmailReaderService")
         email_service = EmailReaderService(IsBankConfig())
-        logger.info("Connecting to email service")
         email_service.login_and_connect()
         return email_service
     except Exception as e:
+        # only clean up if the client was created; otherwise drop() would hit an unbound name
+        if email_service is not None:
+            drop(email_service)
         logger.error(f"Service initialization failed: {str(e)}")
         sleep(5)
         return initialize_service()
diff --git a/ServicesTask/app/services/mail/mail_service_async.py b/ServicesTask/app/services/mail/mail_service_async.py
deleted file mode 100644
index f218650..0000000
--- a/ServicesTask/app/services/mail/mail_service_async.py
+++ /dev/null
@@ -1,15 +0,0 @@
-import asyncio
-import uuid
-from services.service_base_async import ServiceBaseAsync
-
-async def produce(service: ServiceBaseAsync):
-    fake_mails = [{"to": "user@example.com", "subj": "Hi", "body": "Hello!"}]
-    for mail in fake_mails:
-        await service.enqueue(mail, "send-mail")
-
-async def consume(service: ServiceBaseAsync, job: dict):
-    print(f"[MAIL] Gönderiliyor: {job}")
-    await asyncio.sleep(0.1)
-
-if __name__ == "__main__":
-    asyncio.run(ServiceBaseAsync(produce, consume).run())
diff --git a/ServicesTask/app/services/mail/main.py b/ServicesTask/app/services/mail/main.py
index 093b970..87e2e70 100644
--- a/ServicesTask/app/services/mail/main.py
+++ b/ServicesTask/app/services/mail/main.py
@@ -1,12 +1,14 @@
 import os
 import asyncio
-from typing import List
 
 from app.services.mail.IsBank.runner import initialize_service
+from app.services.mail.mail_handler import Mails
+from app.services.mail.IsBank.params import IsBankConfig
 from app.services.common.service_base_async import ServiceBaseAsync
-from .mail_handler import Mails
-from .IsBank.params import IsBankConfig
+from app.services.types.queue import Enqueue
+from app.services.types.mail import MailParsedResult, ProcessMailObject
+from app.services.types.task import Job
 
 
 PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10"))
@@ -22,7 +24,7 @@ def 
generate_unique_with_mail_id(mail_id: str, service_prefix: str): return f"{service_prefix}_{mail_id}" -def process_mail_with_attachments(mail: Mails, mail_id: str): +def process_mail_with_attachments(mail: Mails, mail_id: str, count: int, total: int) -> ProcessMailObject: """ Process an email with attachments using MailReaderService Args: mail: Mail object, mail_id: Mail ID @@ -31,74 +33,70 @@ def process_mail_with_attachments(mail: Mails, mail_id: str): try: mail_to_dict = mail.to_dict() task_uuid = generate_unique_with_mail_id(mail_id, IsBankConfig.SERVICE_NAME) - process_mail_dict = dict(mail_id=mail_id, mail_data=mail_to_dict, service_prefix=email_service.config.SERVICE_PREFIX) - return task_uuid, process_mail_dict + return ProcessMailObject(uuid=task_uuid, id=mail_id, data=mail_to_dict, service=email_service.config.SERVICE_PREFIX, count=count, total=total, attachments=mail.attachments) except Exception as e: - print(f"Email Service Runner Error processing mail {mail_id}: {str(e)}") - raise + raise Exception(f"Email Service Runner Error processing mail {mail_id}: {str(e)}") -def drop(): - """Clean up resources""" - try: - email_service.commit() - except Exception as e: - print(f"Error during commit on drop: {str(e)}") - try: - email_service.logout() - except Exception as e: - print(f"Error during logout on drop: {str(e)}") - - -async def produce(svc: ServiceBaseAsync): +# Isbank producer mail Reader +async def produce(svc: ServiceBaseAsync): mails, count, length = email_service.refresh() + if not mails: + await asyncio.sleep(PROCESS_SEC) + return for mail in mails: if not getattr(mail, 'id', None): - print("Skipping email with no ID") + print(f"Skipping email with no ID: {mail.subject}") continue mail_id, mail_dict = mail.id.decode('utf-8'), mail.to_dict() try: if mail.attachments: - if any([str(attachment['filename']).lower().endswith('.pdf') for attachment in mail_dict['attachments']]): - email_service.mark_no_attachment(mail_id) - else: - task_uuid, process_mail_dict = process_mail_with_attachments(mail, mail_id) - await svc.enqueue(task_id=task_uuid, payload=process_mail_dict, type_="mail.service.isbank") - else: - email_service.mark_no_attachment(mail_id) + is_attachment_pdf = any([str(attachment['filename']).lower().endswith('.pdf') for attachment in mail_dict['attachments']]) + if not is_attachment_pdf: + process_mail_object = process_mail_with_attachments(mail, mail_id, count, length) + enqueue = Enqueue(task_id=process_mail_object.uuid, payload=process_mail_object.model_dump(), action=IsBankConfig.SERVICE_NAME) + await svc.enqueue(enqueue) + await svc.ack_current() + print(f"Mail Consumer from parser with attachments : {mail_id}") + continue + print(f"Mail Consumer from parser with no attachments : {mail_id}") + email_service.mark_no_attachment(mail_id) + await svc.ack_current() except Exception as e: print(f"Error processing email {mail_id}: {str(e)}") + await svc.retry_current() continue await asyncio.sleep(PROCESS_SEC) async def handle_from_parser(svc: ServiceBaseAsync, job): - print("Mail Consumer from parser:", job) + job_model = Job(**job) + await svc.ack_current() + print("Mail Consumer from parser :", job_model.model_dump()) await asyncio.sleep(PROCESS_SEC) return -async def handle_database_publish(svc: ServiceBaseAsync, job): +async def handle_parser_excel(svc: ServiceBaseAsync, job): + job_model = Job(**job) + parsed_result = MailParsedResult(**job_model.payload) + if parsed_result.send_to == "Completed": + print("Mail Consumer from parser excel :", 
parsed_result.mail_data.id)
+        email_service.mark_completed(parsed_result.mail_data.id)
+    await svc.ack_current()
     await asyncio.sleep(PROCESS_SEC)
-    print("Mail Consumer from database:", job)
     return
 
 
-async def handle_from_mail(svc: ServiceBaseAsync, job):
+async def consume_default(svc: ServiceBaseAsync, job):
+    job_model = Job(**job)
+    await svc.dlq_current()
     await asyncio.sleep(PROCESS_SEC)
-    print("Mail Consumer from mail:", job)
-    return
-
-
-async def consume_default(svc, job):
-    await asyncio.sleep(PROCESS_SEC)
-    print("Mail Consumer default:", job)
+    print("Mail Consumer default:", job_model.model_dump())
     return
 
 
 if __name__ == "__main__":
-    svc = ServiceBaseAsync(produce, consume_default,
-        handlers={"parser.publish": handle_from_parser, "mail.publish": handle_from_mail, "database.publish": handle_database_publish}
-    )
+    svc = ServiceBaseAsync(produce, consume_default, handlers={"parser.comment.publish": handle_from_parser, "parser.excel.publish": handle_parser_excel})
     asyncio.run(svc.run())
diff --git a/ServicesTask/app/services/parser/comment/Dockerfile b/ServicesTask/app/services/parser/comment/Dockerfile
index 6e96476..02ca562 100644
--- a/ServicesTask/app/services/parser/comment/Dockerfile
+++ b/ServicesTask/app/services/parser/comment/Dockerfile
@@ -11,6 +11,7 @@ COPY app/services/parser/comment/README.md ./
 COPY app/core ./app/core
 COPY app/services/common/ ./app/services/common/
 COPY app/services/parser/comment/ ./app/services/parser/comment/
+COPY app/services/types/ ./app/services/types/
 
 RUN pip install --upgrade pip && pip install --no-cache-dir .
 RUN mkdir -p /app/data
diff --git a/ServicesTask/app/services/parser/comment/main.py b/ServicesTask/app/services/parser/comment/main.py
index afa4516..de85bf2 100644
--- a/ServicesTask/app/services/parser/comment/main.py
+++ b/ServicesTask/app/services/parser/comment/main.py
@@ -1,6 +1,11 @@
 import asyncio
 
-from app.services.common.service_base_async import ServiceBaseAsync, Job
+from app.services.common.service_base_async import ServiceBaseAsync
+from app.services.types.queue import Enqueue
+from app.services.types.task import Job
+from app.services.types.mail import MailParsedResult
+from app.services.types.mail import PlainMailReader
+from app.services.types.mail import ProcessMailObject
 
 
 PROCESS_SEC = 10
@@ -12,10 +17,11 @@ async def produce(_svc: ServiceBaseAsync):
 
 
 async def handle_excel_publish(svc: ServiceBaseAsync, job: dict):
+    print("Parser Comment Consumer from excel handle_excel_publish :", job)
     job_model = Job(**job)
     mail_id = job_model.payload['mail_id']
-    task_id = f"IsBankServiceMailParser_{mail_id}"
-    await svc.enqueue(task_id=task_id, payload=job_model.payload, type_="parser.comment.publish")
+    task_id = f"IsBankServiceCommentParser_{mail_id}"
+    await svc.enqueue(Enqueue(task_id=task_id, payload=job_model.payload, action="parser.comment.publish"))
     print("Parser Comment Consumer from excel handle_excel_publish :", job_model.task_id)
     await svc.ack_current()
     await asyncio.sleep(PROCESS_SEC)
diff --git a/ServicesTask/app/services/parser/excel/Dockerfile b/ServicesTask/app/services/parser/excel/Dockerfile
index 71725b2..741c645 100644
--- a/ServicesTask/app/services/parser/excel/Dockerfile
+++ b/ServicesTask/app/services/parser/excel/Dockerfile
@@ -10,6 +10,7 @@ COPY app/services/parser/excel/README.md ./
 
 COPY app/core ./app/core
 COPY app/services/common/ ./app/services/common/
+COPY app/services/types/ ./app/services/types/
 COPY app/services/parser/excel/ ./app/services/parser/excel/
 
 RUN pip install --upgrade pip && pip 
install --no-cache-dir . diff --git a/ServicesTask/app/services/parser/excel/isbank/parser.py b/ServicesTask/app/services/parser/excel/isbank/parser.py new file mode 100644 index 0000000..0f4cfa2 --- /dev/null +++ b/ServicesTask/app/services/parser/excel/isbank/parser.py @@ -0,0 +1,118 @@ +import sys +import logging +from time import sleep +from typing import List +import pandas as pd + +from datetime import datetime +from io import BytesIO +from base64 import b64decode +from unidecode import unidecode + +from app.services.types.mail import ProcessMailObject, MailParser + + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler('isbank_parser_service.log')] +) +logger = logging.getLogger('IsBank_Parser_Service') + + +class IsbankMailParserService: + + + def try_dataframe_extract_with_xlsx(self, binary_data: BytesIO): + try: + df = pd.read_excel(binary_data, engine='openpyxl') + return df + except Exception as e: + return None + + def try_dataframe_extract_with_xls(self, binary_data: BytesIO): + try: + df = pd.read_excel(binary_data, engine='xlrd') + return df + except Exception as e: + return None + + def try_dataframe_extract_else(self, binary_data: BytesIO): + try: + df = pd.read_excel(binary_data, engine='openpyxl') + except Exception as e1: + try: + binary_data.seek(0) + df = pd.read_excel(binary_data, engine='xlrd') + except Exception as e2: + return None + return df + + def parse_record(self, excel_frame: pd.DataFrame, file_name: str) -> list[dict]: + """Parse Excel file data. + + Args: + excel_frame: DataFrame containing Excel data + + Returns: + list[dict]: List of parsed data dictionaries + """ + iban, data_list = "", [] + try: + for row in excel_frame.itertuples(): + if "IBAN" in str(row[3]).upper(): + iban = str(row[5]).replace(" ", "") + if not str(row[1]) == "nan" and not str(row[2]) == "nan": + if len(str(row[1]).split("/")) > 2: + data_list.append(dict( + filename=file_name, iban=str(iban), bank_date=datetime.strptime(str(row[1]), "%d/%m/%Y-%H:%M:%S").strftime("%Y-%m-%d %H:%M:%S"), + channel_branch=unidecode(str(row[3])), currency_value=(float(str(row[4]).replace(",", "")) if row[4] else 0), + balance=(float(str(row[5]).replace(",", "")) if row[5] else 0), additional_balance=(float(str(row[6]).replace(",", "")) if row[6] else 0), + process_name=str(row[7]), process_type=unidecode(str(row[8])), process_comment=unidecode(str(row[9])), bank_reference_code=str(row[15]), + )) + except Exception as e: + print(f"[PARSER_SERVICE] Error parsing Excel file: {str(e)}") + return data_list + + def parse_dataframes(self, dataframe: pd.DataFrame, task: ProcessMailObject, attachment_data: MailParser): + data_list = self.parse_record(dataframe, attachment_data.filename) + print(f"[PARSER_SERVICE] Successfully parsed {len(data_list)} records from Excel file") + if data_list: + print(f"Updated service data for task {task.uuid} with {len(data_list)} records") + return data_list + return None + + def process_task(self, active_task: ProcessMailObject): + """Process a task object using the MailParserService + Args: task: RedisTaskObject or task dictionary to process + """ + try: + for attachment in active_task.data.attachments: + task_id = active_task.data.id + if not attachment or not attachment.data: + print(f"[PARSER_SERVICE] No data found for task {task_id}") + continue + binary_data: bytes = b64decode(attachment.data) + excel_data = BytesIO(binary_data) + df = 
self.try_dataframe_extract_with_xlsx(excel_data) + if df is None: + excel_data.seek(0) + df = self.try_dataframe_extract_with_xls(excel_data) + if df is None: + excel_data.seek(0) + df = self.try_dataframe_extract_else(excel_data) + if df is not None: + return self.parse_dataframes(df, active_task, attachment) + except Exception as e: + print(f"[PARSER_SERVICE] Error processing task: {str(e)}") + + +parser = IsbankMailParserService() + + +def parse_isbank_mail(mailObject: ProcessMailObject): + try: + return parser.process_task(mailObject) + except Exception as e: + print(f"[PARSER_SERVICE] Error parsing mail: {str(e)}") + return None diff --git a/ServicesTask/app/services/parser/excel/main.py b/ServicesTask/app/services/parser/excel/main.py index cff50bf..b92260b 100644 --- a/ServicesTask/app/services/parser/excel/main.py +++ b/ServicesTask/app/services/parser/excel/main.py @@ -1,40 +1,79 @@ import os import asyncio +import fnmatch +from typing import Awaitable, Callable -from app.services.common.service_base_async import ServiceBaseAsync, Job +from app.services.parser.excel.isbank.parser import parse_isbank_mail +from app.services.types.mail import MailParsedResult, PlainMailReader, ProcessMailObject +from app.services.common.service_base_async import ServiceBaseAsync +from app.services.types.queue import Enqueue +from app.services.types.task import Job PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10")) PRODUCE_ONCE = os.getenv("PRODUCE_ONCE", "true").lower() == "true" EVENT_TYPE = os.getenv("EVENT_TYPE", "db-mongo") + PROCESS_SEC = 10 +bank_mail_dict = { + "ileti.isbank.com.tr": parse_isbank_mail +} + + +def grab_fn_callable(domain: str) -> Callable[[ServiceBaseAsync, dict], Awaitable[None]]: + for pat, fn in bank_mail_dict.items(): + if fnmatch.fnmatch(domain, pat): + return fn + return None + + async def produce(svc: ServiceBaseAsync): await asyncio.sleep(PROCESS_SEC) -async def handle_from_parser(svc: ServiceBaseAsync, job): - job = Job(**job) +async def handle_from_parser(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) await svc.ack_current() await asyncio.sleep(PROCESS_SEC) -async def handle_from_mail(svc: ServiceBaseAsync, job): - job = Job(**job) - await svc.enqueue(task_id=job.task_id, payload=job.payload, type_="parser.excel.publish") - print("Parser Excel Consumer from mail handle_from_mail :", job.task_id) - await svc.ack_current() +async def handle_from_mail_parser(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) + process_mail_object = ProcessMailObject(**job_model.payload) + mail_id = process_mail_object.data.id + task_id = f"IsBankServiceExcelParser_{mail_id}" await asyncio.sleep(PROCESS_SEC) + function_handler = grab_fn_callable(process_mail_object.data.from_.domain) + if not function_handler: + await svc.dlq_current(job, error="Invalid domain") + return + parsed_data = function_handler(process_mail_object) + if not parsed_data: + plain_mail_data = PlainMailReader(**process_mail_object.data.model_dump()) + parsed_result = MailParsedResult(task_id=task_id, mail_data=plain_mail_data.model_dump(), send_to="Completed", data=parsed_data) + print("Parser Excel Consumer from mail handle_from_mail :", parsed_result) + enqueue = Enqueue(task_id=task_id, payload=parsed_result.model_dump(), action="mail.service.publish") + await svc.enqueue(enqueue) + await svc.ack_current() + else: + plain_mail_data = PlainMailReader(**process_mail_object.data.model_dump()) + parsed_result = MailParsedResult(task_id=task_id, mail_data=plain_mail_data.model_dump(), 
send_to="Completed", data=parsed_data) + enqueue = Enqueue(task_id=task_id, payload=parsed_result.model_dump(), action="parser.comment.publish") + await svc.enqueue(enqueue) + await svc.ack_current() + print("Parser Excel Consumer from mail handle_from_mail :", task_id) -async def consume_default(svc: ServiceBaseAsync, job): - job = Job(**job) +async def consume_default(svc: ServiceBaseAsync, job: dict): + + job_model = Job(**job) await svc.ack_current() await asyncio.sleep(PROCESS_SEC) if __name__ == "__main__": - svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.mail.publish": handle_from_mail}) + svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.mail.publish": handle_from_mail_parser}) asyncio.run(svc.run()) diff --git a/ServicesTask/app/services/parser/excel/pyproject.toml b/ServicesTask/app/services/parser/excel/pyproject.toml index 986279f..0f46ae8 100644 --- a/ServicesTask/app/services/parser/excel/pyproject.toml +++ b/ServicesTask/app/services/parser/excel/pyproject.toml @@ -16,7 +16,12 @@ dependencies = [ "aio-pika>=9.4.1", "prometheus-client>=0.20.0", "uvloop>=0.19.0", - "pydantic" + "pydantic", + "arrow>=1.3.0", + "pandas>=2.2.3", + "unidecode>=1.3.8", + "xlrd>=2.0.1", + "openpyxl>=3.1.2", ] [project.optional-dependencies] diff --git a/ServicesTask/app/services/parser/mail/Dockerfile b/ServicesTask/app/services/parser/mail/Dockerfile index 8a72a91..11cae90 100644 --- a/ServicesTask/app/services/parser/mail/Dockerfile +++ b/ServicesTask/app/services/parser/mail/Dockerfile @@ -10,6 +10,7 @@ COPY app/services/parser/mail/README.md ./ COPY app/core ./app/core COPY app/services/common/ ./app/services/common/ +COPY app/services/types/ ./app/services/types/ COPY app/services/parser/mail/ ./app/services/parser/mail/ RUN pip install --upgrade pip && pip install --no-cache-dir . 
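A minimal sketch of the Enqueue/Job round trip used throughout this patch (illustrative, not part of the diff). It assumes the wiring shown above: ServiceBaseAsync built with produce_fn/consume_fn/handlers, Enqueue from app.services.types.queue, Job from app.services.types.task, and handlers keyed by routing key. The routing keys and task id below are made up for the example; producers pass an Enqueue whose payload is a dict, consumers validate the decoded body into a Job, re-publish downstream, and acknowledge the current delivery.

import asyncio

from app.services.common.service_base_async import ServiceBaseAsync
from app.services.types.queue import Enqueue
from app.services.types.task import Job


async def produce(svc: ServiceBaseAsync):
    # publish one job; Enqueue.payload must be a dict (Dict[str, Any])
    await svc.enqueue(Enqueue(task_id="demo-0001", payload={"hello": "world"}, action="demo.publish"))
    await asyncio.sleep(10)


async def handle_demo(svc: ServiceBaseAsync, job: dict):
    # consumers receive the decoded message body as a dict and validate it into a Job
    job_model = Job(**job)
    # forward the payload downstream, then acknowledge the current delivery
    await svc.enqueue(Enqueue(task_id=job_model.task_id, payload=job_model.payload, action="demo.forward"))
    await svc.ack_current()


async def consume_default(svc: ServiceBaseAsync, job: dict):
    await svc.ack_current()


if __name__ == "__main__":
    svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"demo.publish": handle_demo})
    asyncio.run(svc.run())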
diff --git a/ServicesTask/app/services/parser/mail/main.py b/ServicesTask/app/services/parser/mail/main.py index e817b7d..486731e 100644 --- a/ServicesTask/app/services/parser/mail/main.py +++ b/ServicesTask/app/services/parser/mail/main.py @@ -1,7 +1,11 @@ import os import asyncio +import fnmatch -from app.services.common.service_base_async import ServiceBaseAsync, Job +from app.services.types.queue import Enqueue +from app.services.common.service_base_async import ServiceBaseAsync +from app.services.types.task import Job +from app.services.types.mail import ProcessMailObject PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10")) @@ -15,16 +19,20 @@ async def produce(svc: ServiceBaseAsync): async def handle_mail_publish(svc: ServiceBaseAsync, job: dict): + # Check for bank mail is from which Bank and to which bulding job_model = Job(**job) - mail_id = job_model.payload['mail_id'] + process_mail_object = ProcessMailObject(**job_model.payload) + mail_id = process_mail_object.data.id task_id = f"IsBankServiceMailParser_{mail_id}" - await svc.enqueue(task_id=task_id, payload=job_model.payload, type_="parser.excel.publish") - print("Parser Mail Consumer parsed handle_mail_publish :", job_model.task_id) + enqueue = Enqueue(task_id=task_id, payload=process_mail_object.model_dump(), action="parser.excel.publish") + await svc.enqueue(enqueue) + print(f"Parser Mail Consumer parsed handle_mail_publish : {enqueue.task_id}") await svc.ack_current() await asyncio.sleep(PROCESS_SEC) async def handle_mongo_publish(svc: ServiceBaseAsync, job: dict): + print('job', job) job_model = Job(**job) await svc.ack_current() print("Parser Mail Consumer default handle_mongo_publish :", job_model.task_id) diff --git a/ServicesTask/app/services/types/mail.py b/ServicesTask/app/services/types/mail.py new file mode 100644 index 0000000..577ce13 --- /dev/null +++ b/ServicesTask/app/services/types/mail.py @@ -0,0 +1,67 @@ +from pydantic import BaseModel +from typing import List, Optional, Literal + + +class FromToHeader(BaseModel): + + display_name: Optional[str] + username: Optional[str] + domain: Optional[str] + mail: Optional[str] + + +class MailParser(BaseModel): + + filename: str + content_type: str + charset: str + data: str + + +class PlainMailReader(BaseModel): + + id: str + subject: str + from_: FromToHeader + to: List[FromToHeader] + date: str + + +class MailReader(BaseModel): + + id: str + subject: str + from_: FromToHeader + to: List[FromToHeader] + date: str + body_text: str + attachments: List[MailParser] + + +class MailTaskObject(BaseModel): + + task: str + data: MailReader + completed: bool + service: str + status: str + created_at: str + is_completed: bool + + +class ProcessMailObject(BaseModel): + + service: str + uuid: str + id: int + data: MailReader + count: int + total: int + + +class MailParsedResult(BaseModel): + + task_id: str + mail_data: PlainMailReader + send_to: Literal["Completed", "NoAttachments", "Archive"] + data: Optional[List[dict]] = None diff --git a/ServicesTask/app/services/types/queue.py b/ServicesTask/app/services/types/queue.py new file mode 100644 index 0000000..b42c292 --- /dev/null +++ b/ServicesTask/app/services/types/queue.py @@ -0,0 +1,18 @@ +from json import dumps +from typing import Any, Optional, Dict +from pydantic import BaseModel + +from app.core.utils import now_ms + + +class Enqueue(BaseModel): + + task_id: str + payload: Dict[str, Any] + action: Optional[str] = None + routing_key: Optional[str] = None + message_id: Optional[str] = None + + @property + def body(self): 
+        return dumps({"task_id": self.task_id, "action": self.action, "payload": self.payload, "created_at": now_ms()}).encode()
diff --git a/ServicesTask/app/services/types/task.py b/ServicesTask/app/services/types/task.py
new file mode 100644
index 0000000..a95e709
--- /dev/null
+++ b/ServicesTask/app/services/types/task.py
@@ -0,0 +1,31 @@
+from contextvars import ContextVar
+from typing import NamedTuple, Optional
+from pydantic import BaseModel
+from aio_pika.abc import AbstractIncomingMessage
+
+
+class _MsgCtx(NamedTuple):
+
+    msg: AbstractIncomingMessage
+    rk: str
+    attempts: int
+
+
+_MSG_CTX: ContextVar[_MsgCtx | None] = ContextVar("_MSG_CTX", default=None)
+
+
+class Meta(BaseModel):
+
+    routing_key: str
+    attempts: int
+    exchange: str
+
+
+class Job(BaseModel):
+
+    task_id: str
+    action: str
+    payload: dict
+    created_at: int
+    _meta: Meta
+    message_id: Optional[str] = None
diff --git a/ServicesTask/docker-compose.yml b/ServicesTask/docker-compose.yml
index c9a2480..53c1c4f 100644
--- a/ServicesTask/docker-compose.yml
+++ b/ServicesTask/docker-compose.yml
@@ -106,7 +106,7 @@ services:
       driver: "json-file"
       options: { max-size: "10m", max-file: "3" }
 
-  db-service:
+  database-service:
     build:
       context: .
       dockerfile: app/services/database/Dockerfile
@@ -118,7 +118,7 @@
     environment:
       SERVICE_NAME: "database-service"
       PRODUCE_KEY: "database.service.publish"
-      CONSUME_BINDINGS: "parser.comment.publish,"
+      CONSUME_BINDINGS: "parser.comment.publish,mail.service.publish"
       RABBITMQ_URL: amqp://admin:admin@rabbitmq:5672/
       EXCHANGE_EVENTS: "app.events"
       PRODUCE_ONCE: "true"
@@ -147,7 +147,7 @@
     environment:
       SERVICE_NAME: "mail-service"
       PRODUCE_KEY: "mail.service.publish"
-      CONSUME_BINDINGS: "database.service.publish,mongo.service.publish"
+      CONSUME_BINDINGS: "parser.comment.publish,database.service.publish,parser.excel.publish"
      RABBITMQ_URL: amqp://admin:admin@rabbitmq:5672/
       EXCHANGE_EVENTS: "app.events"
       PRODUCE_ONCE: "true"
@@ -177,6 +177,7 @@
       EXCHANGE_EVENTS: "app.events"
       CONSUME_BINDINGS: "mail.service.publish,"
       PRODUCE_KEY: "parser.mail.publish"
+      PRODUCE_ONCE: "true"
       RETRY_DELAY_MS: "5000"
       MAX_RETRIES: "3"
       PREFETCH: "5"
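A usage sketch for the new RedisHandler added under app/services/common (illustrative; no call site for it appears in this diff). It assumes the module is importable as app.services.common.redis_handler, matching how the other common modules are imported, and the key name "task:demo" is made up.

from json import dumps, loads

from app.services.common.redis_handler import RedisHandler, safe_redis_operation

# RedisHandler is a singleton: repeated construction returns the same connected instance
redis_handler = RedisHandler()

if redis_handler.ensure_connection():
    # the context manager logs connection/command failures under the given operation name
    with safe_redis_operation(redis_handler.redis_client, "store task state"):
        redis_handler.set("task:demo", dumps({"status": "queued"}))
        raw = redis_handler.get("task:demo")
        task_state = loads(raw) if raw else None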