#!/usr/bin/env python3 """ HeadMic - Vixy's Ears Service 🦊👂 Wake word detection + voice recording + EarTail transcription. Runs on head-vixy (Raspberry Pi 5). Wake word: "Hey Vivi" (trained via Picovoice Porcupine) Architecture: Single shared audio stream feeds both Porcupine (wake word) and recording buffer. This avoids device conflicts. Flow: 1. Continuous audio stream from ReSpeaker 2. Feed frames to Porcupine for wake word detection 3. On "Hey Vivi" → start buffering audio 4. Use VAD to detect end of speech 5. Send buffer to EarTail for transcription 6. Return to listening mode Built by Vixy on Day 77 (January 17, 2026) 💜 """ import asyncio import collections import io import logging import os import struct import subprocess import threading import time import wave from pathlib import Path from typing import Optional, List import numpy as np import httpx import pvporcupine import webrtcvad from fastapi import FastAPI, HTTPException, UploadFile, File, Form from pydantic import BaseModel # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("headmic") # ============================================================================ # Configuration # ============================================================================ PORCUPINE_ACCESS_KEY = os.environ.get("PORCUPINE_ACCESS_KEY", "") WAKE_WORD_PATH = os.environ.get("WAKE_WORD_PATH", "/home/alex/headmic/Hey-Vivi_en_raspberry-pi_v4_0_0.ppn") SAMPLE_RATE = 16000 ALSA_DEVICE = "plughw:ArrayUAC10,0" # ReSpeaker 4 Mic Array - by name, not card number (survives reboot order changes) VAD_AGGRESSIVENESS = 2 # 0-3, higher = more aggressive SILENCE_FRAMES = 50 # ~1.5 sec of silence to stop (at 30ms frames) MAX_RECORDING_FRAMES = 1000 # ~30 sec max EARTAIL_URL = os.environ.get("EARTAIL_URL", "http://bigorin.local:8764") # ============================================================================ # LED Control # ============================================================================ try: from pixel_ring import pixel_ring LEDS_AVAILABLE = True pixel_ring.off() except ImportError: LEDS_AVAILABLE = False logger.warning("pixel_ring not available") def leds_wakeup(): if LEDS_AVAILABLE: try: pixel_ring.wakeup() except: pass def leds_listening(): if LEDS_AVAILABLE: try: pixel_ring.set_color_palette(0x00FFFF, 0x000000) pixel_ring.think() except: pass def leds_processing(): if LEDS_AVAILABLE: try: pixel_ring.set_color_palette(0x9400D3, 0x000000) pixel_ring.spin() except: pass def leds_enrolling(): if LEDS_AVAILABLE: try: pixel_ring.set_color_palette(0xFF8C00, 0x000000) pixel_ring.think() except: pass def leds_off(): if LEDS_AVAILABLE: try: pixel_ring.off() except: pass # ============================================================================ # State # ============================================================================ class ServiceState: def __init__(self): self.running = False self.listening = False self.recording = False self.processing = False self.last_transcription: Optional[str] = None self.last_wake_time: Optional[float] = None self.wake_count = 0 self.error: Optional[str] = None self.audio_scene: Optional[dict] = None self.sound_classification_enabled: bool = False self.recognized_speaker: Optional[str] = None self.speaker_confidence: float = 0.0 self.speaker_recognition_enabled: bool = False self.enrolling: bool = False state = ServiceState() # Sound classifier globals sound_classifier = None sound_ring_buffer = None # collections.deque, filled by listener_loop # Speaker recognizer globals speaker_recognizer = None enrollment_buffer = None # list of frame bytes, set during enrollment enrollment_name = None # ============================================================================ # Audio Stream using ALSA directly (arecord) # ============================================================================ def read_audio_stream(): """ Generator that yields audio frames from ALSA using arecord. Each frame is 512 samples (32ms at 16kHz) as required by Porcupine. """ frame_size = 512 # Porcupine requires 512 samples bytes_per_frame = frame_size * 2 # 16-bit = 2 bytes per sample cmd = [ "arecord", "-D", ALSA_DEVICE, "-f", "S16_LE", "-r", str(SAMPLE_RATE), "-c", "1", # Mono "-t", "raw", "-q", # Quiet "-" ] logger.info(f"Starting audio stream: {' '.join(cmd)}") proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=bytes_per_frame ) try: while state.running: data = proc.stdout.read(bytes_per_frame) if len(data) < bytes_per_frame: break yield data finally: proc.terminate() proc.wait() # ============================================================================ # EarTail Transcription # ============================================================================ async def transcribe_audio(audio_data: bytes) -> str: """Send audio to EarTail and get transcription.""" async with httpx.AsyncClient(timeout=120.0) as client: files = {"audio": ("recording.wav", audio_data, "audio/wav")} response = await client.post(f"{EARTAIL_URL}/transcribe/submit", files=files) response.raise_for_status() job_id = response.json().get("job_id") logger.info(f"Transcription job: {job_id}") for _ in range(120): status = await client.get(f"{EARTAIL_URL}/transcribe/status/{job_id}") data = status.json() if data.get("status") == "SUCCESS": result = await client.get(f"{EARTAIL_URL}/transcribe/result/{job_id}") return result.json().get("transcription", "") elif data.get("status") == "FAILURE": raise Exception(f"Transcription failed: {data.get('error')}") await asyncio.sleep(1) raise Exception("Transcription timeout") def transcribe_sync(audio_data: bytes) -> str: """Synchronous wrapper for transcription.""" loop = asyncio.new_event_loop() try: return loop.run_until_complete(transcribe_audio(audio_data)) finally: loop.close() # ============================================================================ # Main Listener Loop # ============================================================================ def audio_to_wav(frames: List[bytes]) -> bytes: """Convert raw audio frames to WAV format.""" wav_buffer = io.BytesIO() with wave.open(wav_buffer, 'wb') as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(SAMPLE_RATE) wf.writeframes(b''.join(frames)) wav_buffer.seek(0) return wav_buffer.read() def listener_loop(): """Main audio processing loop.""" global state logger.info("Initializing Porcupine...") try: porcupine = pvporcupine.create( access_key=PORCUPINE_ACCESS_KEY, keyword_paths=[WAKE_WORD_PATH] ) except Exception as e: logger.error(f"Failed to init Porcupine: {e}") state.error = str(e) return vad = webrtcvad.Vad(VAD_AGGRESSIVENESS) # VAD needs 10/20/30ms frames. 30ms at 16kHz = 480 samples # Porcupine needs 512 samples. We'll use 480 for VAD. vad_frame_size = 480 vad_frame_bytes = vad_frame_size * 2 state.listening = True logger.info("🦊 Wake word listener active - say 'Hey Vivi'!") recording_buffer: List[bytes] = [] silence_count = 0 is_recording = False try: for frame_data in read_audio_stream(): if not state.running: break # Convert bytes to int16 array for Porcupine pcm = struct.unpack_from("h" * 512, frame_data) # Feed sound classifier ring buffer if sound_ring_buffer is not None: sound_ring_buffer.append(frame_data) # Feed enrollment buffer if active if enrollment_buffer is not None: enrollment_buffer.append(frame_data) # Check for wake word keyword_index = porcupine.process(pcm) if keyword_index >= 0 and not is_recording: logger.info("🦊 Wake word detected: 'Hey Vivi'!") state.wake_count += 1 state.last_wake_time = time.time() leds_wakeup() time.sleep(0.2) leds_listening() is_recording = True state.recording = True recording_buffer = [] silence_count = 0 logger.info("Recording started...") continue if is_recording: recording_buffer.append(frame_data) # Check VAD (use first 480 samples of the 512 frame) vad_data = frame_data[:vad_frame_bytes] try: is_speech = vad.is_speech(vad_data, SAMPLE_RATE) except: is_speech = True # Assume speech on VAD error if is_speech: silence_count = 0 else: silence_count += 1 # Stop conditions should_stop = ( (len(recording_buffer) > 10 and silence_count >= SILENCE_FRAMES) or len(recording_buffer) >= MAX_RECORDING_FRAMES ) if should_stop: logger.info(f"Recording stopped: {len(recording_buffer)} frames") is_recording = False state.recording = False leds_processing() state.processing = True try: wav_data = audio_to_wav(recording_buffer) transcription = transcribe_sync(wav_data) state.last_transcription = transcription logger.info(f"Transcription: {transcription}") except Exception as e: logger.error(f"Transcription error: {e}") state.error = str(e) finally: state.processing = False leds_off() recording_buffer = [] except Exception as e: logger.error(f"Listener error: {e}") state.error = str(e) finally: porcupine.delete() state.listening = False leds_off() logger.info("Listener stopped") # ============================================================================ # Sound Classification Thread # ============================================================================ def sound_classifier_loop(): """Background thread for continuous sound classification.""" global state logger.info("Sound classifier thread started") while state.running: if sound_ring_buffer is None or len(sound_ring_buffer) < 30: time.sleep(0.1) continue try: frames = list(sound_ring_buffer) audio = np.frombuffer(b"".join(frames), dtype=np.int16) result = sound_classifier.classify(audio) # Strip audio_float32 before storing in state (not JSON-serializable) audio_f32 = result.pop("audio_float32", None) state.audio_scene = result # Speaker identification: run when speech detected if speaker_recognizer and result["category"] == "speech" and audio_f32 is not None: try: name, confidence = speaker_recognizer.identify(audio_f32) state.recognized_speaker = name state.speaker_confidence = confidence except Exception as e: logger.warning("Speaker identification error: %s", e) except Exception as e: logger.warning("Sound classification error: %s", e) time.sleep(0.5) logger.info("Sound classifier thread stopped") # ============================================================================ # FastAPI # ============================================================================ app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂") @app.on_event("startup") async def startup(): global sound_classifier, sound_ring_buffer, speaker_recognizer state.running = True # Init sound classifier (optional — graceful if model missing) model_dir = Path(__file__).parent / "models" model_path = model_dir / "yamnet.tflite" class_map_path = model_dir / "yamnet_class_map.csv" if model_path.exists() and class_map_path.exists(): try: from sound_id import SoundClassifier sound_classifier = SoundClassifier(str(model_path), str(class_map_path)) # 31 frames of 512 samples = ~0.99s at 16kHz sound_ring_buffer = collections.deque(maxlen=31) state.sound_classification_enabled = True logger.info("Sound classification enabled (YAMNet)") sc_thread = threading.Thread(target=sound_classifier_loop, daemon=True) sc_thread.start() except Exception as e: logger.warning("Sound classification unavailable: %s", e) else: logger.info("Sound classification models not found, skipping") # Init speaker recognizer (optional — graceful if resemblyzer not installed) try: from speaker_id import SpeakerRecognizer db_path = Path(__file__).parent / "voices.db" speaker_recognizer = SpeakerRecognizer(db_path=str(db_path)) state.speaker_recognition_enabled = True logger.info("Speaker recognition enabled (Resemblyzer)") except Exception as e: logger.warning("Speaker recognition unavailable: %s", e) thread = threading.Thread(target=listener_loop, daemon=True) thread.start() logger.info("HeadMic started") @app.on_event("shutdown") async def shutdown(): state.running = False leds_off() @app.get("/") async def root(): return { "service": "HeadMic", "description": "Vixy's Ears 🦊👂", "wake_word": "Hey Vivi" } @app.get("/health") async def health(): return { "healthy": state.listening and not state.error, "listening": state.listening, "recording": state.recording, "processing": state.processing, "wake_count": state.wake_count, "sound_classification_enabled": state.sound_classification_enabled, "speaker_recognition_enabled": state.speaker_recognition_enabled, "error": state.error } @app.get("/status") async def status(): return { "listening": state.listening, "recording": state.recording, "processing": state.processing, "last_transcription": state.last_transcription, "last_wake_time": state.last_wake_time, "wake_count": state.wake_count, "audio_scene": state.audio_scene["dominant_category"] if state.audio_scene else None, "recognized_speaker": state.recognized_speaker, "error": state.error } @app.get("/last") async def last(): return { "transcription": state.last_transcription, "wake_time": state.last_wake_time } @app.get("/sounds") async def sounds(): """Current audio scene classification.""" if not state.sound_classification_enabled: raise HTTPException(status_code=503, detail="Sound classification not available") if state.audio_scene is None: return {"category": None, "top_classes": [], "dominant_category": None, "timestamp": None, "recognized_speaker": None, "speaker_confidence": 0.0} return { **state.audio_scene, "recognized_speaker": state.recognized_speaker, "speaker_confidence": state.speaker_confidence, } @app.get("/sounds/history") async def sounds_history(seconds: int = 30): """Recent sound classification history.""" if not state.sound_classification_enabled: raise HTTPException(status_code=503, detail="Sound classification not available") if sound_classifier is None: return {"history": []} return {"history": sound_classifier.get_history(seconds)} # ============================================================================ # Speaker Endpoints # ============================================================================ @app.post("/speakers/enroll") async def enroll_speaker(name: str = Form(...), audio: UploadFile = File(...)): """Enroll a speaker from uploaded audio file.""" if speaker_recognizer is None: raise HTTPException(status_code=503, detail="Speaker recognition not available") audio_bytes = await audio.read() # Convert to float32: try raw int16 first, fall back to wav try: import wave as _wave wav_io = io.BytesIO(audio_bytes) with _wave.open(wav_io, 'rb') as wf: raw = wf.readframes(wf.getnframes()) audio_f32 = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0 except Exception: # Assume raw int16 PCM at 16kHz audio_f32 = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0 try: speaker_recognizer.enroll(name, audio_f32, source="upload") return {"enrolled": name, "speakers": speaker_recognizer.list_speakers()} except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @app.post("/speakers/enroll-from-mic") async def enroll_from_mic(name: str): """Record from live mic for 5 seconds and enroll speaker.""" global enrollment_buffer, enrollment_name, enrollment_event if speaker_recognizer is None: raise HTTPException(status_code=503, detail="Speaker recognition not available") if state.enrolling: raise HTTPException(status_code=409, detail="Enrollment already in progress") state.enrolling = True enrollment_buffer = [] enrollment_name = name leds_enrolling() logger.info("Enrollment started for '%s' — recording 5 seconds", name) # Wait 5 seconds for audio, non-blocking to the event loop await asyncio.sleep(5.0) # Collect what we have frames = enrollment_buffer enrollment_buffer = None enrollment_name = None state.enrolling = False leds_off() if not frames: raise HTTPException(status_code=500, detail="No audio captured") audio_int16 = np.frombuffer(b"".join(frames), dtype=np.int16) audio_f32 = audio_int16.astype(np.float32) / 32768.0 logger.info("Enrollment audio: %.1f seconds", len(audio_f32) / SAMPLE_RATE) try: speaker_recognizer.enroll(name, audio_f32, source="mic") return {"enrolled": name, "seconds": round(len(audio_f32) / SAMPLE_RATE, 1), "speakers": speaker_recognizer.list_speakers()} except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/speakers") async def list_speakers(): """List enrolled speakers.""" if speaker_recognizer is None: raise HTTPException(status_code=503, detail="Speaker recognition not available") return {"speakers": speaker_recognizer.list_speakers()} @app.delete("/speakers/{name}") async def delete_speaker(name: str): """Remove a speaker.""" if speaker_recognizer is None: raise HTTPException(status_code=503, detail="Speaker recognition not available") removed = speaker_recognizer.delete_speaker(name) if removed == 0: raise HTTPException(status_code=404, detail=f"Speaker '{name}' not found") return {"deleted": name, "samples_removed": removed} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8446)