RabbitMQ implemented and tested

This commit is contained in:
2025-08-17 21:14:46 +03:00
parent 9543d136aa
commit 61529f7d94
43 changed files with 5433 additions and 315 deletions

View File

@@ -1,79 +1,147 @@
import os
import json
import uuid
import asyncio
import json
import fnmatch
import aio_pika
from typing import Any, Dict, Awaitable, Callable, Optional
from core.utils import now_ms
from contextvars import ContextVar
from aio_pika.abc import AbstractIncomingMessage
from typing import Any, Dict, Awaitable, Callable, Optional, List, NamedTuple
from app.core.utils import now_ms
from app.core import metrics
from nats.aio.client import Client as NATS
from nats.js.api import StreamConfig, ConsumerConfig, AckPolicy
from nats.errors import NoRespondersError
class _MsgCtx(NamedTuple):
    # Context for the message currently being processed by a handler.
    msg: AbstractIncomingMessage  # the raw AMQP delivery
    rk: str                       # routing key the message arrived on
    attempts: int                 # value of the "x-attempts" header (0 on first delivery)

# Holds the in-flight message so ack/nack/retry/dlq helpers can reach it;
# set before each handler invocation and reset afterwards.
_MSG_CTX: ContextVar[_MsgCtx | None] = ContextVar("_MSG_CTX", default=None)
class ServiceBaseAsync:
    """
    RabbitMQ-based async service skeleton.

    - Topic exchange: EXCHANGE_EVENTS (default: app.events)
    - Multiple consume bindings: CONSUME_BINDINGS="parser.publish,mail.publish"
    - Does not consume its own output: payload.source == SERVICE_NAME -> ACK & skip
    - Retry: TTL'd retry queue (RETRY_DELAY_MS), then dead-lettered back to main
    - Above the max attempt count messages go to the DLQ: q.<service>.events.dlq
    - Handler map: routing key -> dedicated callback (fnmatch patterns supported)
    - Backward compatibility: enqueue(payload, type_, routing_key=None, message_id=None)
    """
def __init__(
    self,
    produce_fn: Callable[["ServiceBaseAsync"], Awaitable[None]],
    consume_fn: Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]],
    handlers: Optional[Dict[str, Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]]]] = None,
):
    """Read broker settings from the environment and store the callbacks.

    produce_fn: coroutine invoked in a loop to publish outgoing work.
    consume_fn: fallback coroutine for consumed messages with no matching handler.
    handlers:   optional map of routing-key pattern -> coroutine handler.
    """
    # NOTE(review): the NATS/JetStream settings that were interleaved here
    # (NATS_URL, JS_STREAM, JS_DURABLE, dedup header, nc/js handles, ...)
    # belonged to the replaced implementation and are dropped.
    self.service_name = os.getenv("SERVICE_NAME", "db-service")
    self.amqp_url = os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost/")
    self.exchange_name = os.getenv("EXCHANGE_EVENTS", "app.events")
    self.produce_key: str = os.getenv("PRODUCE_KEY", f"{self.service_name}.publish")
    # Comma-separated routing keys this service consumes.
    raw = os.getenv("CONSUME_BINDINGS", "")
    self.consume_bindings: List[str] = [s.strip() for s in raw.split(",") if s.strip()]
    # Queue names are derived from the service name ("/" is not queue-safe).
    base = self.service_name.replace("/", "_")
    self.queue_main = f"q.{base}.events"
    self.queue_retry = f"{self.queue_main}.retry"
    self.queue_dlq = f"{self.queue_main}.dlq"
    self.retry_delay_ms = int(os.getenv("RETRY_DELAY_MS", "5000"))
    self.max_retries = int(os.getenv("MAX_RETRIES", "3"))
    self.prefetch = int(os.getenv("PREFETCH", "16"))
    self.ignore_self = os.getenv("IGNORE_SELF_PRODUCED", "true").lower() == "true"
    self.produce_fn = produce_fn
    self.consume_fn = consume_fn
    self.handlers = handlers or {}
    # Connection objects are created in run().
    self.conn: Optional[aio_pika.RobustConnection] = None
    self.chan: Optional[aio_pika.RobustChannel] = None
    self.ex: Optional[aio_pika.Exchange] = None
