Files
headmic/spatial.py
Alex 36aeb19280 Add binaural recording + tune spatial tracking
binaural_recorder.py: Records left/right ear streams as stereo WAV
in rolling 5-minute segments. Training data for spatial audio models.
Enabled via BINAURAL_RECORD=1 env var.

spatial.py: Tune smoothing — alpha 0.3→0.4 (snappier response),
idle return speed 0.05→0.03 (gentler drift), timeout 2s→1.5s.

headmic.py: Wire binaural recorder into audio loop, add /recording
endpoint for stats, feed both ear streams (not just best beam).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 20:53:05 -05:00

214 lines
8.0 KiB
Python

"""
Binaural spatial hearing — triangulation, tracking, gaze.
Combines DoA angles from two XVF3800 arrays into a sound source position,
smooths the tracking, and pushes gaze coordinates to the eye service.
"""
import logging
import math
import time
from typing import Optional
logger = logging.getLogger("headmic.spatial")
# Array geometry (measured on skull, can be overridden from config)
DEFAULT_ARRAY_SEPARATION_MM = 175.0 # center-to-center distance between arrays
# Gaze mapping — eye coordinates are 0-255 on both axes, 127 = straight ahead
GAZE_CENTER = 127 # neutral gaze (0-255 range)
GAZE_X_RANGE = 80 # max horizontal deflection from center
GAZE_Y_RANGE = 30 # max vertical deflection from center
GAZE_MAX_DISTANCE_MM = 3000 # beyond this, gaze is "far" (no convergence)
# Smoothing — applied to both the triangulated position and the gaze output
SMOOTHING_ALPHA = 0.4 # exponential smoothing (0=sluggish, 1=instant) — slightly snappy
IDLE_RETURN_SPEED = 0.03 # how fast gaze drifts to center when no VAD — gentle drift
IDLE_TIMEOUT_S = 1.5 # seconds of no VAD before drifting to center
class SpatialTracker:
"""Triangulates sound source from two DoA angles and produces smooth gaze."""
def __init__(self, array_separation_mm: float = DEFAULT_ARRAY_SEPARATION_MM):
    """Set up tracker state for a given physical array separation.

    Args:
        array_separation_mm: center-to-center distance between the two
            mic arrays, in millimetres.
    """
    self.separation = array_separation_mm
    self.half_sep = array_separation_mm / 2.0
    # Exponentially smoothed source position (mm, skull-centred frame).
    self._smooth_x: float = 0.0
    self._smooth_y: float = 0.0
    # Exponentially smoothed gaze output (0-255), starting at neutral.
    self._smooth_gaze_x: float = float(GAZE_CENTER)
    self._smooth_gaze_y: float = float(GAZE_CENTER)
    # Voice-activity bookkeeping.
    self._last_vad_time: float = 0.0
    self._any_vad: bool = False
    # Most recent result dict, exposed for the API.
    self.last_position: Optional[dict] = None
def update(self, doa: dict) -> Optional[dict]:
    """
    Process DoA readings from both arrays and produce a smoothed fix.

    Args:
        doa: {"left": {"angle": 0-359, "vad": bool},
              "right": {"angle": 0-359, "vad": bool}}
    Returns:
        {"x_mm": float, "y_mm": float, "distance_mm": float,
         "gaze_x": int, "gaze_y": int, "vad": bool, "side": str}
        or None if insufficient data.
    """
    left = doa.get("left")
    right = doa.get("right")
    if not left or not right:
        # One (or both) arrays missing — treat the frame like silence.
        return self._idle_drift()
    left_vad = left.get("vad", False)
    right_vad = right.get("vad", False)
    any_vad = left_vad or right_vad
    if not any_vad:
        # No voice activity: skip triangulation entirely (previously it was
        # computed and then discarded) and let the gaze drift to center.
        # This also avoids touching left["angle"] on VAD-less frames.
        return self._idle_drift()
    self._last_vad_time = time.monotonic()
    self._any_vad = True
    # Triangulate and exponentially smooth the position estimate.
    pos = self._triangulate(left["angle"], right["angle"])
    if pos:
        self._smooth_x += SMOOTHING_ALPHA * (pos["x_mm"] - self._smooth_x)
        self._smooth_y += SMOOTHING_ALPHA * (pos["y_mm"] - self._smooth_y)
    # Map the smoothed position to gaze, then smooth the gaze as well —
    # the double smoothing keeps the eyes from jittering on noisy frames.
    gaze_x, gaze_y = self._position_to_gaze(self._smooth_x, self._smooth_y)
    self._smooth_gaze_x += SMOOTHING_ALPHA * (gaze_x - self._smooth_gaze_x)
    self._smooth_gaze_y += SMOOTHING_ALPHA * (gaze_y - self._smooth_gaze_y)
    result = {
        "x_mm": round(self._smooth_x, 1),
        "y_mm": round(self._smooth_y, 1),
        "distance_mm": round(math.hypot(self._smooth_x, self._smooth_y), 1),
        "gaze_x": int(round(self._smooth_gaze_x)),
        "gaze_y": int(round(self._smooth_gaze_y)),
        "vad": any_vad,
        "side": "left" if self._smooth_x < 0 else "right",
    }
    self.last_position = result
    return result
