Files
headmic/binaural_recorder.py
Alex 36aeb19280 Add binaural recording + tune spatial tracking
binaural_recorder.py: Records left/right ear streams as stereo WAV
in rolling 5-minute segments. Training data for spatial audio models.
Enabled via BINAURAL_RECORD=1 env var.

spatial.py: Tune smoothing — alpha 0.3→0.4 (snappier response),
idle return speed 0.05→0.03 (gentler drift), timeout 2s→1.5s.

headmic.py: Wire binaural recorder into audio loop, add /recording
endpoint for stats, feed both ear streams (not just best beam).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 20:53:05 -05:00

123 lines
4.3 KiB
Python

"""
Binaural audio recorder — saves left/right ear streams as stereo WAV.

Records continuously in rolling segments (default 5 minutes each),
capturing spatial audio that preserves left/right positioning.
Intended as training data for spatial audio models and being0.
"""
import logging
import os
import struct
import threading
import time
import wave
from pathlib import Path
from typing import Optional
logger = logging.getLogger("headmic.binaural")

DEFAULT_SEGMENT_SECONDS = 300  # 5 minutes per file
DEFAULT_OUTPUT_DIR = os.path.expanduser("~/headmic/recordings")
SAMPLE_RATE = 16000  # Hz, written into every WAV header
_SAMPLE_WIDTH = 2  # bytes per sample (16-bit PCM)
_CHANNELS = 2  # left ear + right ear


