diff --git a/headmic.py b/headmic.py
index cc1229a..f2b8dc1 100644
--- a/headmic.py
+++ b/headmic.py
@@ -450,7 +450,10 @@ def doa_track_loop():
         if spatial_tracker and dual_stream:
             left_energy = dual_stream.left.get_energy() if dual_stream.left else 0.0
             right_energy = dual_stream.right.get_energy() if dual_stream.right else 0.0
-            result = spatial_tracker.update(state.doa, left_energy, right_energy)
+            left_audio = dual_stream.left.get_frame() if dual_stream.left else None
+            right_audio = dual_stream.right.get_frame() if dual_stream.right else None
+            result = spatial_tracker.update(
+                state.doa, left_energy, right_energy, left_audio, right_audio)
             if result:
                 state.spatial = result
                 gx, gy = result["gaze_x"], result["gaze_y"]
diff --git a/spatial.py b/spatial.py
index 11a170e..80249b0 100644
--- a/spatial.py
+++ b/spatial.py
@@ -10,6 +10,8 @@
 import math
 import time
 from typing import Optional
+import numpy as np
+
 logger = logging.getLogger("headmic.spatial")
 
 # Array geometry (measured on skull, can be overridden from config)
@@ -26,6 +28,14 @@
 SMOOTHING_ALPHA = 0.4  # exponential smoothing (0=sluggish, 1=instant)
 IDLE_RETURN_SPEED = 0.03  # how fast gaze drifts to center when no VAD — gentle drift
 IDLE_TIMEOUT_S = 1.5  # seconds of no VAD before drifting to center
+# ITD (Interaural Time Difference)
+SPEED_OF_SOUND_MM_S = 343000.0  # ~343 m/s in mm/s
+SAMPLE_RATE = 16000
+ITD_MAX_DELAY_SAMPLES = 9  # ±175mm / (343m/s * 62.5μs/sample) ≈ ±8.2 samples
+ITD_WEIGHT = 0.3  # weight of ITD in *angle* fusion (DoA=0.5, ITD=0.3, ILD=0.2)
+DOA_WEIGHT = 0.5  # weight of DoA in *angle* fusion
+ILD_DIST_WEIGHT = 0.3  # weight of ILD in *distance* fusion (distinct from angle weights)
+
 # Distance estimation (ILD-based)
 # ILD = 20 * log10(louder_energy / quieter_energy) in dB
 # Empirical mapping: ILD varies with angle and distance.
@@ -56,6 +66,8 @@ class SpatialTracker:
         self._smooth_gaze_y: float = float(GAZE_CENTER)
         self._smooth_distance: float = GAZE_MAX_DISTANCE_MM
         self._smooth_ild: float = 0.0  # dB
+        self._smooth_itd_angle: float = 0.0  # degrees, from cross-correlation
+        self._last_itd_samples: float = 0.0  # raw delay in samples
 
         # VAD tracking
         self._last_vad_time: float = 0.0
@@ -64,18 +76,21 @@ class SpatialTracker:
         # Last raw result for API
         self.last_position: Optional[dict] = None
 
-    def update(self, doa: dict, left_energy: float = 0.0, right_energy: float = 0.0) -> Optional[dict]:
+    def update(self, doa: dict, left_energy: float = 0.0, right_energy: float = 0.0,
+               left_audio: Optional[bytes] = None, right_audio: Optional[bytes] = None) -> Optional[dict]:
         """
-        Process DoA readings + audio energy from both arrays.
+        Process DoA readings + audio energy + raw audio from both arrays.
 
         Args:
             doa: {"left": {"angle": 0-359, "vad": bool}, "right": {"angle": 0-359, "vad": bool}}
             left_energy: RMS energy from left mic stream (0.0-1.0)
             right_energy: RMS energy from right mic stream (0.0-1.0)
+            left_audio: raw PCM bytes from left ear (int16, for ITD cross-correlation)
+            right_audio: raw PCM bytes from right ear (int16, for ITD cross-correlation)
 
         Returns:
-            {"x_mm", "y_mm", "distance_mm", "ild_db", "proximity",
-             "gaze_x", "gaze_y", "vad", "side"}
+            {"x_mm", "y_mm", "distance_mm", "ild_db", "itd_angle", "itd_delay_us",
+             "proximity", "gaze_x", "gaze_y", "vad", "side"}
             or None if insufficient data.
         """
         left = doa.get("left")
@@ -101,6 +116,15 @@
         # Compute ILD (Interaural Level Difference)
         ild_db = self._compute_ild(left_energy, right_energy)
 
+        # Compute ITD if we have audio from both ears; smooth the bearing along
+        itd_angle = None
+        if left_audio and right_audio and any_vad:
+            itd_result = self._compute_itd(left_audio, right_audio)
+            if itd_result is not None:
+                itd_angle, self._last_itd_samples = itd_result
+                self._smooth_itd_angle = (self._smooth_itd_angle + SMOOTHING_ALPHA *
+                    self._shortest_angle_diff(itd_angle, self._smooth_itd_angle)) % 360
+
         if pos and any_vad:
             # Smooth the position
             self._smooth_x += SMOOTHING_ALPHA * (pos["x_mm"] - self._smooth_x)
