import uuid import time import random from typing import Any, Dict, Optional, Callable from core.utils import now_ms, jd, jl from core.config import RedisConfig, Env from storage.sqlite_queue import SqliteQueue class DualQueueProducer: """ Her job’ı hem Redis Stream’e hem de SQLite’e yazar. """ def __init__(self, stream: str, sqlite_db: SqliteQueue, cfg: RedisConfig): self.stream = stream self.sqlite = sqlite_db self.cfg = cfg self.r = cfg.client() def enqueue(self, payload: Dict[str, Any], type_: str) -> str: task_id = payload.get("task_id") or str(uuid.uuid4()) task = {"task_id": task_id, "queue": self.stream, "type": type_, "payload": payload, "created_at": now_ms(), "_attempts": 0} self.r.xadd(self.stream, jd(task)) self.sqlite.add_task(task) return task_id class DualQueueConsumer: """ Consumer Group ile işler: retry / failed SQLite’e işler. (XAUTOCLAIM eklenebilir; önce temel akış) """ def __init__(self, stream: str, sqlite_db: SqliteQueue, cfg: RedisConfig, group: Optional[str] = None, consumer_name: Optional[str] = None): self.stream = stream self.sqlite = sqlite_db self.cfg = cfg self.group = group or f"g:{stream}" self.consumer = consumer_name or f"w-{random.randint(1000,9999)}" self.r = cfg.client() try: self.r.xgroup_create(self.stream, self.group, id="$", mkstream=True) except Exception: pass def run(self, process_fn: Callable[[Dict[str, Any]], None]) -> None: print(f"[{self.consumer}] listening {self.stream} …") while True: msgs = self.r.xreadgroup(self.group, self.consumer, {self.stream: ">"}, count=self.cfg.batch_size, block=5000) if not msgs: continue for _, entries in msgs: for msg_id, fields in entries: job = jl(fields) task_id = job["task_id"] attempts = int(job.get("_attempts", 0)) try: process_fn(job) self.r.xack(self.stream, self.group, msg_id) self.sqlite.update_task(task_id, status="done", attempts=attempts) except Exception as e: attempts += 1 if attempts > self.cfg.max_retries: self.r.xack(self.stream, self.group, msg_id) self.sqlite.update_task(task_id, status="failed", error=str(e), attempts=attempts) else: self.r.xack(self.stream, self.group, msg_id) job["_attempts"] = attempts time.sleep(min(0.2*(2**(attempts-1)), 3.0)) self.r.xadd(self.stream, jd(job)) self.sqlite.update_task(task_id, status="retry", error=str(e), attempts=attempts)