#!/usr/bin/env python3
"""
OAK-D Vision Service for Vixy's Head
FastAPI service with face detection and presence tracking

Day 74 - Built by Vixy! 🦊
Day 81 - Added face detection + presence! Now I can SEE you! 👀💜
"""

import asyncio
import time
import threading
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
import depthai as dai
import blobconverter
import cv2
import numpy as np

# ============== Configuration ==============

FACE_DETECTION_MODEL = "face-detection-retail-0004"
DETECTION_THRESHOLD = 0.5
PRESENCE_TIMEOUT = 30.0  # seconds without face = not present
DETECTION_INTERVAL = 0.5  # how often to check for faces
THREAD_JOIN_TIMEOUT = 2.0  # seconds to wait for the detection thread on shutdown

# ============== Global State ==============

oak_device = None
pipeline = None
rgb_queue = None
detection_queue = None
detection_thread = None
running = False

# Presence tracking state (written by the detection thread, read by endpoints)
presence_state = {
    "present": False,
    "face_count": 0,
    "last_seen": None,
    "last_detection": None,
    "detections": [],  # Current face bounding boxes
    "confidence": 0.0,
}


def _release_device():
    """Close the OAK-D device (if any) and reset every device-related global."""
    global oak_device, pipeline, rgb_queue, detection_queue

    device = oak_device
    # Drop the shared handles first so other code stops using the queues
    # before the device underneath them is closed.
    oak_device = None
    pipeline = None
    rgb_queue = None
    detection_queue = None

    if device is not None:
        try:
            device.close()
        except Exception as e:
            # Best effort: report the failure instead of silently hiding it.
            print(f"⚠️ Error while closing OAK-D: {e}")


def init_oak():
    """
    Initialize OAK-D with face detection pipeline.

    Builds a pipeline with a colour camera feeding a MobileNet detection
    network, opens the device and stores the "rgb" and "detections" output
    queues in the module globals.

    Returns:
        True when the device and both queues are ready, False otherwise.
        On failure any partially opened device is closed and the globals
        are reset, so `oak_device is None` reliably means "not connected".
    """
    global oak_device, pipeline, rgb_queue, detection_queue

    try:
        # Create pipeline
        pipeline = dai.Pipeline()

        # RGB Camera
        cam_rgb = pipeline.create(dai.node.ColorCamera)
        cam_rgb.setPreviewSize(300, 300)  # NN input size
        cam_rgb.setInterleaved(False)
        cam_rgb.setFps(10)  # Lower FPS for efficiency

        # Also get full resolution for snapshots
        cam_rgb.setResolution(dai.ColorCameraProperties.SensorResolution.THE_1080_P)

        # Face detection neural network
        # NOTE(review): confirm FACE_DETECTION_MODEL is available under
        # zoo_type="depthai" in blobconverter.
        face_nn = pipeline.create(dai.node.MobileNetDetectionNetwork)
        face_nn.setConfidenceThreshold(DETECTION_THRESHOLD)
        face_nn.setBlobPath(
            blobconverter.from_zoo(
                name=FACE_DETECTION_MODEL,
                shaves=6,
                zoo_type="depthai",
            )
        )
        face_nn.setNumInferenceThreads(2)
        face_nn.input.setBlocking(False)

        # Link camera to NN
        cam_rgb.preview.link(face_nn.input)

        # Output queues
        xout_rgb = pipeline.create(dai.node.XLinkOut)
        xout_rgb.setStreamName("rgb")
        cam_rgb.video.link(xout_rgb.input)  # Full resolution for snapshots

        xout_nn = pipeline.create(dai.node.XLinkOut)
        xout_nn.setStreamName("detections")
        face_nn.out.link(xout_nn.input)

        # Start device
        oak_device = dai.Device(pipeline)
        rgb_queue = oak_device.getOutputQueue("rgb", maxSize=1, blocking=False)
        detection_queue = oak_device.getOutputQueue(
            "detections", maxSize=1, blocking=False
        )

        print("✅ OAK-D initialized with face detection!")
        return True

    except Exception as e:
        print(f"❌ Failed to initialize OAK-D: {e}")
        # Do not leave a half-open device or stale queue handles behind.
        _release_device()
        return False


