import os
import uuid
import asyncio
import json
from typing import Any, Dict, Awaitable, Callable, Optional

from app.core.utils import now_ms
from app.core import metrics

from nats.aio.client import Client as NATS
from nats.js.api import StreamConfig, ConsumerConfig, AckPolicy
from nats.errors import NoRespondersError


class ServiceBaseAsync:
    """
    JetStream-based service base.

    - TASKS subject: tasks are published to it and consumed from it.
    - PUBLISH subject: status events (enqueued / done / retry / failed;
      the duplicate_skipped event is currently disabled in ``enqueue``).
    - Dedup: the ``Nats-Msg-Id`` header is set to the task_id, so JetStream
      does not store the message again inside its duplicate window.
    - Retry: ``msg.nak()``; once the delivery count reaches MAX_DELIVER (or
      retries are disabled) the message is ``msg.term()``-ed and reported
      as "failed".
    """

    def __init__(
        self,
        produce_fn: Callable[["ServiceBaseAsync"], Awaitable[None]],
        consume_fn: Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]],
    ):
        """
        Read the configuration from environment variables and store callbacks.

        Args:
            produce_fn: coroutine function called repeatedly with this
                instance by the produce loop.
            consume_fn: coroutine function called with this instance and the
                decoded job dict for every fetched message.
        """
        self.nats_url = os.getenv("NATS_URL", "nats://nats:4222")

        self.stream_name = os.getenv("JS_STREAM", "ACCOUNT_SERVICES_DATABASE")
        self.tasks_subject = os.getenv("JS_TASKS_SUBJECT", "ACCOUNT.SERVICES.DATABASE.TASKS")
        self.publish_subject = os.getenv("JS_PUBLISH_SUBJECT", "ACCOUNT.SERVICES.DATABASE.PUBLISH")

        self.durable = os.getenv("JS_DURABLE", "DB_WORKERS")
        self.batch_size = int(os.getenv("BATCH_SIZE", "5"))
        self.ack_wait_sec = int(os.getenv("ACK_WAIT_SEC", "30"))
        self.max_deliver = int(os.getenv("MAX_DELIVER", "3"))

        self.retry_enabled = os.getenv("RETRY_ENABLED", "true").lower() == "true"
        self.dedup_header = os.getenv("DEDUP_HEADER", "Nats-Msg-Id")

        self.produce_fn = produce_fn
        self.consume_fn = consume_fn

        # Both are populated by run().
        self.nc: Optional[NATS] = None
        self.js = None

    async def run(self) -> None:
        """
        Connect to NATS, ensure stream/consumer exist and run both loops.

        Returns only when one of the loops fails or the task is cancelled;
        in either case the sibling loop is cancelled and the connection is
        closed so that nothing is left running against a dead client.
        """
        metrics.start_server()
        self.nc = NATS()
        await self.nc.connect(self.nats_url)
        try:
            self.js = self.nc.jetstream()
            await self._ensure_stream_and_consumer()

            workers = [
                asyncio.ensure_future(self._produce_loop()),
                asyncio.ensure_future(self._consume_loop()),
            ]
            try:
                await asyncio.gather(*workers)
            finally:
                # gather() propagates the first failure but leaves the other
                # loop running; stop it explicitly and wait for it to finish.
                for worker in workers:
                    worker.cancel()
                await asyncio.gather(*workers, return_exceptions=True)
        finally:
            try:
                await self.nc.close()
            except Exception as e:
                print(f"[nats] close ERROR: {e}")

    async def _ensure_stream_and_consumer(self) -> None:
        """
        Best-effort creation of the stream and the durable pull consumer.

        Failures (for example because the objects already exist) do not abort
        start-up, but the reason is printed instead of being silently dropped.
        """
        try:
            await self.js.add_stream(
                StreamConfig(
                    name=self.stream_name,
                    subjects=[self.tasks_subject, self.publish_subject],
                )
            )
            print(f"[js] stream created: {self.stream_name}")
        except Exception as e:
            print(f"[js] stream not created: {self.stream_name}: {e}")

        try:
            await self.js.add_consumer(
                self.stream_name,
                ConsumerConfig(
                    durable_name=self.durable,
                    ack_policy=AckPolicy.EXPLICIT,
                    ack_wait=self.ack_wait_sec,
                    max_deliver=self.max_deliver,
                    filter_subject=self.tasks_subject,
                ),
            )
            print(f"[js] consumer created: durable={self.durable}")
        except Exception as e:
            print(f"[js] consumer not created: durable={self.durable}: {e}")

    async def _produce_loop(self) -> None:
        """Call ``produce_fn`` forever, printing errors and pausing 2 s between calls."""
        while True:
            try:
                await self.produce_fn(self)
            except Exception as e:
                print(f"[produce] ERROR: {e}")
            await asyncio.sleep(2)

    async def _consume_loop(self) -> None:
        """
        Fetch task messages in batches and hand each one to ``_handle_msg``.

        An empty or failed fetch is followed by a short sleep before retrying.
        """
        sub = await self.js.pull_subscribe(self.tasks_subject, durable=self.durable)
        while True:
            try:
                msgs = await sub.fetch(self.batch_size, timeout=2)
            except asyncio.TimeoutError:
                # Expected when no task is pending; assumes the client's
                # timeout error derives from asyncio.TimeoutError (otherwise
                # it is merely reported by the generic branch below).
                msgs = []
            except Exception as e:
                print(f"[consume] fetch ERROR: {e}")
                msgs = []

            if not msgs:
                await asyncio.sleep(0.2)
                continue

            for msg in msgs:
                await self._handle_msg(msg)

    async def _handle_msg(self, msg) -> None:
        """
        Run ``consume_fn`` for one message and settle it (ack / nak / term).

        A "done", "retry" or "failed" event is published depending on the
        outcome. Errors raised by the settlement calls themselves are
        contained so that one bad message cannot stop the consume loop.
        """
        job = self._decode_msg(msg)
        attempts = self._delivery_attempts(msg)
        task_id = job.get("task_id")

        try:
            await self.consume_fn(self, job)
            await msg.ack()
        except Exception as e:
            err = str(e)
            if (not self.retry_enabled) or (attempts >= self.max_deliver):
                await self._settle(msg.term, "term", task_id)
                await self._publish({"task_id": task_id, "status": "failed", "error": err})
            else:
                await self._settle(msg.nak, "nak", task_id)
                await self._publish(
                    {"task_id": task_id, "status": "retry", "attempts": attempts, "error": err}
                )
        else:
            await self._publish({"task_id": task_id, "status": "done"})

    @staticmethod
    async def _settle(action: Callable[[], Awaitable[Any]], label: str, task_id: Any) -> None:
        """Await a message settlement call, printing (not raising) any failure."""
        try:
            await action()
        except Exception as e:
            print(f"[consume] {label} ERROR task_id={task_id}: {e}")

    async def enqueue(self, payload: Dict[str, Any], type_: str, task_id: Optional[str] = None) -> str:
        """
        Publish a task to the TASKS subject and return its task_id.

        Dedup: the dedup header (``Nats-Msg-Id`` by default) is set to the
        task_id. When the publish ack reports ``duplicate`` the task was not
        stored again and no "enqueued" event is emitted.

        Note: ``payload`` is updated in place with a ``task_id`` key when it
        does not already have one.

        Args:
            payload: task payload; must be JSON serialisable.
            type_: task type stored in the task envelope.
            task_id: explicit id; falls back to ``payload["task_id"]`` and
                then to a fresh UUID4.

        Raises:
            RuntimeError: when the publish fails with ``NoRespondersError``.
        """
        _task_id = task_id or payload.get("task_id") or str(uuid.uuid4())
        payload.setdefault("task_id", _task_id)

        task = {
            "task_id": _task_id,
            "type": type_,
            "payload": payload,
            "created_at": now_ms(),
            "_attempts": 0,
        }
        data = json.dumps(task).encode()

        try:
            ack = await self.js.publish(
                self.tasks_subject, data, headers={self.dedup_header: _task_id}
            )
        except NoRespondersError as err:
            # Keep the original error as __cause__ for easier debugging.
            raise RuntimeError("NATS/JetStream not available") from err

        if getattr(ack, "duplicate", False):
            # NOTE: the "duplicate_skipped" event is currently disabled:
            # await self._publish({"task_id": _task_id, "status": "duplicate_skipped"})
            return _task_id

        await self._publish({"task_id": _task_id, "status": "enqueued"})
        return _task_id

    async def _publish(self, event: Dict[str, Any]) -> None:
        """
        Record a metric for ``event`` and publish it to the PUBLISH subject.

        The event is copied and given default ``ts`` and ``queue`` fields.
        Both steps are best-effort and never raise: metric failures are
        ignored, publish failures are printed.
        """
        evt = dict(event)
        evt.setdefault("ts", now_ms())
        evt.setdefault("queue", self.tasks_subject)

        try:
            metrics.observe(evt.get("status", "unknown"), evt["queue"], evt.get("type"))
        except Exception:
            # Metrics must never interfere with task processing.
            pass

        try:
            await self.js.publish(self.publish_subject, json.dumps(evt).encode())
        except Exception as e:
            print(f"[publish] ERROR status={evt.get('status')}: {e}")

    @staticmethod
    def _decode_msg(msg) -> Dict[str, Any]:
        """
        Decode a message body into a job dict.

        A ``payload`` value that is itself a JSON string is decoded as well
        (and left untouched when that fails). Anything that is not a JSON
        object yields the placeholder ``{"payload": {}, "task_id": None}``
        so callers can always rely on dict access.
        """
        try:
            obj = json.loads(msg.data.decode())
        except Exception:
            return {"payload": {}, "task_id": None}

        if not isinstance(obj, dict):
            # Valid JSON but not an object (list, number, string, null).
            return {"payload": {}, "task_id": None}

        payload = obj.get("payload")
        if isinstance(payload, str):
            try:
                obj["payload"] = json.loads(payload)
            except Exception:
                pass
        return obj

    @staticmethod
    def _delivery_attempts(msg) -> int:
        """Return ``msg.metadata.num_delivered``, or 1 when it is unavailable."""
        try:
            return msg.metadata.num_delivered
        except Exception:
            return 1