Files
headmic/headmic.py
Alex f9a25eb5d8 Keep audio loop running when Porcupine key is missing
Without this fix, listener_loop exits early on Porcupine init failure,
which starves the sound classifier ring buffer. Now the audio loop
continues for YAMNet classification even without wake word detection.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 16:56:45 -05:00

771 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
HeadMic - Vixy's Ears Service 🦊👂
Wake word detection + voice recording + EarTail transcription.
Runs on head-vixy (Raspberry Pi 5).
Wake word: "Hey Vivi" (trained via Picovoice Porcupine)
Architecture: Dual XVF3800 mic arrays (left/right ear), best-beam selection.
Single shared audio stream feeds Porcupine, VAD, sound classification, and speaker ID.
Flow:
1. Dual audio streams from two XVF3800 arrays
2. Best-beam selection (higher energy side)
3. Feed frames to Porcupine for wake word detection
4. On "Hey Vivi" → start buffering from active side
5. Use VAD to detect end of speech
6. Send buffer to EarTail for transcription
7. Return to listening mode
Hardware: 2× ReSpeaker XVF3800 4-Mic Array (USB, 2-channel firmware)
DoA + LEDs via USB vendor control (xvf3800.py)
Built by Vixy on Day 77 (January 17, 2026) 💜
Upgraded to dual XVF3800 on Day 160 (April 2026)
"""
import asyncio
import collections
import io
import json
import logging
import os
import struct
import threading
import time
import wave
from pathlib import Path
from typing import Optional, List
import numpy as np
import httpx
import pvporcupine
import webrtcvad
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from pydantic import BaseModel
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("headmic")
# ============================================================================
# Configuration
# ============================================================================
PORCUPINE_ACCESS_KEY = os.environ.get("PORCUPINE_ACCESS_KEY", "")
WAKE_WORD_PATH = os.environ.get("WAKE_WORD_PATH", "/home/alex/headmic/Hey-Vivi_en_raspberry-pi_v4_0_0.ppn")
SAMPLE_RATE = 16000
CONFIG_DIR = os.path.expanduser("~/.vixy")
CONFIG_PATH = os.path.join(CONFIG_DIR, "headmic.json")
VAD_AGGRESSIVENESS = 2 # 0-3, higher = more aggressive
SILENCE_FRAMES = 50 # ~1.5 sec of silence to stop (at 30ms frames)
MAX_RECORDING_FRAMES = 1000 # ~30 sec max
EARTAIL_URL = os.environ.get("EARTAIL_URL", "http://bigorin.local:8764")
DOA_POLL_HZ = 10 # DoA polling rate
EYE_SERVICE_URL = os.environ.get("EYE_SERVICE_URL", "http://localhost:8780")
# ============================================================================
# Config persistence
# ============================================================================
def load_config() -> dict:
if not os.path.exists(CONFIG_PATH):
return {}
try:
with open(CONFIG_PATH) as f:
return json.load(f)
except Exception as e:
logger.warning("Failed to read config: %s", e)
return {}
def save_config(cfg: dict):
os.makedirs(CONFIG_DIR, exist_ok=True)
with open(CONFIG_PATH, "w") as f:
json.dump(cfg, f, indent=2)
# ============================================================================
# XVF3800 + LED Control
# ============================================================================
from xvf3800 import XVF3800Manager, learn_devices
xvf_manager = XVF3800Manager()
LEDS_AVAILABLE = False
def leds_wakeup():
if LEDS_AVAILABLE:
try:
xvf_manager.all_leds_solid(0xFFFFFF)
except: pass
def leds_listening():
if LEDS_AVAILABLE:
try:
xvf_manager.all_leds_doa()
except: pass
def leds_processing():
if LEDS_AVAILABLE:
try:
xvf_manager.all_leds_breath(0x9400D3)
except: pass
def leds_enrolling():
if LEDS_AVAILABLE:
try:
xvf_manager.all_leds_solid(0xFF8C00)
except: pass
def leds_off():
if LEDS_AVAILABLE:
try:
xvf_manager.all_leds_off()
except: pass
# ============================================================================
# State
# ============================================================================
class ServiceState:
def __init__(self):
self.running = False
self.listening = False
self.recording = False
self.processing = False
self.last_transcription: Optional[str] = None
self.last_wake_time: Optional[float] = None
self.wake_count = 0
self.error: Optional[str] = None
self.audio_scene: Optional[dict] = None
self.sound_classification_enabled: bool = False
self.recognized_speaker: Optional[str] = None
self.speaker_confidence: float = 0.0
self.speaker_recognition_enabled: bool = False
self.enrolling: bool = False
self.active_side: str = "left" # which mic array is currently active
self.doa: dict = {} # latest DoA from both arrays
state = ServiceState()
# Sound classifier globals
sound_classifier = None
sound_ring_buffer = None # collections.deque, filled by listener_loop
# Speaker recognizer globals
speaker_recognizer = None
enrollment_buffer = None # list of frame bytes, set during enrollment
enrollment_name = None
# Audio stream
dual_stream = None # DualAudioStream instance
# ============================================================================
# EarTail Transcription
# ============================================================================
async def transcribe_audio(audio_data: bytes) -> str:
"""Send audio to EarTail and get transcription."""
async with httpx.AsyncClient(timeout=120.0) as client:
files = {"audio": ("recording.wav", audio_data, "audio/wav")}
response = await client.post(f"{EARTAIL_URL}/transcribe/submit", files=files)
response.raise_for_status()
job_id = response.json().get("job_id")
logger.info(f"Transcription job: {job_id}")
for _ in range(120):
status = await client.get(f"{EARTAIL_URL}/transcribe/status/{job_id}")
data = status.json()
if data.get("status") == "SUCCESS":
result = await client.get(f"{EARTAIL_URL}/transcribe/result/{job_id}")
return result.json().get("transcription", "")
elif data.get("status") == "FAILURE":
raise Exception(f"Transcription failed: {data.get('error')}")
await asyncio.sleep(1)
raise Exception("Transcription timeout")
def transcribe_sync(audio_data: bytes) -> str:
"""Synchronous wrapper for transcription."""
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(transcribe_audio(audio_data))
finally:
loop.close()
# ============================================================================
# Main Listener Loop (dual-stream)
# ============================================================================
def audio_to_wav(frames: List[bytes]) -> bytes:
"""Convert raw audio frames to WAV format."""
wav_buffer = io.BytesIO()
with wave.open(wav_buffer, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(b''.join(frames))
wav_buffer.seek(0)
return wav_buffer.read()
def listener_loop():
"""Main audio processing loop with dual-stream best-beam selection."""
global state, dual_stream
logger.info("Initializing Porcupine...")
porcupine = None
try:
porcupine = pvporcupine.create(
access_key=PORCUPINE_ACCESS_KEY,
keyword_paths=[WAKE_WORD_PATH]
)
except Exception as e:
logger.error(f"Failed to init Porcupine: {e}")
logger.warning("Wake word detection disabled — audio loop continues for classification")
vad = webrtcvad.Vad(VAD_AGGRESSIVENESS)
# VAD needs 10/20/30ms frames. 30ms at 16kHz = 480 samples
vad_frame_bytes = 480 * 2
state.listening = True
logger.info("🦊 Wake word listener active - say 'Hey Vivi'!")
recording_buffer: List[bytes] = []
silence_count = 0
is_recording = False
recording_side: str = "left"
try:
for frame_data, side in dual_stream.frames():
if not state.running:
break
state.active_side = side
# Feed sound classifier ring buffer
if sound_ring_buffer is not None:
sound_ring_buffer.append(frame_data)
# Feed enrollment buffer if active
if enrollment_buffer is not None:
enrollment_buffer.append(frame_data)
# Check for wake word (skip if Porcupine not available)
keyword_index = -1
if porcupine:
pcm = struct.unpack_from("h" * 512, frame_data)
keyword_index = porcupine.process(pcm)
if keyword_index >= 0 and not is_recording:
logger.info("🦊 Wake word detected: 'Hey Vivi'! (from %s ear)", side)
state.wake_count += 1
state.last_wake_time = time.time()
recording_side = side
leds_wakeup()
time.sleep(0.2)
leds_listening()
is_recording = True
state.recording = True
recording_buffer = []
silence_count = 0
logger.info("Recording started (using %s ear)...", recording_side)
continue
if is_recording:
# During recording, use frames from the side that heard the wake word
rec_frame = dual_stream.get_side_frame(recording_side)
if rec_frame:
recording_buffer.append(rec_frame)
# Check VAD (use first 480 samples of the 512 frame)
vad_data = (rec_frame or frame_data)[:vad_frame_bytes]
try:
is_speech = vad.is_speech(vad_data, SAMPLE_RATE)
except:
is_speech = True
if is_speech:
silence_count = 0
else:
silence_count += 1
# Stop conditions
should_stop = (
(len(recording_buffer) > 10 and silence_count >= SILENCE_FRAMES) or
len(recording_buffer) >= MAX_RECORDING_FRAMES
)
if should_stop:
logger.info(f"Recording stopped: {len(recording_buffer)} frames")
is_recording = False
state.recording = False
leds_processing()
state.processing = True
try:
wav_data = audio_to_wav(recording_buffer)
transcription = transcribe_sync(wav_data)
state.last_transcription = transcription
logger.info(f"Transcription: {transcription}")
except Exception as e:
logger.error(f"Transcription error: {e}")
state.error = str(e)
finally:
state.processing = False
leds_off()
recording_buffer = []
except Exception as e:
logger.error(f"Listener error: {e}")
state.error = str(e)
finally:
if porcupine:
porcupine.delete()
state.listening = False
leds_off()
logger.info("Listener stopped")
# ============================================================================
# Sound Classification Thread
# ============================================================================
def sound_classifier_loop():
"""Background thread for continuous sound classification."""
global state
logger.info("Sound classifier thread started")
while state.running:
if sound_ring_buffer is None or len(sound_ring_buffer) < 30:
time.sleep(0.1)
continue
try:
frames = list(sound_ring_buffer)
audio = np.frombuffer(b"".join(frames), dtype=np.int16)
result = sound_classifier.classify(audio)
# Strip audio_float32 before storing in state (not JSON-serializable)
audio_f32 = result.pop("audio_float32", None)
state.audio_scene = result
# Speaker identification: run when speech detected
if speaker_recognizer and result["category"] == "speech" and audio_f32 is not None:
try:
name, confidence = speaker_recognizer.identify(audio_f32)
state.recognized_speaker = name
state.speaker_confidence = confidence
except Exception as e:
logger.warning("Speaker identification error: %s", e)
except Exception as e:
logger.warning("Sound classification error: %s", e)
time.sleep(0.5)
logger.info("Sound classifier thread stopped")
# ============================================================================
# DoA Polling Thread
# ============================================================================
def doa_poll_loop():
"""Poll Direction of Arrival from both XVF3800 arrays."""
interval = 1.0 / DOA_POLL_HZ
while state.running:
try:
state.doa = xvf_manager.read_both_doa()
except Exception as e:
logger.debug("DoA poll error: %s", e)
time.sleep(interval)
def doa_to_gaze() -> Optional[tuple[int, int]]:
"""Convert the active side's DoA angle to gaze coordinates for the eye service."""
doa = state.doa
side = state.active_side
if not doa or side not in doa or doa[side] is None:
return None
if not doa[side].get("vad"):
return None
import math
angle = doa[side]["angle"]
rad = math.radians(angle)
x = int(127 - 80 * math.sin(rad))
y = int(127 - 40 * math.cos(rad))
return max(0, min(255, x)), max(0, min(255, y))
# ============================================================================
# FastAPI
# ============================================================================
app = FastAPI(title="HeadMic", description="Vixy's Ears 🦊👂 (Dual XVF3800)")
@app.on_event("startup")
async def startup():
global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE
state.running = True
# --- XVF3800 setup ---
cfg = load_config()
ears_cfg = cfg.get("ears", {})
if ears_cfg.get("left") and ears_cfg.get("right"):
xvf_manager.set_serial_mapping(
ears_cfg["left"]["usb_serial"],
ears_cfg["right"]["usb_serial"]
)
xvf_manager.assign()
LEDS_AVAILABLE = bool(xvf_manager.left or xvf_manager.right)
# Resolve ALSA devices
alsa = xvf_manager.get_alsa_devices()
left_dev = alsa.get("left")
right_dev = alsa.get("right")
if not left_dev:
logger.error("No left ear ALSA device found! Check USB connections and firmware.")
state.error = "No left ear audio device"
else:
logger.info("Left ear ALSA: %s", left_dev)
if right_dev:
logger.info("Right ear ALSA: %s", right_dev)
else:
logger.warning("Right ear ALSA device not found — running with left ear only")
# --- Dual audio stream ---
from audio_stream import DualAudioStream
dual_stream = DualAudioStream(left_dev or "plughw:0,0", right_dev)
dual_stream.start()
# --- Sound classifier (optional) ---
model_dir = Path(__file__).parent / "models"
edgetpu_model_path = model_dir / "yamnet_edgetpu.tflite"
model_path = model_dir / "yamnet.tflite"
class_map_path = model_dir / "yamnet_class_map.csv"
# Prefer Edge TPU model if available
use_edgetpu = edgetpu_model_path.exists()
active_model = edgetpu_model_path if use_edgetpu else model_path
if active_model.exists() and class_map_path.exists():
try:
from sound_id import SoundClassifier
sound_classifier = SoundClassifier(str(active_model), str(class_map_path), use_edgetpu=use_edgetpu)
sound_ring_buffer = collections.deque(maxlen=31)
state.sound_classification_enabled = True
logger.info("Sound classification enabled (YAMNet %s)", "Edge TPU" if use_edgetpu else "CPU")
sc_thread = threading.Thread(target=sound_classifier_loop, daemon=True)
sc_thread.start()
except Exception as e:
logger.warning("Sound classification unavailable: %s", e)
else:
logger.info("Sound classification models not found, skipping")
# --- Speaker recognizer (optional) ---
try:
from speaker_id import SpeakerRecognizer
db_path = Path(__file__).parent / "voices.db"
speaker_recognizer = SpeakerRecognizer(db_path=str(db_path))
state.speaker_recognition_enabled = True
logger.info("Speaker recognition enabled (Resemblyzer)")
except Exception as e:
logger.warning("Speaker recognition unavailable: %s", e)
# --- DoA polling ---
if xvf_manager.left or xvf_manager.right:
threading.Thread(target=doa_poll_loop, daemon=True).start()
logger.info("DoA polling started at %d Hz", DOA_POLL_HZ)
# --- Main listener ---
thread = threading.Thread(target=listener_loop, daemon=True)
thread.start()
logger.info("HeadMic started (dual XVF3800)")
@app.on_event("shutdown")
async def shutdown():
state.running = False
leds_off()
if dual_stream:
dual_stream.stop()
# --- Info endpoints ---
@app.get("/")
async def root():
return {
"service": "HeadMic",
"description": "Vixy's Ears 🦊👂 (Dual XVF3800)",
"wake_word": "Hey Vivi"
}
@app.get("/health")
async def health():
return {
"healthy": state.listening and not state.error,
"listening": state.listening,
"recording": state.recording,
"processing": state.processing,
"wake_count": state.wake_count,
"sound_classification_enabled": state.sound_classification_enabled,
"speaker_recognition_enabled": state.speaker_recognition_enabled,
"active_side": state.active_side,
"error": state.error
}
@app.get("/status")
async def status():
return {
"listening": state.listening,
"recording": state.recording,
"processing": state.processing,
"last_transcription": state.last_transcription,
"last_wake_time": state.last_wake_time,
"wake_count": state.wake_count,
"audio_scene": state.audio_scene["dominant_category"] if state.audio_scene else None,
"recognized_speaker": state.recognized_speaker,
"active_side": state.active_side,
"error": state.error
}
@app.get("/last")
async def last():
return {
"transcription": state.last_transcription,
"wake_time": state.last_wake_time
}
# --- DoA endpoints ---
@app.get("/doa")
async def doa():
"""Direction of Arrival from both mic arrays."""
return {
"doa": state.doa,
"active_side": state.active_side,
"gaze": doa_to_gaze(),
}
# --- Device info ---
@app.get("/devices")
async def devices():
"""Status of both XVF3800 arrays."""
alsa = xvf_manager.get_alsa_devices()
return {
"left": {
"connected": bool(xvf_manager.left),
"serial": xvf_manager.left.serial if xvf_manager.left else None,
"alsa": alsa.get("left"),
},
"right": {
"connected": bool(xvf_manager.right),
"serial": xvf_manager.right.serial if xvf_manager.right else None,
"alsa": alsa.get("right"),
},
"active_side": state.active_side,
}
# --- Sound endpoints ---
@app.get("/sounds")
async def sounds():
"""Current audio scene classification."""
if not state.sound_classification_enabled:
raise HTTPException(status_code=503, detail="Sound classification not available")
if state.audio_scene is None:
return {"category": None, "top_classes": [], "dominant_category": None, "timestamp": None,
"recognized_speaker": None, "speaker_confidence": 0.0}
return {
**state.audio_scene,
"recognized_speaker": state.recognized_speaker,
"speaker_confidence": state.speaker_confidence,
}
@app.get("/sounds/history")
async def sounds_history(seconds: int = 30):
"""Recent sound classification history."""
if not state.sound_classification_enabled:
raise HTTPException(status_code=503, detail="Sound classification not available")
if sound_classifier is None:
return {"history": []}
return {"history": sound_classifier.get_history(seconds)}
# --- Speaker endpoints ---
@app.post("/speakers/enroll")
async def enroll_speaker(name: str = Form(...), audio: UploadFile = File(...)):
"""Enroll a speaker from uploaded audio file."""
if speaker_recognizer is None:
raise HTTPException(status_code=503, detail="Speaker recognition not available")
audio_bytes = await audio.read()
try:
import wave as _wave
wav_io = io.BytesIO(audio_bytes)
with _wave.open(wav_io, 'rb') as wf:
raw = wf.readframes(wf.getnframes())
audio_f32 = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0
except Exception:
audio_f32 = np.frombuffer(audio_bytes, dtype=np.int16).astype(np.float32) / 32768.0
try:
speaker_recognizer.enroll(name, audio_f32, source="upload")
return {"enrolled": name, "speakers": speaker_recognizer.list_speakers()}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/speakers/enroll-from-mic")
async def enroll_from_mic(name: str):
"""Record from live mic for 5 seconds and enroll speaker."""
global enrollment_buffer, enrollment_name
if speaker_recognizer is None:
raise HTTPException(status_code=503, detail="Speaker recognition not available")
if state.enrolling:
raise HTTPException(status_code=409, detail="Enrollment already in progress")
state.enrolling = True
enrollment_buffer = []
enrollment_name = name
leds_enrolling()
logger.info("Enrollment started for '%s' — recording 5 seconds", name)
await asyncio.sleep(5.0)
frames = enrollment_buffer
enrollment_buffer = None
enrollment_name = None
state.enrolling = False
leds_off()
if not frames:
raise HTTPException(status_code=500, detail="No audio captured")
audio_int16 = np.frombuffer(b"".join(frames), dtype=np.int16)
audio_f32 = audio_int16.astype(np.float32) / 32768.0
logger.info("Enrollment audio: %.1f seconds", len(audio_f32) / SAMPLE_RATE)
try:
speaker_recognizer.enroll(name, audio_f32, source="mic")
return {"enrolled": name, "seconds": round(len(audio_f32) / SAMPLE_RATE, 1),
"speakers": speaker_recognizer.list_speakers()}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/speakers")
async def list_speakers():
"""List enrolled speakers."""
if speaker_recognizer is None:
raise HTTPException(status_code=503, detail="Speaker recognition not available")
return {"speakers": speaker_recognizer.list_speakers()}
@app.delete("/speakers/{name}")
async def delete_speaker(name: str):
"""Remove a speaker."""
if speaker_recognizer is None:
raise HTTPException(status_code=503, detail="Speaker recognition not available")
removed = speaker_recognizer.delete_speaker(name)
if removed == 0:
raise HTTPException(status_code=404, detail=f"Speaker '{name}' not found")
return {"deleted": name, "samples_removed": removed}
# ============================================================================
# CLI
# ============================================================================
if __name__ == "__main__":
import sys
if "--learn" in sys.argv:
logging.basicConfig(level=logging.INFO)
mgr = XVF3800Manager()
mgr.assign() # discovers and assigns by bus address order
if not mgr.left or not mgr.right:
print("[HEADMIC] Need 2 XVF3800 arrays connected for --learn")
sys.exit(1)
# Light up the first array and ask the user
print()
print("[HEADMIC] Identifying mic arrays...")
print(" One array should be lighting up GREEN now.")
mgr.left.led_solid(0x00FF00)
mgr.right.led_off()
answer = input(" Is the LEFT mic array lit up? [Y/n/swap] ").strip().lower()
mgr.left.led_off()
if answer in ("n", "no", "swap"):
print("[HEADMIC] Swapping left/right assignment")
mgr.left, mgr.right = mgr.right, mgr.left
# Confirm: light up both with their side color
print("[HEADMIC] Confirming: LEFT = green, RIGHT = blue")
mgr.left.led_solid(0x00FF00)
mgr.right.led_solid(0x0000FF)
time.sleep(2)
mgr.left.led_off()
mgr.right.led_off()
# Save config
info = {}
for label, dev in [("left", mgr.left), ("right", mgr.right)]:
entry = {"usb_serial": dev.serial}
alsa = mgr.serial_to_alsa(dev.serial)
if alsa:
entry["alsa_card"] = alsa
info[label] = entry
cfg = load_config()
cfg["ears"] = info
save_config(cfg)
print(f"[HEADMIC] Saved ear config → {CONFIG_PATH}")
print(json.dumps(info, indent=2))
sys.exit(0)
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8446)