production-evyos-systems-an.../ServicesTask/app/services/common/service_base_async.py

201 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import asyncio
import fnmatch
import aio_pika
from aio_pika.abc import AbstractIncomingMessage
from typing import Any, Dict, Awaitable, Callable, Optional, List
from services.types.task import _MsgCtx, _MSG_CTX
from services.types.queue import Enqueue
class ServiceBaseAsync:
"""
RabbitMQ tabanlı async servis iskeleti.
- Topic exchange: EXCHANGE_EVENTS (default: app.events)
- Çoklu consume binding: CONSUME_BINDINGS="parser.publish,mail.publish"
- Kendi ürettiğini tüketmez: payload.source == SERVICE_NAME -> ACK & skip
- Retry: TTL'li retry kuyruğu (RETRY_DELAY_MS), sonra main'e geri DLX
- Max deneme üstünde DLQ: q.<service>.events.dlq
- Handler map: routing key -> özel callback (pattern destekli)
- Geriye uyumluluk: enqueue(payload, action, routing_key=None, message_id=None)
"""
def __init__(
self,
produce_fn: Callable[["ServiceBaseAsync"], Awaitable[None]],
consume_fn: Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]],
handlers: Optional[Dict[str, Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]]]] = None,
):
self.service_name = os.getenv("SERVICE_NAME", "db-service")
self.amqp_url = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/")
self.exchange_name = os.getenv("EXCHANGE_EVENTS", "app.events")
self.produce_key: str = os.getenv("PRODUCE_KEY", f"{self.service_name}.publish")
raw = os.getenv("CONSUME_BINDINGS", "")
self.consume_bindings: List[str] = [s.strip() for s in raw.split(",") if s.strip()]
base = self.service_name.replace("/", "_")
self.queue_main = f"q.{base}.events"
self.queue_retry = f"{self.queue_main}.retry"
self.queue_dlq = f"{self.queue_main}.dlq"
self.retry_delay_ms = int(os.getenv("RETRY_DELAY_MS", "5000"))
self.max_retries = int(os.getenv("MAX_RETRIES", "3"))
self.prefetch = int(os.getenv("PREFETCH", "5"))
self.ignore_self = os.getenv("IGNORE_SELF_PRODUCED", "true").lower() == "true"
self.produce_fn = produce_fn
self.consume_fn = consume_fn
self.handlers = handlers or {}
self.conn: Optional[aio_pika.RobustConnection] = None
self.chan: Optional[aio_pika.RobustChannel] = None
self.ex: Optional[aio_pika.Exchange] = None
async def _connect_with_retry(self, max_wait: int = 300):
delay = 1
deadline = asyncio.get_event_loop().time() + (max_wait or 10**9)
last_err = None
while True:
try:
conn = await aio_pika.connect_robust(self.amqp_url, client_properties={"connection_name": self.service_name}, timeout=10)
print(f"[amqp] connected: {self.amqp_url} : {self.service_name} : {self.exchange_name} : {str(self.consume_bindings)}")
return conn
except Exception as e:
last_err = e
now = asyncio.get_event_loop().time()
if now + delay > deadline:
raise last_err
await asyncio.sleep(delay)
delay = min(delay * 2, 10)
async def run(self):
self.conn = await self._connect_with_retry()
self.chan = await self.conn.channel()
await self.chan.set_qos(prefetch_count=self.prefetch)
self.ex = await self.chan.declare_exchange(self.exchange_name, aio_pika.ExchangeType.TOPIC, durable=True)
self.ex_retry = await self.chan.declare_exchange(f"{self.exchange_name}.retry", aio_pika.ExchangeType.TOPIC, durable=True)
self.ex_dlx = await self.chan.declare_exchange(f"{self.exchange_name}.dlx", aio_pika.ExchangeType.TOPIC, durable=True)
args_main = {"x-dead-letter-exchange": f"{self.exchange_name}.retry", "x-queue-mode": "lazy"}
q_main = await self.chan.declare_queue(self.queue_main, durable=True, arguments=args_main)
args_retry = {"x-message-ttl": self.retry_delay_ms, "x-dead-letter-exchange": self.exchange_name}
q_retry = await self.chan.declare_queue(self.queue_retry, durable=True, arguments=args_retry)
q_dlq = await self.chan.declare_queue(self.queue_dlq, durable=True)
await q_dlq.bind(self.ex_dlx, routing_key="#")
if not self.consume_bindings:
print("[warn] No CONSUME_BINDINGS configured; only producing.")
for rk in (self.consume_bindings or []):
await q_main.bind(self.ex, routing_key=rk)
await q_retry.bind(self.ex_retry, routing_key=rk)
await q_main.consume(self._on_message, no_ack=False)
await asyncio.gather(self._produce_loop())
async def enqueue(self, enqueue: Enqueue) -> str:
assert self.ex is not None
payload: dict = enqueue.payload
payload.setdefault("task_id", enqueue.task_id)
payload.setdefault("source", self.service_name)
enqueue.payload = payload
msg = aio_pika.Message(enqueue.body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=enqueue.message_id or enqueue.task_id, headers={"x-attempts": 0}, type=enqueue.action)
routing_key = enqueue.routing_key or self.produce_key
await self.ex.publish(msg, routing_key=routing_key)
return enqueue.task_id
async def ack_current(self) -> None:
ctx = _MSG_CTX.get()
if ctx and ctx.msg:
await ctx.msg.ack()
async def nack_current(self, requeue: bool = False) -> None:
ctx = _MSG_CTX.get()
if ctx and ctx.msg:
await ctx.msg.nack(requeue=requeue)
async def retry_current(self, job: dict, attempts: int | None = None) -> None:
"""Retry kuyruğuna kopyala ve orijinali ACK'le."""
ctx = _MSG_CTX.get()
if not (ctx and ctx.msg):
return
att = attempts if attempts is not None else (ctx.attempts + 1)
await self._publish_retry(ctx.msg, job, att)
await ctx.msg.ack()
async def dlq_current(self, job: dict, error: str | None = None) -> None:
"""DLQ'ya gönder ve orijinali ACK'le."""
ctx = _MSG_CTX.get()
if not (ctx and ctx.msg):
return
await self._publish_dlq(ctx.msg, job, error=error)
await ctx.msg.ack()
def register_handler(self, pattern: str, fn: Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]]):
self.handlers[pattern] = fn
async def _produce_loop(self):
while True:
try:
await self.produce_fn(self)
except Exception as e:
print(f"[produce] ERROR: {e}")
await asyncio.sleep(2)
async def _on_message(self, msg: AbstractIncomingMessage):
async with msg.process(ignore_processed=True, requeue=False):
try:
job = json.loads(msg.body.decode())
except Exception:
job = {"payload": {}, "task_id": None}
src = (job.get("payload") or {}).get("source")
if self.ignore_self and src == self.service_name:
return
attempts = 0
try:
attempts = int(msg.headers.get("x-attempts", 0))
except Exception:
pass
handler = self._resolve_handler(msg.routing_key) or self.consume_fn
meta = job.setdefault("_meta", {})
meta["routing_key"] = msg.routing_key
meta["attempts"] = attempts
meta["exchange"] = self.exchange_name
ctx_token = _MSG_CTX.set(_MsgCtx(msg=msg, rk=msg.routing_key, attempts=attempts))
try:
await handler(self, job)
except Exception as e:
if attempts + 1 >= self.max_retries:
await self._publish_dlq(msg, job, error=str(e))
await msg.ack()
else:
await self._publish_retry(msg, job, attempts + 1)
await msg.ack()
finally:
_MSG_CTX.reset(ctx_token)
def _resolve_handler(self, routing_key: str):
if routing_key in self.handlers:
return self.handlers[routing_key]
for pat, fn in self.handlers.items():
if fnmatch.fnmatch(routing_key, pat):
return fn
return None
async def _publish_retry(self, msg: AbstractIncomingMessage, job: Dict[str, Any], attempts: int):
chan = self.chan; assert chan is not None
retry_ex = await chan.get_exchange(f"{self.exchange_name}.retry")
rk = msg.routing_key
body = json.dumps(job).encode()
m = aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=msg.message_id, headers={"x-attempts": attempts})
await retry_ex.publish(m, routing_key=rk)
async def _publish_dlq(self, msg: AbstractIncomingMessage, job: Dict[str, Any], error: Optional[str] = None):
chan = self.chan; assert chan is not None
dlx_ex = await chan.get_exchange(f"{self.exchange_name}.dlx")
body_obj = dict(job)
if error:
body_obj.setdefault("_error", str(error))
body = json.dumps(body_obj).encode()
m = aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=msg.message_id, headers={"x-attempts": msg.headers.get("x-attempts", 0)})
await dlx_ex.publish(m, routing_key=msg.routing_key)