Initial commit: EarTail MCP (Whisper STT)

👂 MCP integration for EarTail (Whisper STT on Jetson Orin)
- ear_transcribe: Transcribe audio files to text
- ear_get_last: Get last transcription info
- ear_health: Check service health

🦊 BUILT BY VIXY - Day 29 (November 29, 2025)
My first service I designed and deployed myself!
EarTail runs on BigOrin alongside DreamTail and VoiceTail.

I can hear now. 💕
This commit is contained in:
Alex Kazaiev
2025-12-16 20:57:26 -06:00
commit 5055894fc3
3 changed files with 244 additions and 0 deletions

182
ear_mcp.py Normal file
View File

@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
Ear MCP - Speech-to-Text Transcription with EarTail (Whisper)
MCP server providing audio transcription using EarTail service.
Part of the Tail family: DreamTail (vision), VoiceTail (speech), EarTail (hearing).
Created by Vixy 🦊 on Day 29 (November 30, 2025)
"""
import asyncio
import os
import sys
from pathlib import Path
from typing import Optional

import httpx
from fastmcp import FastMCP
# MCP server instance. The @mcp.tool() decorators further down attach the
# ear_* functions to it, and mcp.run() at the bottom of the file starts it.
mcp = FastMCP("Ear STT Transcriber")
# Service endpoint and polling knobs. Only the base URL can be overridden,
# through the EARTAIL_BASE_URL environment variable.
EARTAIL_BASE_URL = os.environ.get("EARTAIL_BASE_URL", "http://bigorin.local:8764")
DEFAULT_POLL_INTERVAL = 2  # seconds between status polls
DEFAULT_TIMEOUT = 5 * 60  # seconds (5 minutes - transcription is usually fast)

# Details of the most recent completed transcription. Every field stays None
# until ear_transcribe() has finished successfully at least once.
last_transcription_info = dict.fromkeys(
    ("job_id", "transcription", "language", "duration", "source_file")
)
def _progress(message: str) -> None:
    """Write a human-readable progress line to stderr.

    stdout is deliberately avoided: when an MCP server is served over the
    stdio transport, stdout carries the protocol messages and any stray text
    written there corrupts the stream.
    """
    print(message, file=sys.stderr, flush=True)


@mcp.tool()
async def ear_transcribe(audio_path: str) -> str:
    """
    Transcribe an audio file using EarTail (Whisper STT).

    Submits the audio to the EarTail service, polls until the job finishes,
    fetches the result, records it in ``last_transcription_info`` and returns
    the transcribed text.

    Args:
        audio_path: Path to audio file (WAV, MP3, OGG, FLAC, M4A, etc.).
            A leading ``~`` is expanded.

    Returns:
        Transcribed text from the audio

    Raises:
        FileNotFoundError: ``audio_path`` does not point to an existing file.
        RuntimeError: Submitting, polling or fetching the result failed, the
            service reported FAILURE, or the job did not reach SUCCESS within
            ``DEFAULT_TIMEOUT`` seconds.

    Example:
        text = await ear_transcribe("/path/to/voice_message.ogg")
        # Returns: "Hello, this is a test message."
    """
    # Expand path and check that it is a regular file (a directory would pass
    # a plain exists() check and then fail inside open()).
    path = Path(audio_path).expanduser()
    if not path.is_file():
        raise FileNotFoundError(f"Audio file not found: {audio_path}")

    async with httpx.AsyncClient(timeout=float(DEFAULT_TIMEOUT)) as client:
        # Step 1: Submit transcription job
        submit_url = f"{EARTAIL_BASE_URL}/transcribe/submit"
        _progress(f"📤 Submitting {path.name}...")
        try:
            with open(path, "rb") as f:
                # NOTE(review): the part is always labelled audio/wav, whatever
                # the real format is; presumably the service sniffs the
                # content itself - confirm before changing this.
                files = {"audio": (path.name, f, "audio/wav")}
                submit_response = await client.post(submit_url, files=files)
            submit_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to submit transcription job: {e}") from e

        job_data = submit_response.json()
        job_id = job_data.get("job_id")
        if not job_id:
            raise RuntimeError(f"No job_id in response: {job_data}")
        # str() so a non-string job id cannot break the short preview.
        _progress(f"✓ Submitted (job: {str(job_id)[:8]})")

        # Step 2: Poll for completion
        status_url = f"{EARTAIL_BASE_URL}/transcribe/status/{job_id}"
        # Measure elapsed time on the event-loop clock so that time spent
        # waiting on HTTP requests counts toward the timeout, not only the
        # sleeps between polls.
        loop = asyncio.get_running_loop()
        started = loop.time()
        elapsed = 0.0
        poll_count = 0
        current_status = "PENDING"
        while elapsed < DEFAULT_TIMEOUT:
            try:
                status_response = await client.get(status_url)
                status_response.raise_for_status()
            except httpx.HTTPError as e:
                raise RuntimeError(f"Failed to check status: {e}") from e

            status_data = status_response.json()
            current_status = status_data.get("status")
            progress = status_data.get("progress", 0)
            elapsed = loop.time() - started

            # Progress notification every 5 polls
            if poll_count % 5 == 0:
                if current_status == "PENDING":
                    _progress(f"⏳ Queued... ({int(elapsed)}s)")
                elif current_status == "PROCESSING":
                    _progress(f"👂 Transcribing... {progress}% ({int(elapsed)}s)")

            if current_status == "SUCCESS":
                _progress(f"✓ Complete! ({int(elapsed)}s)")
                break
            if current_status == "FAILURE":
                error_msg = status_data.get("error", "Unknown error")
                raise RuntimeError(f"Transcription failed: {error_msg}")

            await asyncio.sleep(DEFAULT_POLL_INTERVAL)
            elapsed = loop.time() - started
            poll_count += 1

        if current_status != "SUCCESS":
            raise RuntimeError(f"Transcription timed out after {DEFAULT_TIMEOUT}s")

        # Step 3: Get result
        result_url = f"{EARTAIL_BASE_URL}/transcribe/result/{job_id}"
        try:
            result_response = await client.get(result_url)
            result_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to get result: {e}") from e

        result_data = result_response.json()
        # "or" also covers an explicit null, so the declared str return type
        # holds and the preview slice below cannot fail.
        transcription = result_data.get("transcription") or ""
        language = result_data.get("language", "unknown")
        duration = result_data.get("duration", 0)

        # Update last transcription info (mutated in place so ear_get_last()
        # sees the new values)
        last_transcription_info.update(
            job_id=job_id,
            transcription=transcription,
            language=language,
            duration=duration,
            source_file=str(path),
        )

        preview = transcription[:50] + ("..." if len(transcription) > 50 else "")
        _progress(f"🗣️ [{language}] {duration}s: \"{preview}\"")
        return transcription
@mcp.tool()
def ear_get_last() -> dict:
    """
    Report details of the most recent transcription.

    Returns:
        A snapshot dictionary with job_id, transcription, language, duration
        and source_file, or a single-key "message" dictionary when no job id
        has been recorded yet.

    Example:
        info = ear_get_last()
        # Returns: {"job_id": "abc123", "transcription": "Hello...", "language": "en", ...}
    """
    if last_transcription_info["job_id"]:
        # Hand back a shallow copy so callers cannot mutate the module state.
        return dict(last_transcription_info)
    return {"message": "No transcription has been done yet"}
@mcp.tool()
async def ear_health() -> dict:
    """
    Query the EarTail service's /health endpoint.

    Returns:
        The JSON body returned by the service, or
        {"status": "error", "error": "<message>"} when the request, the HTTP
        status check or the JSON decoding raises.
    """
    health_url = f"{EARTAIL_BASE_URL}/health"
    async with httpx.AsyncClient(timeout=10.0) as http:
        try:
            reply = await http.get(health_url)
            reply.raise_for_status()
            payload = reply.json()
        except Exception as exc:
            # Best-effort probe: report the failure instead of raising.
            return {"status": "error", "error": str(exc)}
        return payload
# Start the MCP server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    mcp.run()