Add binaural triangulation + smooth gaze tracking
spatial.py: Triangulates sound source position from two DoA angles using ray intersection. Exponential smoothing prevents jitter. Gaze drifts back to center after 2s of silence. Converts position (mm) to gaze (0-255). headmic.py: Replaces simple doa_poll_loop with doa_track_loop that runs the spatial tracker and pushes gaze to the eye service when the position changes. Rate-limited to 10 pushes/sec with minimum delta threshold. /doa endpoint now returns triangulated position + gaze coordinates. Array separation (175mm) stored in config, overridable. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
74
headmic.py
74
headmic.py
@@ -159,6 +159,7 @@ class ServiceState:
|
||||
self.enrolling: bool = False
|
||||
self.active_side: str = "left" # which mic array is currently active
|
||||
self.doa: dict = {} # latest DoA from both arrays
|
||||
self.spatial: Optional[dict] = None # triangulated position + gaze
|
||||
|
||||
state = ServiceState()
|
||||
|
||||
@@ -390,34 +391,58 @@ def sound_classifier_loop():
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# DoA Polling Thread
|
||||
# Spatial Tracking + Gaze (DoA → triangulation → eye service)
|
||||
# ============================================================================
|
||||
|
||||
def doa_poll_loop():
|
||||
"""Poll Direction of Arrival from both XVF3800 arrays."""
|
||||
from spatial import SpatialTracker
|
||||
|
||||
# Module-level tracker instance read by doa_track_loop(); it stays None (and
# the loop skips triangulation) until a SpatialTracker is assigned to it.
# NOTE(review): the startup code assigns `spatial_tracker = SpatialTracker(...)`
# inside a function. That only rebinds THIS global if the function declares
# `global spatial_tracker` -- confirm, otherwise the loop always sees None.
spatial_tracker: Optional[SpatialTracker] = None
|
||||
# Gaze push tuning. GAZE_CENTER must be assigned before _last_gaze_push,
# which is initialised from it (the reverse order raises NameError at import).
GAZE_CENTER = 127  # neutral gaze coordinate used as the initial "last pushed" value
GAZE_PUSH_MIN_DELTA = 3  # don't push gaze unless it moved by at least this much
GAZE_PUSH_INTERVAL = 0.1  # max 10 gaze pushes/sec to eye service

# Last (x, y) gaze actually sent to the eye service; updated by doa_track_loop().
_last_gaze_push: tuple[int, int] = (GAZE_CENTER, GAZE_CENTER)
|
||||
|
||||
|
||||
def doa_track_loop():
    """Poll DoA, triangulate, smooth, push gaze to eye service.

    Runs until ``state.running`` becomes false. Each iteration:

    1. stores the latest readings from ``xvf_manager.read_both_doa()`` in
       ``state.doa``;
    2. if a ``spatial_tracker`` is configured, feeds it the readings and
       stores any result in ``state.spatial``;
    3. pushes the result's gaze to the eye service, but only when it moved by
       at least ``GAZE_PUSH_MIN_DELTA`` on either axis since the last push and
       at least ``GAZE_PUSH_INTERVAL`` seconds have passed.

    Any exception in an iteration is logged once at debug level and the loop
    continues after sleeping ``1 / DOA_POLL_HZ`` seconds.
    """
    global _last_gaze_push
    interval = 1.0 / DOA_POLL_HZ
    last_push_time = 0.0

    while state.running:
        try:
            state.doa = xvf_manager.read_both_doa()

            result = spatial_tracker.update(state.doa) if spatial_tracker else None
            if result:
                state.spatial = result
                gx, gy = result["gaze_x"], result["gaze_y"]

                # Push to eye service if changed enough and not too frequent
                moved = (
                    abs(gx - _last_gaze_push[0]) >= GAZE_PUSH_MIN_DELTA
                    or abs(gy - _last_gaze_push[1]) >= GAZE_PUSH_MIN_DELTA
                )
                now = time.monotonic()
                if moved and now - last_push_time >= GAZE_PUSH_INTERVAL:
                    _push_gaze(gx, gy)
                    _last_gaze_push = (gx, gy)
                    last_push_time = now
        except Exception as e:
            # Single log line per failure (the older "DoA poll error" message
            # duplicated this one).
            logger.debug("DoA/spatial error: %s", e)

        time.sleep(interval)
