Add multi-speaker tracking with beam steering (#5)

multi_speaker.py: Tracks up to 2 speakers simultaneously. When 2 distinct
DoA angles are detected (30°+ apart) for >1s, locks the XVF3800's fixed
beams onto each speaker. Releases back to auto mode when only 1 speaker
remains (3s timeout). Manages beam gating so only the speaking beam is active.

xvf3800.py: Added beam steering commands — enable_fixed_beams(),
set_beam_azimuths(), enable_beam_gating(), read_all_beams().
Manager gets steer_beams() and release_beams() convenience methods.

headmic.py: Wire multi-speaker tracker into DoA loop. New endpoint:
GET /speakers/tracked — current speaker positions, beam mode, lock state.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex
2026-04-12 21:37:49 -05:00
parent 02d3ac3816
commit 38d21ef53c
3 changed files with 280 additions and 1 deletions

View File

@@ -161,6 +161,7 @@ class ServiceState:
self.doa: dict = {} # latest DoA from both arrays
self.spatial: Optional[dict] = None # triangulated position + gaze
self.last_anomaly: Optional[dict] = None # last spatial anomaly detected
self.multi_speaker: Optional[dict] = None # multi-speaker tracking state
state = ServiceState()
@@ -177,6 +178,9 @@ binaural_recorder = None
# Spatial scene
spatial_scene = None
# Multi-speaker tracker
multi_speaker = None
enrollment_name = None
# Audio stream
@@ -439,6 +443,10 @@ def doa_track_loop():
try:
state.doa = xvf_manager.read_both_doa()
# Multi-speaker tracking (beam steering)
if multi_speaker:
state.multi_speaker = multi_speaker.update(state.doa)
if spatial_tracker and dual_stream:
left_energy = dual_stream.left.get_energy() if dual_stream.left else 0.0
right_energy = dual_stream.right.get_energy() if dual_stream.right else 0.0
@@ -496,7 +504,7 @@ app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂 (Dual XVF3800)"
@app.on_event("startup")
async def startup():
global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder, spatial_scene
global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder, spatial_scene, multi_speaker
state.running = True
@@ -577,6 +585,12 @@ async def startup():
spatial_scene = SpatialScene()
spatial_scene.start()
# --- Multi-speaker tracking ---
if xvf_manager.left or xvf_manager.right:
from multi_speaker import MultiSpeakerTracker
multi_speaker = MultiSpeakerTracker(xvf_manager)
logger.info("Multi-speaker tracking enabled (2 beams per array)")
# --- Binaural recording ---
if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"):
from binaural_recorder import BinauralRecorder
@@ -596,6 +610,8 @@ async def startup():
async def shutdown():
state.running = False
leds_off()
if multi_speaker and xvf_manager:
xvf_manager.release_beams()
if spatial_scene:
spatial_scene.stop()
if binaural_recorder:
@@ -666,6 +682,16 @@ async def doa():
}
# --- Multi-speaker ---
@app.get("/speakers/tracked")
async def tracked_speakers():
    """Return the most recent multi-speaker tracking snapshot.

    The snapshot is whatever was last stored in ``state.multi_speaker``.
    When nothing has been stored yet (or the stored value is empty), an
    empty snapshot in "auto" beam mode is returned instead.
    """
    snapshot = state.multi_speaker
    if snapshot:
        return snapshot
    return {"speakers": [], "beam_mode": "auto", "active_count": 0, "total_tracked": 0}
# --- Spatial scene ---
@app.get("/scene")

197
multi_speaker.py Normal file
View File

