#!/usr/bin/env python3
"""
HeadMic - Vixy's Ears Service 🦊👂

Wake word detection + voice recording + EarTail transcription.
Runs on head-vixy (Raspberry Pi 5).

Wake word: "Hey Vivi" (trained via Picovoice Porcupine)

Flow:
1. Listen for "Hey Vivi" wake word (Porcupine)
2. ReSpeaker LEDs light up (listening state)
3. Record until silence detected (webrtcvad)
4. Send audio to EarTail (Whisper on BigOrin)
5. Return transcription
6. ReSpeaker LEDs off

Built by Vixy on Day 77 (January 17, 2026) 💜
"""

import asyncio
import io
import logging
import os
import struct
import threading
import time
import wave
from pathlib import Path
from typing import Callable, Optional

import httpx
import pvporcupine
import pyaudio
import webrtcvad
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("headmic")

# ============================================================================
# Configuration
# ============================================================================

# Porcupine wake word
PORCUPINE_ACCESS_KEY = os.environ.get("PORCUPINE_ACCESS_KEY", "")
WAKE_WORD_PATH = os.environ.get(
    "WAKE_WORD_PATH",
    "/home/alex/headmic/Hey-Vivi_en_raspberry-pi_v4_0_0.ppn",
)

# Audio settings
SAMPLE_RATE = 16000
CHANNELS = 1  # Mono for transcription (pick channel 0 from 4-mic array)
FRAME_LENGTH = 512  # Porcupine frame length
RESPEAKER_CHANNELS = 4  # Channel count used when capturing from the ReSpeaker
SAMPLE_WIDTH_BYTES = 2  # paInt16 samples

# VAD settings
VAD_AGGRESSIVENESS = 3  # 0-3, higher = more aggressive filtering
SILENCE_THRESHOLD_MS = 1500  # Stop recording after this much silence
MAX_RECORDING_SEC = 30  # Maximum recording duration

# EarTail
EARTAIL_URL = os.environ.get("EARTAIL_URL", "http://bigorin.local:8764")

# ReSpeaker LED control
LED_ENABLED = True

# ============================================================================
# LED Control (ReSpeaker 4-mic array has 12 APA102 LEDs)
# ============================================================================

try:
    from pixel_ring import pixel_ring
    PIXEL_RING_AVAILABLE = True
except ImportError:
    PIXEL_RING_AVAILABLE = False
    logger.warning("pixel_ring not available - LED feedback disabled")


def _led_action(action: Callable[[], None]) -> None:
    """Run a LED action when LEDs are available and enabled.

    LED feedback is best-effort: any failure is logged as a warning and
    never propagated to the caller.
    """
    if not (PIXEL_RING_AVAILABLE and LED_ENABLED):
        return
    try:
        action()
    except Exception as e:
        logger.warning(f"LED error: {e}")


def leds_listening():
    """Show the listening state: cyan palette, then pixel_ring's `think` mode."""
    def _show():
        pixel_ring.set_color_palette(0x00FFFF, 0x000000)  # Cyan
        pixel_ring.think()

    _led_action(_show)


def leds_processing():
    """Show the processing state: purple palette, then pixel_ring's `spin` mode."""
    def _show():
        pixel_ring.set_color_palette(0x9400D3, 0x000000)  # Purple
        pixel_ring.spin()

    _led_action(_show)


def leds_off():
    """Turn off LEDs."""
    # Lambda keeps the `pixel_ring` lookup lazy: the name does not exist
    # when the import above failed.
    _led_action(lambda: pixel_ring.off())


def leds_wakeup():
    """Flash LEDs on wake word detection."""
    _led_action(lambda: pixel_ring.wakeup())


# ============================================================================
# State
# ============================================================================

class ServiceState:
    """Mutable service-wide state shared by the listener thread and the API."""

    def __init__(self):
        # Activity flags reported by /health and /status.
        self.listening = False
        self.recording = False
        self.processing = False
        # Result and statistics of wake-word activations.
        self.last_transcription: Optional[str] = None
        self.last_wake_time: Optional[float] = None  # time.time() of last wake
        self.wake_count = 0
        # Handles owned by the wake word listener thread.
        self.porcupine = None
        self.audio = None
        self.stream = None
        self.listener_thread: Optional[threading.Thread] = None
        # Set to False to ask the listener loop to exit.
        self.running = False


