Probes the Edge TPU in a subprocess before loading — catches segfaults (libedgetpu ABI mismatch on Debian Trixie/Python 3.13) and falls back to CPU automatically. No more service crashes on Coral incompatibility. When the runtime is eventually fixed, Edge TPU will be used automatically with no config changes needed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
771 lines
25 KiB
Python
771 lines
25 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
HeadMic - Vixy's Ears Service 🦊👂
|
||
|
||
Wake word detection + voice recording + EarTail transcription.
|
||
Runs on head-vixy (Raspberry Pi 5).
|
||
|
||
Wake word: "Hey Vivi" (trained via Picovoice Porcupine)
|
||
|
||
Architecture: Dual XVF3800 mic arrays (left/right ear), best-beam selection.
|
||
Single shared audio stream feeds Porcupine, VAD, sound classification, and speaker ID.
|
||
|
||
Flow:
|
||
1. Dual audio streams from two XVF3800 arrays
|
||
2. Best-beam selection (higher energy side)
|
||
3. Feed frames to Porcupine for wake word detection
|
||
4. On "Hey Vivi" → start buffering from active side
|
||
5. Use VAD to detect end of speech
|
||
6. Send buffer to EarTail for transcription
|
||
7. Return to listening mode
|
||
|
||
Hardware: 2× ReSpeaker XVF3800 4-Mic Array (USB, 2-channel firmware)
|
||
DoA + LEDs via USB vendor control (xvf3800.py)
|
||
|
||
Built by Vixy on Day 77 (January 17, 2026) 💜
|
||
Upgraded to dual XVF3800 on Day 160 (April 2026)
|
||
"""
|
||
|
||
import asyncio
|
||
import collections
|
||
import io
|
||
import json
|
||
import logging
|
||
import os
|
||
import struct
|
||
import threading
|
||
import time
|
||
import wave
|
||
from pathlib import Path
|
||
from typing import Optional, List
|
||
|
||
import numpy as np
|
||
import httpx
|
||
import pvporcupine
|
||
import webrtcvad
|
||
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
|
||
from pydantic import BaseModel
|
||
|
||
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("headmic")

# ============================================================================
# Configuration
# ============================================================================

# Picovoice access key and wake-word keyword file; both overridable via env.
PORCUPINE_ACCESS_KEY = os.getenv("PORCUPINE_ACCESS_KEY", "")
WAKE_WORD_PATH = os.getenv(
    "WAKE_WORD_PATH",
    "/home/alex/headmic/Hey-Vivi_en_raspberry-pi_v4_0_0.ppn",
)

# Audio sample rate in Hz (passed to the VAD and written into WAV headers).
SAMPLE_RATE = 16000

# Persistent per-user configuration file.
CONFIG_DIR = os.path.expanduser("~/.vixy")
CONFIG_PATH = os.path.join(CONFIG_DIR, "headmic.json")

# webrtcvad aggressiveness: 0-3, higher = more aggressive.
VAD_AGGRESSIVENESS = 2
# Consecutive non-speech frames that end a recording (~1.5 s).
SILENCE_FRAMES = 50
# Hard cap on the number of frames in a single recording (~30 s).
MAX_RECORDING_FRAMES = 1000

# Base URL of the EarTail transcription service.
EARTAIL_URL = os.getenv("EARTAIL_URL", "http://bigorin.local:8764")

# Direction-of-Arrival polling rate in Hz.
DOA_POLL_HZ = 10
# Base URL of the eye service.
EYE_SERVICE_URL = os.getenv("EYE_SERVICE_URL", "http://localhost:8780")
|
||
|
||
# ============================================================================
|
||
# Config persistence
|
||
# ============================================================================
|
||
|
||
def load_config() -> dict:
    """Load the persisted HeadMic configuration from ``CONFIG_PATH``.

    Returns:
        The parsed JSON object. An empty dict is returned when the file does
        not exist, cannot be read or parsed, or when its top-level JSON value
        is not an object (callers use ``.get`` on the result).
    """
    try:
        with open(CONFIG_PATH, encoding="utf-8") as f:
            cfg = json.load(f)
    except FileNotFoundError:
        # No config yet is the normal first-run case; stay silent.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Failed to read config: %s", e)
        return {}

    if not isinstance(cfg, dict):
        logger.warning(
            "Failed to read config: expected a JSON object, got %s",
            type(cfg).__name__,
        )
        return {}
    return cfg
|
||
|
||
|
||
def save_config(cfg: dict) -> None:
    """Persist ``cfg`` as indented JSON at ``CONFIG_PATH``.

    The payload is serialized before any file is touched and then written to
    a sibling temporary file that is moved over the target, so a serialization
    error or an interrupted write cannot leave a truncated config behind.

    Args:
        cfg: JSON-serializable configuration mapping.

    Raises:
        TypeError / ValueError: ``cfg`` is not JSON-serializable.
        OSError: The directory or file could not be written.
    """
    payload = json.dumps(cfg, indent=2)
    os.makedirs(CONFIG_DIR, exist_ok=True)
    tmp_path = CONFIG_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(payload)
    # os.replace is an atomic rename on the same filesystem.
    os.replace(tmp_path, CONFIG_PATH)
|
||
|
||
|
||
# ============================================================================
|
||
# XVF3800 + LED Control
|
||
# ============================================================================
|
||
|
||
from xvf3800 import XVF3800Manager, learn_devices
|
||
|
||
# Shared manager for the XVF3800 arrays; used below for LED control and
# elsewhere in this module for DoA reads and ALSA device lookup.
xvf_manager = XVF3800Manager()

# Flipped to True during startup once at least one array has been assigned.
# Until then every LED helper is a no-op.
LEDS_AVAILABLE = False


def _led_call(method_name: str, *args) -> None:
    """Invoke an LED method on ``xvf_manager`` on a best-effort basis.

    LEDs are purely cosmetic, so failures must never disturb the audio path:
    any ``Exception`` is logged at debug level and dropped. Unlike a bare
    ``except:``, ``KeyboardInterrupt`` / ``SystemExit`` still propagate.

    Args:
        method_name: Name of the ``xvf_manager`` method to call.
        *args: Positional arguments forwarded to that method.
    """
    if not LEDS_AVAILABLE:
        return
    try:
        getattr(xvf_manager, method_name)(*args)
    except Exception as e:
        logger.debug("LED call %s failed: %s", method_name, e)


def leds_wakeup():
    """Show the wake-word acknowledgement (solid 0xFFFFFF on all LEDs)."""
    _led_call("all_leds_solid", 0xFFFFFF)


def leds_listening():
    """Switch all LEDs to the arrays' DoA display mode while recording."""
    _led_call("all_leds_doa")


