diff --git a/headmic.py b/headmic.py
index 2127626..08fe9b4 100644
--- a/headmic.py
+++ b/headmic.py
@@ -160,6 +160,7 @@ class ServiceState:
         self.active_side: str = "left"  # which mic array is currently active
         self.doa: dict = {}  # latest DoA from both arrays
         self.spatial: Optional[dict] = None  # triangulated position + gaze
+        self.last_anomaly: Optional[dict] = None  # last spatial anomaly detected
 
 
 state = ServiceState()
@@ -173,6 +174,9 @@ enrollment_buffer = None  # list of frame bytes, set during enrollment
 
 # Binaural recorder
 binaural_recorder = None
+
+# Spatial scene
+spatial_scene = None
 enrollment_name = None
 
 # Audio stream
@@ -385,6 +389,18 @@ def sound_classifier_loop():
             audio_f32 = result.pop("audio_float32", None)
             state.audio_scene = result
 
+            # Spatial scene: log classified sound with its position
+            if spatial_scene and state.spatial and result.get("category"):
+                top = (result.get("top_classes") or [{}])[0]
+                anomaly = spatial_scene.observe(
+                    category=result["category"],
+                    top_class=top.get("name", result["category"]),
+                    score=top.get("score", 0),
+                    spatial=state.spatial,
+                )
+                if anomaly:
+                    state.last_anomaly = anomaly
+
             # Speaker identification: run when speech detected
             if speaker_recognizer and result["category"] == "speech" and audio_f32 is not None:
                 try:
@@ -466,7 +482,7 @@ app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂 (Dual XVF3800)"
 
 @app.on_event("startup")
 async def startup():
-    global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder
+    global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE, spatial_tracker, binaural_recorder, spatial_scene
 
     state.running = True
 
@@ -542,6 +558,11 @@ async def startup():
         logger.info("Spatial tracking started (%d Hz, %.0fmm baseline, pushing gaze to %s)",
                     DOA_POLL_HZ, array_sep, EYE_SERVICE_URL)
 
+    # --- Spatial scene mapping ---
+    from spatial_scene import SpatialScene
+    spatial_scene = SpatialScene()
+    spatial_scene.start()
+
     # --- Binaural recording ---
     if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"):
         from binaural_recorder import BinauralRecorder
@@ -561,6 +582,8 @@ async def startup():
 async def shutdown():
     state.running = False
     leds_off()
+    if spatial_scene:
+        spatial_scene.stop()
     if binaural_recorder:
         binaural_recorder.stop()
     if dual_stream:
@@ -629,6 +652,35 @@ async def doa():
     }
 
 
+# --- Spatial scene ---
+
+@app.get("/scene")
+async def scene():
+    """Learned spatial audio scene — where each sound type usually comes from."""
+    if not spatial_scene:
+        return {"scene": {}, "last_anomaly": None}
+    return {
+        "scene": spatial_scene.get_scene_summary(),
+        "last_anomaly": state.last_anomaly,
+    }
+
+
+@app.get("/scene/events")
+async def scene_events(seconds: int = 30, category: str = None):
+    """Recent sound events with spatial information."""
+    if not spatial_scene:
+        return {"events": []}
+    return {"events": spatial_scene.get_recent_events(seconds, category)}
+
+
+@app.get("/scene/heatmap")
+async def scene_heatmap():
+    """Observation counts per angle bin per category — for visualization."""
+    if not spatial_scene:
+        return {"heatmap": {}}
+    return {"heatmap": spatial_scene.get_spatial_heatmap()}
+
+
 # --- Binaural recording ---
 
 @app.get("/recording")
