production-evyos-systems-an.../ServicesTask/app/queue/dual_queue.py

72 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid
import time
import random
from typing import Any, Dict, Optional, Callable
from core.utils import now_ms, jd, jl
from core.config import RedisConfig, Env
from storage.sqlite_queue import SqliteQueue
class DualQueueProducer:
    """
    Writes every job to both the Redis Stream and SQLite.
    """

    def __init__(self, stream: str, sqlite_db: SqliteQueue, cfg: RedisConfig):
        """Keep the stream name, SQLite queue and config; take a client from ``cfg.client()``."""
        self.stream, self.sqlite, self.cfg = stream, sqlite_db, cfg
        self.r = cfg.client()

    def enqueue(self, payload: Dict[str, Any], type_: str) -> str:
        """
        Wrap ``payload`` in a task record and publish it to both stores.

        The task id is ``payload["task_id"]`` when that value is truthy,
        otherwise a freshly generated UUID4 string. The record is passed
        through ``jd()`` for the stream write and handed as-is to
        ``SqliteQueue.add_task``.

        Returns the task id that was used.
        """
        job_id = payload.get("task_id")
        if not job_id:
            job_id = str(uuid.uuid4())

        record = {
            "task_id": job_id,
            "queue": self.stream,
            "type": type_,
            "payload": payload,
            "created_at": now_ms(),
            "_attempts": 0,
        }

        # NOTE(review): the stream write happens before the SQLite write, so a
        # fast consumer could call update_task() before add_task() has run --
        # confirm SqliteQueue tolerates that ordering.
        self.r.xadd(self.stream, jd(record))
        self.sqlite.add_task(record)
        return job_id
class DualQueueConsumer:
    """
    Consumes jobs from a Redis Stream through a consumer group and records
    the done / retry / failed outcome of each job in SQLite.
    (XAUTOCLAIM could be added later; this is the basic flow first.)
    """

    def __init__(self, stream: str, sqlite_db: SqliteQueue, cfg: RedisConfig,
                 group: Optional[str] = None, consumer_name: Optional[str] = None):
        """
        Bind to ``stream`` and make sure the consumer group exists.

        ``group`` defaults to ``"g:<stream>"`` and ``consumer_name`` to
        ``"w-<random 4-digit number>"``.

        Raises whatever ``xgroup_create`` raises, except for the
        "group already exists" (BUSYGROUP) reply, which is expected on every
        start after the first one and is ignored.
        """
        self.stream = stream
        self.sqlite = sqlite_db
        self.cfg = cfg
        self.group = group or f"g:{stream}"
        self.consumer = consumer_name or f"w-{random.randint(1000,9999)}"
        self.r = cfg.client()
        try:
            self.r.xgroup_create(self.stream, self.group, id="$", mkstream=True)
        except Exception as e:
            # Only the "group already exists" reply is harmless. Connection,
            # auth or wrong-type errors must surface instead of leaving a
            # consumer that fails later inside run().
            if "BUSYGROUP" not in str(e):
                raise

    def run(self, process_fn: Callable[[Dict[str, Any]], None]) -> None:
        """
        Block forever, reading new entries for this consumer and passing each
        decoded job to ``process_fn``.

        A job whose ``process_fn`` call raises is re-enqueued with an
        incremented ``_attempts`` counter until ``cfg.max_retries`` is
        exceeded, after which it is marked ``failed`` in SQLite.
        """
        print(f"[{self.consumer}] listening {self.stream}")
        while True:
            msgs = self.r.xreadgroup(
                self.group,
                self.consumer,
                {self.stream: ">"},
                count=self.cfg.batch_size,
                block=5000,
            )
            if not msgs:
                continue
            for _, entries in msgs:
                for msg_id, fields in entries:
                    self._handle_entry(msg_id, fields, process_fn)

    def _handle_entry(self, msg_id, fields, process_fn: Callable[[Dict[str, Any]], None]) -> None:
        """Decode one stream entry, run it, and record the outcome."""
        try:
            job = jl(fields)
            task_id = job["task_id"]
            attempts = int(job.get("_attempts", 0))
        except (KeyError, TypeError, ValueError) as e:
            # A single undecodable entry must not kill the worker loop. It is
            # acked so it does not sit in the pending list forever.
            print(f"[{self.consumer}] dropping malformed entry {msg_id}: {e!r}")
            self.r.xack(self.stream, self.group, msg_id)
            return

        try:
            # Only the user callback is guarded: a failure in the bookkeeping
            # below must not be mistaken for a job failure and trigger a
            # second execution of a job that already succeeded.
            process_fn(job)
        except Exception as e:
            self._handle_failure(msg_id, job, task_id, attempts + 1, e)
        else:
            self.r.xack(self.stream, self.group, msg_id)
            self.sqlite.update_task(task_id, status="done", attempts=attempts)

    def _handle_failure(self, msg_id, job: Dict[str, Any], task_id, attempts: int, error: Exception) -> None:
        """Mark a failed job as ``failed`` or re-enqueue it as ``retry``."""
        if attempts > self.cfg.max_retries:
            self.r.xack(self.stream, self.group, msg_id)
            self.sqlite.update_task(task_id, status="failed", error=str(error), attempts=attempts)
            return

        job["_attempts"] = attempts
        # Exponential backoff: 0.2s, 0.4s, 0.8s, ... capped at 3s.
        time.sleep(min(0.2 * (2 ** (attempts - 1)), 3.0))
        # Publish the retry copy BEFORE acking the original, so a crash in
        # between leaves the job pending instead of losing it from Redis.
        self.r.xadd(self.stream, jd(job))
        self.r.xack(self.stream, self.group, msg_id)
        self.sqlite.update_task(task_id, status="retry", error=str(error), attempts=attempts)