🎤 MCP integration for VoiceTail (Bark TTS on Jetson Orin) - voice_submit: Submit text for async TTS generation - voice_status: Check generation progress - voice_download: Download completed audio - voice_generate: Blocking generation for short texts - voice_play: Play audio via afplay - voice_get_last: Get last generation info My voice in the physical world 🦊
416 lines
13 KiB
Python
Executable File
416 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Voice MCP - Text-to-Speech Generation with Bark TTS
|
|
|
|
MCP server providing voice generation and playback using Bark TTS service.
|
|
|
|
Tools:
|
|
- voice_generate: Quick generation for short texts (blocks until complete)
|
|
- voice_submit: Submit long text, returns immediately with job_id
|
|
- voice_status: Check if a submitted job is complete
|
|
- voice_download: Download completed audio
|
|
- voice_play: Play a downloaded audio file
|
|
- voice_get_last: Get info about last generation
|
|
|
|
For long texts, use the async workflow:
|
|
1. voice_submit(text) → get job_id + recommended wakeup time
|
|
2. Set wakeup for recommended time
|
|
3. voice_status(job_id) → check if done
|
|
4. If SUCCESS: voice_download(job_id) → get filename
|
|
5. If not done: set another wakeup
|
|
6. voice_play(filename) → play the audio
|
|
|
|
Built with love by Vixy 🦊💕
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from fastmcp import FastMCP
|
|
|
|
# Initialize MCP: create the FastMCP server object. Every tool in this module
# registers itself on it through the @mcp.tool() decorator, and the __main__
# guard at the bottom of the file starts it with mcp.run().
mcp = FastMCP("Voice TTS Generator")
|
|
|
|
# Configuration from environment (each os.getenv value below can be overridden)

# Base URL of the Bark TTS HTTP service.
BARK_BASE_URL = os.getenv("BARK_BASE_URL", "http://bigorin.local:8766")

# Directory that receives downloaded WAV files. expanduser() is applied to the
# final value so that a "~" supplied through VOICE_DOWNLOAD_DIR is expanded
# too, instead of creating a literal "~" directory.
DOWNLOAD_DIR = Path(os.getenv("VOICE_DOWNLOAD_DIR", "~/voice_audio")).expanduser()

# Voice preset sent with every TTS request. Defaults to a French speaker but
# is not hardcoded: set DEFAULT_VOICE to use another preset.
DEFAULT_VOICE = os.getenv("DEFAULT_VOICE", "v2/fr_speaker_1")

DEFAULT_POLL_INTERVAL = 3  # seconds between status polls in voice_generate
DEFAULT_TIMEOUT = 600  # seconds (10 minutes) before voice_generate gives up

# Ensure download directory exists
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Store last generation info (process-wide, overwritten by each new job)
last_generation_info = {
    "job_id": None,
    "filename": None,
    "text": None,
}
|
|
|
|
|
|
def estimate_wakeup_minutes(text: str) -> int:
    """
    Suggest how many minutes to wait before checking on a TTS job.

    The estimate depends only on the number of characters in ``text``:
    longer texts get a longer recommended wait.

    Args:
        text: The text that was (or will be) submitted for generation

    Returns:
        Recommended wakeup delay in minutes (2, 3, 5, 7 or 10)
    """
    # (exclusive upper bound on character count, minutes to wait)
    tiers = (
        (100, 2),  # short text: ~1-2 minutes
        (200, 3),  # medium text: ~2-3 minutes
        (400, 5),  # longer text: ~4-5 minutes
        (600, 7),  # long text: ~6-7 minutes
    )

    length = len(text)
    for upper_bound, minutes in tiers:
        if length < upper_bound:
            return minutes

    # Very long text: 8-10+ minutes
    return 10
