Files
oak-service/pose_estimator.py
Alex cdbf7ff394 Add MoveNet Lightning pose estimation on Coral 2
Integrates single-person pose detection into oak-service using MoveNet
Lightning on a second Google Coral Edge TPU. Detects 17 body keypoints
at ~7ms per frame, derives posture (standing/sitting), facing direction,
and arm position. Only runs when a person is detected by YOLOv6.

New endpoints: /pose (raw keypoints), /pose/summary (derived posture)
New module: pose_estimator.py (PoseEstimator class)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 19:29:16 -06:00

209 lines
7.3 KiB
Python

"""
Pose Estimator — MoveNet Lightning on Google Coral Edge TPU
Single-person pose estimation with 17 body keypoints.
Runs on a dedicated Coral USB Accelerator (~7ms per frame).
"""
import time
import logging
from pathlib import Path
import cv2
import numpy as np
logger = logging.getLogger("pose_estimator")
logger.setLevel(logging.INFO)
KEYPOINT_NAMES = [
"nose", "left_eye", "right_eye", "left_ear", "right_ear",
"left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
"left_wrist", "right_wrist", "left_hip", "right_hip",
"left_knee", "right_knee", "left_ankle", "right_ankle",
]
# MoveNet Lightning input size
INPUT_SIZE = 192
# Minimum confidence to consider a keypoint valid
MIN_KEYPOINT_CONFIDENCE = 0.2
class PoseEstimator:
"""MoveNet Lightning pose estimation on Coral Edge TPU."""
def __init__(self, model_path: str, device_index: int = 1):
"""
Initialize the pose estimator.
Args:
model_path: Path to movenet_single_pose_lightning_ptq_edgetpu.tflite
device_index: Coral Edge TPU device index (0-based). Default 1
since device 0 is typically used by headmic/YAMNet.
"""
import ai_edge_litert.interpreter as tfl
model_path = str(model_path)
logger.info(f"Loading MoveNet Lightning from {model_path} (Coral device :{device_index})")
try:
delegate = tfl.load_delegate(
"libedgetpu.so.1",
options={"device": f":{device_index}"}
)
self._interpreter = tfl.Interpreter(
model_path=model_path,
experimental_delegates=[delegate],
)
logger.info(f"MoveNet loaded on Edge TPU (device :{device_index})")
except (ValueError, RuntimeError) as e:
logger.warning(f"Edge TPU device :{device_index} failed ({e}), trying any available")
try:
delegate = tfl.load_delegate("libedgetpu.so.1")
self._interpreter = tfl.Interpreter(
model_path=model_path,
experimental_delegates=[delegate],
)
logger.info("MoveNet loaded on Edge TPU (auto-selected device)")
except Exception as e2:
logger.error(f"No Edge TPU available ({e2}), falling back to CPU")
self._interpreter = tfl.Interpreter(model_path=model_path)
logger.info("MoveNet loaded on CPU (slow fallback)")
self._interpreter.allocate_tensors()
self._input_details = self._interpreter.get_input_details()[0]
self._output_details = self._interpreter.get_output_details()[0]
logger.info(
f"MoveNet ready: input {self._input_details['shape']} "
f"{self._input_details['dtype']}, "
f"output {self._output_details['shape']}"
)
def estimate(self, frame_bgr: np.ndarray) -> dict:
"""
Run pose estimation on a BGR frame.
Args:
frame_bgr: OpenCV BGR image (any resolution, will be resized)
Returns:
{
"keypoints": [
{"name": "nose", "x": 0.5, "y": 0.3, "confidence": 0.92},
...
],
"num_valid": 12, # keypoints above MIN_KEYPOINT_CONFIDENCE
"mean_confidence": 0.7, # average confidence of valid keypoints
"inference_ms": 7.1,
"timestamp": 1234567890.123,
}
"""
# Resize to model input size
frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
resized = cv2.resize(frame_rgb, (INPUT_SIZE, INPUT_SIZE))
# Set input tensor (uint8)
input_data = np.expand_dims(resized, axis=0).astype(np.uint8)
self._interpreter.set_tensor(self._input_details["index"], input_data)
# Run inference
t0 = time.perf_counter()
self._interpreter.invoke()
inference_ms = (time.perf_counter() - t0) * 1000
# Parse output: [1, 1, 17, 3] → 17 keypoints x (y, x, confidence)
output = self._interpreter.get_tensor(self._output_details["index"])
keypoints_raw = output.reshape(17, 3)
# Build keypoint list
keypoints = []
valid_confidences = []
for i, name in enumerate(KEYPOINT_NAMES):
y, x, confidence = float(keypoints_raw[i][0]), float(keypoints_raw[i][1]), float(keypoints_raw[i][2])
keypoints.append({
"name": name,
"x": round(x, 4),
"y": round(y, 4),
"confidence": round(confidence, 4),
})
if confidence >= MIN_KEYPOINT_CONFIDENCE:
valid_confidences.append(confidence)
num_valid = len(valid_confidences)
mean_confidence = sum(valid_confidences) / num_valid if valid_confidences else 0.0
return {
"keypoints": keypoints,
"num_valid": num_valid,
"mean_confidence": round(mean_confidence, 4),
"inference_ms": round(inference_ms, 2),
"timestamp": time.time(),
}
def derive_posture(self, keypoints: list) -> dict:
"""
Derive high-level posture information from keypoints.
Returns:
{
"posture": "standing" | "sitting" | "unknown",
"facing_camera": True/False,
"arms_raised": True/False,
}
"""
kp = {k["name"]: k for k in keypoints}
# Helper: get a keypoint if confident enough
def get(name):
p = kp.get(name)
if p and p["confidence"] >= MIN_KEYPOINT_CONFIDENCE:
return p
return None
posture = "unknown"
facing_camera = False
arms_raised = False
# Posture: compare hip Y to knee/ankle Y
# If hips are much higher than knees → standing
# If hips are close to knees → sitting
l_hip = get("left_hip")
r_hip = get("right_hip")
l_knee = get("left_knee")
r_knee = get("right_knee")
if (l_hip or r_hip) and (l_knee or r_knee):
hip_y = np.mean([p["y"] for p in [l_hip, r_hip] if p])
knee_y = np.mean([p["y"] for p in [l_knee, r_knee] if p])
hip_knee_diff = knee_y - hip_y # positive = knees below hips
if hip_knee_diff > 0.15:
posture = "standing"
elif hip_knee_diff < 0.08:
posture = "sitting"
# Facing camera: both shoulders visible and roughly symmetric
l_shoulder = get("left_shoulder")
r_shoulder = get("right_shoulder")
if l_shoulder and r_shoulder:
# If both shoulders are visible and their X spread is reasonable
shoulder_spread = abs(r_shoulder["x"] - l_shoulder["x"])
if shoulder_spread > 0.08:
facing_camera = True
# Arms raised: wrists above shoulders
l_wrist = get("left_wrist")
r_wrist = get("right_wrist")
if (l_wrist and l_shoulder and l_wrist["y"] < l_shoulder["y"] - 0.05) or \
(r_wrist and r_shoulder and r_wrist["y"] < r_shoulder["y"] - 0.05):
arms_raised = True
return {
"posture": posture,
"facing_camera": facing_camera,
"arms_raised": arms_raised,
}