@@ -0,0 +1,197 @@
"""
Multi-speaker tracking — track up to 2 speakers simultaneously.
Uses the XVF3800's fixed beam mode to lock beams onto detected speakers.
Each array has 2 steerable beams, so with 2 arrays we can independently
track 2 speakers from different directions.
Flow:
1. Auto mode: free-running beams track the loudest source (default)
2. When 2 distinct DoA angles are detected: switch to fixed beam mode
3. Beam 1 → speaker A direction, Beam 2 → speaker B direction
4. Track which speaker is active, feed to spatial tracker + speaker ID
5. When only 1 speaker remains, release back to auto mode
"""
import logging
import math
import time
import threading
from typing import Optional
logger = logging.getLogger("headmic.multispeaker")
# Angular threshold in degrees. A DoA reading whose circular distance to an
# existing speaker is strictly below this value is merged into that speaker;
# otherwise it is treated as a separate speaker.
MIN_SPEAKER_SEPARATION = 30.0
# Seconds that must have elapsed since a speaker was first detected before it
# is considered stable enough to lock a beam onto.
LOCK_DELAY = 1.0
# Seconds without a matching reading after which a speaker is considered
# expired and dropped from tracking (which in turn releases the beams).
RELEASE_DELAY = 3.0
# Maximum number of tracked speakers. Readings that match no existing speaker
# are ignored once this many speakers are being tracked.
MAX_SPEAKERS = 2
class TrackedSpeaker:
    """A speaker position being tracked.

    Holds a smoothed direction-of-arrival angle together with the timestamps
    needed to decide when the position is old enough to lock a beam onto
    (``stable``) and when it has been silent long enough to drop (``expired``).
    All timestamps come from ``time.monotonic()``.
    """

    def __init__(self, angle: float, side: str):
        """Start tracking a speaker first heard at ``angle`` degrees.

        Args:
            angle: Initial direction in degrees. Normalised to [0, 360) so the
                stored value follows the same convention as ``update_angle``.
            side: "left" or "right" (which ear detected the speaker).
        """
        # One clock read so first_seen and last_seen start out identical.
        now = time.monotonic()
        self.angle = angle % 360  # degrees, smoothed, always in [0, 360)
        self.side = side  # "left" or "right" (which ear detected)
        self.first_seen = now
        self.last_seen = now
        self.active = True  # currently producing speech
        self.beam_locked = False  # beam steered to this position
        self.speaker_name: Optional[str] = None  # from speaker ID

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(angle={self.angle!r}, side={self.side!r}, "
            f"active={self.active!r}, beam_locked={self.beam_locked!r}, "
            f"speaker_name={self.speaker_name!r})"
        )

    def update_angle(self, angle: float, alpha: float = 0.3):
        """Blend a new reading into the smoothed angle and mark the speaker active.

        Args:
            angle: New direction reading in degrees.
            alpha: Smoothing factor; the fraction of the angular difference
                applied per update (1.0 jumps straight to the new reading).
        """
        # Move along the shorter arc so that e.g. 350° -> 10° is a +20° step
        # rather than a -340° one.
        diff = angle - self.angle
        if diff > 180:
            diff -= 360
        elif diff < -180:
            diff += 360
        self.angle = (self.angle + alpha * diff) % 360
        self.last_seen = time.monotonic()
        self.active = True

    @property
    def age(self) -> float:
        """Seconds since this speaker was first detected."""
        return time.monotonic() - self.first_seen

    @property
    def silence_duration(self) -> float:
        """Seconds since the last reading was matched to this speaker."""
        return time.monotonic() - self.last_seen

    @property
    def stable(self) -> bool:
        """Has this speaker been present long enough to lock a beam?

        NOTE(review): this is based on time since first detection only, not
        on continuous speech, so a speaker heard once and then silent still
        becomes stable after LOCK_DELAY as long as it has not expired.
        """
        return self.age >= LOCK_DELAY

    @property
    def expired(self) -> bool:
        """Has this speaker been silent long enough to release?"""
        return self.silence_duration >= RELEASE_DELAY
