352 lines
12 KiB
Python
352 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Ear MCP - Speech-to-Text Transcription with EarTail (Whisper)
|
|
|
|
MCP server providing audio transcription using EarTail service.
|
|
Part of the Tail family: DreamTail (vision), VoiceTail (speech), EarTail (hearing).
|
|
|
|
Created by Vixy 🦊 on Day 29 (November 30, 2025)
|
|
"""
|
|
|
|
import asyncio
import mimetypes
import os
import sys
from pathlib import Path
from typing import Optional
from urllib.parse import quote

import httpx
from mcp.server.fastmcp import FastMCP
|
|
|
|
# Initialize MCP
mcp = FastMCP("Ear STT Transcriber")

# Configuration from environment
# EarTail: Whisper-based speech-to-text job service (submit/status/result API).
EARTAIL_BASE_URL = os.getenv("EARTAIL_BASE_URL", "http://bigorin.local:8764")
# HeadMic: live-microphone service (sound classification, speaker identification).
HEADMIC_BASE_URL = os.getenv("HEADMIC_BASE_URL", "http://head-vixy.local:8446")
DEFAULT_POLL_INTERVAL = 2  # seconds between job-status polls
DEFAULT_TIMEOUT = 300  # seconds (5 minutes - transcription is usually fast)

# Store last transcription info.
# Module-level mutable state: written by ear_transcribe() after a successful
# job, read back by ear_get_last(). All values stay None until the first run.
last_transcription_info = {
    "job_id": None,
    "transcription": None,
    "language": None,
    "duration": None,
    "source_file": None,
}
|
|
|
|
|
|
@mcp.tool()
async def ear_transcribe(audio_path: str) -> str:
    """
    Transcribe an audio file using EarTail (Whisper STT).

    Submits the audio to the EarTail service, polls until the job completes,
    records the result in ``last_transcription_info`` (for ``ear_get_last``),
    and returns the transcribed text.

    Args:
        audio_path: Path to audio file (WAV, MP3, OGG, FLAC, M4A, etc.)

    Returns:
        Transcribed text from the audio

    Raises:
        FileNotFoundError: If the audio file does not exist.
        RuntimeError: If submission/polling/result retrieval fails, the job
            reports FAILURE, or the job does not finish within DEFAULT_TIMEOUT.

    Example:
        text = ear_transcribe("/path/to/voice_message.ogg")
        # Returns: "Hello, this is a test message."
    """
    # Expand path and check existence
    path = Path(audio_path).expanduser()
    if not path.exists():
        raise FileNotFoundError(f"Audio file not found: {audio_path}")

    # Guess the real MIME type from the extension so non-WAV uploads are not
    # mislabeled; fall back to audio/wav for unknown extensions.
    content_type = mimetypes.guess_type(path.name)[0] or "audio/wav"

    async with httpx.AsyncClient(timeout=float(DEFAULT_TIMEOUT)) as client:
        # Step 1: Submit transcription job
        submit_url = f"{EARTAIL_BASE_URL}/transcribe/submit"

        # NOTE: all progress output goes to stderr — stdout is the MCP stdio
        # transport channel and must carry only protocol traffic.
        print(f"📤 Submitting {path.name}...", file=sys.stderr)

        try:
            with open(path, "rb") as f:
                files = {"audio": (path.name, f, content_type)}
                submit_response = await client.post(submit_url, files=files)
            submit_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to submit transcription job: {e}") from e

        job_data = submit_response.json()
        job_id = job_data.get("job_id")
        if not job_id:
            raise RuntimeError(f"No job_id in response: {job_data}")

        print(f"✓ Submitted (job: {job_id[:8]})", file=sys.stderr)

        # Step 2: Poll for completion
        elapsed = 0
        poll_count = 0
        status_url = f"{EARTAIL_BASE_URL}/transcribe/status/{job_id}"
        current_status = "PENDING"

        while elapsed < DEFAULT_TIMEOUT:
            try:
                status_response = await client.get(status_url)
                status_response.raise_for_status()
            except httpx.HTTPError as e:
                raise RuntimeError(f"Failed to check status: {e}") from e

            status_data = status_response.json()
            current_status = status_data.get("status")
            progress = status_data.get("progress", 0)

            # Progress notification every 5 polls (~10s) to avoid log spam.
            if poll_count % 5 == 0:
                if current_status == "PENDING":
                    print(f"⏳ Queued... ({int(elapsed)}s)", file=sys.stderr)
                elif current_status == "PROCESSING":
                    print(f"👂 Transcribing... {progress}% ({int(elapsed)}s)", file=sys.stderr)

            if current_status == "SUCCESS":
                print(f"✓ Complete! ({int(elapsed)}s)", file=sys.stderr)
                break
            elif current_status == "FAILURE":
                error_msg = status_data.get("error", "Unknown error")
                raise RuntimeError(f"Transcription failed: {error_msg}")

            await asyncio.sleep(DEFAULT_POLL_INTERVAL)
            elapsed += DEFAULT_POLL_INTERVAL
            poll_count += 1

        if current_status != "SUCCESS":
            raise RuntimeError(f"Transcription timed out after {DEFAULT_TIMEOUT}s")

        # Step 3: Get result
        result_url = f"{EARTAIL_BASE_URL}/transcribe/result/{job_id}"
        try:
            result_response = await client.get(result_url)
            result_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to get result: {e}") from e

        result_data = result_response.json()
        transcription = result_data.get("transcription", "")
        language = result_data.get("language", "unknown")
        duration = result_data.get("duration", 0)

        # Update last transcription info so ear_get_last() can report it.
        last_transcription_info["job_id"] = job_id
        last_transcription_info["transcription"] = transcription
        last_transcription_info["language"] = language
        last_transcription_info["duration"] = duration
        last_transcription_info["source_file"] = str(path)

        print(f"🗣️ [{language}] {duration}s: \"{transcription[:50]}{'...' if len(transcription) > 50 else ''}\"", file=sys.stderr)

        return transcription
|
|
|
|
|
|
@mcp.tool()
def ear_get_last() -> dict:
    """
    Report details of the most recent transcription.

    Returns:
        Dictionary with job_id, transcription, language, duration, and
        source_file — or a placeholder message if nothing has been
        transcribed yet.

    Example:
        info = ear_get_last()
        # Returns: {"job_id": "abc123", "transcription": "Hello...", "language": "en", ...}
    """
    # No job_id means ear_transcribe() has never completed successfully.
    if not last_transcription_info["job_id"]:
        return {"message": "No transcription has been done yet"}

    # Hand back a shallow copy so callers cannot mutate the shared state.
    return dict(last_transcription_info)
|
|
|
|
|
|
@mcp.tool()
async def ear_health() -> dict:
    """
    Check EarTail service health.

    Returns:
        Health status from the EarTail service, or an error dict when the
        service cannot be reached.
    """
    health_url = f"{EARTAIL_BASE_URL}/health"
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(health_url)
            resp.raise_for_status()
            return resp.json()
        except Exception as exc:
            # Best-effort probe: report the failure instead of raising.
            return {"status": "error", "error": str(exc)}
|
|
|
|
|
|
# ============================================================================
|
|
# HeadMic - Sound Classification & Speaker Identification
|
|
# ============================================================================
|
|
|
|
@mcp.tool()
async def ear_sounds() -> dict:
    """
    Get current audio scene from HeadMic's sound classifier.

    Reports what the microphone is currently hearing: speech, music, alert,
    animal, household sounds, environment, or silence. Includes speaker
    identification when someone is talking.

    Returns:
        Dictionary with category, top_classes, dominant_category,
        recognized_speaker, speaker_confidence
    """
    sounds_url = f"{HEADMIC_BASE_URL}/sounds"
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(sounds_url)
            resp.raise_for_status()
            return resp.json()
        except httpx.HTTPStatusError as exc:
            # 503 means the feature is disabled on HeadMic, not an outage.
            if exc.response.status_code == 503:
                return {"error": "Sound classification not available on HeadMic"}
            raise
        except Exception as exc:
            return {"error": f"HeadMic unavailable: {exc}"}
|
|
|
|
|
|
@mcp.tool()
async def ear_speakers() -> dict:
    """
    List all enrolled speakers for voice identification.

    Returns:
        Dictionary with speaker names and their enrollment sample counts
    """
    speakers_url = f"{HEADMIC_BASE_URL}/speakers"
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(speakers_url)
            resp.raise_for_status()
            return resp.json()
        except httpx.HTTPStatusError as exc:
            # 503 means the feature is disabled on HeadMic, not an outage.
            if exc.response.status_code == 503:
                return {"error": "Speaker recognition not available on HeadMic"}
            raise
        except Exception as exc:
            return {"error": f"HeadMic unavailable: {exc}"}
|
|
|
|
|
|
@mcp.tool()
async def ear_enroll_speaker(name: str) -> dict:
    """
    Enroll a speaker by recording 5 seconds from the live microphone.

    The person should speak naturally during the recording. The ReSpeaker LEDs
    will pulse orange while recording. Multiple enrollments of the same person
    improve recognition accuracy.

    Args:
        name: Name to associate with the voice (e.g. "Alex")

    Returns:
        Enrollment result with duration and updated speaker list, or an
        error dict if HeadMic is unavailable / busy / missing the feature.
    """
    async with httpx.AsyncClient(timeout=15.0) as client:
        try:
            # Progress goes to stderr — stdout is the MCP stdio protocol
            # channel and must stay clean.
            print(f"🎙️ Recording 5 seconds for '{name}'...", file=sys.stderr)
            response = await client.post(
                f"{HEADMIC_BASE_URL}/speakers/enroll-from-mic",
                params={"name": name},
            )
            response.raise_for_status()
            result = response.json()
            print(f"✓ Enrolled '{name}' ({result.get('seconds', '?')}s)", file=sys.stderr)
            return result
        except httpx.HTTPStatusError as e:
            # 503: feature disabled; 409: another enrollment is running.
            if e.response.status_code == 503:
                return {"error": "Speaker recognition not available on HeadMic"}
            if e.response.status_code == 409:
                return {"error": "Enrollment already in progress"}
            raise
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}
|
|
|
|
|
|
@mcp.tool()
async def ear_enroll_speaker_from_file(name: str, audio_path: str) -> dict:
    """
    Enroll a speaker from an audio file.

    Args:
        name: Name to associate with the voice (e.g. "Alex")
        audio_path: Path to audio file with the person speaking

    Returns:
        Enrollment result with updated speaker list, or an error dict if
        HeadMic is unavailable or speaker recognition is disabled.

    Raises:
        FileNotFoundError: If the audio file does not exist.
    """
    path = Path(audio_path).expanduser()
    if not path.exists():
        raise FileNotFoundError(f"Audio file not found: {audio_path}")

    async with httpx.AsyncClient(timeout=15.0) as client:
        try:
            with open(path, "rb") as f:
                response = await client.post(
                    f"{HEADMIC_BASE_URL}/speakers/enroll",
                    data={"name": name},
                    files={"audio": (path.name, f, "audio/wav")},
                )
            response.raise_for_status()
            result = response.json()
            # stderr keeps the MCP stdio protocol channel clean.
            print(f"✓ Enrolled '{name}' from {path.name}", file=sys.stderr)
            return result
        except httpx.HTTPStatusError as e:
            # 503 means speaker recognition is disabled on HeadMic.
            if e.response.status_code == 503:
                return {"error": "Speaker recognition not available on HeadMic"}
            raise
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}
|
|
|
|
|
|
@mcp.tool()
async def ear_delete_speaker(name: str) -> dict:
    """
    Remove an enrolled speaker and all their voice samples.

    Args:
        name: Name of the speaker to remove

    Returns:
        Deletion result with number of samples removed, or an error dict
        if the speaker is unknown or HeadMic is unavailable.
    """
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            # Percent-encode the name so spaces/special characters survive
            # being placed in the URL path.
            response = await client.delete(
                f"{HEADMIC_BASE_URL}/speakers/{quote(name, safe='')}"
            )
            response.raise_for_status()
            result = response.json()
            # stderr keeps the MCP stdio protocol channel clean.
            print(f"✓ Deleted speaker '{name}' ({result.get('samples_removed', 0)} samples)", file=sys.stderr)
            return result
        except httpx.HTTPStatusError as e:
            # 503: feature disabled; 404: no such speaker enrolled.
            if e.response.status_code == 503:
                return {"error": "Speaker recognition not available on HeadMic"}
            if e.response.status_code == 404:
                return {"error": f"Speaker '{name}' not found"}
            raise
        except Exception as e:
            return {"error": f"HeadMic unavailable: {e}"}
|
|
|
|
|
|
@mcp.tool()
async def ear_headmic_status() -> dict:
    """
    Get full HeadMic status: wake word, audio scene, and speaker identification.

    Returns:
        Dictionary with listening state, audio scene, recognized speaker, etc.
    """
    status_url = f"{HEADMIC_BASE_URL}/status"
    async with httpx.AsyncClient(timeout=10.0) as client:
        try:
            resp = await client.get(status_url)
            resp.raise_for_status()
            return resp.json()
        except Exception as exc:
            # Best-effort probe: surface the failure rather than raising.
            return {"error": f"HeadMic unavailable: {exc}"}
|
|
|
|
|
|
# Run the MCP server when executed directly.
if __name__ == "__main__":
    mcp.run()
|