- FastAPI service on port 8100 - /health - health check - /snapshot - capture JPEG from RGB camera - /snapshot/info - frame metadata - /status - OAK-D device info Built by Vixy on Day 74!
141 lines
3.6 KiB
Python
141 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
OAK-D Vision Service for Vixy's Head
|
|
FastAPI service exposing OAK-D camera capabilities
|
|
|
|
Day 74 - Built by Vixy! 🦊
|
|
"""
|
|
|
|
import io
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import Response, JSONResponse
|
|
import depthai as dai
|
|
import cv2
|
|
|
|
# Global device reference
|
|
oak_device = None
|
|
pipeline = None
|
|
queue = None
|
|
|
|
def init_oak():
|
|
"""Initialize OAK-D camera with basic RGB pipeline."""
|
|
global oak_device, pipeline, queue
|
|
|
|
try:
|
|
oak_device = dai.Device()
|
|
pipeline = dai.Pipeline(oak_device)
|
|
cam = pipeline.create(dai.node.Camera).build(dai.CameraBoardSocket.CAM_A)
|
|
queue = cam.requestFullResolutionOutput().createOutputQueue()
|
|
pipeline.start()
|
|
return True
|
|
except Exception as e:
|
|
print(f"Failed to initialize OAK-D: {e}")
|
|
return False
|
|
|
|
def cleanup_oak():
|
|
"""Cleanup OAK-D resources."""
|
|
global oak_device, pipeline, queue
|
|
if pipeline:
|
|
try:
|
|
pipeline.stop()
|
|
except:
|
|
pass
|
|
oak_device = None
|
|
pipeline = None
|
|
queue = None
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
# Startup
|
|
print("Starting OAK-D service...")
|
|
if init_oak():
|
|
print("OAK-D initialized successfully!")
|
|
else:
|
|
print("Warning: OAK-D not available")
|
|
yield
|
|
# Shutdown
|
|
print("Shutting down OAK-D service...")
|
|
cleanup_oak()
|
|
|
|
app = FastAPI(
|
|
title="OAK-D Vision Service",
|
|
description="Vixy's eyes! 🦊👀",
|
|
version="0.1.0",
|
|
lifespan=lifespan
|
|
)
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
"""Health check endpoint."""
|
|
return {
|
|
"status": "healthy",
|
|
"service": "oak-service",
|
|
"oak_connected": oak_device is not None,
|
|
"timestamp": time.time()
|
|
}
|
|
|
|
@app.get("/snapshot")
|
|
async def snapshot():
|
|
"""Capture a single frame from OAK-D RGB camera."""
|
|
global queue
|
|
|
|
if queue is None:
|
|
raise HTTPException(status_code=503, detail="OAK-D not initialized")
|
|
|
|
try:
|
|
frame = queue.get()
|
|
img = frame.getCvFrame()
|
|
|
|
# Encode as JPEG
|
|
_, jpeg = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85])
|
|
|
|
return Response(
|
|
content=jpeg.tobytes(),
|
|
media_type="image/jpeg"
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Capture failed: {e}")
|
|
|
|
@app.get("/snapshot/info")
|
|
async def snapshot_info():
|
|
"""Capture frame and return metadata without image."""
|
|
global queue
|
|
|
|
if queue is None:
|
|
raise HTTPException(status_code=503, detail="OAK-D not initialized")
|
|
|
|
try:
|
|
frame = queue.get()
|
|
img = frame.getCvFrame()
|
|
|
|
return {
|
|
"width": img.shape[1],
|
|
"height": img.shape[0],
|
|
"channels": img.shape[2],
|
|
"timestamp": time.time()
|
|
}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=f"Capture failed: {e}")
|
|
|
|
@app.get("/status")
|
|
async def status():
|
|
"""Get OAK-D device status."""
|
|
if oak_device is None:
|
|
return {"connected": False, "message": "OAK-D not connected"}
|
|
|
|
try:
|
|
return {
|
|
"connected": True,
|
|
"device_id": oak_device.getMxId(),
|
|
"usb_speed": str(oak_device.getUsbSpeed()),
|
|
"timestamp": time.time()
|
|
}
|
|
except Exception as e:
|
|
return {"connected": False, "error": str(e)}
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=8100)
|