Files
headmic/speaker_id.py
Alex 05034acd27 Add anonymous speaker tracking (online diarization)
Unrecognized speakers now get stable IDs like "unknown_a7f3" instead
of None. Uses online clustering of Resemblyzer embeddings:
- Matches against tracked anonymous speakers (cosine > 0.70)
- Updates running average embedding on re-identification
- Creates new ID from SHA-256 hash of quantized embedding
- Expires after 1 hour of silence, max 10 tracked simultaneously

New API: POST /speakers/promote?anon_id=unknown_a7f3&name=Alex
Promotes an anonymous speaker to enrolled using their averaged embedding.

Flow: unknown person speaks → "unknown_a7f3" → you ask "who's that?" →
promote to "Bob" → now recognized by name going forward.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 21:58:30 -05:00

228 lines
8.7 KiB
Python

"""
Speaker Identification Module for HeadMic
Resemblyzer GE2E speaker encoder — 256-dim embeddings, cosine similarity matching.
Triggered when YAMNet detects speech.
Supports both enrolled speakers ("Alex") and anonymous tracking ("unknown_a7f3")
via online clustering of unrecognized embeddings.
"""
import hashlib
import logging
import sqlite3
import time
from contextlib import closing
from pathlib import Path

import numpy as np
# Module-wide logger; its level is pinned to INFO at import time.
logger = logging.getLogger("speaker_id")
logger.setLevel(logging.INFO)

# Minimum similarity score for accepting a match against an enrolled speaker.
SIMILARITY_THRESHOLD = 0.75