def cleanup_oak():
    """
    Cleanup OAK-D resources.

    Signals the detection loop to stop, waits up to THREAD_JOIN_TIMEOUT
    seconds for the detection thread to exit, then closes the device and
    clears the device-related globals.
    """
    global oak_device, pipeline, rgb_queue, detection_queue, running

    running = False

    # Let the detection thread finish its current iteration before the
    # device it polls is closed. Never join the current thread.
    worker = detection_thread
    if (
        worker is not None
        and worker is not threading.current_thread()
        and worker.is_alive()
    ):
        worker.join(timeout=THREAD_JOIN_TIMEOUT)

    _release_device()


def _record_detections(detections, now):
    """Store one network result (a sequence of detections) in presence_state."""
    update = {
        "last_detection": now,
        "face_count": len(detections),
    }

    if detections:
        update["present"] = True
        update["last_seen"] = now
        update["confidence"] = max(d.confidence for d in detections)
        # Coordinates are copied as reported by the network
        # (presumably normalized to 0..1 - TODO confirm).
        update["detections"] = [
            {
                "xmin": d.xmin,
                "ymin": d.ymin,
                "xmax": d.xmax,
                "ymax": d.ymax,
                "confidence": d.confidence,
            }
            for d in detections
        ]
    else:
        update["detections"] = []
        update["confidence"] = 0.0

    # Apply all fields with a single update() call rather than key by key.
    presence_state.update(update)


def _expire_presence(now):
    """Mark presence as lost once PRESENCE_TIMEOUT has passed since last_seen."""
    last_seen = presence_state["last_seen"]
    if not presence_state["present"] or last_seen is None:
        return

    if now - last_seen > PRESENCE_TIMEOUT:
        # Nobody has been seen for too long; also drop per-face data that
        # would otherwise be left over if the device stopped sending results.
        presence_state.update(
            {
                "present": False,
                "face_count": 0,
                "detections": [],
                "confidence": 0.0,
            }
        )


