Files
orpheus-tts/main.py

616 lines
17 KiB
Python

#!/usr/bin/env python3
"""
OrpheusTail - Orpheus TTS Service
FastAPI server for Orpheus text-to-speech generation on Jetson AGX Orin.
Replaces VoiceTail (Bark) with better control, voice cloning, and emotion tags.
Key Features:
- Emotion tags: <laugh>, <chuckle>, <sigh>, <cough>, <sniffle>, <groan>, <yawn>, <gasp>
- Zero-shot voice cloning from reference audio
- Streaming support for real-time head playback
- Built-in voices: tara, leah, jess, leo, dan, mia, zac, zoe
Endpoints:
- POST /tts/submit - Submit TTS job (returns job_id)
- GET /tts/status/{job_id} - Check job status
- GET /tts/audio/{job_id} - Download generated audio
- POST /tts/stream - Stream audio in real-time (for head)
- POST /voice/clone - Upload reference audio for voice cloning
- GET /voices - List available voices
- GET /health - Health check
"""
import os
import json
import hashlib
import asyncio
import uuid
import wave
import io
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
from fastapi import FastAPI, BackgroundTasks, HTTPException, UploadFile, File
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel
# Configuration from environment. Every setting has a default, so the service
# starts without any variables being set.
ORPHEUS_MODEL = os.getenv("ORPHEUS_MODEL", "medium-3b")
# Only the literal string "true" (any letter case) enables the cache.
CACHE_ENABLED = os.getenv("CACHE_ENABLED", "true").lower() == "true"
CACHE_DIR = Path(os.getenv("CACHE_DIR", "cache"))
OUTPUT_DIR = Path(os.getenv("OUTPUT_DIR", "output"))
VOICES_DIR = Path(os.getenv("VOICES_DIR", "voices"))  # For cloned voice references
RETENTION_DAYS = int(os.getenv("RETENTION_DAYS", "10"))
CLEANUP_INTERVAL_HOURS = int(os.getenv("CLEANUP_INTERVAL_HOURS", "1"))
DEFAULT_VOICE = os.getenv("DEFAULT_VOICE", "tara")  # Orpheus default voice
MAX_MODEL_LEN = int(os.getenv("MAX_MODEL_LEN", "2048"))
SAMPLE_RATE = 24000  # Frame rate (Hz) written into generated WAV headers

# Ensure directories exist. parents=True lets the configured locations be
# nested paths (e.g. /data/orpheus/cache) whose parents are not there yet;
# without it the import fails with FileNotFoundError.
CACHE_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
VOICES_DIR.mkdir(parents=True, exist_ok=True)

# Jobs persistence
JOBS_FILE = OUTPUT_DIR / "jobs.json"

# Built-in Orpheus voices (in order of conversational realism per docs)
BUILTIN_VOICES = ["tara", "leah", "jess", "leo", "dan", "mia", "zac", "zoe"]

# Supported emotion tags
EMOTION_TAGS = ["<laugh>", "<chuckle>", "<sigh>", "<cough>", "<sniffle>", "<groan>", "<yawn>", "<gasp>"]
# FastAPI application object; the route decorators in this module register
# their handlers on it.
app = FastAPI(
    title="OrpheusTail - Orpheus TTS Service",
    description="Text-to-speech with emotion control and voice cloning for Vixy",
    version="1.0.0",
)

# Shared Orpheus model instance. It is None until the startup hook assigns
# the loaded model.
model = None
class JobStatus(str, Enum):
    """Lifecycle states of a TTS job.

    Members are also ``str`` instances, so they compare equal to their plain
    string values (``JobStatus.SUCCESS == "SUCCESS"``).
    """

    PENDING = "PENDING"        # accepted, not yet picked up by the worker
    PROCESSING = "PROCESSING"  # worker has started on the job
    SUCCESS = "SUCCESS"        # audio is available
    FAILURE = "FAILURE"        # generation raised; see JobInfo.error
@dataclass
class JobInfo:
    """Record describing one submitted TTS job.

    Instances are kept in the module-level ``jobs`` dict and written to
    ``JOBS_FILE`` via ``dataclasses.asdict``.
    """

    # Identity and request payload
    job_id: str
    text: str
    voice: str

    # Lifecycle
    status: JobStatus
    progress: int = 0  # coarse percentage; the worker sets 25/50/75/100

    # Outcome
    audio_path: Optional[str] = None  # path of the WAV file once available
    error: Optional[str] = None       # failure message when status is FAILURE
    cached: bool = False              # True when the audio came from the cache

    # Timestamps stored as ISO-8601 strings
    created_at: str = ""
    completed_at: Optional[str] = None
