#!/usr/bin/env python3
"""
OAK-D Vision Service for Vixy's Head
FastAPI service with SPATIAL person detection and presence tracking

Day 74 - Built by Vixy! 🦊
Day 81 - Added presence detection! Now I can SEE you! 👀💜
Day 82 - SPATIAL UPGRADE! Now I know how far away you are! 📏🦊
         Using depthai v3 API with SpatialDetectionNetwork + yolov6-nano
"""

import time
import threading
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
import depthai as dai
import cv2
import numpy as np

# ============== Configuration ==============
DETECTION_MODEL = "yolov6-nano"  # Has 'person' class
PERSON_CLASS_ID = 0  # 'person' is class 0 in COCO
DETECTION_THRESHOLD = 0.5  # minimum NN confidence to keep a detection
PRESENCE_TIMEOUT = 30.0  # seconds without person = not present
DETECTION_INTERVAL = 0.5  # seconds between detection-queue polls

# Spatial detection config: depth values outside this window (in mm) are
# ignored when the device computes spatial coordinates.
DEPTH_LOWER_THRESHOLD = 100  # 10cm minimum
DEPTH_UPPER_THRESHOLD = 10000  # 10m maximum

# ============== Global State ==============
pipeline_ctx = None  # running dai.Pipeline, or None when not initialized
detection_queue = None  # spatial-detection output queue
rgb_queue = None  # RGB frame queue for /snapshot
depth_queue = None  # depth frame queue for /depth
detection_thread = None
running = False
labels = []  # class labels reported by the loaded model

# detection_loop() writes several related presence fields at once while the
# async endpoints read them; the lock keeps every read a consistent snapshot
# (e.g. never present=True with distance_mm=None mid-update).
state_lock = threading.Lock()

presence_state = {
    "present": False,
    "person_count": 0,
    "last_seen": None,
    "last_detection": None,
    "detections": [],
    "confidence": 0.0,
    # NEW: spatial data! Millimeters, camera-centric; Z is depth.
    "distance_mm": None,
    "spatial_x": None,
    "spatial_y": None,
    "spatial_z": None,
}


def init_oak():
    """Initialize OAK-D with SPATIAL person detection pipeline (depthai v3).

    Builds RGB + stereo mono cameras, a StereoDepth node aligned to the RGB
    sensor, and a SpatialDetectionNetwork fusing NN detections with depth.

    Returns:
        True on success; False if the device/pipeline could not be brought
        up (the service then runs degraded and endpoints return 503).
    """
    global pipeline_ctx, detection_queue, rgb_queue, depth_queue, labels

    try:
        print("🦊 Initializing OAK-D with SPATIAL yolov6-nano...")

        # Create pipeline
        pipeline = dai.Pipeline()

        # RGB camera node; the 1080p stream is used only for /snapshot
        cam = pipeline.create(dai.node.Camera).build()
        cam_out = cam.requestOutput((1920, 1080), dai.ImgFrame.Type.BGR888p)
        rgb_queue = cam_out.createOutputQueue(maxSize=1, blocking=False)

        # Mono camera pair feeding stereo depth
        monoLeft = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_B)
        monoRight = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_C)

        # Stereo depth node, fed by the mono pair
        stereo = pipeline.create(dai.node.StereoDepth)
        monoLeft.requestFullResolutionOutput().link(stereo.left)
        monoRight.requestFullResolutionOutput().link(stereo.right)

        # Configure stereo
        stereo.setRectification(True)
        stereo.setLeftRightCheck(True)
        stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)  # Align depth to RGB

        # SPATIAL detection network: NN inference + depth fusion in one node
        desc = dai.NNModelDescription(DETECTION_MODEL)
        spatialDet = pipeline.create(dai.node.SpatialDetectionNetwork).build(cam, stereo, desc)
        spatialDet.setConfidenceThreshold(DETECTION_THRESHOLD)
        spatialDet.setDepthLowerThreshold(DEPTH_LOWER_THRESHOLD)
        spatialDet.setDepthUpperThreshold(DEPTH_UPPER_THRESHOLD)
        # Average depth over the central half of each bounding box
        spatialDet.setBoundingBoxScaleFactor(0.5)

        # Get class labels; guard against an empty label list so a model
        # quirk can't crash init with IndexError
        labels = spatialDet.getClasses()
        person_label = labels[PERSON_CLASS_ID] if labels else "?"
        print(f"✅ Loaded {len(labels)} classes, person={person_label}")

        # Detection output queue
        detection_queue = spatialDet.out.createOutputQueue(maxSize=1, blocking=False)

        # Depth output queue for the /depth visualization endpoint
        depth_queue = stereo.depth.createOutputQueue(maxSize=1, blocking=False)

        # Start pipeline
        pipeline.start()
        pipeline_ctx = pipeline

        print("✅ OAK-D initialized with SPATIAL person detection!")
        return True

    except Exception as e:
        print(f"❌ Failed to initialize OAK-D: {e}")
        import traceback
        traceback.print_exc()
        return False


