Files
vixy-vision/server/motion.py
Alex 1bcf32889f Add label whitelist to filter detection types
DETECTION_LABELS env var accepts comma-separated list (e.g. "person,cat,dog").
Only matching detections are reported; others are ignored. Empty = report all.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 19:08:31 -06:00

300 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Motion Detection Module
Frame-differencing motion detection with optional object detection.
Runs as background thread, POSTs events to collector on Mac mini.
When object detection is enabled, motion acts as a pre-filter:
motion triggers -> object detection confirms -> event reported.
If no objects are found, the event can be suppressed (configurable).
"""
import base64
import logging
import os
import threading
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Optional

import cv2
import httpx
logger = logging.getLogger(__name__)
@dataclass
class MotionEvent:
    """A single motion or object-detection event produced for one camera."""

    # Creation time of the event, stored as a string
    timestamp: str
    # Identifier of the camera the event belongs to
    camera_id: str
    # Kind of event; defaults to plain motion
    event_type: str = "motion"
    # Score attached to the event
    confidence: float = 0.0
    # Part of the frame concerned, e.g. "left", "right", "center"
    region: str = "full"
    # Percentage of the frame in which motion was seen
    area_percent: float = 0.0
    # Detection dicts when objects were found, otherwise None
    detections: Optional[list] = None
class MotionDetector:
    """
    Background motion detection with optional object detection.

    Uses frame differencing to detect motion. When object detection is
    enabled, runs inference on motion frames to identify objects and
    suppress false positives. Events are optionally POSTed to a collector
    URL as JSON together with a base64-encoded JPEG snapshot.
    """

    def __init__(
        self,
        camera_id: str,
        collector_url: Optional[str] = None,
        collector_api_key: Optional[str] = None,
        threshold: int = 25,  # Pixel difference threshold
        min_area_percent: float = 0.5,  # Minimum % of frame to trigger
        cooldown_seconds: float = 5.0,  # Seconds between events
        check_interval: float = 0.5,  # Seconds between frame checks
        # Object detection
        detection_enabled: bool = False,
        detection_model_path: Optional[str] = None,
        detection_labels_path: Optional[str] = None,
        detection_confidence: float = 0.5,
        detection_suppress_empty: bool = True,
        detection_labels: Optional[str] = None,
        # Collector transport
        collector_verify_tls: bool = False,
    ):
        """
        Configure the detector; no thread is started until ``start()``.

        Args:
            camera_id: Identifier copied into every reported event.
            collector_url: Endpoint events are POSTed to; None disables reporting.
            collector_api_key: Sent as the ``X-API-Key`` header when set.
            threshold: Per-pixel grayscale difference that counts as changed.
            min_area_percent: Minimum changed share of the frame (in %) that
                counts as motion.
            cooldown_seconds: Minimum time between two handled motion events.
            check_interval: Sleep between two frame checks.
            detection_enabled: Turn on object detection (also needs a model path).
            detection_model_path: Model file handed to ``ObjectDetector``.
            detection_labels_path: Labels file handed to ``ObjectDetector``.
            detection_confidence: Confidence threshold handed to ``ObjectDetector``.
            detection_suppress_empty: Drop motion events for which the detector
                found no objects.
            detection_labels: Comma-separated label whitelist; empty or None
                means no whitelist.
            collector_verify_tls: Verify the collector's TLS certificate.
                Defaults to False to keep the previous behaviour.
        """
        self.camera_id = camera_id
        self.collector_url = collector_url
        self.collector_api_key = collector_api_key
        # NOTE(security): with verification off, the API key and snapshots are
        # sent to an unauthenticated peer. Enable this once the collector has a
        # certificate the client can validate.
        self.collector_verify_tls = collector_verify_tls
        self.threshold = threshold
        self.min_area_percent = min_area_percent
        self.cooldown_seconds = cooldown_seconds
        self.check_interval = check_interval
        self.detection_suppress_empty = detection_suppress_empty

        self._previous_frame: Optional[Any] = None
        # Monotonic timestamp of the last handled event; -inf so that the very
        # first motion is never swallowed by the cooldown.
        self._last_event_time: float = float("-inf")
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._get_frame: Optional[Callable] = None

        # Object detector (lazy import to avoid requiring tflite when disabled)
        self._detector = None
        if detection_enabled and detection_model_path:
            try:
                from detector import ObjectDetector

                self._detector = ObjectDetector(
                    model_path=detection_model_path,
                    labels_path=detection_labels_path or "",
                    confidence_threshold=detection_confidence,
                    label_whitelist=self._parse_label_whitelist(detection_labels),
                )
                logger.info(f"Object detection enabled (model: {detection_model_path})")
            except ImportError as e:
                logger.error(f"Object detection unavailable: {e}")

        # Stats
        self.events_detected = 0
        self.events_reported = 0
        self.events_suppressed = 0
        self.last_event: Optional["MotionEvent"] = None

    @staticmethod
    def _parse_label_whitelist(detection_labels: Optional[str]) -> Optional[set]:
        """
        Turn a comma-separated label string into a whitelist set.

        Blank entries (from stray or trailing commas) are dropped. Returns
        None when no usable label remains, so that an empty or
        whitespace-only setting means "report all" instead of filtering
        out every detection.
        """
        if not detection_labels:
            return None
        labels = {token.strip() for token in detection_labels.split(",")}
        labels.discard("")
        return labels or None

    def start(self, get_frame_func: Callable):
        """
        Start motion detection in background thread.

        Args:
            get_frame_func: Function that returns current frame as numpy array
        """
        if self._running:
            logger.warning("Motion detector already running")
            return
        self._get_frame = get_frame_func
        # Drop any baseline left over from an earlier run so the first check
        # does not diff against a stale frame.
        self._previous_frame = None
        self._running = True
        self._thread = threading.Thread(target=self._detection_loop, daemon=True)
        self._thread.start()
        logger.info(f"Motion detection started (threshold={self.threshold}, cooldown={self.cooldown_seconds}s)")

    def stop(self):
        """Stop motion detection and wait up to 2 seconds for the thread."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=2.0)
        logger.info("Motion detection stopped")

    def _detection_loop(self):
        """Main detection loop - runs in background thread"""
        while self._running:
            try:
                self._check_for_motion()
            except Exception as e:
                # Keep the loop alive; one bad frame must not end detection.
                logger.error(f"Motion detection error: {e}")
            time.sleep(self.check_interval)

    def _check_for_motion(self):
        """Compare the current frame with the previous one and flag motion."""
        if not self._get_frame:
            return

        # Get current frame
        frame = self._get_frame()
        if frame is None:
            return

        # Convert to grayscale and blur before comparison
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (21, 21), 0)

        # Swap in the new baseline before diffing, so a failure below can
        # never leave a stale baseline behind.
        previous = self._previous_frame
        self._previous_frame = gray

        # Need a previous frame of the same size to compare against. A size
        # change just restarts the comparison from the new frame.
        if previous is None or previous.shape != gray.shape:
            return

        # Compute difference
        frame_delta = cv2.absdiff(previous, gray)
        thresh = cv2.threshold(frame_delta, self.threshold, 255, cv2.THRESH_BINARY)[1]

        # Dilate to fill gaps
        thresh = cv2.dilate(thresh, None, iterations=2)

        # Calculate motion area percentage
        motion_pixels = cv2.countNonZero(thresh)
        total_pixels = thresh.shape[0] * thresh.shape[1]
        area_percent = (motion_pixels / total_pixels) * 100

        # Check if motion exceeds threshold
        if area_percent >= self.min_area_percent:
            self._handle_motion(frame, area_percent)

    def _handle_motion(self, frame, area_percent: float):
        """
        Handle detected motion, optionally running object detection.

        Applies the cooldown, runs the object detector when one is
        configured, builds a ``MotionEvent`` and reports it to the collector
        when a collector URL is set.
        """
        # Monotonic clock: a wall-clock step must not stretch the cooldown.
        now = time.monotonic()

        # Check cooldown
        if now - self._last_event_time < self.cooldown_seconds:
            return
        self._last_event_time = now
        self.events_detected += 1

        # Run object detection if enabled
        detections_list = []
        detections_dicts = None
        snapshot_frame = frame
        if self._detector:
            try:
                detections_list = self._detector.detect(frame)
            except Exception as e:
                logger.error(f"Object detection error: {e}")

            if detections_list:
                detections_dicts = [
                    {
                        "label": d.label,
                        "confidence": round(d.confidence, 3),
                        "bbox": [round(x, 4) for x in d.bbox],
                    }
                    for d in detections_list
                ]
                # Draw bounding boxes on snapshot
                try:
                    from detector import annotate_frame

                    snapshot_frame = annotate_frame(frame, detections_list)
                except Exception as e:
                    logger.warning(f"Failed to annotate frame: {e}")
            elif self.detection_suppress_empty:
                self.events_suppressed += 1
                logger.debug(
                    f"Motion ({area_percent:.1f}%) but no objects detected - suppressed "
                    f"({self.events_suppressed} total)"
                )
                return

        # Create event
        if detections_list:
            top_confidence = max(d.confidence for d in detections_list)
            event_type = "object"
        else:
            # Without detections, scale motion area to 0..1 (10% area -> 1.0)
            top_confidence = min(area_percent / 10.0, 1.0)
            event_type = "motion"

        event = MotionEvent(
            # Aware UTC time rendered with a trailing "Z" instead of "+00:00"
            timestamp=datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            camera_id=self.camera_id,
            event_type=event_type,
            confidence=round(top_confidence, 3),
            area_percent=round(area_percent, 2),
            detections=detections_dicts,
        )
        self.last_event = event

        if detections_list:
            labels = ", ".join(f"{d.label}({d.confidence:.0%})" for d in detections_list)
            logger.info(f"Objects detected: {labels} (motion: {area_percent:.1f}%)")
        else:
            logger.info(f"Motion detected: {area_percent:.1f}% of frame (confidence: {event.confidence:.2f})")

        # Report to collector
        if self.collector_url:
            self._report_event(event, snapshot_frame)

    def _report_event(self, event: "MotionEvent", frame):
        """
        POST event to collector endpoint.

        The payload holds the event as a dict plus the frame as a
        base64-encoded JPEG. Failures are logged and never raised.
        """
        try:
            # Encode frame as JPEG
            encoded_ok, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
            if not encoded_ok:
                logger.error("Failed to report event: JPEG encoding failed")
                return
            snapshot_b64 = base64.b64encode(buffer.tobytes()).decode('utf-8')

            # Build payload
            payload = {
                "event": asdict(event),
                "snapshot": snapshot_b64,
            }

            # POST to collector
            headers = {"Content-Type": "application/json"}
            if self.collector_api_key:
                headers["X-API-Key"] = self.collector_api_key

            # Use sync client (we're in a thread)
            with httpx.Client(timeout=5.0, verify=self.collector_verify_tls) as client:
                response = client.post(
                    self.collector_url,
                    json=payload,
                    headers=headers,
                )
                # Any 2xx answer means the collector accepted the event
                if 200 <= response.status_code < 300:
                    self.events_reported += 1
                    logger.info(f"Event reported to collector ({self.events_reported} total)")
                else:
                    logger.warning(f"Collector returned {response.status_code}: {response.text[:100]}")
        except Exception as e:
            logger.error(f"Failed to report event: {e}")

    def get_stats(self) -> dict:
        """Get detection statistics"""
        return {
            "running": self._running,
            "events_detected": self.events_detected,
            "events_reported": self.events_reported,
            "events_suppressed": self.events_suppressed,
            "detection_enabled": self._detector is not None,
            "last_event": asdict(self.last_event) if self.last_event else None,
            "config": {
                "threshold": self.threshold,
                "min_area_percent": self.min_area_percent,
                "cooldown_seconds": self.cooldown_seconds,
                "collector_url": self.collector_url,
            },
        }