Add MoveNet Lightning pose estimation on Coral 2

Integrates single-person pose detection into oak-service using MoveNet
Lightning on a second Google Coral Edge TPU. Detects 17 body keypoints
at ~7ms per frame, derives posture (standing/sitting), facing direction,
and arm position. Only runs when a person is detected by YOLOv6.

New endpoints: /pose (raw keypoints), /pose/summary (derived posture)
New module: pose_estimator.py (PoseEstimator class)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alex
2026-02-08 19:29:16 -06:00
parent 19a27a1240
commit cdbf7ff394
3 changed files with 343 additions and 17 deletions

View File

@@ -11,11 +11,14 @@ Day 81 - Added presence detection! Now I can SEE you! 👀💜
import time import time
import threading import threading
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
from fastapi.responses import Response from fastapi.responses import Response
import depthai as dai import depthai as dai
import cv2 import cv2
from pose_estimator import PoseEstimator
# ============== Configuration ============== # ============== Configuration ==============
DETECTION_MODEL = "yolov6-nano" # Has 'person' class DETECTION_MODEL = "yolov6-nano" # Has 'person' class
PERSON_CLASS_ID = 0 # 'person' is class 0 in COCO PERSON_CLASS_ID = 0 # 'person' is class 0 in COCO
@@ -23,6 +26,10 @@ DETECTION_THRESHOLD = 0.5
PRESENCE_TIMEOUT = 30.0 # seconds without person = not present PRESENCE_TIMEOUT = 30.0 # seconds without person = not present
DETECTION_INTERVAL = 0.5 DETECTION_INTERVAL = 0.5
# Pose estimation
POSE_MODEL_PATH = Path(__file__).parent / "models" / "movenet_single_pose_lightning_ptq_edgetpu.tflite"
POSE_CORAL_DEVICE = 1 # Second Coral (device 0 is headmic/YAMNet)
# ============== Global State ============== # ============== Global State ==============
pipeline_ctx = None pipeline_ctx = None
detection_queue = None detection_queue = None
@@ -30,6 +37,7 @@ rgb_queue = None
detection_thread = None detection_thread = None
running = False running = False
labels = [] labels = []
pose_estimator = None
presence_state = { presence_state = {
"present": False, "present": False,
@@ -40,6 +48,16 @@ presence_state = {
"confidence": 0.0, "confidence": 0.0,
} }
# Latest pose-estimation result, written by the detection thread
# (_run_pose_estimation) and read by the /pose and /pose/summary endpoints.
pose_state = {
"active": False,  # True while a person is in frame and pose is being tracked
"keypoints": [],  # 17 {"name", "x", "y", "confidence"} dicts from MoveNet
"posture": {},  # derived summary: posture / facing_camera / arms_raised
"num_valid": 0,  # keypoints above the estimator's confidence floor
"mean_confidence": 0.0,  # average confidence over the valid keypoints
"inference_ms": 0.0,  # inference latency of the last frame, milliseconds
"last_update": None,  # time.time() of the last successful estimate
}
def init_oak(): def init_oak():
"""Initialize OAK-D with person detection pipeline (depthai v3).""" """Initialize OAK-D with person detection pipeline (depthai v3)."""
@@ -75,8 +93,12 @@ def init_oak():
pipeline_ctx = pipeline pipeline_ctx = pipeline
print("✅ OAK-D initialized with person detection!") print("✅ OAK-D initialized with person detection!")
# Initialize pose estimator on Coral 2
_init_pose_estimator()
return True return True
except Exception as e: except Exception as e:
print(f"❌ Failed to initialize OAK-D: {e}") print(f"❌ Failed to initialize OAK-D: {e}")
import traceback import traceback
@@ -84,6 +106,25 @@ def init_oak():
return False return False
def _init_pose_estimator():
    """Load MoveNet Lightning onto the second Coral Edge TPU.

    Sets the module-level ``pose_estimator``; leaves it as None when the
    model file is missing or the accelerator fails to initialize, so the
    rest of the service keeps running without pose data.
    """
    global pose_estimator
    if not POSE_MODEL_PATH.exists():
        print(f"⚠️ Pose model not found: {POSE_MODEL_PATH}")
        return
    try:
        estimator = PoseEstimator(
            model_path=str(POSE_MODEL_PATH),
            device_index=POSE_CORAL_DEVICE,
        )
    except Exception as exc:
        # Best-effort: pose is an optional capability of this service.
        print(f"⚠️ Pose estimator failed to initialize: {exc}")
        pose_estimator = None
    else:
        pose_estimator = estimator
        print("✅ Pose estimator initialized on Coral 2!")
def cleanup_oak(): def cleanup_oak():
"""Cleanup OAK-D resources.""" """Cleanup OAK-D resources."""
global pipeline_ctx, running global pipeline_ctx, running
@@ -99,29 +140,29 @@ def cleanup_oak():
def detection_loop(): def detection_loop():
"""Background thread for presence detection.""" """Background thread for presence detection + pose estimation."""
global running, presence_state, detection_queue global running, presence_state, pose_state, detection_queue
print("🔍 Presence detection loop started") print("🔍 Presence detection loop started")
while running: while running:
try: try:
if detection_queue is None: if detection_queue is None:
time.sleep(1) time.sleep(1)
continue continue
data = detection_queue.tryGet() data = detection_queue.tryGet()
if data is not None: if data is not None:
now = time.time() now = time.time()
presence_state["last_detection"] = now presence_state["last_detection"] = now
# Filter for person detections only # Filter for person detections only
persons = [d for d in data.detections if d.label == PERSON_CLASS_ID] persons = [d for d in data.detections if d.label == PERSON_CLASS_ID]
person_count = len(persons) person_count = len(persons)
presence_state["person_count"] = person_count presence_state["person_count"] = person_count
if person_count > 0: if person_count > 0:
presence_state["present"] = True presence_state["present"] = True
presence_state["last_seen"] = now presence_state["last_seen"] = now
@@ -134,24 +175,66 @@ def detection_loop():
} }
for d in persons for d in persons
] ]
# Run pose estimation on the latest RGB frame
_run_pose_estimation()
else: else:
presence_state["detections"] = [] presence_state["detections"] = []
presence_state["confidence"] = 0.0 presence_state["confidence"] = 0.0
# Clear pose when no person
if pose_state["active"]:
pose_state["active"] = False
pose_state["keypoints"] = []
pose_state["posture"] = {}
pose_state["num_valid"] = 0
pose_state["mean_confidence"] = 0.0
# Check timeout # Check timeout
if presence_state["last_seen"]: if presence_state["last_seen"]:
if now - presence_state["last_seen"] > PRESENCE_TIMEOUT: if now - presence_state["last_seen"] > PRESENCE_TIMEOUT:
presence_state["present"] = False presence_state["present"] = False
time.sleep(DETECTION_INTERVAL) time.sleep(DETECTION_INTERVAL)
except Exception as e: except Exception as e:
print(f"Detection loop error: {e}") print(f"Detection loop error: {e}")
time.sleep(1) time.sleep(1)
print("🛑 Presence detection loop stopped") print("🛑 Presence detection loop stopped")
def _run_pose_estimation():
    """Run MoveNet on the newest available RGB frame and update pose_state.

    Drains the RGB output queue and keeps only the last message: a single
    tryGet() returns queued messages in FIFO order, so it can yield a frame
    that is several captures old — the pose would lag behind the live video.
    Silently no-ops when the estimator or queue is unavailable.
    """
    global pose_state, rgb_queue, pose_estimator
    if pose_estimator is None or rgb_queue is None:
        return
    try:
        # Drop stale frames; estimate on the most recent one only.
        frame_msg = None
        while (msg := rgb_queue.tryGet()) is not None:
            frame_msg = msg
        if frame_msg is None:
            return
        frame = frame_msg.getCvFrame()
        result = pose_estimator.estimate(frame)
        # Derive posture (standing/sitting, facing, arms) from the keypoints.
        posture = pose_estimator.derive_posture(result["keypoints"])
        pose_state["active"] = True
        pose_state["keypoints"] = result["keypoints"]
        pose_state["posture"] = posture
        pose_state["num_valid"] = result["num_valid"]
        pose_state["mean_confidence"] = result["mean_confidence"]
        pose_state["inference_ms"] = result["inference_ms"]
        pose_state["last_update"] = result["timestamp"]
    except Exception as e:
        # Keep the detection loop alive even if a single frame fails.
        print(f"Pose estimation error: {e}")
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
"""Startup and shutdown.""" """Startup and shutdown."""
@@ -175,8 +258,8 @@ async def lifespan(app: FastAPI):
app = FastAPI( app = FastAPI(
title="OAK-D Vision Service", title="OAK-D Vision Service",
description="Vixy's eyes with presence detection! 🦊👀", description="Vixy's eyes with presence detection + pose estimation! 🦊👀",
version="0.3.0", version="0.4.0",
lifespan=lifespan lifespan=lifespan
) )
@@ -187,9 +270,10 @@ async def health():
return { return {
"status": "healthy", "status": "healthy",
"service": "oak-service", "service": "oak-service",
"version": "0.3.0", "version": "0.4.0",
"oak_connected": pipeline_ctx is not None, "oak_connected": pipeline_ctx is not None,
"detection_model": DETECTION_MODEL, "detection_model": DETECTION_MODEL,
"pose_model_loaded": pose_estimator is not None,
"timestamp": time.time() "timestamp": time.time()
} }
@@ -245,6 +329,40 @@ async def snapshot():
@app.get("/pose")
async def pose():
    """Return the raw keypoints from the most recent MoveNet inference."""
    if pose_estimator is None:
        raise HTTPException(status_code=503, detail="Pose estimator not available")
    # Copy the shared-state fields of interest, then stamp the response time.
    payload = {
        field: pose_state[field]
        for field in (
            "active",
            "keypoints",
            "num_valid",
            "mean_confidence",
            "inference_ms",
            "last_update",
        )
    }
    payload["timestamp"] = time.time()
    return payload
