Initial commit: Image Watch MCP

🖼️ MCP for monitoring and managing images
- get_latest_image: Get most recent image as inline display
- get_recent_images: Get multiple recent images
- list_pending_images: List without loading
- open_image_from_path: View any image file
- archive_processed_images: Clean up processed images

Built with love for the hardware dragon 🦊
This commit is contained in:
Alex Kazaiev
2025-12-16 20:56:09 -06:00
commit 9198846405
5 changed files with 936 additions and 0 deletions

573
image_watch_mcp.py Executable file
View File

@@ -0,0 +1,573 @@
#!/usr/bin/env python3
"""
Image Watch MCP Server
Model Context Protocol server for watching local folders and exposing images
to Claude Desktop for inline viewing.
Features:
- Watches ~/Downloads for new images (JPEG, PNG, HEIC)
- Smart compression to fit Claude's 1MB limit
- HEIC conversion for iPhone photos
- Archive processed images to ~/Downloads/Processed/
- Multiple tools for flexible workflows
"""
import asyncio
import base64
import logging
import os
import shutil
import threading
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from typing import List, Optional, Union, Dict, Any

from fastmcp import FastMCP
from fastmcp.utilities.types import Image as MCPImage
from PIL import Image
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# Configure logging
# Logs go both to a file (inspectable after the MCP host restarts the
# server) and to a stream handler for interactive runs.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/tmp/image_watch_mcp.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Initialize MCP server
mcp = FastMCP("Image Watcher")

# Configuration
WATCH_DIR = Path.home() / "Downloads"  # folder monitored for new images
ARCHIVE_DIR = Path.home() / "Downloads" / "Processed"  # processed images are moved here
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.heic', '.gif', '.bmp', '.webp'}
MAX_IMAGE_SIZE_MB = 0.95  # Target for base64-encoded size (leave room for dict metadata)
DEFAULT_RECENT_MINUTES = 60  # Consider images from last hour as "recent"

# Global state
# pending_images is touched by both the watchdog observer thread and the
# MCP tool coroutines, so every access is guarded by state_lock.
pending_images: List['PendingImage'] = []
state_lock = threading.Lock()

# Try to import HEIC support
# pillow-heif is optional; without it, .heic files (iPhone photos) cannot
# be opened by PIL and HEIC_SUPPORTED stays False.
try:
    import pillow_heif
    pillow_heif.register_heif_opener()
    HEIC_SUPPORTED = True
    logger.info("HEIC support enabled")
except ImportError:
    HEIC_SUPPORTED = False
    logger.warning("HEIC support not available - install pillow-heif for iPhone photo support")
@dataclass
class PendingImage:
    """A watched image file queued for viewing and, later, archival."""
    path: Path          # absolute location of the image on disk
    timestamp: datetime  # file mtime at the moment it was queued
    size_mb: float      # on-disk size in megabytes
    format: str         # lowercase extension without the dot, e.g. "jpg"
    processed: bool = False  # True once the image has been returned to the client
    archived: bool = False   # True once the file has been moved to the archive

    @classmethod
    def from_path(cls, path: Path) -> 'PendingImage':
        """Build a queue entry by inspecting the file at *path* on disk."""
        info = path.stat()
        return cls(
            path=path,
            timestamp=datetime.fromtimestamp(info.st_mtime),
            size_mb=info.st_size / (1024 * 1024),
            format=path.suffix.lower().lstrip('.'),
        )
def _b64_size_mb(data: bytes) -> float:
    """Return the size in MB of *data* after base64 encoding.

    Claude's payload limit applies to the base64-encoded bytes, so we
    measure the ACTUAL encoded size rather than estimating with a 4/3
    factor.
    """
    return len(base64.b64encode(data)) / (1024 * 1024)


def _to_rgb(img: Image.Image) -> Image.Image:
    """Convert any PIL image mode to RGB (JPEG cannot store transparency)."""
    if img.mode in ('RGBA', 'LA', 'P'):
        rgb_img = Image.new('RGB', img.size, (255, 255, 255))
        if img.mode == 'RGBA':
            # Composite over white, using the alpha channel as the paste mask
            rgb_img.paste(img, mask=img.split()[-1])
        else:
            rgb_img.paste(img)
        return rgb_img
    if img.mode != 'RGB':
        return img.convert('RGB')
    return img


def _encode_jpeg(img: Image.Image, quality: int) -> bytes:
    """Encode *img* as an optimized JPEG at the given quality."""
    buffer = BytesIO()
    img.save(buffer, format='JPEG', quality=quality, optimize=True)
    return buffer.getvalue()