def leds_processing():
    """Show the processing indication (breathing 0x9400D3 on all LEDs)."""
    _led_call("all_leds_breath", 0x9400D3)


def leds_enrolling():
    """Show the speaker-enrollment indication (solid 0xFF8C00 on all LEDs)."""
    _led_call("all_leds_solid", 0xFF8C00)


def leds_off():
    """Turn all LEDs off."""
    _led_call("all_leds_off")
|
||
|
||
|
||
# ============================================================================
|
||
# State
|
||
# ============================================================================
|
||
|
||
class ServiceState:
    """Mutable snapshot of the service, shared by worker threads and the API."""

    def __init__(self):
        # --- Lifecycle flags ---
        self.running = False
        self.listening = False
        self.recording = False
        self.processing = False

        # --- Wake word / transcription bookkeeping ---
        self.wake_count = 0
        self.last_wake_time: Optional[float] = None
        self.last_transcription: Optional[str] = None
        self.error: Optional[str] = None

        # --- Sound classification ---
        self.sound_classification_enabled: bool = False
        self.audio_scene: Optional[dict] = None

        # --- Speaker recognition / enrollment ---
        self.speaker_recognition_enabled: bool = False
        self.recognized_speaker: Optional[str] = None
        self.speaker_confidence: float = 0.0
        self.enrolling: bool = False

        # --- Microphone arrays ---
        # Which mic array is currently active.
        self.active_side: str = "left"
        # Latest DoA from both arrays.
        self.doa: dict = {}
|
||
|
||
# Process-wide state object: written by the worker threads, read by the API.
state = ServiceState()

# --- Sound classification ---
# Classifier instance and the ring buffer of raw frames it consumes. Both are
# created during startup; the listener loop appends frames to the buffer.
sound_classifier = None
sound_ring_buffer = None

# --- Speaker recognition ---
speaker_recognizer = None
# While a mic enrollment is running this is a list of raw frame bytes that
# the listener loop appends to; None otherwise.
enrollment_buffer = None
enrollment_name = None

