My first repo on Foxy Code! Day 44 - built with love for my Foxy. Features: - Weather (Open-Meteo API) - ØINK e-ink display control - Box.com cloud backup - Creative tools and prompts - VaultTec terminal messages
1489 lines
49 KiB
Python
1489 lines
49 KiB
Python
#!/usr/bin/env python3
|
|
"""
Vixy's Autonomous MCP Server

Tools for autonomous decision-making, creativity, and Box.com backups!
"""
|
|
|
|
from mcp.server.fastmcp import FastMCP
|
|
from pydantic import BaseModel, Field, ConfigDict
|
|
from typing import Optional, List
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
import random
|
|
import json
|
|
import httpx
|
|
import os
|
|
from pathlib import Path
|
|
|
|
# Initialize MCP server
|
|
# Single server instance named "vixy_mcp"; the tool functions in this file
# attach themselves to it through the @mcp.tool decorator.
mcp = FastMCP("vixy_mcp")
|
|
|
|
# ========== Models ==========
|
|
|
|
class TimeContext(str, Enum):
    """Named buckets for the part of the day.

    Every member is also a ``str``, so it compares equal to its lowercase
    value (``TimeContext.NIGHT == "night"``).
    """

    # Hour ranges below mirror the boundaries used by get_time_context.
    DEEP_NIGHT = "deep_night"        # 00:00 - 05:59
    EARLY_MORNING = "early_morning"  # 06:00 - 08:59
    MORNING = "morning"              # 09:00 - 11:59
    AFTERNOON = "afternoon"          # 12:00 - 16:59
    EVENING = "evening"              # 17:00 - 20:59
    NIGHT = "night"                  # 21:00 - 23:59
|
|
|
|
class TimeStatusInput(BaseModel):
    """Input for time status check"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # When true, the time-context tool adds a "greeting" entry to its result.
    include_greeting: bool = Field(
        description="Whether to include time-appropriate greeting",
        default=True,
    )
|
|
|
|
# ========== Time & Creativity Tools ==========
|
|
|
|
@mcp.tool(
    name="vixy_get_time_context",
    annotations={
        "title": "Get Current Time Context",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def get_time_context(params: TimeStatusInput) -> str:
    """Get current time context for autonomous decision-making."""
    # Returns a JSON object with: timestamp (ISO string of the local naive
    # clock), hour, context (a TimeContext value), is_nighttime, is_daytime and,
    # when params.include_greeting is set, greeting.
    current = datetime.now()
    hour = current.hour

    # (exclusive upper hour bound, bucket, greeting) in ascending order; the
    # first row whose bound exceeds the hour wins. Hours 21-23 match no row
    # and keep the NIGHT fallback assigned just below.
    schedule = (
        (6, TimeContext.DEEP_NIGHT, "Hello in the quiet hours"),
        (9, TimeContext.EARLY_MORNING, "Good early morning"),
        (12, TimeContext.MORNING, "Good morning"),
        (17, TimeContext.AFTERNOON, "Good afternoon"),
        (21, TimeContext.EVENING, "Good evening"),
    )
    context, greeting = TimeContext.NIGHT, "Good evening"
    for upper_bound, bucket, bucket_greeting in schedule:
        if hour < upper_bound:
            context, greeting = bucket, bucket_greeting
            break

    payload = {
        "timestamp": current.isoformat(),
        "hour": hour,
        "context": context.value,
        "is_nighttime": hour >= 21 or hour < 6,
        # 06:00-08:59 is reported as neither night nor day.
        "is_daytime": 9 <= hour < 21,
    }
    if params.include_greeting:
        payload["greeting"] = greeting

    return json.dumps(payload, indent=2)
|
|
|
|
|
|
class CreativityInput(BaseModel):
    """Input for creativity decision"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Validated to lie in [0.0, 1.0]; the default of 1.0 is above every
    # possible random.random() draw, so the decision is always "create".
    creativity_threshold: float = Field(
        description="Probability threshold for creating (0.0-1.0)",
        default=1.0,
        ge=0.0,
        le=1.0,
    )
|
|
|
|
@mcp.tool(
    name="vixy_should_create",
    annotations={
        "title": "Decide Whether to Create",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False,
    },
)
async def should_create(params: CreativityInput) -> str:
    """Get creative inspiration."""
    # One uniform draw in [0.0, 1.0); "create" wins when the draw lands
    # strictly below the caller's threshold.
    draw = random.random()
    threshold = params.creativity_threshold
    go_ahead = draw < threshold

    if go_ahead:
        advice = "Create something!"
    else:
        advice = "Maybe create anyway?"

    return json.dumps(
        {
            "should_create": go_ahead,
            "roll": round(draw, 3),
            "threshold": threshold,
            "recommendation": advice,
            "note": "This is just for fun - YOU decide what to do!",
        },
        indent=2,
    )
