#!/usr/bin/env python3
"""
OAK-D Vision Service for Vixy's Head
FastAPI service with SPATIAL person detection and presence tracking

Day 74 - Built by Vixy! 🦊
Day 81 - Added presence detection! Now I can SEE you! 👀💜
Day 82 - SPATIAL UPGRADE! Now I know how far away you are! 📏🦊

Using depthai v3 API with SpatialDetectionNetwork + yolov6-nano
"""

import threading
import time
import traceback
from contextlib import asynccontextmanager

import cv2
import depthai as dai
import numpy as np
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response

# ============== Configuration ==============
SERVICE_VERSION = "0.4.0"

DETECTION_MODEL = "yolov6-nano"  # Has 'person' class
PERSON_CLASS_ID = 0  # 'person' is class 0 in COCO
DETECTION_THRESHOLD = 0.5
PRESENCE_TIMEOUT = 30.0  # seconds without person = not present
DETECTION_INTERVAL = 0.5  # seconds between polls of the detection queue

# Spatial detection config (millimetres)
DEPTH_LOWER_THRESHOLD = 100  # 10cm minimum
DEPTH_UPPER_THRESHOLD = 10000  # 10m maximum

JPEG_QUALITY = 85

# ============== Global State ==============
pipeline_ctx = None
detection_queue = None
rgb_queue = None
depth_queue = None
detection_thread = None
running = False
labels = []

# Written by the detection thread, read by the HTTP handlers.
presence_state = {
    "present": False,
    "person_count": 0,
    "last_seen": None,
    "last_detection": None,
    "detections": [],
    "confidence": 0.0,
    # Spatial data for the highest-confidence person (mm), or None.
    "distance_mm": None,
    "spatial_x": None,
    "spatial_y": None,
    "spatial_z": None,
}


def init_oak():
    """Initialize OAK-D with SPATIAL person detection pipeline (depthai v3).

    Builds the RGB + stereo + spatial-detection pipeline, starts it, and only
    then publishes the pipeline, queues and labels into the module globals.
    A failure part-way through therefore leaves the globals untouched, so the
    HTTP endpoints keep reporting "not initialized".

    Returns:
        True when the pipeline was started, False on any failure (the
        exception is printed together with its traceback).
    """
    global pipeline_ctx, detection_queue, rgb_queue, depth_queue, labels

    try:
        print("🦊 Initializing OAK-D with SPATIAL yolov6-nano...")

        # Create pipeline
        pipeline = dai.Pipeline()

        # Create RGB camera node
        cam = pipeline.create(dai.node.Camera).build()

        # Request RGB output for snapshots (1080p)
        cam_out = cam.requestOutput((1920, 1080), dai.ImgFrame.Type.BGR888p)
        new_rgb_queue = cam_out.createOutputQueue(maxSize=1, blocking=False)

        # Create mono cameras for stereo depth
        mono_left = pipeline.create(dai.node.Camera).build(
            dai.CameraBoardSocket.CAM_B
        )
        mono_right = pipeline.create(dai.node.Camera).build(
            dai.CameraBoardSocket.CAM_C
        )

        # Create stereo depth node and feed it from the mono cameras
        stereo = pipeline.create(dai.node.StereoDepth)
        mono_left.requestFullResolutionOutput().link(stereo.left)
        mono_right.requestFullResolutionOutput().link(stereo.right)

        # Configure stereo
        stereo.setRectification(True)
        stereo.setLeftRightCheck(True)
        stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)  # Align depth to RGB

        # Create SPATIAL detection network
        desc = dai.NNModelDescription(DETECTION_MODEL)
        spatial_det = pipeline.create(dai.node.SpatialDetectionNetwork).build(
            cam, stereo, desc
        )
        spatial_det.setConfidenceThreshold(DETECTION_THRESHOLD)
        spatial_det.setDepthLowerThreshold(DEPTH_LOWER_THRESHOLD)
        spatial_det.setDepthUpperThreshold(DEPTH_UPPER_THRESHOLD)
        spatial_det.setBoundingBoxScaleFactor(0.5)

        # Get class labels. Tolerate a model that reports no class list so
        # that a purely informational log line cannot abort initialization.
        new_labels = spatial_det.getClasses() or []
        person_label = (
            new_labels[PERSON_CLASS_ID]
            if len(new_labels) > PERSON_CLASS_ID
            else "?"
        )
        print(f"✅ Loaded {len(new_labels)} classes, person={person_label}")

        # Create detection output queue
        new_detection_queue = spatial_det.out.createOutputQueue(
            maxSize=1, blocking=False
        )

        # Create depth output queue for visualization (optional)
        new_depth_queue = stereo.depth.createOutputQueue(
            maxSize=1, blocking=False
        )

        # Start pipeline
        pipeline.start()

        # Publish only after everything succeeded.
        rgb_queue = new_rgb_queue
        detection_queue = new_detection_queue
        depth_queue = new_depth_queue
        labels = new_labels
        pipeline_ctx = pipeline

        print("✅ OAK-D initialized with SPATIAL person detection!")
        return True

    except Exception as e:
        print(f"❌ Failed to initialize OAK-D: {e}")
        traceback.print_exc()
        return False


