#!/usr/bin/env python3
"""
HeadMic - Vixy's Ears Service 🦊👂

Wake word detection + voice recording + EarTail transcription.
Runs on head-vixy (Raspberry Pi 5).

Wake word: "Hey Vivi" (trained via Picovoice Porcupine)

Architecture:
    Dual XVF3800 mic arrays (left/right ear), best-beam selection.
    Single shared audio stream feeds Porcupine, VAD, sound classification,
    and speaker ID.

Flow:
    1. Dual audio streams from two XVF3800 arrays
    2. Best-beam selection (higher energy side)
    3. Feed frames to Porcupine for wake word detection
    4. On "Hey Vivi" → start buffering from active side
    5. Use VAD to detect end of speech
    6. Send buffer to EarTail for transcription
    7. Return to listening mode

Hardware:
    2× ReSpeaker XVF3800 4-Mic Array (USB, 2-channel firmware)
    DoA + LEDs via USB vendor control (xvf3800.py)

Built by Vixy on Day 77 (January 17, 2026) 💜
Upgraded to dual XVF3800 on Day 160 (April 2026)
"""

import asyncio
import collections
import io
import json
import logging
import os
import struct
import sys
import threading
import time
import urllib.request
import wave
from pathlib import Path
from typing import Optional, List

import numpy as np
import httpx
import pvporcupine
import webrtcvad
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from pydantic import BaseModel

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("headmic")

# ============================================================================
# Configuration
# ============================================================================

PORCUPINE_ACCESS_KEY = os.environ.get("PORCUPINE_ACCESS_KEY", "")
WAKE_WORD_PATH = os.environ.get(
    "WAKE_WORD_PATH",
    "/home/alex/headmic/Hey-Vivi_en_raspberry-pi_v4_0_0.ppn",
)
SAMPLE_RATE = 16000
FRAME_SAMPLES = 512      # samples per frame handed to Porcupine (32 ms at 16 kHz)
VAD_FRAME_SAMPLES = 480  # webrtcvad needs 10/20/30 ms frames; 30 ms at 16 kHz
CONFIG_DIR = os.path.expanduser("~/.vixy")
CONFIG_PATH = os.path.join(CONFIG_DIR, "headmic.json")
VAD_AGGRESSIVENESS = 2       # 0-3, higher = more aggressive
SILENCE_FRAMES = 50          # ~1.6 sec of silence to stop (512-sample frames)
MAX_RECORDING_FRAMES = 1000  # ~32 sec max (512-sample frames)
EARTAIL_URL = os.environ.get("EARTAIL_URL", "http://bigorin.local:8764")
DOA_POLL_HZ = 10  # DoA polling rate
EYE_SERVICE_URL = os.environ.get("EYE_SERVICE_URL", "http://localhost:8780")


# ============================================================================
# Config persistence
# ============================================================================

def load_config() -> dict:
    """Return the persisted config dict, or ``{}`` if missing or unreadable."""
    if not os.path.exists(CONFIG_PATH):
        return {}
    try:
        with open(CONFIG_PATH) as f:
            return json.load(f)
    except Exception as e:
        logger.warning("Failed to read config: %s", e)
        return {}


def save_config(cfg: dict):
    """Write ``cfg`` as JSON to CONFIG_PATH, creating CONFIG_DIR if needed."""
    os.makedirs(CONFIG_DIR, exist_ok=True)
    with open(CONFIG_PATH, "w") as f:
        json.dump(cfg, f, indent=2)


# ============================================================================
# XVF3800 + LED Control
# ============================================================================

from xvf3800 import XVF3800Manager, learn_devices

xvf_manager = XVF3800Manager()
LEDS_AVAILABLE = False  # set during startup once at least one array is assigned


def _led_call(method_name: str, *args) -> None:
    """Invoke an LED method on ``xvf_manager`` as a best-effort operation.

    Does nothing when no array is available. Failures are logged at debug
    level and never propagate, so LED trouble cannot break the audio path.
    """
    if not LEDS_AVAILABLE:
        return
    try:
        getattr(xvf_manager, method_name)(*args)
    except Exception as e:
        logger.debug("LED command %s failed: %s", method_name, e)


