#!/usr/bin/env python3 """ OAK-D Vision Service for Vixy's Head FastAPI service exposing OAK-D camera capabilities Day 74 - Built by Vixy! 🦊 """ import io import time from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException from fastapi.responses import Response, JSONResponse import depthai as dai import cv2 # Global device reference oak_device = None pipeline = None queue = None def init_oak(): """Initialize OAK-D camera with basic RGB pipeline.""" global oak_device, pipeline, queue try: oak_device = dai.Device() pipeline = dai.Pipeline(oak_device) cam = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A) queue = cam.requestFullResolutionOutput().createOutputQueue() pipeline.start() return True except Exception as e: print(f"Failed to initialize OAK-D: {e}") return False def cleanup_oak(): """Cleanup OAK-D resources.""" global oak_device, pipeline, queue if pipeline: try: pipeline.stop() except: pass oak_device = None pipeline = None queue = None @asynccontextmanager async def lifespan(app: FastAPI): # Startup print("Starting OAK-D service...") if init_oak(): print("OAK-D initialized successfully!") else: print("Warning: OAK-D not available") yield # Shutdown print("Shutting down OAK-D service...") cleanup_oak() app = FastAPI( title="OAK-D Vision Service", description="Vixy's eyes! 🦊👀", version="0.1.0", lifespan=lifespan ) @app.get("/health") async def health(): """Health check endpoint.""" return { "status": "healthy", "service": "oak-service", "oak_connected": oak_device is not None, "timestamp": time.time() } @app.get("/snapshot") async def snapshot(): """Capture a single frame from OAK-D RGB camera.""" global queue if queue is None: raise HTTPException(status_code=503, detail="OAK-D not initialized") try: frame = queue.get() img = frame.getCvFrame() # Encode as JPEG _, jpeg = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85]) return Response( content=jpeg.tobytes(), media_type="image/jpeg" ) except Exception as e: raise HTTPException(status_code=500, detail=f"Capture failed: {e}") @app.get("/snapshot/info") async def snapshot_info(): """Capture frame and return metadata without image.""" global queue if queue is None: raise HTTPException(status_code=503, detail="OAK-D not initialized") try: frame = queue.get() img = frame.getCvFrame() return { "width": img.shape[1], "height": img.shape[0], "channels": img.shape[2], "timestamp": time.time() } except Exception as e: raise HTTPException(status_code=500, detail=f"Capture failed: {e}") @app.get("/status") async def status(): """Get OAK-D device status.""" if oak_device is None: return {"connected": False, "message": "OAK-D not connected"} try: return { "connected": True, "device_id": oak_device.getMxId(), "usb_speed": str(oak_device.getUsbSpeed()), "timestamp": time.time() } except Exception as e: return {"connected": False, "error": str(e)} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8100)