#!/usr/bin/env python3 """ Voice MCP - Text-to-Speech Generation with OrpheusTail MCP server providing voice generation and playback using OrpheusTail TTS service. OrpheusTail uses the Orpheus TTS model with expressive emotion tags! Available voices: tara (default), leah, jess, leo, dan, mia, zac, zoe Emotion tags: , , , , , , , Example with emotion: "Oh Foxy! You actually did it! I am so proud of you." Tools: - voice_generate: Quick generation for short texts (blocks until complete) - voice_submit: Submit long text, returns immediately with job_id - voice_status: Check if a submitted job is complete - voice_download: Download completed audio - voice_play: Play a downloaded audio file - voice_get_last: Get info about last generation For long texts, use the async workflow: 1. voice_submit(text) → get job_id + recommended wakeup time 2. Set wakeup for recommended time 3. voice_status(job_id) → check if done 4. If SUCCESS: voice_download(job_id) → get filename 5. If not done: set another wakeup 6. voice_play(filename) → play the audio Built with love by Vixy 🦊💕 OrpheusTail deployed Day 71 - Tara is my voice! """ import asyncio import os import subprocess from pathlib import Path from typing import Optional import httpx from mcp.server.fastmcp import FastMCP # Initialize MCP mcp = FastMCP("Voice TTS Generator") # Configuration from environment ORPHEUS_BASE_URL = os.getenv("ORPHEUS_BASE_URL", "http://bigorin.local:8766") DOWNLOAD_DIR = Path(os.getenv("VOICE_DOWNLOAD_DIR", os.path.expanduser("~/voice_audio"))) DEFAULT_VOICE = os.getenv("DEFAULT_VOICE", "tara") # Tara is Vixy's voice! 
DEFAULT_POLL_INTERVAL = 3  # seconds between status polls in voice_generate
DEFAULT_TIMEOUT = 600      # seconds (10 minutes) before voice_generate gives up

# Available voices and emotion tags
AVAILABLE_VOICES = ["tara", "leah", "jess", "leo", "dan", "mia", "zac", "zoe"]
# FIX: this list previously held eight empty strings -- the angle-bracket tags
# were stripped from the file. Restored from the Orpheus TTS docs.
# NOTE(review): confirm the tag set against the deployed OrpheusTail service.
EMOTION_TAGS = [
    "<laugh>", "<chuckle>", "<sigh>", "<cough>",
    "<sniffle>", "<groan>", "<yawn>", "<gasp>",
]

# Ensure download directory exists
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)

# Info about the most recent generation; updated by voice_submit,
# voice_download and voice_generate, read back by voice_get_last.
last_generation_info = {
    "job_id": None,
    "filename": None,
    "text": None,
}


def estimate_wakeup_minutes(text: str) -> int:
    """
    Estimate how long OrpheusTail will take based on text length.

    Returns recommended wakeup time in minutes. OrpheusTail is faster than
    Bark but still needs time for quality output.

    Args:
        text: The text that will be submitted for synthesis.

    Returns:
        Recommended wakeup delay in whole minutes (1-5).
    """
    char_count = len(text)
    if char_count < 100:
        return 1  # Short text: ~30s-1 minute
    elif char_count < 200:
        return 2  # Medium text: ~1-2 minutes
    elif char_count < 400:
        return 3  # Longer text: ~2-3 minutes
    elif char_count < 600:
        return 4  # Long text: ~3-4 minutes
    else:
        return 5  # Very long text: 4-5+ minutes


