Files
orpheus-tts/main.py

664 lines
19 KiB
Python

#!/usr/bin/env python3
"""
OrpheusTail - Orpheus TTS Service
FastAPI server for Orpheus text-to-speech generation on Jetson AGX Orin.
Replaces VoiceTail (Bark) with better control, voice cloning, and emotion tags.
Key Features:
- Emotion tags: <laugh>, <chuckle>, <sigh>, <cough>, <sniffle>, <groan>, <yawn>, <gasp>
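- Zero-shot voice cloning from reference audio (reference upload is supported; synthesis with a cloned voice is not implemented yet and falls back to the default voice)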
- Streaming support for real-time head playback
- Built-in voices: tara, leah, jess, leo, dan, mia, zac, zoe
Endpoints:
- POST /tts/submit - Submit TTS job (returns job_id)
- GET /tts/status/{job_id} - Check job status
- GET /tts/audio/{job_id} - Download generated audio
- POST /tts/stream - Stream audio in real-time (for head)
- POST /voice/clone - Upload reference audio for voice cloning
- GET /voices - List available voices
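- DELETE /voice/{name} - Delete a custom voice
- DELETE /tts/job/{job_id} - Delete a job and its audio file
- GET /health - Health check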
"""
import os
import json
import hashlib
import asyncio
import uuid
import wave
import io
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
from fastapi import FastAPI, BackgroundTasks, HTTPException, UploadFile, File
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel
# Configuration from environment (each value falls back to the default shown)
ORPHEUS_MODEL = os.getenv("ORPHEUS_MODEL", "canopylabs/orpheus-tts-0.1-finetune-prod")
# Only the literal string "true" (any letter case) enables the cache.
CACHE_ENABLED = os.getenv("CACHE_ENABLED", "true").lower() == "true"
CACHE_DIR = Path(os.getenv("CACHE_DIR", "cache"))
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "output"))
VOICES_DIR = Path(os.getenv("VOICES_DIR", "voices"))  # For cloned voice references
RETENTION_DAYS = int(os.getenv("RETENTION_DAYS", "10"))
CLEANUP_INTERVAL_HOURS = int(os.getenv("CLEANUP_INTERVAL_HOURS", "1"))
DEFAULT_VOICE = os.getenv("DEFAULT_VOICE", "tara")  # Orpheus default voice
MAX_MODEL_LEN = int(os.getenv("MAX_MODEL_LEN", "2048"))
SAMPLE_RATE = 24000  # Frame rate written into generated WAV headers
# Ensure directories exist. parents=True lets a configured path such as
# /data/orpheus/cache be created even when its parent directories are missing;
# without it the service failed at import time with FileNotFoundError.
CACHE_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
VOICES_DIR.mkdir(parents=True, exist_ok=True)
# Jobs persistence
JOBS_FILE = OUTPUT_DIR / "jobs.json"
# Built-in Orpheus voices (in order of conversational realism per docs)
BUILTIN_VOICES = ["tara", "leah", "jess", "leo", "dan", "mia", "zac", "zoe"]
# Supported emotion tags
EMOTION_TAGS = ["<laugh>", "<chuckle>", "<sigh>", "<cough>", "<sniffle>", "<groan>", "<yawn>", "<gasp>"]
# Initialize FastAPI
# The title/description/version below are the application metadata handed to FastAPI.
app = FastAPI(
    title="OrpheusTail - Orpheus TTS Service",
    description="Text-to-speech with emotion control and voice cloning for Vixy",
    version="1.0.0"
)
# Global model (loaded at startup)
# Remains None until the startup hook assigns the OrpheusModel instance;
# /health and /tts/stream check for None to tell whether loading has happened.
model = None
class JobStatus(str, Enum):
    """Lifecycle state of a TTS job.

    The ``str`` mix-in makes members compare equal to their plain string
    values, so a status read back from JSON still matches the enum.
    """

    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


@dataclass
class JobInfo:
    """Bookkeeping record for one TTS job.

    Instances live in the in-memory ``jobs`` registry and are persisted to
    disk through ``dataclasses.asdict``.
    """

    job_id: str                       # UUID string assigned at submission
    text: str                         # Text to synthesize (may contain emotion tags)
    voice: str                        # Voice name exactly as requested by the client
    status: JobStatus                 # Current lifecycle state
    progress: int = 0                 # Coarse percentage (0/25/50/75/100)
    audio_path: Optional[str] = None  # Path of the resulting WAV once available
    error: Optional[str] = None       # Failure message when status is FAILURE
    cached: bool = False              # True when the audio was served from the cache
    created_at: str = ""              # ISO-8601 timestamp of submission
    completed_at: Optional[str] = None  # ISO-8601 timestamp of completion

    def __post_init__(self):
        # Records rebuilt from jobs.json arrive with ``status`` as a bare
        # string; normalise it so the field always holds a JobStatus member
        # as annotated. Raises ValueError for an unknown status value.
        self.status = JobStatus(self.status)
# In-memory job storage
# Maps job_id -> JobInfo. Populated by load_jobs_from_disk() and written back
# to JOBS_FILE by save_jobs_to_disk().
jobs: Dict[str, JobInfo] = {}
def load_jobs_from_disk():
    """Load persisted jobs from JOBS_FILE into the in-memory ``jobs`` registry.

    Does nothing when the file is absent. An unreadable or invalid file is
    reported and ignored. Each record is restored independently, so one
    malformed entry no longer prevents the remaining jobs from loading.

    Jobs that were still PENDING or PROCESSING when the file was written have
    no task left to finish them after a restart; they are marked FAILURE so
    that clients polling their status get a definite answer.
    """
    if not JOBS_FILE.exists():
        return
    try:
        # json.dump writes ASCII-only output by default, so UTF-8 always decodes it.
        with open(JOBS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Error loading jobs: {e}")
        return
    if not isinstance(data, dict):
        print(f"Error loading jobs: expected a JSON object, got {type(data).__name__}")
        return
    for job_id, job_dict in data.items():
        try:
            job = JobInfo(**job_dict)
            # JSON stores the status as a plain string; restore the enum member.
            job.status = JobStatus(job.status)
        except (TypeError, ValueError) as e:
            print(f"Skipping malformed job {job_id}: {e}")
            continue
        if job.status in (JobStatus.PENDING, JobStatus.PROCESSING):
            job.status = JobStatus.FAILURE
            job.error = "Interrupted by service restart"
        jobs[job_id] = job
    print(f"Loaded {len(jobs)} jobs from disk")
def save_jobs_to_disk():
    """Persist the in-memory ``jobs`` registry to JOBS_FILE as JSON.

    The data is written to a sibling temporary file which then replaces
    JOBS_FILE in a single rename, so a crash or serialization error part-way
    through cannot leave a truncated jobs file behind. Failures are logged
    and swallowed (best effort), as before.
    """
    tmp_path = JOBS_FILE.with_name(JOBS_FILE.name + ".tmp")
    try:
        data = {job_id: asdict(job) for job_id, job in jobs.items()}
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, JOBS_FILE)
    except Exception as e:
        print(f"Error saving jobs: {e}")
def hash_text_voice(text: str, voice: str) -> str:
    """Return the cache key for a request: hex SHA-256 of ``"<text>|<voice>"``."""
    digest = hashlib.sha256()
    digest.update(f"{text}|{voice}".encode())
    return digest.hexdigest()
def get_from_cache(cache_key: str) -> Optional[str]:
    """Return the path of the cached WAV for ``cache_key``, or None.

    None is returned both when caching is disabled and when no file named
    ``<cache_key>.wav`` exists in CACHE_DIR.
    """
    if CACHE_ENABLED:
        candidate = CACHE_DIR / f"{cache_key}.wav"
        if candidate.exists():
            print(f"Cache hit: {cache_key}")
            return str(candidate)
    return None
def save_to_cache(cache_key: str, audio_path: str):
    """Copy the WAV at ``audio_path`` into CACHE_DIR as ``<cache_key>.wav``.

    A no-op when caching is disabled. Copy failures are logged and ignored,
    so a cache problem does not propagate to the caller.
    """
    if CACHE_ENABLED:
        try:
            import shutil
            destination = CACHE_DIR / f"{cache_key}.wav"
            shutil.copy(audio_path, destination)
            print(f"Saved to cache: {cache_key}")
        except Exception as e:
            print(f"Error saving to cache: {e}")
def get_custom_voices() -> List[str]:
    """Return the names of uploaded voices: the stem of every ``*.wav`` in VOICES_DIR."""
    return [wav_file.stem for wav_file in VOICES_DIR.glob("*.wav")]
def _pcm_to_wav(pcm: bytes) -> bytes:
    """Wrap raw PCM frames in a mono, 16-bit WAV container at SAMPLE_RATE."""
    buffer = io.BytesIO()
    with wave.open(buffer, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16-bit
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(pcm)
    # The header is finalized once the wave writer is closed; the BytesIO
    # itself stays open, so its contents can still be read here.
    return buffer.getvalue()


def generate_speech_sync(text: str, voice: str) -> bytes:
    """
    Generate speech using Orpheus model (synchronous).

    Blocks until the model has produced every audio chunk.

    Args:
        text: Text to convert (may include emotion tags)
        voice: Voice name (built-in or custom). A custom voice (one with a
            reference file in VOICES_DIR) or an unknown name is replaced by
            DEFAULT_VOICE, because voice cloning is not implemented yet.
    Returns:
        WAV audio bytes
    Raises:
        RuntimeError: If the model has not been loaded.
        ValueError: If the model produced no audio chunks.
    """
    if model is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise RuntimeError("Orpheus model is not loaded")
    # Check if it's a custom voice (needs reference audio)
    custom_voice_path = VOICES_DIR / f"{voice}.wav"
    if custom_voice_path.exists():
        print(f"Custom voice '{voice}' - voice cloning to be implemented")
        voice = DEFAULT_VOICE
    elif voice not in BUILTIN_VOICES:
        print(f"Unknown voice '{voice}', using default '{DEFAULT_VOICE}'")
        voice = DEFAULT_VOICE
    print(f"Generating: {text}")
    # Call model directly - it returns a generator
    syn_tokens = model.generate_speech(
        prompt=text,
        voice=voice,
    )
    print(f"Got generator: {type(syn_tokens)}")
    audio_chunks = []
    for i, audio_chunk in enumerate(syn_tokens):
        print(f"Chunk {i}: {type(audio_chunk)}, shape: {audio_chunk.shape if hasattr(audio_chunk, 'shape') else 'N/A'}")
        audio_chunks.append(audio_chunk)
    print(f"Total chunks: {len(audio_chunks)}")
    if not audio_chunks:
        raise ValueError("No audio chunks generated")
    # Chunks are raw int16 bytes from SNAC decoder - just concatenate
    wav_bytes = _pcm_to_wav(b''.join(audio_chunks))
    print(f"Generated WAV: {len(wav_bytes)} bytes")
    return wav_bytes
def save_audio_to_file(job_id: str, audio_bytes: bytes) -> str:
    """Write ``audio_bytes`` to ``OUTPUT_DIR/<job_id>.wav`` and return that path as a string."""
    destination = OUTPUT_DIR / f"{job_id}.wav"
    destination.write_bytes(audio_bytes)
    return str(destination)
# Serializes background generations so only one job drives the model at a
# time, as was implicitly the case when generation ran on the event loop.
# Created lazily inside the running loop so it is bound to the right loop on
# every supported Python version.
_generation_lock = None


async def generate_speech_background(job_id: str, text: str, voice: str):
    """Background task for speech generation (async).

    Moves the job through PROCESSING to SUCCESS or FAILURE, updating
    ``progress`` and persisting the registry after each step. A cache hit
    completes the job immediately with the cached file.

    The blocking model call runs in the default executor instead of on the
    event loop, so status polling and health checks keep responding while
    audio is being generated.

    Args:
        job_id: Key of the job in the ``jobs`` registry.
        text: Text to synthesize.
        voice: Requested voice name.
    """
    global _generation_lock
    job = jobs.get(job_id)
    if job is None:
        print(f"Job {job_id} not found; nothing to generate")
        return
    try:
        job.status = JobStatus.PROCESSING
        job.progress = 25
        save_jobs_to_disk()
        # Check cache first
        cache_key = hash_text_voice(text, voice)
        cached_path = get_from_cache(cache_key)
        if cached_path:
            job.audio_path = cached_path
            job.status = JobStatus.SUCCESS
            job.progress = 100
            job.cached = True
            job.completed_at = datetime.now().isoformat()
            save_jobs_to_disk()
            print(f"Job {job_id} completed from cache")
            return
        job.progress = 50
        save_jobs_to_disk()
        print(f"Generating audio for job {job_id}...")
        if _generation_lock is None:
            _generation_lock = asyncio.Lock()
        async with _generation_lock:
            loop = asyncio.get_running_loop()
            audio_bytes = await loop.run_in_executor(
                None, generate_speech_sync, text, voice
            )
        # The await above lets other requests run; the job may have been
        # deleted meanwhile, in which case its audio must not be written.
        if jobs.get(job_id) is not job:
            print(f"Job {job_id} was deleted during generation; discarding audio")
            return
        # Save to file
        job.progress = 75
        save_jobs_to_disk()
        output_path = save_audio_to_file(job_id, audio_bytes)
        # Save to cache
        save_to_cache(cache_key, output_path)
        # Complete
        job.audio_path = output_path
        job.status = JobStatus.SUCCESS
        job.progress = 100
        job.completed_at = datetime.now().isoformat()
        save_jobs_to_disk()
        print(f"Job {job_id} completed successfully")
    except Exception as e:
        print(f"Job {job_id} failed: {e}")
        import traceback
        traceback.print_exc()
        # Update the record held locally: looking it up again would raise
        # KeyError inside this handler if the job has been removed.
        job.status = JobStatus.FAILURE
        job.error = str(e)
        save_jobs_to_disk()
async def cleanup_old_jobs():
    """Background task to cleanup old jobs and files.

    Runs forever: sleeps CLEANUP_INTERVAL_HOURS, then removes every job whose
    ``created_at`` is older than RETENTION_DAYS, together with its audio file.

    Audio belonging to cache-hit jobs is left in place: that path points into
    the shared cache, and removing it would break other jobs and future cache
    hits for the same text and voice. Jobs without a parseable ``created_at``
    are kept because their age is unknown. A job whose file cannot be removed
    is kept and retried on the next pass.
    """
    while True:
        try:
            await asyncio.sleep(CLEANUP_INTERVAL_HOURS * 3600)
            cutoff = datetime.now() - timedelta(days=RETENTION_DAYS)
            to_delete = []
            for job_id, job in list(jobs.items()):
                try:
                    created = datetime.fromisoformat(job.created_at)
                except (TypeError, ValueError):
                    continue
                if created >= cutoff:
                    continue
                if job.audio_path and not job.cached:
                    try:
                        Path(job.audio_path).unlink()
                    except FileNotFoundError:
                        pass  # Already gone; the job record can still be dropped.
                    except OSError as e:
                        print(f"Cleanup: could not remove {job.audio_path}: {e}")
                        continue
                to_delete.append(job_id)
            for job_id in to_delete:
                jobs.pop(job_id, None)
            if to_delete:
                save_jobs_to_disk()
                print(f"Cleanup: deleted {len(to_delete)} old jobs")
        except Exception as e:
            print(f"Error in cleanup task: {e}")
@app.on_event("startup")
async def startup():
    """Load model and jobs on startup.

    Steps, in order:
    1. Print the effective configuration.
    2. Monkey-patch ``OrpheusModel`` to build a synchronous vLLM ``LLM``
       engine (honouring MAX_MODEL_LEN) and to produce tokens from it.
    3. Instantiate the model into the module-level ``model``.
    4. Restore persisted jobs and start the periodic cleanup task.
    """
    global model
    print("=" * 60)
    print("OrpheusTail - Orpheus TTS Service Starting")
    print(f"Model: {ORPHEUS_MODEL}")
    print(f"Max Model Len: {MAX_MODEL_LEN}")
    print(f"Cache: {'Enabled' if CACHE_ENABLED else 'Disabled'}")
    print(f"Default Voice: {DEFAULT_VOICE}")
    print("=" * 60)
    # Import and load Orpheus model
    print("Loading Orpheus model (this may take a moment)...")
    from orpheus_tts import OrpheusModel

    # Monkey-patch OrpheusModel to use sync LLM for proper sync context
    def patched_setup_engine(self):
        """Build the engine as a synchronous vLLM ``LLM`` with our length limit."""
        # Get the mapped model name (handles "medium-3b" -> full path)
        model_name = self._map_model_params(self.model_name)
        # Use LLM (sync) instead of AsyncLLMEngine to avoid event loop conflicts
        from vllm import LLM
        return LLM(
            model=model_name,
            max_model_len=MAX_MODEL_LEN,  # Our custom limit!
            gpu_memory_utilization=0.85,  # Leave some headroom
            enforce_eager=False,
        )

    OrpheusModel._setup_engine = patched_setup_engine

    # Also patch generate_tokens_sync to work with sync LLM
    def patched_generate_tokens_sync(self, prompt, voice=None, request_id="req-001", temperature=0.6, top_p=0.8, max_tokens=1200, stop_token_ids=None, repetition_penalty=1.3):
        """Yield ``<custom_token_N>`` strings parsed from the completed generation.

        ``request_id`` is accepted for signature compatibility and is not
        used. ``stop_token_ids`` defaults to ``[49158]``; the default is
        built per call instead of sharing one mutable list between calls.
        """
        from vllm import SamplingParams
        import re
        if stop_token_ids is None:
            stop_token_ids = [49158]
        prompt_string = self._format_prompt(prompt, voice)
        print(prompt)
        sampling_params = SamplingParams(
            temperature=temperature,
            top_p=top_p,
            max_tokens=max_tokens,
            stop_token_ids=stop_token_ids,
            repetition_penalty=repetition_penalty,
        )
        # Use sync generate - yields full output
        outputs = self.engine.generate([prompt_string], sampling_params)
        # Yield individual tokens from the output text
        for output in outputs:
            text = output.outputs[0].text
            print(f"Raw output (first 500 chars): {text[:500]}")
            # Extract all <custom_token_XXXX> patterns
            tokens = re.findall(r'<custom_token_\d+>', text)
            print(f"Found {len(tokens)} tokens")
            for token in tokens:
                yield token

    OrpheusModel.generate_tokens_sync = patched_generate_tokens_sync
    model = OrpheusModel(model_name=ORPHEUS_MODEL)
    print("✓ Orpheus model loaded successfully")
    # Load jobs from disk
    load_jobs_from_disk()
    # Start cleanup task. The event loop only keeps a weak reference to
    # tasks, so hold a strong one on the app to keep the task from being
    # garbage-collected while it is sleeping.
    app.state.cleanup_task = asyncio.create_task(cleanup_old_jobs())
# === Pydantic Models ===
class TTSRequest(BaseModel):
    """TTS job submission request (body of POST /tts/submit)."""
    # Text to synthesize.
    text: str
    # Requested voice name; DEFAULT_VOICE is used when the field is omitted.
    voice: str = DEFAULT_VOICE
class TTSStreamRequest(BaseModel):
    """TTS streaming request (for head playback); body of POST /tts/stream."""
    # Text to synthesize.
    text: str
    # Requested voice name; DEFAULT_VOICE is used when the field is omitted.
    voice: str = DEFAULT_VOICE
class JobResponse(BaseModel):
    """Job submission response returned by POST /tts/submit."""
    # Identifier to use with the status and audio endpoints.
    job_id: str
    # Job status at submission time.
    status: str
class StatusResponse(BaseModel):
    """Job status response returned by GET /tts/status/{job_id}."""
    job_id: str
    # Current job status.
    status: str
    # Progress value copied from the job record.
    progress: int
    # True when the job was satisfied from the audio cache.
    cached: bool = False
    # Download URL; filled in only for successful jobs.
    audio_url: Optional[str] = None
    # Error message; filled in only for failed jobs.
    error: Optional[str] = None
class VoicesResponse(BaseModel):
    """Available voices response returned by GET /voices."""
    # Built-in voice names.
    builtin: List[str]
    # Names of uploaded custom voices.
    custom: List[str]
    # Voice used when a request does not specify one.
    default: str
    # Emotion tags the service advertises as supported.
    emotion_tags: List[str]
# === Endpoints ===
@app.get("/")
def root():
    """Describe the service: name, version, model, defaults and endpoint map."""
    endpoint_summary = {
        "/tts/submit": "POST - Submit TTS job",
        "/tts/status/{job_id}": "GET - Check job status",
        "/tts/audio/{job_id}": "GET - Download audio",
        "/tts/stream": "POST - Stream audio (for head)",
        "/voice/clone": "POST - Upload voice reference",
        "/voices": "GET - List available voices",
        "/health": "GET - Health check",
    }
    service_info = {
        "service": "OrpheusTail - Orpheus TTS Service",
        "version": "1.0.0",
        "model": ORPHEUS_MODEL,
        "default_voice": DEFAULT_VOICE,
        "emotion_tags": EMOTION_TAGS,
        "endpoints": endpoint_summary,
    }
    return service_info
@app.get("/health")
def health():
    """Report liveness, whether the model is loaded, cache state and voice count."""
    is_model_loaded = model is not None
    total_voices = len(BUILTIN_VOICES) + len(get_custom_voices())
    return {
        "status": "healthy",
        "model_loaded": is_model_loaded,
        "cache_enabled": CACHE_ENABLED,
        "voices_available": total_voices,
    }
@app.get("/voices", response_model=VoicesResponse)
def list_voices():
    """Return built-in voices, uploaded custom voices, the default voice and emotion tags."""
    payload = {
        "builtin": BUILTIN_VOICES,
        "custom": get_custom_voices(),
        "default": DEFAULT_VOICE,
        "emotion_tags": EMOTION_TAGS,
    }
    return VoicesResponse(**payload)
# Strong references to in-flight generation tasks. The event loop keeps only
# weak references to tasks, so an otherwise unreferenced task may be
# garbage-collected before it finishes.
_background_tasks = set()


@app.post("/tts/submit", response_model=JobResponse)
async def submit_tts_job(request: TTSRequest):
    """Submit a TTS job for processing.

    Registers a PENDING job, persists the registry, schedules generation as
    a background task and returns immediately with the new job id.
    """
    job_id = str(uuid.uuid4())
    jobs[job_id] = JobInfo(
        job_id=job_id,
        text=request.text,
        voice=request.voice,
        status=JobStatus.PENDING,
        progress=0,
        created_at=datetime.now().isoformat()
    )
    save_jobs_to_disk()
    # Use asyncio.create_task for proper async execution
    task = asyncio.create_task(
        generate_speech_background(job_id, request.text, request.voice)
    )
    _background_tasks.add(task)
    # Drop the reference once the task finishes so the set does not grow.
    task.add_done_callback(_background_tasks.discard)
    print(f"Job {job_id} submitted: '{request.text[:50]}...' with voice '{request.voice}'")
    return JobResponse(job_id=job_id, status=JobStatus.PENDING.value)
@app.get("/tts/status/{job_id}", response_model=StatusResponse)
async def get_job_status(job_id: str):
    """Return status, progress and cache flag for a job.

    Successful jobs also get an ``audio_url``; failed jobs get their
    ``error``. Responds 404 for an unknown job id.
    """
    job = jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    fields = {
        "job_id": job_id,
        "status": job.status,
        "progress": job.progress,
        "cached": job.cached,
    }
    if job.status == JobStatus.SUCCESS:
        fields["audio_url"] = f"/tts/audio/{job_id}"
    elif job.status == JobStatus.FAILURE:
        fields["error"] = job.error
    return StatusResponse(**fields)
@app.get("/tts/audio/{job_id}")
async def get_audio(job_id: str):
    """Serve the WAV produced by a finished job.

    Responds 404 for an unknown job or a missing file, and 400 when the job
    has not reached SUCCESS.
    """
    job = jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.status != JobStatus.SUCCESS:
        raise HTTPException(
            status_code=400,
            detail=f"Audio not ready. Job status: {job.status}"
        )
    file_available = bool(job.audio_path) and Path(job.audio_path).exists()
    if not file_available:
        raise HTTPException(status_code=404, detail="Audio file not found")
    return FileResponse(
        job.audio_path,
        media_type="audio/wav",
        filename=f"{job_id}.wav"
    )
@app.post("/tts/stream")
async def stream_tts(request: TTSStreamRequest):
    """
    Stream TTS audio in real-time.

    For head-vixy to stream directly without waiting for full generation.
    Returns audio chunks as they're generated. Voices that are not built-in
    are replaced by DEFAULT_VOICE. Responds 503 while the model is not loaded.

    NOTE(review): chunks are forwarded exactly as the model yields them and
    no WAV header is written here, although the response is labelled
    audio/wav - confirm the client expects headerless data.
    """
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    selected_voice = request.voice if request.voice in BUILTIN_VOICES else DEFAULT_VOICE

    def sync_audio_generator():
        """Generate audio chunks (sync generator)"""
        try:
            chunk_source = model.generate_speech(
                prompt=request.text,
                voice=selected_voice,
            )
            for chunk in chunk_source:
                yield chunk
        except Exception as e:
            print(f"Stream error: {e}")
            raise

    return StreamingResponse(
        sync_audio_generator(),
        media_type="audio/wav"
    )
@app.post("/voice/clone")
async def upload_voice_reference(
    name: str,
    audio: UploadFile = File(...),
):
    """
    Store an uploaded reference recording as a custom voice.

    Args:
        name: Name for this custom voice (alphanumeric, not a built-in name)
        audio: WAV audio file (5-30 seconds recommended)

    The upload is written to ``VOICES_DIR/<name>.wav``. Responds 400 for an
    invalid or reserved name and 500 when the file cannot be saved.
    """
    if not name.isalnum():
        raise HTTPException(status_code=400, detail="Voice name must be alphanumeric")
    if name in BUILTIN_VOICES:
        raise HTTPException(status_code=400, detail="Cannot overwrite built-in voice")
    # Save the reference audio
    destination = VOICES_DIR / f"{name}.wav"
    try:
        payload = await audio.read()
        destination.write_bytes(payload)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to save voice: {e}")
    return {
        "status": "success",
        "voice_name": name,
        "message": f"Voice '{name}' saved. Use voice='{name}' in TTS requests."
    }
@app.delete("/voice/{name}")
async def delete_voice(name: str):
    """Delete a custom voice.

    Responds 400 for a built-in or malformed name and 404 when no reference
    file exists for ``name``.
    """
    if name in BUILTIN_VOICES:
        raise HTTPException(status_code=400, detail="Cannot delete built-in voice")
    # The name becomes part of a filesystem path: reject anything that is not
    # a single path component so the deletion cannot leave VOICES_DIR.
    if not name or Path(name).name != name or "\x00" in name:
        raise HTTPException(status_code=400, detail="Invalid voice name")
    voice_path = VOICES_DIR / f"{name}.wav"
    try:
        # Unlink directly instead of checking exists() first: a concurrent
        # delete between check and unlink would otherwise surface as a 500.
        voice_path.unlink()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Voice not found") from None
    return {"status": "success", "message": f"Voice '{name}' deleted"}
@app.delete("/tts/job/{job_id}")
async def delete_job(job_id: str):
    """Delete a job and its audio file.

    The audio of a cache-hit job is not removed: its path points at the
    shared cache entry, which other jobs and future requests for the same
    text and voice still rely on. Responds 404 for an unknown job id.
    """
    job = jobs.pop(job_id, None)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    if job.audio_path and not job.cached:
        try:
            Path(job.audio_path).unlink()
        except FileNotFoundError:
            pass  # Nothing left to remove.
        except OSError as e:
            # Best effort: the job record is removed even if the file cannot be.
            print(f"Could not remove audio for job {job_id}: {e}")
    save_jobs_to_disk()
    return {"message": f"Job {job_id} deleted"}
# Script entry point: serve the app with uvicorn when this file is run directly.
if __name__ == "__main__":
    # Imported here so uvicorn is only required when running as a script.
    import uvicorn
    # The app is passed as the import string "main:app", so this file has to
    # be importable under the module name "main".
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8766, # Same port as VoiceTail for drop-in replacement
        reload=False
    )