Add motion detection to camera server

🔍 New features:
- motion.py: Frame differencing motion detector
- Background thread compares frames continuously
- Configurable threshold, cooldown, sensitivity
- POSTs events + snapshot to collector

📡 New endpoints:
- GET /motion/stats - Detection statistics
- POST /motion/enable - Start detection
- POST /motion/disable - Stop detection

⚙️ Configuration (in .env):
- MOTION_ENABLED: true/false
- MOTION_THRESHOLD: Pixel diff threshold
- MOTION_COOLDOWN: Seconds between events
- COLLECTOR_URL: Where to POST events

Next: Event collector in vixy-mcp 🦊
This commit is contained in:
2025-12-16 16:15:30 -06:00
parent a17c09cac1
commit 6ecdf998c1
5 changed files with 478 additions and 116 deletions

66
server/README.md Normal file
View File

@@ -0,0 +1,66 @@
# vixy-vision Server
Camera snapshot server with motion detection for Raspberry Pi.
## Quick Start
```bash
./setup.sh
sudo systemctl start vixy-vision
```
## Features
- 📷 USB camera snapshots via HTTPS API
- 🔐 API key authentication
- 🔍 Motion detection with frame differencing
- 📤 Event reporting to central collector
- 🔄 Auto-reconnect on camera failure
## Configuration
Copy `env.example` to `.env` and customize:
```bash
cp env.example .env
nano .env
```
Key settings:
| Variable | Description | Default |
|----------|-------------|---------|
| `API_KEY` | Auth key for API | (required) |
| `CAMERA_ID` | Identifier for this camera | camera |
| `MOTION_ENABLED` | Enable motion detection | false |
| `MOTION_THRESHOLD` | Sensitivity (lower = more) | 25 |
| `MOTION_COOLDOWN` | Seconds between events | 5.0 |
| `COLLECTOR_URL` | Where to POST events | (optional) |
## API Endpoints
| Endpoint | Auth | Description |
|----------|------|-------------|
| `GET /` | No | Service info |
| `GET /health` | No | Health check |
| `GET /snapshot` | Yes | JPEG snapshot |
| `GET /motion/stats` | Yes | Detection stats |
| `POST /motion/enable` | Yes | Start detection |
| `POST /motion/disable` | Yes | Stop detection |
## Motion Events
When motion is detected, the server POSTs to `COLLECTOR_URL`:
```json
{
"event": {
"timestamp": "2024-12-16T14:23:01Z",
"camera_id": "basement",
"event_type": "motion",
"confidence": 0.75,
"area_percent": 7.5
},
"snapshot": "<base64 JPEG>"
}
```

47
server/env.example Normal file
View File

@@ -0,0 +1,47 @@
# vixy-vision Server Configuration
# Copy to .env and customize
# ============ Required ============
# API Key for authentication (generate with: python3 -c 'import secrets; print(secrets.token_urlsafe(32))')
API_KEY=your-secret-key-here
# Camera identifier (used in events)
CAMERA_ID=basement
# ============ Camera Settings ============
# Camera device index (0 = first USB camera)
CAMERA_INDEX=0
# Resolution (camera will use closest supported)
CAMERA_WIDTH=1920
CAMERA_HEIGHT=1080
# JPEG quality (1-100)
JPEG_QUALITY=85
# ============ Motion Detection ============
# Enable motion detection (true/false)
MOTION_ENABLED=true
# Pixel difference threshold (lower = more sensitive)
MOTION_THRESHOLD=25
# Minimum % of frame that must change to trigger event
MOTION_MIN_AREA=0.5
# Seconds between motion events (prevents spam)
MOTION_COOLDOWN=5.0
# Seconds between frame checks
MOTION_INTERVAL=0.5
# ============ Event Collector ============
# URL to POST motion events to (on Mac mini)
COLLECTOR_URL=http://192.168.1.50:8780/events
# API key for collector (optional)
COLLECTOR_API_KEY=

View File