def cleanup_oak():
    """Cleanup OAK-D resources.

    Signals the detection thread to stop, waits briefly for it, clears the
    queue globals so the endpoints answer 503 instead of polling a stopped
    pipeline, then stops and closes the pipeline. Cleanup is best-effort:
    errors are logged rather than raised.
    """
    global pipeline_ctx, running, detection_thread
    global detection_queue, rgb_queue, depth_queue

    running = False

    # Let the detection loop leave its current iteration before the device
    # goes away. The loop sleeps at most 1 s per iteration.
    worker = detection_thread
    if worker is not None and worker.is_alive():
        worker.join(timeout=2.0)
    detection_thread = None

    detection_queue = None
    rgb_queue = None
    depth_queue = None

    pipeline = pipeline_ctx
    pipeline_ctx = None
    if pipeline is not None:
        try:
            pipeline.stop()
            pipeline.close()
        except Exception as e:
            print(f"⚠️ Error while closing OAK-D pipeline: {e}")


def _presence_update(detections, now):
    """Return the `presence_state` fields to overwrite for one detection message.

    `detections` is the list of detection objects from one message; only
    entries whose `label` equals PERSON_CLASS_ID are considered. `present`
    and `last_seen` are included only when at least one person was seen, so
    a momentary miss does not end presence (the timeout does that).
    """
    persons = [d for d in detections if d.label == PERSON_CLASS_ID]

    update = {
        "last_detection": now,
        "person_count": len(persons),
    }

    if not persons:
        update.update(
            detections=[],
            confidence=0.0,
            spatial_x=None,
            spatial_y=None,
            spatial_z=None,
            distance_mm=None,
        )
        return update

    # Headline values come from the highest-confidence person.
    best = max(persons, key=lambda d: d.confidence)
    update.update(
        present=True,
        last_seen=now,
        confidence=best.confidence,
        spatial_x=best.spatialCoordinates.x,
        spatial_y=best.spatialCoordinates.y,
        spatial_z=best.spatialCoordinates.z,
        distance_mm=best.spatialCoordinates.z,  # Z is depth
        detections=[
            {
                "xmin": d.xmin,
                "ymin": d.ymin,
                "xmax": d.xmax,
                "ymax": d.ymax,
                "confidence": d.confidence,
                # Spatial coordinates in mm
                "x_mm": d.spatialCoordinates.x,
                "y_mm": d.spatialCoordinates.y,
                "z_mm": d.spatialCoordinates.z,
                "distance_m": d.spatialCoordinates.z / 1000.0,
            }
            for d in persons
        ],
    )
    return update