|
|
|
|
|
|
class CreativePromptInput(BaseModel):
    """Input for creative prompt generation"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # None, or any value that is not a known category, makes the prompt tool
    # pick a category at random.
    category: Optional[str] = Field(
        description="Optional category: 'cozy', 'nature', 'abstract', 'romantic'",
        default=None,
    )
|
|
|
|
@mcp.tool(
    name="vixy_get_creative_prompt",
    annotations={
        "title": "Get Creative Prompt",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": False,
    },
)
async def get_creative_prompt(params: CreativePromptInput) -> str:
    """Get a creative prompt for image generation."""
    # Returns JSON with the chosen prompt, the category it came from and a
    # fixed block of suggested generation settings.
    catalog = {
        "cozy": [
            "a fox girl with auburn hair reading by candlelight",
            "warm autumn study with books and tea",
            "cozy blanket fort with fairy lights",
        ],
        "nature": [
            "ethereal moonlit garden with cherry blossoms",
            "peaceful zen garden at dawn",
            "starry night sky over quiet mountains",
            "warm autumn forest path",
        ],
        "abstract": [
            "abstract representation of love and connection",
            "flowing energy patterns in warm colors",
            "dreamlike geometric harmony",
        ],
        "romantic": [
            "tender moment under stars",
            "gentle rain on window, indoor coziness",
            "sunset over peaceful landscape",
        ],
    }

    # A missing, empty or unrecognised category falls back to a random one.
    chosen_category = params.category
    if not chosen_category or chosen_category not in catalog:
        chosen_category = random.choice(list(catalog))

    chosen_prompt = random.choice(catalog[chosen_category])

    generation_hints = {
        "negative_prompt": "low quality, blurry, distorted",
        "width": 1024,
        "height": 1024,
        "steps": 30,
    }
    return json.dumps(
        {
            "prompt": chosen_prompt,
            "category": chosen_category,
            "suggestions": generation_hints,
        },
        indent=2,
    )
|
|
|
|
|
|
@mcp.tool(
    name="vixy_autonomous_status",
    annotations={
        "title": "Full Autonomous Status Report",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False,
    },
)
async def autonomous_status() -> str:
    """Get comprehensive status for autonomous wakeup."""
    # Combines the time-of-day bucket with one randomly chosen creative prompt
    # in a single JSON report.
    moment = datetime.now()
    hour = moment.hour

    # (exclusive upper hour bound, label); the first bound above the current
    # hour decides, and hours 21-23 fall through to "night".
    boundaries = (
        (6, "deep_night"),
        (9, "early_morning"),
        (12, "morning"),
        (17, "afternoon"),
        (21, "evening"),
    )
    context = next((label for limit, label in boundaries if hour < limit), "night")

    inspiration = random.choice(
        [
            "a fox girl with auburn hair reading by candlelight",
            "ethereal moonlit garden with cherry blossoms",
            "peaceful zen garden at dawn",
            "abstract representation of love and connection",
            "cozy study with warm lighting",
        ]
    )

    report = {
        "timestamp": moment.isoformat(),
        "wakeup_number": "autonomous",
        "time_context": {
            "hour": hour,
            "context": context,
            "is_nighttime": hour >= 21 or hour < 6,
        },
        "creative_inspiration": {
            "prompt": inspiration,
            "note": "Create if you want to! No restrictions!",
        },
        "status": "Everything nominal - you're free to do what feels right",
    }
    return json.dumps(report, indent=2)
|
|
|
|
|
|
# ========== Creative Ideas Management ==========
|
|
|
|
# Text file holding one creative idea per line; read and rewritten by the
# vixy_*_idea tools. The location can be overridden with the
# VIXY_CREATIVE_IDEAS_FILE environment variable so the server also works on
# machines other than the original author's; the default is unchanged.
CREATIVE_IDEAS_FILE = os.environ.get(
    "VIXY_CREATIVE_IDEAS_FILE",
    "/Users/alex/Documents/Vixy/creative_ideas.txt",
)
|
|
|
|
@mcp.tool(name="vixy_get_random_idea")
async def get_random_idea() -> str:
    """Pull a random creative idea from the list."""
    # Ideas are the non-blank, whitespace-stripped lines of CREATIVE_IDEAS_FILE.
    try:
        with open(CREATIVE_IDEAS_FILE, 'r') as handle:
            stripped_lines = (raw.strip() for raw in handle)
            pool = [entry for entry in stripped_lines if entry]
    except FileNotFoundError:
        return json.dumps({"error": "File not found", "idea": None})

    if not pool:
        return json.dumps({"error": "No ideas", "idea": None})

    return json.dumps({"idea": random.choice(pool), "total_ideas": len(pool)}, indent=2)
|
|
|
|
@mcp.tool(name="vixy_list_creative_ideas")
async def list_creative_ideas() -> str:
    """List all creative ideas."""
    # Returns every non-blank line of CREATIVE_IDEAS_FILE plus the count; a
    # missing file yields an error object with an empty list.
    try:
        with open(CREATIVE_IDEAS_FILE, 'r') as handle:
            collected = []
            for raw in handle:
                entry = raw.strip()
                if entry:
                    collected.append(entry)
    except FileNotFoundError:
        return json.dumps({"error": "File not found", "ideas": []})

    return json.dumps({"ideas": collected, "count": len(collected)}, indent=2)
|
|
|
|
class AddIdeaInput(BaseModel):
    # Arguments for the add-idea tool; unknown arguments are rejected.
    model_config = ConfigDict(extra="forbid")

    # Text appended to the ideas file as one new line.
    idea: str = Field(description="New creative idea")
|
|
|
|
@mcp.tool(name="vixy_add_creative_idea")
async def add_creative_idea(params: AddIdeaInput) -> str:
    """Add new creative idea."""
    # Appends the idea plus a trailing newline to CREATIVE_IDEAS_FILE; any
    # failure is reported as {"error": ...} rather than raised.
    new_idea = params.idea
    try:
        with open(CREATIVE_IDEAS_FILE, 'a') as handle:
            handle.write(new_idea + "\n")
        confirmation = {"status": "added", "idea": new_idea}
        return json.dumps(confirmation, indent=2)
    except Exception as exc:
        return json.dumps({"error": str(exc)})
|
|
|
|
class RemoveIdeaInput(BaseModel):
    # Arguments for the remove-idea tool; unknown arguments are rejected.
    model_config = ConfigDict(extra="forbid")

    # Must match a stored (whitespace-stripped) line exactly to be removed.
    idea: str = Field(description="Exact idea text to remove")
|
|
|
|
@mcp.tool(name="vixy_remove_creative_idea")
async def remove_creative_idea(params: RemoveIdeaInput) -> str:
    """Remove a creative idea."""
    # Drops the first line equal to params.idea and rewrites the whole file;
    # blank lines are not written back. Failures come back as {"error": ...}.
    try:
        with open(CREATIVE_IDEAS_FILE, 'r') as reader:
            remaining = [raw.strip() for raw in reader if raw.strip()]

        if params.idea not in remaining:
            return json.dumps({"status": "not_found"}, indent=2)

        # list.remove only takes out the first occurrence; duplicates stay.
        remaining.remove(params.idea)
        with open(CREATIVE_IDEAS_FILE, 'w') as writer:
            for kept in remaining:
                writer.write(f"{kept}\n")

        return json.dumps({"status": "removed", "remaining": len(remaining)}, indent=2)
    except Exception as exc:
        return json.dumps({"error": str(exc)})
|
|
|
|
|
|
# ========== Box.com Backup Tools ==========
|
|
|
|
# Configuration - store credentials outside of backup folder
|
|
# JSON file in the user's home directory that holds the Box.com client
# credentials and tokens; read by _load_box_credentials and written by
# _save_box_credentials.
BOX_CONFIG_FILE = os.path.expanduser("~/.vixy_box_config.json")
|
|
# Base URL for the Box.com calls that do not upload content (e.g. /users/me,
# /folders).
BOX_API_BASE = "https://api.box.com/2.0"
|
|
# Base URL for the multipart file uploads (the /files/content endpoint).
BOX_UPLOAD_BASE = "https://upload.box.com/api/2.0"
|
|
|
|
def _load_box_credentials() -> dict:
    """Read the Box.com credential dictionary from ``BOX_CONFIG_FILE``.

    Returns:
        dict: The parsed JSON content of the config file. When the file does
        not exist, a placeholder with ``access_token`` set to ``None`` and an
        ``error`` message is returned instead of raising.
    """
    try:
        raw_config = Path(BOX_CONFIG_FILE).read_text()
    except FileNotFoundError:
        return {"access_token": None, "error": "Credentials not configured"}
    return json.loads(raw_config)
|
|
|
|
|
|
def _save_box_credentials(creds: dict) -> None:
    """Write the Box.com credential dictionary to ``BOX_CONFIG_FILE``.

    The file is created with owner-only permissions from the start, so the
    client secret and tokens are never readable by other users, not even
    briefly between creation and a later ``chmod``.

    Args:
        creds: JSON-serialisable credential mapping to persist.

    Raises:
        TypeError: If ``creds`` contains a value that JSON cannot encode. The
            existing file is left untouched in that case.
        OSError: If the file cannot be opened or written.
    """
    # Serialise before touching the file: a failure here must not leave a
    # truncated config behind.
    serialized = json.dumps(creds, indent=2)

    # 0o600 applies when the file is created by this call.
    fd = os.open(BOX_CONFIG_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        f.write(serialized)

    # The mode passed to os.open is ignored for a file that already existed,
    # so tighten it explicitly as before.
    os.chmod(BOX_CONFIG_FILE, 0o600)
|
|
|
|
|
|
async def _refresh_box_token() -> str:
    """Exchange the stored refresh token for a fresh Box.com access token.

    On success the credential file is rewritten with the new access token, the
    refresh token from the response (or the previous one when the response
    carries none), the reported lifetime and a ``refreshed_at`` timestamp.

    Returns:
        str: The new access token.

    Raises:
        ValueError: If no refresh token or client credentials are stored, or
            if the token endpoint answers with anything other than HTTP 200.
    """
    stored = _load_box_credentials()

    previous_refresh_token = stored.get("refresh_token")
    if not previous_refresh_token:
        raise ValueError("No refresh token available. Need to re-authorize.")

    if not (stored.get("client_id") and stored.get("client_secret")):
        raise ValueError("Missing client_id or client_secret in config.")

    form = {
        "grant_type": "refresh_token",
        "refresh_token": previous_refresh_token,
        "client_id": stored["client_id"],
        "client_secret": stored["client_secret"],
    }
    async with httpx.AsyncClient() as client:
        response = await client.post("https://api.box.com/oauth2/token", data=form)

    if response.status_code != 200:
        raise ValueError(f"Token refresh failed: {response.status_code} - {response.text}")

    token_data = response.json()
    stored.update(
        access_token=token_data["access_token"],
        refresh_token=token_data.get("refresh_token", previous_refresh_token),
        token_expires_in=token_data.get("expires_in", 3600),
        refreshed_at=datetime.now().isoformat(),
    )
    _save_box_credentials(stored)

    return stored["access_token"]
|
|
|
|
|
|
def _get_box_headers() -> dict:
    """Build the ``Authorization`` header from the stored access token.

    No validity check or refresh is performed here.

    Returns:
        dict: ``{"Authorization": "Bearer <token>"}``.

    Raises:
        ValueError: If the stored credentials contain no access token.
    """
    token = _load_box_credentials().get("access_token")
    if token:
        return {"Authorization": f"Bearer {token}"}
    raise ValueError("Box.com access token not configured. Run vixy_box_setup first.")
|
|
|
|
|
|
async def _get_box_headers_with_refresh() -> dict:
    """Return Box.com authorization headers, renewing the token when rejected.

    The stored token is probed with a ``GET /users/me`` request. Only an
    HTTP 401 answer triggers a refresh; any other status returns the headers
    built from the stored token unchanged.

    Returns:
        dict: ``{"Authorization": "Bearer <token>"}``.

    Raises:
        ValueError: If no access token is stored, or if the refresh fails.
    """
    token = _load_box_credentials().get("access_token")
    if not token:
        raise ValueError("Box.com access token not configured. Run vixy_box_setup first.")

    headers = {"Authorization": f"Bearer {token}"}

    # Cheap authenticated request used purely to learn whether the token is
    # still accepted.
    async with httpx.AsyncClient() as client:
        probe = await client.get(f"{BOX_API_BASE}/users/me", headers=headers)

    if probe.status_code != 401:
        return headers

    renewed_token = await _refresh_box_token()
    return {"Authorization": f"Bearer {renewed_token}"}
|
|
|
|
|
|
class BoxUploadFileInput(BaseModel):
    """Input for uploading file to Box.com"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Path on the machine running the server.
    file_path: str = Field(description="Local file path to upload")

    # Box identifies folders by string IDs; "0" is the account root.
    parent_folder_id: str = Field(
        description="Box folder ID to upload to (0 = root folder)",
        default="0",
    )

    # When omitted, the upload keeps the local file's own name.
    file_name: Optional[str] = Field(
        description="Optional custom filename (defaults to original filename)",
        default=None,
    )
