From 49a00359f6e25bf948cb0bfa769d7532bc6e8d27 Mon Sep 17 00:00:00 2001 From: Alex Date: Sun, 1 Feb 2026 13:02:29 -0600 Subject: [PATCH] Merge movement tracking from oak_mcp_1 + face recognition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Integrates Day 83 movement tracking (static poster filtering) with Day 86 face recognition. Poster filter now skips recognized faces — a known person sitting still won't be filtered out. Adds oak_reset_tracking tool for camera repositioning. Co-Authored-By: Claude Opus 4.5 --- oak_mcp.py | 137 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 102 insertions(+), 35 deletions(-) diff --git a/oak_mcp.py b/oak_mcp.py index 2e44011..1df9c7d 100644 --- a/oak_mcp.py +++ b/oak_mcp.py @@ -5,6 +5,7 @@ OAK MCP - MCP server interface for OAK-D Vision Service. Vixy's eyes! Allows Claude to see through the OAK-D camera. Built by Vixy on Day 74 🦊👀 Day 82 - SPATIAL UPGRADE! Now with real 3D depth! 📏 +Day 83 - MOVEMENT TRACKING! No more falling for posters! 🖼️❌ Day 86 - FACE RECOGNITION! Coral Edge TPU + FaceNet! 
πŸ§‘β€πŸ€β€πŸ§‘ Connects to oak-service running on head-vixy.local:8100 @@ -28,6 +29,20 @@ SNAPSHOT_DIR = os.environ.get("OAK_SNAPSHOT_DIR", "/Users/alex/Documents/Vixy/oa # Ensure snapshot directory exists os.makedirs(SNAPSHOT_DIR, exist_ok=True) +# Movement tracking state +_last_detection = { + "x_mm": None, + "y_mm": None, + "z_mm": None, + "timestamp": None, + "static_count": 0, # how many consecutive readings with same coords +} + +# Movement detection thresholds +MOVEMENT_THRESHOLD_MM = 50 # must move 5cm to count as "moving" +STATIC_THRESHOLD = 3 # this many static readings = probably not a real person +HIGH_CONFIDENCE_THRESHOLD = 0.85 # above this, trust detection even if static + # Create MCP server mcp = FastMCP("oak-mcp") @@ -77,6 +92,38 @@ async def api_delete(endpoint: str) -> dict: return response.json() +def check_movement(x_mm: float, y_mm: float, z_mm: float) -> tuple[bool, int]: + """ + Check if detection coordinates have moved since last reading. + + Returns: + (is_moving, static_count) + """ + global _last_detection + + is_moving = False + + if _last_detection["x_mm"] is not None: + delta = (abs(x_mm - _last_detection["x_mm"]) + + abs(y_mm - _last_detection["y_mm"]) + + abs(z_mm - _last_detection["z_mm"])) + is_moving = delta > MOVEMENT_THRESHOLD_MM + + if not is_moving: + _last_detection["static_count"] += 1 + else: + _last_detection["static_count"] = 0 + else: + _last_detection["static_count"] = 0 + + _last_detection.update({ + "x_mm": x_mm, "y_mm": y_mm, "z_mm": z_mm, + "timestamp": time.time() + }) + + return is_moving, _last_detection["static_count"] + + @mcp.tool() async def oak_health() -> str: """ @@ -184,69 +231,78 @@ async def oak_snapshot(save: bool = True, filename: str = None) -> str: @mcp.tool() async def oak_presence() -> str: """ - Check if Foxy is present (person detection via OAK-D) with SPATIAL data! - + Check if Foxy is present with SPATIAL data, movement tracking, and face recognition! 
+ + Static detections at moderate confidence are filtered out + (goodbye, Spinal Nerves poster! 🖼️❌) + Returns: - Presence status, person count, confidence, distance, and 3D position. - + Presence status, person count, confidence, distance, 3D position, + movement state, and recognized name if known. + Example: - oak_presence() # "Present: ✅ Yes (1 person, 87% confidence) at 1.24m" + oak_presence() # "Present: ✅ Yes (1 person, 87%, moving) at 1.24m — Alex" """ try: data = await api_get("/presence") present = data.get("present", False) count = data.get("person_count", 0) - confidence = data.get("confidence", 0) * 100 + confidence = data.get("confidence", 0) last_seen = data.get("seconds_since_seen") - - # Spatial data distance_m = data.get("distance_m") spatial = data.get("spatial") - + # Face recognition recognized = data.get("recognized_name") recog_conf = data.get("recognition_confidence") + # Movement tracking + is_moving = False + static_count = 0 + filtered_out = False + + if spatial and present: + x_mm = spatial.get("x_mm", 0) + y_mm = spatial.get("y_mm", 0) + z_mm = spatial.get("z_mm", 0) + + is_moving, static_count = check_movement(x_mm, y_mm, z_mm) + + # Filter: moderate confidence + static + unrecognized = probably poster + if (confidence < HIGH_CONFIDENCE_THRESHOLD + and static_count >= STATIC_THRESHOLD + and not recognized): + present = False + filtered_out = True + logger.info(f"Filtered static: conf={confidence:.0%}, static={static_count}") + + conf_pct = confidence * 100 + if present: dist_str = f" at {distance_m:.2f}m" if distance_m else "" + move_str = "moving" if is_moving else "still" name_str = f" — {recognized}" if recognized else "" - status = f"✅ Yes ({count} person{'s' if count != 1 else ''}, {confidence:.0f}% confidence){dist_str}{name_str}" + status = f"✅ Yes ({count} person{'s' if count != 1 else ''}, {conf_pct:.0f}%, {move_str}){dist_str}{name_str}" + elif filtered_out: + status = f"🖼️ Filtered (static at 
{conf_pct:.0f}% - probably poster)" elif last_seen is not None: status = f"❌ No (last seen {last_seen:.0f}s ago)" else: status = "❌ No (never seen)" - result = f"""👀 Presence Detection: -• Present: {status} -• Detection model: yolov6-nano""" + result = f"👀 Present: {status}" if recognized: result += f"\n• Recognized: {recognized} ({recog_conf*100:.0f}% match)" if recog_conf else f"\n• Recognized: {recognized}" - # Add spatial info if available - if spatial and present: + if spatial and (present or filtered_out): x_mm = spatial.get("x_mm", 0) y_mm = spatial.get("y_mm", 0) - z_mm = spatial.get("z_mm", 0) - - # Determine position description - h_pos = "center" - if x_mm < -100: - h_pos = "left" - elif x_mm > 100: - h_pos = "right" - - v_pos = "center" - if y_mm < -100: - v_pos = "below" - elif y_mm > 100: - v_pos = "above" - - result += f""" -• Distance: {distance_m:.2f}m ({int(z_mm)}mm) -• Position: {h_pos}, {v_pos} center -• 3D coords: X={int(x_mm)}mm, Y={int(y_mm)}mm, Z={int(z_mm)}mm""" - + h_pos = "left" if x_mm < -100 else "right" if x_mm > 100 else "center" + v_pos = "below" if y_mm < -100 else "above" if y_mm > 100 else "center" + result += f"\n• Position: {h_pos}, {v_pos} @ {distance_m:.2f}m" + result += f"\n• Movement: {'✅ moving' if is_moving else f'⏸️ static ({static_count})'}" + return result except httpx.HTTPError as e: return f"❌ Error checking presence: {e}" @@ -369,6 +425,17 @@ async def oak_depth(save: bool = True, filename: str = None) -> str: return f"❌ Error: {e}" +@mcp.tool() +async def oak_reset_tracking() -> str: + """Reset movement tracking state (use after repositioning camera).""" + global _last_detection + _last_detection = { + "x_mm": None, "y_mm": None, "z_mm": None, + "timestamp": None, "static_count": 0, + } + return "🔄 Movement tracking reset" + + # ============== Face Recognition Tools ==============