def leds_wakeup():
    """Solid white: wake word just detected."""
    _led_call("all_leds_solid", 0xFFFFFF)


def leds_listening():
    """Switch LEDs to DoA mode while recording the utterance."""
    _led_call("all_leds_doa")


def leds_processing():
    """Breathing violet: transcription in progress."""
    _led_call("all_leds_breath", 0x9400D3)


def leds_enrolling():
    """Solid orange: speaker enrollment recording."""
    _led_call("all_leds_solid", 0xFF8C00)


def leds_off():
    """Turn all LEDs off."""
    _led_call("all_leds_off")


# ============================================================================
# State
# ============================================================================

class ServiceState:
    """Mutable service-wide state shared by worker threads and API handlers."""

    def __init__(self):
        self.running = False
        self.listening = False
        self.recording = False
        self.processing = False
        self.last_transcription: Optional[str] = None
        self.last_wake_time: Optional[float] = None
        self.wake_count = 0
        self.error: Optional[str] = None
        self.audio_scene: Optional[dict] = None
        self.sound_classification_enabled: bool = False
        self.recognized_speaker: Optional[str] = None
        self.speaker_confidence: float = 0.0
        self.speaker_recognition_enabled: bool = False
        self.enrolling: bool = False
        self.active_side: str = "left"  # which mic array is currently active
        self.doa: dict = {}  # latest DoA from both arrays
        self.spatial: Optional[dict] = None  # triangulated position + gaze
        self.last_anomaly: Optional[dict] = None  # last spatial anomaly detected


state = ServiceState()

# Sound classifier globals
sound_classifier = None
sound_ring_buffer = None  # collections.deque, filled by listener_loop

# Speaker recognizer globals
speaker_recognizer = None
enrollment_buffer = None  # list of frame bytes, set during enrollment

# Binaural recorder
binaural_recorder = None

# Spatial scene
spatial_scene = None
enrollment_name = None

# Audio stream
dual_stream = None  # DualAudioStream instance


# ============================================================================
# EarTail Transcription
# ============================================================================

async def transcribe_audio(audio_data: bytes) -> str:
    """Send audio to EarTail and get transcription.

    Submits the WAV bytes as a job, then polls the job status once per second
    for up to 120 attempts.

    Raises:
        httpx.HTTPStatusError: if the submit or result request returns an
            error status.
        RuntimeError: if EarTail returns no job id or reports FAILURE.
        TimeoutError: if the job does not finish within the polling window.
    """
    async with httpx.AsyncClient(timeout=120.0) as client:
        files = {"audio": ("recording.wav", audio_data, "audio/wav")}
        response = await client.post(f"{EARTAIL_URL}/transcribe/submit", files=files)
        response.raise_for_status()
        job_id = response.json().get("job_id")
        if not job_id:
            # Without an id, every poll would hit ".../status/None" until timeout.
            raise RuntimeError("Transcription failed: no job_id returned")
        logger.info("Transcription job: %s", job_id)

        for _ in range(120):
            status = await client.get(f"{EARTAIL_URL}/transcribe/status/{job_id}")
            data = status.json()
            job_status = data.get("status")
            if job_status == "SUCCESS":
                result = await client.get(f"{EARTAIL_URL}/transcribe/result/{job_id}")
                result.raise_for_status()
                return result.json().get("transcription", "")
            if job_status == "FAILURE":
                raise RuntimeError(f"Transcription failed: {data.get('error')}")
            await asyncio.sleep(1)

    raise TimeoutError("Transcription timeout")


def transcribe_sync(audio_data: bytes) -> str:
    """Synchronous wrapper for transcription (runs its own event loop)."""
    return asyncio.run(transcribe_audio(audio_data))


# ============================================================================
# Main Listener Loop (dual-stream)
# ============================================================================

