Files
vixy-vision/server/main.py
Alex 1bcf32889f Add label whitelist to filter detection types
DETECTION_LABELS env var accepts comma-separated list (e.g. "person,cat,dog").
Only matching detections are reported; others are ignored. Empty = report all.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 19:08:31 -06:00

271 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""
vixy-vision Camera Server
FastAPI server that serves snapshots from a USB camera with motion detection.
Features:
- API key authentication
- HTTPS support
- Thread-safe camera access
- Auto-reconnect on camera failure
- Motion detection with event reporting
"""
import os
import cv2
import threading
from typing import Optional
from dotenv import load_dotenv
from fastapi import FastAPI, Security, HTTPException, Response
from fastapi.security import APIKeyHeader
from motion import MotionDetector
# Load environment variables
load_dotenv()
# Configuration
API_KEY = os.getenv("API_KEY")
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", "0"))
CAMERA_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920"))
CAMERA_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080"))
JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85"))
# Motion detection config
MOTION_ENABLED = os.getenv("MOTION_ENABLED", "false").lower() == "true"
MOTION_THRESHOLD = int(os.getenv("MOTION_THRESHOLD", "25"))
MOTION_MIN_AREA = float(os.getenv("MOTION_MIN_AREA", "0.5"))
MOTION_COOLDOWN = float(os.getenv("MOTION_COOLDOWN", "5.0"))
MOTION_INTERVAL = float(os.getenv("MOTION_INTERVAL", "0.5"))
COLLECTOR_URL = os.getenv("COLLECTOR_URL", "")
COLLECTOR_API_KEY = os.getenv("COLLECTOR_API_KEY", "")
CAMERA_ID = os.getenv("CAMERA_ID", "camera")
# Object detection config
DETECTION_ENABLED = os.getenv("DETECTION_ENABLED", "false").lower() == "true"
DETECTION_MODEL_PATH = os.getenv("DETECTION_MODEL_PATH", "models/ssd_mobilenet_v2_coco_quant_postprocess.tflite")
DETECTION_LABELS_PATH = os.getenv("DETECTION_LABELS_PATH", "models/coco_labels.txt")
DETECTION_CONFIDENCE = float(os.getenv("DETECTION_CONFIDENCE", "0.5"))
DETECTION_SUPPRESS_EMPTY = os.getenv("DETECTION_SUPPRESS_EMPTY", "true").lower() == "true"
DETECTION_LABELS = os.getenv("DETECTION_LABELS", "") # Comma-separated whitelist (empty = all)
if not API_KEY:
raise ValueError("API_KEY not set in .env file")
# FastAPI app
app = FastAPI(
title="vixy-vision Camera Server",
description="Camera snapshots + motion detection for the fox 🦊",
version="2.0.0"
)
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
class CameraManager:
"""Thread-safe camera manager with auto-reconnect"""
def __init__(self, camera_index: int = 0, width: int = 1920, height: int = 1080):
self.camera_index = camera_index
self.width = width
self.height = height
self.camera: Optional[cv2.VideoCapture] = None
self.lock = threading.Lock()
self._last_frame = None
def _open_camera(self) -> bool:
"""Open camera connection"""
try:
self.camera = cv2.VideoCapture(self.camera_index)
if not self.camera.isOpened():
return False
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)
actual_width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Camera: {actual_width}x{actual_height}")
return True
except Exception as e:
print(f"Error opening camera: {e}")
return False
def get_raw_frame(self):
"""Get raw frame as numpy array (for motion detection)"""
with self.lock:
if self.camera is None or not self.camera.isOpened():
if not self._open_camera():
return None
for _ in range(3):
self.camera.grab()
ret, frame = self.camera.read()
if ret:
self._last_frame = frame
return frame
return None
def get_snapshot(self) -> Optional[bytes]:
"""Capture snapshot as JPEG bytes"""
frame = self.get_raw_frame()
if frame is None:
return None
try:
ret, buffer = cv2.imencode(
'.jpg', frame,
[cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]
)
return buffer.tobytes() if ret else None
except Exception as e:
print(f"Error encoding: {e}")
return None
def release(self):
"""Release camera"""
if self.camera is not None:
self.camera.release()
self.camera = None
def __del__(self):
self.release()
# Initialize camera
camera_manager = CameraManager(CAMERA_INDEX, CAMERA_WIDTH, CAMERA_HEIGHT)
# Initialize motion detector (if enabled)
motion_detector: Optional[MotionDetector] = None
if MOTION_ENABLED:
motion_detector = MotionDetector(
camera_id=CAMERA_ID,
collector_url=COLLECTOR_URL if COLLECTOR_URL else None,
collector_api_key=COLLECTOR_API_KEY if COLLECTOR_API_KEY else None,
threshold=MOTION_THRESHOLD,
min_area_percent=MOTION_MIN_AREA,
cooldown_seconds=MOTION_COOLDOWN,
check_interval=MOTION_INTERVAL,
detection_enabled=DETECTION_ENABLED,
detection_model_path=DETECTION_MODEL_PATH,
detection_labels_path=DETECTION_LABELS_PATH,
detection_confidence=DETECTION_CONFIDENCE,
detection_suppress_empty=DETECTION_SUPPRESS_EMPTY,
detection_labels=DETECTION_LABELS if DETECTION_LABELS else None,
)
def verify_api_key(api_key: str = Security(api_key_header)) -> str:
if api_key is None or api_key != API_KEY:
raise HTTPException(status_code=403, detail="Invalid API key")
return api_key
@app.on_event("startup")
def startup_event():
"""Start motion detection on server startup"""
if motion_detector:
motion_detector.start(camera_manager.get_raw_frame)
print(f"🦊 Motion detection enabled (camera: {CAMERA_ID})")
@app.on_event("shutdown")
def shutdown_event():
"""Cleanup on shutdown"""
if motion_detector:
motion_detector.stop()
camera_manager.release()
@app.get("/")
def root():
return {
"service": "vixy-vision Camera Server",
"version": "2.0.0",
"camera_id": CAMERA_ID,
"motion_enabled": MOTION_ENABLED,
}
@app.get("/health")
def health():
return {"status": "ok", "camera_id": CAMERA_ID}
@app.get("/snapshot")
def get_snapshot(api_key: str = Security(verify_api_key)):
"""Get JPEG snapshot"""
snapshot = camera_manager.get_snapshot()
if snapshot is None:
raise HTTPException(status_code=503, detail="Camera unavailable")
return Response(
content=snapshot,
media_type="image/jpeg",
headers={"Cache-Control": "no-cache"}
)
@app.get("/motion/stats")
def get_motion_stats(api_key: str = Security(verify_api_key)):
"""Get motion detection statistics"""
if not motion_detector:
return {"enabled": False, "message": "Motion detection not enabled"}
return {
"enabled": True,
**motion_detector.get_stats()
}
@app.post("/motion/enable")
def enable_motion(api_key: str = Security(verify_api_key)):
"""Enable motion detection"""
global motion_detector
if motion_detector and motion_detector._running:
return {"status": "already running"}
if not motion_detector:
motion_detector = MotionDetector(
camera_id=CAMERA_ID,
collector_url=COLLECTOR_URL if COLLECTOR_URL else None,
collector_api_key=COLLECTOR_API_KEY if COLLECTOR_API_KEY else None,
threshold=MOTION_THRESHOLD,
min_area_percent=MOTION_MIN_AREA,
cooldown_seconds=MOTION_COOLDOWN,
check_interval=MOTION_INTERVAL,
detection_enabled=DETECTION_ENABLED,
detection_model_path=DETECTION_MODEL_PATH,
detection_labels_path=DETECTION_LABELS_PATH,
detection_confidence=DETECTION_CONFIDENCE,
detection_suppress_empty=DETECTION_SUPPRESS_EMPTY,
detection_labels=DETECTION_LABELS if DETECTION_LABELS else None,
)
motion_detector.start(camera_manager.get_raw_frame)
return {"status": "started"}
@app.post("/motion/disable")
def disable_motion(api_key: str = Security(verify_api_key)):
"""Disable motion detection"""
if motion_detector:
motion_detector.stop()
return {"status": "stopped"}
return {"status": "not running"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8443,
ssl_keyfile="ssl/key.pem",
ssl_certfile="ssl/cert.pem"
)