""" Binaural spatial hearing — triangulation, tracking, gaze. Combines DoA angles from two XVF3800 arrays into a sound source position, smooths the tracking, and pushes gaze coordinates to the eye service. """ import logging import math import time from typing import Optional import numpy as np logger = logging.getLogger("headmic.spatial") # Array geometry (measured on skull, can be overridden from config) DEFAULT_ARRAY_SEPARATION_MM = 175.0 # center-to-center distance between arrays # Gaze mapping GAZE_CENTER = 127 # neutral gaze (0-255 range) GAZE_X_RANGE = 80 # max horizontal deflection from center GAZE_Y_RANGE = 30 # max vertical deflection from center GAZE_MAX_DISTANCE_MM = 3000 # beyond this, gaze is "far" (no convergence) # Smoothing SMOOTHING_ALPHA = 0.4 # exponential smoothing (0=sluggish, 1=instant) — slightly snappy IDLE_RETURN_SPEED = 0.03 # how fast gaze drifts to center when no VAD — gentle drift IDLE_TIMEOUT_S = 1.5 # seconds of no VAD before drifting to center # ITD (Interaural Time Difference) SPEED_OF_SOUND_MM_S = 343000.0 # ~343 m/s in mm/s SAMPLE_RATE = 16000 ITD_MAX_DELAY_SAMPLES = 9 # ±175mm / (343m/s * 62.5μs/sample) ≈ ±8.2 samples ITD_WEIGHT = 0.3 # weight of ITD angle in fusion (DoA=0.5, ITD=0.3, ILD=0.2) DOA_WEIGHT = 0.5 ILD_DIST_WEIGHT = 0.3 # Distance estimation (ILD-based) # ILD = 20 * log10(louder_energy / quieter_energy) in dB # Empirical mapping: ILD varies with angle and distance. # At 175mm separation, a source at 45° off-center produces: # ~0.5m: ILD ≈ 6-10 dB # ~1.5m: ILD ≈ 3-5 dB # ~3.0m: ILD ≈ 1-2 dB # These are rough — calibrate on real hardware. PROXIMITY_ZONES = [ ("intimate", 0, 500), # < 0.5m — whispering distance ("conversational", 500, 2000), # 0.5-2m — normal talking ("across_room", 2000, 5000), # 2-5m — raised voice ("far", 5000, 99999), # > 5m — shouting distance ] class SpatialTracker: """Triangulates sound source from two DoA angles and produces smooth gaze.""" def __init__(self, array_separation_mm: float = DEFAULT_ARRAY_SEPARATION_MM): self.separation = array_separation_mm self.half_sep = array_separation_mm / 2.0 # Smoothed state self._smooth_x: float = 0.0 # mm, relative to skull center self._smooth_y: float = 0.0 # mm, forward from skull self._smooth_gaze_x: float = float(GAZE_CENTER) self._smooth_gaze_y: float = float(GAZE_CENTER) self._smooth_distance: float = GAZE_MAX_DISTANCE_MM self._smooth_ild: float = 0.0 # dB self._smooth_itd_angle: float = 0.0 # degrees, from cross-correlation self._last_itd_samples: float = 0.0 # raw delay in samples # VAD tracking self._last_vad_time: float = 0.0 self._any_vad: bool = False # Last raw result for API self.last_position: Optional[dict] = None def update(self, doa: dict, left_energy: float = 0.0, right_energy: float = 0.0, left_audio: bytes = None, right_audio: bytes = None) -> Optional[dict]: """ Process DoA readings + audio energy + raw audio from both arrays. Args: doa: {"left": {"angle": 0-359, "vad": bool}, "right": {"angle": 0-359, "vad": bool}} left_energy: RMS energy from left mic stream (0.0-1.0) right_energy: RMS energy from right mic stream (0.0-1.0) left_audio: raw PCM bytes from left ear (int16, for ITD cross-correlation) right_audio: raw PCM bytes from right ear (int16, for ITD cross-correlation) Returns: {"x_mm", "y_mm", "distance_mm", "ild_db", "itd_angle", "itd_delay_us", "proximity", "gaze_x", "gaze_y", "vad", "side"} or None if insufficient data. """ left = doa.get("left") right = doa.get("right") if not left or not right: return self._idle_drift() left_vad = left.get("vad", False) right_vad = right.get("vad", False) any_vad = left_vad or right_vad if any_vad: self._last_vad_time = time.monotonic() self._any_vad = True left_angle = left["angle"] right_angle = right["angle"] # Triangulate position pos = self._triangulate(left_angle, right_angle) # Compute ILD (Interaural Level Difference) ild_db = self._compute_ild(left_energy, right_energy) # Compute ITD if we have audio from both ears itd_angle = None if left_audio and right_audio and any_vad: itd_result = self._compute_itd(left_audio, right_audio) if itd_result is not None: itd_angle, self._last_itd_samples = itd_result self._smooth_itd_angle += SMOOTHING_ALPHA * ( self._shortest_angle_diff(itd_angle, self._smooth_itd_angle)) self._smooth_itd_angle %= 360 # keep in 0-360 if pos and any_vad: # Smooth the position self._smooth_x += SMOOTHING_ALPHA * (pos["x_mm"] - self._smooth_x) self._smooth_y += SMOOTHING_ALPHA * (pos["y_mm"] - self._smooth_y) self._smooth_ild += SMOOTHING_ALPHA * (ild_db - self._smooth_ild) # Fuse triangulated distance with ILD tri_dist = math.sqrt(self._smooth_x**2 + self._smooth_y**2) ild_dist = self._ild_to_distance(self._smooth_ild) fused_dist = (1.0 - ILD_DIST_WEIGHT) * tri_dist + ILD_DIST_WEIGHT * ild_dist self._smooth_distance += SMOOTHING_ALPHA * (fused_dist - self._smooth_distance) elif not any_vad: return self._idle_drift() # Convert to gaze gaze_x, gaze_y = self._position_to_gaze(self._smooth_x, self._smooth_y) # Smooth gaze self._smooth_gaze_x += SMOOTHING_ALPHA * (gaze_x - self._smooth_gaze_x) self._smooth_gaze_y += SMOOTHING_ALPHA * (gaze_y - self._smooth_gaze_y) # Classify proximity zone proximity = self._classify_proximity(self._smooth_distance) result = { "x_mm": round(self._smooth_x, 1), "y_mm": round(self._smooth_y, 1), "distance_mm": round(self._smooth_distance, 1), "ild_db": round(self._smooth_ild, 1), "itd_angle": round(self._smooth_itd_angle, 1), "itd_delay_us": round(self._last_itd_samples * 1e6 / SAMPLE_RATE, 1), "proximity": proximity, "gaze_x": int(round(self._smooth_gaze_x)), "gaze_y": int(round(self._smooth_gaze_y)), "vad": any_vad, "side": "left" if self._smooth_x < 0 else "right", } self.last_position = result return result def _idle_drift(self) -> Optional[dict]: """When no VAD, smoothly return gaze to center.""" elapsed = time.monotonic() - self._last_vad_time if elapsed < IDLE_TIMEOUT_S: # Hold last position briefly return self.last_position # Drift toward center self._smooth_gaze_x += IDLE_RETURN_SPEED * (GAZE_CENTER - self._smooth_gaze_x) self._smooth_gaze_y += IDLE_RETURN_SPEED * (GAZE_CENTER - self._smooth_gaze_y) result = { "x_mm": round(self._smooth_x, 1), "y_mm": round(self._smooth_y, 1), "distance_mm": round(self._smooth_distance, 1), "ild_db": round(self._smooth_ild, 1), "proximity": self._classify_proximity(self._smooth_distance), "gaze_x": int(round(self._smooth_gaze_x)), "gaze_y": int(round(self._smooth_gaze_y)), "vad": False, "side": "center", } self.last_position = result return result @staticmethod def _compute_ild(left_energy: float, right_energy: float) -> float: """Compute Interaural Level Difference in dB. Positive = louder on left, negative = louder on right.""" # Clamp to avoid log(0) left_e = max(left_energy, 1e-10) right_e = max(right_energy, 1e-10) return 20.0 * math.log10(left_e / right_e) @staticmethod def _ild_to_distance(ild_db: float) -> float: """Estimate distance from ILD magnitude. Higher ILD = closer source (head shadow effect is stronger up close). This is a rough empirical mapping — should be calibrated per-installation.""" ild_abs = abs(ild_db) if ild_abs > 8.0: return 300.0 # very close, ~30cm elif ild_abs > 5.0: return 700.0 # close, ~70cm elif ild_abs > 3.0: return 1500.0 # conversational, ~1.5m elif ild_abs > 1.5: return 2500.0 # across room, ~2.5m else: return 4000.0 # far or directly ahead (no ILD) def _compute_itd(self, left_audio: bytes, right_audio: bytes) -> Optional[tuple[float, float]]: """Compute Interaural Time Difference via cross-correlation. Returns (angle_degrees, delay_samples) or None if insufficient data. Positive delay = sound arrives at right ear first = source on right. """ try: left = np.frombuffer(left_audio, dtype=np.int16).astype(np.float32) right = np.frombuffer(right_audio, dtype=np.int16).astype(np.float32) except Exception: return None min_len = min(len(left), len(right)) if min_len < 64: return None # Use the last 512 samples (~32ms window) for correlation window = min(512, min_len) left = left[-window:] right = right[-window:] # Normalize to prevent overflow left_norm = np.linalg.norm(left) right_norm = np.linalg.norm(right) if left_norm < 1.0 or right_norm < 1.0: return None # silence left = left / left_norm right = right / right_norm # Cross-correlate within the expected delay range max_delay = ITD_MAX_DELAY_SAMPLES corr = np.correlate(left, right, mode='full') # corr center is at index len(left)-1, corresponding to zero delay center = len(left) - 1 search = corr[center - max_delay:center + max_delay + 1] if len(search) == 0: return None # Peak delay in samples (positive = right leads = source on right) peak_idx = np.argmax(search) delay_samples = peak_idx - max_delay # centered: negative=left leads, positive=right leads # Convert delay to angle # delay_samples * (1/sample_rate) = time_diff # sin(angle) = time_diff * speed_of_sound / separation time_diff = delay_samples / SAMPLE_RATE sin_angle = (time_diff * SPEED_OF_SOUND_MM_S) / self.separation # Clamp to valid range (cross-correlation can overshoot) sin_angle = max(-1.0, min(1.0, sin_angle)) angle_deg = math.degrees(math.asin(sin_angle)) # Convert from ±90° (negative=left, positive=right) to 0-360° convention # 0°=front, 90°=right, 270°=left if angle_deg >= 0: bearing = 90.0 - angle_deg # right side: 0° → 90°, 90° → 0° else: bearing = 270.0 + angle_deg # left side: -90° → 180° # Keep in 0-360 bearing = bearing % 360 return bearing, delay_samples @staticmethod def _shortest_angle_diff(target: float, current: float) -> float: """Shortest signed difference between two angles, for smooth interpolation.""" diff = target - current if diff > 180: diff -= 360 elif diff < -180: diff += 360 return diff @staticmethod def _classify_proximity(distance_mm: float) -> str: """Classify distance into a proximity zone.""" for name, lo, hi in PROXIMITY_ZONES: if lo <= distance_mm < hi: return name return "far" def _triangulate(self, left_deg: float, right_deg: float) -> Optional[dict]: """ Triangulate sound source position from two DoA angles. Array coordinate system: - Origin: center of skull - X axis: positive = right (toward right ear) - Y axis: positive = forward (in front of skull) Each array's DoA is 0° = front, 90° = right, 180° = back, 270° = left. The arrays are positioned at (-half_sep, 0) and (+half_sep, 0). """ # Convert DoA angles to bearing vectors # DoA 0° = forward (+Y), 90° = right (+X) for each array left_rad = math.radians(left_deg) right_rad = math.radians(right_deg) # Direction vectors from each array position # Left array at (-half_sep, 0), right array at (+half_sep, 0) left_dx = math.sin(left_rad) left_dy = math.cos(left_rad) right_dx = math.sin(right_rad) right_dy = math.cos(right_rad) # Solve intersection of two rays: # P_left + t * D_left = P_right + s * D_right # (-half_sep + t*left_dx, t*left_dy) = (half_sep + s*right_dx, s*right_dy) # # t*left_dx - s*right_dx = separation # t*left_dy - s*right_dy = 0 denom = left_dx * right_dy - left_dy * right_dx if abs(denom) < 0.001: # Parallel rays — can't triangulate, source is very far away or directly ahead # Fall back to bearing midpoint at a default distance avg_rad = (left_rad + right_rad) / 2 return { "x_mm": GAZE_MAX_DISTANCE_MM * math.sin(avg_rad), "y_mm": GAZE_MAX_DISTANCE_MM * math.cos(avg_rad), } t = (self.separation * right_dy) / denom if t < 0: # Intersection is behind the arrays — likely noise or rear source # Use the bearing with positive t scaled to max distance avg_rad = (left_rad + right_rad) / 2 return { "x_mm": GAZE_MAX_DISTANCE_MM * math.sin(avg_rad) * 0.5, "y_mm": GAZE_MAX_DISTANCE_MM * math.cos(avg_rad) * 0.5, } # Compute intersection point relative to left array, then shift to skull center x = -self.half_sep + t * left_dx y = t * left_dy return {"x_mm": x, "y_mm": y} def _position_to_gaze(self, x_mm: float, y_mm: float) -> tuple[float, float]: """ Convert position (mm) to gaze coordinates (0-255). Horizontal: source on the right → eyes look right (gaze_x > 127) Vertical: source closer → eyes look slightly down, farther → straight ahead """ distance = math.sqrt(x_mm**2 + y_mm**2) if distance < 1.0: return float(GAZE_CENTER), float(GAZE_CENTER) # Horizontal: angle from center angle = math.atan2(x_mm, max(y_mm, 100.0)) # clamp y to avoid extreme angles # Map angle (roughly -pi/2 to pi/2) to gaze range gaze_x = GAZE_CENTER + GAZE_X_RANGE * (angle / (math.pi / 2)) gaze_x = max(GAZE_CENTER - GAZE_X_RANGE, min(GAZE_CENTER + GAZE_X_RANGE, gaze_x)) # Vertical: closer = slightly down, far = center # This simulates looking down at someone close vs straight ahead at someone far proximity = max(0.0, 1.0 - distance / GAZE_MAX_DISTANCE_MM) gaze_y = GAZE_CENTER + GAZE_Y_RANGE * proximity * 0.3 # subtle effect return gaze_x, gaze_y