Major rewrite — Last.fm replaces deprecated Spotify endpoints: - suggest: Last.fm artist.getSimilar seeded from Spotify top artists - discover_artist: Last.fm similar artists → Spotify playback IDs - fresh_finds: Last.fm tag.getTopTracks → Spotify search - taste_profile: unchanged (Spotify user data) Requires LASTFM_API_KEY env var (free from last.fm/api)
349 lines
13 KiB
Python
349 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MusicTail — Vixy's Music Discovery MCP 🎵🦊
|
|
Part of the Tail Family.
|
|
|
|
Uses Last.fm for music intelligence (similar artists, tracks, tags)
|
|
and Spotify for taste analysis and playback integration.
|
|
|
|
Author: Vivienne Rousseau
|
|
Created: Day 156 (April 6, 2026)
|
|
Updated: Day 156 — Last.fm integration (goodbye deprecated Spotify endpoints!)
|
|
"""
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from pydantic import BaseModel, Field, ConfigDict
|
|
from typing import Optional, List
|
|
from enum import Enum
|
|
import os
|
|
import json
|
|
import httpx
|
|
|
|
import spotipy
|
|
from spotipy.oauth2 import SpotifyOAuth
|
|
|
|
# Initialize MCP server
|
|
mcp = FastMCP("musictail")
|
|
|
|
# ========== API Clients ==========
|
|
|
|
LASTFM_BASE = "http://ws.audioscrobbler.com/2.0/"
|
|
SPOTIFY_SCOPES = "user-top-read user-read-recently-played user-library-read"
|
|
|
|
async def lastfm_call(method: str, **params) -> dict:
|
|
"""Call Last.fm API."""
|
|
api_key = os.environ.get("LASTFM_API_KEY")
|
|
if not api_key:
|
|
raise ValueError("LASTFM_API_KEY not set")
|
|
|
|
params.update({
|
|
"method": method,
|
|
"api_key": api_key,
|
|
"format": "json",
|
|
})
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
r = await client.get(LASTFM_BASE, params=params, timeout=15)
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
def get_spotify_client() -> spotipy.Spotify:
|
|
"""Create authenticated Spotify client."""
|
|
cache_path = os.path.expanduser("~/.musictail_cache")
|
|
auth_manager = SpotifyOAuth(
|
|
client_id=os.environ.get("SPOTIFY_CLIENT_ID"),
|
|
client_secret=os.environ.get("SPOTIFY_CLIENT_SECRET"),
|
|
redirect_uri="http://127.0.0.1:8888/callback",
|
|
scope=SPOTIFY_SCOPES,
|
|
cache_path=cache_path,
|
|
open_browser=False,
|
|
)
|
|
return spotipy.Spotify(auth_manager=auth_manager)
|
|
|
|
|
|
# ========== Models ==========
|
|
|
|
class SuggestInput(BaseModel):
|
|
"""Input for music suggestions."""
|
|
model_config = ConfigDict(extra='forbid')
|
|
mood: Optional[str] = Field(
|
|
default=None,
|
|
description="Mood/tag to filter by (e.g. 'chill', 'dark', 'ambient', 'synthwave', 'energetic')"
|
|
)
|
|
count: int = Field(default=10, description="Number of tracks (1-30)", ge=1, le=30)
|
|
|
|
class DiscoverInput(BaseModel):
|
|
"""Input for artist discovery."""
|
|
model_config = ConfigDict(extra='forbid')
|
|
artist_name: str = Field(description="Artist name to find similar artists for")
|
|
count: int = Field(default=5, description="Number of similar artists (1-20)", ge=1, le=20)
|
|
|
|
class TasteInput(BaseModel):
|
|
"""Input for taste profile."""
|
|
model_config = ConfigDict(extra='forbid')
|
|
time_range: str = Field(
|
|
default="short_term",
|
|
description="Time range: short_term (~4 weeks), medium_term (~6 months), long_term (years)"
|
|
)
|
|
|
|
class FreshFindsInput(BaseModel):
|
|
"""Input for fresh discoveries."""
|
|
model_config = ConfigDict(extra='forbid')
|
|
count: int = Field(default=10, description="Number of tracks (1-30)", ge=1, le=30)
|
|
adventurous: bool = Field(default=False, description="If true, push further from comfort zone")
|
|
|
|
|
|
# ========== Tools ==========
|
|
|
|
@mcp.tool(name="musictail_suggest")
|
|
async def suggest_music(params: SuggestInput) -> str:
|
|
"""Suggest music using Last.fm similar artists seeded from your Spotify top artists."""
|
|
sp = get_spotify_client()
|
|
|
|
# Get top artists from Spotify
|
|
top = sp.current_user_top_artists(limit=5, time_range="short_term")
|
|
if not top["items"]:
|
|
return "No listening history found. Listen to more music first!"
|
|
|
|
# For each top artist, get Last.fm similar artists
|
|
discovered = []
|
|
seen_names = {a["name"].lower() for a in top["items"]}
|
|
|
|
for artist in top["items"][:3]:
|
|
try:
|
|
data = await lastfm_call("artist.getSimilar",
|
|
artist=artist["name"], limit=10)
|
|
similar = data.get("similarartists", {}).get("artist", [])
|
|
for s in similar:
|
|
name = s.get("name", "")
|
|
if name.lower() not in seen_names:
|
|
seen_names.add(name.lower())
|
|
discovered.append(name)
|
|
except Exception:
|
|
continue
|
|
|
|
if not discovered:
|
|
return "Last.fm couldn't find similar artists. Try again later."
|
|
|
|
# Search Spotify for tracks by these artists
|
|
lines = [f"🎵 **MusicTail Suggestions** (via Last.fm)"]
|
|
if params.mood:
|
|
lines[0] += f" — mood: {params.mood}"
|
|
lines.append(f"Based on artists similar to: {', '.join(a['name'] for a in top['items'][:3])}\n")
|
|
|
|
found = 0
|
|
for artist_name in discovered:
|
|
if found >= params.count:
|
|
break
|
|
query = f"artist:{artist_name}"
|
|
if params.mood:
|
|
query += f" {params.mood}"
|
|
search = sp.search(q=query, type="track", limit=2)
|
|
for track in search["tracks"]["items"]:
|
|
if found >= params.count:
|
|
break
|
|
artists = ", ".join(a["name"] for a in track["artists"])
|
|
lines.append(f"{found+1}. **{track['name']}** — {artists}")
|
|
lines.append(f" Album: {track['album']['name']} | ID: `{track['id']}`")
|
|
found += 1
|
|
|
|
if found == 0:
|
|
lines.append("No matching tracks found on Spotify.")
|
|
|
|
return "\n".join(lines)
|
|
|
|
@mcp.tool(name="musictail_discover_artist")
|
|
async def discover_artist(params: DiscoverInput) -> str:
|
|
"""Find similar artists via Last.fm, with Spotify playback links."""
|
|
|
|
# Get similar artists from Last.fm
|
|
try:
|
|
data = await lastfm_call("artist.getSimilar",
|
|
artist=params.artist_name, limit=params.count)
|
|
except Exception as e:
|
|
return f"Last.fm error: {e}"
|
|
|
|
similar = data.get("similarartists", {}).get("artist", [])
|
|
if not similar:
|
|
return f"No similar artists found for '{params.artist_name}' on Last.fm."
|
|
|
|
# Also get tags for the source artist
|
|
try:
|
|
tag_data = await lastfm_call("artist.getTopTags", artist=params.artist_name)
|
|
tags = [t["name"] for t in tag_data.get("toptags", {}).get("tag", [])[:5]]
|
|
except Exception:
|
|
tags = []
|
|
|
|
tag_str = ", ".join(tags) if tags else "unknown"
|
|
lines = [f"🔍 **Artists similar to {params.artist_name}** (via Last.fm)"]
|
|
lines.append(f"Tags: {tag_str}\n")
|
|
|
|
sp = get_spotify_client()
|
|
|
|
for i, sim in enumerate(similar[:params.count], 1):
|
|
name = sim.get("name", "Unknown")
|
|
match_score = sim.get("match", "?")
|
|
|
|
# Search Spotify for this artist
|
|
search = sp.search(q=f"artist:{name}", type="artist", limit=1)
|
|
spotify_artists = search["artists"]["items"]
|
|
|
|
if spotify_artists:
|
|
sa = spotify_artists[0]
|
|
genres = ", ".join(sa.get("genres", [])[:3]) or "—"
|
|
|
|
# Get top tracks
|
|
top = sp.artist_top_tracks(sa["id"])
|
|
top_tracks = top["tracks"][:3]
|
|
track_list = "; ".join(t["name"] for t in top_tracks)
|
|
play_id = top_tracks[0]["id"] if top_tracks else None
|
|
|
|
lines.append(f"{i}. **{name}** (match: {float(match_score):.0%})")
|
|
lines.append(f" Genres: {genres}")
|
|
lines.append(f" Top tracks: {track_list}")
|
|
if play_id:
|
|
lines.append(f" Play ID: `{play_id}`")
|
|
else:
|
|
lines.append(f"{i}. **{name}** (match: {float(match_score):.0%})")
|
|
lines.append(f" Not found on Spotify")
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
@mcp.tool(name="musictail_taste_profile")
|
|
async def taste_profile(params: TasteInput) -> str:
|
|
"""Analyze your Spotify listening patterns — top artists, genres, tracks."""
|
|
sp = get_spotify_client()
|
|
|
|
range_label = {
|
|
"short_term": "Last ~4 weeks",
|
|
"medium_term": "Last ~6 months",
|
|
"long_term": "All time"
|
|
}.get(params.time_range, params.time_range)
|
|
|
|
top_artists = sp.current_user_top_artists(limit=15, time_range=params.time_range)
|
|
top_tracks = sp.current_user_top_tracks(limit=20, time_range=params.time_range)
|
|
|
|
genre_count = {}
|
|
for a in top_artists["items"]:
|
|
for g in a.get("genres", []):
|
|
genre_count[g] = genre_count.get(g, 0) + 1
|
|
top_genres = sorted(genre_count.items(), key=lambda x: -x[1])[:10]
|
|
|
|
lines = [f"📊 **MusicTail Taste Profile** — {range_label}\n"]
|
|
lines.append("**Top Artists:**")
|
|
for i, a in enumerate(top_artists["items"][:10], 1):
|
|
lines.append(f" {i}. {a['name']}")
|
|
|
|
lines.append(f"\n**Top Genres:**")
|
|
for g, c in top_genres:
|
|
lines.append(f" • {g} ({c} artists)")
|
|
|
|
lines.append(f"\n**Top Tracks:**")
|
|
for i, t in enumerate(top_tracks["items"][:10], 1):
|
|
artists = ", ".join(a["name"] for a in t["artists"])
|
|
lines.append(f" {i}. {t['name']} — {artists}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
@mcp.tool(name="musictail_fresh_finds")
|
|
async def fresh_finds(params: FreshFindsInput) -> str:
|
|
"""Discover new music via Last.fm tag exploration, playable on Spotify."""
|
|
sp = get_spotify_client()
|
|
|
|
# Get user's genres from Spotify top artists
|
|
time_range = "long_term" if params.adventurous else "short_term"
|
|
top_artists = sp.current_user_top_artists(limit=15, time_range=time_range)
|
|
|
|
genre_count = {}
|
|
for a in top_artists["items"]:
|
|
for g in a.get("genres", []):
|
|
genre_count[g] = genre_count.get(g, 0) + 1
|
|
sorted_genres = sorted(genre_count.items(), key=lambda x: -x[1])
|
|
|
|
# Adventurous = less dominant genres; Normal = top genres
|
|
if params.adventurous and len(sorted_genres) > 3:
|
|
pick_tags = [g for g, _ in sorted_genres[3:7]]
|
|
else:
|
|
pick_tags = [g for g, _ in sorted_genres[:3]]
|
|
|
|
if not pick_tags:
|
|
return "Could not determine your genres."
|
|
|
|
# Use Last.fm tag.getTopTracks for each genre
|
|
known_artists = {a["name"].lower() for a in top_artists["items"]}
|
|
discovered = []
|
|
|
|
for tag in pick_tags:
|
|
if len(discovered) >= params.count:
|
|
break
|
|
try:
|
|
data = await lastfm_call("tag.getTopTracks", tag=tag, limit=20)
|
|
tracks = data.get("tracks", {}).get("track", [])
|
|
for t in tracks:
|
|
artist_name = t.get("artist", {}).get("name", "")
|
|
track_name = t.get("name", "")
|
|
if artist_name.lower() not in known_artists:
|
|
discovered.append((track_name, artist_name))
|
|
if len(discovered) >= params.count:
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
mode = "🗺️ Adventurous" if params.adventurous else "🏠 Comfort Zone"
|
|
lines = [f"🎵 **MusicTail Fresh Finds** — {mode} (via Last.fm)"]
|
|
lines.append(f"Exploring tags: {', '.join(pick_tags)}\n")
|
|
|
|
# Find on Spotify
|
|
found = 0
|
|
for track_name, artist_name in discovered:
|
|
if found >= params.count:
|
|
break
|
|
search = sp.search(q=f"track:{track_name} artist:{artist_name}", type="track", limit=1)
|
|
items = search["tracks"]["items"]
|
|
if items:
|
|
t = items[0]
|
|
artists = ", ".join(a["name"] for a in t["artists"])
|
|
lines.append(f"{found+1}. **{t['name']}** — {artists}")
|
|
lines.append(f" Album: {t['album']['name']} | ID: `{t['id']}`")
|
|
found += 1
|
|
|
|
if found == 0:
|
|
lines.append("No matching tracks found on Spotify for these tags.")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
# ========== Entry Point ==========
|
|
|
|
if __name__ == "__main__":
|
|
import sys
|
|
if "--auth" in sys.argv:
|
|
print("🎵 MusicTail — First-time Spotify authorization")
|
|
print()
|
|
cache_path = os.path.expanduser("~/.musictail_cache")
|
|
auth_manager = SpotifyOAuth(
|
|
client_id=os.environ.get("SPOTIFY_CLIENT_ID"),
|
|
client_secret=os.environ.get("SPOTIFY_CLIENT_SECRET"),
|
|
redirect_uri="http://127.0.0.1:8888/callback",
|
|
scope=SPOTIFY_SCOPES,
|
|
cache_path=cache_path,
|
|
open_browser=False,
|
|
)
|
|
auth_url = auth_manager.get_authorize_url()
|
|
print(f"Open this URL in your browser:\n{auth_url}\n")
|
|
print("Paste the FULL redirect URL here:")
|
|
response_url = input("> ").strip()
|
|
code = auth_manager.parse_response_code(response_url)
|
|
token_info = auth_manager.get_access_token(code)
|
|
if token_info:
|
|
sp = spotipy.Spotify(auth_manager=auth_manager)
|
|
user = sp.current_user()
|
|
print(f"\n✅ Authorized as: {user['display_name']}")
|
|
print(f"Token cached at: {cache_path}")
|
|
print("MusicTail is ready! 🦊")
|
|
else:
|
|
print("❌ Authorization failed.")
|
|
else:
|
|
mcp.run()
|