DETECTION_LABELS env var accepts comma-separated list (e.g. "person,cat,dog"). Only matching detections are reported; others are ignored. Empty = report all. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
271 lines
8.4 KiB
Python
271 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
vixy-vision Camera Server

FastAPI server that serves snapshots from a USB camera with motion detection.

Features:
- API key authentication
- HTTPS support
- Thread-safe camera access
- Auto-reconnect on camera failure
- Motion detection with event reporting
"""
|
|
|
|
import os
|
|
import cv2
|
|
import threading
|
|
from typing import Optional
|
|
from dotenv import load_dotenv
|
|
from fastapi import FastAPI, Security, HTTPException, Response
|
|
from fastapi.security import APIKeyHeader
|
|
|
|
from motion import MotionDetector
|
|
|
|
# Merge values from a local .env file into the process environment.
load_dotenv()


def _env_flag(name: str, default: str) -> bool:
    """Return True only when the variable's value is "true" (case-insensitive)."""
    return os.getenv(name, default).lower() == "true"


# --- Server / camera settings -------------------------------------------
API_KEY = os.getenv("API_KEY")
CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", "0"))
CAMERA_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920"))
CAMERA_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080"))
JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85"))

# --- Motion detection settings ------------------------------------------
MOTION_ENABLED = _env_flag("MOTION_ENABLED", "false")
MOTION_THRESHOLD = int(os.getenv("MOTION_THRESHOLD", "25"))
MOTION_MIN_AREA = float(os.getenv("MOTION_MIN_AREA", "0.5"))
MOTION_COOLDOWN = float(os.getenv("MOTION_COOLDOWN", "5.0"))
MOTION_INTERVAL = float(os.getenv("MOTION_INTERVAL", "0.5"))
COLLECTOR_URL = os.getenv("COLLECTOR_URL", "")
COLLECTOR_API_KEY = os.getenv("COLLECTOR_API_KEY", "")
CAMERA_ID = os.getenv("CAMERA_ID", "camera")

# --- Object detection settings ------------------------------------------
DETECTION_ENABLED = _env_flag("DETECTION_ENABLED", "false")
DETECTION_MODEL_PATH = os.getenv(
    "DETECTION_MODEL_PATH",
    "models/ssd_mobilenet_v2_coco_quant_postprocess.tflite",
)
DETECTION_LABELS_PATH = os.getenv("DETECTION_LABELS_PATH", "models/coco_labels.txt")
DETECTION_CONFIDENCE = float(os.getenv("DETECTION_CONFIDENCE", "0.5"))
DETECTION_SUPPRESS_EMPTY = _env_flag("DETECTION_SUPPRESS_EMPTY", "true")
# Comma-separated whitelist of labels to report (empty = all)
DETECTION_LABELS = os.getenv("DETECTION_LABELS", "")

# The server refuses to start without an API key (unset or empty).
if not API_KEY:
    raise ValueError("API_KEY not set in .env file")
|
|
|
|
# ASGI application object; the route decorators below register against it.
app = FastAPI(
    title="vixy-vision Camera Server",
    description="Camera snapshots + motion detection for the fox 🦊",
    version="2.0.0",
)

