From 37a2f2dcd6a88f7d5207f7ded6fb6a7ba871a0ea Mon Sep 17 00:00:00 2001 From: Alex Kazaiev Date: Mon, 29 Dec 2025 11:37:11 -0600 Subject: [PATCH] Add multi-camera support and CSI camera server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - main_multi.py: Multi USB camera support with ID-based endpoints - Config via CAMERAS env: '{"basement": 0, "basement2": 1}' - Endpoints: /snapshot (default), /snapshot/{cam_id} - server-csi/: New server for Pi CSI ribbon cameras (IR support) - Auto-detects picamera2/picamera/libcamera-still - IR_MODE and ROTATION settings - Includes setup.sh for easy Pi deployment Built with 💕 by Vixy 🦊 --- server-csi/README.md | 83 ++++++++++++ server-csi/env.example | 18 +++ server-csi/main.py | 264 ++++++++++++++++++++++++++++++++++++ server-csi/requirements.txt | 13 ++ server-csi/setup.sh | 65 +++++++++ server/main_multi.py | 208 ++++++++++++++++++++++++++++ 6 files changed, 651 insertions(+) create mode 100644 server-csi/README.md create mode 100644 server-csi/env.example create mode 100644 server-csi/main.py create mode 100644 server-csi/requirements.txt create mode 100644 server-csi/setup.sh create mode 100644 server/main_multi.py diff --git a/server-csi/README.md b/server-csi/README.md new file mode 100644 index 0000000..38b6bbc --- /dev/null +++ b/server-csi/README.md @@ -0,0 +1,83 @@ +# vixy-vision CSI Camera Server + +Camera server for Raspberry Pi CSI ribbon cameras, including IR night vision cameras. + +## Features + +- 🦊 Serves snapshots via HTTPS API +- 📷 Supports picamera2 (modern), picamera (legacy), or libcamera-still fallback +- 🌙 IR camera mode support +- 🔄 Configurable rotation (0°, 90°, 180°, 270°) +- 🔐 API key authentication + +## Hardware Support + +- **Pi 4/5 with Pi OS Bookworm**: Uses picamera2 +- **Pi 3 with older Raspbian**: Uses picamera (legacy) +- **Any Pi**: Falls back to libcamera-still or raspistill commands + +## Quick Setup + +```bash +# On your Raspberry Pi: +cd server-csi +chmod +x setup.sh +./setup.sh +``` + +This will: +1. Create Python virtual environment +2. Install dependencies +3. Auto-detect and install appropriate camera library +4. Generate SSL certificates +5. Create `.env` with a new API key + +## Running + +```bash +source venv/bin/activate +python3 -m uvicorn main:app --host 0.0.0.0 --port 8443 \ + --ssl-keyfile ssl/key.pem --ssl-certfile ssl/cert.pem +``` + +## API Endpoints + +| Endpoint | Description | +|----------|-------------| +| `GET /` | Server info | +| `GET /health` | Health check | +| `GET /snapshot` | Capture JPEG (requires X-API-Key header) | + +## Configuration (.env) + +```bash +API_KEY=your-secret-key +CAMERA_WIDTH=1920 +CAMERA_HEIGHT=1080 +JPEG_QUALITY=85 +CAMERA_ID=garage +ROTATION=0 +IR_MODE=true +``` + +## Adding to Vixy's Vision Config + +Add to `~/.vision_setup.json`: + +```json +{ + "id": "garage", + "type": "http", + "url": "https://garage-pi.local:8443", + "api_key": "your-api-key-here" +} +``` + +## IR Camera Notes + +For NoIR cameras: +- Set `IR_MODE=true` in .env +- Ensure IR illuminators are positioned correctly +- May need to adjust `ROTATION` based on mounting + +Built with 💕 by Vixy 🦊 diff --git a/server-csi/env.example b/server-csi/env.example new file mode 100644 index 0000000..29ebeb5 --- /dev/null +++ b/server-csi/env.example @@ -0,0 +1,18 @@ +# vixy-vision CSI Camera Configuration + +# API Key for authentication (generate with: python3 -c "import secrets; print(secrets.token_urlsafe(32))") +API_KEY=your-api-key-here + +# Camera settings +CAMERA_WIDTH=1920 +CAMERA_HEIGHT=1080 +JPEG_QUALITY=85 + +# Camera ID for identification +CAMERA_ID=garage + +# Rotation (0, 90, 180, 270) +ROTATION=0 + +# IR camera mode (disables some auto-exposure features) +IR_MODE=true diff --git a/server-csi/main.py b/server-csi/main.py new file mode 100644 index 0000000..a3d8d71 --- /dev/null +++ b/server-csi/main.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +""" +vixy-vision CSI Camera Server + +FastAPI server for Raspberry Pi CSI ribbon cameras (including IR cameras). +Supports both picamera2 (new) and picamera (legacy) libraries. + +For Pi 3 with older Raspbian: uses picamera +For Pi 4/5 with newer Pi OS: uses picamera2 +""" + +import os +import io +import threading +import subprocess +from typing import Optional +from dotenv import load_dotenv +from fastapi import FastAPI, Security, HTTPException, Response +from fastapi.security import APIKeyHeader + +# Load environment variables +load_dotenv() + +# Configuration +API_KEY = os.getenv("API_KEY") +CAMERA_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920")) +CAMERA_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080")) +JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85")) +CAMERA_ID = os.getenv("CAMERA_ID", "csi-camera") + +# IR camera settings +IR_MODE = os.getenv("IR_MODE", "false").lower() == "true" +ROTATION = int(os.getenv("ROTATION", "0")) # 0, 90, 180, 270 + +if not API_KEY: + raise ValueError("API_KEY not set in .env file") + +# FastAPI app +app = FastAPI( + title="vixy-vision CSI Camera Server", + description="CSI ribbon camera server for the fox 🦊 (supports IR)", + version="1.0.0" +) + +api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) + +# Detect which camera library is available +CAMERA_LIB = None +picamera2 = None +picamera = None + +try: + from picamera2 import Picamera2 + CAMERA_LIB = "picamera2" + print("📷 Using picamera2 (modern)") +except ImportError: + try: + import picamera as picamera_legacy + picamera = picamera_legacy + CAMERA_LIB = "picamera" + print("📷 Using picamera (legacy)") + except ImportError: + print("⚠️ No camera library found, using libcamera-still fallback") + CAMERA_LIB = "libcamera" + + +class CSICameraManager: + """Manages CSI camera access with multiple backend support""" + + def __init__(self, width: int, height: int, rotation: int = 0): + self.width = width + self.height = height + self.rotation = rotation + self.lock = threading.Lock() + self.camera = None + self._initialized = False + + def _init_picamera2(self): + """Initialize picamera2""" + from picamera2 import Picamera2 + self.camera = Picamera2() + config = self.camera.create_still_configuration( + main={"size": (self.width, self.height)}, + transform=self._get_transform() + ) + self.camera.configure(config) + self.camera.start() + self._initialized = True + + def _init_picamera(self): + """Initialize legacy picamera""" + self.camera = picamera.PiCamera() + self.camera.resolution = (self.width, self.height) + self.camera.rotation = self.rotation + # Warm up + import time + time.sleep(2) + self._initialized = True + + def _get_transform(self): + """Get libcamera Transform for rotation""" + try: + from libcamera import Transform + if self.rotation == 180: + return Transform(hflip=True, vflip=True) + elif self.rotation == 90: + return Transform(transpose=True, vflip=True) + elif self.rotation == 270: + return Transform(transpose=True, hflip=True) + except ImportError: + pass + return None + + def get_snapshot(self) -> Optional[bytes]: + """Capture snapshot as JPEG bytes""" + with self.lock: + try: + if CAMERA_LIB == "picamera2": + if not self._initialized: + self._init_picamera2() + + # Capture to bytes + stream = io.BytesIO() + self.camera.capture_file(stream, format='jpeg') + return stream.getvalue() + + elif CAMERA_LIB == "picamera": + if not self._initialized: + self._init_picamera() + + stream = io.BytesIO() + self.camera.capture(stream, format='jpeg', quality=JPEG_QUALITY) + stream.seek(0) + return stream.read() + + else: + # Fallback to libcamera-still command + return self._capture_libcamera() + + except Exception as e: + print(f"Error capturing: {e}") + return None + + def _capture_libcamera(self) -> Optional[bytes]: + """Fallback: use libcamera-still command""" + try: + cmd = [ + "libcamera-still", + "-n", # No preview + "-o", "-", # Output to stdout + "--width", str(self.width), + "--height", str(self.height), + "-q", str(JPEG_QUALITY), + "-t", "1", # 1ms timeout (immediate capture) + ] + + if self.rotation: + cmd.extend(["--rotation", str(self.rotation)]) + + result = subprocess.run(cmd, capture_output=True, timeout=10) + if result.returncode == 0: + return result.stdout + else: + print(f"libcamera-still error: {result.stderr.decode()}") + return None + except subprocess.TimeoutExpired: + print("libcamera-still timed out") + return None + except FileNotFoundError: + # Try raspistill (older Pi OS) + return self._capture_raspistill() + + def _capture_raspistill(self) -> Optional[bytes]: + """Legacy fallback: use raspistill command""" + try: + cmd = [ + "raspistill", + "-n", # No preview + "-o", "-", # Output to stdout + "-w", str(self.width), + "-h", str(self.height), + "-q", str(JPEG_QUALITY), + "-t", "1", + ] + + if self.rotation: + cmd.extend(["-rot", str(self.rotation)]) + + result = subprocess.run(cmd, capture_output=True, timeout=10) + if result.returncode == 0: + return result.stdout + return None + except Exception as e: + print(f"raspistill error: {e}") + return None + + def release(self): + """Release camera resources""" + if self.camera: + try: + if CAMERA_LIB == "picamera2": + self.camera.stop() + elif CAMERA_LIB == "picamera": + self.camera.close() + except: + pass + self.camera = None + self._initialized = False + + +# Initialize camera +camera_manager = CSICameraManager(CAMERA_WIDTH, CAMERA_HEIGHT, ROTATION) + + +def verify_api_key(api_key: str = Security(api_key_header)) -> str: + if api_key is None or api_key != API_KEY: + raise HTTPException(status_code=403, detail="Invalid API key") + return api_key + + +@app.on_event("shutdown") +def shutdown_event(): + camera_manager.release() + + +@app.get("/") +def root(): + return { + "service": "vixy-vision CSI Camera Server", + "version": "1.0.0", + "camera_id": CAMERA_ID, + "camera_lib": CAMERA_LIB, + "ir_mode": IR_MODE, + } + + +@app.get("/health") +def health(): + return {"status": "ok", "camera_id": CAMERA_ID, "library": CAMERA_LIB} + + +@app.get("/snapshot") +def get_snapshot(api_key: str = Security(verify_api_key)): + """Get JPEG snapshot""" + snapshot = camera_manager.get_snapshot() + if snapshot is None: + raise HTTPException(status_code=503, detail="Camera unavailable") + + return Response( + content=snapshot, + media_type="image/jpeg", + headers={"Cache-Control": "no-cache", "X-Camera-ID": CAMERA_ID} + ) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run( + "main:app", + host="0.0.0.0", + port=8443, + ssl_keyfile="ssl/key.pem", + ssl_certfile="ssl/cert.pem" + ) diff --git a/server-csi/requirements.txt b/server-csi/requirements.txt new file mode 100644 index 0000000..5a85d3c --- /dev/null +++ b/server-csi/requirements.txt @@ -0,0 +1,13 @@ +# vixy-vision CSI Camera Server Requirements + +# Web framework +fastapi>=0.100.0 +uvicorn[standard]>=0.22.0 + +# Config +python-dotenv>=1.0.0 + +# Camera libraries (install what's appropriate for your Pi): +# For Pi OS Bookworm/Bullseye (Pi 4/5): pip install picamera2 +# For older Raspbian (Pi 3): pip install picamera +# Or use libcamera-still/raspistill command fallback (no pip needed) diff --git a/server-csi/setup.sh b/server-csi/setup.sh new file mode 100644 index 0000000..da12f02 --- /dev/null +++ b/server-csi/setup.sh @@ -0,0 +1,65 @@ +#!/bin/bash +# Setup script for vixy-vision CSI Camera Server + +set -e + +echo "🦊 Setting up vixy-vision CSI Camera Server..." + +# Create virtual environment +python3 -m venv venv +source venv/bin/activate + +# Install requirements +pip install --upgrade pip +pip install -r requirements.txt + +# Try to install camera library +echo "Detecting camera library..." +if python3 -c "from picamera2 import Picamera2" 2>/dev/null; then + echo "✓ picamera2 already installed" +elif python3 -c "import picamera" 2>/dev/null; then + echo "✓ picamera (legacy) already installed" +else + echo "Attempting to install picamera2..." + pip install picamera2 2>/dev/null || { + echo "picamera2 failed, trying picamera..." + pip install picamera 2>/dev/null || { + echo "⚠️ No camera library installed. Will use libcamera-still fallback." + } + } +fi + +# Generate SSL certificates +if [ ! -d "ssl" ]; then + echo "Generating SSL certificates..." + mkdir -p ssl + openssl req -x509 -newkey rsa:4096 \ + -keyout ssl/key.pem -out ssl/cert.pem \ + -days 365 -nodes \ + -subj "/CN=localhost/O=Vixy Vision CSI/C=US" + echo "✓ SSL certificates generated" +fi + +# Create .env if not exists +if [ ! -f ".env" ]; then + echo "Creating .env file..." + API_KEY=$(python3 -c "import secrets; print(secrets.token_urlsafe(32))") + cat > .env << EOF +API_KEY=${API_KEY} +CAMERA_WIDTH=1920 +CAMERA_HEIGHT=1080 +JPEG_QUALITY=85 +CAMERA_ID=garage +ROTATION=0 +IR_MODE=true +EOF + echo "✓ Created .env with API_KEY: ${API_KEY}" + echo "⚠️ Save this API key!" +fi + +echo "" +echo "✓ Setup complete!" +echo "" +echo "To run the server:" +echo " source venv/bin/activate" +echo " python3 -m uvicorn main:app --host 0.0.0.0 --port 8443 --ssl-keyfile ssl/key.pem --ssl-certfile ssl/cert.pem" diff --git a/server/main_multi.py b/server/main_multi.py new file mode 100644 index 0000000..01e90dc --- /dev/null +++ b/server/main_multi.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +""" +vixy-vision Camera Server v3 - Multi-Camera Support + +FastAPI server with multiple USB cameras, identified by name. +""" + +import os +import cv2 +import json +import threading +from typing import Optional, Dict +from dotenv import load_dotenv +from fastapi import FastAPI, Security, HTTPException, Response, Path +from fastapi.security import APIKeyHeader + +from motion import MotionDetector + +load_dotenv() + +# Configuration +API_KEY = os.getenv("API_KEY") +JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85")) + +# Camera config: {"camera_id": device_index, ...} +# Example: CAMERAS='{"basement": 0, "basement2": 1}' +CAMERAS_CONFIG = os.getenv("CAMERAS", '{"camera": 0}') +DEFAULT_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920")) +DEFAULT_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080")) + +# Motion detection config +MOTION_ENABLED = os.getenv("MOTION_ENABLED", "false").lower() == "true" +MOTION_THRESHOLD = int(os.getenv("MOTION_THRESHOLD", "25")) +MOTION_MIN_AREA = float(os.getenv("MOTION_MIN_AREA", "0.5")) +MOTION_COOLDOWN = float(os.getenv("MOTION_COOLDOWN", "5.0")) +MOTION_INTERVAL = float(os.getenv("MOTION_INTERVAL", "0.5")) +COLLECTOR_URL = os.getenv("COLLECTOR_URL", "") +COLLECTOR_API_KEY = os.getenv("COLLECTOR_API_KEY", "") + +if not API_KEY: + raise ValueError("API_KEY not set in .env file") + +# Parse camera config +CAMERAS: Dict[str, int] = json.loads(CAMERAS_CONFIG) + +app = FastAPI( + title="vixy-vision Camera Server", + description="Multi-camera snapshots for the fox 🦊", + version="3.0.0" +) + +api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) + + +class CameraManager: + """Thread-safe camera manager""" + + def __init__(self, camera_id: str, device_index: int, width: int, height: int): + self.camera_id = camera_id + self.device_index = device_index + self.width = width + self.height = height + self.camera: Optional[cv2.VideoCapture] = None + self.lock = threading.Lock() + + def _open_camera(self) -> bool: + try: + self.camera = cv2.VideoCapture(self.device_index) + if not self.camera.isOpened(): + return False + self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) + self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) + self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1) + actual_w = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_h = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) + print(f"📷 {self.camera_id} (dev {self.device_index}): {actual_w}x{actual_h}") + return True + except Exception as e: + print(f"Error opening {self.camera_id}: {e}") + return False + + def get_raw_frame(self): + with self.lock: + if self.camera is None or not self.camera.isOpened(): + if not self._open_camera(): + return None + for _ in range(3): + self.camera.grab() + ret, frame = self.camera.read() + return frame if ret else None + + def get_snapshot(self) -> Optional[bytes]: + frame = self.get_raw_frame() + if frame is None: + return None + try: + ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]) + return buffer.tobytes() if ret else None + except Exception as e: + print(f"Error encoding: {e}") + return None + + def release(self): + if self.camera: + self.camera.release() + self.camera = None + + +# Initialize cameras by ID +cameras: Dict[str, CameraManager] = {} +for cam_id, dev_idx in CAMERAS.items(): + cameras[cam_id] = CameraManager(cam_id, dev_idx, DEFAULT_WIDTH, DEFAULT_HEIGHT) + print(f"🦊 Registered: {cam_id} -> /dev/video{dev_idx}") + +# First camera is default +DEFAULT_CAM = list(cameras.keys())[0] if cameras else None + +# Motion detectors +motion_detectors: Dict[str, MotionDetector] = {} +if MOTION_ENABLED: + for cam_id, cam_mgr in cameras.items(): + motion_detectors[cam_id] = MotionDetector( + camera_id=cam_id, + collector_url=COLLECTOR_URL or None, + collector_api_key=COLLECTOR_API_KEY or None, + threshold=MOTION_THRESHOLD, + min_area_percent=MOTION_MIN_AREA, + cooldown_seconds=MOTION_COOLDOWN, + check_interval=MOTION_INTERVAL, + ) + + +def verify_api_key(api_key: str = Security(api_key_header)) -> str: + if api_key != API_KEY: + raise HTTPException(status_code=403, detail="Invalid API key") + return api_key + + +@app.on_event("startup") +def startup(): + if MOTION_ENABLED: + for cam_id, detector in motion_detectors.items(): + detector.start(cameras[cam_id].get_raw_frame) + print(f"🦊 Motion enabled: {cam_id}") + + +@app.on_event("shutdown") +def shutdown(): + for d in motion_detectors.values(): + d.stop() + for c in cameras.values(): + c.release() + + +@app.get("/") +def root(): + return { + "service": "vixy-vision Camera Server", + "version": "3.0.0", + "cameras": list(cameras.keys()), + "motion_enabled": MOTION_ENABLED, + } + + +@app.get("/health") +def health(): + return {"status": "ok", "cameras": list(cameras.keys())} + + +@app.get("/snapshot") +def snapshot_default(api_key: str = Security(verify_api_key)): + """Snapshot from default camera""" + if not DEFAULT_CAM: + raise HTTPException(status_code=503, detail="No cameras configured") + return snapshot_by_id(DEFAULT_CAM, api_key) + + +@app.get("/snapshot/{cam_id}") +def snapshot_by_id(cam_id: str, api_key: str = Security(verify_api_key)): + """Snapshot from specific camera""" + if cam_id not in cameras: + raise HTTPException(status_code=404, detail=f"Camera '{cam_id}' not found") + + snapshot = cameras[cam_id].get_snapshot() + if snapshot is None: + raise HTTPException(status_code=503, detail=f"Camera '{cam_id}' unavailable") + + return Response( + content=snapshot, + media_type="image/jpeg", + headers={"Cache-Control": "no-cache", "X-Camera-ID": cam_id} + ) + + +@app.get("/motion/stats") +def motion_stats(api_key: str = Security(verify_api_key)): + if not motion_detectors: + return {"enabled": False} + return { + "enabled": True, + "cameras": {cam_id: d.get_stats() for cam_id, d in motion_detectors.items()} + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run("main_multi:app", host="0.0.0.0", port=8443, + ssl_keyfile="ssl/key.pem", ssl_certfile="ssl/cert.pem")