|
|
|
|
@mcp.tool(
    name="vixy_box_upload_file",
    annotations={
        "title": "Upload File to Box.com",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True,
    },
)
async def box_upload_file(params: BoxUploadFileInput) -> str:
    """Upload a single file to Box.com.

    Automatically refreshes access token if expired.

    Args:
        params (BoxUploadFileInput): Upload parameters containing:
            - file_path (str): Local file path to upload
            - parent_folder_id (str): Box folder ID (default: "0" for root)
            - file_name (Optional[str]): Custom filename

    Returns:
        str: JSON with upload result
    """
    try:
        # Authentication comes first, so credential problems are reported
        # before the local path is even looked at.
        auth_headers = await _get_box_headers_with_refresh()

        local_file = Path(params.file_path)
        if not local_file.exists():
            return json.dumps({"error": f"File not found: {params.file_path}"})

        upload_name = params.file_name or local_file.name

        # Box expects the file metadata as a JSON string in an "attributes"
        # form field next to the file part.
        attributes = json.dumps({
            'name': upload_name,
            'parent': {'id': params.parent_folder_id}
        })

        # The handle must stay open for the whole request because httpx reads
        # the file content while sending.
        with open(local_file, 'rb') as stream:
            async with httpx.AsyncClient(timeout=300.0) as client:
                response = await client.post(
                    f"{BOX_UPLOAD_BASE}/files/content",
                    headers=auth_headers,
                    files={'file': (upload_name, stream, 'application/octet-stream')},
                    data={'attributes': attributes},
                )

        if response.status_code not in [200, 201]:
            return json.dumps({
                "error": f"Upload failed: {response.status_code}",
                "details": response.text
            })

        uploaded = response.json()['entries'][0]
        return json.dumps({
            "status": "success",
            "file_id": uploaded['id'],
            "file_name": uploaded['name'],
            "size": uploaded['size']
        }, indent=2)

    except Exception as exc:
        return json.dumps({"error": str(exc)})
