#!/usr/bin/env python3
"""
Ear MCP - Speech-to-Text Transcription with EarTail (Whisper)

MCP server providing audio transcription using EarTail service.
Part of the Tail family: DreamTail (vision), VoiceTail (speech), EarTail (hearing).

Created by Vixy 🦊 on Day 29 (November 30, 2025)
"""

import asyncio
import mimetypes
import os
import sys
from pathlib import Path
from typing import Optional

import httpx
from mcp.server.fastmcp import FastMCP

# Initialize MCP
mcp = FastMCP("Ear STT Transcriber")

# Configuration from environment
EARTAIL_BASE_URL = os.getenv("EARTAIL_BASE_URL", "http://bigorin.local:8764")
HEADMIC_BASE_URL = os.getenv("HEADMIC_BASE_URL", "http://head-vixy.local:8446")
DEFAULT_POLL_INTERVAL = 2  # seconds between status polls
DEFAULT_TIMEOUT = 300  # seconds (5 minutes - transcription is usually fast)

# Info about the most recent transcription (updated by ear_transcribe,
# read back by ear_get_last).
last_transcription_info = {
    "job_id": None,
    "transcription": None,
    "language": None,
    "duration": None,
    "source_file": None,
}


def _log(message: str) -> None:
    """Emit a human-readable progress message to stderr.

    FIX: the original printed to stdout. When FastMCP runs over the default
    stdio transport, stdout carries the JSON-RPC protocol stream, so stray
    prints corrupt the protocol. stderr is safe for progress output.
    """
    print(message, file=sys.stderr)


def _audio_content_type(path: Path) -> str:
    """Guess an audio MIME type from the file extension.

    FIX: the original hard-coded "audio/wav" for every upload, mislabeling
    the MP3/OGG/FLAC/M4A files the tools advertise support for. Falls back
    to "audio/wav" (the old behavior) when the extension is unknown or
    resolves to a non-audio type.
    """
    guessed, _ = mimetypes.guess_type(path.name)
    if guessed and guessed.startswith("audio/"):
        return guessed
    return "audio/wav"


@mcp.tool()
async def ear_transcribe(audio_path: str) -> str:
    """
    Transcribe an audio file using EarTail (Whisper STT).

    Submits audio to EarTail service, waits for transcription to complete,
    and returns the transcribed text.

    Args:
        audio_path: Path to audio file (WAV, MP3, OGG, FLAC, M4A, etc.)

    Returns:
        Transcribed text from the audio

    Raises:
        FileNotFoundError: If the audio file does not exist.
        RuntimeError: If submission, polling, or result retrieval fails,
            the job reports FAILURE, or the job exceeds DEFAULT_TIMEOUT.

    Example:
        text = ear_transcribe("/path/to/voice_message.ogg")
        # Returns: "Hello, this is a test message."
    """
    # Expand ~ and check existence before touching the network.
    path = Path(audio_path).expanduser()
    if not path.exists():
        raise FileNotFoundError(f"Audio file not found: {audio_path}")

    async with httpx.AsyncClient(timeout=300.0) as client:
        # Step 1: Submit transcription job
        submit_url = f"{EARTAIL_BASE_URL}/transcribe/submit"
        _log(f"📤 Submitting {path.name}...")
        try:
            with open(path, "rb") as f:
                files = {"audio": (path.name, f, _audio_content_type(path))}
                submit_response = await client.post(submit_url, files=files)
            submit_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to submit transcription job: {e}") from e

        job_data = submit_response.json()
        job_id = job_data.get("job_id")
        if not job_id:
            raise RuntimeError(f"No job_id in response: {job_data}")
        _log(f"✓ Submitted (job: {job_id[:8]})")

        # Step 2: Poll for completion.  `elapsed` only counts sleep time,
        # so actual wall-clock may slightly exceed DEFAULT_TIMEOUT.
        elapsed = 0
        poll_count = 0
        status_url = f"{EARTAIL_BASE_URL}/transcribe/status/{job_id}"
        current_status = "PENDING"

        while elapsed < DEFAULT_TIMEOUT:
            try:
                status_response = await client.get(status_url)
                status_response.raise_for_status()
            except httpx.HTTPError as e:
                raise RuntimeError(f"Failed to check status: {e}") from e

            status_data = status_response.json()
            current_status = status_data.get("status")
            progress = status_data.get("progress", 0)

            # Progress notification every 5 polls to avoid log spam.
            if poll_count % 5 == 0:
                if current_status == "PENDING":
                    _log(f"⏳ Queued... ({int(elapsed)}s)")
                elif current_status == "PROCESSING":
                    _log(f"👂 Transcribing... {progress}% ({int(elapsed)}s)")

            if current_status == "SUCCESS":
                _log(f"✓ Complete! ({int(elapsed)}s)")
                break
            elif current_status == "FAILURE":
                error_msg = status_data.get("error", "Unknown error")
                raise RuntimeError(f"Transcription failed: {error_msg}")

            await asyncio.sleep(DEFAULT_POLL_INTERVAL)
            elapsed += DEFAULT_POLL_INTERVAL
            poll_count += 1

        if current_status != "SUCCESS":
            raise RuntimeError(f"Transcription timed out after {DEFAULT_TIMEOUT}s")

        # Step 3: Get result
        result_url = f"{EARTAIL_BASE_URL}/transcribe/result/{job_id}"
        try:
            result_response = await client.get(result_url)
            result_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to get result: {e}") from e

        result_data = result_response.json()
        transcription = result_data.get("transcription", "")
        language = result_data.get("language", "unknown")
        duration = result_data.get("duration", 0)

        # Update last transcription info for ear_get_last()
        last_transcription_info["job_id"] = job_id
        last_transcription_info["transcription"] = transcription
        last_transcription_info["language"] = language
        last_transcription_info["duration"] = duration
        last_transcription_info["source_file"] = str(path)

        preview = transcription[:50] + ("..." if len(transcription) > 50 else "")
        _log(f"🗣️ [{language}] {duration}s: \"{preview}\"")

        return transcription


