Add spatial audio scene mapping + sound event localization (#6 + #8)

spatial_scene.py: Builds a persistent map of where each sound category
usually comes from (30° angle bins, circular mean). Detects anomalies
when a sound appears from an unusual direction (90°+ deviation).
Scene map persists to ~/.vixy/scene_map.json across restarts.

headmic.py: Feed classified sounds + spatial position into scene tracker.
New endpoints:
  /scene — learned scene summary + last anomaly
  /scene/events — recent events with what+where+when
  /scene/heatmap — per-category angular distribution (for visualization)

Example: after running for a day, /scene might show:
  {"speech": {"usual_angle": 15.0, "observations": 847},
   "music": {"usual_angle": 270.0, "observations": 312}}
And if speech comes from 270° (where music usually is): spatial anomaly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex
2026-04-12 21:17:29 -05:00
parent 2a25db8498
commit 8f71d97af6
2 changed files with 346 additions and 1 deletions

View File

@@ -160,6 +160,7 @@ class ServiceState:
self.active_side: str = "left" # which mic array is currently active self.active_side: str = "left" # which mic array is currently active
self.doa: dict = {} # latest DoA from both arrays self.doa: dict = {} # latest DoA from both arrays
self.spatial: Optional[dict] = None # triangulated position + gaze self.spatial: Optional[dict] = None # triangulated position + gaze
self.last_anomaly: Optional[dict] = None # last spatial anomaly detected
state = ServiceState() state = ServiceState()
@@ -173,6 +174,9 @@ enrollment_buffer = None # list of frame bytes, set during enrollment
# Binaural recorder # Binaural recorder
binaural_recorder = None binaural_recorder = None
# Spatial scene
spatial_scene = None
enrollment_name = None enrollment_name = None
# Audio stream # Audio stream
@@ -385,6 +389,18 @@ def sound_classifier_loop():
audio_f32 = result.pop("audio_float32", None) audio_f32 = result.pop("audio_float32", None)
state.audio_scene = result state.audio_scene = result
# Spatial scene: log classified sound with its position
if spatial_scene and state.spatial and result.get("category"):
top = result.get("top_classes", [{}])[0] if result.get("top_classes") else {}
anomaly = spatial_scene.observe(
category=result["category"],
top_class=top.get("name", result["category"]),
score=top.get("score", 0),
spatial=state.spatial,
)
if anomaly:
state.last_anomaly = anomaly
# Speaker identification: run when speech detected # Speaker identification: run when speech detected
if speaker_recognizer and result["category"] == "speech" and audio_f32 is not None: if speaker_recognizer and result["category"] == "speech" and audio_f32 is not None:
try: try:
@@ -466,7 +482,7 @@ app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂 (Dual XVF3800)"
@app.on_event("startup") @app.on_event("startup")
async def startup(): async def startup():
global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder, spatial_scene
state.running = True state.running = True
@@ -542,6 +558,11 @@ async def startup():
logger.info("Spatial tracking started (%d Hz, %.0fmm baseline, pushing gaze to %s)", logger.info("Spatial tracking started (%d Hz, %.0fmm baseline, pushing gaze to %s)",
DOA_POLL_HZ, array_sep, EYE_SERVICE_URL) DOA_POLL_HZ, array_sep, EYE_SERVICE_URL)
# --- Spatial scene mapping ---
from spatial_scene import SpatialScene
spatial_scene = SpatialScene()
spatial_scene.start()
# --- Binaural recording --- # --- Binaural recording ---
if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"): if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"):
from binaural_recorder import BinauralRecorder from binaural_recorder import BinauralRecorder
@@ -561,6 +582,8 @@ async def startup():
async def shutdown(): async def shutdown():
state.running = False state.running = False
leds_off() leds_off()
if spatial_scene:
spatial_scene.stop()
if binaural_recorder: if binaural_recorder:
binaural_recorder.stop() binaural_recorder.stop()
if dual_stream: if dual_stream:
@@ -629,6 +652,35 @@ async def doa():
} }
# --- Spatial scene ---
@app.get("/scene")
async def scene():
    """Return the learned scene summary plus the most recent spatial anomaly.

    When no scene tracker is set, an empty summary and a null anomaly are
    returned instead.
    """
    if spatial_scene:
        summary = spatial_scene.get_scene_summary()
        return {"scene": summary, "last_anomaly": state.last_anomaly}
    return {"scene": {}, "last_anomaly": None}