# Reads the key from the "X-API-Key" request header. auto_error=False makes a
# missing header arrive as None instead of being rejected by FastAPI itself,
# so verify_api_key decides how to respond.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
|
|
|
|
|
|
class CameraManager:
    """Thread-safe camera manager with auto-reconnect.

    Every access to the underlying ``cv2.VideoCapture`` handle is serialized
    by ``self.lock``. The device is opened lazily on the first frame request.
    When a read fails the handle is dropped, so the next request opens a
    fresh connection instead of reusing a broken one.
    """

    def __init__(self, camera_index: int = 0, width: int = 1920, height: int = 1080):
        """Store the capture settings; the device itself is opened on first use.

        Args:
            camera_index: Device index passed to ``cv2.VideoCapture``.
            width: Requested frame width in pixels.
            height: Requested frame height in pixels.
        """
        self.camera_index = camera_index
        self.width = width
        self.height = height
        self.camera: Optional[cv2.VideoCapture] = None
        # Non-reentrant lock: internal helpers assume the caller holds it.
        self.lock = threading.Lock()
        self._last_frame = None

    def _release_unlocked(self) -> None:
        """Release and forget the current handle. Caller must hold ``self.lock``."""
        if self.camera is not None:
            try:
                self.camera.release()
            finally:
                # Always forget the handle so the next request reopens.
                self.camera = None

    def _open_camera(self) -> bool:
        """Open the camera connection. Caller must hold ``self.lock``.

        Returns:
            True when the device opened and was configured, False otherwise.
        """
        # Drop any stale handle first so it is not leaked when replaced.
        self._release_unlocked()
        try:
            self.camera = cv2.VideoCapture(self.camera_index)
            if not self.camera.isOpened():
                # Do not keep a handle that failed to open.
                self._release_unlocked()
                return False

            self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
            self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
            self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)

            # Report what the device actually reports after the requests above.
            actual_width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH))
            actual_height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
            print(f"Camera: {actual_width}x{actual_height}")
            return True
        except Exception as e:
            print(f"Error opening camera: {e}")
            return False

    def get_raw_frame(self):
        """Get raw frame as returned by ``VideoCapture.read`` (for motion detection).

        Returns:
            The frame on success, or None when the camera cannot be opened
            or the read fails.
        """
        with self.lock:
            if self.camera is None or not self.camera.isOpened():
                if not self._open_camera():
                    return None

            # Grab and discard a few frames before the real read.
            for _ in range(3):
                self.camera.grab()

            ret, frame = self.camera.read()
            if ret:
                self._last_frame = frame
                return frame

            # A failed read may mean the device went away while the handle
            # still reports itself as open; drop it to force a reconnect.
            self._release_unlocked()
            return None

    def get_snapshot(self) -> Optional[bytes]:
        """Capture snapshot as JPEG bytes.

        Returns:
            Encoded JPEG data, or None when no frame is available or
            encoding fails.
        """
        frame = self.get_raw_frame()
        if frame is None:
            return None

        try:
            ret, buffer = cv2.imencode(
                '.jpg', frame,
                [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]
            )
            return buffer.tobytes() if ret else None
        except Exception as e:
            print(f"Error encoding: {e}")
            return None

    def release(self):
        """Release the camera, waiting for any in-progress frame read to finish."""
        with self.lock:
            self._release_unlocked()

    def __del__(self):
        self.release()
|
|
|
|
|
|
# One shared camera manager for the whole process.
camera_manager = CameraManager(CAMERA_INDEX, CAMERA_WIDTH, CAMERA_HEIGHT)

# The motion detector is only built at import time when MOTION_ENABLED is set;
# otherwise the name stays None. Empty collector/label settings are passed
# as None.
motion_detector: Optional[MotionDetector] = (
    MotionDetector(
        camera_id=CAMERA_ID,
        collector_url=COLLECTOR_URL or None,
        collector_api_key=COLLECTOR_API_KEY or None,
        threshold=MOTION_THRESHOLD,
        min_area_percent=MOTION_MIN_AREA,
        cooldown_seconds=MOTION_COOLDOWN,
        check_interval=MOTION_INTERVAL,
        detection_enabled=DETECTION_ENABLED,
        detection_model_path=DETECTION_MODEL_PATH,
        detection_labels_path=DETECTION_LABELS_PATH,
        detection_confidence=DETECTION_CONFIDENCE,
        detection_suppress_empty=DETECTION_SUPPRESS_EMPTY,
        detection_labels=DETECTION_LABELS or None,
    )
    if MOTION_ENABLED
    else None
)
|
|
|
|
|
|
def verify_api_key(api_key: str = Security(api_key_header)) -> str:
    """Validate the key supplied in the X-API-Key header.

    Args:
        api_key: Header value, or None when the header is absent
            (``api_key_header`` is created with ``auto_error=False``).

    Returns:
        The supplied key when it matches the configured ``API_KEY``.

    Raises:
        HTTPException: 403 when the key is missing or does not match.
    """
    # Local import keeps this block self-contained.
    import secrets

    # Constant-time comparison so response timing does not reveal how many
    # leading characters of a guess were correct. Both sides are encoded to
    # bytes because compare_digest rejects non-ASCII str arguments.
    if api_key is None or not secrets.compare_digest(
        api_key.encode("utf-8"), API_KEY.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="Invalid API key")
    return api_key
