""" Multi-speaker tracking — track up to 2 speakers simultaneously. Uses the XVF3800's fixed beam mode to lock beams onto detected speakers. Each array has 2 steerable beams, so with 2 arrays we can independently track 2 speakers from different directions. Flow: 1. Auto mode: free-running beams track the loudest source (default) 2. When 2 distinct DoA angles are detected: switch to fixed beam mode 3. Beam 1 → speaker A direction, Beam 2 → speaker B direction 4. Track which speaker is active, feed to spatial tracker + speaker ID 5. When only 1 speaker remains, release back to auto mode """ import logging import math import time import threading from typing import Optional logger = logging.getLogger("headmic.multispeaker") # How different two DoA angles must be to count as separate speakers (degrees) MIN_SPEAKER_SEPARATION = 30.0 # How long a speaker position must be stable before locking a beam (seconds) LOCK_DELAY = 1.0 # How long after a speaker goes silent before releasing the beam (seconds) RELEASE_DELAY = 3.0 # Maximum number of tracked speakers MAX_SPEAKERS = 2 class TrackedSpeaker: """A speaker position being tracked.""" def __init__(self, angle: float, side: str): self.angle = angle # degrees, smoothed self.side = side # "left" or "right" (which ear detected) self.first_seen = time.monotonic() self.last_seen = time.monotonic() self.active = True # currently producing speech self.beam_locked = False # beam steered to this position self.speaker_name: Optional[str] = None # from speaker ID def update_angle(self, angle: float, alpha: float = 0.3): """Smooth angle update.""" # Handle circular wraparound diff = angle - self.angle if diff > 180: diff -= 360 elif diff < -180: diff += 360 self.angle = (self.angle + alpha * diff) % 360 self.last_seen = time.monotonic() self.active = True @property def age(self) -> float: return time.monotonic() - self.first_seen @property def silence_duration(self) -> float: return time.monotonic() - self.last_seen @property def stable(self) -> bool: """Has this speaker been present long enough to lock a beam?""" return self.age >= LOCK_DELAY @property def expired(self) -> bool: """Has this speaker been silent long enough to release?""" return self.silence_duration >= RELEASE_DELAY class MultiSpeakerTracker: """Track multiple speakers and manage beam steering.""" def __init__(self, xvf_manager, audio_stream=None): self.xvf = xvf_manager self.audio_stream = audio_stream # for cocktail party focus control self.speakers: list[TrackedSpeaker] = [] self.fixed_mode = False self.target_speaker_idx: int = 0 # which speaker is the "target" (0 or 1) self._lock = threading.Lock() def update(self, doa: dict) -> dict: """ Process DoA from both arrays and update speaker tracking. Args: doa: {"left": {"angle": N, "vad": bool}, "right": {"angle": N, "vad": bool}} Returns: {"speakers": [...], "beam_mode": "auto"|"fixed", "active_count": N} """ with self._lock: # Collect active DoA readings active_readings = [] for side in ("left", "right"): d = doa.get(side) if d and d.get("vad"): active_readings.append({"angle": d["angle"], "side": side}) # Mark all speakers as potentially inactive this frame for s in self.speakers: s.active = False # Match readings to existing speakers or create new ones for reading in active_readings: matched = self._match_speaker(reading["angle"]) if matched: matched.update_angle(reading["angle"]) elif len(self.speakers) < MAX_SPEAKERS: new_speaker = TrackedSpeaker(reading["angle"], reading["side"]) self.speakers.append(new_speaker) logger.info("New speaker detected at %.0f° (%s side)", reading["angle"], reading["side"]) # Remove expired speakers expired = [s for s in self.speakers if s.expired] for s in expired: logger.info("Speaker at %.0f° expired (silent %.1fs)", s.angle, s.silence_duration) self.speakers = [s for s in self.speakers if not s.expired] # Manage beam steering self._manage_beams() return self._get_state() def _match_speaker(self, angle: float) -> Optional[TrackedSpeaker]: """Find an existing speaker close to this angle.""" best = None best_dist = MIN_SPEAKER_SEPARATION for s in self.speakers: dist = abs(angle - s.angle) % 360 dist = min(dist, 360 - dist) if dist < best_dist: best = s best_dist = dist return best def _manage_beams(self): """Switch between auto and fixed beam mode based on speaker count. Also manages audio focus for cocktail party filtering.""" stable_speakers = [s for s in self.speakers if s.stable] if len(stable_speakers) >= 2 and not self.fixed_mode: # Two speakers detected — lock beams s1, s2 = stable_speakers[0], stable_speakers[1] logger.info("Locking beams: speaker 1 at %.0f°, speaker 2 at %.0f°", s1.angle, s2.angle) self.xvf.steer_beams(s1.angle, s2.angle) s1.beam_locked = True s2.beam_locked = True self.fixed_mode = True # Focus audio on the target speaker's side self._update_audio_focus() elif len(stable_speakers) >= 2 and self.fixed_mode: # Update beam directions if speakers moved s1, s2 = stable_speakers[0], stable_speakers[1] self.xvf.steer_beams(s1.angle, s2.angle) # If the non-target speaker starts talking and target is silent, # auto-switch target to the active one target = stable_speakers[self.target_speaker_idx] other_idx = 1 - self.target_speaker_idx other = stable_speakers[other_idx] if other.active and not target.active and target.silence_duration > 1.0: self.target_speaker_idx = other_idx logger.info("Attention shifted to speaker %d at %.0f°", other_idx + 1, other.angle) self._update_audio_focus() elif len(stable_speakers) < 2 and self.fixed_mode: # Back to single speaker or silence — release beams logger.info("Releasing beams — back to auto mode") self.xvf.release_beams() self.fixed_mode = False self.target_speaker_idx = 0 for s in self.speakers: s.beam_locked = False # Release audio focus if self.audio_stream: self.audio_stream.focus_side = None def _update_audio_focus(self): """Set the audio stream to focus on the target speaker's side.""" if not self.audio_stream or not self.speakers: return stable = [s for s in self.speakers if s.stable] if self.target_speaker_idx < len(stable): target = stable[self.target_speaker_idx] self.audio_stream.focus_side = target.side logger.debug("Audio focus: %s (speaker at %.0f°)", target.side, target.angle) def _get_state(self) -> dict: return { "speakers": [ { "index": i, "angle": round(s.angle, 1), "side": s.side, "active": s.active, "beam_locked": s.beam_locked, "is_target": i == self.target_speaker_idx, "age_seconds": round(s.age, 1), "silence_seconds": round(s.silence_duration, 1), "speaker_name": s.speaker_name, } for i, s in enumerate(self.speakers) ], "beam_mode": "fixed" if self.fixed_mode else "auto", "target_speaker": self.target_speaker_idx, "audio_focus": self.audio_stream.focus_side if self.audio_stream else None, "active_count": sum(1 for s in self.speakers if s.active), "total_tracked": len(self.speakers), } def set_speaker_name(self, angle: float, name: str): """Associate a speaker ID name with the nearest tracked speaker.""" with self._lock: speaker = self._match_speaker(angle) if speaker: speaker.speaker_name = name