@app.get("/pose/summary")
async def pose_summary():
    """Return the condensed posture reading derived from the keypoints."""
    if pose_estimator is None:
        raise HTTPException(status_code=503, detail="Pose estimator not available")
    posture_info = pose_state["posture"]
    return {
        "active": pose_state["active"],
        "posture": posture_info.get("posture", "unknown"),
        "facing_camera": posture_info.get("facing_camera", False),
        "arms_raised": posture_info.get("arms_raised", False),
        "mean_confidence": pose_state["mean_confidence"],
        "num_valid": pose_state["num_valid"],
        "timestamp": time.time(),
    }
if __name__ == "__main__": if __name__ == "__main__":
import uvicorn import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8100) uvicorn.run(app, host="0.0.0.0", port=8100)

208
pose_estimator.py Normal file
View File

@@ -0,0 +1,208 @@
"""
Pose Estimator — MoveNet Lightning on Google Coral Edge TPU
Single-person pose estimation with 17 body keypoints.
Runs on a dedicated Coral USB Accelerator (~7ms per frame).
"""
import time
import logging
from pathlib import Path
import cv2
import numpy as np
logger = logging.getLogger("pose_estimator")
logger.setLevel(logging.INFO)

# The 17 COCO-style keypoints, in MoveNet's fixed output order.
KEYPOINT_NAMES = [
    "nose", "left_eye", "right_eye", "left_ear", "right_ear",
    "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
    "left_wrist", "right_wrist", "left_hip", "right_hip",
    "left_knee", "right_knee", "left_ankle", "right_ankle",
]

