Files
enviro-service/sensors.py
Alex Kazaiev baf7e417c1 Add temperature compensation and webcam mic noise sensing
- Temperature: dynamic compensation using CPU temp (factor=1.85, calibrated for eye1)
- Noise: uses arecord from webcam mic instead of broken MEMS HAT mic
- Config: added sensors section for temp_factor and noise_device
- API: now returns raw_temp and cpu_temp alongside compensated temp

Calibration: 69°F actual ambient
🦊 Christmas Eve 2025
2025-12-24 12:38:49 -06:00

311 lines
9.5 KiB
Python

"""
Sensor reading module for Pimoroni Enviro Mini HAT.
Provides unified interface to:
- BME280: Temperature, Humidity, Pressure (with CPU heat compensation)
- LTR559: Light level, Proximity
- Webcam Microphone: Noise level via arecord
"""
import time
import logging
import subprocess
import struct
from dataclasses import dataclass
from typing import Optional, Tuple
import numpy as np
logger = logging.getLogger(__name__)

# Temperature compensation factor, calibrated for eye1.local.
#   factor = (cpu_temp - raw_temp) / (raw_temp - actual_temp)
# Calibration point: CPU=49.4°C, Raw=30.7°C, Actual=20.6°C (69°F) → factor=1.85
TEMP_COMPENSATION_FACTOR = 1.85

# The sensor drivers only exist on the Pi. When one is missing, record that
# in an availability flag so the sensor interface can fall back to mock data
# instead of failing at import time.
try:
    from bme280 import BME280
    from smbus2 import SMBus
except ImportError:
    BME280_AVAILABLE = False
    logger.warning("BME280 library not available - using mock data")
else:
    BME280_AVAILABLE = True

try:
    from ltr559 import LTR559
except ImportError:
    LTR559_AVAILABLE = False
    logger.warning("LTR559 library not available - using mock data")
else:
    LTR559_AVAILABLE = True
def get_cpu_temperature() -> float:
    """
    Read CPU temperature from system.

    Tries ``vcgencmd measure_temp`` first, then falls back to reading
    ``/sys/class/thermal/thermal_zone0/temp`` (whose value is divided by
    1000 to obtain degrees Celsius). This is a best-effort read: any
    failure to run the command, open the file, or parse the value falls
    through to the next source instead of raising.

    Returns:
        CPU temperature in Celsius, or 0.0 if unavailable
    """
    try:
        # Try vcgencmd first (Raspberry Pi specific)
        result = subprocess.run(
            ['vcgencmd', 'measure_temp'],
            capture_output=True,
            text=True,
            timeout=2
        )
        if result.returncode == 0:
            # Parse "temp=49.4'C"
            temp_str = result.stdout.strip()
            return float(temp_str.replace("temp=", "").replace("'C", ""))
    except (subprocess.TimeoutExpired, OSError, ValueError):
        # OSError covers a missing binary (FileNotFoundError) as well as
        # permission problems; either way, try the sysfs fallback.
        pass

    try:
        # Fallback: read from thermal zone
        with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f:
            return float(f.read().strip()) / 1000.0
    except (OSError, ValueError):
        # Missing/unreadable file or unparsable content: report unavailable.
        pass

    logger.warning("Could not read CPU temperature")
    return 0.0
