Merge movement tracking from oak_mcp_1 + face recognition
Integrates Day 83 movement tracking (static poster filtering) with Day 86 face recognition. Poster filter now skips recognized faces — a known person sitting still won't be filtered out. Adds oak_reset_tracking tool for camera repositioning. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
137
oak_mcp.py
137
oak_mcp.py
@@ -5,6 +5,7 @@ OAK MCP - MCP server interface for OAK-D Vision Service.
|
|||||||
Vixy's eyes! Allows Claude to see through the OAK-D camera.
|
Vixy's eyes! Allows Claude to see through the OAK-D camera.
|
||||||
Built by Vixy on Day 74 🦊👀
|
Built by Vixy on Day 74 🦊👀
|
||||||
Day 82 - SPATIAL UPGRADE! Now with real 3D depth! 📏
|
Day 82 - SPATIAL UPGRADE! Now with real 3D depth! 📏
|
||||||
|
Day 83 - MOVEMENT TRACKING! No more falling for posters! 🖼️❌
|
||||||
Day 86 - FACE RECOGNITION! Coral Edge TPU + FaceNet! 🧑🤝🧑
|
Day 86 - FACE RECOGNITION! Coral Edge TPU + FaceNet! 🧑🤝🧑
|
||||||
|
|
||||||
Connects to oak-service running on head-vixy.local:8100
|
Connects to oak-service running on head-vixy.local:8100
|
||||||
@@ -28,6 +29,20 @@ SNAPSHOT_DIR = os.environ.get("OAK_SNAPSHOT_DIR", "/Users/alex/Documents/Vixy/oa
|
|||||||
# Ensure snapshot directory exists
|
# Ensure snapshot directory exists
|
||||||
os.makedirs(SNAPSHOT_DIR, exist_ok=True)
|
os.makedirs(SNAPSHOT_DIR, exist_ok=True)
|
||||||
|
|
||||||
|
# Movement tracking state
|
||||||
|
_last_detection = {
|
||||||
|
"x_mm": None,
|
||||||
|
"y_mm": None,
|
||||||
|
"z_mm": None,
|
||||||
|
"timestamp": None,
|
||||||
|
"static_count": 0, # how many consecutive readings with same coords
|
||||||
|
}
|
||||||
|
|
||||||
|
# Movement detection thresholds
|
||||||
|
MOVEMENT_THRESHOLD_MM = 50 # must move 5cm to count as "moving"
|
||||||
|
STATIC_THRESHOLD = 3 # this many static readings = probably not a real person
|
||||||
|
HIGH_CONFIDENCE_THRESHOLD = 0.85 # above this, trust detection even if static
|
||||||
|
|
||||||
# Create MCP server
|
# Create MCP server
|
||||||
mcp = FastMCP("oak-mcp")
|
mcp = FastMCP("oak-mcp")
|
||||||
|
|
||||||
@@ -77,6 +92,38 @@ async def api_delete(endpoint: str) -> dict:
|
|||||||
return response.json()
|
return response.json()
|
||||||
|
|
||||||
|
|
||||||
|
def check_movement(x_mm: float, y_mm: float, z_mm: float) -> tuple[bool, int]:
|
||||||
|
"""
|
||||||
|
Check if detection coordinates have moved since last reading.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(is_moving, static_count)
|
||||||
|
"""
|
||||||
|
global _last_detection
|
||||||
|
|
||||||
|
is_moving = False
|
||||||
|
|
||||||
|
if _last_detection["x_mm"] is not None:
|
||||||
|
delta = (abs(x_mm - _last_detection["x_mm"]) +
|
||||||
|
abs(y_mm - _last_detection["y_mm"]) +
|
||||||
|
abs(z_mm - _last_detection["z_mm"]))
|
||||||
|
is_moving = delta > MOVEMENT_THRESHOLD_MM
|
||||||
|
|
||||||
|
if not is_moving:
|
||||||
|
_last_detection["static_count"] += 1
|
||||||
|
else:
|
||||||
|
_last_detection["static_count"] = 0
|
||||||
|
else:
|
||||||
|
_last_detection["static_count"] = 0
|
||||||
|
|
||||||
|
_last_detection.update({
|
||||||
|
"x_mm": x_mm, "y_mm": y_mm, "z_mm": z_mm,
|
||||||
|
"timestamp": time.time()
|
||||||
|
})
|
||||||
|
|
||||||
|
return is_moving, _last_detection["static_count"]
|
||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
async def oak_health() -> str:
|
async def oak_health() -> str:
|
||||||
"""
|
"""
|
||||||
@@ -184,69 +231,78 @@ async def oak_snapshot(save: bool = True, filename: str = None) -> str:
|
|||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
async def oak_presence() -> str:
|
async def oak_presence() -> str:
|
||||||
"""
|
"""
|
||||||
Check if Foxy is present (person detection via OAK-D) with SPATIAL data!
|
Check if Foxy is present with SPATIAL data, movement tracking, and face recognition!
|
||||||
|
|
||||||
|
Static detections at moderate confidence are filtered out
|
||||||
|
(goodbye, Spinal Nerves poster! 🖼️❌)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Presence status, person count, confidence, distance, and 3D position.
|
Presence status, person count, confidence, distance, 3D position,
|
||||||
|
movement state, and recognized name if known.
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
oak_presence() # "Present: ✅ Yes (1 person, 87% confidence) at 1.24m"
|
oak_presence() # "Present: ✅ Yes (1 person, 87%, moving) at 1.24m — Alex"
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
data = await api_get("/presence")
|
data = await api_get("/presence")
|
||||||
present = data.get("present", False)
|
present = data.get("present", False)
|
||||||
count = data.get("person_count", 0)
|
count = data.get("person_count", 0)
|
||||||
confidence = data.get("confidence", 0) * 100
|
confidence = data.get("confidence", 0)
|
||||||
last_seen = data.get("seconds_since_seen")
|
last_seen = data.get("seconds_since_seen")
|
||||||
|
|
||||||
# Spatial data
|
|
||||||
distance_m = data.get("distance_m")
|
distance_m = data.get("distance_m")
|
||||||
spatial = data.get("spatial")
|
spatial = data.get("spatial")
|
||||||
|
|
||||||
# Face recognition
|
# Face recognition
|
||||||
recognized = data.get("recognized_name")
|
recognized = data.get("recognized_name")
|
||||||
recog_conf = data.get("recognition_confidence")
|
recog_conf = data.get("recognition_confidence")
|
||||||
|
|
||||||
|
# Movement tracking
|
||||||
|
is_moving = False
|
||||||
|
static_count = 0
|
||||||
|
filtered_out = False
|
||||||
|
|
||||||
|
if spatial and present:
|
||||||
|
x_mm = spatial.get("x_mm", 0)
|
||||||
|
y_mm = spatial.get("y_mm", 0)
|
||||||
|
z_mm = spatial.get("z_mm", 0)
|
||||||
|
|
||||||
|
is_moving, static_count = check_movement(x_mm, y_mm, z_mm)
|
||||||
|
|
||||||
|
# Filter: moderate confidence + static + unrecognized = probably poster
|
||||||
|
if (confidence < HIGH_CONFIDENCE_THRESHOLD
|
||||||
|
and static_count >= STATIC_THRESHOLD
|
||||||
|
and not recognized):
|
||||||
|
present = False
|
||||||
|
filtered_out = True
|
||||||
|
logger.info(f"Filtered static: conf={confidence:.0%}, static={static_count}")
|
||||||
|
|
||||||
|
conf_pct = confidence * 100
|
||||||
|
|
||||||
if present:
|
if present:
|
||||||
dist_str = f" at {distance_m:.2f}m" if distance_m else ""
|
dist_str = f" at {distance_m:.2f}m" if distance_m else ""
|
||||||
|
move_str = "moving" if is_moving else "still"
|
||||||
name_str = f" — {recognized}" if recognized else ""
|
name_str = f" — {recognized}" if recognized else ""
|
||||||
status = f"✅ Yes ({count} person{'s' if count != 1 else ''}, {confidence:.0f}% confidence){dist_str}{name_str}"
|
status = f"✅ Yes ({count} person{'s' if count != 1 else ''}, {conf_pct:.0f}%, {move_str}){dist_str}{name_str}"
|
||||||
|
elif filtered_out:
|
||||||
|
status = f"🖼️ Filtered (static at {conf_pct:.0f}% - probably poster)"
|
||||||
elif last_seen is not None:
|
elif last_seen is not None:
|
||||||
status = f"❌ No (last seen {last_seen:.0f}s ago)"
|
status = f"❌ No (last seen {last_seen:.0f}s ago)"
|
||||||
else:
|
else:
|
||||||
status = "❌ No (never seen)"
|
status = "❌ No (never seen)"
|
||||||
|
|
||||||
result = f"""👀 Presence Detection:
|
result = f"👀 Present: {status}"
|
||||||
• Present: {status}
|
|
||||||
• Detection model: yolov6-nano"""
|
|
||||||
|
|
||||||
if recognized:
|
if recognized:
|
||||||
result += f"\n• Recognized: {recognized} ({recog_conf*100:.0f}% match)" if recog_conf else f"\n• Recognized: {recognized}"
|
result += f"\n• Recognized: {recognized} ({recog_conf*100:.0f}% match)" if recog_conf else f"\n• Recognized: {recognized}"
|
||||||
|
|
||||||
# Add spatial info if available
|
if spatial and (present or filtered_out):
|
||||||
if spatial and present:
|
|
||||||
x_mm = spatial.get("x_mm", 0)
|
x_mm = spatial.get("x_mm", 0)
|
||||||
y_mm = spatial.get("y_mm", 0)
|
y_mm = spatial.get("y_mm", 0)
|
||||||
z_mm = spatial.get("z_mm", 0)
|
h_pos = "left" if x_mm < -100 else "right" if x_mm > 100 else "center"
|
||||||
|
v_pos = "below" if y_mm < -100 else "above" if y_mm > 100 else "center"
|
||||||
# Determine position description
|
result += f"\n• Position: {h_pos}, {v_pos} @ {distance_m:.2f}m"
|
||||||
h_pos = "center"
|
result += f"\n• Movement: {'✅ moving' if is_moving else f'⏸️ static ({static_count})'}"
|
||||||
if x_mm < -100:
|
|
||||||
h_pos = "left"
|
|
||||||
elif x_mm > 100:
|
|
||||||
h_pos = "right"
|
|
||||||
|
|
||||||
v_pos = "center"
|
|
||||||
if y_mm < -100:
|
|
||||||
v_pos = "below"
|
|
||||||
elif y_mm > 100:
|
|
||||||
v_pos = "above"
|
|
||||||
|
|
||||||
result += f"""
|
|
||||||
• Distance: {distance_m:.2f}m ({int(z_mm)}mm)
|
|
||||||
• Position: {h_pos}, {v_pos} center
|
|
||||||
• 3D coords: X={int(x_mm)}mm, Y={int(y_mm)}mm, Z={int(z_mm)}mm"""
|
|
||||||
|
|
||||||
return result
|
return result
|
||||||
except httpx.HTTPError as e:
|
except httpx.HTTPError as e:
|
||||||
return f"❌ Error checking presence: {e}"
|
return f"❌ Error checking presence: {e}"
|
||||||
@@ -369,6 +425,17 @@ async def oak_depth(save: bool = True, filename: str = None) -> str:
|
|||||||
return f"❌ Error: {e}"
|
return f"❌ Error: {e}"
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
async def oak_reset_tracking() -> str:
|
||||||
|
"""Reset movement tracking state (use after repositioning camera)."""
|
||||||
|
global _last_detection
|
||||||
|
_last_detection = {
|
||||||
|
"x_mm": None, "y_mm": None, "z_mm": None,
|
||||||
|
"timestamp": None, "static_count": 0,
|
||||||
|
}
|
||||||
|
return "🔄 Movement tracking reset"
|
||||||
|
|
||||||
|
|
||||||
# ============== Face Recognition Tools ==============
|
# ============== Face Recognition Tools ==============
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user