@app.get("/scene/events")
async def scene_events(seconds: int = 30, category: str = None):
    """Return sound events from the last ``seconds`` seconds.

    ``category`` optionally restricts the result to one sound category.
    Without a scene tracker the event list is empty.
    """
    recent = spatial_scene.get_recent_events(seconds, category) if spatial_scene else []
    return {"events": recent}
@app.get("/scene/heatmap")
async def scene_heatmap():
    """Return per-category observation counts per angle bin (for visualization).

    Without a scene tracker the heatmap is empty.
    """
    heatmap = spatial_scene.get_spatial_heatmap() if spatial_scene else {}
    return {"heatmap": heatmap}
# --- Binaural recording --- # --- Binaural recording ---
@app.get("/recording") @app.get("/recording")

293
spatial_scene.py Normal file
View File

@@ -0,0 +1,293 @@
"""
Spatial audio scene — maps what sounds come from where, over time.
Builds a persistent picture of the acoustic environment:
- "TV is usually at 270°"
- "Door knocks come from 90°"
- "Speech mostly from 0-45° (the desk area)"
Combines YAMNet classification with triangulated position to create
a spatial-temporal log of sound events. Detects anomalies when a
sound type appears from an unusual direction.
Data feeds into LYRA context for environmental awareness.
"""
import json
import logging
import math
import os
import time
import threading
from collections import defaultdict, deque
from pathlib import Path
from typing import Optional
logger = logging.getLogger("headmic.scene")

# ---------------------------------------------------------------------------
# Scene configuration
# ---------------------------------------------------------------------------

# Width of one direction bin in degrees; 360 / 30 gives 12 bins per circle.
ANGLE_BIN_SIZE = 30

# Upper bound on the number of events kept in memory.
EVENT_HISTORY_SIZE = 500

# Seconds between periodic writes of the scene map to disk.
SCENE_SAVE_INTERVAL = 60.0

# Observations a category needs before anomalies are reported for it.
ANOMALY_MIN_OBSERVATIONS = 10

# Deviation in degrees from the usual direction that counts as anomalous.
ANOMALY_ANGLE_THRESHOLD = 90