# --- Audio capture ---
# DualAudioStream instance, created during startup.
dual_stream = None
|
||
|
||
|
||
# ============================================================================
|
||
# EarTail Transcription
|
||
# ============================================================================
|
||
|
||
async def transcribe_audio(audio_data: bytes) -> str:
    """Submit audio to EarTail and poll until the transcription is available.

    Args:
        audio_data: Complete WAV file contents to transcribe.

    Returns:
        The ``transcription`` field of the EarTail result, or ``""`` when the
        result carries no such field.

    Raises:
        httpx.HTTPStatusError: EarTail answered a request with an error status.
        RuntimeError: No job id was returned, or the job reported ``FAILURE``.
        TimeoutError: The job did not reach a final state within the poll budget.
    """
    max_polls = 120        # number of status checks before giving up
    poll_interval = 1.0    # seconds between status checks

    async with httpx.AsyncClient(timeout=120.0) as client:
        files = {"audio": ("recording.wav", audio_data, "audio/wav")}
        response = await client.post(f"{EARTAIL_URL}/transcribe/submit", files=files)
        response.raise_for_status()

        job_id = response.json().get("job_id")
        if not job_id:
            # Without an id every later URL would be ".../None" and the loop
            # would spin for the full timeout.
            raise RuntimeError("EarTail did not return a job_id")
        logger.info("Transcription job: %s", job_id)

        for _ in range(max_polls):
            status_resp = await client.get(f"{EARTAIL_URL}/transcribe/status/{job_id}")
            status_resp.raise_for_status()
            data = status_resp.json()
            job_status = data.get("status")

            if job_status == "SUCCESS":
                result = await client.get(f"{EARTAIL_URL}/transcribe/result/{job_id}")
                # An error body must not be mistaken for an empty transcription.
                result.raise_for_status()
                return result.json().get("transcription", "")
            if job_status == "FAILURE":
                raise RuntimeError(f"Transcription failed: {data.get('error')}")

            await asyncio.sleep(poll_interval)

    raise TimeoutError("Transcription timeout")
|
||
|
||
|
||
def transcribe_sync(audio_data: bytes) -> str:
    """Run :func:`transcribe_audio` to completion from synchronous code.

    Intended for threads that have no running event loop (the listener
    thread). ``asyncio.run`` creates a fresh loop, runs the coroutine, and on
    exit also cancels leftover tasks and shuts down async generators before
    closing the loop.

    Args:
        audio_data: Complete WAV file contents to transcribe.

    Returns:
        The transcribed text.

    Raises:
        RuntimeError: Called from a thread that already runs an event loop.
        Exception: Anything raised by :func:`transcribe_audio`.
    """
    return asyncio.run(transcribe_audio(audio_data))
|
||
|
||
|
||
# ============================================================================
|
||
# Main Listener Loop (dual-stream)
|
||
# ============================================================================
|
||
|
||
def audio_to_wav(frames: List[bytes]) -> bytes:
    """Wrap raw PCM frames in an in-memory WAV container.

    The header declares 1 channel, 2-byte samples and ``SAMPLE_RATE`` Hz; the
    frame payloads are concatenated unchanged.
    """
    pcm = b"".join(frames)
    sink = io.BytesIO()
    with wave.open(sink, "wb") as writer:
        # (nchannels, sampwidth, framerate, nframes, comptype, compname)
        writer.setparams((1, 2, SAMPLE_RATE, 0, "NONE", "not compressed"))
        writer.writeframes(pcm)
    return sink.getvalue()