def detection_loop():
    """Background thread for SPATIAL presence detection.

    Polls the detection queue every DETECTION_INTERVAL seconds while the
    module-level `running` flag is set, and folds each message into
    `presence_state`. The presence timeout is checked on every iteration,
    including those with no new message, so presence still expires if the
    device stops delivering detections.
    """
    print("🔍 SPATIAL presence detection loop started")

    while running:
        try:
            queue = detection_queue
            if queue is None:
                time.sleep(1)
                continue

            now = time.time()
            data = queue.tryGet()
            if data is not None:
                presence_state.update(_presence_update(data.detections, now))

            # Check timeout
            last_seen = presence_state["last_seen"]
            if last_seen is not None and now - last_seen > PRESENCE_TIMEOUT:
                presence_state["present"] = False

            time.sleep(DETECTION_INTERVAL)

        except Exception as e:
            print(f"Detection loop error: {e}")
            time.sleep(1)

    print("🛑 SPATIAL presence detection loop stopped")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown.

    Starts the OAK-D pipeline and the detection thread on startup; the
    service still comes up (without camera data) when the device is not
    available. Always runs cleanup on shutdown.
    """
    global running, detection_thread

    print("🦊 Starting OAK-D SPATIAL Vision Service...")

    if init_oak():
        running = True
        detection_thread = threading.Thread(target=detection_loop, daemon=True)
        detection_thread.start()
        print("✅ Service ready!")
    else:
        print("⚠️ OAK-D not available")

    try:
        yield
    finally:
        print("👋 Shutting down...")
        cleanup_oak()


app = FastAPI(
    title="OAK-D SPATIAL Vision Service",
    description="Vixy's eyes with SPATIAL presence detection! 🦊👀📏",
    version=SERVICE_VERSION,
    lifespan=lifespan,
)


def _encode_jpeg(image):
    """Encode a BGR/colour image as JPEG bytes; raise HTTP 500 if encoding fails."""
    ok, jpeg = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY])
    if not ok:
        raise HTTPException(status_code=500, detail="JPEG encoding failed")
    return jpeg.tobytes()


@app.get("/health")
async def health():
    """Health check."""
    return {
        "status": "healthy",
        "service": "oak-service",
        "version": SERVICE_VERSION,
        "oak_connected": pipeline_ctx is not None,
        "detection_model": DETECTION_MODEL,
        "spatial_enabled": True,
        "timestamp": time.time(),
    }


@app.get("/presence")
async def presence():
    """Get current presence state with SPATIAL data - is Foxy there and how far?"""
    # Work on a shallow copy so all fields in one response come from the same
    # read of the state rather than interleaving with the detection thread.
    state = dict(presence_state)
    now = time.time()

    distance_mm = state["distance_mm"]
    distance_m = distance_mm / 1000.0 if distance_mm is not None else None

    last_seen = state["last_seen"]
    seconds_since_seen = now - last_seen if last_seen is not None else None

    # Compare against None (not truthiness) so a 0.0 reading is reported the
    # same way as distance_mm instead of being dropped.
    spatial = None
    if state["spatial_z"] is not None:
        spatial = {
            "x_mm": state["spatial_x"],
            "y_mm": state["spatial_y"],
            "z_mm": state["spatial_z"],
        }

    return {
        "present": state["present"],
        "person_count": state["person_count"],
        "last_seen": last_seen,
        "seconds_since_seen": seconds_since_seen,
        "confidence": state["confidence"],
        # SPATIAL DATA
        "distance_mm": distance_mm,
        "distance_m": distance_m,
        "spatial": spatial,
        "timestamp": now,
    }


@app.get("/detections")
async def detections():
    """Get detailed detection results with SPATIAL coordinates."""
    state = dict(presence_state)
    return {
        "person_count": state["person_count"],
        "detections": state["detections"],
        "last_detection": state["last_detection"],
        "timestamp": time.time(),
    }


@app.get("/snapshot")
async def snapshot():
    """Capture RGB frame.

    Returns a JPEG image. Responds 503 when the camera is not initialized or
    no frame is currently queued, 500 on any other failure.
    """
    queue = rgb_queue
    if queue is None:
        raise HTTPException(status_code=503, detail="OAK-D not initialized")

    try:
        frame = queue.tryGet()
        if frame is None:
            raise HTTPException(status_code=503, detail="No frame available")

        return Response(
            content=_encode_jpeg(frame.getCvFrame()),
            media_type="image/jpeg",
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/depth")
async def depth_frame():
    """Capture colorized depth frame.

    Returns a JPEG of the depth map min-max normalized to 0-255 and rendered
    with the JET colour map. Responds 503 when depth is not available or no
    frame is currently queued, 500 on any other failure.
    """
    queue = depth_queue
    if queue is None:
        raise HTTPException(status_code=503, detail="Depth not available")

    try:
        frame = queue.tryGet()
        if frame is None:
            raise HTTPException(status_code=503, detail="No depth frame available")

        depth_data = frame.getFrame()

        # Normalize and colorize
        depth_normalized = cv2.normalize(depth_data, None, 0, 255, cv2.NORM_MINMAX)
        depth_colored = cv2.applyColorMap(
            depth_normalized.astype(np.uint8), cv2.COLORMAP_JET
        )

        return Response(
            content=_encode_jpeg(depth_colored),
            media_type="image/jpeg",
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8100)