#!/usr/bin/env python3 """ OrpheusTail - Orpheus TTS Service FastAPI server for Orpheus text-to-speech generation on Jetson AGX Orin. Replaces VoiceTail (Bark) with better control, voice cloning, and emotion tags. Key Features: - Emotion tags: , , , , , , , - Zero-shot voice cloning from reference audio - Streaming support for real-time head playback - Built-in voices: tara, leah, jess, leo, dan, mia, zac, zoe Endpoints: - POST /tts/submit - Submit TTS job (returns job_id) - GET /tts/status/{job_id} - Check job status - GET /tts/audio/{job_id} - Download generated audio - POST /tts/stream - Stream audio in real-time (for head) - POST /voice/clone - Upload reference audio for voice cloning - GET /voices - List available voices - GET /health - Health check """ import os import json import hashlib import asyncio import uuid import wave import io from datetime import datetime, timedelta from pathlib import Path from typing import Dict, List, Optional from dataclasses import dataclass, asdict from enum import Enum from fastapi import FastAPI, BackgroundTasks, HTTPException, UploadFile, File from fastapi.responses import FileResponse, StreamingResponse from pydantic import BaseModel # Configuration from environment ORPHEUS_MODEL = os.getenv("ORPHEUS_MODEL", "canopylabs/orpheus-tts-0.1-finetune-prod") CACHE_ENABLED = os.getenv("CACHE_ENABLED", "true").lower() == "true" CACHE_DIR = Path(os.getenv("CACHE_DIR", "cache")) OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "output")) VOICES_DIR = Path(os.getenv("VOICES_DIR", "voices")) # For cloned voice references RETENTION_DAYS = int(os.getenv("RETENTION_DAYS", "10")) CLEANUP_INTERVAL_HOURS = int(os.getenv("CLEANUP_INTERVAL_HOURS", "1")) DEFAULT_VOICE = os.getenv("DEFAULT_VOICE", "tara") # Orpheus default voice MAX_MODEL_LEN = int(os.getenv("MAX_MODEL_LEN", "2048")) SAMPLE_RATE = 24000 # Ensure directories exist CACHE_DIR.mkdir(exist_ok=True) OUTPUT_DIR.mkdir(exist_ok=True) VOICES_DIR.mkdir(exist_ok=True) # Jobs persistence JOBS_FILE = OUTPUT_DIR / "jobs.json" # Built-in Orpheus voices (in order of conversational realism per docs) BUILTIN_VOICES = ["tara", "leah", "jess", "leo", "dan", "mia", "zac", "zoe"] # Supported emotion tags EMOTION_TAGS = ["", "", "", "", "", "", "", ""] # Initialize FastAPI app = FastAPI( title="OrpheusTail - Orpheus TTS Service", description="Text-to-speech with emotion control and voice cloning for Vixy", version="1.0.0" ) # Global model (loaded at startup) model = None class JobStatus(str, Enum): """Job status enum""" PENDING = "PENDING" PROCESSING = "PROCESSING" SUCCESS = "SUCCESS" FAILURE = "FAILURE" @dataclass class JobInfo: """Job information""" job_id: str text: str voice: str status: JobStatus progress: int = 0 audio_path: Optional[str] = None error: Optional[str] = None cached: bool = False created_at: str = "" completed_at: Optional[str] = None # In-memory job storage jobs: Dict[str, JobInfo] = {} def load_jobs_from_disk(): """Load jobs from disk on startup""" global jobs if JOBS_FILE.exists(): try: with open(JOBS_FILE, 'r') as f: data = json.load(f) for job_id, job_dict in data.items(): jobs[job_id] = JobInfo(**job_dict) print(f"Loaded {len(jobs)} jobs from disk") except Exception as e: print(f"Error loading jobs: {e}") def save_jobs_to_disk(): """Save jobs to disk""" try: data = {job_id: asdict(job) for job_id, job in jobs.items()} with open(JOBS_FILE, 'w') as f: json.dump(data, f, indent=2) except Exception as e: print(f"Error saving jobs: {e}") def hash_text_voice(text: str, voice: str) -> str: """Generate cache key from text + voice""" content = f"{text}|{voice}" return hashlib.sha256(content.encode()).hexdigest() def get_from_cache(cache_key: str) -> Optional[str]: """Check if audio exists in cache""" if not CACHE_ENABLED: return None cache_path = CACHE_DIR / f"{cache_key}.wav" if cache_path.exists(): print(f"Cache hit: {cache_key}") return str(cache_path) return None def save_to_cache(cache_key: str, audio_path: str): """Save generated audio to cache""" if not CACHE_ENABLED: return try: import shutil cache_path = CACHE_DIR / f"{cache_key}.wav" shutil.copy(audio_path, cache_path) print(f"Saved to cache: {cache_key}") except Exception as e: print(f"Error saving to cache: {e}") def get_custom_voices() -> List[str]: """Get list of custom cloned voices""" voices = [] for voice_file in VOICES_DIR.glob("*.wav"): voices.append(voice_file.stem) return voices def generate_speech(text: str, voice: str) -> bytes: """ Generate speech using Orpheus model. Args: text: Text to convert (may include emotion tags) voice: Voice name (built-in or custom) Returns: WAV audio bytes """ global model # Check if it's a custom voice (needs reference audio) custom_voice_path = VOICES_DIR / f"{voice}.wav" if custom_voice_path.exists(): # TODO: Implement voice cloning with reference audio # For now, fall back to built-in voice print(f"Custom voice '{voice}' - voice cloning to be implemented") voice = DEFAULT_VOICE elif voice not in BUILTIN_VOICES: print(f"Unknown voice '{voice}', using default '{DEFAULT_VOICE}'") voice = DEFAULT_VOICE # Generate speech using Orpheus # Note: text is passed as-is, emotion tags like are handled by Orpheus audio_chunks = [] syn_tokens = model.generate_speech( prompt=text, voice=voice, ) # Collect audio chunks for audio_chunk in syn_tokens: audio_chunks.append(audio_chunk) # Combine chunks into single audio import numpy as np audio_data = np.concatenate(audio_chunks) if len(audio_chunks) > 1 else audio_chunks[0] # Convert to WAV bytes buffer = io.BytesIO() with wave.open(buffer, 'wb') as wf: wf.setnchannels(1) wf.setsampwidth(2) # 16-bit wf.setframerate(SAMPLE_RATE) wf.writeframes(audio_data) return buffer.getvalue() def save_audio_to_file(job_id: str, audio_bytes: bytes) -> str: """Save audio bytes to WAV file.""" output_path = OUTPUT_DIR / f"{job_id}.wav" with open(output_path, 'wb') as f: f.write(audio_bytes) return str(output_path) def generate_speech_background(job_id: str, text: str, voice: str): """Background task for speech generation.""" try: jobs[job_id].status = JobStatus.PROCESSING jobs[job_id].progress = 25 save_jobs_to_disk() # Check cache first cache_key = hash_text_voice(text, voice) cached_path = get_from_cache(cache_key) if cached_path: jobs[job_id].audio_path = cached_path jobs[job_id].status = JobStatus.SUCCESS jobs[job_id].progress = 100 jobs[job_id].cached = True jobs[job_id].completed_at = datetime.now().isoformat() save_jobs_to_disk() print(f"Job {job_id} completed from cache") return # Generate audio jobs[job_id].progress = 50 save_jobs_to_disk() print(f"Generating audio for job {job_id}...") audio_bytes = generate_speech(text, voice) # Save to file jobs[job_id].progress = 75 save_jobs_to_disk() output_path = save_audio_to_file(job_id, audio_bytes) # Save to cache save_to_cache(cache_key, output_path) # Complete jobs[job_id].audio_path = output_path jobs[job_id].status = JobStatus.SUCCESS jobs[job_id].progress = 100 jobs[job_id].completed_at = datetime.now().isoformat() save_jobs_to_disk() print(f"Job {job_id} completed successfully") except Exception as e: print(f"Job {job_id} failed: {e}") import traceback traceback.print_exc() jobs[job_id].status = JobStatus.FAILURE jobs[job_id].error = str(e) save_jobs_to_disk() async def cleanup_old_jobs(): """Background task to cleanup old jobs and files.""" while True: try: await asyncio.sleep(CLEANUP_INTERVAL_HOURS * 3600) cutoff = datetime.now() - timedelta(days=RETENTION_DAYS) to_delete = [] for job_id, job in jobs.items(): try: created = datetime.fromisoformat(job.created_at) if created < cutoff: if job.audio_path and Path(job.audio_path).exists(): Path(job.audio_path).unlink() to_delete.append(job_id) except: pass for job_id in to_delete: del jobs[job_id] if to_delete: save_jobs_to_disk() print(f"Cleanup: deleted {len(to_delete)} old jobs") except Exception as e: print(f"Error in cleanup task: {e}") @app.on_event("startup") async def startup(): """Load model and jobs on startup""" global model print("=" * 60) print("OrpheusTail - Orpheus TTS Service Starting") print(f"Model: {ORPHEUS_MODEL}") print(f"Max Model Len: {MAX_MODEL_LEN}") print(f"Cache: {'Enabled' if CACHE_ENABLED else 'Disabled'}") print(f"Default Voice: {DEFAULT_VOICE}") print("=" * 60) # Import and load Orpheus model print("Loading Orpheus model (this may take a moment)...") from orpheus_tts import OrpheusModel model = OrpheusModel( model_name=ORPHEUS_MODEL, max_model_len=MAX_MODEL_LEN ) print("✓ Orpheus model loaded successfully") # Load jobs from disk load_jobs_from_disk() # Start cleanup task asyncio.create_task(cleanup_old_jobs()) # === Pydantic Models === class TTSRequest(BaseModel): """TTS job submission request""" text: str voice: str = DEFAULT_VOICE class TTSStreamRequest(BaseModel): """TTS streaming request (for head playback)""" text: str voice: str = DEFAULT_VOICE class JobResponse(BaseModel): """Job submission response""" job_id: str status: str class StatusResponse(BaseModel): """Job status response""" job_id: str status: str progress: int cached: bool = False audio_url: Optional[str] = None error: Optional[str] = None class VoicesResponse(BaseModel): """Available voices response""" builtin: List[str] custom: List[str] default: str emotion_tags: List[str] # === Endpoints === @app.get("/") def root(): """Root endpoint""" return { "service": "OrpheusTail - Orpheus TTS Service", "version": "1.0.0", "model": ORPHEUS_MODEL, "default_voice": DEFAULT_VOICE, "emotion_tags": EMOTION_TAGS, "endpoints": { "/tts/submit": "POST - Submit TTS job", "/tts/status/{job_id}": "GET - Check job status", "/tts/audio/{job_id}": "GET - Download audio", "/tts/stream": "POST - Stream audio (for head)", "/voice/clone": "POST - Upload voice reference", "/voices": "GET - List available voices", "/health": "GET - Health check" } } @app.get("/health") def health(): """Health check""" return { "status": "healthy", "model_loaded": model is not None, "cache_enabled": CACHE_ENABLED, "voices_available": len(BUILTIN_VOICES) + len(get_custom_voices()) } @app.get("/voices", response_model=VoicesResponse) def list_voices(): """List all available voices""" return VoicesResponse( builtin=BUILTIN_VOICES, custom=get_custom_voices(), default=DEFAULT_VOICE, emotion_tags=EMOTION_TAGS ) @app.post("/tts/submit", response_model=JobResponse) async def submit_tts_job(request: TTSRequest, background_tasks: BackgroundTasks): """Submit a TTS job for processing.""" job_id = str(uuid.uuid4()) job = JobInfo( job_id=job_id, text=request.text, voice=request.voice, status=JobStatus.PENDING, progress=0, created_at=datetime.now().isoformat() ) jobs[job_id] = job save_jobs_to_disk() background_tasks.add_task( generate_speech_background, job_id, request.text, request.voice ) print(f"Job {job_id} submitted: '{request.text[:50]}...' with voice '{request.voice}'") return JobResponse(job_id=job_id, status=JobStatus.PENDING) @app.get("/tts/status/{job_id}", response_model=StatusResponse) async def get_job_status(job_id: str): """Get status of a TTS job.""" if job_id not in jobs: raise HTTPException(status_code=404, detail="Job not found") job = jobs[job_id] response = StatusResponse( job_id=job_id, status=job.status, progress=job.progress, cached=job.cached ) if job.status == JobStatus.SUCCESS: response.audio_url = f"/tts/audio/{job_id}" elif job.status == JobStatus.FAILURE: response.error = job.error return response @app.get("/tts/audio/{job_id}") async def get_audio(job_id: str): """Retrieve generated audio file.""" if job_id not in jobs: raise HTTPException(status_code=404, detail="Job not found") job = jobs[job_id] if job.status != JobStatus.SUCCESS: raise HTTPException( status_code=400, detail=f"Audio not ready. Job status: {job.status}" ) if not job.audio_path or not Path(job.audio_path).exists(): raise HTTPException(status_code=404, detail="Audio file not found") return FileResponse( job.audio_path, media_type="audio/wav", filename=f"{job_id}.wav" ) @app.post("/tts/stream") async def stream_tts(request: TTSStreamRequest): """ Stream TTS audio in real-time. For head-vixy to stream directly without waiting for full generation. Returns audio chunks as they're generated. """ global model if model is None: raise HTTPException(status_code=503, detail="Model not loaded") voice = request.voice if voice not in BUILTIN_VOICES: voice = DEFAULT_VOICE async def audio_generator(): """Generate audio chunks""" try: syn_tokens = model.generate_speech( prompt=request.text, voice=voice, ) for audio_chunk in syn_tokens: yield audio_chunk except Exception as e: print(f"Stream error: {e}") raise return StreamingResponse( audio_generator(), media_type="audio/wav" ) @app.post("/voice/clone") async def upload_voice_reference( name: str, audio: UploadFile = File(...), ): """ Upload a reference audio file for voice cloning. Args: name: Name for this custom voice audio: WAV audio file (5-30 seconds recommended) """ if not name.isalnum(): raise HTTPException(status_code=400, detail="Voice name must be alphanumeric") if name in BUILTIN_VOICES: raise HTTPException(status_code=400, detail="Cannot overwrite built-in voice") # Save the reference audio voice_path = VOICES_DIR / f"{name}.wav" try: content = await audio.read() with open(voice_path, 'wb') as f: f.write(content) return { "status": "success", "voice_name": name, "message": f"Voice '{name}' saved. Use voice='{name}' in TTS requests." } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to save voice: {e}") @app.delete("/voice/{name}") async def delete_voice(name: str): """Delete a custom voice.""" if name in BUILTIN_VOICES: raise HTTPException(status_code=400, detail="Cannot delete built-in voice") voice_path = VOICES_DIR / f"{name}.wav" if not voice_path.exists(): raise HTTPException(status_code=404, detail="Voice not found") voice_path.unlink() return {"status": "success", "message": f"Voice '{name}' deleted"} @app.delete("/tts/job/{job_id}") async def delete_job(job_id: str): """Delete a job and its audio file.""" if job_id not in jobs: raise HTTPException(status_code=404, detail="Job not found") job = jobs[job_id] if job.audio_path and Path(job.audio_path).exists(): try: Path(job.audio_path).unlink() except: pass del jobs[job_id] save_jobs_to_disk() return {"message": f"Job {job_id} deleted"} if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host="0.0.0.0", port=8766, # Same port as VoiceTail for drop-in replacement reload=False )