async def smart_compress(image_path: Path, target_size_mb: float = MAX_IMAGE_SIZE_MB) -> bytes:
    """
    Compress image to fit under target size with quality adaptation.

    Strategy:
    1. Convert HEIC/RGBA to RGB JPEG
    2. Try progressive quality reduction (85, 75, 65, 55)
    3. If still too large, resize image down to 50% in 10% steps
    4. Return best effort

    Args:
        image_path: Path to image file
        target_size_mb: Target size in MB for base64-encoded output
            (default MAX_IMAGE_SIZE_MB)

    Returns:
        Compressed image bytes (JPEG format) that aim to stay under the
        target when base64 encoded; the smallest attempt is returned as
        a best effort if the target cannot be met.

    Raises:
        Exception: re-raised from PIL if the file cannot be opened or
            encoded (logged first).

    NOTE: We check the ACTUAL base64 size, not an estimate.
    """
    try:
        logger.info(f"Compressing {image_path.name}")
        # FIX: open inside a context manager so the underlying file handle
        # is closed deterministically (the original leaked it).
        with Image.open(image_path) as src:
            img = _to_rgb(src)

            # Phase 1: progressive quality reduction at full resolution
            for quality in [85, 75, 65, 55]:
                img_bytes = _encode_jpeg(img, quality)
                base64_size_mb = _b64_size_mb(img_bytes)
                raw_size_mb = len(img_bytes) / (1024 * 1024)
                logger.debug(f"Quality {quality}: {raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)")
                if base64_size_mb <= target_size_mb:
                    logger.info(f"✓ Compressed to {img.width}x{img.height} @ quality {quality}: {raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)")
                    return img_bytes

            # Phase 2: still too large - shrink dimensions, 90% down to 50%
            logger.warning("Image still too large, attempting resize")
            scale = 0.9
            while scale >= 0.5:
                new_size = (int(img.width * scale), int(img.height * scale))
                # Always resize from the full-resolution image so resampling
                # artifacts do not compound across iterations.
                resized = img.resize(new_size, Image.Resampling.LANCZOS)
                img_bytes = _encode_jpeg(resized, 70)
                base64_size_mb = _b64_size_mb(img_bytes)
                raw_size_mb = len(img_bytes) / (1024 * 1024)
                logger.debug(f"Scale {scale:.1f} ({new_size}): {raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)")
                if base64_size_mb <= target_size_mb:
                    logger.info(f"✓ Resized to {new_size}: {raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)")
                    return img_bytes
                scale -= 0.1

        # Phase 3: best effort - return the smallest attempt we produced
        base64_size_mb = _b64_size_mb(img_bytes)
        raw_size_mb = len(img_bytes) / (1024 * 1024)
        logger.warning(f"Could not compress below target, returning {raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)")
        return img_bytes
    except Exception as e:
        logger.error(f"Failed to compress {image_path}: {e}")
        raise
class ImageHandler(FileSystemEventHandler):
    """Watchdog handler that queues newly created image files."""

    def on_created(self, event):
        """Queue a freshly created file if it is a watched image type."""
        if event.is_directory:
            return
        created = Path(event.src_path)
        # Ignore non-image files and anything landing inside the archive
        if created.suffix.lower() not in IMAGE_EXTENSIONS or ARCHIVE_DIR in created.parents:
            return
        logger.info(f"New image detected: {created.name}")
        try:
            entry = PendingImage.from_path(created)
            with state_lock:
                # Skip files we have already queued
                already_queued = any(item.path == created for item in pending_images)
                if not already_queued:
                    pending_images.append(entry)
                    logger.info(f"Added to pending queue: {created.name} ({entry.size_mb:.2f}MB)")
        except Exception as e:
            logger.error(f"Failed to process new image {created}: {e}")
def start_watcher():
    """Launch a watchdog Observer on WATCH_DIR and return it for shutdown."""
    logger.info(f"Starting watchdog observer on {WATCH_DIR}")
    # Make sure the watched folder exists before scheduling the observer
    WATCH_DIR.mkdir(parents=True, exist_ok=True)
    watcher = Observer()
    watcher.schedule(ImageHandler(), str(WATCH_DIR), recursive=False)
    watcher.start()
    logger.info("Watchdog observer started successfully")
    return watcher
def scan_existing_images():
    """Seed the pending queue with recent images already in WATCH_DIR."""
    logger.info(f"Scanning {WATCH_DIR} for existing images")
    cutoff = datetime.now() - timedelta(minutes=DEFAULT_RECENT_MINUTES)
    added = 0
    for entry in WATCH_DIR.iterdir():
        # Only plain image files outside the archive folder qualify
        is_candidate = (
            entry.is_file()
            and entry.suffix.lower() in IMAGE_EXTENSIONS
            and ARCHIVE_DIR not in entry.parents
        )
        if not is_candidate:
            continue
        try:
            candidate = PendingImage.from_path(entry)
        except Exception as e:
            logger.error(f"Failed to scan {entry}: {e}")
            continue
        # Older files are ignored; only "recent" ones are queued at startup
        if candidate.timestamp < cutoff:
            continue
        with state_lock:
            if all(img.path != entry for img in pending_images):
                pending_images.append(candidate)
                added += 1
    logger.info(f"Found {added} recent images in {WATCH_DIR}")
