From e0a4af031f494e6f2a51b595a88c9cb576c3d2ee Mon Sep 17 00:00:00 2001
From: Alex
Date: Sun, 12 Apr 2026 15:12:28 -0500
Subject: [PATCH] Add binaural triangulation + smooth gaze tracking

spatial.py: Triangulates sound source position from two DoA angles
using ray intersection. Exponential smoothing prevents jitter. Gaze
drifts back to center after 2s of silence. Converts position (mm) to
gaze (0-255).

headmic.py: Replaces simple doa_poll_loop with doa_track_loop that
runs the spatial tracker and pushes gaze to the eye service when
the position changes. Rate-limited to 10 pushes/sec with minimum
delta threshold.

/doa endpoint now returns triangulated position + gaze coordinates.
Array separation (175mm) stored in config, overridable.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
Review fixups folded into this revision (the blob ids on the "index" lines
still refer to the original commit):
  - headmic.py: import GAZE_CENTER from spatial instead of assigning it
    after its first use (NameError at import time, duplicated constant).
  - headmic.py: declare `global spatial_tracker` in startup() so the tracker
    created there is the one doa_track_loop reads.
  - spatial.py: wrap-safe circular mean for the fallback bearing in
    _triangulate; module docstring no longer claims this module pushes gaze.

 headmic.py |  74 +++++++++++++------
 spatial.py | 213 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 264 insertions(+), 23 deletions(-)
 create mode 100644 spatial.py

diff --git a/headmic.py b/headmic.py
index 3a24c70..0fdfdfe 100644
--- a/headmic.py
+++ b/headmic.py
@@ -159,6 +159,7 @@ class ServiceState:
         self.enrolling: bool = False
         self.active_side: str = "left"  # which mic array is currently active
         self.doa: dict = {}  # latest DoA from both arrays
+        self.spatial: Optional[dict] = None  # triangulated position + gaze
 
 
 state = ServiceState()
@@ -390,34 +391,57 @@ def sound_classifier_loop():
 
 
 # ============================================================================
-# DoA Polling Thread
+# Spatial Tracking + Gaze (DoA → triangulation → eye service)
 # ============================================================================
 
-def doa_poll_loop():
-    """Poll Direction of Arrival from both XVF3800 arrays."""
+from spatial import GAZE_CENTER, SpatialTracker  # neutral gaze shared with spatial.py
+
+spatial_tracker: Optional[SpatialTracker] = None
+_last_gaze_push: tuple[int, int] = (GAZE_CENTER, GAZE_CENTER)
+GAZE_PUSH_MIN_DELTA = 3  # don't push gaze unless it moved by at least this much
+GAZE_PUSH_INTERVAL = 0.1  # max 10 gaze pushes/sec to eye service
+
+
+def doa_track_loop():
+    """Poll DoA, triangulate, smooth, push gaze to eye service."""
+    global _last_gaze_push
     interval = 1.0 / DOA_POLL_HZ
+    last_push_time = 0.0
+
     while state.running:
         try:
             state.doa = xvf_manager.read_both_doa()
+
+            if spatial_tracker:
+                result = spatial_tracker.update(state.doa)
+                if result:
+                    state.spatial = result
+                    gx, gy = result["gaze_x"], result["gaze_y"]
+
+                    # Push to eye service if changed enough and not too frequent
+                    dx = abs(gx - _last_gaze_push[0])
+                    dy = abs(gy - _last_gaze_push[1])
+                    now = time.monotonic()
+
+                    if ((dx >= GAZE_PUSH_MIN_DELTA or dy >= GAZE_PUSH_MIN_DELTA)
+                            and now - last_push_time >= GAZE_PUSH_INTERVAL):
+                        _push_gaze(gx, gy)
+                        _last_gaze_push = (gx, gy)
+                        last_push_time = now
         except Exception as e:
-            logger.debug("DoA poll error: %s", e)
+            logger.debug("DoA/spatial error: %s", e)
+
         time.sleep(interval)
 
 
