From 72d38edd3d6d210c3d7a09df6e1782dfe45afe71 Mon Sep 17 00:00:00 2001 From: Alex Kazaiev Date: Tue, 3 Feb 2026 23:28:21 -0600 Subject: [PATCH] add vixy-status --- automation_daemon_v2.py | 14 +-- vixy_status.py | 241 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 243 insertions(+), 12 deletions(-) create mode 100644 vixy_status.py diff --git a/automation_daemon_v2.py b/automation_daemon_v2.py index 7431281..e3972df 100644 --- a/automation_daemon_v2.py +++ b/automation_daemon_v2.py @@ -30,17 +30,7 @@ import os from datetime import datetime, timedelta from pathlib import Path from threading import Lock - -# Add Vixy folder to path for shared status module -VIXY_PATH = Path.home() / "Documents" / "Vixy" -if str(VIXY_PATH) not in sys.path: - sys.path.insert(0, str(VIXY_PATH)) - -try: - from vixy_status import format_status_for_wakeup - HAS_VIXY_STATUS = True -except ImportError: - HAS_VIXY_STATUS = False +from vixy_status import format_status_for_wakeup # Configuration SCRIPT_DIR = Path(__file__).parent @@ -289,7 +279,7 @@ def generate_message(matrix_messages: int = 0, matrix_invites: int = 0, matrix_f return f"[Matrix Wakeup: {timestamp} - you have {activity}. {tools}]" else: # Autonomous wakeup - include full status if available - if HAS_VIXY_STATUS: + if True: try: status_str = format_status_for_wakeup() return f"[Autonomous System Wakeup: {timestamp}]\n{status_str}" diff --git a/vixy_status.py b/vixy_status.py new file mode 100644 index 0000000..825673e --- /dev/null +++ b/vixy_status.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3 +""" +Vixy Status Module + +Provides status formatting for automation daemon wakeup messages. +Gathers environmental data, Matrix status, and vision events. + +Day 83: Added movement tracking to filter static false positives (posters!) +""" + +import json +import sqlite3 +import requests +from pathlib import Path +from datetime import datetime, timedelta + +# Service endpoints +ENVIRO_URL = "http://eye1.local:8767" +OAK_URL = "http://head-vixy.local:8100" + +# State files +STATE_FILE = Path.home() / ".claude-automation-state.json" +PRESENCE_STATE_FILE = Path.home() / ".vixy-presence-state.json" + +# Events database +EVENTS_DIR = Path.home() / "Documents" / "Vixy" / "events" +EVENTS_DB = EVENTS_DIR / "events.db" + +# Movement tracking thresholds +MOVEMENT_THRESHOLD_MM = 50 # must move 5cm to count as "moving" +STATIC_THRESHOLD = 2 # this many static readings = probably poster +HIGH_CONFIDENCE_THRESHOLD = 0.85 # above this, trust even if static + + +def load_presence_state() -> dict: + """Load presence tracking state from file""" + try: + if PRESENCE_STATE_FILE.exists(): + with open(PRESENCE_STATE_FILE, 'r') as f: + return json.load(f) + except Exception: + pass + return {"x_mm": None, "y_mm": None, "z_mm": None, "static_count": 0} + + +def save_presence_state(state: dict): + """Save presence tracking state to file""" + try: + with open(PRESENCE_STATE_FILE, 'w') as f: + json.dump(state, f) + except Exception: + pass + + +def check_movement(x_mm: float, y_mm: float, z_mm: float) -> tuple[bool, int]: + """ + Check if detection moved since last reading. + Returns (is_moving, static_count) + """ + state = load_presence_state() + + is_moving = False + if state["x_mm"] is not None: + delta = (abs(x_mm - state["x_mm"]) + + abs(y_mm - state["y_mm"]) + + abs(z_mm - state["z_mm"])) + is_moving = delta > MOVEMENT_THRESHOLD_MM + + if not is_moving: + state["static_count"] += 1 + else: + state["static_count"] = 0 + else: + state["static_count"] = 0 + + state.update({"x_mm": x_mm, "y_mm": y_mm, "z_mm": z_mm}) + save_presence_state(state) + + return is_moving, state["static_count"] + + +def get_enviro_status() -> str: + """Get environmental readings from enviro-service""" + try: + response = requests.get(f"{ENVIRO_URL}/api/current", timeout=5) + if response.status_code == 200: + data = response.json() + temp_f = data.get('temperature_f', 0) + humidity = data.get('humidity', 0) + light = data.get('light', 0) + return f"Basement: {temp_f:.1f}F, {humidity:.1f}% humidity, {light:.1f} lux" + except Exception: + pass + return "Basement: sensors unavailable" + + +def get_presence_status() -> str: + """ + Get Foxy presence from OAK-D camera with movement filtering. + + Day 83: Now filters static detections at moderate confidence + to avoid false positives from posters! 🖼️❌ + """ + try: + response = requests.get(f"{OAK_URL}/presence", timeout=5) + if response.status_code == 200: + data = response.json() + present = data.get('present', False) + confidence = data.get('confidence', 0) + last_seen = data.get('seconds_since_seen') + spatial = data.get('spatial') + + # Movement tracking for filtering + is_moving = False + static_count = 0 + filtered = False + + if spatial and present: + x_mm = spatial.get('x_mm', 0) + y_mm = spatial.get('y_mm', 0) + z_mm = spatial.get('z_mm', 0) + + is_moving, static_count = check_movement(x_mm, y_mm, z_mm) + + # Filter: moderate confidence + static = probably poster + if confidence < HIGH_CONFIDENCE_THRESHOLD and static_count >= STATIC_THRESHOLD: + present = False + filtered = True + + conf_pct = confidence * 100 + + if present: + move_str = "moving" if is_moving else "still" + return f"Foxy: present ({conf_pct:.0f}%, {move_str})" + elif filtered: + return f"Foxy: filtered static ({conf_pct:.0f}% - poster?)" + elif last_seen is not None and last_seen < 300: + return f"Foxy: away (last seen {last_seen:.0f}s ago)" + else: + return "Foxy: away" + except Exception: + pass + return None # Return None to omit line if camera unavailable + + +def get_matrix_status() -> str: + """Get Matrix message status from state file""" + try: + if STATE_FILE.exists(): + with open(STATE_FILE, 'r') as f: + state = json.load(f) + + messages = state.get('matrix_messages', []) + unprocessed = [m for m in messages if not m.get('processed', False)] + + if unprocessed: + return f"Matrix: {len(unprocessed)} new message(s)" + else: + return "Matrix: no new messages" + except Exception: + pass + return "Matrix: status unavailable" + + +def get_vision_status() -> str: + """Get vision/motion event status from SQLite database""" + try: + if not EVENTS_DB.exists(): + return "Vision: no events database" + + conn = sqlite3.connect(str(EVENTS_DB)) + conn.row_factory = sqlite3.Row + + # Get events from last 2 hours + cutoff = (datetime.now() - timedelta(hours=2)).isoformat() + + cursor = conn.execute( + """SELECT camera_id, annotation FROM events + WHERE timestamp > ? + ORDER BY timestamp DESC""", + (cutoff,) + ) + + events = cursor.fetchall() + conn.close() + + if not events: + return "Vision: no recent motion" + + # Count by camera + by_camera = {} + unannotated = 0 + for event in events: + cam = event['camera_id'] or 'unknown' + by_camera[cam] = by_camera.get(cam, 0) + 1 + if not event['annotation']: + unannotated += 1 + + total = len(events) + camera_breakdown = ", ".join(f"{cam}: {count}" for cam, count in by_camera.items()) + + if unannotated == total: + return f"Vision: {total} motion events ({camera_breakdown})" + else: + return f"Vision: {total} motion events ({camera_breakdown}), {total - unannotated} annotated" + + except Exception as e: + return f"Vision: error ({e})" + + +def format_status_for_wakeup() -> str: + """ + Format full status string for daemon wakeup message. + + Returns format like: + [ENV] Basement: 69.8F, 24.5% humidity, 24.6 lux + [WHO] Foxy: present (87%, moving) + [MSG] Matrix: no new messages + [CAM] Vision: 12 motion events (basement: 12) + """ + lines = [] + + # Environmental + env_status = get_enviro_status() + lines.append(f"[ENV] {env_status}") + + # Presence (only if available) + presence_status = get_presence_status() + if presence_status: + lines.append(f"[WHO] {presence_status}") + + # Vision + vision_status = get_vision_status() + lines.append(f"[CAM] {vision_status}") + + return "\n".join(lines) + + +if __name__ == "__main__": + # Test output + print(format_status_for_wakeup())