Add binaural recording + tune spatial tracking

binaural_recorder.py: Records left/right ear streams as stereo WAV
in rolling 5-minute segments. Training data for spatial audio models.
Enabled via BINAURAL_RECORD=1 env var.

spatial.py: Tune smoothing — alpha 0.3→0.4 (snappier response),
idle return speed 0.05→0.03 (gentler drift), timeout 2s→1.5s.

headmic.py: Wire binaural recorder into audio loop, add /recording
endpoint for stats, feed both ear streams (not just best beam).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex
2026-04-12 20:53:05 -05:00
parent afc8694c1a
commit 36aeb19280
3 changed files with 157 additions and 4 deletions

122
binaural_recorder.py Normal file
View File

@@ -0,0 +1,122 @@
"""
Binaural audio recorder — saves left/right ear streams as stereo WAV.
Records continuously in rolling segments (default 5 minutes each).
Captures spatial audio that preserves left/right positioning.
Training data for spatial audio models and being0.
"""
import logging
import os
import struct
import threading
import time
import wave
from pathlib import Path
from typing import Optional
logger = logging.getLogger("headmic.binaural")

DEFAULT_SEGMENT_SECONDS = 300  # 5 minutes per file
DEFAULT_OUTPUT_DIR = os.path.expanduser("~/headmic/recordings")
SAMPLE_RATE = 16000  # Hz, written into every WAV header
_SAMPLE_WIDTH = 2  # bytes per 16-bit PCM sample
_POLL_SECONDS = 1.0  # how often the flush thread checks the segment age