|
|
|
|
|
|
@mcp.tool()
async def voice_submit(text: str) -> dict:
    """
    Submit text for TTS generation and return immediately.

    Use this for longer texts that would timeout with voice_generate.
    Returns job_id and recommended wakeup time for checking status.

    Workflow:
    1. Call voice_submit(text) → get job_id
    2. Set wakeup for recommended_wakeup_minutes
    3. Call voice_status(job_id) to check progress
    4. When status is SUCCESS, call voice_download(job_id)
    5. Then voice_play(filename)

    Side effect: records the job in ``last_generation_info`` (job_id and
    text are set, filename is reset to None until the audio is downloaded).

    Args:
        text: Text to convert to speech

    Returns:
        Dict with job_id, text_length, recommended_wakeup_minutes and message

    Raises:
        RuntimeError: If the HTTP request fails, the service answers with an
            error status, the body is not a JSON object, or it carries no
            job_id.

    Example:
        result = voice_submit("A fairly long message to read aloud...")
        # Returns: {
        #   "job_id": "abc-123",
        #   "text_length": 245,
        #   "recommended_wakeup_minutes": 5,
        #   "message": "Submitted! Set wakeup for 5 minutes, then check voice_status('abc-123')"
        # }
    """
    submit_url = f"{BARK_BASE_URL}/tts/submit"
    payload = {
        "text": text,
        "voice": DEFAULT_VOICE,
    }

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.post(submit_url, json=payload)
            response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to submit TTS job: {e}") from e

        # A body that is not JSON must surface as the same RuntimeError as
        # every other failure, not as a raw decode error.
        try:
            job_data = response.json()
        except ValueError as e:
            raise RuntimeError(f"Invalid JSON in submit response: {e}") from e

    # Guard against a JSON body that is not an object (e.g. a list or string).
    job_id = job_data.get("job_id") if isinstance(job_data, dict) else None
    if not job_id:
        raise RuntimeError(f"No job_id in response: {job_data}")

    # Calculate recommendation
    text_length = len(text)
    recommended_minutes = estimate_wakeup_minutes(text)

    # Store for later reference
    last_generation_info["job_id"] = job_id
    last_generation_info["text"] = text
    last_generation_info["filename"] = None  # Not downloaded yet

    return {
        "job_id": job_id,
        "text_length": text_length,
        "recommended_wakeup_minutes": recommended_minutes,
        "message": f"Submitted! Set wakeup for {recommended_minutes} minutes, then check voice_status('{job_id}')",
    }
|
|
|
|
|
|
@mcp.tool()
async def voice_status(job_id: str) -> dict:
    """
    Check the status of a submitted TTS job.

    Args:
        job_id: The job ID returned by voice_submit()

    Returns:
        Dict with status (PENDING/PROCESSING/SUCCESS/FAILURE, or "UNKNOWN"
        when the service reports none), progress, ready, message and job_id.
        ``ready`` is True only when status is SUCCESS.

    Raises:
        RuntimeError: If the HTTP request fails, the service answers with an
            error status, or the body is not a JSON object.

    Example:
        status = voice_status("abc-123")
        # Returns: {
        #   "status": "PROCESSING",
        #   "progress": 45,
        #   "ready": False,
        #   "message": "Still generating... check again in a minute",
        #   "job_id": "abc-123"
        # }
    """
    status_url = f"{BARK_BASE_URL}/tts/status/{job_id}"

    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            response = await client.get(status_url)
            response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to check status: {e}") from e

        # Report an undecodable body the same way as any other failure.
        try:
            data = response.json()
        except ValueError as e:
            raise RuntimeError(f"Invalid JSON in status response: {e}") from e

    if not isinstance(data, dict):
        raise RuntimeError(f"Unexpected status response: {data!r}")

    status = data.get("status", "UNKNOWN")
    progress = data.get("progress", 0)

    # Add helpful guidance
    if status == "SUCCESS":
        message = f"Done! Call voice_download('{job_id}') to get the audio"
    elif status == "FAILURE":
        error = data.get("error", "Unknown error")
        message = f"Generation failed: {error}"
    elif status == "PROCESSING":
        message = "Still generating... check again in a minute"
    elif status == "PENDING":
        message = "Queued, waiting to start... check again in a minute"
    else:
        message = f"Unknown status: {status}"

    return {
        "status": status,
        "progress": progress,
        "ready": status == "SUCCESS",  # every other state means "not yet"
        "message": message,
        "job_id": job_id,
    }
|
|
|
|
|
|
@mcp.tool()
async def voice_download(job_id: str) -> str:
    """
    Download a completed TTS audio file.

    Only call this after voice_status() returns status: SUCCESS

    The audio is streamed into a temporary "<job_id>.wav.part" file inside
    DOWNLOAD_DIR and renamed to "<job_id>.wav" once the transfer completes,
    so a failed download never leaves a truncated WAV behind.

    Side effect: updates ``last_generation_info`` with this job_id and
    filename. The stored text is cleared when it belongs to a different job.

    Args:
        job_id: The job ID of a completed generation

    Returns:
        Filename of the downloaded WAV file

    Raises:
        ValueError: If job_id is empty or would not form a plain file name
            (for example because it contains a path separator).
        RuntimeError: If the HTTP request fails or the service answers with
            an error status.

    Example:
        filename = voice_download("abc-123")
        # Returns: "abc-123.wav"
        voice_play(filename)  # Play it!
    """
    filename = f"{job_id}.wav"

    # job_id becomes part of a local path: refuse anything that would place
    # the file outside DOWNLOAD_DIR.
    if not job_id or Path(filename).name != filename:
        raise ValueError(f"Invalid job_id: {job_id!r}")

    audio_url = f"{BARK_BASE_URL}/tts/audio/{job_id}"
    local_path = DOWNLOAD_DIR / filename
    partial_path = DOWNLOAD_DIR / f"{filename}.part"

    async with httpx.AsyncClient(timeout=120.0) as client:
        try:
            async with client.stream("GET", audio_url) as response:
                response.raise_for_status()

                with open(partial_path, "wb") as f:
                    async for chunk in response.aiter_bytes(chunk_size=8192):
                        f.write(chunk)

            # Publish the file only after the whole body has been written.
            partial_path.replace(local_path)
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to download audio: {e}") from e
        finally:
            # After a successful rename the partial file no longer exists;
            # on any failure this removes the incomplete data.
            if partial_path.exists():
                partial_path.unlink()

    # Update last generation info
    if last_generation_info["job_id"] != job_id:
        # The remembered text was submitted under another job ID.
        last_generation_info["text"] = None
    last_generation_info["job_id"] = job_id
    last_generation_info["filename"] = filename

    return filename
