From 9543d136aa357227ed4f094bda5eda7e71017372 Mon Sep 17 00:00:00 2001 From: Berkay Date: Fri, 15 Aug 2025 22:30:21 +0300 Subject: [PATCH] task services added --- .../Depends/{schame.txt => schema.txt} | 0 ServicesTask/.python-version | 1 + =1.3.8 => ServicesTask/README.md | 0 ServicesTask/app/core/config.py | 50 ++++++ ServicesTask/app/core/metrics.py | 49 +++++ ServicesTask/app/core/nats.conf | 11 ++ ServicesTask/app/core/sqlite_tasks.py | 68 +++++++ ServicesTask/app/core/utils.py | 15 ++ ServicesTask/app/queue/dual_queue.py | 71 ++++++++ ServicesTask/app/services/common/=0.19.0 | 0 ServicesTask/app/services/common/__init__.py | 0 .../app/services/common/service_base_async.py | 165 +++++++++++++++++ ServicesTask/app/services/database/=0.20.0 | 0 ServicesTask/app/services/database/Dockerfile | 18 ++ ServicesTask/app/services/database/README.md | 0 .../app/services/database/__init__.py | 0 ServicesTask/app/services/database/main.py | 28 +++ .../app/services/database/pyproject.toml | 36 ++++ ServicesTask/app/services/mail/Dockerfile | 18 ++ ServicesTask/app/services/mail/README.md | 0 ServicesTask/app/services/mail/__init__.py | 0 .../app/services/mail/mail_service_async.py | 15 ++ ServicesTask/app/services/mail/main.py | 29 +++ ServicesTask/app/services/mail/pyproject.toml | 35 ++++ ServicesTask/app/services/queue/Dockerfile | 18 ++ ServicesTask/app/services/queue/README.md | 0 ServicesTask/app/services/queue/__init__.py | 0 ServicesTask/app/services/queue/main.py | 17 ++ .../app/services/queue/pyproject.toml | 35 ++++ .../app/services/queue/queue_service_async.py | 16 ++ ServicesTask/app/services/test/Dockerfile | 18 ++ ServicesTask/app/services/test/README.md | 0 ServicesTask/app/services/test/main.py | 71 ++++++++ ServicesTask/app/services/test/pyproject.toml | 35 ++++ ServicesTask/app/storage/sqlite_queue.py | 51 ++++++ ServicesTask/docker-compose.yml | 170 ++++++++++++++++++ .../provisioning/datasources/datasource.yml | 9 + .../monitor/prometheus/prometheus.yml | 16 ++ 38 files changed, 1065 insertions(+) rename ServicesRunner/Depends/{schame.txt => schema.txt} (100%) create mode 100644 ServicesTask/.python-version rename =1.3.8 => ServicesTask/README.md (100%) create mode 100644 ServicesTask/app/core/config.py create mode 100644 ServicesTask/app/core/metrics.py create mode 100644 ServicesTask/app/core/nats.conf create mode 100644 ServicesTask/app/core/sqlite_tasks.py create mode 100644 ServicesTask/app/core/utils.py create mode 100644 ServicesTask/app/queue/dual_queue.py create mode 100644 ServicesTask/app/services/common/=0.19.0 create mode 100644 ServicesTask/app/services/common/__init__.py create mode 100644 ServicesTask/app/services/common/service_base_async.py create mode 100644 ServicesTask/app/services/database/=0.20.0 create mode 100644 ServicesTask/app/services/database/Dockerfile create mode 100644 ServicesTask/app/services/database/README.md create mode 100644 ServicesTask/app/services/database/__init__.py create mode 100644 ServicesTask/app/services/database/main.py create mode 100644 ServicesTask/app/services/database/pyproject.toml create mode 100644 ServicesTask/app/services/mail/Dockerfile create mode 100644 ServicesTask/app/services/mail/README.md create mode 100644 ServicesTask/app/services/mail/__init__.py create mode 100644 ServicesTask/app/services/mail/mail_service_async.py create mode 100644 ServicesTask/app/services/mail/main.py create mode 100644 ServicesTask/app/services/mail/pyproject.toml create mode 100644 ServicesTask/app/services/queue/Dockerfile create mode 100644 ServicesTask/app/services/queue/README.md create mode 100644 ServicesTask/app/services/queue/__init__.py create mode 100644 ServicesTask/app/services/queue/main.py create mode 100644 ServicesTask/app/services/queue/pyproject.toml create mode 100644 ServicesTask/app/services/queue/queue_service_async.py create mode 100644 ServicesTask/app/services/test/Dockerfile create mode 100644 ServicesTask/app/services/test/README.md create mode 100644 ServicesTask/app/services/test/main.py create mode 100644 ServicesTask/app/services/test/pyproject.toml create mode 100644 ServicesTask/app/storage/sqlite_queue.py create mode 100644 ServicesTask/docker-compose.yml create mode 100644 ServicesTask/monitor/grafana/provisioning/datasources/datasource.yml create mode 100644 ServicesTask/monitor/prometheus/prometheus.yml diff --git a/ServicesRunner/Depends/schame.txt b/ServicesRunner/Depends/schema.txt similarity index 100% rename from ServicesRunner/Depends/schame.txt rename to ServicesRunner/Depends/schema.txt diff --git a/ServicesTask/.python-version b/ServicesTask/.python-version new file mode 100644 index 0000000..e4fba21 --- /dev/null +++ b/ServicesTask/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/=1.3.8 b/ServicesTask/README.md similarity index 100% rename from =1.3.8 rename to ServicesTask/README.md diff --git a/ServicesTask/app/core/config.py b/ServicesTask/app/core/config.py new file mode 100644 index 0000000..bc2ea24 --- /dev/null +++ b/ServicesTask/app/core/config.py @@ -0,0 +1,50 @@ +import os +import redis + +from typing import Optional + + +class Env: + REDIS_HOST: str = os.getenv("REDIS_HOST", "redis") + REDIS_PORT: int = int(os.getenv("REDIS_PORT", "6379")) + REDIS_DB: int = int(os.getenv("REDIS_DB", "0")) + REDIS_USERNAME: Optional[str] = os.getenv("REDIS_USERNAME") + REDIS_PASSWORD: Optional[str] = os.getenv("REDIS_PASSWORD") + REDIS_STREAM_PUBLISH: str = os.getenv("REDIS_STREAM_PUBLISH", "DEFAULT:REGISTER:DONTUSE") + REDIS_STREAM_TASKS: str = os.getenv("REDIS_STREAM_TASKS", "DEFAULT:REGISTER:DONTUSE") + + BATCH_SIZE: int = int(os.getenv("BATCH_SIZE", "5")) + MAX_RETRIES: int = int(os.getenv("MAX_RETRIES", "3")) + IDLE_RECLAIM_MS: int = int(os.getenv("IDLE_RECLAIM_MS", "30000")) + + SQLITE_PATH: str = os.getenv("SQLITE_PATH", "/app/data/queue.db") + +class RedisConfig: + def __init__( + self, + host: str = Env.REDIS_HOST, + port: int = Env.REDIS_PORT, + db: int = Env.REDIS_DB, + username: Optional[str] = Env.REDIS_USERNAME, + password: Optional[str] = Env.REDIS_PASSWORD, + batch_size: int = Env.BATCH_SIZE, + max_retries: int = Env.MAX_RETRIES, + idle_reclaim_ms: int = Env.IDLE_RECLAIM_MS, + ): + self.host = host + self.port = port + self.db = db + self.username = username + self.password = password + self.batch_size = batch_size + self.max_retries = max_retries + self.idle_reclaim_ms = idle_reclaim_ms + + def client(self) -> redis.Redis: + return redis.Redis( + host=self.host, + port=self.port, + db=self.db, + username=self.username, + password=self.password, + ) diff --git a/ServicesTask/app/core/metrics.py b/ServicesTask/app/core/metrics.py new file mode 100644 index 0000000..8dbf434 --- /dev/null +++ b/ServicesTask/app/core/metrics.py @@ -0,0 +1,49 @@ +import os +from prometheus_client import Counter, start_http_server + +_METRICS_STARTED = False + +NS = os.getenv("METRICS_NS", "servicestask") +SERVICE_NAME = os.getenv("SERVICE_NAME", "db-service") + +QUEUE_ENQUEUED = Counter( + f"{NS}_queue_enqueued_total", "Enqueued tasks", + labelnames=("service","queue","type") +) +QUEUE_DUPLICATE = Counter( + f"{NS}_queue_duplicate_skipped_total", "Duplicate skipped", + labelnames=("service","queue","type") +) +QUEUE_DONE = Counter( + f"{NS}_queue_done_total", "Done tasks", + labelnames=("service","queue","type") +) +QUEUE_FAILED = Counter( + f"{NS}_queue_failed_total", "Failed tasks", + labelnames=("service","queue","type") +) +QUEUE_RETRY = Counter( + f"{NS}_queue_retry_total", "Retry attempts", + labelnames=("service","queue","type") +) + +def start_server(): + global _METRICS_STARTED + if _METRICS_STARTED: + return + port = int(os.getenv("METRICS_PORT", "8000")) + start_http_server(port) + _METRICS_STARTED = True + +def observe(status: str, queue: str, type_: str): + labels = (SERVICE_NAME, queue, type_ or "unknown") + if status == "enqueued": + QUEUE_ENQUEUED.labels(*labels).inc() + elif status == "duplicate_skipped": + QUEUE_DUPLICATE.labels(*labels).inc() + elif status == "done": + QUEUE_DONE.labels(*labels).inc() + elif status == "failed": + QUEUE_FAILED.labels(*labels).inc() + elif status == "retry": + QUEUE_RETRY.labels(*labels).inc() diff --git a/ServicesTask/app/core/nats.conf b/ServicesTask/app/core/nats.conf new file mode 100644 index 0000000..88de601 --- /dev/null +++ b/ServicesTask/app/core/nats.conf @@ -0,0 +1,11 @@ +server_name: "nats-main" + +port: 4222 +http: 8222 + +jetstream: { + store_dir: "/data/jetstream", + max_mem_store: 512MB, + max_file_store: 10GB +} + diff --git a/ServicesTask/app/core/sqlite_tasks.py b/ServicesTask/app/core/sqlite_tasks.py new file mode 100644 index 0000000..dd0e0f9 --- /dev/null +++ b/ServicesTask/app/core/sqlite_tasks.py @@ -0,0 +1,68 @@ +import json +import aiosqlite + +from typing import Any, Dict, List, Optional + + +class TasksRepoAsync: + """ + SQLITE Task Manager + """ + def __init__(self, db_path: str): + self.db_path = db_path + + async def init(self) -> None: + async with aiosqlite.connect(self.db_path) as db: + await db.execute("PRAGMA journal_mode=WAL;") + await db.execute("PRAGMA synchronous=NORMAL;") + await db.execute(""" + CREATE TABLE IF NOT EXISTS tasks( + task_id TEXT PRIMARY KEY, + queue TEXT NOT NULL, + type TEXT NOT NULL, + payload_json TEXT NOT NULL, + created_at INTEGER NOT NULL, + status TEXT DEFAULT 'pending', + attempts INTEGER DEFAULT 0, + last_error TEXT, + last_msg_id TEXT + ); + """) + cols = await self._columns(db, "tasks") + if "last_msg_id" not in cols: + await db.execute("ALTER TABLE tasks ADD COLUMN last_msg_id TEXT;") + await db.commit() + + async def insert_task(self, task: Dict[str, Any], last_msg_id: Optional[str]=None) -> None: + async with aiosqlite.connect(self.db_path) as db: + await db.execute("""INSERT OR REPLACE INTO tasks (task_id, queue, type, payload_json, created_at, status, attempts, last_msg_id) VALUES(?,?,?,?,?,'pending',?,?)""", + (task["task_id"], task["queue"], task["type"], json.dumps(task["payload"]), task["created_at"], int(task.get("_attempts", 0)), last_msg_id)) + await db.commit() + + async def mark_done(self, task_id: str, attempts: int) -> None: + await self._update(task_id, status="done", attempts=attempts, error=None) + + async def mark_failed(self, task_id: str, attempts: int, error: str) -> None: + await self._update(task_id, status="failed", attempts=attempts, error=error) + + async def mark_retry(self, task_id: str, attempts: int, error: str, last_msg_id: str) -> None: + await self._update(task_id, status="retry", attempts=attempts, error=error, last_msg_id=last_msg_id) + + async def _update(self, task_id: str, *, status: str, attempts: Optional[int]=None, error: Optional[str]=None, last_msg_id: Optional[str]=None) -> None: + sets, params = ["status=?","last_error=?"], [status, error] + if attempts is not None: + sets.append("attempts=?"); params.append(int(attempts)) + if last_msg_id is not None: + sets.append("last_msg_id=?"); params.append(last_msg_id) + params.append(task_id) + sql = f"UPDATE tasks SET {', '.join(sets)} WHERE task_id=?" + async with aiosqlite.connect(self.db_path) as db: + await db.execute(sql, tuple(params)) + await db.commit() + + async def _columns(self, db: aiosqlite.Connection, table: str) -> List[str]: + cols: List[str] = [] + async with db.execute(f"PRAGMA table_info({table})") as cur: + async for row in cur: + cols.append(row[1]) + return cols diff --git a/ServicesTask/app/core/utils.py b/ServicesTask/app/core/utils.py new file mode 100644 index 0000000..00b8cf3 --- /dev/null +++ b/ServicesTask/app/core/utils.py @@ -0,0 +1,15 @@ +import json +import time + +from typing import Any, Dict + +def now_ms() -> int: + return int(time.time() * 1000) + +def jd(obj: Dict[str, Any]) -> Dict[bytes, bytes]: + """JSON to Redis fields.""" + return {"data": json.dumps(obj).encode("utf-8")} + +def jl(fields: Dict[bytes, bytes]) -> Dict[str, Any]: + """Redis fields to JSON.""" + return json.loads(fields[b"data"].decode("utf-8")) diff --git a/ServicesTask/app/queue/dual_queue.py b/ServicesTask/app/queue/dual_queue.py new file mode 100644 index 0000000..fb757ff --- /dev/null +++ b/ServicesTask/app/queue/dual_queue.py @@ -0,0 +1,71 @@ +import uuid +import time +import random +from typing import Any, Dict, Optional, Callable + +from core.utils import now_ms, jd, jl +from core.config import RedisConfig, Env +from storage.sqlite_queue import SqliteQueue + +class DualQueueProducer: + """ + Her job’ı hem Redis Stream’e hem de SQLite’e yazar. + """ + def __init__(self, stream: str, sqlite_db: SqliteQueue, cfg: RedisConfig): + self.stream = stream + self.sqlite = sqlite_db + self.cfg = cfg + self.r = cfg.client() + + def enqueue(self, payload: Dict[str, Any], type_: str) -> str: + task_id = payload.get("task_id") or str(uuid.uuid4()) + task = {"task_id": task_id, "queue": self.stream, "type": type_, "payload": payload, "created_at": now_ms(), "_attempts": 0} + self.r.xadd(self.stream, jd(task)) + self.sqlite.add_task(task) + return task_id + + +class DualQueueConsumer: + """ + Consumer Group ile işler: retry / failed SQLite’e işler. + (XAUTOCLAIM eklenebilir; önce temel akış) + """ + def __init__(self, stream: str, sqlite_db: SqliteQueue, cfg: RedisConfig, + group: Optional[str] = None, consumer_name: Optional[str] = None): + self.stream = stream + self.sqlite = sqlite_db + self.cfg = cfg + self.group = group or f"g:{stream}" + self.consumer = consumer_name or f"w-{random.randint(1000,9999)}" + self.r = cfg.client() + try: + self.r.xgroup_create(self.stream, self.group, id="$", mkstream=True) + except Exception: + pass + + def run(self, process_fn: Callable[[Dict[str, Any]], None]) -> None: + print(f"[{self.consumer}] listening {self.stream} …") + while True: + msgs = self.r.xreadgroup(self.group, self.consumer, {self.stream: ">"}, count=self.cfg.batch_size, block=5000) + if not msgs: + continue + for _, entries in msgs: + for msg_id, fields in entries: + job = jl(fields) + task_id = job["task_id"] + attempts = int(job.get("_attempts", 0)) + try: + process_fn(job) + self.r.xack(self.stream, self.group, msg_id) + self.sqlite.update_task(task_id, status="done", attempts=attempts) + except Exception as e: + attempts += 1 + if attempts > self.cfg.max_retries: + self.r.xack(self.stream, self.group, msg_id) + self.sqlite.update_task(task_id, status="failed", error=str(e), attempts=attempts) + else: + self.r.xack(self.stream, self.group, msg_id) + job["_attempts"] = attempts + time.sleep(min(0.2*(2**(attempts-1)), 3.0)) + self.r.xadd(self.stream, jd(job)) + self.sqlite.update_task(task_id, status="retry", error=str(e), attempts=attempts) diff --git a/ServicesTask/app/services/common/=0.19.0 b/ServicesTask/app/services/common/=0.19.0 new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/common/__init__.py b/ServicesTask/app/services/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/common/service_base_async.py b/ServicesTask/app/services/common/service_base_async.py new file mode 100644 index 0000000..efc1c34 --- /dev/null +++ b/ServicesTask/app/services/common/service_base_async.py @@ -0,0 +1,165 @@ +import os +import uuid +import asyncio +import json + +from typing import Any, Dict, Awaitable, Callable, Optional + +from app.core.utils import now_ms +from app.core import metrics + +from nats.aio.client import Client as NATS +from nats.js.api import StreamConfig, ConsumerConfig, AckPolicy +from nats.errors import NoRespondersError + + +class ServiceBaseAsync: + """ + JetStream tabanlı base: + - TASKS subject: publish + consume + - PUBLISH subject: event yayını (enqueued / duplicate_skipped / done / retry / failed) + - Dedup: Nats-Msg-Id = task_id (JetStream duplicate window içinde yazmaz) + - Retry: msg.nak(); MAX_DELIVER aşılınca msg.term() (DLQ yoksa “failed”) + """ + + def __init__( + self, + produce_fn: Callable[["ServiceBaseAsync"], Awaitable[None]], + consume_fn: Callable[["ServiceBaseAsync", Dict[str, Any]], Awaitable[None]], + ): + self.nats_url = os.getenv("NATS_URL", "nats://nats:4222") + + self.stream_name = os.getenv("JS_STREAM", "ACCOUNT_SERVICES_DATABASE") + self.tasks_subject = os.getenv("JS_TASKS_SUBJECT", "ACCOUNT.SERVICES.DATABASE.TASKS") + self.publish_subject = os.getenv("JS_PUBLISH_SUBJECT", "ACCOUNT.SERVICES.DATABASE.PUBLISH") + + self.durable = os.getenv("JS_DURABLE", "DB_WORKERS") + self.batch_size = int(os.getenv("BATCH_SIZE", "5")) + self.ack_wait_sec = int(os.getenv("ACK_WAIT_SEC", "30")) + self.max_deliver = int(os.getenv("MAX_DELIVER", "3")) + + self.retry_enabled = os.getenv("RETRY_ENABLED", "true").lower() == "true" + self.dedup_header = os.getenv("DEDUP_HEADER", "Nats-Msg-Id") + + self.produce_fn = produce_fn + self.consume_fn = consume_fn + + self.nc: Optional[NATS] = None + self.js = None + + async def run(self) -> None: + metrics.start_server() + self.nc = NATS() + await self.nc.connect(self.nats_url) + self.js = self.nc.jetstream() + + await self._ensure_stream_and_consumer() + await asyncio.gather(self._produce_loop(), self._consume_loop()) + + async def _ensure_stream_and_consumer(self) -> None: + try: + await self.js.add_stream(StreamConfig(name=self.stream_name, subjects=[self.tasks_subject, self.publish_subject])) + print(f"[js] stream created: {self.stream_name}") + except Exception: + pass + try: + await self.js.add_consumer( + self.stream_name, + ConsumerConfig( + durable_name=self.durable, ack_policy=AckPolicy.EXPLICIT, + ack_wait=self.ack_wait_sec, max_deliver=self.max_deliver, filter_subject=self.tasks_subject), + ) + print(f"[js] consumer created: durable={self.durable}") + except Exception: + pass + + async def _produce_loop(self) -> None: + while True: + try: + await self.produce_fn(self) + except Exception as e: + print(f"[produce] ERROR: {e}") + await asyncio.sleep(2) + + async def _consume_loop(self) -> None: + sub = await self.js.pull_subscribe(self.tasks_subject, durable=self.durable) + while True: + try: + msgs = await sub.fetch(self.batch_size, timeout=2) + except Exception: + msgs = [] + + if not msgs: + await asyncio.sleep(0.2) + continue + + for msg in msgs: + job = self._decode_msg(msg) + attempts = self._delivery_attempts(msg) + try: + await self.consume_fn(self, job) + await msg.ack() + await self._publish({"task_id": job.get("task_id"), "status": "done"}) + except Exception as e: + err = str(e) + if (not self.retry_enabled) or (attempts >= self.max_deliver): + await msg.term() + await self._publish({"task_id": job.get("task_id"), "status": "failed", "error": err}) + else: + await msg.nak() + await self._publish({"task_id": job.get("task_id"), "status": "retry", "attempts": attempts, "error": err}) + + async def enqueue(self, payload: Dict[str, Any], type_: str, task_id: Optional[str] = None) -> str: + """ + Dedup: Nats-Msg-Id = task_id + duplicate ise publish.duplicate True döner ve JS yazmaz. + """ + _task_id = task_id or payload.get("task_id") or str(uuid.uuid4()) + payload.setdefault("task_id", _task_id) + + task = {"task_id": _task_id, "type": type_, "payload": payload, "created_at": now_ms(), "_attempts": 0} + data = json.dumps(task).encode() + try: + ack = await self.js.publish(self.tasks_subject, data, headers={self.dedup_header: _task_id}) + except NoRespondersError: + raise RuntimeError("NATS/JetStream not available") + + if getattr(ack, "duplicate", False): + # await self._publish({"task_id": _task_id, "status": "duplicate_skipped"}) + return _task_id + + await self._publish({"task_id": _task_id, "status": "enqueued"}) + return _task_id + + async def _publish(self, event: Dict[str, Any]) -> None: + evt = dict(event) + evt.setdefault("ts", now_ms()) + evt.setdefault("queue", self.tasks_subject) + try: + metrics.observe(evt.get("status","unknown"), evt["queue"], evt.get("type")) + except Exception: + pass + try: + await self.js.publish(self.publish_subject, json.dumps(evt).encode()) + except Exception: + pass + + @staticmethod + def _decode_msg(msg) -> Dict[str, Any]: + try: + obj = json.loads(msg.data.decode()) + if "payload" in obj and isinstance(obj["payload"], str): + try: + obj["payload"] = json.loads(obj["payload"]) + except Exception: + pass + return obj + except Exception: + return {"payload": {}, "task_id": None} + + @staticmethod + def _delivery_attempts(msg) -> int: + try: + return msg.metadata.num_delivered + except Exception: + return 1 diff --git a/ServicesTask/app/services/database/=0.20.0 b/ServicesTask/app/services/database/=0.20.0 new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/database/Dockerfile b/ServicesTask/app/services/database/Dockerfile new file mode 100644 index 0000000..198e8de --- /dev/null +++ b/ServicesTask/app/services/database/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app + +WORKDIR / + +COPY app/services/database/pyproject.toml ./ +COPY app/services/database/README.md ./ + +COPY app/core ./app/core +COPY app/services/common/ ./app/services/common/ +COPY app/services/database/ ./app/services/database/ + +RUN pip install --upgrade pip && pip install --no-cache-dir . +RUN mkdir -p /app/data + +CMD ["python", "-m", "app.services.database.main"] diff --git a/ServicesTask/app/services/database/README.md b/ServicesTask/app/services/database/README.md new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/database/__init__.py b/ServicesTask/app/services/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/database/main.py b/ServicesTask/app/services/database/main.py new file mode 100644 index 0000000..3f9f6dd --- /dev/null +++ b/ServicesTask/app/services/database/main.py @@ -0,0 +1,28 @@ +import os +import uuid +import asyncio + +from app.services.common.service_base_async import ServiceBaseAsync + + +PRODUCE_ENABLED = os.getenv("PRODUCE_ENABLED", "true").lower() == "true" +PRODUCE_BATCH = int(os.getenv("PRODUCE_BATCH", "3")) # her produce tick'inde kaç iş +TASK_TYPE = os.getenv("TASK_TYPE", "db-task") # iş tipi (task_id'de de kullanılır) +CONSUME_SLEEP_SEC = float(os.getenv("CONSUME_SLEEP_SEC", "0.5")) # işleme süresi simülasyonu (sn) +STATIC_IDS = ["2c47f1073a9d4f05aad6c15484894a72", "65827e3452b545d6845e050a503401f3", "5c663088f09d4062b4e567f47335fb1a"] + + +async def produce(service: ServiceBaseAsync): + for biz_id in STATIC_IDS: + deterministic_task_id = f"{TASK_TYPE}:{biz_id}" + payload = {"id": biz_id, "op": "sync", "source": "db-service"} + await service.enqueue(payload, TASK_TYPE, task_id=deterministic_task_id) + print(f"[DB] produce tick attempted ids={','.join(STATIC_IDS)}") + + +async def consume(service: ServiceBaseAsync, job: dict): + await asyncio.sleep(CONSUME_SLEEP_SEC) + print(f"[DB] consumed task={job['task_id']}") + +if __name__ == "__main__": + asyncio.run(ServiceBaseAsync(produce, consume).run()) diff --git a/ServicesTask/app/services/database/pyproject.toml b/ServicesTask/app/services/database/pyproject.toml new file mode 100644 index 0000000..be9f659 --- /dev/null +++ b/ServicesTask/app/services/database/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "dual-queue-services" +version = "0.1.0" +description = "Async dual queue system with Redis Streams and SQLite persistence" +readme = "README.md" +requires-python = ">=3.11" +authors = [ + { name = "Berkay Karatay", email = "karatay.berkay@gmail.com" } +] +dependencies = [ + "nats-py>=2.6.0", + "prometheus-client>=0.20.0", + "uvloop>=0.19.0" +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.4", + "black>=23.0", + "isort>=5.12" +] + +[tool.black] +line-length = 88 +target-version = ["py311"] + +[tool.isort] +profile = "black" + +[tool.setuptools.packages.find] +where = ["app"] +include = ["app*"] diff --git a/ServicesTask/app/services/mail/Dockerfile b/ServicesTask/app/services/mail/Dockerfile new file mode 100644 index 0000000..c3d5064 --- /dev/null +++ b/ServicesTask/app/services/mail/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app + +WORKDIR / + +COPY app/services/mail/pyproject.toml ./ +COPY app/services/mail/README.md ./ + +COPY app/core ./app/core +COPY app/services/common/ ./app/services/common/ +COPY app/services/mail/ ./app/services/mail/ + +RUN pip install --upgrade pip && pip install --no-cache-dir . +RUN mkdir -p /app/data + +CMD ["python", "-m", "app.services.mail.main"] diff --git a/ServicesTask/app/services/mail/README.md b/ServicesTask/app/services/mail/README.md new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/mail/__init__.py b/ServicesTask/app/services/mail/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/mail/mail_service_async.py b/ServicesTask/app/services/mail/mail_service_async.py new file mode 100644 index 0000000..f218650 --- /dev/null +++ b/ServicesTask/app/services/mail/mail_service_async.py @@ -0,0 +1,15 @@ +import asyncio +import uuid +from services.service_base_async import ServiceBaseAsync + +async def produce(service: ServiceBaseAsync): + fake_mails = [{"to": "user@example.com", "subj": "Hi", "body": "Hello!"}] + for mail in fake_mails: + await service.enqueue(mail, "send-mail") + +async def consume(service: ServiceBaseAsync, job: dict): + print(f"[MAIL] Gönderiliyor: {job}") + await asyncio.sleep(0.1) + +if __name__ == "__main__": + asyncio.run(ServiceBaseAsync(produce, consume).run()) diff --git a/ServicesTask/app/services/mail/main.py b/ServicesTask/app/services/mail/main.py new file mode 100644 index 0000000..31b252b --- /dev/null +++ b/ServicesTask/app/services/mail/main.py @@ -0,0 +1,29 @@ +import os +import uuid +import asyncio + +from app.services.common.service_base_async import ServiceBaseAsync + + +PRODUCE_ENABLED = os.getenv("PRODUCE_ENABLED", "true").lower() == "true" +PRODUCE_BATCH = int(os.getenv("PRODUCE_BATCH", "3")) # her produce tick'inde kaç iş +TASK_TYPE = os.getenv("TASK_TYPE", "db-task") # iş tipi (task_id'de de kullanılır) +CONSUME_SLEEP_SEC = float(os.getenv("CONSUME_SLEEP_SEC", "0.5")) # işleme süresi simülasyonu (sn) +STATIC_IDS = ["2c47f1073a9d4f05aad6c15484894a74", "65827e3452b545d6845e050a503401f4", "5c663088f09d4062b4e567f47335fb1e"] + + +async def produce(service: ServiceBaseAsync): + for biz_id in STATIC_IDS: + deterministic_task_id = f"{TASK_TYPE}:{biz_id}" + payload = {"id": biz_id, "op": "sync", "source": "db-service"} + await service.enqueue(payload, TASK_TYPE, task_id=deterministic_task_id) + print(f"[DB] produce tick attempted ids={','.join(STATIC_IDS)}") + + +async def consume(service: ServiceBaseAsync, job: dict): + await asyncio.sleep(CONSUME_SLEEP_SEC) + print(f"[DB] consumed task={job['task_id']} attempts={job.get('_attempts', 0)}") + + +if __name__ == "__main__": + asyncio.run(ServiceBaseAsync(produce, consume).run()) diff --git a/ServicesTask/app/services/mail/pyproject.toml b/ServicesTask/app/services/mail/pyproject.toml new file mode 100644 index 0000000..cfa5c84 --- /dev/null +++ b/ServicesTask/app/services/mail/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "dual-queue-services" +version = "0.1.0" +description = "Async dual queue system with Redis Streams and SQLite persistence" +readme = "README.md" +requires-python = ">=3.11" +authors = [ + { name = "Berkay Karatay", email = "karatay.berkay@gmail.com" } +] +dependencies = [ + "redis>=5.0.0", + "aiosqlite>=0.19.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.4", + "black>=23.0", + "isort>=5.12" +] + +[tool.black] +line-length = 88 +target-version = ["py311"] + +[tool.isort] +profile = "black" + +[tool.setuptools.packages.find] +where = ["app"] +include = ["app*"] diff --git a/ServicesTask/app/services/queue/Dockerfile b/ServicesTask/app/services/queue/Dockerfile new file mode 100644 index 0000000..b7fe2e4 --- /dev/null +++ b/ServicesTask/app/services/queue/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app + +WORKDIR / + +COPY app/services/queue/pyproject.toml ./ +COPY app/services/queue/README.md ./ + +COPY app/core ./app/core +COPY app/services/common/ ./app/services/common/ +COPY app/services/queue/ ./app/services/queue/ + +RUN pip install --upgrade pip && pip install --no-cache-dir . +RUN mkdir -p /app/data + +CMD ["python", "-m", "app.services.queue.main"] diff --git a/ServicesTask/app/services/queue/README.md b/ServicesTask/app/services/queue/README.md new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/queue/__init__.py b/ServicesTask/app/services/queue/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/queue/main.py b/ServicesTask/app/services/queue/main.py new file mode 100644 index 0000000..2b83cf8 --- /dev/null +++ b/ServicesTask/app/services/queue/main.py @@ -0,0 +1,17 @@ +import uuid +import asyncio + +from app.services.common.service_base_async import ServiceBaseAsync + +async def produce(service: ServiceBaseAsync): + print(f"Queue Reader Service up and running.") + while True: + await asyncio.sleep(1) + +async def consume(service: ServiceBaseAsync, job: dict): + await asyncio.sleep(0.1) + print(f"Queue Sender Service up and running. Job: {job}") + + +if __name__ == "__main__": + asyncio.run(ServiceBaseAsync(produce, consume).run()) diff --git a/ServicesTask/app/services/queue/pyproject.toml b/ServicesTask/app/services/queue/pyproject.toml new file mode 100644 index 0000000..cfa5c84 --- /dev/null +++ b/ServicesTask/app/services/queue/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "dual-queue-services" +version = "0.1.0" +description = "Async dual queue system with Redis Streams and SQLite persistence" +readme = "README.md" +requires-python = ">=3.11" +authors = [ + { name = "Berkay Karatay", email = "karatay.berkay@gmail.com" } +] +dependencies = [ + "redis>=5.0.0", + "aiosqlite>=0.19.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.4", + "black>=23.0", + "isort>=5.12" +] + +[tool.black] +line-length = 88 +target-version = ["py311"] + +[tool.isort] +profile = "black" + +[tool.setuptools.packages.find] +where = ["app"] +include = ["app*"] diff --git a/ServicesTask/app/services/queue/queue_service_async.py b/ServicesTask/app/services/queue/queue_service_async.py new file mode 100644 index 0000000..5183539 --- /dev/null +++ b/ServicesTask/app/services/queue/queue_service_async.py @@ -0,0 +1,16 @@ +import asyncio + +from services.service_base_async import ServiceBaseAsync + + +async def produce(service: ServiceBaseAsync): + fake_jobs = [{"action": "cleanup", "target": "old-tasks"}] + for job in fake_jobs: + await service.enqueue(job, "queue-maintenance") + +async def consume(service: ServiceBaseAsync, job: dict): + print(f"[QUEUE CONTROL] İşleme alındı: {job}") + await asyncio.sleep(0.05) + +if __name__ == "__main__": + asyncio.run(ServiceBaseAsync(produce, consume).run()) diff --git a/ServicesTask/app/services/test/Dockerfile b/ServicesTask/app/services/test/Dockerfile new file mode 100644 index 0000000..96473d9 --- /dev/null +++ b/ServicesTask/app/services/test/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/app + +WORKDIR / + +COPY app/services/test/pyproject.toml ./ +COPY app/services/test/README.md ./ + +COPY app/core ./app/core +COPY app/services/common/ ./app/services/common/ +COPY app/services/test/ ./app/services/test/ + +RUN pip install --upgrade pip && pip install --no-cache-dir . +RUN mkdir -p /app/data + +CMD ["python", "-m", "app.services.test.main"] diff --git a/ServicesTask/app/services/test/README.md b/ServicesTask/app/services/test/README.md new file mode 100644 index 0000000..e69de29 diff --git a/ServicesTask/app/services/test/main.py b/ServicesTask/app/services/test/main.py new file mode 100644 index 0000000..d8fe33a --- /dev/null +++ b/ServicesTask/app/services/test/main.py @@ -0,0 +1,71 @@ +import os +import asyncio +import uuid +import json +import aiosqlite +import redis.asyncio as aioredis + +from app.core.config import RedisConfig, Env +from app.core.utils import now_ms + + +SQLITE_PATH = Env.SQLITE_PATH +REDIS_STREAM_DATABASE_PUBLISH = os.getenv("REDIS_STREAM_DATABASE_PUBLISH", "ACCOUNT:SERVICES:DATABASE:PUBLISH") +REDIS_STREAM_DATABASE_TASKS = os.getenv("REDIS_STREAM_DATABASE_TASKS", "ACCOUNT:SERVICES:DATABASE:TASKS") +REDIS_STREAM_MAIL_PUBLISH = os.getenv("REDIS_STREAM_MAIL_PUBLISH", "ACCOUNT:SERVICES:MAIL:PUBLISH") +REDIS_STREAM_MAIL_TASKS = os.getenv("REDIS_STREAM_MAIL_TASKS", "ACCOUNT:SERVICES:MAIL:TASKS") +REDIS_STREAM_QUEUE_PUBLISH = os.getenv("REDIS_STREAM_QUEUE_PUBLISH", "ACCOUNT:SERVICES:QUEUE:PUBLISH") +REDIS_STREAM_QUEUE_TASKS = os.getenv("REDIS_STREAM_QUEUE_TASKS", "ACCOUNT:SERVICES:QUEUE:TASKS") + + +async def ensure_schema(sqlite_path: str): + async with aiosqlite.connect(sqlite_path) as db: + await db.execute(""" + CREATE TABLE IF NOT EXISTS tasks( + task_id TEXT PRIMARY KEY, + queue TEXT NOT NULL, + type TEXT NOT NULL, + payload_json TEXT NOT NULL, + created_at INTEGER NOT NULL, + status TEXT DEFAULT 'pending', + attempts INTEGER DEFAULT 0, + last_error TEXT + ); + """) + await db.commit() + +async def enqueue(r: aioredis.Redis, sqlite_path: str, stream: str, payload: dict, type_: str): + task_id = payload.get("task_id") or str(uuid.uuid4()) + task = {"task_id": task_id, "queue": stream, "type": type_, "payload": payload, "created_at": now_ms(), "_attempts": 0} + await r.xadd(stream, {"data": json.dumps(task)}) + async with aiosqlite.connect(sqlite_path) as db: + await db.execute("""INSERT OR REPLACE INTO tasks(task_id, queue, type, payload_json, created_at, status, attempts) VALUES(?,?,?,?,?,'pending',?)""", + (task_id, stream, type_, json.dumps(payload), task["created_at"], 0)) + await db.commit() + +async def push_db_mocks(r: aioredis.Redis, sqlite_path: str, n: int = 3): + for i in range(n): + payload = {"id": uuid.uuid4().hex, "op": "sync", "source": "tester"} + await enqueue(r, sqlite_path, REDIS_STREAM_DATABASE_TASKS, payload, "db-sync") + +async def push_mail_mocks(r: aioredis.Redis, sqlite_path: str, n: int = 3): + for i in range(n): + payload = {"to": f"user{i}@example.com", "subj": "Hello", "body": "Hi!", "source": "tester"} + await enqueue(r, sqlite_path, REDIS_STREAM_MAIL_TASKS, payload, "send-mail") + +async def push_queue_mocks(r: aioredis.Redis, sqlite_path: str, n: int = 3): + for i in range(n): + payload = {"action": "cleanup", "target": f"old-tasks-{i}", "source": "tester"} + await enqueue(r, sqlite_path, REDIS_STREAM_QUEUE_TASKS, payload, "queue-maintenance") + +async def main(): + db_n, mail_n, queue_n = 3, 3, 3 + cfg = RedisConfig() + r = aioredis.Redis(host=cfg.host, port=cfg.port, db=cfg.db, username=cfg.username, password=cfg.password) + await ensure_schema(SQLITE_PATH) + await push_db_mocks(r, SQLITE_PATH, db_n) + await push_mail_mocks(r, SQLITE_PATH, mail_n) + await push_queue_mocks(r, SQLITE_PATH, queue_n) + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/ServicesTask/app/services/test/pyproject.toml b/ServicesTask/app/services/test/pyproject.toml new file mode 100644 index 0000000..cfa5c84 --- /dev/null +++ b/ServicesTask/app/services/test/pyproject.toml @@ -0,0 +1,35 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "dual-queue-services" +version = "0.1.0" +description = "Async dual queue system with Redis Streams and SQLite persistence" +readme = "README.md" +requires-python = ">=3.11" +authors = [ + { name = "Berkay Karatay", email = "karatay.berkay@gmail.com" } +] +dependencies = [ + "redis>=5.0.0", + "aiosqlite>=0.19.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.4", + "black>=23.0", + "isort>=5.12" +] + +[tool.black] +line-length = 88 +target-version = ["py311"] + +[tool.isort] +profile = "black" + +[tool.setuptools.packages.find] +where = ["app"] +include = ["app*"] diff --git a/ServicesTask/app/storage/sqlite_queue.py b/ServicesTask/app/storage/sqlite_queue.py new file mode 100644 index 0000000..f2ff1e1 --- /dev/null +++ b/ServicesTask/app/storage/sqlite_queue.py @@ -0,0 +1,51 @@ +import sqlite3 +import json +from typing import Any, Dict, List +from core.config import Env + +class SqliteQueue: + """ + """ + + def __init__(self, db_path: str = Env.SQLITE_PATH): + self.db_path = db_path + self._init_schema() + + def _conn(self): + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL;") + conn.execute("PRAGMA synchronous=NORMAL;") + return conn + + def _init_schema(self): + with self._conn() as con: + con.executescript(""" + CREATE TABLE IF NOT EXISTS tasks( + task_id TEXT PRIMARY KEY, + queue TEXT NOT NULL, + type TEXT NOT NULL, + payload_json TEXT NOT NULL, + created_at INTEGER NOT NULL, + status TEXT DEFAULT 'pending', + attempts INTEGER DEFAULT 0, + last_error TEXT + ); + CREATE INDEX IF NOT EXISTS idx_tasks_queue_status ON tasks(queue, status); + """) + + def add_task(self, task: Dict[str, Any]) -> None: + with self._conn() as con: + con.execute(""" + INSERT OR REPLACE INTO tasks(task_id, queue, type, payload_json, created_at, status, attempts) + VALUES(?,?,?,?,?,'pending',?) + """, (task["task_id"], task["queue"], task["type"], json.dumps(task["payload"]), task["created_at"], task.get("_attempts", 0))) + + def update_task(self, task_id: str, status: str, error: str | None = None, attempts: int | None = None) -> None: + with self._conn() as con: + con.execute("""UPDATE tasks SET status=?, last_error=?, attempts=COALESCE(?, attempts) WHERE task_id=? """, (status, error, attempts, task_id)) + + def pending_for_queue(self, queue: str) -> List[Dict[str, Any]]: + with self._conn() as con: + rows = con.execute("SELECT * FROM tasks WHERE queue=? AND status='pending'", (queue,)).fetchall() + return [dict(r) for r in rows] diff --git a/ServicesTask/docker-compose.yml b/ServicesTask/docker-compose.yml new file mode 100644 index 0000000..2e0019b --- /dev/null +++ b/ServicesTask/docker-compose.yml @@ -0,0 +1,170 @@ + +networks: + servicesNetwork: + driver: bridge + +volumes: + sqlite_data: + prom_data: + grafana_data: + nats_data: + nui_data: + +services: + nats: + image: nats:latest + command: ["-js", "-m", "8222"] + ports: + - "4222:4222" + - "8222:8222" + volumes: + - ./app/core/nats/nats.conf:/etc/nats/nats.conf:ro + - nats_data:/data/jetstream + networks: [servicesNetwork] + restart: unless-stopped + + nats-exporter: + image: natsio/prometheus-nats-exporter:latest + command: + - "-varz" + - "-connz" + - "-subz" + - "-routez" + - "-jsz=all" + - "http://nats:8222" + depends_on: [nats] + expose: + - "7777" + networks: [servicesNetwork] + restart: unless-stopped + + prometheus: + image: prom/prometheus:latest + depends_on: [nats-exporter] + networks: [servicesNetwork] + volumes: + - ./monitor/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prom_data:/prometheus + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--storage.tsdb.path=/prometheus" + - "--web.enable-lifecycle" + ports: + - "9090:9090" + restart: unless-stopped + + grafana: + image: grafana/grafana:latest + depends_on: [prometheus] + networks: [servicesNetwork] + environment: + - GF_SECURITY_ADMIN_USER=admin + - GF_SECURITY_ADMIN_PASSWORD=admin + - GF_USERS_ALLOW_SIGN_UP=false + volumes: + - grafana_data:/var/lib/grafana + - ./monitor/grafana/provisioning/datasources:/etc/grafana/provisioning/datasources:ro + ports: + - "3000:3000" + restart: unless-stopped + + nats-ui: + image: ghcr.io/nats-nui/nui:latest + ports: + - "127.0.0.1:31311:31311" + networks: [servicesNetwork] + volumes: + - nui_data:/db + restart: unless-stopped + + db-service: + build: + context: . + dockerfile: app/services/database/Dockerfile + depends_on: [nats] + networks: [servicesNetwork] + env_file: [.env] + environment: + NATS_URL: "nats://nats:4222" + JS_STREAM: "ACCOUNT_SERVICES_DATABASE" + JS_TASKS_SUBJECT: "ACCOUNT.SERVICES.DATABASE.TASKS" + JS_PUBLISH_SUBJECT: "ACCOUNT.SERVICES.DATABASE.PUBLISH" + JS_DURABLE: "DB_WORKERS" + BATCH_SIZE: "5" + ACK_WAIT_SEC: "30" + MAX_DELIVER: "3" + SQLITE_PATH: "/app/data/queue.db" + TASK_TYPE: "db-task" + CONSUME_SLEEP_SEC: "0.5" + SERVICE_NAME: "db-service" + METRICS_PORT: "8000" + volumes: + - sqlite_data:/app/data + restart: unless-stopped + + # mail-service: + # build: + # context: . + # dockerfile: app/services/mail/Dockerfile + # volumes: + # - sqlite_data:/app/data + # env_file: [.env] + # environment: + # REDIS_STREAM_PUBLISH: ACCOUNT:SERVICES:MAIL:PUBLISH + # REDIS_STREAM_TASKS: ACCOUNT:SERVICES:MAIL:TASKS + # CONSUME_BACKLOG: true + # depends_on: [redis] + # networks: [servicesNetwork] + # restart: unless-stopped + # logging: + # driver: "json-file" + # options: + # max-size: "10m" + # max-file: "3" + + # queue-service: + # build: + # context: . + # dockerfile: app/services/queue/Dockerfile + # volumes: + # - sqlite_data:/app/data + # env_file: [.env] + # environment: + # REDIS_STREAM_PUBLISH: ACCOUNT:SERVICES:QUEUE:PUBLISH + # REDIS_STREAM_TASKS: ACCOUNT:SERVICES:QUEUE:TASKS + # depends_on: [redis] + # networks: [servicesNetwork] + # restart: unless-stopped + # logging: + # driver: "json-file" + # options: + # max-size: "10m" + # max-file: "3" + + # tester: + # build: + # context: . + # dockerfile: app/services/test/Dockerfile + # volumes: + # - sqlite_data:/app/data + # env_file: [.env] + # environment: + # REDIS_STREAM_DATABASE_PUBLISH: ACCOUNT:SERVICES:DATABASE:PUBLISH + # REDIS_STREAM_DATABASE_TASKS: ACCOUNT:SERVICES:DATABASE:TASKS + # REDIS_STREAM_MAIL_PUBLISH: ACCOUNT:SERVICES:MAIL:PUBLISH + # REDIS_STREAM_MAIL_TASKS: ACCOUNT:SERVICES:MAIL:TASKS + # REDIS_STREAM_QUEUE_PUBLISH: ACCOUNT:SERVICES:QUEUE:PUBLISH + # REDIS_STREAM_QUEUE_TASKS: ACCOUNT:SERVICES:QUEUE:TASKS + # depends_on: + # - redis + # # - db-service + # # - mail-service + # # - queue-service + # networks: [servicesNetwork] + # restart: "no" + # logging: + # driver: "json-file" + # options: + # max-size: "10m" + # max-file: "3" + diff --git a/ServicesTask/monitor/grafana/provisioning/datasources/datasource.yml b/ServicesTask/monitor/grafana/provisioning/datasources/datasource.yml new file mode 100644 index 0000000..2510ee9 --- /dev/null +++ b/ServicesTask/monitor/grafana/provisioning/datasources/datasource.yml @@ -0,0 +1,9 @@ +apiVersion: 1 +datasources: + - name: Prometheus + type: prometheus + access: proxy + isDefault: true + url: http://prometheus:9090 + jsonData: + timeInterval: 5s diff --git a/ServicesTask/monitor/prometheus/prometheus.yml b/ServicesTask/monitor/prometheus/prometheus.yml new file mode 100644 index 0000000..2e13e8c --- /dev/null +++ b/ServicesTask/monitor/prometheus/prometheus.yml @@ -0,0 +1,16 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: prometheus + static_configs: + - targets: ["prometheus:9090"] + + - job_name: nats + static_configs: + - targets: ["nats-exporter:7777"] + + - job_name: db_service + static_configs: + - targets: ["db-service:8000"] \ No newline at end of file