diff --git a/headmic.py b/headmic.py index ddda680..19b632c 100644 --- a/headmic.py +++ b/headmic.py @@ -161,6 +161,7 @@ class ServiceState: self.doa: dict = {} # latest DoA from both arrays self.spatial: Optional[dict] = None # triangulated position + gaze self.last_anomaly: Optional[dict] = None # last spatial anomaly detected + self.multi_speaker: Optional[dict] = None # multi-speaker tracking state state = ServiceState() @@ -177,6 +178,9 @@ binaural_recorder = None # Spatial scene spatial_scene = None + +# Multi-speaker tracker +multi_speaker = None enrollment_name = None # Audio stream @@ -439,6 +443,10 @@ def doa_track_loop(): try: state.doa = xvf_manager.read_both_doa() + # Multi-speaker tracking (beam steering) + if multi_speaker: + state.multi_speaker = multi_speaker.update(state.doa) + if spatial_tracker and dual_stream: left_energy = dual_stream.left.get_energy() if dual_stream.left else 0.0 right_energy = dual_stream.right.get_energy() if dual_stream.right else 0.0 @@ -496,7 +504,7 @@ app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂 (Dual XVF3800)" @app.on_event("startup") async def startup(): - global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder, spatial_scene + global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder, spatial_scene, multi_speaker state.running = True @@ -577,6 +585,12 @@ async def startup(): spatial_scene = SpatialScene() spatial_scene.start() + # --- Multi-speaker tracking --- + if xvf_manager.left or xvf_manager.right: + from multi_speaker import MultiSpeakerTracker + multi_speaker = MultiSpeakerTracker(xvf_manager) + logger.info("Multi-speaker tracking enabled (2 beams per array)") + # --- Binaural recording --- if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"): from binaural_recorder import BinauralRecorder @@ -596,6 +610,8 @@ async def startup(): async 
def shutdown(): state.running = False leds_off() + if multi_speaker and xvf_manager: + xvf_manager.release_beams() if spatial_scene: spatial_scene.stop() if binaural_recorder: @@ -666,6 +682,16 @@ async def doa(): } +# --- Multi-speaker --- + +@app.get("/speakers/tracked") +async def tracked_speakers(): + """Currently tracked speaker positions and beam state.""" + if not state.multi_speaker: + return {"speakers": [], "beam_mode": "auto", "active_count": 0, "total_tracked": 0} + return state.multi_speaker + + # --- Spatial scene --- @app.get("/scene") diff --git a/multi_speaker.py b/multi_speaker.py new file mode 100644 index 0000000..39626ec --- /dev/null +++ b/multi_speaker.py @@ -0,0 +1,197 @@ +""" +Multi-speaker tracking — track up to 2 speakers simultaneously. + +Uses the XVF3800's fixed beam mode to lock beams onto detected speakers. +Each array has 2 steerable beams, so with 2 arrays we can independently +track 2 speakers from different directions. + +Flow: +1. Auto mode: free-running beams track the loudest source (default) +2. When 2 distinct DoA angles are detected: switch to fixed beam mode +3. Beam 1 → speaker A direction, Beam 2 → speaker B direction +4. Track which speaker is active, feed to spatial tracker + speaker ID +5. 
# Tail of the module docstring that opens on the preceding line of the patch:
#   5. When only 1 speaker remains, release back to auto mode

import logging
import math
import time
import threading
from typing import Optional

logger = logging.getLogger("headmic.multispeaker")

# How different two DoA angles must be to count as separate speakers (degrees)
MIN_SPEAKER_SEPARATION = 30.0

# How long a speaker position must be stable before locking a beam (seconds)
LOCK_DELAY = 1.0

# How long after a speaker goes silent before releasing the beam (seconds)
RELEASE_DELAY = 3.0

# Maximum number of tracked speakers
MAX_SPEAKERS = 2


def _angular_distance(a: float, b: float) -> float:
    """Return the shortest distance between two bearings, in degrees (0-180)."""
    dist = abs(a - b) % 360
    return min(dist, 360 - dist)


