# -*- coding: utf-8 -*- from __future__ import annotations __title__ = "environment" __author__ = 'Mehmet Karatay & "Saraswati" (ChatGPT)' __purpose__ = "Bina dış ortam sıcaklığı (DS18B20) ve analog sensörler için ortam merkezi" __version__ = "0.3.0" __date__ = "2025-11-23" """ ebuild/core/environment.py Amaç ----- - Bina dış ortam sıcaklığı (DS18B20) ve MCP3008 üzerinden okunan analog sensörleri tek bir merkezde toplamak. - Üst seviye modüller (ör: BurnerController, sulama sistemi, alarm sistemi) bu sınıf üzerinden dış ortam ve analog bilgilerine erişir. Notlar ------ - Bu katman donanım erişimini kapsüller: * io/ds18b20.py * io/adc_mcp3008.py * core/analog_sensors.py """ from typing import Dict, Optional, Any import datetime # Donanım bağımlı modüller try: from ..io.ds18b20 import DS18B20Sensor except ImportError: print("from ..io.ds18b20 import DS18B20Sensor NOT IMPORTED") DS18B20Sensor = None # type: ignore try: from ..io.adc_mcp3008 import MCP3008ADC except ImportError: print("from ..io.adc_mcp3008 import MCP3008ADC NOT IMPORTED") MCP3008ADC = None # type: ignore try: from .analog_sensors import AnalogSensorsHub except ImportError: print("from .analog_sensors import AnalogSensorsHub NOT IMPORTED") AnalogSensorsHub = None # type: ignore # Config HER ZAMAN ayrı import edilmeli, donanıma bağlı değil try: from .. import config_statics as cfgs except ImportError as e: print("ENV: config_statics import edilemedi:", e) cfgs = None # type: ignore class BuildingEnvironment: """ Bina çevre sensörlerini yöneten merkez. Özellikler: ----------- - outside : DS18B20 dış ortam sensörü (OUTSIDE_SENSOR_ID üzerinden) - adc : MCP3008ADC örneği - analog : AnalogSensorsHub (basınç, gaz, yağmur, LDR) """ def __init__( self, name: str = "BuildingEnvironment", outside_sensor_id: Optional[str] = None, use_outside_ds18: bool = True, use_adc: bool = True, ) -> None: #print("BuildingEnvironment Init:", name, outside_sensor_id, use_outside_ds18, use_adc) self.name = name # ------------------------------------------------------------------ # Dış ortam sıcaklığı (DS18B20) # ------------------------------------------------------------------ # 1) Önce parametreden gelen ID; yoksa config_statics.OUTSIDE_SENSOR_ID if not outside_sensor_id and cfgs is not None: try: outside_sensor_id = getattr(cfgs, "OUTSIDE_SENSOR_ID", None) except Exception as e: print("BuildingEnvironment: outside sensör ID okunamadı:", e) outside_sensor_id = None #print( "BuildingEnvironment get outside_sensor_id:", outside_sensor_id, "use_outside_ds18", use_outside_ds18, "use_adc", use_adc ) # 2) self.outside_id ve self.outside MUTLAKA burada tanımlanmalı self.outside_id: str = outside_sensor_id or "" self.outside: Optional[DS18B20Sensor] = None # type: ignore #print("DS18B20Sensor", type(DS18B20Sensor), DS18B20Sensor) # 3) ID makulse DS18 sensörünü yarat if use_outside_ds18 and DS18B20Sensor is not None and len(self.outside_id) > 5: try: #print("BuildingEnvironment: DS18B20Sensor yaratılıyor, id =", self.outside_id) self.outside = DS18B20Sensor(serial=self.outside_id, name="OutsideTemp") #print("BuildingEnvironment: self.outside:", self.outside) except Exception as e: print("BuildingEnvironment: outside sensör oluşturulamadı:", e) self.outside = None else: print( "BuildingEnvironment: dış sensör yok veya devre dışı:", "use_outside_ds18 =", use_outside_ds18, "DS18B20Sensor is None =", DS18B20Sensor is None, "outside_id =", repr(self.outside_id), ) #print("BuildingEnvironment 5:", name, self.outside, outside_sensor_id, use_outside_ds18, use_adc) # Dış ısı cache'i (kontrol katmanları için) self._last_outside_temp_c: Optional[float] = None self._last_outside_read_ts: Optional[datetime.datetime] = None # ------------------------------------------------------------------ # MCP3008 ve analog sensör hub # ------------------------------------------------------------------ self.adc: Optional[MCP3008ADC] = None # type: ignore self.analog: Optional[AnalogSensorsHub] = None # type: ignore if use_adc and MCP3008ADC is not None and AnalogSensorsHub is not None: try: self.adc = MCP3008ADC() self.analog = AnalogSensorsHub(self.adc) except Exception: self.adc = None self.analog = None # ---------------------------------------------------------------------- # Okuma fonksiyonları # ---------------------------------------------------------------------- def read_outside_temp(self) -> Dict[str, Optional[float]]: """ DS18B20 dış ortam sensöründen sıcaklık okur. Dönüş: {"outside_temp_c": 23.4, "read_ts": datetime} # veya None """ temp: Optional[float] = None now = datetime.datetime.now() if self.outside is not None: try: temp = self.outside.read_temperature() except Exception: temp = None if temp is not None: self._last_outside_temp_c = temp self._last_outside_read_ts = now return { "outside_temp_c": temp, "read_ts": now, } def get_outside_temp_cached(self, max_age_sec: int = 60) -> Optional[float]: """ Dış ortam sıcaklığını cache üzerinden döndürür; gerekirse sensörden günceller. - Eğer hiç okunmamışsa → sensörden okur. - Eğer son okuma max_age_sec saniyeden eskiyse → yeniden okur. """ now = datetime.datetime.now() if ( self._last_outside_read_ts is None or (now - self._last_outside_read_ts).total_seconds() > max_age_sec ): snap = self.read_outside_temp() temp = snap.get("outside_temp_c") if temp is not None: self._last_outside_temp_c = temp self._last_outside_read_ts = now return self._last_outside_temp_c def read_analog_all(self) -> Dict[str, Any]: """ AnalogSensorsHub üzerinden tüm analog kanalları okur. Dönüş sözlüğü: { "pressure_raw": ..., "pressure_state": ..., "gas_raw": ..., "gas_state": ..., "gas_latched_alarm": ..., "rain_raw": ..., "rain_state": ..., "ldr_raw": ..., "ldr_state": ..., } """ data: Dict[str, Any] = { "pressure_raw": None, "pressure_state": None, "gas_raw": None, "gas_state": None, "gas_latched_alarm": None, "rain_raw": None, "rain_state": None, "ldr_raw": None, "ldr_state": None, } if self.analog is None: return data values = self.analog.update_all() # Basınç data["pressure_raw"] = values.get("pressure") data["pressure_state"] = getattr(self.analog.pressure, "state", None) # Gaz data["gas_raw"] = values.get("gas") data["gas_state"] = getattr(self.analog.gas, "state", None) data["gas_latched_alarm"] = getattr(self.analog.gas, "latched_alarm", None) # Yağmur data["rain_raw"] = values.get("rain") data["rain_state"] = getattr(self.analog.rain, "state", None) # LDR data["ldr_raw"] = values.get("ldr") data["ldr_state"] = getattr(self.analog.ldr, "state", None) return data # ---------------------------------------------------------------------- # Yüksek seviye snapshot # ---------------------------------------------------------------------- def get_snapshot(self) -> Dict[str, Any]: """ Dış sıcaklık (DS18B20) + tüm analog sensörler için tek sözlük döndürür. Örnek: { "outside_temp_c": 14.3, "pressure_raw": 512, "pressure_state": "SAFE", "gas_raw": 80, "gas_state": "SAFE", "gas_latched_alarm": False, "rain_raw": 100, "rain_state": "DRY", "ldr_raw": 900, "ldr_state": "BRIGHT", } """ snap: Dict[str, Any] = {} snap.update(self.read_outside_temp()) snap.update(self.read_analog_all()) return snap # ---------------------------------------------------------------------- # Güvenlik yardımcıları # ---------------------------------------------------------------------- def should_shutdown_system(self) -> bool: """ Analog sensör verilerine göre sistem acil kapatılmalı mı? Örn: - Gaz sızıntısı - Aşırı basınç vs. Şu an sadece gaz latched_alarm'a bakıyor. """ if self.analog is None: return False if getattr(self.analog.gas, "latched_alarm", False): return True return False # ---------------------------------------------------------------------- # Temel temsil # ---------------------------------------------------------------------- def summary(self) -> str: """ Ortamın kısa özetini döndürür. """ parts = [f"env_name={self.name}"] parts.append(f"outside_id={self.outside_id!r}") parts.append(f"outside={'OK' if self.outside is not None else 'NONE'}") parts.append(f"adc={'OK' if self.adc is not None else 'NONE'}") parts.append(f"analog={'OK' if self.analog is not None else 'NONE'}") return "BuildingEnvironment(" + ", ".join(parts) + ")" if __name__ == "__main__": # Basit demo env = BuildingEnvironment() print(env.summary()) snap = env.get_snapshot() print("Snapshot:", snap) print("Gaz shutdown mu?:", env.should_shutdown_system())