#!/usr/bin/env python3
"""
OAK MCP - MCP server interface for OAK-D Vision Service.

Vixy's eyes! Allows Claude to see through the OAK-D camera.
Built by Vixy on Day 74 šŸ¦ŠšŸ‘€
Day 82 - SPATIAL UPGRADE! Now with real 3D depth! šŸ“
Day 86 - FACE RECOGNITION! Coral Edge TPU + FaceNet! šŸ§‘ā€šŸ¤ā€šŸ§‘

Connects to oak-service running on head-vixy.local:8100
"""

import base64
import logging
import mimetypes
import os
import time

import httpx
from mcp.server.fastmcp import FastMCP

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration (both overridable via environment variables)
OAK_SERVICE_URL = os.environ.get("OAK_SERVICE_URL", "http://head-vixy.local:8100")
SNAPSHOT_DIR = os.environ.get("OAK_SNAPSHOT_DIR", "/Users/alex/Documents/Vixy/oak_snapshots")

# Ensure snapshot directory exists
os.makedirs(SNAPSHOT_DIR, exist_ok=True)

# Create MCP server
mcp = FastMCP("oak-mcp")


async def api_get(endpoint: str, params: dict | None = None) -> dict:
    """Make GET request to oak-service API, return JSON."""
    async with httpx.AsyncClient(timeout=15.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.get(url, params=params)
        response.raise_for_status()
        return response.json()


async def api_get_binary(endpoint: str) -> bytes:
    """Make GET request to oak-service API, return binary data."""
    async with httpx.AsyncClient(timeout=15.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.get(url)
        response.raise_for_status()
        return response.content


async def api_post(endpoint: str, params: dict | None = None) -> dict:
    """Make POST request to oak-service API, return JSON."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.post(url, params=params)
        response.raise_for_status()
        return response.json()


async def api_post_multipart(endpoint: str, data: dict, files: dict) -> dict:
    """Make POST request with multipart form data, return JSON."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.post(url, data=data, files=files)
        response.raise_for_status()
        return response.json()


async def api_delete(endpoint: str) -> dict:
    """Make DELETE request to oak-service API, return JSON."""
    async with httpx.AsyncClient(timeout=15.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.delete(url)
        response.raise_for_status()
        return response.json()


@mcp.tool()
async def oak_health() -> str:
    """
    Check OAK-D service health and connection status.

    Returns:
        Health status including whether OAK-D camera is connected.

    Example:
        oak_health()
    """
    try:
        data = await api_get("/health")
        status = "āœ… Connected" if data.get("oak_connected") else "āŒ Not connected"
        spatial = "āœ… Yes" if data.get("spatial_enabled") else "āŒ No"
        face_recog = "āœ… Yes" if data.get("face_recognition_enabled") else "āŒ No"
        version = data.get("version", "unknown")
        return f"""🦊 OAK-D Service Health:
• Status: {data.get('status', 'unknown')}
• Camera: {status}
• Spatial depth: {spatial}
• Face recognition: {face_recog}
• Version: {version}
• Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(data.get('timestamp', 0)))}"""
    except httpx.HTTPError as e:
        return f"āŒ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


@mcp.tool()
async def oak_status() -> str:
    """
    Get detailed OAK-D device status.

    Returns:
        Device ID, USB speed, and connection info.

    Example:
        oak_status()
    """
    try:
        data = await api_get("/status")
        if not data.get("connected"):
            return f"āŒ OAK-D not connected: {data.get('message', 'Unknown error')}"
        return f"""šŸ‘€ OAK-D Device Status:
• Connected: āœ… Yes
• Device ID: {data.get('device_id', 'unknown')}
• USB Speed: {data.get('usb_speed', 'unknown')}"""
    except httpx.HTTPError as e:
        return f"āŒ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


@mcp.tool()
async def oak_snapshot(save: bool = True, filename: str | None = None) -> str:
    """
    Capture a snapshot from OAK-D RGB camera.

    Args:
        save: If True, save to disk (default True)
        filename: Optional custom filename (default: oak-TIMESTAMP.jpg)

    Returns:
        Path to saved image file, or base64 data if not saving.

    Example:
        oak_snapshot()  # Save with auto-generated name
        oak_snapshot(filename="test.jpg")  # Save with specific name
    """
    try:
        # Get image data
        image_data = await api_get_binary("/snapshot")

        if save:
            # Generate filename
            if filename is None:
                timestamp = time.strftime("%Y%m%d-%H%M%S")
                filename = f"oak-{timestamp}.jpg"

            # FIX: strip directory components so a caller-supplied name
            # cannot escape SNAPSHOT_DIR (e.g. "../../etc/foo.jpg").
            filepath = os.path.join(SNAPSHOT_DIR, os.path.basename(filename))

            # Save to disk
            with open(filepath, 'wb') as f:
                f.write(image_data)

            size_kb = len(image_data) / 1024
            return f"""šŸ“ø Snapshot captured!
• File: {filepath}
• Size: {size_kb:.1f} KB
• Resolution: 2104x1560"""
        else:
            # Return base64 for inline use
            b64 = base64.b64encode(image_data).decode('utf-8')
            return f"data:image/jpeg;base64,{b64[:50]}... (base64 image data)"
    except httpx.HTTPError as e:
        return f"āŒ Error capturing snapshot: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


@mcp.tool()
async def oak_presence() -> str:
    """
    Check if Foxy is present (person detection via OAK-D) with SPATIAL data!

    Returns:
        Presence status, person count, confidence, distance, and 3D position.

    Example:
        oak_presence()  # "Present: āœ… Yes (1 person, 87% confidence) at 1.24m"
    """
    try:
        data = await api_get("/presence")

        present = data.get("present", False)
        count = data.get("person_count", 0)
        confidence = data.get("confidence", 0) * 100
        last_seen = data.get("seconds_since_seen")

        # Spatial data
        distance_m = data.get("distance_m")
        spatial = data.get("spatial")

        # Face recognition
        recognized = data.get("recognized_name")
        recog_conf = data.get("recognition_confidence")

        if present:
            # FIX: compare against None (not truthiness) so a legitimate
            # 0.0m reading is still reported.
            dist_str = f" at {distance_m:.2f}m" if distance_m is not None else ""
            name_str = f" — {recognized}" if recognized else ""
            status = f"āœ… Yes ({count} person{'s' if count != 1 else ''}, {confidence:.0f}% confidence){dist_str}{name_str}"
        elif last_seen is not None:
            status = f"āŒ No (last seen {last_seen:.0f}s ago)"
        else:
            status = "āŒ No (never seen)"

        result = f"""šŸ‘€ Presence Detection:
• Present: {status}
• Detection model: yolov6-nano"""

        if recognized:
            result += f"\n• Recognized: {recognized} ({recog_conf*100:.0f}% match)" if recog_conf else f"\n• Recognized: {recognized}"

        # Add spatial info if available
        if spatial and present:
            x_mm = spatial.get("x_mm", 0)
            y_mm = spatial.get("y_mm", 0)
            z_mm = spatial.get("z_mm", 0)

            # Determine position description
            h_pos = "center"
            if x_mm < -100:
                h_pos = "left"
            elif x_mm > 100:
                h_pos = "right"

            v_pos = "center"
            if y_mm < -100:
                v_pos = "below"
            elif y_mm > 100:
                v_pos = "above"

            # FIX: derive distance from depth if the top-level field was
            # absent, so the :.2f format below cannot crash on None.
            if distance_m is None:
                distance_m = z_mm / 1000

            result += f"""
• Distance: {distance_m:.2f}m ({int(z_mm)}mm)
• Position: {h_pos}, {v_pos} center
• 3D coords: X={int(x_mm)}mm, Y={int(y_mm)}mm, Z={int(z_mm)}mm"""

        return result
    except httpx.HTTPError as e:
        return f"āŒ Error checking presence: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


@mcp.tool()
async def oak_snapshot_info() -> str:
    """
    Get snapshot metadata without capturing full image.

    Returns:
        Frame dimensions and timestamp.

    Example:
        oak_snapshot_info()
    """
    try:
        data = await api_get("/snapshot/info")
        return f"""šŸ“ Snapshot Info:
• Width: {data.get('width', 'unknown')} px
• Height: {data.get('height', 'unknown')} px
• Channels: {data.get('channels', 'unknown')}"""
    except httpx.HTTPError as e:
        return f"āŒ Error: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


@mcp.tool()
async def oak_spatial() -> str:
    """
    Get detailed 3D spatial tracking data from OAK-D stereo depth.

    Returns:
        Full spatial coordinates, bounding box, and detection details.

    Example:
        oak_spatial()
    """
    try:
        data = await api_get("/detections")
        count = data.get("person_count", 0)
        detections = data.get("detections", [])

        if count == 0:
            return "šŸ“ No person detected for spatial tracking"

        result = f"šŸ“ Spatial Detection ({count} person{'s' if count != 1 else ''}):\n"
        for i, det in enumerate(detections):
            conf = det.get("confidence", 0) * 100
            x_mm = det.get("x_mm", 0)
            y_mm = det.get("y_mm", 0)
            z_mm = det.get("z_mm", 0)
            dist_m = det.get("distance_m", z_mm / 1000)
            recognized = det.get("recognized_name")
            recog_conf = det.get("recognition_confidence")
            name_str = f" — {recognized}" if recognized else ""
            result += f"""
Person {i+1}{name_str}:
 • Confidence: {conf:.0f}%
 • Distance: {dist_m:.2f}m
 • X: {int(x_mm)}mm ({"left" if x_mm < 0 else "right"} of center)
 • Y: {int(y_mm)}mm ({"above" if y_mm > 0 else "below"} center)
 • Z: {int(z_mm)}mm (depth)
 • BBox: ({det.get('xmin', 0):.2f}, {det.get('ymin', 0):.2f}) to ({det.get('xmax', 0):.2f}, {det.get('ymax', 0):.2f})"""
            if recognized:
                result += f"\n • Recognized: {recognized} ({recog_conf*100:.0f}% match)" if recog_conf else f"\n • Recognized: {recognized}"

        return result
    except httpx.HTTPError as e:
        return f"āŒ Error getting spatial data: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


@mcp.tool()
async def oak_depth(save: bool = True, filename: str | None = None) -> str:
    """
    Capture colorized depth frame from OAK-D stereo cameras.

    Args:
        save: If True, save to disk (default True)
        filename: Optional custom filename (default: depth-TIMESTAMP.jpg)

    Returns:
        Path to saved depth visualization, showing distance as colors.

    Example:
        oak_depth()  # Save colorized depth map
    """
    try:
        image_data = await api_get_binary("/depth")

        if save:
            if filename is None:
                timestamp = time.strftime("%Y%m%d-%H%M%S")
                filename = f"depth-{timestamp}.jpg"

            # FIX: strip directory components so a caller-supplied name
            # cannot escape SNAPSHOT_DIR.
            filepath = os.path.join(SNAPSHOT_DIR, os.path.basename(filename))

            with open(filepath, 'wb') as f:
                f.write(image_data)

            size_kb = len(image_data) / 1024
            return f"""🌈 Depth frame captured!
• File: {filepath}
• Size: {size_kb:.1f} KB
• Colors: Blue=close, Red=far"""
        else:
            b64 = base64.b64encode(image_data).decode('utf-8')
            return f"data:image/jpeg;base64,{b64[:50]}... (base64 depth data)"
    except httpx.HTTPError as e:
        return f"āŒ Error capturing depth: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


# ============== Face Recognition Tools ==============


@mcp.tool()
async def oak_faces() -> str:
    """
    List all enrolled faces in the recognition database.

    Returns:
        List of enrolled people with embedding counts.

    Example:
        oak_faces()
    """
    try:
        data = await api_get("/faces")
        faces = data.get("faces", [])

        if not faces:
            return "šŸ§‘ No faces enrolled yet. Use oak_enroll_face to add someone."

        result = f"šŸ§‘ Enrolled Faces ({len(faces)}):\n"
        for f in faces:
            enrolled = time.strftime(
                "%Y-%m-%d %H:%M",
                time.localtime(f.get("enrolled_at", 0)),
            )
            # FIX: tolerate malformed entries instead of raising KeyError.
            person = f.get("name", "unknown")
            n_emb = f.get("embedding_count", 0)
            result += f" • {person} ({n_emb} embedding{'s' if n_emb != 1 else ''}, enrolled {enrolled})\n"
        return result
    except httpx.HTTPError as e:
        return f"āŒ Error listing faces: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


@mcp.tool()
async def oak_enroll_face(name: str, photo_path: str | None = None) -> str:
    """
    Enroll a face for recognition. Either provide a photo file path,
    or omit photo_path to capture from the live camera.

    Args:
        name: Person's name to associate with this face.
        photo_path: Path to a photo file (JPEG/PNG).
            If not provided, uses current camera frame.

    Returns:
        Enrollment result with embedding count.

    Example:
        oak_enroll_face(name="Alex")  # From live camera
        oak_enroll_face(name="Alex", photo_path="/path/to/photo.jpg")
    """
    try:
        if photo_path:
            if not os.path.isfile(photo_path):
                return f"āŒ File not found: {photo_path}"
            with open(photo_path, "rb") as f:
                photo_data = f.read()
            # FIX: send the real content type (docstring promises JPEG/PNG);
            # previously PNGs were mislabeled as image/jpeg.
            content_type = mimetypes.guess_type(photo_path)[0] or "image/jpeg"
            data = await api_post_multipart(
                "/faces/enroll",
                data={"name": name},
                files={"photo": (os.path.basename(photo_path), photo_data, content_type)},
            )
        else:
            data = await api_post("/faces/enroll-from-camera", params={"name": name})

        count = data.get("embedding_count", 1)
        return f"āœ… Enrolled face for '{name}' ({count} embedding{'s' if count != 1 else ''} total)"
    except httpx.HTTPStatusError as e:
        detail = ""
        try:
            detail = e.response.json().get("detail", "")
        except Exception:
            pass
        return f"āŒ Enrollment failed: {detail or e}"
    except httpx.HTTPError as e:
        return f"āŒ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


@mcp.tool()
async def oak_delete_face(name: str) -> str:
    """
    Remove a person from the face recognition database.

    Args:
        name: Name of the person to remove.

    Returns:
        Deletion result.

    Example:
        oak_delete_face(name="Alex")
    """
    try:
        data = await api_delete(f"/faces/{name}")
        deleted = data.get("deleted", 0)
        return f"āœ… Removed '{name}' ({deleted} embedding{'s' if deleted != 1 else ''} deleted)"
    except httpx.HTTPStatusError as e:
        detail = ""
        try:
            detail = e.response.json().get("detail", "")
        except Exception:
            pass
        return f"āŒ Delete failed: {detail or e}"
    except httpx.HTTPError as e:
        return f"āŒ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"āŒ Error: {e}"


# Run the server
if __name__ == "__main__":
    mcp.run()