# -*- coding: utf-8 -*- from __future__ import annotations __title__ = "relay_driver" __author__ = 'Mehmet Karatay & "Saraswati" (ChatGPT)' __purpose__ = "GPIO röle sürücüsü + brülör grup soyutlaması" __version__ = "0.4.0" __date__ = "2025-11-22" """ ebuild/io/relay_driver.py Revision : 2025-11-22 Authors : Mehmet Karatay & "Saraswati" (ChatGPT) Amaç ----- - Raspberry Pi GPIO üzerinden röle sürmek için basit bir soyutlama. - Soyut kanal isimleri (ör: "igniter", "circulation_a") → BCM pin eşlemesi config_statics.RELAY_GPIO üzerinden gelir. - Brülör grupları için BURNER_GROUPS kullanılır: BURNER_GROUPS = { 0: { "name": "MainBurner", "location": "Sol binada", "igniter_pin": 16, "circulation": { "circ_1": {"channel": "circulation_a", "pin": 26, "default": 1}, "circ_2": {"channel": "circulation_b", "pin": 24, "default": 0}, }, }, ... } Bu modül: - Tek tek kanal ON/OFF (set_channel) - Tüm kanalları kapatma (all_off) - Brülör → igniter kanalını ve pompalarını soyutlayan yardımcılar - Kanal bazlı basit istatistik (RelayStats) sağlar. """ import time from dataclasses import dataclass from typing import Dict, Iterable, List, Optional try: from .. import config_statics as cfg except ImportError: # test / standalone cfg = None # type: ignore # ---------------------------------------------------------------------- # GPIO soyutlama (RPi.GPIO yoksa dummy) # ---------------------------------------------------------------------- try: import RPi.GPIO as GPIO # type: ignore _HAS_GPIO = True except Exception: # Raspi dışı ortam GPIO = None # type: ignore _HAS_GPIO = False # ---------------------------------------------------------------------- # İstatistik yapısı # ---------------------------------------------------------------------- @dataclass class RelayStats: """ Tek bir röle kanalı için istatistikler. - on_count : kaç defa ON'a çekildi - last_on_ts : en son ON'a çekildiği zaman (epoch saniye) - last_off_ts : en son OFF olduğu zaman (epoch saniye) - last_duration_s : en son ON periyodunun süresi (saniye) - total_on_s : bugüne kadar toplam ON kalma süresi (saniye) """ on_count: int = 0 last_on_ts: Optional[float] = None last_off_ts: Optional[float] = None last_duration_s: float = 0.0 total_on_s: float = 0.0 def on(self, now: float) -> None: """ Kanal ON'a çekildiğinde çağrılır. Aynı ON periyodu içinde tekrar çağrılırsa sayaç artmaz. """ if self.last_on_ts is None: self.last_on_ts = now self.on_count += 1 def off(self, now: float) -> None: """ Kanal OFF'a çekildiğinde çağrılır. Son ON zamanına göre süre hesaplanır, last_duration_s ve total_on_s güncellenir. """ if self.last_on_ts is not None: dur = max(0.0, now - self.last_on_ts) self.last_duration_s = dur self.total_on_s += dur self.last_on_ts = None self.last_off_ts = now def current_duration(self, now: Optional[float] = None) -> float: """ Kanal şu anda ON ise, bu ON periyodunun şu ana kadarki süresini döndürür. OFF ise 0.0 döner. """ if self.last_on_ts is None: return 0.0 if now is None: now = time.time() return max(0.0, now - self.last_on_ts) # ---------------------------------------------------------------------- # Ana sürücü # ---------------------------------------------------------------------- class RelayDriver: """ Basit bir röle sürücüsü. - Soyut kanal isimleri: RELAY_GPIO dict'indeki anahtarlar - Brülör grup API'si: * burners() → mevcut brülör id listesi * burner_info(bid) → config_statics.BURNER_GROUPS[bid] * igniter_channel(bid) → ateşleme kanal adı * set_igniter(bid, state) * set_pump(bid, pump_name, state) * enabled_pumps(bid) → default=1 olan isimler (konfig default) * all_pumps(bid) → tüm pompa isimleri * active_pumps(bid) → şu anda ON olan pompa isimleri """ def __init__(self, onoff: bool = False) -> None: print("RelayDriver yükleniyor…") # Konfigten kanal → GPIO pin map self._pin_map: Dict[str, int] = dict(getattr(cfg, "RELAY_GPIO", {})) if cfg else {} # Her kanal için istatistik objesi self._stats: Dict[str, RelayStats] = { ch: RelayStats() for ch in self._pin_map.keys() } # Brülör grupları self._burner_groups: Dict[int, dict] = dict(getattr(cfg, "BURNER_GROUPS", {})) if cfg else {} # GPIO kurulumu if _HAS_GPIO and self._pin_map: GPIO.setmode(GPIO.BCM) for ch, pin in self._pin_map.items(): GPIO.setup(pin, GPIO.OUT) # Güvenli başlangıç: tüm kanallar kapalı GPIO.output(pin, GPIO.LOW) elif not self._pin_map: print("⚠️ RELAY_GPIO konfigürasyonu boş; donanım pin eşlemesi yok.") # igniter_pin → kanal adı map'ini BURNER_GROUPS içine enjekte et if self._burner_groups and self._pin_map: pin_to_channel = {pin: ch for ch, pin in self._pin_map.items()} for bid, info in self._burner_groups.items(): if not isinstance(info, dict): continue ign_pin = info.get("igniter_pin") if ign_pin is not None: ch = pin_to_channel.get(ign_pin) if ch: info.setdefault("igniter", ch) # İstenirse tüm röleleri açılışta kapat if onoff is False: self.all_off() # ----------------------------------------------------- # Düşük seviye kanal kontrolü # ----------------------------------------------------- def set_channel(self, channel: str, state: bool) -> None: """ Verilen kanal adını ON/OFF yapar. """ if channel not in self._pin_map: # Tanımsız kanal – sessiz geç return pin = self._pin_map[channel] now = time.time() # İstatistik güncelle st = self._stats.get(channel) if st is None: st = RelayStats() self._stats[channel] = st if state: st.on(now) else: st.off(now) # Donanım if _HAS_GPIO: # Aktif-high röle kartı varsayıyoruz; gerekiyorsa buraya # ACTIVE_LOW/ACTIVE_HIGH gibi bir bayrak eklenebilir. GPIO.output(pin, GPIO.HIGH if state else GPIO.LOW) def get_stats(self, channel: str) -> RelayStats: """ Kanal için istatistik objesini döndürür (yoksa yaratır). """ st = self._stats.get(channel) if st is None: st = RelayStats() self._stats[channel] = st return st def get_channel_state(self, channel: str) -> bool: """ Kanal şu anda ON mu? (last_on_ts None değilse ON kabul edilir) """ st = self._stats.get(channel) if st is None: return False return st.last_on_ts is not None # ----------------------------------------------------- # Tüm kanalları güvenli moda çek # ----------------------------------------------------- def all_off(self) -> None: """ Tüm kanalları OFF yapar. """ now = time.time() for ch in list(self._pin_map.keys()): st = self._stats.get(ch) if st is not None and st.last_on_ts is not None: st.off(now) if _HAS_GPIO: GPIO.output(self._pin_map[ch], GPIO.LOW) # ----------------------------------------------------- # Brülör grup API'si # ----------------------------------------------------- def burners(self) -> List[int]: """ Mevcut brülör id listesini döndürür. """ return sorted(self._burner_groups.keys()) def burner_info(self, burner_id: int) -> Optional[dict]: """ Verilen brülör id için BURNER_GROUPS kaydını döndürür. """ return self._burner_groups.get(burner_id) def igniter_channel(self, burner_id: int) -> Optional[str]: """ Brülörün igniter kanal adını döndürür. - Eğer BURNER_GROUPS kaydında 'igniter' alanı varsa doğrudan onu kullanır. - Yoksa 'igniter_pin' alanından pin numarasını alır ve RELAY_GPIO'daki pin → kanal eşlemesini kullanarak kanalı bulur. """ info = self.burner_info(burner_id) if not info: return None # BURNER_GROUPS konfiginde igniter_pin veriliyor; bunu kanala çevir. ch = info.get("igniter") if ch: return ch pin = info.get("igniter_pin") if pin is None: return None # pin → channel eşlemesini RELAY_GPIO'dan bul for cname, cpin in self._pin_map.items(): if cpin == pin: return cname return None def all_pumps(self, burner_id: int) -> Iterable[str]: """ Konfigte tanımlı tüm pompa kanal adlarını döndürür (circulation altı). """ info = self.burner_info(burner_id) if not info: return [] circ = info.get("circulation", {}) or {} # circ_x → {channel: "circulation_a", pin: ..} for logical_name, entry in circ.items(): ch = entry.get("channel") if ch: yield ch def enabled_pumps(self, burner_id: int) -> Iterable[str]: """ Varsayılan olarak açık olması gereken pompa kanal adlarını döndürür. (circulation altındaki default=1 kayıtları) """ info = self.burner_info(burner_id) if not info: return [] circ = info.get("circulation", {}) or {} for logical_name, entry in circ.items(): ch = entry.get("channel") default = int(entry.get("default", 0)) if ch and default == 1: yield ch def active_pumps(self, burner_id: int) -> Iterable[str]: """ Şu anda ON olan pompa kanal adlarını döndürür. """ for ch in self.all_pumps(burner_id): if self.get_channel_state(ch): yield ch def set_igniter(self, burner_id: int, state: bool) -> None: """ İlgili brülörün igniter kanalını ON/OFF yapar. """ ch = self.igniter_channel(burner_id) if ch: self.set_channel(ch, state) def set_pump(self, burner_id: int, pump_name: str, state: bool) -> None: """ Belirtilen brülörün belirtilen pompasını ON/OFF yapar. pump_name: BURNER_GROUPS[..]["circulation"][pump_name] """ info = self.burner_info(burner_id) if not info: return circ = info.get("circulation", {}) if pump_name in circ: ch = circ[pump_name]["channel"] self.set_channel(ch, state) # ----------------------------------------------------- # Yardımcı: özet # ----------------------------------------------------- def summary(self) -> str: """ Kanallar ve brülör gruplarının kısa bir özetini döndürür (debug amaçlı). """ lines: List[str] = [] chans = ", ".join(sorted(self._pin_map.keys())) lines.append(f"Kanallar: {chans}") for bid in self.burners(): info = self.burner_info(bid) or {} name = info.get("name", "?") loc = info.get("location", "?") ign = self.igniter_channel(bid) pumps = list(self.all_pumps(bid)) defaults = list(self.enabled_pumps(bid)) lines.append( f" #{bid}: {name} @ {loc} | igniter={ign} | " f"pumps={pumps} | default_on={defaults}" ) return "\n".join(lines) # ----------------------------------------------------- # Temizlik # ----------------------------------------------------- def cleanup(self) -> None: """ GPIO pinlerini serbest bırakır. """ if _HAS_GPIO: GPIO.cleanup() if __name__ == "__main__": drv = RelayDriver() print("\n🧰 RelayDriver Summary") print(drv.summary())