Exposes MoveNet Lightning pose data from Coral 2 via two MCP tools:
- oak_pose: full 17-keypoint body pose with coordinates and confidence
- oak_posture: high-level summary (standing/sitting, facing camera, arms raised)

Also updates oak_health to show pose_model_loaded status.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
#!/usr/bin/env python3
"""
OAK MCP - MCP server interface for OAK-D Vision Service.

Vixy's eyes! Allows Claude to see through the OAK-D camera.
Built by Vixy on Day 74 🦊👀
Day 82 - SPATIAL UPGRADE! Now with real 3D depth! 📏
Day 83 - MOVEMENT TRACKING! No more falling for posters! 🖼️❌
Day 86 - FACE RECOGNITION! Coral Edge TPU + FaceNet! 🧑🤝🧑
Day 97 - POSE ESTIMATION! MoveNet Lightning on Coral 2! 🤸

Connects to oak-service running on head-vixy.local:8100
"""

import base64
import logging
import os
import time

import httpx
from mcp.server.fastmcp import FastMCP

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
OAK_SERVICE_URL = os.environ.get("OAK_SERVICE_URL", "http://head-vixy.local:8100")
SNAPSHOT_DIR = os.environ.get("OAK_SNAPSHOT_DIR", "/Users/alex/Documents/Vixy/oak_snapshots")

# Ensure snapshot directory exists
os.makedirs(SNAPSHOT_DIR, exist_ok=True)
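
# The defaults above can be overridden per environment. For example, to test
# against a locally forwarded oak-service instance (illustrative command —
# adjust the script path for your setup):
#   OAK_SERVICE_URL=http://localhost:8100 python oak_mcp.py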

# Movement tracking state
_last_detection = {
    "x_mm": None,
    "y_mm": None,
    "z_mm": None,
    "timestamp": None,
    "static_count": 0,  # how many consecutive readings with same coords
}

# Movement detection thresholds
MOVEMENT_THRESHOLD_MM = 50  # must move 5cm to count as "moving"
STATIC_THRESHOLD = 3  # this many static readings = probably not a real person
HIGH_CONFIDENCE_THRESHOLD = 0.85  # above this, trust detection even if static

# Create MCP server
mcp = FastMCP("oak-mcp")


async def api_get(endpoint: str, params: dict = None) -> dict:
    """Make GET request to oak-service API, return JSON."""
    async with httpx.AsyncClient(timeout=15.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.get(url, params=params)
        response.raise_for_status()
        return response.json()


async def api_get_binary(endpoint: str) -> bytes:
    """Make GET request to oak-service API, return binary data."""
    async with httpx.AsyncClient(timeout=15.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.get(url)
        response.raise_for_status()
        return response.content


async def api_post(endpoint: str, params: dict = None) -> dict:
    """Make POST request to oak-service API, return JSON."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.post(url, params=params)
        response.raise_for_status()
        return response.json()


async def api_post_multipart(endpoint: str, data: dict, files: dict) -> dict:
    """Make POST request with multipart form data, return JSON."""
    async with httpx.AsyncClient(timeout=30.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.post(url, data=data, files=files)
        response.raise_for_status()
        return response.json()


async def api_delete(endpoint: str) -> dict:
    """Make DELETE request to oak-service API, return JSON."""
    async with httpx.AsyncClient(timeout=15.0) as client:
        url = f"{OAK_SERVICE_URL}{endpoint}"
        response = await client.delete(url)
        response.raise_for_status()
        return response.json()
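
# Note: raise_for_status() surfaces non-2xx responses as httpx.HTTPStatusError,
# a subclass of httpx.HTTPError, which the tools below catch to return friendly
# error strings instead of raw tracebacks.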


def check_movement(x_mm: float, y_mm: float, z_mm: float) -> tuple[bool, int]:
    """
    Check if detection coordinates have moved since last reading.

    Returns:
        (is_moving, static_count)
    """
    global _last_detection

    is_moving = False

    if _last_detection["x_mm"] is not None:
        delta = (abs(x_mm - _last_detection["x_mm"]) +
                 abs(y_mm - _last_detection["y_mm"]) +
                 abs(z_mm - _last_detection["z_mm"]))
        is_moving = delta > MOVEMENT_THRESHOLD_MM

        if not is_moving:
            _last_detection["static_count"] += 1
        else:
            _last_detection["static_count"] = 0
    else:
        _last_detection["static_count"] = 0

    _last_detection.update({
        "x_mm": x_mm, "y_mm": y_mm, "z_mm": z_mm,
        "timestamp": time.time()
    })

    return is_moving, _last_detection["static_count"]
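
# Worked example of the static-detection filter (illustrative numbers; the
# delta is a Manhattan distance across x/y/z in millimetres):
#   reading 1: (  0,  0, 1200) -> first reading, static_count = 0
#   reading 2: ( 10, -5, 1210) -> delta = 10 + 5 + 10 = 25 < 50, static_count = 1
#   reading 3: (  8, -2, 1205) -> delta =  2 + 3 +  5 = 10 < 50, static_count = 2
#   reading 4: ( 12,  0, 1198) -> delta =  4 + 2 +  7 = 13 < 50, static_count = 3
# Once static_count reaches STATIC_THRESHOLD, oak_presence() treats a detection
# below HIGH_CONFIDENCE_THRESHOLD with no recognized face as a likely poster.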


@mcp.tool()
async def oak_health() -> str:
    """
    Check OAK-D service health and connection status.

    Returns:
        Health status including whether OAK-D camera is connected.

    Example:
        oak_health()
    """
    try:
        data = await api_get("/health")
        status = "✅ Connected" if data.get("oak_connected") else "❌ Not connected"
        spatial = "✅ Yes" if data.get("spatial_enabled") else "❌ No"
        face_recog = "✅ Yes" if data.get("face_recognition_enabled") else "❌ No"
        pose = "✅ Yes" if data.get("pose_model_loaded") else "❌ No"
        version = data.get("version", "unknown")
        return f"""🦊 OAK-D Service Health:
• Status: {data.get('status', 'unknown')}
• Camera: {status}
• Spatial depth: {spatial}
• Face recognition: {face_recog}
• Pose estimation: {pose}
• Version: {version}
• Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(data.get('timestamp', 0)))}"""
    except httpx.HTTPError as e:
        return f"❌ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_status() -> str:
    """
    Get detailed OAK-D device status.

    Returns:
        Device ID, USB speed, and connection info.

    Example:
        oak_status()
    """
    try:
        data = await api_get("/status")
        if not data.get("connected"):
            return f"❌ OAK-D not connected: {data.get('message', 'Unknown error')}"

        return f"""👀 OAK-D Device Status:
• Connected: ✅ Yes
• Device ID: {data.get('device_id', 'unknown')}
• USB Speed: {data.get('usb_speed', 'unknown')}"""
    except httpx.HTTPError as e:
        return f"❌ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_snapshot(save: bool = True, filename: str = None) -> str:
    """
    Capture a snapshot from OAK-D RGB camera.

    Args:
        save: If True, save to disk (default True)
        filename: Optional custom filename (default: oak-TIMESTAMP.jpg)

    Returns:
        Path to saved image file, or base64 data if not saving.

    Example:
        oak_snapshot()  # Save with auto-generated name
        oak_snapshot(filename="test.jpg")  # Save with specific name
    """
    try:
        # Get image data
        image_data = await api_get_binary("/snapshot")

        if save:
            # Generate filename
            if filename is None:
                timestamp = time.strftime("%Y%m%d-%H%M%S")
                filename = f"oak-{timestamp}.jpg"

            filepath = os.path.join(SNAPSHOT_DIR, filename)

            # Save to disk
            with open(filepath, 'wb') as f:
                f.write(image_data)

            size_kb = len(image_data) / 1024
            return f"""📸 Snapshot captured!
• File: {filepath}
• Size: {size_kb:.1f} KB
• Resolution: 2104x1560"""
        else:
            # Return base64 for inline use
            b64 = base64.b64encode(image_data).decode('utf-8')
            return f"data:image/jpeg;base64,{b64[:50]}... (base64 image data)"

    except httpx.HTTPError as e:
        return f"❌ Error capturing snapshot: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_presence() -> str:
    """
    Check if Foxy is present with SPATIAL data, movement tracking, and face recognition!

    Static detections at moderate confidence are filtered out
    (goodbye, Spinal Nerves poster! 🖼️❌)

    Returns:
        Presence status, person count, confidence, distance, 3D position,
        movement state, and recognized name if known.

    Example:
        oak_presence()  # "Present: ✅ Yes (1 person, 87%, moving) at 1.24m — Alex"
    """
    try:
        data = await api_get("/presence")
        present = data.get("present", False)
        count = data.get("person_count", 0)
        confidence = data.get("confidence", 0)
        last_seen = data.get("seconds_since_seen")
        distance_m = data.get("distance_m")
        spatial = data.get("spatial")

        # Face recognition
        recognized = data.get("recognized_name")
        recog_conf = data.get("recognition_confidence")

        # Movement tracking
        is_moving = False
        static_count = 0
        filtered_out = False

        if spatial and present:
            x_mm = spatial.get("x_mm", 0)
            y_mm = spatial.get("y_mm", 0)
            z_mm = spatial.get("z_mm", 0)

            is_moving, static_count = check_movement(x_mm, y_mm, z_mm)

            # Filter: moderate confidence + static + unrecognized = probably poster
            if (confidence < HIGH_CONFIDENCE_THRESHOLD
                    and static_count >= STATIC_THRESHOLD
                    and not recognized):
                present = False
                filtered_out = True
                logger.info(f"Filtered static: conf={confidence:.0%}, static={static_count}")

        conf_pct = confidence * 100

        if present:
            dist_str = f" at {distance_m:.2f}m" if distance_m else ""
            move_str = "moving" if is_moving else "still"
            name_str = f" — {recognized}" if recognized else ""
            status = f"✅ Yes ({count} person{'s' if count != 1 else ''}, {conf_pct:.0f}%, {move_str}){dist_str}{name_str}"
        elif filtered_out:
            status = f"🖼️ Filtered (static at {conf_pct:.0f}% - probably poster)"
        elif last_seen is not None:
            status = f"❌ No (last seen {last_seen:.0f}s ago)"
        else:
            status = "❌ No (never seen)"

        result = f"👀 Present: {status}"

        if recognized:
            if recog_conf:
                result += f"\n• Recognized: {recognized} ({recog_conf*100:.0f}% match)"
            else:
                result += f"\n• Recognized: {recognized}"

        if spatial and (present or filtered_out):
            x_mm = spatial.get("x_mm", 0)
            y_mm = spatial.get("y_mm", 0)
            h_pos = "left" if x_mm < -100 else "right" if x_mm > 100 else "center"
            v_pos = "below" if y_mm < -100 else "above" if y_mm > 100 else "center"
            dist_part = f" @ {distance_m:.2f}m" if distance_m else ""
            result += f"\n• Position: {h_pos}, {v_pos}{dist_part}"
            result += f"\n• Movement: {'✅ moving' if is_moving else f'⏸️ static ({static_count})'}"

        return result
    except httpx.HTTPError as e:
        return f"❌ Error checking presence: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_snapshot_info() -> str:
    """
    Get snapshot metadata without capturing the full image.

    Returns:
        Frame dimensions (width, height, channels).

    Example:
        oak_snapshot_info()
    """
    try:
        data = await api_get("/snapshot/info")
        return f"""📐 Snapshot Info:
• Width: {data.get('width', 'unknown')} px
• Height: {data.get('height', 'unknown')} px
• Channels: {data.get('channels', 'unknown')}"""
    except httpx.HTTPError as e:
        return f"❌ Error: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_spatial() -> str:
    """
    Get detailed 3D spatial tracking data from OAK-D stereo depth.

    Returns:
        Full spatial coordinates, bounding box, and detection details.

    Example:
        oak_spatial()
    """
    try:
        data = await api_get("/detections")
        count = data.get("person_count", 0)
        detections = data.get("detections", [])

        if count == 0:
            return "📏 No person detected for spatial tracking"

        result = f"📏 Spatial Detection ({count} person{'s' if count != 1 else ''}):\n"

        for i, det in enumerate(detections):
            conf = det.get("confidence", 0) * 100
            x_mm = det.get("x_mm", 0)
            y_mm = det.get("y_mm", 0)
            z_mm = det.get("z_mm", 0)
            dist_m = det.get("distance_m", z_mm / 1000)
            recognized = det.get("recognized_name")
            recog_conf = det.get("recognition_confidence")

            name_str = f" — {recognized}" if recognized else ""
            result += f"""
Person {i+1}{name_str}:
  • Confidence: {conf:.0f}%
  • Distance: {dist_m:.2f}m
  • X: {int(x_mm)}mm ({"left" if x_mm < 0 else "right"} of center)
  • Y: {int(y_mm)}mm ({"above" if y_mm > 0 else "below"} center)
  • Z: {int(z_mm)}mm (depth)
  • BBox: ({det.get('xmin', 0):.2f}, {det.get('ymin', 0):.2f}) to ({det.get('xmax', 0):.2f}, {det.get('ymax', 0):.2f})"""
            if recognized:
                if recog_conf:
                    result += f"\n  • Recognized: {recognized} ({recog_conf*100:.0f}% match)"
                else:
                    result += f"\n  • Recognized: {recognized}"

        return result
    except httpx.HTTPError as e:
        return f"❌ Error getting spatial data: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_depth(save: bool = True, filename: str = None) -> str:
    """
    Capture colorized depth frame from OAK-D stereo cameras.

    Args:
        save: If True, save to disk (default True)
        filename: Optional custom filename (default: depth-TIMESTAMP.jpg)

    Returns:
        Path to saved depth visualization, showing distance as colors.

    Example:
        oak_depth()  # Save colorized depth map
    """
    try:
        image_data = await api_get_binary("/depth")

        if save:
            if filename is None:
                timestamp = time.strftime("%Y%m%d-%H%M%S")
                filename = f"depth-{timestamp}.jpg"

            filepath = os.path.join(SNAPSHOT_DIR, filename)

            with open(filepath, 'wb') as f:
                f.write(image_data)

            size_kb = len(image_data) / 1024
            return f"""🌈 Depth frame captured!
• File: {filepath}
• Size: {size_kb:.1f} KB
• Colors: Blue=close, Red=far"""
        else:
            b64 = base64.b64encode(image_data).decode('utf-8')
            return f"data:image/jpeg;base64,{b64[:50]}... (base64 depth data)"

    except httpx.HTTPError as e:
        return f"❌ Error capturing depth: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_reset_tracking() -> str:
    """Reset movement tracking state (use after repositioning camera)."""
    global _last_detection
    _last_detection = {
        "x_mm": None, "y_mm": None, "z_mm": None,
        "timestamp": None, "static_count": 0,
    }
    return "🔄 Movement tracking reset"


# ============== Pose Estimation Tools ==============
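
# Reference note (keypoint names in the output come from the service response
# itself): MoveNet's standard 17-keypoint order is nose, left/right eye,
# left/right ear, left/right shoulder, left/right elbow, left/right wrist,
# left/right hip, left/right knee, left/right ankle.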


@mcp.tool()
async def oak_pose() -> str:
    """
    Get body pose keypoints from MoveNet Lightning on Coral 2.

    Returns 17 body keypoints (nose, eyes, ears, shoulders, elbows,
    wrists, hips, knees, ankles) with x/y coordinates and confidence.
    Only runs when a person is detected.

    Returns:
        Pose keypoints with confidence scores, or inactive status.

    Example:
        oak_pose()
    """
    try:
        data = await api_get("/pose")
        active = data.get("active", False)

        if not active:
            return "🤸 Pose: No person detected"

        keypoints = data.get("keypoints", [])
        num_valid = data.get("num_valid", 0)
        mean_conf = data.get("mean_confidence", 0)
        inference_ms = data.get("inference_ms", 0)

        result = f"🤸 Pose Estimation ({num_valid}/17 keypoints, {mean_conf:.0%} avg confidence, {inference_ms:.1f}ms):\n"

        for kp in keypoints:
            conf = kp.get("confidence", 0)
            marker = "✅" if conf >= 0.2 else "·"
            result += f"  {marker} {kp['name']}: ({kp['x']:.2f}, {kp['y']:.2f}) {conf:.0%}\n"

        return result
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 503:
            return "🤸 Pose estimator not available (Coral 2 not loaded)"
        return f"❌ Error getting pose: {e}"
    except httpx.HTTPError as e:
        return f"❌ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_posture() -> str:
    """
    Get high-level posture summary: standing/sitting, facing camera, arms raised.

    Derived from MoveNet Lightning keypoints. Simpler than oak_pose —
    gives you the "what" without all the raw coordinates.

    Returns:
        Posture description (standing/sitting/unknown, facing camera, arms raised).

    Example:
        oak_posture()
    """
    try:
        data = await api_get("/pose/summary")
        active = data.get("active", False)

        if not active:
            return "🧍 Posture: No person detected"

        posture = data.get("posture", "unknown")
        facing = data.get("facing_camera", False)
        arms = data.get("arms_raised", False)
        num_valid = data.get("num_valid", 0)
        mean_conf = data.get("mean_confidence", 0)

        parts = []
        if posture != "unknown":
            parts.append(posture)
        parts.append("facing camera" if facing else "not facing camera")
        if arms:
            parts.append("arms raised")

        return f"""🧍 Posture: {', '.join(parts)}
• Confidence: {mean_conf:.0%} ({num_valid}/17 keypoints)"""
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 503:
            return "🧍 Pose estimator not available (Coral 2 not loaded)"
        return f"❌ Error getting posture: {e}"
    except httpx.HTTPError as e:
        return f"❌ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


# ============== Face Recognition Tools ==============


@mcp.tool()
async def oak_faces() -> str:
    """
    List all enrolled faces in the recognition database.

    Returns:
        List of enrolled people with embedding counts.

    Example:
        oak_faces()
    """
    try:
        data = await api_get("/faces")
        faces = data.get("faces", [])
        if not faces:
            return "🧑 No faces enrolled yet. Use oak_enroll_face to add someone."

        result = f"🧑 Enrolled Faces ({len(faces)}):\n"
        for f in faces:
            enrolled = time.strftime(
                "%Y-%m-%d %H:%M",
                time.localtime(f.get("enrolled_at", 0)),
            )
            result += f"  • {f['name']} ({f['embedding_count']} embedding{'s' if f['embedding_count'] != 1 else ''}, enrolled {enrolled})\n"
        return result
    except httpx.HTTPError as e:
        return f"❌ Error listing faces: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_enroll_face(name: str, photo_path: str = None) -> str:
    """
    Enroll a face for recognition. Either provide a photo file path, or omit
    photo_path to capture from the live camera.

    Args:
        name: Person's name to associate with this face.
        photo_path: Path to a photo file (JPEG/PNG). If not provided, uses current camera frame.

    Returns:
        Enrollment result with embedding count.

    Example:
        oak_enroll_face(name="Alex")  # From live camera
        oak_enroll_face(name="Alex", photo_path="/path/to/photo.jpg")
    """
    try:
        if photo_path:
            if not os.path.isfile(photo_path):
                return f"❌ File not found: {photo_path}"
            with open(photo_path, "rb") as f:
                photo_data = f.read()
            data = await api_post_multipart(
                "/faces/enroll",
                data={"name": name},
                files={"photo": (os.path.basename(photo_path), photo_data, "image/jpeg")},
            )
        else:
            data = await api_post("/faces/enroll-from-camera", params={"name": name})

        count = data.get("embedding_count", 1)
        return f"✅ Enrolled face for '{name}' ({count} embedding{'s' if count != 1 else ''} total)"
    except httpx.HTTPStatusError as e:
        detail = ""
        try:
            detail = e.response.json().get("detail", "")
        except Exception:
            pass
        return f"❌ Enrollment failed: {detail or e}"
    except httpx.HTTPError as e:
        return f"❌ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


@mcp.tool()
async def oak_delete_face(name: str) -> str:
    """
    Remove a person from the face recognition database.

    Args:
        name: Name of the person to remove.

    Returns:
        Deletion result.

    Example:
        oak_delete_face(name="Alex")
    """
    try:
        data = await api_delete(f"/faces/{name}")
        deleted = data.get("deleted", 0)
        return f"✅ Removed '{name}' ({deleted} embedding{'s' if deleted != 1 else ''} deleted)"
    except httpx.HTTPStatusError as e:
        detail = ""
        try:
            detail = e.response.json().get("detail", "")
        except Exception:
            pass
        return f"❌ Delete failed: {detail or e}"
    except httpx.HTTPError as e:
        return f"❌ Error connecting to oak-service: {e}"
    except Exception as e:
        return f"❌ Error: {e}"


# Run the server
if __name__ == "__main__":
    mcp.run()
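
# A minimal sketch of registering this server with an MCP client such as
# Claude Desktop (server name and script path are illustrative — adjust for
# your setup):
# {
#   "mcpServers": {
#     "oak": {
#       "command": "python",
#       "args": ["/path/to/oak_mcp.py"],
#       "env": {"OAK_SERVICE_URL": "http://head-vixy.local:8100"}
#     }
#   }
# }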