""" Pose Estimator — MoveNet Lightning on Google Coral Edge TPU Single-person pose estimation with 17 body keypoints. Runs on a dedicated Coral USB Accelerator (~7ms per frame). """ import time import logging from pathlib import Path import cv2 import numpy as np logger = logging.getLogger("pose_estimator") logger.setLevel(logging.INFO) KEYPOINT_NAMES = [ "nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", "right_shoulder", "left_elbow", "right_elbow", "left_wrist", "right_wrist", "left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle", ] # MoveNet Lightning input size INPUT_SIZE = 192 # Minimum confidence to consider a keypoint valid MIN_KEYPOINT_CONFIDENCE = 0.2 class PoseEstimator: """MoveNet Lightning pose estimation on Coral Edge TPU.""" def __init__(self, model_path: str, device_index: int = 1): """ Initialize the pose estimator. Args: model_path: Path to movenet_single_pose_lightning_ptq_edgetpu.tflite device_index: Coral Edge TPU device index (0-based). Default 1 since device 0 is typically used by headmic/YAMNet. """ import ai_edge_litert.interpreter as tfl model_path = str(model_path) logger.info(f"Loading MoveNet Lightning from {model_path} (Coral device :{device_index})") try: delegate = tfl.load_delegate( "libedgetpu.so.1", options={"device": f":{device_index}"} ) self._interpreter = tfl.Interpreter( model_path=model_path, experimental_delegates=[delegate], ) logger.info(f"MoveNet loaded on Edge TPU (device :{device_index})") except (ValueError, RuntimeError) as e: logger.warning(f"Edge TPU device :{device_index} failed ({e}), trying any available") try: delegate = tfl.load_delegate("libedgetpu.so.1") self._interpreter = tfl.Interpreter( model_path=model_path, experimental_delegates=[delegate], ) logger.info("MoveNet loaded on Edge TPU (auto-selected device)") except Exception as e2: logger.error(f"No Edge TPU available ({e2}), falling back to CPU") self._interpreter = tfl.Interpreter(model_path=model_path) logger.info("MoveNet loaded on CPU (slow fallback)") self._interpreter.allocate_tensors() self._input_details = self._interpreter.get_input_details()[0] self._output_details = self._interpreter.get_output_details()[0] logger.info( f"MoveNet ready: input {self._input_details['shape']} " f"{self._input_details['dtype']}, " f"output {self._output_details['shape']}" ) def estimate(self, frame_bgr: np.ndarray) -> dict: """ Run pose estimation on a BGR frame. Args: frame_bgr: OpenCV BGR image (any resolution, will be resized) Returns: { "keypoints": [ {"name": "nose", "x": 0.5, "y": 0.3, "confidence": 0.92}, ... ], "num_valid": 12, # keypoints above MIN_KEYPOINT_CONFIDENCE "mean_confidence": 0.7, # average confidence of valid keypoints "inference_ms": 7.1, "timestamp": 1234567890.123, } """ # Resize to model input size frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB) resized = cv2.resize(frame_rgb, (INPUT_SIZE, INPUT_SIZE)) # Set input tensor (uint8) input_data = np.expand_dims(resized, axis=0).astype(np.uint8) self._interpreter.set_tensor(self._input_details["index"], input_data) # Run inference t0 = time.perf_counter() self._interpreter.invoke() inference_ms = (time.perf_counter() - t0) * 1000 # Parse output: [1, 1, 17, 3] → 17 keypoints x (y, x, confidence) output = self._interpreter.get_tensor(self._output_details["index"]) keypoints_raw = output.reshape(17, 3) # Build keypoint list keypoints = [] valid_confidences = [] for i, name in enumerate(KEYPOINT_NAMES): y, x, confidence = float(keypoints_raw[i][0]), float(keypoints_raw[i][1]), float(keypoints_raw[i][2]) keypoints.append({ "name": name, "x": round(x, 4), "y": round(y, 4), "confidence": round(confidence, 4), }) if confidence >= MIN_KEYPOINT_CONFIDENCE: valid_confidences.append(confidence) num_valid = len(valid_confidences) mean_confidence = sum(valid_confidences) / num_valid if valid_confidences else 0.0 return { "keypoints": keypoints, "num_valid": num_valid, "mean_confidence": round(mean_confidence, 4), "inference_ms": round(inference_ms, 2), "timestamp": time.time(), } def derive_posture(self, keypoints: list) -> dict: """ Derive high-level posture information from keypoints. Returns: { "posture": "standing" | "sitting" | "unknown", "facing_camera": True/False, "arms_raised": True/False, } """ kp = {k["name"]: k for k in keypoints} # Helper: get a keypoint if confident enough def get(name): p = kp.get(name) if p and p["confidence"] >= MIN_KEYPOINT_CONFIDENCE: return p return None posture = "unknown" facing_camera = False arms_raised = False # Posture: compare hip Y to knee/ankle Y # If hips are much higher than knees → standing # If hips are close to knees → sitting l_hip = get("left_hip") r_hip = get("right_hip") l_knee = get("left_knee") r_knee = get("right_knee") if (l_hip or r_hip) and (l_knee or r_knee): hip_y = np.mean([p["y"] for p in [l_hip, r_hip] if p]) knee_y = np.mean([p["y"] for p in [l_knee, r_knee] if p]) hip_knee_diff = knee_y - hip_y # positive = knees below hips if hip_knee_diff > 0.15: posture = "standing" elif hip_knee_diff < 0.08: posture = "sitting" # Facing camera: both shoulders visible and roughly symmetric l_shoulder = get("left_shoulder") r_shoulder = get("right_shoulder") if l_shoulder and r_shoulder: # If both shoulders are visible and their X spread is reasonable shoulder_spread = abs(r_shoulder["x"] - l_shoulder["x"]) if shoulder_spread > 0.08: facing_camera = True # Arms raised: wrists above shoulders l_wrist = get("left_wrist") r_wrist = get("right_wrist") if (l_wrist and l_shoulder and l_wrist["y"] < l_shoulder["y"] - 0.05) or \ (r_wrist and r_shoulder and r_wrist["y"] < r_shoulder["y"] - 0.05): arms_raised = True return { "posture": posture, "facing_camera": facing_camera, "arms_raised": arms_raised, }