-def doa_to_gaze() -> Optional[tuple[int, int]]:
-    """Convert the active side's DoA angle to gaze coordinates for the eye service."""
-    doa = state.doa
-    side = state.active_side
-    if not doa or side not in doa or doa[side] is None:
-        return None
-    if not doa[side].get("vad"):
-        return None
-    import math
-    angle = doa[side]["angle"]
-    rad = math.radians(angle)
-    x = int(127 - 80 * math.sin(rad))
-    y = int(127 - 40 * math.cos(rad))
-    return max(0, min(255, x)), max(0, min(255, y))
+def _push_gaze(x: int, y: int):
+    """Fire-and-forget gaze push to eye service."""
+    try:
+        import httpx
+        httpx.post(f"{EYE_SERVICE_URL}/gaze",
+                   json={"x": x, "y": y}, timeout=0.5)
+    except Exception:
+        pass  # eye service may be down, don't spam logs
 
 
 # ============================================================================
@@ -497,10 +521,14 @@ async def startup():
     except Exception as e:
         logger.warning("Speaker recognition unavailable: %s", e)
 
-    # --- DoA polling ---
+    # --- Spatial tracking (DoA → triangulation → gaze) ---
     if xvf_manager.left or xvf_manager.right:
-        threading.Thread(target=doa_poll_loop, daemon=True).start()
-        logger.info("DoA polling started at %d Hz", DOA_POLL_HZ)
+        global spatial_tracker  # rebind the module-level tracker that doa_track_loop reads
+        array_sep = cfg.get("array_separation_mm", 175.0)
+        spatial_tracker = SpatialTracker(array_separation_mm=array_sep)
+        threading.Thread(target=doa_track_loop, daemon=True).start()
+        logger.info("Spatial tracking started (%d Hz, %.0fmm baseline, pushing gaze to %s)",
+                    DOA_POLL_HZ, array_sep, EYE_SERVICE_URL)
 
     # --- Main listener ---
     thread = threading.Thread(target=listener_loop, daemon=True)
@@ -570,11 +598,11 @@ async def last():
 
 @app.get("/doa")
 async def doa():
-    """Direction of Arrival from both mic arrays."""
+    """Direction of Arrival from both mic arrays + triangulated position."""
     return {
         "doa": state.doa,
         "active_side": state.active_side,
-        "gaze": doa_to_gaze(),
+        "spatial": state.spatial,
     }
 
 