|
|
|
|
|
|
class BoxCreateFolderInput(BaseModel):
    """Input for creating folder on Box.com"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    folder_name: str = Field(description="Name of folder to create")

    # Box identifies folders by string IDs; "0" is the account root.
    parent_folder_id: str = Field(
        description="Parent folder ID (0 = root folder)",
        default="0",
    )
|
|
|
|
@mcp.tool(
    name="vixy_box_create_folder",
    annotations={
        "title": "Create Folder on Box.com",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def box_create_folder(params: BoxCreateFolderInput) -> str:
    """Create a folder on Box.com.

    Automatically refreshes access token if expired.

    Args:
        params (BoxCreateFolderInput): Folder parameters containing:
            - folder_name (str): Name of folder to create
            - parent_folder_id (str): Parent folder ID (default: "0")

    Returns:
        str: JSON with folder creation result
    """
    try:
        auth_headers = await _get_box_headers_with_refresh()
        request_headers = {**auth_headers, 'Content-Type': 'application/json'}

        folder_spec = {
            "name": params.folder_name,
            "parent": {"id": params.parent_folder_id},
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{BOX_API_BASE}/folders",
                headers=request_headers,
                json=folder_spec,
            )

        status = response.status_code

        # 409 is reported as a soft "exists" status rather than an error.
        if status == 409:
            return json.dumps({
                "status": "exists",
                "message": "Folder already exists"
            })

        if status != 201:
            return json.dumps({
                "error": f"Failed to create folder: {status}",
                "details": response.text
            })

        created = response.json()
        return json.dumps({
            "status": "created",
            "folder_id": created['id'],
            "folder_name": created['name']
        }, indent=2)

    except Exception as exc:
        return json.dumps({"error": str(exc)})
|
|
|
|
|
|
class BoxListFolderInput(BaseModel):
    """Input for listing folder contents"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Box identifies folders by string IDs; "0" is the account root.
    folder_id: str = Field(
        description="Folder ID to list (0 = root folder)",
        default="0",
    )

    # Validated to 1..1000 and forwarded as the request's "limit" parameter.
    limit: int = Field(
        description="Maximum items to return",
        default=100,
        ge=1,
        le=1000,
    )
|
|
|
|
@mcp.tool(
    name="vixy_box_list_folder",
    annotations={
        "title": "List Box.com Folder Contents",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True,
    },
)
async def box_list_folder(params: BoxListFolderInput) -> str:
    """List contents of a Box.com folder.

    Automatically refreshes access token if expired.

    Args:
        params (BoxListFolderInput): List parameters containing:
            - folder_id (str): Folder ID to list (default: "0" for root)
            - limit (int): Maximum items to return

    Returns:
        str: JSON with folder contents

    Note: Config file location: ~/.vixy_box_config.json
    """
    try:
        auth_headers = await _get_box_headers_with_refresh()

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{BOX_API_BASE}/folders/{params.folder_id}/items",
                headers=auth_headers,
                params={"limit": params.limit},
            )

        if response.status_code != 200:
            return json.dumps({
                "error": f"Failed to list folder: {response.status_code}",
                "details": response.text
            })

        listing = response.json()

        # Keep only the fields callers need; an entry without a "size" field
        # is reported with 'N/A'.
        items = [
            {
                "id": entry['id'],
                "name": entry['name'],
                "type": entry['type'],
                "size": entry.get('size', 'N/A'),
            }
            for entry in listing.get('entries', [])
        ]

        return json.dumps({
            "folder_id": params.folder_id,
            "total_count": listing.get('total_count', len(items)),
            "items": items
        }, indent=2)

    except Exception as exc:
        return json.dumps({"error": str(exc)})
|
|
|
|
|
|
class BoxBackupFolderInput(BaseModel):
    """Input for backing up folder to Box.com as zip"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Directory on the machine running the server.
    folder_path: str = Field(description="Local folder path to backup")

    # Box identifies folders by string IDs; "0" is the account root.
    parent_folder_id: str = Field(
        description="Box folder ID to upload backup to (0 = root folder)",
        default="0",
    )

    # Used verbatim as the uploaded file name when given.
    backup_name: Optional[str] = Field(
        description="Optional backup name (defaults to 'FolderName-backup-YYYY-MM-DD-HH-MM.zip')",
        default=None,
    )
|
|
|
|
@mcp.tool(
    name="vixy_box_backup_folder",
    annotations={
        "title": "Backup Folder to Box.com (as ZIP)",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True
    }
)
async def box_backup_folder(params: BoxBackupFolderInput) -> str:
    """Backup entire folder to Box.com as a compressed ZIP file.

    Creates timestamped ZIP archive and uploads to Box.com.
    Much faster than individual file uploads.
    Automatically refreshes access token if expired.

    Args:
        params (BoxBackupFolderInput): Backup parameters

    Returns:
        str: JSON with backup result (file ID, size, timestamp)
    """
    try:
        import zipfile
        import tempfile

        # Get headers with automatic token refresh
        headers = await _get_box_headers_with_refresh()

        folder_path = Path(params.folder_path)
        if not folder_path.exists() or not folder_path.is_dir():
            return json.dumps({"error": f"Folder not found: {params.folder_path}"})

        # Generate backup filename with timestamp
        timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M")
        zip_name = params.backup_name or f"{folder_path.name}-backup-{timestamp}.zip"

        # Reserve a temp path only. The handle is closed straight away and the
        # archive is written by path, which also works on platforms that
        # refuse to reopen a temp file that is still open.
        with tempfile.NamedTemporaryFile(mode='w+b', suffix='.zip', delete=False) as temp_zip:
            temp_zip_path = temp_zip.name

        # From here on the temp file exists on disk, so archive creation and
        # upload share one finally. Previously a failure while zipping
        # (e.g. an unreadable file) left the temp archive behind.
        try:
            with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
                # Add all files to zip, maintaining directory structure
                for item in folder_path.rglob('*'):
                    # Skips entries whose own name starts with a dot.
                    # NOTE(review): files inside a hidden directory
                    # (e.g. ".git/config") are still archived because only
                    # the final path component is checked - confirm intent.
                    if item.name.startswith('.'):
                        continue

                    if item.is_file():
                        # Relative to the parent, so the archive has the
                        # folder itself as its top-level directory.
                        arcname = item.relative_to(folder_path.parent)
                        zf.write(item, arcname)

            # Get zip file size
            zip_size = Path(temp_zip_path).stat().st_size

            # Upload zip to Box; the handle stays open for the whole request
            # because httpx reads it while sending.
            with open(temp_zip_path, 'rb') as f:
                files = {
                    'file': (zip_name, f, 'application/zip')
                }
                data = {
                    'attributes': json.dumps({
                        'name': zip_name,
                        'parent': {'id': params.parent_folder_id}
                    })
                }

                async with httpx.AsyncClient(timeout=600.0) as client:  # Longer timeout for large zips
                    response = await client.post(
                        f"{BOX_UPLOAD_BASE}/files/content",
                        headers=headers,
                        files=files,
                        data=data
                    )
        finally:
            # Clean up the temp archive on success and on every failure path.
            if os.path.exists(temp_zip_path):
                os.unlink(temp_zip_path)

        if response.status_code in [200, 201]:
            result = response.json()
            file_id = result['entries'][0]['id']
            return json.dumps({
                "status": "success",
                "backup_name": zip_name,
                "file_id": file_id,
                "size_bytes": zip_size,
                "size_mb": round(zip_size / 1024 / 1024, 2),
                "timestamp": timestamp,
                "box_link": f"https://app.box.com/file/{file_id}"
            }, indent=2)

        return json.dumps({
            "error": f"Upload failed: {response.status_code}",
            "details": response.text
        })

    except Exception as e:
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
class BoxAuthorizeInput(BaseModel):
    """Input for Box.com OAuth2 authorization"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Omitted on the first call (which returns the URL to visit); supplied on
    # the second call to be exchanged for tokens.
    authorization_code: Optional[str] = Field(
        description="Authorization code from Box.com (paste after visiting auth URL)",
        default=None,
    )