def detection_loop():
    """
    Background thread that continuously checks for faces.

    Runs until the module-level `running` flag becomes False. Each iteration
    polls the detection queue without blocking, records any new result, and
    then applies the presence timeout. The timeout is checked on every
    iteration, even when no new result arrived, so presence still expires
    if the device stops delivering detections.
    """
    print("🔍 Face detection loop started")

    while running:
        try:
            queue = detection_queue
            if queue is None:
                time.sleep(1)
                continue

            now = time.time()

            # Get detection results (non-blocking)
            in_nn = queue.tryGet()
            if in_nn is not None:
                _record_detections(in_nn.detections, now)

            # Check timeout
            _expire_presence(now)

            time.sleep(DETECTION_INTERVAL)

        except Exception as e:
            print(f"Detection loop error: {e}")
            time.sleep(1)

    print("🛑 Face detection loop stopped")

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Startup and shutdown handling.

    On startup, initializes the OAK-D and, when that succeeds, starts the
    background detection thread. On shutdown, releases the device via
    cleanup_oak(), including when the application exits with an error.
    """
    global running, detection_thread

    print("🦊 Starting OAK-D Vision Service...")

    if init_oak():
        # Start detection thread
        running = True
        detection_thread = threading.Thread(target=detection_loop, daemon=True)
        detection_thread.start()
        print("✅ OAK-D service ready!")
    else:
        print("⚠️ OAK-D not available - running in degraded mode")

    try:
        yield
    finally:
        # Shutdown
        print("👋 Shutting down OAK-D service...")
        cleanup_oak()


app = FastAPI(
    title="OAK-D Vision Service",
    description="Vixy's eyes with face detection! 🦊👀",
    version="0.2.0",
    lifespan=lifespan,
)

# How long the snapshot endpoints wait for the camera to deliver a frame.
# The RGB queue is read with non-blocking tryGet(), so a request can easily
# land between two frames.
SNAPSHOT_WAIT_TIMEOUT = 1.0  # seconds
SNAPSHOT_POLL_INTERVAL = 0.02  # seconds between tryGet() attempts


async def _next_rgb_frame(timeout=SNAPSHOT_WAIT_TIMEOUT):
    """
    Return the next frame from the RGB queue, or None.

    Polls the queue with tryGet() until a frame arrives or `timeout`
    seconds have elapsed, yielding to the event loop between attempts.
    Returns None when the queue is not initialized or no frame arrived
    in time.
    """
    queue = rgb_queue
    if queue is None:
        return None

    deadline = time.monotonic() + timeout
    while True:
        frame = queue.tryGet()
        if frame is not None:
            return frame
        if time.monotonic() >= deadline:
            return None
        await asyncio.sleep(SNAPSHOT_POLL_INTERVAL)


# ============== Endpoints ==============


@app.get("/health")
async def health():
    """Health check endpoint."""
    return {
        "status": "healthy",
        "service": "oak-service",
        "version": "0.2.0",
        "oak_connected": oak_device is not None,
        "face_detection": detection_queue is not None,
        "timestamp": time.time(),
    }


@app.get("/presence")
async def presence():
    """
    Get current presence state.

    Returns whether someone (Foxy!) is present based on face detection.
    """
    # Read the clock and last_seen once so that the derived
    # "seconds_since_seen" and "timestamp" agree with each other.
    now = time.time()
    last_seen = presence_state["last_seen"]

    return {
        "present": presence_state["present"],
        "face_count": presence_state["face_count"],
        "last_seen": last_seen,
        "seconds_since_seen": now - last_seen if last_seen is not None else None,
        "confidence": presence_state["confidence"],
        "timestamp": now,
    }


@app.get("/face")
async def face():
    """
    Get detailed face detection results.

    Returns bounding boxes and confidence for all detected faces.
    """
    return {
        "face_count": presence_state["face_count"],
        "detections": presence_state["detections"],
        "last_detection": presence_state["last_detection"],
        "timestamp": time.time(),
    }


@app.get("/snapshot")
async def snapshot():
    """
    Capture a single frame from OAK-D RGB camera.

    Returns:
        A JPEG image response.

    Raises:
        HTTPException 503: the RGB queue is not initialized, or no frame
            arrived within SNAPSHOT_WAIT_TIMEOUT seconds.
        HTTPException 500: reading or encoding the frame failed.
    """
    if rgb_queue is None:
        raise HTTPException(status_code=503, detail="OAK-D not initialized")

    try:
        frame = await _next_rgb_frame()
        if frame is None:
            raise HTTPException(status_code=503, detail="No frame available")

        img = frame.getCvFrame()

        # Encode as JPEG
        encoded_ok, jpeg = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85])
        if not encoded_ok:
            raise HTTPException(
                status_code=500, detail="Capture failed: JPEG encoding failed"
            )

        return Response(
            content=jpeg.tobytes(),
            media_type="image/jpeg",
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Capture failed: {e}") from e


@app.get("/snapshot/info")
async def snapshot_info():
    """
    Get frame metadata without returning the image itself.

    Returns:
        {"available": False, ...} when no frame arrived within
        SNAPSHOT_WAIT_TIMEOUT seconds, otherwise the frame's width, height
        and channel count.

    Raises:
        HTTPException 503: the RGB queue is not initialized.
        HTTPException 500: reading the frame failed.
    """
    if rgb_queue is None:
        raise HTTPException(status_code=503, detail="OAK-D not initialized")

    try:
        frame = await _next_rgb_frame()
        if frame is None:
            return {"available": False, "timestamp": time.time()}

        img = frame.getCvFrame()
        return {
            "available": True,
            "width": img.shape[1],
            "height": img.shape[0],
            "channels": img.shape[2] if len(img.shape) > 2 else 1,
            "timestamp": time.time(),
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Info failed: {e}") from e


@app.get("/status")
async def status():
    """
    Get comprehensive OAK-D and presence status.

    Every response carries "connected", "presence" and "timestamp"; device
    details are included only when the device could be queried.
    """
    if oak_device is None:
        return {
            "connected": False,
            "message": "OAK-D not connected",
            "presence": presence_state,
            "timestamp": time.time(),
        }

    try:
        return {
            "connected": True,
            "device_id": oak_device.getMxId(),
            "usb_speed": str(oak_device.getUsbSpeed()),
            "face_detection_enabled": True,
            "detection_model": FACE_DETECTION_MODEL,
            "presence": presence_state,
            "timestamp": time.time(),
        }
    except Exception as e:
        # The device handle exists but could not be queried.
        return {
            "connected": False,
            "error": str(e),
            "presence": presence_state,
            "timestamp": time.time(),
        }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8100)