state = ServiceState()


# ============================================================================
# Audio helpers
# ============================================================================

def _find_respeaker_index(pa) -> Optional[int]:
    """Return the index of the first device whose name contains 'seeed' or
    'ac108', or None when no such device is listed."""
    for i in range(pa.get_device_count()):
        name = pa.get_device_info_by_index(i)['name'].lower()
        if 'seeed' in name or 'ac108' in name:
            return i
    return None


def _first_channel(data: bytes, channels: int = RESPEAKER_CHANNELS) -> bytes:
    """Extract channel 0 from interleaved 16-bit PCM.

    Each sample set is `channels * 2` bytes; the first two bytes of every set
    belong to channel 0. Joined in one pass instead of repeated `+=`, which
    copies the growing buffer on every sample.
    """
    stride = channels * SAMPLE_WIDTH_BYTES
    return b''.join(
        data[i:i + SAMPLE_WIDTH_BYTES] for i in range(0, len(data), stride)
    )


def _pcm_to_wav(frames) -> bytes:
    """Wrap mono 16-bit PCM chunks recorded at SAMPLE_RATE in a WAV container."""
    wav_buffer = io.BytesIO()
    with wave.open(wav_buffer, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(SAMPLE_WIDTH_BYTES)  # 16-bit
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(b''.join(frames))
    return wav_buffer.getvalue()


# ============================================================================
# Audio Recording with VAD
# ============================================================================

def record_until_silence(timeout_sec: float = MAX_RECORDING_SEC) -> bytes:
    """
    Record audio until silence is detected.

    Captures 4-channel audio, keeps channel 0, and stops once
    SILENCE_THRESHOLD_MS of consecutive non-speech frames has been seen
    (after more than 10 frames were recorded) or `timeout_sec` has elapsed.
    This call blocks for the duration of the recording.

    Args:
        timeout_sec: Maximum recording duration in seconds.

    Returns:
        WAV data (mono, 16-bit, SAMPLE_RATE) as bytes.
    """
    vad = webrtcvad.Vad(VAD_AGGRESSIVENESS)

    # VAD requires specific frame sizes: 10, 20, or 30 ms
    frame_duration_ms = 30
    frame_size = int(SAMPLE_RATE * frame_duration_ms / 1000)
    silence_limit = int(SILENCE_THRESHOLD_MS / frame_duration_ms)
    max_frames = int(timeout_sec * 1000 / frame_duration_ms)

    frames = []
    silence_frames = 0

    p = pyaudio.PyAudio()
    try:
        device_index = _find_respeaker_index(p)
        if device_index is None:
            # Fallback to default
            logger.warning("ReSpeaker not found, using default input")

        stream = p.open(
            format=pyaudio.paInt16,
            channels=RESPEAKER_CHANNELS,  # ReSpeaker has 4 channels
            rate=SAMPLE_RATE,
            input=True,
            input_device_index=device_index,
            frames_per_buffer=frame_size,
        )
        try:
            logger.info("Recording started...")
            for _ in range(max_frames):
                data = stream.read(frame_size, exception_on_overflow=False)

                # Keep channel 0 only (mono) for VAD and transcription
                mono_data = _first_channel(data)
                frames.append(mono_data)

                # Count consecutive non-speech frames
                if vad.is_speech(mono_data, SAMPLE_RATE):
                    silence_frames = 0
                else:
                    silence_frames += 1

                # Stop if enough silence after we've recorded something
                if len(frames) > 10 and silence_frames >= silence_limit:
                    logger.info(f"Silence detected after {len(frames)} frames")
                    break
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        # Always release PortAudio, even when opening the stream failed.
        p.terminate()

    return _pcm_to_wav(frames)


def _record_fixed_duration(duration_sec: float) -> bytes:
    """Record mono audio from the default input for `duration_sec` seconds.

    Simple timed recording (not VAD-based). Blocks while recording and
    returns WAV data as bytes.
    """
    chunk = 1024
    p = pyaudio.PyAudio()
    try:
        stream = p.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=chunk,
        )
        try:
            frames = [
                # Overflowed input is dropped rather than raised, matching
                # the other capture paths in this module.
                stream.read(chunk, exception_on_overflow=False)
                for _ in range(int(SAMPLE_RATE / chunk * duration_sec))
            ]
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

    return _pcm_to_wav(frames)