class TrackedSpeaker:
    """A speaker position being tracked.

    Attributes:
        angle: Smoothed bearing in degrees, kept in [0, 360).
        side: "left" or "right" — which array first detected the speaker.
        first_seen / last_seen: ``time.monotonic()`` timestamps.
        active: True if a reading matched this speaker in the latest frame.
        beam_locked: True while a fixed beam is steered at this speaker.
        speaker_name: Name supplied through ``set_speaker_name``, if any.
    """

    def __init__(self, angle: float, side: str):
        # Read the clock once so a brand-new speaker has first_seen == last_seen.
        now = time.monotonic()
        # Normalise like update_angle() does, so 360 and 0 are the same bearing.
        self.angle = angle % 360          # degrees, smoothed
        self.side = side                  # "left" or "right" (which ear detected)
        self.first_seen = now
        self.last_seen = now
        self.active = True                # currently producing speech
        self.beam_locked = False          # beam steered to this position
        self.speaker_name: Optional[str] = None  # from speaker ID

    def update_angle(self, angle: float, alpha: float = 0.3):
        """Move the smoothed angle towards ``angle`` and mark the speaker active.

        Args:
            angle: New bearing in degrees.
            alpha: Smoothing factor; 0 keeps the old angle, 1 jumps to the new one.
        """
        # Take the short way around the circle (e.g. 350 -> 10 is +20, not -340).
        diff = angle - self.angle
        if diff > 180:
            diff -= 360
        elif diff < -180:
            diff += 360
        self.angle = (self.angle + alpha * diff) % 360
        self.last_seen = time.monotonic()
        self.active = True

    @property
    def age(self) -> float:
        """Seconds since this speaker was first detected."""
        return time.monotonic() - self.first_seen

    @property
    def silence_duration(self) -> float:
        """Seconds since the last reading matched this speaker."""
        return time.monotonic() - self.last_seen

    @property
    def stable(self) -> bool:
        """Has this speaker been present long enough to lock a beam?"""
        return self.age >= LOCK_DELAY

    @property
    def expired(self) -> bool:
        """Has this speaker been silent long enough to release?"""
        return self.silence_duration >= RELEASE_DELAY


class MultiSpeakerTracker:
    """Track multiple speakers and manage beam steering.

    The tracker calls ``xvf_manager.steer_beams(angle1, angle2)`` to lock
    beams and ``xvf_manager.release_beams()`` to return to auto mode.
    """

    def __init__(self, xvf_manager):
        self.xvf = xvf_manager
        self.speakers: list[TrackedSpeaker] = []
        self.fixed_mode = False
        # Angles passed to the last successful steer_beams() call, or None
        # while in auto mode. Used to skip redundant steering commands.
        self._steered: Optional[tuple] = None
        self._lock = threading.Lock()

    def update(self, doa: dict) -> dict:
        """
        Process DoA from both arrays and update speaker tracking.

        Args:
            doa: {"left": {"angle": N, "vad": bool}, "right": {"angle": N, "vad": bool}}
                Either side may be missing; None or an empty dict means no
                readings. A reading with "vad" set but no "angle" is ignored.

        Returns:
            {"speakers": [...], "beam_mode": "auto"|"fixed",
             "active_count": N, "total_tracked": N}
        """
        doa = doa or {}
        with self._lock:
            # Collect active DoA readings (speech detected and a usable angle).
            active_readings = []
            for side in ("left", "right"):
                d = doa.get(side)
                if not d or not d.get("vad"):
                    continue
                angle = d.get("angle")
                if angle is None:
                    # VAD without a direction gives us nothing to track.
                    continue
                active_readings.append({"angle": angle, "side": side})

            # Mark all speakers as potentially inactive this frame
            for s in self.speakers:
                s.active = False

            # Match readings to existing speakers or create new ones
            for reading in active_readings:
                matched = self._match_speaker(reading["angle"])
                if matched:
                    matched.update_angle(reading["angle"])
                elif len(self.speakers) < MAX_SPEAKERS:
                    self.speakers.append(
                        TrackedSpeaker(reading["angle"], reading["side"])
                    )
                    logger.info("New speaker detected at %.0f° (%s side)",
                                reading["angle"], reading["side"])

            # Remove expired speakers. The expiry test runs exactly once per
            # speaker so that every removal is logged (the clock advances
            # between two separate passes).
            survivors = []
            for s in self.speakers:
                if s.expired:
                    logger.info("Speaker at %.0f° expired (silent %.1fs)",
                                s.angle, s.silence_duration)
                else:
                    survivors.append(s)
            self.speakers = survivors

            # Manage beam steering
            self._manage_beams()

            return self._get_state()

    def _match_speaker(self, angle: float) -> Optional[TrackedSpeaker]:
        """Find the tracked speaker closest to ``angle``.

        Returns None when no speaker is strictly closer than
        MIN_SPEAKER_SEPARATION degrees.
        """
        best = None
        best_dist = MIN_SPEAKER_SEPARATION
        for s in self.speakers:
            dist = _angular_distance(angle, s.angle)
            if dist < best_dist:
                best = s
                best_dist = dist
        return best

    def _manage_beams(self):
        """Switch between auto and fixed beam mode based on speaker count."""
        stable_speakers = [s for s in self.speakers if s.stable]

        if len(stable_speakers) >= 2:
            s1, s2 = stable_speakers[0], stable_speakers[1]
            target = (s1.angle, s2.angle)

            if not self.fixed_mode:
                # Two speakers detected — lock beams
                logger.info("Locking beams: speaker 1 at %.0f°, speaker 2 at %.0f°",
                            s1.angle, s2.angle)
                self.xvf.steer_beams(s1.angle, s2.angle)
                # State is only updated after the steer call returned, so an
                # exception leaves the tracker in auto mode for a retry.
                s1.beam_locked = True
                s2.beam_locked = True
                self.fixed_mode = True
                self._steered = target
            elif target != self._steered:
                # Update beam directions only if a speaker actually moved;
                # silent frames leave the angles untouched and need no command.
                self.xvf.steer_beams(s1.angle, s2.angle)
                self._steered = target

        elif self.fixed_mode:
            # Back to single speaker or silence — release beams
            logger.info("Releasing beams — back to auto mode")
            self.xvf.release_beams()
            self.fixed_mode = False
            self._steered = None
            for s in self.speakers:
                s.beam_locked = False

    def _get_state(self) -> dict:
        """Return a plain-dict snapshot of the tracker (angles rounded to 0.1°)."""
        return {
            "speakers": [
                {
                    "angle": round(s.angle, 1),
                    "side": s.side,
                    "active": s.active,
                    "beam_locked": s.beam_locked,
                    "age_seconds": round(s.age, 1),
                    "silence_seconds": round(s.silence_duration, 1),
                    "speaker_name": s.speaker_name,
                }
                for s in self.speakers
            ],
            "beam_mode": "fixed" if self.fixed_mode else "auto",
            "active_count": sum(1 for s in self.speakers if s.active),
            "total_tracked": len(self.speakers),
        }

    def set_speaker_name(self, angle: float, name: str):
        """Associate a speaker ID name with the nearest tracked speaker.

        Does nothing when no tracked speaker is within
        MIN_SPEAKER_SEPARATION degrees of ``angle``.
        """
        with self._lock:
            speaker = self._match_speaker(angle)
            if speaker:
                speaker.speaker_name = name


