#!/usr/bin/env python3 """ vixy-vision Camera Server v3.1 - Multi-Camera with Release-After-Use For Pi with V4L2 limitations - releases camera after each snapshot to allow multiple cameras to work. """ import os import cv2 import json import threading from typing import Optional, Dict from dotenv import load_dotenv from fastapi import FastAPI, Security, HTTPException, Response, Path from fastapi.security import APIKeyHeader load_dotenv() # Configuration API_KEY = os.getenv("API_KEY") JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85")) CAMERAS_CONFIG = os.getenv("CAMERAS", '{"camera": 0}') DEFAULT_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920")) DEFAULT_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080")) if not API_KEY: raise ValueError("API_KEY not set in .env file") CAMERAS: Dict[str, int] = json.loads(CAMERAS_CONFIG) app = FastAPI( title="vixy-vision Camera Server", description="Multi-camera snapshots for the fox 🦊 (release-after-use mode)", version="3.1.0" ) api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) # Global lock to prevent simultaneous camera access on Pi camera_lock = threading.Lock() def capture_snapshot(device_index: int, width: int, height: int) -> Optional[bytes]: """ Capture a single snapshot, then release the camera. Uses global lock to prevent V4L2 conflicts. """ with camera_lock: cap = None try: cap = cv2.VideoCapture(device_index) if not cap.isOpened(): print(f"Failed to open /dev/video{device_index}") return None cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Grab a few frames to get fresh image for _ in range(3): cap.grab() ret, frame = cap.read() if not ret or frame is None: print(f"Failed to read from /dev/video{device_index}") return None ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]) return buffer.tobytes() if ret else None except Exception as e: print(f"Error capturing from /dev/video{device_index}: {e}") return None finally: if cap is not None: cap.release() def verify_api_key(api_key: str = Security(api_key_header)) -> str: if api_key != API_KEY: raise HTTPException(status_code=403, detail="Invalid API key") return api_key # Print registered cameras on startup for cam_id, dev_idx in CAMERAS.items(): print(f"🦊 Registered: {cam_id} -> /dev/video{dev_idx}") DEFAULT_CAM = list(CAMERAS.keys())[0] if CAMERAS else None @app.get("/") def root(): return { "service": "vixy-vision Camera Server", "version": "3.1.0", "mode": "release-after-use", "cameras": list(CAMERAS.keys()), } @app.get("/health") def health(): return {"status": "ok", "cameras": list(CAMERAS.keys())} @app.get("/snapshot") def snapshot_default(api_key: str = Security(verify_api_key)): """Snapshot from default camera""" if not DEFAULT_CAM: raise HTTPException(status_code=503, detail="No cameras configured") return snapshot_by_id(DEFAULT_CAM, api_key) @app.get("/snapshot/{cam_id}") def snapshot_by_id(cam_id: str, api_key: str = Security(verify_api_key)): """Snapshot from specific camera""" if cam_id not in CAMERAS: raise HTTPException(status_code=404, detail=f"Camera '{cam_id}' not found") device_index = CAMERAS[cam_id] snapshot = capture_snapshot(device_index, DEFAULT_WIDTH, DEFAULT_HEIGHT) if snapshot is None: raise HTTPException(status_code=503, detail=f"Camera '{cam_id}' unavailable") return Response( content=snapshot, media_type="image/jpeg", headers={"Cache-Control": "no-cache", "X-Camera-ID": cam_id} ) if __name__ == "__main__": import uvicorn uvicorn.run("main_multi:app", host="0.0.0.0", port=8443, ssl_keyfile="ssl/key.pem", ssl_certfile="ssl/cert.pem")