class BinauralRecorder:
    """Records stereo audio from two mic streams in rolling segments.

    Frames are buffered in memory by :meth:`feed` and written out as a
    2-channel, 16-bit WAV file (left = channel 0, right = channel 1) each
    time ``segment_seconds`` have elapsed, and once more on :meth:`stop`.
    """

    def __init__(self, output_dir: str = DEFAULT_OUTPUT_DIR,
                 segment_seconds: int = DEFAULT_SEGMENT_SECONDS):
        """Create the recorder and make sure ``output_dir`` exists.

        Args:
            output_dir: Directory that receives ``binaural_<timestamp>.wav``.
            segment_seconds: Wall-clock length of each rolling segment.
        """
        self.output_dir = Path(output_dir)
        self.segment_seconds = segment_seconds
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._running = False
        self._stop_event = threading.Event()
        self._lock = threading.Lock()
        # Both lists always have the same length: entry i of each list is
        # the left/right half of the i-th pair passed to feed().
        self._left_buf: list[bytes] = []
        self._right_buf: list[bytes] = []
        # Initialised to "now" so a flush before start() is not stamped 1970.
        self._segment_start: float = time.time()
        self._total_segments = 0
        self._total_seconds = 0.0
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the background flush thread (no-op if already running)."""
        if self._thread is not None and self._thread.is_alive():
            return
        self._stop_event.clear()
        self._running = True
        self._segment_start = time.time()
        self._thread = threading.Thread(target=self._flush_loop, daemon=True)
        self._thread.start()
        logger.info("Binaural recording started → %s (%ds segments)",
                    self.output_dir, self.segment_seconds)

    def stop(self):
        """Stop the flush thread and write whatever is still buffered."""
        self._running = False
        self._stop_event.set()
        worker = self._thread
        # Wait for the worker so the final flush cannot race with one that
        # is still in progress on the flush thread.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=5.0)
        self._thread = None
        self._flush_segment()
        logger.info("Binaural recording stopped (%d segments, %.0f seconds total)",
                    self._total_segments, self._total_seconds)

    def feed(self, left_frame: Optional[bytes], right_frame: Optional[bytes]):
        """Buffer one pair of 16-bit PCM frames.

        A missing (``None`` or empty) side is recorded as silence for that
        pair, so the two channels stay time-aligned. A call where both
        sides are missing is ignored.
        """
        if not left_frame and not right_frame:
            return
        with self._lock:
            self._left_buf.append(left_frame or b"")
            self._right_buf.append(right_frame or b"")

    def _flush_loop(self):
        """Write a segment whenever ``segment_seconds`` have elapsed."""
        # Event.wait() doubles as an interruptible sleep: it returns True
        # as soon as stop() sets the event.
        while not self._stop_event.wait(_POLL_SECONDS):
            if time.time() - self._segment_start < self.segment_seconds:
                continue
            try:
                self._flush_segment()
            except OSError:
                # Keep recording later segments even if this write failed.
                logger.exception("Failed to write binaural segment")
            self._segment_start = time.time()

    def _unique_path(self, stem: str) -> Path:
        """Return ``<stem>.wav`` in the output dir, suffixed if it exists."""
        candidate = self.output_dir / f"{stem}.wav"
        counter = 1
        while candidate.exists():
            candidate = self.output_dir / f"{stem}_{counter}.wav"
            counter += 1
        return candidate

    def _flush_segment(self):
        """Write all buffered frames to one stereo WAV and clear the buffers."""
        with self._lock:
            left_frames, self._left_buf = self._left_buf, []
            right_frames, self._right_buf = self._right_buf, []
        if not left_frames and not right_frames:
            return

        # Bring both halves of every pair to the same whole-sample length,
        # padding the shorter (or missing) side with silence.
        left_parts = []
        right_parts = []
        for left, right in zip(left_frames, right_frames):
            size = max(len(left), len(right))
            size -= size % _SAMPLE_WIDTH
            left_parts.append(left[:size].ljust(size, b"\x00"))
            right_parts.append(right[:size].ljust(size, b"\x00"))
        left_pcm = b"".join(left_parts)
        right_pcm = b"".join(right_parts)
        if not left_pcm:
            return

        # Interleave sample by sample (L0 R0 L1 R1 ...) with strided slice
        # assignment: each stereo frame is 4 bytes = 2 left + 2 right.
        stereo_data = bytearray(len(left_pcm) * 2)
        stereo_data[0::4] = left_pcm[0::2]
        stereo_data[1::4] = left_pcm[1::2]
        stereo_data[2::4] = right_pcm[0::2]
        stereo_data[3::4] = right_pcm[1::2]

        timestamp = time.strftime("%Y%m%d_%H%M%S",
                                  time.localtime(self._segment_start))
        filename = self._unique_path(f"binaural_{timestamp}")
        duration = len(left_pcm) / _SAMPLE_WIDTH / SAMPLE_RATE
        with wave.open(str(filename), 'wb') as wf:
            wf.setnchannels(2)
            wf.setsampwidth(_SAMPLE_WIDTH)
            wf.setframerate(SAMPLE_RATE)
            wf.writeframes(stereo_data)
        self._total_segments += 1
        self._total_seconds += duration
        logger.info("Saved %s (%.1fs, %.1fMB)", filename.name, duration,
                    len(stereo_data) / 1_000_000)

    @property
    def stats(self) -> dict:
        """Snapshot of recorder state and counters as a plain dict."""
        return {
            "recording": self._running,
            "output_dir": str(self.output_dir),
            "segment_seconds": self.segment_seconds,
            "total_segments": self._total_segments,
            "total_seconds": round(self._total_seconds, 1),
        }

View File

@@ -170,6 +170,9 @@ sound_ring_buffer = None # collections.deque, filled by listener_loop
# Speaker recognizer globals
speaker_recognizer = None
enrollment_buffer = None  # list of frame bytes, set during enrollment
enrollment_name = None

# Binaural recorder (stays None unless startup creates one)
binaural_recorder = None
# Audio stream
@@ -265,6 +268,13 @@ def listener_loop():
state.active_side = side
# Feed binaural recorder (both ears)
if binaural_recorder:
binaural_recorder.feed(
dual_stream.left.get_frame(),
dual_stream.right.get_frame() if dual_stream.right else None
)
# Feed sound classifier ring buffer
if sound_ring_buffer is not None:
sound_ring_buffer.append(frame_data)
@@ -454,7 +464,7 @@ app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂 (Dual XVF3800)"
@app.on_event("startup")
async def startup():
global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker
global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder
state.running = True
@@ -530,6 +540,15 @@ async def startup():
logger.info("Spatial tracking started (%d Hz, %.0fmm baseline, pushing gaze to %s)",
DOA_POLL_HZ, array_sep, EYE_SERVICE_URL)
# --- Binaural recording ---
if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"):
from binaural_recorder import BinauralRecorder
rec_dir = os.environ.get("BINAURAL_DIR", os.path.expanduser("~/headmic/recordings"))
binaural_recorder = BinauralRecorder(output_dir=rec_dir)
binaural_recorder.start()
else:
logger.info("Binaural recording disabled (set BINAURAL_RECORD=1 to enable)")
# --- Main listener ---
thread = threading.Thread(target=listener_loop, daemon=True)
thread.start()
@@ -540,6 +559,8 @@ async def startup():
async def shutdown():
state.running = False
leds_off()
if binaural_recorder:
binaural_recorder.stop()
if dual_stream:
dual_stream.stop()
@@ -606,6 +627,16 @@ async def doa():
}
# --- Binaural recording ---
@app.get("/recording")
async def recording():
    """Binaural recording status.

    Returns ``{"recording": False, "enabled": False}`` when no recorder
    was created; otherwise the recorder's stats dict plus
    ``"enabled": True`` so both responses carry the same ``enabled`` key.
    """
    if binaural_recorder is None:
        return {"recording": False, "enabled": False}
    return {**binaural_recorder.stats, "enabled": True}
# --- Device info ---
@app.get("/devices")

View File

@@ -22,9 +22,9 @@ GAZE_Y_RANGE = 30 # max vertical deflection from center
GAZE_MAX_DISTANCE_MM = 3000 # beyond this, gaze is "far" (no convergence)
# Smoothing
SMOOTHING_ALPHA = 0.3 # exponential smoothing (0=sluggish, 1=instant)
IDLE_RETURN_SPEED = 0.05 # how fast gaze drifts to center when no VAD
IDLE_TIMEOUT_S = 2.0 # seconds of no VAD before drifting to center
SMOOTHING_ALPHA = 0.4 # exponential smoothing (0=sluggish, 1=instant) — slightly snappy
IDLE_RETURN_SPEED = 0.03 # how fast gaze drifts to center when no VAD — gentle drift
IDLE_TIMEOUT_S = 1.5 # seconds of no VAD before drifting to center
class SpatialTracker: