339 lines
11 KiB
Python
339 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
||
__title__ = "dbtext"
|
||
__author__ = 'Mehmet Karatay & "Saraswati" (ChatGPT)'
|
||
__purpose__ = "Sensör ve röle olaylarını metin tabanlı SQL log olarak saklayan yardımcı sınıf"
|
||
__version__ = "0.2.0"
|
||
__date__ = "2025-11-20"
|
||
|
||
"""
|
||
ebuild/io/dbtext.py
|
||
|
||
Revision : 2025-11-20
|
||
Authors : Mehmet Karatay & "Saraswati" (ChatGPT)
|
||
|
||
Amaç
|
||
-----
|
||
Her sensör ve rölenin:
|
||
- Ne zaman açıldığı / kapandığı
|
||
- Hangi değeri ürettiği
|
||
- Hangi kaynaktan geldiği
|
||
|
||
bilgisini tarih-saat bazlı olarak düz bir metin dosyasında tutmak.
|
||
|
||
Kayıt formatı (satır başına bir olay):
|
||
INSERT INTO <table> (ts, app, source, event_type, value, unit, extra)
|
||
VALUES ('YYYY-MM-DD HH:MM:SS', 'APP', 'SOURCE', 'EVENT', VALUE, 'UNIT', 'EXTRA');
|
||
|
||
Örnek:
|
||
INSERT INTO ebrulor_log (ts, app, source, event_type, value, unit, extra)
|
||
VALUES ('2025-11-20 12:34:56', 'ESYSTEM', 'relay:circulation_a', 'state', 1, 'bool', 'on');
|
||
|
||
Böylece:
|
||
- Dosya istenirse direkt PostgreSQL'e pipe edilip çalıştırılabilir
|
||
- Aynı zamanda bu modül basit bir parser ile geri okunabilir
|
||
"""
|
||
|
||
#from __future__ import annotations
|
||
|
||
import os
|
||
import datetime as _dt
|
||
import re
|
||
from typing import List, Optional, Dict, Any
|
||
|
||
from .config_ini import KilitliDosya
|
||
|
||
|
||
class DBText:
|
||
""" Metin tabanlı SQL log dosyası için yardımcı sınıf.
|
||
|
||
Parametreler
|
||
-----------
|
||
filename : str
|
||
Log dosyasının yolu (örnek: "ebina_log.sql").
|
||
table : str
|
||
SQL INSERT komutlarında kullanılacak tablo adı.
|
||
app : str
|
||
Uygulama adı (örn. "ESYSTEM").
|
||
use_lock : bool
|
||
True ise yazarken KilitliDosya kullanılır (çoklu süreç için daha güvenli).
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
filename: str,
|
||
table: str = "ebrulor_log",
|
||
app: str = "EBUILD",
|
||
use_lock: bool = True,
|
||
) -> None:
|
||
self.filename = filename
|
||
self.table = table
|
||
self.app = app
|
||
self.use_lock = use_lock
|
||
|
||
# Dosya yoksa basit bir header ile oluştur
|
||
if not os.path.isfile(self.filename):
|
||
with open(self.filename, "w", encoding="utf-8") as f:
|
||
f.write(f"-- DBText log file for table {self.table}\n")
|
||
f.write(f"-- created at {_dt.datetime.now().isoformat()}\n\n")
|
||
|
||
self._locker = KilitliDosya(self.filename) if use_lock else None
|
||
|
||
# SQL satırlarını parse etmek için basit regex
|
||
self._re_values = re.compile(
|
||
r"VALUES \('(?P<ts>[^']*)',\s*'(?P<app>[^']*)',\s*'(?P<source>[^']*)',\s*"
|
||
r"'(?P<etype>[^']*)',\s*(?P<value>NULL|[-0-9.]+),\s*(?P<unit>NULL|'[^']*'),\s*"
|
||
r"'(?P<extra>[^']*)'\);"
|
||
)
|
||
|
||
# -------------------------------------------------
|
||
# Yardımcılar
|
||
# -------------------------------------------------
|
||
@staticmethod
|
||
def _escape(value: str) -> str:
|
||
"""SQL için tek tırnak kaçışı yapar."""
|
||
return value.replace("'", "''")
|
||
|
||
def _write_line(self, line: str) -> None:
|
||
""" Tek bir satırı log dosyasına yazar.
|
||
|
||
use_lock=True ise KilitliDosya üzerinden, değilse doğrudan append.
|
||
"""
|
||
if self.use_lock and self._locker is not None:
|
||
self._locker.yaz(line + "\n")
|
||
else:
|
||
with open(self.filename, "a", encoding="utf-8") as f:
|
||
f.write(line + "\n")
|
||
|
||
# -------------------------------------------------
|
||
# Genel amaçlı event yazma
|
||
# -------------------------------------------------
|
||
def insert_event(
|
||
self,
|
||
source: str,
|
||
event_type: str,
|
||
value: Optional[float] = None,
|
||
unit: Optional[str] = None,
|
||
timestamp: Optional[_dt.datetime] = None,
|
||
extra: str = "",
|
||
) -> None:
|
||
""" Genel amaçlı bir olay kaydı ekler.
|
||
|
||
Örnek kullanım:
|
||
logger.insert_event(
|
||
source="Sensor:28-00000660e983",
|
||
event_type="temperature",
|
||
value=23.5,
|
||
unit="°C",
|
||
timestamp=datetime.now(),
|
||
extra="Daire 2 Kat 1 Yön 5",
|
||
)
|
||
"""
|
||
ts = timestamp or _dt.datetime.now()
|
||
ts_str = ts.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
# Değerler
|
||
v_str = "NULL" if value is None else f"{float(value):.3f}"
|
||
u_str = "NULL" if unit is None else f"'{self._escape(str(unit))}'"
|
||
extra_str = self._escape(extra or "")
|
||
|
||
src_str = self._escape(source)
|
||
etype_str = self._escape(event_type)
|
||
app_str = self._escape(self.app)
|
||
|
||
line = (
|
||
f"INSERT INTO {self.table} (ts, app, source, event_type, value, unit, extra) "
|
||
f"VALUES ('{ts_str}', '{app_str}', '{src_str}', '{etype_str}', {v_str}, {u_str}, '{extra_str}');"
|
||
)
|
||
|
||
self._write_line(line)
|
||
|
||
# -------------------------------------------------
|
||
# Sensör / röle özel kısayol metotları
|
||
# -------------------------------------------------
|
||
def log_state_change(
|
||
self,
|
||
device_kind: str,
|
||
name: str,
|
||
is_on: bool,
|
||
timestamp: Optional[_dt.datetime] = None,
|
||
extra: str = "",
|
||
) -> None:
|
||
""" Röle / dijital çıkış / giriş gibi ON/OFF durumlarını loglar.
|
||
|
||
device_kind : "relay", "sensor", "pump" vb.
|
||
name : cihaz ismi ("circulation_a", "burner_contactor" vb.)
|
||
is_on : True → 1 (on), False → 0 (off)
|
||
|
||
Event:
|
||
source = f"{device_kind}:{name}"
|
||
event_type = "state"
|
||
value = 1.0 / 0.0
|
||
unit = "bool"
|
||
"""
|
||
source = f"{device_kind}:{name}"
|
||
val = 1.0 if is_on else 0.0
|
||
ex = extra or ("on" if is_on else "off")
|
||
|
||
self.insert_event(
|
||
source=source,
|
||
event_type="state",
|
||
value=val,
|
||
unit="bool",
|
||
timestamp=timestamp,
|
||
extra=ex,
|
||
)
|
||
|
||
def log_sensor_value(
|
||
self,
|
||
name: str,
|
||
value: float,
|
||
unit: str = "",
|
||
timestamp: Optional[_dt.datetime] = None,
|
||
extra: str = "",
|
||
) -> None:
|
||
""" Analog / sayısal sensör değerlerini loglar.
|
||
|
||
Örnek:
|
||
logger.log_sensor_value("outside_temp", 12.3, "°C")
|
||
"""
|
||
source = f"sensor:{name}"
|
||
self.insert_event(
|
||
source=source,
|
||
event_type="measurement",
|
||
value=value,
|
||
unit=unit,
|
||
timestamp=timestamp,
|
||
extra=extra,
|
||
)
|
||
|
||
# -------------------------------------------------
|
||
# Okuma API'si
|
||
# -------------------------------------------------
|
||
def _parse_line(self, line: str) -> Optional[Dict[str, Any]]:
|
||
""" Tek bir INSERT satırını dict'e çevirir.
|
||
|
||
Beklenen format:
|
||
INSERT INTO <table> (...) VALUES ('ts', 'app', 'source', 'etype', value, unit, 'extra');
|
||
"""
|
||
line = line.strip()
|
||
if not line or not line.upper().startswith("INSERT INTO"):
|
||
return None
|
||
|
||
m = self._re_values.search(line)
|
||
if not m:
|
||
return None
|
||
|
||
gd = m.groupdict()
|
||
ts_str = gd.get("ts", "")
|
||
try:
|
||
ts = _dt.datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
|
||
except Exception:
|
||
ts = None
|
||
|
||
# value
|
||
raw_v = gd.get("value", "NULL")
|
||
if raw_v == "NULL":
|
||
value = None
|
||
else:
|
||
try:
|
||
value = float(raw_v)
|
||
except Exception:
|
||
value = None
|
||
|
||
# unit
|
||
raw_u = gd.get("unit", "NULL")
|
||
if raw_u == "NULL":
|
||
unit = None
|
||
else:
|
||
# 'C' şeklindeki stringten tek tırnakları atıyoruz
|
||
unit = raw_u.strip("'")
|
||
|
||
return {
|
||
"ts": ts,
|
||
"app": gd.get("app", ""),
|
||
"source": gd.get("source", ""),
|
||
"event_type": gd.get("etype", ""),
|
||
"value": value,
|
||
"unit": unit,
|
||
"extra": gd.get("extra", ""),
|
||
}
|
||
|
||
def iter_events(
|
||
self,
|
||
source: Optional[str] = None,
|
||
event_type: Optional[str] = None,
|
||
since: Optional[_dt.datetime] = None,
|
||
until: Optional[_dt.datetime] = None,
|
||
):
|
||
""" Log dosyasındaki olayları satır satır okur ve filtre uygular.
|
||
|
||
Parametreler:
|
||
source : None veya tam eşleşen source string
|
||
event_type : None veya tam eşleşen event_type
|
||
since : None veya bu tarihten SONRAKİ kayıtlar
|
||
until : None veya bu tarihten ÖNCEKİ kayıtlar
|
||
|
||
Yield:
|
||
dict: {ts, app, source, event_type, value, unit, extra}
|
||
"""
|
||
if not os.path.isfile(self.filename):
|
||
return
|
||
|
||
with open(self.filename, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
rec = self._parse_line(line)
|
||
if not rec:
|
||
continue
|
||
|
||
ts = rec["ts"]
|
||
if since and ts and ts < since:
|
||
continue
|
||
if until and ts and ts > until:
|
||
continue
|
||
if source and rec["source"] != source:
|
||
continue
|
||
if event_type and rec["event_type"] != event_type:
|
||
continue
|
||
|
||
yield rec
|
||
|
||
def get_state_history(
|
||
self,
|
||
device_kind: str,
|
||
name: str,
|
||
limit: int = 100,
|
||
since: Optional[_dt.datetime] = None,
|
||
until: Optional[_dt.datetime] = None,
|
||
) -> List[Dict[str, Any]]:
|
||
""" Belirli bir cihazın (sensör / röle) son durum değişikliklerini döndürür.
|
||
|
||
device_kind : "relay", "sensor", "pump" vb.
|
||
name : cihaz adı
|
||
limit : maksimum kaç kayıt döneceği (en yeni kayıtlar)
|
||
"""
|
||
src = f"{device_kind}:{name}"
|
||
events = list(self.iter_events(
|
||
source=src,
|
||
event_type="state",
|
||
since=since,
|
||
until=until,
|
||
))
|
||
|
||
# En yeni kayıtlar sondadır; tersten limit al
|
||
events.sort(key=lambda r: (r["ts"] or _dt.datetime.min), reverse=True)
|
||
return events[:limit]
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# Basit self-test
|
||
logger = DBText(filename="test_dbtext_log.sql", table="ebrulor_log", app="ESYSTEM")
|
||
|
||
now = _dt.datetime.now()
|
||
logger.log_state_change("relay", "circulation_a", True, timestamp=now, extra="manual test on")
|
||
logger.log_state_change("relay", "circulation_a", False, timestamp=now + _dt.timedelta(seconds=10), extra="manual test off")
|
||
|
||
print("Son durum değişiklikleri:")
|
||
history = logger.get_state_history("relay", "circulation_a", limit=10)
|
||
for h in history:
|
||
print(h)
|