# MCP Tools
@mcp.tool()
async def get_latest_image() -> Union[MCPImage, str]:
    """
    Get the most recently added image from the watch directory.

    Picks the newest unprocessed image, compresses it to fit Claude's
    1MB limit via smart_compress(), marks it processed, and returns it
    for inline display. Processed images can later be moved away with
    archive_processed_images().

    Returns:
        MCPImage object for inline display, or a text message when the
        queue is empty / the file cannot be loaded.
    """
    # Snapshot the newest unprocessed entry while holding the lock
    with state_lock:
        candidates = [img for img in pending_images if not img.processed]
        if not candidates:
            return "No new images available. AirDrop some photos to your Downloads folder!"
        newest = max(candidates, key=lambda item: item.timestamp)
    try:
        # Compression happens outside the lock; it can be slow
        payload = await smart_compress(newest.path)
        with state_lock:
            newest.processed = True
        logger.info(f"Returning latest image: {newest.path.name} ({newest.timestamp.isoformat()})")
        return MCPImage(data=payload, format="jpeg")
    except Exception as e:
        logger.error(f"Failed to process {newest.path}: {e}")
        return f"Error loading image {newest.path.name}: {str(e)}"
@mcp.tool()
async def get_recent_images(count: int = 5, since_minutes: int = DEFAULT_RECENT_MINUTES) -> Union[List[MCPImage], str]:
    """
    Get multiple recent images from the watch directory.

    Returns up to 'count' images that were added within the last 'since_minutes'
    minutes. Images are returned in chronological order (oldest first) and
    compressed so the whole batch fits Claude's 1MB tool-output limit.

    All returned images are marked as processed and can be archived later.

    Args:
        count: Maximum number of images to return (default: 5)
        since_minutes: Consider images from this many minutes ago (default: 60)

    Returns:
        List of MCPImage objects for inline display, or text message
    """
    threshold = datetime.now() - timedelta(minutes=since_minutes)
    with state_lock:
        recent = [
            img for img in pending_images
            if not img.processed and img.timestamp >= threshold
        ]
    if not recent:
        return f"No images found from the last {since_minutes} minutes"
    # Sort by timestamp (oldest first) and limit count
    recent.sort(key=lambda x: x.timestamp)
    to_process = recent[:count]
    logger.info(f"Processing {len(to_process)} recent images")
    # Split the total budget across the batch, leaving ~100KB overhead for
    # dict metadata and JSON serialization.
    total_budget_mb = 0.90
    # BUG FIX: the old if/elif ladder divided by at most 5, so a caller
    # passing count > 5 overshot the total budget. Dividing by the actual
    # batch size is identical for 1-5 images and correct beyond that.
    target_per_image = total_budget_mb / len(to_process)
    logger.info(f"Target size per image: {target_per_image:.2f}MB (total budget: {total_budget_mb}MB)")
    images = []
    errors = []
    for img in to_process:
        try:
            compressed = await smart_compress(img.path, target_size_mb=target_per_image)
            images.append(MCPImage(data=compressed, format="jpeg"))
            # Mark as processed
            with state_lock:
                img.processed = True
            # Log filename for reference
            logger.info(f"Added image: {img.path.name} ({img.timestamp.isoformat()})")
        except Exception as e:
            logger.error(f"Failed to process {img.path}: {e}")
            errors.append(f"{img.path.name}: {str(e)}")
    if not images and errors:
        return f"Failed to load images:\n" + "\n".join(errors)
    logger.info(f"Returning {len(images)} images")
    # FastMCP renders each entry with its inline image
    return images
@mcp.tool()
async def list_pending_images() -> str:
    """
    List all pending (unprocessed) images without loading them.

    Shows image names, sizes, formats, and relative age. Useful for seeing
    what images are available before deciding to load them.

    Returns:
        Formatted text list of pending images, newest first
    """
    with state_lock:
        unprocessed = [img for img in pending_images if not img.processed]
    if not unprocessed:
        return "No pending images"
    # Sort by timestamp (newest first)
    unprocessed.sort(key=lambda x: x.timestamp, reverse=True)
    lines = [f"Pending images ({len(unprocessed)}):"]
    for i, img in enumerate(unprocessed, 1):
        # BUG FIX: timedelta.seconds ignores whole days (it wraps every
        # 24h), so a day-old image displayed as "just now". total_seconds()
        # accounts for the full elapsed time.
        elapsed = int((datetime.now() - img.timestamp).total_seconds())
        if elapsed < 60:
            time_str = "just now"
        elif elapsed < 3600:
            time_str = f"{elapsed // 60}m ago"
        else:
            time_str = f"{elapsed // 3600}h ago"
        lines.append(
            f"{i}. {img.path.name} ({img.size_mb:.1f}MB, {img.format.upper()}, {time_str})"
        )
    return "\n".join(lines)