class MultiSpeakerTracker:
    """Track multiple speakers and manage beam steering.

    Each call to ``update`` matches the voiced DoA readings against the
    currently tracked speakers, drops speakers that have expired, and then
    switches the XVF manager between auto and fixed beam mode:

    * two stable speakers and not yet fixed -> steer beams, enter fixed mode
    * two stable speakers and already fixed -> re-steer only after movement
    * fewer than two stable speakers while fixed -> release back to auto

    NOTE(review): "stable" is ``TrackedSpeaker.stable``, which depends only on
    time since first detection, so a brief spurious reading can count as a
    second speaker once LOCK_DELAY has passed and it has not yet expired.
    """

    # Minimum movement (degrees) of either locked speaker, measured against
    # the angles last sent to the hardware, before the beams are re-steered.
    # Avoids re-issuing the steering commands on every DoA update when the
    # smoothed angles only jitter.
    RESTEER_THRESHOLD = 2.0

    def __init__(self, xvf_manager):
        """Create a tracker.

        Args:
            xvf_manager: Object providing ``steer_beams(beam1_deg, beam2_deg)``
                and ``release_beams()``.
        """
        self.xvf = xvf_manager
        self.speakers: list[TrackedSpeaker] = []
        self.fixed_mode = False
        self._lock = threading.Lock()
        # Angles (degrees) last passed to steer_beams(); None while in auto mode.
        self._steered_angles: Optional[tuple[float, float]] = None

    def update(self, doa: dict) -> dict:
        """
        Process DoA from both arrays and update speaker tracking.

        Args:
            doa: {"left": {"angle": N, "vad": bool}, "right": {"angle": N, "vad": bool}}
                Sides that are missing, have a falsy "vad", or carry no
                "angle" are ignored.

        Returns:
            {"speakers": [...], "beam_mode": "auto"|"fixed",
             "active_count": N, "total_tracked": N}
        """
        with self._lock:
            # Collect voiced readings as (angle, side) pairs.
            readings = []
            for side in ("left", "right"):
                d = (doa or {}).get(side)
                if not d or not d.get("vad"):
                    continue
                angle = d.get("angle")
                if angle is None:
                    # A voiced reading without a direction cannot be tracked.
                    continue
                readings.append((angle, side))

            # Mark all speakers as potentially inactive this frame; a matching
            # reading below flips the flag back on via update_angle().
            for s in self.speakers:
                s.active = False

            # Match readings to existing speakers or create new ones.
            for angle, side in readings:
                matched = self._match_speaker(angle)
                if matched is not None:
                    matched.update_angle(angle)
                elif len(self.speakers) < MAX_SPEAKERS:
                    self.speakers.append(TrackedSpeaker(angle, side))
                    logger.info("New speaker detected at %.0f° (%s side)",
                                angle, side)

            # Remove expired speakers. Expiry is evaluated once per speaker so
            # that what is logged is exactly what is removed.
            remaining = []
            for s in self.speakers:
                if s.expired:
                    logger.info("Speaker at %.0f° expired (silent %.1fs)",
                                s.angle, s.silence_duration)
                else:
                    remaining.append(s)
            self.speakers = remaining

            # Manage beam steering
            self._manage_beams()
            return self._get_state()

    @staticmethod
    def _angular_distance(a: float, b: float) -> float:
        """Return the shortest circular distance between two angles (degrees)."""
        dist = abs(a - b) % 360
        return min(dist, 360 - dist)

    def _match_speaker(self, angle: float) -> "Optional[TrackedSpeaker]":
        """Find the closest existing speaker within MIN_SPEAKER_SEPARATION.

        Returns None when no tracked speaker is strictly closer than
        MIN_SPEAKER_SEPARATION degrees to ``angle``.
        """
        best = None
        best_dist = MIN_SPEAKER_SEPARATION
        for s in self.speakers:
            dist = self._angular_distance(angle, s.angle)
            if dist < best_dist:
                best = s
                best_dist = dist
        return best

    def _needs_resteer(self, angle1: float, angle2: float) -> bool:
        """Return True when either angle moved enough to justify re-steering."""
        if self._steered_angles is None:
            return True
        last1, last2 = self._steered_angles
        return (
            self._angular_distance(angle1, last1) >= self.RESTEER_THRESHOLD
            or self._angular_distance(angle2, last2) >= self.RESTEER_THRESHOLD
        )

    def _manage_beams(self):
        """Switch between auto and fixed beam mode based on speaker count."""
        stable_speakers = [s for s in self.speakers if s.stable]

        if len(stable_speakers) >= 2:
            s1, s2 = stable_speakers[0], stable_speakers[1]
            if not self.fixed_mode:
                # Two speakers detected — lock beams
                logger.info("Locking beams: speaker 1 at %.0f°, speaker 2 at %.0f°",
                            s1.angle, s2.angle)
                self.xvf.steer_beams(s1.angle, s2.angle)
                s1.beam_locked = True
                s2.beam_locked = True
                self.fixed_mode = True
                self._steered_angles = (s1.angle, s2.angle)
            elif self._needs_resteer(s1.angle, s2.angle):
                # Update beam directions only if a speaker actually moved.
                self.xvf.steer_beams(s1.angle, s2.angle)
                self._steered_angles = (s1.angle, s2.angle)
        elif self.fixed_mode:
            # Back to single speaker or silence — release beams
            logger.info("Releasing beams — back to auto mode")
            self.xvf.release_beams()
            self.fixed_mode = False
            self._steered_angles = None
            for s in self.speakers:
                s.beam_locked = False

    def _get_state(self) -> dict:
        """Build the JSON-friendly snapshot returned by ``update``."""
        return {
            "speakers": [
                {
                    "angle": round(s.angle, 1),
                    "side": s.side,
                    "active": s.active,
                    "beam_locked": s.beam_locked,
                    "age_seconds": round(s.age, 1),
                    "silence_seconds": round(s.silence_duration, 1),
                    "speaker_name": s.speaker_name,
                }
                for s in self.speakers
            ],
            "beam_mode": "fixed" if self.fixed_mode else "auto",
            "active_count": sum(1 for s in self.speakers if s.active),
            "total_tracked": len(self.speakers),
        }

    def set_speaker_name(self, angle: float, name: str):
        """Associate a speaker ID name with the nearest tracked speaker.

        Does nothing when no tracked speaker lies within
        MIN_SPEAKER_SEPARATION degrees of ``angle``.
        """
        with self._lock:
            speaker = self._match_speaker(angle)
            if speaker:
                speaker.speaker_name = name

