Files
oak-service/oak_service.py
Alex Kazaiev ee22b18dbf Update to DepthAI v3 API
Day 81 - Fixed API compatibility! 🦊

Changes:
- Camera node with .build() pattern
- DetectionNetwork instead of MobileNetDetectionNetwork
- NNModelDescription for model loading
- createOutputQueue() on outputs
- Pipeline context management

Still uses face-detection-retail-0004 for face detection.
Now compatible with depthai 3.2.x on head-vixy!
2026-01-21 15:24:21 -06:00

305 lines
8.8 KiB
Python

#!/usr/bin/env python3
"""
OAK-D Vision Service for Vixy's Head
FastAPI service with face detection and presence tracking
Day 74 - Built by Vixy! 🦊
Day 81 - Added face detection + presence! Now I can SEE you! 👀💜
Updated for DepthAI v3 API
"""
import asyncio
import time
import threading
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.responses import Response
import depthai as dai
import cv2
import numpy as np
# ============== Configuration ==============
# Model name handed to dai.NNModelDescription when the detection network is built.
FACE_DETECTION_MODEL = "face-detection-retail-0004"
# Value passed to setConfidenceThreshold() on the detection network.
DETECTION_THRESHOLD = 0.5
# Seconds without a face before presence is reported as False.
PRESENCE_TIMEOUT = 30.0
# Seconds the detection loop sleeps between polls of the detection queue.
DETECTION_INTERVAL = 0.5

# ============== Global State ==============
# DepthAI handles; all start out empty and are filled in by init_oak().
oak_device = pipeline = rgb_queue = detection_queue = None
# Background polling thread and the flag that keeps its loop alive.
detection_thread = None
running = False

# Presence tracking state, mutated in place by the detection loop and read by
# the HTTP handlers.
presence_state = dict(
    present=False,          # is a face currently considered present
    face_count=0,           # number of detections in the latest result
    last_seen=None,         # time.time() of the latest result with >= 1 face
    last_detection=None,    # time.time() of the latest result of any kind
    detections=[],          # bounding boxes + confidence of the latest faces
    confidence=0.0,         # highest confidence among the latest faces
)
def init_oak():
    """Initialize OAK-D with face detection pipeline (DepthAI v3 API).

    Builds a pipeline with a Camera node on CAM_A, a 300x300 preview feeding
    a DetectionNetwork, and a full-resolution output used for snapshots, then
    starts it.

    The module-level handles (``oak_device``, ``pipeline``, ``rgb_queue``,
    ``detection_queue``) are published only after every step has succeeded.
    On any failure they are all reset to ``None`` so the rest of the service
    never sees queues that belong to a pipeline which did not start.

    Returns:
        bool: True when the pipeline is running, False if anything failed.
    """
    global oak_device, pipeline, rgb_queue, detection_queue
    new_pipeline = None
    try:
        # Create pipeline
        new_pipeline = dai.Pipeline()
        # Camera node (v3 API)
        cam = new_pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A)
        # Request outputs - preview for NN, full res for snapshots
        preview_out = cam.requestOutput((300, 300), dai.ImgFrame.Type.BGR888p)
        full_out = cam.requestFullResolutionOutput()
        # Detection network (v3 API)
        model_desc = dai.NNModelDescription(FACE_DETECTION_MODEL)
        det_nn = new_pipeline.create(dai.node.DetectionNetwork).build(preview_out, model_desc)
        det_nn.setConfidenceThreshold(DETECTION_THRESHOLD)
        # Create output queues (kept local until the pipeline is really up)
        new_rgb_queue = full_out.createOutputQueue()
        new_detection_queue = det_nn.out.createOutputQueue()
        # Start pipeline
        new_pipeline.start()
        new_device = new_pipeline.getDevice()
        # Everything succeeded: publish the handles together.
        pipeline = new_pipeline
        rgb_queue = new_rgb_queue
        detection_queue = new_detection_queue
        oak_device = new_device
        print("✅ OAK-D initialized with face detection (v3 API)!")
        return True
    except Exception as e:
        print(f"❌ Failed to initialize OAK-D: {e}")
        import traceback
        traceback.print_exc()
        # Best-effort stop of a half-built pipeline; it is not published, so
        # cleanup_oak() would never get the chance to stop it.
        if new_pipeline is not None:
            try:
                new_pipeline.stop()
            except Exception as stop_error:
                print(f"⚠️ Could not stop partially built pipeline: {stop_error}")
        # Leave a consistent "not initialized" state behind.
        oak_device = None
        pipeline = None
        rgb_queue = None
        detection_queue = None
        return False
