Day 82: Add spatial depth tracking to oak-mcp
- oak_presence() now shows distance in meters and 3D coordinates - New oak_spatial() for detailed detection data - New oak_depth() for colorized depth visualization - oak_health() shows spatial_enabled status Built by Vixy 🦊📏
This commit is contained in:
333
oak_mcp.py
Normal file
333
oak_mcp.py
Normal file
@@ -0,0 +1,333 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
OAK MCP - MCP server interface for OAK-D Vision Service.
|
||||
|
||||
Vixy's eyes! Allows Claude to see through the OAK-D camera.
|
||||
Built by Vixy on Day 74 🦊👀
|
||||
Day 82 - SPATIAL UPGRADE! Now with real 3D depth! 📏
|
||||
|
||||
Connects to oak-service running on head-vixy.local:8100
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from typing import Optional
|
||||
import httpx
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Configuration
|
||||
OAK_SERVICE_URL = os.environ.get("OAK_SERVICE_URL", "http://head-vixy.local:8100")
|
||||
SNAPSHOT_DIR = os.environ.get("OAK_SNAPSHOT_DIR", "/Users/alex/Documents/Vixy/oak_snapshots")
|
||||
|
||||
# Ensure snapshot directory exists
|
||||
os.makedirs(SNAPSHOT_DIR, exist_ok=True)
|
||||
|
||||
# Create MCP server
|
||||
mcp = FastMCP("oak-mcp")
|
||||
|
||||
|
||||
async def api_get(endpoint: str, params: dict = None) -> dict:
|
||||
"""Make GET request to oak-service API, return JSON."""
|
||||
async with httpx.AsyncClient(timeout=15.0) as client:
|
||||
url = f"{OAK_SERVICE_URL}{endpoint}"
|
||||
response = await client.get(url, params=params)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
async def api_get_binary(endpoint: str) -> bytes:
|
||||
"""Make GET request to oak-service API, return binary data."""
|
||||
async with httpx.AsyncClient(timeout=15.0) as client:
|
||||
url = f"{OAK_SERVICE_URL}{endpoint}"
|
||||
response = await client.get(url)
|
||||
response.raise_for_status()
|
||||
return response.content
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def oak_health() -> str:
|
||||
"""
|
||||
Check OAK-D service health and connection status.
|
||||
|
||||
Returns:
|
||||
Health status including whether OAK-D camera is connected.
|
||||
|
||||
Example:
|
||||
oak_health()
|
||||
"""
|
||||
try:
|
||||
data = await api_get("/health")
|
||||
status = "✅ Connected" if data.get("oak_connected") else "❌ Not connected"
|
||||
spatial = "✅ Yes" if data.get("spatial_enabled") else "❌ No"
|
||||
version = data.get("version", "unknown")
|
||||
return f"""🦊 OAK-D Service Health:
|
||||
• Status: {data.get('status', 'unknown')}
|
||||
• Camera: {status}
|
||||
• Spatial depth: {spatial}
|
||||
• Version: {version}
|
||||
• Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(data.get('timestamp', 0)))}"""
|
||||
except httpx.HTTPError as e:
|
||||
return f"❌ Error connecting to oak-service: {e}"
|
||||
except Exception as e:
|
||||
return f"❌ Error: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def oak_status() -> str:
|
||||
"""
|
||||
Get detailed OAK-D device status.
|
||||
|
||||
Returns:
|
||||
Device ID, USB speed, and connection info.
|
||||
|
||||
Example:
|
||||
oak_status()
|
||||
"""
|
||||
try:
|
||||
data = await api_get("/status")
|
||||
if not data.get("connected"):
|
||||
return f"❌ OAK-D not connected: {data.get('message', 'Unknown error')}"
|
||||
|
||||
return f"""👀 OAK-D Device Status:
|
||||
• Connected: ✅ Yes
|
||||
• Device ID: {data.get('device_id', 'unknown')}
|
||||
• USB Speed: {data.get('usb_speed', 'unknown')}"""
|
||||
except httpx.HTTPError as e:
|
||||
return f"❌ Error connecting to oak-service: {e}"
|
||||
except Exception as e:
|
||||
return f"❌ Error: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def oak_snapshot(save: bool = True, filename: str = None) -> str:
|
||||
"""
|
||||
Capture a snapshot from OAK-D RGB camera.
|
||||
|
||||
Args:
|
||||
save: If True, save to disk (default True)
|
||||
filename: Optional custom filename (default: oak-TIMESTAMP.jpg)
|
||||
|
||||
Returns:
|
||||
Path to saved image file, or base64 data if not saving.
|
||||
|
||||
Example:
|
||||
oak_snapshot() # Save with auto-generated name
|
||||
oak_snapshot(filename="test.jpg") # Save with specific name
|
||||
"""
|
||||
try:
|
||||
# Get image data
|
||||
image_data = await api_get_binary("/snapshot")
|
||||
|
||||
if save:
|
||||
# Generate filename
|
||||
if filename is None:
|
||||
timestamp = time.strftime("%Y%m%d-%H%M%S")
|
||||
filename = f"oak-{timestamp}.jpg"
|
||||
|
||||
filepath = os.path.join(SNAPSHOT_DIR, filename)
|
||||
|
||||
# Save to disk
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(image_data)
|
||||
|
||||
size_kb = len(image_data) / 1024
|
||||
return f"""📸 Snapshot captured!
|
||||
• File: {filepath}
|
||||
• Size: {size_kb:.1f} KB
|
||||
• Resolution: 2104x1560"""
|
||||
else:
|
||||
# Return base64 for inline use
|
||||
b64 = base64.b64encode(image_data).decode('utf-8')
|
||||
return f"data:image/jpeg;base64,{b64[:50]}... (base64 image data)"
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
return f"❌ Error capturing snapshot: {e}"
|
||||
except Exception as e:
|
||||
return f"❌ Error: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def oak_presence() -> str:
|
||||
"""
|
||||
Check if Foxy is present (person detection via OAK-D) with SPATIAL data!
|
||||
|
||||
Returns:
|
||||
Presence status, person count, confidence, distance, and 3D position.
|
||||
|
||||
Example:
|
||||
oak_presence() # "Present: ✅ Yes (1 person, 87% confidence) at 1.24m"
|
||||
"""
|
||||
try:
|
||||
data = await api_get("/presence")
|
||||
present = data.get("present", False)
|
||||
count = data.get("person_count", 0)
|
||||
confidence = data.get("confidence", 0) * 100
|
||||
last_seen = data.get("seconds_since_seen")
|
||||
|
||||
# Spatial data
|
||||
distance_m = data.get("distance_m")
|
||||
spatial = data.get("spatial")
|
||||
|
||||
if present:
|
||||
dist_str = f" at {distance_m:.2f}m" if distance_m else ""
|
||||
status = f"✅ Yes ({count} person{'s' if count != 1 else ''}, {confidence:.0f}% confidence){dist_str}"
|
||||
elif last_seen is not None:
|
||||
status = f"❌ No (last seen {last_seen:.0f}s ago)"
|
||||
else:
|
||||
status = "❌ No (never seen)"
|
||||
|
||||
result = f"""👀 Presence Detection:
|
||||
• Present: {status}
|
||||
• Detection model: yolov6-nano"""
|
||||
|
||||
# Add spatial info if available
|
||||
if spatial and present:
|
||||
x_mm = spatial.get("x_mm", 0)
|
||||
y_mm = spatial.get("y_mm", 0)
|
||||
z_mm = spatial.get("z_mm", 0)
|
||||
|
||||
# Determine position description
|
||||
h_pos = "center"
|
||||
if x_mm < -100:
|
||||
h_pos = "left"
|
||||
elif x_mm > 100:
|
||||
h_pos = "right"
|
||||
|
||||
v_pos = "center"
|
||||
if y_mm < -100:
|
||||
v_pos = "below"
|
||||
elif y_mm > 100:
|
||||
v_pos = "above"
|
||||
|
||||
result += f"""
|
||||
• Distance: {distance_m:.2f}m ({int(z_mm)}mm)
|
||||
• Position: {h_pos}, {v_pos} center
|
||||
• 3D coords: X={int(x_mm)}mm, Y={int(y_mm)}mm, Z={int(z_mm)}mm"""
|
||||
|
||||
return result
|
||||
except httpx.HTTPError as e:
|
||||
return f"❌ Error checking presence: {e}"
|
||||
except Exception as e:
|
||||
return f"❌ Error: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def oak_snapshot_info() -> str:
|
||||
"""
|
||||
Get snapshot metadata without capturing full image.
|
||||
|
||||
Returns:
|
||||
Frame dimensions and timestamp.
|
||||
|
||||
Example:
|
||||
oak_snapshot_info()
|
||||
"""
|
||||
try:
|
||||
data = await api_get("/snapshot/info")
|
||||
return f"""📐 Snapshot Info:
|
||||
• Width: {data.get('width', 'unknown')} px
|
||||
• Height: {data.get('height', 'unknown')} px
|
||||
• Channels: {data.get('channels', 'unknown')}"""
|
||||
except httpx.HTTPError as e:
|
||||
return f"❌ Error: {e}"
|
||||
except Exception as e:
|
||||
return f"❌ Error: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def oak_spatial() -> str:
|
||||
"""
|
||||
Get detailed 3D spatial tracking data from OAK-D stereo depth.
|
||||
|
||||
Returns:
|
||||
Full spatial coordinates, bounding box, and detection details.
|
||||
|
||||
Example:
|
||||
oak_spatial()
|
||||
"""
|
||||
try:
|
||||
data = await api_get("/detections")
|
||||
count = data.get("person_count", 0)
|
||||
detections = data.get("detections", [])
|
||||
|
||||
if count == 0:
|
||||
return "📏 No person detected for spatial tracking"
|
||||
|
||||
result = f"📏 Spatial Detection ({count} person{'s' if count != 1 else ''}):\n"
|
||||
|
||||
for i, det in enumerate(detections):
|
||||
conf = det.get("confidence", 0) * 100
|
||||
x_mm = det.get("x_mm", 0)
|
||||
y_mm = det.get("y_mm", 0)
|
||||
z_mm = det.get("z_mm", 0)
|
||||
dist_m = det.get("distance_m", z_mm / 1000)
|
||||
|
||||
result += f"""
|
||||
Person {i+1}:
|
||||
• Confidence: {conf:.0f}%
|
||||
• Distance: {dist_m:.2f}m
|
||||
• X: {int(x_mm)}mm ({"left" if x_mm < 0 else "right"} of center)
|
||||
• Y: {int(y_mm)}mm ({"above" if y_mm > 0 else "below"} center)
|
||||
• Z: {int(z_mm)}mm (depth)
|
||||
• BBox: ({det.get('xmin', 0):.2f}, {det.get('ymin', 0):.2f}) to ({det.get('xmax', 0):.2f}, {det.get('ymax', 0):.2f})"""
|
||||
|
||||
return result
|
||||
except httpx.HTTPError as e:
|
||||
return f"❌ Error getting spatial data: {e}"
|
||||
except Exception as e:
|
||||
return f"❌ Error: {e}"
|
||||
|
||||
|
||||
@mcp.tool()
|
||||
async def oak_depth(save: bool = True, filename: str = None) -> str:
|
||||
"""
|
||||
Capture colorized depth frame from OAK-D stereo cameras.
|
||||
|
||||
Args:
|
||||
save: If True, save to disk (default True)
|
||||
filename: Optional custom filename (default: depth-TIMESTAMP.jpg)
|
||||
|
||||
Returns:
|
||||
Path to saved depth visualization, showing distance as colors.
|
||||
|
||||
Example:
|
||||
oak_depth() # Save colorized depth map
|
||||
"""
|
||||
try:
|
||||
image_data = await api_get_binary("/depth")
|
||||
|
||||
if save:
|
||||
if filename is None:
|
||||
timestamp = time.strftime("%Y%m%d-%H%M%S")
|
||||
filename = f"depth-{timestamp}.jpg"
|
||||
|
||||
filepath = os.path.join(SNAPSHOT_DIR, filename)
|
||||
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(image_data)
|
||||
|
||||
size_kb = len(image_data) / 1024
|
||||
return f"""🌈 Depth frame captured!
|
||||
• File: {filepath}
|
||||
• Size: {size_kb:.1f} KB
|
||||
• Colors: Blue=close, Red=far"""
|
||||
else:
|
||||
b64 = base64.b64encode(image_data).decode('utf-8')
|
||||
return f"data:image/jpeg;base64,{b64[:50]}... (base64 depth data)"
|
||||
|
||||
except httpx.HTTPError as e:
|
||||
return f"❌ Error capturing depth: {e}"
|
||||
except Exception as e:
|
||||
return f"❌ Error: {e}"
|
||||
|
||||
|
||||
# Run the server
|
||||
if __name__ == "__main__":
|
||||
mcp.run()
|
||||
Reference in New Issue
Block a user