import os import json import asyncio import fnmatch import aio_pika from pydantic import BaseModel from core.utils import now_ms from contextvars import ContextVar from aio_pika.abc import AbstractIncomingMessage from typing import Any, Dict, Awaitable, Callable, Optional, List, NamedTuple class _MsgCtx(NamedTuple): msg: AbstractIncomingMessage rk: str attempts: int _MSG_CTX: ContextVar[_MsgCtx | None] = ContextVar("_MSG_CTX", default=None) class Meta(BaseModel): routing_key: str attempts: int exchange: str class Job(BaseModel): task_id: str type: str payload: dict created_at: int _meta: Meta class ServiceBaseAsync: """ RabbitMQ tabanlı async servis iskeleti. - Topic exchange: EXCHANGE_EVENTS (default: app.events) - Çoklu consume binding: CONSUME_BINDINGS="parser.publish,mail.publish" - Kendi ürettiğini tüketmez: payload.source == SERVICE_NAME -> ACK & skip - Retry: TTL'li retry kuyruğu (RETRY_DELAY_MS), sonra main'e geri DLX - Max deneme üstünde DLQ: q..events.dlq - Handler map: routing key -> özel callback (pattern destekli) - Geriye uyumluluk: enqueue(payload, type_, routing_key=None, message_id=None) """ def __init__( self, produce_fn: Callable[["ServiceBaseAsync"], Awaitable[None]], consume_fn: Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]], handlers: Optional[Dict[str, Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]]]] = None, ): self.service_name = os.getenv("SERVICE_NAME", "db-service") self.amqp_url = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/") self.exchange_name = os.getenv("EXCHANGE_EVENTS", "app.events") self.produce_key: str = os.getenv("PRODUCE_KEY", f"{self.service_name}.publish") raw = os.getenv("CONSUME_BINDINGS", "") self.consume_bindings: List[str] = [s.strip() for s in raw.split(",") if s.strip()] base = self.service_name.replace("/", "_") self.queue_main = f"q.{base}.events" self.queue_retry = f"{self.queue_main}.retry" self.queue_dlq = f"{self.queue_main}.dlq" self.retry_delay_ms = int(os.getenv("RETRY_DELAY_MS", "5000")) self.max_retries = int(os.getenv("MAX_RETRIES", "3")) self.prefetch = int(os.getenv("PREFETCH", "5")) self.ignore_self = os.getenv("IGNORE_SELF_PRODUCED", "true").lower() == "true" self.produce_fn = produce_fn self.consume_fn = consume_fn self.handlers = handlers or {} self.conn: Optional[aio_pika.RobustConnection] = None self.chan: Optional[aio_pika.RobustChannel] = None self.ex: Optional[aio_pika.Exchange] = None async def _connect_with_retry(self, max_wait: int = 300): delay = 1 deadline = asyncio.get_event_loop().time() + (max_wait or 10**9) last_err = None while True: try: conn = await aio_pika.connect_robust(self.amqp_url, client_properties={"connection_name": self.service_name}, timeout=10) print(f"[amqp] connected: {self.amqp_url} : {self.service_name} : {self.exchange_name} : {str(self.consume_bindings)}") return conn except Exception as e: last_err = e now = asyncio.get_event_loop().time() if now + delay > deadline: raise last_err await asyncio.sleep(delay) delay = min(delay * 2, 10) async def run(self): self.conn = await self._connect_with_retry() self.chan = await self.conn.channel() await self.chan.set_qos(prefetch_count=self.prefetch) self.ex = await self.chan.declare_exchange(self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True) self.ex_retry = await self.chan.declare_exchange(f"{self.exchange_name}.retry", aio_pika.ExchangeType.TOPIC, durable=True) self.ex_dlx = await self.chan.declare_exchange(f"{self.exchange_name}.dlx", aio_pika.ExchangeType.TOPIC, durable=True) args_main = {"x-dead-letter-exchange": f"{self.exchange_name}.retry", "x-queue-mode": "lazy"} q_main = await self.chan.declare_queue(self.queue_main, durable=True, arguments=args_main) args_retry = {"x-message-ttl": self.retry_delay_ms, "x-dead-letter-exchange": self.exchange_name} q_retry = await self.chan.declare_queue(self.queue_retry, durable=True, arguments=args_retry) q_dlq = await self.chan.declare_queue(self.queue_dlq, durable=True) await q_dlq.bind(self.ex_dlx, routing_key="#") if not self.consume_bindings: print("[warn] No CONSUME_BINDINGS configured; only producing.") for rk in (self.consume_bindings or []): await q_main.bind(self.ex, routing_key=rk) await q_retry.bind(self.ex_retry, routing_key=rk) await q_main.consume(self._on_message, no_ack=False) await asyncio.gather(self._produce_loop()) async def enqueue(self, task_id: str, payload: Dict[str, Any], type_: Optional[str] = None, routing_key: Optional[str] = None, message_id: Optional[str] = None) -> str: assert self.ex is not None payload.setdefault("task_id", task_id) payload.setdefault("source", self.service_name) body = json.dumps({"task_id": task_id, "type": type_, "payload": payload, "created_at": now_ms()}).encode() msg = aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=message_id or task_id, headers={"x-attempts": 0}) rk = routing_key or self.produce_key await self.ex.publish(msg, routing_key=rk) return task_id async def ack_current(self) -> None: ctx = _MSG_CTX.get() if ctx and ctx.msg: await ctx.msg.ack() async def nack_current(self, requeue: bool = False) -> None: ctx = _MSG_CTX.get() if ctx and ctx.msg: await ctx.msg.nack(requeue=requeue) async def retry_current(self, job: dict, attempts: int | None = None) -> None: """Retry kuyruğuna kopyala ve orijinali ACK'le.""" ctx = _MSG_CTX.get() if not (ctx and ctx.msg): return att = attempts if attempts is not None else (ctx.attempts + 1) await self._publish_retry(ctx.msg, job, att) await ctx.msg.ack() async def dlq_current(self, job: dict, error: str | None = None) -> None: """DLQ'ya gönder ve orijinali ACK'le.""" ctx = _MSG_CTX.get() if not (ctx and ctx.msg): return await self._publish_dlq(ctx.msg, job, error=error) await ctx.msg.ack() def register_handler(self, pattern: str, fn: Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]]): self.handlers[pattern] = fn async def _produce_loop(self): while True: try: await self.produce_fn(self) except Exception as e: print(f"[produce] ERROR: {e}") await asyncio.sleep(2) async def _on_message(self, msg: AbstractIncomingMessage): async with msg.process(ignore_processed=True, requeue=False): try: job = json.loads(msg.body.decode()) except Exception: job = {"payload": {}, "task_id": None} src = (job.get("payload") or {}).get("source") if self.ignore_self and src == self.service_name: return attempts = 0 try: attempts = int(msg.headers.get("x-attempts", 0)) except Exception: pass handler = self._resolve_handler(msg.routing_key) or self.consume_fn meta = job.setdefault("_meta", {}) meta["routing_key"] = msg.routing_key meta["attempts"] = attempts meta["exchange"] = self.exchange_name ctx_token = _MSG_CTX.set(_MsgCtx(msg=msg, rk=msg.routing_key, attempts=attempts)) try: await handler(self, job) except Exception as e: if attempts + 1 >= self.max_retries: await self._publish_dlq(msg, job, error=str(e)) await msg.ack() else: await self._publish_retry(msg, job, attempts + 1) await msg.ack() finally: _MSG_CTX.reset(ctx_token) def _resolve_handler(self, routing_key: str): if routing_key in self.handlers: return self.handlers[routing_key] for pat, fn in self.handlers.items(): if fnmatch.fnmatch(routing_key, pat): return fn return None async def _publish_retry(self, msg: AbstractIncomingMessage, job: Dict[str, Any], attempts: int): chan = self.chan; assert chan is not None retry_ex = await chan.get_exchange(f"{self.exchange_name}.retry") rk = msg.routing_key body = json.dumps(job).encode() m = aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=msg.message_id, headers={"x-attempts": attempts}) await retry_ex.publish(m, routing_key=rk) async def _publish_dlq(self, msg: AbstractIncomingMessage, job: Dict[str, Any], error: Optional[str] = None): chan = self.chan; assert chan is not None dlx_ex = await chan.get_exchange(f"{self.exchange_name}.dlx") body_obj = dict(job) if error: body_obj.setdefault("_error", str(error)) body = json.dumps(body_obj).encode() m = aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=msg.message_id, headers={"x-attempts": msg.headers.get("x-attempts", 0)}) await dlx_ex.publish(m, routing_key=msg.routing_key)