def read_noise_from_mic(device: str = "default", duration: float = 0.5) -> float:
    """
    Read noise level from webcam/USB microphone using arecord.

    Captures raw signed 16-bit little-endian mono audio at 16 kHz, computes
    the RMS of the samples and maps it onto a 0-100 scale. Every failure
    (arecord missing, non-zero exit, timeout, unusable data) is logged and
    reported as 0.0 rather than raised.

    Args:
        device: ALSA device name (default uses system default)
        duration: How long to sample in seconds. arecord only accepts whole
            seconds, so this is rounded up, with a minimum of one second.

    Returns:
        RMS noise level (0-100 scale)
    """
    try:
        # arecord's -d takes whole seconds and treats 0 as "no limit", so
        # round up and never pass less than 1; otherwise a short duration
        # would record until the timeout kills it.
        record_seconds = max(1, int(np.ceil(duration)))

        # Use arecord to capture raw audio
        # Format: signed 16-bit little-endian, mono, 16kHz
        result = subprocess.run(
            [
                'arecord',
                '-D', device,
                '-f', 'S16_LE',
                '-c', '1',
                '-r', '16000',
                '-d', str(record_seconds),
                '-t', 'raw',
                '-q',  # quiet
                '-'    # output to stdout
            ],
            capture_output=True,
            # Base the timeout on what arecord was actually asked to record.
            timeout=record_seconds + 2
        )
        if result.returncode != 0:
            logger.warning(f"arecord failed: {result.stderr.decode(errors='replace')}")
            return 0.0

        # Parse raw audio data, dropping a trailing odd byte so a truncated
        # capture still yields whole 16-bit samples.
        raw_data = result.stdout
        usable_bytes = len(raw_data) - (len(raw_data) % 2)
        if usable_bytes < 2:
            return 0.0

        # '<i2' is explicit little-endian int16, matching S16_LE on any host.
        samples = np.frombuffer(raw_data[:usable_bytes], dtype='<i2')

        # Calculate RMS (in float64 to avoid int16 overflow when squaring)
        rms = np.sqrt(np.mean(samples.astype(np.float64) ** 2))

        # Normalize to 0-100 scale
        # int16 max is 32767, so RMS of full-scale would be ~23170
        # Scale so moderate noise (~5000 RMS) is around 50
        return float(min(100.0, (rms / 5000) * 50))
    except subprocess.TimeoutExpired:
        logger.warning("arecord timed out")
        return 0.0
    except FileNotFoundError:
        logger.warning("arecord not found - noise sensing disabled")
        return 0.0
    except Exception as e:
        # Deliberate catch-all: noise sensing is best-effort and must not
        # take down a full sensor reading.
        logger.error(f"Error reading noise level: {e}")
        return 0.0
@dataclass
class EnviroReading:
    """One snapshot of every sensor value, taken at ``timestamp``."""

    timestamp: float
    temperature_c: float
    temperature_f: float
    temperature_raw_c: float  # Before compensation
    cpu_temp_c: float
    humidity: float
    pressure: float
    light: float
    proximity: int
    noise: float

    def to_dict(self) -> dict:
        """
        Convert the reading to a plain dict.

        Measured float values are rounded to one decimal place; the
        timestamp and proximity are passed through unchanged.
        """
        one_decimal_fields = (
            "temperature_c",
            "temperature_f",
            "temperature_raw_c",
            "cpu_temp_c",
            "humidity",
            "pressure",
            "light",
        )
        payload = {"timestamp": self.timestamp}
        for field_name in one_decimal_fields:
            payload[field_name] = round(getattr(self, field_name), 1)
        payload["proximity"] = self.proximity
        payload["noise"] = round(self.noise, 1)
        return payload