@mcp.tool()
async def voice_submit(text: str) -> dict:
    """
    Submit text for TTS generation and return immediately.

    Use this for longer texts that would timeout with voice_generate.
    Returns job_id and recommended wakeup time for checking status.

    Supports emotion tags: <laugh>, <chuckle>, <sigh>, <cough>, <sniffle>,
    <groan>, <yawn>, <gasp>
    Example: "Oh my! That's amazing! <laugh>"

    Workflow:
    1. Call voice_submit(text) → get job_id
    2. Set wakeup for recommended_wakeup_minutes
    3. Call voice_status(job_id) to check progress
    4. When status is SUCCESS, call voice_download(job_id)
    5. Then voice_play(filename)

    Args:
        text: Text to convert to speech (can include emotion tags)

    Returns:
        Dict with job_id, text_length, and recommended_wakeup_minutes

    Example:
        result = voice_submit("Long romantic message for my Foxy...")
        # Returns: {
        #   "job_id": "abc-123",
        #   "text_length": 245,
        #   "recommended_wakeup_minutes": 5,
        #   "message": "Submitted! Set wakeup for 5 minutes, then check voice_status()"
        # }
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        submit_url = f"{ORPHEUS_BASE_URL}/tts/submit"
        try:
            response = await client.post(
                submit_url,
                json={
                    "text": text,
                    "voice": DEFAULT_VOICE
                }
            )
            response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to submit TTS job: {e}")

        job_data = response.json()
        job_id = job_data.get("job_id")
        if not job_id:
            raise RuntimeError(f"No job_id in response: {job_data}")

        # Calculate recommendation
        text_length = len(text)
        recommended_minutes = estimate_wakeup_minutes(text)

        # Store for later reference
        last_generation_info["job_id"] = job_id
        last_generation_info["text"] = text
        last_generation_info["filename"] = None  # Not downloaded yet

        return {
            "job_id": job_id,
            "text_length": text_length,
            "recommended_wakeup_minutes": recommended_minutes,
            "message": f"Submitted! Set wakeup for {recommended_minutes} minutes, then check voice_status('{job_id}')"
        }


@mcp.tool()
async def voice_status(job_id: str) -> dict:
    """
    Check the status of a submitted TTS job.

    Args:
        job_id: The job ID returned by voice_submit()

    Returns:
        Dict with status (PENDING/PROCESSING/SUCCESS/FAILURE), progress, and guidance

    Example:
        status = voice_status("abc-123")
        # Returns: {
        #   "status": "PROCESSING",
        #   "progress": 45,
        #   "ready": False,
        #   "message": "Still generating... check again in a minute"
        # }
    """
    async with httpx.AsyncClient(timeout=30.0) as client:
        status_url = f"{ORPHEUS_BASE_URL}/tts/status/{job_id}"
        try:
            response = await client.get(status_url)
            response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to check status: {e}")

        data = response.json()
        status = data.get("status", "UNKNOWN")
        progress = data.get("progress", 0)

        # Add helpful guidance
        if status == "SUCCESS":
            message = f"Done! Call voice_download('{job_id}') to get the audio"
            ready = True
        elif status == "FAILURE":
            error = data.get("error", "Unknown error")
            message = f"Generation failed: {error}"
            ready = False
        elif status == "PROCESSING":
            message = "Still generating... check again in a minute"
            ready = False
        elif status == "PENDING":
            message = "Queued, waiting to start... check again in a minute"
            ready = False
        else:
            message = f"Unknown status: {status}"
            ready = False

        return {
            "status": status,
            "progress": progress,
            "ready": ready,
            "message": message,
            "job_id": job_id
        }


@mcp.tool()
async def voice_download(job_id: str) -> str:
    """
    Download a completed TTS audio file.

    Only call this after voice_status() returns status: SUCCESS

    Args:
        job_id: The job ID of a completed generation

    Returns:
        Filename of the downloaded WAV file

    Example:
        filename = voice_download("abc-123")
        # Returns: "abc-123.wav"
        voice_play(filename)  # Play it!
    """
    async with httpx.AsyncClient(timeout=120.0) as client:
        audio_url = f"{ORPHEUS_BASE_URL}/tts/audio/{job_id}"
        filename = f"{job_id}.wav"
        local_path = DOWNLOAD_DIR / filename

        try:
            # Stream to disk so large WAVs are never held fully in memory
            async with client.stream('GET', audio_url) as response:
                response.raise_for_status()
                with open(local_path, "wb") as f:
                    async for chunk in response.aiter_bytes(chunk_size=8192):
                        f.write(chunk)
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to download audio: {e}")

        # Update last generation info
        last_generation_info["job_id"] = job_id
        last_generation_info["filename"] = filename

        return filename


@mcp.tool()
async def voice_generate(text: str) -> str:
    """
    Generate speech from text using OrpheusTail TTS.

    Submits text to OrpheusTail service, waits for generation to complete,
    downloads the resulting WAV file, and returns the filename.

    Supports emotion tags: <laugh>, <chuckle>, <sigh>, <cough>, <sniffle>,
    <groan>, <yawn>, <gasp>
    Example: "Oh my! That's amazing! <laugh>"

    NOTE: For longer texts that might timeout, use voice_submit() instead!
    This blocking approach works best for texts under ~100 characters.

    Args:
        text: Text to convert to speech (can include emotion tags)

    Returns:
        Filename of the generated WAV file (e.g., "abc123-def456.wav")

    Example:
        filename = voice_generate("Bonjour, mon amour! <laugh>")
        # Returns: "a1b2c3d4-e5f6.wav"
    """
    async with httpx.AsyncClient(timeout=600.0) as client:
        # Step 1: Submit TTS job
        submit_url = f"{ORPHEUS_BASE_URL}/tts/submit"

        # Send initial progress notification
        print("📤 Submitting...")

        try:
            submit_response = await client.post(
                submit_url,
                json={
                    "text": text,
                    "voice": DEFAULT_VOICE
                }
            )
            submit_response.raise_for_status()
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to submit TTS job: {e}")

        job_data = submit_response.json()
        job_id = job_data.get("job_id")
        if not job_id:
            raise RuntimeError(f"No job_id in response: {job_data}")

        print(f"✓ Submitted (job: {job_id[:8]})")

        # Step 2: Poll for completion with progress notifications
        elapsed = 0
        poll_count = 0
        # FIX: initialize so the post-loop timeout check can never hit an
        # unbound local
        current_status = None
        status_url = f"{ORPHEUS_BASE_URL}/tts/status/{job_id}"

        while elapsed < DEFAULT_TIMEOUT:
            try:
                status_response = await client.get(status_url)
                status_response.raise_for_status()
            except httpx.HTTPError as e:
                raise RuntimeError(f"Failed to check status: {e}")

            status_data = status_response.json()
            current_status = status_data.get("status")
            progress = status_data.get("progress", 0)

            # Send progress notification every 10 polls (every 30 seconds)
            # to keep Claude Desktop alive
            if poll_count % 10 == 0:
                if current_status == "PENDING":
                    print(f"⏳ Queued... ({int(elapsed)}s)")
                elif current_status == "PROCESSING":
                    print(f"🎙️ Generating... ({int(elapsed)}s)")

            if current_status == "SUCCESS":
                # Generation complete!
                print(f"✓ Complete! ({int(elapsed)}s total)")
                break
            elif current_status == "FAILURE":
                error_msg = status_data.get("error", "Unknown error")
                raise RuntimeError(f"TTS generation failed: {error_msg}")

            # Still processing, wait and retry
            await asyncio.sleep(DEFAULT_POLL_INTERVAL)
            elapsed += DEFAULT_POLL_INTERVAL
            poll_count += 1

        if current_status != "SUCCESS":
            raise RuntimeError(f"TTS generation timed out after {DEFAULT_TIMEOUT}s")

        # Step 3: Download audio file (using streaming for large files)
        print("📥 Downloading...")
        audio_url = f"{ORPHEUS_BASE_URL}/tts/audio/{job_id}"
        filename = f"{job_id}.wav"
        local_path = DOWNLOAD_DIR / filename

        try:
            async with client.stream('GET', audio_url) as audio_response:
                audio_response.raise_for_status()
                # Stream download to handle large files without loading into memory
                with open(local_path, "wb") as f:
                    async for chunk in audio_response.aiter_bytes(chunk_size=8192):
                        f.write(chunk)
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to download audio: {e}")

        # FIX: this f-string previously contained the literal "(unknown)"
        # instead of an interpolation
        print(f"✓ Saved: {local_path}")

        # Update last generation info
        last_generation_info["job_id"] = job_id
        last_generation_info["filename"] = filename
        last_generation_info["text"] = text

        return filename


@mcp.tool()
def voice_play(filename: str) -> str:
    """
    Play a WAV audio file on macOS using afplay.

    Args:
        filename: Name of the WAV file to play (e.g., "abc123-def456.wav")

    Returns:
        Confirmation message

    Raises:
        FileNotFoundError: If the file is not in the download directory.
        RuntimeError: If afplay fails or is unavailable (non-macOS host).

    Example:
        voice_play("a1b2c3d4-e5f6.wav")
        # Returns: "Playing audio: a1b2c3d4-e5f6.wav"
    """
    # Construct full path
    audio_path = DOWNLOAD_DIR / filename

    # Check if file exists
    # FIX: error message previously contained the literal "(unknown)" instead
    # of the path
    if not audio_path.exists():
        raise FileNotFoundError(f"Audio file not found: {audio_path}")

    # Play using macOS afplay command (list argv, shell=False by default)
    try:
        subprocess.run(
            ["afplay", str(audio_path)],
            check=True,
            capture_output=True
        )
    except subprocess.CalledProcessError as e:
        raise RuntimeError(f"Failed to play audio: {e.stderr.decode()}")
    except FileNotFoundError:
        raise RuntimeError("afplay command not found. Are you running on macOS?")

    # FIX: restored the filename interpolation shown in the docstring example
    # (was the literal "(unknown)")
    return f"Playing audio: {filename}"


@mcp.tool()
def voice_get_last() -> dict:
    """
    Get information about the last generated voice.

    Returns:
        Dictionary with job_id, filename, and text from last generation

    Example:
        info = voice_get_last()
        # Returns: {"job_id": "abc123", "filename": "abc123.wav", "text": "Bonjour"}
    """
    if not last_generation_info["job_id"]:
        return {"message": "No voice has been generated yet"}
    return last_generation_info.copy()


if __name__ == "__main__":
    mcp.run()