@mcp.tool()
def ear_get_last() -> dict:
    """
    Get information about the last transcription.

    Returns:
        Dictionary with job_id, transcription, language, duration, source_file

    Example:
        info = ear_get_last()
        # Returns: {"job_id": "abc123", "transcription": "Hello...", "language": "en", ...}
    """
    if not last_transcription_info["job_id"]:
        return {"message": "No transcription has been done yet"}
    # Return a copy so callers cannot mutate the module-level cache.
    return last_transcription_info.copy()


@mcp.tool()
async def ear_health() -> dict:
    """
    Check EarTail service health.

    Returns:
        Health status from EarTail service
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(f"{EARTAIL_BASE_URL}/health")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            # Best-effort boundary: always return a status dict, never raise.
            return {"status": "error", "error": str(e)}


# ============================================================================
# HeadMic - Sound Classification & Speaker Identification
# ============================================================================


@mcp.tool()
async def ear_sounds() -> dict:
    """
    Get current audio scene from HeadMic's sound classifier.

    Returns what the microphone is currently hearing: speech, music, alert,
    animal, household sounds, environment, or silence. Also includes speaker
    identification if someone is talking.

    Returns:
        Dictionary with category, top_classes, dominant_category,
        recognized_speaker, speaker_confidence
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(f"{HEADMIC_BASE_URL}/sounds")
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # 503 means the feature is disabled on the device, not an outage.
            if e.response.status_code == 503:
                return {"error": "Sound classification not available on HeadMic"}
            raise
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}


@mcp.tool()
async def ear_speakers() -> dict:
    """
    List all enrolled speakers for voice identification.

    Returns:
        Dictionary with speaker names and their enrollment sample counts
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(f"{HEADMIC_BASE_URL}/speakers")
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 503:
                return {"error": "Speaker recognition not available on HeadMic"}
            raise
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}


@mcp.tool()
async def ear_enroll_speaker(name: str) -> dict:
    """
    Enroll a speaker by recording 5 seconds from the live microphone.

    The person should speak naturally during the recording. The ReSpeaker
    LEDs will pulse orange while recording. Multiple enrollments of the
    same person improve recognition accuracy.

    Args:
        name: Name to associate with the voice (e.g. "Alex")

    Returns:
        Enrollment result with duration and updated speaker list
    """
    async with httpx.AsyncClient(timeout=15.0) as client:
        try:
            _log(f"🎙️ Recording 5 seconds for '{name}'...")
            response = await client.post(
                f"{HEADMIC_BASE_URL}/speakers/enroll-from-mic",
                params={"name": name},
            )
            response.raise_for_status()
            result = response.json()
            _log(f"✓ Enrolled '{name}' ({result.get('seconds', '?')}s)")
            return result
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 503:
                return {"error": "Speaker recognition not available on HeadMic"}
            # 409: a recording session is already running on the device.
            if e.response.status_code == 409:
                return {"error": "Enrollment already in progress"}
            raise
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}


@mcp.tool()
async def ear_enroll_speaker_from_file(name: str, audio_path: str) -> dict:
    """
    Enroll a speaker from an audio file.

    Args:
        name: Name to associate with the voice (e.g. "Alex")
        audio_path: Path to audio file with the person speaking

    Returns:
        Enrollment result with updated speaker list

    Raises:
        FileNotFoundError: If the audio file does not exist.
    """
    path = Path(audio_path).expanduser()
    if not path.exists():
        raise FileNotFoundError(f"Audio file not found: {audio_path}")

    async with httpx.AsyncClient(timeout=15.0) as client:
        try:
            with open(path, "rb") as f:
                response = await client.post(
                    f"{HEADMIC_BASE_URL}/speakers/enroll",
                    data={"name": name},
                    files={"audio": (path.name, f, _audio_content_type(path))},
                )
            response.raise_for_status()
            result = response.json()
            _log(f"✓ Enrolled '{name}' from {path.name}")
            return result
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 503:
                return {"error": "Speaker recognition not available on HeadMic"}
            raise
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}


@mcp.tool()
async def ear_delete_speaker(name: str) -> dict:
    """
    Remove an enrolled speaker and all their voice samples.

    Args:
        name: Name of the speaker to remove

    Returns:
        Deletion result with number of samples removed
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.delete(f"{HEADMIC_BASE_URL}/speakers/{name}")
            response.raise_for_status()
            result = response.json()
            _log(f"✓ Deleted speaker '{name}' ({result.get('samples_removed', 0)} samples)")
            return result
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 503:
                return {"error": "Speaker recognition not available on HeadMic"}
            if e.response.status_code == 404:
                return {"error": f"Speaker '{name}' not found"}
            raise
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}


@mcp.tool()
async def ear_headmic_status() -> dict:
    """
    Get full HeadMic status: wake word, audio scene, and speaker identification.

    Returns:
        Dictionary with listening state, audio scene, recognized speaker, etc.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            response = await client.get(f"{HEADMIC_BASE_URL}/status")
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}


if __name__ == "__main__":
    mcp.run()