# -*- coding: utf-8 -*-
__title__ = "dbtext"
__author__ = 'Mehmet Karatay & "Saraswati" (ChatGPT)'
__purpose__ = "Sensör ve röle olaylarını metin tabanlı SQL log olarak saklayan yardımcı sınıf"
__version__ = "0.2.0"
__date__ = "2025-11-20"
"""
ebuild/io/dbtext.py
Revision : 2025-11-20
Authors : Mehmet Karatay & "Saraswati" (ChatGPT)
Amaç
-----
Her sensör ve rölenin:
- Ne zaman açıldığı / kapandığı
- Hangi değeri ürettiği
- Hangi kaynaktan geldiği
bilgisini tarih-saat bazlı olarak düz bir metin dosyasında tutmak.
Kayıt formatı (satır başına bir olay):
INSERT INTO
(ts, app, source, event_type, value, unit, extra)
VALUES ('YYYY-MM-DD HH:MM:SS', 'APP', 'SOURCE', 'EVENT', VALUE, 'UNIT', 'EXTRA');
Örnek:
INSERT INTO ebrulor_log (ts, app, source, event_type, value, unit, extra)
VALUES ('2025-11-20 12:34:56', 'ESYSTEM', 'relay:circulation_a', 'state', 1, 'bool', 'on');
Böylece:
- Dosya istenirse direkt PostgreSQL'e pipe edilip çalıştırılabilir
- Aynı zamanda bu modül basit bir parser ile geri okunabilir
"""
#from __future__ import annotations
import os
import datetime as _dt
import re
from typing import List, Optional, Dict, Any
from .config_ini import KilitliDosya
class DBText:
""" Metin tabanlı SQL log dosyası için yardımcı sınıf.
Parametreler
-----------
filename : str
Log dosyasının yolu (örnek: "ebina_log.sql").
table : str
SQL INSERT komutlarında kullanılacak tablo adı.
app : str
Uygulama adı (örn. "ESYSTEM").
use_lock : bool
True ise yazarken KilitliDosya kullanılır (çoklu süreç için daha güvenli).
"""
def __init__(
self,
filename: str,
table: str = "ebrulor_log",
app: str = "EBUILD",
use_lock: bool = True,
) -> None:
self.filename = filename
self.table = table
self.app = app
self.use_lock = use_lock
# Dosya yoksa basit bir header ile oluştur
if not os.path.isfile(self.filename):
with open(self.filename, "w", encoding="utf-8") as f:
f.write(f"-- DBText log file for table {self.table}\n")
f.write(f"-- created at {_dt.datetime.now().isoformat()}\n\n")
self._locker = KilitliDosya(self.filename) if use_lock else None
# SQL satırlarını parse etmek için basit regex
self._re_values = re.compile(
r"VALUES \('(?P[^']*)',\s*'(?P[^']*)',\s*'(?P[^']*)',\s*"
r"'(?P[^']*)',\s*(?PNULL|[-0-9.]+),\s*(?PNULL|'[^']*'),\s*"
r"'(?P[^']*)'\);"
)
# -------------------------------------------------
# Yardımcılar
# -------------------------------------------------
@staticmethod
def _escape(value: str) -> str:
"""SQL için tek tırnak kaçışı yapar."""
return value.replace("'", "''")
def _write_line(self, line: str) -> None:
""" Tek bir satırı log dosyasına yazar.
use_lock=True ise KilitliDosya üzerinden, değilse doğrudan append.
"""
if self.use_lock and self._locker is not None:
self._locker.yaz(line + "\n")
else:
with open(self.filename, "a", encoding="utf-8") as f:
f.write(line + "\n")
# -------------------------------------------------
# Genel amaçlı event yazma
# -------------------------------------------------
def insert_event(
self,
source: str,
event_type: str,
value: Optional[float] = None,
unit: Optional[str] = None,
timestamp: Optional[_dt.datetime] = None,
extra: str = "",
) -> None:
""" Genel amaçlı bir olay kaydı ekler.
Örnek kullanım:
logger.insert_event(
source="Sensor:28-00000660e983",
event_type="temperature",
value=23.5,
unit="°C",
timestamp=datetime.now(),
extra="Daire 2 Kat 1 Yön 5",
)
"""
ts = timestamp or _dt.datetime.now()
ts_str = ts.strftime("%Y-%m-%d %H:%M:%S")
# Değerler
v_str = "NULL" if value is None else f"{float(value):.3f}"
u_str = "NULL" if unit is None else f"'{self._escape(str(unit))}'"
extra_str = self._escape(extra or "")
src_str = self._escape(source)
etype_str = self._escape(event_type)
app_str = self._escape(self.app)
line = (
f"INSERT INTO {self.table} (ts, app, source, event_type, value, unit, extra) "
f"VALUES ('{ts_str}', '{app_str}', '{src_str}', '{etype_str}', {v_str}, {u_str}, '{extra_str}');"
)
self._write_line(line)
# -------------------------------------------------
# Sensör / röle özel kısayol metotları
# -------------------------------------------------
def log_state_change(
self,
device_kind: str,
name: str,
is_on: bool,
timestamp: Optional[_dt.datetime] = None,
extra: str = "",
) -> None:
""" Röle / dijital çıkış / giriş gibi ON/OFF durumlarını loglar.
device_kind : "relay", "sensor", "pump" vb.
name : cihaz ismi ("circulation_a", "burner_contactor" vb.)
is_on : True → 1 (on), False → 0 (off)
Event:
source = f"{device_kind}:{name}"
event_type = "state"
value = 1.0 / 0.0
unit = "bool"
"""
source = f"{device_kind}:{name}"
val = 1.0 if is_on else 0.0
ex = extra or ("on" if is_on else "off")
self.insert_event(
source=source,
event_type="state",
value=val,
unit="bool",
timestamp=timestamp,
extra=ex,
)
def log_sensor_value(
self,
name: str,
value: float,
unit: str = "",
timestamp: Optional[_dt.datetime] = None,
extra: str = "",
) -> None:
""" Analog / sayısal sensör değerlerini loglar.
Örnek:
logger.log_sensor_value("outside_temp", 12.3, "°C")
"""
source = f"sensor:{name}"
self.insert_event(
source=source,
event_type="measurement",
value=value,
unit=unit,
timestamp=timestamp,
extra=extra,
)
# -------------------------------------------------
# Okuma API'si
# -------------------------------------------------
def _parse_line(self, line: str) -> Optional[Dict[str, Any]]:
""" Tek bir INSERT satırını dict'e çevirir.
Beklenen format:
INSERT INTO
(...) VALUES ('ts', 'app', 'source', 'etype', value, unit, 'extra');
"""
line = line.strip()
if not line or not line.upper().startswith("INSERT INTO"):
return None
m = self._re_values.search(line)
if not m:
return None
gd = m.groupdict()
ts_str = gd.get("ts", "")
try:
ts = _dt.datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
except Exception:
ts = None
# value
raw_v = gd.get("value", "NULL")
if raw_v == "NULL":
value = None
else:
try:
value = float(raw_v)
except Exception:
value = None
# unit
raw_u = gd.get("unit", "NULL")
if raw_u == "NULL":
unit = None
else:
# 'C' şeklindeki stringten tek tırnakları atıyoruz
unit = raw_u.strip("'")
return {
"ts": ts,
"app": gd.get("app", ""),
"source": gd.get("source", ""),
"event_type": gd.get("etype", ""),
"value": value,
"unit": unit,
"extra": gd.get("extra", ""),
}
def iter_events(
self,
source: Optional[str] = None,
event_type: Optional[str] = None,
since: Optional[_dt.datetime] = None,
until: Optional[_dt.datetime] = None,
):
""" Log dosyasındaki olayları satır satır okur ve filtre uygular.
Parametreler:
source : None veya tam eşleşen source string
event_type : None veya tam eşleşen event_type
since : None veya bu tarihten SONRAKİ kayıtlar
until : None veya bu tarihten ÖNCEKİ kayıtlar
Yield:
dict: {ts, app, source, event_type, value, unit, extra}
"""
if not os.path.isfile(self.filename):
return
with open(self.filename, "r", encoding="utf-8") as f:
for line in f:
rec = self._parse_line(line)
if not rec:
continue
ts = rec["ts"]
if since and ts and ts < since:
continue
if until and ts and ts > until:
continue
if source and rec["source"] != source:
continue
if event_type and rec["event_type"] != event_type:
continue
yield rec
def get_state_history(
self,
device_kind: str,
name: str,
limit: int = 100,
since: Optional[_dt.datetime] = None,
until: Optional[_dt.datetime] = None,
) -> List[Dict[str, Any]]:
""" Belirli bir cihazın (sensör / röle) son durum değişikliklerini döndürür.
device_kind : "relay", "sensor", "pump" vb.
name : cihaz adı
limit : maksimum kaç kayıt döneceği (en yeni kayıtlar)
"""
src = f"{device_kind}:{name}"
events = list(self.iter_events(
source=src,
event_type="state",
since=since,
until=until,
))
# En yeni kayıtlar sondadır; tersten limit al
events.sort(key=lambda r: (r["ts"] or _dt.datetime.min), reverse=True)
return events[:limit]
if __name__ == "__main__":
# Basit self-test
logger = DBText(filename="test_dbtext_log.sql", table="ebrulor_log", app="ESYSTEM")
now = _dt.datetime.now()
logger.log_state_change("relay", "circulation_a", True, timestamp=now, extra="manual test on")
logger.log_state_change("relay", "circulation_a", False, timestamp=now + _dt.timedelta(seconds=10), extra="manual test off")
print("Son durum değişiklikleri:")
history = logger.get_state_history("relay", "circulation_a", limit=10)
for h in history:
print(h)