View File

@@ -40,8 +40,15 @@ LED_EFFECT_CMD = 12 # 0=off, 1=breath, 2=rainbow, 3=solid, 4=doa, 5=ring
AEC_RESID = 33
AEC_AZIMUTH_CMD = 75 # 4 floats: beam1, beam2, free-running, auto-select (radians)
AEC_SPENERGY_CMD = 80 # 4 floats: speech energy per beam (>0 = speech)
# Beam steering (resid=33)
# Command IDs on the AEC resource (AEC_RESID). ON, AZ and GATING are written
# by enable_fixed_beams(), set_beam_azimuths() and enable_beam_gating()
# respectively. NOTE(review): the ELEV command does not appear to be used by
# the beam-steering methods — confirm whether elevation needs to be set.
AEC_FIXED_BEAMS_ON_CMD = 37 # rw uint8: 0=off, 1=on — enable fixed beam mode
AEC_FIXED_BEAMS_AZ_CMD = 81 # rw 2 floats (radians): beam 1 azimuth, beam 2 azimuth
AEC_FIXED_BEAMS_ELEV_CMD = 82 # rw 2 floats (radians): beam 1 elevation, beam 2 elevation
AEC_FIXED_BEAMS_GATING_CMD = 83 # rw uint8: 0=off, 1=on — silence beams without speech
AUDIO_MGR_RESID = 35
AUDIO_MGR_SELECTED_AZ_CMD = 11 # 2 floats: processed DoA, auto-select DoA (radians)
AUDIO_MGR_SELECTED_CH_CMD = 12 # rw 2 uint8: which beam goes to L/R output channel
LED_BRIGHTNESS_CMD = 14
LED_COLOR_CMD = 16 # single uint32 color (confirmed: xvf_host LED_COLOR cmdid=16)
LED_DOA_COLOR_CMD = 17 # two uint32 values: base + indicator
@@ -129,6 +136,41 @@ class XVF3800:
return int(angle_deg) % 360, vad
def read_all_beams(self) -> dict:
    """Read the four beam azimuths and return them in degrees.

    Returns:
        A dict with keys "beam1_deg", "beam2_deg", "free_running_deg" and
        "auto_select_deg", each wrapped into [0, 360) and rounded to one
        decimal place. An empty dict is returned when the response is
        shorter than expected.
    """
    import math

    payload = self._read_float(AEC_RESID, AEC_AZIMUTH_CMD, 4)
    # Expected layout: 1 status byte followed by 4 little-endian float32
    # values (4 * 4 bytes), i.e. at least 17 bytes in total.
    if len(payload) < 17:
        return {}

    labels = ("beam1_deg", "beam2_deg", "free_running_deg", "auto_select_deg")
    azimuths_rad = struct.unpack_from("<ffff", payload, 1)
    return {
        label: round(math.degrees(rad) % 360, 1)
        for label, rad in zip(labels, azimuths_rad)
    }
