Add TFLite object detection to reduce false positives
Motion detection now optionally runs MobileNet V2 SSD (COCO, quantized) on frames that trigger motion, identifying objects like people, cats, and cars. Events without detected objects are suppressed by default. Snapshots include bounding box annotations. New MCP tool vision_get_detections() enables label-based queries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
210
server/detector.py
Normal file
210
server/detector.py
Normal file
@@ -0,0 +1,210 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Object Detection Module
|
||||
|
||||
Lightweight object detection using TensorFlow Lite with MobileNet V2 SSD.
|
||||
Designed to run on Raspberry Pi 4/5 with minimal overhead.
|
||||
|
||||
The model is lazy-loaded on first detect() call to avoid startup delay.
|
||||
"""
|
||||
|
||||
import cv2
|
||||
import logging
|
||||
import numpy as np
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Detection:
|
||||
"""A single detected object"""
|
||||
label: str
|
||||
confidence: float
|
||||
bbox: tuple # (x_min, y_min, x_max, y_max) normalized 0-1
|
||||
class_id: int
|
||||
|
||||
|
||||
class ObjectDetector:
|
||||
"""
|
||||
Object detection using TFLite MobileNet V2 SSD.
|
||||
|
||||
Lazy-loads the model on first detect() call. Designed to be called
|
||||
from the motion detection thread after motion is confirmed.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
model_path: str,
|
||||
labels_path: str,
|
||||
confidence_threshold: float = 0.5,
|
||||
):
|
||||
self.model_path = Path(model_path)
|
||||
self.labels_path = Path(labels_path)
|
||||
self.confidence_threshold = confidence_threshold
|
||||
|
||||
self._interpreter = None
|
||||
self._input_details = None
|
||||
self._output_details = None
|
||||
self._labels: list[str] = []
|
||||
self._input_height = 0
|
||||
self._input_width = 0
|
||||
|
||||
def _load_model(self):
|
||||
"""Load TFLite model and label map"""
|
||||
try:
|
||||
import tflite_runtime.interpreter as tflite
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
"tflite-runtime not installed. "
|
||||
"Install with: pip install tflite-runtime"
|
||||
)
|
||||
|
||||
if not self.model_path.exists():
|
||||
raise FileNotFoundError(
|
||||
f"Model file not found: {self.model_path}\n"
|
||||
f"Run download_model.sh to download the model."
|
||||
)
|
||||
|
||||
# Load labels
|
||||
if self.labels_path.exists():
|
||||
self._labels = self.labels_path.read_text().strip().splitlines()
|
||||
else:
|
||||
logger.warning(f"Labels file not found: {self.labels_path}")
|
||||
self._labels = []
|
||||
|
||||
# Try XNNPACK delegate for ARM acceleration
|
||||
delegates = []
|
||||
try:
|
||||
delegates = [tflite.load_delegate('libXNNPACK.so')]
|
||||
logger.info("XNNPACK delegate loaded")
|
||||
except (ValueError, OSError):
|
||||
logger.info("XNNPACK delegate not available, using default CPU")
|
||||
|
||||
# Load model
|
||||
self._interpreter = tflite.Interpreter(
|
||||
model_path=str(self.model_path),
|
||||
experimental_delegates=delegates if delegates else None,
|
||||
)
|
||||
self._interpreter.allocate_tensors()
|
||||
|
||||
self._input_details = self._interpreter.get_input_details()
|
||||
self._output_details = self._interpreter.get_output_details()
|
||||
|
||||
# Get expected input size
|
||||
input_shape = self._input_details[0]['shape']
|
||||
self._input_height = input_shape[1]
|
||||
self._input_width = input_shape[2]
|
||||
|
||||
logger.info(
|
||||
f"Object detection model loaded: {self.model_path.name} "
|
||||
f"(input: {self._input_width}x{self._input_height}, "
|
||||
f"{len(self._labels)} classes)"
|
||||
)
|
||||
|
||||
def detect(self, frame: np.ndarray) -> list[Detection]:
|
||||
"""
|
||||
Run object detection on a frame.
|
||||
|
||||
Args:
|
||||
frame: BGR numpy array from OpenCV
|
||||
|
||||
Returns:
|
||||
List of Detection objects above confidence threshold
|
||||
"""
|
||||
# Lazy load
|
||||
if self._interpreter is None:
|
||||
self._load_model()
|
||||
|
||||
# Preprocess: resize and convert BGR to RGB
|
||||
input_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
input_frame = cv2.resize(input_frame, (self._input_width, self._input_height))
|
||||
input_data = np.expand_dims(input_frame, axis=0)
|
||||
|
||||
# Ensure correct dtype
|
||||
input_dtype = self._input_details[0]['dtype']
|
||||
if input_dtype == np.uint8:
|
||||
input_data = input_data.astype(np.uint8)
|
||||
elif input_dtype == np.float32:
|
||||
input_data = (input_data / 255.0).astype(np.float32)
|
||||
|
||||
# Run inference
|
||||
self._interpreter.set_tensor(self._input_details[0]['index'], input_data)
|
||||
self._interpreter.invoke()
|
||||
|
||||
# Parse outputs (SSD MobileNet post-processed format):
|
||||
# [0] bounding boxes: [1, N, 4] (y_min, x_min, y_max, x_max) normalized
|
||||
# [1] class IDs: [1, N]
|
||||
# [2] scores: [1, N]
|
||||
# [3] number of detections: [1]
|
||||
boxes = self._interpreter.get_tensor(self._output_details[0]['index'])[0]
|
||||
class_ids = self._interpreter.get_tensor(self._output_details[1]['index'])[0]
|
||||
scores = self._interpreter.get_tensor(self._output_details[2]['index'])[0]
|
||||
num_detections = int(self._interpreter.get_tensor(self._output_details[3]['index'])[0])
|
||||
|
||||
# Filter by confidence
|
||||
detections = []
|
||||
for i in range(num_detections):
|
||||
score = float(scores[i])
|
||||
if score < self.confidence_threshold:
|
||||
continue
|
||||
|
||||
class_id = int(class_ids[i])
|
||||
label = self._labels[class_id] if class_id < len(self._labels) else f"class_{class_id}"
|
||||
|
||||
# Convert from (y_min, x_min, y_max, x_max) to (x_min, y_min, x_max, y_max)
|
||||
y_min, x_min, y_max, x_max = boxes[i]
|
||||
bbox = (
|
||||
float(np.clip(x_min, 0, 1)),
|
||||
float(np.clip(y_min, 0, 1)),
|
||||
float(np.clip(x_max, 0, 1)),
|
||||
float(np.clip(y_max, 0, 1)),
|
||||
)
|
||||
|
||||
detections.append(Detection(
|
||||
label=label,
|
||||
confidence=score,
|
||||
bbox=bbox,
|
||||
class_id=class_id,
|
||||
))
|
||||
|
||||
return detections
|
||||
|
||||
|
||||
def annotate_frame(frame: np.ndarray, detections: list[Detection]) -> np.ndarray:
|
||||
"""
|
||||
Draw bounding boxes and labels on a frame.
|
||||
|
||||
Args:
|
||||
frame: BGR numpy array (will be copied, not modified in place)
|
||||
detections: List of Detection objects
|
||||
|
||||
Returns:
|
||||
Annotated copy of the frame
|
||||
"""
|
||||
annotated = frame.copy()
|
||||
h, w = annotated.shape[:2]
|
||||
|
||||
for det in detections:
|
||||
x1 = int(det.bbox[0] * w)
|
||||
y1 = int(det.bbox[1] * h)
|
||||
x2 = int(det.bbox[2] * w)
|
||||
y2 = int(det.bbox[3] * h)
|
||||
|
||||
# Green box with label
|
||||
color = (0, 255, 0)
|
||||
cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 2)
|
||||
|
||||
label_text = f"{det.label} {det.confidence:.0%}"
|
||||
font_scale = 0.6
|
||||
thickness = 1
|
||||
(tw, th), _ = cv2.getTextSize(label_text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)
|
||||
|
||||
# Background for text
|
||||
cv2.rectangle(annotated, (x1, y1 - th - 8), (x1 + tw + 4, y1), color, -1)
|
||||
cv2.putText(annotated, label_text, (x1 + 2, y1 - 4),
|
||||
cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 0), thickness)
|
||||
|
||||
return annotated
|
||||
Reference in New Issue
Block a user