#!/usr/bin/env python3
"""
Motion Detection Module

Frame-differencing motion detection with optional object detection.
Runs as background thread, POSTs events to collector on Mac mini.

When object detection is enabled, motion acts as a pre-filter:
motion triggers -> object detection confirms -> event reported.
If no objects are found, the event can be suppressed (configurable).
"""

import base64
import logging
import os
import threading
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Optional

import cv2
import httpx

logger = logging.getLogger(__name__)


@dataclass
class MotionEvent:
    """Motion/detection event"""
    timestamp: str
    camera_id: str
    event_type: str = "motion"
    confidence: float = 0.0
    region: str = "full"  # Could be "left", "right", "center" etc.
    area_percent: float = 0.0  # % of frame with motion
    detections: Optional[list] = None  # List of detection dicts when objects found


class MotionDetector:
    """
    Background motion detection with optional object detection.

    Uses frame differencing to detect motion. When object detection is
    enabled, runs inference on motion frames to identify objects and
    suppress false positives.
    """

    def __init__(
        self,
        camera_id: str,
        collector_url: Optional[str] = None,
        collector_api_key: Optional[str] = None,
        threshold: int = 25,  # Pixel difference threshold
        min_area_percent: float = 0.5,  # Minimum % of frame to trigger
        cooldown_seconds: float = 5.0,  # Seconds between events
        check_interval: float = 0.5,  # Seconds between frame checks
        # Object detection
        detection_enabled: bool = False,
        detection_model_path: Optional[str] = None,
        detection_labels_path: Optional[str] = None,
        detection_confidence: float = 0.5,
        detection_suppress_empty: bool = True,
        detection_labels: Optional[str] = None,
        # Collector transport
        collector_verify_tls: bool = False,
    ):
        """
        Configure the detector; no thread is started until ``start()``.

        Args:
            camera_id: Identifier copied into every reported event.
            collector_url: Endpoint events are POSTed to; reporting is
                skipped when this is not set.
            collector_api_key: Sent as the ``X-API-Key`` header when set.
            threshold: Per-pixel difference threshold for the binary mask.
            min_area_percent: Minimum % of changed pixels to count as motion.
            cooldown_seconds: Minimum time between two handled motion events.
            check_interval: Pause between two frame checks.
            detection_enabled: Enable object detection (also requires
                ``detection_model_path``).
            detection_model_path: Model path handed to ``ObjectDetector``.
            detection_labels_path: Labels path handed to ``ObjectDetector``
                (an empty string is passed when not set).
            detection_confidence: Confidence threshold for the detector.
            detection_suppress_empty: Drop motion events for which the
                detector found no objects.
            detection_labels: Comma-separated label whitelist.
            collector_verify_tls: Verify the collector's TLS certificate.
                Defaults to False to keep the historical behaviour.
        """
        self.camera_id = camera_id
        self.collector_url = collector_url
        self.collector_api_key = collector_api_key
        self.threshold = threshold
        self.min_area_percent = min_area_percent
        self.cooldown_seconds = cooldown_seconds
        self.check_interval = check_interval
        self.detection_suppress_empty = detection_suppress_empty
        # NOTE(security): with verification off, the API key and snapshots can
        # be intercepted by a man-in-the-middle. Pass collector_verify_tls=True
        # once the collector presents a certificate the client can validate.
        self._collector_verify_tls = collector_verify_tls

        self._previous_frame: Optional[Any] = None
        # Monotonic timestamp of the last handled event. -inf guarantees the
        # first event is never blocked by the cooldown, whatever the clock reads.
        self._last_event_time: float = float("-inf")
        self._running = False
        self._thread: Optional[threading.Thread] = None
        # One Event per worker thread, replaced on every start().
        self._stop_event = threading.Event()
        self._get_frame: Optional[Callable[[], Any]] = None

        # Object detector (lazy import to avoid requiring tflite when disabled)
        self._detector = None
        if detection_enabled and detection_model_path:
            try:
                from detector import ObjectDetector

                label_whitelist = None
                if detection_labels:
                    # Drop empty entries produced by stray commas ("person,").
                    parsed = {name.strip() for name in detection_labels.split(",")}
                    parsed.discard("")
                    label_whitelist = parsed or None
                self._detector = ObjectDetector(
                    model_path=detection_model_path,
                    labels_path=detection_labels_path or "",
                    confidence_threshold=detection_confidence,
                    label_whitelist=label_whitelist,
                )
                logger.info(f"Object detection enabled (model: {detection_model_path})")
            except ImportError as e:
                logger.error(f"Object detection unavailable: {e}")

        # Stats
        self.events_detected = 0
        self.events_reported = 0
        self.events_suppressed = 0
        self.last_event: Optional[MotionEvent] = None

    def start(self, get_frame_func: Callable):
        """
        Start motion detection in background thread.

        Args:
            get_frame_func: Function that returns current frame as numpy array
        """
        if self._running:
            logger.warning("Motion detector already running")
            return

        self._get_frame = get_frame_func
        # Never diff against a frame captured before a previous stop().
        self._previous_frame = None
        # A fresh Event per thread: a worker that outlived stop()'s join
        # timeout still holds its own (already set) Event and exits instead of
        # being revived by the new _running = True.
        self._stop_event = threading.Event()
        self._running = True
        self._thread = threading.Thread(
            target=self._detection_loop,
            args=(self._stop_event,),
            daemon=True,
        )
        self._thread.start()
        logger.info(f"Motion detection started (threshold={self.threshold}, cooldown={self.cooldown_seconds}s)")

    def stop(self):
        """Stop motion detection"""
        self._running = False
        self._stop_event.set()
        worker = self._thread
        if worker is not None:
            worker.join(timeout=2.0)
            if worker.is_alive():
                # Typically an in-flight collector POST; the thread exits on
                # its own once that call returns.
                logger.warning("Motion detection thread still finishing after stop()")
            self._thread = None
        logger.info("Motion detection stopped")

    def _detection_loop(self, stop_event: Optional[threading.Event] = None):
        """
        Main detection loop - runs in background thread.

        Args:
            stop_event: Event that ends this particular loop; defaults to the
                detector's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        while self._running and not stop_event.is_set():
            try:
                self._check_for_motion()
            except Exception as e:
                # Boundary of the worker thread: log and keep polling.
                logger.error(f"Motion detection error: {e}")
            # Interruptible pause so stop() does not wait a full interval.
            stop_event.wait(self.check_interval)

    def _check_for_motion(self):
        """Check current frame for motion"""
        if not self._get_frame:
            return

        # Get current frame
        frame = self._get_frame()
        if frame is None:
            return

        # Convert to grayscale for comparison
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        gray = cv2.GaussianBlur(gray, (21, 21), 0)

        # Rotate the reference frame *before* diffing, so an error below can
        # never leave a stale reference in place for the next tick.
        previous = self._previous_frame
        self._previous_frame = gray

        # Need a previous frame of the same size to compare against
        if previous is None:
            return
        if previous.shape != gray.shape:
            logger.debug("Frame size changed; resetting motion reference frame")
            return

        # Compute difference
        frame_delta = cv2.absdiff(previous, gray)
        thresh = cv2.threshold(frame_delta, self.threshold, 255, cv2.THRESH_BINARY)[1]

        # Dilate to fill gaps
        thresh = cv2.dilate(thresh, None, iterations=2)

        # Calculate motion area percentage
        motion_pixels = cv2.countNonZero(thresh)
        total_pixels = thresh.shape[0] * thresh.shape[1]
        area_percent = (motion_pixels / total_pixels) * 100

        # Check if motion exceeds threshold
        if area_percent >= self.min_area_percent:
            self._handle_motion(frame, area_percent)

    def _handle_motion(self, frame, area_percent: float):
        """Handle detected motion, optionally running object detection"""
        # Monotonic clock: the cooldown must not be affected by wall-clock
        # adjustments (e.g. an NTP step).
        now = time.monotonic()

        # Check cooldown
        if now - self._last_event_time < self.cooldown_seconds:
            return

        # The cooldown starts here, before object detection runs, so an event
        # that is suppressed below still consumes the cooldown window.
        self._last_event_time = now
        self.events_detected += 1

        # Run object detection if enabled
        detections_list = []
        detections_dicts = None
        snapshot_frame = frame

        if self._detector:
            try:
                detections_list = self._detector.detect(frame)
            except Exception as e:
                logger.error(f"Object detection error: {e}")

            if detections_list:
                detections_dicts = [
                    {
                        "label": d.label,
                        "confidence": round(d.confidence, 3),
                        "bbox": [round(x, 4) for x in d.bbox],
                    }
                    for d in detections_list
                ]
                # Draw bounding boxes on snapshot
                try:
                    from detector import annotate_frame
                    snapshot_frame = annotate_frame(frame, detections_list)
                except Exception as e:
                    logger.warning(f"Failed to annotate frame: {e}")
            elif self.detection_suppress_empty:
                self.events_suppressed += 1
                logger.debug(
                    f"Motion ({area_percent:.1f}%) but no objects detected - suppressed "
                    f"({self.events_suppressed} total)"
                )
                return

        # Create event
        if detections_list:
            top_confidence = max(d.confidence for d in detections_list)
            event_type = "object"
        else:
            # 10% of the frame in motion (or more) maps to full confidence.
            top_confidence = min(area_percent / 10.0, 1.0)
            event_type = "motion"

        # Aware "now" in UTC, rendered in the same naive-ISO + "Z" form that
        # was emitted previously.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        event = MotionEvent(
            timestamp=timestamp,
            camera_id=self.camera_id,
            event_type=event_type,
            confidence=round(top_confidence, 3),
            area_percent=round(area_percent, 2),
            detections=detections_dicts,
        )

        self.last_event = event

        if detections_list:
            labels = ", ".join(f"{d.label}({d.confidence:.0%})" for d in detections_list)
            logger.info(f"Objects detected: {labels} (motion: {area_percent:.1f}%)")
        else:
            logger.info(f"Motion detected: {area_percent:.1f}% of frame (confidence: {event.confidence:.2f})")

        # Report to collector
        if self.collector_url:
            self._report_event(event, snapshot_frame)

    def _report_event(self, event: MotionEvent, frame):
        """POST event to collector endpoint"""
        try:
            # Encode frame as JPEG
            encoded_ok, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
            if not encoded_ok:
                logger.error("Failed to report event: JPEG encoding of snapshot failed")
                return
            snapshot_b64 = base64.b64encode(buffer.tobytes()).decode('utf-8')

            # Build payload
            payload = {
                "event": asdict(event),
                "snapshot": snapshot_b64,
            }

            # POST to collector
            headers = {"Content-Type": "application/json"}
            if self.collector_api_key:
                headers["X-API-Key"] = self.collector_api_key

            # Use sync client (we're in a thread)
            with httpx.Client(timeout=5.0, verify=self._collector_verify_tls) as client:
                response = client.post(
                    self.collector_url,
                    json=payload,
                    headers=headers,
                )

            if response.status_code == 200:
                self.events_reported += 1
                logger.info(f"Event reported to collector ({self.events_reported} total)")
            else:
                logger.warning(f"Collector returned {response.status_code}: {response.text[:100]}")

        except Exception as e:
            # Best effort: a failed report must not take down the detection loop.
            logger.error(f"Failed to report event: {e}")

    def get_stats(self) -> dict:
        """Get detection statistics"""
        return {
            "running": self._running,
            "events_detected": self.events_detected,
            "events_reported": self.events_reported,
            "events_suppressed": self.events_suppressed,
            "detection_enabled": self._detector is not None,
            "last_event": asdict(self.last_event) if self.last_event else None,
            "config": {
                "threshold": self.threshold,
                "min_area_percent": self.min_area_percent,
                "cooldown_seconds": self.cooldown_seconds,
                "collector_url": self.collector_url,
            }
        }