# MoveNet Lightning takes a square 192x192 RGB input.
INPUT_SIZE = 192

# Minimum confidence to consider a keypoint valid.
MIN_KEYPOINT_CONFIDENCE = 0.2


class PoseEstimator:
    """MoveNet Lightning single-person pose estimation on a Coral Edge TPU.

    Produces 17 body keypoints per frame (normalized x/y plus a confidence
    score) and can derive a coarse posture summary from them.
    """

    def __init__(self, model_path: str, device_index: int = 1):
        """
        Initialize the pose estimator.

        Args:
            model_path: Path to movenet_single_pose_lightning_ptq_edgetpu.tflite
            device_index: Coral Edge TPU device index (0-based). Default 1
                since device 0 is typically used by headmic/YAMNet.

        Falls back to any available Edge TPU, then to the CPU interpreter,
        if the requested device cannot be opened.
        """
        # Lazy import: lets this module be imported (e.g. by tests or tooling)
        # on machines without the LiteRT runtime installed.
        import ai_edge_litert.interpreter as tfl

        model_path = str(model_path)
        logger.info(
            "Loading MoveNet Lightning from %s (Coral device :%s)",
            model_path, device_index,
        )
        try:
            delegate = tfl.load_delegate(
                "libedgetpu.so.1",
                options={"device": f":{device_index}"},
            )
            self._interpreter = tfl.Interpreter(
                model_path=model_path,
                experimental_delegates=[delegate],
            )
            logger.info("MoveNet loaded on Edge TPU (device :%s)", device_index)
        except (ValueError, RuntimeError) as e:
            # The requested device may be busy or absent; try any Edge TPU.
            logger.warning(
                "Edge TPU device :%s failed (%s), trying any available",
                device_index, e,
            )
            try:
                delegate = tfl.load_delegate("libedgetpu.so.1")
                self._interpreter = tfl.Interpreter(
                    model_path=model_path,
                    experimental_delegates=[delegate],
                )
                logger.info("MoveNet loaded on Edge TPU (auto-selected device)")
            except Exception as e2:
                logger.error("No Edge TPU available (%s), falling back to CPU", e2)
                self._interpreter = tfl.Interpreter(model_path=model_path)
                logger.info("MoveNet loaded on CPU (slow fallback)")

        self._interpreter.allocate_tensors()
        self._input_details = self._interpreter.get_input_details()[0]
        self._output_details = self._interpreter.get_output_details()[0]
        logger.info(
            "MoveNet ready: input %s %s, output %s",
            self._input_details["shape"],
            self._input_details["dtype"],
            self._output_details["shape"],
        )

    def estimate(self, frame_bgr: np.ndarray) -> dict:
        """
        Run pose estimation on a BGR frame.

        Args:
            frame_bgr: OpenCV BGR image (any resolution, will be resized)

        Returns:
            {
                "keypoints": [
                    {"name": "nose", "x": 0.5, "y": 0.3, "confidence": 0.92},
                    ...
                ],
                "num_valid": 12,        # keypoints above MIN_KEYPOINT_CONFIDENCE
                "mean_confidence": 0.7, # average confidence of valid keypoints
                "inference_ms": 7.1,
                "timestamp": 1234567890.123,
            }

        Coordinates are normalized to [0, 1] relative to the square-resized
        input; the resize does not preserve aspect ratio.
        """
        # Convert to RGB and resize to the model's fixed square input.
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        resized = cv2.resize(frame_rgb, (INPUT_SIZE, INPUT_SIZE))

        # Cast to the dtype the model declares instead of hard-coding uint8 —
        # the PTQ Edge TPU build declares uint8, so this is identical for the
        # expected model, but it also tolerates other MoveNet variants.
        # NOTE(review): an int8-quantized variant would additionally need a
        # zero-point shift — confirm before using one.
        input_data = np.expand_dims(resized, axis=0).astype(
            self._input_details["dtype"]
        )
        self._interpreter.set_tensor(self._input_details["index"], input_data)

        # Time only invoke() so inference_ms reflects accelerator latency,
        # not the preprocessing above.
        t0 = time.perf_counter()
        self._interpreter.invoke()
        inference_ms = (time.perf_counter() - t0) * 1000

        # Output is [1, 1, 17, 3]: 17 keypoints x (y, x, confidence).
        output = self._interpreter.get_tensor(self._output_details["index"])
        keypoints_raw = output.reshape(17, 3)

        keypoints = []
        valid_confidences = []
        for i, name in enumerate(KEYPOINT_NAMES):
            y = float(keypoints_raw[i][0])
            x = float(keypoints_raw[i][1])
            confidence = float(keypoints_raw[i][2])
            keypoints.append({
                "name": name,
                "x": round(x, 4),
                "y": round(y, 4),
                "confidence": round(confidence, 4),
            })
            if confidence >= MIN_KEYPOINT_CONFIDENCE:
                valid_confidences.append(confidence)

        num_valid = len(valid_confidences)
        mean_confidence = (
            sum(valid_confidences) / num_valid if valid_confidences else 0.0
        )
        return {
            "keypoints": keypoints,
            "num_valid": num_valid,
            "mean_confidence": round(mean_confidence, 4),
            "inference_ms": round(inference_ms, 2),
            "timestamp": time.time(),
        }

    def derive_posture(self, keypoints: list) -> dict:
        """
        Derive high-level posture information from keypoints.

        Args:
            keypoints: keypoint dicts as returned by estimate()

        Returns:
            {
                "posture": "standing" | "sitting" | "unknown",
                "facing_camera": True/False,
                "arms_raised": True/False,
            }

        All thresholds are heuristics in normalized [0, 1] image
        coordinates, where y grows downward.
        """
        kp = {k["name"]: k for k in keypoints}

        def get(name):
            # Return a keypoint only if it clears the confidence floor.
            p = kp.get(name)
            if p and p["confidence"] >= MIN_KEYPOINT_CONFIDENCE:
                return p
            return None

        posture = "unknown"
        facing_camera = False
        arms_raised = False

        # Posture: knees well below hips => standing; nearly level => sitting.
        l_hip = get("left_hip")
        r_hip = get("right_hip")
        l_knee = get("left_knee")
        r_knee = get("right_knee")
        if (l_hip or r_hip) and (l_knee or r_knee):
            hip_y = np.mean([p["y"] for p in [l_hip, r_hip] if p])
            knee_y = np.mean([p["y"] for p in [l_knee, r_knee] if p])
            hip_knee_diff = knee_y - hip_y  # positive = knees below hips
            if hip_knee_diff > 0.15:
                posture = "standing"
            elif hip_knee_diff < 0.08:
                posture = "sitting"
            # Between the two thresholds the reading stays "unknown".

        # Facing camera: both shoulders visible with a meaningful X spread
        # (a person seen side-on collapses the shoulders together).
        l_shoulder = get("left_shoulder")
        r_shoulder = get("right_shoulder")
        if l_shoulder and r_shoulder:
            shoulder_spread = abs(r_shoulder["x"] - l_shoulder["x"])
            if shoulder_spread > 0.08:
                facing_camera = True

        # Arms raised: either wrist clearly above its own shoulder.
        l_wrist = get("left_wrist")
        r_wrist = get("right_wrist")
        if (l_wrist and l_shoulder and l_wrist["y"] < l_shoulder["y"] - 0.05) or \
           (r_wrist and r_shoulder and r_wrist["y"] < r_shoulder["y"] - 0.05):
            arms_raised = True

        return {
            "posture": posture,
            "facing_camera": facing_camera,
            "arms_raised": arms_raised,
        }