# In-memory job table, keyed by job_id. Handlers and the background worker
# read and mutate it directly; save_jobs_to_disk() writes it out as JSON.
jobs: Dict[str, JobInfo] = {}
def load_jobs_from_disk():
    """Restore the in-memory job table from ``JOBS_FILE`` on startup.

    Loading is best-effort:

    * a missing, unreadable or non-JSON file leaves ``jobs`` untouched;
    * a malformed record is skipped on its own instead of discarding every
      record after it;
    * ``status`` is converted back to a ``JobStatus`` member (JSON stores it
      as a plain string);
    * jobs that were still PENDING/PROCESSING when the file was written are
      marked FAILURE, because the background task that would have finished
      them does not survive a restart and clients would poll forever.
    """
    if not JOBS_FILE.exists():
        return

    try:
        with open(JOBS_FILE, 'r') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON decode errors
        print(f"Error loading jobs: {e}")
        return

    if not isinstance(data, dict):
        print("Error loading jobs: expected a JSON object keyed by job id")
        return

    for job_id, job_dict in data.items():
        try:
            job = JobInfo(**job_dict)
            job.status = JobStatus(job.status)
        except (TypeError, ValueError) as e:
            # Wrong/missing fields or an unknown status value.
            print(f"Skipping malformed job {job_id}: {e}")
            continue

        if job.status in (JobStatus.PENDING, JobStatus.PROCESSING):
            job.status = JobStatus.FAILURE
            job.error = "Service restarted before the job completed"

        jobs[job_id] = job

    print(f"Loaded {len(jobs)} jobs from disk")
def save_jobs_to_disk():
    """Persist the in-memory job table to ``JOBS_FILE`` as JSON.

    Errors are logged rather than raised, so a failed save never breaks the
    request or background job that triggered it.
    """
    # Unique temp name: request handlers and background jobs may save at the
    # same time and must not write into each other's temp file.
    tmp_path = JOBS_FILE.with_name(f"{JOBS_FILE.name}.{uuid.uuid4().hex}.tmp")
    try:
        # Snapshot the items first so an insert/delete from another thread
        # cannot raise "dictionary changed size during iteration".
        data = {job_id: asdict(job) for job_id, job in list(jobs.items())}
        with open(tmp_path, 'w') as f:
            json.dump(data, f, indent=2)
        # Write-then-rename: readers see either the old or the new file,
        # never a half-written one.
        os.replace(tmp_path, JOBS_FILE)
    except Exception as e:
        print(f"Error saving jobs: {e}")
        # Best-effort removal of the leftover temp file.
        try:
            if tmp_path.exists():
                tmp_path.unlink()
        except OSError as cleanup_error:
            print(f"Error removing temporary jobs file: {cleanup_error}")
def hash_text_voice(text: str, voice: str) -> str:
    """Return the cache key for a (text, voice) pair.

    The key is the hexadecimal SHA-256 digest of ``"<text>|<voice>"``.
    """
    digest = hashlib.sha256()
    digest.update("|".join((text, voice)).encode())
    return digest.hexdigest()
def get_from_cache(cache_key: str) -> Optional[str]:
    """Look up cached audio for ``cache_key``.

    Returns:
        Path of the cached WAV file as a string, or ``None`` when caching is
        disabled or no file exists for the key.
    """
    if CACHE_ENABLED:
        candidate = CACHE_DIR / f"{cache_key}.wav"
        if candidate.exists():
            print(f"Cache hit: {cache_key}")
            return str(candidate)
    return None
def save_to_cache(cache_key: str, audio_path: str):
    """Copy a generated audio file into the cache under ``cache_key``.

    Does nothing when caching is disabled. Copy failures are logged and
    otherwise ignored.
    """
    if CACHE_ENABLED:
        try:
            import shutil

            destination = CACHE_DIR / f"{cache_key}.wav"
            shutil.copy(audio_path, destination)
            print(f"Saved to cache: {cache_key}")
        except Exception as e:
            print(f"Error saving to cache: {e}")
def get_custom_voices() -> List[str]:
    """Return the names of custom voices.

    A custom voice is any ``*.wav`` file directly inside ``VOICES_DIR``; its
    name is the file name without the extension.
    """
    return [reference.stem for reference in VOICES_DIR.glob("*.wav")]