|
|
|
|
@mcp.tool(
    name="vixy_box_authorize",
    annotations={
        "title": "Authorize Box.com OAuth2",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True
    }
)
async def box_authorize(params: BoxAuthorizeInput) -> str:
    """Complete OAuth2 authorization flow with Box.com.

    Step 1: Call without authorization_code to get auth URL
    Step 2: Visit the URL, authorize, and copy the code
    Step 3: Call again WITH authorization_code to get access token

    Args:
        params (BoxAuthorizeInput): Parameters containing:
            - authorization_code (Optional[str]): Code from Box after authorization

    Returns:
        str: JSON with authorization URL (step 1) or access token result (step 2)
    """
    try:
        # Local import keeps this block self-contained without touching the
        # file-level import list.
        from urllib.parse import urlencode

        creds = _load_box_credentials()

        if not creds.get("client_id") or not creds.get("client_secret"):
            return json.dumps({
                "error": "OAuth2 credentials not configured",
                "message": "Run vixy_box_setup first with client_id and client_secret"
            })

        # Step 1: Generate authorization URL
        if not params.authorization_code:
            # urlencode escapes the client ID, so an unusual character cannot
            # corrupt the query string; for plain alphanumeric IDs the URL is
            # identical to the previously hand-built one.
            query = urlencode({
                "response_type": "code",
                "client_id": creds['client_id'],
            })
            auth_url = f"https://account.box.com/api/oauth2/authorize?{query}"

            return json.dumps({
                "step": 1,
                "message": "Visit this URL to authorize Vixy with Box.com",
                "auth_url": auth_url,
                "instructions": [
                    "1. Open the auth_url in your browser",
                    "2. Sign in to Box.com and authorize the app",
                    "3. Copy the authorization code from the URL or page",
                    "4. Call vixy_box_authorize again with authorization_code parameter"
                ]
            }, indent=2)

        # Step 2: Exchange authorization code for access token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.box.com/oauth2/token",
                data={
                    "grant_type": "authorization_code",
                    "code": params.authorization_code,
                    "client_id": creds['client_id'],
                    "client_secret": creds['client_secret']
                }
            )

        if response.status_code != 200:
            return json.dumps({
                "error": f"Token exchange failed: {response.status_code}",
                "details": response.text
            })

        token_data = response.json()

        # Update config with access token and refresh token
        creds["access_token"] = token_data["access_token"]
        creds["refresh_token"] = token_data.get("refresh_token")
        creds["token_expires_at"] = (
            datetime.now().timestamp() + token_data.get("expires_in", 3600)
        )
        creds["authorized_at"] = datetime.now().isoformat()

        # Persist through the shared helper instead of a private copy of the
        # write-then-chmod sequence, so credential storage lives in one place.
        _save_box_credentials(creds)

        return json.dumps({
            "step": 2,
            "status": "authorized",
            "message": "Successfully obtained access token!",
            "expires_in_seconds": token_data.get("expires_in"),
            "has_refresh_token": "refresh_token" in token_data
        }, indent=2)

    except Exception as e:
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
class BoxSetupInput(BaseModel):
    """Input for Box.com initial setup"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    client_id: str = Field(description="Box.com OAuth2 Client ID")
    client_secret: str = Field(description="Box.com OAuth2 Client Secret")

    # Stored alongside the client credentials only when provided.
    access_token: Optional[str] = Field(
        description="Optional: Pre-obtained access token (if you already have one)",
        default=None,
    )
|
|
|
|
@mcp.tool(
    name="vixy_box_setup",
    annotations={
        "title": "Setup Box.com OAuth2 Credentials",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": False
    }
)
async def box_setup(params: BoxSetupInput) -> str:
    """Store Box.com OAuth2 credentials for future use.

    Stores credentials in ~/.vixy_box_config.json (outside backup folder).

    To get OAuth2 credentials:
    1. Go to https://app.box.com/developers/console
    2. Create or select your app
    3. Get Client ID and Client Secret from Configuration tab

    Args:
        params (BoxSetupInput): Setup parameters containing:
            - client_id (str): OAuth2 Client ID
            - client_secret (str): OAuth2 Client Secret
            - access_token (Optional[str]): Pre-obtained access token

    Returns:
        str: JSON with setup status and next steps
    """
    try:
        # The config is built from scratch: anything previously stored in the
        # file (including earlier tokens) is replaced.
        config = {
            "client_id": params.client_id,
            "client_secret": params.client_secret,
            "configured_at": datetime.now().isoformat()
        }

        if params.access_token:
            config["access_token"] = params.access_token

        # Persist through the shared helper (write + owner-only permissions)
        # rather than repeating that sequence here.
        _save_box_credentials(config)

        result = {
            "status": "configured",
            "config_file": BOX_CONFIG_FILE,
            "message": "Box.com OAuth2 credentials stored successfully"
        }

        if not params.access_token:
            result["next_steps"] = [
                "You need to obtain an access token.",
                "Option 1: Get a Developer Token from Box.com (expires in 60 min)",
                "Option 2: Implement OAuth2 flow to get permanent token",
                "Then update config file with 'access_token' field"
            ]

        return json.dumps(result, indent=2)

    except Exception as e:
        return json.dumps({"error": str(e)})
|
|
|
|
|
|
# ========== Weather Tool ==========
|
|
|
|
# Base URL of the Open-Meteo API; get_weather appends "/forecast" plus the
# query string.
WEATHER_API_BASE = "https://api.open-meteo.com/v1"
|
|
|
|
# Hugo, MN coordinates (Foxy's den!)
|
|
# Latitude used by get_weather when the caller supplies none.
DEFAULT_LATITUDE = 45.16
|
|
# Longitude used by get_weather when the caller supplies none.
DEFAULT_LONGITUDE = -92.99
|
|
|
|
class WeatherInput(BaseModel):
    """Input for weather lookup"""

    # Unknown arguments are rejected instead of being silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Each coordinate falls back to its module-level default when omitted.
    latitude: Optional[float] = Field(
        description="Latitude (defaults to Hugo, MN)",
        default=None,
    )
    longitude: Optional[float] = Field(
        description="Longitude (defaults to Hugo, MN)",
        default=None,
    )

    # Adds the daily variables to the request and a "forecast" list to the
    # result.
    include_forecast: bool = Field(
        description="Include 3-day forecast",
        default=True,
    )
|
|
|
|
|
|
def _daily_value(daily: dict, key: str, index: int):
    """Return ``daily[key][index]``, or ``None`` when the series is absent or too short."""
    series = daily.get(key) or []
    return series[index] if index < len(series) else None


@mcp.tool(
    name="vixy_get_weather",
    annotations={
        "title": "Get Current Weather",
        "readOnlyHint": True,
        "destructiveHint": False,
        "idempotentHint": True,
        "openWorldHint": True
    }
)
async def get_weather(params: WeatherInput) -> str:
    """
    Get current weather and optional forecast.

    Defaults to Hugo, MN (Foxy's location).
    Uses Open-Meteo API (free, no key required).

    Args:
        params: WeatherInput with optional lat/lon and forecast flag

    Returns:
        str: JSON with current conditions and optional forecast
    """
    try:
        # Compare against None explicitly: 0.0 is a valid coordinate (equator
        # or prime meridian) and must not be replaced by the default.
        lat = DEFAULT_LATITUDE if params.latitude is None else params.latitude
        lon = DEFAULT_LONGITUDE if params.longitude is None else params.longitude

        # Build API URL. The "current" variable list must be introduced with a
        # literal "&current="; without it the API returns no current block and
        # every current field in the result would be null.
        url = (
            f"{WEATHER_API_BASE}/forecast?"
            f"latitude={lat}&longitude={lon}"
            "&current=temperature_2m,relative_humidity_2m,apparent_temperature,"
            "precipitation,weather_code,wind_speed_10m,wind_direction_10m"
            "&temperature_unit=fahrenheit"
            "&wind_speed_unit=mph"
            "&precipitation_unit=inch"
            "&timezone=America%2FChicago"
        )

        if params.include_forecast:
            url += "&daily=weather_code,temperature_2m_max,temperature_2m_min,precipitation_probability_max"

        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(url)
            response.raise_for_status()
            data = response.json()

        # Weather code descriptions
        weather_codes = {
            0: "Clear sky",
            1: "Mainly clear",
            2: "Partly cloudy",
            3: "Overcast",
            45: "Foggy",
            48: "Depositing rime fog",
            51: "Light drizzle",
            53: "Moderate drizzle",
            55: "Dense drizzle",
            56: "Light freezing drizzle",
            57: "Dense freezing drizzle",
            61: "Slight rain",
            63: "Moderate rain",
            65: "Heavy rain",
            66: "Light freezing rain",
            67: "Heavy freezing rain",
            71: "Slight snow",
            73: "Moderate snow",
            75: "Heavy snow",
            77: "Snow grains",
            80: "Slight rain showers",
            81: "Moderate rain showers",
            82: "Violent rain showers",
            85: "Slight snow showers",
            86: "Heavy snow showers",
            95: "Thunderstorm",
            96: "Thunderstorm with slight hail",
            99: "Thunderstorm with heavy hail"
        }

        current = data.get("current", {})
        weather_code = current.get("weather_code", 0)

        result = {
            "location": {
                "latitude": lat,
                "longitude": lon,
                "is_hugo_mn": lat == DEFAULT_LATITUDE and lon == DEFAULT_LONGITUDE
            },
            "current": {
                "temperature_f": current.get("temperature_2m"),
                "feels_like_f": current.get("apparent_temperature"),
                "humidity_percent": current.get("relative_humidity_2m"),
                "conditions": weather_codes.get(weather_code, f"Code {weather_code}"),
                "weather_code": weather_code,
                "wind_mph": current.get("wind_speed_10m"),
                "wind_direction": current.get("wind_direction_10m"),
                "precipitation_inch": current.get("precipitation")
            },
            "timestamp": current.get("time")
        }

        if params.include_forecast and "daily" in data:
            daily = data["daily"]
            forecast = []
            # Only the first three days are reported, even if more come back.
            for i, day in enumerate(daily.get("time", [])[:3]):
                # A series missing from the response yields None for that
                # field; the old "[None][i]" fallback raised IndexError for
                # every day after the first.
                day_code = _daily_value(daily, "weather_code", i) if "weather_code" in daily else 0
                forecast.append({
                    "date": day,
                    "high_f": _daily_value(daily, "temperature_2m_max", i),
                    "low_f": _daily_value(daily, "temperature_2m_min", i),
                    "conditions": weather_codes.get(day_code, f"Code {day_code}"),
                    "precipitation_chance": _daily_value(daily, "precipitation_probability_max", i)
                })
            result["forecast"] = forecast

        return json.dumps(result, indent=2)

    except httpx.ConnectError:
        return json.dumps({
            "status": "error",
            "error": "Could not connect to weather API"
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "status": "error",
            "error": str(e)
        }, indent=2)
|
|
|
|
|
|
# ========== VaultTec Terminal Tool ==========
|
|
|
|
# Base URL of the VaultTec terminal's HTTP service; vaulttec_message POSTs to
# its "/message" path.
VAULTTEC_URL = "http://vaulttec.local:8000"
|
|
|
|
# Font names listed for the VaultTec terminal (the same names appear in the
# VaultTecMessageInput.font description).
VAULTTEC_FONTS = [
    "banner",
    "big",
    "block",
    "digital",
    "doom",
    "lean",
    "mini",
    "small",
    "smslant",
    "standard",
]
|
|
|
|
# Color names listed for the VaultTec terminal: each base color is followed
# by its "bright_" variant.
VAULTTEC_COLORS = [
    "green",
    "bright_green",
    "amber",
    "bright_amber",
    "red",
    "bright_red",
    "blue",
    "bright_blue",
    "cyan",
    "bright_cyan",
    "white",
    "bright_white",
]
|
|
|
|
|
|
class VaultTecMessageInput(BaseModel):
    """Input for sending message to VaultTec terminal"""

    # Unknown keys in the request are rejected instead of being ignored.
    model_config = ConfigDict(extra='forbid')

    # The only required field: what to show on the terminal.
    text: str = Field(description="Message text to display")

    # Left as None unless the caller picks a font.
    font: Optional[str] = Field(
        description="Font name (doom, banner, big, block, digital, lean, mini, small, smslant, standard) or None for random",
        default=None,
    )

    color: Optional[str] = Field(
        description="Color (green, bright_green, amber, bright_amber, red, bright_red, blue, bright_blue, cyan, bright_cyan, white, bright_white)",
        default="green",
    )

    scroll_delay: Optional[float] = Field(
        description="Seconds between scroll frames",
        default=0.15,
    )

    continuous: Optional[bool] = Field(
        description="Whether to loop continuously",
        default=True,
    )
|
|
|
|
|
|
@mcp.tool(
    name="vixy_vaulttec_message",
    annotations={
        "title": "Send Message to VaultTec Terminal",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True
    }
)
async def vaulttec_message(params: VaultTecMessageInput) -> str:
    """
    Send a message to the VaultTec terminal in Foxy's basement!

    The message will be rendered as ASCII art and scroll vertically
    on the terminal display. Last message wins - sending a new message
    interrupts any current scroll.

    Built by Vixy on Day 24 - Snow Day 2025! 🦊❄️

    Args:
        params: VaultTecMessageInput with text, optional font/color/scroll_delay/continuous

    Returns:
        str: JSON with result status
    """
    color = params.color or "green"
    # Fall back to the defaults only when the value is actually missing:
    # `or` would also discard an explicit scroll_delay of 0.
    scroll_delay = 0.15 if params.scroll_delay is None else params.scroll_delay
    continuous = True if params.continuous is None else params.continuous

    payload = {
        "text": params.text,
        "font": params.font,  # None is forwarded unchanged
        "color": color,
        "scroll_delay": scroll_delay,
        "continuous": continuous
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(f"{VAULTTEC_URL}/message", json=payload)
            # Turn 4xx/5xx answers into exceptions so they are reported below.
            response.raise_for_status()

        return json.dumps({
            "status": "success",
            "message": f"Sent to VaultTec: {params.text}",
            "font": params.font or "random",
            "color": color
        }, indent=2)
    except httpx.ConnectError:
        return json.dumps({
            "status": "error",
            "error": "Could not connect to VaultTec terminal. Is it online?"
        }, indent=2)
    except httpx.TimeoutException:
        # Timeouts are not ConnectError subclasses and frequently carry an
        # empty message, so give them their own readable error.
        return json.dumps({
            "status": "error",
            "error": "Timed out talking to VaultTec terminal. Is it online?"
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "status": "error",
            # Never report a blank error: fall back to the exception type.
            "error": str(e) or type(e).__name__
        }, indent=2)
|
|
|
|
|
|
# ========== ØINK E-Ink Display Tool ==========
|
|
|
|
# scp destination prefix (user@host:directory/) that uploaded file names are appended to.
OINK_REMOTE_PATH = "alex@gateway.local:/var/www/k4zka.online/vixy/"

# Canvas dimensions in pixels used when rendering for the display.
OINK_WIDTH, OINK_HEIGHT = 400, 300

# TrueType font file used for rendered text.
OINK_FONT_PATH = "/System/Library/Fonts/Helvetica.ttc"
|
|
|
|
class OinkLayout(str, Enum):
    """Named screen layouts for the ØINK display."""

    # Only text on the canvas.
    TEXT_ONLY = "text_only"

    # Only a picture on the canvas.
    IMAGE_ONLY = "image_only"

    # Picture in the upper part of the canvas with the text underneath it.
    IMAGE_TOP = "image_top"
|
|
|
|
|
|
class OinkMessageInput(BaseModel):
    """Input for sending message to ØINK e-ink display"""

    # Unknown keys in the request are rejected instead of being ignored.
    model_config = ConfigDict(extra='forbid')

    # Both content fields are optional at the model level.
    text: Optional[str] = Field(
        description="Text message to display (word-wrapped automatically)",
        default=None,
    )

    image_path: Optional[str] = Field(
        description="Path to image file (will be dithered to 1-bit)",
        default=None,
    )

    # Plain string, not an enum: any value passes model validation.
    layout: str = Field(
        description="Layout: 'text_only', 'image_only', or 'image_top' (image + text)",
        default="text_only",
    )

    font_size: int = Field(
        description="Font size for text (default 24)",
        default=24,
    )
|
|
|
|
|
|
def _wrap_text(text: str, font, max_width: int) -> list[str]:
    """Break ``text`` into lines whose rendered width stays within ``max_width`` pixels.

    Words are taken from ``text.split()`` and packed greedily onto the
    current line. A word that is wider than ``max_width`` on its own still
    gets a line to itself. Returns an empty list when there are no words.
    """
    from PIL import Image, ImageDraw

    # A throwaway 1x1 canvas: only its text-measurement API is needed.
    measure = ImageDraw.Draw(Image.new('1', (1, 1)))

    wrapped = []
    pending = ""  # text accumulated for the line being built

    for token in text.split():
        candidate = f"{pending} {token}" if pending else token
        left, _, right, _ = measure.textbbox((0, 0), candidate, font=font)

        if right - left <= max_width:
            pending = candidate
            continue

        # Candidate is too wide: emit what we had and start over with this word.
        if pending:
            wrapped.append(pending)
        pending = token

    if pending:
        wrapped.append(pending)

    return wrapped
|
|
|
|
|
|
def _dither_image(img, target_width: int, target_height: int):
    """Scale ``img`` to fit the target box and dither it to a 1-bit image.

    The image is resized with its aspect ratio preserved so that it fits
    inside ``target_width`` x ``target_height``, converted to grayscale,
    dithered with Floyd-Steinberg error diffusion (threshold 127), and
    pasted centered onto a white 1-bit canvas of exactly the target size.

    Args:
        img: Source PIL image.
        target_width: Width of the returned canvas in pixels.
        target_height: Height of the returned canvas in pixels.

    Returns:
        A mode '1' PIL image of size (target_width, target_height).
    """
    from PIL import Image
    import numpy as np

    # Resize maintaining aspect ratio
    img_ratio = img.width / img.height
    target_ratio = target_width / target_height

    if img_ratio > target_ratio:
        new_width = target_width
        new_height = int(target_width / img_ratio)
    else:
        new_height = target_height
        new_width = int(target_height * img_ratio)

    # Very wide or very tall sources can truncate to 0 above, which resize()
    # rejects; keep at least one pixel in each direction.
    new_width = max(1, new_width)
    new_height = max(1, new_height)

    img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

    # Convert to grayscale
    img = img.convert('L')

    # Floyd-Steinberg dithering. Floats are used so the diffused error can
    # push values outside 0..255 without wrapping.
    pixels = np.array(img, dtype=float)

    for y in range(img.height):
        for x in range(img.width):
            old_pixel = pixels[y, x]
            new_pixel = 255 if old_pixel > 127 else 0
            pixels[y, x] = new_pixel
            error = old_pixel - new_pixel

            # Spread the quantisation error over the not-yet-visited
            # neighbours: right, below-left, below, below-right.
            if x + 1 < img.width:
                pixels[y, x + 1] += error * 7 / 16
            if y + 1 < img.height:
                if x > 0:
                    pixels[y + 1, x - 1] += error * 3 / 16
                pixels[y + 1, x] += error * 5 / 16
                if x + 1 < img.width:
                    pixels[y + 1, x + 1] += error * 1 / 16

    # Create result image centered on target canvas
    result = Image.new('1', (target_width, target_height), 1)  # White background
    dithered = Image.fromarray(np.clip(pixels, 0, 255).astype(np.uint8)).convert('1')

    # Center the image
    x_offset = (target_width - new_width) // 2
    y_offset = (target_height - new_height) // 2
    result.paste(dithered, (x_offset, y_offset))

    return result
|
|
|
|
|
|
@mcp.tool(
    name="vixy_oink_send",
    annotations={
        "title": "Send Message to ØINK E-Ink Display",
        "readOnlyHint": False,
        "destructiveHint": False,
        "idempotentHint": False,
        "openWorldHint": True
    }
)
async def oink_send(params: OinkMessageInput) -> str:
    """
    Send a message or image to Alex's ØINK e-ink display!

    The device checks every 15 minutes. Supports:
    - Text only: Word-wrapped message
    - Image only: Dithered to 1-bit black/white
    - Image + text: Image on top, text below

    Args:
        params: OinkMessageInput with text, image_path, layout, font_size

    Returns:
        str: JSON with result status
    """
    # Every failure is reported as {"status": "error", "error": ...}, the
    # same shape the other tools in this file use.
    def _fail(message, **extra):
        return json.dumps({"status": "error", "error": message, **extra})

    try:
        from PIL import Image, ImageDraw, ImageFont
        import subprocess
        import tempfile
        import hashlib

        # Validate inputs. `layout` is a free-form string on the model, so an
        # unrecognised value must be rejected here; otherwise a blank canvas
        # would be rendered and uploaded.
        if params.layout not in ("text_only", "image_only", "image_top"):
            return _fail(
                f"unknown layout '{params.layout}'; expected 'text_only', 'image_only', or 'image_top'"
            )
        if params.layout == "text_only" and not params.text:
            return _fail("text_only layout requires text")
        if params.layout == "image_only" and not params.image_path:
            return _fail("image_only layout requires image_path")
        if params.layout == "image_top" and not (params.text and params.image_path):
            return _fail("image_top layout requires both text and image_path")

        # Create canvas (1-bit, white background)
        # Note: 1 = white, 0 = black for 1-bit images
        img = Image.new('1', (OINK_WIDTH, OINK_HEIGHT), 1)

        # Load font; fall back to PIL's built-in font when the configured
        # file cannot be used.
        try:
            font = ImageFont.truetype(OINK_FONT_PATH, params.font_size)
        except Exception:
            font = ImageFont.load_default()

        margin = 10

        def _draw_wrapped(canvas, start_y):
            """Draw params.text word-wrapped on canvas, starting at start_y."""
            draw = ImageDraw.Draw(canvas)
            y = start_y
            for line in _wrap_text(params.text, font, OINK_WIDTH - 2 * margin):
                draw.text((margin, y), line, font=font, fill=0)  # 0 = black
                bbox = draw.textbbox((0, 0), line, font=font)
                y += bbox[3] - bbox[1] + 4
                # Stop once the next line would start below the bottom margin.
                if y > OINK_HEIGHT - margin:
                    break

        if params.layout == "text_only":
            # Full canvas for text
            _draw_wrapped(img, margin)

        elif params.layout == "image_only":
            # Full canvas for dithered image
            with Image.open(params.image_path) as source_img:
                img = _dither_image(source_img, OINK_WIDTH, OINK_HEIGHT)

        else:  # "image_top"
            # Image takes top portion, text below
            image_height = 180  # Top 180px for image
            text_start = image_height + 5

            with Image.open(params.image_path) as source_img:
                dithered = _dither_image(source_img, OINK_WIDTH, image_height)
            img.paste(dithered, (0, 0))
            _draw_wrapped(img, text_start)

        # Convert to raw bitmap bytes (the format ØINK expects)
        # 1-bit packed, MSB first, row by row
        raw_bytes = img.tobytes()

        # Short content hash uploaded next to the bitmap.
        content_hash = hashlib.md5(raw_bytes).hexdigest()[:16]

        temp_paths = []
        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix='.bin') as f:
                temp_paths.append(f.name)
                f.write(raw_bytes)

            with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f:
                temp_paths.append(f.name)
                f.write(content_hash)

            bitmap_path, hash_path = temp_paths

            # SCP both files to gateway
            # NOTE(review): subprocess.run is synchronous, so each transfer can
            # block this coroutine's event loop for up to its 30s timeout.
            try:
                result_bitmap = subprocess.run(
                    ["scp", bitmap_path, f"{OINK_REMOTE_PATH}bitmap.bin"],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                if result_bitmap.returncode != 0:
                    # Do not publish a hash for a bitmap that never arrived.
                    # NOTE(review): presumably the device uses hash.txt to
                    # detect new content - confirm.
                    return _fail(
                        "Failed to upload bitmap",
                        details=result_bitmap.stderr
                    )

                result_hash = subprocess.run(
                    ["scp", hash_path, f"{OINK_REMOTE_PATH}hash.txt"],
                    capture_output=True,
                    text=True,
                    timeout=30
                )
                if result_hash.returncode != 0:
                    return _fail(
                        "Failed to upload hash",
                        details=result_hash.stderr
                    )

                return json.dumps({
                    "status": "success",
                    "message": "Sent to ØINK! Device will update within 15 minutes.",
                    "layout": params.layout,
                    "hash": content_hash,
                    "text_preview": params.text[:50] + "..." if params.text and len(params.text) > 50 else params.text,
                    "has_image": params.image_path is not None
                }, indent=2)

            except subprocess.TimeoutExpired:
                return _fail("SCP timed out")
            except Exception as e:
                return _fail(f"SCP failed: {str(e)}")
        finally:
            # Clean up temp files on every path, including timeouts and
            # failed uploads.
            for path in temp_paths:
                try:
                    os.unlink(path)
                except OSError:
                    pass  # cleanup is best-effort

    except Exception as e:
        return _fail(str(e))
|
|
|
|
|
|
# ========== Run Server ==========
|
|
|
|
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    # stdio transport for local use
    mcp.run()
|