|
||||
|
||||
|
||||
def doa_to_gaze() -> Optional[tuple[int, int]]:
    """Map the active array's DoA angle to an (x, y) gaze pair.

    Reads ``state.doa`` and ``state.active_side``. Returns ``None`` when there
    are no readings, the active side has no reading, or that reading's
    ``"vad"`` flag is falsy. Otherwise the angle (degrees) is projected as
    ``127 - 80*sin`` horizontally and ``127 - 40*cos`` vertically, truncated
    to int and clamped to 0..255.
    """
    readings = state.doa
    active = state.active_side

    # Guard clauses: nothing to convert without a voiced reading on the active side.
    if not readings or active not in readings:
        return None
    reading = readings[active]
    if reading is None or not reading.get("vad"):
        return None

    import math

    theta = math.radians(reading["angle"])
    gaze_x = int(127 - 80 * math.sin(theta))
    gaze_y = int(127 - 40 * math.cos(theta))

    def _clamp(value: int) -> int:
        # Keep the coordinate inside the 0..255 gaze range.
        return max(0, min(255, value))

    return _clamp(gaze_x), _clamp(gaze_y)
|
||||
def _push_gaze(x: int, y: int):
    """Best-effort gaze push to the eye service.

    POSTs ``{"x": x, "y": y}`` to ``{EYE_SERVICE_URL}/gaze`` with a 0.5 s
    timeout. The call is synchronous, so it can block the caller for up to
    the timeout. Failures never propagate; they are logged at debug level so
    a persistent problem (service down, bad URL, httpx not installed) can
    still be diagnosed without spamming normal logs.

    Args:
        x: Horizontal gaze coordinate.
        y: Vertical gaze coordinate.
    """
    try:
        import httpx

        httpx.post(
            f"{EYE_SERVICE_URL}/gaze",
            json={"x": x, "y": y},
            timeout=0.5,
        )
    except Exception as e:
        # Eye service may be down; stay quiet at default log levels.
        logger.debug("Gaze push failed: %s", e)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -497,10 +522,13 @@ async def startup():
|
||||
except Exception as e:
|
||||
logger.warning("Speaker recognition unavailable: %s", e)
|
||||
|
||||
# --- DoA polling ---
|
||||
# --- Spatial tracking (DoA → triangulation → gaze) ---
|
||||
if xvf_manager.left or xvf_manager.right:
|
||||
threading.Thread(target=doa_poll_loop, daemon=True).start()
|
||||
logger.info("DoA polling started at %d Hz", DOA_POLL_HZ)
|
||||
array_sep = cfg.get("array_separation_mm", 175.0)
|
||||
spatial_tracker = SpatialTracker(array_separation_mm=array_sep)
|
||||
threading.Thread(target=doa_track_loop, daemon=True).start()
|
||||
logger.info("Spatial tracking started (%d Hz, %.0fmm baseline, pushing gaze to %s)",
|
||||
DOA_POLL_HZ, array_sep, EYE_SERVICE_URL)
|
||||
|
||||
# --- Main listener ---
|
||||
thread = threading.Thread(target=listener_loop, daemon=True)
|
||||
@@ -570,11 +598,11 @@ async def last():
|
||||
|
||||
@app.get("/doa")
async def doa():
    """Direction of Arrival from both mic arrays + triangulated position."""
    # A second, stale docstring literal used to follow the first one as a dead
    # string expression; only the current description is kept.
    return {
        "doa": state.doa,  # latest raw DoA readings
        "active_side": state.active_side,  # which mic array is currently active
        "gaze": doa_to_gaze(),  # single-array gaze estimate, or None
        "spatial": state.spatial,  # triangulated position + gaze, or None
    }
