Files
oak-service/oak_service.py
Alex cdbf7ff394 Add MoveNet Lightning pose estimation on Coral 2
Integrates single-person pose detection into oak-service using MoveNet
Lightning on a second Google Coral Edge TPU. Detects 17 body keypoints
at ~7ms per frame, derives posture (standing/sitting), facing direction,
and arm position. Only runs when a person is detected by YOLOv6.

New endpoints: /pose (raw keypoints), /pose/summary (derived posture)
New module: pose_estimator.py (PoseEstimator class)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 19:29:16 -06:00

369 lines
11 KiB
Python

#!/usr/bin/env python3
"""
OAK-D Vision Service for Vixy's Head
FastAPI service with person detection and presence tracking
Day 74 - Built by Vixy! 🦊
Day 81 - Added presence detection! Now I can SEE you! 👀💜
Using depthai v3 API with yolov6-nano
"""
import time
import threading
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
import depthai as dai
import cv2
from pose_estimator import PoseEstimator
# ============== Configuration ==============
DETECTION_MODEL = "yolov6-nano" # Has 'person' class
PERSON_CLASS_ID = 0 # 'person' is class 0 in COCO
# Confidence threshold handed to the detection network in init_oak().
DETECTION_THRESHOLD = 0.5
PRESENCE_TIMEOUT = 30.0 # seconds without person = not present
# Seconds detection_loop() sleeps between polls of the detection queue.
DETECTION_INTERVAL = 0.5
# Pose estimation
# Model file expected in a "models" directory next to this file; the pose
# estimator is not created when the file does not exist.
POSE_MODEL_PATH = Path(__file__).parent / "models" / "movenet_single_pose_lightning_ptq_edgetpu.tflite"
POSE_CORAL_DEVICE = 1 # Second Coral (device 0 is headmic/YAMNet)
# ============== Global State ==============
# Module-level state shared between the FastAPI handlers and the background
# detection thread.
pipeline_ctx = None  # dai.Pipeline assigned by init_oak(); None while the camera is not running
detection_queue = None  # detection-network output queue polled by detection_loop()
rgb_queue = None  # camera-frame output queue read by /snapshot and by pose estimation
detection_thread = None  # threading.Thread running detection_loop(), created in lifespan()
running = False  # loop condition of detection_loop(); cleared by cleanup_oak()
labels = []  # class names returned by the detection network (det.getClasses())
pose_estimator = None  # PoseEstimator instance, or None when the model is missing or failed to load
# Latest presence information, written by detection_loop().
presence_state = {
    "present": False,  # set True on a person detection; cleared after PRESENCE_TIMEOUT seconds without one
    "person_count": 0,  # number of persons in the most recent detection result
    "last_seen": None,  # time.time() of the most recent result that contained a person
    "last_detection": None,  # time.time() of the most recent detection result of any kind
    "detections": [],  # bounding box + confidence dicts for the persons in the latest result
    "confidence": 0.0,  # highest person confidence in the latest result
}
# Latest pose information, written by _run_pose_estimation() and cleared by
# detection_loop() when a detection result contains no person.
pose_state = {
    "active": False,  # True while a pose result is current
    "keypoints": [],  # result["keypoints"] from PoseEstimator.estimate()
    "posture": {},  # return value of PoseEstimator.derive_posture()
    "num_valid": 0,  # result["num_valid"] from the estimator
    "mean_confidence": 0.0,  # result["mean_confidence"] from the estimator
    "inference_ms": 0.0,  # result["inference_ms"] from the estimator
    "last_update": None,  # result["timestamp"] of the latest pose result
}
def init_oak():
    """Initialize the OAK-D camera and person-detection pipeline (depthai v3).

    Builds a pipeline with a 1920x1080 BGR camera output (used for snapshots
    and pose estimation) and a ``DETECTION_MODEL`` detection network, starts
    it, and then tries to bring up the optional pose estimator.

    The module-level ``pipeline_ctx``, ``rgb_queue``, ``detection_queue`` and
    ``labels`` are assigned only after the pipeline has started, so a failed
    initialization never leaves half-configured queues behind for the
    endpoints or the detection thread to pick up.

    Returns:
        bool: True when the pipeline is running, False when setup failed
        (the error and its traceback are printed).
    """
    global pipeline_ctx, detection_queue, rgb_queue, labels
    try:
        print("🦊 Initializing OAK-D with yolov6-nano...")
        pipeline = dai.Pipeline()

        # Camera node with a full-resolution RGB output for snapshots.
        cam = pipeline.create(dai.node.Camera).build()
        cam_out = cam.requestOutput((1920, 1080), dai.ImgFrame.Type.BGR888p)
        frame_queue = cam_out.createOutputQueue(maxSize=1, blocking=False)

        # Detection network fed directly from the camera node.
        desc = dai.NNModelDescription(DETECTION_MODEL)
        det = pipeline.create(dai.node.DetectionNetwork).build(cam, desc)
        det.setConfidenceThreshold(DETECTION_THRESHOLD)

        # The class list is informational only; a model that reports no
        # classes must not abort initialization.
        class_names = det.getClasses() or []
        if len(class_names) > PERSON_CLASS_ID:
            person_label = class_names[PERSON_CLASS_ID]
        else:
            person_label = "unknown"
        print(f"✅ Loaded {len(class_names)} classes, person={person_label}")

        # Output queues are created before the pipeline is started.
        det_queue = det.out.createOutputQueue(maxSize=1, blocking=False)
        pipeline.start()
    except Exception as e:
        print(f"❌ Failed to initialize OAK-D: {e}")
        import traceback
        traceback.print_exc()
        return False

    # Publish the shared state only now that everything above succeeded.
    pipeline_ctx = pipeline
    rgb_queue = frame_queue
    detection_queue = det_queue
    labels = class_names
    print("✅ OAK-D initialized with person detection!")

    # Pose estimation is optional: the camera is already running, so a
    # problem here must not turn a working setup into a reported failure.
    try:
        _init_pose_estimator()
    except Exception as e:
        print(f"⚠️ Pose estimator setup raised unexpectedly: {e}")
    return True
