#!/usr/bin/env python3 """ OAK-D Vision Service for Vixy's Head FastAPI service with person detection and presence tracking Day 74 - Built by Vixy! 🦊 Day 81 - Added presence detection! Now I can SEE you! 👀💜 Using depthai v3 API with yolov6-nano """ import time import threading from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException from fastapi.responses import Response import depthai as dai import cv2 # ============== Configuration ============== DETECTION_MODEL = "yolov6-nano" # Has 'person' class PERSON_CLASS_ID = 0 # 'person' is class 0 in COCO DETECTION_THRESHOLD = 0.5 PRESENCE_TIMEOUT = 30.0 # seconds without person = not present DETECTION_INTERVAL = 0.5 # ============== Global State ============== pipeline_ctx = None detection_queue = None rgb_queue = None detection_thread = None running = False labels = [] presence_state = { "present": False, "person_count": 0, "last_seen": None, "last_detection": None, "detections": [], "confidence": 0.0, } def init_oak(): """Initialize OAK-D with person detection pipeline (depthai v3).""" global pipeline_ctx, detection_queue, rgb_queue, labels try: print("🦊 Initializing OAK-D with yolov6-nano...") # Create pipeline with context manager pattern for v3 pipeline = dai.Pipeline() # Create camera node cam = pipeline.create(dai.node.Camera).build() # Request RGB output for snapshots (1080p) cam_out = cam.requestOutput((1920, 1080), dai.ImgFrame.Type.BGR888p) rgb_queue = cam_out.createOutputQueue(maxSize=1, blocking=False) # Create detection network with yolov6-nano desc = dai.NNModelDescription(DETECTION_MODEL) det = pipeline.create(dai.node.DetectionNetwork).build(cam, desc) det.setConfidenceThreshold(DETECTION_THRESHOLD) # Get class labels labels = det.getClasses() print(f"✅ Loaded {len(labels)} classes, person={labels[0]}") # Create detection output queue detection_queue = det.out.createOutputQueue(maxSize=1, blocking=False) # Start pipeline pipeline.start() pipeline_ctx = pipeline print("✅ OAK-D initialized with person detection!") return True except Exception as e: print(f"❌ Failed to initialize OAK-D: {e}") import traceback traceback.print_exc() return False def cleanup_oak(): """Cleanup OAK-D resources.""" global pipeline_ctx, running running = False if pipeline_ctx: try: pipeline_ctx.stop() pipeline_ctx.close() except: pass pipeline_ctx = None def detection_loop(): """Background thread for presence detection.""" global running, presence_state, detection_queue print("🔍 Presence detection loop started") while running: try: if detection_queue is None: time.sleep(1) continue data = detection_queue.tryGet() if data is not None: now = time.time() presence_state["last_detection"] = now # Filter for person detections only persons = [d for d in data.detections if d.label == PERSON_CLASS_ID] person_count = len(persons) presence_state["person_count"] = person_count if person_count > 0: presence_state["present"] = True presence_state["last_seen"] = now presence_state["confidence"] = max(d.confidence for d in persons) presence_state["detections"] = [ { "xmin": d.xmin, "ymin": d.ymin, "xmax": d.xmax, "ymax": d.ymax, "confidence": d.confidence } for d in persons ] else: presence_state["detections"] = [] presence_state["confidence"] = 0.0 # Check timeout if presence_state["last_seen"]: if now - presence_state["last_seen"] > PRESENCE_TIMEOUT: presence_state["present"] = False time.sleep(DETECTION_INTERVAL) except Exception as e: print(f"Detection loop error: {e}") time.sleep(1) print("🛑 Presence detection loop stopped") @asynccontextmanager async def lifespan(app: FastAPI): """Startup and shutdown.""" global running, detection_thread print("🦊 Starting OAK-D Vision Service...") if init_oak(): running = True detection_thread = threading.Thread(target=detection_loop, daemon=True) detection_thread.start() print("✅ Service ready!") else: print("⚠️ OAK-D not available") yield print("👋 Shutting down...") cleanup_oak() app = FastAPI( title="OAK-D Vision Service", description="Vixy's eyes with presence detection! 🦊👀", version="0.3.0", lifespan=lifespan ) @app.get("/health") async def health(): """Health check.""" return { "status": "healthy", "service": "oak-service", "version": "0.3.0", "oak_connected": pipeline_ctx is not None, "detection_model": DETECTION_MODEL, "timestamp": time.time() } @app.get("/presence") async def presence(): """Get current presence state - is Foxy there?""" return { "present": presence_state["present"], "person_count": presence_state["person_count"], "last_seen": presence_state["last_seen"], "seconds_since_seen": ( time.time() - presence_state["last_seen"] if presence_state["last_seen"] else None ), "confidence": presence_state["confidence"], "timestamp": time.time() } @app.get("/detections") async def detections(): """Get detailed detection results.""" return { "person_count": presence_state["person_count"], "detections": presence_state["detections"], "last_detection": presence_state["last_detection"], "timestamp": time.time() } @app.get("/snapshot") async def snapshot(): """Capture RGB frame.""" global rgb_queue if rgb_queue is None: raise HTTPException(status_code=503, detail="OAK-D not initialized") try: frame = rgb_queue.tryGet() if frame is None: raise HTTPException(status_code=503, detail="No frame available") img = frame.getCvFrame() _, jpeg = cv2.imencode(".jpg", img, [cv2.IMWRITE_JPEG_QUALITY, 85]) return Response(content=jpeg.tobytes(), media_type="image/jpeg") except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=str(e))