#!/usr/bin/env python3
"""
Claude Desktop Automation - Wakeup Control MCP Server

Model Context Protocol server that allows Claude to control
the automation daemon's wake schedule.

Tools:
- next_wakeup(minutes) - Set when to wake next (overrides default 60min)
- pause_automation() - Pause the automation
- resume_automation() - Resume the automation
- get_status() - Check automation status and next wake time

All tools communicate with the daemon through a shared JSON state file
(STATE_FILE). Every tool returns a human-readable string; failures are
reported as "Error ..." strings rather than raised.
"""

import fcntl
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

from fastmcp import FastMCP

# Configuration
STATE_FILE = Path.home() / ".claude-automation-state.json"
LOG_FILE = Path("/tmp/wakeup-mcp.log")

# Interval reported when the state file does not specify one.
DEFAULT_INTERVAL_MINUTES = 60
# Upper bound accepted by next_wakeup (24 hours).
MAX_WAKEUP_MINUTES = 1440
# Format used for every timestamp shown to the user or written to the log.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Initialize MCP server
mcp = FastMCP("Wakeup Control")


def _default_state() -> dict:
    """Return a fresh copy of the state used when no usable state file exists."""
    return {
        'interval_minutes': DEFAULT_INTERVAL_MINUTES,
        'paused': False,
        'last_wake': None,
        'next_wake_timestamp': None,
        'matrix_messages': [],
        'matrix_invites': [],
        'matrix_last_wake': None,
        'matrix_wake_requested': False,
    }


def load_state() -> dict:
    """Load automation state from JSON file.

    Returns:
        The parsed state dict. If the file is missing, unreadable, not valid
        JSON, or does not contain a JSON object, a fresh default state is
        returned instead (and the problem is logged), so callers always get
        a dict with the full set of default keys to work with.
    """
    try:
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            state = json.load(f)
    except FileNotFoundError:
        # No state written yet: not an error, just start from defaults.
        return _default_state()
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.error("Failed to load state: %s", e)
        return _default_state()

    if not isinstance(state, dict):
        logger.error(
            "Failed to load state: expected a JSON object, got %s",
            type(state).__name__,
        )
        return _default_state()
    return state


def save_state(state: dict) -> bool:
    """Save automation state to JSON file atomically.

    The state is written to a sibling temp file, flushed and fsynced, and
    then moved over STATE_FILE so readers never observe a partial write.

    Args:
        state: JSON-serialisable state dict to persist.

    Returns:
        True if the state file was replaced, False if writing failed
        (the failure is logged).
    """
    temp_path = STATE_FILE.with_suffix('.tmp')
    try:
        with open(temp_path, 'w', encoding='utf-8') as f:
            # NOTE(review): this lock is taken on the temp file, so it only
            # serialises writers that use the same temp path. It does not
            # protect the load-modify-save cycle against the daemon --
            # confirm what locking protocol the daemon expects.
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        temp_path.replace(STATE_FILE)
    except (OSError, TypeError, ValueError) as e:
        logger.error("Failed to save state: %s", e)
        return False

    logger.info("Saved state atomically")
    return True


def _apply_state_change(changes: dict) -> bool:
    """Load the state, overlay ``changes`` on it, and save; True on success."""
    state = load_state()
    state.update(changes)
    return save_state(state)


# FastMCP publishes each tool's docstring as its description, so the
# docstrings below are client-facing text.

@mcp.tool()
def next_wakeup(minutes: int) -> str:
    """
    Set when the automation should wake up next.

    This overrides the default 60-minute interval for the NEXT wake cycle only.
    After that wake, it reverts to the default interval unless you call this again.

    Use this to adjust monitoring frequency based on current activity:
    - High activity: next_wakeup(15) for more frequent checks
    - Low activity: next_wakeup(120) for less frequent checks
    - Skip next: next_wakeup(1440) to wake tomorrow

    Args:
        minutes: How many minutes from now to wake up next (min: 1, max: 1440)

    Returns:
        Confirmation message with next wake time, or an error message if the
        input is invalid or the state could not be saved

    Examples:
        next_wakeup(30) - Wake in 30 minutes
        next_wakeup(120) - Wake in 2 hours
        next_wakeup(5) - Wake in 5 minutes (urgent check)
    """
    # Validate input. bool is a subclass of int, so reject it explicitly;
    # otherwise True would silently schedule a 1-minute wake.
    if isinstance(minutes, bool) or not isinstance(minutes, int):
        return f"Error: minutes must be an integer, got {type(minutes)}"
    if minutes < 1:
        return "Error: minutes must be at least 1"
    if minutes > MAX_WAKEUP_MINUTES:
        return "Error: minutes cannot exceed 1440 (24 hours)"

    try:
        next_wake = datetime.now() + timedelta(minutes=minutes)
        if not _apply_state_change({'next_wake_timestamp': next_wake.isoformat()}):
            return f"Error setting next wakeup: could not write state file (see {LOG_FILE})"

        formatted = next_wake.strftime(TIMESTAMP_FORMAT)
        logger.info("Next wake set to %s (%s minutes)", formatted, minutes)
        return f"✓ Next wake scheduled for {formatted} (in {minutes} minutes)"
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        logger.error("Failed to set next wakeup: %s", e)
        return f"Error setting next wakeup: {str(e)}"


