- Use letterbox resize (preserve aspect ratio + pad) instead of stretching to 320x320. Stretching 16:9 frames caused faces to be undetectable.
- Auto-detect score tensor output index at init time (name + variance heuristic)
- Smart upper-body crop: roughly square region instead of thin wide strip
- Throttle face detection to every 2s to reduce Coral USB traffic
- Skip crops smaller than 80px (too small for reliable detection)
- Reduce log level from DEBUG to INFO

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
345 lines
13 KiB
Python
345 lines
13 KiB
Python
"""
Face Recognition Module for OAK-D Vision Service

Coral Edge TPU for face detection + CPU FaceNet for embeddings + SQLite DB
"""
|
|
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
import ai_edge_litert.interpreter as tfl
|
|
import cv2
|
|
import numpy as np
|
|
|
|
# Module-wide logger; INFO keeps per-frame debug chatter out of the service log.
logger = logging.getLogger("face_recognition")
logger.setLevel(logging.INFO)

# Minimum detector score for a box to be accepted as a face.
FACE_DETECT_THRESHOLD = 0.5
# Minimum embedding similarity for a face to be reported under an enrolled name.
RECOGNITION_THRESHOLD = 0.5
# Number of float32 values expected in every stored embedding.
EMBEDDING_DIM = 512
# Minimum pixels in both dimensions for face detection.
MIN_CROP_SIZE = 80
# Seconds between face detection runs in process_frame.
FACE_DETECT_INTERVAL = 2.0
|
|
|
|
|
|
class FaceRecognizer:
    """Detect, embed and identify faces inside person detections.

    Pipeline: a face detector running through the Edge TPU delegate finds the
    best face in an upper-body crop, a second (CPU) interpreter turns the face
    crop into an embedding, and the embedding is compared against the
    embeddings stored in a SQLite ``faces`` table.

    Two locks are used:

    * ``_lock`` guards the shared SQLite connection and cache reloads.
    * ``_infer_lock`` serialises use of the two interpreters, which are
      shared by ``process_frame`` and ``enroll``.
    """

    def __init__(self, face_model_path, embed_model_path, db_path="faces.db"):
        """Load both models, open (and if needed create) the face database.

        Args:
            face_model_path: path to the Edge TPU face detection model.
            embed_model_path: path to the embedding model run on the CPU.
            db_path: path of the SQLite database holding enrolled faces.
        """
        self._lock = threading.Lock()
        # The connection is opened with check_same_thread=False, i.e. the
        # object is meant to be used from several threads; interpreter calls
        # must therefore not interleave.
        self._infer_lock = threading.Lock()

        # Coral face detector
        logger.info("Loading face detection model on Edge TPU...")
        delegate = tfl.load_delegate("libedgetpu.so.1")
        self._face_interp = tfl.Interpreter(
            model_path=str(face_model_path),
            experimental_delegates=[delegate],
        )
        self._face_interp.allocate_tensors()
        self._face_input = self._face_interp.get_input_details()[0]
        self._face_outputs = self._face_interp.get_output_details()

        # Log output tensor details to determine correct index mapping
        for i, o in enumerate(self._face_outputs):
            logger.info("Face detector output[%d]: name=%s shape=%s", i, o["name"], o["shape"])

        # Run one inference on a blank image so the score/class tensors can be
        # told apart by their content when their names give no hint.
        test_input = np.zeros(self._face_input["shape"], dtype=self._face_input["dtype"])
        self._face_interp.set_tensor(self._face_input["index"], test_input)
        self._face_interp.invoke()

        # Output 0 is boxes [1,N,4], output 3 is count [1].
        # Outputs 1 and 2 are scores and classes (order varies by model).
        t1 = self._face_interp.get_tensor(self._face_outputs[1]["index"])
        t2 = self._face_interp.get_tensor(self._face_outputs[2]["index"])
        self._score_output_idx = self._pick_score_output(self._face_outputs, t1, t2)

        logger.info("Face detector: using output[%d] as scores", self._score_output_idx)
        logger.info(
            "Face detector ready: input %s %s",
            self._face_input["shape"],
            self._face_input["dtype"],
        )

        # CPU FaceNet embedder
        logger.info("Loading FaceNet embedding model on CPU...")
        self._embed_interp = tfl.Interpreter(model_path=str(embed_model_path))
        self._embed_interp.allocate_tensors()
        self._embed_input = self._embed_interp.get_input_details()[0]
        self._embed_output = self._embed_interp.get_output_details()[0]
        logger.info(
            "FaceNet ready: input %s, output %s",
            self._embed_input["shape"],
            self._embed_output["shape"],
        )

        # SQLite DB
        self._db_path = str(db_path)
        self._db = sqlite3.connect(self._db_path, check_same_thread=False)
        self._db.execute(
            """CREATE TABLE IF NOT EXISTS faces (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                embedding BLOB NOT NULL,
                enrolled_at REAL NOT NULL,
                source TEXT
            )"""
        )
        self._db.execute(
            "CREATE INDEX IF NOT EXISTS idx_faces_name ON faces(name)"
        )
        self._db.commit()

        # Load embedding cache
        self._cache = []  # list of (name, embedding_array)
        self._reload_cache()
        logger.info("Face DB: %d embeddings loaded", len(self._cache))

        # Throttling for process_frame. time.monotonic() has an undefined
        # reference point, so "never ran" is -inf rather than 0.0; this
        # guarantees that the very first call is not throttled.
        self._last_face_detect_time = float("-inf")
        self._last_face_results = []  # cached results from last detection

    # ------------------------------------------------------------------
    # Pure helpers (no model / DB access)
    # ------------------------------------------------------------------

    @staticmethod
    def _unknown_result():
        """Return a fresh "nothing recognised" result dict."""
        return {"recognized_name": None, "recognition_confidence": None}

    @staticmethod
    def _pick_score_output(output_details, t1, t2):
        """Decide whether detector output 1 or 2 holds the scores.

        The tensor name is the primary signal: a name containing "score"
        selects that output, a name containing "class" selects the other
        one. Without a name match the tensor with the larger standard
        deviation is taken (ties fall back to output 2).

        Args:
            output_details: the detector's output detail dicts.
            t1: content of output 1 after a test inference.
            t2: content of output 2 after a test inference.

        Returns:
            1 or 2.
        """
        for i in (1, 2):
            name = output_details[i].get("name", "").lower()
            if "score" in name:
                return i
            if "class" in name:
                return 2 if i == 1 else 1
        return 1 if np.std(t1) > np.std(t2) else 2

    @staticmethod
    def _letterbox_geometry(h, w, inp_h, inp_w):
        """Compute the aspect-preserving resize and centring pad.

        Returns:
            (scale, new_h, new_w, pad_y, pad_x). ``new_h``/``new_w`` are
            kept within ``[1, input size]`` so the resize target is never
            empty, even for extreme aspect ratios.
        """
        scale = min(inp_w / w, inp_h / h)
        new_w = min(inp_w, max(1, int(w * scale)))
        new_h = min(inp_h, max(1, int(h * scale)))
        pad_y = (inp_h - new_h) // 2
        pad_x = (inp_w - new_w) // 2
        return scale, new_h, new_w, pad_y, pad_x

    @staticmethod
    def _unletterbox_box(box, inp_h, inp_w, pad_y, pad_x, scale, h, w):
        """Map a normalised letterboxed box back to source-image pixels.

        Args:
            box: (ymin, xmin, ymax, xmax), normalised to the padded input.
            inp_h, inp_w: size of the padded detector input.
            pad_y, pad_x: top / left padding added by the letterbox.
            scale: resize factor applied to the source image.
            h, w: size of the source image.

        Returns:
            (y1, x1, y2, x2) as ints, each clamped to ``[0, h]`` / ``[0, w]``.
            Clamping both sides matters: a negative bound would otherwise be
            read as "count from the end" when used as a slice index.
        """
        ymin, xmin, ymax, xmax = box

        def to_pixel(norm, inp_size, pad, limit):
            return min(limit, max(0, int((norm * inp_size - pad) / scale)))

        return (
            to_pixel(ymin, inp_h, pad_y, h),
            to_pixel(xmin, inp_w, pad_x, w),
            to_pixel(ymax, inp_h, pad_y, h),
            to_pixel(xmax, inp_w, pad_x, w),
        )

    @staticmethod
    def _upper_body_crop_bounds(xmin, ymin, xmax, ymax, frame_w, frame_h):
        """Compute the roughly square upper-body crop for one person box.

        The crop covers the upper 50% of the person box. Its width is at
        least its height; a very wide box (more than twice the crop height)
        is narrowed to 1.5x the height. The crop is centred horizontally on
        the person box and clipped to the frame.

        Args:
            xmin, ymin, xmax, ymax: person box, normalised 0-1.
            frame_w, frame_h: frame size in pixels.

        Returns:
            (y1, y2, x1, x2) in pixels, or None when the box is empty or
            inverted after clipping to the frame.
        """
        px1 = max(0, int(xmin * frame_w))
        py1 = max(0, int(ymin * frame_h))
        px2 = min(frame_w, int(xmax * frame_w))
        py2 = min(frame_h, int(ymax * frame_h))

        bbox_w = px2 - px1
        bbox_h = py2 - py1
        if bbox_w <= 0 or bbox_h <= 0:
            # A negative extent would turn into a wrapped (wrong) slice.
            return None

        upper_h = int(bbox_h * 0.5)
        if bbox_w > upper_h * 2:
            # Very wide bbox: narrow to ~1.5x the height, centred
            crop_w = int(upper_h * 1.5)
        else:
            crop_w = max(bbox_w, upper_h)  # at least as wide as tall

        cx = (px1 + px2) // 2
        crop_x1 = max(0, cx - crop_w // 2)
        crop_x2 = min(frame_w, cx + crop_w // 2)
        return py1, py1 + upper_h, crop_x1, crop_x2

    # ------------------------------------------------------------------
    # Model / DB helpers
    # ------------------------------------------------------------------

    def _reload_cache(self):
        """Rebuild the in-memory (name, embedding) list from the database.

        Rows that cannot be decoded into exactly ``EMBEDDING_DIM`` float32
        values are skipped with a warning. The new list replaces
        ``self._cache`` in a single assignment so readers holding the old
        list are unaffected. Callers other than ``__init__`` are expected
        to hold ``self._lock`` (this method does not take it itself).
        """
        rows = self._db.execute("SELECT name, embedding FROM faces").fetchall()
        cache = []
        for name, blob in rows:
            try:
                emb = np.frombuffer(blob, dtype=np.float32).copy()
            except ValueError:
                # Blob size is not a multiple of 4 bytes.
                logger.warning("Skipping undecodable embedding for '%s'", name)
                continue
            if len(emb) != EMBEDDING_DIM:
                logger.warning(
                    "Skipping embedding for '%s': expected %d values, got %d",
                    name, EMBEDDING_DIM, len(emb),
                )
                continue
            cache.append((name, emb))
        self._cache = cache

    def _detect_face(self, image):
        """Run face detection on the Coral and pick the best face.

        The image is letterboxed (aspect ratio preserved, black padding)
        into the detector input before inference.

        Args:
            image: (H, W, 3) uint8 array.

        Returns:
            ``(box, score)`` where ``box`` is ``(y1, x1, y2, x2)`` in pixels
            of ``image``, or ``(None, 0.0)`` when the image is smaller than
            ``MIN_CROP_SIZE`` or no non-empty box reaches
            ``FACE_DETECT_THRESHOLD``.
        """
        h, w = image.shape[:2]

        # Skip if crop is too small for reliable face detection
        if h < MIN_CROP_SIZE or w < MIN_CROP_SIZE:
            logger.debug("detect_face: skipping %dx%d crop (too small)", w, h)
            return None, 0.0

        inp_h, inp_w = (int(d) for d in self._face_input["shape"][1:3])
        scale, new_h, new_w, pad_y, pad_x = self._letterbox_geometry(h, w, inp_h, inp_w)

        padded = np.zeros((inp_h, inp_w, 3), dtype=np.uint8)
        padded[pad_y:pad_y + new_h, pad_x:pad_x + new_w] = cv2.resize(image, (new_w, new_h))

        # Set-input / invoke / read-output must not interleave with another
        # thread using the same interpreter.
        with self._infer_lock:
            self._face_interp.set_tensor(self._face_input["index"], padded[np.newaxis])
            self._face_interp.invoke()
            boxes = self._face_interp.get_tensor(self._face_outputs[0]["index"])[0]
            scores = self._face_interp.get_tensor(
                self._face_outputs[self._score_output_idx]["index"]
            )[0]
            count = int(self._face_interp.get_tensor(self._face_outputs[3]["index"])[0])

        best_score = 0.0
        best_box = None
        valid = min(count, len(scores))
        for raw_box, raw_score in zip(boxes[:valid], scores[:valid]):
            score = float(raw_score)
            if score < FACE_DETECT_THRESHOLD or score <= best_score:
                continue
            # boxes are [ymin, xmin, ymax, xmax] normalized 0-1 in the
            # letterboxed input; map back to original image coords.
            box = self._unletterbox_box(raw_box, inp_h, inp_w, pad_y, pad_x, scale, h, w)
            y1, x1, y2, x2 = box
            if y2 <= y1 or x2 <= x1:
                # Box lies in the padding / collapsed after clamping; it must
                # not shadow a usable lower-scoring face.
                continue
            best_score = score
            best_box = box

        if best_box is not None:
            logger.debug("detect_face: %dx%d -> face at score=%.2f", w, h, best_score)
        return best_box, best_score

    def _compute_embedding(self, face_image):
        """Compute the embedding vector of a face crop.

        The crop is resized to the embedder's input size and scaled to
        [-1, 1] (FaceNet preprocessing) before inference.

        Returns:
            A copy of the first row of the embedder's output tensor.
        """
        inp_h, inp_w = (int(d) for d in self._embed_input["shape"][1:3])
        resized = cv2.resize(face_image, (inp_w, inp_h))
        # FaceNet preprocessing: normalize to [-1, 1]
        normalized = (resized.astype(np.float32) / 127.5) - 1.0
        with self._infer_lock:
            self._embed_interp.set_tensor(self._embed_input["index"], normalized[np.newaxis])
            self._embed_interp.invoke()
            return self._embed_interp.get_tensor(self._embed_output["index"])[0].copy()

    def _match_embedding(self, embedding, threshold=None):
        """Match an embedding against the enrolled faces.

        Similarity is the cosine of the angle between the vectors. Both
        vectors are normalised here, so the result does not depend on
        whether the model already emits unit-length embeddings.

        Args:
            embedding: query embedding (1-D array).
            threshold: minimum similarity for a positive match; defaults to
                ``RECOGNITION_THRESHOLD``.

        Returns:
            ``(name, confidence)`` when the best similarity reaches the
            threshold, ``(None, best_similarity)`` when it does not, and
            ``(None, 0.0)`` when nothing can be compared.
        """
        if threshold is None:
            threshold = RECOGNITION_THRESHOLD

        cache = self._cache  # snapshot reference
        if not cache:
            return None, 0.0

        query = np.asarray(embedding, dtype=np.float32).ravel()
        query_norm = float(np.linalg.norm(query))
        if query_norm == 0.0:
            return None, 0.0

        best_scores = {}  # name -> best score
        for name, stored_emb in cache:
            stored_norm = float(np.linalg.norm(stored_emb))
            if stored_norm == 0.0:
                continue
            score = float(np.dot(query, stored_emb)) / (query_norm * stored_norm)
            if name not in best_scores or score > best_scores[name]:
                best_scores[name] = score

        if not best_scores:
            return None, 0.0

        best_name = max(best_scores, key=best_scores.get)
        best_conf = best_scores[best_name]
        if best_conf >= threshold:
            return best_name, best_conf
        return None, best_conf

    def _recognize_person(self, frame, det, frame_w, frame_h):
        """Run crop -> face detection -> embedding -> match for one person.

        Returns:
            A result dict; both values are None when no usable face is found.
        """
        bounds = self._upper_body_crop_bounds(
            det.xmin, det.ymin, det.xmax, det.ymax, frame_w, frame_h
        )
        if bounds is None:
            return self._unknown_result()

        y1, y2, x1, x2 = bounds
        crop = frame[y1:y2, x1:x2]
        if crop.size == 0:
            return self._unknown_result()

        # Face detection on Coral
        face_box, _ = self._detect_face(crop)
        if face_box is None:
            return self._unknown_result()

        # Crop face and compute embedding
        fy1, fx1, fy2, fx2 = face_box
        face_crop = crop[fy1:fy2, fx1:fx2]
        if face_crop.size == 0:
            return self._unknown_result()

        embedding = self._compute_embedding(face_crop)
        name, confidence = self._match_embedding(embedding)
        return {
            "recognized_name": name,
            "recognition_confidence": round(confidence, 3),
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def process_frame(self, rgb_frame, person_detections, *, min_interval=None):
        """Process a frame with person detections, return recognition results.

        Detection runs at most once per ``min_interval`` seconds. In between,
        the results of the last run are returned, but only if they have one
        entry per current detection; otherwise every detection is reported
        as unrecognised, so the returned list always lines up with
        ``person_detections``. Frames without detections return ``[]`` and
        do not consume the throttle window.

        Args:
            rgb_frame: BGR numpy array from OAK-D (H, W, 3)
            person_detections: list of depthai detection objects with
                xmin/ymin/xmax/ymax (normalized 0-1)
            min_interval: seconds between detection runs; defaults to
                ``FACE_DETECT_INTERVAL``.

        Returns:
            list of dicts (same order as person_detections):
                {recognized_name: str|None, recognition_confidence: float|None}
            The dicts are copies; mutating them does not affect the cache.
        """
        if min_interval is None:
            min_interval = FACE_DETECT_INTERVAL

        person_detections = list(person_detections)
        if not person_detections:
            return []

        now = time.monotonic()
        if now - self._last_face_detect_time < min_interval:
            cached = self._last_face_results
            if len(cached) == len(person_detections):
                return [dict(entry) for entry in cached]
            # Cached entries cannot be attributed to the current detections.
            return [self._unknown_result() for _ in person_detections]

        self._last_face_detect_time = now
        h, w = rgb_frame.shape[:2]
        results = [
            self._recognize_person(rgb_frame, det, w, h) for det in person_detections
        ]
        self._last_face_results = results
        return [dict(entry) for entry in results]

    def enroll(self, name, image):
        """Detect face in image, compute embedding, store in DB.

        Args:
            name: person's name
            image: BGR numpy array containing a face

        Returns:
            ``{"success": True, "name": ..., "embedding_count": ...}`` on
            success, ``{"success": False, "error": ...}`` when no usable
            face is found.
        """
        face_box, face_score = self._detect_face(image)
        if face_box is None:
            return {"success": False, "error": "No face detected in image"}

        fy1, fx1, fy2, fx2 = face_box
        face_crop = image[fy1:fy2, fx1:fx2]
        if face_crop.size == 0:
            return {"success": False, "error": "Face crop is empty"}

        embedding = self._compute_embedding(face_crop)
        # Stored as float32 because that is what _reload_cache decodes.
        blob = np.asarray(embedding, dtype=np.float32).tobytes()

        with self._lock:
            # The connection context manager commits, or rolls back on error.
            with self._db:
                self._db.execute(
                    "INSERT INTO faces (name, embedding, enrolled_at, source) VALUES (?, ?, ?, ?)",
                    (name, blob, time.time(), "api"),
                )
            self._reload_cache()
            count = sum(1 for n, _ in self._cache if n == name)

        logger.info(
            "Enrolled face for '%s' (score=%.2f), %d total embeddings",
            name, face_score, count,
        )
        return {"success": True, "name": name, "embedding_count": count}

    def list_faces(self):
        """Return list of enrolled names with embedding counts.

        Returns:
            One dict per name, ordered by name, with keys ``name``,
            ``embedding_count`` and ``enrolled_at`` (earliest enrolment time).
        """
        with self._lock:
            rows = self._db.execute(
                "SELECT name, COUNT(*) as cnt, MIN(enrolled_at) as first "
                "FROM faces GROUP BY name ORDER BY name"
            ).fetchall()
        return [
            {"name": r[0], "embedding_count": r[1], "enrolled_at": r[2]}
            for r in rows
        ]

    def delete_face(self, name):
        """Remove all embeddings for a name.

        Returns:
            ``{"success": bool, "name": name, "deleted": row_count}``;
            ``success`` is False when no row matched.
        """
        with self._lock:
            with self._db:
                cur = self._db.execute("DELETE FROM faces WHERE name = ?", (name,))
            deleted = cur.rowcount
            self._reload_cache()
        logger.info("Deleted %d embeddings for '%s'", deleted, name)
        return {"success": deleted > 0, "name": name, "deleted": deleted}

    def close(self):
        """Close DB connection."""
        with self._lock:
            self._db.close()
|