""" Spatial audio scene — maps what sounds come from where, over time. Builds a persistent picture of the acoustic environment: - "TV is usually at 270°" - "Door knocks come from 90°" - "Speech mostly from 0-45° (the desk area)" Combines YAMNet classification with triangulated position to create a spatial-temporal log of sound events. Detects anomalies when a sound type appears from an unusual direction. Data feeds into LYRA context for environmental awareness. """ import json import logging import math import os import time import threading from collections import defaultdict, deque from pathlib import Path from typing import Optional logger = logging.getLogger("headmic.scene") # Scene configuration ANGLE_BIN_SIZE = 30 # degrees per bin (12 bins around 360°) EVENT_HISTORY_SIZE = 500 # max events in memory SCENE_SAVE_INTERVAL = 60.0 # save scene map to disk every N seconds ANOMALY_MIN_OBSERVATIONS = 10 # need this many of a category before detecting anomalies ANOMALY_ANGLE_THRESHOLD = 90 # degrees from usual position = anomalous DEFAULT_SCENE_PATH = os.path.expanduser("~/.vixy/scene_map.json") def _angle_bin(angle_deg: float) -> int: """Quantize angle to bin index.""" return int(angle_deg / ANGLE_BIN_SIZE) % (360 // ANGLE_BIN_SIZE) def _bin_center(bin_idx: int) -> float: """Center angle of a bin.""" return bin_idx * ANGLE_BIN_SIZE + ANGLE_BIN_SIZE / 2 def _angle_distance(a1: float, a2: float) -> float: """Shortest angular distance between two angles in degrees.""" diff = abs(a1 - a2) % 360 return min(diff, 360 - diff) class SoundEvent: """A single classified sound with spatial information.""" def __init__(self, category: str, top_class: str, score: float, angle: float, distance_mm: float, proximity: str, side: str, timestamp: float): self.category = category self.top_class = top_class self.score = score self.angle = angle self.distance_mm = distance_mm self.proximity = proximity self.side = side self.timestamp = timestamp def to_dict(self) -> dict: return { "category": self.category, "top_class": self.top_class, "score": round(self.score, 3), "angle": round(self.angle, 1), "distance_mm": round(self.distance_mm, 1), "proximity": self.proximity, "side": self.side, "timestamp": round(self.timestamp, 2), "age_seconds": round(time.time() - self.timestamp, 1), } class SpatialScene: """Persistent spatial audio scene map.""" def __init__(self, scene_path: str = DEFAULT_SCENE_PATH): self.scene_path = scene_path # Event log (recent history) self.events: deque[SoundEvent] = deque(maxlen=EVENT_HISTORY_SIZE) # Learned scene map: category → {angle_bin → count} # Tracks where each type of sound usually comes from self.scene_map: dict[str, dict[int, int]] = defaultdict(lambda: defaultdict(int)) # Total observations per category (for anomaly detection) self.category_totals: dict[str, int] = defaultdict(int) # Last anomaly per category (avoid spamming) self._last_anomaly: dict[str, float] = {} self._lock = threading.Lock() self._save_thread: Optional[threading.Thread] = None self._running = False self._load() def start(self): self._running = True self._save_thread = threading.Thread(target=self._save_loop, daemon=True) self._save_thread.start() logger.info("Spatial scene tracking started (%d learned categories, saving to %s)", len(self.scene_map), self.scene_path) def stop(self): self._running = False self._save() def observe(self, category: str, top_class: str, score: float, spatial: dict) -> Optional[dict]: """ Record a classified sound with its spatial position. Returns anomaly info if this sound is coming from an unusual direction. """ if not spatial or not spatial.get("vad"): return None # Compute angle from position x_mm = spatial.get("x_mm", 0) y_mm = spatial.get("y_mm", 0) angle = math.degrees(math.atan2(x_mm, max(y_mm, 1.0))) % 360 event = SoundEvent( category=category, top_class=top_class, score=score, angle=angle, distance_mm=spatial.get("distance_mm", 0), proximity=spatial.get("proximity", "unknown"), side=spatial.get("side", "center"), timestamp=time.time(), ) anomaly = None with self._lock: self.events.append(event) # Update scene map angle_bin = _angle_bin(angle) self.scene_map[category][angle_bin] += 1 self.category_totals[category] += 1 # Check for anomaly anomaly = self._check_anomaly(event) return anomaly def _check_anomaly(self, event: SoundEvent) -> Optional[dict]: """Check if this event is from an unusual direction for its category.""" category = event.category total = self.category_totals[category] if total < ANOMALY_MIN_OBSERVATIONS: return None # Rate-limit anomalies per category (once per 30s) now = time.time() if now - self._last_anomaly.get(category, 0) < 30.0: return None # Find the usual direction for this category (already holding lock) usual_angle = self._usual_direction_unlocked(category) if usual_angle is None: return None deviation = _angle_distance(event.angle, usual_angle) if deviation >= ANOMALY_ANGLE_THRESHOLD: self._last_anomaly[category] = now anomaly = { "type": "spatial_anomaly", "category": category, "top_class": event.top_class, "expected_angle": round(usual_angle, 1), "actual_angle": round(event.angle, 1), "deviation": round(deviation, 1), "proximity": event.proximity, "message": f"{event.top_class} from unusual direction " f"({round(event.angle)}° vs usual {round(usual_angle)}°)", } logger.info("Spatial anomaly: %s", anomaly["message"]) return anomaly return None def _usual_direction_unlocked(self, category: str) -> Optional[float]: """Get the most common direction for a category. Caller must hold self._lock.""" bins = self.scene_map.get(category) if not bins: return None total_weight = sum(bins.values()) if total_weight == 0: return None sin_sum = 0.0 cos_sum = 0.0 for bin_idx, count in bins.items(): angle_rad = math.radians(_bin_center(bin_idx)) sin_sum += count * math.sin(angle_rad) cos_sum += count * math.cos(angle_rad) return math.degrees(math.atan2(sin_sum, cos_sum)) % 360 def get_usual_direction(self, category: str) -> Optional[float]: """Get the most common direction for a sound category (thread-safe).""" with self._lock: return self._usual_direction_unlocked(category) def get_scene_summary(self) -> dict: """Get a summary of the learned spatial scene.""" with self._lock: summary = {} for category in sorted(self.scene_map.keys()): usual = self._usual_direction_unlocked(category) total = self.category_totals[category] if usual is not None: summary[category] = { "usual_angle": round(usual, 1), "observations": total, } return summary def get_recent_events(self, seconds: float = 30.0, category: str = None) -> list[dict]: """Get recent sound events, optionally filtered by category.""" cutoff = time.time() - seconds with self._lock: events = [e.to_dict() for e in self.events if e.timestamp >= cutoff and (category is None or e.category == category)] return events def get_spatial_heatmap(self) -> dict[str, list]: """Get observation counts per angle bin, per category. Useful for visualization.""" with self._lock: n_bins = 360 // ANGLE_BIN_SIZE heatmap = {} for category, bins in self.scene_map.items(): counts = [bins.get(i, 0) for i in range(n_bins)] heatmap[category] = { "bin_size_deg": ANGLE_BIN_SIZE, "counts": counts, "total": self.category_totals[category], } return heatmap def _save(self): """Save scene map to disk.""" with self._lock: data = { "scene_map": {k: dict(v) for k, v in self.scene_map.items()}, "category_totals": dict(self.category_totals), "saved_at": time.time(), } try: os.makedirs(os.path.dirname(self.scene_path), exist_ok=True) with open(self.scene_path, "w") as f: json.dump(data, f, indent=2) except Exception as e: logger.warning("Failed to save scene map: %s", e) def _load(self): """Load scene map from disk.""" if not os.path.exists(self.scene_path): return try: with open(self.scene_path) as f: data = json.load(f) with self._lock: for cat, bins in data.get("scene_map", {}).items(): for bin_str, count in bins.items(): self.scene_map[cat][int(bin_str)] = count for cat, total in data.get("category_totals", {}).items(): self.category_totals[cat] = total logger.info("Loaded scene map: %d categories, %d total observations", len(self.scene_map), sum(self.category_totals.values())) except Exception as e: logger.warning("Failed to load scene map: %s", e) def _save_loop(self): while self._running: time.sleep(SCENE_SAVE_INTERVAL) if self._running: self._save()