class EnviroSensors:
    """
    Interface to Enviro Mini HAT sensors.

    Wraps the BME280 (temperature, humidity, pressure) and LTR559 (light,
    proximity) drivers plus microphone-based noise sensing. When the driver
    libraries are unavailable or hardware initialization fails, the instance
    switches to mock mode and every reader returns randomized placeholder
    values instead.
    """

    def __init__(
        self,
        mock_mode: bool = False,
        temp_factor: float = TEMP_COMPENSATION_FACTOR,
        noise_device: str = "default"
    ):
        """
        Initialize sensors.

        Args:
            mock_mode: If True, return fake data (for testing without hardware)
            temp_factor: Temperature compensation factor
            noise_device: ALSA device for noise sensing
        """
        self.mock_mode = mock_mode or not (BME280_AVAILABLE and LTR559_AVAILABLE)
        self.temp_factor = temp_factor
        self.noise_device = noise_device

        # Define the hardware handles in every mode so they always exist.
        self.bus = None
        self.bme280 = None
        self.ltr559 = None

        if not self.mock_mode:
            try:
                # Initialize BME280 on I2C bus 1
                self.bus = SMBus(1)
                self.bme280 = BME280(i2c_dev=self.bus)
                # Initialize LTR559
                self.ltr559 = LTR559()
                logger.info("Enviro sensors initialized successfully")
            except Exception as e:
                logger.error(f"Failed to initialize sensors: {e}")
                # Do not leak the I2C handle if the bus opened but a sensor
                # driver failed afterwards.
                self._release_bus()
                self.bme280 = None
                self.ltr559 = None
                self.mock_mode = True

        if self.mock_mode:
            logger.info("Running in mock mode")

    def _release_bus(self) -> None:
        """Close the I2C bus handle, if one was opened, and forget it."""
        bus = self.bus
        self.bus = None
        if bus is None:
            return
        try:
            bus.close()
        except OSError as e:
            logger.warning(f"Failed to close I2C bus: {e}")

    def read_temperature(self, compensate: bool = True) -> Tuple[float, float, float, float]:
        """
        Read temperature in Celsius and Fahrenheit with CPU heat compensation.

        Args:
            compensate: Apply CPU heat compensation

        Returns:
            Tuple of (compensated_c, compensated_f, raw_c, cpu_c)
        """
        if self.mock_mode:
            raw_c = 20.0 + np.random.uniform(-2, 2)
            cpu_c = 45.0 + np.random.uniform(-5, 5)
        else:
            raw_c = self.bme280.get_temperature()
            cpu_c = get_cpu_temperature()

        # Apply compensation: temp = raw - ((cpu - raw) / factor)
        # Skipped when the CPU temperature is unavailable (reported as 0.0)
        # or the factor is not positive, to avoid a bogus or divide-by-zero
        # correction.
        if compensate and cpu_c > 0 and self.temp_factor > 0:
            compensated_c = raw_c - ((cpu_c - raw_c) / self.temp_factor)
        else:
            compensated_c = raw_c

        compensated_f = (compensated_c * 9/5) + 32
        return compensated_c, compensated_f, raw_c, cpu_c

    def read_humidity(self) -> float:
        """Read relative humidity percentage."""
        if self.mock_mode:
            return 45.0 + np.random.uniform(-5, 5)
        return self.bme280.get_humidity()

    def read_pressure(self) -> float:
        """Read atmospheric pressure in hPa."""
        if self.mock_mode:
            return 1013.25 + np.random.uniform(-10, 10)
        return self.bme280.get_pressure()

    def read_light(self) -> float:
        """Read light level in lux."""
        if self.mock_mode:
            return 100.0 + np.random.uniform(-20, 20)
        return self.ltr559.get_lux()

    def read_proximity(self) -> int:
        """Read proximity sensor value (0-2047)."""
        if self.mock_mode:
            return int(np.random.uniform(0, 100))
        return self.ltr559.get_proximity()

    def read_noise(self, duration: float = 0.5) -> float:
        """
        Read noise level from webcam microphone.

        Args:
            duration: How long to sample in seconds

        Returns:
            RMS noise level (0-100 scale)
        """
        if self.mock_mode:
            return np.random.uniform(20, 40)
        return read_noise_from_mic(device=self.noise_device, duration=duration)

    def read_all(self) -> EnviroReading:
        """Get a complete reading from all sensors."""
        temp_c, temp_f, raw_c, cpu_c = self.read_temperature()
        return EnviroReading(
            timestamp=time.time(),
            temperature_c=temp_c,
            temperature_f=temp_f,
            temperature_raw_c=raw_c,
            cpu_temp_c=cpu_c,
            humidity=self.read_humidity(),
            pressure=self.read_pressure(),
            light=self.read_light(),
            proximity=self.read_proximity(),
            noise=self.read_noise()
        )
# Module-wide shared instance, created on the first get_sensors() call.
_sensors: Optional[EnviroSensors] = None


def get_sensors(
    mock_mode: bool = False,
    temp_factor: float = TEMP_COMPENSATION_FACTOR,
    noise_device: str = "default"
) -> EnviroSensors:
    """
    Return the shared EnviroSensors instance, building it on first use.

    The arguments are only consulted when the instance is first created;
    later calls return the existing instance and ignore them.
    """
    global _sensors
    if _sensors is not None:
        return _sensors

    settings = {
        "mock_mode": mock_mode,
        "temp_factor": temp_factor,
        "noise_device": noise_device,
    }
    _sensors = EnviroSensors(**settings)
    return _sensors