Files
oak-service/face_recognition.py
2026-02-01 11:36:22 -06:00

271 lines
9.7 KiB
Python

"""
Face Recognition Module for OAK-D Vision Service
Coral Edge TPU for face detection + CPU FaceNet for embeddings + SQLite DB
"""
import sqlite3
import threading
import time
import logging
from pathlib import Path
import ai_edge_litert.interpreter as tfl
import cv2
import numpy as np
logger = logging.getLogger("face_recognition")

# Minimum detector score (0-1) for a detection to count as a face.
FACE_DETECT_THRESHOLD = 0.5
# Minimum cosine similarity against an enrolled embedding to accept a match.
RECOGNITION_THRESHOLD = 0.5
# Expected length of FaceNet embedding vectors stored in the DB;
# rows with any other length are skipped when loading the cache.
EMBEDDING_DIM = 512
class FaceRecognizer:
    """Face recognition pipeline: Coral Edge TPU face detector, CPU FaceNet
    embedder, and a SQLite store of enrolled (name, embedding) rows with an
    in-memory cache used for matching."""

    def __init__(self, face_model_path, embed_model_path, db_path="faces.db"):
        """Load both TFLite models, open/create the SQLite DB, and warm the
        embedding cache.

        Args:
            face_model_path: path to the Edge TPU-compiled face detection model.
            embed_model_path: path to the CPU FaceNet embedding model.
            db_path: path to the SQLite database file (created if missing).

        Raises:
            Whatever the TFLite runtime raises if the Edge TPU delegate or a
            model fails to load, and sqlite3 errors if the DB cannot be opened.
        """
        # Guards DB writes + cache reloads (enroll/delete may run off-thread).
        self._lock = threading.Lock()
        # Coral face detector
        logger.info("Loading face detection model on Edge TPU...")
        delegate = tfl.load_delegate("libedgetpu.so.1")
        self._face_interp = tfl.Interpreter(
            model_path=str(face_model_path),
            experimental_delegates=[delegate],
        )
        self._face_interp.allocate_tensors()
        self._face_input = self._face_interp.get_input_details()[0]
        self._face_outputs = self._face_interp.get_output_details()
        logger.info(
            "Face detector ready: input %s %s",
            self._face_input["shape"],
            self._face_input["dtype"],
        )
        # CPU FaceNet embedder (no delegate: runs on CPU)
        logger.info("Loading FaceNet embedding model on CPU...")
        self._embed_interp = tfl.Interpreter(model_path=str(embed_model_path))
        self._embed_interp.allocate_tensors()
        self._embed_input = self._embed_interp.get_input_details()[0]
        self._embed_output = self._embed_interp.get_output_details()[0]
        logger.info(
            "FaceNet ready: input %s, output %s",
            self._embed_input["shape"],
            self._embed_output["shape"],
        )
        # SQLite DB; check_same_thread=False because callers may enroll from
        # API threads — writes are serialized via self._lock.
        self._db_path = str(db_path)
        self._db = sqlite3.connect(self._db_path, check_same_thread=False)
        self._db.execute(
            """CREATE TABLE IF NOT EXISTS faces (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                embedding BLOB NOT NULL,
                enrolled_at REAL NOT NULL,
                source TEXT
            )"""
        )
        self._db.execute(
            "CREATE INDEX IF NOT EXISTS idx_faces_name ON faces(name)"
        )
        self._db.commit()
        # Load embedding cache
        self._cache = []  # list of (name, embedding_array)
        self._reload_cache()
        logger.info("Face DB: %d embeddings loaded", len(self._cache))
