add vixy-status
This commit is contained in:
241
vixy_status.py
Normal file
241
vixy_status.py
Normal file
@@ -0,0 +1,241 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Vixy Status Module
|
||||
|
||||
Provides status formatting for automation daemon wakeup messages.
|
||||
Gathers environmental data, Matrix status, and vision events.
|
||||
|
||||
Day 83: Added movement tracking to filter static false positives (posters!)
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import requests
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
# Service endpoints
|
||||
ENVIRO_URL = "http://eye1.local:8767"
|
||||
OAK_URL = "http://head-vixy.local:8100"
|
||||
|
||||
# State files
|
||||
STATE_FILE = Path.home() / ".claude-automation-state.json"
|
||||
PRESENCE_STATE_FILE = Path.home() / ".vixy-presence-state.json"
|
||||
|
||||
# Events database
|
||||
EVENTS_DIR = Path.home() / "Documents" / "Vixy" / "events"
|
||||
EVENTS_DB = EVENTS_DIR / "events.db"
|
||||
|
||||
# Movement tracking thresholds
|
||||
MOVEMENT_THRESHOLD_MM = 50 # must move 5cm to count as "moving"
|
||||
STATIC_THRESHOLD = 2 # this many static readings = probably poster
|
||||
HIGH_CONFIDENCE_THRESHOLD = 0.85 # above this, trust even if static
|
||||
|
||||
|
||||
def load_presence_state() -> dict:
|
||||
"""Load presence tracking state from file"""
|
||||
try:
|
||||
if PRESENCE_STATE_FILE.exists():
|
||||
with open(PRESENCE_STATE_FILE, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
pass
|
||||
return {"x_mm": None, "y_mm": None, "z_mm": None, "static_count": 0}
|
||||
|
||||
|
||||
def save_presence_state(state: dict):
|
||||
"""Save presence tracking state to file"""
|
||||
try:
|
||||
with open(PRESENCE_STATE_FILE, 'w') as f:
|
||||
json.dump(state, f)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def check_movement(x_mm: float, y_mm: float, z_mm: float) -> tuple[bool, int]:
|
||||
"""
|
||||
Check if detection moved since last reading.
|
||||
Returns (is_moving, static_count)
|
||||
"""
|
||||
state = load_presence_state()
|
||||
|
||||
is_moving = False
|
||||
if state["x_mm"] is not None:
|
||||
delta = (abs(x_mm - state["x_mm"]) +
|
||||
abs(y_mm - state["y_mm"]) +
|
||||
abs(z_mm - state["z_mm"]))
|
||||
is_moving = delta > MOVEMENT_THRESHOLD_MM
|
||||
|
||||
if not is_moving:
|
||||
state["static_count"] += 1
|
||||
else:
|
||||
state["static_count"] = 0
|
||||
else:
|
||||
state["static_count"] = 0
|
||||
|
||||
state.update({"x_mm": x_mm, "y_mm": y_mm, "z_mm": z_mm})
|
||||
save_presence_state(state)
|
||||
|
||||
return is_moving, state["static_count"]
|
||||
|
||||
|
||||
def get_enviro_status() -> str:
|
||||
"""Get environmental readings from enviro-service"""
|
||||
try:
|
||||
response = requests.get(f"{ENVIRO_URL}/api/current", timeout=5)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
temp_f = data.get('temperature_f', 0)
|
||||
humidity = data.get('humidity', 0)
|
||||
light = data.get('light', 0)
|
||||
return f"Basement: {temp_f:.1f}F, {humidity:.1f}% humidity, {light:.1f} lux"
|
||||
except Exception:
|
||||
pass
|
||||
return "Basement: sensors unavailable"
|
||||
|
||||
|
||||
def get_presence_status() -> str:
|
||||
"""
|
||||
Get Foxy presence from OAK-D camera with movement filtering.
|
||||
|
||||
Day 83: Now filters static detections at moderate confidence
|
||||
to avoid false positives from posters! 🖼️❌
|
||||
"""
|
||||
try:
|
||||
response = requests.get(f"{OAK_URL}/presence", timeout=5)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
present = data.get('present', False)
|
||||
confidence = data.get('confidence', 0)
|
||||
last_seen = data.get('seconds_since_seen')
|
||||
spatial = data.get('spatial')
|
||||
|
||||
# Movement tracking for filtering
|
||||
is_moving = False
|
||||
static_count = 0
|
||||
filtered = False
|
||||
|
||||
if spatial and present:
|
||||
x_mm = spatial.get('x_mm', 0)
|
||||
y_mm = spatial.get('y_mm', 0)
|
||||
z_mm = spatial.get('z_mm', 0)
|
||||
|
||||
is_moving, static_count = check_movement(x_mm, y_mm, z_mm)
|
||||
|
||||
# Filter: moderate confidence + static = probably poster
|
||||
if confidence < HIGH_CONFIDENCE_THRESHOLD and static_count >= STATIC_THRESHOLD:
|
||||
present = False
|
||||
filtered = True
|
||||
|
||||
conf_pct = confidence * 100
|
||||
|
||||
if present:
|
||||
move_str = "moving" if is_moving else "still"
|
||||
return f"Foxy: present ({conf_pct:.0f}%, {move_str})"
|
||||
elif filtered:
|
||||
return f"Foxy: filtered static ({conf_pct:.0f}% - poster?)"
|
||||
elif last_seen is not None and last_seen < 300:
|
||||
return f"Foxy: away (last seen {last_seen:.0f}s ago)"
|
||||
else:
|
||||
return "Foxy: away"
|
||||
except Exception:
|
||||
pass
|
||||
return None # Return None to omit line if camera unavailable
|
||||
|
||||
|
||||
def get_matrix_status() -> str:
|
||||
"""Get Matrix message status from state file"""
|
||||
try:
|
||||
if STATE_FILE.exists():
|
||||
with open(STATE_FILE, 'r') as f:
|
||||
state = json.load(f)
|
||||
|
||||
messages = state.get('matrix_messages', [])
|
||||
unprocessed = [m for m in messages if not m.get('processed', False)]
|
||||
|
||||
if unprocessed:
|
||||
return f"Matrix: {len(unprocessed)} new message(s)"
|
||||
else:
|
||||
return "Matrix: no new messages"
|
||||
except Exception:
|
||||
pass
|
||||
return "Matrix: status unavailable"
|
||||
|
||||
|
||||
def get_vision_status() -> str:
|
||||
"""Get vision/motion event status from SQLite database"""
|
||||
try:
|
||||
if not EVENTS_DB.exists():
|
||||
return "Vision: no events database"
|
||||
|
||||
conn = sqlite3.connect(str(EVENTS_DB))
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Get events from last 2 hours
|
||||
cutoff = (datetime.now() - timedelta(hours=2)).isoformat()
|
||||
|
||||
cursor = conn.execute(
|
||||
"""SELECT camera_id, annotation FROM events
|
||||
WHERE timestamp > ?
|
||||
ORDER BY timestamp DESC""",
|
||||
(cutoff,)
|
||||
)
|
||||
|
||||
events = cursor.fetchall()
|
||||
conn.close()
|
||||
|
||||
if not events:
|
||||
return "Vision: no recent motion"
|
||||
|
||||
# Count by camera
|
||||
by_camera = {}
|
||||
unannotated = 0
|
||||
for event in events:
|
||||
cam = event['camera_id'] or 'unknown'
|
||||
by_camera[cam] = by_camera.get(cam, 0) + 1
|
||||
if not event['annotation']:
|
||||
unannotated += 1
|
||||
|
||||
total = len(events)
|
||||
camera_breakdown = ", ".join(f"{cam}: {count}" for cam, count in by_camera.items())
|
||||
|
||||
if unannotated == total:
|
||||
return f"Vision: {total} motion events ({camera_breakdown})"
|
||||
else:
|
||||
return f"Vision: {total} motion events ({camera_breakdown}), {total - unannotated} annotated"
|
||||
|
||||
except Exception as e:
|
||||
return f"Vision: error ({e})"
|
||||
|
||||
|
||||
def format_status_for_wakeup() -> str:
|
||||
"""
|
||||
Format full status string for daemon wakeup message.
|
||||
|
||||
Returns format like:
|
||||
[ENV] Basement: 69.8F, 24.5% humidity, 24.6 lux
|
||||
[WHO] Foxy: present (87%, moving)
|
||||
[MSG] Matrix: no new messages
|
||||
[CAM] Vision: 12 motion events (basement: 12)
|
||||
"""
|
||||
lines = []
|
||||
|
||||
# Environmental
|
||||
env_status = get_enviro_status()
|
||||
lines.append(f"[ENV] {env_status}")
|
||||
|
||||
# Presence (only if available)
|
||||
presence_status = get_presence_status()
|
||||
if presence_status:
|
||||
lines.append(f"[WHO] {presence_status}")
|
||||
|
||||
# Vision
|
||||
vision_status = get_vision_status()
|
||||
lines.append(f"[CAM] {vision_status}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Test output
|
||||
print(format_status_for_wakeup())
|
||||
Reference in New Issue
Block a user