# ---------------------------------------------------------------------------
# Start of the xvf3800.py hunk that shares this physical line in the patch,
# preserved verbatim (it continues on the following line):
#
# diff --git a/xvf3800.py b/xvf3800.py index 707fb20..a751fd0 100644 --- a/xvf3800.py +++ b/xvf3800.py
# @@ -40,8 +40,15 @@ LED_EFFECT_CMD = 12 # 0=off, 1=breath, 2=rainbow, 3=solid, 4=doa, 5=ring
#  AEC_RESID = 33
#  AEC_AZIMUTH_CMD = 75 # 4 floats: beam1, beam2, free-running, auto-select (radians)
#  AEC_SPENERGY_CMD = 80 # 4 floats: speech energy per beam (>0 = speech)
# +# Beam steering (resid=33)
# +AEC_FIXED_BEAMS_ON_CMD = 37 # rw uint8: 0=off, 1=on — enable fixed beam mode
# +AEC_FIXED_BEAMS_AZ_CMD = 81 # rw 2 floats
(radians): beam 1 azimuth, beam 2 azimuth +AEC_FIXED_BEAMS_ELEV_CMD = 82 # rw 2 floats (radians): beam 1 elevation, beam 2 elevation +AEC_FIXED_BEAMS_GATING_CMD = 83 # rw uint8: 0=off, 1=on — silence beams without speech + AUDIO_MGR_RESID = 35 AUDIO_MGR_SELECTED_AZ_CMD = 11 # 2 floats: processed DoA, auto-select DoA (radians) +AUDIO_MGR_SELECTED_CH_CMD = 12 # rw 2 uint8: which beam goes to L/R output channel LED_BRIGHTNESS_CMD = 14 LED_COLOR_CMD = 16 # single uint32 color (confirmed: xvf_host LED_COLOR cmdid=16) LED_DOA_COLOR_CMD = 17 # two uint32 values: base + indicator @@ -129,6 +136,41 @@ class XVF3800: return int(angle_deg) % 360, vad + def read_all_beams(self) -> dict: + """Read all 4 beam azimuths: beam1, beam2, free-running, auto-select.""" + import math + data = self._read_float(AEC_RESID, AEC_AZIMUTH_CMD, 4) + if len(data) < 17: # 1 status + 4*4 bytes + return {} + beams = struct.unpack_from(" dict: """Read DoA from both arrays.""" result = {}