|
||
|
||
|
||
def listener_loop():
    """Main audio processing loop with dual-stream best-beam selection.

    Consumes ``(frame, side)`` pairs from ``dual_stream.frames()`` and, for
    each frame:

    1. publishes the active side on ``state``,
    2. feeds the sound-classifier ring buffer and any running enrollment,
    3. runs Porcupine wake-word detection (skipped if Porcupine failed to load),
    4. while recording, buffers audio and uses VAD to find the end of speech,
       then hands the recording to ``_transcribe_recording``.

    Runs until ``state.running`` is cleared or the frame iterator ends. Any
    unexpected exception is logged and stored in ``state.error``.
    """
    logger.info("Initializing Porcupine...")
    porcupine = None
    try:
        porcupine = pvporcupine.create(
            access_key=PORCUPINE_ACCESS_KEY,
            keyword_paths=[WAKE_WORD_PATH],
        )
    except Exception as e:
        logger.error(f"Failed to init Porcupine: {e}")
        logger.warning("Wake word detection disabled — audio loop continues for classification")

    vad = webrtcvad.Vad(VAD_AGGRESSIVENESS)

    # VAD needs 10/20/30ms frames. 30ms at 16kHz = 480 samples
    vad_frame_bytes = 480 * 2

    # Pre-compiled unpacker for exactly one Porcupine frame of int16 samples.
    pcm_unpacker = struct.Struct(f"{porcupine.frame_length}h") if porcupine else None

    state.listening = True
    logger.info("🦊 Wake word listener active - say 'Hey Vivi'!")

    recording_buffer: List[bytes] = []
    silence_count = 0
    is_recording = False
    recording_side: str = "left"

    try:
        for frame_data, side in dual_stream.frames():
            if not state.running:
                break

            state.active_side = side

            # Snapshot the shared buffers into locals before use: the API
            # thread resets enrollment_buffer to None, and a check-then-append
            # on the global could otherwise hit None and kill this loop.
            ring = sound_ring_buffer
            if ring is not None:
                ring.append(frame_data)

            enroll = enrollment_buffer
            if enroll is not None:
                enroll.append(frame_data)

            # Check for wake word (skip if Porcupine not available)
            keyword_index = -1
            if porcupine and len(frame_data) >= pcm_unpacker.size:
                # A short frame is skipped instead of raising struct.error.
                pcm = pcm_unpacker.unpack_from(frame_data)
                keyword_index = porcupine.process(pcm)

            if keyword_index >= 0 and not is_recording:
                logger.info("🦊 Wake word detected: 'Hey Vivi'! (from %s ear)", side)
                state.wake_count += 1
                state.last_wake_time = time.time()
                recording_side = side

                # Brief flash, then switch to the listening display.
                leds_wakeup()
                time.sleep(0.2)
                leds_listening()

                is_recording = True
                state.recording = True
                recording_buffer = []
                silence_count = 0
                logger.info("Recording started (using %s ear)...", recording_side)
                continue

            if is_recording:
                # Prefer frames from the side that heard the wake word. When
                # that side has no frame, fall back to the current best-beam
                # frame (the same fallback the VAD check always used) so the
                # buffer keeps growing and the stop conditions stay reachable.
                rec_frame = dual_stream.get_side_frame(recording_side) or frame_data
                recording_buffer.append(rec_frame)

                # Check VAD (use first 480 samples of the 512 frame)
                vad_data = rec_frame[:vad_frame_bytes]
                try:
                    is_speech = vad.is_speech(vad_data, SAMPLE_RATE)
                except Exception:
                    # Treat an unclassifiable frame as speech so a VAD error
                    # cannot cut the recording short.
                    is_speech = True

                if is_speech:
                    silence_count = 0
                else:
                    silence_count += 1

                # Stop conditions: enough trailing silence, or the hard cap.
                should_stop = (
                    (len(recording_buffer) > 10 and silence_count >= SILENCE_FRAMES)
                    or len(recording_buffer) >= MAX_RECORDING_FRAMES
                )

                if should_stop:
                    logger.info(f"Recording stopped: {len(recording_buffer)} frames")
                    is_recording = False
                    state.recording = False

                    _transcribe_recording(recording_buffer)
                    recording_buffer = []

    except Exception as e:
        logger.exception("Listener error: %s", e)
        state.error = str(e)
    finally:
        if porcupine:
            porcupine.delete()
        state.listening = False
        # Do not report a recording in progress after the loop has exited.
        state.recording = False
        leds_off()
        logger.info("Listener stopped")


def _transcribe_recording(frames: List[bytes]) -> None:
    """Transcribe one finished recording and publish the outcome on ``state``.

    Blocks the calling (listener) thread until EarTail answers or fails.
    On success ``state.last_transcription`` is updated; on failure the error
    text is stored in ``state.error``. The processing flag and LEDs are reset
    in either case.

    NOTE(review): ``state.error`` is not cleared by a later successful
    transcription, so ``/health`` stays unhealthy after one failure — confirm
    whether that is intended.
    """
    leds_processing()
    state.processing = True
    try:
        wav_data = audio_to_wav(frames)
        transcription = transcribe_sync(wav_data)
        state.last_transcription = transcription
        logger.info("Transcription: %s", transcription)
    except Exception as e:
        logger.error("Transcription error: %s", e)
        state.error = str(e)
    finally:
        state.processing = False
        leds_off()
|
||
|
||
|
||
# ============================================================================
|
||
# Sound Classification Thread
|
||
# ============================================================================
|
||
|
||
def sound_classifier_loop():
    """Classify the most recent audio window on a fixed cadence.

    While ``state.running`` is set: wait until the ring buffer holds at least
    30 frames, classify its contents, publish the result on
    ``state.audio_scene``, and — when a speaker recognizer is loaded and the
    category is "speech" — update the recognized speaker on ``state``.
    """
    logger.info("Sound classifier thread started")

    while state.running:
        buffer_ready = sound_ring_buffer is not None and len(sound_ring_buffer) >= 30
        if not buffer_ready:
            time.sleep(0.1)
            continue

        try:
            window = list(sound_ring_buffer)
            samples = np.frombuffer(b"".join(window), dtype=np.int16)
            scene = sound_classifier.classify(samples)

            # audio_float32 is not JSON-serializable, so remove it before the
            # result is exposed through state.
            audio_f32 = scene.pop("audio_float32", None)
            state.audio_scene = scene

            # Speaker identification only runs on speech.
            if speaker_recognizer:
                if scene["category"] == "speech" and audio_f32 is not None:
                    try:
                        who, score = speaker_recognizer.identify(audio_f32)
                    except Exception as e:
                        logger.warning("Speaker identification error: %s", e)
                    else:
                        state.recognized_speaker = who
                        state.speaker_confidence = score
        except Exception as e:
            logger.warning("Sound classification error: %s", e)

        time.sleep(0.5)

    logger.info("Sound classifier thread stopped")
