Add temperature compensation and webcam mic noise sensing

- Temperature: dynamic compensation using CPU temp (factor=1.85, calibrated for eye1)
- Noise: uses arecord from webcam mic instead of broken MEMS HAT mic
- Config: added sensors section for temp_factor and noise_device
- API: now returns raw_temp and cpu_temp alongside compensated temp

Calibration: 69°F actual ambient
🦊 Christmas Eve 2025
This commit is contained in:
Alex Kazaiev
2025-12-24 12:38:49 -06:00
parent 524c37a8c4
commit baf7e417c1
3 changed files with 184 additions and 41 deletions

View File

@@ -11,6 +11,18 @@ database:
sampling: sampling:
interval_seconds: 60 # How often to read sensors interval_seconds: 60 # How often to read sensors
sensors:
# Temperature compensation factor for CPU heat bleed
# Calibrate by comparing raw sensor temp to actual ambient
# factor = (cpu_temp - raw_temp) / (raw_temp - actual_temp)
# eye1.local calibration: CPU=49.4°C, Raw=30.7°C, Actual=20.6°C → factor=1.85
temp_compensation_factor: 1.85
# ALSA device for noise sensing (webcam mic)
# Use "default" for system default, or specify like "hw:1,0"
# Run "arecord -l" to list available devices
noise_device: "default"
lcd: lcd:
enabled: true enabled: true
brightness: 0.5 brightness: 0.5

View File