async def _connect_with_retry(self, max_wait: int = 300):
    """Connect to RabbitMQ, retrying with exponential backoff.

    Keeps retrying for up to *max_wait* seconds (0/None -> effectively
    unbounded), doubling the delay between attempts up to a 10 s cap.
    Returns the established robust connection; re-raises the last
    connection error once the deadline would be exceeded.
    """
    # NOTE(review): fragments of the removed NATS run() that were
    # interleaved here (metrics.start_server, NATS connect, JetStream
    # setup) are dropped.
    delay = 1
    # max_wait of 0/None disables the deadline via a huge sentinel.
    deadline = asyncio.get_running_loop().time() + (max_wait or 10**9)
    last_err = None
    while True:
        try:
            conn = await aio_pika.connect_robust(self.amqp_url, client_properties={"connection_name": self.service_name}, timeout=10)
            print(f"[amqp] connected: {self.amqp_url} : {self.service_name} : {self.exchange_name} : {str(self.consume_bindings)}")
            return conn
        except Exception as e:
            last_err = e
            now = asyncio.get_running_loop().time()
            # Give up if even the next sleep would cross the deadline.
            if now + delay > deadline:
                raise last_err
            await asyncio.sleep(delay)
            delay = min(delay * 2, 10)
async def run(self):
    """Connect, declare the exchange/queue topology, and start the loops.

    Topology: the main topic exchange plus ".retry" and ".dlx" companions.
    The durable main queue dead-letters into the retry exchange; the TTL'd
    retry queue dead-letters back into the main exchange; the DLQ catches
    everything routed through the DLX.
    """
    self.conn = await self._connect_with_retry()
    self.chan = await self.conn.channel()
    await self.chan.set_qos(prefetch_count=self.prefetch)

    ex_name = self.exchange_name
    topic = aio_pika.ExchangeType.TOPIC
    self.ex = await self.chan.declare_exchange(ex_name, topic, durable=True)
    self.ex_retry = await self.chan.declare_exchange(f"{ex_name}.retry", topic, durable=True)
    self.ex_dlx = await self.chan.declare_exchange(f"{ex_name}.dlx", topic, durable=True)

    main_q = await self.chan.declare_queue(
        self.queue_main,
        durable=True,
        arguments={"x-dead-letter-exchange": f"{ex_name}.retry", "x-queue-mode": "lazy"},
    )
    retry_q = await self.chan.declare_queue(
        self.queue_retry,
        durable=True,
        arguments={"x-message-ttl": self.retry_delay_ms, "x-dead-letter-exchange": ex_name},
    )
    dlq = await self.chan.declare_queue(self.queue_dlq, durable=True)
    await dlq.bind(self.ex_dlx, routing_key="#")

    if not self.consume_bindings:
        print("[warn] No CONSUME_BINDINGS configured; only producing.")
    for rk in (self.consume_bindings or []):
        await main_q.bind(self.ex, routing_key=rk)
        await retry_q.bind(self.ex_retry, routing_key=rk)

    await main_q.consume(self._on_message, no_ack=False)
    await asyncio.gather(self._produce_loop())
# NOTE(review): leftover from the previous NATS/JetStream implementation —
# nothing in the RabbitMQ code path calls this; candidate for removal.
async def _ensure_stream_and_consumer(self) -> None:
    """Best-effort creation of the JetStream stream and durable consumer.

    Both calls swallow all exceptions, so an already-existing stream or
    consumer (or an unreachable JetStream) is treated as success.
    """
    try:
        await self.js.add_stream(StreamConfig(name=self.stream_name, subjects=[self.tasks_subject, self.publish_subject]))
        print(f"[js] stream created: {self.stream_name}")
    except Exception:
        # Stream may already exist — ignore.
        pass
    try:
        await self.js.add_consumer(
            self.stream_name,
            ConsumerConfig(
                durable_name=self.durable, ack_policy=AckPolicy.EXPLICIT,
                ack_wait=self.ack_wait_sec, max_deliver=self.max_deliver, filter_subject=self.tasks_subject),
        )
        print(f"[js] consumer created: durable={self.durable}")
    except Exception:
        # Consumer may already exist — ignore.
        pass
