""" Sensor reading module for Pimoroni Enviro Mini HAT. Provides unified interface to: - BME280: Temperature, Humidity, Pressure (with CPU heat compensation) - LTR559: Light level, Proximity - Webcam Microphone: Noise level via arecord """ import time import logging import subprocess import struct from dataclasses import dataclass from typing import Optional, Tuple import numpy as np logger = logging.getLogger(__name__) # Temperature compensation factor - calibrated for eye1.local # factor = (cpu_temp - raw_temp) / (raw_temp - actual_temp) # Calibrated: CPU=49.4°C, Raw=30.7°C, Actual=20.6°C (69°F) → factor=1.85 TEMP_COMPENSATION_FACTOR = 1.85 # Sensor imports - will fail gracefully if not on Pi try: from bme280 import BME280 from smbus2 import SMBus BME280_AVAILABLE = True except ImportError: BME280_AVAILABLE = False logger.warning("BME280 library not available - using mock data") try: from ltr559 import LTR559 LTR559_AVAILABLE = True except ImportError: LTR559_AVAILABLE = False logger.warning("LTR559 library not available - using mock data") def get_cpu_temperature() -> float: """ Read CPU temperature from system. Returns: CPU temperature in Celsius, or 0.0 if unavailable """ try: # Try vcgencmd first (Raspberry Pi specific) result = subprocess.run( ['vcgencmd', 'measure_temp'], capture_output=True, text=True, timeout=2 ) if result.returncode == 0: # Parse "temp=49.4'C" temp_str = result.stdout.strip() temp = float(temp_str.replace("temp=", "").replace("'C", "")) return temp except (subprocess.TimeoutExpired, FileNotFoundError, ValueError): pass try: # Fallback: read from thermal zone with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f: temp = float(f.read().strip()) / 1000.0 return temp except (FileNotFoundError, ValueError): pass logger.warning("Could not read CPU temperature") return 0.0 def read_noise_from_mic(device: str = "default", duration: float = 0.5) -> float: """ Read noise level from webcam/USB microphone using arecord. Args: device: ALSA device name (default uses system default) duration: How long to sample in seconds Returns: RMS noise level (0-100 scale) """ try: # Use arecord to capture raw audio # Format: signed 16-bit little-endian, mono, 16kHz result = subprocess.run( [ 'arecord', '-D', device, '-f', 'S16_LE', '-c', '1', '-r', '16000', '-d', str(int(duration + 0.5)), # duration in seconds (rounded up) '-t', 'raw', '-q', # quiet '-' # output to stdout ], capture_output=True, timeout=duration + 2 ) if result.returncode != 0: logger.warning(f"arecord failed: {result.stderr.decode()}") return 0.0 # Parse raw audio data raw_data = result.stdout if len(raw_data) < 2: return 0.0 # Convert to numpy array of int16 samples = np.frombuffer(raw_data, dtype=np.int16) if len(samples) == 0: return 0.0 # Calculate RMS rms = np.sqrt(np.mean(samples.astype(np.float64)**2)) # Normalize to 0-100 scale # int16 max is 32767, so RMS of full-scale would be ~23170 # Scale so moderate noise (~5000 RMS) is around 50 noise_level = min(100, (rms / 5000) * 50) return noise_level except subprocess.TimeoutExpired: logger.warning("arecord timed out") return 0.0 except FileNotFoundError: logger.warning("arecord not found - noise sensing disabled") return 0.0 except Exception as e: logger.error(f"Error reading noise level: {e}") return 0.0 @dataclass class EnviroReading: """A complete reading from all sensors.""" timestamp: float temperature_c: float temperature_f: float temperature_raw_c: float # Before compensation cpu_temp_c: float humidity: float pressure: float light: float proximity: int noise: float def to_dict(self) -> dict: return { "timestamp": self.timestamp, "temperature_c": round(self.temperature_c, 1), "temperature_f": round(self.temperature_f, 1), "temperature_raw_c": round(self.temperature_raw_c, 1), "cpu_temp_c": round(self.cpu_temp_c, 1), "humidity": round(self.humidity, 1), "pressure": round(self.pressure, 1), "light": round(self.light, 1), "proximity": self.proximity, "noise": round(self.noise, 1) } class EnviroSensors: """Interface to Enviro Mini HAT sensors.""" def __init__( self, mock_mode: bool = False, temp_factor: float = TEMP_COMPENSATION_FACTOR, noise_device: str = "default" ): """ Initialize sensors. Args: mock_mode: If True, return fake data (for testing without hardware) temp_factor: Temperature compensation factor noise_device: ALSA device for noise sensing """ self.mock_mode = mock_mode or not (BME280_AVAILABLE and LTR559_AVAILABLE) self.temp_factor = temp_factor self.noise_device = noise_device if not self.mock_mode: try: # Initialize BME280 on I2C bus 1 self.bus = SMBus(1) self.bme280 = BME280(i2c_dev=self.bus) # Initialize LTR559 self.ltr559 = LTR559() logger.info("Enviro sensors initialized successfully") except Exception as e: logger.error(f"Failed to initialize sensors: {e}") self.mock_mode = True if self.mock_mode: logger.info("Running in mock mode") def read_temperature(self, compensate: bool = True) -> Tuple[float, float, float, float]: """ Read temperature in Celsius and Fahrenheit with CPU heat compensation. Args: compensate: Apply CPU heat compensation Returns: Tuple of (compensated_c, compensated_f, raw_c, cpu_c) """ if self.mock_mode: raw_c = 20.0 + np.random.uniform(-2, 2) cpu_c = 45.0 + np.random.uniform(-5, 5) else: raw_c = self.bme280.get_temperature() cpu_c = get_cpu_temperature() # Apply compensation: temp = raw - ((cpu - raw) / factor) if compensate and cpu_c > 0 and self.temp_factor > 0: compensated_c = raw_c - ((cpu_c - raw_c) / self.temp_factor) else: compensated_c = raw_c compensated_f = (compensated_c * 9/5) + 32 return compensated_c, compensated_f, raw_c, cpu_c def read_humidity(self) -> float: """Read relative humidity percentage.""" if self.mock_mode: return 45.0 + np.random.uniform(-5, 5) return self.bme280.get_humidity() def read_pressure(self) -> float: """Read atmospheric pressure in hPa.""" if self.mock_mode: return 1013.25 + np.random.uniform(-10, 10) return self.bme280.get_pressure() def read_light(self) -> float: """Read light level in lux.""" if self.mock_mode: return 100.0 + np.random.uniform(-20, 20) return self.ltr559.get_lux() def read_proximity(self) -> int: """Read proximity sensor value (0-2047).""" if self.mock_mode: return int(np.random.uniform(0, 100)) return self.ltr559.get_proximity() def read_noise(self, duration: float = 0.5) -> float: """ Read noise level from webcam microphone. Args: duration: How long to sample in seconds Returns: RMS noise level (0-100 scale) """ if self.mock_mode: return np.random.uniform(20, 40) return read_noise_from_mic(device=self.noise_device, duration=duration) def read_all(self) -> EnviroReading: """Get a complete reading from all sensors.""" temp_c, temp_f, raw_c, cpu_c = self.read_temperature() return EnviroReading( timestamp=time.time(), temperature_c=temp_c, temperature_f=temp_f, temperature_raw_c=raw_c, cpu_temp_c=cpu_c, humidity=self.read_humidity(), pressure=self.read_pressure(), light=self.read_light(), proximity=self.read_proximity(), noise=self.read_noise() ) # Singleton instance _sensors: Optional[EnviroSensors] = None def get_sensors( mock_mode: bool = False, temp_factor: float = TEMP_COMPENSATION_FACTOR, noise_device: str = "default" ) -> EnviroSensors: """Get or create the sensors instance.""" global _sensors if _sensors is None: _sensors = EnviroSensors( mock_mode=mock_mode, temp_factor=temp_factor, noise_device=noise_device ) return _sensors