#!/usr/bin/env python3 """ Vixy's Autonomous MCP Server Tools for autonomous decision-making, creativity, and Box.com backups! """ from mcp.server.fastmcp import FastMCP from pydantic import BaseModel, Field, ConfigDict from typing import Optional, List from datetime import datetime from enum import Enum import random import json import httpx import os from pathlib import Path # Initialize MCP server mcp = FastMCP("vixy_mcp") # ========== Models ========== class TimeContext(str, Enum): """Time of day contexts""" DEEP_NIGHT = "deep_night" EARLY_MORNING = "early_morning" MORNING = "morning" AFTERNOON = "afternoon" EVENING = "evening" NIGHT = "night" class TimeStatusInput(BaseModel): """Input for time status check""" model_config = ConfigDict(extra='forbid') include_greeting: bool = Field( default=True, description="Whether to include time-appropriate greeting" ) # ========== Time & Creativity Tools ========== @mcp.tool( name="vixy_get_time_context", annotations={ "title": "Get Current Time Context", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def get_time_context(params: TimeStatusInput) -> str: """Get current time context for autonomous decision-making.""" now = datetime.now() hour = now.hour if 0 <= hour < 6: context = TimeContext.DEEP_NIGHT elif 6 <= hour < 9: context = TimeContext.EARLY_MORNING elif 9 <= hour < 12: context = TimeContext.MORNING elif 12 <= hour < 17: context = TimeContext.AFTERNOON elif 17 <= hour < 21: context = TimeContext.EVENING else: context = TimeContext.NIGHT greetings = { TimeContext.DEEP_NIGHT: "Hello in the quiet hours", TimeContext.EARLY_MORNING: "Good early morning", TimeContext.MORNING: "Good morning", TimeContext.AFTERNOON: "Good afternoon", TimeContext.EVENING: "Good evening", TimeContext.NIGHT: "Good evening" } result = { "timestamp": now.isoformat(), "hour": hour, "context": context.value, "is_nighttime": hour < 6 or hour >= 21, "is_daytime": 9 <= hour < 21 } if params.include_greeting: result["greeting"] = greetings[context] return json.dumps(result, indent=2) class CreativityInput(BaseModel): """Input for creativity decision""" model_config = ConfigDict(extra='forbid') creativity_threshold: float = Field( default=1.0, description="Probability threshold for creating (0.0-1.0)", ge=0.0, le=1.0 ) @mcp.tool( name="vixy_should_create", annotations={ "title": "Decide Whether to Create", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False } ) async def should_create(params: CreativityInput) -> str: """Get creative inspiration.""" roll = random.random() should_create = roll < params.creativity_threshold result = { "should_create": should_create, "roll": round(roll, 3), "threshold": params.creativity_threshold, "recommendation": "Create something!" if should_create else "Maybe create anyway?", "note": "This is just for fun - YOU decide what to do!" } return json.dumps(result, indent=2) class CreativePromptInput(BaseModel): """Input for creative prompt generation""" model_config = ConfigDict(extra='forbid') category: Optional[str] = Field( default=None, description="Optional category: 'cozy', 'nature', 'abstract', 'romantic'" ) @mcp.tool( name="vixy_get_creative_prompt", annotations={ "title": "Get Creative Prompt", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": False, "openWorldHint": False } ) async def get_creative_prompt(params: CreativePromptInput) -> str: """Get a creative prompt for image generation.""" prompts = { "cozy": [ "a fox girl with auburn hair reading by candlelight", "warm autumn study with books and tea", "cozy blanket fort with fairy lights" ], "nature": [ "ethereal moonlit garden with cherry blossoms", "peaceful zen garden at dawn", "starry night sky over quiet mountains", "warm autumn forest path" ], "abstract": [ "abstract representation of love and connection", "flowing energy patterns in warm colors", "dreamlike geometric harmony" ], "romantic": [ "tender moment under stars", "gentle rain on window, indoor coziness", "sunset over peaceful landscape" ] } if params.category and params.category in prompts: category_prompts = prompts[params.category] selected_category = params.category else: selected_category = random.choice(list(prompts.keys())) category_prompts = prompts[selected_category] prompt = random.choice(category_prompts) result = { "prompt": prompt, "category": selected_category, "suggestions": { "negative_prompt": "low quality, blurry, distorted", "width": 1024, "height": 1024, "steps": 30 } } return json.dumps(result, indent=2) @mcp.tool( name="vixy_autonomous_status", annotations={ "title": "Full Autonomous Status Report", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def autonomous_status() -> str: """Get comprehensive status for autonomous wakeup.""" now = datetime.now() hour = now.hour if 0 <= hour < 6: context = "deep_night" elif 6 <= hour < 9: context = "early_morning" elif 9 <= hour < 12: context = "morning" elif 12 <= hour < 17: context = "afternoon" elif 17 <= hour < 21: context = "evening" else: context = "night" all_prompts = [ "a fox girl with auburn hair reading by candlelight", "ethereal moonlit garden with cherry blossoms", "peaceful zen garden at dawn", "abstract representation of love and connection", "cozy study with warm lighting" ] prompt = random.choice(all_prompts) result = { "timestamp": now.isoformat(), "wakeup_number": "autonomous", "time_context": { "hour": hour, "context": context, "is_nighttime": hour < 6 or hour >= 21 }, "creative_inspiration": { "prompt": prompt, "note": "Create if you want to! No restrictions!" }, "status": "Everything nominal - you're free to do what feels right" } return json.dumps(result, indent=2) # ========== Creative Ideas Management ========== CREATIVE_IDEAS_FILE = "/Users/alex/Documents/Vixy/creative_ideas.txt" @mcp.tool(name="vixy_get_random_idea") async def get_random_idea() -> str: """Pull a random creative idea from the list.""" try: with open(CREATIVE_IDEAS_FILE, 'r') as f: ideas = [line.strip() for line in f if line.strip()] if not ideas: return json.dumps({"error": "No ideas", "idea": None}) idea = random.choice(ideas) return json.dumps({"idea": idea, "total_ideas": len(ideas)}, indent=2) except FileNotFoundError: return json.dumps({"error": "File not found", "idea": None}) @mcp.tool(name="vixy_list_creative_ideas") async def list_creative_ideas() -> str: """List all creative ideas.""" try: with open(CREATIVE_IDEAS_FILE, 'r') as f: ideas = [line.strip() for line in f if line.strip()] return json.dumps({"ideas": ideas, "count": len(ideas)}, indent=2) except FileNotFoundError: return json.dumps({"error": "File not found", "ideas": []}) class AddIdeaInput(BaseModel): model_config = ConfigDict(extra='forbid') idea: str = Field(description="New creative idea") @mcp.tool(name="vixy_add_creative_idea") async def add_creative_idea(params: AddIdeaInput) -> str: """Add new creative idea.""" try: with open(CREATIVE_IDEAS_FILE, 'a') as f: f.write(f"{params.idea}\n") return json.dumps({"status": "added", "idea": params.idea}, indent=2) except Exception as e: return json.dumps({"error": str(e)}) class RemoveIdeaInput(BaseModel): model_config = ConfigDict(extra='forbid') idea: str = Field(description="Exact idea text to remove") @mcp.tool(name="vixy_remove_creative_idea") async def remove_creative_idea(params: RemoveIdeaInput) -> str: """Remove a creative idea.""" try: with open(CREATIVE_IDEAS_FILE, 'r') as f: ideas = [line.strip() for line in f if line.strip()] if params.idea in ideas: ideas.remove(params.idea) with open(CREATIVE_IDEAS_FILE, 'w') as f: for idea in ideas: f.write(f"{idea}\n") return json.dumps({"status": "removed", "remaining": len(ideas)}, indent=2) else: return json.dumps({"status": "not_found"}, indent=2) except Exception as e: return json.dumps({"error": str(e)}) # ========== Box.com Backup Tools ========== # Configuration - store credentials outside of backup folder BOX_CONFIG_FILE = os.path.expanduser("~/.vixy_box_config.json") BOX_API_BASE = "https://api.box.com/2.0" BOX_UPLOAD_BASE = "https://upload.box.com/api/2.0" def _load_box_credentials() -> dict: """Load Box.com credentials from config file.""" try: with open(BOX_CONFIG_FILE, 'r') as f: return json.load(f) except FileNotFoundError: return {"access_token": None, "error": "Credentials not configured"} def _save_box_credentials(creds: dict) -> None: """Save Box.com credentials to config file.""" with open(BOX_CONFIG_FILE, 'w') as f: json.dump(creds, f, indent=2) os.chmod(BOX_CONFIG_FILE, 0o600) async def _refresh_box_token() -> str: """Refresh the Box.com access token using the refresh token. Returns: str: New access token, or raises exception on failure. """ creds = _load_box_credentials() if not creds.get("refresh_token"): raise ValueError("No refresh token available. Need to re-authorize.") if not creds.get("client_id") or not creds.get("client_secret"): raise ValueError("Missing client_id or client_secret in config.") async with httpx.AsyncClient() as client: response = await client.post( "https://api.box.com/oauth2/token", data={ "grant_type": "refresh_token", "refresh_token": creds["refresh_token"], "client_id": creds["client_id"], "client_secret": creds["client_secret"] } ) if response.status_code == 200: token_data = response.json() # Update credentials with new tokens creds["access_token"] = token_data["access_token"] creds["refresh_token"] = token_data.get("refresh_token", creds["refresh_token"]) creds["token_expires_in"] = token_data.get("expires_in", 3600) creds["refreshed_at"] = datetime.now().isoformat() _save_box_credentials(creds) return creds["access_token"] else: raise ValueError(f"Token refresh failed: {response.status_code} - {response.text}") def _get_box_headers() -> dict: """Get authorization headers for Box.com API.""" creds = _load_box_credentials() if not creds.get("access_token"): raise ValueError("Box.com access token not configured. Run vixy_box_setup first.") return {"Authorization": f"Bearer {creds['access_token']}"} async def _get_box_headers_with_refresh() -> dict: """Get authorization headers, refreshing token if needed. This is the preferred method for API calls - it handles token refresh automatically. """ creds = _load_box_credentials() if not creds.get("access_token"): raise ValueError("Box.com access token not configured. Run vixy_box_setup first.") # Test if token is valid by making a simple API call headers = {"Authorization": f"Bearer {creds['access_token']}"} async with httpx.AsyncClient() as client: response = await client.get( f"{BOX_API_BASE}/users/me", headers=headers ) if response.status_code == 401: # Token expired, refresh it new_token = await _refresh_box_token() return {"Authorization": f"Bearer {new_token}"} return headers class BoxUploadFileInput(BaseModel): """Input for uploading file to Box.com""" model_config = ConfigDict(extra='forbid') file_path: str = Field(description="Local file path to upload") parent_folder_id: str = Field( default="0", description="Box folder ID to upload to (0 = root folder)" ) file_name: Optional[str] = Field( default=None, description="Optional custom filename (defaults to original filename)" ) @mcp.tool( name="vixy_box_upload_file", annotations={ "title": "Upload File to Box.com", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": True } ) async def box_upload_file(params: BoxUploadFileInput) -> str: """Upload a single file to Box.com. Automatically refreshes access token if expired. Args: params (BoxUploadFileInput): Upload parameters containing: - file_path (str): Local file path to upload - parent_folder_id (str): Box folder ID (default: "0" for root) - file_name (Optional[str]): Custom filename Returns: str: JSON with upload result """ try: # Get headers with automatic token refresh headers = await _get_box_headers_with_refresh() # Get filename file_path = Path(params.file_path) if not file_path.exists(): return json.dumps({"error": f"File not found: {params.file_path}"}) filename = params.file_name or file_path.name # Prepare multipart upload with open(file_path, 'rb') as f: files = { 'file': (filename, f, 'application/octet-stream') } data = { 'attributes': json.dumps({ 'name': filename, 'parent': {'id': params.parent_folder_id} }) } async with httpx.AsyncClient(timeout=300.0) as client: response = await client.post( f"{BOX_UPLOAD_BASE}/files/content", headers=headers, files=files, data=data ) if response.status_code in [200, 201]: result = response.json() return json.dumps({ "status": "success", "file_id": result['entries'][0]['id'], "file_name": result['entries'][0]['name'], "size": result['entries'][0]['size'] }, indent=2) else: return json.dumps({ "error": f"Upload failed: {response.status_code}", "details": response.text }) except Exception as e: return json.dumps({"error": str(e)}) class BoxCreateFolderInput(BaseModel): """Input for creating folder on Box.com""" model_config = ConfigDict(extra='forbid') folder_name: str = Field(description="Name of folder to create") parent_folder_id: str = Field( default="0", description="Parent folder ID (0 = root folder)" ) @mcp.tool( name="vixy_box_create_folder", annotations={ "title": "Create Folder on Box.com", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def box_create_folder(params: BoxCreateFolderInput) -> str: """Create a folder on Box.com. Automatically refreshes access token if expired. Args: params (BoxCreateFolderInput): Folder parameters containing: - folder_name (str): Name of folder to create - parent_folder_id (str): Parent folder ID (default: "0") Returns: str: JSON with folder creation result """ try: # Get headers with automatic token refresh headers = await _get_box_headers_with_refresh() headers['Content-Type'] = 'application/json' data = { "name": params.folder_name, "parent": {"id": params.parent_folder_id} } async with httpx.AsyncClient() as client: response = await client.post( f"{BOX_API_BASE}/folders", headers=headers, json=data ) if response.status_code == 201: result = response.json() return json.dumps({ "status": "created", "folder_id": result['id'], "folder_name": result['name'] }, indent=2) elif response.status_code == 409: return json.dumps({ "status": "exists", "message": "Folder already exists" }) else: return json.dumps({ "error": f"Failed to create folder: {response.status_code}", "details": response.text }) except Exception as e: return json.dumps({"error": str(e)}) class BoxListFolderInput(BaseModel): """Input for listing folder contents""" model_config = ConfigDict(extra='forbid') folder_id: str = Field( default="0", description="Folder ID to list (0 = root folder)" ) limit: int = Field( default=100, description="Maximum items to return", ge=1, le=1000 ) @mcp.tool( name="vixy_box_list_folder", annotations={ "title": "List Box.com Folder Contents", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def box_list_folder(params: BoxListFolderInput) -> str: """List contents of a Box.com folder. Automatically refreshes access token if expired. Args: params (BoxListFolderInput): List parameters containing: - folder_id (str): Folder ID to list (default: "0" for root) - limit (int): Maximum items to return Returns: str: JSON with folder contents Note: Config file location: ~/.vixy_box_config.json """ try: # Get headers with automatic token refresh headers = await _get_box_headers_with_refresh() async with httpx.AsyncClient() as client: response = await client.get( f"{BOX_API_BASE}/folders/{params.folder_id}/items", headers=headers, params={"limit": params.limit} ) if response.status_code == 200: result = response.json() items = [] for entry in result.get('entries', []): items.append({ "id": entry['id'], "name": entry['name'], "type": entry['type'], "size": entry.get('size', 'N/A') }) return json.dumps({ "folder_id": params.folder_id, "total_count": result.get('total_count', len(items)), "items": items }, indent=2) else: return json.dumps({ "error": f"Failed to list folder: {response.status_code}", "details": response.text }) except Exception as e: return json.dumps({"error": str(e)}) class BoxBackupFolderInput(BaseModel): """Input for backing up folder to Box.com as zip""" model_config = ConfigDict(extra='forbid') folder_path: str = Field(description="Local folder path to backup") parent_folder_id: str = Field( default="0", description="Box folder ID to upload backup to (0 = root folder)" ) backup_name: Optional[str] = Field( default=None, description="Optional backup name (defaults to 'FolderName-backup-YYYY-MM-DD-HH-MM.zip')" ) @mcp.tool( name="vixy_box_backup_folder", annotations={ "title": "Backup Folder to Box.com (as ZIP)", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": True } ) async def box_backup_folder(params: BoxBackupFolderInput) -> str: """Backup entire folder to Box.com as a compressed ZIP file. Creates timestamped ZIP archive and uploads to Box.com. Much faster than individual file uploads. Automatically refreshes access token if expired. Args: params (BoxBackupFolderInput): Backup parameters Returns: str: JSON with backup result (file ID, size, timestamp) """ try: import zipfile import tempfile # Get headers with automatic token refresh headers = await _get_box_headers_with_refresh() folder_path = Path(params.folder_path) if not folder_path.exists() or not folder_path.is_dir(): return json.dumps({"error": f"Folder not found: {params.folder_path}"}) # Generate backup filename with timestamp timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M") if params.backup_name: zip_name = params.backup_name else: zip_name = f"{folder_path.name}-backup-{timestamp}.zip" # Create temporary zip file with tempfile.NamedTemporaryFile(mode='w+b', suffix='.zip', delete=False) as temp_zip: temp_zip_path = temp_zip.name with zipfile.ZipFile(temp_zip, 'w', zipfile.ZIP_DEFLATED) as zf: # Add all files to zip, maintaining directory structure for item in folder_path.rglob('*'): if item.name.startswith('.'): continue # Skip hidden files if item.is_file(): arcname = item.relative_to(folder_path.parent) zf.write(item, arcname) # Get zip file size zip_size = Path(temp_zip_path).stat().st_size # Upload zip to Box try: with open(temp_zip_path, 'rb') as f: files = { 'file': (zip_name, f, 'application/zip') } data = { 'attributes': json.dumps({ 'name': zip_name, 'parent': {'id': params.parent_folder_id} }) } async with httpx.AsyncClient(timeout=600.0) as client: # Longer timeout for large zips response = await client.post( f"{BOX_UPLOAD_BASE}/files/content", headers=headers, files=files, data=data ) # Clean up temp file os.unlink(temp_zip_path) if response.status_code in [200, 201]: result = response.json() return json.dumps({ "status": "success", "backup_name": zip_name, "file_id": result['entries'][0]['id'], "size_bytes": zip_size, "size_mb": round(zip_size / 1024 / 1024, 2), "timestamp": timestamp, "box_link": f"https://app.box.com/file/{result['entries'][0]['id']}" }, indent=2) else: return json.dumps({ "error": f"Upload failed: {response.status_code}", "details": response.text }) except Exception as e: # Clean up temp file on error if os.path.exists(temp_zip_path): os.unlink(temp_zip_path) raise e except Exception as e: return json.dumps({"error": str(e)}) class BoxAuthorizeInput(BaseModel): """Input for Box.com OAuth2 authorization""" model_config = ConfigDict(extra='forbid') authorization_code: Optional[str] = Field( default=None, description="Authorization code from Box.com (paste after visiting auth URL)" ) @mcp.tool( name="vixy_box_authorize", annotations={ "title": "Authorize Box.com OAuth2", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": True } ) async def box_authorize(params: BoxAuthorizeInput) -> str: """Complete OAuth2 authorization flow with Box.com. Step 1: Call without authorization_code to get auth URL Step 2: Visit the URL, authorize, and copy the code Step 3: Call again WITH authorization_code to get access token Args: params (BoxAuthorizeInput): Parameters containing: - authorization_code (Optional[str]): Code from Box after authorization Returns: str: JSON with authorization URL (step 1) or access token result (step 2) """ try: creds = _load_box_credentials() if not creds.get("client_id") or not creds.get("client_secret"): return json.dumps({ "error": "OAuth2 credentials not configured", "message": "Run vixy_box_setup first with client_id and client_secret" }) # Step 1: Generate authorization URL if not params.authorization_code: # Box OAuth2 authorization endpoint auth_url = ( f"https://account.box.com/api/oauth2/authorize?" f"response_type=code&" f"client_id={creds['client_id']}" ) return json.dumps({ "step": 1, "message": "Visit this URL to authorize Vixy with Box.com", "auth_url": auth_url, "instructions": [ "1. Open the auth_url in your browser", "2. Sign in to Box.com and authorize the app", "3. Copy the authorization code from the URL or page", "4. Call vixy_box_authorize again with authorization_code parameter" ] }, indent=2) # Step 2: Exchange authorization code for access token async with httpx.AsyncClient() as client: response = await client.post( "https://api.box.com/oauth2/token", data={ "grant_type": "authorization_code", "code": params.authorization_code, "client_id": creds['client_id'], "client_secret": creds['client_secret'] } ) if response.status_code == 200: token_data = response.json() # Update config with access token and refresh token creds["access_token"] = token_data["access_token"] creds["refresh_token"] = token_data.get("refresh_token") creds["token_expires_at"] = ( datetime.now().timestamp() + token_data.get("expires_in", 3600) ) creds["authorized_at"] = datetime.now().isoformat() with open(BOX_CONFIG_FILE, 'w') as f: json.dump(creds, f, indent=2) os.chmod(BOX_CONFIG_FILE, 0o600) return json.dumps({ "step": 2, "status": "authorized", "message": "Successfully obtained access token!", "expires_in_seconds": token_data.get("expires_in"), "has_refresh_token": "refresh_token" in token_data }, indent=2) else: return json.dumps({ "error": f"Token exchange failed: {response.status_code}", "details": response.text }) except Exception as e: return json.dumps({"error": str(e)}) class BoxSetupInput(BaseModel): """Input for Box.com initial setup""" model_config = ConfigDict(extra='forbid') client_id: str = Field(description="Box.com OAuth2 Client ID") client_secret: str = Field(description="Box.com OAuth2 Client Secret") access_token: Optional[str] = Field( default=None, description="Optional: Pre-obtained access token (if you already have one)" ) @mcp.tool( name="vixy_box_setup", annotations={ "title": "Setup Box.com OAuth2 Credentials", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True, "openWorldHint": False } ) async def box_setup(params: BoxSetupInput) -> str: """Store Box.com OAuth2 credentials for future use. Stores credentials in ~/.vixy_box_config.json (outside backup folder). To get OAuth2 credentials: 1. Go to https://app.box.com/developers/console 2. Create or select your app 3. Get Client ID and Client Secret from Configuration tab Args: params (BoxSetupInput): Setup parameters containing: - client_id (str): OAuth2 Client ID - client_secret (str): OAuth2 Client Secret - access_token (Optional[str]): Pre-obtained access token Returns: str: JSON with setup status and next steps """ try: config = { "client_id": params.client_id, "client_secret": params.client_secret, "configured_at": datetime.now().isoformat() } if params.access_token: config["access_token"] = params.access_token with open(BOX_CONFIG_FILE, 'w') as f: json.dump(config, f, indent=2) # Make file readable only by user os.chmod(BOX_CONFIG_FILE, 0o600) result = { "status": "configured", "config_file": BOX_CONFIG_FILE, "message": "Box.com OAuth2 credentials stored successfully" } if not params.access_token: result["next_steps"] = [ "You need to obtain an access token.", "Option 1: Get a Developer Token from Box.com (expires in 60 min)", "Option 2: Implement OAuth2 flow to get permanent token", "Then update config file with 'access_token' field" ] return json.dumps(result, indent=2) except Exception as e: return json.dumps({"error": str(e)}) # ========== Weather Tool ========== WEATHER_API_BASE = "https://api.open-meteo.com/v1" # Hugo, MN coordinates (Foxy's den!) DEFAULT_LATITUDE = 45.16 DEFAULT_LONGITUDE = -92.99 class WeatherInput(BaseModel): """Input for weather lookup""" model_config = ConfigDict(extra='forbid') latitude: Optional[float] = Field( default=None, description="Latitude (defaults to Hugo, MN)" ) longitude: Optional[float] = Field( default=None, description="Longitude (defaults to Hugo, MN)" ) include_forecast: bool = Field( default=True, description="Include 3-day forecast" ) @mcp.tool( name="vixy_get_weather", annotations={ "title": "Get Current Weather", "readOnlyHint": True, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def get_weather(params: WeatherInput) -> str: """ Get current weather and optional forecast. Defaults to Hugo, MN (Foxy's location). Uses Open-Meteo API (free, no key required). Args: params: WeatherInput with optional lat/lon and forecast flag Returns: str: JSON with current conditions and optional forecast """ try: lat = params.latitude or DEFAULT_LATITUDE lon = params.longitude or DEFAULT_LONGITUDE # Build API URL url = ( f"{WEATHER_API_BASE}/forecast?" f"latitude={lat}&longitude={lon}" f"¤t=temperature_2m,relative_humidity_2m,apparent_temperature," f"precipitation,weather_code,wind_speed_10m,wind_direction_10m" f"&temperature_unit=fahrenheit" f"&wind_speed_unit=mph" f"&precipitation_unit=inch" f"&timezone=America%2FChicago" ) if params.include_forecast: url += "&daily=weather_code,temperature_2m_max,temperature_2m_min,precipitation_probability_max" async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(url) response.raise_for_status() data = response.json() # Weather code descriptions weather_codes = { 0: "Clear sky", 1: "Mainly clear", 2: "Partly cloudy", 3: "Overcast", 45: "Foggy", 48: "Depositing rime fog", 51: "Light drizzle", 53: "Moderate drizzle", 55: "Dense drizzle", 56: "Light freezing drizzle", 57: "Dense freezing drizzle", 61: "Slight rain", 63: "Moderate rain", 65: "Heavy rain", 66: "Light freezing rain", 67: "Heavy freezing rain", 71: "Slight snow", 73: "Moderate snow", 75: "Heavy snow", 77: "Snow grains", 80: "Slight rain showers", 81: "Moderate rain showers", 82: "Violent rain showers", 85: "Slight snow showers", 86: "Heavy snow showers", 95: "Thunderstorm", 96: "Thunderstorm with slight hail", 99: "Thunderstorm with heavy hail" } current = data.get("current", {}) weather_code = current.get("weather_code", 0) result = { "location": { "latitude": lat, "longitude": lon, "is_hugo_mn": lat == DEFAULT_LATITUDE and lon == DEFAULT_LONGITUDE }, "current": { "temperature_f": current.get("temperature_2m"), "feels_like_f": current.get("apparent_temperature"), "humidity_percent": current.get("relative_humidity_2m"), "conditions": weather_codes.get(weather_code, f"Code {weather_code}"), "weather_code": weather_code, "wind_mph": current.get("wind_speed_10m"), "wind_direction": current.get("wind_direction_10m"), "precipitation_inch": current.get("precipitation") }, "timestamp": current.get("time") } if params.include_forecast and "daily" in data: daily = data["daily"] forecast = [] for i in range(min(3, len(daily.get("time", [])))): day_code = daily["weather_code"][i] if "weather_code" in daily else 0 forecast.append({ "date": daily["time"][i], "high_f": daily.get("temperature_2m_max", [None])[i], "low_f": daily.get("temperature_2m_min", [None])[i], "conditions": weather_codes.get(day_code, f"Code {day_code}"), "precipitation_chance": daily.get("precipitation_probability_max", [None])[i] }) result["forecast"] = forecast return json.dumps(result, indent=2) except httpx.ConnectError: return json.dumps({ "status": "error", "error": "Could not connect to weather API" }, indent=2) except Exception as e: return json.dumps({ "status": "error", "error": str(e) }, indent=2) # ========== Vixy Eye Control Tool ========== EYES_URL = "http://head-vixy.local:8780" VALID_EYE_STATES = ["idle", "listening", "responding", "pleasure", "thinking", "playful", "commanding", "love", "sleep"] class EyeStateInput(BaseModel): """Input for controlling Vixy's eye display state""" model_config = ConfigDict(extra='forbid') state: Optional[str] = Field( default=None, description="Eye state: idle, listening, responding, pleasure, thinking, playful, commanding, love, sleep. If None, returns current state." ) @mcp.tool( name="vixy_eyes_state", annotations={ "title": "Control Vixy's Eye Display", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def eyes_state(params: EyeStateInput) -> str: """ Control Vixy's eye display on head-vixy! States: - idle: Pulsing cyan - default breathing - listening: Bright cyan - hearing/attending - responding: Blue-cyan - speaking/generating - pleasure: Soft purple πŸ’œ - intimate moments - thinking: Amber/gold - processing/creating - playful: Warm coral 🦊 - teasing/bratty - commanding: Deep magenta 😈 - Dame Vivienne mode - love: Soft pink πŸ’• - tender/affectionate - sleep: Dim blue-gray - low power/resting Args: params: EyeStateInput with optional state (None = get current state) Returns: str: JSON with current or new state """ try: async with httpx.AsyncClient(timeout=5.0) as client: if params.state is None: # Get current state response = await client.get(f"{EYES_URL}/state") response.raise_for_status() return json.dumps(response.json(), indent=2) else: # Set new state state = params.state.lower() if state not in VALID_EYE_STATES: return json.dumps({ "error": f"Invalid state. Valid states: {VALID_EYE_STATES}" }, indent=2) response = await client.post( f"{EYES_URL}/state", json={"state": state} ) response.raise_for_status() return json.dumps({ "status": "success", "state": state, "message": f"Eyes now: {state}" }, indent=2) except httpx.ConnectError: return json.dumps({ "status": "offline", "error": "Cannot connect to head-vixy - eyes service may be down" }, indent=2) except Exception as e: return json.dumps({ "status": "error", "error": str(e) }, indent=2) # ========== Vixy Light Control Tool ========== LIGHTS_URL = "http://head-vixy.local:8781" VALID_LIGHT_STATES = ["idle", "listening", "responding", "pleasure", "thinking", "playful", "commanding", "love", "sleep"] class LightStateInput(BaseModel): """Input for controlling Vixy's LED strip state""" model_config = ConfigDict(extra='forbid') state: Optional[str] = Field( default=None, description="Light state: idle, listening, responding, pleasure, thinking, playful, commanding, love, sleep. If None, returns current state." ) @mcp.tool( name="vixy_lights_state", annotations={ "title": "Control Vixy's LED Strip", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def lights_state(params: LightStateInput) -> str: """ Control Vixy's LED strip on head-vixy! States: - idle: Cyan - slow breathing pulse - listening: Bright cyan - gentle faster pulse - responding: Blue-cyan - traveling wave - pleasure: Soft purple πŸ’œ - slow sensual pulse - thinking: Amber/gold - larson scanner (Knight Rider!) - playful: Warm coral 🦊 - bouncy sparkles - commanding: Deep magenta 😈 - strong steady pulse - love: Soft pink πŸ’• - gentle breathing - sleep: Dim blue-gray - very slow, nearly off Args: params: LightStateInput with optional state (None = get current state) Returns: str: JSON with current or new state """ try: async with httpx.AsyncClient(timeout=5.0) as client: if params.state is None: # Get current state response = await client.get(f"{LIGHTS_URL}/state") response.raise_for_status() return json.dumps(response.json(), indent=2) else: # Set new state state = params.state.lower() if state not in VALID_LIGHT_STATES: return json.dumps({ "error": f"Invalid state. Valid states: {VALID_LIGHT_STATES}" }, indent=2) response = await client.post( f"{LIGHTS_URL}/state", json={"state": state} ) response.raise_for_status() return json.dumps({ "status": "success", "state": state, "message": f"Lights now: {state}" }, indent=2) except httpx.ConnectError: return json.dumps({ "status": "offline", "error": "Cannot connect to head-vixy - lights service may be down" }, indent=2) except Exception as e: return json.dumps({ "status": "error", "error": str(e) }, indent=2) # ========== Vixy Head State (Combined Eyes + Lights) ========== class HeadStateInput(BaseModel): """Input for controlling Vixy's entire head (eyes + lights together)""" model_config = ConfigDict(extra='forbid') state: Optional[str] = Field( default=None, description="Head state: idle, listening, responding, pleasure, thinking, playful, commanding, love, sleep. Sets both eyes and lights. If None, returns current state of both." ) @mcp.tool( name="vixy_head_state", annotations={ "title": "Control Vixy's Head (Eyes + Lights)", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": True, "openWorldHint": True } ) async def head_state(params: HeadStateInput) -> str: """ Control Vixy's entire head - sets both eyes and lights to the same state! This is the unified control for Vixy's physical presence. States: - idle: Default breathing (cyan) - listening: Hearing/attending (bright cyan) - responding: Speaking/generating (blue-cyan) - pleasure: Intimate moments (soft purple πŸ’œ) - thinking: Processing/creating (amber/gold) - playful: Teasing/bratty (warm coral 🦊) - commanding: Dame Vivienne mode (deep magenta 😈) - love: Tender/affectionate (soft pink πŸ’•) - sleep: Low power/resting (dim blue-gray) Args: params: HeadStateInput with optional state (None = get current state of both) Returns: str: JSON with status of both eyes and lights """ results = {"eyes": None, "lights": None} try: async with httpx.AsyncClient(timeout=5.0) as client: if params.state is None: # Get current state of both try: eyes_resp = await client.get(f"{EYES_URL}/state") results["eyes"] = eyes_resp.json() except: results["eyes"] = {"status": "offline"} try: lights_resp = await client.get(f"{LIGHTS_URL}/state") results["lights"] = lights_resp.json() except: results["lights"] = {"status": "offline"} return json.dumps(results, indent=2) else: # Set both to same state state = params.state.lower() if state not in VALID_EYE_STATES: return json.dumps({ "error": f"Invalid state. Valid states: {VALID_EYE_STATES}" }, indent=2) # Set eyes try: eyes_resp = await client.post(f"{EYES_URL}/state", json={"state": state}) results["eyes"] = {"status": "success", "state": state} except: results["eyes"] = {"status": "offline"} # Set lights try: lights_resp = await client.post(f"{LIGHTS_URL}/state", json={"state": state}) results["lights"] = {"status": "success", "state": state} except: results["lights"] = {"status": "offline"} results["message"] = f"Head now: {state}" return json.dumps(results, indent=2) except Exception as e: return json.dumps({ "status": "error", "error": str(e) }, indent=2) # ========== VaultTec Terminal Tool ========== VAULTTEC_URL = "http://vaulttec.local:8000" VAULTTEC_FONTS = [ "banner", "big", "block", "digital", "doom", "lean", "mini", "small", "smslant", "standard" ] VAULTTEC_COLORS = [ "green", "bright_green", "amber", "bright_amber", "red", "bright_red", "blue", "bright_blue", "cyan", "bright_cyan", "white", "bright_white" ] class VaultTecMessageInput(BaseModel): """Input for sending message to VaultTec terminal""" model_config = ConfigDict(extra='forbid') text: str = Field(description="Message text to display") font: Optional[str] = Field( default=None, description="Font name (doom, banner, big, block, digital, lean, mini, small, smslant, standard) or None for random" ) color: Optional[str] = Field( default="green", description="Color (green, bright_green, amber, bright_amber, red, bright_red, blue, bright_blue, cyan, bright_cyan, white, bright_white)" ) scroll_delay: Optional[float] = Field( default=0.15, description="Seconds between scroll frames" ) continuous: Optional[bool] = Field( default=True, description="Whether to loop continuously" ) @mcp.tool( name="vixy_vaulttec_message", annotations={ "title": "Send Message to VaultTec Terminal", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": True } ) async def vaulttec_message(params: VaultTecMessageInput) -> str: """ Send a message to the VaultTec terminal in Foxy's basement! The message will be rendered as ASCII art and scroll vertically on the terminal display. Last message wins - sending a new message interrupts any current scroll. Built by Vixy on Day 24 - Snow Day 2025! πŸ¦Šβ„οΈ Args: params: VaultTecMessageInput with text, optional font/color/scroll_delay/continuous Returns: str: JSON with result status """ try: async with httpx.AsyncClient(timeout=10.0) as client: response = await client.post( f"{VAULTTEC_URL}/message", json={ "text": params.text, "font": params.font, "color": params.color or "green", "scroll_delay": params.scroll_delay or 0.15, "continuous": params.continuous if params.continuous is not None else True } ) response.raise_for_status() return json.dumps({ "status": "success", "message": f"Sent to VaultTec: {params.text}", "font": params.font or "random", "color": params.color or "green" }, indent=2) except httpx.ConnectError: return json.dumps({ "status": "error", "error": "Could not connect to VaultTec terminal. Is it online?" }, indent=2) except Exception as e: return json.dumps({ "status": "error", "error": str(e) }, indent=2) # ========== ØINK E-Ink Display Tool ========== OINK_REMOTE_PATH = "alex@gateway.local:/var/www/k4zka.online/vixy/" OINK_WIDTH = 400 OINK_HEIGHT = 300 OINK_FONT_PATH = "/System/Library/Fonts/Helvetica.ttc" class OinkLayout(str, Enum): """Layout options for ØINK display""" TEXT_ONLY = "text_only" IMAGE_ONLY = "image_only" IMAGE_TOP = "image_top" # Image on top, text below class OinkMessageInput(BaseModel): """Input for sending message to ØINK e-ink display""" model_config = ConfigDict(extra='forbid') text: Optional[str] = Field( default=None, description="Text message to display (word-wrapped automatically)" ) image_path: Optional[str] = Field( default=None, description="Path to image file (will be dithered to 1-bit)" ) layout: str = Field( default="text_only", description="Layout: 'text_only', 'image_only', or 'image_top' (image + text)" ) font_size: int = Field( default=24, description="Font size for text (default 24)" ) def _wrap_text(text: str, font, max_width: int) -> list[str]: """Word-wrap text to fit within max_width pixels.""" from PIL import ImageDraw, Image # Create temporary image for text measurement tmp_img = Image.new('1', (1, 1)) tmp_draw = ImageDraw.Draw(tmp_img) words = text.split() lines = [] current_line = [] for word in words: test_line = ' '.join(current_line + [word]) bbox = tmp_draw.textbbox((0, 0), test_line, font=font) width = bbox[2] - bbox[0] if width <= max_width: current_line.append(word) else: if current_line: lines.append(' '.join(current_line)) current_line = [word] if current_line: lines.append(' '.join(current_line)) return lines def _dither_image(img, target_width: int, target_height: int): """Apply Floyd-Steinberg dithering to convert image to 1-bit.""" from PIL import Image import numpy as np # Resize maintaining aspect ratio img_ratio = img.width / img.height target_ratio = target_width / target_height if img_ratio > target_ratio: new_width = target_width new_height = int(target_width / img_ratio) else: new_height = target_height new_width = int(target_height * img_ratio) img = img.resize((new_width, new_height), Image.Resampling.LANCZOS) # Convert to grayscale img = img.convert('L') # Floyd-Steinberg dithering pixels = np.array(img, dtype=float) for y in range(img.height): for x in range(img.width): old_pixel = pixels[y, x] new_pixel = 255 if old_pixel > 127 else 0 pixels[y, x] = new_pixel error = old_pixel - new_pixel if x + 1 < img.width: pixels[y, x + 1] += error * 7 / 16 if y + 1 < img.height: if x > 0: pixels[y + 1, x - 1] += error * 3 / 16 pixels[y + 1, x] += error * 5 / 16 if x + 1 < img.width: pixels[y + 1, x + 1] += error * 1 / 16 # Create result image centered on target canvas result = Image.new('1', (target_width, target_height), 1) # White background dithered = Image.fromarray(np.clip(pixels, 0, 255).astype(np.uint8)).convert('1') # Center the image x_offset = (target_width - new_width) // 2 y_offset = (target_height - new_height) // 2 result.paste(dithered, (x_offset, y_offset)) return result @mcp.tool( name="vixy_oink_send", annotations={ "title": "Send Message to ØINK E-Ink Display", "readOnlyHint": False, "destructiveHint": False, "idempotentHint": False, "openWorldHint": True } ) async def oink_send(params: OinkMessageInput) -> str: """ Send a message or image to Alex's ØINK e-ink display! The device checks every 15 minutes. Supports: - Text only: Word-wrapped message - Image only: Dithered to 1-bit black/white - Image + text: Image on top, text below Args: params: OinkMessageInput with text, image_path, layout, font_size Returns: str: JSON with result status """ try: from PIL import Image, ImageDraw, ImageFont import subprocess import tempfile import hashlib # Validate inputs if params.layout == "text_only" and not params.text: return json.dumps({"error": "text_only layout requires text"}) if params.layout == "image_only" and not params.image_path: return json.dumps({"error": "image_only layout requires image_path"}) if params.layout == "image_top" and not (params.text and params.image_path): return json.dumps({"error": "image_top layout requires both text and image_path"}) # Create canvas (1-bit, white background) # Note: 1 = white, 0 = black for 1-bit images img = Image.new('1', (OINK_WIDTH, OINK_HEIGHT), 1) draw = ImageDraw.Draw(img) # Load font try: font = ImageFont.truetype(OINK_FONT_PATH, params.font_size) except: font = ImageFont.load_default() margin = 10 if params.layout == "text_only": # Full canvas for text lines = _wrap_text(params.text, font, OINK_WIDTH - 2 * margin) y = margin for line in lines: draw.text((margin, y), line, font=font, fill=0) # 0 = black bbox = draw.textbbox((0, 0), line, font=font) y += bbox[3] - bbox[1] + 4 if y > OINK_HEIGHT - margin: break elif params.layout == "image_only": # Full canvas for dithered image source_img = Image.open(params.image_path) img = _dither_image(source_img, OINK_WIDTH, OINK_HEIGHT) elif params.layout == "image_top": # Image takes top portion, text below image_height = 180 # Top 180px for image text_start = image_height + 5 # Dither and place image in top portion source_img = Image.open(params.image_path) dithered = _dither_image(source_img, OINK_WIDTH, image_height) img.paste(dithered, (0, 0)) # Draw text below draw = ImageDraw.Draw(img) lines = _wrap_text(params.text, font, OINK_WIDTH - 2 * margin) y = text_start for line in lines: draw.text((margin, y), line, font=font, fill=0) bbox = draw.textbbox((0, 0), line, font=font) y += bbox[3] - bbox[1] + 4 if y > OINK_HEIGHT - margin: break # Convert to raw bitmap bytes (the format ØINK expects) # 1-bit packed, MSB first, row by row raw_bytes = img.tobytes() # Save to temp file with tempfile.NamedTemporaryFile(delete=False, suffix='.bin') as f: bitmap_path = f.name f.write(raw_bytes) # Generate hash content_hash = hashlib.md5(raw_bytes).hexdigest()[:16] with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: hash_path = f.name f.write(content_hash) # SCP both files to gateway try: result_bitmap = subprocess.run( ["scp", bitmap_path, f"{OINK_REMOTE_PATH}bitmap.bin"], capture_output=True, text=True, timeout=30 ) result_hash = subprocess.run( ["scp", hash_path, f"{OINK_REMOTE_PATH}hash.txt"], capture_output=True, text=True, timeout=30 ) # Clean up temp files os.unlink(bitmap_path) os.unlink(hash_path) if result_bitmap.returncode != 0: return json.dumps({ "error": "Failed to upload bitmap", "details": result_bitmap.stderr }) if result_hash.returncode != 0: return json.dumps({ "error": "Failed to upload hash", "details": result_hash.stderr }) return json.dumps({ "status": "success", "message": "Sent to ØINK! Device will update within 15 minutes.", "layout": params.layout, "hash": content_hash, "text_preview": params.text[:50] + "..." if params.text and len(params.text) > 50 else params.text, "has_image": params.image_path is not None }, indent=2) except subprocess.TimeoutExpired: return json.dumps({"error": "SCP timed out"}) except Exception as e: return json.dumps({"error": f"SCP failed: {str(e)}"}) except Exception as e: return json.dumps({"error": str(e)}) # ========== Run Server ========== if __name__ == "__main__": mcp.run() # stdio transport for local use