|
||||
|
||||
|
||||
|
||||
213
spatial.py
Normal file
213
spatial.py
Normal file
@@ -0,0 +1,213 @@
|
||||
"""
|
||||
Binaural spatial hearing — triangulation, tracking, gaze.
|
||||
|
||||
Combines DoA angles from two XVF3800 arrays into a sound source position,
|
||||
smooths the tracking, and converts the result to gaze coordinates (0-255). Pushing the gaze to the eye service is done by the caller (headmic.py), not by this module.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import math
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger("headmic.spatial")
|
||||
|
||||
# Array geometry (measured on skull, can be overridden from config)
DEFAULT_ARRAY_SEPARATION_MM = 175.0  # center-to-center distance between arrays

# Gaze mapping
GAZE_CENTER = 127  # neutral gaze (0-255 range)
GAZE_X_RANGE = 80  # max horizontal deflection from center
GAZE_Y_RANGE = 30  # max vertical deflection from center
GAZE_MAX_DISTANCE_MM = 3000  # beyond this, gaze is "far" (no convergence)

# Smoothing
SMOOTHING_ALPHA = 0.3  # exponential smoothing (0=sluggish, 1=instant)
IDLE_RETURN_SPEED = 0.05  # how fast gaze drifts to center when no VAD
IDLE_TIMEOUT_S = 2.0  # seconds of no VAD before drifting to center


class SpatialTracker:
    """Triangulates sound source from two DoA angles and produces smooth gaze.

    Coordinate system (all positions in mm):
      - Origin: midpoint between the two arrays
      - X axis: positive = right
      - Y axis: positive = forward
    The left array sits at (-half_sep, 0) and the right array at (+half_sep, 0).
    """

    def __init__(self, array_separation_mm: float = DEFAULT_ARRAY_SEPARATION_MM):
        """Create a tracker.

        Args:
            array_separation_mm: Center-to-center distance between the arrays.
        """
        self.separation = array_separation_mm
        self.half_sep = array_separation_mm / 2.0

        # Smoothed state
        self._smooth_x: float = 0.0  # mm, relative to skull center
        self._smooth_y: float = 0.0  # mm, forward from skull
        self._smooth_gaze_x: float = float(GAZE_CENTER)
        self._smooth_gaze_y: float = float(GAZE_CENTER)

        # VAD tracking
        self._last_vad_time: float = 0.0  # time.monotonic() of last voiced update
        self._any_vad: bool = False  # set once any voiced reading has been seen

        # Last result returned, kept for the idle "hold" phase and for the API
        self.last_position: Optional[dict] = None

    def update(self, doa: dict) -> Optional[dict]:
        """Process DoA readings from both arrays.

        Args:
            doa: {"left": {"angle": 0-359, "vad": bool},
                  "right": {"angle": 0-359, "vad": bool}}

        Returns:
            {"x_mm": float, "y_mm": float, "distance_mm": float,
             "gaze_x": int, "gaze_y": int, "vad": bool, "side": str}
            or None if there is no data yet. When either array is missing,
            an angle is missing, or neither array reports voice activity,
            the idle behaviour applies: hold the last result for
            IDLE_TIMEOUT_S, then drift the gaze back to center.
        """
        if not doa:
            return self._idle_drift()

        left = doa.get("left")
        right = doa.get("right")
        if not left or not right:
            return self._idle_drift()

        # A reading without an angle cannot be triangulated; treat it as
        # insufficient data instead of raising KeyError/TypeError.
        left_angle = left.get("angle")
        right_angle = right.get("angle")
        if left_angle is None or right_angle is None:
            return self._idle_drift()

        any_vad = bool(left.get("vad", False) or right.get("vad", False))
        if not any_vad:
            return self._idle_drift()

        self._last_vad_time = time.monotonic()
        self._any_vad = True

        # Triangulate, then exponentially smooth the position
        pos = self._triangulate(left_angle, right_angle)
        if pos is not None:
            self._smooth_x += SMOOTHING_ALPHA * (pos["x_mm"] - self._smooth_x)
            self._smooth_y += SMOOTHING_ALPHA * (pos["y_mm"] - self._smooth_y)

        # Convert to gaze and smooth a second time in gaze space
        gaze_x, gaze_y = self._position_to_gaze(self._smooth_x, self._smooth_y)
        self._smooth_gaze_x += SMOOTHING_ALPHA * (gaze_x - self._smooth_gaze_x)
        self._smooth_gaze_y += SMOOTHING_ALPHA * (gaze_y - self._smooth_gaze_y)

        side = "left" if self._smooth_x < 0 else "right"
        return self._snapshot(vad=True, side=side)

    def _idle_drift(self) -> Optional[dict]:
        """When no VAD, hold briefly, then smoothly return gaze to center."""
        elapsed = time.monotonic() - self._last_vad_time

        if elapsed < IDLE_TIMEOUT_S:
            # Hold last position briefly (returned unchanged, may be None)
            return self.last_position

        # Drift toward center
        self._smooth_gaze_x += IDLE_RETURN_SPEED * (GAZE_CENTER - self._smooth_gaze_x)
        self._smooth_gaze_y += IDLE_RETURN_SPEED * (GAZE_CENTER - self._smooth_gaze_y)

        return self._snapshot(vad=False, side="center")

    def _snapshot(self, vad: bool, side: str) -> dict:
        """Build the result dict from the smoothed state and remember it."""
        result = {
            "x_mm": round(self._smooth_x, 1),
            "y_mm": round(self._smooth_y, 1),
            "distance_mm": round(math.hypot(self._smooth_x, self._smooth_y), 1),
            "gaze_x": int(round(self._smooth_gaze_x)),
            "gaze_y": int(round(self._smooth_gaze_y)),
            "vad": vad,
            "side": side,
        }
        self.last_position = result
        return result

    @staticmethod
    def _bearing_position(bearing_rad: float, distance_mm: float) -> dict:
        """Return the point at *distance_mm* along *bearing_rad* from the origin."""
        return {
            "x_mm": distance_mm * math.sin(bearing_rad),
            "y_mm": distance_mm * math.cos(bearing_rad),
        }

    def _triangulate(self, left_deg: float, right_deg: float) -> Optional[dict]:
        """Triangulate sound source position from two DoA angles.

        Each array's DoA is 0 deg = front (+Y), 90 deg = right (+X),
        180 deg = back, 270 deg = left.

        Returns:
            {"x_mm": float, "y_mm": float}. When the rays are (nearly)
            parallel the point is placed at GAZE_MAX_DISTANCE_MM along the
            mean bearing; when the two forward rays do not actually meet, it
            is placed at half that distance along the mean bearing.
        """
        left_rad = math.radians(left_deg)
        right_rad = math.radians(right_deg)

        # Unit direction vectors from each array position
        left_dx, left_dy = math.sin(left_rad), math.cos(left_rad)
        right_dx, right_dy = math.sin(right_rad), math.cos(right_rad)

        # Mean bearing for the fallbacks. Use the circular mean (direction of
        # the summed unit vectors): a plain arithmetic mean of 350 and 10
        # degrees is 180 (behind) instead of 0 (ahead).
        sum_dx = left_dx + right_dx
        sum_dy = left_dy + right_dy
        if math.hypot(sum_dx, sum_dy) < 1e-9:
            mean_rad = 0.0  # opposite bearings: mean undefined, assume ahead
        else:
            mean_rad = math.atan2(sum_dx, sum_dy)

        # Solve intersection of two rays:
        #   P_left + t * D_left = P_right + s * D_right
        #   t*left_dx - s*right_dx = separation
        #   t*left_dy - s*right_dy = 0
        denom = left_dx * right_dy - left_dy * right_dx

        if abs(denom) < 0.001:
            # Parallel rays: cannot triangulate, treat the source as far away
            return self._bearing_position(mean_rad, GAZE_MAX_DISTANCE_MM)

        t = (self.separation * right_dy) / denom  # distance along the left ray
        s = (self.separation * left_dy) / denom  # distance along the right ray

        if t < 0 or s < 0:
            # The lines cross behind at least one array, so the forward rays
            # never meet (diverging or inconsistent bearings). Fall back to
            # the mean bearing at half the maximum distance.
            return self._bearing_position(mean_rad, GAZE_MAX_DISTANCE_MM * 0.5)

        # Intersection point along the left ray, expressed from skull center
        return {
            "x_mm": -self.half_sep + t * left_dx,
            "y_mm": t * left_dy,
        }

    def _position_to_gaze(self, x_mm: float, y_mm: float) -> tuple[float, float]:
        """Convert position (mm) to gaze coordinates (0-255).

        Horizontal: source on the right gives gaze_x > GAZE_CENTER.
        Vertical: a closer source raises gaze_y slightly above GAZE_CENTER;
        at or beyond GAZE_MAX_DISTANCE_MM it equals GAZE_CENTER.
        """
        distance = math.hypot(x_mm, y_mm)
        if distance < 1.0:
            return float(GAZE_CENTER), float(GAZE_CENTER)

        # Horizontal: angle from center; clamp y to avoid extreme angles
        angle = math.atan2(x_mm, max(y_mm, 100.0))
        # Map angle (roughly -pi/2 to pi/2) to gaze range
        gaze_x = GAZE_CENTER + GAZE_X_RANGE * (angle / (math.pi / 2))
        gaze_x = max(GAZE_CENTER - GAZE_X_RANGE, min(GAZE_CENTER + GAZE_X_RANGE, gaze_x))

        # Vertical: proximity is 1 at the origin and 0 at max distance
        proximity = max(0.0, 1.0 - distance / GAZE_MAX_DISTANCE_MM)
        gaze_y = GAZE_CENTER + GAZE_Y_RANGE * proximity * 0.3  # subtle effect

        return gaze_x, gaze_y
|
||||
Reference in New Issue
Block a user