#!/usr/bin/env python3 """ HeadMic - Vixy's Ears Service 🦊👂 Wake word detection + voice recording + EarTail transcription. Runs on head-vixy (Raspberry Pi 5). Wake word: "Hey Vivi" (trained via Picovoice Porcupine) Architecture: Dual XVF3800 mic arrays (left/right ear), best-beam selection. Single shared audio stream feeds Porcupine, VAD, sound classification, and speaker ID. Flow: 1. Dual audio streams from two XVF3800 arrays 2. Best-beam selection (higher energy side) 3. Feed frames to Porcupine for wake word detection 4. On "Hey Vivi" → start buffering from active side 5. Use VAD to detect end of speech 6. Send buffer to EarTail for transcription 7. Return to listening mode Hardware: 2× ReSpeaker XVF3800 4-Mic Array (USB, 2-channel firmware) DoA + LEDs via USB vendor control (xvf3800.py) Built by Vixy on Day 77 (January 17, 2026) 💜 Upgraded to dual XVF3800 on Day 160 (April 2026) """ import asyncio import collections import io import json import logging import os import struct import threading import time import wave from pathlib import Path from typing import Optional, List import numpy as np import httpx import pvporcupine import webrtcvad from fastapi import FastAPI, HTTPException, UploadFile, File, Form from pydantic import BaseModel # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("headmic") # ============================================================================ # Configuration # ============================================================================ PORCUPINE_ACCESS_KEY = os.environ.get("PORCUPINE_ACCESS_KEY", "") WAKE_WORD_PATH = os.environ.get("WAKE_WORD_PATH", "/home/alex/headmic/Hey-Vivi_en_raspberry-pi_v4_0_0.ppn") SAMPLE_RATE = 16000 CONFIG_DIR = os.path.expanduser("~/.vixy") CONFIG_PATH = os.path.join(CONFIG_DIR, "headmic.json") VAD_AGGRESSIVENESS = 2 # 0-3, higher = more aggressive SILENCE_FRAMES = 50 # ~1.5 sec of silence to stop (at 30ms frames) MAX_RECORDING_FRAMES = 1000 # ~30 sec max 
EARTAIL_URL = os.environ.get("EARTAIL_URL", "http://bigorin.local:8764")
DOA_POLL_HZ = 10  # DoA polling rate
EYE_SERVICE_URL = os.environ.get("EYE_SERVICE_URL", "http://localhost:8780")

# ============================================================================
# Config persistence
# ============================================================================

def load_config() -> dict:
    """Load the persisted HeadMic configuration.

    Returns:
        The parsed JSON object from CONFIG_PATH, or an empty dict when the
        file is missing, unreadable, not valid JSON, or not a JSON object.
    """
    try:
        with open(CONFIG_PATH, encoding="utf-8") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        # First run: no config yet is the normal case, not worth a warning.
        return {}
    except Exception as e:
        logger.warning("Failed to read config: %s", e)
        return {}
    if not isinstance(cfg, dict):
        # Callers use cfg.get(...); a list/str/number here would crash them.
        logger.warning("Failed to read config: %s", "top-level JSON value is not an object")
        return {}
    return cfg


def save_config(cfg: dict):
    """Persist *cfg* as JSON at CONFIG_PATH, creating CONFIG_DIR if needed.

    The data is written to a sibling temp file and then moved into place with
    os.replace, so an interrupted write cannot leave a truncated config file.
    """
    os.makedirs(CONFIG_DIR, exist_ok=True)
    tmp_path = CONFIG_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(cfg, f, indent=2)
    os.replace(tmp_path, CONFIG_PATH)


# ============================================================================
# XVF3800 + LED Control
# ============================================================================

from xvf3800 import XVF3800Manager, learn_devices

xvf_manager = XVF3800Manager()
LEDS_AVAILABLE = False  # set to True during startup once an array is found


def _led_call(method_name: str, *args) -> None:
    """Best-effort call of an LED method on the shared XVF3800 manager.

    LED feedback is purely cosmetic, so failures must never disturb the audio
    pipeline. They are logged at debug level instead of being silently
    dropped, and only Exception is caught so KeyboardInterrupt/SystemExit
    still propagate.
    """
    if not LEDS_AVAILABLE:
        return
    try:
        getattr(xvf_manager, method_name)(*args)
    except Exception as e:
        logger.debug("LED command %s failed: %s", method_name, e)


