diff --git a/ServicesTask/app/services/common/service_base_async.py b/ServicesTask/app/services/common/service_base_async.py index d5cbf38..a0e48ed 100644 --- a/ServicesTask/app/services/common/service_base_async.py +++ b/ServicesTask/app/services/common/service_base_async.py @@ -1,10 +1,10 @@ import os import json -import uuid import asyncio import fnmatch import aio_pika +from pydantic import BaseModel from core.utils import now_ms from contextvars import ContextVar from aio_pika.abc import AbstractIncomingMessage @@ -20,6 +20,20 @@ class _MsgCtx(NamedTuple): _MSG_CTX: ContextVar[_MsgCtx | None] = ContextVar("_MSG_CTX", default=None) +class Meta(BaseModel): + routing_key: str + attempts: int + exchange: str + + +class Job(BaseModel): + task_id: str + type: str + payload: dict + created_at: int + _meta: Meta + + class ServiceBaseAsync: """ RabbitMQ tabanlı async servis iskeleti. @@ -52,7 +66,7 @@ class ServiceBaseAsync: self.queue_dlq = f"{self.queue_main}.dlq" self.retry_delay_ms = int(os.getenv("RETRY_DELAY_MS", "5000")) self.max_retries = int(os.getenv("MAX_RETRIES", "3")) - self.prefetch = int(os.getenv("PREFETCH", "16")) + self.prefetch = int(os.getenv("PREFETCH", "5")) self.ignore_self = os.getenv("IGNORE_SELF_PRODUCED", "true").lower() == "true" self.produce_fn = produce_fn @@ -163,7 +177,6 @@ class ServiceBaseAsync: attempts = int(msg.headers.get("x-attempts", 0)) except Exception: pass - handler = self._resolve_handler(msg.routing_key) or self.consume_fn meta = job.setdefault("_meta", {}) meta["routing_key"] = msg.routing_key diff --git a/ServicesTask/app/services/database/main.py b/ServicesTask/app/services/database/main.py index 2522731..168f9b9 100644 --- a/ServicesTask/app/services/database/main.py +++ b/ServicesTask/app/services/database/main.py @@ -2,7 +2,7 @@ import os import asyncio from prisma_client import PrismaService -from services.common.service_base_async import ServiceBaseAsync +from services.common.service_base_async import ServiceBaseAsync, Job PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10")) @@ -10,39 +10,33 @@ PRODUCE_ONCE = os.getenv("PRODUCE_ONCE", "true").lower() == "true" EVENT_TYPE = os.getenv("EVENT_TYPE", "db-event") PROCESS_SEC = 10 +prisma_service = PrismaService() + async def produce(svc: ServiceBaseAsync): - prisma_service = PrismaService() async with prisma_service._asession() as db: result = await db.account_records.find_many(take=10, skip=0) result: list = prisma_service.to_dict(result, select={"id": True, "uu_id": True, "iban": True, "bank_reference_code": True, "bank_date": True, "bank_balance": True}) for row in result: await svc.enqueue(task_id=row["uu_id"], payload=row, type_="database.account.records") await asyncio.sleep(PROCESS_SEC) - print(f"Produced From Database Producer: {len(result)} events to '{svc.produce_key}") -async def handle_mail_publish(svc: ServiceBaseAsync, job): +async def handle_comment_publish(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) await asyncio.sleep(PROCESS_SEC) await svc.ack_current() - print("Database Consumer from mail:", job) + print("handle_comment_publish Database Consumer from comment:", job_model.task_id) -async def handle_mongo_publish(svc: ServiceBaseAsync, job): - prisma_service = PrismaService() +async def consume_default(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) await asyncio.sleep(PROCESS_SEC) - await svc.ack_current() - print("Database Consumer from mongo:", job) - - -async def consume_default(svc: ServiceBaseAsync, job): - prisma_service = PrismaService() - await asyncio.sleep(PROCESS_SEC) - print("Database Consumer default (DLQ):", job.get("task_id")) - await svc.dlq_current(job, error="unsupported_routing_key") + print("consume_default Database Consumer default (DLQ):", job_model.task_id) + await svc.dlq_current(job_model, error="unsupported_routing_key") if __name__ == "__main__": - svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"mail.service.publish": handle_mail_publish, "mongo.service.publish": handle_mongo_publish}) + svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.comment.publish": handle_comment_publish}) asyncio.run(svc.run()) diff --git a/ServicesTask/app/services/parser/comment/main.py b/ServicesTask/app/services/parser/comment/main.py index 7ec8834..afa4516 100644 --- a/ServicesTask/app/services/parser/comment/main.py +++ b/ServicesTask/app/services/parser/comment/main.py @@ -1,30 +1,34 @@ import asyncio -from app.services.common.service_base_async import ServiceBaseAsync +from app.services.common.service_base_async import ServiceBaseAsync, Job PROCESS_SEC = 10 -async def handle_mail_publish(svc: ServiceBaseAsync, job: dict): +async def produce(_svc: ServiceBaseAsync): + # print("Parser Comment Producer produce :") await asyncio.sleep(PROCESS_SEC) - print("Parser Mail Consumer parsed:", job) - # await svc.ack_current() - # await svc.enqueue({"source": "parser-mail", "from_task": job}, "parser-mail-done", routing_key="parser.comment.publish") -async def consume_default(svc: ServiceBaseAsync, job): - print("Parser Mail Consumer default:", job) +async def handle_excel_publish(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) + mail_id = job_model.payload['mail_id'] + task_id = f"IsBankServiceMailParser_{mail_id}" + await svc.enqueue(task_id=task_id, payload=job_model.payload, type_="parser.comment.publish") + print("Parser Comment Consumer from excel handle_excel_publish :", job_model.task_id) + await svc.ack_current() + await asyncio.sleep(PROCESS_SEC) + + +async def consume_default(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) + print("Parser Comment Consumer default :", job_model.task_id) await asyncio.sleep(PROCESS_SEC) await svc.ack_current() -async def produce(_svc: ServiceBaseAsync): - print("Parser Mail Producer produce") - await asyncio.sleep(PROCESS_SEC) - - if __name__ == "__main__": - svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"mail.service.publish": handle_mail_publish}) + svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.excel.publish": handle_excel_publish}) asyncio.run(svc.run()) diff --git a/ServicesTask/app/services/parser/comment/pyproject.toml b/ServicesTask/app/services/parser/comment/pyproject.toml index 82dd953..986279f 100644 --- a/ServicesTask/app/services/parser/comment/pyproject.toml +++ b/ServicesTask/app/services/parser/comment/pyproject.toml @@ -15,7 +15,8 @@ authors = [ dependencies = [ "aio-pika>=9.4.1", "prometheus-client>=0.20.0", - "uvloop>=0.19.0" + "uvloop>=0.19.0", + "pydantic" ] [project.optional-dependencies] diff --git a/ServicesTask/app/services/parser/excel/main.py b/ServicesTask/app/services/parser/excel/main.py index 31fa326..cff50bf 100644 --- a/ServicesTask/app/services/parser/excel/main.py +++ b/ServicesTask/app/services/parser/excel/main.py @@ -1,8 +1,7 @@ import os -import uuid import asyncio -from app.services.common.service_base_async import ServiceBaseAsync +from app.services.common.service_base_async import ServiceBaseAsync, Job PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10")) @@ -13,28 +12,29 @@ PROCESS_SEC = 10 async def produce(svc: ServiceBaseAsync): await asyncio.sleep(PROCESS_SEC) - print(f"Parser Excel Producer produced {len([1,2])} events to '{svc.produce_key}'") async def handle_from_parser(svc: ServiceBaseAsync, job): - print("Parser Excel Consumer from parser:", job) + job = Job(**job) await svc.ack_current() - return + await asyncio.sleep(PROCESS_SEC) async def handle_from_mail(svc: ServiceBaseAsync, job): - print("Parser Excel Consumer from mail:", job) + job = Job(**job) + await svc.enqueue(task_id=job.task_id, payload=job.payload, type_="parser.excel.publish") + print("Parser Excel Consumer from mail handle_from_mail :", job.task_id) await svc.ack_current() - return + await asyncio.sleep(PROCESS_SEC) -async def consume_default(svc, job): - print("Parser Excel Consumer default:", job) +async def consume_default(svc: ServiceBaseAsync, job): + job = Job(**job) await svc.ack_current() - return + await asyncio.sleep(PROCESS_SEC) if __name__ == "__main__": - svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.publish": handle_from_parser, "mail.publish": handle_from_mail}) + svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"parser.mail.publish": handle_from_mail}) asyncio.run(svc.run()) diff --git a/ServicesTask/app/services/parser/excel/pyproject.toml b/ServicesTask/app/services/parser/excel/pyproject.toml index 82dd953..986279f 100644 --- a/ServicesTask/app/services/parser/excel/pyproject.toml +++ b/ServicesTask/app/services/parser/excel/pyproject.toml @@ -15,7 +15,8 @@ authors = [ dependencies = [ "aio-pika>=9.4.1", "prometheus-client>=0.20.0", - "uvloop>=0.19.0" + "uvloop>=0.19.0", + "pydantic" ] [project.optional-dependencies] diff --git a/ServicesTask/app/services/parser/mail/main.py b/ServicesTask/app/services/parser/mail/main.py index 74dc40f..e817b7d 100644 --- a/ServicesTask/app/services/parser/mail/main.py +++ b/ServicesTask/app/services/parser/mail/main.py @@ -1,7 +1,7 @@ import os import asyncio -from app.services.common.service_base_async import ServiceBaseAsync +from app.services.common.service_base_async import ServiceBaseAsync, Job PRODUCE_BURST = int(os.getenv("PRODUCE_BURST", "10")) @@ -12,31 +12,33 @@ PROCESS_SEC = 10 async def produce(svc: ServiceBaseAsync): await asyncio.sleep(PROCESS_SEC) - print(f"Parser Mail Producer produced {len([1,2])} events to '{svc.produce_key}'") -async def handle_db_publish(svc: ServiceBaseAsync, job): - await asyncio.sleep(PROCESS_SEC) +async def handle_mail_publish(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) + mail_id = job_model.payload['mail_id'] + task_id = f"IsBankServiceMailParser_{mail_id}" + await svc.enqueue(task_id=task_id, payload=job_model.payload, type_="parser.excel.publish") + print("Parser Mail Consumer parsed handle_mail_publish :", job_model.task_id) await svc.ack_current() - print("Parser Mail Consumer from db:", job) - - -async def handle_mongo_publish(svc: ServiceBaseAsync, job): await asyncio.sleep(PROCESS_SEC) + + +async def handle_mongo_publish(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) await svc.ack_current() - print("Parser Mail Consumer from mongo:", job) - - -async def consume_default(svc: ServiceBaseAsync, job): + print("Parser Mail Consumer default handle_mongo_publish :", job_model.task_id) await asyncio.sleep(PROCESS_SEC) - print("Parser Mail Consumer default:", job) + + +async def consume_default(svc: ServiceBaseAsync, job: dict): + job_model = Job(**job) + await asyncio.sleep(PROCESS_SEC) + print("Parser Mail Consumer default consume_default :", job_model.task_id) return if __name__ == "__main__": - svc = ServiceBaseAsync( - produce_fn=produce, consume_fn=consume_default, - handlers={"database.service.publish": handle_db_publish, "mongo.service.publish": handle_mongo_publish}, - ) + svc = ServiceBaseAsync(produce_fn=produce, consume_fn=consume_default, handlers={"mail.service.publish": handle_mail_publish}) asyncio.run(svc.run()) diff --git a/ServicesTask/app/services/parser/mail/pyproject.toml b/ServicesTask/app/services/parser/mail/pyproject.toml index 82dd953..986279f 100644 --- a/ServicesTask/app/services/parser/mail/pyproject.toml +++ b/ServicesTask/app/services/parser/mail/pyproject.toml @@ -15,7 +15,8 @@ authors = [ dependencies = [ "aio-pika>=9.4.1", "prometheus-client>=0.20.0", - "uvloop>=0.19.0" + "uvloop>=0.19.0", + "pydantic" ] [project.optional-dependencies] diff --git a/ServicesTask/docker-compose.yml b/ServicesTask/docker-compose.yml index 9216d0d..c9a2480 100644 --- a/ServicesTask/docker-compose.yml +++ b/ServicesTask/docker-compose.yml @@ -98,7 +98,7 @@ services: EVENT_TYPE: "mongo-event" RETRY_DELAY_MS: "5000" MAX_RETRIES: "3" - PREFETCH: "16" + PREFETCH: "5" IGNORE_SELF_PRODUCED: "true" networks: [servicesNetwork] restart: unless-stopped @@ -118,13 +118,13 @@ services: environment: SERVICE_NAME: "database-service" PRODUCE_KEY: "database.service.publish" - CONSUME_BINDINGS: "mail.service.publish,mongo.service.publish" + CONSUME_BINDINGS: "parser.comment.publish," RABBITMQ_URL: amqp://admin:admin@rabbitmq:5672/ EXCHANGE_EVENTS: "app.events" PRODUCE_ONCE: "true" RETRY_DELAY_MS: "5000" MAX_RETRIES: "3" - PREFETCH: "16" + PREFETCH: "5" IGNORE_SELF_PRODUCED: "true" volumes: - ./app/services/database/venv:/opt/venv @@ -153,7 +153,7 @@ services: PRODUCE_ONCE: "true" RETRY_DELAY_MS: "5000" MAX_RETRIES: "3" - PREFETCH: "16" + PREFETCH: "5" IGNORE_SELF_PRODUCED: "true" networks: [servicesNetwork] restart: unless-stopped @@ -175,11 +175,11 @@ services: SERVICE_NAME: "parser-mail-service" RABBITMQ_URL: amqp://admin:admin@rabbitmq:5672/ EXCHANGE_EVENTS: "app.events" - CONSUME_BINDINGS: "mail.service.publish" + CONSUME_BINDINGS: "mail.service.publish," PRODUCE_KEY: "parser.mail.publish" RETRY_DELAY_MS: "5000" MAX_RETRIES: "3" - PREFETCH: "16" + PREFETCH: "5" IGNORE_SELF_PRODUCED: "true" networks: [servicesNetwork] restart: unless-stopped @@ -201,12 +201,12 @@ services: SERVICE_NAME: "parser-excel-service" RABBITMQ_URL: amqp://admin:admin@rabbitmq:5672/ EXCHANGE_EVENTS: "app.events" - CONSUME_BINDINGS: "parser.mail.publish" + CONSUME_BINDINGS: "parser.mail.publish," PRODUCE_KEY: "parser.excel.publish" PRODUCE_ONCE: "true" RETRY_DELAY_MS: "5000" MAX_RETRIES: "3" - PREFETCH: "16" + PREFETCH: "5" IGNORE_SELF_PRODUCED: "true" networks: [servicesNetwork] restart: unless-stopped @@ -228,12 +228,12 @@ services: SERVICE_NAME: "parser-comment-service" RABBITMQ_URL: amqp://admin:admin@rabbitmq:5672/ EXCHANGE_EVENTS: "app.events" - CONSUME_BINDINGS: "parser.excel.publish" + CONSUME_BINDINGS: "parser.excel.publish," PRODUCE_KEY: "parser.comment.publish" PRODUCE_ONCE: "true" RETRY_DELAY_MS: "5000" MAX_RETRIES: "3" - PREFETCH: "16" + PREFETCH: "5" IGNORE_SELF_PRODUCED: "true" networks: [servicesNetwork] restart: unless-stopped @@ -260,7 +260,7 @@ services: PRODUCE_ONCE: "true" RETRY_DELAY_MS: "5000" MAX_RETRIES: "3" - PREFETCH: "16" + PREFETCH: "5" IGNORE_SELF_PRODUCED: "true" networks: [servicesNetwork] restart: unless-stopped