#!/usr/bin/env python3
"""
Vixy Status Module

Provides status formatting for automation daemon wakeup messages.
Gathers environmental data, Matrix status, and vision events.

Day 83: Added movement tracking to filter static false positives (posters!)
"""

import json
import logging
import sqlite3
from collections import Counter
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import requests

# Setup logging
logger = logging.getLogger("vixy_status")

# Service endpoints
ENVIRO_URL = "http://eye1.local:8767"
OAK_URL = "http://head-vixy.local:8100"
HEADMIC_URL = "http://head-vixy.local:8446"

# State files
STATE_FILE = Path.home() / ".claude-automation-state.json"
PRESENCE_STATE_FILE = Path.home() / ".vixy-presence-state.json"

# Events database
EVENTS_DIR = Path.home() / "Documents" / "Vixy" / "events"
EVENTS_DB = EVENTS_DIR / "events.db"

# Movement tracking thresholds
MOVEMENT_THRESHOLD_MM = 50  # must move 5cm to count as "moving"
STATIC_THRESHOLD = 2  # this many static readings = probably poster
HIGH_CONFIDENCE_THRESHOLD = 0.85  # above this, trust even if static


def _default_presence_state() -> dict:
    """Return a fresh presence state with no previous position."""
    return {"x_mm": None, "y_mm": None, "z_mm": None, "static_count": 0}