def leds_wakeup():
    """Flash solid white on all arrays (wake word heard)."""
    _led_call("all_leds_solid", 0xFFFFFF)


def leds_listening():
    """Switch all arrays to their DoA LED mode (recording in progress)."""
    _led_call("all_leds_doa")


def leds_processing():
    """Breathe in colour 0x9400D3 on all arrays (transcription running)."""
    _led_call("all_leds_breath", 0x9400D3)


def leds_enrolling():
    """Show solid colour 0xFF8C00 on all arrays (speaker enrollment)."""
    _led_call("all_leds_solid", 0xFF8C00)


def leds_off():
    """Turn the LEDs of all arrays off."""
    _led_call("all_leds_off")


# ============================================================================
# State
# ============================================================================

class ServiceState:
    """Mutable service-wide status shared by the worker threads and the API.

    Fields are plain attributes written by the background threads and read by
    the HTTP handlers; there is no locking around them.
    """

    def __init__(self):
        self.running = False      # worker loops exit when this turns False
        self.listening = False    # listener loop is consuming audio
        self.recording = False    # capturing an utterance after the wake word
        self.processing = False   # waiting on EarTail transcription
        self.last_transcription: Optional[str] = None
        self.last_wake_time: Optional[float] = None  # time.time() of last wake
        self.wake_count = 0
        self.error: Optional[str] = None  # most recent error message, if any
        self.audio_scene: Optional[dict] = None  # latest classifier result
        self.sound_classification_enabled: bool = False
        self.recognized_speaker: Optional[str] = None
        self.speaker_confidence: float = 0.0
        self.speaker_recognition_enabled: bool = False
        self.enrolling: bool = False
        self.active_side: str = "left"  # which mic array is currently active
        self.doa: dict = {}  # latest DoA from both arrays


state = ServiceState()

# Sound classifier globals
sound_classifier = None
sound_ring_buffer = None  # collections.deque, filled by listener_loop

# Speaker recognizer globals
speaker_recognizer = None
enrollment_buffer = None  # list of frame bytes, set during enrollment
enrollment_name = None

# Audio stream
dual_stream = None  # DualAudioStream instance

# ============================================================================
# EarTail Transcription
# ============================================================================

async def transcribe_audio(audio_data: bytes) -> str:
    """Send audio to EarTail and return the transcription text.

    Submits the WAV bytes as a job, then polls the job status once per second
    for up to 120 attempts.

    Args:
        audio_data: Complete WAV file contents.

    Returns:
        The "transcription" field of the result, or "" if it is absent.

    Raises:
        httpx.HTTPStatusError: The submit or result request returned an
            error status.
        RuntimeError: EarTail returned no job id or reported FAILURE.
        TimeoutError: The job did not finish within the polling window.
    """
    async with httpx.AsyncClient(timeout=120.0) as client:
        files = {"audio": ("recording.wav", audio_data, "audio/wav")}
        response = await client.post(f"{EARTAIL_URL}/transcribe/submit", files=files)
        response.raise_for_status()

        job_id = response.json().get("job_id")
        if not job_id:
            # Without this check we would poll ".../status/None" for 2 minutes.
            raise RuntimeError("Transcription failed: no job_id in submit response")
        logger.info("Transcription job: %s", job_id)

        for _ in range(120):
            status = await client.get(f"{EARTAIL_URL}/transcribe/status/{job_id}")
            data = status.json()

            job_status = data.get("status")
            if job_status == "SUCCESS":
                result = await client.get(f"{EARTAIL_URL}/transcribe/result/{job_id}")
                # An error payload must not be mistaken for an empty transcript.
                result.raise_for_status()
                return result.json().get("transcription", "")
            if job_status == "FAILURE":
                raise RuntimeError(f"Transcription failed: {data.get('error')}")

            await asyncio.sleep(1)

        raise TimeoutError("Transcription timeout")


def transcribe_sync(audio_data: bytes) -> str:
    """Blocking wrapper around transcribe_audio for non-async threads.

    Runs the coroutine on a fresh event loop, so it must not be called from a
    thread that already has a running loop.
    """
    return asyncio.run(transcribe_audio(audio_data))