async def enqueue(self, task_id: str, payload: Dict[str, Any], type_: Optional[str] = None, routing_key: Optional[str] = None, message_id: Optional[str] = None) -> str:
    """Publish a persistent task message to the events exchange.

    Stamps *payload* with the task_id and this service's name as "source"
    (so our own consumer can skip it), wraps it in an envelope carrying a
    creation timestamp, and publishes under *routing_key* (default:
    PRODUCE_KEY). Returns the task_id.

    Raises RuntimeError if called before run() has declared the exchange.
    """
    # Explicit check instead of `assert`: asserts vanish under `python -O`.
    if self.ex is None:
        raise RuntimeError("exchange not declared; call run() first")
    payload.setdefault("task_id", task_id)
    payload.setdefault("source", self.service_name)
    body = json.dumps({"task_id": task_id, "type": type_, "payload": payload, "created_at": now_ms()}).encode()
    msg = aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=message_id or task_id, headers={"x-attempts": 0})
    rk = routing_key or self.produce_key
    await self.ex.publish(msg, routing_key=rk)
    return task_id
async def ack_current(self) -> None:
    """Acknowledge the message currently being handled, if any."""
    ctx = _MSG_CTX.get()
    if not (ctx and ctx.msg):
        return
    await ctx.msg.ack()
async def nack_current(self, requeue: bool = False) -> None:
    """Negatively acknowledge the current message (no requeue by default)."""
    ctx = _MSG_CTX.get()
    if not (ctx and ctx.msg):
        return
    await ctx.msg.nack(requeue=requeue)
async def retry_current(self, job: dict, attempts: int | None = None) -> None:
    """Copy the in-flight message to the retry queue and ack the original."""
    ctx = _MSG_CTX.get()
    if ctx is None or ctx.msg is None:
        return
    # Default to one more than the recorded attempt count.
    next_attempts = (ctx.attempts + 1) if attempts is None else attempts
    await self._publish_retry(ctx.msg, job, next_attempts)
    await ctx.msg.ack()
async def dlq_current(self, job: dict, error: str | None = None) -> None:
    """Copy the in-flight message to the DLQ and ack the original."""
    ctx = _MSG_CTX.get()
    if ctx is None or ctx.msg is None:
        return
    await self._publish_dlq(ctx.msg, job, error=error)
    await ctx.msg.ack()
def register_handler(self, pattern: str, fn: Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]]):
    """Register *fn* for routing keys matching *pattern* (fnmatch style)."""
    self.handlers[pattern] = fn
async def _produce_loop(self):
    """Run produce_fn forever, logging and backing off 2 s after a failure."""
    while True:
        try:
            await self.produce_fn(self)
        # NOTE(review): the `except` line was lost to a diff hunk marker in
        # the pasted source; the error print and sleep that follow it are
        # assumed to be the except body — confirm against VCS history.
        except Exception as e:
            print(f"[produce] ERROR: {e}")
            await asyncio.sleep(2)
async def _on_message(self, msg: AbstractIncomingMessage):
    """Consume one delivery: decode, dispatch, and retry/DLQ on failure.

    msg.process() acks on clean exit and rejects without requeue on an
    unhandled error; ignore_processed=True lets the except branches ack
    explicitly after copying the message to the retry queue or the DLQ.
    """
    # NOTE(review): lines from the removed NATS pull-consume loop were
    # interleaved in this region; they are dropped here.
    async with msg.process(ignore_processed=True, requeue=False):
        try:
            job = json.loads(msg.body.decode())
        except Exception:
            # Undecodable body: continue with an empty job envelope.
            job = {"payload": {}, "task_id": None}
        # Skip events this service produced itself.
        src = (job.get("payload") or {}).get("source")
        if self.ignore_self and src == self.service_name:
            return
        attempts = 0
        try:
            attempts = int(msg.headers.get("x-attempts", 0))
        except Exception:
            pass
        # Routing-key-specific handler if registered, else the generic consumer.
        handler = self._resolve_handler(msg.routing_key) or self.consume_fn
        meta = job.setdefault("_meta", {})
        meta["routing_key"] = msg.routing_key
        meta["attempts"] = attempts
        meta["exchange"] = self.exchange_name
        # Expose the in-flight message to the ack/retry/dlq helpers.
        ctx_token = _MSG_CTX.set(_MsgCtx(msg=msg, rk=msg.routing_key, attempts=attempts))
        try:
            await handler(self, job)
        except Exception as e:
            if attempts + 1 >= self.max_retries:
                # Out of retries: park the job in the DLQ with the error.
                await self._publish_dlq(msg, job, error=str(e))
                await msg.ack()
            else:
                # Schedule another attempt via the TTL'd retry queue.
                await self._publish_retry(msg, job, attempts + 1)
                await msg.ack()
        finally:
            _MSG_CTX.reset(ctx_token)
