From 844502b4a1eaaa726611ed315e6326a01044e41f Mon Sep 17 00:00:00 2001 From: Alex Kazaiev Date: Mon, 29 Dec 2025 16:18:45 -0600 Subject: [PATCH] Add release-after-use and cycling motion detection modes - main_release.py (v3.1.0): Release camera after each snapshot for V4L2 compatibility - main_cycling.py (v3.2.0): Single motion thread cycles between cameras (1s interval) - mcp/vision_mcp.py: Support custom snapshot_path for multi-camera servers Fixes Pi 3 dual-camera V4L2 conflicts by not holding cameras open. --- mcp/vision_mcp.py | 4 +- server/main_cycling.py | 259 +++++++++++++++++++++++++++++++++++++++++ server/main_release.py | 139 ++++++++++++++++++++++ 3 files changed, 401 insertions(+), 1 deletion(-) create mode 100644 server/main_cycling.py create mode 100644 server/main_release.py diff --git a/mcp/vision_mcp.py b/mcp/vision_mcp.py index 31f0209..f844bac 100644 --- a/mcp/vision_mcp.py +++ b/mcp/vision_mcp.py @@ -302,7 +302,9 @@ async def vision_snap(cam_id: str) -> Union[MCPImage, str]: if cam_type == 'http': # HTTP API camera async with httpx.AsyncClient(timeout=REQUEST_TIMEOUT, verify=False) as client: - snapshot_url = f"{cam['url'].rstrip('/')}/snapshot" + # Support custom snapshot path for multi-camera servers + snapshot_path = cam.get('snapshot_path', '/snapshot') + snapshot_url = f"{cam['url'].rstrip('/')}{snapshot_path}" headers = {"X-API-Key": cam['api_key']} logger.info(f"Requesting HTTP snapshot from '{cam_id}' at {snapshot_url}") diff --git a/server/main_cycling.py b/server/main_cycling.py new file mode 100644 index 0000000..3ed7f63 --- /dev/null +++ b/server/main_cycling.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +""" +vixy-vision Camera Server v3.2 - Multi-Camera with Cycling Motion Detection + +For Pi with V4L2 limitations: +- Releases camera after each snapshot +- Single motion detection thread cycles between cameras +""" + +import os +import cv2 +import json +import time +import threading +import requests +from datetime import datetime +from typing import Optional, Dict +from dotenv import load_dotenv +from fastapi import FastAPI, Security, HTTPException, Response +from fastapi.security import APIKeyHeader + +load_dotenv() + +# Configuration +API_KEY = os.getenv("API_KEY") +JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85")) +CAMERAS_CONFIG = os.getenv("CAMERAS", '{"camera": 0}') +DEFAULT_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920")) +DEFAULT_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080")) +MOTION_ENABLED = os.getenv("MOTION_ENABLED", "false").lower() == "true" +MOTION_THRESHOLD = int(os.getenv("MOTION_THRESHOLD", "25")) +MOTION_MIN_AREA = float(os.getenv("MOTION_MIN_AREA", "0.5")) +MOTION_COOLDOWN = float(os.getenv("MOTION_COOLDOWN", "60.0")) +COLLECTOR_URL = os.getenv("COLLECTOR_URL", "") +CYCLE_INTERVAL = float(os.getenv("CYCLE_INTERVAL", "1.0")) # seconds between camera switches + +if not API_KEY: + raise ValueError("API_KEY not set") + +CAMERAS: Dict[str, int] = json.loads(CAMERAS_CONFIG) + +app = FastAPI( + title="vixy-vision Camera Server", + description="Multi-camera with cycling motion detection 🦊", + version="3.2.0" +) + +api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) +camera_lock = threading.Lock() + +# Motion detection state per camera +motion_state: Dict[str, dict] = {} +for cam_id in CAMERAS: + motion_state[cam_id] = { + "prev_frame": None, + "last_event": 0, + } + + +def capture_frame(device_index: int, width: int, height: int) -> Optional[any]: + """Capture a single frame, release camera immediately.""" + cap = None + try: + cap = cv2.VideoCapture(device_index) + if not cap.isOpened(): + return None + cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + for _ in range(3): + cap.grab() + ret, frame = cap.read() + return frame if ret else None + except Exception as e: + print(f"Error capturing from /dev/video{device_index}: {e}") + return None + finally: + if cap is not None: + cap.release() + + +def capture_snapshot(device_index: int, width: int, height: int) -> Optional[bytes]: + """Capture snapshot as JPEG bytes.""" + with camera_lock: + frame = capture_frame(device_index, width, height) + if frame is None: + return None + ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]) + return buffer.tobytes() if ret else None + + +def detect_motion(cam_id: str, frame) -> Optional[dict]: + """Check for motion against previous frame.""" + state = motion_state[cam_id] + + # Convert to grayscale and blur + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + gray = cv2.GaussianBlur(gray, (21, 21), 0) + + if state["prev_frame"] is None: + state["prev_frame"] = gray + return None + + # Calculate difference + delta = cv2.absdiff(state["prev_frame"], gray) + state["prev_frame"] = gray + + thresh = cv2.threshold(delta, MOTION_THRESHOLD, 255, cv2.THRESH_BINARY)[1] + thresh = cv2.dilate(thresh, None, iterations=2) + + contours, _ = cv2.findContours(thresh.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + + if not contours: + return None + + # Find largest contour + largest = max(contours, key=cv2.contourArea) + area = cv2.contourArea(largest) + frame_area = frame.shape[0] * frame.shape[1] + area_percent = (area / frame_area) * 100 + + if area_percent < MOTION_MIN_AREA: + return None + + # Check cooldown + now = time.time() + if now - state["last_event"] < MOTION_COOLDOWN: + return None + + state["last_event"] = now + + return { + "camera_id": cam_id, + "area_percent": round(area_percent, 2), + "timestamp": datetime.now().isoformat(), + } + + +def send_event(event: dict, snapshot: bytes): + """Send motion event to collector.""" + if not COLLECTOR_URL: + print(f"🔔 Motion on {event['camera_id']}: {event['area_percent']}% (no collector)") + return + + try: + files = {"snapshot": ("snapshot.jpg", snapshot, "image/jpeg")} + data = { + "camera_id": event["camera_id"], + "event_type": "motion", + "confidence": min(event["area_percent"] * 10, 100), + "area_percent": event["area_percent"], + } + response = requests.post(COLLECTOR_URL, data=data, files=files, timeout=5) + if response.ok: + print(f"🔔 Motion event sent: {event['camera_id']}") + else: + print(f"⚠️ Collector error: {response.status_code}") + except Exception as e: + print(f"⚠️ Failed to send event: {e}") + + +def motion_detection_loop(): + """Single thread that cycles through all cameras for motion detection.""" + print(f"🎯 Motion detection started (cycling {len(CAMERAS)} cameras, {CYCLE_INTERVAL}s interval)") + + camera_list = list(CAMERAS.items()) + + while True: + for cam_id, device_index in camera_list: + try: + with camera_lock: + frame = capture_frame(device_index, DEFAULT_WIDTH, DEFAULT_HEIGHT) + + if frame is None: + continue + + event = detect_motion(cam_id, frame) + + if event: + # Capture fresh snapshot for the event + with camera_lock: + snap_frame = capture_frame(device_index, DEFAULT_WIDTH, DEFAULT_HEIGHT) + if snap_frame is not None: + ret, buffer = cv2.imencode('.jpg', snap_frame, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]) + if ret: + send_event(event, buffer.tobytes()) + + except Exception as e: + print(f"Motion error on {cam_id}: {e}") + + # Wait before checking next camera + time.sleep(CYCLE_INTERVAL) + + +def verify_api_key(api_key: str = Security(api_key_header)) -> str: + if api_key != API_KEY: + raise HTTPException(status_code=403, detail="Invalid API key") + return api_key + + +# Startup +for cam_id, dev_idx in CAMERAS.items(): + print(f"🦊 Registered: {cam_id} -> /dev/video{dev_idx}") + +if MOTION_ENABLED: + motion_thread = threading.Thread(target=motion_detection_loop, daemon=True) + motion_thread.start() +else: + print("📷 Motion detection disabled") + +DEFAULT_CAM = list(CAMERAS.keys())[0] if CAMERAS else None + + +@app.get("/") +def root(): + return { + "service": "vixy-vision Camera Server", + "version": "3.2.0", + "mode": "cycling-motion", + "cameras": list(CAMERAS.keys()), + "motion_enabled": MOTION_ENABLED, + "cycle_interval": CYCLE_INTERVAL, + } + + +@app.get("/health") +def health(): + return {"status": "ok", "cameras": list(CAMERAS.keys()), "motion": MOTION_ENABLED} + + +@app.get("/snapshot") +def snapshot_default(api_key: str = Security(verify_api_key)): + if not DEFAULT_CAM: + raise HTTPException(status_code=503, detail="No cameras configured") + return snapshot_by_id(DEFAULT_CAM, api_key) + + +@app.get("/snapshot/{cam_id}") +def snapshot_by_id(cam_id: str, api_key: str = Security(verify_api_key)): + if cam_id not in CAMERAS: + raise HTTPException(status_code=404, detail=f"Camera '{cam_id}' not found") + + device_index = CAMERAS[cam_id] + snapshot = capture_snapshot(device_index, DEFAULT_WIDTH, DEFAULT_HEIGHT) + + if snapshot is None: + raise HTTPException(status_code=503, detail=f"Camera '{cam_id}' unavailable") + + return Response( + content=snapshot, + media_type="image/jpeg", + headers={"Cache-Control": "no-cache", "X-Camera-ID": cam_id} + ) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run("main:app", host="0.0.0.0", port=8443, + ssl_keyfile="ssl/key.pem", ssl_certfile="ssl/cert.pem") diff --git a/server/main_release.py b/server/main_release.py new file mode 100644 index 0000000..89d1a1a --- /dev/null +++ b/server/main_release.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +""" +vixy-vision Camera Server v3.1 - Multi-Camera with Release-After-Use + +For Pi with V4L2 limitations - releases camera after each snapshot +to allow multiple cameras to work. +""" + +import os +import cv2 +import json +import threading +from typing import Optional, Dict +from dotenv import load_dotenv +from fastapi import FastAPI, Security, HTTPException, Response, Path +from fastapi.security import APIKeyHeader + +load_dotenv() + +# Configuration +API_KEY = os.getenv("API_KEY") +JPEG_QUALITY = int(os.getenv("JPEG_QUALITY", "85")) +CAMERAS_CONFIG = os.getenv("CAMERAS", '{"camera": 0}') +DEFAULT_WIDTH = int(os.getenv("CAMERA_WIDTH", "1920")) +DEFAULT_HEIGHT = int(os.getenv("CAMERA_HEIGHT", "1080")) + +if not API_KEY: + raise ValueError("API_KEY not set in .env file") + +CAMERAS: Dict[str, int] = json.loads(CAMERAS_CONFIG) + +app = FastAPI( + title="vixy-vision Camera Server", + description="Multi-camera snapshots for the fox 🦊 (release-after-use mode)", + version="3.1.0" +) + +api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) + +# Global lock to prevent simultaneous camera access on Pi +camera_lock = threading.Lock() + + +def capture_snapshot(device_index: int, width: int, height: int) -> Optional[bytes]: + """ + Capture a single snapshot, then release the camera. + Uses global lock to prevent V4L2 conflicts. + """ + with camera_lock: + cap = None + try: + cap = cv2.VideoCapture(device_index) + if not cap.isOpened(): + print(f"Failed to open /dev/video{device_index}") + return None + + cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) + cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) + + # Grab a few frames to get fresh image + for _ in range(3): + cap.grab() + + ret, frame = cap.read() + if not ret or frame is None: + print(f"Failed to read from /dev/video{device_index}") + return None + + ret, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, JPEG_QUALITY]) + return buffer.tobytes() if ret else None + + except Exception as e: + print(f"Error capturing from /dev/video{device_index}: {e}") + return None + finally: + if cap is not None: + cap.release() + + +def verify_api_key(api_key: str = Security(api_key_header)) -> str: + if api_key != API_KEY: + raise HTTPException(status_code=403, detail="Invalid API key") + return api_key + + +# Print registered cameras on startup +for cam_id, dev_idx in CAMERAS.items(): + print(f"🦊 Registered: {cam_id} -> /dev/video{dev_idx}") + +DEFAULT_CAM = list(CAMERAS.keys())[0] if CAMERAS else None + + +@app.get("/") +def root(): + return { + "service": "vixy-vision Camera Server", + "version": "3.1.0", + "mode": "release-after-use", + "cameras": list(CAMERAS.keys()), + } + + +@app.get("/health") +def health(): + return {"status": "ok", "cameras": list(CAMERAS.keys())} + + +@app.get("/snapshot") +def snapshot_default(api_key: str = Security(verify_api_key)): + """Snapshot from default camera""" + if not DEFAULT_CAM: + raise HTTPException(status_code=503, detail="No cameras configured") + return snapshot_by_id(DEFAULT_CAM, api_key) + + +@app.get("/snapshot/{cam_id}") +def snapshot_by_id(cam_id: str, api_key: str = Security(verify_api_key)): + """Snapshot from specific camera""" + if cam_id not in CAMERAS: + raise HTTPException(status_code=404, detail=f"Camera '{cam_id}' not found") + + device_index = CAMERAS[cam_id] + snapshot = capture_snapshot(device_index, DEFAULT_WIDTH, DEFAULT_HEIGHT) + + if snapshot is None: + raise HTTPException(status_code=503, detail=f"Camera '{cam_id}' unavailable") + + return Response( + content=snapshot, + media_type="image/jpeg", + headers={"Cache-Control": "no-cache", "X-Camera-ID": cam_id} + ) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run("main_multi:app", host="0.0.0.0", port=8443, + ssl_keyfile="ssl/key.pem", ssl_certfile="ssl/cert.pem")