#!/usr/bin/env python3
"""
Ear MCP - Speech-to-Text Transcription with EarTail (Whisper)

MCP server providing audio transcription using EarTail service.
Part of the Tail family: DreamTail (vision), VoiceTail (speech), EarTail (hearing).

Created by Vixy 🦊 on Day 29 (November 30, 2025)
"""

import asyncio
import os
import sys
import time
from pathlib import Path
from typing import Optional

import httpx
from mcp.server.fastmcp import FastMCP

# Initialize MCP
mcp = FastMCP("Ear STT Transcriber")

# Configuration from environment
EARTAIL_BASE_URL = os.getenv("EARTAIL_BASE_URL", "http://bigorin.local:8764")
DEFAULT_POLL_INTERVAL = 2  # seconds
DEFAULT_TIMEOUT = 300  # seconds (5 minutes - transcription is usually fast)

# Store last transcription info
last_transcription_info = {
    "job_id": None,
    "transcription": None,
    "language": None,
    "duration": None,
    "source_file": None,
}


def _log(message: str) -> None:
    """Write a progress message to stderr.

    stdout is deliberately avoided: when the server is run over the stdio
    transport, stdout carries the JSON-RPC stream and any stray output
    written there corrupts the protocol.
    """
    print(message, file=sys.stderr, flush=True)


def _json_body(response: httpx.Response, action: str) -> dict:
    """Decode a response body as JSON, raising RuntimeError if it is not JSON.

    Args:
        response: The HTTP response to decode.
        action: Short description used in the error message
            (e.g. "check status").
    """
    try:
        return response.json()
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        raise RuntimeError(f"Failed to {action}: invalid JSON response: {e}") from e


async def _submit_job(client: httpx.AsyncClient, path: Path) -> str:
    """Upload the audio file to EarTail and return the job id it reports.

    Raises:
        RuntimeError: If the HTTP request fails or the response has no job_id.
    """
    submit_url = f"{EARTAIL_BASE_URL}/transcribe/submit"
    _log(f"📤 Submitting {path.name}...")

    try:
        with open(path, "rb") as f:
            # NOTE(review): the content type is hard-coded to audio/wav even
            # for MP3/OGG/etc. Kept as-is because it is not visible from here
            # what the server accepts - confirm before switching to a guessed
            # MIME type.
            files = {"audio": (path.name, f, "audio/wav")}
            submit_response = await client.post(submit_url, files=files)
        submit_response.raise_for_status()
    except httpx.HTTPError as e:
        raise RuntimeError(f"Failed to submit transcription job: {e}") from e

    job_data = _json_body(submit_response, "submit transcription job")
    job_id = job_data.get("job_id")
    if not job_id:
        raise RuntimeError(f"No job_id in response: {job_data}")

    # str() so a non-string id (e.g. an integer) cannot break the slice
    _log(f"✓ Submitted (job: {str(job_id)[:8]})")
    return job_id


async def _wait_for_job(client: httpx.AsyncClient, job_id: str) -> None:
    """Poll the status endpoint until the job reports SUCCESS.

    Raises:
        RuntimeError: If a status request fails, the job reports FAILURE,
            or DEFAULT_TIMEOUT seconds pass without SUCCESS.
    """
    status_url = f"{EARTAIL_BASE_URL}/transcribe/status/{job_id}"
    started = time.monotonic()
    elapsed = 0.0
    poll_count = 0

    while elapsed < DEFAULT_TIMEOUT:
        try:
            status_response = await client.get(status_url)
            status_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to check status: {e}") from e

        status_data = _json_body(status_response, "check status")
        current_status = status_data.get("status")
        progress = status_data.get("progress", 0)

        # Wall-clock time, so slow status requests count toward the timeout
        # (summing only the sleep intervals would under-report the wait).
        elapsed = time.monotonic() - started

        # Progress notification every 5 polls
        if poll_count % 5 == 0:
            if current_status == "PENDING":
                _log(f"⏳ Queued... ({int(elapsed)}s)")
            elif current_status == "PROCESSING":
                _log(f"👂 Transcribing... {progress}% ({int(elapsed)}s)")

        if current_status == "SUCCESS":
            _log(f"✓ Complete! ({int(elapsed)}s)")
            return
        if current_status == "FAILURE":
            error_msg = status_data.get("error", "Unknown error")
            raise RuntimeError(f"Transcription failed: {error_msg}")

        await asyncio.sleep(DEFAULT_POLL_INTERVAL)
        elapsed = time.monotonic() - started
        poll_count += 1

    raise RuntimeError(f"Transcription timed out after {DEFAULT_TIMEOUT}s")


