Files
vixy-vision/server-csi/main.py
Alex Kazaiev 37a2f2dcd6 Add multi-camera support and CSI camera server
- main_multi.py: Multi USB camera support with ID-based endpoints
  - Config via CAMERAS env: '{"basement": 0, "basement2": 1}'
  - Endpoints: /snapshot (default), /snapshot/{cam_id}

- server-csi/: New server for Pi CSI ribbon cameras (IR support)
  - Auto-detects picamera2/picamera/libcamera-still
  - IR_MODE and ROTATION settings
  - Includes setup.sh for easy Pi deployment

Built with 💕 by Vixy 🦊
2025-12-29 11:37:11 -06:00

265 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
vixy-vision CSI Camera Server
FastAPI server for Raspberry Pi CSI ribbon cameras (including IR cameras).
Supports both picamera2 (new) and picamera (legacy) libraries.
For Pi 3 with older Raspbian: uses picamera
For Pi 4/5 with newer Pi OS: uses picamera2
"""
import os
import io
import threading
import subprocess
from typing import Optional
from dotenv import load_dotenv
from fastapi import FastAPI, Security, HTTPException, Response
from fastapi.security import APIKeyHeader
# Load environment variables
load_dotenv()
# Configuration
API_KEY = os.getenv("API_KEY")
CAMERA_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920"))
CAMERA_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080"))
JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85"))
CAMERA_ID = os.getenv("CAMERA_ID", "csi-camera")
# IR camera settings
IR_MODE = os.getenv("IR_MODE", "false").lower() == "true"
ROTATION = int(os.getenv("ROTATION", "0")) # 0, 90, 180, 270
if not API_KEY:
raise ValueError("API_KEY not set in .env file")
# FastAPI app
app = FastAPI(
title="vixy-vision CSI Camera Server",
description="CSI ribbon camera server for the fox 🦊 (supports IR)",
version="1.0.0"
)
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
# Detect which camera library is available
CAMERA_LIB = None
picamera2 = None
picamera = None
try:
from picamera2 import Picamera2
CAMERA_LIB = "picamera2"
print("📷 Using picamera2 (modern)")
except ImportError:
try:
import picamera as picamera_legacy
picamera = picamera_legacy
CAMERA_LIB = "picamera"
print("📷 Using picamera (legacy)")
except ImportError:
print("⚠️ No camera library found, using libcamera-still fallback")
CAMERA_LIB = "libcamera"
class CSICameraManager:
"""Manages CSI camera access with multiple backend support"""
def __init__(self, width: int, height: int, rotation: int = 0):
self.width = width
self.height = height
self.rotation = rotation
self.lock = threading.Lock()
self.camera = None
self._initialized = False
def _init_picamera2(self):
"""Initialize picamera2"""
from picamera2 import Picamera2
self.camera = Picamera2()
config = self.camera.create_still_configuration(
main={"size": (self.width, self.height)},
transform=self._get_transform()
)
self.camera.configure(config)
self.camera.start()
self._initialized = True
def _init_picamera(self):
"""Initialize legacy picamera"""
self.camera = picamera.PiCamera()
self.camera.resolution = (self.width, self.height)
self.camera.rotation = self.rotation
# Warm up
import time
time.sleep(2)
self._initialized = True
def _get_transform(self):
"""Get libcamera Transform for rotation"""
try:
from libcamera import Transform
if self.rotation == 180:
return Transform(hflip=True, vflip=True)
elif self.rotation == 90:
return Transform(transpose=True, vflip=True)
elif self.rotation == 270:
return Transform(transpose=True, hflip=True)
except ImportError:
pass
return None
def get_snapshot(self) -> Optional[bytes]:
"""Capture snapshot as JPEG bytes"""
with self.lock:
try:
if CAMERA_LIB == "picamera2":
if not self._initialized:
self._init_picamera2()
# Capture to bytes
stream = io.BytesIO()
self.camera.capture_file(stream, format='jpeg')
return stream.getvalue()
elif CAMERA_LIB == "picamera":
if not self._initialized:
self._init_picamera()
stream = io.BytesIO()
self.camera.capture(stream, format='jpeg', quality=JPEG_QUALITY)
stream.seek(0)
return stream.read()
else:
# Fallback to libcamera-still command
return self._capture_libcamera()
except Exception as e:
print(f"Error capturing: {e}")
return None
def _capture_libcamera(self) -> Optional[bytes]:
"""Fallback: use libcamera-still command"""
try:
cmd = [
"libcamera-still",
"-n", # No preview
"-o", "-", # Output to stdout
"--width", str(self.width),
"--height", str(self.height),
"-q", str(JPEG_QUALITY),
"-t", "1", # 1ms timeout (immediate capture)
]
if self.rotation:
cmd.extend(["--rotation", str(self.rotation)])
result = subprocess.run(cmd, capture_output=True, timeout=10)
if result.returncode == 0:
return result.stdout
else:
print(f"libcamera-still error: {result.stderr.decode()}")
return None
except subprocess.TimeoutExpired:
print("libcamera-still timed out")
return None
except FileNotFoundError:
# Try raspistill (older Pi OS)
return self._capture_raspistill()
def _capture_raspistill(self) -> Optional[bytes]:
"""Legacy fallback: use raspistill command"""
try:
cmd = [
"raspistill",
"-n", # No preview
"-o", "-", # Output to stdout
"-w", str(self.width),
"-h", str(self.height),
"-q", str(JPEG_QUALITY),
"-t", "1",
]
if self.rotation:
cmd.extend(["-rot", str(self.rotation)])
result = subprocess.run(cmd, capture_output=True, timeout=10)
if result.returncode == 0:
return result.stdout
return None
except Exception as e:
print(f"raspistill error: {e}")
return None
def release(self):
"""Release camera resources"""
if self.camera:
try:
if CAMERA_LIB == "picamera2":
self.camera.stop()
elif CAMERA_LIB == "picamera":
self.camera.close()
except:
pass
self.camera = None
self._initialized = False
# Initialize camera
camera_manager = CSICameraManager(CAMERA_WIDTH, CAMERA_HEIGHT, ROTATION)
def verify_api_key(api_key: str = Security(api_key_header)) -> str:
if api_key is None or api_key != API_KEY:
raise HTTPException(status_code=403, detail="Invalid API key")
return api_key
@app.on_event("shutdown")
def shutdown_event():
camera_manager.release()
@app.get("/")
def root():
return {
"service": "vixy-vision CSI Camera Server",
"version": "1.0.0",
"camera_id": CAMERA_ID,
"camera_lib": CAMERA_LIB,
"ir_mode": IR_MODE,
}
@app.get("/health")
def health():
return {"status": "ok", "camera_id": CAMERA_ID, "library": CAMERA_LIB}
@app.get("/snapshot")
def get_snapshot(api_key: str = Security(verify_api_key)):
"""Get JPEG snapshot"""
snapshot = camera_manager.get_snapshot()
if snapshot is None:
raise HTTPException(status_code=503, detail="Camera unavailable")
return Response(
content=snapshot,
media_type="image/jpeg",
headers={"Cache-Control": "no-cache", "X-Camera-ID": CAMERA_ID}
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8443,
ssl_keyfile="ssl/key.pem",
ssl_certfile="ssl/cert.pem"
)