def generate_speech(text: str, voice: str) -> bytes:
    """
    Generate speech using Orpheus model.

    Args:
        text: Text to convert (may include emotion tags; it is passed to the
            model unchanged)
        voice: Voice name (built-in or custom). Custom and unknown voices
            currently fall back to ``DEFAULT_VOICE``.

    Returns:
        WAV audio bytes (mono, 16-bit sample width, ``SAMPLE_RATE`` Hz header)

    Raises:
        RuntimeError: if the model has not been loaded, or if the model
            produced no audio frames for the request.
    """
    if model is None:
        raise RuntimeError("Orpheus model is not loaded")

    # Check if it's a custom voice (needs reference audio)
    custom_voice_path = VOICES_DIR / f"{voice}.wav"
    if custom_voice_path.exists():
        # TODO: Implement voice cloning with reference audio
        # For now, fall back to built-in voice
        print(f"Custom voice '{voice}' - voice cloning to be implemented")
        voice = DEFAULT_VOICE
    elif voice not in BUILTIN_VOICES:
        print(f"Unknown voice '{voice}', using default '{DEFAULT_VOICE}'")
        voice = DEFAULT_VOICE

    # Note: text is passed as-is, emotion tags like <laugh> are handled by Orpheus
    syn_tokens = model.generate_speech(
        prompt=text,
        voice=voice,
    )

    # Write each chunk straight into the WAV container. writeframes() accepts
    # any bytes-like object, so this works whether the model yields raw bytes
    # or buffer-compatible arrays, and needs no intermediate concatenation.
    # NOTE(review): assumes chunks are raw 16-bit mono PCM - confirm against
    # the installed orpheus_tts version.
    buffer = io.BytesIO()
    with wave.open(buffer, 'wb') as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16-bit
        wf.setframerate(SAMPLE_RATE)
        for audio_chunk in syn_tokens:
            wf.writeframes(audio_chunk)
        total_frames = wf.getnframes()

    if total_frames == 0:
        # Previously surfaced as an opaque IndexError on an empty chunk list.
        raise RuntimeError("Orpheus produced no audio for the given text")

    return buffer.getvalue()
def save_audio_to_file(job_id: str, audio_bytes: bytes) -> str:
    """Write ``audio_bytes`` to ``OUTPUT_DIR/<job_id>.wav`` and return that path as a string."""
    destination = OUTPUT_DIR / f"{job_id}.wav"
    destination.write_bytes(audio_bytes)
    return str(destination)
def generate_speech_background(job_id: str, text: str, voice: str):
    """Background task for speech generation.

    Moves the job through PROCESSING to SUCCESS (or FAILURE), persisting the
    job table after each progress step. A cache hit completes the job without
    calling the model.

    The job record is looked up once and updated through that reference, so
    deleting the job while generation is running can no longer raise
    ``KeyError`` here (including inside the error handler).

    Args:
        job_id: Key of the job in ``jobs``.
        text: Text to synthesize.
        voice: Requested voice name.
    """
    job = jobs.get(job_id)
    if job is None:
        print(f"Job {job_id} no longer exists; skipping generation")
        return

    try:
        job.status = JobStatus.PROCESSING
        job.progress = 25
        save_jobs_to_disk()

        # Check cache first
        cache_key = hash_text_voice(text, voice)
        cached_path = get_from_cache(cache_key)
        if cached_path:
            job.audio_path = cached_path
            job.status = JobStatus.SUCCESS
            job.progress = 100
            job.cached = True
            job.completed_at = datetime.now().isoformat()
            save_jobs_to_disk()
            print(f"Job {job_id} completed from cache")
            return

        # Generate audio
        job.progress = 50
        save_jobs_to_disk()
        print(f"Generating audio for job {job_id}...")
        audio_bytes = generate_speech(text, voice)

        if job_id not in jobs:
            # Deleted while the model was running: do not leave an orphaned
            # output file behind.
            print(f"Job {job_id} was deleted during generation; discarding audio")
            return

        # Save to file
        job.progress = 75
        save_jobs_to_disk()
        output_path = save_audio_to_file(job_id, audio_bytes)

        # Save to cache
        save_to_cache(cache_key, output_path)

        # Complete
        job.audio_path = output_path
        job.status = JobStatus.SUCCESS
        job.progress = 100
        job.completed_at = datetime.now().isoformat()
        save_jobs_to_disk()
        print(f"Job {job_id} completed successfully")
    except Exception as e:
        print(f"Job {job_id} failed: {e}")
        import traceback
        traceback.print_exc()
        job.status = JobStatus.FAILURE
        job.error = str(e)
        save_jobs_to_disk()