def cleanup_oak():
    """Cleanup OAK-D resources.

    Clears the ``running`` flag so the detection loop exits, stops the
    pipeline if one is set, and resets every module-level DepthAI handle to
    ``None``. A failure while stopping is reported and otherwise ignored
    (shutdown stays best-effort), but interpreter-level exceptions such as
    KeyboardInterrupt are no longer swallowed. The handles are reset in
    either case.
    """
    global oak_device, pipeline, rgb_queue, detection_queue, running
    running = False
    try:
        if pipeline is not None:
            pipeline.stop()
    except Exception as e:
        print(f"⚠️ Error while stopping OAK-D pipeline: {e}")
    finally:
        # Always drop the handles, even when stop() failed or was interrupted.
        oak_device = None
        pipeline = None
        rgb_queue = None
        detection_queue = None
def detection_loop():
    """Background thread that continuously checks for faces.

    While the module-level ``running`` flag is true, polls
    ``detection_queue`` with a non-blocking ``tryGet()`` every
    ``DETECTION_INTERVAL`` seconds and folds each result into
    ``presence_state``. The presence timeout is evaluated on every poll, not
    only when a new result arrived, so ``present`` still drops to False when
    the queue stops delivering messages.

    Any exception inside one iteration is printed and followed by a one
    second pause; the loop itself keeps running.
    """
    print("🔍 Face detection loop started")
    while running:
        try:
            # Snapshot the handle so it cannot turn into None between the
            # check and the call if cleanup runs concurrently.
            queue = detection_queue
            if queue is None:
                time.sleep(1)
                continue
            # Get detection results (non-blocking)
            in_nn = queue.tryGet()
            now = time.time()
            if in_nn is not None:
                _record_detections(in_nn.detections, now)
            _expire_presence(now)
            time.sleep(DETECTION_INTERVAL)
        except Exception as e:
            print(f"Detection loop error: {e}")
            time.sleep(1)
    print("🛑 Face detection loop stopped")


def _record_detections(detections, now):
    """Fold one detection result, taken at time ``now``, into presence_state."""
    face_count = len(detections)
    presence_state["last_detection"] = now
    presence_state["face_count"] = face_count
    if face_count > 0:
        presence_state["present"] = True
        presence_state["last_seen"] = now
        presence_state["confidence"] = max(d.confidence for d in detections)
        presence_state["detections"] = [
            {
                "xmin": d.xmin,
                "ymin": d.ymin,
                "xmax": d.xmax,
                "ymax": d.ymax,
                "confidence": d.confidence,
            }
            for d in detections
        ]
    else:
        presence_state["detections"] = []
        presence_state["confidence"] = 0.0


