""" Face Recognition Module for OAK-D Vision Service Coral Edge TPU for face detection + CPU FaceNet for embeddings + SQLite DB """ import sqlite3 import threading import time import logging from pathlib import Path import ai_edge_litert.interpreter as tfl import cv2 import numpy as np logger = logging.getLogger("face_recognition") logger.setLevel(logging.INFO) FACE_DETECT_THRESHOLD = 0.5 RECOGNITION_THRESHOLD = 0.5 EMBEDDING_DIM = 512 MIN_CROP_SIZE = 80 # minimum pixels in both dimensions for face detection FACE_DETECT_INTERVAL = 2.0 # seconds between face detection runs in process_frame class FaceRecognizer: def __init__(self, face_model_path, embed_model_path, db_path="faces.db", coral_device=2): self._lock = threading.Lock() # Coral face detector (device index avoids conflict with YAMNet on :0 and pose on :1) logger.info("Loading face detection model on Edge TPU (device :%d)...", coral_device) delegate = tfl.load_delegate("libedgetpu.so.1", options={"device": f":{coral_device}"}) self._face_interp = tfl.Interpreter( model_path=str(face_model_path), experimental_delegates=[delegate], ) self._face_interp.allocate_tensors() self._face_input = self._face_interp.get_input_details()[0] self._face_outputs = self._face_interp.get_output_details() # Log output tensor details to determine correct index mapping for i, o in enumerate(self._face_outputs): logger.info("Face detector output[%d]: name=%s shape=%s", i, o["name"], o["shape"]) # Determine score tensor index: run a test inference to find which # [1,N] tensor has non-zero values (scores) vs all-zeros (class IDs) inp_shape = self._face_input["shape"] test_input = np.zeros(inp_shape, dtype=self._face_input["dtype"]) self._face_interp.set_tensor(self._face_input["index"], test_input) self._face_interp.invoke() # Output 0 is boxes [1,N,4], output 3 is count [1] # Outputs 1 and 2 are scores and classes (order varies by model) t1 = self._face_interp.get_tensor(self._face_outputs[1]["index"]) t2 = self._face_interp.get_tensor(self._face_outputs[2]["index"]) # For a blank image: scores should be low but potentially non-zero, # while class IDs for a single-class model are always 0.0 # Use output name as primary signal if available self._score_output_idx = 2 # default for i in (1, 2): name = self._face_outputs[i].get("name", "").lower() if "score" in name: self._score_output_idx = i break if "class" in name: self._score_output_idx = 2 if i == 1 else 1 break else: # No name match — use heuristic: pick the one with higher variance if np.std(t1) > np.std(t2): self._score_output_idx = 1 else: self._score_output_idx = 2 logger.info("Face detector: using output[%d] as scores", self._score_output_idx) logger.info( "Face detector ready: input %s %s", self._face_input["shape"], self._face_input["dtype"], ) # CPU FaceNet embedder logger.info("Loading FaceNet embedding model on CPU...") self._embed_interp = tfl.Interpreter(model_path=str(embed_model_path)) self._embed_interp.allocate_tensors() self._embed_input = self._embed_interp.get_input_details()[0] self._embed_output = self._embed_interp.get_output_details()[0] logger.info( "FaceNet ready: input %s, output %s", self._embed_input["shape"], self._embed_output["shape"], ) # SQLite DB self._db_path = str(db_path) self._db = sqlite3.connect(self._db_path, check_same_thread=False) self._db.execute( """CREATE TABLE IF NOT EXISTS faces ( id INTEGER PRIMARY KEY, name TEXT NOT NULL, embedding BLOB NOT NULL, enrolled_at REAL NOT NULL, source TEXT )""" ) self._db.execute( "CREATE INDEX IF NOT EXISTS idx_faces_name ON faces(name)" ) self._db.commit() # Load embedding cache self._cache = [] # list of (name, embedding_array) self._reload_cache() logger.info("Face DB: %d embeddings loaded", len(self._cache)) # Throttling for process_frame self._last_face_detect_time = 0.0 self._last_face_results = [] # cached results from last detection def _reload_cache(self): rows = self._db.execute("SELECT name, embedding FROM faces").fetchall() cache = [] for name, blob in rows: emb = np.frombuffer(blob, dtype=np.float32).copy() if len(emb) == EMBEDDING_DIM: cache.append((name, emb)) self._cache = cache def _detect_face(self, image): """Run face detection on Coral. Returns best face bbox (y1,x1,y2,x2 in pixels) or None.""" h, w = image.shape[:2] # Skip if crop is too small for reliable face detection if h < MIN_CROP_SIZE or w < MIN_CROP_SIZE: logger.debug("detect_face: skipping %dx%d crop (too small)", w, h) return None, 0.0 inp_h, inp_w = self._face_input["shape"][1:3] # Letterbox resize: preserve aspect ratio, pad with black scale = min(inp_w / w, inp_h / h) new_w = int(w * scale) new_h = int(h * scale) resized = cv2.resize(image, (new_w, new_h)) # Create padded input padded = np.zeros((inp_h, inp_w, 3), dtype=np.uint8) pad_y = (inp_h - new_h) // 2 pad_x = (inp_w - new_w) // 2 padded[pad_y:pad_y + new_h, pad_x:pad_x + new_w] = resized self._face_interp.set_tensor( self._face_input["index"], padded[np.newaxis] ) self._face_interp.invoke() boxes = self._face_interp.get_tensor(self._face_outputs[0]["index"])[0] scores = self._face_interp.get_tensor(self._face_outputs[self._score_output_idx]["index"])[0] count = int( self._face_interp.get_tensor(self._face_outputs[3]["index"])[0] ) best_score = 0.0 best_box = None for i in range(min(count, len(scores))): if scores[i] >= FACE_DETECT_THRESHOLD and scores[i] > best_score: best_score = scores[i] # boxes are [ymin, xmin, ymax, xmax] normalized 0-1 # Map back from letterboxed coords to original image coords ymin, xmin, ymax, xmax = boxes[i] # Convert from padded coords to original orig_y1 = max(0, int((ymin * inp_h - pad_y) / scale)) orig_x1 = max(0, int((xmin * inp_w - pad_x) / scale)) orig_y2 = min(h, int((ymax * inp_h - pad_y) / scale)) orig_x2 = min(w, int((xmax * inp_w - pad_x) / scale)) best_box = (orig_y1, orig_x1, orig_y2, orig_x2) if best_box is not None: logger.debug("detect_face: %dx%d -> face at score=%.2f", w, h, best_score) return best_box, best_score def _compute_embedding(self, face_image): """Compute 512-dim embedding from a face crop. Returns numpy array.""" inp_h, inp_w = self._embed_input["shape"][1:3] resized = cv2.resize(face_image, (inp_w, inp_h)) # FaceNet preprocessing: normalize to [-1, 1] normalized = (resized.astype(np.float32) / 127.5) - 1.0 self._embed_interp.set_tensor( self._embed_input["index"], normalized[np.newaxis] ) self._embed_interp.invoke() return self._embed_interp.get_tensor(self._embed_output["index"])[0].copy() def _match_embedding(self, embedding): """Match embedding against DB. Returns (name, confidence) or (None, 0.0).""" cache = self._cache # snapshot reference if not cache: return None, 0.0 # Cosine similarity (embeddings are L2-normalized, so dot product works) best_scores = {} # name -> best score for name, stored_emb in cache: score = float(np.dot(embedding, stored_emb)) if name not in best_scores or score > best_scores[name]: best_scores[name] = score if not best_scores: return None, 0.0 best_name = max(best_scores, key=best_scores.get) best_conf = best_scores[best_name] if best_conf >= RECOGNITION_THRESHOLD: return best_name, best_conf return None, best_conf def process_frame(self, rgb_frame, person_detections): """Process an RGB frame with person detections, return face recognition results. Args: rgb_frame: BGR numpy array from OAK-D (H, W, 3) person_detections: list of depthai detection objects with xmin/ymin/xmax/ymax (normalized 0-1) Returns: list of dicts (same order as person_detections): {recognized_name: str|None, recognition_confidence: float|None} """ now = time.monotonic() if now - self._last_face_detect_time < FACE_DETECT_INTERVAL: return self._last_face_results self._last_face_detect_time = now h, w = rgb_frame.shape[:2] results = [] for det in person_detections: # Crop upper 50% of person bbox as a roughly square region px1 = max(0, int(det.xmin * w)) py1 = max(0, int(det.ymin * h)) px2 = min(w, int(det.xmax * w)) py2 = min(h, int(det.ymax * h)) bbox_w = px2 - px1 bbox_h = py2 - py1 upper_h = int(bbox_h * 0.5) # Make crop roughly square: if width >> height, narrow it # Center the crop horizontally on the person bbox crop_h = upper_h crop_w = max(bbox_w, upper_h) # at least as wide as tall if bbox_w > upper_h * 2: # Very wide bbox — narrow to ~1.5x the height, centered crop_w = int(upper_h * 1.5) cx = (px1 + px2) // 2 crop_x1 = max(0, cx - crop_w // 2) crop_x2 = min(w, cx + crop_w // 2) crop = rgb_frame[py1:py1 + crop_h, crop_x1:crop_x2] if crop.size == 0: results.append({"recognized_name": None, "recognition_confidence": None}) continue # Face detection on Coral face_box, face_score = self._detect_face(crop) if face_box is None: results.append({"recognized_name": None, "recognition_confidence": None}) continue # Crop face and compute embedding fy1, fx1, fy2, fx2 = face_box face_crop = crop[fy1:fy2, fx1:fx2] if face_crop.size == 0: results.append({"recognized_name": None, "recognition_confidence": None}) continue embedding = self._compute_embedding(face_crop) name, confidence = self._match_embedding(embedding) results.append({ "recognized_name": name, "recognition_confidence": round(confidence, 3), }) self._last_face_results = results return results def enroll(self, name, image): """Detect face in image, compute embedding, store in DB. Args: name: person's name image: BGR numpy array containing a face Returns: dict with success status and embedding count """ face_box, face_score = self._detect_face(image) if face_box is None: return {"success": False, "error": "No face detected in image"} fy1, fx1, fy2, fx2 = face_box face_crop = image[fy1:fy2, fx1:fx2] if face_crop.size == 0: return {"success": False, "error": "Face crop is empty"} embedding = self._compute_embedding(face_crop) with self._lock: self._db.execute( "INSERT INTO faces (name, embedding, enrolled_at, source) VALUES (?, ?, ?, ?)", (name, embedding.tobytes(), time.time(), "api"), ) self._db.commit() self._reload_cache() count = sum(1 for n, _ in self._cache if n == name) logger.info("Enrolled face for '%s' (score=%.2f), %d total embeddings", name, face_score, count) return {"success": True, "name": name, "embedding_count": count} def list_faces(self): """Return list of enrolled names with embedding counts.""" rows = self._db.execute( "SELECT name, COUNT(*) as cnt, MIN(enrolled_at) as first " "FROM faces GROUP BY name ORDER BY name" ).fetchall() return [ {"name": r[0], "embedding_count": r[1], "enrolled_at": r[2]} for r in rows ] def delete_face(self, name): """Remove all embeddings for a name.""" with self._lock: cur = self._db.execute("DELETE FROM faces WHERE name = ?", (name,)) self._db.commit() self._reload_cache() deleted = cur.rowcount logger.info("Deleted %d embeddings for '%s'", deleted, name) return {"success": deleted > 0, "name": name, "deleted": deleted} def close(self): """Close DB connection.""" self._db.close()