def audio_to_wav(frames: List[bytes]) -> bytes:
    """Convert raw audio frames to WAV format (mono, 16-bit, SAMPLE_RATE)."""
    wav_buffer = io.BytesIO()
    with wave.open(wav_buffer, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(b''.join(frames))
    return wav_buffer.getvalue()


def _finish_recording(frames: List[bytes]) -> None:
    """Transcribe a finished recording and publish the outcome on ``state``."""
    leds_processing()
    state.processing = True
    try:
        wav_data = audio_to_wav(frames)
        transcription = transcribe_sync(wav_data)
        state.last_transcription = transcription
        logger.info("Transcription: %s", transcription)
    except Exception as e:
        logger.error("Transcription error: %s", e)
        state.error = str(e)
    finally:
        state.processing = False
        leds_off()


def listener_loop():
    """Main audio processing loop with dual-stream best-beam selection.

    Consumes ``(frame, side)`` pairs from ``dual_stream.frames()``, fans each
    frame out to the optional consumers (binaural recorder, classifier ring
    buffer, enrollment buffer), runs wake-word detection, and after a wake
    word records until VAD silence or the maximum length, then transcribes.
    Runs until ``state.running`` goes false or the stream ends.
    """
    logger.info("Initializing Porcupine...")
    porcupine = None
    try:
        porcupine = pvporcupine.create(
            access_key=PORCUPINE_ACCESS_KEY,
            keyword_paths=[WAKE_WORD_PATH],
        )
    except Exception as e:
        logger.error("Failed to init Porcupine: %s", e)
        logger.warning("Wake word detection disabled — audio loop continues for classification")

    vad = webrtcvad.Vad(VAD_AGGRESSIVENESS)
    # VAD needs 10/20/30ms frames. 30ms at 16kHz = 480 samples (2 bytes each)
    vad_frame_bytes = VAD_FRAME_SAMPLES * 2
    pcm_format = struct.Struct(f"{FRAME_SAMPLES}h")

    state.listening = True
    logger.info("🦊 Wake word listener active - say 'Hey Vivi'!")

    recording_buffer: List[bytes] = []
    silence_count = 0
    is_recording = False
    recording_side: str = "left"

    try:
        for frame_data, side in dual_stream.frames():
            if not state.running:
                break

            state.active_side = side

            # Feed binaural recorder (both ears)
            if binaural_recorder:
                binaural_recorder.feed(
                    dual_stream.left.get_frame(),
                    dual_stream.right.get_frame() if dual_stream.right else None,
                )

            # Feed sound classifier ring buffer
            if sound_ring_buffer is not None:
                sound_ring_buffer.append(frame_data)

            # Feed enrollment buffer if active
            if enrollment_buffer is not None:
                enrollment_buffer.append(frame_data)

            # Check for wake word (skip if Porcupine not available)
            keyword_index = -1
            if porcupine:
                pcm = pcm_format.unpack_from(frame_data)
                keyword_index = porcupine.process(pcm)

            if keyword_index >= 0 and not is_recording:
                logger.info("🦊 Wake word detected: 'Hey Vivi'! (from %s ear)", side)
                state.wake_count += 1
                state.last_wake_time = time.time()
                recording_side = side

                leds_wakeup()
                time.sleep(0.2)  # keep the wake flash visible briefly
                leds_listening()

                is_recording = True
                state.recording = True
                recording_buffer = []
                silence_count = 0
                logger.info("Recording started (using %s ear)...", recording_side)
                continue

            if not is_recording:
                continue

            # During recording, use frames from the side that heard the wake word
            rec_frame = dual_stream.get_side_frame(recording_side)
            if rec_frame:
                recording_buffer.append(rec_frame)

            # Check VAD (use first 480 samples of the 512 frame)
            vad_data = (rec_frame or frame_data)[:vad_frame_bytes]
            try:
                is_speech = vad.is_speech(vad_data, SAMPLE_RATE)
            except Exception:
                # A frame the VAD rejects counts as speech so it cannot
                # end the recording early.
                is_speech = True

            if is_speech:
                silence_count = 0
            else:
                silence_count += 1

            # Stop conditions
            should_stop = (
                (len(recording_buffer) > 10 and silence_count >= SILENCE_FRAMES)
                or len(recording_buffer) >= MAX_RECORDING_FRAMES
            )
            if should_stop:
                logger.info("Recording stopped: %d frames", len(recording_buffer))
                is_recording = False
                state.recording = False
                _finish_recording(recording_buffer)
                recording_buffer = []

    except Exception as e:
        logger.error("Listener error: %s", e)
        state.error = str(e)
    finally:
        if porcupine:
            porcupine.delete()
        state.listening = False
        leds_off()
        logger.info("Listener stopped")


# ============================================================================
# Sound Classification Thread
# ============================================================================

def sound_classifier_loop():
    """Background thread for continuous sound classification.

    Roughly twice a second, classifies the contents of ``sound_ring_buffer``
    and publishes the result on ``state.audio_scene``. When a spatial fix is
    available the sound is also logged to the spatial scene, and when the
    category is "speech" speaker identification is attempted.
    """
    logger.info("Sound classifier thread started")
    while state.running:
        # Wait for at least 30 frames (≈0.96 s if frames are 512 samples).
        if sound_ring_buffer is None or len(sound_ring_buffer) < 30:
            time.sleep(0.1)
            continue
        try:
            frames = list(sound_ring_buffer)
            audio = np.frombuffer(b"".join(frames), dtype=np.int16)
            result = sound_classifier.classify(audio)

            # Strip audio_float32 before storing in state (not JSON-serializable)
            audio_f32 = result.pop("audio_float32", None)
            state.audio_scene = result
            category = result.get("category")

            # Spatial scene: log classified sound with its position
            if spatial_scene and state.spatial and category:
                top_classes = result.get("top_classes")
                top = top_classes[0] if top_classes else {}
                anomaly = spatial_scene.observe(
                    category=category,
                    top_class=top.get("name", category),
                    score=top.get("score", 0),
                    spatial=state.spatial,
                )
                if anomaly:
                    state.last_anomaly = anomaly

            # Speaker identification: run when speech detected
            if speaker_recognizer and category == "speech" and audio_f32 is not None:
                try:
                    name, confidence = speaker_recognizer.identify(audio_f32)
                    state.recognized_speaker = name
                    state.speaker_confidence = confidence
                except Exception as e:
                    logger.warning("Speaker identification error: %s", e)
        except Exception as e:
            logger.warning("Sound classification error: %s", e)
        time.sleep(0.5)
    logger.info("Sound classifier thread stopped")


# ============================================================================
# Spatial Tracking + Gaze (DoA → triangulation → eye service)
# ============================================================================

from spatial import SpatialTracker

spatial_tracker: Optional[SpatialTracker] = None
GAZE_CENTER = 127
_last_gaze_push: tuple[int, int] = (GAZE_CENTER, GAZE_CENTER)
GAZE_PUSH_MIN_DELTA = 3  # don't push gaze unless it moved by at least this much
GAZE_PUSH_INTERVAL = 0.1  # max 10 gaze pushes/sec to eye service


def doa_track_loop():
    """Poll DoA, triangulate, smooth, push gaze to eye service."""
    global _last_gaze_push
    interval = 1.0 / DOA_POLL_HZ
    last_push_time = 0.0

    while state.running:
        try:
            state.doa = xvf_manager.read_both_doa()

            if spatial_tracker and dual_stream:
                left_energy = dual_stream.left.get_energy() if dual_stream.left else 0.0
                right_energy = dual_stream.right.get_energy() if dual_stream.right else 0.0

                result = spatial_tracker.update(state.doa, left_energy, right_energy)
                if result:
                    state.spatial = result
                    gx, gy = result["gaze_x"], result["gaze_y"]

                    # Push to eye service if changed enough and not too frequent
                    dx = abs(gx - _last_gaze_push[0])
                    dy = abs(gy - _last_gaze_push[1])
                    now = time.monotonic()
                    moved = dx >= GAZE_PUSH_MIN_DELTA or dy >= GAZE_PUSH_MIN_DELTA
                    if moved and now - last_push_time >= GAZE_PUSH_INTERVAL:
                        _push_gaze(gx, gy)
                        _last_gaze_push = (gx, gy)
                        last_push_time = now
        except Exception as e:
            logger.debug("DoA/spatial error: %s", e)
        time.sleep(interval)


def _push_gaze(x: int, y: int):
    """Fire-and-forget gaze push to eye service.

    Uses urllib to avoid httpx connection overhead. Any failure is logged at
    debug level and otherwise ignored.
    """
    try:
        data = json.dumps({"x": x, "y": y}).encode()
        req = urllib.request.Request(
            f"{EYE_SERVICE_URL}/gaze",
            data=data,
            headers={"Content-Type": "application/json"},
        )
        # Context manager closes the HTTP response/socket deterministically.
        with urllib.request.urlopen(req, timeout=0.3):
            pass
    except Exception as e:
        logger.debug("Gaze push failed: %s", e)


# ============================================================================
# FastAPI
# ============================================================================

app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂 (Dual XVF3800)")


@app.on_event("startup")
async def startup():
    """Discover hardware, start the audio stream and launch worker threads."""
    global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream
    global LEDS_AVAILABLE, spatial_tracker, binaural_recorder, spatial_scene

    state.running = True

    # --- XVF3800 setup ---
    cfg = load_config()
    ears_cfg = cfg.get("ears", {})
    if ears_cfg.get("left") and ears_cfg.get("right"):
        xvf_manager.set_serial_mapping(
            ears_cfg["left"]["usb_serial"],
            ears_cfg["right"]["usb_serial"],
        )
    xvf_manager.assign()
    LEDS_AVAILABLE = bool(xvf_manager.left or xvf_manager.right)

    # Resolve ALSA devices
    alsa = xvf_manager.get_alsa_devices()
    left_dev = alsa.get("left")
    right_dev = alsa.get("right")

    if not left_dev:
        logger.error("No left ear ALSA device found! Check USB connections and firmware.")
        state.error = "No left ear audio device"
    else:
        logger.info("Left ear ALSA: %s", left_dev)

    if right_dev:
        logger.info("Right ear ALSA: %s", right_dev)
    else:
        logger.warning("Right ear ALSA device not found — running with left ear only")

    # --- Dual audio stream ---
    from audio_stream import DualAudioStream
    dual_stream = DualAudioStream(left_dev or "plughw:0,0", right_dev)
    dual_stream.start()

    # --- Sound classifier (optional) ---
    model_dir = Path(__file__).parent / "models"
    edgetpu_model_path = model_dir / "yamnet_edgetpu.tflite"
    model_path = model_dir / "yamnet.tflite"
    class_map_path = model_dir / "yamnet_class_map.csv"

    # Prefer the Edge TPU model whenever its file is present; this is only a
    # file-existence check, not a hardware probe.
    use_edgetpu = edgetpu_model_path.exists()
    active_model = edgetpu_model_path if use_edgetpu else model_path

    if active_model.exists() and class_map_path.exists():
        try:
            from sound_id import SoundClassifier
            sound_classifier = SoundClassifier(
                str(active_model), str(class_map_path), use_edgetpu=use_edgetpu
            )
            sound_ring_buffer = collections.deque(maxlen=31)
            state.sound_classification_enabled = True
            logger.info("Sound classification enabled")
            sc_thread = threading.Thread(target=sound_classifier_loop, daemon=True)
            sc_thread.start()
        except Exception as e:
            logger.warning("Sound classification unavailable: %s", e)
    else:
        logger.info("Sound classification models not found, skipping")

    # --- Speaker recognizer (optional) ---
    try:
        from speaker_id import SpeakerRecognizer
        db_path = Path(__file__).parent / "voices.db"
        speaker_recognizer = SpeakerRecognizer(db_path=str(db_path))
        state.speaker_recognition_enabled = True
        logger.info("Speaker recognition enabled (Resemblyzer)")
    except Exception as e:
        logger.warning("Speaker recognition unavailable: %s", e)

    # --- Spatial tracking (DoA → triangulation → gaze) ---
    if xvf_manager.left or xvf_manager.right:
        array_sep = cfg.get("array_separation_mm", 175.0)
        spatial_tracker = SpatialTracker(array_separation_mm=array_sep)
        threading.Thread(target=doa_track_loop, daemon=True).start()
        logger.info(
            "Spatial tracking started (%d Hz, %.0fmm baseline, pushing gaze to %s)",
            DOA_POLL_HZ, array_sep, EYE_SERVICE_URL,
        )

    # --- Spatial scene mapping ---
    from spatial_scene import SpatialScene
    spatial_scene = SpatialScene()
    spatial_scene.start()

    # --- Binaural recording ---
    if os.environ.get("BINAURAL_RECORD", "").lower() in ("1", "true", "yes"):
        from binaural_recorder import BinauralRecorder
        rec_dir = os.environ.get("BINAURAL_DIR", os.path.expanduser("~/headmic/recordings"))
        binaural_recorder = BinauralRecorder(output_dir=rec_dir)
        binaural_recorder.start()
    else:
        logger.info("Binaural recording disabled (set BINAURAL_RECORD=1 to enable)")

    # --- Main listener ---
    thread = threading.Thread(target=listener_loop, daemon=True)
    thread.start()
    logger.info("HeadMic started (dual XVF3800)")


@app.on_event("shutdown")
async def shutdown():
    """Signal worker threads to stop and release audio resources."""
    state.running = False
    leds_off()
    if spatial_scene:
        spatial_scene.stop()
    if binaural_recorder:
        binaural_recorder.stop()
    if dual_stream:
        dual_stream.stop()


# --- Info endpoints ---

@app.get("/")
async def root():
    """Static service banner."""
    return {
        "service": "HeadMic",
        "description": "Vixy's Ears 🦊👂 (Dual XVF3800)",
        "wake_word": "Hey Vivi"
    }


@app.get("/health")
async def health():
    """Health summary: healthy means listening with no recorded error."""
    return {
        "healthy": state.listening and not state.error,
        "listening": state.listening,
        "recording": state.recording,
        "processing": state.processing,
        "wake_count": state.wake_count,
        "sound_classification_enabled": state.sound_classification_enabled,
        "speaker_recognition_enabled": state.speaker_recognition_enabled,
        "active_side": state.active_side,
        "error": state.error
    }


@app.get("/status")
async def status():
    """Current listener status plus the most recent transcription."""
    # .get(): a classifier result without "dominant_category" must not 500.
    scene_category = state.audio_scene.get("dominant_category") if state.audio_scene else None
    return {
        "listening": state.listening,
        "recording": state.recording,
        "processing": state.processing,
        "last_transcription": state.last_transcription,
        "last_wake_time": state.last_wake_time,
        "wake_count": state.wake_count,
        "audio_scene": scene_category,
        "recognized_speaker": state.recognized_speaker,
        "active_side": state.active_side,
        "error": state.error
    }


@app.get("/last")
async def last():
    """Most recent transcription and the time of the last wake word."""
    return {
        "transcription": state.last_transcription,
        "wake_time": state.last_wake_time
    }


# --- DoA endpoints ---

@app.get("/doa")
async def doa():
    """Direction of Arrival from both mic arrays + triangulated position."""
    return {
        "doa": state.doa,
        "active_side": state.active_side,
        "spatial": state.spatial,
    }


# --- Spatial scene ---

@app.get("/scene")
async def scene():
    """Learned spatial audio scene — where each sound type usually comes from."""
    if not spatial_scene:
        return {"scene": {}, "last_anomaly": None}
    return {
        "scene": spatial_scene.get_scene_summary(),
        "last_anomaly": state.last_anomaly,
    }


@app.get("/scene/events")
async def scene_events(seconds: int = 30, category: Optional[str] = None):
    """Recent sound events with spatial information."""
    if not spatial_scene:
        return {"events": []}
    return {"events": spatial_scene.get_recent_events(seconds, category)}


@app.get("/scene/heatmap")
async def scene_heatmap():
    """Observation counts per angle bin per category — for visualization."""
    if not spatial_scene:
        return {"heatmap": {}}
    return {"heatmap": spatial_scene.get_spatial_heatmap()}


# --- Binaural recording ---

@app.get("/recording")
async def recording():
    """Binaural recording status."""
    if not binaural_recorder:
        return {"recording": False, "enabled": False}
    return binaural_recorder.stats


# --- Device info ---

@app.get("/devices")
async def devices():
    """Status of both XVF3800 arrays."""
    alsa = xvf_manager.get_alsa_devices()
    return {
        "left": {
            "connected": bool(xvf_manager.left),
            "serial": xvf_manager.left.serial if xvf_manager.left else None,
            "alsa": alsa.get("left"),
        },
        "right": {
            "connected": bool(xvf_manager.right),
            "serial": xvf_manager.right.serial if xvf_manager.right else None,
            "alsa": alsa.get("right"),
        },
        "active_side": state.active_side,
    }


# --- Sound endpoints ---

@app.get("/sounds")
async def sounds():
    """Current audio scene classification."""
    if not state.sound_classification_enabled:
        raise HTTPException(status_code=503, detail="Sound classification not available")
    if state.audio_scene is None:
        return {
            "category": None,
            "top_classes": [],
            "dominant_category": None,
            "timestamp": None,
            "recognized_speaker": None,
            "speaker_confidence": 0.0,
        }
    return {
        **state.audio_scene,
        "recognized_speaker": state.recognized_speaker,
        "speaker_confidence": state.speaker_confidence,
    }


@app.get("/sounds/history")
async def sounds_history(seconds: int = 30):
    """Recent sound classification history."""
    if not state.sound_classification_enabled:
        raise HTTPException(status_code=503, detail="Sound classification not available")
    if sound_classifier is None:
        return {"history": []}
    return {"history": sound_classifier.get_history(seconds)}


# --- Speaker endpoints ---

def _pcm16_to_float32(raw: bytes) -> np.ndarray:
    """Interpret ``raw`` as 16-bit PCM samples scaled to float32 in [-1, 1).

    Raises:
        ValueError: if ``len(raw)`` is not a multiple of two bytes.
    """
    return np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0


def _decode_uploaded_audio(audio_bytes: bytes) -> np.ndarray:
    """Decode an upload as WAV, falling back to headerless 16-bit PCM.

    NOTE(review): channel count, sample width and sample rate of a WAV upload
    are not checked — mono 16-bit at SAMPLE_RATE is assumed; confirm callers.

    Raises:
        ValueError: if the sample bytes cannot be read as 16-bit PCM.
    """
    try:
        with wave.open(io.BytesIO(audio_bytes), 'rb') as wf:
            raw = wf.readframes(wf.getnframes())
    except Exception:
        # Not a parseable WAV container — treat the whole upload as raw PCM.
        raw = audio_bytes
    return _pcm16_to_float32(raw)


@app.post("/speakers/enroll")
async def enroll_speaker(name: str = Form(...), audio: UploadFile = File(...)):
    """Enroll a speaker from uploaded audio file."""
    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")

    audio_bytes = await audio.read()
    try:
        audio_f32 = _decode_uploaded_audio(audio_bytes)
    except ValueError as e:
        # Undecodable audio is a client error, not a server crash.
        raise HTTPException(status_code=400, detail=f"Could not decode audio: {e}")

    try:
        speaker_recognizer.enroll(name, audio_f32, source="upload")
        return {"enrolled": name, "speakers": speaker_recognizer.list_speakers()}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.post("/speakers/enroll-from-mic")
async def enroll_from_mic(name: str):
    """Record from live mic for 5 seconds and enroll speaker."""
    global enrollment_buffer, enrollment_name

    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")
    if state.enrolling:
        raise HTTPException(status_code=409, detail="Enrollment already in progress")

    state.enrolling = True
    enrollment_buffer = []
    enrollment_name = name
    leds_enrolling()
    logger.info("Enrollment started for '%s' — recording 5 seconds", name)

    try:
        await asyncio.sleep(5.0)
    finally:
        # Always release the enrollment slot, even if the request is cancelled
        # mid-recording; otherwise the listener keeps appending to the buffer
        # forever and every later enrollment gets a 409.
        frames = enrollment_buffer
        enrollment_buffer = None
        enrollment_name = None
        state.enrolling = False
        leds_off()

    if not frames:
        raise HTTPException(status_code=500, detail="No audio captured")

    audio_int16 = np.frombuffer(b"".join(frames), dtype=np.int16)
    audio_f32 = audio_int16.astype(np.float32) / 32768.0
    seconds = len(audio_f32) / SAMPLE_RATE
    logger.info("Enrollment audio: %.1f seconds", seconds)

    try:
        speaker_recognizer.enroll(name, audio_f32, source="mic")
        return {
            "enrolled": name,
            "seconds": round(seconds, 1),
            "speakers": speaker_recognizer.list_speakers(),
        }
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/speakers")
async def list_speakers():
    """List enrolled speakers."""
    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")
    return {"speakers": speaker_recognizer.list_speakers()}


@app.delete("/speakers/{name}")
async def delete_speaker(name: str):
    """Remove a speaker."""
    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")
    removed = speaker_recognizer.delete_speaker(name)
    if removed == 0:
        raise HTTPException(status_code=404, detail=f"Speaker '{name}' not found")
    return {"deleted": name, "samples_removed": removed}


# ============================================================================
# CLI
# ============================================================================

def _learn_ears() -> int:
    """Interactively identify left/right arrays and persist the mapping.

    Returns the process exit code: 0 on success, 1 if fewer than two arrays
    are connected.
    """
    logging.basicConfig(level=logging.INFO)
    mgr = XVF3800Manager()
    mgr.assign()  # discovers and assigns by bus address order

    if not mgr.left or not mgr.right:
        print("[HEADMIC] Need 2 XVF3800 arrays connected for --learn")
        return 1

    # Light up the first array and ask the user
    print()
    print("[HEADMIC] Identifying mic arrays...")
    print("  One array should be lighting up GREEN now.")
    mgr.left.led_solid(0x00FF00)
    mgr.right.led_off()

    answer = input("  Is the LEFT mic array lit up? [Y/n/swap] ").strip().lower()
    mgr.left.led_off()

    if answer in ("n", "no", "swap"):
        print("[HEADMIC] Swapping left/right assignment")
        mgr.left, mgr.right = mgr.right, mgr.left

    # Confirm: light up both with their side color
    print("[HEADMIC] Confirming: LEFT = green, RIGHT = blue")
    mgr.left.led_solid(0x00FF00)
    mgr.right.led_solid(0x0000FF)
    time.sleep(2)
    mgr.left.led_off()
    mgr.right.led_off()

    # Save config
    info = {}
    for label, dev in [("left", mgr.left), ("right", mgr.right)]:
        entry = {"usb_serial": dev.serial}
        alsa = mgr.serial_to_alsa(dev.serial)
        if alsa:
            entry["alsa_card"] = alsa
        info[label] = entry

    cfg = load_config()
    cfg["ears"] = info
    save_config(cfg)
    print(f"[HEADMIC] Saved ear config → {CONFIG_PATH}")
    print(json.dumps(info, indent=2))
    return 0


def main() -> None:
    """CLI entry point: ``--learn`` runs ear identification, else serve the API."""
    if "--learn" in sys.argv:
        sys.exit(_learn_ears())

    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8446)


if __name__ == "__main__":
    main()