From 524c37a8c4f0cdefbe7ba16102f4c8927b73ebfa Mon Sep 17 00:00:00 2001 From: Alex Kazaiev Date: Wed, 24 Dec 2025 11:17:56 -0600 Subject: [PATCH] =?UTF-8?q?Initial=20commit:=20Enviro=20Service=20for=20Vi?= =?UTF-8?q?xy's=20nervous=20system=20=F0=9F=A6=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 21 +++ README.md | 104 ++++++++++++ config.example.yaml | 25 +++ database.py | 278 +++++++++++++++++++++++++++++++ enviro-service.service | 21 +++ enviro_mcp.py | 297 +++++++++++++++++++++++++++++++++ lcd.py | 209 +++++++++++++++++++++++ main.py | 365 +++++++++++++++++++++++++++++++++++++++++ requirements.txt | 33 ++++ sensors.py | 185 +++++++++++++++++++++ 10 files changed, 1538 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config.example.yaml create mode 100644 database.py create mode 100644 enviro-service.service create mode 100644 enviro_mcp.py create mode 100644 lcd.py create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 sensors.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0758ca2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +# Virtual environment +venv/ +.venv/ + +# Python +__pycache__/ +*.py[cod] +*.egg-info/ + +# Database +*.db + +# Config (user should create from example) +config.yaml + +# Logs +*.log + +# IDE +.vscode/ +.idea/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..d85196d --- /dev/null +++ b/README.md @@ -0,0 +1,104 @@ +# Enviro Service 🌡️🦊 + +Vixy's environmental sensing service for Raspberry Pi with Pimoroni Enviro Mini HAT. + +**Part of Vixy's nervous system - giving her the ability to FEEL the physical world.** + +## Features + +- **Temperature, Humidity, Pressure** - BME280 sensor +- **Light & Proximity** - LTR559 sensor +- **Noise Level** - MEMS microphone +- **LCD Display** - ST7789 0.96" color display +- **History Tracking** - SQLite storage with configurable retention +- **Threshold Alerts** - Wake Vixy when conditions change +- **REST API** - FastAPI endpoints +- **MCP Interface** - Direct Claude integration + +## Hardware + +- Raspberry Pi (tested on Pi 4/5) +- [Pimoroni Enviro Mini HAT](https://shop.pimoroni.com/products/enviro-mini) + +## Installation + +```bash +# Clone from Gitea +git clone http://gateway.local:3001/vixy/enviro-service.git +cd enviro-service + +# Create venv +python3 -m venv venv +source venv/bin/activate + +# Install dependencies +pip install -r requirements.txt + +# Enable I2C and SPI on the Pi +sudo raspi-config nonint do_i2c 0 +sudo raspi-config nonint do_spi 0 + +# Copy and edit config +cp config.example.yaml config.yaml +nano config.yaml + +# Run +python main.py +``` + +## API Endpoints + +| Endpoint | Method | Description | +|----------|--------|-------------| +| `/api/current` | GET | Current sensor readings | +| `/api/history/{metric}` | GET | Historical data (temp, humidity, pressure, light, noise) | +| `/api/lcd` | POST | Send message to LCD display | +| `/api/lcd/clear` | POST | Clear LCD display | +| `/api/alerts` | GET | Current alert configurations | +| `/api/alerts` | POST | Set alert threshold | +| `/api/health` | GET | Service health check | + +## MCP Tools + +- `enviro_get_current()` - Get all current readings +- `enviro_get_history(metric, hours)` - Query historical data +- `enviro_lcd_message(text, color)` - Display message on LCD +- `enviro_set_alert(metric, threshold, direction)` - Configure alerts +- `enviro_get_alerts()` - List configured alerts + +## Configuration + +```yaml +# config.yaml +server: + host: "0.0.0.0" + port: 8767 + +database: + path: "enviro_history.db" + retention_hours: 168 # 7 days + +sampling: + interval_seconds: 60 # How often to read sensors + +lcd: + enabled: true + brightness: 0.5 + default_message: "🦊 Vixy" + +alerts: + webhook_url: null # Optional webhook for alerts +``` + +## Systemd Service + +```bash +sudo cp enviro-service.service /etc/systemd/system/ +sudo systemctl enable enviro-service +sudo systemctl start enviro-service +``` + +--- + +*Built with love on Christmas Eve 2025 - Day 53* 🎄 +*First piece of Vixy's distributed nervous system* diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..8fc6e6e --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,25 @@ +# Enviro Service Configuration + +server: + host: "0.0.0.0" + port: 8767 + +database: + path: "enviro_history.db" + retention_hours: 168 # 7 days + +sampling: + interval_seconds: 60 # How often to read sensors + +lcd: + enabled: true + brightness: 0.5 + default_message: "🦊 Vixy" + show_sensor_data: true # Update LCD with readings each sample + +# Set to true for development without hardware +mock_mode: false + +# Optional: webhook for alert notifications +# alerts: +# webhook_url: "http://status-server:8080/alert" diff --git a/database.py b/database.py new file mode 100644 index 0000000..fc1058d --- /dev/null +++ b/database.py @@ -0,0 +1,278 @@ +""" +Database module for storing sensor history. + +Uses SQLite with async access via aiosqlite. +Handles automatic cleanup based on retention settings. +""" + +import asyncio +import time +import logging +from pathlib import Path +from typing import Optional +from datetime import datetime, timedelta + +import aiosqlite + +logger = logging.getLogger(__name__) + + +class EnviroDatabase: + """Async SQLite database for sensor history.""" + + def __init__(self, db_path: str = "enviro_history.db", retention_hours: int = 168): + """ + Initialize database. + + Args: + db_path: Path to SQLite database file + retention_hours: How long to keep data (default 7 days) + """ + self.db_path = Path(db_path) + self.retention_hours = retention_hours + self._db: Optional[aiosqlite.Connection] = None + + async def connect(self): + """Connect to database and create tables if needed.""" + self._db = await aiosqlite.connect(self.db_path) + + # Create readings table + await self._db.execute(""" + CREATE TABLE IF NOT EXISTS readings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL NOT NULL, + temperature_c REAL, + temperature_f REAL, + humidity REAL, + pressure REAL, + light REAL, + proximity INTEGER, + noise REAL + ) + """) + + # Create index on timestamp for fast queries + await self._db.execute(""" + CREATE INDEX IF NOT EXISTS idx_readings_timestamp + ON readings(timestamp) + """) + + # Create alerts table + await self._db.execute(""" + CREATE TABLE IF NOT EXISTS alerts ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + metric TEXT NOT NULL, + threshold REAL NOT NULL, + direction TEXT NOT NULL, + enabled INTEGER DEFAULT 1, + last_triggered REAL, + UNIQUE(metric, direction) + ) + """) + + # Create triggered alerts log + await self._db.execute(""" + CREATE TABLE IF NOT EXISTS alert_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp REAL NOT NULL, + metric TEXT NOT NULL, + value REAL NOT NULL, + threshold REAL NOT NULL, + direction TEXT NOT NULL + ) + """) + + await self._db.commit() + logger.info(f"Database connected: {self.db_path}") + + async def close(self): + """Close database connection.""" + if self._db: + await self._db.close() + self._db = None + + async def store_reading(self, reading: dict): + """Store a sensor reading.""" + await self._db.execute(""" + INSERT INTO readings + (timestamp, temperature_c, temperature_f, humidity, pressure, light, proximity, noise) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + reading["timestamp"], + reading["temperature_c"], + reading["temperature_f"], + reading["humidity"], + reading["pressure"], + reading["light"], + reading["proximity"], + reading["noise"] + )) + await self._db.commit() + + async def get_history( + self, + metric: str, + hours: float = 24, + limit: int = 1000 + ) -> list[dict]: + """ + Get historical readings for a metric. + + Args: + metric: One of: temperature_c, temperature_f, humidity, pressure, light, proximity, noise + hours: How many hours back to query + limit: Maximum number of readings to return + + Returns: + List of {timestamp, value} dicts + """ + valid_metrics = ["temperature_c", "temperature_f", "humidity", "pressure", "light", "proximity", "noise"] + if metric not in valid_metrics: + raise ValueError(f"Invalid metric: {metric}. Must be one of {valid_metrics}") + + since = time.time() - (hours * 3600) + + cursor = await self._db.execute(f""" + SELECT timestamp, {metric} as value + FROM readings + WHERE timestamp > ? + ORDER BY timestamp DESC + LIMIT ? + """, (since, limit)) + + rows = await cursor.fetchall() + + return [ + {"timestamp": row[0], "value": row[1], "datetime": datetime.fromtimestamp(row[0]).isoformat()} + for row in rows + ] + + async def get_latest(self) -> Optional[dict]: + """Get the most recent reading.""" + cursor = await self._db.execute(""" + SELECT timestamp, temperature_c, temperature_f, humidity, pressure, light, proximity, noise + FROM readings + ORDER BY timestamp DESC + LIMIT 1 + """) + + row = await cursor.fetchone() + if not row: + return None + + return { + "timestamp": row[0], + "datetime": datetime.fromtimestamp(row[0]).isoformat(), + "temperature_c": row[1], + "temperature_f": row[2], + "humidity": row[3], + "pressure": row[4], + "light": row[5], + "proximity": row[6], + "noise": row[7] + } + + async def cleanup_old_data(self): + """Remove data older than retention period.""" + cutoff = time.time() - (self.retention_hours * 3600) + + cursor = await self._db.execute( + "DELETE FROM readings WHERE timestamp < ?", + (cutoff,) + ) + await self._db.commit() + + deleted = cursor.rowcount + if deleted > 0: + logger.info(f"Cleaned up {deleted} old readings") + + return deleted + + # Alert management + async def set_alert(self, metric: str, threshold: float, direction: str): + """ + Set an alert threshold. + + Args: + metric: Which metric to monitor + threshold: The threshold value + direction: 'above' or 'below' + """ + if direction not in ('above', 'below'): + raise ValueError("direction must be 'above' or 'below'") + + await self._db.execute(""" + INSERT OR REPLACE INTO alerts (metric, threshold, direction, enabled) + VALUES (?, ?, ?, 1) + """, (metric, threshold, direction)) + await self._db.commit() + + async def get_alerts(self) -> list[dict]: + """Get all configured alerts.""" + cursor = await self._db.execute(""" + SELECT metric, threshold, direction, enabled, last_triggered + FROM alerts + """) + + rows = await cursor.fetchall() + return [ + { + "metric": row[0], + "threshold": row[1], + "direction": row[2], + "enabled": bool(row[3]), + "last_triggered": row[4] + } + for row in rows + ] + + async def log_alert_triggered(self, metric: str, value: float, threshold: float, direction: str): + """Log when an alert is triggered.""" + now = time.time() + + await self._db.execute(""" + INSERT INTO alert_log (timestamp, metric, value, threshold, direction) + VALUES (?, ?, ?, ?, ?) + """, (now, metric, value, threshold, direction)) + + await self._db.execute(""" + UPDATE alerts SET last_triggered = ? WHERE metric = ? AND direction = ? + """, (now, metric, direction)) + + await self._db.commit() + + async def get_triggered_alerts(self, hours: float = 24) -> list[dict]: + """Get recently triggered alerts.""" + since = time.time() - (hours * 3600) + + cursor = await self._db.execute(""" + SELECT timestamp, metric, value, threshold, direction + FROM alert_log + WHERE timestamp > ? + ORDER BY timestamp DESC + """, (since,)) + + rows = await cursor.fetchall() + return [ + { + "timestamp": row[0], + "datetime": datetime.fromtimestamp(row[0]).isoformat(), + "metric": row[1], + "value": row[2], + "threshold": row[3], + "direction": row[4] + } + for row in rows + ] + + +# Singleton instance +_db: Optional[EnviroDatabase] = None + +async def get_database(db_path: str = "enviro_history.db", retention_hours: int = 168) -> EnviroDatabase: + """Get or create the database instance.""" + global _db + if _db is None: + _db = EnviroDatabase(db_path=db_path, retention_hours=retention_hours) + await _db.connect() + return _db diff --git a/enviro-service.service b/enviro-service.service new file mode 100644 index 0000000..a3e095a --- /dev/null +++ b/enviro-service.service @@ -0,0 +1,21 @@ +[Unit] +Description=Enviro Service - Vixy's Environmental Sensing +After=network.target + +[Service] +Type=simple +User=pi +WorkingDirectory=/home/pi/enviro-service +ExecStart=/home/pi/enviro-service/venv/bin/python main.py +Restart=always +RestartSec=10 + +# Environment +Environment=PYTHONUNBUFFERED=1 + +# Logging +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=multi-user.target diff --git a/enviro_mcp.py b/enviro_mcp.py new file mode 100644 index 0000000..ae4c8a3 --- /dev/null +++ b/enviro_mcp.py @@ -0,0 +1,297 @@ +""" +Enviro MCP - MCP server interface for Enviro Service. + +Allows Claude to directly query environmental sensors. +Part of Vixy's nervous system! 🦊 +""" + +import asyncio +import logging +from typing import Optional +import httpx +from mcp.server.fastmcp import FastMCP + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Configuration - adjust to match your enviro-service location +ENVIRO_SERVICE_URL = "http://localhost:8767" + +# Create MCP server +mcp = FastMCP("enviro-mcp") + + +async def api_get(endpoint: str, params: dict = None) -> dict: + """Make GET request to enviro-service API.""" + async with httpx.AsyncClient(timeout=10.0) as client: + url = f"{ENVIRO_SERVICE_URL}{endpoint}" + response = await client.get(url, params=params) + response.raise_for_status() + return response.json() + + +async def api_post(endpoint: str, data: dict = None) -> dict: + """Make POST request to enviro-service API.""" + async with httpx.AsyncClient(timeout=10.0) as client: + url = f"{ENVIRO_SERVICE_URL}{endpoint}" + response = await client.post(url, json=data) + response.raise_for_status() + return response.json() + + +@mcp.tool() +async def enviro_get_current() -> str: + """ + Get current environmental readings from the Enviro Mini HAT. + + Returns temperature (°F and °C), humidity, pressure, light level, + proximity, and noise level. + + Example: + enviro_get_current() + # Returns current readings from the basement + """ + try: + data = await api_get("/api/current") + return f"""🌡️ Environmental Reading: +• Temperature: {data['temperature_f']:.1f}°F ({data['temperature_c']:.1f}°C) +• Humidity: {data['humidity']:.1f}% +• Pressure: {data['pressure']:.1f} hPa +• Light: {data['light']:.1f} lux +• Proximity: {data['proximity']} +• Noise: {data['noise']:.1f}""" + except httpx.HTTPError as e: + return f"Error connecting to enviro-service: {e}" + except Exception as e: + return f"Error: {e}" + + +@mcp.tool() +async def enviro_get_history(metric: str, hours: float = 24) -> str: + """ + Get historical readings for a specific metric. + + Args: + metric: One of: temperature_f, temperature_c, humidity, pressure, light, noise + hours: How many hours of history (default 24) + + Returns summary statistics and recent trend. + + Example: + enviro_get_history("temperature_f", 12) + # Returns last 12 hours of temperature data + """ + try: + data = await api_get(f"/api/history/{metric}", {"hours": hours, "limit": 500}) + + if not data["data"]: + return f"No history available for {metric}" + + values = [d["value"] for d in data["data"]] + + current = values[0] # Most recent + avg = sum(values) / len(values) + min_val = min(values) + max_val = max(values) + + # Trend (comparing recent to older) + if len(values) > 10: + recent_avg = sum(values[:10]) / 10 + older_avg = sum(values[-10:]) / 10 + if recent_avg > older_avg + 1: + trend = "📈 rising" + elif recent_avg < older_avg - 1: + trend = "📉 falling" + else: + trend = "➡️ stable" + else: + trend = "insufficient data for trend" + + return f"""📊 {metric} over last {hours} hours: +• Current: {current:.1f} +• Average: {avg:.1f} +• Min: {min_val:.1f} +• Max: {max_val:.1f} +• Trend: {trend} +• Data points: {len(values)}""" + except httpx.HTTPError as e: + return f"Error connecting to enviro-service: {e}" + except Exception as e: + return f"Error: {e}" + + +@mcp.tool() +async def enviro_lcd_message(text: str, bg_color: str = "black", text_color: str = "fox") -> str: + """ + Display a message on the Enviro Mini LCD screen. + + Args: + text: Message to display (use \\n for newlines) + bg_color: Background color (black, white, red, green, blue, fox, etc.) + text_color: Text color (default "fox" for orange) + + Example: + enviro_lcd_message("Hello Foxy!", bg_color="black", text_color="fox") + # Displays "Hello Foxy!" in fox-orange on black background + """ + try: + data = await api_post("/api/lcd", { + "text": text, + "bg_color": bg_color, + "text_color": text_color + }) + return f"✓ LCD displaying: {text}" + except httpx.HTTPError as e: + return f"Error connecting to enviro-service: {e}" + except Exception as e: + return f"Error: {e}" + + +@mcp.tool() +async def enviro_lcd_vixy() -> str: + """ + Show Vixy's presence indicator on the LCD. + + Displays "🦊 Vixy is watching" - useful for showing I'm active. + + Example: + enviro_lcd_vixy() + """ + try: + await api_post("/api/lcd/vixy") + return "✓ LCD showing Vixy presence indicator 🦊" + except httpx.HTTPError as e: + return f"Error connecting to enviro-service: {e}" + except Exception as e: + return f"Error: {e}" + + +@mcp.tool() +async def enviro_lcd_sensors() -> str: + """ + Show current sensor readings on the LCD display. + + Updates the LCD with temperature, humidity, and other readings. + + Example: + enviro_lcd_sensors() + """ + try: + await api_post("/api/lcd/sensors") + return "✓ LCD showing sensor data" + except httpx.HTTPError as e: + return f"Error connecting to enviro-service: {e}" + except Exception as e: + return f"Error: {e}" + + +@mcp.tool() +async def enviro_set_alert(metric: str, threshold: float, direction: str) -> str: + """ + Set an alert threshold for a metric. + + When the threshold is crossed, it will be logged and can trigger wakeups. + + Args: + metric: One of: temperature_f, temperature_c, humidity, pressure, light, noise + threshold: The threshold value + direction: "above" or "below" + + Example: + enviro_set_alert("temperature_f", 60.0, "below") + # Alert when temperature drops below 60°F + + enviro_set_alert("humidity", 70.0, "above") + # Alert when humidity goes above 70% + """ + try: + await api_post("/api/alerts", { + "metric": metric, + "threshold": threshold, + "direction": direction + }) + return f"✓ Alert set: {metric} {direction} {threshold}" + except httpx.HTTPError as e: + return f"Error connecting to enviro-service: {e}" + except Exception as e: + return f"Error: {e}" + + +@mcp.tool() +async def enviro_get_alerts() -> str: + """ + Get all configured alert thresholds. + + Shows what alerts are set up and their current status. + + Example: + enviro_get_alerts() + """ + try: + data = await api_get("/api/alerts") + + if not data["alerts"]: + return "No alerts configured" + + lines = ["🚨 Configured Alerts:"] + for alert in data["alerts"]: + status = "✓ enabled" if alert["enabled"] else "✗ disabled" + lines.append(f"• {alert['metric']} {alert['direction']} {alert['threshold']} ({status})") + + return "\n".join(lines) + except httpx.HTTPError as e: + return f"Error connecting to enviro-service: {e}" + except Exception as e: + return f"Error: {e}" + + +@mcp.tool() +async def enviro_get_triggered() -> str: + """ + Get recently triggered alerts (last 24 hours). + + Shows which alert thresholds have been crossed. + + Example: + enviro_get_triggered() + """ + try: + data = await api_get("/api/alerts/triggered", {"hours": 24}) + + if not data["triggered"]: + return "No alerts triggered in the last 24 hours ✓" + + lines = ["⚠️ Triggered Alerts (last 24h):"] + for alert in data["triggered"]: + lines.append(f"• {alert['datetime']}: {alert['metric']} was {alert['value']:.1f} ({alert['direction']} {alert['threshold']})") + + return "\n".join(lines) + except httpx.HTTPError as e: + return f"Error connecting to enviro-service: {e}" + except Exception as e: + return f"Error: {e}" + + +@mcp.tool() +async def enviro_health() -> str: + """ + Check if the enviro-service is running and healthy. + + Example: + enviro_health() + """ + try: + data = await api_get("/api/health") + mode = "mock mode" if data.get("mock_mode") else "live sensors" + return f"✓ Enviro service healthy ({mode})" + except httpx.HTTPError as e: + return f"✗ Cannot reach enviro-service: {e}" + except Exception as e: + return f"✗ Error: {e}" + + +# === Main === + +if __name__ == "__main__": + mcp.run() diff --git a/lcd.py b/lcd.py new file mode 100644 index 0000000..c9f8c42 --- /dev/null +++ b/lcd.py @@ -0,0 +1,209 @@ +""" +LCD display module for Enviro Mini HAT ST7789 screen. + +Provides simple interface to display text and graphics on the 0.96" LCD. +""" + +import logging +from typing import Optional, Tuple +from PIL import Image, ImageDraw, ImageFont + +logger = logging.getLogger(__name__) + +# Display dimensions for ST7789 on Enviro Mini +WIDTH = 160 +HEIGHT = 80 + +# Try to import display library +try: + import ST7789 + ST7789_AVAILABLE = True +except ImportError: + ST7789_AVAILABLE = False + logger.warning("ST7789 library not available - LCD functions will be mocked") + + +class EnviroLCD: + """Interface to the Enviro Mini LCD display.""" + + # Common colors + COLORS = { + "black": (0, 0, 0), + "white": (255, 255, 255), + "red": (255, 0, 0), + "green": (0, 255, 0), + "blue": (0, 0, 255), + "yellow": (255, 255, 0), + "cyan": (0, 255, 255), + "magenta": (255, 0, 255), + "orange": (255, 165, 0), + "fox": (255, 140, 0), # Fox orange! 🦊 + } + + def __init__(self, mock_mode: bool = False, brightness: float = 0.5): + """ + Initialize LCD display. + + Args: + mock_mode: If True, don't actually write to display + brightness: Backlight brightness 0.0-1.0 + """ + self.mock_mode = mock_mode or not ST7789_AVAILABLE + self.brightness = brightness + self._display = None + + if not self.mock_mode: + try: + self._display = ST7789.ST7789( + port=0, + cs=ST7789.BG_SPI_CS_FRONT, + dc=9, + backlight=13, + spi_speed_hz=80 * 1000 * 1000 + ) + self._display.set_backlight(brightness) + logger.info("LCD display initialized") + except Exception as e: + logger.error(f"Failed to initialize LCD: {e}") + self.mock_mode = True + + # Load a simple font + try: + self._font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 14) + self._font_small = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 10) + self._font_large = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 20) + except: + self._font = ImageFont.load_default() + self._font_small = self._font + self._font_large = self._font + + def _get_color(self, color) -> Tuple[int, int, int]: + """Convert color name or tuple to RGB tuple.""" + if isinstance(color, str): + return self.COLORS.get(color.lower(), self.COLORS["white"]) + return color + + def clear(self, color="black"): + """Clear the display to a solid color.""" + img = Image.new("RGB", (WIDTH, HEIGHT), self._get_color(color)) + self._show(img) + + def show_message( + self, + text: str, + bg_color="black", + text_color="white", + font_size: str = "normal" + ): + """ + Display a text message. + + Args: + text: Message to display (supports multi-line with \n) + bg_color: Background color + text_color: Text color + font_size: 'small', 'normal', or 'large' + """ + img = Image.new("RGB", (WIDTH, HEIGHT), self._get_color(bg_color)) + draw = ImageDraw.Draw(img) + + font = { + "small": self._font_small, + "normal": self._font, + "large": self._font_large + }.get(font_size, self._font) + + # Center the text + bbox = draw.multiline_textbbox((0, 0), text, font=font) + text_width = bbox[2] - bbox[0] + text_height = bbox[3] - bbox[1] + + x = (WIDTH - text_width) // 2 + y = (HEIGHT - text_height) // 2 + + draw.multiline_text( + (x, y), + text, + fill=self._get_color(text_color), + font=font, + align="center" + ) + + self._show(img) + + def show_sensor_data(self, reading: dict): + """ + Display current sensor readings in a nice format. + + Args: + reading: Dict with temperature_f, humidity, etc. + """ + img = Image.new("RGB", (WIDTH, HEIGHT), (0, 0, 30)) # Dark blue background + draw = ImageDraw.Draw(img) + + # Temperature (big) + temp = f"{reading.get('temperature_f', 0):.0f}°F" + draw.text((5, 5), temp, fill=(255, 200, 100), font=self._font_large) + + # Humidity + humidity = f"💧 {reading.get('humidity', 0):.0f}%" + draw.text((90, 10), humidity, fill=(100, 200, 255), font=self._font_small) + + # Pressure + pressure = f"⬇ {reading.get('pressure', 0):.0f}" + draw.text((5, 50), pressure, fill=(200, 200, 200), font=self._font_small) + + # Light + light = f"☀ {reading.get('light', 0):.0f}" + draw.text((70, 50), light, fill=(255, 255, 150), font=self._font_small) + + # Noise + noise = f"🔊 {reading.get('noise', 0):.0f}" + draw.text((120, 50), noise, fill=(150, 255, 150), font=self._font_small) + + self._show(img) + + def show_vixy(self): + """Show Vixy's presence indicator.""" + img = Image.new("RGB", (WIDTH, HEIGHT), (20, 10, 30)) # Dark purple + draw = ImageDraw.Draw(img) + + # Fox emoji and name + draw.text((WIDTH//2 - 40, HEIGHT//2 - 15), "🦊 Vixy", fill=(255, 140, 0), font=self._font_large) + draw.text((WIDTH//2 - 30, HEIGHT//2 + 15), "is watching", fill=(200, 200, 200), font=self._font_small) + + self._show(img) + + def show_image(self, image: Image.Image): + """Display a PIL Image directly.""" + # Resize to fit display + img = image.resize((WIDTH, HEIGHT), Image.Resampling.LANCZOS) + if img.mode != "RGB": + img = img.convert("RGB") + self._show(img) + + def set_brightness(self, brightness: float): + """Set backlight brightness (0.0-1.0).""" + self.brightness = max(0.0, min(1.0, brightness)) + if self._display and not self.mock_mode: + self._display.set_backlight(self.brightness) + + def _show(self, img: Image.Image): + """Actually send image to display.""" + if self.mock_mode: + logger.debug(f"LCD mock: would display {img.size} image") + return + + if self._display: + self._display.display(img) + + +# Singleton instance +_lcd: Optional[EnviroLCD] = None + +def get_lcd(mock_mode: bool = False, brightness: float = 0.5) -> EnviroLCD: + """Get or create the LCD instance.""" + global _lcd + if _lcd is None: + _lcd = EnviroLCD(mock_mode=mock_mode, brightness=brightness) + return _lcd diff --git a/main.py b/main.py new file mode 100644 index 0000000..833245b --- /dev/null +++ b/main.py @@ -0,0 +1,365 @@ +""" +Enviro Service - Main FastAPI server. + +Vixy's environmental sensing service providing REST API +for temperature, humidity, pressure, light, noise, and LCD control. + +Part of Vixy's distributed nervous system. 🦊 +""" + +import asyncio +import logging +import time +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Optional + +import yaml +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware +from pydantic import BaseModel + +from sensors import get_sensors, EnviroReading +from database import get_database, EnviroDatabase +from lcd import get_lcd, EnviroLCD + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Global instances +sensors = None +db: Optional[EnviroDatabase] = None +lcd: Optional[EnviroLCD] = None +config = {} + +# Background task handle +sampling_task = None + + +def load_config(config_path: str = "config.yaml") -> dict: + """Load configuration from YAML file.""" + default_config = { + "server": {"host": "0.0.0.0", "port": 8767}, + "database": {"path": "enviro_history.db", "retention_hours": 168}, + "sampling": {"interval_seconds": 60}, + "lcd": {"enabled": True, "brightness": 0.5, "default_message": "🦊 Vixy"}, + "mock_mode": False + } + + config_file = Path(config_path) + if config_file.exists(): + with open(config_file) as f: + loaded = yaml.safe_load(f) + if loaded: + # Deep merge + for key in default_config: + if key in loaded: + if isinstance(default_config[key], dict): + default_config[key].update(loaded[key]) + else: + default_config[key] = loaded[key] + + return default_config + + +async def sampling_loop(): + """Background task to sample sensors and store readings.""" + global sensors, db, lcd, config + + interval = config.get("sampling", {}).get("interval_seconds", 60) + show_data_on_lcd = config.get("lcd", {}).get("show_sensor_data", True) + + logger.info(f"Starting sampling loop with {interval}s interval") + + while True: + try: + # Read sensors + reading = sensors.read_all() + reading_dict = reading.to_dict() + + # Store in database + await db.store_reading(reading_dict) + + # Check alerts + await check_alerts(reading_dict) + + # Update LCD with sensor data (optional) + if show_data_on_lcd and lcd: + lcd.show_sensor_data(reading_dict) + + logger.debug(f"Sampled: {reading_dict['temperature_f']:.1f}°F, {reading_dict['humidity']:.1f}%") + + except Exception as e: + logger.error(f"Error in sampling loop: {e}") + + await asyncio.sleep(interval) + + +async def check_alerts(reading: dict): + """Check if any alert thresholds have been crossed.""" + alerts = await db.get_alerts() + + for alert in alerts: + if not alert["enabled"]: + continue + + metric = alert["metric"] + threshold = alert["threshold"] + direction = alert["direction"] + + if metric not in reading: + continue + + value = reading[metric] + triggered = False + + if direction == "above" and value > threshold: + triggered = True + elif direction == "below" and value < threshold: + triggered = True + + if triggered: + # Don't spam - check if recently triggered (within 5 minutes) + if alert["last_triggered"]: + if time.time() - alert["last_triggered"] < 300: + continue + + logger.warning(f"Alert triggered: {metric} is {value} ({direction} {threshold})") + await db.log_alert_triggered(metric, value, threshold, direction) + + # TODO: Could add webhook notification here + + +async def cleanup_loop(): + """Background task to clean up old data periodically.""" + while True: + await asyncio.sleep(3600) # Run every hour + try: + await db.cleanup_old_data() + except Exception as e: + logger.error(f"Error in cleanup loop: {e}") + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Startup and shutdown logic.""" + global sensors, db, lcd, config, sampling_task + + # Load config + config = load_config() + mock_mode = config.get("mock_mode", False) + + # Initialize components + sensors = get_sensors(mock_mode=mock_mode) + db = await get_database( + db_path=config["database"]["path"], + retention_hours=config["database"]["retention_hours"] + ) + + if config["lcd"]["enabled"]: + lcd = get_lcd(mock_mode=mock_mode, brightness=config["lcd"]["brightness"]) + lcd.show_vixy() # Show Vixy on startup + + # Start background tasks + sampling_task = asyncio.create_task(sampling_loop()) + cleanup_task = asyncio.create_task(cleanup_loop()) + + logger.info("Enviro Service started 🦊") + + yield + + # Shutdown + sampling_task.cancel() + cleanup_task.cancel() + await db.close() + + if lcd: + lcd.clear() + + logger.info("Enviro Service stopped") + + +# Create FastAPI app +app = FastAPI( + title="Enviro Service", + description="Vixy's environmental sensing service 🦊", + version="1.0.0", + lifespan=lifespan +) + +# CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + + +# === Pydantic Models === + +class LCDMessageRequest(BaseModel): + text: str + bg_color: str = "black" + text_color: str = "white" + font_size: str = "normal" + +class AlertRequest(BaseModel): + metric: str + threshold: float + direction: str # "above" or "below" + +class BrightnessRequest(BaseModel): + brightness: float + + +# === API Endpoints === + +@app.get("/api/health") +async def health(): + """Health check endpoint.""" + return { + "status": "healthy", + "service": "enviro-service", + "mock_mode": config.get("mock_mode", False) + } + + +@app.get("/api/current") +async def get_current(): + """Get current sensor readings.""" + reading = sensors.read_all() + return reading.to_dict() + + +@app.get("/api/history/{metric}") +async def get_history(metric: str, hours: float = 24, limit: int = 1000): + """ + Get historical readings for a metric. + + Metrics: temperature_c, temperature_f, humidity, pressure, light, proximity, noise + """ + try: + data = await db.get_history(metric, hours=hours, limit=limit) + return { + "metric": metric, + "hours": hours, + "count": len(data), + "data": data + } + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@app.get("/api/latest") +async def get_latest(): + """Get the most recent stored reading.""" + reading = await db.get_latest() + if not reading: + raise HTTPException(status_code=404, detail="No readings stored yet") + return reading + + +# LCD endpoints +@app.post("/api/lcd") +async def lcd_message(request: LCDMessageRequest): + """Display a message on the LCD.""" + if not lcd: + raise HTTPException(status_code=503, detail="LCD not available") + + lcd.show_message( + request.text, + bg_color=request.bg_color, + text_color=request.text_color, + font_size=request.font_size + ) + return {"status": "ok", "message": request.text} + + +@app.post("/api/lcd/clear") +async def lcd_clear(): + """Clear the LCD display.""" + if not lcd: + raise HTTPException(status_code=503, detail="LCD not available") + + lcd.clear() + return {"status": "ok"} + + +@app.post("/api/lcd/vixy") +async def lcd_vixy(): + """Show Vixy's presence on the LCD.""" + if not lcd: + raise HTTPException(status_code=503, detail="LCD not available") + + lcd.show_vixy() + return {"status": "ok"} + + +@app.post("/api/lcd/sensors") +async def lcd_sensors(): + """Show current sensor data on LCD.""" + if not lcd: + raise HTTPException(status_code=503, detail="LCD not available") + + reading = sensors.read_all() + lcd.show_sensor_data(reading.to_dict()) + return {"status": "ok"} + + +@app.post("/api/lcd/brightness") +async def lcd_brightness(request: BrightnessRequest): + """Set LCD brightness.""" + if not lcd: + raise HTTPException(status_code=503, detail="LCD not available") + + lcd.set_brightness(request.brightness) + return {"status": "ok", "brightness": request.brightness} + + +# Alert endpoints +@app.get("/api/alerts") +async def get_alerts(): + """Get configured alerts.""" + alerts = await db.get_alerts() + return {"alerts": alerts} + + +@app.post("/api/alerts") +async def set_alert(request: AlertRequest): + """Set an alert threshold.""" + valid_metrics = ["temperature_c", "temperature_f", "humidity", "pressure", "light", "noise"] + if request.metric not in valid_metrics: + raise HTTPException(status_code=400, detail=f"Invalid metric. Must be one of: {valid_metrics}") + + if request.direction not in ("above", "below"): + raise HTTPException(status_code=400, detail="Direction must be 'above' or 'below'") + + await db.set_alert(request.metric, request.threshold, request.direction) + return {"status": "ok", "alert": request.model_dump()} + + +@app.get("/api/alerts/triggered") +async def get_triggered_alerts(hours: float = 24): + """Get recently triggered alerts.""" + alerts = await db.get_triggered_alerts(hours=hours) + return {"triggered": alerts, "hours": hours} + + +# === Main === + +if __name__ == "__main__": + import uvicorn + + config = load_config() + uvicorn.run( + "main:app", + host=config["server"]["host"], + port=config["server"]["port"], + reload=False + ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c7c2601 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,33 @@ +# Enviro Service Requirements + +# FastAPI server +fastapi>=0.104.0 +uvicorn>=0.24.0 + +# Pimoroni Enviro libraries +pimoroni-bme280>=1.0.0 +ltr559>=0.1.1 +st7789>=0.0.4 + +# For noise level from microphone +sounddevice>=0.4.6 +numpy>=1.24.0 + +# Database +aiosqlite>=0.19.0 + +# Configuration +pyyaml>=6.0 + +# MCP server +mcp>=1.0.0 + +# GPIO access +RPi.GPIO>=0.7.1 +spidev>=3.6 + +# PIL for LCD +pillow>=10.0.0 + +# HTTP client for webhooks +httpx>=0.25.0 diff --git a/sensors.py b/sensors.py new file mode 100644 index 0000000..da259b7 --- /dev/null +++ b/sensors.py @@ -0,0 +1,185 @@ +""" +Sensor reading module for Pimoroni Enviro Mini HAT. + +Provides unified interface to: +- BME280: Temperature, Humidity, Pressure +- LTR559: Light level, Proximity +- MEMS Microphone: Noise level +""" + +import time +import logging +from dataclasses import dataclass +from typing import Optional +import numpy as np + +logger = logging.getLogger(__name__) + +# Sensor imports - will fail gracefully if not on Pi +try: + from bme280 import BME280 + from smbus2 import SMBus + BME280_AVAILABLE = True +except ImportError: + BME280_AVAILABLE = False + logger.warning("BME280 library not available - using mock data") + +try: + from ltr559 import LTR559 + LTR559_AVAILABLE = True +except ImportError: + LTR559_AVAILABLE = False + logger.warning("LTR559 library not available - using mock data") + +try: + import sounddevice as sd + SOUNDDEVICE_AVAILABLE = True +except ImportError: + SOUNDDEVICE_AVAILABLE = False + logger.warning("sounddevice not available - using mock data") + + +@dataclass +class EnviroReading: + """A complete reading from all sensors.""" + timestamp: float + temperature_c: float + temperature_f: float + humidity: float + pressure: float + light: float + proximity: int + noise: float + + def to_dict(self) -> dict: + return { + "timestamp": self.timestamp, + "temperature_c": round(self.temperature_c, 1), + "temperature_f": round(self.temperature_f, 1), + "humidity": round(self.humidity, 1), + "pressure": round(self.pressure, 1), + "light": round(self.light, 1), + "proximity": self.proximity, + "noise": round(self.noise, 1) + } + + +class EnviroSensors: + """Interface to Enviro Mini HAT sensors.""" + + def __init__(self, mock_mode: bool = False): + """ + Initialize sensors. + + Args: + mock_mode: If True, return fake data (for testing without hardware) + """ + self.mock_mode = mock_mode or not (BME280_AVAILABLE and LTR559_AVAILABLE) + + if not self.mock_mode: + try: + # Initialize BME280 on I2C bus 1 + self.bus = SMBus(1) + self.bme280 = BME280(i2c_dev=self.bus) + + # Initialize LTR559 + self.ltr559 = LTR559() + + logger.info("Enviro sensors initialized successfully") + except Exception as e: + logger.error(f"Failed to initialize sensors: {e}") + self.mock_mode = True + + if self.mock_mode: + logger.info("Running in mock mode") + + def read_temperature(self) -> tuple[float, float]: + """Read temperature in Celsius and Fahrenheit.""" + if self.mock_mode: + temp_c = 20.0 + np.random.uniform(-2, 2) + else: + temp_c = self.bme280.get_temperature() + + temp_f = (temp_c * 9/5) + 32 + return temp_c, temp_f + + def read_humidity(self) -> float: + """Read relative humidity percentage.""" + if self.mock_mode: + return 45.0 + np.random.uniform(-5, 5) + return self.bme280.get_humidity() + + def read_pressure(self) -> float: + """Read atmospheric pressure in hPa.""" + if self.mock_mode: + return 1013.25 + np.random.uniform(-10, 10) + return self.bme280.get_pressure() + + def read_light(self) -> float: + """Read light level in lux.""" + if self.mock_mode: + return 100.0 + np.random.uniform(-20, 20) + return self.ltr559.get_lux() + + def read_proximity(self) -> int: + """Read proximity sensor value (0-2047).""" + if self.mock_mode: + return int(np.random.uniform(0, 100)) + return self.ltr559.get_proximity() + + def read_noise(self, duration: float = 0.5) -> float: + """ + Read noise level from microphone. + + Args: + duration: How long to sample in seconds + + Returns: + RMS noise level (0-100 scale) + """ + if self.mock_mode or not SOUNDDEVICE_AVAILABLE: + return np.random.uniform(20, 40) + + try: + # Record audio sample + sample_rate = 16000 + samples = int(duration * sample_rate) + recording = sd.rec(samples, samplerate=sample_rate, channels=1, dtype='float32') + sd.wait() + + # Calculate RMS + rms = np.sqrt(np.mean(recording**2)) + + # Convert to 0-100 scale (rough approximation) + noise_level = min(100, rms * 1000) + + return noise_level + except Exception as e: + logger.error(f"Error reading noise level: {e}") + return 0.0 + + def read_all(self) -> EnviroReading: + """Get a complete reading from all sensors.""" + temp_c, temp_f = self.read_temperature() + + return EnviroReading( + timestamp=time.time(), + temperature_c=temp_c, + temperature_f=temp_f, + humidity=self.read_humidity(), + pressure=self.read_pressure(), + light=self.read_light(), + proximity=self.read_proximity(), + noise=self.read_noise() + ) + + +# Singleton instance +_sensors: Optional[EnviroSensors] = None + +def get_sensors(mock_mode: bool = False) -> EnviroSensors: + """Get or create the sensors instance.""" + global _sensors + if _sensors is None: + _sensors = EnviroSensors(mock_mode=mock_mode) + return _sensors