@mcp.tool()
def pause_automation() -> str:
    """
    Pause the automation daemon.

    The daemon will stop sending wake messages until you call resume_automation().
    The daemon process continues running but skips wake cycles.

    Use this when:
    - You don't need monitoring for a while
    - Working on something that shouldn't be interrupted
    - Testing or debugging

    Call resume_automation() to restart wake messages.

    Returns:
        Confirmation message, or an error message if the state could not be saved
    """
    try:
        if not _apply_state_change({'paused': True}):
            return f"Error pausing automation: could not write state file (see {LOG_FILE})"

        logger.info("Automation paused")
        return "✓ Automation paused - no more wake messages until you call resume_automation()"
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        logger.error("Failed to pause automation: %s", e)
        return f"Error pausing automation: {str(e)}"


@mcp.tool()
def resume_automation() -> str:
    """
    Resume the automation daemon.

    Restarts wake messages after being paused. The next wake will happen
    according to the configured interval (default 60 minutes).

    Returns:
        Confirmation message, or an error message if the state could not be saved
    """
    try:
        if not _apply_state_change({'paused': False}):
            return f"Error resuming automation: could not write state file (see {LOG_FILE})"

        logger.info("Automation resumed")
        return "✓ Automation resumed - wake messages will continue on schedule"
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        logger.error("Failed to resume automation: %s", e)
        return f"Error resuming automation: {str(e)}"


def _describe_last_wake(state: dict, now: datetime) -> str:
    """Build the "Last wake" line of the status report."""
    raw = state.get('last_wake')
    if not raw:
        return "Last wake: Never (daemon just started)"
    try:
        last_wake = datetime.fromisoformat(raw)
        minutes_ago = int((now - last_wake).total_seconds() / 60)
    except (ValueError, TypeError):
        # Not an ISO string, or not comparable with a naive "now":
        # show the stored value untouched rather than failing the report.
        return f"Last wake: {raw}"
    return f"Last wake: {last_wake.strftime(TIMESTAMP_FORMAT)} ({minutes_ago} min ago)"


def _describe_next_wake(state: dict, now: datetime) -> str:
    """Build the "Next wake" line of the status report."""
    raw = state.get('next_wake_timestamp')
    if not raw:
        interval = state.get('interval_minutes', DEFAULT_INTERVAL_MINUTES)
        return f"Next wake: Using default interval ({interval} minutes)"
    try:
        next_wake = datetime.fromisoformat(raw)
        seconds_left = (next_wake - now).total_seconds()
    except (ValueError, TypeError):
        # Same fallback as for the last-wake line: show the raw value.
        return f"Next wake: {raw}"

    formatted = next_wake.strftime(TIMESTAMP_FORMAT)
    if seconds_left > 0:
        return f"Next wake: {formatted} (in {int(seconds_left / 60)} min)"
    return f"Next wake: OVERDUE (was {formatted})"


@mcp.tool()
def get_status() -> str:
    """
    Get current automation status.

    Shows:
    - Whether automation is running or paused
    - When the last wake happened
    - When the next wake is scheduled
    - Current interval setting

    Returns:
        Formatted status report
    """
    try:
        state = load_state()
        now = datetime.now()

        if state.get('paused'):
            run_state = "⏸️ State: PAUSED"
        else:
            run_state = "▶️ State: RUNNING"

        interval = state.get('interval_minutes', DEFAULT_INTERVAL_MINUTES)
        status_lines = [
            "Automation Status:",
            "=" * 40,
            run_state,
            _describe_last_wake(state, now),
            _describe_next_wake(state, now),
            f"Default interval: {interval} minutes",
        ]
        return "\n".join(status_lines)
    except Exception as e:
        # Tool boundary: report the failure to the client instead of raising.
        logger.error("Failed to get status: %s", e)
        return f"Error getting status: {str(e)}"


if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("Wakeup Control MCP Server starting...")
    logger.info("State file: %s", STATE_FILE)
    logger.info("=" * 60)

    # Run MCP server (uses stdio transport by default)
    mcp.run()