72 lines
3.0 KiB
Python
72 lines
3.0 KiB
Python
import uuid
|
||
import time
|
||
import random
|
||
from typing import Any, Dict, Optional, Callable
|
||
|
||
from core.utils import now_ms, jd, jl
|
||
from core.config import RedisConfig, Env
|
||
from storage.sqlite_queue import SqliteQueue
|
||
|
||
class DualQueueProducer:
    """
    Write every job to both the Redis Stream and SQLite.
    """

    def __init__(self, stream: str, sqlite_db: SqliteQueue, cfg: RedisConfig):
        """
        Bind the producer to one stream and its two backing stores.

        Args:
            stream: Name of the Redis Stream that jobs are appended to.
            sqlite_db: Queue object that receives a copy of each task via
                ``add_task``.
            cfg: Configuration object; ``cfg.client()`` supplies the client
                used for ``xadd``.
        """
        self.stream = stream
        self.sqlite = sqlite_db
        self.cfg = cfg
        self.r = cfg.client()

    def enqueue(self, payload: Dict[str, Any], type_: str) -> str:
        """
        Wrap ``payload`` in a task envelope and store it in both backends.

        Args:
            payload: Job data. A truthy ``payload["task_id"]`` is reused as
                the task id; otherwise a fresh UUID4 string is generated.
            type_: Value stored under the envelope's ``"type"`` key.

        Returns:
            The task id of the enqueued job.
        """
        ident = payload.get("task_id")
        if not ident:
            ident = str(uuid.uuid4())

        record = {
            "task_id": ident,
            "queue": self.stream,
            "type": type_,
            "payload": payload,
            "created_at": now_ms(),
            "_attempts": 0,
        }

        # Redis is written first, then SQLite; the two writes are not atomic.
        self.r.xadd(self.stream, jd(record))
        self.sqlite.add_task(record)
        return ident
|
||
|
||
|
||
class DualQueueConsumer:
    """
    Consume jobs from a Redis Stream through a consumer group.

    The outcome of every job ("done", "retry" or "failed") is recorded in
    SQLite. Recovery of pending entries (e.g. via XAUTOCLAIM) is not
    implemented yet; only the basic flow is covered.
    """

    # Retry backoff: _BACKOFF_BASE_S * 2**(attempt - 1), capped at _BACKOFF_CAP_S.
    _BACKOFF_BASE_S = 0.2
    _BACKOFF_CAP_S = 3.0

    def __init__(self, stream: str, sqlite_db: SqliteQueue, cfg: RedisConfig,
                 group: Optional[str] = None, consumer_name: Optional[str] = None):
        """
        Bind the consumer to a stream and make sure its consumer group exists.

        Args:
            stream: Name of the Redis Stream to read from.
            sqlite_db: Queue object whose ``update_task`` records job status.
            cfg: Configuration object; provides ``client()``, ``batch_size``
                and ``max_retries``.
            group: Consumer group name; defaults to ``"g:<stream>"``.
            consumer_name: Name of this consumer inside the group; defaults
                to ``"w-<random 4-digit number>"``.
        """
        self.stream = stream
        self.sqlite = sqlite_db
        self.cfg = cfg
        self.group = group or f"g:{stream}"
        self.consumer = consumer_name or f"w-{random.randint(1000,9999)}"
        self.r = cfg.client()
        try:
            self.r.xgroup_create(self.stream, self.group, id="$", mkstream=True)
        except Exception as exc:
            # Redis answers BUSYGROUP when the group already exists, which is
            # the expected case on every start after the first one. Anything
            # else is still treated as best-effort (construction does not
            # fail), but it is reported instead of being silently dropped.
            if "BUSYGROUP" not in str(exc):
                print(f"[{self.consumer}] xgroup_create failed for {self.stream}/{self.group}: {exc!r}")

    def run(self, process_fn: Callable[[Dict[str, Any]], None]) -> None:
        """
        Read and process jobs forever.

        Args:
            process_fn: Callable invoked with each decoded job dict. Raising
                any ``Exception`` marks the attempt as failed.

        This method never returns normally; it loops until an exception
        escapes (for example from the Redis client).
        """
        print(f"[{self.consumer}] listening {self.stream} …")
        while True:
            # ">" asks only for entries never delivered to this group.
            # block=5000 is presumably milliseconds (redis-py convention).
            msgs = self.r.xreadgroup(
                self.group,
                self.consumer,
                {self.stream: ">"},
                count=self.cfg.batch_size,
                block=5000,
            )
            if not msgs:
                continue
            for _, entries in msgs:
                for msg_id, fields in entries:
                    self._handle_entry(msg_id, fields, process_fn)

    def _handle_entry(self, msg_id: Any, fields: Any,
                      process_fn: Callable[[Dict[str, Any]], None]) -> None:
        """Process one stream entry and record done / retry / failed for it."""
        job = jl(fields)
        task_id = job["task_id"]
        attempts = int(job.get("_attempts", 0))
        try:
            process_fn(job)
            self.r.xack(self.stream, self.group, msg_id)
            self.sqlite.update_task(task_id, status="done", attempts=attempts)
        except Exception as e:
            # NOTE(review): this also catches failures of the xack /
            # update_task calls above, so a job that ran successfully can be
            # re-enqueued when only the bookkeeping failed. Kept as in the
            # original flow.
            attempts += 1
            error = str(e)

            if attempts > self.cfg.max_retries:
                # Retry budget exhausted: drop the entry from the pending
                # list and record the terminal failure.
                self.r.xack(self.stream, self.group, msg_id)
                self.sqlite.update_task(task_id, status="failed", error=error, attempts=attempts)
                return

            job["_attempts"] = attempts
            time.sleep(min(self._BACKOFF_BASE_S * (2 ** (attempts - 1)), self._BACKOFF_CAP_S))
            # Re-enqueue BEFORE acknowledging. If the process dies during the
            # backoff sleep or xadd fails, the original entry is still
            # pending in the group instead of being lost from Redis.
            self.r.xadd(self.stream, jd(job))
            self.r.xack(self.stream, self.group, msg_id)
            self.sqlite.update_task(task_id, status="retry", error=error, attempts=attempts)
|