# -*- coding: utf-8 -*- __title__ = "dbtext" __author__ = 'Mehmet Karatay & "Saraswati" (ChatGPT)' __purpose__ = "Sensör ve röle olaylarını metin tabanlı SQL log olarak saklayan yardımcı sınıf" __version__ = "0.2.0" __date__ = "2025-11-20" """ ebuild/io/dbtext.py Revision : 2025-11-20 Authors : Mehmet Karatay & "Saraswati" (ChatGPT) Amaç ----- Her sensör ve rölenin: - Ne zaman açıldığı / kapandığı - Hangi değeri ürettiği - Hangi kaynaktan geldiği bilgisini tarih-saat bazlı olarak düz bir metin dosyasında tutmak. Kayıt formatı (satır başına bir olay): INSERT INTO (ts, app, source, event_type, value, unit, extra) VALUES ('YYYY-MM-DD HH:MM:SS', 'APP', 'SOURCE', 'EVENT', VALUE, 'UNIT', 'EXTRA'); Örnek: INSERT INTO ebrulor_log (ts, app, source, event_type, value, unit, extra) VALUES ('2025-11-20 12:34:56', 'ESYSTEM', 'relay:circulation_a', 'state', 1, 'bool', 'on'); Böylece: - Dosya istenirse direkt PostgreSQL'e pipe edilip çalıştırılabilir - Aynı zamanda bu modül basit bir parser ile geri okunabilir """ #from __future__ import annotations import os import datetime as _dt import re from typing import List, Optional, Dict, Any from .config_ini import KilitliDosya class DBText: """ Metin tabanlı SQL log dosyası için yardımcı sınıf. Parametreler ----------- filename : str Log dosyasının yolu (örnek: "ebina_log.sql"). table : str SQL INSERT komutlarında kullanılacak tablo adı. app : str Uygulama adı (örn. "ESYSTEM"). use_lock : bool True ise yazarken KilitliDosya kullanılır (çoklu süreç için daha güvenli). """ def __init__( self, filename: str, table: str = "ebrulor_log", app: str = "EBUILD", use_lock: bool = True, ) -> None: self.filename = filename self.table = table self.app = app self.use_lock = use_lock # Dosya yoksa basit bir header ile oluştur if not os.path.isfile(self.filename): with open(self.filename, "w", encoding="utf-8") as f: f.write(f"-- DBText log file for table {self.table}\n") f.write(f"-- created at {_dt.datetime.now().isoformat()}\n\n") self._locker = KilitliDosya(self.filename) if use_lock else None # SQL satırlarını parse etmek için basit regex self._re_values = re.compile( r"VALUES \('(?P[^']*)',\s*'(?P[^']*)',\s*'(?P[^']*)',\s*" r"'(?P[^']*)',\s*(?PNULL|[-0-9.]+),\s*(?PNULL|'[^']*'),\s*" r"'(?P[^']*)'\);" ) # ------------------------------------------------- # Yardımcılar # ------------------------------------------------- @staticmethod def _escape(value: str) -> str: """SQL için tek tırnak kaçışı yapar.""" return value.replace("'", "''") def _write_line(self, line: str) -> None: """ Tek bir satırı log dosyasına yazar. use_lock=True ise KilitliDosya üzerinden, değilse doğrudan append. """ if self.use_lock and self._locker is not None: self._locker.yaz(line + "\n") else: with open(self.filename, "a", encoding="utf-8") as f: f.write(line + "\n") # ------------------------------------------------- # Genel amaçlı event yazma # ------------------------------------------------- def insert_event( self, source: str, event_type: str, value: Optional[float] = None, unit: Optional[str] = None, timestamp: Optional[_dt.datetime] = None, extra: str = "", ) -> None: """ Genel amaçlı bir olay kaydı ekler. Örnek kullanım: logger.insert_event( source="Sensor:28-00000660e983", event_type="temperature", value=23.5, unit="°C", timestamp=datetime.now(), extra="Daire 2 Kat 1 Yön 5", ) """ ts = timestamp or _dt.datetime.now() ts_str = ts.strftime("%Y-%m-%d %H:%M:%S") # Değerler v_str = "NULL" if value is None else f"{float(value):.3f}" u_str = "NULL" if unit is None else f"'{self._escape(str(unit))}'" extra_str = self._escape(extra or "") src_str = self._escape(source) etype_str = self._escape(event_type) app_str = self._escape(self.app) line = ( f"INSERT INTO {self.table} (ts, app, source, event_type, value, unit, extra) " f"VALUES ('{ts_str}', '{app_str}', '{src_str}', '{etype_str}', {v_str}, {u_str}, '{extra_str}');" ) self._write_line(line) # ------------------------------------------------- # Sensör / röle özel kısayol metotları # ------------------------------------------------- def log_state_change( self, device_kind: str, name: str, is_on: bool, timestamp: Optional[_dt.datetime] = None, extra: str = "", ) -> None: """ Röle / dijital çıkış / giriş gibi ON/OFF durumlarını loglar. device_kind : "relay", "sensor", "pump" vb. name : cihaz ismi ("circulation_a", "burner_contactor" vb.) is_on : True → 1 (on), False → 0 (off) Event: source = f"{device_kind}:{name}" event_type = "state" value = 1.0 / 0.0 unit = "bool" """ source = f"{device_kind}:{name}" val = 1.0 if is_on else 0.0 ex = extra or ("on" if is_on else "off") self.insert_event( source=source, event_type="state", value=val, unit="bool", timestamp=timestamp, extra=ex, ) def log_sensor_value( self, name: str, value: float, unit: str = "", timestamp: Optional[_dt.datetime] = None, extra: str = "", ) -> None: """ Analog / sayısal sensör değerlerini loglar. Örnek: logger.log_sensor_value("outside_temp", 12.3, "°C") """ source = f"sensor:{name}" self.insert_event( source=source, event_type="measurement", value=value, unit=unit, timestamp=timestamp, extra=extra, ) # ------------------------------------------------- # Okuma API'si # ------------------------------------------------- def _parse_line(self, line: str) -> Optional[Dict[str, Any]]: """ Tek bir INSERT satırını dict'e çevirir. Beklenen format: INSERT INTO
(...) VALUES ('ts', 'app', 'source', 'etype', value, unit, 'extra'); """ line = line.strip() if not line or not line.upper().startswith("INSERT INTO"): return None m = self._re_values.search(line) if not m: return None gd = m.groupdict() ts_str = gd.get("ts", "") try: ts = _dt.datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S") except Exception: ts = None # value raw_v = gd.get("value", "NULL") if raw_v == "NULL": value = None else: try: value = float(raw_v) except Exception: value = None # unit raw_u = gd.get("unit", "NULL") if raw_u == "NULL": unit = None else: # 'C' şeklindeki stringten tek tırnakları atıyoruz unit = raw_u.strip("'") return { "ts": ts, "app": gd.get("app", ""), "source": gd.get("source", ""), "event_type": gd.get("etype", ""), "value": value, "unit": unit, "extra": gd.get("extra", ""), } def iter_events( self, source: Optional[str] = None, event_type: Optional[str] = None, since: Optional[_dt.datetime] = None, until: Optional[_dt.datetime] = None, ): """ Log dosyasındaki olayları satır satır okur ve filtre uygular. Parametreler: source : None veya tam eşleşen source string event_type : None veya tam eşleşen event_type since : None veya bu tarihten SONRAKİ kayıtlar until : None veya bu tarihten ÖNCEKİ kayıtlar Yield: dict: {ts, app, source, event_type, value, unit, extra} """ if not os.path.isfile(self.filename): return with open(self.filename, "r", encoding="utf-8") as f: for line in f: rec = self._parse_line(line) if not rec: continue ts = rec["ts"] if since and ts and ts < since: continue if until and ts and ts > until: continue if source and rec["source"] != source: continue if event_type and rec["event_type"] != event_type: continue yield rec def get_state_history( self, device_kind: str, name: str, limit: int = 100, since: Optional[_dt.datetime] = None, until: Optional[_dt.datetime] = None, ) -> List[Dict[str, Any]]: """ Belirli bir cihazın (sensör / röle) son durum değişikliklerini döndürür. device_kind : "relay", "sensor", "pump" vb. name : cihaz adı limit : maksimum kaç kayıt döneceği (en yeni kayıtlar) """ src = f"{device_kind}:{name}" events = list(self.iter_events( source=src, event_type="state", since=since, until=until, )) # En yeni kayıtlar sondadır; tersten limit al events.sort(key=lambda r: (r["ts"] or _dt.datetime.min), reverse=True) return events[:limit] if __name__ == "__main__": # Basit self-test logger = DBText(filename="test_dbtext_log.sql", table="ebrulor_log", app="ESYSTEM") now = _dt.datetime.now() logger.log_state_change("relay", "circulation_a", True, timestamp=now, extra="manual test on") logger.log_state_change("relay", "circulation_a", False, timestamp=now + _dt.timedelta(seconds=10), extra="manual test off") print("Son durum değişiklikleri:") history = logger.get_state_history("relay", "circulation_a", limit=10) for h in history: print(h)