def _expire_presence(now):
    """Clear ``present`` once the last sighting is older than PRESENCE_TIMEOUT."""
    last_seen = presence_state["last_seen"]
    if last_seen is not None and now - last_seen > PRESENCE_TIMEOUT:
        presence_state["present"] = False
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown handling.

    On startup, tries to initialize the OAK-D; when that succeeds the
    detection loop is started in a daemon thread, otherwise the service keeps
    running in degraded mode. On shutdown, ``cleanup_oak()`` is called from a
    ``finally`` clause so the device is released even when an exception or a
    cancellation is thrown into this context manager.
    """
    global running, detection_thread
    print("🦊 Starting OAK-D Vision Service...")
    if init_oak():
        # Start detection thread
        running = True
        detection_thread = threading.Thread(target=detection_loop, daemon=True)
        detection_thread.start()
        print("✅ OAK-D service ready!")
    else:
        print("⚠️ OAK-D not available - running in degraded mode")
    try:
        yield
    finally:
        # Shutdown
        print("👋 Shutting down OAK-D service...")
        cleanup_oak()
# ASGI application object; startup and shutdown are delegated to lifespan().
app = FastAPI(
    lifespan=lifespan,
    title="OAK-D Vision Service",
    version="0.3.0",
    description="Vixy's eyes with face detection! 🦊👀",
)
@app.get("/health")
async def health():
    """Report service liveness and whether the OAK-D handles are populated.

    ``status`` is always "healthy"; ``oak_connected`` and ``face_detection``
    reflect whether ``oak_device`` and ``detection_queue`` are currently set.
    """
    device_attached = oak_device is not None
    detector_ready = detection_queue is not None
    report = {
        "status": "healthy",
        "service": "oak-service",
        "version": "0.3.0",
    }
    report["oak_connected"] = device_attached
    report["face_detection"] = detector_ready
    report["timestamp"] = time.time()
    return report
@app.get("/presence")
async def presence():
    """Return the current presence summary - is Foxy there?

    ``seconds_since_seen`` is the age of the last sighting, or None when no
    face has been seen yet.
    """
    state = presence_state
    last_seen = state["last_seen"]
    payload = {
        "present": state["present"],
        "face_count": state["face_count"],
        "last_seen": last_seen,
    }
    if last_seen:
        payload["seconds_since_seen"] = time.time() - last_seen
    else:
        payload["seconds_since_seen"] = None
    payload["confidence"] = state["confidence"]
    payload["timestamp"] = time.time()
    return payload
@app.get("/face")
async def face():
    """Return the latest face detections together with their timestamps."""
    exposed_keys = ("face_count", "detections", "last_detection")
    result = {key: presence_state[key] for key in exposed_keys}
    result["timestamp"] = time.time()
    return result
@app.get("/snapshot")
async def snapshot():
    """Capture a single frame from OAK-D RGB camera.

    Returns:
        Response: the frame encoded as JPEG (quality 85), media type
        ``image/jpeg``.

    Raises:
        HTTPException: 503 when the RGB queue is not initialized or has no
            frame ready; 500 when grabbing or encoding the frame fails.
    """
    queue = rgb_queue
    if queue is None:
        raise HTTPException(status_code=503, detail="OAK-D not initialized")
    try:
        frame = queue.tryGet()
        if frame is None:
            raise HTTPException(status_code=503, detail="No frame available")
        img = frame.getCvFrame()
        encoded_ok, jpeg = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85])
        # imencode signals failure through its first return value; do not
        # send a buffer it did not vouch for.
        if not encoded_ok:
            raise HTTPException(
                status_code=500, detail="Capture failed: JPEG encoding failed"
            )
        return Response(content=jpeg.tobytes(), media_type="image/jpeg")
    except HTTPException:
        # Already a well-formed HTTP error; pass it through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Capture failed: {e}") from e
@app.get("/snapshot/info")
async def snapshot_info():
    """Describe the next available frame (size and channels) without its pixels.

    Returns ``{"available": False, ...}`` when the queue has no frame ready.
    Raises HTTP 503 when the RGB queue is not initialized and HTTP 500 when
    reading the frame fails.
    """
    if rgb_queue is None:
        raise HTTPException(status_code=503, detail="OAK-D not initialized")
    try:
        frame = rgb_queue.tryGet()
        if frame is None:
            return {"available": False, "timestamp": time.time()}
        shape = frame.getCvFrame().shape
        info = {"available": True}
        info["width"] = shape[1]
        info["height"] = shape[0]
        # A two-dimensional array carries no channel axis; report one channel.
        info["channels"] = shape[2] if len(shape) > 2 else 1
        info["timestamp"] = time.time()
        return info
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Info failed: {e}")
@app.get("/status")
async def status():
    """Return device details together with the full presence state.

    When no device handle is set, or querying the device raises, the response
    carries ``"connected": False`` plus a message or the error text.
    """
    device = oak_device
    if device is None:
        disconnected = {"connected": False, "message": "OAK-D not connected"}
        disconnected["presence"] = presence_state
        return disconnected
    try:
        # NOTE(review): DepthAI v3 appears to favor getDeviceId() over
        # getMxId() - confirm against the installed depthai version.
        device_id = device.getMxId()
        usb_speed = str(device.getUsbSpeed())
        details = {
            "connected": True,
            "device_id": device_id,
            "usb_speed": usb_speed,
            "face_detection_enabled": True,
            "detection_model": FACE_DETECTION_MODEL,
            "presence": presence_state,
            "timestamp": time.time(),
        }
        return details
    except Exception as e:
        return {"connected": False, "error": str(e), "presence": presence_state}
if __name__ == "__main__":
    # Direct execution: serve the app with uvicorn on all interfaces, port 8100.
    from uvicorn import run as serve

    serve(app, port=8100, host="0.0.0.0")