ebuild_rasp2/ebuild/core/environment.py

307 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from __future__ import annotations
__title__ = "environment"
__author__ = 'Mehmet Karatay & "Saraswati" (ChatGPT)'
__purpose__ = "Bina dış ortam sıcaklığı (DS18B20) ve analog sensörler için ortam merkezi"
__version__ = "0.3.0"
__date__ = "2025-11-23"
"""
ebuild/core/environment.py
Amaç
-----
- Bina dış ortam sıcaklığı (DS18B20) ve MCP3008 üzerinden okunan analog sensörleri
tek bir merkezde toplamak.
- Üst seviye modüller (ör: BurnerController, sulama sistemi, alarm sistemi) bu
sınıf üzerinden dış ortam ve analog bilgilerine erişir.
Notlar
------
- Bu katman donanım erişimini kapsüller:
* io/ds18b20.py
* io/adc_mcp3008.py
* core/analog_sensors.py
"""
from typing import Dict, Optional, Any
import datetime
# Donanım bağımlı modüller
try:
from ..io.ds18b20 import DS18B20Sensor
except ImportError:
print("from ..io.ds18b20 import DS18B20Sensor NOT IMPORTED")
DS18B20Sensor = None # type: ignore
try:
from ..io.adc_mcp3008 import MCP3008ADC
except ImportError:
print("from ..io.adc_mcp3008 import MCP3008ADC NOT IMPORTED")
MCP3008ADC = None # type: ignore
try:
from .analog_sensors import AnalogSensorsHub
except ImportError:
print("from .analog_sensors import AnalogSensorsHub NOT IMPORTED")
AnalogSensorsHub = None # type: ignore
# Config HER ZAMAN ayrı import edilmeli, donanıma bağlı değil
try:
from .. import config_statics as cfgs
except ImportError as e:
print("ENV: config_statics import edilemedi:", e)
cfgs = None # type: ignore
class BuildingEnvironment:
"""
Bina çevre sensörlerini yöneten merkez.
Özellikler:
-----------
- outside : DS18B20 dış ortam sensörü (OUTSIDE_SENSOR_ID üzerinden)
- adc : MCP3008ADC örneği
- analog : AnalogSensorsHub (basınç, gaz, yağmur, LDR)
"""
def __init__(
self,
name: str = "BuildingEnvironment",
outside_sensor_id: Optional[str] = None,
use_outside_ds18: bool = True,
use_adc: bool = True,
) -> None:
#print("BuildingEnvironment Init:", name, outside_sensor_id, use_outside_ds18, use_adc)
self.name = name
# ------------------------------------------------------------------
# Dış ortam sıcaklığı (DS18B20)
# ------------------------------------------------------------------
# 1) Önce parametreden gelen ID; yoksa config_statics.OUTSIDE_SENSOR_ID
if not outside_sensor_id and cfgs is not None:
try:
outside_sensor_id = getattr(cfgs, "OUTSIDE_SENSOR_ID", None)
except Exception as e:
print("BuildingEnvironment: outside sensör ID okunamadı:", e)
outside_sensor_id = None
#print( "BuildingEnvironment get outside_sensor_id:", outside_sensor_id, "use_outside_ds18", use_outside_ds18, "use_adc", use_adc )
# 2) self.outside_id ve self.outside MUTLAKA burada tanımlanmalı
self.outside_id: str = outside_sensor_id or ""
self.outside: Optional[DS18B20Sensor] = None # type: ignore
#print("DS18B20Sensor", type(DS18B20Sensor), DS18B20Sensor)
# 3) ID makulse DS18 sensörünü yarat
if use_outside_ds18 and DS18B20Sensor is not None and len(self.outside_id) > 5:
try:
#print("BuildingEnvironment: DS18B20Sensor yaratılıyor, id =", self.outside_id)
self.outside = DS18B20Sensor(serial=self.outside_id, name="OutsideTemp")
#print("BuildingEnvironment: self.outside:", self.outside)
except Exception as e:
print("BuildingEnvironment: outside sensör oluşturulamadı:", e)
self.outside = None
else:
print(
"BuildingEnvironment: dış sensör yok veya devre dışı:",
"use_outside_ds18 =", use_outside_ds18,
"DS18B20Sensor is None =", DS18B20Sensor is None,
"outside_id =", repr(self.outside_id),
)
#print("BuildingEnvironment 5:", name, self.outside, outside_sensor_id, use_outside_ds18, use_adc)
# Dış ısı cache'i (kontrol katmanları için)
self._last_outside_temp_c: Optional[float] = None
self._last_outside_read_ts: Optional[datetime.datetime] = None
# ------------------------------------------------------------------
# MCP3008 ve analog sensör hub
# ------------------------------------------------------------------
self.adc: Optional[MCP3008ADC] = None # type: ignore
self.analog: Optional[AnalogSensorsHub] = None # type: ignore
if use_adc and MCP3008ADC is not None and AnalogSensorsHub is not None:
try:
self.adc = MCP3008ADC()
self.analog = AnalogSensorsHub(self.adc)
except Exception:
self.adc = None
self.analog = None
# ----------------------------------------------------------------------
# Okuma fonksiyonları
# ----------------------------------------------------------------------
def read_outside_temp(self) -> Dict[str, Optional[float]]:
"""
DS18B20 dış ortam sensöründen sıcaklık okur.
Dönüş:
{"outside_temp_c": 23.4, "read_ts": datetime} # veya None
"""
temp: Optional[float] = None
now = datetime.datetime.now()
if self.outside is not None:
try:
temp = self.outside.read_temperature()
except Exception:
temp = None
if temp is not None:
self._last_outside_temp_c = temp
self._last_outside_read_ts = now
return {
"outside_temp_c": temp,
"read_ts": now,
}
def get_outside_temp_cached(self, max_age_sec: int = 60) -> Optional[float]:
"""
Dış ortam sıcaklığını cache üzerinden döndürür; gerekirse sensörden günceller.
- Eğer hiç okunmamışsa → sensörden okur.
- Eğer son okuma max_age_sec saniyeden eskiyse → yeniden okur.
"""
now = datetime.datetime.now()
if (
self._last_outside_read_ts is None
or (now - self._last_outside_read_ts).total_seconds() > max_age_sec
):
snap = self.read_outside_temp()
temp = snap.get("outside_temp_c")
if temp is not None:
self._last_outside_temp_c = temp
self._last_outside_read_ts = now
return self._last_outside_temp_c
def read_analog_all(self) -> Dict[str, Any]:
"""
AnalogSensorsHub üzerinden tüm analog kanalları okur.
Dönüş sözlüğü:
{
"pressure_raw": ...,
"pressure_state": ...,
"gas_raw": ...,
"gas_state": ...,
"gas_latched_alarm": ...,
"rain_raw": ...,
"rain_state": ...,
"ldr_raw": ...,
"ldr_state": ...,
}
"""
data: Dict[str, Any] = {
"pressure_raw": None,
"pressure_state": None,
"gas_raw": None,
"gas_state": None,
"gas_latched_alarm": None,
"rain_raw": None,
"rain_state": None,
"ldr_raw": None,
"ldr_state": None,
}
if self.analog is None:
return data
values = self.analog.update_all()
# Bası
data["pressure_raw"] = values.get("pressure")
data["pressure_state"] = getattr(self.analog.pressure, "state", None)
# Gaz
data["gas_raw"] = values.get("gas")
data["gas_state"] = getattr(self.analog.gas, "state", None)
data["gas_latched_alarm"] = getattr(self.analog.gas, "latched_alarm", None)
# Yağmur
data["rain_raw"] = values.get("rain")
data["rain_state"] = getattr(self.analog.rain, "state", None)
# LDR
data["ldr_raw"] = values.get("ldr")
data["ldr_state"] = getattr(self.analog.ldr, "state", None)
return data
# ----------------------------------------------------------------------
# Yüksek seviye snapshot
# ----------------------------------------------------------------------
def get_snapshot(self) -> Dict[str, Any]:
"""
Dış sıcaklık (DS18B20) + tüm analog sensörler için tek sözlük döndürür.
Örnek:
{
"outside_temp_c": 14.3,
"pressure_raw": 512,
"pressure_state": "SAFE",
"gas_raw": 80,
"gas_state": "SAFE",
"gas_latched_alarm": False,
"rain_raw": 100,
"rain_state": "DRY",
"ldr_raw": 900,
"ldr_state": "BRIGHT",
}
"""
snap: Dict[str, Any] = {}
snap.update(self.read_outside_temp())
snap.update(self.read_analog_all())
return snap
# ----------------------------------------------------------------------
# Güvenlik yardımcıları
# ----------------------------------------------------------------------
def should_shutdown_system(self) -> bool:
"""
Analog sensör verilerine göre sistem acil kapatılmalı mı?
Örn:
- Gaz sızıntısı
- Aşırı bası
vs.
Şu an sadece gaz latched_alarm'a bakıyor.
"""
if self.analog is None:
return False
if getattr(self.analog.gas, "latched_alarm", False):
return True
return False
# ----------------------------------------------------------------------
# Temel temsil
# ----------------------------------------------------------------------
def summary(self) -> str:
"""
Ortamın kısa özetini döndürür.
"""
parts = [f"env_name={self.name}"]
parts.append(f"outside_id={self.outside_id!r}")
parts.append(f"outside={'OK' if self.outside is not None else 'NONE'}")
parts.append(f"adc={'OK' if self.adc is not None else 'NONE'}")
parts.append(f"analog={'OK' if self.analog is not None else 'NONE'}")
return "BuildingEnvironment(" + ", ".join(parts) + ")"
if __name__ == "__main__":
# Basit demo
env = BuildingEnvironment()
print(env.summary())
snap = env.get_snapshot()
print("Snapshot:", snap)
print("Gaz shutdown mu?:", env.should_shutdown_system())