def _reload_cache(self):
rows = self._db.execute("SELECT name, embedding FROM faces").fetchall()
cache = []
for name, blob in rows:
emb = np.frombuffer(blob, dtype=np.float32).copy()
if len(emb) == EMBEDDING_DIM:
cache.append((name, emb))
self._cache = cache
def _detect_face(self, image):
"""Run face detection on Coral. Returns best face bbox (y1,x1,y2,x2 in pixels) or None."""
h, w = image.shape[:2]
inp_h, inp_w = self._face_input["shape"][1:3]
resized = cv2.resize(image, (inp_w, inp_h))
if resized.dtype != np.uint8:
resized = resized.astype(np.uint8)
self._face_interp.set_tensor(
self._face_input["index"], resized[np.newaxis]
)
self._face_interp.invoke()
# Parse outputs: boxes [1,50,4], classes [1,50], scores [1,50], count [1]
boxes = self._face_interp.get_tensor(self._face_outputs[0]["index"])[0]
scores = self._face_interp.get_tensor(self._face_outputs[2]["index"])[0]
count = int(
self._face_interp.get_tensor(self._face_outputs[3]["index"])[0]
)
best_score = 0.0
best_box = None
for i in range(min(count, len(scores))):
if scores[i] >= FACE_DETECT_THRESHOLD and scores[i] > best_score:
best_score = scores[i]
# boxes are [ymin, xmin, ymax, xmax] normalized 0-1
ymin, xmin, ymax, xmax = boxes[i]
best_box = (
max(0, int(ymin * h)),
max(0, int(xmin * w)),
min(h, int(ymax * h)),
min(w, int(xmax * w)),
)
return best_box, best_score
def _compute_embedding(self, face_image):
"""Compute 512-dim embedding from a face crop. Returns numpy array."""
inp_h, inp_w = self._embed_input["shape"][1:3]
resized = cv2.resize(face_image, (inp_w, inp_h))
# FaceNet preprocessing: normalize to [-1, 1]
normalized = (resized.astype(np.float32) / 127.5) - 1.0
self._embed_interp.set_tensor(
self._embed_input["index"], normalized[np.newaxis]
)
self._embed_interp.invoke()
return self._embed_interp.get_tensor(self._embed_output["index"])[0].copy()
def _match_embedding(self, embedding):
"""Match embedding against DB. Returns (name, confidence) or (None, 0.0)."""
cache = self._cache # snapshot reference
if not cache:
return None, 0.0
# Cosine similarity (embeddings are L2-normalized, so dot product works)
best_scores = {} # name -> best score
for name, stored_emb in cache:
score = float(np.dot(embedding, stored_emb))
if name not in best_scores or score > best_scores[name]:
best_scores[name] = score
if not best_scores:
return None, 0.0
best_name = max(best_scores, key=best_scores.get)
best_conf = best_scores[best_name]
if best_conf >= RECOGNITION_THRESHOLD:
return best_name, best_conf
return None, best_conf
def process_frame(self, rgb_frame, person_detections):
"""Process an RGB frame with person detections, return face recognition results.
Args:
rgb_frame: BGR numpy array from OAK-D (H, W, 3)
person_detections: list of depthai detection objects with
xmin/ymin/xmax/ymax (normalized 0-1)
Returns:
list of dicts (same order as person_detections):
{recognized_name: str|None, recognition_confidence: float|None}
"""
h, w = rgb_frame.shape[:2]
results = []
for det in person_detections:
# Crop upper 40% of person bbox (head + shoulders)
px1 = max(0, int(det.xmin * w))
py1 = max(0, int(det.ymin * h))
px2 = min(w, int(det.xmax * w))
py2 = min(h, int(det.ymax * h))
bbox_h = py2 - py1
upper_y2 = py1 + int(bbox_h * 0.4)
# Add 10% horizontal padding
pad_x = int((px2 - px1) * 0.1)
crop_x1 = max(0, px1 - pad_x)
crop_x2 = min(w, px2 + pad_x)
crop = rgb_frame[py1:upper_y2, crop_x1:crop_x2]
if crop.size == 0:
results.append({"recognized_name": None, "recognition_confidence": None})
continue
# Face detection on Coral
face_box, face_score = self._detect_face(crop)
if face_box is None:
results.append({"recognized_name": None, "recognition_confidence": None})
continue
# Crop face and compute embedding
fy1, fx1, fy2, fx2 = face_box
face_crop = crop[fy1:fy2, fx1:fx2]
if face_crop.size == 0:
results.append({"recognized_name": None, "recognition_confidence": None})
continue
embedding = self._compute_embedding(face_crop)
name, confidence = self._match_embedding(embedding)
results.append({
"recognized_name": name,
"recognition_confidence": round(confidence, 3),
})
return results
def enroll(self, name, image):
"""Detect face in image, compute embedding, store in DB.
Args:
name: person's name
image: BGR numpy array containing a face
Returns:
dict with success status and embedding count
"""
face_box, face_score = self._detect_face(image)
if face_box is None:
return {"success": False, "error": "No face detected in image"}
fy1, fx1, fy2, fx2 = face_box
face_crop = image[fy1:fy2, fx1:fx2]
if face_crop.size == 0:
return {"success": False, "error": "Face crop is empty"}
embedding = self._compute_embedding(face_crop)
with self._lock:
self._db.execute(
"INSERT INTO faces (name, embedding, enrolled_at, source) VALUES (?, ?, ?, ?)",
(name, embedding.tobytes(), time.time(), "api"),
)
self._db.commit()
self._reload_cache()
count = sum(1 for n, _ in self._cache if n == name)
logger.info("Enrolled face for '%s' (score=%.2f), %d total embeddings", name, face_score, count)
return {"success": True, "name": name, "embedding_count": count}
def list_faces(self):
"""Return list of enrolled names with embedding counts."""
rows = self._db.execute(
"SELECT name, COUNT(*) as cnt, MIN(enrolled_at) as first "
"FROM faces GROUP BY name ORDER BY name"
).fetchall()
return [
{"name": r[0], "embedding_count": r[1], "enrolled_at": r[2]}
for r in rows
]
def delete_face(self, name):
"""Remove all embeddings for a name."""
with self._lock:
cur = self._db.execute("DELETE FROM faces WHERE name = ?", (name,))
self._db.commit()
self._reload_cache()
deleted = cur.rowcount
logger.info("Deleted %d embeddings for '%s'", deleted, name)
return {"success": deleted > 0, "name": name, "deleted": deleted}
def close(self):
"""Close DB connection."""
self._db.close()