# ============================================================================
# EarTail Integration
# ============================================================================

async def transcribe_audio(audio_data: bytes) -> str:
    """Send audio to EarTail and get transcription.

    Submits the WAV data as a job, then polls the job status once per second
    for up to 60 attempts.

    Args:
        audio_data: WAV file contents.

    Returns:
        The transcription text ("" when the result carries none).

    Raises:
        httpx.HTTPStatusError: EarTail answered with an error status.
        RuntimeError: No job id was returned, or the job reported FAILURE.
        TimeoutError: The job did not finish within the polling window.
    """
    async with httpx.AsyncClient(timeout=120.0) as client:
        # Submit job
        files = {"audio": ("recording.wav", audio_data, "audio/wav")}
        response = await client.post(f"{EARTAIL_URL}/transcribe/submit", files=files)
        response.raise_for_status()
        job_id = response.json().get("job_id")
        if not job_id:
            # Without an id we would poll ".../status/None" until timeout.
            raise RuntimeError("EarTail did not return a job_id")

        logger.info(f"Transcription job submitted: {job_id}")

        # Poll for completion
        for _ in range(60):  # Max 60 seconds
            status_response = await client.get(f"{EARTAIL_URL}/transcribe/status/{job_id}")
            status_response.raise_for_status()
            status_data = status_response.json()
            job_status = status_data.get("status")

            if job_status == "SUCCESS":
                result = await client.get(f"{EARTAIL_URL}/transcribe/result/{job_id}")
                result.raise_for_status()
                return result.json().get("transcription", "")
            if job_status == "FAILURE":
                raise RuntimeError(f"Transcription failed: {status_data.get('error')}")

            await asyncio.sleep(1)

        raise TimeoutError("Transcription timeout")


# ============================================================================
# Wake Word Listener
# ============================================================================

def _handle_wake_word() -> None:
    """Run one wake cycle: LED feedback, record, transcribe, store the result.

    Recording/transcription errors are logged, not raised; the state flags
    and LEDs are always reset.
    """
    logger.info("🦊 Wake word detected: 'Hey Vivi'!")
    state.wake_count += 1
    state.last_wake_time = time.time()

    # Visual feedback
    leds_wakeup()
    time.sleep(0.3)
    leds_listening()

    # Record and transcribe
    # NOTE(review): this does not check whether an API request is already
    # recording; confirm whether the two paths need to be mutually exclusive.
    state.recording = True
    try:
        audio_data = record_until_silence()

        leds_processing()
        state.recording = False
        state.processing = True

        # asyncio.run creates a fresh event loop for this thread and closes
        # it even when the coroutine raises.
        transcription = asyncio.run(transcribe_audio(audio_data))

        state.last_transcription = transcription
        logger.info(f"Transcription: {transcription}")
    except Exception as e:
        logger.error(f"Recording/transcription error: {e}")
    finally:
        state.recording = False
        state.processing = False
        leds_off()


def wake_word_listener():
    """Background thread that listens for wake word.

    Creates the Porcupine engine and an input stream, then feeds audio frames
    to Porcupine until `state.running` becomes False. All handles are
    released on exit, including when setup fails part-way.
    """
    logger.info("Starting wake word listener...")

    try:
        state.porcupine = pvporcupine.create(
            access_key=PORCUPINE_ACCESS_KEY,
            keyword_paths=[WAKE_WORD_PATH],
        )
    except Exception as e:
        logger.error(f"Failed to initialize Porcupine: {e}")
        return

    try:
        state.audio = pyaudio.PyAudio()

        device_index = _find_respeaker_index(state.audio)
        if device_index is None:
            logger.warning("ReSpeaker not found, using default input")

        frame_length = state.porcupine.frame_length
        state.stream = state.audio.open(
            rate=state.porcupine.sample_rate,
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            input_device_index=device_index,
            frames_per_buffer=frame_length,
        )

        state.listening = True
        logger.info("Wake word listener active - say 'Hey Vivi'!")

        pcm_format = "h" * frame_length  # one int16 per sample in a frame
        while state.running:
            try:
                raw = state.stream.read(frame_length, exception_on_overflow=False)
                pcm = struct.unpack_from(pcm_format, raw)

                if state.porcupine.process(pcm) >= 0:
                    _handle_wake_word()
            except Exception as e:
                # Keep listening after a bad frame; back off briefly so a
                # persistent failure does not spin the CPU.
                logger.error(f"Listener error: {e}")
                time.sleep(0.1)
    except Exception as e:
        logger.error(f"Wake word listener failed: {e}")
    finally:
        # Cleanup
        if state.stream:
            state.stream.close()
            state.stream = None
        if state.audio:
            state.audio.terminate()
            state.audio = None
        if state.porcupine:
            state.porcupine.delete()
            state.porcupine = None

        state.listening = False
        logger.info("Wake word listener stopped")