async def cleanup_old_jobs():
    """Background task to cleanup old jobs and files.

    Runs forever: sleeps ``CLEANUP_INTERVAL_HOURS``, then removes every job
    whose ``created_at`` is older than ``RETENTION_DAYS`` together with its
    output file. Jobs with an unparseable ``created_at`` are kept, as are jobs
    whose file could not be removed (they are retried on the next pass).

    Audio of cache-hit jobs (``job.cached``) is NOT deleted: their
    ``audio_path`` points at the shared cache entry, which other jobs may
    reference too.
    """
    while True:
        try:
            await asyncio.sleep(CLEANUP_INTERVAL_HOURS * 3600)
            cutoff = datetime.now() - timedelta(days=RETENTION_DAYS)
            to_delete = []

            # Iterate over a snapshot; worker threads may touch the dict.
            for job_id, job in list(jobs.items()):
                try:
                    created = datetime.fromisoformat(job.created_at)
                except (TypeError, ValueError):
                    print(f"Cleanup: job {job_id} has invalid created_at, keeping it")
                    continue
                if created >= cutoff:
                    continue

                if job.audio_path and not job.cached:
                    try:
                        Path(job.audio_path).unlink()
                    except FileNotFoundError:
                        pass  # already gone, nothing left to remove
                    except OSError as e:
                        print(f"Cleanup: could not delete audio for job {job_id}: {e}")
                        continue
                to_delete.append(job_id)

            for job_id in to_delete:
                jobs.pop(job_id, None)

            if to_delete:
                save_jobs_to_disk()
                print(f"Cleanup: deleted {len(to_delete)} old jobs")
        except Exception as e:
            print(f"Error in cleanup task: {e}")
@app.on_event("startup")
async def startup():
    """Load model and jobs on startup, then start the periodic cleanup task."""
    global model
    print("=" * 60)
    print("OrpheusTail - Orpheus TTS Service Starting")
    print(f"Model: {ORPHEUS_MODEL}")
    print(f"Max Model Len: {MAX_MODEL_LEN}")
    print(f"Cache: {'Enabled' if CACHE_ENABLED else 'Disabled'}")
    print(f"Default Voice: {DEFAULT_VOICE}")
    print("=" * 60)

    # Import and load Orpheus model. The import is deferred to startup so the
    # module itself can be imported without orpheus_tts being loaded.
    print("Loading Orpheus model (this may take a moment)...")
    from orpheus_tts import OrpheusModel
    # Note: PyPI orpheus-speech 0.1.0 uses simpler API
    # model_name can be "medium-3b" or full HF path
    model = OrpheusModel(model_name=ORPHEUS_MODEL)
    print("✓ Orpheus model loaded successfully")

    # Load jobs from disk
    load_jobs_from_disk()

    # Start cleanup task. Keep a strong reference on app.state: the event
    # loop only holds weak references to tasks, so an unreferenced task can
    # be garbage-collected before it finishes.
    app.state.cleanup_task = asyncio.create_task(cleanup_old_jobs())
# === Pydantic Models ===
class TTSRequest(BaseModel):
    """Request body for POST /tts/submit."""

    text: str                   # text to synthesize
    voice: str = DEFAULT_VOICE  # voice name; defaults to DEFAULT_VOICE when omitted
class TTSStreamRequest(BaseModel):
    """Request body for POST /tts/stream (streaming playback on the head)."""

    text: str                   # text to synthesize
    voice: str = DEFAULT_VOICE  # voice name; defaults to DEFAULT_VOICE when omitted
class JobResponse(BaseModel):
    """Response returned when a job has been submitted."""

    job_id: str  # identifier to use with the status and audio endpoints
    status: str  # job status at submission time
class StatusResponse(BaseModel):
    """Response body for GET /tts/status/{job_id}."""

    job_id: str
    status: str
    progress: int
    cached: bool = False             # True when the audio came from the cache
    audio_url: Optional[str] = None  # filled in once the job has succeeded
    error: Optional[str] = None      # filled in when the job has failed