@@ -1,24 +1,25 @@
#!/usr/bin/env python3
"""
Camera Snapshot Server
vixy-vision Camera Server
Simple FastAPI server that serves snapshots from a USB camera.
FastAPI server that serves snapshots from a USB camera with motion detection.
Features:
- API key authentication
- HTTPS support
- Thread-safe camera access
- Auto-reconnect on camera failure
- Motion detection with event reporting
"""
import os
import cv2
import threading
import secrets
from typing import Optional
from dotenv import load_dotenv
from fastapi import FastAPI, Security, HTTPException, Response
from fastapi.security import APIKeyHeader
from fastapi.responses import JSONResponse
from motion import MotionDetector
# Load environment variables
load_dotenv()
@@ -30,17 +31,26 @@ CAMERA_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920"))
CAMERA_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080"))
JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85"))
# Motion detection config
MOTION_ENABLED = os.getenv("MOTION_ENABLED", "false").lower() == "true"
MOTION_THRESHOLD = int(os.getenv("MOTION_THRESHOLD", "25"))
MOTION_MIN_AREA = float(os.getenv("MOTION_MIN_AREA", "0.5"))
MOTION_COOLDOWN = float(os.getenv("MOTION_COOLDOWN", "5.0"))
MOTION_INTERVAL = float(os.getenv("MOTION_INTERVAL", "0.5"))
COLLECTOR_URL = os.getenv("COLLECTOR_URL", "")
COLLECTOR_API_KEY = os.getenv("COLLECTOR_API_KEY", "")
CAMERA_ID = os.getenv("CAMERA_ID", "camera")
if not API_KEY:
raise ValueError("API_KEY not set in .env file. Generate one with: python3 -c 'import secrets; print(secrets.token_urlsafe(32))'")
raise ValueError("API_KEY not set in .env file")
# FastAPI app
app = FastAPI(
title="Camera Snapshot Server",
description="Serves snapshots from USB camera with API key authentication",
version="1.0.0"
title="vixy-vision Camera Server",
description="Camera snapshots + motion detection for the fox 🦊",
version="2.0.0"
)
# API Key authentication
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
@@ -53,6 +63,7 @@ class CameraManager:
self.height = height
self.camera: Optional[cv2.VideoCapture] = None
self.lock = threading.Lock()
self._last_frame = None
def _open_camera(self) -> bool:
"""Open camera connection"""
@@ -61,156 +72,173 @@ class CameraManager:
if not self.camera.isOpened():
return False
# Set camera resolution
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# Set camera properties for better performance
self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1) # Reduce buffer to get latest frame
# Log actual resolution (camera may not support requested resolution)
actual_width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Camera resolution: {actual_width}x{actual_height} (requested: {self.width}x{self.height})")
print(f"Camera: {actual_width}x{actual_height}")
return True
except Exception as e:
print(f"Error opening camera: {e}")
return False
def get_snapshot(self) -> Optional[bytes]:
"""
Capture a snapshot from the camera.
Returns:
JPEG-encoded image bytes, or None if failed
"""
def get_raw_frame(self):
"""Get raw frame as numpy array (for motion detection)"""
with self.lock:
# Open camera if not initialized or closed
if self.camera is None or not self.camera.isOpened():
if not self._open_camera():
return None
# Flush buffer to get latest frame
# Read and discard several frames to clear old buffered frames
for _ in range(5):
for _ in range(3):
self.camera.grab()
# Capture the latest frame
ret, frame = self.camera.read()
if ret:
self._last_frame = frame
return frame
return None
# Retry on failure
if not ret:
print("Failed to capture frame, attempting reconnect...")
self.release()
if not self._open_camera():
return None
# Flush buffer again after reconnect
for _ in range(5):
self.camera.grab()
ret, frame = self.camera.read()
def get_snapshot(self) -> Optional[bytes]:
"""Capture snapshot as JPEG bytes"""
frame = self.get_raw_frame()
if frame is None:
return None
if not ret:
return None
# Encode as JPEG
try:
ret, buffer = cv2.imencode(
'.jpg',
frame,
[cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]
)
if not ret:
return None
return buffer.tobytes()
except Exception as e:
print(f"Error encoding image: {e}")
return None
try:
ret, buffer = cv2.imencode(
'.jpg', frame,
[cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]
)
return buffer.tobytes() if ret else None
except Exception as e:
print(f"Error encoding: {e}")
return None
def release(self):
"""Release camera resources"""
"""Release camera"""
if self.camera is not None:
self.camera.release()
self.camera = None
def __del__(self):
"""Cleanup on deletion"""
self.release()
# Global camera manager
# Initialize camera
camera_manager = CameraManager(CAMERA_INDEX, CAMERA_WIDTH, CAMERA_HEIGHT)
# Initialize motion detector (if enabled)
motion_detector: Optional[MotionDetector] = None
if MOTION_ENABLED:
motion_detector = MotionDetector(
camera_id=CAMERA_ID,
collector_url=COLLECTOR_URL if COLLECTOR_URL else None,
collector_api_key=COLLECTOR_API_KEY if COLLECTOR_API_KEY else None,
threshold=MOTION_THRESHOLD,
min_area_percent=MOTION_MIN_AREA,
cooldown_seconds=MOTION_COOLDOWN,
check_interval=MOTION_INTERVAL,
)
def verify_api_key(api_key: str = Security(api_key_header)) -> str:
"""Verify API key from header"""
if api_key is None or api_key != API_KEY:
raise HTTPException(
status_code=403,
detail="Invalid or missing API key"
)
raise HTTPException(status_code=403, detail="Invalid API key")
return api_key
@app.get("/")
def root():
"""Root endpoint with API info"""
return {
"service": "Camera Snapshot Server",
"version": "1.0.0",
"endpoints": {
"/snapshot": "GET - Returns JPEG snapshot (requires X-API-Key header)",
"/health": "GET - Health check (no auth required)"
}
}
@app.get("/health")
def health():
"""Health check endpoint"""
return {"status": "ok"}
@app.get("/snapshot")
def get_snapshot(api_key: str = Security(verify_api_key)):
"""
Get a snapshot from the USB camera.
Requires X-API-Key header for authentication.
Returns:
JPEG image
"""
snapshot = camera_manager.get_snapshot()
if snapshot is None:
raise HTTPException(
status_code=503,
detail="Failed to capture snapshot. Check camera connection."
)
return Response(
content=snapshot,
media_type="image/jpeg",
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
@app.on_event("startup")
def startup_event():
"""Start motion detection on server startup"""
if motion_detector:
motion_detector.start(camera_manager.get_raw_frame)
print(f"🦊 Motion detection enabled (camera: {CAMERA_ID})")
@app.on_event("shutdown")
def shutdown_event():
"""Cleanup on shutdown"""
if motion_detector:
motion_detector.stop()
camera_manager.release()
@app.get("/")
def root():
return {
"service": "vixy-vision Camera Server",
"version": "2.0.0",
"camera_id": CAMERA_ID,
"motion_enabled": MOTION_ENABLED,
}
@app.get("/health")
def health():
return {"status": "ok", "camera_id": CAMERA_ID}
@app.get("/snapshot")
def get_snapshot(api_key: str = Security(verify_api_key)):
"""Get JPEG snapshot"""
snapshot = camera_manager.get_snapshot()
if snapshot is None:
raise HTTPException(status_code=503, detail="Camera unavailable")
return Response(
content=snapshot,
media_type="image/jpeg",
headers={"Cache-Control": "no-cache"}
)
@app.get("/motion/stats")
def get_motion_stats(api_key: str = Security(verify_api_key)):
"""Get motion detection statistics"""
if not motion_detector:
return {"enabled": False, "message": "Motion detection not enabled"}
return {
"enabled": True,
**motion_detector.get_stats()
}
@app.post("/motion/enable")
def enable_motion(api_key: str = Security(verify_api_key)):
"""Enable motion detection"""
global motion_detector
if motion_detector and motion_detector._running:
return {"status": "already running"}
if not motion_detector:
motion_detector = MotionDetector(
camera_id=CAMERA_ID,
collector_url=COLLECTOR_URL if COLLECTOR_URL else None,
threshold=MOTION_THRESHOLD,
min_area_percent=MOTION_MIN_AREA,
cooldown_seconds=MOTION_COOLDOWN,
)
motion_detector.start(camera_manager.get_raw_frame)
return {"status": "started"}
@app.post("/motion/disable")
def disable_motion(api_key: str = Security(verify_api_key)):
"""Disable motion detection"""
if motion_detector:
motion_detector.stop()
return {"status": "stopped"}
return {"status": "not running"}
if __name__ == "__main__":
import uvicorn
# For development only - use uvicorn command for production
uvicorn.run(
"main:app",
host="0.0.0.0",

218
server/motion.py Normal file
View File

@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
Motion Detection Module
Simple frame-differencing motion detection with event reporting.
Runs as background thread, POSTs events to collector on Mac mini.
"""
import os
import cv2
import time
import threading
import logging
import httpx
import base64
from datetime import datetime
from typing import Optional, Callable
from dataclasses import dataclass, asdict
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class MotionEvent:
"""Motion detection event"""
timestamp: str
camera_id: str
event_type: str = "motion"
confidence: float = 0.0
region: str = "full" # Could be "left", "right", "center" etc.
area_percent: float = 0.0 # % of frame with motion
class MotionDetector:
"""
Background motion detection with event reporting.
Uses frame differencing to detect motion and reports
events to a collector endpoint.
"""
def __init__(
self,
camera_id: str,
collector_url: Optional[str] = None,
collector_api_key: Optional[str] = None,
threshold: int = 25, # Pixel difference threshold
min_area_percent: float = 0.5, # Minimum % of frame to trigger
cooldown_seconds: float = 5.0, # Seconds between events
check_interval: float = 0.5, # Seconds between frame checks
):
self.camera_id = camera_id
self.collector_url = collector_url
self.collector_api_key = collector_api_key
self.threshold = threshold
self.min_area_percent = min_area_percent
self.cooldown_seconds = cooldown_seconds
self.check_interval = check_interval
self._previous_frame: Optional[any] = None
self._last_event_time: float = 0
self._running = False
self._thread: Optional[threading.Thread] = None
self._get_frame: Optional[Callable] = None
# Stats
self.events_detected = 0
self.events_reported = 0
self.last_event: Optional[MotionEvent] = None
def start(self, get_frame_func: Callable):
"""
Start motion detection in background thread.
Args:
get_frame_func: Function that returns current frame as numpy array
"""
if self._running:
logger.warning("Motion detector already running")
return
self._get_frame = get_frame_func
self._running = True
self._thread = threading.Thread(target=self._detection_loop, daemon=True)
self._thread.start()
logger.info(f"Motion detection started (threshold={self.threshold}, cooldown={self.cooldown_seconds}s)")
def stop(self):
"""Stop motion detection"""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
logger.info("Motion detection stopped")
def _detection_loop(self):
"""Main detection loop - runs in background thread"""
while self._running:
try:
self._check_for_motion()
except Exception as e:
logger.error(f"Motion detection error: {e}")
time.sleep(self.check_interval)
def _check_for_motion(self):
"""Check current frame for motion"""
if not self._get_frame:
return
# Get current frame
frame = self._get_frame()
if frame is None:
return
# Convert to grayscale for comparison
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = cv2.GaussianBlur(gray, (21, 21), 0)
# Need previous frame to compare
if self._previous_frame is None:
self._previous_frame = gray
return
# Compute difference
frame_delta = cv2.absdiff(self._previous_frame, gray)
thresh = cv2.threshold(frame_delta, self.threshold, 255, cv2.THRESH_BINARY)[1]
# Dilate to fill gaps
thresh = cv2.dilate(thresh, None, iterations=2)
# Calculate motion area percentage
motion_pixels = cv2.countNonZero(thresh)
total_pixels = thresh.shape[0] * thresh.shape[1]
area_percent = (motion_pixels / total_pixels) * 100
# Update previous frame
self._previous_frame = gray
# Check if motion exceeds threshold
if area_percent >= self.min_area_percent:
self._handle_motion(frame, area_percent)
def _handle_motion(self, frame, area_percent: float):
"""Handle detected motion"""
now = time.time()
# Check cooldown
if now - self._last_event_time < self.cooldown_seconds:
return
self._last_event_time = now
self.events_detected += 1
# Create event
event = MotionEvent(
timestamp=datetime.utcnow().isoformat() + "Z",
camera_id=self.camera_id,
confidence=min(area_percent / 10.0, 1.0), # Normalize to 0-1
area_percent=round(area_percent, 2),
)
self.last_event = event
logger.info(f"Motion detected: {area_percent:.1f}% of frame (confidence: {event.confidence:.2f})")
# Report to collector
if self.collector_url:
self._report_event(event, frame)
def _report_event(self, event: MotionEvent, frame):
"""POST event to collector endpoint"""
try:
# Encode frame as JPEG
_, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
snapshot_b64 = base64.b64encode(buffer.tobytes()).decode('utf-8')
# Build payload
payload = {
"event": asdict(event),
"snapshot": snapshot_b64,
}
# POST to collector
headers = {"Content-Type": "application/json"}
if self.collector_api_key:
headers["X-API-Key"] = self.collector_api_key
# Use sync client (we're in a thread)
with httpx.Client(timeout=5.0, verify=False) as client:
response = client.post(
self.collector_url,
json=payload,
headers=headers,
)
if response.status_code == 200:
self.events_reported += 1
logger.info(f"Event reported to collector ({self.events_reported} total)")
else:
logger.warning(f"Collector returned {response.status_code}: {response.text[:100]}")
except Exception as e:
logger.error(f"Failed to report event: {e}")
def get_stats(self) -> dict:
"""Get detection statistics"""
return {
"running": self._running,
"events_detected": self.events_detected,
"events_reported": self.events_reported,
"last_event": asdict(self.last_event) if self.last_event else None,
"config": {
"threshold": self.threshold,
"min_area_percent": self.min_area_percent,
"cooldown_seconds": self.cooldown_seconds,
"collector_url": self.collector_url,
}
}

View File

@@ -1,11 +1,14 @@
# Camera Snapshot Server Dependencies
# vixy-vision Server Requirements
# Web framework
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
fastapi>=0.100.0
uvicorn[standard]>=0.22.0
# Camera access
# Camera / CV
opencv-python-headless>=4.8.0
# Configuration
# Config
python-dotenv>=1.0.0
# HTTP client (for posting events to collector)
httpx>=0.24.0