def _idle_drift(self) -> Optional[dict]:
    """When no VAD, smoothly return gaze to center."""
    quiet_for = time.monotonic() - self._last_vad_time
    if quiet_for < IDLE_TIMEOUT_S:
        # Too soon to give up on the speaker — hold the last fix briefly.
        return self.last_position
    # Ease both gaze axes back toward neutral.
    self._smooth_gaze_x += IDLE_RETURN_SPEED * (GAZE_CENTER - self._smooth_gaze_x)
    self._smooth_gaze_y += IDLE_RETURN_SPEED * (GAZE_CENTER - self._smooth_gaze_y)
    dist = math.sqrt(self._smooth_x ** 2 + self._smooth_y ** 2)
    result = {
        "x_mm": round(self._smooth_x, 1),
        "y_mm": round(self._smooth_y, 1),
        "distance_mm": round(dist, 1),
        "gaze_x": int(round(self._smooth_gaze_x)),
        "gaze_y": int(round(self._smooth_gaze_y)),
        "vad": False,
        "side": "center",
    }
    self.last_position = result
    return result
def _triangulate(self, left_deg: float, right_deg: float) -> Optional[dict]:
    """
    Triangulate sound source position from two DoA angles.

    Array coordinate system:
    - Origin: center of skull
    - X axis: positive = right (toward right ear)
    - Y axis: positive = forward (in front of skull)
    Each array's DoA is 0° = front, 90° = right, 180° = back, 270° = left.
    The arrays are positioned at (-half_sep, 0) and (+half_sep, 0).
    """
    left_rad = math.radians(left_deg)
    right_rad = math.radians(right_deg)
    # Unit bearing vectors: DoA 0° = forward (+Y), 90° = right (+X).
    left_dx = math.sin(left_rad)
    left_dy = math.cos(left_rad)
    right_dx = math.sin(right_rad)
    right_dy = math.cos(right_rad)
    # Circular mean of the two bearings, used by the fallback branches.
    # The previous arithmetic average was wrong across the 0°/360° wrap:
    # e.g. 350° and 10° averaged to 180° (straight behind) instead of 0°.
    avg_rad = math.atan2(
        math.sin(left_rad) + math.sin(right_rad),
        math.cos(left_rad) + math.cos(right_rad),
    )
    # Solve intersection of two rays:
    #   P_left + t * D_left = P_right + s * D_right
    #   t*left_dx - s*right_dx = separation
    #   t*left_dy - s*right_dy = 0
    denom = left_dx * right_dy - left_dy * right_dx  # = sin(left - right)
    if abs(denom) < 0.001:
        # Parallel rays — can't triangulate; source is very far away or
        # directly ahead. Fall back to the mean bearing at max distance.
        return {
            "x_mm": GAZE_MAX_DISTANCE_MM * math.sin(avg_rad),
            "y_mm": GAZE_MAX_DISTANCE_MM * math.cos(avg_rad),
        }
    t = (self.separation * right_dy) / denom
    if t < 0:
        # Intersection is behind the arrays — likely noise or a rear source.
        # Use the mean bearing scaled to half the max distance.
        return {
            "x_mm": GAZE_MAX_DISTANCE_MM * math.sin(avg_rad) * 0.5,
            "y_mm": GAZE_MAX_DISTANCE_MM * math.cos(avg_rad) * 0.5,
        }
    # Intersection point relative to the left array, shifted to skull center.
    x = -self.half_sep + t * left_dx
    y = t * left_dy
    return {"x_mm": x, "y_mm": y}
def _position_to_gaze(self, x_mm: float, y_mm: float) -> tuple[float, float]:
    """
    Convert position (mm) to gaze coordinates (0-255).

    Horizontal: source on the right → eyes look right (gaze_x > 127)
    Vertical: source closer → eyes look slightly down, farther → straight ahead
    """
    distance = math.sqrt(x_mm ** 2 + y_mm ** 2)
    if distance < 1.0:
        # Source is effectively at the skull center — look straight ahead.
        return float(GAZE_CENTER), float(GAZE_CENTER)
    # Horizontal: bearing angle, with y clamped so a source very close to
    # the ear line cannot swing the eyes to an extreme angle.
    bearing = math.atan2(x_mm, max(y_mm, 100.0))
    raw_x = GAZE_CENTER + GAZE_X_RANGE * (bearing / (math.pi / 2))
    lo = GAZE_CENTER - GAZE_X_RANGE
    hi = GAZE_CENTER + GAZE_X_RANGE
    gaze_x = min(hi, max(lo, raw_x))
    # Vertical: nearer sources pull the gaze slightly downward — simulates
    # looking down at someone close vs. straight ahead at someone far.
    proximity = max(0.0, 1.0 - distance / GAZE_MAX_DISTANCE_MM)
    gaze_y = GAZE_CENTER + GAZE_Y_RANGE * proximity * 0.3  # subtle effect
    return gaze_x, gaze_y