# -*- coding: utf-8 -*- from __future__ import annotations __title__ = "analog_sensors" __author__ = 'Mehmet Karatay & "Saraswati" (ChatGPT)' __purpose__ = "MCP3008 tabanlı basınç, gaz, yağmur ve LDR sensörleri için sınıf bazlı arayüz" __version__ = "0.1.0" __date__ = "2025-11-21" """ ebuild/core/analog_sensors.py Revision : 2025-11-21 Authors : Mehmet Karatay & "Saraswati" (ChatGPT) Amaç ----- - MCP3008 ADC üzerinden bağlı analog sensörler için tekil sınıf yapıları sağlamak: * PressureAnalogSensor : su hattı basınç sensörü * GasAnalogSensor : MQ-4 veya benzeri gaz sensörü * RainAnalogSensor : yağmur sensörü (wet/dry) * LDRAnalogSensor : ışık seviyesi sensörü - Her bir sensör: * MCP3008ADC üzerinden ilgili kanalı okur (config_statics.ADC_CHANNELS). * Ham raw (0..1023) ve volt cinsinden değer döndürebilir. * Eşik ve basit state (SAFE/WARN/ALARM vb.) hesabını kendi içinde tutar. - Güvenlik ve uyarı mantığı üst kattaki HeatEngine/Burner/Buzzer/Legacy ile kolay entegre edilebilir. Notlar ------ - Bu modül "karar mantığı" ve analog okuma katmanını birleştirir. - Röle kapama, sistem shutdown, buzzer vb. aksiyonlar yine üst katmanda yapılmalıdır. """ from dataclasses import dataclass, field from typing import Optional, Dict try: from ..io.adc_mcp3008 import MCP3008ADC from .. import config_statics as cfg except ImportError: MCP3008ADC = None # type: ignore cfg = None # type: ignore # ------------------------------------------------------------- # Ortak durum enum'u # ------------------------------------------------------------- class SafetyState: SAFE = "SAFE" WARN = "WARN" ALARM = "ALARM" # ------------------------------------------------------------- # Ortak base sınıf # ------------------------------------------------------------- @dataclass class BaseAnalogSensor: """ MCP3008 üzerinden tek bir analog kanalı temsil eden temel sınıf. Özellikler: ----------- - adc : MCP3008ADC örneği - channel : int kanal no (0..7) - name : mantıksal isim (örn: "gas", "pressure") - last_raw : son okunan ham değer (0..1023) - last_volt : son okunan volt cinsinden değer """ adc: MCP3008ADC name: str channel: Optional[int] = None last_raw: Optional[int] = None last_volt: Optional[float] = None def __post_init__(self) -> None: # Eğer kanal configten alınacaksa burada çöz if self.channel is None and cfg is not None: ch_map = getattr(cfg, "ADC_CHANNELS", {}) if self.name in ch_map: self.channel = int(ch_map[self.name]) if self.channel is None: raise ValueError(f"{self.__class__.__name__}: '{self.name}' için kanal bulunamadı.") # ------------------------------------------------------------------ def read_raw(self) -> Optional[int]: """ ADC'den ham değeri okur (0..1023). """ if self.channel is None: return None raw = self.adc.read_raw(self.channel) self.last_raw = raw return raw def read_voltage(self) -> Optional[float]: """ ADC'den raw okur ve volt cinsine çevirir. """ if self.channel is None: return None volt = self.adc.read_voltage(self.channel) self.last_volt = volt return volt def update(self) -> Optional[int]: """ Varsayılan olarak sadece raw okur; alt sınıflar state hesaplarını kendi override'larında yapar. """ return self.read_raw() def summary(self) -> str: return f"{self.__class__.__name__}(name={self.name}, ch={self.channel}, raw={self.last_raw}, V={self.last_volt})" # ------------------------------------------------------------- # Gaz sensörü (MQ-4) – kill switch mantığı ile # ------------------------------------------------------------- @dataclass class GasAnalogSensor(BaseAnalogSensor): """ Gaz sensörü (MQ-4) için analog ve güvenlik mantığı. State mantığı: - raw >= alarm_threshold → ALARM, latched - raw >= warn_threshold → WARN (latched varsa ALARM) - trend (slope) ile hızlı artış + warn üstü → ALARM - Diğer durumlarda SAFE (latched yoksa). latched_alarm True olduğu sürece: - state ALARM olarak kalır - should_shutdown_system() True döner """ warn_threshold: int = field(default=150) alarm_threshold: int = field(default=250) slope_min_delta: int = field(default=30) slope_window: int = field(default=5) history_len: int = field(default=20) state: str = SafetyState.SAFE latched_alarm: bool = False _history: list[int] = field(default_factory=list) def __post_init__(self) -> None: super().__post_init__() # Konfig override if cfg is not None: self.warn_threshold = int(getattr(cfg, "GAS_WARN_THRESHOLD_RAW", self.warn_threshold)) self.alarm_threshold = int(getattr(cfg, "GAS_ALARM_THRESHOLD_RAW", self.alarm_threshold)) self.slope_min_delta = int(getattr(cfg, "GAS_SLOPE_MIN_DELTA", self.slope_min_delta)) self.slope_window = int(getattr(cfg, "GAS_SLOPE_WINDOW", self.slope_window)) # ------------------------------------------------------------------ def reset_latch(self) -> None: """ Gaz alarm latch'ini manuel olarak sıfırlar. """ self.latched_alarm = False # state sonraki update ile yeniden değerlendirilecek. def update(self) -> Optional[int]: """ Ham değeri okur ve state hesaplar. """ raw = self.read_raw() if raw is None: return None # history güncelle self._history.append(raw) if len(self._history) > self.history_len: self._history = self._history[-self.history_len:] self._evaluate_state(raw) return raw def _evaluate_state(self, raw: int) -> None: # Trend kontrolü slope_alarm = False if len(self._history) >= self.slope_window: first = self._history[-self.slope_window] delta = raw - first if delta >= self.slope_min_delta: slope_alarm = True # Eşik mantığı if raw >= self.alarm_threshold or (slope_alarm and raw >= self.warn_threshold): self.state = SafetyState.ALARM self.latched_alarm = True elif raw >= self.warn_threshold: if self.latched_alarm: self.state = SafetyState.ALARM else: self.state = SafetyState.WARN else: if self.latched_alarm: self.state = SafetyState.ALARM else: self.state = SafetyState.SAFE def should_shutdown_system(self) -> bool: """ Gaz açısından sistemin tamamen kapatılması gerekip gerekmediğini söyler. """ return self.latched_alarm def summary(self) -> str: return ( f"GasAnalogSensor(ch={self.channel}, raw={self.last_raw}, " f"state={self.state}, latched={self.latched_alarm})" ) # ------------------------------------------------------------- # Basınç sensörü – limit kontrolü # ------------------------------------------------------------- @dataclass class PressureAnalogSensor(BaseAnalogSensor): """ Su hattı basınç sensörü. State mantığı: - raw < (min_raw - warn_hyst) veya raw > (max_raw + warn_hyst) → WARN - min_raw <= raw <= max_raw → SAFE - Aradaki buffer bölgede state korunur. """ min_raw: int = field(default=200) max_raw: int = field(default=900) warn_hyst: int = field(default=20) state: str = SafetyState.SAFE def __post_init__(self) -> None: super().__post_init__() if cfg is not None: self.min_raw = int(getattr(cfg, "PRESSURE_MIN_RAW", self.min_raw)) self.max_raw = int(getattr(cfg, "PRESSURE_MAX_RAW", self.max_raw)) self.warn_hyst = int(getattr(cfg, "PRESSURE_WARN_HYST", self.warn_hyst)) def update(self) -> Optional[int]: raw = self.read_raw() if raw is None: return None self._evaluate_state(raw) return raw def _evaluate_state(self, raw: int) -> None: if raw < (self.min_raw - self.warn_hyst) or raw > (self.max_raw + self.warn_hyst): self.state = SafetyState.WARN elif self.min_raw <= raw <= self.max_raw: self.state = SafetyState.SAFE # Buffer bölgede state korunur. def is_pressure_ok(self) -> bool: return self.state == SafetyState.SAFE def summary(self) -> str: return ( f"PressureAnalogSensor(ch={self.channel}, raw={self.last_raw}, " f"state={self.state}, min={self.min_raw}, max={self.max_raw})" ) # ------------------------------------------------------------- # Yağmur sensörü – basit dry/wet mantığı # ------------------------------------------------------------- @dataclass class RainAnalogSensor(BaseAnalogSensor): """ Yağmur sensörü (analog). Basit model: - raw <= dry_threshold → DRY - raw >= wet_threshold → WET - arası → MID Gerektiğinde bu sınıf geliştirilebilir (ör. şiddetli yağmur vs.). """ dry_threshold: int = field(default=100) wet_threshold: int = field(default=400) state: str = "UNKNOWN" # "DRY", "MID", "WET" def __post_init__(self) -> None: super().__post_init__() # İleride configten override eklenebilir: if cfg is not None: self.dry_threshold = int(getattr(cfg, "RAIN_DRY_THRESHOLD_RAW", self.dry_threshold)) self.wet_threshold = int(getattr(cfg, "RAIN_WET_THRESHOLD_RAW", self.wet_threshold)) def update(self) -> Optional[int]: raw = self.read_raw() if raw is None: return None self._evaluate_state(raw) return raw def _evaluate_state(self, raw: int) -> None: if raw <= self.dry_threshold: self.state = "DRY" elif raw >= self.wet_threshold: self.state = "WET" else: self.state = "MID" def is_raining(self) -> bool: return self.state == "WET" def summary(self) -> str: return ( f"RainAnalogSensor(ch={self.channel}, raw={self.last_raw}, " f"state={self.state}, dry_th={self.dry_threshold}, wet_th={self.wet_threshold})" ) # ------------------------------------------------------------- # LDR sensörü – ışık seviyesi # ------------------------------------------------------------- @dataclass class LDRAnalogSensor(BaseAnalogSensor): """ LDR (ışık) sensörü. Basit model: - raw <= dark_threshold → DARK - raw >= bright_threshold → BRIGHT - arası → MID Bu bilgi: - dış ortam karanlık/aydınlık - gece/gündüz teyidi - aydınlatma / gösterge kararları için kullanılabilir. """ dark_threshold: int = field(default=200) bright_threshold: int = field(default=800) state: str = "UNKNOWN" # "DARK", "MID", "BRIGHT" def __post_init__(self) -> None: super().__post_init__() if cfg is not None: self.dark_threshold = int(getattr(cfg, "LDR_DARK_THRESHOLD_RAW", self.dark_threshold)) self.bright_threshold = int(getattr(cfg, "LDR_BRIGHT_THRESHOLD_RAW", self.bright_threshold)) def update(self) -> Optional[int]: raw = self.read_raw() if raw is None: return None self._evaluate_state(raw) return raw def _evaluate_state(self, raw: int) -> None: if raw <= self.dark_threshold: self.state = "DARK" elif raw >= self.bright_threshold: self.state = "BRIGHT" else: self.state = "MID" def is_dark(self) -> bool: return self.state == "DARK" def is_bright(self) -> bool: return self.state == "BRIGHT" def summary(self) -> str: return ( f"LDRAnalogSensor(ch={self.channel}, raw={self.last_raw}, " f"state={self.state}, dark_th={self.dark_threshold}, bright_th={self.bright_threshold})" ) # ------------------------------------------------------------- # Tümünü toplayan minik hub (opsiyonel) # ------------------------------------------------------------- class AnalogSensorsHub: """ MCP3008 üstündeki tüm analog sensörleri yöneten yardımcı sınıf. - pressure : PressureAnalogSensor - gas : GasAnalogSensor - rain : RainAnalogSensor - ldr : LDRAnalogSensor """ def __init__(self, adc: MCP3008ADC) -> None: self.adc = adc self.pressure = PressureAnalogSensor(adc=self.adc, name="pressure") self.gas = GasAnalogSensor(adc=self.adc, name="gas") self.rain = RainAnalogSensor(adc=self.adc, name="rain") self.ldr = LDRAnalogSensor(adc=self.adc, name="ldr") def update_all(self) -> Dict[str, Optional[int]]: """ Tüm sensörleri günceller ve ham değerleri döndürür. """ return { "pressure": self.pressure.update(), "gas": self.gas.update(), "rain": self.rain.update(), "ldr": self.ldr.update(), } def should_shutdown_system(self) -> bool: """ Gaz sensörü açısından kill-switch gerekip gerekmediğini söyler. """ return self.gas.should_shutdown_system()