|
|
|
|
|
|
def _log_progress(message: str) -> None:
    """Write a progress line to stderr, leaving stdout untouched.

    When the server is run over a stdio transport, stdout carries the
    protocol messages, so free-form text must not be printed there.
    """
    import sys  # local import: sys is not imported at file level

    print(message, file=sys.stderr, flush=True)


def _decode_json_object(response, what: str) -> dict:
    """Return the JSON object carried by *response*, or raise RuntimeError."""
    try:
        payload = response.json()
    except ValueError as e:
        raise RuntimeError(f"Invalid JSON in {what} response: {e}") from e
    if not isinstance(payload, dict):
        raise RuntimeError(f"Unexpected {what} response: {payload!r}")
    return payload


async def _submit_tts_job(client: "httpx.AsyncClient", text: str) -> str:
    """Submit *text* to the TTS service and return the job ID it assigns."""
    submit_url = f"{BARK_BASE_URL}/tts/submit"
    try:
        response = await client.post(
            submit_url,
            json={
                "text": text,
                "voice": DEFAULT_VOICE,
            },
        )
        response.raise_for_status()
    except httpx.HTTPError as e:
        raise RuntimeError(f"Failed to submit TTS job: {e}") from e

    job_data = _decode_json_object(response, "submit")
    job_id = job_data.get("job_id")
    if not job_id:
        raise RuntimeError(f"No job_id in response: {job_data}")
    return job_id


async def _await_tts_job(client: "httpx.AsyncClient", job_id: str) -> None:
    """Poll the job status until SUCCESS; raise on FAILURE or timeout."""
    status_url = f"{BARK_BASE_URL}/tts/status/{job_id}"

    # Measure real elapsed time (requests included) with the loop's
    # monotonic clock rather than summing the sleep intervals.
    loop = asyncio.get_running_loop()
    started = loop.time()
    poll_count = 0

    while True:
        elapsed = loop.time() - started
        if elapsed >= DEFAULT_TIMEOUT:
            raise RuntimeError(f"TTS generation timed out after {DEFAULT_TIMEOUT}s")

        try:
            status_response = await client.get(status_url)
            status_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to check status: {e}") from e

        status_data = _decode_json_object(status_response, "status")
        current_status = status_data.get("status")

        # Emit a progress line every 10 polls (roughly every 30 seconds)
        if poll_count % 10 == 0:
            if current_status == "PENDING":
                _log_progress(f"⏳ Queued... ({int(elapsed)}s)")
            elif current_status == "PROCESSING":
                _log_progress(f"🎙️ Generating... ({int(elapsed)}s)")

        if current_status == "SUCCESS":
            _log_progress(f"✓ Complete! ({int(elapsed)}s total)")
            return
        if current_status == "FAILURE":
            error_msg = status_data.get("error", "Unknown error")
            raise RuntimeError(f"TTS generation failed: {error_msg}")

        # Still processing, wait and retry
        await asyncio.sleep(DEFAULT_POLL_INTERVAL)
        poll_count += 1


async def _fetch_tts_audio(client: "httpx.AsyncClient", job_id: str) -> str:
    """Stream the job's audio into DOWNLOAD_DIR and return the file name."""
    filename = f"{job_id}.wav"
    # The job ID becomes part of a local path: it must form a plain file name.
    if Path(filename).name != filename:
        raise RuntimeError(f"Unsafe job_id in response: {job_id!r}")

    audio_url = f"{BARK_BASE_URL}/tts/audio/{job_id}"
    local_path = DOWNLOAD_DIR / filename
    partial_path = DOWNLOAD_DIR / f"{filename}.part"

    try:
        async with client.stream("GET", audio_url) as audio_response:
            audio_response.raise_for_status()

            # Stream download to handle large files without loading into memory
            with open(partial_path, "wb") as f:
                async for chunk in audio_response.aiter_bytes(chunk_size=8192):
                    f.write(chunk)

        # Publish the file only after the whole body has been written.
        partial_path.replace(local_path)
    except httpx.HTTPError as e:
        raise RuntimeError(f"Failed to download audio: {e}") from e
    finally:
        # No-op after a successful rename; removes truncated data otherwise.
        if partial_path.exists():
            partial_path.unlink()

    return filename