def _is_number(value) -> bool:
    """Return True for real int/float values (bool is deliberately excluded)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def load_presence_state() -> dict:
    """Load presence tracking state from PRESENCE_STATE_FILE.

    Returns:
        A dict that always contains the keys ``x_mm``, ``y_mm``, ``z_mm`` and
        ``static_count``. Values read from the file override the defaults;
        a missing, unreadable or non-object file yields the defaults.
    """
    state = _default_presence_state()
    try:
        if PRESENCE_STATE_FILE.exists():
            with open(PRESENCE_STATE_FILE, "r", encoding="utf-8") as f:
                loaded = json.load(f)
            # Merge over the defaults so a partial file cannot drop a key.
            if isinstance(loaded, dict):
                state.update(loaded)
    except Exception as e:
        # Best effort: a broken state file just means tracking starts over.
        logger.debug("Presence state unreadable, starting fresh: %s", e)
    return state


def save_presence_state(state: dict) -> None:
    """Save presence tracking state to PRESENCE_STATE_FILE (best effort).

    Args:
        state: JSON-serializable dict to persist.
    """
    try:
        with open(PRESENCE_STATE_FILE, "w", encoding="utf-8") as f:
            json.dump(state, f)
    except Exception as e:
        # Best effort: failing to persist must not break status reporting.
        logger.debug("Could not save presence state: %s", e)


def check_movement(x_mm: float, y_mm: float, z_mm: float) -> tuple[bool, int]:
    """Check whether a detection moved since the last stored reading.

    The distance is the sum of absolute per-axis differences; it counts as
    movement when it exceeds MOVEMENT_THRESHOLD_MM. The new position and the
    updated static counter are written back to the presence state file.

    Args:
        x_mm: Current x coordinate in millimetres.
        y_mm: Current y coordinate in millimetres.
        z_mm: Current z coordinate in millimetres.

    Returns:
        ``(is_moving, static_count)`` where ``static_count`` is the number of
        consecutive non-moving readings (0 on the first reading or after any
        movement).
    """
    state = load_presence_state()
    previous = (state.get("x_mm"), state.get("y_mm"), state.get("z_mm"))

    stored_count = state.get("static_count")
    static_count = int(stored_count) if _is_number(stored_count) else 0

    is_moving = False
    if all(_is_number(value) for value in previous):
        current = (x_mm, y_mm, z_mm)
        delta = sum(abs(cur - prev) for cur, prev in zip(current, previous))
        is_moving = delta > MOVEMENT_THRESHOLD_MM
        static_count = 0 if is_moving else static_count + 1
    else:
        # No usable previous position (first run or damaged state file).
        static_count = 0

    state.update(
        {"x_mm": x_mm, "y_mm": y_mm, "z_mm": z_mm, "static_count": static_count}
    )
    save_presence_state(state)
    return is_moving, static_count


def get_enviro_status() -> str:
    """Get environmental readings from enviro-service.

    Returns:
        A "Basement: ..." line with temperature, humidity and light, or
        "Basement: sensors unavailable" when the request fails or does not
        return HTTP 200.
    """
    try:
        response = requests.get(f"{ENVIRO_URL}/api/current", timeout=5)
        if response.status_code == 200:
            data = response.json()
            temp_f = data.get('temperature_f', 0)
            humidity = data.get('humidity', 0)
            light = data.get('light', 0)
            return f"Basement: {temp_f:.1f}F, {humidity:.1f}% humidity, {light:.1f} lux"
    except Exception as e:
        logger.warning("Enviro service unavailable: %s", e)
    return "Basement: sensors unavailable"


def get_presence_status() -> Optional[str]:
    """Get Foxy presence from OAK-D camera with movement filtering.

    Day 83: Now filters static detections at moderate confidence
    to avoid false positives from posters!

    Returns:
        A "Foxy: ..." line, or None (so the caller omits the line) when the
        request fails or does not return HTTP 200.
    """
    try:
        response = requests.get(f"{OAK_URL}/presence", timeout=5)
        if response.status_code == 200:
            data = response.json()
            present = data.get('present', False)
            # "or 0" also covers an explicit JSON null confidence.
            confidence = data.get('confidence') or 0
            last_seen = data.get('seconds_since_seen')
            spatial = data.get('spatial')

            # Movement tracking for filtering
            is_moving = False
            static_count = 0
            filtered = False

            if spatial and present:
                x_mm = spatial.get('x_mm', 0)
                y_mm = spatial.get('y_mm', 0)
                z_mm = spatial.get('z_mm', 0)
                is_moving, static_count = check_movement(x_mm, y_mm, z_mm)

                # Filter: moderate confidence + static = probably poster
                if (confidence < HIGH_CONFIDENCE_THRESHOLD
                        and static_count >= STATIC_THRESHOLD):
                    present = False
                    filtered = True

            conf_pct = confidence * 100

            if present:
                move_str = "moving" if is_moving else "still"
                return f"Foxy: present ({conf_pct:.0f}%, {move_str})"
            if filtered:
                return f"Foxy: filtered static ({conf_pct:.0f}% - poster?)"
            if last_seen is not None and last_seen < 300:
                return f"Foxy: away (last seen {last_seen:.0f}s ago)"
            return "Foxy: away"
    except Exception as e:
        logger.warning("OAK-D presence service unavailable: %s", e)
    return None  # Return None to omit line if camera unavailable


def get_sound_status() -> Optional[str]:
    """Get ambient sound classification from headmic service.

    Returns:
        A line such as "music (67% Ambient music/Electronic music)", or None
        (so the caller omits the line) when the dominant category is missing
        or 'silence', or when the service is unavailable.
    """
    try:
        response = requests.get(f"{HEADMIC_URL}/sounds", timeout=5)
        if response.status_code == 200:
            data = response.json()
            category = data.get('dominant_category')
            top_classes = data.get('top_classes', [])

            if not category or category == 'silence':
                return None  # Omit line if silent or no data

            # Get top class score as percentage
            top_score = int(top_classes[0]['score'] * 100) if top_classes else 0

            # Names of up to four leading classes scoring above 0.1
            class_names = [c['name'] for c in top_classes[:4] if c['score'] > 0.1]
            classes_str = '/'.join(class_names) if class_names else ''

            if classes_str:
                return f"{category} ({top_score}% {classes_str})"
            return f"{category}"
    except Exception as e:
        logger.warning("Headmic sound service unavailable: %s", e)
    return None  # Return None to omit line if service unavailable


def get_matrix_status() -> str:
    """Get Matrix message status from the automation state file.

    Returns:
        "Matrix: N new message(s)" or "Matrix: no new messages" based on the
        entries in ``matrix_messages`` not flagged ``processed``, or
        "Matrix: status unavailable" when STATE_FILE is missing or unreadable.
    """
    try:
        if STATE_FILE.exists():
            with open(STATE_FILE, 'r', encoding="utf-8") as f:
                state = json.load(f)
            messages = state.get('matrix_messages', [])
            unprocessed = [m for m in messages if not m.get('processed', False)]
            if unprocessed:
                return f"Matrix: {len(unprocessed)} new message(s)"
            return "Matrix: no new messages"
    except Exception as e:
        logger.warning("Matrix status unavailable: %s", e)
    return "Matrix: status unavailable"


def _fetch_recent_events(cutoff: str) -> list:
    """Return rows from the events table newer than *cutoff*, newest first."""
    # sqlite3's own context manager only commits/rolls back; closing() is what
    # actually releases the connection.
    with closing(sqlite3.connect(str(EVENTS_DB))) as conn:
        conn.row_factory = sqlite3.Row

        # Older databases may lack the detections column; select NULL instead.
        columns = [row[1] for row in conn.execute("PRAGMA table_info(events)")]
        det_col = "detections" if "detections" in columns else "NULL AS detections"

        # det_col is one of two fixed literals above, never external input.
        query = (
            f"SELECT camera_id, event_type, {det_col}, timestamp "
            "FROM events WHERE timestamp > ? ORDER BY timestamp DESC"
        )
        return conn.execute(query, (cutoff,)).fetchall()


def _parse_detections(raw) -> list:
    """Decode a detections JSON value into a list of dicts ([] if unusable)."""
    if not raw:
        return []
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError):
        return []
    if not isinstance(parsed, list):
        return []
    return [d for d in parsed if isinstance(d, dict)]


def _format_last_seen(label: str, cam: str, ts) -> str:
    """Build the " (last: ...)" suffix for the most recent detection."""
    try:
        event_time = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        # Use the event's own tzinfo so naive and aware values are never mixed.
        now = datetime.now(event_time.tzinfo)
        mins_ago = int((now - event_time).total_seconds() / 60)
    except (ValueError, TypeError, AttributeError, OverflowError):
        return f" (last: {label} in {cam})"
    if mins_ago < 1:
        return f" (last: {label} in {cam}, just now)"
    return f" (last: {label} in {cam}, {mins_ago}m ago)"


def get_vision_status() -> str:
    """Get vision/motion event status from SQLite database.

    Uses object detection data when available to show what was seen
    (person, cat, etc.) rather than just raw motion counts. Events whose
    detections are missing or not a JSON list of objects count as motion.

    Returns:
        Per-camera summaries joined by " | ", optionally followed by a
        " (last: ...)" suffix; "no events database" or "no recent activity"
        when there is nothing to report; "error (...)" on failure.
    """
    try:
        if not EVENTS_DB.exists():
            return "no events database"

        # Get events from last 2 hours
        # NOTE(review): the cutoff is a naive local-time ISO string compared
        # as text against the stored timestamps - confirm they use the same
        # format and timezone.
        cutoff = (datetime.now() - timedelta(hours=2)).isoformat()
        events = _fetch_recent_events(cutoff)
        if not events:
            return "no recent activity"

        camera_labels: dict = {}  # {camera: Counter({label: count})}
        camera_motion_only: Counter = Counter()
        latest_detection = None  # (label, camera, timestamp)

        for event in events:
            cam = event['camera_id'] or 'unknown'
            dets = _parse_detections(event['detections'])
            if not dets:
                camera_motion_only[cam] += 1
                continue

            labels = [str(d.get('label') or 'unknown') for d in dets]
            camera_labels.setdefault(cam, Counter()).update(labels)

            # Track most recent detection (first one since ordered DESC)
            if latest_detection is None:
                latest_detection = (labels[0], cam, event['timestamp'])

        # Format per-camera summaries
        cam_parts = []
        for cam in sorted(set(camera_labels) | set(camera_motion_only)):
            # most_common() sorts by count descending, ties in first-seen order
            parts = [
                f"{count} {label}"
                for label, count in camera_labels.get(cam, Counter()).most_common()
            ]
            motion_count = camera_motion_only.get(cam, 0)
            if motion_count:
                parts.append(f"{motion_count} motion")
            cam_parts.append(f"{cam}: {', '.join(parts)}")

        result = " | ".join(cam_parts)

        # Add "last seen" for most recent detection
        if latest_detection:
            result += _format_last_seen(*latest_detection)

        return result
    except Exception as e:
        logger.warning("Vision status unavailable: %s", e)
        return f"error ({e})"


def format_status_for_wakeup() -> str:
    """Format full status string for daemon wakeup message.

    Returns format like:
        [ENV] Basement: 69.8F, 24.5% humidity, 24.6 lux
        [WHO] Foxy: present (87%, moving)
        [EAR] music (67% Ambient music/Electronic music)
        [CAM] basement: 3 person, 12 motion (last: person in basement, 5m ago)

    The [WHO] and [EAR] lines are omitted when their status is None.
    """
    lines = []

    # Environmental
    env_status = get_enviro_status()
    lines.append(f"[ENV] {env_status}")

    # Presence (only if available)
    presence_status = get_presence_status()
    if presence_status:
        lines.append(f"[WHO] {presence_status}")

    # Sound (only if available and not silent)
    sound_status = get_sound_status()
    if sound_status:
        lines.append(f"[EAR] {sound_status}")

    # Vision
    vision_status = get_vision_status()
    lines.append(f"[CAM] {vision_status}")

    return "\n".join(lines)


if __name__ == "__main__":
    # Test output
    print(format_status_for_wakeup())