#!/usr/bin/env python3 """ vixy-vision Camera Server FastAPI server that serves snapshots from a USB camera with motion detection. Features: - API key authentication - HTTPS support - Thread-safe camera access - Auto-reconnect on camera failure - Motion detection with event reporting """ import os import cv2 import threading from typing import Optional from dotenv import load_dotenv from fastapi import FastAPI, Security, HTTPException, Response from fastapi.security import APIKeyHeader from motion import MotionDetector # Load environment variables load_dotenv() # Configuration API_KEY = os.getenv("API_KEY") CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", "0")) CAMERA_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920")) CAMERA_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080")) JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85")) # Motion detection config MOTION_ENABLED = os.getenv("MOTION_ENABLED", "false").lower() == "true" MOTION_THRESHOLD = int(os.getenv("MOTION_THRESHOLD", "25")) MOTION_MIN_AREA = float(os.getenv("MOTION_MIN_AREA", "0.5")) MOTION_COOLDOWN = float(os.getenv("MOTION_COOLDOWN", "5.0")) MOTION_INTERVAL = float(os.getenv("MOTION_INTERVAL", "0.5")) COLLECTOR_URL = os.getenv("COLLECTOR_URL", "") COLLECTOR_API_KEY = os.getenv("COLLECTOR_API_KEY", "") CAMERA_ID = os.getenv("CAMERA_ID", "camera") if not API_KEY: raise ValueError("API_KEY not set in .env file") # FastAPI app app = FastAPI( title="vixy-vision Camera Server", description="Camera snapshots + motion detection for the fox 🦊", version="2.0.0" ) api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) class CameraManager: """Thread-safe camera manager with auto-reconnect""" def __init__(self, camera_index: int = 0, width: int = 1920, height: int = 1080): self.camera_index = camera_index self.width = width self.height = height self.camera: Optional[cv2.VideoCapture] = None self.lock = threading.Lock() self._last_frame = None def _open_camera(self) -> bool: """Open camera connection""" try: self.camera = cv2.VideoCapture(self.camera_index) if not self.camera.isOpened(): return False self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1) actual_width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"Camera: {actual_width}x{actual_height}") return True except Exception as e: print(f"Error opening camera: {e}") return False def get_raw_frame(self): """Get raw frame as numpy array (for motion detection)""" with self.lock: if self.camera is None or not self.camera.isOpened(): if not self._open_camera(): return None for _ in range(3): self.camera.grab() ret, frame = self.camera.read() if ret: self._last_frame = frame return frame return None def get_snapshot(self) -> Optional[bytes]: """Capture snapshot as JPEG bytes""" frame = self.get_raw_frame() if frame is None: return None try: ret, buffer = cv2.imencode( '.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY] ) return buffer.tobytes() if ret else None except Exception as e: print(f"Error encoding: {e}") return None def release(self): """Release camera""" if self.camera is not None: self.camera.release() self.camera = None def __del__(self): self.release() # Initialize camera camera_manager = CameraManager(CAMERA_INDEX, CAMERA_WIDTH, CAMERA_HEIGHT) # Initialize motion detector (if enabled) motion_detector: Optional[MotionDetector] = None if MOTION_ENABLED: motion_detector = MotionDetector( camera_id=CAMERA_ID, collector_url=COLLECTOR_URL if COLLECTOR_URL else None, collector_api_key=COLLECTOR_API_KEY if COLLECTOR_API_KEY else None, threshold=MOTION_THRESHOLD, min_area_percent=MOTION_MIN_AREA, cooldown_seconds=MOTION_COOLDOWN, check_interval=MOTION_INTERVAL, ) def verify_api_key(api_key: str = Security(api_key_header)) -> str: if api_key is None or api_key != API_KEY: raise HTTPException(status_code=403, detail="Invalid API key") return api_key @app.on_event("startup") def startup_event(): """Start motion detection on server startup""" if motion_detector: motion_detector.start(camera_manager.get_raw_frame) print(f"🦊 Motion detection enabled (camera: {CAMERA_ID})") @app.on_event("shutdown") def shutdown_event(): """Cleanup on shutdown""" if motion_detector: motion_detector.stop() camera_manager.release() @app.get("/") def root(): return { "service": "vixy-vision Camera Server", "version": "2.0.0", "camera_id": CAMERA_ID, "motion_enabled": MOTION_ENABLED, } @app.get("/health") def health(): return {"status": "ok", "camera_id": CAMERA_ID} @app.get("/snapshot") def get_snapshot(api_key: str = Security(verify_api_key)): """Get JPEG snapshot""" snapshot = camera_manager.get_snapshot() if snapshot is None: raise HTTPException(status_code=503, detail="Camera unavailable") return Response( content=snapshot, media_type="image/jpeg", headers={"Cache-Control": "no-cache"} ) @app.get("/motion/stats") def get_motion_stats(api_key: str = Security(verify_api_key)): """Get motion detection statistics""" if not motion_detector: return {"enabled": False, "message": "Motion detection not enabled"} return { "enabled": True, **motion_detector.get_stats() } @app.post("/motion/enable") def enable_motion(api_key: str = Security(verify_api_key)): """Enable motion detection""" global motion_detector if motion_detector and motion_detector._running: return {"status": "already running"} if not motion_detector: motion_detector = MotionDetector( camera_id=CAMERA_ID, collector_url=COLLECTOR_URL if COLLECTOR_URL else None, threshold=MOTION_THRESHOLD, min_area_percent=MOTION_MIN_AREA, cooldown_seconds=MOTION_COOLDOWN, ) motion_detector.start(camera_manager.get_raw_frame) return {"status": "started"} @app.post("/motion/disable") def disable_motion(api_key: str = Security(verify_api_key)): """Disable motion detection""" if motion_detector: motion_detector.stop() return {"status": "stopped"} return {"status": "not running"} if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host="0.0.0.0", port=8443, ssl_keyfile="ssl/key.pem", ssl_certfile="ssl/cert.pem" )