def _init_pose_estimator():
    """Load MoveNet Lightning onto the second Coral Edge TPU.

    Prints a warning and leaves ``pose_estimator`` untouched when the model
    file does not exist. When constructing the estimator fails, the error is
    printed and ``pose_estimator`` is set to None.
    """
    global pose_estimator
    if POSE_MODEL_PATH.exists():
        model_file = str(POSE_MODEL_PATH)
        try:
            pose_estimator = PoseEstimator(
                model_path=model_file,
                device_index=POSE_CORAL_DEVICE,
            )
            print("✅ Pose estimator initialized on Coral 2!")
        except Exception as err:
            print(f"⚠️ Pose estimator failed to initialize: {err}")
            pose_estimator = None
    else:
        print(f"⚠️ Pose model not found: {POSE_MODEL_PATH}")
def cleanup_oak():
    """Stop the detection loop and release the OAK-D pipeline.

    Clears ``running`` so ``detection_loop`` exits, then calls ``stop()`` and
    ``close()`` on the active pipeline. Shutdown stays best-effort: each step
    is attempted independently and a failure is printed rather than raised,
    so a failing ``stop()`` does not prevent ``close()`` from being tried.
    ``pipeline_ctx`` is always reset to None.
    """
    global pipeline_ctx, running
    running = False
    # Detach the pipeline from the shared state first so nothing else sees a
    # pipeline that is in the middle of being torn down.
    pipeline, pipeline_ctx = pipeline_ctx, None
    if pipeline is None:
        return
    for step in ("stop", "close"):
        try:
            getattr(pipeline, step)()
        except Exception as e:
            print(f"⚠️ OAK-D pipeline {step}() failed during cleanup: {e}")
