ebuild_rasp2/ebuild/io/dbtext.py

339 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
__title__ = "dbtext"
__author__ = 'Mehmet Karatay & "Saraswati" (ChatGPT)'
__purpose__ = "Sensör ve röle olaylarını metin tabanlı SQL log olarak saklayan yardımcı sınıf"
__version__ = "0.2.0"
__date__ = "2025-11-20"
"""
ebuild/io/dbtext.py
Revision : 2025-11-20
Authors : Mehmet Karatay & "Saraswati" (ChatGPT)
Amaç
-----
Her sensör ve rölenin:
- Ne zaman açıldığı / kapandığı
- Hangi değeri ürettiği
- Hangi kaynaktan geldiği
bilgisini tarih-saat bazlı olarak düz bir metin dosyasında tutmak.
Kayıt formatı (satır başına bir olay):
INSERT INTO <table> (ts, app, source, event_type, value, unit, extra)
VALUES ('YYYY-MM-DD HH:MM:SS', 'APP', 'SOURCE', 'EVENT', VALUE, 'UNIT', 'EXTRA');
Örnek:
INSERT INTO ebrulor_log (ts, app, source, event_type, value, unit, extra)
VALUES ('2025-11-20 12:34:56', 'ESYSTEM', 'relay:circulation_a', 'state', 1, 'bool', 'on');
Böylece:
- Dosya istenirse direkt PostgreSQL'e pipe edilip çalıştırılabilir
- Aynı zamanda bu modül basit bir parser ile geri okunabilir
"""
#from __future__ import annotations
import os
import datetime as _dt
import re
from typing import List, Optional, Dict, Any
from .config_ini import KilitliDosya
class DBText:
""" Metin tabanlı SQL log dosyası için yardımcı sınıf.
Parametreler
-----------
filename : str
Log dosyasının yolu (örnek: "ebina_log.sql").
table : str
SQL INSERT komutlarında kullanılacak tablo adı.
app : str
Uygulama adı (örn. "ESYSTEM").
use_lock : bool
True ise yazarken KilitliDosya kullanılır (çoklu süreç için daha güvenli).
"""
def __init__(
self,
filename: str,
table: str = "ebrulor_log",
app: str = "EBUILD",
use_lock: bool = True,
) -> None:
self.filename = filename
self.table = table
self.app = app
self.use_lock = use_lock
# Dosya yoksa basit bir header ile oluştur
if not os.path.isfile(self.filename):
with open(self.filename, "w", encoding="utf-8") as f:
f.write(f"-- DBText log file for table {self.table}\n")
f.write(f"-- created at {_dt.datetime.now().isoformat()}\n\n")
self._locker = KilitliDosya(self.filename) if use_lock else None
# SQL satırlarını parse etmek için basit regex
self._re_values = re.compile(
r"VALUES \('(?P<ts>[^']*)',\s*'(?P<app>[^']*)',\s*'(?P<source>[^']*)',\s*"
r"'(?P<etype>[^']*)',\s*(?P<value>NULL|[-0-9.]+),\s*(?P<unit>NULL|'[^']*'),\s*"
r"'(?P<extra>[^']*)'\);"
)
# -------------------------------------------------
# Yardımcılar
# -------------------------------------------------
@staticmethod
def _escape(value: str) -> str:
"""SQL için tek tırnak kaçışı yapar."""
return value.replace("'", "''")
def _write_line(self, line: str) -> None:
""" Tek bir satırı log dosyasına yazar.
use_lock=True ise KilitliDosya üzerinden, değilse doğrudan append.
"""
if self.use_lock and self._locker is not None:
self._locker.yaz(line + "\n")
else:
with open(self.filename, "a", encoding="utf-8") as f:
f.write(line + "\n")
# -------------------------------------------------
# Genel amaçlı event yazma
# -------------------------------------------------
def insert_event(
self,
source: str,
event_type: str,
value: Optional[float] = None,
unit: Optional[str] = None,
timestamp: Optional[_dt.datetime] = None,
extra: str = "",
) -> None:
""" Genel amaçlı bir olay kaydı ekler.
Örnek kullanım:
logger.insert_event(
source="Sensor:28-00000660e983",
event_type="temperature",
value=23.5,
unit="°C",
timestamp=datetime.now(),
extra="Daire 2 Kat 1 Yön 5",
)
"""
ts = timestamp or _dt.datetime.now()
ts_str = ts.strftime("%Y-%m-%d %H:%M:%S")
# Değerler
v_str = "NULL" if value is None else f"{float(value):.3f}"
u_str = "NULL" if unit is None else f"'{self._escape(str(unit))}'"
extra_str = self._escape(extra or "")
src_str = self._escape(source)
etype_str = self._escape(event_type)
app_str = self._escape(self.app)
line = (
f"INSERT INTO {self.table} (ts, app, source, event_type, value, unit, extra) "
f"VALUES ('{ts_str}', '{app_str}', '{src_str}', '{etype_str}', {v_str}, {u_str}, '{extra_str}');"
)
self._write_line(line)
# -------------------------------------------------
# Sensör / röle özel kısayol metotları
# -------------------------------------------------
def log_state_change(
self,
device_kind: str,
name: str,
is_on: bool,
timestamp: Optional[_dt.datetime] = None,
extra: str = "",
) -> None:
""" Röle / dijital çıkış / giriş gibi ON/OFF durumlarını loglar.
device_kind : "relay", "sensor", "pump" vb.
name : cihaz ismi ("circulation_a", "burner_contactor" vb.)
is_on : True → 1 (on), False → 0 (off)
Event:
source = f"{device_kind}:{name}"
event_type = "state"
value = 1.0 / 0.0
unit = "bool"
"""
source = f"{device_kind}:{name}"
val = 1.0 if is_on else 0.0
ex = extra or ("on" if is_on else "off")
self.insert_event(
source=source,
event_type="state",
value=val,
unit="bool",
timestamp=timestamp,
extra=ex,
)
def log_sensor_value(
self,
name: str,
value: float,
unit: str = "",
timestamp: Optional[_dt.datetime] = None,
extra: str = "",
) -> None:
""" Analog / sayısal sensör değerlerini loglar.
Örnek:
logger.log_sensor_value("outside_temp", 12.3, "°C")
"""
source = f"sensor:{name}"
self.insert_event(
source=source,
event_type="measurement",
value=value,
unit=unit,
timestamp=timestamp,
extra=extra,
)
# -------------------------------------------------
# Okuma API'si
# -------------------------------------------------
def _parse_line(self, line: str) -> Optional[Dict[str, Any]]:
""" Tek bir INSERT satırını dict'e çevirir.
Beklenen format:
INSERT INTO <table> (...) VALUES ('ts', 'app', 'source', 'etype', value, unit, 'extra');
"""
line = line.strip()
if not line or not line.upper().startswith("INSERT INTO"):
return None
m = self._re_values.search(line)
if not m:
return None
gd = m.groupdict()
ts_str = gd.get("ts", "")
try:
ts = _dt.datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S")
except Exception:
ts = None
# value
raw_v = gd.get("value", "NULL")
if raw_v == "NULL":
value = None
else:
try:
value = float(raw_v)
except Exception:
value = None
# unit
raw_u = gd.get("unit", "NULL")
if raw_u == "NULL":
unit = None
else:
# 'C' şeklindeki stringten tek tırnakları atıyoruz
unit = raw_u.strip("'")
return {
"ts": ts,
"app": gd.get("app", ""),
"source": gd.get("source", ""),
"event_type": gd.get("etype", ""),
"value": value,
"unit": unit,
"extra": gd.get("extra", ""),
}
def iter_events(
self,
source: Optional[str] = None,
event_type: Optional[str] = None,
since: Optional[_dt.datetime] = None,
until: Optional[_dt.datetime] = None,
):
""" Log dosyasındaki olayları satır satır okur ve filtre uygular.
Parametreler:
source : None veya tam eşleşen source string
event_type : None veya tam eşleşen event_type
since : None veya bu tarihten SONRAKİ kayıtlar
until : None veya bu tarihten ÖNCEKİ kayıtlar
Yield:
dict: {ts, app, source, event_type, value, unit, extra}
"""
if not os.path.isfile(self.filename):
return
with open(self.filename, "r", encoding="utf-8") as f:
for line in f:
rec = self._parse_line(line)
if not rec:
continue
ts = rec["ts"]
if since and ts and ts < since:
continue
if until and ts and ts > until:
continue
if source and rec["source"] != source:
continue
if event_type and rec["event_type"] != event_type:
continue
yield rec
def get_state_history(
self,
device_kind: str,
name: str,
limit: int = 100,
since: Optional[_dt.datetime] = None,
until: Optional[_dt.datetime] = None,
) -> List[Dict[str, Any]]:
""" Belirli bir cihazın (sensör / röle) son durum değişikliklerini döndürür.
device_kind : "relay", "sensor", "pump" vb.
name : cihaz adı
limit : maksimum kaç kayıt döneceği (en yeni kayıtlar)
"""
src = f"{device_kind}:{name}"
events = list(self.iter_events(
source=src,
event_type="state",
since=since,
until=until,
))
# En yeni kayıtlar sondadır; tersten limit al
events.sort(key=lambda r: (r["ts"] or _dt.datetime.min), reverse=True)
return events[:limit]
if __name__ == "__main__":
# Basit self-test
logger = DBText(filename="test_dbtext_log.sql", table="ebrulor_log", app="ESYSTEM")
now = _dt.datetime.now()
logger.log_state_change("relay", "circulation_a", True, timestamp=now, extra="manual test on")
logger.log_state_change("relay", "circulation_a", False, timestamp=now + _dt.timedelta(seconds=10), extra="manual test off")
print("Son durum değişiklikleri:")
history = logger.get_state_history("relay", "circulation_a", limit=10)
for h in history:
print(h)