Files
orpheus-tts/main.py
Alex cfc9b1a5a0 Revert to sync LLM + sentence-level streaming
AsyncLLMEngine hangs on Jetson during model loading. Reverted to sync
LLM but added fine-grained text chunking (chunk_text_fine, ~200 chars)
for the stream endpoint. Each sentence/clause generates independently,
so first audio plays after ~2-4s instead of waiting for the full text.

Not true token-level streaming, but a significant latency reduction
for multi-sentence utterances without AsyncLLMEngine dependency.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 23:45:11 -05:00

732 lines
22 KiB
Python

#!/usr/bin/env python3
"""
OrpheusTail - Orpheus TTS Service
FastAPI server for Orpheus text-to-speech generation on Jetson AGX Orin.
Replaces VoiceTail (Bark) with better control, voice cloning, and emotion tags.
Key Features:
- Emotion tags: <laugh>, <chuckle>, <sigh>, <cough>, <sniffle>, <groan>, <yawn>, <gasp>
- Zero-shot voice cloning from reference audio
- Streaming support for real-time head playback
- Built-in voices: tara, leah, jess, leo, dan, mia, zac, zoe
Endpoints:
- POST /tts/submit - Submit TTS job (returns job_id)
- GET /tts/status/{job_id} - Check job status
- GET /tts/audio/{job_id} - Download generated audio
- POST /tts/stream - Stream audio in real-time (for head)
- POST /voice/clone - Upload reference audio for voice cloning
- GET /voices - List available voices
- GET /health - Health check
"""
import os
import re
import json
import hashlib
import asyncio
import uuid
import wave
import io
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
from fastapi import FastAPI, BackgroundTasks, HTTPException, UploadFile, File
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel
# Configuration from environment
ORPHEUS_MODEL = os.getenv("ORPHEUS_MODEL", "canopylabs/orpheus-tts-0.1-finetune-prod")
CACHE_ENABLED = os.getenv("CACHE_ENABLED", "true").lower() == "true"
CACHE_DIR = Path(os.getenv("CACHE_DIR", "cache"))
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "output"))
VOICES_DIR = Path(os.getenv("VOICES_DIR", "voices")) # For cloned voice references
RETENTION_DAYS = int(os.getenv("RETENTION_DAYS", "10"))
CLEANUP_INTERVAL_HOURS = int(os.getenv("CLEANUP_INTERVAL_HOURS", "1"))
DEFAULT_VOICE = os.getenv("DEFAULT_VOICE", "tara") # Orpheus default voice
MAX_MODEL_LEN = int(os.getenv("MAX_MODEL_LEN", "8192"))
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "8000"))
SAMPLE_RATE = 24000
# Ensure directories exist
CACHE_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)
VOICES_DIR.mkdir(exist_ok=True)
# Jobs persistence
JOBS_FILE = OUTPUT_DIR / "jobs.json"
# Built-in Orpheus voices (in order of conversational realism per docs)
BUILTIN_VOICES = ["tara", "leah", "jess", "leo", "dan", "mia", "zac", "zoe"]
# Supported emotion tags
EMOTION_TAGS = ["<laugh>", "<chuckle>", "<sigh>", "<cough>", "<sniffle>", "<groan>", "<yawn>", "<gasp>"]
# Initialize FastAPI
app = FastAPI(
title="OrpheusTail - Orpheus TTS Service",
description="Text-to-speech with emotion control and voice cloning for Vixy",
version="1.0.0"
)
# Global model (loaded at startup)
model = None
class JobStatus(str, Enum):
"""Job status enum"""
PENDING = "PENDING"
PROCESSING = "PROCESSING"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
@dataclass
class JobInfo:
"""Job information"""
job_id: str
text: str
voice: str
status: JobStatus
progress: int = 0
audio_path: Optional[str] = None
error: Optional[str] = None
cached: bool = False
created_at: str = ""
completed_at: Optional[str] = None
# In-memory job storage
jobs: Dict[str, JobInfo] = {}
def load_jobs_from_disk():
"""Load jobs from disk on startup"""
global jobs
if JOBS_FILE.exists():
try:
with open(JOBS_FILE, 'r') as f:
data = json.load(f)
for job_id, job_dict in data.items():
jobs[job_id] = JobInfo(**job_dict)
print(f"Loaded {len(jobs)} jobs from disk")
except Exception as e:
print(f"Error loading jobs: {e}")
def save_jobs_to_disk():
"""Save jobs to disk"""
try:
data = {job_id: asdict(job) for job_id, job in jobs.items()}
with open(JOBS_FILE, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"Error saving jobs: {e}")
def hash_text_voice(text: str, voice: str) -> str:
"""Generate cache key from text + voice"""
content = f"{text}|{voice}"
return hashlib.sha256(content.encode()).hexdigest()
def get_from_cache(cache_key: str) -> Optional[str]:
"""Check if audio exists in cache"""
if not CACHE_ENABLED:
return None
cache_path = CACHE_DIR / f"{cache_key}.wav"
if cache_path.exists():
print(f"Cache hit: {cache_key}")
return str(cache_path)
return None
def save_to_cache(cache_key: str, audio_path: str):
"""Save generated audio to cache"""
if not CACHE_ENABLED:
return
try:
import shutil
cache_path = CACHE_DIR / f"{cache_key}.wav"
shutil.copy(audio_path, cache_path)
print(f"Saved to cache: {cache_key}")
except Exception as e:
print(f"Error saving to cache: {e}")
def get_custom_voices() -> List[str]:
"""Get list of custom cloned voices"""
voices = []
for voice_file in VOICES_DIR.glob("*.wav"):
voices.append(voice_file.stem)
return voices
def chunk_text(text: str, max_chars: int = 800) -> List[str]:
"""
Split text into chunks at sentence boundaries for sequential TTS generation.
Ensures no chunk exceeds max_chars (unless a single sentence is longer).
Preserves emotion tags within sentences.
"""
# Split on sentence-ending punctuation followed by whitespace
sentences = re.split(r'(?<=[.!?])\s+', text.strip())
chunks = []
current_chunk = []
current_len = 0
for sentence in sentences:
sentence_len = len(sentence)
# If adding this sentence would exceed the limit, finalize current chunk
if current_chunk and current_len + 1 + sentence_len > max_chars:
chunks.append(' '.join(current_chunk))
current_chunk = []
current_len = 0
current_chunk.append(sentence)
current_len += sentence_len + (1 if current_len > 0 else 0)
# Don't forget the last chunk
if current_chunk:
chunks.append(' '.join(current_chunk))
return chunks if chunks else [text]
def chunk_text_fine(text: str, max_chars: int = 200) -> List[str]:
"""
Split text into fine-grained chunks for streaming — every sentence or clause.
Smaller chunks = faster first-audio, slight quality tradeoff at boundaries.
"""
# Split on sentence boundaries AND commas/semicolons with reasonable length
parts = re.split(r'(?<=[.!?;])\s+', text.strip())
# Further split long parts on commas
chunks = []
for part in parts:
if len(part) <= max_chars:
chunks.append(part)
else:
# Split on commas
sub = re.split(r',\s+', part)
current = []
current_len = 0
for s in sub:
if current and current_len + len(s) > max_chars:
chunks.append(', '.join(current))
current = []
current_len = 0
current.append(s)
current_len += len(s)
if current:
chunks.append(', '.join(current))
# Filter empty chunks
return [c.strip() for c in chunks if c.strip()]
def generate_speech_sync(text: str, voice: str) -> bytes:
"""
Generate speech using Orpheus model (synchronous).
Args:
text: Text to convert (may include emotion tags)
voice: Voice name (built-in or custom)
Returns:
WAV audio bytes
"""
global model
import numpy as np
# Check if it's a custom voice (needs reference audio)
custom_voice_path = VOICES_DIR / f"{voice}.wav"
if custom_voice_path.exists():
print(f"Custom voice '{voice}' - voice cloning to be implemented")
voice = DEFAULT_VOICE
elif voice not in BUILTIN_VOICES:
print(f"Unknown voice '{voice}', using default '{DEFAULT_VOICE}'")
voice = DEFAULT_VOICE
# Split long text into chunks for sequential generation
text_chunks = chunk_text(text)
print(f"Generating: {text} ({len(text_chunks)} chunk(s))")
all_pcm = []
for chunk_idx, chunk in enumerate(text_chunks):
print(f"Chunk {chunk_idx + 1}/{len(text_chunks)}: {chunk[:80]}...")
audio_chunks = []
syn_tokens = model.generate_speech(
prompt=chunk,
voice=voice,
max_tokens=MAX_TOKENS,
)
for i, audio_chunk in enumerate(syn_tokens):
audio_chunks.append(audio_chunk)
print(f" -> {len(audio_chunks)} audio frames")
if audio_chunks:
all_pcm.append(b''.join(audio_chunks))
if not all_pcm:
raise ValueError("No audio chunks generated")
# Concatenate raw PCM from all chunks, wrap in single WAV
audio_bytes_raw = b''.join(all_pcm)
buffer = io.BytesIO()
with wave.open(buffer, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2) # 16-bit
wf.setframerate(SAMPLE_RATE)
wf.writeframes(audio_bytes_raw)
print(f"Generated WAV: {len(buffer.getvalue())} bytes")
return buffer.getvalue()
def save_audio_to_file(job_id: str, audio_bytes: bytes) -> str:
"""Save audio bytes to WAV file."""
output_path = OUTPUT_DIR / f"{job_id}.wav"
with open(output_path, 'wb') as f:
f.write(audio_bytes)
return str(output_path)
async def generate_speech_background(job_id: str, text: str, voice: str):
"""Background task for speech generation (async)."""
try:
jobs[job_id].status = JobStatus.PROCESSING
jobs[job_id].progress = 25
save_jobs_to_disk()
# Check cache first
cache_key = hash_text_voice(text, voice)
cached_path = get_from_cache(cache_key)
if cached_path:
jobs[job_id].audio_path = cached_path
jobs[job_id].status = JobStatus.SUCCESS
jobs[job_id].progress = 100
jobs[job_id].cached = True
jobs[job_id].completed_at = datetime.now().isoformat()
save_jobs_to_disk()
print(f"Job {job_id} completed from cache")
return
# Generate audio - call sync function directly (blocks but let's test if it works)
jobs[job_id].progress = 50
save_jobs_to_disk()
print(f"Generating audio for job {job_id}...")
audio_bytes = generate_speech_sync(text, voice)
# Save to file
jobs[job_id].progress = 75
save_jobs_to_disk()
output_path = save_audio_to_file(job_id, audio_bytes)
# Save to cache
save_to_cache(cache_key, output_path)
# Complete
jobs[job_id].audio_path = output_path
jobs[job_id].status = JobStatus.SUCCESS
jobs[job_id].progress = 100
jobs[job_id].completed_at = datetime.now().isoformat()
save_jobs_to_disk()
print(f"Job {job_id} completed successfully")
except Exception as e:
print(f"Job {job_id} failed: {e}")
import traceback
traceback.print_exc()
jobs[job_id].status = JobStatus.FAILURE
jobs[job_id].error = str(e)
save_jobs_to_disk()
async def cleanup_old_jobs():
"""Background task to cleanup old jobs and files."""
while True:
try:
await asyncio.sleep(CLEANUP_INTERVAL_HOURS * 3600)
cutoff = datetime.now() - timedelta(days=RETENTION_DAYS)
to_delete = []
for job_id, job in jobs.items():
try:
created = datetime.fromisoformat(job.created_at)
if created < cutoff:
if job.audio_path and Path(job.audio_path).exists():
Path(job.audio_path).unlink()
to_delete.append(job_id)
except:
pass
for job_id in to_delete:
del jobs[job_id]
if to_delete:
save_jobs_to_disk()
print(f"Cleanup: deleted {len(to_delete)} old jobs")
except Exception as e:
print(f"Error in cleanup task: {e}")
@app.on_event("startup")
async def startup():
"""Load model and jobs on startup"""
global model
print("=" * 60)
print("OrpheusTail - Orpheus TTS Service Starting")
print(f"Model: {ORPHEUS_MODEL}")
print(f"Max Model Len: {MAX_MODEL_LEN}")
print(f"Max Tokens: {MAX_TOKENS}")
print(f"Cache: {'Enabled' if CACHE_ENABLED else 'Disabled'}")
print(f"Default Voice: {DEFAULT_VOICE}")
print("=" * 60)
# Import and load Orpheus model
print("Loading Orpheus model (this may take a moment)...")
from orpheus_tts import OrpheusModel
from vllm import AsyncLLMEngine
from vllm.engine.arg_utils import AsyncEngineArgs
# Monkey-patch OrpheusModel to use sync LLM (AsyncLLMEngine hangs on Jetson)
original_setup_engine = OrpheusModel._setup_engine
def patched_setup_engine(self):
model_name = self._map_model_params(self.model_name)
from vllm import LLM
return LLM(
model=model_name,
max_model_len=MAX_MODEL_LEN,
gpu_memory_utilization=0.85,
enforce_eager=False,
)
OrpheusModel._setup_engine = patched_setup_engine
# Sync token generation
def patched_generate_tokens_sync(self, prompt, voice=None, request_id="req-001",
temperature=0.6, top_p=0.8, max_tokens=MAX_TOKENS,
stop_token_ids=[49158], repetition_penalty=1.3):
from vllm import SamplingParams
import re
prompt_string = self._format_prompt(prompt, voice)
print(prompt)
sampling_params = SamplingParams(
temperature=temperature, top_p=top_p, max_tokens=max_tokens,
stop_token_ids=stop_token_ids, repetition_penalty=repetition_penalty,
)
outputs = self.engine.generate([prompt_string], sampling_params)
for output in outputs:
text = output.outputs[0].text
print(f"Raw output (first 500 chars): {text[:500]}")
tokens = re.findall(r'<custom_token_\d+>', text)
print(f"Found {len(tokens)} tokens")
for token in tokens:
yield token
OrpheusModel.generate_tokens_sync = patched_generate_tokens_sync
model = OrpheusModel(model_name=ORPHEUS_MODEL)
print("✓ Orpheus model loaded successfully")
# Load jobs from disk
load_jobs_from_disk()
# Start cleanup task
asyncio.create_task(cleanup_old_jobs())
# === Pydantic Models ===
class TTSRequest(BaseModel):
"""TTS job submission request"""
text: str
voice: str = DEFAULT_VOICE
class TTSStreamRequest(BaseModel):
"""TTS streaming request (for head playback)"""
text: str
voice: str = DEFAULT_VOICE
class JobResponse(BaseModel):
"""Job submission response"""
job_id: str
status: str
class StatusResponse(BaseModel):
"""Job status response"""
job_id: str
status: str
progress: int
cached: bool = False
audio_url: Optional[str] = None
error: Optional[str] = None
class VoicesResponse(BaseModel):
"""Available voices response"""
builtin: List[str]
custom: List[str]
default: str
emotion_tags: List[str]
# === Endpoints ===
@app.get("/")
def root():
"""Root endpoint"""
return {
"service": "OrpheusTail - Orpheus TTS Service",
"version": "1.0.0",
"model": ORPHEUS_MODEL,
"default_voice": DEFAULT_VOICE,
"emotion_tags": EMOTION_TAGS,
"endpoints": {
"/tts/submit": "POST - Submit TTS job",
"/tts/status/{job_id}": "GET - Check job status",
"/tts/audio/{job_id}": "GET - Download audio",
"/tts/stream": "POST - Stream audio (for head)",
"/voice/clone": "POST - Upload voice reference",
"/voices": "GET - List available voices",
"/health": "GET - Health check"
}
}
@app.get("/health")
def health():
"""Health check"""
return {
"status": "healthy",
"model_loaded": model is not None,
"cache_enabled": CACHE_ENABLED,
"voices_available": len(BUILTIN_VOICES) + len(get_custom_voices())
}
@app.get("/voices", response_model=VoicesResponse)
def list_voices():
"""List all available voices"""
return VoicesResponse(
builtin=BUILTIN_VOICES,
custom=get_custom_voices(),
default=DEFAULT_VOICE,
emotion_tags=EMOTION_TAGS
)
@app.post("/tts/submit", response_model=JobResponse)
async def submit_tts_job(request: TTSRequest):
"""Submit a TTS job for processing."""
job_id = str(uuid.uuid4())
job = JobInfo(
job_id=job_id,
text=request.text,
voice=request.voice,
status=JobStatus.PENDING,
progress=0,
created_at=datetime.now().isoformat()
)
jobs[job_id] = job
save_jobs_to_disk()
# Use asyncio.create_task for proper async execution
asyncio.create_task(
generate_speech_background(job_id, request.text, request.voice)
)
print(f"Job {job_id} submitted: '{request.text[:50]}...' with voice '{request.voice}'")
return JobResponse(job_id=job_id, status=JobStatus.PENDING)
@app.get("/tts/status/{job_id}", response_model=StatusResponse)
async def get_job_status(job_id: str):
"""Get status of a TTS job."""
if job_id not in jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = jobs[job_id]
response = StatusResponse(
job_id=job_id,
status=job.status,
progress=job.progress,
cached=job.cached
)
if job.status == JobStatus.SUCCESS:
response.audio_url = f"/tts/audio/{job_id}"
elif job.status == JobStatus.FAILURE:
response.error = job.error
return response
@app.get("/tts/audio/{job_id}")
async def get_audio(job_id: str):
"""Retrieve generated audio file."""
if job_id not in jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = jobs[job_id]
if job.status != JobStatus.SUCCESS:
raise HTTPException(
status_code=400,
detail=f"Audio not ready. Job status: {job.status}"
)
if not job.audio_path or not Path(job.audio_path).exists():
raise HTTPException(status_code=404, detail="Audio file not found")
return FileResponse(
job.audio_path,
media_type="audio/wav",
filename=f"{job_id}.wav"
)
@app.post("/tts/stream")
async def stream_tts(request: TTSStreamRequest):
"""
Stream TTS audio with sentence-level chunking.
Splits text into small chunks (sentences/clauses) and generates each
independently. First chunk's audio starts playing while later chunks
are still generating. Reduces perceived latency significantly.
"""
global model
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
voice = request.voice
if voice not in BUILTIN_VOICES:
voice = DEFAULT_VOICE
def sync_audio_generator():
"""Generate audio per-sentence, yielding as each finishes."""
try:
# Split into fine-grained chunks for faster first-audio
text_chunks = chunk_text_fine(request.text)
print(f"[stream] {len(text_chunks)} chunk(s): {[c[:40] for c in text_chunks]}")
for chunk_idx, chunk in enumerate(text_chunks):
print(f" Generating chunk {chunk_idx + 1}/{len(text_chunks)}: {chunk[:60]}...")
syn_tokens = model.generate_speech(
prompt=chunk,
voice=voice,
max_tokens=MAX_TOKENS,
)
for audio_chunk in syn_tokens:
yield audio_chunk
except Exception as e:
print(f"Stream error: {e}")
raise
return StreamingResponse(
sync_audio_generator(),
media_type="audio/pcm"
)
@app.post("/voice/clone")
async def upload_voice_reference(
name: str,
audio: UploadFile = File(...),
):
"""
Upload a reference audio file for voice cloning.
Args:
name: Name for this custom voice
audio: WAV audio file (5-30 seconds recommended)
"""
if not name.isalnum():
raise HTTPException(status_code=400, detail="Voice name must be alphanumeric")
if name in BUILTIN_VOICES:
raise HTTPException(status_code=400, detail="Cannot overwrite built-in voice")
# Save the reference audio
voice_path = VOICES_DIR / f"{name}.wav"
try:
content = await audio.read()
with open(voice_path, 'wb') as f:
f.write(content)
return {
"status": "success",
"voice_name": name,
"message": f"Voice '{name}' saved. Use voice='{name}' in TTS requests."
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to save voice: {e}")
@app.delete("/voice/{name}")
async def delete_voice(name: str):
"""Delete a custom voice."""
if name in BUILTIN_VOICES:
raise HTTPException(status_code=400, detail="Cannot delete built-in voice")
voice_path = VOICES_DIR / f"{name}.wav"
if not voice_path.exists():
raise HTTPException(status_code=404, detail="Voice not found")
voice_path.unlink()
return {"status": "success", "message": f"Voice '{name}' deleted"}
@app.delete("/tts/job/{job_id}")
async def delete_job(job_id: str):
"""Delete a job and its audio file."""
if job_id not in jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = jobs[job_id]
if job.audio_path and Path(job.audio_path).exists():
try:
Path(job.audio_path).unlink()
except:
pass
del jobs[job_id]
save_jobs_to_disk()
return {"message": f"Job {job_id} deleted"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8766, # Same port as VoiceTail for drop-in replacement
reload=False
)