def detection_loop():
    """Background thread body: track presence and trigger pose estimation.

    While ``running`` is true, polls ``detection_queue`` every
    ``DETECTION_INTERVAL`` seconds, keeps only detections whose label equals
    ``PERSON_CLASS_ID`` and updates ``presence_state``. When at least one
    person is visible, pose estimation is run on the latest RGB frame; when a
    detection result contains no person, the active pose is cleared.

    ``present`` is dropped once no person has been seen for more than
    ``PRESENCE_TIMEOUT`` seconds. The timeout is evaluated on every pass
    against a fresh clock reading, so presence also expires when the camera
    stops delivering detection results.

    Any exception raised inside a pass is printed and the loop retries after
    one second.
    """
    global running, presence_state, pose_state, detection_queue
    print("🔍 Presence detection loop started")
    while running:
        try:
            if detection_queue is None:
                time.sleep(1)
                continue

            # One clock reading per pass, taken whether or not data arrived.
            now = time.time()
            data = detection_queue.tryGet()
            if data is not None:
                presence_state["last_detection"] = now
                # Filter for person detections only
                persons = [d for d in data.detections if d.label == PERSON_CLASS_ID]
                presence_state["person_count"] = len(persons)
                if persons:
                    presence_state["present"] = True
                    presence_state["last_seen"] = now
                    presence_state["confidence"] = max(d.confidence for d in persons)
                    presence_state["detections"] = [
                        {
                            "xmin": d.xmin, "ymin": d.ymin,
                            "xmax": d.xmax, "ymax": d.ymax,
                            "confidence": d.confidence,
                        }
                        for d in persons
                    ]
                    # Run pose estimation on the latest RGB frame
                    _run_pose_estimation()
                else:
                    presence_state["detections"] = []
                    presence_state["confidence"] = 0.0
                    # Clear pose when no person
                    if pose_state["active"]:
                        pose_state["active"] = False
                        pose_state["keypoints"] = []
                        pose_state["posture"] = {}
                        pose_state["num_valid"] = 0
                        pose_state["mean_confidence"] = 0.0

            # Check timeout on every pass, not only when a message arrived.
            last_seen = presence_state["last_seen"]
            if last_seen is not None and now - last_seen > PRESENCE_TIMEOUT:
                presence_state["present"] = False

            time.sleep(DETECTION_INTERVAL)
        except Exception as e:
            print(f"Detection loop error: {e}")
            time.sleep(1)
    print("🛑 Presence detection loop stopped")