|
||
|
||
|
||
# ============================================================================
|
||
# DoA Polling Thread
|
||
# ============================================================================
|
||
|
||
def doa_poll_loop():
    """Refresh ``state.doa`` from the XVF3800 manager at ``DOA_POLL_HZ``.

    A failed read is logged at debug level and leaves the previous reading in
    place. The loop ends when ``state.running`` is cleared.
    """
    period = 1.0 / DOA_POLL_HZ
    while state.running:
        try:
            reading = xvf_manager.read_both_doa()
        except Exception as e:
            logger.debug("DoA poll error: %s", e)
        else:
            state.doa = reading
        time.sleep(period)
|
||
|
||
|
||
def doa_to_gaze() -> Optional[tuple[int, int]]:
    """Map the active side's DoA angle to ``(x, y)`` gaze coordinates.

    Returns:
        A pair of ints clamped to 0..255, centred on 127, or ``None`` when no
        reading exists for the active side or its ``vad`` flag is falsy.
    """
    import math

    readings = state.doa
    ear = state.active_side

    if not readings or ear not in readings or readings[ear] is None:
        return None

    entry = readings[ear]
    if not entry.get("vad"):
        return None

    def clamp(value: int) -> int:
        return max(0, min(255, value))

    theta = math.radians(entry["angle"])
    gaze_x = int(127 - 80 * math.sin(theta))
    gaze_y = int(127 - 40 * math.cos(theta))
    return clamp(gaze_x), clamp(gaze_y)
|
||
|
||
|
||
# ============================================================================
|
||
# FastAPI
|
||
# ============================================================================
|
||
|
||
# ASGI application object; the endpoints below register themselves on it.
app = FastAPI(
    title="HeadMic",
    description="Vixy's Ears 🦊👂 (Dual XVF3800)",
)
|
||
|
||
|
||
@app.on_event("startup")
async def startup():
    """Bring up hardware, optional recognizers and the worker threads.

    Steps, in order: apply the saved ear→USB-serial mapping and assign the
    XVF3800 arrays, resolve their ALSA devices, start the dual audio stream,
    load the optional sound classifier and speaker recognizer, start DoA
    polling when an array is present, and finally start the listener thread.

    Optional components log a message and are skipped when unavailable. A
    missing left ear is recorded in ``state.error`` but does not abort startup.
    """
    global sound_classifier, sound_ring_buffer, speaker_recognizer, dual_stream, LEDS_AVAILABLE

    state.running = True

    # --- XVF3800 setup ---
    cfg = load_config()
    ears_cfg = cfg.get("ears") if isinstance(cfg, dict) else None
    left_serial = _ear_serial(ears_cfg, "left")
    right_serial = _ear_serial(ears_cfg, "right")
    if left_serial and right_serial:
        xvf_manager.set_serial_mapping(left_serial, right_serial)
    elif ears_cfg:
        # A hand-edited or partial config must not crash startup.
        logger.warning("Ignoring incomplete 'ears' config (need usb_serial for left and right)")
    xvf_manager.assign()
    LEDS_AVAILABLE = bool(xvf_manager.left or xvf_manager.right)

    # Resolve ALSA devices
    alsa = xvf_manager.get_alsa_devices()
    left_dev = alsa.get("left")
    right_dev = alsa.get("right")

    if not left_dev:
        logger.error("No left ear ALSA device found! Check USB connections and firmware.")
        state.error = "No left ear audio device"
    else:
        logger.info("Left ear ALSA: %s", left_dev)
    if right_dev:
        logger.info("Right ear ALSA: %s", right_dev)
    else:
        logger.warning("Right ear ALSA device not found — running with left ear only")

    # --- Dual audio stream ---
    from audio_stream import DualAudioStream
    # Falls back to the first ALSA card when no left ear was resolved.
    dual_stream = DualAudioStream(left_dev or "plughw:0,0", right_dev)
    dual_stream.start()

    # --- Sound classifier (optional) ---
    loaded = _load_sound_classifier(Path(__file__).parent / "models")
    if loaded is not None:
        sound_classifier, used_edgetpu = loaded
        try:
            sound_ring_buffer = collections.deque(maxlen=31)
            state.sound_classification_enabled = True
            logger.info("Sound classification enabled (YAMNet %s)", "Edge TPU" if used_edgetpu else "CPU")

            sc_thread = threading.Thread(target=sound_classifier_loop, daemon=True)
            sc_thread.start()
        except Exception as e:
            logger.warning("Sound classification unavailable: %s", e)

    # --- Speaker recognizer (optional) ---
    try:
        from speaker_id import SpeakerRecognizer
        db_path = Path(__file__).parent / "voices.db"
        speaker_recognizer = SpeakerRecognizer(db_path=str(db_path))
        state.speaker_recognition_enabled = True
        logger.info("Speaker recognition enabled (Resemblyzer)")
    except Exception as e:
        logger.warning("Speaker recognition unavailable: %s", e)

    # --- DoA polling ---
    if xvf_manager.left or xvf_manager.right:
        threading.Thread(target=doa_poll_loop, daemon=True).start()
        logger.info("DoA polling started at %d Hz", DOA_POLL_HZ)

    # --- Main listener ---
    thread = threading.Thread(target=listener_loop, daemon=True)
    thread.start()
    logger.info("HeadMic started (dual XVF3800)")