async def _fetch_result(client: httpx.AsyncClient, job_id: str) -> dict:
    """Fetch the result payload of a finished job.

    Raises:
        RuntimeError: If the HTTP request fails or the body is not JSON.
    """
    result_url = f"{EARTAIL_BASE_URL}/transcribe/result/{job_id}"
    try:
        result_response = await client.get(result_url)
        result_response.raise_for_status()
    except httpx.HTTPError as e:
        raise RuntimeError(f"Failed to get result: {e}") from e
    return _json_body(result_response, "get result")


@mcp.tool()
async def ear_transcribe(audio_path: str) -> str:
    """
    Transcribe an audio file using EarTail (Whisper STT).

    Submits audio to EarTail service, waits for transcription to complete,
    and returns the transcribed text. The outcome is also recorded in
    last_transcription_info for ear_get_last().

    Args:
        audio_path: Path to audio file (WAV, MP3, OGG, FLAC, M4A, etc.)

    Returns:
        Transcribed text from the audio

    Raises:
        FileNotFoundError: If audio_path does not point to an existing file.
        RuntimeError: If submission, polling, or result retrieval fails,
            the job reports FAILURE, or the job does not finish within
            DEFAULT_TIMEOUT seconds.

    Example:
        text = ear_transcribe("/path/to/voice_message.ogg")
        # Returns: "Hello, this is a test message."
    """
    # Expand path and check existence; is_file() also rejects directories,
    # which would otherwise fail later inside open() with an unrelated error.
    path = Path(audio_path).expanduser()
    if not path.is_file():
        raise FileNotFoundError(f"Audio file not found: {audio_path}")

    async with httpx.AsyncClient(timeout=300.0) as client:
        # Step 1: Submit transcription job
        job_id = await _submit_job(client, path)
        # Step 2: Poll for completion
        await _wait_for_job(client, job_id)
        # Step 3: Get result
        result_data = await _fetch_result(client, job_id)

    # "or ''" also covers an explicit null in the payload, keeping the
    # declared str return type and the preview slice below safe.
    transcription = result_data.get("transcription") or ""
    language = result_data.get("language", "unknown")
    duration = result_data.get("duration", 0)

    # Update last transcription info
    last_transcription_info["job_id"] = job_id
    last_transcription_info["transcription"] = transcription
    last_transcription_info["language"] = language
    last_transcription_info["duration"] = duration
    last_transcription_info["source_file"] = str(path)

    preview = transcription[:50]
    ellipsis = "..." if len(transcription) > 50 else ""
    _log(f"🗣️ [{language}] {duration}s: \"{preview}{ellipsis}\"")

    return transcription


@mcp.tool()
def ear_get_last() -> dict:
    """
    Get information about the last transcription.

    Returns:
        Dictionary with job_id, transcription, language, duration, source_file
        (a copy, so callers cannot mutate the stored record), or a dictionary
        with a single "message" key if nothing has been transcribed yet.

    Example:
        info = ear_get_last()
        # Returns: {"job_id": "abc123", "transcription": "Hello...", "language": "en", ...}
    """
    if not last_transcription_info["job_id"]:
        return {"message": "No transcription has been done yet"}

    return last_transcription_info.copy()


@mcp.tool()
async def ear_health() -> dict:
    """
    Check EarTail service health.

    Returns:
        Health status from EarTail service, or
        {"status": "error", "error": "<message>"} if the check fails.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(f"{EARTAIL_BASE_URL}/health")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Deliberately broad: a health probe reports any failure as data
            # instead of raising.
            return {"status": "error", "error": str(e)}


if __name__ == "__main__":
    mcp.run()