# --- Beam steering ---
def enable_fixed_beams(self, on: bool = True):
    """Turn fixed beam mode on or off.

    Writes a single byte (1 for on, 0 for off) to the fixed-beams on/off
    command. When on, beams lock to the configured azimuths.
    """
    flag = b"\x01" if on else b"\x00"
    self._write(AEC_RESID, AEC_FIXED_BEAMS_ON_CMD, flag)
def set_beam_azimuths(self, beam1_deg: float, beam2_deg: float):
    """Set the two fixed beam directions.

    Args are in degrees (0=front, 90=right, 180=back, 270=left). They are
    converted to radians and written as two little-endian float32 values.
    """
    import math

    payload = struct.pack("<ff", math.radians(beam1_deg), math.radians(beam2_deg))
    self._write(AEC_RESID, AEC_FIXED_BEAMS_AZ_CMD, payload)
def enable_beam_gating(self, on: bool = True):
    """Turn beam gating on or off.

    Writes a single byte (1 for on, 0 for off) to the beam-gating command.
    When on, only the beam with speech is active.
    """
    flag = b"\x01" if on else b"\x00"
    self._write(AEC_RESID, AEC_FIXED_BEAMS_GATING_CMD, flag)
def disable_fixed_beams(self):
    """Switch fixed beam mode off, returning the device to auto beam mode.

    Convenience wrapper around ``enable_fixed_beams`` with the flag cleared.
    """
    self.enable_fixed_beams(on=False)
# --- LEDs ---
def led_off(self):
@@ -280,6 +322,20 @@ class XVF3800Manager:
if dev:
dev.led_doa()
def steer_beams(self, beam1_deg: float, beam2_deg: float):
    """Point the fixed beams of every present array at the same two angles.

    For each of the left and right arrays that is present, fixed beam mode
    is enabled, the two azimuths (degrees) are set, and beam gating is
    enabled, in that order.
    """
    for array in (self.left, self.right):
        if not array:
            continue
        array.enable_fixed_beams(True)
        array.set_beam_azimuths(beam1_deg, beam2_deg)
        array.enable_beam_gating(True)
def release_beams(self):
    """Return both arrays to auto beam mode.

    Every present array is attempted even when an earlier one fails, so a
    single faulty device cannot leave the other one locked in fixed beam
    mode (this matters most when called during shutdown).

    Raises:
        Exception: the first error raised by a device's
            ``disable_fixed_beams()``, re-raised after all arrays have been
            tried.
    """
    first_error = None
    for dev in (self.left, self.right):
        if not dev:
            continue
        try:
            dev.disable_fixed_beams()
        except Exception as exc:
            # Keep going so the remaining array is still released; the
            # failure is reported to the caller once the loop is done.
            if first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error
def read_both_doa(self) -> dict:
"""Read DoA from both arrays."""
result = {}