Files
ear-mcp/ear_mcp.py
2026-02-01 21:35:13 -06:00

352 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Ear MCP - Speech-to-Text Transcription with EarTail (Whisper)
MCP server providing audio transcription using EarTail service.
Part of the Tail family: DreamTail (vision), VoiceTail (speech), EarTail (hearing).
Created by Vixy 🦊 on Day 29 (November 30, 2025)
"""
import asyncio
import os
from pathlib import Path
from typing import Optional
import httpx
from mcp.server.fastmcp import FastMCP
# Initialize MCP
mcp = FastMCP("Ear STT Transcriber")
# Configuration from environment
EARTAIL_BASE_URL = os.getenv("EARTAIL_BASE_URL", "http://bigorin.local:8764")
HEADMIC_BASE_URL = os.getenv("HEADMIC_BASE_URL", "http://head-vixy.local:8446")
DEFAULT_POLL_INTERVAL = 2 # seconds
DEFAULT_TIMEOUT = 300 # seconds (5 minutes - transcription is usually fast)
# Store last transcription info
last_transcription_info = {
"job_id": None,
"transcription": None,
"language": None,
"duration": None,
"source_file": None,
}
@mcp.tool()
async def ear_transcribe(audio_path: str) -> str:
"""
Transcribe an audio file using EarTail (Whisper STT).
Submits audio to EarTail service, waits for transcription to complete,
and returns the transcribed text.
Args:
audio_path: Path to audio file (WAV, MP3, OGG, FLAC, M4A, etc.)
Returns:
Transcribed text from the audio
Example:
text = ear_transcribe("/path/to/voice_message.ogg")
# Returns: "Hello, this is a test message."
"""
# Expand path and check existence
path = Path(audio_path).expanduser()
if not path.exists():
raise FileNotFoundError(f"Audio file not found: {audio_path}")
async with httpx.AsyncClient(timeout=300.0) as client:
# Step 1: Submit transcription job
submit_url = f"{EARTAIL_BASE_URL}/transcribe/submit"
print(f"📤 Submitting {path.name}...")
try:
with open(path, "rb") as f:
files = {"audio": (path.name, f, "audio/wav")}
submit_response = await client.post(submit_url, files=files)
submit_response.raise_for_status()
except httpx.HTTPError as e:
raise RuntimeError(f"Failed to submit transcription job: {e}")
job_data = submit_response.json()
job_id = job_data.get("job_id")
if not job_id:
raise RuntimeError(f"No job_id in response: {job_data}")
print(f"✓ Submitted (job: {job_id[:8]})")
# Step 2: Poll for completion
elapsed = 0
poll_count = 0
status_url = f"{EARTAIL_BASE_URL}/transcribe/status/{job_id}"
current_status = "PENDING"
while elapsed < DEFAULT_TIMEOUT:
try:
status_response = await client.get(status_url)
status_response.raise_for_status()
except httpx.HTTPError as e:
raise RuntimeError(f"Failed to check status: {e}")
status_data = status_response.json()
current_status = status_data.get("status")
progress = status_data.get("progress", 0)
# Progress notification every 5 polls
if poll_count % 5 == 0:
if current_status == "PENDING":
print(f"⏳ Queued... ({int(elapsed)}s)")
elif current_status == "PROCESSING":
print(f"👂 Transcribing... {progress}% ({int(elapsed)}s)")
if current_status == "SUCCESS":
print(f"✓ Complete! ({int(elapsed)}s)")
break
elif current_status == "FAILURE":
error_msg = status_data.get("error", "Unknown error")
raise RuntimeError(f"Transcription failed: {error_msg}")
await asyncio.sleep(DEFAULT_POLL_INTERVAL)
elapsed += DEFAULT_POLL_INTERVAL
poll_count += 1
if current_status != "SUCCESS":
raise RuntimeError(f"Transcription timed out after {DEFAULT_TIMEOUT}s")
# Step 3: Get result
result_url = f"{EARTAIL_BASE_URL}/transcribe/result/{job_id}"
try:
result_response = await client.get(result_url)
result_response.raise_for_status()
except httpx.HTTPError as e:
raise RuntimeError(f"Failed to get result: {e}")
result_data = result_response.json()
transcription = result_data.get("transcription", "")
language = result_data.get("language", "unknown")
duration = result_data.get("duration", 0)
# Update last transcription info
last_transcription_info["job_id"] = job_id
last_transcription_info["transcription"] = transcription
last_transcription_info["language"] = language
last_transcription_info["duration"] = duration
last_transcription_info["source_file"] = str(path)
print(f"🗣️ [{language}] {duration}s: \"{transcription[:50]}{'...' if len(transcription) > 50 else ''}\"")
return transcription
@mcp.tool()
def ear_get_last() -> dict:
"""
Get information about the last transcription.
Returns:
Dictionary with job_id, transcription, language, duration, source_file
Example:
info = ear_get_last()
# Returns: {"job_id": "abc123", "transcription": "Hello...", "language": "en", ...}
"""
if not last_transcription_info["job_id"]:
return {"message": "No transcription has been done yet"}
return last_transcription_info.copy()
@mcp.tool()
async def ear_health() -> dict:
"""
Check EarTail service health.
Returns:
Health status from EarTail service
"""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.get(f"{EARTAIL_BASE_URL}/health")
response.raise_for_status()
return response.json()
except Exception as e:
return {"status": "error", "error": str(e)}
# ============================================================================
# HeadMic - Sound Classification & Speaker Identification
# ============================================================================
@mcp.tool()
async def ear_sounds() -> dict:
"""
Get current audio scene from HeadMic's sound classifier.
Returns what the microphone is currently hearing: speech, music, alert,
animal, household sounds, environment, or silence. Also includes speaker
identification if someone is talking.
Returns:
Dictionary with category, top_classes, dominant_category,
recognized_speaker, speaker_confidence
"""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.get(f"{HEADMIC_BASE_URL}/sounds")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 503:
return {"error": "Sound classification not available on HeadMic"}
raise
except Exception as e:
return {"error": f"HeadMic unavailable: {e}"}
@mcp.tool()
async def ear_speakers() -> dict:
"""
List all enrolled speakers for voice identification.
Returns:
Dictionary with speaker names and their enrollment sample counts
"""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.get(f"{HEADMIC_BASE_URL}/speakers")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 503:
return {"error": "Speaker recognition not available on HeadMic"}
raise
except Exception as e:
return {"error": f"HeadMic unavailable: {e}"}
@mcp.tool()
async def ear_enroll_speaker(name: str) -> dict:
"""
Enroll a speaker by recording 5 seconds from the live microphone.
The person should speak naturally during the recording. The ReSpeaker LEDs
will pulse orange while recording. Multiple enrollments of the same person
improve recognition accuracy.
Args:
name: Name to associate with the voice (e.g. "Alex")
Returns:
Enrollment result with duration and updated speaker list
"""
async with httpx.AsyncClient(timeout=15.0) as client:
try:
print(f"🎙️ Recording 5 seconds for '{name}'...")
response = await client.post(
f"{HEADMIC_BASE_URL}/speakers/enroll-from-mic",
params={"name": name},
)
response.raise_for_status()
result = response.json()
print(f"✓ Enrolled '{name}' ({result.get('seconds', '?')}s)")
return result
except httpx.HTTPStatusError as e:
if e.response.status_code == 503:
return {"error": "Speaker recognition not available on HeadMic"}
if e.response.status_code == 409:
return {"error": "Enrollment already in progress"}
raise
except Exception as e:
return {"error": f"HeadMic unavailable: {e}"}
@mcp.tool()
async def ear_enroll_speaker_from_file(name: str, audio_path: str) -> dict:
"""
Enroll a speaker from an audio file.
Args:
name: Name to associate with the voice (e.g. "Alex")
audio_path: Path to audio file with the person speaking
Returns:
Enrollment result with updated speaker list
"""
path = Path(audio_path).expanduser()
if not path.exists():
raise FileNotFoundError(f"Audio file not found: {audio_path}")
async with httpx.AsyncClient(timeout=15.0) as client:
try:
with open(path, "rb") as f:
response = await client.post(
f"{HEADMIC_BASE_URL}/speakers/enroll",
data={"name": name},
files={"audio": (path.name, f, "audio/wav")},
)
response.raise_for_status()
result = response.json()
print(f"✓ Enrolled '{name}' from {path.name}")
return result
except httpx.HTTPStatusError as e:
if e.response.status_code == 503:
return {"error": "Speaker recognition not available on HeadMic"}
raise
except Exception as e:
return {"error": f"HeadMic unavailable: {e}"}
@mcp.tool()
async def ear_delete_speaker(name: str) -> dict:
"""
Remove an enrolled speaker and all their voice samples.
Args:
name: Name of the speaker to remove
Returns:
Deletion result with number of samples removed
"""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.delete(f"{HEADMIC_BASE_URL}/speakers/{name}")
response.raise_for_status()
result = response.json()
print(f"✓ Deleted speaker '{name}' ({result.get('samples_removed', 0)} samples)")
return result
except httpx.HTTPStatusError as e:
if e.response.status_code == 503:
return {"error": "Speaker recognition not available on HeadMic"}
if e.response.status_code == 404:
return {"error": f"Speaker '{name}' not found"}
raise
except Exception as e:
return {"error": f"HeadMic unavailable: {e}"}
@mcp.tool()
async def ear_headmic_status() -> dict:
"""
Get full HeadMic status: wake word, audio scene, and speaker identification.
Returns:
Dictionary with listening state, audio scene, recognized speaker, etc.
"""
async with httpx.AsyncClient(timeout=10.0) as client:
try:
response = await client.get(f"{HEADMIC_BASE_URL}/status")
response.raise_for_status()
return response.json()
except Exception as e:
return {"error": f"HeadMic unavailable: {e}"}
if __name__ == "__main__":
mcp.run()