Add cocktail party spatial filtering (#7)

audio_stream.py: Added focus_side property. When set, the stream
yields from the focused side regardless of energy (attention lock).
When None, falls back to energy-based auto selection.

multi_speaker.py: When beams lock onto 2 speakers, sets audio focus
to the target speaker's side. Auto-switches target when the current
target goes silent and the other starts talking. Manual focus via API.

headmic.py: New endpoint POST /speakers/focus?speaker=0|1 to manually
switch attention. /speakers/tracked now shows is_target, target_speaker,
and audio_focus fields.

The cocktail party effect: when 2 people are talking, the audio feed
to Porcupine/VAD/transcription comes from the target speaker's direction,
suppressing the other. XVF3800 beam gating silences the non-speaking beam,
and audio_stream focus locks the ear facing the target.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex
2026-04-12 21:47:30 -05:00
parent 38d21ef53c
commit 0705b3818b
3 changed files with 64 additions and 8 deletions

View File

@@ -117,6 +117,7 @@ class DualAudioStream:
self.left = MicStream("left", left_device)
self.right = MicStream("right", right_device) if right_device else None
self.active_side: str = "left"
self.focus_side: Optional[str] = None # None=auto (energy), "left"/"right"=locked attention
self._running = False
def start(self):
@@ -162,18 +163,25 @@ class DualAudioStream:
last_frame_left = frame_left
last_frame_right = frame_right
# Pick best beam
# Pick beam: focused attention overrides energy-based selection
if frame_right is None:
self.active_side = "left"
yield frame_left, "left"
elif self.focus_side:
# Cocktail party mode: locked onto a specific side
self.active_side = self.focus_side
if self.focus_side == "right" and frame_right:
yield frame_right, "right"
else:
yield frame_left, "left"
else:
# Auto mode: pick higher-energy side
left_energy = self.left.get_energy()
right_energy = self.right.get_energy()
if right_energy > left_energy * 1.1: # 10% hysteresis
if right_energy > left_energy * 1.1:
self.active_side = "right"
elif left_energy > right_energy * 1.1:
self.active_side = "left"
# else: keep current active_side (hysteresis prevents flapping)
if self.active_side == "right" and frame_right:
yield frame_right, "right"

View File

@@ -588,8 +588,8 @@ async def startup():
# --- Multi-speaker tracking ---
if xvf_manager.left or xvf_manager.right:
from multi_speaker import MultiSpeakerTracker
multi_speaker = MultiSpeakerTracker(xvf_manager)
logger.info("Multi-speaker tracking enabled (2 beams per array)")
multi_speaker = MultiSpeakerTracker(xvf_manager, audio_stream=dual_stream)
logger.info("Multi-speaker tracking enabled (2 beams per array, cocktail party filtering)")
# --- Binaural recording ---
if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"):
@@ -692,6 +692,19 @@ async def tracked_speakers():
return state.multi_speaker
@app.post("/speakers/focus")
async def focus_speaker(speaker: int = 0):
    """Switch attention to a specific tracked speaker (0 or 1).
    In cocktail party mode, the focused speaker's audio feeds wake word + transcription."""
    # 503 when the tracker was never initialized (no XVF arrays present).
    if not multi_speaker:
        raise HTTPException(status_code=503, detail="Multi-speaker tracking not available")
    # Only two beams exist, so only indices 0 and 1 are addressable.
    if speaker not in (0, 1):
        raise HTTPException(status_code=400, detail="Speaker index must be 0 or 1")
    # Retarget, then push the new focus down to the audio stream immediately
    # rather than waiting for the next DOA update cycle.
    multi_speaker.target_speaker_idx = speaker
    multi_speaker._update_audio_focus()
    return {"ok": True, "target_speaker": speaker}
# --- Spatial scene ---
@app.get("/scene")

View File

@@ -80,10 +80,12 @@ class TrackedSpeaker:
class MultiSpeakerTracker:
"""Track multiple speakers and manage beam steering."""
def __init__(self, xvf_manager, audio_stream=None):
    """Initialize the tracker.

    Args:
        xvf_manager: controller for the XVF3800 beamformer array(s); used
            elsewhere in this class to steer and release beams.
        audio_stream: optional stream exposing a ``focus_side`` attribute,
            driven for cocktail-party attention locking. ``None`` disables
            audio-focus control (backward compatible with the old signature).
    """
    self.xvf = xvf_manager
    self.audio_stream = audio_stream  # for cocktail party focus control
    self.speakers: list[TrackedSpeaker] = []
    self.fixed_mode = False  # True while beams are locked onto 2 stable speakers
    self.target_speaker_idx: int = 0  # which speaker is the "target" (0 or 1)
    self._lock = threading.Lock()
def update(self, doa: dict) -> dict:
@@ -144,7 +146,8 @@ class MultiSpeakerTracker:
return best
def _manage_beams(self):
"""Switch between auto and fixed beam mode based on speaker count."""
"""Switch between auto and fixed beam mode based on speaker count.
Also manages audio focus for cocktail party filtering."""
stable_speakers = [s for s in self.speakers if s.stable]
if len(stable_speakers) >= 2 and not self.fixed_mode:
@@ -157,34 +160,66 @@ class MultiSpeakerTracker:
s2.beam_locked = True
self.fixed_mode = True
# Focus audio on the target speaker's side
self._update_audio_focus()
elif len(stable_speakers) >= 2 and self.fixed_mode:
# Update beam directions if speakers moved
s1, s2 = stable_speakers[0], stable_speakers[1]
self.xvf.steer_beams(s1.angle, s2.angle)
# If the non-target speaker starts talking and target is silent,
# auto-switch target to the active one
target = stable_speakers[self.target_speaker_idx]
other_idx = 1 - self.target_speaker_idx
other = stable_speakers[other_idx]
if other.active and not target.active and target.silence_duration > 1.0:
self.target_speaker_idx = other_idx
logger.info("Attention shifted to speaker %d at %.0f°",
other_idx + 1, other.angle)
self._update_audio_focus()
elif len(stable_speakers) < 2 and self.fixed_mode:
# Back to single speaker or silence — release beams
logger.info("Releasing beams — back to auto mode")
self.xvf.release_beams()
self.fixed_mode = False
self.target_speaker_idx = 0
for s in self.speakers:
s.beam_locked = False
# Release audio focus
if self.audio_stream:
self.audio_stream.focus_side = None
def _update_audio_focus(self):
    """Point the shared audio stream's focus at the target speaker's side.

    Silently does nothing when no audio stream is attached, when no
    speakers are tracked, or when the target index has no matching entry
    in the stable-speaker list.
    """
    if not self.audio_stream or not self.speakers:
        return
    stable_speakers = [spk for spk in self.speakers if spk.stable]
    if self.target_speaker_idx >= len(stable_speakers):
        return
    chosen = stable_speakers[self.target_speaker_idx]
    self.audio_stream.focus_side = chosen.side
    logger.debug("Audio focus: %s (speaker at %.0f°)", chosen.side, chosen.angle)
def _get_state(self) -> dict:
    """Build a snapshot of tracker state for API consumers.

    Returns a dict with one entry per tracked speaker (angle, side,
    activity, beam lock, target flag, age/silence timers, name), plus the
    beam mode, the target index, the current audio focus side (``None``
    when no audio stream is attached), and aggregate counts.

    Fix: the speakers comprehension previously contained both the old
    ``for s in self.speakers`` clause and its enumerate replacement,
    forming a nested loop that emitted len(speakers)**2 duplicate entries;
    only the enumerate clause is kept.
    """
    return {
        "speakers": [
            {
                "index": i,
                "angle": round(s.angle, 1),
                "side": s.side,
                "active": s.active,
                "beam_locked": s.beam_locked,
                # Marks which tracked speaker currently owns the attention lock.
                "is_target": i == self.target_speaker_idx,
                "age_seconds": round(s.age, 1),
                "silence_seconds": round(s.silence_duration, 1),
                "speaker_name": s.speaker_name,
            }
            for i, s in enumerate(self.speakers)
        ],
        "beam_mode": "fixed" if self.fixed_mode else "auto",
        "target_speaker": self.target_speaker_idx,
        "audio_focus": self.audio_stream.focus_side if self.audio_stream else None,
        "active_count": sum(1 for s in self.speakers if s.active),
        "total_tracked": len(self.speakers),
    }