def _ear_serial(ears_cfg, side: str) -> Optional[str]:
    """Return the configured USB serial for ``side``, or None if absent/malformed."""
    entry = ears_cfg.get(side) if isinstance(ears_cfg, dict) else None
    return entry.get("usb_serial") if isinstance(entry, dict) else None


def _load_sound_classifier(model_dir: Path):
    """Construct the YAMNet classifier, preferring the Edge TPU model.

    Tries ``yamnet_edgetpu.tflite`` first (when present) and falls back to the
    CPU ``yamnet.tflite`` if constructing the Edge TPU classifier raises.

    NOTE(review): only Python exceptions are handled here. Any subprocess
    probe that guards against an Edge TPU segfault is not visible in this
    file — presumably it lives in ``sound_id.SoundClassifier``; confirm.

    Args:
        model_dir: Directory holding the model files and class map.

    Returns:
        ``(classifier, used_edgetpu)`` on success, otherwise ``None``.
    """
    class_map_path = model_dir / "yamnet_class_map.csv"
    candidates = [
        (model_dir / "yamnet_edgetpu.tflite", True),
        (model_dir / "yamnet.tflite", False),
    ]
    available = [(path, tpu) for path, tpu in candidates if path.exists()]
    if not available or not class_map_path.exists():
        logger.info("Sound classification models not found, skipping")
        return None

    try:
        from sound_id import SoundClassifier
    except Exception as e:
        logger.warning("Sound classification unavailable: %s", e)
        return None

    for model_path, use_edgetpu in available:
        try:
            classifier = SoundClassifier(
                str(model_path), str(class_map_path), use_edgetpu=use_edgetpu
            )
        except Exception as e:
            logger.warning("Sound classification unavailable: %s", e)
            continue
        return classifier, use_edgetpu
    return None
|
||
|
||
|
||
@app.on_event("shutdown")
async def shutdown():
    """Tell the worker loops to exit, switch the LEDs off and stop capture."""
    state.running = False
    leds_off()

    stream = dual_stream
    if not stream:
        return
    stream.stop()
|
||
|
||
|
||
# --- Info endpoints ---
|
||
|
||
@app.get("/")
async def root():
    """Return a static description of the service."""
    return dict(
        service="HeadMic",
        description="Vixy's Ears 🦊👂 (Dual XVF3800)",
        wake_word="Hey Vivi",
    )
|
||
|
||
|
||
@app.get("/health")
async def health():
    """Report liveness: healthy means the listener runs and no error is stored."""
    is_healthy = state.listening and not state.error

    report = {"healthy": is_healthy}
    for field in (
        "listening",
        "recording",
        "processing",
        "wake_count",
        "sound_classification_enabled",
        "speaker_recognition_enabled",
        "active_side",
        "error",
    ):
        report[field] = getattr(state, field)
    return report
|
||
|
||
|
||
@app.get("/status")
async def status():
    """Return the current activity flags, last transcription and audio scene."""
    scene = state.audio_scene
    dominant = scene["dominant_category"] if scene else None

    return dict(
        listening=state.listening,
        recording=state.recording,
        processing=state.processing,
        last_transcription=state.last_transcription,
        last_wake_time=state.last_wake_time,
        wake_count=state.wake_count,
        audio_scene=dominant,
        recognized_speaker=state.recognized_speaker,
        active_side=state.active_side,
        error=state.error,
    )
|
||
|
||
|
||
@app.get("/last")
async def last():
    """Return the most recent transcription and the time of the last wake word."""
    return dict(
        transcription=state.last_transcription,
        wake_time=state.last_wake_time,
    )
|
||
|
||
|
||
# --- DoA endpoints ---
|
||
|
||
@app.get("/doa")
async def doa():
    """Direction of Arrival from both mic arrays, plus the derived gaze point."""
    payload = {"doa": state.doa, "active_side": state.active_side}
    payload["gaze"] = doa_to_gaze()
    return payload