diff --git a/spatial.py b/spatial.py
new file mode 100644
index 0000000..d09ad35
--- /dev/null
+++ b/spatial.py
@@ -0,0 +1,213 @@
+"""
+Binaural spatial hearing — triangulation, tracking, gaze.
+
+Combines DoA angles from two XVF3800 arrays into a sound source position,
+smooths the tracking, and converts it to gaze coordinates for the eye service.
+"""
+
+import logging
+import math
+import time
+from typing import Optional
+
+logger = logging.getLogger("headmic.spatial")
+
+# Array geometry (measured on skull, can be overridden from config)
+DEFAULT_ARRAY_SEPARATION_MM = 175.0  # center-to-center distance between arrays
+
+# Gaze mapping
+GAZE_CENTER = 127  # neutral gaze (0-255 range)
+GAZE_X_RANGE = 80  # max horizontal deflection from center
+GAZE_Y_RANGE = 30  # max vertical deflection from center
+GAZE_MAX_DISTANCE_MM = 3000  # beyond this, gaze is "far" (no convergence)
+
+# Smoothing
+SMOOTHING_ALPHA = 0.3  # exponential smoothing (0=sluggish, 1=instant)
+IDLE_RETURN_SPEED = 0.05  # how fast gaze drifts to center when no VAD
+IDLE_TIMEOUT_S = 2.0  # seconds of no VAD before drifting to center
+
+
+class SpatialTracker:
+    """Triangulates sound source from two DoA angles and produces smooth gaze."""
+
+    def __init__(self, array_separation_mm: float = DEFAULT_ARRAY_SEPARATION_MM):
+        self.separation = array_separation_mm
+        self.half_sep = array_separation_mm / 2.0
+
+        # Smoothed state
+        self._smooth_x: float = 0.0  # mm, relative to skull center
+        self._smooth_y: float = 0.0  # mm, forward from skull
+        self._smooth_gaze_x: float = float(GAZE_CENTER)
+        self._smooth_gaze_y: float = float(GAZE_CENTER)
+
+        # VAD tracking
+        self._last_vad_time: float = 0.0
+        self._any_vad: bool = False
+
+        # Last raw result for API
+        self.last_position: Optional[dict] = None
+
+    def update(self, doa: dict) -> Optional[dict]:
+        """
+        Process DoA readings from both arrays.
+
+        Args:
+            doa: {"left": {"angle": 0-359, "vad": bool}, "right": {"angle": 0-359, "vad": bool}}
+
+        Returns:
+            {"x_mm": float, "y_mm": float, "distance_mm": float,
+             "gaze_x": int, "gaze_y": int, "vad": bool, "side": str}
+            or None if insufficient data.
+        """
+        left = doa.get("left")
+        right = doa.get("right")
+
+        if not left or not right:
+            return self._idle_drift()
+
+        left_vad = left.get("vad", False)
+        right_vad = right.get("vad", False)
+        any_vad = left_vad or right_vad
+
+        if any_vad:
+            self._last_vad_time = time.monotonic()
+            self._any_vad = True
+
+        left_angle = left["angle"]
+        right_angle = right["angle"]
+
+        # Triangulate position
+        pos = self._triangulate(left_angle, right_angle)
+
+        if pos and any_vad:
+            # Smooth the position
+            self._smooth_x += SMOOTHING_ALPHA * (pos["x_mm"] - self._smooth_x)
+            self._smooth_y += SMOOTHING_ALPHA * (pos["y_mm"] - self._smooth_y)
+        elif not any_vad:
+            return self._idle_drift()
+
+        # Convert to gaze
+        gaze_x, gaze_y = self._position_to_gaze(self._smooth_x, self._smooth_y)
+
+        # Smooth gaze
+        self._smooth_gaze_x += SMOOTHING_ALPHA * (gaze_x - self._smooth_gaze_x)
+        self._smooth_gaze_y += SMOOTHING_ALPHA * (gaze_y - self._smooth_gaze_y)
+
+        result = {
+            "x_mm": round(self._smooth_x, 1),
+            "y_mm": round(self._smooth_y, 1),
+            "distance_mm": round(math.sqrt(self._smooth_x**2 + self._smooth_y**2), 1),
+            "gaze_x": int(round(self._smooth_gaze_x)),
+            "gaze_y": int(round(self._smooth_gaze_y)),
+            "vad": any_vad,
+            "side": "left" if self._smooth_x < 0 else "right",
+        }
+        self.last_position = result
+        return result
+
+    def _idle_drift(self) -> Optional[dict]:
+        """When no VAD, smoothly return gaze to center."""
+        elapsed = time.monotonic() - self._last_vad_time
+
+        if elapsed < IDLE_TIMEOUT_S:
+            # Hold last position briefly
+            return self.last_position
+
+        # Drift toward center
+        self._smooth_gaze_x += IDLE_RETURN_SPEED * (GAZE_CENTER - self._smooth_gaze_x)
+        self._smooth_gaze_y += IDLE_RETURN_SPEED * (GAZE_CENTER - self._smooth_gaze_y)
+
+        result = {
+            "x_mm": round(self._smooth_x, 1),
+            "y_mm": round(self._smooth_y, 1),
+            "distance_mm": round(math.sqrt(self._smooth_x**2 + self._smooth_y**2), 1),
+            "gaze_x": int(round(self._smooth_gaze_x)),
+            "gaze_y": int(round(self._smooth_gaze_y)),
+            "vad": False,
+            "side": "center",
+        }
+        self.last_position = result
+        return result
+
+    def _triangulate(self, left_deg: float, right_deg: float) -> Optional[dict]:
+        """
+        Triangulate sound source position from two DoA angles.
+
+        Array coordinate system:
+        - Origin: center of skull
+        - X axis: positive = right (toward right ear)
+        - Y axis: positive = forward (in front of skull)
+
+        Each array's DoA is 0° = front, 90° = right, 180° = back, 270° = left.
+        The arrays are positioned at (-half_sep, 0) and (+half_sep, 0).
+        """
+        # Convert DoA angles to bearing vectors
+        # DoA 0° = forward (+Y), 90° = right (+X) for each array
+        left_rad = math.radians(left_deg)
+        right_rad = math.radians(right_deg)
+
+        # Direction vectors from each array position
+        # Left array at (-half_sep, 0), right array at (+half_sep, 0)
+        left_dx = math.sin(left_rad)
+        left_dy = math.cos(left_rad)
+        right_dx = math.sin(right_rad)
+        right_dy = math.cos(right_rad)
+
+        # Solve intersection of two rays:
+        #   P_left + t * D_left = P_right + s * D_right
+        #   (-half_sep + t*left_dx, t*left_dy) = (half_sep + s*right_dx, s*right_dy)
+        #
+        #   t*left_dx - s*right_dx = separation
+        #   t*left_dy - s*right_dy = 0
+
+        denom = left_dx * right_dy - left_dy * right_dx
+
+        if abs(denom) < 0.001:
+            # Parallel rays — can't triangulate, source is very far away or directly ahead
+            # Fall back to the circular-mean bearing (wrap-safe: 350°/10° → 0°) at a default distance
+            avg_rad = math.atan2(left_dx + right_dx, left_dy + right_dy)
+            return {
+                "x_mm": GAZE_MAX_DISTANCE_MM * math.sin(avg_rad),
+                "y_mm": GAZE_MAX_DISTANCE_MM * math.cos(avg_rad),
+            }
+
+        t = (self.separation * right_dy) / denom
+
+        if t < 0:
+            # Rays diverge (no intersection along the left ray) — likely noise or inconsistent readings
+            # Use the circular-mean bearing (wrap-safe) at half the max distance
+            avg_rad = math.atan2(left_dx + right_dx, left_dy + right_dy)
+            return {
+                "x_mm": GAZE_MAX_DISTANCE_MM * math.sin(avg_rad) * 0.5,
+                "y_mm": GAZE_MAX_DISTANCE_MM * math.cos(avg_rad) * 0.5,
+            }
+
+        # Compute intersection point relative to left array, then shift to skull center
+        x = -self.half_sep + t * left_dx
+        y = t * left_dy
+
+        return {"x_mm": x, "y_mm": y}
+
+    def _position_to_gaze(self, x_mm: float, y_mm: float) -> tuple[float, float]:
+        """
+        Convert position (mm) to gaze coordinates (0-255).
+
+        Horizontal: source on the right → eyes look right (gaze_x > 127)
+        Vertical: source closer → eyes look slightly down, farther → straight ahead
+        """
+        distance = math.sqrt(x_mm**2 + y_mm**2)
+        if distance < 1.0:
+            return float(GAZE_CENTER), float(GAZE_CENTER)
+
+        # Horizontal: angle from center
+        angle = math.atan2(x_mm, max(y_mm, 100.0))  # clamp y to avoid extreme angles
+        # Map angle (roughly -pi/2 to pi/2) to gaze range
+        gaze_x = GAZE_CENTER + GAZE_X_RANGE * (angle / (math.pi / 2))
+        gaze_x = max(GAZE_CENTER - GAZE_X_RANGE, min(GAZE_CENTER + GAZE_X_RANGE, gaze_x))
+
+        # Vertical: closer = slightly down, far = center
+        # This simulates looking down at someone close vs straight ahead at someone far
+        proximity = max(0.0, 1.0 - distance / GAZE_MAX_DISTANCE_MM)
+        gaze_y = GAZE_CENTER + GAZE_Y_RANGE * proximity * 0.3  # subtle effect
+
+        return gaze_x, gaze_y