#!/usr/bin/env python3 """ Camera Snapshot Server Simple FastAPI server that serves snapshots from a USB camera. Features: - API key authentication - HTTPS support - Thread-safe camera access - Auto-reconnect on camera failure """ import os import cv2 import threading import secrets from typing import Optional from dotenv import load_dotenv from fastapi import FastAPI, Security, HTTPException, Response from fastapi.security import APIKeyHeader from fastapi.responses import JSONResponse # Load environment variables load_dotenv() # Configuration API_KEY = os.getenv("API_KEY") CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", "0")) CAMERA_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920")) CAMERA_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080")) JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85")) if not API_KEY: raise ValueError("API_KEY not set in .env file. Generate one with: python3 -c 'import secrets; print(secrets.token_urlsafe(32))'") # FastAPI app app = FastAPI( title="Camera Snapshot Server", description="Serves snapshots from USB camera with API key authentication", version="1.0.0" ) # API Key authentication api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) class CameraManager: """Thread-safe camera manager with auto-reconnect""" def __init__(self, camera_index: int = 0, width: int = 1920, height: int = 1080): self.camera_index = camera_index self.width = width self.height = height self.camera: Optional[cv2.VideoCapture] = None self.lock = threading.Lock() def _open_camera(self) -> bool: """Open camera connection""" try: self.camera = cv2.VideoCapture(self.camera_index) if not self.camera.isOpened(): return False # Set camera resolution self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) # Set camera properties for better performance self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Reduce buffer to get latest frame # Log actual resolution (camera may not support requested resolution) actual_width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"Camera resolution: {actual_width}x{actual_height} (requested: {self.width}x{self.height})") return True except Exception as e: print(f"Error opening camera: {e}") return False def get_snapshot(self) -> Optional[bytes]: """ Capture a snapshot from the camera. Returns: JPEG-encoded image bytes, or None if failed """ with self.lock: # Open camera if not initialized or closed if self.camera is None or not self.camera.isOpened(): if not self._open_camera(): return None # Flush buffer to get latest frame # Read and discard several frames to clear old buffered frames for _ in range(5): self.camera.grab() # Capture the latest frame ret, frame = self.camera.read() # Retry on failure if not ret: print("Failed to capture frame, attempting reconnect...") self.release() if not self._open_camera(): return None # Flush buffer again after reconnect for _ in range(5): self.camera.grab() ret, frame = self.camera.read() if not ret: return None # Encode as JPEG try: ret, buffer = cv2.imencode( '.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY] ) if not ret: return None return buffer.tobytes() except Exception as e: print(f"Error encoding image: {e}") return None def release(self): """Release camera resources""" if self.camera is not None: self.camera.release() self.camera = None def __del__(self): """Cleanup on deletion""" self.release() # Global camera manager camera_manager = CameraManager(CAMERA_INDEX, CAMERA_WIDTH, CAMERA_HEIGHT) def verify_api_key(api_key: str = Security(api_key_header)) -> str: """Verify API key from header""" if api_key is None or api_key != API_KEY: raise HTTPException( status_code=403, detail="Invalid or missing API key" ) return api_key @app.get("/") def root(): """Root endpoint with API info""" return { "service": "Camera Snapshot Server", "version": "1.0.0", "endpoints": { "/snapshot": "GET - Returns JPEG snapshot (requires X-API-Key header)", "/health": "GET - Health check (no auth required)" } } @app.get("/health") def health(): """Health check endpoint""" return {"status": "ok"} @app.get("/snapshot") def get_snapshot(api_key: str = Security(verify_api_key)): """ Get a snapshot from the USB camera. Requires X-API-Key header for authentication. Returns: JPEG image """ snapshot = camera_manager.get_snapshot() if snapshot is None: raise HTTPException( status_code=503, detail="Failed to capture snapshot. Check camera connection." ) return Response( content=snapshot, media_type="image/jpeg", headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.on_event("shutdown") def shutdown_event(): """Cleanup on shutdown""" camera_manager.release() if __name__ == "__main__": import uvicorn # For development only - use uvicorn command for production uvicorn.run( "main:app", host="0.0.0.0", port=8443, ssl_keyfile="ssl/key.pem", ssl_certfile="ssl/cert.pem" )