|
|
|
|
|
|
@app.on_event("startup")
def startup_event():
    """Kick off the motion detector, if one was configured, when the server starts."""
    if not motion_detector:
        return

    # The detector is handed the camera manager's frame getter.
    motion_detector.start(camera_manager.get_raw_frame)
    print(f"🦊 Motion detection enabled (camera: {CAMERA_ID})")
|
|
|
|
|
|
@app.on_event("shutdown")
def shutdown_event():
    """Stop the motion detector (when present), then release the camera."""
    detector = motion_detector
    if detector:
        detector.stop()

    # The camera is released whether or not a detector exists.
    camera_manager.release()
|
|
|
|
|
|
@app.get("/")
def root():
    """Return basic service information (no API key required)."""
    service_info = {
        "service": "vixy-vision Camera Server",
        "version": "2.0.0",
    }
    service_info["camera_id"] = CAMERA_ID
    service_info["motion_enabled"] = MOTION_ENABLED
    return service_info
|
|
|
|
|
|
@app.get("/health")
def health():
    """Return a fixed "ok" status with this camera's id (no API key required)."""
    payload = dict(status="ok", camera_id=CAMERA_ID)
    return payload
|
|
|
|
|
|
@app.get("/snapshot")
def get_snapshot(api_key: str = Security(verify_api_key)):
    """Return the current camera frame as a JPEG image.

    Raises:
        HTTPException: 503 when the camera manager yields no snapshot.
    """
    jpeg_bytes = camera_manager.get_snapshot()
    if jpeg_bytes is not None:
        no_cache = {"Cache-Control": "no-cache"}
        return Response(content=jpeg_bytes, media_type="image/jpeg", headers=no_cache)

    raise HTTPException(status_code=503, detail="Camera unavailable")
|
|
|
|
|
|
@app.get("/motion/stats")
def get_motion_stats(api_key: str = Security(verify_api_key)):
    """Return the motion detector's statistics, or a disabled notice when there is none."""
    if motion_detector:
        # "enabled" is set first; keys from get_stats() are merged after it.
        report = {"enabled": True}
        report.update(motion_detector.get_stats())
        return report

    return {"enabled": False, "message": "Motion detection not enabled"}
|
|
|
|
|
|
@app.post("/motion/enable")
def enable_motion(api_key: str = Security(verify_api_key)):
    """Start motion detection, creating the detector first if none exists.

    Returns:
        {"status": "already running"} when a detector exists and reports
        itself running, otherwise {"status": "started"}.
    """
    global motion_detector

    if motion_detector:
        if motion_detector._running:
            return {"status": "already running"}
    else:
        # No detector yet: build one from the module-level settings, passing
        # empty collector/label settings as None.
        detector_settings = dict(
            camera_id=CAMERA_ID,
            collector_url=COLLECTOR_URL or None,
            collector_api_key=COLLECTOR_API_KEY or None,
            threshold=MOTION_THRESHOLD,
            min_area_percent=MOTION_MIN_AREA,
            cooldown_seconds=MOTION_COOLDOWN,
            check_interval=MOTION_INTERVAL,
            detection_enabled=DETECTION_ENABLED,
            detection_model_path=DETECTION_MODEL_PATH,
            detection_labels_path=DETECTION_LABELS_PATH,
            detection_confidence=DETECTION_CONFIDENCE,
            detection_suppress_empty=DETECTION_SUPPRESS_EMPTY,
            detection_labels=DETECTION_LABELS or None,
        )
        motion_detector = MotionDetector(**detector_settings)

    motion_detector.start(camera_manager.get_raw_frame)
    return {"status": "started"}
|
|
|
|
|
|
@app.post("/motion/disable")
def disable_motion(api_key: str = Security(verify_api_key)):
    """Stop the motion detector if one exists.

    Returns:
        {"status": "stopped"} after calling stop() on an existing detector,
        {"status": "not running"} when no detector has been created.
    """
    if not motion_detector:
        return {"status": "not running"}

    motion_detector.stop()
    return {"status": "stopped"}
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Serve over TLS on all interfaces, using the key/cert pair under ssl/.
    server_options = {
        "host": "0.0.0.0",
        "port": 8443,
        "ssl_keyfile": "ssl/key.pem",
        "ssl_certfile": "ssl/cert.pem",
    }
    uvicorn.run("main:app", **server_options)
|