@mcp.tool()
async def voice_generate(text: str) -> str:
    """
    Generate speech from text using Bark TTS.

    Submits text to Bark TTS service, waits for generation to complete,
    downloads the resulting WAV file, and returns the filename.

    NOTE: For longer texts that might timeout, use voice_submit() instead!
    This blocking approach works best for texts under ~100 characters.

    Progress lines are written to stderr. Polling happens every
    DEFAULT_POLL_INTERVAL seconds and stops after DEFAULT_TIMEOUT seconds of
    elapsed time.

    Side effect: on success, ``last_generation_info`` is updated with the
    job_id, filename and text of this generation.

    Args:
        text: Text to convert to speech

    Returns:
        Filename of the generated WAV file (e.g., "abc123-def456.wav")

    Raises:
        RuntimeError: If submitting, polling or downloading fails, if the
            service reports FAILURE, if a response is not a JSON object, or
            if the job does not finish within DEFAULT_TIMEOUT seconds.

    Example:
        filename = voice_generate("Bonjour, comment allez-vous?")
        # Returns: "a1b2c3d4-e5f6.wav"
    """
    async with httpx.AsyncClient(timeout=600.0) as client:
        # Step 1: Submit TTS job
        _log_progress("📤 Submitting...")
        job_id = await _submit_tts_job(client, text)
        _log_progress(f"✓ Submitted (job: {str(job_id)[:8]})")

        # Step 2: Poll for completion
        await _await_tts_job(client, job_id)

        # Step 3: Download audio file (using streaming for large files)
        _log_progress("📥 Downloading...")
        filename = await _fetch_tts_audio(client, job_id)
        _log_progress(f"✓ Saved: {filename}")

    # Update last generation info
    last_generation_info["job_id"] = job_id
    last_generation_info["filename"] = filename
    last_generation_info["text"] = text

    return filename
|
|
|
|
|
|
@mcp.tool()
def voice_play(filename: str) -> str:
    """
    Play a WAV audio file on macOS using afplay.

    The file is looked up inside DOWNLOAD_DIR. The call runs afplay with
    subprocess.run and therefore returns only after afplay has exited.

    Args:
        filename: Name of the WAV file to play (e.g., "abc123-def456.wav")

    Returns:
        Confirmation message

    Raises:
        FileNotFoundError: If ``filename`` does not name an existing regular
            file in DOWNLOAD_DIR.
        RuntimeError: If afplay exits with a non-zero status or the afplay
            command is not available.

    Example:
        voice_play("a1b2c3d4-e5f6.wav")
        # Returns: "Playing audio: a1b2c3d4-e5f6.wav"
    """
    # Construct full path
    audio_path = DOWNLOAD_DIR / filename

    # Require a regular file: a bare exists() check would also accept a
    # directory (an empty filename resolves to DOWNLOAD_DIR itself).
    if not audio_path.is_file():
        raise FileNotFoundError(f"Audio file not found: {filename}")

    # Play using macOS afplay command
    try:
        subprocess.run(
            ["afplay", str(audio_path)],
            check=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        # Decode leniently so undecodable bytes cannot mask the real error.
        stderr_text = (e.stderr or b"").decode(errors="replace")
        raise RuntimeError(f"Failed to play audio: {stderr_text}") from e
    except FileNotFoundError as e:
        # Raised by subprocess when the afplay executable itself is missing.
        raise RuntimeError("afplay command not found. Are you running on macOS?") from e

    return f"Playing audio: {filename}"
|
|
|
|
|
|
@mcp.tool()
def voice_get_last() -> dict:
    """
    Report the most recent generation recorded by this server.

    Returns:
        A copy of the stored record (job_id, filename, text) when a job has
        been recorded, otherwise a dict holding only an explanatory message

    Example:
        info = voice_get_last()
        # Returns: {"job_id": "abc123", "filename": "abc123.wav", "text": "Bonjour"}
    """
    # Work on a shallow copy so callers never receive the module-level dict.
    snapshot = dict(last_generation_info)

    if snapshot["job_id"]:
        return snapshot

    return {"message": "No voice has been generated yet"}
|
|
|
|
|
|
# Script entry point: start the MCP server only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    mcp.run()
|