Files
enviro-service/sensors.py
2025-12-24 11:17:56 -06:00

186 lines
5.5 KiB
Python

"""
Sensor reading module for Pimoroni Enviro Mini HAT.
Provides unified interface to:
- BME280: Temperature, Humidity, Pressure
- LTR559: Light level, Proximity
- MEMS Microphone: Noise level
"""
import time
import logging
from dataclasses import dataclass
from typing import Optional
import numpy as np
logger = logging.getLogger(__name__)

# Sensor imports - will fail gracefully if not on Pi.
# Each *_AVAILABLE flag records whether the matching driver could be loaded;
# EnviroSensors consults them to decide when to fall back to mock data.
try:
    from bme280 import BME280
    from smbus2 import SMBus
    BME280_AVAILABLE = True
except ImportError:
    BME280_AVAILABLE = False
    logger.warning("BME280 library not available - using mock data")

try:
    from ltr559 import LTR559
    LTR559_AVAILABLE = True
except ImportError:
    LTR559_AVAILABLE = False
    logger.warning("LTR559 library not available - using mock data")

try:
    import sounddevice as sd
    SOUNDDEVICE_AVAILABLE = True
except (ImportError, OSError):
    # sounddevice raises OSError (not ImportError) at import time when the
    # package is installed but the PortAudio shared library cannot be found,
    # so both must be caught for the fallback to actually be graceful.
    SOUNDDEVICE_AVAILABLE = False
    logger.warning("sounddevice not available - using mock data")
@dataclass
class EnviroReading:
    """A complete reading from all sensors."""

    timestamp: float      # time of the reading, as returned by time.time()
    temperature_c: float  # temperature in degrees Celsius
    temperature_f: float  # temperature in degrees Fahrenheit
    humidity: float       # relative humidity percentage
    pressure: float       # atmospheric pressure in hPa
    light: float          # light level in lux
    proximity: int        # raw proximity sensor value
    noise: float          # noise level on a 0-100 scale

    def to_dict(self) -> dict:
        """Return the reading as a plain dict with values rounded to 0.1.

        Measurements are coerced to built-in ``float``/``int`` before
        rounding.  Sensor code may hand over numpy scalars (for example
        ``np.float32``), and ``round()`` on those returns another numpy
        scalar that ``json.dumps`` cannot serialize.  The timestamp is
        passed through unchanged.
        """

        def rounded(value) -> float:
            # float() first so the result is always a built-in float.
            return round(float(value), 1)

        return {
            "timestamp": self.timestamp,
            "temperature_c": rounded(self.temperature_c),
            "temperature_f": rounded(self.temperature_f),
            "humidity": rounded(self.humidity),
            "pressure": rounded(self.pressure),
            "light": rounded(self.light),
            "proximity": int(self.proximity),
            "noise": rounded(self.noise),
        }
class EnviroSensors:
"""Interface to Enviro Mini HAT sensors."""
def __init__(self, mock_mode: bool = False):
"""
Initialize sensors.
Args:
mock_mode: If True, return fake data (for testing without hardware)
"""
self.mock_mode = mock_mode or not (BME280_AVAILABLE and LTR559_AVAILABLE)
if not self.mock_mode:
try:
# Initialize BME280 on I2C bus 1
self.bus = SMBus(1)
self.bme280 = BME280(i2c_dev=self.bus)
# Initialize LTR559
self.ltr559 = LTR559()
logger.info("Enviro sensors initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize sensors: {e}")
self.mock_mode = True
if self.mock_mode:
logger.info("Running in mock mode")
def read_temperature(self) -> tuple[float, float]:
"""Read temperature in Celsius and Fahrenheit."""
if self.mock_mode:
temp_c = 20.0 + np.random.uniform(-2, 2)
else:
temp_c = self.bme280.get_temperature()
temp_f = (temp_c * 9/5) + 32
return temp_c, temp_f
def read_humidity(self) -> float:
"""Read relative humidity percentage."""
if self.mock_mode:
return 45.0 + np.random.uniform(-5, 5)
return self.bme280.get_humidity()
def read_pressure(self) -> float:
"""Read atmospheric pressure in hPa."""
if self.mock_mode:
return 1013.25 + np.random.uniform(-10, 10)
return self.bme280.get_pressure()
def read_light(self) -> float:
"""Read light level in lux."""
if self.mock_mode:
return 100.0 + np.random.uniform(-20, 20)
return self.ltr559.get_lux()
def read_proximity(self) -> int:
"""Read proximity sensor value (0-2047)."""
if self.mock_mode:
return int(np.random.uniform(0, 100))
return self.ltr559.get_proximity()
def read_noise(self, duration: float = 0.5) -> float:
"""
Read noise level from microphone.
Args:
duration: How long to sample in seconds
Returns:
RMS noise level (0-100 scale)
"""
if self.mock_mode or not SOUNDDEVICE_AVAILABLE:
return np.random.uniform(20, 40)
try:
# Record audio sample
sample_rate = 16000
samples = int(duration * sample_rate)
recording = sd.rec(samples, samplerate=sample_rate, channels=1, dtype='float32')
sd.wait()
# Calculate RMS
rms = np.sqrt(np.mean(recording**2))
# Convert to 0-100 scale (rough approximation)
noise_level = min(100, rms * 1000)
return noise_level
except Exception as e:
logger.error(f"Error reading noise level: {e}")
return 0.0
def read_all(self) -> EnviroReading:
"""Get a complete reading from all sensors."""
temp_c, temp_f = self.read_temperature()
return EnviroReading(
timestamp=time.time(),
temperature_c=temp_c,
temperature_f=temp_f,
humidity=self.read_humidity(),
pressure=self.read_pressure(),
light=self.read_light(),
proximity=self.read_proximity(),
noise=self.read_noise()
)
# Shared module-level instance, created lazily by get_sensors().
_sensors: Optional[EnviroSensors] = None


def get_sensors(mock_mode: bool = False) -> EnviroSensors:
    """Return the shared EnviroSensors instance, creating it on first use.

    Args:
        mock_mode: Passed to the EnviroSensors constructor when the instance
            is first created.  Later calls return the existing instance
            unchanged, so the argument has no effect after the first call.
    """
    global _sensors
    if _sensors is not None:
        return _sensors
    _sensors = EnviroSensors(mock_mode=mock_mode)
    return _sensors