# ============================================================================
# Main Listener Loop (dual-stream)
# ============================================================================

def audio_to_wav(frames: List[bytes]) -> bytes:
    """Wrap raw PCM frames in a WAV container.

    Args:
        frames: Chunks of 16-bit mono PCM sampled at SAMPLE_RATE.

    Returns:
        The bytes of a complete WAV file containing the concatenated frames.
    """
    wav_buffer = io.BytesIO()
    with wave.open(wav_buffer, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(b''.join(frames))
    return wav_buffer.getvalue()


def _transcribe_recording(frames: List[bytes]) -> None:
    """Send one finished recording to EarTail and publish the outcome in state."""
    if not frames:
        # Nothing was captured from the recording side; skip the network call.
        logger.warning("Recording produced no audio frames; skipping transcription")
        leds_off()
        return

    leds_processing()
    state.processing = True
    try:
        wav_data = audio_to_wav(frames)
        transcription = transcribe_sync(wav_data)
        state.last_transcription = transcription
        logger.info("Transcription: %s", transcription)
    except Exception as e:
        logger.error("Transcription error: %s", e)
        state.error = str(e)
    finally:
        state.processing = False
        leds_off()


def listener_loop():
    """Main audio processing loop with dual-stream best-beam selection.

    Consumes (frame, side) pairs from dual_stream.frames() and, per frame:
      * feeds the sound-classifier ring buffer and any active enrollment buffer,
      * runs Porcupine wake-word detection (when it initialised),
      * after a wake word, records frames from the side that heard it until
        the VAD reports SILENCE_FRAMES consecutive non-speech frames or the
        MAX_RECORDING_FRAMES cap is reached, then transcribes synchronously.

    Transcription blocks this thread, so no audio is processed while it runs.
    """
    logger.info("Initializing Porcupine...")
    porcupine = None
    unpack_pcm = None
    try:
        porcupine = pvporcupine.create(
            access_key=PORCUPINE_ACCESS_KEY,
            keyword_paths=[WAKE_WORD_PATH]
        )
        # Precompile the frame format once, sized to what Porcupine expects,
        # instead of rebuilding a 512-character format string per frame.
        unpack_pcm = struct.Struct(f"{porcupine.frame_length}h").unpack_from
    except Exception as e:
        logger.error("Failed to init Porcupine: %s", e)
        logger.warning("Wake word detection disabled — audio loop continues for classification")
        if porcupine is not None:
            porcupine.delete()
            porcupine = None

    vad = webrtcvad.Vad(VAD_AGGRESSIVENESS)
    # VAD needs 10/20/30ms frames. 30ms at 16kHz = 480 samples
    vad_frame_bytes = 480 * 2

    state.listening = True
    logger.info("🦊 Wake word listener active - say 'Hey Vivi'!")

    recording_buffer: List[bytes] = []
    silence_count = 0
    frames_since_wake = 0
    is_recording = False
    recording_side: str = "left"

    try:
        for frame_data, side in dual_stream.frames():
            if not state.running:
                break

            state.active_side = side

            # Snapshot the shared buffers before use: the API thread may set
            # enrollment_buffer back to None between an "is not None" check and
            # the append, which would otherwise kill this thread.
            ring = sound_ring_buffer
            if ring is not None:
                ring.append(frame_data)

            enroll = enrollment_buffer
            if enroll is not None:
                enroll.append(frame_data)

            # Check for wake word (skip if Porcupine not available)
            keyword_index = -1
            if porcupine:
                keyword_index = porcupine.process(unpack_pcm(frame_data))

            if keyword_index >= 0 and not is_recording:
                logger.info("🦊 Wake word detected: 'Hey Vivi'! (from %s ear)", side)
                state.wake_count += 1
                state.last_wake_time = time.time()
                recording_side = side

                leds_wakeup()
                time.sleep(0.2)  # keep the wake flash visible briefly
                leds_listening()

                is_recording = True
                state.recording = True
                recording_buffer = []
                silence_count = 0
                frames_since_wake = 0
                logger.info("Recording started (using %s ear)...", recording_side)
                continue

            if not is_recording:
                continue

            frames_since_wake += 1

            # During recording, use frames from the side that heard the wake word
            rec_frame = dual_stream.get_side_frame(recording_side)
            if rec_frame:
                recording_buffer.append(rec_frame)

            # Check VAD (use first 480 samples of the 512 frame)
            vad_data = (rec_frame or frame_data)[:vad_frame_bytes]
            try:
                is_speech = vad.is_speech(vad_data, SAMPLE_RATE)
            except Exception:
                # A frame the VAD rejects is treated as speech so that it can
                # never end a recording early.
                is_speech = True

            silence_count = 0 if is_speech else silence_count + 1

            # Stop on trailing silence, or once the time cap is hit. The cap
            # counts frames elapsed since the wake word rather than frames
            # captured, so recording still ends if the recording side stops
            # delivering frames.
            should_stop = (
                (len(recording_buffer) > 10 and silence_count >= SILENCE_FRAMES)
                or frames_since_wake >= MAX_RECORDING_FRAMES
            )
            if should_stop:
                logger.info("Recording stopped: %d frames", len(recording_buffer))
                is_recording = False
                state.recording = False
                _transcribe_recording(recording_buffer)
                recording_buffer = []

    except Exception as e:
        logger.error("Listener error: %s", e)
        state.error = str(e)
    finally:
        if porcupine:
            porcupine.delete()
        state.recording = False
        state.listening = False
        leds_off()
        logger.info("Listener stopped")


# ============================================================================
# Sound Classification Thread
# ============================================================================

def sound_classifier_loop():
    """Background thread for continuous sound classification.

    Roughly every 0.5 s, classifies the current contents of the ring buffer
    (once it holds at least 30 frames) and stores the result in
    state.audio_scene. When the category is "speech" and a speaker recognizer
    is loaded, it also updates the recognised speaker and confidence.
    """
    logger.info("Sound classifier thread started")

    while state.running:
        ring = sound_ring_buffer
        if ring is None or len(ring) < 30:
            time.sleep(0.1)
            continue

        try:
            audio = np.frombuffer(b"".join(list(ring)), dtype=np.int16)
            result = sound_classifier.classify(audio)
            # Strip audio_float32 before storing in state (not JSON-serializable)
            audio_f32 = result.pop("audio_float32", None)
            state.audio_scene = result

            # Speaker identification: run when speech detected
            recognizer = speaker_recognizer
            if recognizer and result.get("category") == "speech" and audio_f32 is not None:
                try:
                    name, confidence = recognizer.identify(audio_f32)
                    state.recognized_speaker = name
                    state.speaker_confidence = confidence
                except Exception as e:
                    logger.warning("Speaker identification error: %s", e)
        except Exception as e:
            logger.warning("Sound classification error: %s", e)

        time.sleep(0.5)

    logger.info("Sound classifier thread stopped")


# ============================================================================
# DoA Polling Thread
# ============================================================================

def doa_poll_loop():
    """Poll Direction of Arrival from both XVF3800 arrays at DOA_POLL_HZ."""
    interval = 1.0 / DOA_POLL_HZ
    while state.running:
        try:
            state.doa = xvf_manager.read_both_doa()
        except Exception as e:
            logger.debug("DoA poll error: %s", e)
        time.sleep(interval)


def doa_to_gaze() -> Optional[tuple[int, int]]:
    """Convert the active side's DoA angle to gaze coordinates for the eye service.

    Returns:
        (x, y), each clamped to 0..255, computed as
        x = 127 - 80*sin(angle) and y = 127 - 40*cos(angle) with the angle in
        degrees; or None when there is no reading for the active side, its
        "vad" flag is falsy, or it carries no "angle".
    """
    import math

    # Read each shared attribute once so both checks see the same snapshot.
    doa = state.doa
    side = state.active_side
    if not doa or side not in doa:
        return None
    reading = doa[side]
    if reading is None or not reading.get("vad"):
        return None
    angle = reading.get("angle")
    if angle is None:
        return None

    rad = math.radians(angle)
    x = int(127 - 80 * math.sin(rad))
    y = int(127 - 40 * math.cos(rad))
    return max(0, min(255, x)), max(0, min(255, y))


# ============================================================================
# FastAPI
# ============================================================================

app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂 (Dual XVF3800)")


@app.on_event("startup")
async def startup():
    """Bring up hardware, optional ML components and the worker threads."""
    global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE

    state.running = True

    # --- XVF3800 setup ---
    cfg = load_config()
    ears_cfg = cfg.get("ears") or {}
    left_serial = (ears_cfg.get("left") or {}).get("usb_serial")
    right_serial = (ears_cfg.get("right") or {}).get("usb_serial")
    if left_serial and right_serial:
        xvf_manager.set_serial_mapping(left_serial, right_serial)
    elif ears_cfg:
        # A partial entry used to raise KeyError and abort startup.
        logger.warning("Ear config incomplete (missing usb_serial); using default assignment")
    xvf_manager.assign()
    LEDS_AVAILABLE = bool(xvf_manager.left or xvf_manager.right)

    # Resolve ALSA devices
    alsa = xvf_manager.get_alsa_devices()
    left_dev = alsa.get("left")
    right_dev = alsa.get("right")

    if not left_dev:
        logger.error("No left ear ALSA device found! Check USB connections and firmware.")
        state.error = "No left ear audio device"
    else:
        logger.info("Left ear ALSA: %s", left_dev)
    if right_dev:
        logger.info("Right ear ALSA: %s", right_dev)
    else:
        logger.warning("Right ear ALSA device not found — running with left ear only")

    # --- Dual audio stream ---
    from audio_stream import DualAudioStream
    # Falls back to "plughw:0,0" when no left device was resolved.
    dual_stream = DualAudioStream(left_dev or "plughw:0,0", right_dev)
    dual_stream.start()

    # --- Sound classifier (optional) ---
    model_dir = Path(__file__).parent / "models"
    edgetpu_model_path = model_dir / "yamnet_edgetpu.tflite"
    model_path = model_dir / "yamnet.tflite"
    class_map_path = model_dir / "yamnet_class_map.csv"

    # Prefer the Edge TPU model whenever its file is present. This is only a
    # file-existence check; no hardware probe happens here.
    use_edgetpu = edgetpu_model_path.exists()
    active_model = edgetpu_model_path if use_edgetpu else model_path

    if active_model.exists() and class_map_path.exists():
        try:
            from sound_id import SoundClassifier
            sound_classifier = SoundClassifier(str(active_model), str(class_map_path), use_edgetpu=use_edgetpu)
            sound_ring_buffer = collections.deque(maxlen=31)
            state.sound_classification_enabled = True
            logger.info("Sound classification enabled")
            sc_thread = threading.Thread(target=sound_classifier_loop, daemon=True)
            sc_thread.start()
        except Exception as e:
            logger.warning("Sound classification unavailable: %s", e)
    else:
        logger.info("Sound classification models not found, skipping")

    # --- Speaker recognizer (optional) ---
    try:
        from speaker_id import SpeakerRecognizer
        db_path = Path(__file__).parent / "voices.db"
        speaker_recognizer = SpeakerRecognizer(db_path=str(db_path))
        state.speaker_recognition_enabled = True
        logger.info("Speaker recognition enabled (Resemblyzer)")
    except Exception as e:
        logger.warning("Speaker recognition unavailable: %s", e)

    # --- DoA polling ---
    if xvf_manager.left or xvf_manager.right:
        threading.Thread(target=doa_poll_loop, daemon=True).start()
        logger.info("DoA polling started at %d Hz", DOA_POLL_HZ)

    # --- Main listener ---
    thread = threading.Thread(target=listener_loop, daemon=True)
    thread.start()
    logger.info("HeadMic started (dual XVF3800)")


@app.on_event("shutdown")
async def shutdown():
    """Signal the worker loops to stop, turn the LEDs off and stop the stream."""
    state.running = False
    leds_off()
    if dual_stream:
        dual_stream.stop()


# --- Info endpoints ---

@app.get("/")
async def root():
    """Static service identification."""
    return {
        "service": "HeadMic",
        "description": "Vixy's Ears 🦊👂 (Dual XVF3800)",
        "wake_word": "Hey Vivi"
    }


@app.get("/health")
async def health():
    """Liveness summary: healthy means listening with no recorded error."""
    return {
        "healthy": state.listening and not state.error,
        "listening": state.listening,
        "recording": state.recording,
        "processing": state.processing,
        "wake_count": state.wake_count,
        "sound_classification_enabled": state.sound_classification_enabled,
        "speaker_recognition_enabled": state.speaker_recognition_enabled,
        "active_side": state.active_side,
        "error": state.error
    }


@app.get("/status")
async def status():
    """Current pipeline status plus the most recent results."""
    # Snapshot once; .get() avoids a 500 if the classifier result has no
    # "dominant_category" key.
    scene = state.audio_scene
    return {
        "listening": state.listening,
        "recording": state.recording,
        "processing": state.processing,
        "last_transcription": state.last_transcription,
        "last_wake_time": state.last_wake_time,
        "wake_count": state.wake_count,
        "audio_scene": scene.get("dominant_category") if scene else None,
        "recognized_speaker": state.recognized_speaker,
        "active_side": state.active_side,
        "error": state.error
    }


@app.get("/last")
async def last():
    """Most recent transcription and the time of the last wake word."""
    return {
        "transcription": state.last_transcription,
        "wake_time": state.last_wake_time
    }


# --- DoA endpoints ---

@app.get("/doa")
async def doa():
    """Direction of Arrival from both mic arrays."""
    return {
        "doa": state.doa,
        "active_side": state.active_side,
        "gaze": doa_to_gaze(),
    }


# --- Device info ---

@app.get("/devices")
async def devices():
    """Status of both XVF3800 arrays."""
    alsa = xvf_manager.get_alsa_devices()
    left, right = xvf_manager.left, xvf_manager.right
    return {
        "left": {
            "connected": bool(left),
            "serial": left.serial if left else None,
            "alsa": alsa.get("left"),
        },
        "right": {
            "connected": bool(right),
            "serial": right.serial if right else None,
            "alsa": alsa.get("right"),
        },
        "active_side": state.active_side,
    }


# --- Sound endpoints ---

@app.get("/sounds")
async def sounds():
    """Current audio scene classification."""
    if not state.sound_classification_enabled:
        raise HTTPException(status_code=503, detail="Sound classification not available")
    scene = state.audio_scene
    if scene is None:
        return {"category": None, "top_classes": [], "dominant_category": None,
                "timestamp": None, "recognized_speaker": None, "speaker_confidence": 0.0}
    return {
        **scene,
        "recognized_speaker": state.recognized_speaker,
        "speaker_confidence": state.speaker_confidence,
    }


@app.get("/sounds/history")
async def sounds_history(seconds: int = 30):
    """Recent sound classification history."""
    if not state.sound_classification_enabled:
        raise HTTPException(status_code=503, detail="Sound classification not available")
    if sound_classifier is None:
        return {"history": []}
    return {"history": sound_classifier.get_history(seconds)}


# --- Speaker endpoints ---

@app.post("/speakers/enroll")
async def enroll_speaker(name: str = Form(...), audio: UploadFile = File(...)):
    """Enroll a speaker from uploaded audio file.

    The upload is read as a WAV file when it parses as one; otherwise the
    bytes are treated as raw 16-bit PCM. Samples are scaled to float32 in
    [-1, 1) before enrollment.

    Raises:
        HTTPException 503: speaker recognition is not loaded.
        HTTPException 400: the upload has no audio, or enrollment rejected it.
    """
    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")

    audio_bytes = await audio.read()
    try:
        with wave.open(io.BytesIO(audio_bytes), 'rb') as wf:
            raw = wf.readframes(wf.getnframes())
    except Exception:
        # Deliberate fallback: anything that is not a readable WAV container
        # is treated as headerless PCM.
        raw = audio_bytes

    # int16 needs an even byte count; a stray trailing byte used to raise
    # ValueError and surface as a 500.
    raw = raw[:len(raw) - (len(raw) % 2)]
    if not raw:
        raise HTTPException(status_code=400, detail="No audio data in upload")
    audio_f32 = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0

    try:
        speaker_recognizer.enroll(name, audio_f32, source="upload")
        return {"enrolled": name, "speakers": speaker_recognizer.list_speakers()}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.post("/speakers/enroll-from-mic")
async def enroll_from_mic(name: str):
    """Record from live mic for 5 seconds and enroll speaker.

    While enrollment is active the listener loop appends every frame to the
    enrollment buffer. The buffer and the enrolling flag are always reset,
    including when the request is cancelled mid-recording; otherwise the
    buffer would grow without bound and later enrollments would get 409.

    Raises:
        HTTPException 503: speaker recognition is not loaded.
        HTTPException 409: another enrollment is in progress.
        HTTPException 500: no audio frames arrived during the recording.
        HTTPException 400: enrollment rejected the audio.
    """
    global enrollment_buffer, enrollment_name

    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")
    if state.enrolling:
        raise HTTPException(status_code=409, detail="Enrollment already in progress")

    state.enrolling = True
    enrollment_buffer = []
    enrollment_name = name
    try:
        leds_enrolling()
        logger.info("Enrollment started for '%s' — recording 5 seconds", name)
        await asyncio.sleep(5.0)
    finally:
        frames = enrollment_buffer
        enrollment_buffer = None
        enrollment_name = None
        state.enrolling = False
        leds_off()

    if not frames:
        raise HTTPException(status_code=500, detail="No audio captured")

    audio_int16 = np.frombuffer(b"".join(frames), dtype=np.int16)
    audio_f32 = audio_int16.astype(np.float32) / 32768.0
    logger.info("Enrollment audio: %.1f seconds", len(audio_f32) / SAMPLE_RATE)

    try:
        speaker_recognizer.enroll(name, audio_f32, source="mic")
        return {"enrolled": name, "seconds": round(len(audio_f32) / SAMPLE_RATE, 1),
                "speakers": speaker_recognizer.list_speakers()}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/speakers")
async def list_speakers():
    """List enrolled speakers."""
    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")
    return {"speakers": speaker_recognizer.list_speakers()}


@app.delete("/speakers/{name}")
async def delete_speaker(name: str):
    """Remove a speaker; 404 when no samples were stored under that name."""
    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")
    removed = speaker_recognizer.delete_speaker(name)
    if removed == 0:
        raise HTTPException(status_code=404, detail=f"Speaker '{name}' not found")
    return {"deleted": name, "samples_removed": removed}


# ============================================================================
# CLI
# ============================================================================

def _learn_ears() -> int:
    """Interactive --learn flow: identify left/right arrays and save the mapping.

    Returns:
        Process exit code: 0 on success, 1 when fewer than two arrays are
        connected.
    """
    mgr = XVF3800Manager()
    mgr.assign()  # discovers and assigns by bus address order

    if not mgr.left or not mgr.right:
        print("[HEADMIC] Need 2 XVF3800 arrays connected for --learn")
        return 1

    # Light up the first array and ask the user
    print()
    print("[HEADMIC] Identifying mic arrays...")
    print(" One array should be lighting up GREEN now.")
    mgr.left.led_solid(0x00FF00)
    mgr.right.led_off()
    try:
        answer = input(" Is the LEFT mic array lit up? [Y/n/swap] ").strip().lower()
    finally:
        # Do not leave the LED lit if the prompt is interrupted (Ctrl-C/EOF).
        mgr.left.led_off()

    if answer in ("n", "no", "swap"):
        print("[HEADMIC] Swapping left/right assignment")
        mgr.left, mgr.right = mgr.right, mgr.left

    # Confirm: light up both with their side color
    print("[HEADMIC] Confirming: LEFT = green, RIGHT = blue")
    mgr.left.led_solid(0x00FF00)
    mgr.right.led_solid(0x0000FF)
    time.sleep(2)
    mgr.left.led_off()
    mgr.right.led_off()

    # Save config
    info = {}
    for label, dev in [("left", mgr.left), ("right", mgr.right)]:
        entry = {"usb_serial": dev.serial}
        alsa = mgr.serial_to_alsa(dev.serial)
        if alsa:
            entry["alsa_card"] = alsa
        info[label] = entry

    cfg = load_config()
    cfg["ears"] = info
    save_config(cfg)
    print(f"[HEADMIC] Saved ear config → {CONFIG_PATH}")
    print(json.dumps(info, indent=2))
    return 0


if __name__ == "__main__":
    import sys

    if "--learn" in sys.argv:
        sys.exit(_learn_ears())

    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8446)