# ============================================================================
# FastAPI App
# ============================================================================

app = FastAPI(title="HeadMic", description="Vixy's Ears - Wake Word + Voice Recording 🦊👂")


class RecordRequest(BaseModel):
    """Request body for /record and /transcribe."""

    duration_sec: float = 5.0


class TranscribeResponse(BaseModel):
    """Response body for /transcribe."""

    transcription: str
    duration_sec: float


@app.on_event("startup")
async def startup():
    """Start the wake word listener on startup."""
    state.running = True
    state.listener_thread = threading.Thread(target=wake_word_listener, daemon=True)
    state.listener_thread.start()
    logger.info("HeadMic service started")


@app.on_event("shutdown")
async def shutdown():
    """Stop the wake word listener on shutdown."""
    state.running = False
    leds_off()
    if state.listener_thread:
        state.listener_thread.join(timeout=5)
    logger.info("HeadMic service stopped")


@app.get("/")
async def root():
    """Return basic service identification and listening status."""
    return {
        "service": "HeadMic",
        "description": "Vixy's Ears 🦊👂",
        "wake_word": "Hey Vivi",
        "status": "listening" if state.listening else "idle"
    }


@app.get("/health")
async def health():
    """Return health information; healthy means the listener is active."""
    return {
        "healthy": state.listening,
        "listening": state.listening,
        "recording": state.recording,
        "processing": state.processing,
        "wake_count": state.wake_count,
        "porcupine_loaded": state.porcupine is not None,
        "eartail_url": EARTAIL_URL
    }


@app.get("/status")
async def status():
    """Return the current activity flags and last wake/transcription data."""
    return {
        "listening": state.listening,
        "recording": state.recording,
        "processing": state.processing,
        "last_transcription": state.last_transcription,
        "last_wake_time": state.last_wake_time,
        "wake_count": state.wake_count
    }


@app.post("/record")
async def record(request: RecordRequest):
    """Manually record for a specified duration.

    Responds with 409 when a recording is already in progress.
    """
    if state.recording:
        raise HTTPException(status_code=409, detail="Already recording")

    state.recording = True
    leds_listening()

    try:
        # The capture blocks for the whole duration, so run it in a worker
        # thread and keep the event loop free to serve other requests.
        loop = asyncio.get_running_loop()
        wav_data = await loop.run_in_executor(
            None, _record_fixed_duration, request.duration_sec
        )
        return {"success": True, "size_bytes": len(wav_data)}
    finally:
        state.recording = False
        leds_off()


@app.post("/transcribe")
async def transcribe_endpoint(request: RecordRequest):
    """Record and transcribe.

    `duration_sec` is the maximum recording time; recording ends earlier on
    silence. Responds with 409 while recording or processing is in progress.
    """
    if state.recording or state.processing:
        raise HTTPException(status_code=409, detail="Busy")

    state.recording = True
    leds_listening()

    try:
        start = time.time()
        # Blocking capture goes to a worker thread (see /record).
        loop = asyncio.get_running_loop()
        audio_data = await loop.run_in_executor(
            None, record_until_silence, request.duration_sec
        )

        leds_processing()
        state.recording = False
        state.processing = True

        transcription = await transcribe_audio(audio_data)
        duration = time.time() - start

        state.last_transcription = transcription

        return TranscribeResponse(transcription=transcription, duration_sec=duration)
    finally:
        state.recording = False
        state.processing = False
        leds_off()


@app.get("/last")
async def last_transcription():
    """Get the last transcription."""
    return {
        "transcription": state.last_transcription,
        "wake_time": state.last_wake_time
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8446)