Files
claude-automation/wakeup_mcp.py
2026-02-08 17:47:49 -06:00

263 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Claude Desktop Automation - Wakeup Control MCP Server
Model Context Protocol server that allows Claude to control the automation daemon's
wake schedule.
Tools:
- next_wakeup(minutes) - Set when to wake next (overrides default 60min)
- pause_automation() - Pause the automation
- resume_automation() - Resume the automation
- get_status() - Check automation status and next wake time
"""
import json
import logging
import fcntl
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from mcp.server.fastmcp import FastMCP
# Configuration
# STATE_FILE is the JSON document read by load_state() and written by
# save_state(); LOG_FILE receives a copy of every log record.
STATE_FILE = Path.home() / ".claude-automation-state.json"
LOG_FILE = Path("/tmp/wakeup-mcp.log")

# Setup logging
# Records go to two handlers, in this order: the log file, then a default
# StreamHandler (which, with no argument, writes to sys.stderr).
logging.basicConfig(
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Initialize MCP server
# The @mcp.tool() decorators further down register their functions on this
# instance.
mcp = FastMCP("Wakeup Control")
def load_state() -> dict:
    """Load automation state from the JSON state file.

    Returns:
        - The parsed state dict when STATE_FILE holds a JSON object.
        - A fresh default state (60 minute interval, not paused, no wake
          times, empty matrix fields) when STATE_FILE does not exist.
        - An empty dict when the file exists but cannot be read, cannot be
          parsed, or does not contain a JSON object. The failure is logged.

    NOTE(review): callers in this file set a key on the returned dict and
    write it back. When this returns {} after a read failure, that write
    replaces the previous file contents with only the keys the caller set.
    Confirm that is acceptable.
    """
    try:
        # Open directly instead of checking exists() first: a file removed
        # between the check and the open would otherwise be reported as a
        # load failure ({}) rather than as "no state yet" (defaults).
        with open(STATE_FILE, 'r', encoding='utf-8') as f:
            state = json.load(f)
    except FileNotFoundError:
        return {
            'interval_minutes': 60,
            'paused': False,
            'last_wake': None,
            'next_wake_timestamp': None,
            'matrix_messages': [],
            'matrix_invites': [],
            'matrix_last_wake': None,
            'matrix_wake_requested': False,
        }
    except Exception as e:
        logger.error(f"Failed to load state: {e}")
        return {}

    # json.load happily returns lists, strings, numbers or None; callers index
    # the result by key, so anything but an object is treated as a failed load.
    if not isinstance(state, dict):
        logger.error(
            f"Failed to load state: expected a JSON object, got {type(state).__name__}"
        )
        return {}
    return state
def save_state(state: dict) -> bool:
    """Save automation state to the JSON state file via a temp file and rename.

    The state is written as indented JSON to a sibling ``.tmp`` file, flushed
    and fsynced, and then moved over STATE_FILE.

    Args:
        state: JSON-serializable mapping to persist.

    Returns:
        True when the state was written and moved into place; False when any
        step failed. Failures are logged here and never raised, so callers
        must check the result before reporting success.
    """
    temp_path = STATE_FILE.with_suffix('.tmp')
    try:
        with open(temp_path, 'w', encoding='utf-8') as f:
            # NOTE(review): open(..., 'w') has already truncated the temp file
            # by the time this lock is taken, and the lock is on the temp file
            # rather than on STATE_FILE. Confirm this matches the locking
            # protocol the daemon uses for the same file.
            fcntl.flock(f.fileno(), fcntl.LOCK_EX)
            json.dump(state, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        # replace() overwrites an existing destination unconditionally.
        temp_path.replace(STATE_FILE)
    except Exception as e:
        logger.exception(f"Failed to save state: {e}")
        return False
    logger.info("Saved state atomically")
    return True


def _set_paused(paused: bool) -> bool:
    """Store the paused flag in the state file; return whether the save succeeded."""
    state = load_state()
    state['paused'] = paused
    return save_state(state)


@mcp.tool()
def next_wakeup(minutes: int) -> str:
    """
    Set when the automation should wake up next.

    This overrides the default 60-minute interval for the NEXT wake cycle only.
    After that wake, it reverts to the default interval unless you call this again.

    Use this to adjust monitoring frequency based on current activity:
    - High activity: next_wakeup(15) for more frequent checks
    - Low activity: next_wakeup(120) for less frequent checks
    - Skip next: next_wakeup(1440) to wake tomorrow

    Args:
        minutes: How many minutes from now to wake up next (min: 1, max: 1440)

    Returns:
        Confirmation message with next wake time, or a message starting with
        "Error" if the input is invalid or the state could not be saved

    Examples:
        next_wakeup(30) - Wake in 30 minutes
        next_wakeup(120) - Wake in 2 hours
        next_wakeup(5) - Wake in 5 minutes (urgent check)
    """
    # Validate input. bool is a subclass of int, so it is rejected explicitly;
    # otherwise True would be accepted as "1 minute".
    if isinstance(minutes, bool) or not isinstance(minutes, int):
        return f"Error: minutes must be an integer, got {type(minutes)}"
    if minutes < 1:
        return "Error: minutes must be at least 1"
    if minutes > 1440:
        return "Error: minutes cannot exceed 1440 (24 hours)"

    try:
        # NOTE(review): this is a naive local timestamp; presumably the daemon
        # parses next_wake_timestamp the same way - confirm.
        next_wake = datetime.now() + timedelta(minutes=minutes)
        wake_text = next_wake.strftime('%Y-%m-%d %H:%M:%S')

        state = load_state()
        state['next_wake_timestamp'] = next_wake.isoformat()

        # save_state() logs and swallows its own errors, so success must be
        # confirmed from its return value before telling the caller it worked.
        if not save_state(state):
            return "Error setting next wakeup: failed to save state (see log for details)"

        logger.info(f"Next wake set to {wake_text} ({minutes} minutes)")
        return f"✓ Next wake scheduled for {wake_text} (in {minutes} minutes)"
    except Exception as e:
        logger.error(f"Failed to set next wakeup: {e}")
        return f"Error setting next wakeup: {str(e)}"


@mcp.tool()
def pause_automation() -> str:
    """
    Pause the automation daemon.

    The daemon will stop sending wake messages until you call resume_automation().
    The daemon process continues running but skips wake cycles.

    Use this when:
    - You don't need monitoring for a while
    - Working on something that shouldn't be interrupted
    - Testing or debugging

    Call resume_automation() to restart wake messages.

    Returns:
        Confirmation message, or a message starting with "Error" if the
        paused flag could not be saved
    """
    try:
        # Only report success once the flag has actually been persisted.
        if not _set_paused(True):
            return "Error pausing automation: failed to save state (see log for details)"
        logger.info("Automation paused")
        return "✓ Automation paused - no more wake messages until you call resume_automation()"
    except Exception as e:
        logger.error(f"Failed to pause automation: {e}")
        return f"Error pausing automation: {str(e)}"


@mcp.tool()
def resume_automation() -> str:
    """
    Resume the automation daemon.

    Restarts wake messages after being paused. The next wake will happen
    according to the configured interval (default 60 minutes).

    Returns:
        Confirmation message, or a message starting with "Error" if the
        paused flag could not be saved
    """
    try:
        # Only report success once the flag has actually been persisted.
        if not _set_paused(False):
            return "Error resuming automation: failed to save state (see log for details)"
        logger.info("Automation resumed")
        return "✓ Automation resumed - wake messages will continue on schedule"
    except Exception as e:
        logger.error(f"Failed to resume automation: {e}")
        return f"Error resuming automation: {str(e)}"
@mcp.tool()
def get_status() -> str:
    """
    Get current automation status.

    Shows:
    - Whether automation is running or paused
    - When the last wake happened
    - When the next wake is scheduled
    - Current interval setting

    Returns:
        Formatted status report
    """
    stamp_format = '%Y-%m-%d %H:%M:%S'
    try:
        state = load_state()
        report = ["Automation Status:", "=" * 40]

        # Running / paused flag.
        report.append("⏸️ State: PAUSED" if state.get('paused') else "▶️ State: RUNNING")

        # Last wake: show the parsed time plus its age in whole minutes, or
        # fall back to the raw stored value when it cannot be parsed.
        raw_last = state.get('last_wake')
        if not raw_last:
            report.append("Last wake: Never (daemon just started)")
        else:
            try:
                woke_at = datetime.fromisoformat(raw_last)
                minutes_ago = int((datetime.now() - woke_at).total_seconds() / 60)
                last_line = f"Last wake: {woke_at.strftime(stamp_format)} ({minutes_ago} min ago)"
            except Exception:
                last_line = f"Last wake: {raw_last}"
            report.append(last_line)

        default_interval = state.get('interval_minutes', 60)

        # Next wake: an explicit timestamp is shown as upcoming or overdue;
        # without one the default interval is reported instead.
        raw_next = state.get('next_wake_timestamp')
        if not raw_next:
            report.append(f"Next wake: Using default interval ({default_interval} minutes)")
        else:
            try:
                due_at = datetime.fromisoformat(raw_next)
                due_text = due_at.strftime(stamp_format)
                seconds_left = (due_at - datetime.now()).total_seconds()
                if seconds_left > 0:
                    next_line = f"Next wake: {due_text} (in {int(seconds_left / 60)} min)"
                else:
                    next_line = f"Next wake: OVERDUE (was {due_text})"
            except Exception:
                next_line = f"Next wake: {raw_next}"
            report.append(next_line)

        # The interval is always listed, whether or not an override is set.
        report.append(f"Default interval: {default_interval} minutes")
        return "\n".join(report)
    except Exception as e:
        logger.error(f"Failed to get status: {e}")
        return "Error getting status: " + str(e)
if __name__ == "__main__":
    # Startup banner: a rule, the server name, the state file path, a rule.
    rule = "=" * 60
    for banner_line in (
        rule,
        "Wakeup Control MCP Server starting...",
        f"State file: {STATE_FILE}",
        rule,
    ):
        logger.info(banner_line)

    # Run MCP server (uses stdio transport by default)
    mcp.run()