def _resolve_handler(self, routing_key: str):
    """Return the handler registered for *routing_key*, or None.

    Exact key matches win; otherwise the first fnmatch-style pattern
    (e.g. "mail.*") that matches is used — dict insertion order decides
    ties between overlapping patterns.
    """
    # NOTE(review): the removed JetStream enqueue() fragments interleaved
    # around this method are dropped here.
    if routing_key in self.handlers:
        return self.handlers[routing_key]
    for pat, fn in self.handlers.items():
        if fnmatch.fnmatch(routing_key, pat):
            return fn
    return None
async def _publish_retry(self, msg: AbstractIncomingMessage, job: Dict[str, Any], attempts: int):
    """Republish *job* to the retry exchange with the given attempt count.

    The retry queue's TTL + DLX route the copy back to the main exchange
    after RETRY_DELAY_MS. The original message is NOT acked here.

    Raises RuntimeError if called before run() has opened the channel.
    """
    # NOTE(review): remnants of the removed JetStream enqueue() that were
    # interleaved in this region are dropped here.
    chan = self.chan
    # Explicit check instead of `assert`: asserts vanish under `python -O`.
    if chan is None:
        raise RuntimeError("channel not initialized; call run() first")
    retry_ex = await chan.get_exchange(f"{self.exchange_name}.retry")
    rk = msg.routing_key
    body = json.dumps(job).encode()
    m = aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=msg.message_id, headers={"x-attempts": attempts})
    await retry_ex.publish(m, routing_key=rk)
# NOTE(review): leftover from the NATS/JetStream implementation — publishes
# lifecycle events via self.js / self.publish_subject, which the RabbitMQ
# code path never initializes. Candidate for removal.
async def _publish(self, event: Dict[str, Any]) -> None:
    """Best-effort publish of a status event; all failures are swallowed."""
    evt = dict(event)
    evt.setdefault("ts", now_ms())
    evt.setdefault("queue", self.tasks_subject)
    try:
        metrics.observe(evt.get("status","unknown"), evt["queue"], evt.get("type"))
    except Exception:
        # Metrics are best-effort; never fail the caller.
        pass
    try:
        await self.js.publish(self.publish_subject, json.dumps(evt).encode())
    except Exception:
        # Event publishing is best-effort as well.
        pass
@staticmethod
def _decode_msg(msg) -> Dict[str, Any]:
    """Decode a message body into a job dict, never raising.

    If "payload" arrives double-encoded as a JSON string, decode it too;
    any failure yields an empty job instead of propagating.
    """
    # NOTE(review): NATS-specific leftover (reads msg.data; aio_pika
    # messages expose .body) — candidate for removal.
    try:
        obj = json.loads(msg.data.decode())
        if "payload" in obj and isinstance(obj["payload"], str):
            try:
                obj["payload"] = json.loads(obj["payload"])
            except Exception:
                pass
        return obj
    except Exception:
        return {"payload": {}, "task_id": None}
@staticmethod
def _delivery_attempts(msg) -> int:
    """Return the message's delivery count, defaulting to 1."""
    # NOTE(review): NATS-specific leftover (msg.metadata.num_delivered);
    # the RabbitMQ path tracks attempts via the "x-attempts" header
    # instead. Candidate for removal.
    try:
        return msg.metadata.num_delivered
    except Exception:
        return 1
async def _publish_dlq(self, msg: AbstractIncomingMessage, job: Dict[str, Any], error: Optional[str] = None):
    """Copy *job* to the dead-letter exchange, preserving the routing key.

    The error text (if any) is stored under "_error" without overwriting
    an existing value. The original message is NOT acked here.

    Raises RuntimeError if called before run() has opened the channel.
    """
    chan = self.chan
    # Explicit check instead of `assert`: asserts vanish under `python -O`.
    if chan is None:
        raise RuntimeError("channel not initialized; call run() first")
    dlx_ex = await chan.get_exchange(f"{self.exchange_name}.dlx")
    body_obj = dict(job)
    if error:
        body_obj.setdefault("_error", str(error))
    body = json.dumps(body_obj).encode()
    # Guard against a missing headers table on the incoming message.
    m = aio_pika.Message(body, delivery_mode=aio_pika.DeliveryMode.PERSISTENT, message_id=msg.message_id, headers={"x-attempts": (msg.headers or {}).get("x-attempts", 0)})
    await dlx_ex.publish(m, routing_key=msg.routing_key)