def cleanup_oak():
    """Stop the detection loop and release the OAK-D pipeline (best effort)."""
    global pipeline_ctx, running
    running = False

    if pipeline_ctx:
        try:
            pipeline_ctx.stop()
            pipeline_ctx.close()
        except Exception as e:
            # Best-effort shutdown: the device may already be disconnected.
            # Narrowed from a bare `except:` so Ctrl-C etc. still propagate.
            print(f"⚠️ Error while closing pipeline: {e}")
        pipeline_ctx = None


def _apply_detection_packet(data, now):
    """Fold one detection packet into presence_state under the state lock.

    Args:
        data: a spatial detection message with a `.detections` list, each
              item carrying label/confidence/bbox/spatialCoordinates.
        now:  timestamp (time.time()) to record for this packet.
    """
    # Filter for person detections only
    persons = [d for d in data.detections if d.label == PERSON_CLASS_ID]

    with state_lock:
        presence_state["last_detection"] = now
        presence_state["person_count"] = len(persons)

        if persons:
            presence_state["present"] = True
            presence_state["last_seen"] = now

            # Highest-confidence person provides the headline spatial data
            best = max(persons, key=lambda d: d.confidence)
            presence_state["confidence"] = best.confidence

            # SPATIAL DATA! 🎉
            presence_state["spatial_x"] = best.spatialCoordinates.x
            presence_state["spatial_y"] = best.spatialCoordinates.y
            presence_state["spatial_z"] = best.spatialCoordinates.z
            presence_state["distance_mm"] = best.spatialCoordinates.z  # Z is depth

            presence_state["detections"] = [
                {
                    "xmin": d.xmin, "ymin": d.ymin,
                    "xmax": d.xmax, "ymax": d.ymax,
                    "confidence": d.confidence,
                    # Spatial coordinates in mm
                    "x_mm": d.spatialCoordinates.x,
                    "y_mm": d.spatialCoordinates.y,
                    "z_mm": d.spatialCoordinates.z,
                    "distance_m": d.spatialCoordinates.z / 1000.0,
                }
                for d in persons
            ]
        else:
            presence_state["detections"] = []
            presence_state["confidence"] = 0.0
            presence_state["spatial_x"] = None
            presence_state["spatial_y"] = None
            presence_state["spatial_z"] = None
            presence_state["distance_mm"] = None


