From 0705b3818b44684e65d649bcf4805cdc20adda2f Mon Sep 17 00:00:00 2001
From: Alex
Date: Sun, 12 Apr 2026 21:47:30 -0500
Subject: [PATCH] Add cocktail party spatial filtering (#7)

audio_stream.py: Added focus_side attribute. When set, the stream yields
from the focused side regardless of energy (attention lock). When None,
it falls back to energy-based auto selection.

multi_speaker.py: When beams lock onto 2 speakers, sets audio focus to
the target speaker's side. Auto-switches target when the current target
goes silent and the other starts talking. Manual focus via API.

headmic.py: New endpoint POST /speakers/focus?speaker=0|1 to manually
switch attention. /speakers/tracked now shows is_target, target_speaker,
and audio_focus fields.

The cocktail party effect: when 2 people are talking, the audio feed to
Porcupine/VAD/transcription comes from the target speaker's direction,
suppressing the other. XVF3800 beam gating silences the non-speaking
beam, and audio_stream focus locks the ear facing the target.

Co-Authored-By: Claude Opus 4.6 (1M context) --- audio_stream.py | 14 +++++++++++--- headmic.py | 17 +++++++++++++++-- multi_speaker.py | 41 ++++++++++++++++++++++++++++++++++++++--- 3 files changed, 64 insertions(+), 8 deletions(-) diff --git a/audio_stream.py b/audio_stream.py index fb2b0c4..5589526 100644 --- a/audio_stream.py +++ b/audio_stream.py @@ -117,6 +117,7 @@ class DualAudioStream: self.left = MicStream("left", left_device) self.right = MicStream("right", right_device) if right_device else None self.active_side: str = "left" + self.focus_side: Optional[str] = None # None=auto (energy), "left"/"right"=locked attention self._running = False def start(self): @@ -162,18 +163,25 @@ class DualAudioStream: last_frame_left = frame_left last_frame_right = frame_right - # Pick best beam + # Pick beam: focused attention overrides energy-based selection if frame_right is None: self.active_side = "left" yield frame_left, "left" + elif self.focus_side: + # Cocktail party mode: locked onto a specific side + self.active_side = self.focus_side + if self.focus_side == "right" and frame_right: + yield frame_right, "right" + else: + yield frame_left, "left" else: + # Auto mode: pick higher-energy side left_energy = self.left.get_energy() right_energy = self.right.get_energy() - if right_energy > left_energy * 1.1: # 10% hysteresis + if right_energy > left_energy * 1.1: self.active_side = "right" elif left_energy > right_energy * 1.1: self.active_side = "left" - # else: keep current active_side (hysteresis prevents flapping) if self.active_side == "right" and frame_right: yield frame_right, "right" diff --git a/headmic.py b/headmic.py index 19b632c..cc1229a 100644 --- a/headmic.py +++ b/headmic.py @@ -588,8 +588,8 @@ async def startup(): # --- Multi-speaker tracking --- if xvf_manager.left or xvf_manager.right: from multi_speaker import MultiSpeakerTracker - multi_speaker = MultiSpeakerTracker(xvf_manager) - logger.info("Multi-speaker tracking enabled (2 beams per array)") 
+ multi_speaker = MultiSpeakerTracker(xvf_manager, audio_stream=dual_stream) + logger.info("Multi-speaker tracking enabled (2 beams per array, cocktail party filtering)") # --- Binaural recording --- if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"): @@ -692,6 +692,19 @@ async def tracked_speakers(): return state.multi_speaker +@app.post("/speakers/focus") +async def focus_speaker(speaker: int = 0): + """Switch attention to a specific tracked speaker (0 or 1). + In cocktail party mode, the focused speaker's audio feeds wake word + transcription.""" + if not multi_speaker: + raise HTTPException(status_code=503, detail="Multi-speaker tracking not available") + if speaker not in (0, 1): + raise HTTPException(status_code=400, detail="Speaker index must be 0 or 1") + multi_speaker.target_speaker_idx = speaker + multi_speaker._update_audio_focus() + return {"ok": True, "target_speaker": speaker} + + # --- Spatial scene --- @app.get("/scene") diff --git a/multi_speaker.py b/multi_speaker.py index 39626ec..4d807f5 100644 --- a/multi_speaker.py +++ b/multi_speaker.py @@ -80,10 +80,12 @@ class TrackedSpeaker: class MultiSpeakerTracker: """Track multiple speakers and manage beam steering.""" - def __init__(self, xvf_manager): + def __init__(self, xvf_manager, audio_stream=None): self.xvf = xvf_manager + self.audio_stream = audio_stream # for cocktail party focus control self.speakers: list[TrackedSpeaker] = [] self.fixed_mode = False + self.target_speaker_idx: int = 0 # which speaker is the "target" (0 or 1) self._lock = threading.Lock() def update(self, doa: dict) -> dict: @@ -144,7 +146,8 @@ class MultiSpeakerTracker: return best def _manage_beams(self): - """Switch between auto and fixed beam mode based on speaker count.""" + """Switch between auto and fixed beam mode based on speaker count. 
+ Also manages audio focus for cocktail party filtering.""" stable_speakers = [s for s in self.speakers if s.stable] if len(stable_speakers) >= 2 and not self.fixed_mode: @@ -157,34 +160,66 @@ class MultiSpeakerTracker: s2.beam_locked = True self.fixed_mode = True + # Focus audio on the target speaker's side + self._update_audio_focus() + elif len(stable_speakers) >= 2 and self.fixed_mode: # Update beam directions if speakers moved s1, s2 = stable_speakers[0], stable_speakers[1] self.xvf.steer_beams(s1.angle, s2.angle) + # If the non-target speaker starts talking and target is silent, + # auto-switch target to the active one + target = stable_speakers[self.target_speaker_idx] + other_idx = 1 - self.target_speaker_idx + other = stable_speakers[other_idx] + if other.active and not target.active and target.silence_duration > 1.0: + self.target_speaker_idx = other_idx + logger.info("Attention shifted to speaker %d at %.0f°", + other_idx + 1, other.angle) + self._update_audio_focus() + elif len(stable_speakers) < 2 and self.fixed_mode: # Back to single speaker or silence — release beams logger.info("Releasing beams — back to auto mode") self.xvf.release_beams() self.fixed_mode = False + self.target_speaker_idx = 0 for s in self.speakers: s.beam_locked = False + # Release audio focus + if self.audio_stream: + self.audio_stream.focus_side = None + + def _update_audio_focus(self): + """Set the audio stream to focus on the target speaker's side.""" + if not self.audio_stream or not self.speakers: + return + stable = [s for s in self.speakers if s.stable] + if self.target_speaker_idx < len(stable): + target = stable[self.target_speaker_idx] + self.audio_stream.focus_side = target.side + logger.debug("Audio focus: %s (speaker at %.0f°)", target.side, target.angle) def _get_state(self) -> dict: return { "speakers": [ { + "index": i, "angle": round(s.angle, 1), "side": s.side, "active": s.active, "beam_locked": s.beam_locked, + "is_target": i == self.target_speaker_idx, 
"age_seconds": round(s.age, 1), "silence_seconds": round(s.silence_duration, 1), "speaker_name": s.speaker_name, } - for s in self.speakers + for i, s in enumerate(self.speakers) ], "beam_mode": "fixed" if self.fixed_mode else "auto", + "target_speaker": self.target_speaker_idx, + "audio_focus": self.audio_stream.focus_side if self.audio_stream else None, "active_count": sum(1 for s in self.speakers if s.active), "total_tracked": len(self.speakers), }