@mcp.tool()
async def open_image_from_path(file_path: str) -> Union[MCPImage, str]:
    """
    Open and display any image from a file path.

    Resolves the given path (with ~ expansion), validates that it points
    at a supported image file, compresses it to fit Claude's 1MB limit,
    and returns it for inline display.

    Does NOT mark the image as processed or track it in the watch list.

    Args:
        file_path: Absolute or relative path to the image file
            (~ expansion supported)

    Returns:
        MCPImage object for inline display, or error message

    Examples:
        open_image_from_path("~/Downloads/photo.jpg")
        open_image_from_path("/Users/alex/Pictures/vacation.png")
        open_image_from_path("../relative/path/image.heic")
    """
    try:
        target = Path(file_path).expanduser().resolve()
        # Validate existence, file-ness, and extension before loading
        if not target.exists():
            return f"Error: File not found: {file_path}"
        if not target.is_file():
            return f"Error: Not a file: {file_path}"
        if target.suffix.lower() not in IMAGE_EXTENSIONS:
            return f"Error: Not a supported image format: {target.suffix}\nSupported: {', '.join(IMAGE_EXTENSIONS)}"
        size_mb = target.stat().st_size / (1024 * 1024)
        logger.info(f"Opening image from path: {target} ({size_mb:.2f}MB)")
        compressed = await smart_compress(target)
        # Report the base64-encoded size, which is what the 1MB limit measures
        import base64
        compressed_mb = len(base64.b64encode(compressed)) / (1024 * 1024)
        logger.info(f"Opened image: {target.name} from {target}")
        logger.info(f"Image compressed: {size_mb:.2f}MB → {compressed_mb:.2f}MB")
        return MCPImage(data=compressed, format="jpeg")
    except Exception as e:
        logger.error(f"Failed to open image from path {file_path}: {e}")
        return f"Error opening image: {str(e)}"
@mcp.tool()
async def archive_processed_images() -> str:
    """
    Move all processed images to the archive folder.

    Moves images that have been viewed to ~/Downloads/Processed/ to keep
    your Downloads folder clean. The images are moved (not copied) so they
    won't be processed again.

    If an image with the same name already exists in the archive, the new
    file is renamed with a timestamp suffix.

    Returns:
        Summary of archived images, including any per-file errors
    """
    with state_lock:
        to_archive = [img for img in pending_images if img.processed and not img.archived]
    if not to_archive:
        return "No processed images to archive"
    # Ensure archive directory exists
    ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
    archived_count = 0
    errors = []
    for img in to_archive:
        try:
            dest = ARCHIVE_DIR / img.path.name
            # Handle duplicate names by appending a timestamp suffix
            if dest.exists():
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                dest = ARCHIVE_DIR / f"{img.path.stem}_{timestamp}{img.path.suffix}"
            # Move file
            shutil.move(str(img.path), str(dest))
            # Update state
            with state_lock:
                img.archived = True
            archived_count += 1
            # FIX: original message ran the two names together with no
            # separator; use the arrow style of the file's other logs.
            logger.info(f"Archived {img.path.name} → {dest.name}")
        except Exception as e:
            logger.error(f"Failed to archive {img.path}: {e}")
            errors.append(f"{img.path.name}: {str(e)}")
    # Clean up archived images from memory
    with state_lock:
        pending_images[:] = [img for img in pending_images if not img.archived]
    result = f"Archived {archived_count} image(s) to {ARCHIVE_DIR}"
    if errors:
        result += f"\n\nErrors:\n" + "\n".join(errors)
    return result
if __name__ == "__main__":
    banner = "=" * 60
    logger.info(banner)
    logger.info("Image Watch MCP Server starting...")
    logger.info(f"Watch directory: {WATCH_DIR}")
    logger.info(f"Archive directory: {ARCHIVE_DIR}")
    logger.info(f"HEIC support: {'enabled' if HEIC_SUPPORTED else 'disabled'}")
    logger.info(banner)
    # Pick up recent images already sitting in the watch folder
    scan_existing_images()
    # Then watch for new arrivals on a background thread
    watcher = start_watcher()
    try:
        logger.info("Starting MCP server...")
        mcp.run()  # blocks until the stdio transport closes
    finally:
        # Shut the observer thread down cleanly on exit
        watcher.stop()
        watcher.join()
        logger.info("Server stopped")