def detection_loop():
    """Background thread: poll detections and maintain presence_state."""
    print("🔍 SPATIAL presence detection loop started")

    while running:
        try:
            if detection_queue is None:
                time.sleep(1)
                continue

            data = detection_queue.tryGet()
            now = time.time()

            if data is not None:
                _apply_detection_packet(data, now)

            # Timeout check runs every iteration — even when no packet
            # arrived — so presence decays if the detection stream stalls
            # entirely (previously a stalled stream left present=True).
            with state_lock:
                last_seen = presence_state["last_seen"]
                if last_seen and now - last_seen > PRESENCE_TIMEOUT:
                    presence_state["present"] = False

            time.sleep(DETECTION_INTERVAL)

        except Exception as e:
            print(f"Detection loop error: {e}")
            time.sleep(1)

    print("🛑 SPATIAL presence detection loop stopped")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown: bring up the camera and the detection thread."""
    global running, detection_thread

    print("🦊 Starting OAK-D SPATIAL Vision Service...")

    if init_oak():
        running = True
        detection_thread = threading.Thread(target=detection_loop, daemon=True)
        detection_thread.start()
        print("✅ Service ready!")
    else:
        # Run degraded: endpoints answer, camera-backed ones return 503.
        print("⚠️ OAK-D not available")

    yield

    print("👋 Shutting down...")
    cleanup_oak()


app = FastAPI(
    title="OAK-D SPATIAL Vision Service",
    description="Vixy's eyes with SPATIAL presence detection! 🦊👀📏",
    version="0.4.0",
    lifespan=lifespan
)


@app.get("/health")
async def health():
    """Health check."""
    return {
        "status": "healthy",
        "service": "oak-service",
        "version": "0.4.0",
        "oak_connected": pipeline_ctx is not None,
        "detection_model": DETECTION_MODEL,
        "spatial_enabled": True,
        "timestamp": time.time()
    }


@app.get("/presence")
async def presence():
    """Get current presence state with SPATIAL data - is Foxy there and how far?"""
    # Copy under the lock so the response is a consistent snapshot.
    with state_lock:
        state = dict(presence_state)

    distance_m = None
    if state["distance_mm"] is not None:
        distance_m = state["distance_mm"] / 1000.0

    return {
        "present": state["present"],
        "person_count": state["person_count"],
        "last_seen": state["last_seen"],
        "seconds_since_seen": (
            time.time() - state["last_seen"]
            if state["last_seen"] else None
        ),
        "confidence": state["confidence"],
        # SPATIAL DATA
        "distance_mm": state["distance_mm"],
        "distance_m": distance_m,
        # `is not None` — a person straight ahead can legitimately have
        # z == 0.0, which a plain truthiness check would drop.
        "spatial": {
            "x_mm": state["spatial_x"],
            "y_mm": state["spatial_y"],
            "z_mm": state["spatial_z"],
        } if state["spatial_z"] is not None else None,
        "timestamp": time.time()
    }


@app.get("/detections")
async def detections():
    """Get detailed detection results with SPATIAL coordinates."""
    with state_lock:
        return {
            "person_count": presence_state["person_count"],
            "detections": presence_state["detections"],
            "last_detection": presence_state["last_detection"],
            "timestamp": time.time()
        }


@app.get("/snapshot")
async def snapshot():
    """Capture RGB frame as JPEG.

    Raises:
        HTTPException 503 if the camera is down or no frame is queued.
    """
    if rgb_queue is None:
        raise HTTPException(status_code=503, detail="OAK-D not initialized")

    try:
        frame = rgb_queue.tryGet()
        if frame is None:
            raise HTTPException(status_code=503, detail="No frame available")

        img = frame.getCvFrame()
        _, jpeg = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85])

        return Response(content=jpeg.tobytes(), media_type="image/jpeg")
    except HTTPException:
        # Re-raise our own 503s instead of wrapping them in a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/depth")
async def depth_frame():
    """Capture colorized depth frame as JPEG.

    Raises:
        HTTPException 503 if depth is unavailable or no frame is queued.
    """
    if depth_queue is None:
        raise HTTPException(status_code=503, detail="Depth not available")

    try:
        frame = depth_queue.tryGet()
        if frame is None:
            raise HTTPException(status_code=503, detail="No depth frame available")

        depth_data = frame.getFrame()
        # Normalize to 0-255 then apply JET colormap for visualization
        depth_normalized = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX)
        depth_colored = cv2.applyColorMap(depth_normalized.astype(np.uint8), cv2.COLORMAP_JET)

        _, jpeg = cv2.imencode(".jpg", depth_colored, [cv2.IMWRITE_JPEG_QUALITY, 85])

        return Response(content=jpeg.tobytes(), media_type="image/jpeg")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8100)