# Default on-disk location of the persisted scene map.
DEFAULT_SCENE_PATH = os.path.expanduser("~/.vixy/scene_map.json")
def _angle_bin(angle_deg: float) -> int:
    """Quantize an angle in degrees to its bin index.

    The angle is first wrapped into [0, 360), so negative angles and angles
    of 360 or more fall into the bin of the equivalent direction (for
    example, -15 maps to the last bin rather than bin 0).

    Returns an index in ``range(360 // ANGLE_BIN_SIZE)``.
    """
    n_bins = 360 // ANGLE_BIN_SIZE
    wrapped = angle_deg % 360
    # Floor division keeps the result exact near bin edges; the final modulo
    # covers the float corner case where a tiny negative input wraps to 360.0.
    return int(wrapped // ANGLE_BIN_SIZE) % n_bins
def _bin_center(bin_idx: int) -> float:
    """Return the angle in degrees at the middle of bin ``bin_idx``."""
    half_width = ANGLE_BIN_SIZE / 2
    bin_start = bin_idx * ANGLE_BIN_SIZE
    return bin_start + half_width
def _angle_distance(a1: float, a2: float) -> float:
    """Return the smallest separation between two angles, in degrees.

    The result is always in the range 0..180 regardless of how many full
    turns the inputs contain.
    """
    one_way = abs(a1 - a2) % 360
    other_way = 360 - one_way
    # Going round the circle either way reaches the same point; pick the
    # shorter arc.
    return one_way if one_way <= other_way else other_way
class SoundEvent:
    """One classified sound together with where and when it was heard."""

    def __init__(self, category: str, top_class: str, score: float,
                 angle: float, distance_mm: float, proximity: str,
                 side: str, timestamp: float):
        # What was heard.
        self.category, self.top_class, self.score = category, top_class, score
        # Where it was heard.
        self.angle, self.distance_mm = angle, distance_mm
        self.proximity, self.side = proximity, side
        # When it was heard (compared against time.time() in to_dict()).
        self.timestamp = timestamp

    def to_dict(self) -> dict:
        """Return a plain-dict view of the event with rounded numeric fields.

        ``age_seconds`` is computed against the current time on every call.
        """
        payload = {
            "category": self.category,
            "top_class": self.top_class,
        }
        # Numeric fields and the number of decimals each is rounded to.
        for field, digits in (("score", 3), ("angle", 1), ("distance_mm", 1)):
            payload[field] = round(getattr(self, field), digits)
        payload["proximity"] = self.proximity
        payload["side"] = self.side
        payload["timestamp"] = round(self.timestamp, 2)
        payload["age_seconds"] = round(time.time() - self.timestamp, 1)
        return payload
class SpatialScene:
    """Persistent spatial audio scene map.

    Keeps a bounded in-memory log of recent ``SoundEvent`` objects and, per
    sound category, a histogram of arrival directions in
    ``ANGLE_BIN_SIZE``-degree bins. The histogram is loaded from
    ``scene_path`` on construction, written back every
    ``SCENE_SAVE_INTERVAL`` seconds by a background thread once ``start()``
    has been called, and written once more by ``stop()``.

    Locking: ``self._lock`` is a plain (non-reentrant) ``threading.Lock``
    guarding ``events``, ``scene_map``, ``category_totals`` and
    ``_last_anomaly``. Public methods acquire it themselves. ``_check_anomaly``
    and ``_usual_direction_locked`` expect the caller to already hold it and
    must never acquire it, otherwise the calling thread blocks forever.
    """

    # Minimum seconds between two reported anomalies for the same category.
    ANOMALY_COOLDOWN_S = 30.0

    def __init__(self, scene_path: str = DEFAULT_SCENE_PATH):
        """Create the scene and load any previously saved map.

        Args:
            scene_path: JSON file the learned scene map is persisted to.
        """
        self.scene_path = scene_path

        # Recent events only; the deque drops the oldest entry once full.
        self.events: deque[SoundEvent] = deque(maxlen=EVENT_HISTORY_SIZE)

        # Learned scene map: category -> {angle_bin -> observation count}.
        self.scene_map: dict[str, dict[int, int]] = defaultdict(lambda: defaultdict(int))

        # Total observations per category (gates anomaly detection).
        self.category_totals: dict[str, int] = defaultdict(int)

        # time.time() of the last anomaly reported per category (rate limit).
        self._last_anomaly: dict[str, float] = {}

        self._lock = threading.Lock()
        # Serializes file writers so the periodic save and the final save in
        # stop() cannot interleave on the same file.
        self._save_lock = threading.Lock()
        # Set by stop() to wake the save thread immediately.
        self._stop_event = threading.Event()
        self._save_thread: Optional[threading.Thread] = None
        self._running = False

        self._load()

    def start(self):
        """Start the daemon thread that periodically saves the scene map."""
        self._running = True
        self._stop_event.clear()
        self._save_thread = threading.Thread(target=self._save_loop, daemon=True)
        self._save_thread.start()
        with self._lock:
            learned = len(self.scene_map)
        logger.info("Spatial scene tracking started (%d learned categories, saving to %s)",
                    learned, self.scene_path)

    def stop(self):
        """Stop the periodic save thread and write the scene map one last time."""
        self._running = False
        self._stop_event.set()
        self._save()

    def observe(self, category: str, top_class: str, score: float,
                spatial: dict) -> Optional[dict]:
        """Record a classified sound together with its spatial position.

        Args:
            category: Sound category the event is counted under.
            top_class: Name of the top class, used in anomaly messages.
            score: Classification score stored with the event.
            spatial: Position dict. Read keys: ``vad``, ``x_mm``, ``y_mm``,
                ``distance_mm``, ``proximity``, ``side``. The observation is
                ignored unless ``spatial["vad"]`` is truthy.

        Returns:
            An anomaly dict when the sound arrives from an unusual direction
            for its category, otherwise ``None``.
        """
        if not spatial or not spatial.get("vad"):
            return None

        # Bearing measured from the +y axis toward +x, wrapped to [0, 360].
        # NOTE(review): y is clamped to at least 1 mm, which confines every
        # bearing to the front half-plane (0..90 and 270..360 degrees) --
        # confirm that sources behind the arrays are meant to be folded
        # forward like this.
        x_mm = spatial.get("x_mm", 0)
        y_mm = spatial.get("y_mm", 0)
        angle = math.degrees(math.atan2(x_mm, max(y_mm, 1.0))) % 360

        event = SoundEvent(
            category=category,
            top_class=top_class,
            score=score,
            angle=angle,
            distance_mm=spatial.get("distance_mm", 0),
            proximity=spatial.get("proximity", "unknown"),
            side=spatial.get("side", "center"),
            timestamp=time.time(),
        )

        with self._lock:
            self.events.append(event)
            self.scene_map[category][_angle_bin(angle)] += 1
            self.category_totals[category] += 1
            # The lock is held here, so only *_locked helpers may be used
            # further down this call chain.
            return self._check_anomaly(event)

    def _check_anomaly(self, event: SoundEvent) -> Optional[dict]:
        """Return anomaly info if ``event`` deviates from its category's usual direction.

        The caller must hold ``self._lock``.
        """
        category = event.category
        if self.category_totals[category] < ANOMALY_MIN_OBSERVATIONS:
            return None

        # Rate-limit anomalies per category.
        now = time.time()
        if now - self._last_anomaly.get(category, 0) < self.ANOMALY_COOLDOWN_S:
            return None

        usual_angle = self._usual_direction_locked(category)
        if usual_angle is None:
            return None

        deviation = _angle_distance(event.angle, usual_angle)
        if deviation < ANOMALY_ANGLE_THRESHOLD:
            return None

        self._last_anomaly[category] = now
        anomaly = {
            "type": "spatial_anomaly",
            "category": category,
            "top_class": event.top_class,
            "expected_angle": round(usual_angle, 1),
            "actual_angle": round(event.angle, 1),
            "deviation": round(deviation, 1),
            "proximity": event.proximity,
            "message": f"{event.top_class} from unusual direction "
                       f"({round(event.angle)}° vs usual {round(usual_angle)}°)",
        }
        logger.info("Spatial anomaly: %s", anomaly["message"])
        return anomaly

    def get_usual_direction(self, category: str) -> Optional[float]:
        """Return the usual direction (degrees) of ``category``.

        The value is the count-weighted circular mean of the bin centers.
        Returns ``None`` when the category has no observations or when its
        observations cancel out so that no mean direction exists.
        """
        with self._lock:
            return self._usual_direction_locked(category)

    def _usual_direction_locked(self, category: str) -> Optional[float]:
        """Compute the circular mean for ``category``; caller must hold ``self._lock``."""
        bins = self.scene_map.get(category)
        if not bins:
            return None

        total_weight = sum(bins.values())
        if total_weight <= 0:
            return None

        sin_sum = 0.0
        cos_sum = 0.0
        for bin_idx, count in bins.items():
            angle_rad = math.radians(_bin_center(bin_idx))
            sin_sum += count * math.sin(angle_rad)
            cos_sum += count * math.cos(angle_rad)

        # Opposite directions with equal weight cancel; atan2 of a
        # near-zero vector would be an arbitrary angle, so report "no usual
        # direction" instead.
        if math.hypot(sin_sum, cos_sum) <= 1e-9 * total_weight:
            return None

        return math.degrees(math.atan2(sin_sum, cos_sum)) % 360

    def get_scene_summary(self) -> dict:
        """Return ``{category: {"usual_angle", "observations"}}`` for learned categories.

        Categories without a defined usual direction are omitted.
        """
        summary = {}
        with self._lock:
            for category in sorted(self.scene_map):
                usual = self._usual_direction_locked(category)
                if usual is None:
                    continue
                summary[category] = {
                    "usual_angle": round(usual, 1),
                    "observations": self.category_totals[category],
                }
        return summary

    def get_recent_events(self, seconds: float = 30.0,
                          category: Optional[str] = None) -> list[dict]:
        """Return events from the last ``seconds`` seconds as dicts.

        Args:
            seconds: Size of the look-back window.
            category: When given, only events of this category are returned.
        """
        cutoff = time.time() - seconds
        with self._lock:
            return [e.to_dict() for e in self.events
                    if e.timestamp >= cutoff
                    and (category is None or e.category == category)]

    def get_spatial_heatmap(self) -> dict[str, dict]:
        """Return observation counts per angle bin, per category.

        Each value holds ``bin_size_deg``, ``counts`` (one entry per bin,
        starting at bin 0) and ``total``. Useful for visualization.
        """
        n_bins = 360 // ANGLE_BIN_SIZE
        heatmap = {}
        with self._lock:
            for category, bins in self.scene_map.items():
                heatmap[category] = {
                    "bin_size_deg": ANGLE_BIN_SIZE,
                    "counts": [bins.get(i, 0) for i in range(n_bins)],
                    "total": self.category_totals[category],
                }
        return heatmap

    def _save(self):
        """Write the scene map to ``scene_path``; failures are logged, not raised."""
        with self._save_lock:
            # Snapshot under the state lock, then do the slow file I/O
            # without blocking observe().
            with self._lock:
                data = {
                    "scene_map": {k: dict(v) for k, v in self.scene_map.items()},
                    "category_totals": dict(self.category_totals),
                    "saved_at": time.time(),
                }
            tmp_path = f"{self.scene_path}.tmp"
            try:
                parent = os.path.dirname(self.scene_path)
                if parent:  # a bare filename has no directory to create
                    os.makedirs(parent, exist_ok=True)
                with open(tmp_path, "w", encoding="utf-8") as f:
                    json.dump(data, f, indent=2)
                # Swap the finished file into place so a crash mid-write
                # cannot leave a truncated scene map behind.
                os.replace(tmp_path, self.scene_path)
            except Exception as e:
                logger.warning("Failed to save scene map: %s", e)

    def _load(self):
        """Load the scene map from ``scene_path`` if it exists.

        A missing file is not an error. An unreadable or malformed file is
        logged and leaves the in-memory map untouched.
        """
        if not os.path.exists(self.scene_path):
            return
        try:
            with open(self.scene_path, encoding="utf-8") as f:
                data = json.load(f)
            # Parse everything first so a malformed entry cannot leave a
            # half-applied map behind. JSON object keys are strings, so bin
            # indices are converted back to int.
            parsed_map = {
                cat: {int(bin_str): count for bin_str, count in bins.items()}
                for cat, bins in data.get("scene_map", {}).items()
            }
            parsed_totals = dict(data.get("category_totals", {}))
        except Exception as e:
            logger.warning("Failed to load scene map: %s", e)
            return

        with self._lock:
            for cat, bins in parsed_map.items():
                if bins:
                    self.scene_map[cat].update(bins)
            self.category_totals.update(parsed_totals)
            n_categories = len(self.scene_map)
            n_observations = sum(self.category_totals.values())
        logger.info("Loaded scene map: %d categories, %d total observations",
                    n_categories, n_observations)

    def _save_loop(self):
        """Save every ``SCENE_SAVE_INTERVAL`` seconds until ``stop()`` is called."""
        # Event.wait() returns True as soon as stop() sets the event, so the
        # thread exits promptly instead of finishing a full sleep.
        while not self._stop_event.wait(SCENE_SAVE_INTERVAL):
            self._save()