def _run_pose_estimation():
    """Grab the latest RGB frame and run pose estimation via Coral 2.

    Does nothing when no pose estimator or RGB queue is available, or when
    the queue currently holds no frame. Otherwise the frame is passed to
    ``pose_estimator.estimate`` and the posture is derived from the returned
    keypoints.

    The new values are assembled completely before ``pose_state`` is touched
    and are then applied with a single ``update`` call. If anything fails
    along the way (including a result that lacks an expected key) the error
    is printed and ``pose_state`` keeps its previous contents instead of
    ending up half-updated.
    """
    global pose_state, rgb_queue, pose_estimator
    estimator = pose_estimator
    frames = rgb_queue
    if estimator is None or frames is None:
        return
    try:
        frame_msg = frames.tryGet()
        if frame_msg is None:
            return
        result = estimator.estimate(frame_msg.getCvFrame())
        keypoints = result["keypoints"]
        snapshot = {
            "active": True,
            "keypoints": keypoints,
            # Derive posture from keypoints
            "posture": estimator.derive_posture(keypoints),
            "num_valid": result["num_valid"],
            "mean_confidence": result["mean_confidence"],
            "inference_ms": result["inference_ms"],
            "last_update": result["timestamp"],
        }
    except Exception as e:
        print(f"Pose estimation error: {e}")
        return
    pose_state.update(snapshot)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: bring the camera up on startup, release it on shutdown.

    On startup the OAK-D pipeline is initialized; when that succeeds the
    presence detection loop is started in a daemon thread. When it fails the
    service still starts, just without a camera.

    The shutdown part runs in a ``finally`` clause so ``cleanup_oak()`` is
    also called when an exception or cancellation is raised at the ``yield``.
    """
    global running, detection_thread
    print("🦊 Starting OAK-D Vision Service...")
    if init_oak():
        running = True
        detection_thread = threading.Thread(target=detection_loop, daemon=True)
        detection_thread.start()
        print("✅ Service ready!")
    else:
        print("⚠️ OAK-D not available")
    try:
        yield
    finally:
        print("👋 Shutting down...")
        cleanup_oak()
# FastAPI application object; `lifespan` handles camera startup and shutdown.
# The version string is repeated in the /health response.
app = FastAPI(
    title="OAK-D Vision Service",
    description="Vixy's eyes with presence detection + pose estimation! 🦊👀",
    version="0.4.0",
    lifespan=lifespan
)
@app.get("/health")
async def health():
    """Report service status plus camera and pose-model availability."""
    oak_up = pipeline_ctx is not None
    pose_ready = pose_estimator is not None
    report = dict(
        status="healthy",
        service="oak-service",
        version="0.4.0",
        oak_connected=oak_up,
        detection_model=DETECTION_MODEL,
        pose_model_loaded=pose_ready,
    )
    report["timestamp"] = time.time()
    return report
@app.get("/presence")
async def presence():
    """Return the current presence state, including time since a person was last seen."""
    # Elapsed time is only defined once a person has been seen at least once.
    if presence_state["last_seen"]:
        elapsed = time.time() - presence_state["last_seen"]
    else:
        elapsed = None

    body = {key: presence_state[key] for key in ("present", "person_count", "last_seen")}
    body["seconds_since_seen"] = elapsed
    body["confidence"] = presence_state["confidence"]
    body["timestamp"] = time.time()
    return body
@app.get("/detections")
async def detections():
    """Return the person detections from the most recent detection result."""
    exposed = ("person_count", "detections", "last_detection")
    payload = {field: presence_state[field] for field in exposed}
    payload["timestamp"] = time.time()
    return payload
@app.get("/snapshot")
async def snapshot():
    """Return the latest RGB frame as a JPEG image (quality 85).

    Raises:
        HTTPException: 503 when the camera is not initialized or no frame is
            currently available; 500 when JPEG encoding fails or any other
            error occurs while producing the image.
    """
    frames = rgb_queue
    if frames is None:
        raise HTTPException(status_code=503, detail="OAK-D not initialized")
    try:
        frame = frames.tryGet()
        if frame is None:
            raise HTTPException(status_code=503, detail="No frame available")
        img = frame.getCvFrame()
        # imencode reports failure through its first return value instead of
        # raising, so it has to be checked before the buffer is sent out.
        ok, jpeg = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85])
        if not ok:
            raise HTTPException(status_code=500, detail="JPEG encoding failed")
        return Response(content=jpeg.tobytes(), media_type="image/jpeg")
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/pose")
async def pose():
    """Return the raw keypoints and statistics of the current pose.

    Raises:
        HTTPException: 503 when no pose estimator is loaded.
    """
    if pose_estimator is None:
        raise HTTPException(status_code=503, detail="Pose estimator not available")

    exposed = (
        "active",
        "keypoints",
        "num_valid",
        "mean_confidence",
        "inference_ms",
        "last_update",
    )
    body = {field: pose_state[field] for field in exposed}
    body["timestamp"] = time.time()
    return body
@app.get("/pose/summary")
async def pose_summary():
    """Return the posture summary derived from the current pose.

    Fields missing from the derived posture fall back to "unknown" for the
    posture and False for the two flags.

    Raises:
        HTTPException: 503 when no pose estimator is loaded.
    """
    if pose_estimator is None:
        raise HTTPException(status_code=503, detail="Pose estimator not available")

    def derived(field, fallback):
        # Look up one field of the derived posture, with a default.
        return pose_state["posture"].get(field, fallback)

    summary = {"active": pose_state["active"]}
    summary["posture"] = derived("posture", "unknown")
    summary["facing_camera"] = derived("facing_camera", False)
    summary["arms_raised"] = derived("arms_raised", False)
    summary["mean_confidence"] = pose_state["mean_confidence"]
    summary["num_valid"] = pose_state["num_valid"]
    summary["timestamp"] = time.time()
    return summary
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on 0.0.0.0:8100.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8100)