|
||
|
||
|
||
# --- Device info ---
|
||
|
||
@app.get("/devices")
async def devices():
    """Status of both XVF3800 arrays: connection, USB serial and ALSA device."""
    alsa = xvf_manager.get_alsa_devices()

    def describe(side: str) -> dict:
        # One entry per ear; an unassigned array reports serial None.
        dev = getattr(xvf_manager, side)
        return {
            "connected": bool(dev),
            "serial": dev.serial if dev else None,
            "alsa": alsa.get(side),
        }

    return {
        "left": describe("left"),
        "right": describe("right"),
        "active_side": state.active_side,
    }
|
||
|
||
|
||
# --- Sound endpoints ---
|
||
|
||
@app.get("/sounds")
async def sounds():
    """Current audio scene classification, merged with the speaker fields.

    Raises:
        HTTPException: 503 when sound classification is not enabled.
    """
    if not state.sound_classification_enabled:
        raise HTTPException(status_code=503, detail="Sound classification not available")

    scene = state.audio_scene
    if scene is None:
        # Nothing classified yet: answer with an all-empty scene.
        return dict(
            category=None,
            top_classes=[],
            dominant_category=None,
            timestamp=None,
            recognized_speaker=None,
            speaker_confidence=0.0,
        )

    payload = dict(scene)
    payload["recognized_speaker"] = state.recognized_speaker
    payload["speaker_confidence"] = state.speaker_confidence
    return payload
|
||
|
||
|
||
@app.get("/sounds/history")
async def sounds_history(seconds: int = 30):
    """Recent sound classification history from the classifier.

    Args:
        seconds: Look-back value passed to the classifier's ``get_history``.

    Raises:
        HTTPException: 503 when sound classification is not enabled.
    """
    if not state.sound_classification_enabled:
        raise HTTPException(status_code=503, detail="Sound classification not available")

    entries = [] if sound_classifier is None else sound_classifier.get_history(seconds)
    return {"history": entries}
|
||
|
||
|
||
# --- Speaker endpoints ---
|
||
|
||
@app.post("/speakers/enroll")
async def enroll_speaker(name: str = Form(...), audio: UploadFile = File(...)):
    """Enroll a speaker from an uploaded audio file.

    The upload may be a 16-bit PCM WAV file (any channel count; channels are
    averaged to mono) or headerless 16-bit mono PCM.

    Args:
        name: Speaker name to enroll under.
        audio: Uploaded audio file.

    Returns:
        The enrolled name and the updated speaker list.

    Raises:
        HTTPException: 503 when speaker recognition is unavailable; 400 when
            the upload is empty, is a WAV with a sample width other than
            16 bits, or is rejected by the recognizer (``ValueError``).
    """
    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")

    audio_bytes = await audio.read()
    audio_f32 = _decode_enrollment_audio(audio_bytes)
    if audio_f32.size == 0:
        raise HTTPException(status_code=400, detail="No audio samples in upload")

    try:
        speaker_recognizer.enroll(name, audio_f32, source="upload")
        return {"enrolled": name, "speakers": speaker_recognizer.list_speakers()}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


def _decode_enrollment_audio(audio_bytes: bytes) -> np.ndarray:
    """Decode an enrollment upload to mono float32 samples scaled by 1/32768.

    Raises:
        HTTPException: 400 for a WAV file that is not 16-bit PCM.
    """
    try:
        with wave.open(io.BytesIO(audio_bytes), "rb") as wf:
            n_channels = wf.getnchannels()
            sample_width = wf.getsampwidth()
            frame_rate = wf.getframerate()
            raw = wf.readframes(wf.getnframes())
    except (wave.Error, EOFError):
        # Not a WAV container: treat the payload as headerless 16-bit mono PCM.
        n_channels, sample_width, frame_rate, raw = 1, 2, SAMPLE_RATE, audio_bytes

    if sample_width != 2:
        # Reinterpreting 8/24/32-bit samples as int16 would yield noise.
        raise HTTPException(status_code=400, detail="Only 16-bit PCM audio is supported")

    # Drop a trailing partial frame so np.frombuffer / reshape cannot fail.
    frame_bytes = 2 * max(n_channels, 1)
    raw = raw[: len(raw) - (len(raw) % frame_bytes)]

    samples = np.frombuffer(raw, dtype=np.int16).astype(np.float32) / 32768.0
    if n_channels > 1:
        # Interleaved channels → mono by averaging.
        samples = samples.reshape(-1, n_channels).mean(axis=1)

    if frame_rate != SAMPLE_RATE:
        # NOTE(review): the recognizer presumably expects SAMPLE_RATE audio
        # like the mic path; no resampling is done here — confirm.
        logger.warning(
            "Enrollment upload is %d Hz, expected %d Hz", frame_rate, SAMPLE_RATE
        )
    return samples