class VoicesResponse(BaseModel):
    """Response body for GET /voices."""

    builtin: List[str]       # built-in Orpheus voice names
    custom: List[str]        # names of uploaded reference voices
    default: str             # voice used when a request names none
    emotion_tags: List[str]  # supported inline emotion tags
# === Endpoints ===
@app.get("/")
def root():
    """Describe the service: name, version, model, default voice and routes."""
    endpoint_summary = {
        "/tts/submit": "POST - Submit TTS job",
        "/tts/status/{job_id}": "GET - Check job status",
        "/tts/audio/{job_id}": "GET - Download audio",
        "/tts/stream": "POST - Stream audio (for head)",
        "/voice/clone": "POST - Upload voice reference",
        "/voices": "GET - List available voices",
        "/health": "GET - Health check",
    }
    return {
        "service": "OrpheusTail - Orpheus TTS Service",
        "version": "1.0.0",
        "model": ORPHEUS_MODEL,
        "default_voice": DEFAULT_VOICE,
        "emotion_tags": EMOTION_TAGS,
        "endpoints": endpoint_summary,
    }
@app.get("/health")
def health():
    """Report liveness, whether the model is loaded, cache state and voice count."""
    voice_count = len(BUILTIN_VOICES) + len(get_custom_voices())
    model_ready = model is not None
    return {
        "status": "healthy",
        "model_loaded": model_ready,
        "cache_enabled": CACHE_ENABLED,
        "voices_available": voice_count,
    }
@app.get("/voices", response_model=VoicesResponse)
def list_voices():
    """Return built-in voices, custom voices, the default voice and emotion tags."""
    custom_voices = get_custom_voices()
    return VoicesResponse(
        builtin=BUILTIN_VOICES,
        custom=custom_voices,
        default=DEFAULT_VOICE,
        emotion_tags=EMOTION_TAGS,
    )
@app.post("/tts/submit", response_model=JobResponse)
async def submit_tts_job(request: TTSRequest, background_tasks: BackgroundTasks):
    """Register a new TTS job and schedule its generation.

    The job is stored as PENDING and persisted before the background task is
    queued; the response carries the new ``job_id`` for polling.
    """
    job_id = str(uuid.uuid4())
    text, voice = request.text, request.voice

    jobs[job_id] = JobInfo(
        job_id=job_id,
        text=text,
        voice=voice,
        status=JobStatus.PENDING,
        progress=0,
        created_at=datetime.now().isoformat(),
    )
    save_jobs_to_disk()

    background_tasks.add_task(generate_speech_background, job_id, text, voice)

    print(f"Job {job_id} submitted: '{text[:50]}...' with voice '{voice}'")
    return JobResponse(job_id=job_id, status=JobStatus.PENDING)
@app.get("/tts/status/{job_id}", response_model=StatusResponse)
async def get_job_status(job_id: str):
    """Report status and progress of a TTS job.

    ``audio_url`` is set only for successful jobs and ``error`` only for
    failed ones. Unknown job ids produce a 404.
    """
    job = jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    succeeded = job.status == JobStatus.SUCCESS
    failed = job.status == JobStatus.FAILURE
    return StatusResponse(
        job_id=job_id,
        status=job.status,
        progress=job.progress,
        cached=job.cached,
        audio_url=f"/tts/audio/{job_id}" if succeeded else None,
        error=job.error if failed else None,
    )
@app.get("/tts/audio/{job_id}")
async def get_audio(job_id: str):
    """Serve the WAV file produced by a finished job.

    Raises:
        HTTPException: 404 if the job is unknown or its file is missing,
            400 if the job has not reached SUCCESS.
    """
    job = jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    if job.status != JobStatus.SUCCESS:
        raise HTTPException(
            status_code=400,
            detail=f"Audio not ready. Job status: {job.status}",
        )

    audio_file = Path(job.audio_path) if job.audio_path else None
    if audio_file is None or not audio_file.exists():
        raise HTTPException(status_code=404, detail="Audio file not found")

    return FileResponse(
        job.audio_path,
        media_type="audio/wav",
        filename=f"{job_id}.wav",
    )