diff --git a/spatial_scene.py b/spatial_scene.py
new file mode 100644
index 0000000..0e65325
--- /dev/null
+++ b/spatial_scene.py
@@ -0,0 +1,308 @@
+"""
+Spatial audio scene — maps what sounds come from where, over time.
+
+Builds a persistent picture of the acoustic environment:
+- "TV is usually at 270°"
+- "Door knocks come from 90°"
+- "Speech mostly from 0-45° (the desk area)"
+
+Combines YAMNet classification with triangulated position to create
+a spatial-temporal log of sound events. Detects anomalies when a
+sound type appears from an unusual direction.
+
+Data feeds into LYRA context for environmental awareness.
+"""
+
+import json
+import logging
+import math
+import os
+import time
+import threading
+from collections import defaultdict, deque
+from pathlib import Path
+from typing import Optional
+
+logger = logging.getLogger("headmic.scene")
+
+# Scene configuration
+ANGLE_BIN_SIZE = 30  # degrees per bin (12 bins around 360°)
+EVENT_HISTORY_SIZE = 500  # max events in memory
+SCENE_SAVE_INTERVAL = 60.0  # save scene map to disk every N seconds
+ANOMALY_MIN_OBSERVATIONS = 10  # need this many of a category before detecting anomalies
+ANOMALY_ANGLE_THRESHOLD = 90  # degrees from usual position = anomalous
+ANOMALY_COOLDOWN_S = 30.0  # min seconds between anomaly reports per category
+
+DEFAULT_SCENE_PATH = os.path.expanduser("~/.vixy/scene_map.json")
+
+
+def _angle_bin(angle_deg: float) -> int:
+    """Quantize an angle in degrees to its bin index."""
+    n_bins = 360 // ANGLE_BIN_SIZE
+    # Normalize first so negative angles wrap to the last bins instead of
+    # truncating toward bin 0; the final modulo covers float rounding to 360.
+    return int((angle_deg % 360) / ANGLE_BIN_SIZE) % n_bins
+
+
+def _bin_center(bin_idx: int) -> float:
+    """Center angle of a bin."""
+    return bin_idx * ANGLE_BIN_SIZE + ANGLE_BIN_SIZE / 2
+
+
+def _angle_distance(a1: float, a2: float) -> float:
+    """Shortest angular distance between two angles in degrees."""
+    diff = abs(a1 - a2) % 360
+    return min(diff, 360 - diff)
+
+
+class SoundEvent:
+    """A single classified sound with spatial information."""
+
+    def __init__(self, category: str, top_class: str, score: float,
+                 angle: float, distance_mm: float, proximity: str,
+                 side: str, timestamp: float):
+        self.category = category
+        self.top_class = top_class
+        self.score = score
+        self.angle = angle
+        self.distance_mm = distance_mm
+        self.proximity = proximity
+        self.side = side
+        self.timestamp = timestamp
+
+    def to_dict(self) -> dict:
+        """Return a JSON-serializable view, including the event's current age."""
+        return {
+            "category": self.category,
+            "top_class": self.top_class,
+            "score": round(self.score, 3),
+            "angle": round(self.angle, 1),
+            "distance_mm": round(self.distance_mm, 1),
+            "proximity": self.proximity,
+            "side": self.side,
+            "timestamp": round(self.timestamp, 2),
+            "age_seconds": round(time.time() - self.timestamp, 1),
+        }
+
+
+class SpatialScene:
+    """Persistent spatial audio scene map."""
+
+    def __init__(self, scene_path: str = DEFAULT_SCENE_PATH):
+        self.scene_path = scene_path
+
+        # Event log (recent history)
+        self.events: deque[SoundEvent] = deque(maxlen=EVENT_HISTORY_SIZE)
+
+        # Learned scene map: category → {angle_bin → count}
+        # Tracks where each type of sound usually comes from
+        self.scene_map: dict[str, dict[int, int]] = defaultdict(lambda: defaultdict(int))
+
+        # Total observations per category (for anomaly detection)
+        self.category_totals: dict[str, int] = defaultdict(int)
+
+        # Last anomaly per category (avoid spamming)
+        self._last_anomaly: dict[str, float] = {}
+
+        # Re-entrant on purpose: observe() and get_scene_summary() call
+        # get_usual_direction() while already holding this lock, which
+        # would deadlock with a plain threading.Lock.
+        self._lock = threading.RLock()
+        # Serializes file writes between the save thread and stop().
+        self._io_lock = threading.Lock()
+        self._stop_event = threading.Event()
+        self._save_thread: Optional[threading.Thread] = None
+        self._running = False
+
+        self._load()
+
+    def start(self):
+        """Start the background thread that periodically saves the scene map."""
+        if self._running:
+            return
+        self._running = True
+        self._stop_event.clear()
+        self._save_thread = threading.Thread(target=self._save_loop, daemon=True)
+        self._save_thread.start()
+        logger.info("Spatial scene tracking started (%d learned categories, saving to %s)",
+                    len(self.scene_map), self.scene_path)
+
+    def stop(self):
+        """Stop the save thread and write the scene map one last time."""
+        self._running = False
+        # Wake the save loop now instead of letting it sleep out its interval.
+        self._stop_event.set()
+        self._save()
+
+    def observe(self, category: str, top_class: str, score: float,
+                spatial: dict) -> Optional[dict]:
+        """
+        Record a classified sound with its spatial position.
+        Returns anomaly info if this sound is coming from an unusual direction.
+        """
+        if not spatial or not spatial.get("vad"):
+            return None
+
+        # Compute angle from position
+        # NOTE(review): y is clamped to >= 1, so the angle always lies in
+        # [0°, 90°) or (270°, 360°) — confirm the tracker never reports y_mm < 0.
+        x_mm = spatial.get("x_mm", 0)
+        y_mm = spatial.get("y_mm", 0)
+        angle = math.degrees(math.atan2(x_mm, max(y_mm, 1.0))) % 360
+
+        event = SoundEvent(
+            category=category,
+            top_class=top_class,
+            score=score,
+            angle=angle,
+            distance_mm=spatial.get("distance_mm", 0),
+            proximity=spatial.get("proximity", "unknown"),
+            side=spatial.get("side", "center"),
+            timestamp=time.time(),
+        )
+
+        with self._lock:
+            self.events.append(event)
+
+            # Update scene map
+            self.scene_map[category][_angle_bin(angle)] += 1
+            self.category_totals[category] += 1
+
+            # Check for anomaly
+            return self._check_anomaly(event)
+
+    def _check_anomaly(self, event: SoundEvent) -> Optional[dict]:
+        """Check if this event is from an unusual direction for its category."""
+        category = event.category
+        if self.category_totals[category] < ANOMALY_MIN_OBSERVATIONS:
+            return None
+
+        # Rate-limit anomalies per category
+        now = time.time()
+        if now - self._last_anomaly.get(category, 0) < ANOMALY_COOLDOWN_S:
+            return None
+
+        # Find the usual direction for this category
+        usual_angle = self.get_usual_direction(category)
+        if usual_angle is None:
+            return None
+
+        deviation = _angle_distance(event.angle, usual_angle)
+        if deviation < ANOMALY_ANGLE_THRESHOLD:
+            return None
+
+        self._last_anomaly[category] = now
+        anomaly = {
+            "type": "spatial_anomaly",
+            "category": category,
+            "top_class": event.top_class,
+            "expected_angle": round(usual_angle, 1),
+            "actual_angle": round(event.angle, 1),
+            "deviation": round(deviation, 1),
+            "proximity": event.proximity,
+            "message": f"{event.top_class} from unusual direction "
+                       f"({round(event.angle)}° vs usual {round(usual_angle)}°)",
+        }
+        logger.info("Spatial anomaly: %s", anomaly["message"])
+        return anomaly
+
+    def get_usual_direction(self, category: str) -> Optional[float]:
+        """Get the usual direction for a sound category (weighted circular mean)."""
+        with self._lock:
+            bins = self.scene_map.get(category)
+            if not bins or sum(bins.values()) == 0:
+                return None
+
+            # Weighted circular mean of the bin centers
+            sin_sum = 0.0
+            cos_sum = 0.0
+            for bin_idx, count in bins.items():
+                angle_rad = math.radians(_bin_center(bin_idx))
+                sin_sum += count * math.sin(angle_rad)
+                cos_sum += count * math.cos(angle_rad)
+
+            return math.degrees(math.atan2(sin_sum, cos_sum)) % 360
+
+    def get_scene_summary(self) -> dict:
+        """Get a summary of the learned spatial scene."""
+        with self._lock:
+            summary = {}
+            for category in sorted(self.scene_map):
+                usual = self.get_usual_direction(category)
+                if usual is not None:
+                    summary[category] = {
+                        "usual_angle": round(usual, 1),
+                        "observations": self.category_totals.get(category, 0),
+                    }
+            return summary
+
+    def get_recent_events(self, seconds: float = 30.0,
+                          category: Optional[str] = None) -> list[dict]:
+        """Get recent sound events, optionally filtered by category."""
+        cutoff = time.time() - seconds
+        with self._lock:
+            return [e.to_dict() for e in self.events
+                    if e.timestamp >= cutoff
+                    and (category is None or e.category == category)]
+
+    def get_spatial_heatmap(self) -> dict[str, dict]:
+        """Get observation counts per angle bin, per category.
+        Useful for visualization."""
+        n_bins = 360 // ANGLE_BIN_SIZE
+        with self._lock:
+            return {
+                category: {
+                    "bin_size_deg": ANGLE_BIN_SIZE,
+                    "counts": [bins.get(i, 0) for i in range(n_bins)],
+                    "total": self.category_totals.get(category, 0),
+                }
+                for category, bins in self.scene_map.items()
+            }
+
+    def _save(self):
+        """Save scene map to disk (best effort; failures are only logged)."""
+        with self._lock:
+            data = {
+                "scene_map": {k: dict(v) for k, v in self.scene_map.items()},
+                "category_totals": dict(self.category_totals),
+                "saved_at": time.time(),
+            }
+        tmp_path = f"{self.scene_path}.tmp"
+        try:
+            with self._io_lock:
+                parent = os.path.dirname(self.scene_path)
+                if parent:  # os.makedirs("") raises for a bare filename
+                    os.makedirs(parent, exist_ok=True)
+                # Write a temp file and rename it into place so an interrupted
+                # write cannot leave a truncated scene map behind.
+                with open(tmp_path, "w", encoding="utf-8") as f:
+                    json.dump(data, f, indent=2)
+                os.replace(tmp_path, self.scene_path)
+        except (OSError, TypeError, ValueError) as e:
+            logger.warning("Failed to save scene map: %s", e)
+
+    def _load(self):
+        """Load scene map from disk, if a saved one exists."""
+        if not os.path.exists(self.scene_path):
+            return
+        try:
+            with open(self.scene_path, encoding="utf-8") as f:
+                data = json.load(f)
+            with self._lock:
+                for cat, bins in data.get("scene_map", {}).items():
+                    for bin_str, count in bins.items():
+                        # JSON object keys are strings; restore the int bin index
+                        self.scene_map[cat][int(bin_str)] = count
+                for cat, total in data.get("category_totals", {}).items():
+                    self.category_totals[cat] = total
+            logger.info("Loaded scene map: %d categories, %d total observations",
+                        len(self.scene_map),
+                        sum(self.category_totals.values()))
+        except (OSError, ValueError, TypeError, AttributeError) as e:
+            logger.warning("Failed to load scene map: %s", e)
+
+    def _save_loop(self):
+        """Save the scene map every SCENE_SAVE_INTERVAL seconds until stopped."""
+        # Event.wait() returns True as soon as stop() sets the event.
+        while not self._stop_event.wait(SCENE_SAVE_INTERVAL):
+            self._save()