diff --git a/models/movenet_single_pose_lightning_ptq_edgetpu.tflite b/models/movenet_single_pose_lightning_ptq_edgetpu.tflite
new file mode 100644
index 0000000..d9b87ef
Binary files /dev/null and b/models/movenet_single_pose_lightning_ptq_edgetpu.tflite differ
diff --git a/oak_service.py b/oak_service.py
index 66ed164..0b12d11 100644
--- a/oak_service.py
+++ b/oak_service.py
@@ -11,11 +11,14 @@ Day 81 - Added presence detection! Now I can SEE you! 👀💜
 import time
 import threading
 from contextlib import asynccontextmanager
+from pathlib import Path
 
 from fastapi import FastAPI, HTTPException
 from fastapi.responses import Response
 import depthai as dai
 import cv2
+from pose_estimator import PoseEstimator
+
 # ============== Configuration ==============
 DETECTION_MODEL = "yolov6-nano"  # Has 'person' class
 PERSON_CLASS_ID = 0  # 'person' is class 0 in COCO
@@ -23,6 +26,10 @@ DETECTION_THRESHOLD = 0.5
 PRESENCE_TIMEOUT = 30.0  # seconds without person = not present
 DETECTION_INTERVAL = 0.5
 
+# Pose estimation
+POSE_MODEL_PATH = Path(__file__).parent / "models" / "movenet_single_pose_lightning_ptq_edgetpu.tflite"
+POSE_CORAL_DEVICE = 1  # Second Coral (device 0 is headmic/YAMNet)
+
 # ============== Global State ==============
 pipeline_ctx = None
 detection_queue = None
@@ -30,6 +37,7 @@ rgb_queue = None
 detection_thread = None
 running = False
 labels = []
+pose_estimator = None
 
 presence_state = {
     "present": False,
@@ -40,6 +48,16 @@ presence_state = {
     "confidence": 0.0,
 }
 
+pose_state = {
+    "active": False,
+    "keypoints": [],
+    "posture": {},
+    "num_valid": 0,
+    "mean_confidence": 0.0,
+    "inference_ms": 0.0,
+    "last_update": None,
+}
+
 
 def init_oak():
    """Initialize OAK-D with person detection pipeline (depthai v3)."""
@@ -75,8 +93,12 @@ def init_oak():
 
         pipeline_ctx = pipeline
         print("✅ OAK-D initialized with person detection!")
+
+        # Initialize pose estimator on Coral 2
+        _init_pose_estimator()
+
         return True
-    
+
     except Exception as e:
         print(f"❌ Failed to initialize OAK-D: {e}")
         import traceback
@@ -84,6 +106,25 @@
         return False
 
 
+def _init_pose_estimator():
+    """Initialize MoveNet Lightning on the second Coral Edge TPU."""
+    global pose_estimator
+
+    if not POSE_MODEL_PATH.exists():
+        print(f"⚠️ Pose model not found: {POSE_MODEL_PATH}")
+        return
+
+    try:
+        pose_estimator = PoseEstimator(
+            model_path=str(POSE_MODEL_PATH),
+            device_index=POSE_CORAL_DEVICE,
+        )
+        print("✅ Pose estimator initialized on Coral 2!")
+    except Exception as e:
+        print(f"⚠️ Pose estimator failed to initialize: {e}")
+        pose_estimator = None
+
+
 def cleanup_oak():
     """Cleanup OAK-D resources."""
     global pipeline_ctx, running
@@ -99,29 +140,29 @@
 
 
 def detection_loop():
-    """Background thread for presence detection."""
-    global running, presence_state, detection_queue
-    
+    """Background thread for presence detection + pose estimation."""
+    global running, presence_state, pose_state, detection_queue
+
     print("🔍 Presence detection loop started")
-    
+
     while running:
         try:
            if detection_queue is None:
                 time.sleep(1)
                 continue
-            
+
             data = detection_queue.tryGet()
-            
+
             if data is not None:
                 now = time.time()
                 presence_state["last_detection"] = now
-                
+
                 # Filter for person detections only
                 persons = [d for d in data.detections if d.label == PERSON_CLASS_ID]
                 person_count = len(persons)
-                
+
                 presence_state["person_count"] = person_count
-                
+
                 if person_count > 0:
                     presence_state["present"] = True
                     presence_state["last_seen"] = now
@@ -134,24 +175,66 @@
                         }
                         for d in persons
                     ]
+
+                    # Run pose estimation on the latest RGB frame
+                    _run_pose_estimation()
+
                 else:
                     presence_state["detections"] = []
                     presence_state["confidence"] = 0.0
-                    
+
+                    # Clear pose when no person
+                    if pose_state["active"]:
+                        pose_state["active"] = False
+                        pose_state["keypoints"] = []
+                        pose_state["posture"] = {}
+                        pose_state["num_valid"] = 0
+                        pose_state["mean_confidence"] = 0.0
+
                 # Check timeout
                 if presence_state["last_seen"]:
                     if now - presence_state["last_seen"] > PRESENCE_TIMEOUT:
                         presence_state["present"] = False
-            
+
             time.sleep(DETECTION_INTERVAL)
-        
+
         except Exception as e:
             print(f"Detection loop error: {e}")
             time.sleep(1)
-    
+
     print("🛑 Presence detection loop stopped")
 
 
+def _run_pose_estimation():
+    """Grab latest RGB frame and run pose estimation via Coral 2."""
+    global pose_state, rgb_queue, pose_estimator
+
+    if pose_estimator is None or rgb_queue is None:
+        return
+
+    try:
+        frame_msg = rgb_queue.tryGet()
+        if frame_msg is None:
+            return
+
+        frame = frame_msg.getCvFrame()
+        result = pose_estimator.estimate(frame)
+
+        # Derive posture from keypoints
+        posture = pose_estimator.derive_posture(result["keypoints"])
+
+        pose_state["active"] = True
+        pose_state["keypoints"] = result["keypoints"]
+        pose_state["posture"] = posture
+        pose_state["num_valid"] = result["num_valid"]
+        pose_state["mean_confidence"] = result["mean_confidence"]
+        pose_state["inference_ms"] = result["inference_ms"]
+        pose_state["last_update"] = result["timestamp"]
+
+    except Exception as e:
+        print(f"Pose estimation error: {e}")
+
+
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     """Startup and shutdown."""
@@ -175,8 +258,8 @@
 
 app = FastAPI(
     title="OAK-D Vision Service",
-    description="Vixy's eyes with presence detection! 🦊👀",
-    version="0.3.0",
+    description="Vixy's eyes with presence detection + pose estimation! 🦊👀",
+    version="0.4.0",
     lifespan=lifespan
 )
 
@@ -187,9 +270,10 @@ async def health():
     return {
         "status": "healthy",
         "service": "oak-service",
-        "version": "0.3.0",
+        "version": "0.4.0",
         "oak_connected": pipeline_ctx is not None,
         "detection_model": DETECTION_MODEL,
+        "pose_model_loaded": pose_estimator is not None,
         "timestamp": time.time()
     }
 
@@ -245,6 +329,40 @@ async def snapshot():
 
 
+@app.get("/pose")
+async def pose():
+    """Get current pose keypoints."""
+    if pose_estimator is None:
+        raise HTTPException(status_code=503, detail="Pose estimator not available")
+
+    return {
+        "active": pose_state["active"],
+        "keypoints": pose_state["keypoints"],
+        "num_valid": pose_state["num_valid"],
+        "mean_confidence": pose_state["mean_confidence"],
+        "inference_ms": pose_state["inference_ms"],
+        "last_update": pose_state["last_update"],
+        "timestamp": time.time(),
+    }
+
+
+@app.get("/pose/summary")
+async def pose_summary():
+    """Get derived posture summary."""
+    if pose_estimator is None:
+        raise HTTPException(status_code=503, detail="Pose estimator not available")
+
+    return {
+        "active": pose_state["active"],
+        "posture": pose_state["posture"].get("posture", "unknown"),
+        "facing_camera": pose_state["posture"].get("facing_camera", False),
+        "arms_raised": pose_state["posture"].get("arms_raised", False),
+        "mean_confidence": pose_state["mean_confidence"],
+        "num_valid": pose_state["num_valid"],
+        "timestamp": time.time(),
+    }
+
+
 if __name__ == "__main__":
     import uvicorn
     uvicorn.run(app, host="0.0.0.0", port=8100)
 
diff --git a/pose_estimator.py b/pose_estimator.py
new file mode 100644
index 0000000..90d84d5
--- /dev/null
+++ b/pose_estimator.py
@@ -0,0 +1,208 @@
+"""
+Pose Estimator — MoveNet Lightning on Google Coral Edge TPU
+
+Single-person pose estimation with 17 body keypoints.
+Runs on a dedicated Coral USB Accelerator (~7ms per frame).
+"""
+
+import time
+import logging
+from pathlib import Path
+
+import cv2
+import numpy as np
+
+logger = logging.getLogger("pose_estimator")
+logger.setLevel(logging.INFO)
+
+KEYPOINT_NAMES = [
+    "nose", "left_eye", "right_eye", "left_ear", "right_ear",
+    "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
+    "left_wrist", "right_wrist", "left_hip", "right_hip",
+    "left_knee", "right_knee", "left_ankle", "right_ankle",
+]
+
+# MoveNet Lightning input size
+INPUT_SIZE = 192
+
+# Minimum confidence to consider a keypoint valid
+MIN_KEYPOINT_CONFIDENCE = 0.2
+
+
+class PoseEstimator:
+    """MoveNet Lightning pose estimation on Coral Edge TPU."""
+
+    def __init__(self, model_path: str, device_index: int = 1):
+        """
+        Initialize the pose estimator.
+
+        Args:
+            model_path: Path to movenet_single_pose_lightning_ptq_edgetpu.tflite
+            device_index: Coral Edge TPU device index (0-based). Default 1
+                since device 0 is typically used by headmic/YAMNet.
+        """
+        import ai_edge_litert.interpreter as tfl
+
+        model_path = str(model_path)
+        logger.info(f"Loading MoveNet Lightning from {model_path} (Coral device :{device_index})")
+
+        try:
+            delegate = tfl.load_delegate(
+                "libedgetpu.so.1",
+                options={"device": f":{device_index}"}
+            )
+            self._interpreter = tfl.Interpreter(
+                model_path=model_path,
+                experimental_delegates=[delegate],
+            )
+            logger.info(f"MoveNet loaded on Edge TPU (device :{device_index})")
+        except (ValueError, RuntimeError) as e:
+            logger.warning(f"Edge TPU device :{device_index} failed ({e}), trying any available")
+            try:
+                delegate = tfl.load_delegate("libedgetpu.so.1")
+                self._interpreter = tfl.Interpreter(
+                    model_path=model_path,
+                    experimental_delegates=[delegate],
+                )
+                logger.info("MoveNet loaded on Edge TPU (auto-selected device)")
+            except Exception as e2:
+                logger.error(f"No Edge TPU available ({e2}), falling back to CPU")
+                self._interpreter = tfl.Interpreter(model_path=model_path)
+                logger.info("MoveNet loaded on CPU (slow fallback)")
+
+        self._interpreter.allocate_tensors()
+
+        self._input_details = self._interpreter.get_input_details()[0]
+        self._output_details = self._interpreter.get_output_details()[0]
+
+        logger.info(
+            f"MoveNet ready: input {self._input_details['shape']} "
+            f"{self._input_details['dtype']}, "
+            f"output {self._output_details['shape']}"
+        )
+
+    def estimate(self, frame_bgr: np.ndarray) -> dict:
+        """
+        Run pose estimation on a BGR frame.
+
+        Args:
+            frame_bgr: OpenCV BGR image (any resolution, will be resized)
+
+        Returns:
+            {
+                "keypoints": [
+                    {"name": "nose", "x": 0.5, "y": 0.3, "confidence": 0.92},
+                    ...
+                ],
+                "num_valid": 12,         # keypoints above MIN_KEYPOINT_CONFIDENCE
+                "mean_confidence": 0.7,  # average confidence of valid keypoints
+                "inference_ms": 7.1,
+                "timestamp": 1234567890.123,
+            }
+        """
+        # Convert BGR → RGB and resize to the model's 192x192 input
+        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
+        resized = cv2.resize(frame_rgb, (INPUT_SIZE, INPUT_SIZE))
+
+        # Set input tensor (uint8)
+        input_data = np.expand_dims(resized, axis=0).astype(np.uint8)
+        self._interpreter.set_tensor(self._input_details["index"], input_data)
+
+        # Run inference
+        t0 = time.perf_counter()
+        self._interpreter.invoke()
+        inference_ms = (time.perf_counter() - t0) * 1000
+
+        # Parse output: [1, 1, 17, 3] → 17 keypoints x (y, x, confidence)
+        output = self._interpreter.get_tensor(self._output_details["index"])
+        keypoints_raw = output.reshape(17, 3)
+
+        # Build keypoint list
+        keypoints = []
+        valid_confidences = []
+
+        for i, name in enumerate(KEYPOINT_NAMES):
+            y, x, confidence = float(keypoints_raw[i][0]), float(keypoints_raw[i][1]), float(keypoints_raw[i][2])
+            keypoints.append({
+                "name": name,
+                "x": round(x, 4),
+                "y": round(y, 4),
+                "confidence": round(confidence, 4),
+            })
+            if confidence >= MIN_KEYPOINT_CONFIDENCE:
+                valid_confidences.append(confidence)
+
+        num_valid = len(valid_confidences)
+        mean_confidence = sum(valid_confidences) / num_valid if valid_confidences else 0.0
+
+        return {
+            "keypoints": keypoints,
+            "num_valid": num_valid,
+            "mean_confidence": round(mean_confidence, 4),
+            "inference_ms": round(inference_ms, 2),
+            "timestamp": time.time(),
+        }
+
+    def derive_posture(self, keypoints: list) -> dict:
+        """
+        Derive high-level posture information from keypoints.
+
+        Returns:
+            {
+                "posture": "standing" | "sitting" | "unknown",
+                "facing_camera": True/False,
+                "arms_raised": True/False,
+            }
+        """
+        kp = {k["name"]: k for k in keypoints}
+
+        # Helper: get a keypoint if confident enough
+        def get(name):
+            p = kp.get(name)
+            if p and p["confidence"] >= MIN_KEYPOINT_CONFIDENCE:
+                return p
+            return None
+
+        posture = "unknown"
+        facing_camera = False
+        arms_raised = False
+
+        # Posture: compare hip Y to knee Y (normalized coords, Y grows downward)
+        # If knees are well below the hips → standing
+        # If hips are close to knee level → sitting
+        l_hip = get("left_hip")
+        r_hip = get("right_hip")
+        l_knee = get("left_knee")
+        r_knee = get("right_knee")
+
+        if (l_hip or r_hip) and (l_knee or r_knee):
+            hip_y = np.mean([p["y"] for p in [l_hip, r_hip] if p])
+            knee_y = np.mean([p["y"] for p in [l_knee, r_knee] if p])
+            hip_knee_diff = knee_y - hip_y  # positive = knees below hips
+
+            if hip_knee_diff > 0.15:
+                posture = "standing"
+            elif hip_knee_diff < 0.08:
+                posture = "sitting"
+
+        # Facing camera: both shoulders visible with a reasonable X spread
+        l_shoulder = get("left_shoulder")
+        r_shoulder = get("right_shoulder")
+        if l_shoulder and r_shoulder:
+            shoulder_spread = abs(r_shoulder["x"] - l_shoulder["x"])
+            if shoulder_spread > 0.08:
+                facing_camera = True
+
+        # Arms raised: either wrist above its shoulder
+        l_wrist = get("left_wrist")
+        r_wrist = get("right_wrist")
+        if (l_wrist and l_shoulder and l_wrist["y"] < l_shoulder["y"] - 0.05) or \
+           (r_wrist and r_shoulder and r_wrist["y"] < r_shoulder["y"] - 0.05):
+            arms_raised = True
+
+        return {
+            "posture": posture,
+            "facing_camera": facing_camera,
+            "arms_raised": arms_raised,
+        }
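
Reviewer note, not part of the patch: a quick smoke test for the new endpoints. This is a minimal sketch assuming the service is running locally on port 8100 (the port configured above) and that the `requests` package is installed; the script name is hypothetical.

# smoke_test_pose.py (hypothetical name): exercise the new pose endpoints.
# Assumes oak_service.py is running on localhost:8100 and `requests` is installed.
import requests

BASE = "http://localhost:8100"

# /health should now report whether the pose model loaded
health = requests.get(f"{BASE}/health", timeout=5).json()
print("pose_model_loaded:", health["pose_model_loaded"])

# /pose returns the raw 17 keypoints (normalized x/y plus confidence)
pose = requests.get(f"{BASE}/pose", timeout=5).json()
if pose["active"]:
    for kp in pose["keypoints"]:
        print(f"{kp['name']:>14}: ({kp['x']:.2f}, {kp['y']:.2f}) conf={kp['confidence']:.2f}")

# /pose/summary returns the derived posture flags
summary = requests.get(f"{BASE}/pose/summary", timeout=5).json()
print(summary["posture"], "| facing:", summary["facing_camera"], "| arms raised:", summary["arms_raised"])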
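PoseEstimator can also be exercised on its own, without the OAK-D pipeline, which is handy for isolating Coral issues from camera issues. A minimal sketch assuming the Edge TPU runtime (libedgetpu) is installed and the second Coral is at index 1; "person.jpg" is a placeholder image name.

# Standalone check of PoseEstimator (no OAK-D needed).
# "person.jpg" is a placeholder; any BGR image containing a person works.
import cv2
from pose_estimator import PoseEstimator

estimator = PoseEstimator(
    model_path="models/movenet_single_pose_lightning_ptq_edgetpu.tflite",
    device_index=1,  # matches POSE_CORAL_DEVICE in oak_service.py
)

frame = cv2.imread("person.jpg")  # returns None if the file is missing
result = estimator.estimate(frame)
print(f"{result['num_valid']}/17 keypoints in {result['inference_ms']:.1f} ms")
print(estimator.derive_posture(result["keypoints"]))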