@@ -46,6 +46,7 @@ def load_config(config_path: str = "config.yaml") -> dict:
"server": {"host": "0.0.0.0", "port": 8767}, "server": {"host": "0.0.0.0", "port": 8767},
"database": {"path": "enviro_history.db", "retention_hours": 168}, "database": {"path": "enviro_history.db", "retention_hours": 168},
"sampling": {"interval_seconds": 60}, "sampling": {"interval_seconds": 60},
"sensors": {"temp_compensation_factor": 1.85, "noise_device": "default"},
"lcd": {"enabled": True, "brightness": 0.5, "default_message": "🦊 Vixy"}, "lcd": {"enabled": True, "brightness": 0.5, "default_message": "🦊 Vixy"},
"mock_mode": False "mock_mode": False
} }
@@ -154,7 +155,12 @@ async def lifespan(app: FastAPI):
mock_mode = config.get("mock_mode", False) mock_mode = config.get("mock_mode", False)
# Initialize components # Initialize components
sensors = get_sensors(mock_mode=mock_mode) sensor_config = config.get("sensors", {})
sensors = get_sensors(
mock_mode=mock_mode,
temp_factor=sensor_config.get("temp_compensation_factor", 1.85),
noise_device=sensor_config.get("noise_device", "default")
)
db = await get_database( db = await get_database(
db_path=config["database"]["path"], db_path=config["database"]["path"],
retention_hours=config["database"]["retention_hours"] retention_hours=config["database"]["retention_hours"]

View File

@@ -2,19 +2,26 @@
Sensor reading module for Pimoroni Enviro Mini HAT. Sensor reading module for Pimoroni Enviro Mini HAT.
Provides unified interface to: Provides unified interface to:
- BME280: Temperature, Humidity, Pressure - BME280: Temperature, Humidity, Pressure (with CPU heat compensation)
- LTR559: Light level, Proximity - LTR559: Light level, Proximity
- MEMS Microphone: Noise level - Webcam Microphone: Noise level via arecord
""" """
import time import time
import logging import logging
import subprocess
import struct
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional from typing import Optional, Tuple
import numpy as np import numpy as np
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Temperature compensation factor - calibrated for eye1.local
# factor = (cpu_temp - raw_temp) / (raw_temp - actual_temp)
# Calibrated: CPU=49.4°C, Raw=30.7°C, Actual=20.6°C (69°F) → factor=1.85
TEMP_COMPENSATION_FACTOR = 1.85
# Sensor imports - will fail gracefully if not on Pi # Sensor imports - will fail gracefully if not on Pi
try: try:
from bme280 import BME280 from bme280 import BME280
@@ -31,12 +38,106 @@ except ImportError:
LTR559_AVAILABLE = False LTR559_AVAILABLE = False
logger.warning("LTR559 library not available - using mock data") logger.warning("LTR559 library not available - using mock data")
def get_cpu_temperature() -> float:
"""
Read CPU temperature from system.
Returns:
CPU temperature in Celsius, or 0.0 if unavailable
"""
try: try:
import sounddevice as sd # Try vcgencmd first (Raspberry Pi specific)
SOUNDDEVICE_AVAILABLE = True result = subprocess.run(
except ImportError: ['vcgencmd', 'measure_temp'],
SOUNDDEVICE_AVAILABLE = False capture_output=True,
logger.warning("sounddevice not available - using mock data") text=True,
timeout=2
)
if result.returncode == 0:
# Parse "temp=49.4'C"
temp_str = result.stdout.strip()
temp = float(temp_str.replace("temp=", "").replace("'C", ""))
return temp
except (subprocess.TimeoutExpired, FileNotFoundError, ValueError):
pass
try:
# Fallback: read from thermal zone
with open('/sys/class/thermal/thermal_zone0/temp', 'r') as f:
temp = float(f.read().strip()) / 1000.0
return temp
except (FileNotFoundError, ValueError):
pass
logger.warning("Could not read CPU temperature")
return 0.0
def read_noise_from_mic(device: str = "default", duration: float = 0.5) -> float:
"""
Read noise level from webcam/USB microphone using arecord.
Args:
device: ALSA device name (default uses system default)
duration: How long to sample in seconds
Returns:
RMS noise level (0-100 scale)
"""
try:
# Use arecord to capture raw audio
# Format: signed 16-bit little-endian, mono, 16kHz
result = subprocess.run(
[
'arecord',
'-D', device,
'-f', 'S16_LE',
'-c', '1',
'-r', '16000',
'-d', str(int(duration + 0.5)), # duration in seconds (rounded up)
'-t', 'raw',
'-q', # quiet
'-' # output to stdout
],
capture_output=True,
timeout=duration + 2
)
if result.returncode != 0:
logger.warning(f"arecord failed: {result.stderr.decode()}")
return 0.0
# Parse raw audio data
raw_data = result.stdout
if len(raw_data) < 2:
return 0.0
# Convert to numpy array of int16
samples = np.frombuffer(raw_data, dtype=np.int16)
if len(samples) == 0:
return 0.0
# Calculate RMS
rms = np.sqrt(np.mean(samples.astype(np.float64)**2))
# Normalize to 0-100 scale
# int16 max is 32767, so RMS of full-scale would be ~23170
# Scale so moderate noise (~5000 RMS) is around 50
noise_level = min(100, (rms / 5000) * 50)
return noise_level
except subprocess.TimeoutExpired:
logger.warning("arecord timed out")
return 0.0
except FileNotFoundError:
logger.warning("arecord not found - noise sensing disabled")
return 0.0
except Exception as e:
logger.error(f"Error reading noise level: {e}")
return 0.0
@dataclass @dataclass
@@ -45,6 +146,8 @@ class EnviroReading:
timestamp: float timestamp: float
temperature_c: float temperature_c: float
temperature_f: float temperature_f: float
temperature_raw_c: float # Before compensation
cpu_temp_c: float
humidity: float humidity: float
pressure: float pressure: float
light: float light: float
@@ -56,6 +159,8 @@ class EnviroReading:
"timestamp": self.timestamp, "timestamp": self.timestamp,
"temperature_c": round(self.temperature_c, 1), "temperature_c": round(self.temperature_c, 1),
"temperature_f": round(self.temperature_f, 1), "temperature_f": round(self.temperature_f, 1),
"temperature_raw_c": round(self.temperature_raw_c, 1),
"cpu_temp_c": round(self.cpu_temp_c, 1),
"humidity": round(self.humidity, 1), "humidity": round(self.humidity, 1),
"pressure": round(self.pressure, 1), "pressure": round(self.pressure, 1),
"light": round(self.light, 1), "light": round(self.light, 1),
@@ -67,14 +172,23 @@ class EnviroReading:
class EnviroSensors: class EnviroSensors:
"""Interface to Enviro Mini HAT sensors.""" """Interface to Enviro Mini HAT sensors."""
def __init__(self, mock_mode: bool = False): def __init__(
self,
mock_mode: bool = False,
temp_factor: float = TEMP_COMPENSATION_FACTOR,
noise_device: str = "default"
):
""" """
Initialize sensors. Initialize sensors.
Args: Args:
mock_mode: If True, return fake data (for testing without hardware) mock_mode: If True, return fake data (for testing without hardware)
temp_factor: Temperature compensation factor
noise_device: ALSA device for noise sensing
""" """
self.mock_mode = mock_mode or not (BME280_AVAILABLE and LTR559_AVAILABLE) self.mock_mode = mock_mode or not (BME280_AVAILABLE and LTR559_AVAILABLE)
self.temp_factor = temp_factor
self.noise_device = noise_device
if not self.mock_mode: if not self.mock_mode:
try: try:
@@ -93,15 +207,32 @@ class EnviroSensors:
if self.mock_mode: if self.mock_mode:
logger.info("Running in mock mode") logger.info("Running in mock mode")
def read_temperature(self) -> tuple[float, float]: def read_temperature(self, compensate: bool = True) -> Tuple[float, float, float, float]:
"""Read temperature in Celsius and Fahrenheit.""" """
if self.mock_mode: Read temperature in Celsius and Fahrenheit with CPU heat compensation.
temp_c = 20.0 + np.random.uniform(-2, 2)
else:
temp_c = self.bme280.get_temperature()
temp_f = (temp_c * 9/5) + 32 Args:
return temp_c, temp_f compensate: Apply CPU heat compensation
Returns:
Tuple of (compensated_c, compensated_f, raw_c, cpu_c)
"""
if self.mock_mode:
raw_c = 20.0 + np.random.uniform(-2, 2)
cpu_c = 45.0 + np.random.uniform(-5, 5)
else:
raw_c = self.bme280.get_temperature()
cpu_c = get_cpu_temperature()
# Apply compensation: temp = raw - ((cpu - raw) / factor)
if compensate and cpu_c > 0 and self.temp_factor > 0:
compensated_c = raw_c - ((cpu_c - raw_c) / self.temp_factor)
else:
compensated_c = raw_c
compensated_f = (compensated_c * 9/5) + 32
return compensated_c, compensated_f, raw_c, cpu_c
def read_humidity(self) -> float: def read_humidity(self) -> float:
"""Read relative humidity percentage.""" """Read relative humidity percentage."""
@@ -129,7 +260,7 @@ class EnviroSensors:
def read_noise(self, duration: float = 0.5) -> float: def read_noise(self, duration: float = 0.5) -> float:
""" """
Read noise level from microphone. Read noise level from webcam microphone.
Args: Args:
duration: How long to sample in seconds duration: How long to sample in seconds
@@ -137,35 +268,21 @@ class EnviroSensors:
Returns: Returns:
RMS noise level (0-100 scale) RMS noise level (0-100 scale)
""" """
if self.mock_mode or not SOUNDDEVICE_AVAILABLE: if self.mock_mode:
return np.random.uniform(20, 40) return np.random.uniform(20, 40)
try: return read_noise_from_mic(device=self.noise_device, duration=duration)
# Record audio sample
sample_rate = 16000
samples = int(duration * sample_rate)
recording = sd.rec(samples, samplerate=sample_rate, channels=1, dtype='float32')
sd.wait()
# Calculate RMS
rms = np.sqrt(np.mean(recording**2))
# Convert to 0-100 scale (rough approximation)
noise_level = min(100, rms * 1000)
return noise_level
except Exception as e:
logger.error(f"Error reading noise level: {e}")
return 0.0
def read_all(self) -> EnviroReading: def read_all(self) -> EnviroReading:
"""Get a complete reading from all sensors.""" """Get a complete reading from all sensors."""
temp_c, temp_f = self.read_temperature() temp_c, temp_f, raw_c, cpu_c = self.read_temperature()
return EnviroReading( return EnviroReading(
timestamp=time.time(), timestamp=time.time(),
temperature_c=temp_c, temperature_c=temp_c,
temperature_f=temp_f, temperature_f=temp_f,
temperature_raw_c=raw_c,
cpu_temp_c=cpu_c,
humidity=self.read_humidity(), humidity=self.read_humidity(),
pressure=self.read_pressure(), pressure=self.read_pressure(),
light=self.read_light(), light=self.read_light(),
@@ -177,9 +294,17 @@ class EnviroSensors:
# Singleton instance # Singleton instance
_sensors: Optional[EnviroSensors] = None _sensors: Optional[EnviroSensors] = None
def get_sensors(mock_mode: bool = False) -> EnviroSensors: def get_sensors(
mock_mode: bool = False,
temp_factor: float = TEMP_COMPENSATION_FACTOR,
noise_device: str = "default"
) -> EnviroSensors:
"""Get or create the sensors instance.""" """Get or create the sensors instance."""
global _sensors global _sensors
if _sensors is None: if _sensors is None:
_sensors = EnviroSensors(mock_mode=mock_mode) _sensors = EnviroSensors(
mock_mode=mock_mode,
temp_factor=temp_factor,
noise_device=noise_device
)
return _sensors return _sensors