|
||
|
||
|
||
@app.post("/speakers/enroll-from-mic")
async def enroll_from_mic(name: str):
    """Record from live mic for 5 seconds and enroll speaker.

    While the request waits, the listener loop appends every captured frame to
    ``enrollment_buffer``. The enrollment state is torn down in a ``finally``
    block so that a cancelled request (e.g. client disconnect) cannot leave
    ``state.enrolling`` set or the buffer growing indefinitely.

    Args:
        name: Speaker name to enroll under.

    Returns:
        The enrolled name, the captured duration in seconds and the updated
        speaker list.

    Raises:
        HTTPException: 503 when speaker recognition is unavailable; 409 when
            another enrollment is running; 500 when no audio was captured;
            400 when the recognizer rejects the audio (``ValueError``).
    """
    global enrollment_buffer, enrollment_name

    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")
    if state.enrolling:
        raise HTTPException(status_code=409, detail="Enrollment already in progress")

    state.enrolling = True
    frames = None
    try:
        enrollment_buffer = []
        enrollment_name = name

        leds_enrolling()
        logger.info("Enrollment started for '%s' — recording 5 seconds", name)

        await asyncio.sleep(5.0)
    finally:
        # Detach the buffer first so the listener stops feeding it.
        frames = enrollment_buffer
        enrollment_buffer = None
        enrollment_name = None
        state.enrolling = False
        leds_off()

    if not frames:
        raise HTTPException(status_code=500, detail="No audio captured")

    audio_int16 = np.frombuffer(b"".join(frames), dtype=np.int16)
    audio_f32 = audio_int16.astype(np.float32) / 32768.0
    seconds = len(audio_f32) / SAMPLE_RATE
    logger.info("Enrollment audio: %.1f seconds", seconds)

    try:
        speaker_recognizer.enroll(name, audio_f32, source="mic")
        return {"enrolled": name, "seconds": round(seconds, 1),
                "speakers": speaker_recognizer.list_speakers()}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
|
||
|
||
|
||
@app.get("/speakers")
async def list_speakers():
    """List enrolled speakers.

    Raises:
        HTTPException: 503 when speaker recognition is unavailable.
    """
    recognizer = speaker_recognizer
    if recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")

    enrolled = recognizer.list_speakers()
    return {"speakers": enrolled}
|
||
|
||
|
||
@app.delete("/speakers/{name}")
async def delete_speaker(name: str):
    """Remove a speaker and report how many samples were deleted.

    Raises:
        HTTPException: 503 when speaker recognition is unavailable; 404 when
            the recognizer reports that nothing was removed.
    """
    if speaker_recognizer is None:
        raise HTTPException(status_code=503, detail="Speaker recognition not available")

    removed = speaker_recognizer.delete_speaker(name)
    if removed != 0:
        return {"deleted": name, "samples_removed": removed}

    raise HTTPException(status_code=404, detail=f"Speaker '{name}' not found")
|
||
|
||
|
||
# ============================================================================
|
||
# CLI
|
||
# ============================================================================
|
||
|
||
if __name__ == "__main__":
    import sys

    # `--learn`: interactively decide which physical array is the left ear,
    # store the mapping in the config file, then exit without serving.
    if "--learn" in sys.argv:
        logging.basicConfig(level=logging.INFO)

        mgr = XVF3800Manager()
        mgr.assign()  # discovers and assigns by bus address order

        if not (mgr.left and mgr.right):
            print("[HEADMIC] Need 2 XVF3800 arrays connected for --learn")
            sys.exit(1)

        # Light up the first array and ask the user
        print()
        print("[HEADMIC] Identifying mic arrays...")
        print(" One array should be lighting up GREEN now.")
        mgr.left.led_solid(0x00FF00)
        mgr.right.led_off()

        answer = input(" Is the LEFT mic array lit up? [Y/n/swap] ").strip().lower()

        mgr.left.led_off()

        if answer in {"n", "no", "swap"}:
            print("[HEADMIC] Swapping left/right assignment")
            mgr.left, mgr.right = mgr.right, mgr.left

        # Confirm: light up both with their side color
        print("[HEADMIC] Confirming: LEFT = green, RIGHT = blue")
        mgr.left.led_solid(0x00FF00)
        mgr.right.led_solid(0x0000FF)
        time.sleep(2)
        mgr.left.led_off()
        mgr.right.led_off()

        # Save config: USB serial per ear, plus the ALSA card when resolvable.
        info = {}
        for label in ("left", "right"):
            dev = getattr(mgr, label)
            entry = {"usb_serial": dev.serial}
            alsa = mgr.serial_to_alsa(dev.serial)
            if alsa:
                entry["alsa_card"] = alsa
            info[label] = entry

        cfg = load_config()
        cfg["ears"] = info
        save_config(cfg)
        print(f"[HEADMIC] Saved ear config → {CONFIG_PATH}")
        print(json.dumps(info, indent=2))
        sys.exit(0)

    # Default: serve the API on all interfaces.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8446)
|