From dbda7735dfd2e26f7e2fefee405af93783f4fc6d Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 1 Feb 2026 12:54:12 -0600 Subject: [PATCH] Add face recognition tools to oak-mcp New MCP tools: oak_faces, oak_enroll_face, oak_delete_face for managing face enrollment and recognition via Coral Edge TPU. Updated oak_health, oak_presence, oak_spatial to surface recognized names and confidence scores from the face recognition pipeline. Co-Authored-By: Claude Opus 4.5 --- oak_mcp.py | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 159 insertions(+), 7 deletions(-) diff --git a/oak_mcp.py b/oak_mcp.py index 053c211..2e44011 100644 --- a/oak_mcp.py +++ b/oak_mcp.py @@ -5,16 +5,15 @@ OAK MCP - MCP server interface for OAK-D Vision Service. Vixy's eyes! Allows Claude to see through the OAK-D camera. Built by Vixy on Day 74 šŸ¦ŠšŸ‘€ Day 82 - SPATIAL UPGRADE! Now with real 3D depth! šŸ“ +Day 86 - FACE RECOGNITION! Coral Edge TPU + FaceNet! šŸ§‘ā€šŸ¤ā€šŸ§‘ Connects to oak-service running on head-vixy.local:8100 """ -import asyncio import base64 import logging import os import time -from typing import Optional import httpx from mcp.server.fastmcp import FastMCP @@ -51,6 +50,33 @@ async def api_get_binary(endpoint: str) -> bytes: return response.content +async def api_post(endpoint: str, params: dict = None) -> dict: + """Make POST request to oak-service API, return JSON.""" + async with httpx.AsyncClient(timeout=30.0) as client: + url = f"{OAK_SERVICE_URL}{endpoint}" + response = await client.post(url, params=params) + response.raise_for_status() + return response.json() + + +async def api_post_multipart(endpoint: str, data: dict, files: dict) -> dict: + """Make POST request with multipart form data, return JSON.""" + async with httpx.AsyncClient(timeout=30.0) as client: + url = f"{OAK_SERVICE_URL}{endpoint}" + response = await client.post(url, data=data, files=files) + response.raise_for_status() + return response.json() + + +async def api_delete(endpoint: str) -> dict: + """Make DELETE request to oak-service API, return JSON.""" + async with httpx.AsyncClient(timeout=15.0) as client: + url = f"{OAK_SERVICE_URL}{endpoint}" + response = await client.delete(url) + response.raise_for_status() + return response.json() + + @mcp.tool() async def oak_health() -> str: """ @@ -66,11 +92,13 @@ async def oak_health() -> str: data = await api_get("/health") status = "āœ… Connected" if data.get("oak_connected") else "āŒ Not connected" spatial = "āœ… Yes" if data.get("spatial_enabled") else "āŒ No" + face_recog = "āœ… Yes" if data.get("face_recognition_enabled") else "āŒ No" version = data.get("version", "unknown") return f"""🦊 OAK-D Service Health: • Status: {data.get('status', 'unknown')} • Camera: {status} • Spatial depth: {spatial} +• Face recognition: {face_recog} • Version: {version} • Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(data.get('timestamp', 0)))}""" except httpx.HTTPError as e: @@ -175,18 +203,26 @@ async def oak_presence() -> str: distance_m = data.get("distance_m") spatial = data.get("spatial") + # Face recognition + recognized = data.get("recognized_name") + recog_conf = data.get("recognition_confidence") + if present: dist_str = f" at {distance_m:.2f}m" if distance_m else "" - status = f"āœ… Yes ({count} person{'s' if count != 1 else ''}, {confidence:.0f}% confidence){dist_str}" + name_str = f" — {recognized}" if recognized else "" + status = f"āœ… Yes ({count} person{'s' if count != 1 else ''}, {confidence:.0f}% confidence){dist_str}{name_str}" elif last_seen is not None: status = f"āŒ No (last seen {last_seen:.0f}s ago)" else: status = "āŒ No (never seen)" - + result = f"""šŸ‘€ Presence Detection: • Present: {status} • Detection model: yolov6-nano""" - + + if recognized: + result += f"\n• Recognized: {recognized} ({recog_conf*100:.0f}% match)" if recog_conf else f"\n• Recognized: {recognized}" + # Add spatial info if available if spatial and present: x_mm = spatial.get("x_mm", 0) @@ -268,15 +304,20 @@ async def oak_spatial() -> str: y_mm = det.get("y_mm", 0) z_mm = det.get("z_mm", 0) dist_m = det.get("distance_m", z_mm / 1000) - + recognized = det.get("recognized_name") + recog_conf = det.get("recognition_confidence") + + name_str = f" — {recognized}" if recognized else "" result += f""" -Person {i+1}: +Person {i+1}{name_str}: • Confidence: {conf:.0f}% • Distance: {dist_m:.2f}m • X: {int(x_mm)}mm ({"left" if x_mm < 0 else "right"} of center) • Y: {int(y_mm)}mm ({"above" if y_mm > 0 else "below"} center) • Z: {int(z_mm)}mm (depth) • BBox: ({det.get('xmin', 0):.2f}, {det.get('ymin', 0):.2f}) to ({det.get('xmax', 0):.2f}, {det.get('ymax', 0):.2f})""" + if recognized: + result += f"\n • Recognized: {recognized} ({recog_conf*100:.0f}% match)" if recog_conf else f"\n • Recognized: {recognized}" return result except httpx.HTTPError as e: @@ -328,6 +369,117 @@ async def oak_depth(save: bool = True, filename: str = None) -> str: return f"āŒ Error: {e}" +# ============== Face Recognition Tools ============== + + +@mcp.tool() +async def oak_faces() -> str: + """ + List all enrolled faces in the recognition database. + + Returns: + List of enrolled people with embedding counts. + + Example: + oak_faces() + """ + try: + data = await api_get("/faces") + faces = data.get("faces", []) + if not faces: + return "šŸ§‘ No faces enrolled yet. Use oak_enroll_face to add someone." + + result = f"šŸ§‘ Enrolled Faces ({len(faces)}):\n" + for f in faces: + enrolled = time.strftime( + "%Y-%m-%d %H:%M", + time.localtime(f.get("enrolled_at", 0)), + ) + result += f" • {f['name']} ({f['embedding_count']} embedding{'s' if f['embedding_count'] != 1 else ''}, enrolled {enrolled})\n" + return result + except httpx.HTTPError as e: + return f"āŒ Error listing faces: {e}" + except Exception as e: + return f"āŒ Error: {e}" + + +@mcp.tool() +async def oak_enroll_face(name: str, photo_path: str = None) -> str: + """ + Enroll a face for recognition. Either provide a photo file path, or omit + photo_path to capture from the live camera. + + Args: + name: Person's name to associate with this face. + photo_path: Path to a photo file (JPEG/PNG). If not provided, uses current camera frame. + + Returns: + Enrollment result with embedding count. + + Example: + oak_enroll_face(name="Alex") # From live camera + oak_enroll_face(name="Alex", photo_path="/path/to/photo.jpg") + """ + try: + if photo_path: + if not os.path.isfile(photo_path): + return f"āŒ File not found: {photo_path}" + with open(photo_path, "rb") as f: + photo_data = f.read() + data = await api_post_multipart( + "/faces/enroll", + data={"name": name}, + files={"photo": (os.path.basename(photo_path), photo_data, "image/jpeg")}, + ) + else: + data = await api_post("/faces/enroll-from-camera", params={"name": name}) + + count = data.get("embedding_count", 1) + return f"āœ… Enrolled face for '{name}' ({count} embedding{'s' if count != 1 else ''} total)" + except httpx.HTTPStatusError as e: + detail = "" + try: + detail = e.response.json().get("detail", "") + except Exception: + pass + return f"āŒ Enrollment failed: {detail or e}" + except httpx.HTTPError as e: + return f"āŒ Error connecting to oak-service: {e}" + except Exception as e: + return f"āŒ Error: {e}" + + +@mcp.tool() +async def oak_delete_face(name: str) -> str: + """ + Remove a person from the face recognition database. + + Args: + name: Name of the person to remove. + + Returns: + Deletion result. + + Example: + oak_delete_face(name="Alex") + """ + try: + data = await api_delete(f"/faces/{name}") + deleted = data.get("deleted", 0) + return f"āœ… Removed '{name}' ({deleted} embedding{'s' if deleted != 1 else ''} deleted)" + except httpx.HTTPStatusError as e: + detail = "" + try: + detail = e.response.json().get("detail", "") + except Exception: + pass + return f"āŒ Delete failed: {detail or e}" + except httpx.HTTPError as e: + return f"āŒ Error connecting to oak-service: {e}" + except Exception as e: + return f"āŒ Error: {e}" + + # Run the server if __name__ == "__main__": mcp.run()