# Anonymous-speaker tracking knobs.
ANON_SIMILARITY_THRESHOLD = 0.70  # slightly looser cut-off for clustering unknowns
ANON_MAX_TRACKED = 10  # upper bound on simultaneously tracked anonymous speakers
ANON_EXPIRY_S = 60 * 60  # seconds of silence before an anonymous speaker is forgotten
class SpeakerRecognizer:
    """Identify speakers from 16 kHz float32 audio via Resemblyzer embeddings.

    Enrolled speakers are persisted in a SQLite ``voices`` table and mirrored
    in an in-memory cache (``name -> list of embeddings``). Voices that match
    no enrolled speaker are clustered online and tracked in memory only,
    under short-lived ``unknown_XXXX`` IDs.

    Similarity is computed as a plain dot product, which equals cosine
    similarity provided the embeddings are unit-length (this code assumes so).
    """

    def __init__(self, db_path="voices.db"):
        """Load the voice encoder, prepare the database and fill the cache.

        Args:
            db_path: Path of the SQLite file that stores enrolled embeddings.
        """
        # Function-scope import: resemblyzer is only loaded once a recognizer
        # is actually constructed.
        from resemblyzer import VoiceEncoder

        self._encoder = VoiceEncoder("cpu")
        logger.info("Resemblyzer voice encoder loaded")

        self._db_path = str(db_path)
        self._init_db()
        self._cache = self._load_embeddings()

        # Anonymous speaker tracking: short-lived clustering of unrecognized voices
        # Key: "unknown_XXXX"
        # Value: {"embedding": avg_emb, "last_seen": t, "first_seen": t, "count": N}
        self._anon_speakers: dict[str, dict] = {}

        logger.info(
            "Speaker DB ready: %d embeddings for %d speakers",
            sum(len(v) for v in self._cache.values()),
            len(self._cache),
        )

    def _init_db(self):
        """Create the ``voices`` table and its name index if they are missing."""
        # sqlite3's own context manager only commits / rolls back the
        # transaction; closing() additionally releases the connection.
        with closing(sqlite3.connect(self._db_path)) as conn, conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS voices (
                    id INTEGER PRIMARY KEY,
                    name TEXT NOT NULL,
                    embedding BLOB NOT NULL,
                    enrolled_at REAL NOT NULL,
                    source TEXT
                )
            """)
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_voices_name ON voices(name)"
            )

    def _load_embeddings(self):
        """Load all embeddings from DB into memory, grouped by name.

        Returns:
            Dict mapping each speaker name to a list of float32 arrays.
        """
        with closing(sqlite3.connect(self._db_path)) as conn, conn:
            rows = conn.execute("SELECT name, embedding FROM voices").fetchall()

        cache = {}
        for name, blob in rows:
            # frombuffer() yields a read-only view onto the blob; copy() gives
            # the cache an independent, writable array.
            emb = np.frombuffer(blob, dtype=np.float32).copy()
            cache.setdefault(name, []).append(emb)
        return cache

    def identify(self, audio_float32):
        """Identify speaker from float32 audio at 16kHz.

        Enrolled speakers are checked first; if none reaches
        ``SIMILARITY_THRESHOLD`` the voice is matched against (or added to)
        the anonymous trackers.

        Returns:
            (name, confidence) where name is either an enrolled name ("Alex")
            or an anonymous tracker ID ("unknown_a7f3"). Returns (None, 0.0)
            when the audio is too short to compute an embedding or when the
            embedding computation fails for any reason (the failure is logged).
        """
        try:
            from resemblyzer import preprocess_wav

            wav = preprocess_wav(audio_float32, source_sr=16000)
            if len(wav) < 1600:  # too short (1600 samples = 0.1 s at 16 kHz)
                return None, 0.0
            embedding = self._encoder.embed_utterance(wav)
        except Exception as e:
            # Deliberate best-effort: identification must never crash the caller.
            logger.warning("Embedding computation failed: %s", e)
            return None, 0.0

        # First: check enrolled speakers
        best_name, best_score = self._match_enrolled(embedding)
        if best_score >= SIMILARITY_THRESHOLD:
            return best_name, round(float(best_score), 3)

        # Not enrolled — match or create anonymous speaker
        anon_name, anon_score = self._match_anonymous(embedding)
        return anon_name, round(float(anon_score), 3)

    def _match_enrolled(self, embedding):
        """Return the best (name, score) among enrolled speakers.

        A speaker's score is the highest dot product between ``embedding`` and
        any of that speaker's stored samples. No threshold is applied here.
        Returns (None, 0.0) when nothing scores above zero.
        """
        best_name = None
        best_score = 0.0
        for name, embeddings in self._cache.items():
            if not embeddings:
                # max() of an empty sequence would raise; nothing to compare.
                continue
            top = max(float(np.dot(embedding, emb)) for emb in embeddings)
            if top > best_score:
                best_score = top
                best_name = name
        return best_name, best_score

    def _match_anonymous(self, embedding: np.ndarray) -> tuple[str, float]:
        """Match embedding against tracked anonymous speakers, or create new one.

        Returns:
            (anon_id, score). For a re-identified speaker the score is the
            similarity to its running-average embedding; for a newly created
            speaker it is a fixed 0.5.
        """
        now = time.time()

        # Expire old anonymous speakers
        expired = [
            k for k, v in self._anon_speakers.items()
            if now - v["last_seen"] > ANON_EXPIRY_S
        ]
        for k in expired:
            logger.debug("Anonymous speaker %s expired", k)
            del self._anon_speakers[k]

        # Find best match among existing anonymous speakers
        best_id = None
        best_score = 0.0
        for anon_id, info in self._anon_speakers.items():
            score = float(np.dot(embedding, info["embedding"]))
            if score > best_score:
                best_score = score
                best_id = anon_id

        if best_id is not None and best_score >= ANON_SIMILARITY_THRESHOLD:
            info = self._anon_speakers[best_id]
            count = info["count"]
            # Incremental mean: new_avg = old_avg + (new - old_avg) / (count + 1)
            mean = info["embedding"] + (embedding - info["embedding"]) / (count + 1)
            # Re-normalize (embeddings should be unit vectors)
            norm = np.linalg.norm(mean)
            if norm > 0:
                mean = mean / norm
            info["embedding"] = mean
            info["count"] = count + 1
            info["last_seen"] = now
            return best_id, best_score

        # No match — create new anonymous speaker
        if len(self._anon_speakers) >= ANON_MAX_TRACKED:
            # Evict the one that has been silent the longest
            oldest = min(
                self._anon_speakers,
                key=lambda k: self._anon_speakers[k]["last_seen"],
            )
            del self._anon_speakers[oldest]

        # A 4-hex-char ID can collide with a speaker that is already tracked;
        # lengthen the hash prefix until it is unique instead of overwriting
        # the existing entry. (If even the full digest is taken, the quantized
        # embeddings are identical and the entry is simply replaced.)
        for length in range(4, 65):
            anon_id = self._make_anon_id(embedding, length)
            if anon_id not in self._anon_speakers:
                break

        self._anon_speakers[anon_id] = {
            "embedding": embedding.copy(),
            "last_seen": now,
            "first_seen": now,
            "count": 1,
        }
        logger.info("New anonymous speaker: %s", anon_id)
        return anon_id, 0.5  # moderate confidence for first sighting

    @staticmethod
    def _make_anon_id(embedding: np.ndarray, length: int = 4) -> str:
        """Derive a short ``unknown_<hex>`` ID from an embedding.

        The embedding is quantized to 8 bits per dimension and hashed with
        SHA-256, so the ID is deterministic for a given embedding. Note that
        SHA-256 is NOT locality-preserving: two slightly different embeddings
        of the same voice generally produce unrelated IDs. An ID stays stable
        only because ``_match_anonymous`` keeps re-matching the voice to its
        existing tracker by similarity.

        Args:
            embedding: Embedding vector; values are expected in [-1, 1] and
                anything outside is clipped during quantization.
            length: Number of leading hex digits of the digest to keep.
        """
        quantized = ((embedding + 1.0) * 127.5).clip(0, 255).astype(np.uint8)
        digest = hashlib.sha256(quantized.tobytes()).hexdigest()
        return f"unknown_{digest[:length]}"

    def enroll(self, name, audio_float32, source="api"):
        """Enroll a speaker from float32 audio at 16kHz.

        The embedding is written to the database and appended to the cache.

        Args:
            name: Speaker name to store the sample under.
            audio_float32: Audio samples at 16 kHz.
            source: Free-form tag recorded with the row.

        Returns:
            The computed embedding (256-dim).

        Raises:
            ValueError: If the preprocessed audio is shorter than 1600 samples.
        """
        from resemblyzer import preprocess_wav

        wav = preprocess_wav(audio_float32, source_sr=16000)
        if len(wav) < 1600:
            raise ValueError("Audio too short for enrollment")

        embedding = self._encoder.embed_utterance(wav)
        blob = embedding.astype(np.float32).tobytes()
        now = time.time()
        with closing(sqlite3.connect(self._db_path)) as conn, conn:
            conn.execute(
                "INSERT INTO voices (name, embedding, enrolled_at, source) VALUES (?, ?, ?, ?)",
                (name, blob, now, source),
            )

        self._cache.setdefault(name, []).append(embedding)
        logger.info(
            "Enrolled speaker '%s' (source=%s, total=%d samples)",
            name, source, len(self._cache[name]),
        )
        return embedding

    def list_speakers(self) -> dict[str, int]:
        """Return speaker names with sample counts.

        Enrolled speakers map to their number of stored samples; currently
        tracked anonymous speakers map to their observation count.
        """
        result = {name: len(embs) for name, embs in self._cache.items()}
        # Include active anonymous speakers
        for anon_id, info in self._anon_speakers.items():
            result[anon_id] = info["count"]
        return result

    def promote_anonymous(self, anon_id: str, name: str) -> bool:
        """Promote an anonymous speaker to an enrolled speaker.

        Saves their averaged embedding to the database under the given name.

        Returns:
            True on success, False if ``anon_id`` is not currently tracked.

        Raises:
            sqlite3.Error: If the database write fails. In that case the
                anonymous speaker is kept so the promotion can be retried.
        """
        info = self._anon_speakers.get(anon_id)
        if info is None:
            return False

        embedding = info["embedding"]
        blob = embedding.astype(np.float32).tobytes()
        now = time.time()
        with closing(sqlite3.connect(self._db_path)) as conn, conn:
            conn.execute(
                "INSERT INTO voices (name, embedding, enrolled_at, source) VALUES (?, ?, ?, ?)",
                (name, blob, now, "promoted"),
            )

        # Forget the tracker only once the embedding is safely persisted.
        del self._anon_speakers[anon_id]
        self._cache.setdefault(name, []).append(embedding)
        logger.info("Promoted %s -> '%s' (%d observations)", anon_id, name, info["count"])
        return True

    def delete_speaker(self, name) -> int:
        """Remove all embeddings for an enrolled speaker.

        Returns:
            Number of cached samples that were removed (0 if the name was
            not in the cache).
        """
        with closing(sqlite3.connect(self._db_path)) as conn, conn:
            conn.execute("DELETE FROM voices WHERE name = ?", (name,))

        removed = self._cache.pop(name, None)
        if not removed:
            return 0
        logger.info("Deleted speaker '%s' (%d samples)", name, len(removed))
        return len(removed)