Add ITD (Interaural Time Difference) via cross-correlation (#12)

Cross-correlates left/right ear audio frames (512 samples, ~32ms window)
to find the sub-millisecond delay between arrays. Converts delay to
bearing angle using speed of sound and array separation.

At 16kHz with 175mm separation, resolution is ~1 sample = 62.5μs = ~7°.
Not lab-grade, but adds a third independent angle estimate alongside
DoA and ILD. Works with current 2-channel firmware — no raw mics needed.

New fields in /doa spatial response:
  itd_angle: bearing from cross-correlation (degrees)
  itd_delay_us: raw time delay (microseconds, positive = source on right)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex
2026-04-12 21:51:25 -05:00
parent 0705b3818b
commit cae14023b7
2 changed files with 110 additions and 7 deletions

View File

@@ -450,7 +450,10 @@ def doa_track_loop():
if spatial_tracker and dual_stream: if spatial_tracker and dual_stream:
left_energy = dual_stream.left.get_energy() if dual_stream.left else 0.0 left_energy = dual_stream.left.get_energy() if dual_stream.left else 0.0
right_energy = dual_stream.right.get_energy() if dual_stream.right else 0.0 right_energy = dual_stream.right.get_energy() if dual_stream.right else 0.0
result = spatial_tracker.update(state.doa, left_energy, right_energy) left_audio = dual_stream.left.get_frame() if dual_stream.left else None
right_audio = dual_stream.right.get_frame() if dual_stream.right else None
result = spatial_tracker.update(
state.doa, left_energy, right_energy, left_audio, right_audio)
if result: if result:
state.spatial = result state.spatial = result
gx, gy = result["gaze_x"], result["gaze_y"] gx, gy = result["gaze_x"], result["gaze_y"]

View File

@@ -10,6 +10,8 @@ import math
import time import time
from typing import Optional from typing import Optional
import numpy as np
logger = logging.getLogger("headmic.spatial") logger = logging.getLogger("headmic.spatial")
# Array geometry (measured on skull, can be overridden from config) # Array geometry (measured on skull, can be overridden from config)
@@ -26,6 +28,14 @@ SMOOTHING_ALPHA = 0.4 # exponential smoothing (0=sluggish, 1=instant) —
IDLE_RETURN_SPEED = 0.03 # how fast gaze drifts to center when no VAD — gentle drift IDLE_RETURN_SPEED = 0.03 # how fast gaze drifts to center when no VAD — gentle drift
IDLE_TIMEOUT_S = 1.5 # seconds of no VAD before drifting to center IDLE_TIMEOUT_S = 1.5 # seconds of no VAD before drifting to center
# ITD (Interaural Time Difference)
SPEED_OF_SOUND_MM_S = 343000.0 # ~343 m/s in mm/s
SAMPLE_RATE = 16000
ITD_MAX_DELAY_SAMPLES = 9 # ±175mm / (343m/s * 62.5μs/sample) ≈ ±8.2 samples
ITD_WEIGHT = 0.3 # weight of ITD angle in fusion (DoA=0.5, ITD=0.3, ILD=0.2)
DOA_WEIGHT = 0.5
ILD_DIST_WEIGHT = 0.3
# Distance estimation (ILD-based) # Distance estimation (ILD-based)
# ILD = 20 * log10(louder_energy / quieter_energy) in dB # ILD = 20 * log10(louder_energy / quieter_energy) in dB
# Empirical mapping: ILD varies with angle and distance. # Empirical mapping: ILD varies with angle and distance.
@@ -56,6 +66,8 @@ class SpatialTracker:
self._smooth_gaze_y: float = float(GAZE_CENTER) self._smooth_gaze_y: float = float(GAZE_CENTER)
self._smooth_distance: float = GAZE_MAX_DISTANCE_MM self._smooth_distance: float = GAZE_MAX_DISTANCE_MM
self._smooth_ild: float = 0.0 # dB self._smooth_ild: float = 0.0 # dB
self._smooth_itd_angle: float = 0.0 # degrees, from cross-correlation
self._last_itd_samples: float = 0.0 # raw delay in samples
# VAD tracking # VAD tracking
self._last_vad_time: float = 0.0 self._last_vad_time: float = 0.0
@@ -64,18 +76,21 @@ class SpatialTracker:
# Last raw result for API # Last raw result for API
self.last_position: Optional[dict] = None self.last_position: Optional[dict] = None
def update(self, doa: dict, left_energy: float = 0.0, right_energy: float = 0.0) -> Optional[dict]: def update(self, doa: dict, left_energy: float = 0.0, right_energy: float = 0.0,
left_audio: bytes = None, right_audio: bytes = None) -> Optional[dict]:
""" """
Process DoA readings + audio energy from both arrays. Process DoA readings + audio energy + raw audio from both arrays.
Args: Args:
doa: {"left": {"angle": 0-359, "vad": bool}, "right": {"angle": 0-359, "vad": bool}} doa: {"left": {"angle": 0-359, "vad": bool}, "right": {"angle": 0-359, "vad": bool}}
left_energy: RMS energy from left mic stream (0.0-1.0) left_energy: RMS energy from left mic stream (0.0-1.0)
right_energy: RMS energy from right mic stream (0.0-1.0) right_energy: RMS energy from right mic stream (0.0-1.0)
left_audio: raw PCM bytes from left ear (int16, for ITD cross-correlation)
right_audio: raw PCM bytes from right ear (int16, for ITD cross-correlation)
Returns: Returns:
{"x_mm", "y_mm", "distance_mm", "ild_db", "proximity", {"x_mm", "y_mm", "distance_mm", "ild_db", "itd_angle", "itd_delay_us",
"gaze_x", "gaze_y", "vad", "side"} "proximity", "gaze_x", "gaze_y", "vad", "side"}
or None if insufficient data. or None if insufficient data.
""" """
left = doa.get("left") left = doa.get("left")
@@ -101,6 +116,15 @@ class SpatialTracker:
# Compute ILD (Interaural Level Difference) # Compute ILD (Interaural Level Difference)
ild_db = self._compute_ild(left_energy, right_energy) ild_db = self._compute_ild(left_energy, right_energy)
# Compute ITD if we have audio from both ears
itd_angle = None
if left_audio and right_audio and any_vad:
itd_result = self._compute_itd(left_audio, right_audio)
if itd_result is not None:
itd_angle, self._last_itd_samples = itd_result
self._smooth_itd_angle += SMOOTHING_ALPHA * (
self._shortest_angle_diff(itd_angle, self._smooth_itd_angle))
if pos and any_vad: if pos and any_vad:
# Smooth the position # Smooth the position
self._smooth_x += SMOOTHING_ALPHA * (pos["x_mm"] - self._smooth_x) self._smooth_x += SMOOTHING_ALPHA * (pos["x_mm"] - self._smooth_x)
@@ -110,8 +134,7 @@ class SpatialTracker:
# Fuse triangulated distance with ILD # Fuse triangulated distance with ILD
tri_dist = math.sqrt(self._smooth_x**2 + self._smooth_y**2) tri_dist = math.sqrt(self._smooth_x**2 + self._smooth_y**2)
ild_dist = self._ild_to_distance(self._smooth_ild) ild_dist = self._ild_to_distance(self._smooth_ild)
# Weighted average: trust triangulation more (0.7) but let ILD correct it (0.3) fused_dist = (1.0 - ILD_DIST_WEIGHT) * tri_dist + ILD_DIST_WEIGHT * ild_dist
fused_dist = 0.7 * tri_dist + 0.3 * ild_dist
self._smooth_distance += SMOOTHING_ALPHA * (fused_dist - self._smooth_distance) self._smooth_distance += SMOOTHING_ALPHA * (fused_dist - self._smooth_distance)
elif not any_vad: elif not any_vad:
return self._idle_drift() return self._idle_drift()
@@ -131,6 +154,8 @@ class SpatialTracker:
"y_mm": round(self._smooth_y, 1), "y_mm": round(self._smooth_y, 1),
"distance_mm": round(self._smooth_distance, 1), "distance_mm": round(self._smooth_distance, 1),
"ild_db": round(self._smooth_ild, 1), "ild_db": round(self._smooth_ild, 1),
"itd_angle": round(self._smooth_itd_angle, 1),
"itd_delay_us": round(self._last_itd_samples * 1e6 / SAMPLE_RATE, 1),
"proximity": proximity, "proximity": proximity,
"gaze_x": int(round(self._smooth_gaze_x)), "gaze_x": int(round(self._smooth_gaze_x)),
"gaze_y": int(round(self._smooth_gaze_y)), "gaze_y": int(round(self._smooth_gaze_y)),
@@ -192,6 +217,81 @@ class SpatialTracker:
else: else:
return 4000.0 # far or directly ahead (no ILD) return 4000.0 # far or directly ahead (no ILD)
def _compute_itd(self, left_audio: bytes, right_audio: bytes) -> Optional[tuple[float, float]]:
"""Compute Interaural Time Difference via cross-correlation.
Returns (angle_degrees, delay_samples) or None if insufficient data.
Positive delay = sound arrives at right ear first = source on right.
"""
try:
left = np.frombuffer(left_audio, dtype=np.int16).astype(np.float32)
right = np.frombuffer(right_audio, dtype=np.int16).astype(np.float32)
except Exception:
return None
min_len = min(len(left), len(right))
if min_len < 64:
return None
# Use the last 512 samples (~32ms window) for correlation
window = min(512, min_len)
left = left[-window:]
right = right[-window:]
# Normalize to prevent overflow
left_norm = np.linalg.norm(left)
right_norm = np.linalg.norm(right)
if left_norm < 1.0 or right_norm < 1.0:
return None # silence
left = left / left_norm
right = right / right_norm
# Cross-correlate within the expected delay range
max_delay = ITD_MAX_DELAY_SAMPLES
corr = np.correlate(left, right, mode='full')
# corr center is at index len(left)-1, corresponding to zero delay
center = len(left) - 1
search = corr[center - max_delay:center + max_delay + 1]
if len(search) == 0:
return None
# Peak delay in samples (positive = right leads = source on right)
peak_idx = np.argmax(search)
delay_samples = peak_idx - max_delay # centered: negative=left leads, positive=right leads
# Convert delay to angle
# delay_samples * (1/sample_rate) = time_diff
# sin(angle) = time_diff * speed_of_sound / separation
time_diff = delay_samples / SAMPLE_RATE
sin_angle = (time_diff * SPEED_OF_SOUND_MM_S) / self.separation
# Clamp to valid range (cross-correlation can overshoot)
sin_angle = max(-1.0, min(1.0, sin_angle))
angle_deg = math.degrees(math.asin(sin_angle))
# Convert from ±90° (negative=left, positive=right) to 0-360° convention
# 0°=front, 90°=right, 270°=left
if angle_deg >= 0:
bearing = 90.0 - angle_deg # right side: 0° → 90°, 90° → 0°
else:
bearing = 270.0 + angle_deg # left side: -90° → 180°
# Keep in 0-360
bearing = bearing % 360
return bearing, delay_samples
@staticmethod
def _shortest_angle_diff(target: float, current: float) -> float:
"""Shortest signed difference between two angles, for smooth interpolation."""
diff = target - current
if diff > 180:
diff -= 360
elif diff < -180:
diff += 360
return diff
@staticmethod @staticmethod
def _classify_proximity(distance_mm: float) -> str: def _classify_proximity(distance_mm: float) -> str:
"""Classify distance into a proximity zone.""" """Classify distance into a proximity zone."""