class BinauralRecorder:
    """Records stereo audio from two mic streams in rolling segments.

    Frames handed to :meth:`feed` are buffered in memory. A background
    thread (started by :meth:`start`) checks once per second whether the
    current segment has reached ``segment_seconds`` and, if so, writes the
    buffered audio to ``binaural_<YYYYmmdd_HHMMSS>.wav`` in ``output_dir``
    as 16-bit stereo PCM at ``SAMPLE_RATE``. :meth:`stop` writes whatever
    is still buffered.

    Args:
        output_dir: Directory for the WAV files; created if missing.
        segment_seconds: Wall-clock length of one segment before it is
            flushed to disk.
    """

    def __init__(self, output_dir: str = DEFAULT_OUTPUT_DIR,
                 segment_seconds: int = DEFAULT_SEGMENT_SECONDS):
        self.output_dir = Path(output_dir)
        self.segment_seconds = segment_seconds
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._running = False
        # Guards the two frame buffers, the segment start time and the
        # running totals (shared between feeder, flush thread and stats).
        self._lock = threading.Lock()
        # Set by stop() so the flush thread wakes up immediately instead
        # of finishing a full one-second sleep.
        self._stop_event = threading.Event()
        self._left_buf: list[bytes] = []
        self._right_buf: list[bytes] = []
        self._segment_start: float = 0
        self._total_segments = 0
        self._total_seconds = 0.0
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the background flush thread.

        Calling ``start()`` while already running is a no-op, so a second
        call cannot spawn a second flush thread.
        """
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        with self._lock:
            self._segment_start = time.time()
        self._thread = threading.Thread(target=self._flush_loop, daemon=True)
        self._thread.start()
        logger.info("Binaural recording started → %s (%ds segments)",
                    self.output_dir, self.segment_seconds)

    def stop(self):
        """Stop the flush thread and write the partially filled segment.

        The flush thread is joined *before* the final flush so the two
        can never write (or update the totals) concurrently.
        """
        self._running = False
        self._stop_event.set()
        thread = self._thread
        if thread is not None:
            thread.join()
        self._thread = None
        self._flush_segment()
        logger.info("Binaural recording stopped (%d segments, %.0f seconds total)",
                    self._total_segments, self._total_seconds)

    def feed(self, left_frame: Optional[bytes], right_frame: Optional[bytes]):
        """Feed a pair of audio frames (512 samples each, 16-bit PCM).

        The two frames are treated as simultaneous. If exactly one side is
        missing (``None`` or empty), an equally long block of silence is
        stored for it so that every later frame stays time-aligned between
        the ears. If both sides are missing the call is ignored.
        """
        if not left_frame and not right_frame:
            return
        if not left_frame:
            left_frame = bytes(len(right_frame))
        elif not right_frame:
            right_frame = bytes(len(left_frame))
        with self._lock:
            self._left_buf.append(left_frame)
            self._right_buf.append(right_frame)

    def _flush_loop(self):
        """Background loop: flush a segment whenever it is old enough."""
        while not self._stop_event.is_set():
            elapsed = time.time() - self._segment_start
            if elapsed >= self.segment_seconds:
                try:
                    self._flush_segment()
                except Exception:
                    # Thread boundary: a failed write (e.g. disk full) must
                    # not kill the loop, otherwise the buffers would grow
                    # without bound and nothing would ever be saved again.
                    logger.exception("Failed to write binaural segment")
            # Returns early as soon as stop() sets the event.
            self._stop_event.wait(1.0)

    @staticmethod
    def _interleave(left_frames: list, right_frames: list) -> bytearray:
        """Return ``L0 R0 L1 R1 ...`` 16-bit stereo PCM for the frame lists.

        Frame *i* of each list is paired. Each frame is trimmed to whole
        samples, and the shorter frame of a pair (or a missing one) is
        padded with silence. Sample bytes are copied verbatim, so the
        byte order of the input is preserved.
        """
        frame_count = max(len(left_frames), len(right_frames))
        left_parts = []
        right_parts = []
        for i in range(frame_count):
            left = left_frames[i] if i < len(left_frames) else b""
            right = right_frames[i] if i < len(right_frames) else b""
            # Drop a dangling partial sample rather than corrupting the
            # sample grid for everything that follows.
            left = left[:len(left) - len(left) % _SAMPLE_WIDTH]
            right = right[:len(right) - len(right) % _SAMPLE_WIDTH]
            width = max(len(left), len(right))
            left_parts.append(left.ljust(width, b"\x00"))
            right_parts.append(right.ljust(width, b"\x00"))
        left_pcm = b"".join(left_parts)
        right_pcm = b"".join(right_parts)

        # Strided slice assignment interleaves in C instead of looping
        # over every sample in Python.
        stereo = bytearray(len(left_pcm) * _CHANNELS)
        stride = _SAMPLE_WIDTH * _CHANNELS
        for byte in range(_SAMPLE_WIDTH):
            stereo[byte::stride] = left_pcm[byte::_SAMPLE_WIDTH]
            stereo[_SAMPLE_WIDTH + byte::stride] = right_pcm[byte::_SAMPLE_WIDTH]
        return stereo

    def _flush_segment(self):
        """Write all buffered frames to a new WAV file and reset the buffers.

        Does nothing if no audio is buffered. The next segment's start
        time is taken at the moment the buffers are swapped, so the time
        spent encoding and writing is not lost from its timestamp.
        """
        with self._lock:
            left_frames, self._left_buf = self._left_buf, []
            right_frames, self._right_buf = self._right_buf, []
            segment_start = self._segment_start
            self._segment_start = time.time()
        if not left_frames and not right_frames:
            return

        stereo_data = self._interleave(left_frames, right_frames)
        if not stereo_data:
            return

        # Write WAV; never overwrite an existing recording that happens
        # to share the same one-second timestamp.
        timestamp = time.strftime("%Y%m%d_%H%M%S", time.localtime(segment_start))
        filename = self.output_dir / f"binaural_{timestamp}.wav"
        suffix = 1
        while filename.exists():
            filename = self.output_dir / f"binaural_{timestamp}_{suffix}.wav"
            suffix += 1

        duration = len(stereo_data) / (_SAMPLE_WIDTH * _CHANNELS) / SAMPLE_RATE
        with wave.open(str(filename), 'wb') as wf:
            wf.setnchannels(_CHANNELS)
            wf.setsampwidth(_SAMPLE_WIDTH)
            wf.setframerate(SAMPLE_RATE)
            wf.writeframes(stereo_data)

        with self._lock:
            self._total_segments += 1
            self._total_seconds += duration
        logger.info("Saved %s (%.1fs, %.1fMB)", filename.name, duration,
                    len(stereo_data) / 1_000_000)

    @property
    def stats(self) -> dict:
        """Snapshot of recorder state for status reporting."""
        return {
            "recording": self._running,
            "output_dir": str(self.output_dir),
            "segment_seconds": self.segment_seconds,
            "total_segments": self._total_segments,
            "total_seconds": round(self._total_seconds, 1),
        }