@app.post("/tts/stream")
async def stream_tts(request: TTSStreamRequest):
    """
    Stream TTS audio in real-time.

    For head-vixy to stream directly without waiting for full generation.
    Returns audio chunks as they're generated. Voices that are not built-in
    fall back to ``DEFAULT_VOICE``.

    Raises:
        HTTPException: 503 if the model has not been loaded yet.
    """
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    voice = request.voice if request.voice in BUILTIN_VOICES else DEFAULT_VOICE
    text = request.text

    # Deliberately a plain (sync) generator: model.generate_speech() is a
    # blocking iterator, and StreamingResponse iterates sync iterables in a
    # worker thread. Wrapping it in an async generator would run the blocking
    # loop on the event loop and stall every other request meanwhile.
    def audio_generator():
        """Yield audio chunks exactly as the model produces them."""
        try:
            yield from model.generate_speech(
                prompt=text,
                voice=voice,
            )
        except Exception as e:
            print(f"Stream error: {e}")
            raise

    # NOTE(review): chunks are forwarded unchanged and no WAV header is added
    # here, although the media type is audio/wav - confirm what the client
    # expects before changing either side.
    return StreamingResponse(
        audio_generator(),
        media_type="audio/wav"
    )
@app.post("/voice/clone")
async def upload_voice_reference(
    name: str,
    audio: UploadFile = File(...),
):
    """
    Upload a reference audio file for voice cloning.

    Args:
        name: Name for this custom voice (alphanumeric, not a built-in name)
        audio: WAV audio file (5-30 seconds recommended)

    Raises:
        HTTPException: 400 for an invalid name, an empty upload or content
            that is not a RIFF/WAVE file; 500 if reading or saving fails.
    """
    if not name.isalnum():
        raise HTTPException(status_code=400, detail="Voice name must be alphanumeric")
    if name in BUILTIN_VOICES:
        raise HTTPException(status_code=400, detail="Cannot overwrite built-in voice")

    try:
        content = await audio.read()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to save voice: {e}")

    # Validate before touching the disk so junk never becomes a listed voice.
    # Only the container magic is checked (RIFF, or RF64 for large files),
    # so every WAV encoding is still accepted.
    if not content:
        raise HTTPException(status_code=400, detail="Uploaded audio file is empty")
    if content[:4] not in (b"RIFF", b"RF64") or content[8:12] != b"WAVE":
        raise HTTPException(status_code=400, detail="Uploaded file is not a WAV file")

    # Save the reference audio
    voice_path = VOICES_DIR / f"{name}.wav"
    try:
        with open(voice_path, 'wb') as f:
            f.write(content)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to save voice: {e}")

    return {
        "status": "success",
        "voice_name": name,
        "message": f"Voice '{name}' saved. Use voice='{name}' in TTS requests."
    }
@app.delete("/voice/{name}")
async def delete_voice(name: str):
    """Delete a custom voice.

    Raises:
        HTTPException: 400 for a built-in voice or a name that is not a plain
            file name; 404 if no such custom voice exists.
    """
    if name in BUILTIN_VOICES:
        raise HTTPException(status_code=400, detail="Cannot delete built-in voice")

    # The name becomes part of a filesystem path, so refuse anything that
    # could address a file outside VOICES_DIR (path separators) or that the
    # OS rejects outright (NUL byte).
    if any(forbidden in name for forbidden in ("/", "\\", "\x00")):
        raise HTTPException(status_code=400, detail="Invalid voice name")

    voice_path = VOICES_DIR / f"{name}.wav"
    try:
        # Unlink directly instead of exists()+unlink(): a concurrent delete
        # then yields a clean 404 rather than an unhandled error.
        voice_path.unlink()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Voice not found")

    return {"status": "success", "message": f"Voice '{name}' deleted"}
@app.delete("/tts/job/{job_id}")
async def delete_job(job_id: str):
    """Delete a job and its audio file.

    The job record is always removed. Its audio file is removed on a
    best-effort basis, except for cache-hit jobs (``job.cached``): their
    ``audio_path`` is the shared cache entry, which other jobs may still be
    serving.

    Raises:
        HTTPException: 404 if the job does not exist.
    """
    job = jobs.pop(job_id, None)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    if job.audio_path and not job.cached:
        try:
            Path(job.audio_path).unlink()
        except FileNotFoundError:
            pass  # already gone, nothing to clean up
        except OSError as e:
            # Still delete the job; just make the leftover file visible.
            print(f"Could not delete audio for job {job_id}: {e}")

    save_jobs_to_disk()
    return {"message": f"Job {job_id} deleted"}
if __name__ == "__main__":
    # uvicorn is only needed when the module is run as a script.
    import uvicorn

    # Same port as VoiceTail for drop-in replacement
    listen_port = 8766
    uvicorn.run("main:app", host="0.0.0.0", port=listen_port, reload=False)