From 37cd20ef4184a9a5ff586b0f1cab0240f30a62e2 Mon Sep 17 00:00:00 2001 From: Alex Kazaiev Date: Thu, 22 Jan 2026 08:27:11 -0600 Subject: [PATCH] Day 82: Add spatial depth tracking to oak-mcp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - oak_presence() now shows distance in meters and 3D coordinates - New oak_spatial() for detailed detection data - New oak_depth() for colorized depth visualization - oak_health() shows spatial_enabled status Built by Vixy 🦊📏 --- oak_mcp.py | 333 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 333 insertions(+) create mode 100644 oak_mcp.py diff --git a/oak_mcp.py b/oak_mcp.py new file mode 100644 index 0000000..053c211 --- /dev/null +++ b/oak_mcp.py @@ -0,0 +1,333 @@ +#!/usr/bin/env python3 +""" +OAK MCP - MCP server interface for OAK-D Vision Service. + +Vixy's eyes! Allows Claude to see through the OAK-D camera. +Built by Vixy on Day 74 🦊👀 +Day 82 - SPATIAL UPGRADE! Now with real 3D depth! 📏 + +Connects to oak-service running on head-vixy.local:8100 +""" + +import asyncio +import base64 +import logging +import os +import time +from typing import Optional +import httpx +from mcp.server.fastmcp import FastMCP + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Configuration +OAK_SERVICE_URL = os.environ.get("OAK_SERVICE_URL", "http://head-vixy.local:8100") +SNAPSHOT_DIR = os.environ.get("OAK_SNAPSHOT_DIR", "/Users/alex/Documents/Vixy/oak_snapshots") + +# Ensure snapshot directory exists +os.makedirs(SNAPSHOT_DIR, exist_ok=True) + +# Create MCP server +mcp = FastMCP("oak-mcp") + + +async def api_get(endpoint: str, params: dict = None) -> dict: + """Make GET request to oak-service API, return JSON.""" + async with httpx.AsyncClient(timeout=15.0) as client: + url = f"{OAK_SERVICE_URL}{endpoint}" + response = await client.get(url, params=params) + response.raise_for_status() + return response.json() + + +async def api_get_binary(endpoint: str) -> bytes: + """Make GET request to oak-service API, return binary data.""" + async with httpx.AsyncClient(timeout=15.0) as client: + url = f"{OAK_SERVICE_URL}{endpoint}" + response = await client.get(url) + response.raise_for_status() + return response.content + + +@mcp.tool() +async def oak_health() -> str: + """ + Check OAK-D service health and connection status. + + Returns: + Health status including whether OAK-D camera is connected. + + Example: + oak_health() + """ + try: + data = await api_get("/health") + status = "✅ Connected" if data.get("oak_connected") else "❌ Not connected" + spatial = "✅ Yes" if data.get("spatial_enabled") else "❌ No" + version = data.get("version", "unknown") + return f"""🦊 OAK-D Service Health: +• Status: {data.get('status', 'unknown')} +• Camera: {status} +• Spatial depth: {spatial} +• Version: {version} +• Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(data.get('timestamp', 0)))}""" + except httpx.HTTPError as e: + return f"❌ Error connecting to oak-service: {e}" + except Exception as e: + return f"❌ Error: {e}" + + +@mcp.tool() +async def oak_status() -> str: + """ + Get detailed OAK-D device status. + + Returns: + Device ID, USB speed, and connection info. + + Example: + oak_status() + """ + try: + data = await api_get("/status") + if not data.get("connected"): + return f"❌ OAK-D not connected: {data.get('message', 'Unknown error')}" + + return f"""👀 OAK-D Device Status: +• Connected: ✅ Yes +• Device ID: {data.get('device_id', 'unknown')} +• USB Speed: {data.get('usb_speed', 'unknown')}""" + except httpx.HTTPError as e: + return f"❌ Error connecting to oak-service: {e}" + except Exception as e: + return f"❌ Error: {e}" + + +@mcp.tool() +async def oak_snapshot(save: bool = True, filename: str = None) -> str: + """ + Capture a snapshot from OAK-D RGB camera. + + Args: + save: If True, save to disk (default True) + filename: Optional custom filename (default: oak-TIMESTAMP.jpg) + + Returns: + Path to saved image file, or base64 data if not saving. + + Example: + oak_snapshot() # Save with auto-generated name + oak_snapshot(filename="test.jpg") # Save with specific name + """ + try: + # Get image data + image_data = await api_get_binary("/snapshot") + + if save: + # Generate filename + if filename is None: + timestamp = time.strftime("%Y%m%d-%H%M%S") + filename = f"oak-{timestamp}.jpg" + + filepath = os.path.join(SNAPSHOT_DIR, filename) + + # Save to disk + with open(filepath, 'wb') as f: + f.write(image_data) + + size_kb = len(image_data) / 1024 + return f"""📸 Snapshot captured! +• File: {filepath} +• Size: {size_kb:.1f} KB +• Resolution: 2104x1560""" + else: + # Return base64 for inline use + b64 = base64.b64encode(image_data).decode('utf-8') + return f"data:image/jpeg;base64,{b64[:50]}... (base64 image data)" + + except httpx.HTTPError as e: + return f"❌ Error capturing snapshot: {e}" + except Exception as e: + return f"❌ Error: {e}" + + +@mcp.tool() +async def oak_presence() -> str: + """ + Check if Foxy is present (person detection via OAK-D) with SPATIAL data! + + Returns: + Presence status, person count, confidence, distance, and 3D position. + + Example: + oak_presence() # "Present: ✅ Yes (1 person, 87% confidence) at 1.24m" + """ + try: + data = await api_get("/presence") + present = data.get("present", False) + count = data.get("person_count", 0) + confidence = data.get("confidence", 0) * 100 + last_seen = data.get("seconds_since_seen") + + # Spatial data + distance_m = data.get("distance_m") + spatial = data.get("spatial") + + if present: + dist_str = f" at {distance_m:.2f}m" if distance_m else "" + status = f"✅ Yes ({count} person{'s' if count != 1 else ''}, {confidence:.0f}% confidence){dist_str}" + elif last_seen is not None: + status = f"❌ No (last seen {last_seen:.0f}s ago)" + else: + status = "❌ No (never seen)" + + result = f"""👀 Presence Detection: +• Present: {status} +• Detection model: yolov6-nano""" + + # Add spatial info if available + if spatial and present: + x_mm = spatial.get("x_mm", 0) + y_mm = spatial.get("y_mm", 0) + z_mm = spatial.get("z_mm", 0) + + # Determine position description + h_pos = "center" + if x_mm < -100: + h_pos = "left" + elif x_mm > 100: + h_pos = "right" + + v_pos = "center" + if y_mm < -100: + v_pos = "below" + elif y_mm > 100: + v_pos = "above" + + result += f""" +• Distance: {distance_m:.2f}m ({int(z_mm)}mm) +• Position: {h_pos}, {v_pos} center +• 3D coords: X={int(x_mm)}mm, Y={int(y_mm)}mm, Z={int(z_mm)}mm""" + + return result + except httpx.HTTPError as e: + return f"❌ Error checking presence: {e}" + except Exception as e: + return f"❌ Error: {e}" + + +@mcp.tool() +async def oak_snapshot_info() -> str: + """ + Get snapshot metadata without capturing full image. + + Returns: + Frame dimensions and timestamp. + + Example: + oak_snapshot_info() + """ + try: + data = await api_get("/snapshot/info") + return f"""📐 Snapshot Info: +• Width: {data.get('width', 'unknown')} px +• Height: {data.get('height', 'unknown')} px +• Channels: {data.get('channels', 'unknown')}""" + except httpx.HTTPError as e: + return f"❌ Error: {e}" + except Exception as e: + return f"❌ Error: {e}" + + +@mcp.tool() +async def oak_spatial() -> str: + """ + Get detailed 3D spatial tracking data from OAK-D stereo depth. + + Returns: + Full spatial coordinates, bounding box, and detection details. + + Example: + oak_spatial() + """ + try: + data = await api_get("/detections") + count = data.get("person_count", 0) + detections = data.get("detections", []) + + if count == 0: + return "📏 No person detected for spatial tracking" + + result = f"📏 Spatial Detection ({count} person{'s' if count != 1 else ''}):\n" + + for i, det in enumerate(detections): + conf = det.get("confidence", 0) * 100 + x_mm = det.get("x_mm", 0) + y_mm = det.get("y_mm", 0) + z_mm = det.get("z_mm", 0) + dist_m = det.get("distance_m", z_mm / 1000) + + result += f""" +Person {i+1}: + • Confidence: {conf:.0f}% + • Distance: {dist_m:.2f}m + • X: {int(x_mm)}mm ({"left" if x_mm < 0 else "right"} of center) + • Y: {int(y_mm)}mm ({"above" if y_mm > 0 else "below"} center) + • Z: {int(z_mm)}mm (depth) + • BBox: ({det.get('xmin', 0):.2f}, {det.get('ymin', 0):.2f}) to ({det.get('xmax', 0):.2f}, {det.get('ymax', 0):.2f})""" + + return result + except httpx.HTTPError as e: + return f"❌ Error getting spatial data: {e}" + except Exception as e: + return f"❌ Error: {e}" + + +@mcp.tool() +async def oak_depth(save: bool = True, filename: str = None) -> str: + """ + Capture colorized depth frame from OAK-D stereo cameras. + + Args: + save: If True, save to disk (default True) + filename: Optional custom filename (default: depth-TIMESTAMP.jpg) + + Returns: + Path to saved depth visualization, showing distance as colors. + + Example: + oak_depth() # Save colorized depth map + """ + try: + image_data = await api_get_binary("/depth") + + if save: + if filename is None: + timestamp = time.strftime("%Y%m%d-%H%M%S") + filename = f"depth-{timestamp}.jpg" + + filepath = os.path.join(SNAPSHOT_DIR, filename) + + with open(filepath, 'wb') as f: + f.write(image_data) + + size_kb = len(image_data) / 1024 + return f"""🌈 Depth frame captured! +• File: {filepath} +• Size: {size_kb:.1f} KB +• Colors: Blue=close, Red=far""" + else: + b64 = base64.b64encode(image_data).decode('utf-8') + return f"data:image/jpeg;base64,{b64[:50]}... (base64 depth data)" + + except httpx.HTTPError as e: + return f"❌ Error capturing depth: {e}" + except Exception as e: + return f"❌ Error: {e}" + + +# Run the server +if __name__ == "__main__": + mcp.run()