@@ -110,8 +134,7 @@
             # Fuse triangulated distance with ILD
             tri_dist = math.sqrt(self._smooth_x**2 + self._smooth_y**2)
             ild_dist = self._ild_to_distance(self._smooth_ild)
-            # Weighted average: trust triangulation more (0.7) but let ILD correct it (0.3)
-            fused_dist = 0.7 * tri_dist + 0.3 * ild_dist
+            fused_dist = (1.0 - ILD_DIST_WEIGHT) * tri_dist + ILD_DIST_WEIGHT * ild_dist
             self._smooth_distance += SMOOTHING_ALPHA * (fused_dist - self._smooth_distance)
         elif not any_vad:
             return self._idle_drift()
@@ -131,6 +154,8 @@
             "y_mm": round(self._smooth_y, 1),
             "distance_mm": round(self._smooth_distance, 1),
             "ild_db": round(self._smooth_ild, 1),
+            "itd_angle": round(self._smooth_itd_angle, 1),
+            "itd_delay_us": round(self._last_itd_samples * 1e6 / SAMPLE_RATE, 1),
             "proximity": proximity,
             "gaze_x": int(round(self._smooth_gaze_x)),
             "gaze_y": int(round(self._smooth_gaze_y)),
@@ -192,6 +217,81 @@ class SpatialTracker:
         else:
             return 4000.0  # far or directly ahead (no ILD)
 
+    def _compute_itd(self, left_audio: bytes, right_audio: bytes) -> Optional[tuple[float, float]]:
+        """Compute Interaural Time Difference via cross-correlation.
+
+        Returns (angle_degrees, delay_samples) or None if insufficient data.
+        Positive delay = sound arrives at right ear first = source on right.
+        """
+        try:
+            left = np.frombuffer(left_audio, dtype=np.int16).astype(np.float32)
+            right = np.frombuffer(right_audio, dtype=np.int16).astype(np.float32)
+        except Exception:
+            return None
+
+        min_len = min(len(left), len(right))
+        if min_len < 64:
+            return None
+
+        # Use the last 512 samples (~32ms window) for correlation
+        window = min(512, min_len)
+        left = left[-window:]
+        right = right[-window:]
+
+        # Normalize to prevent overflow
+        left_norm = np.linalg.norm(left)
+        right_norm = np.linalg.norm(right)
+        if left_norm < 1.0 or right_norm < 1.0:
+            return None  # silence
+        left = left / left_norm
+        right = right / right_norm
+
+        # Cross-correlate within the expected delay range
+        max_delay = ITD_MAX_DELAY_SAMPLES
+        corr = np.correlate(left, right, mode='full')
+        # corr center is at index len(left)-1, corresponding to zero delay
+        center = len(left) - 1
+        search = corr[center - max_delay:center + max_delay + 1]
+
+        if len(search) == 0:
+            return None
+
+        # Peak delay in samples (positive = right leads = source on right)
+        peak_idx = np.argmax(search)
+        # Cast to plain float: declared return is tuple[float, float] and the
+        # value is stored in _last_itd_samples: float.
+        delay_samples = float(peak_idx - max_delay)  # negative=left leads, positive=right leads
+
+        # Convert delay to angle
+        # delay_samples * (1/sample_rate) = time_diff
+        # sin(angle) = time_diff * speed_of_sound / separation
+        time_diff = delay_samples / SAMPLE_RATE
+        sin_angle = (time_diff * SPEED_OF_SOUND_MM_S) / self.separation
+
+        # Clamp to valid range (cross-correlation can overshoot)
+        sin_angle = max(-1.0, min(1.0, sin_angle))
+        angle_deg = math.degrees(math.asin(sin_angle))
+
+        # Map the signed ±90° asin angle onto the DoA bearing convention
+        # (0°=front, 90°=right, 270°=left). asin already gives 0° at front,
+        # +90° fully right and -90° fully left, so the bearing is simply the
+        # signed angle wrapped into [0, 360):
+        #   0° → 0° (front), +45° → 45°, +90° → 90° (right),
+        #   -45° → 315°, -90° → 270° (left).
+        # NOTE: a 2-mic ITD cannot separate front from back; front is assumed.
+        bearing = angle_deg % 360
+
+        return bearing, delay_samples
+
+    @staticmethod
+    def _shortest_angle_diff(target: float, current: float) -> float:
+        """Shortest signed difference between two angles, for smooth interpolation."""
+        diff = target - current
+        if diff > 180:
+            diff -= 360
+        elif diff < -180:
+            diff += 360
+        return diff
+
     @staticmethod
     def _classify_proximity(distance_mm: float) -> str:
         """Classify distance into a proximity zone."""