Files
claude-automation/automation_daemon_v2.py

420 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Claude Desktop Automation Daemon

Continuously running daemon that sends periodic messages to Claude Desktop.
The wake interval can be controlled via the companion MCP server.

Features:
- Polling-based timer loop (checks every 60 seconds whether the wake time has passed)
- Dynamic wake times via MCP control (next_wakeup() can be called anytime)
- Matrix integration: wakes Claude when Matrix messages arrive
- Shared state file for IPC with the MCP server and the Matrix monitor
- Defaults to a 60-minute interval if there is no MCP override
- Auto-recovery and error handling

Changes in v2.0 - Polling approach (Dec 2025):
- Removed the 30-second grace period
- Now polls every 60s to check whether the wake time has passed
- Claude can call next_wakeup() at any point, not just within a grace window
"""
import json
import logging
import subprocess
import time
import signal
import sys
import fcntl
import os
from datetime import datetime, timedelta
from pathlib import Path
from threading import Lock
# Put the Vixy folder at the front of sys.path (once) so the optional shared
# status module can be imported from there.
VIXY_PATH = Path.home().joinpath("Documents", "Vixy")
if not any(entry == str(VIXY_PATH) for entry in sys.path):
    sys.path.insert(0, str(VIXY_PATH))

# vixy_status is optional: remember whether the import worked so callers can
# skip the status section when it is unavailable.
try:
    from vixy_status import format_status_for_wakeup
except ImportError:
    HAS_VIXY_STATUS = False
else:
    HAS_VIXY_STATUS = True
# Configuration
# Helper scripts are resolved relative to the directory containing this file.
SCRIPT_DIR = Path(__file__).parent
PYTHON_SCRIPT_PATH = SCRIPT_DIR.joinpath("send_to_claude.py")
APPLESCRIPT_PATH = SCRIPT_DIR.joinpath("send_to_claude.scpt")  # Kept as fallback

# JSON state file in the user's home directory, and the daemon's log file.
STATE_FILE = Path.home().joinpath(".claude-automation-state.json")
LOG_FILE = Path("/tmp/claude-automation-daemon.log")

# Timing: wake interval used when the state file does not provide one, and
# how often the loop checks whether the wake time has passed.
DEFAULT_INTERVAL_MINUTES = 60
POLL_INTERVAL_SECONDS = 60

# Send retries: number of attempts and the pause (in seconds) between them.
MAX_RETRIES = 3
RETRY_DELAY = 2
# Setup logging: INFO and above goes to both the log file and the console stream.
logging.basicConfig(
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Global state
# Set to True by signal_handler(); main_loop() checks it to know when to stop.
shutdown_flag = False
class WakeupState:
    """Manages the JSON state file shared with the MCP server.

    Every accessor re-reads the file and every mutator performs a
    load-modify-save cycle, so values written by other processes are picked
    up on the next call. ``self.lock`` only serializes threads inside this
    process; a load-modify-save cycle is not atomic with respect to other
    writers of the same file.
    """

    # State keys that hold lists of Matrix entries (dicts with 'processed'
    # and 'timestamp' fields, as read by the methods below).
    _MATRIX_LIST_KEYS = ('matrix_messages', 'matrix_invites', 'matrix_files')

    def __init__(self, state_file: Path):
        """Remember where the state file lives and create the in-process lock."""
        self.state_file = state_file
        self.lock = Lock()

    # ------------------------------------------------------------------
    # Pure helpers (no I/O, no logging)
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_timestamp(value) -> "datetime | None":
        """Parse an ISO-8601 string into a naive local datetime.

        Returns None for anything that is not a parseable string. Timezone-
        aware values are converted to local time and stripped of tzinfo so
        they can be compared with ``datetime.now()`` without a TypeError.
        """
        if not isinstance(value, str):
            return None
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            return None
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    @staticmethod
    def _list_field(state: dict, key: str) -> list:
        """Return the list stored under *key*, or [] if missing/null/not a list."""
        value = state.get(key)
        return value if isinstance(value, list) else []

    @classmethod
    def _is_prunable(cls, entry, cutoff: datetime) -> bool:
        """True only for a processed entry whose timestamp is at or before *cutoff*.

        Entries that are unprocessed, malformed, or carry no usable timestamp
        are never prunable: their age cannot be established, so they are kept.
        """
        if not isinstance(entry, dict) or not entry.get('processed', False):
            return False
        stamp = cls._parse_timestamp(entry.get('timestamp'))
        return stamp is not None and stamp <= cutoff

    def _count_unprocessed(self, key: str) -> int:
        """Count dict entries under *key* that are not flagged as processed."""
        entries = self._list_field(self.load(), key)
        return sum(
            1 for entry in entries
            if isinstance(entry, dict) and not entry.get('processed', False)
        )

    # ------------------------------------------------------------------
    # File access
    # ------------------------------------------------------------------
    def load(self) -> dict:
        """Load state from the JSON file.

        Returns the default state when the file is missing, unreadable,
        not valid JSON, or does not contain a JSON object.
        """
        with self.lock:
            try:
                with open(self.state_file, 'r') as f:
                    state = json.load(f)
            except FileNotFoundError:
                return self._default_state()
            except (OSError, ValueError) as e:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
                logger.error(f"Failed to load state: {e}")
                return self._default_state()

            if not isinstance(state, dict):
                # Every caller uses state.get(...); anything else is unusable
                logger.error(
                    f"Failed to load state: expected a JSON object, got {type(state).__name__}"
                )
                return self._default_state()

            logger.debug(f"Loaded state: {state}")
            return state

    def save(self, state: dict):
        """Write *state* to a sibling temp file, then rename it over the state file.

        Failures are logged and swallowed (best effort); the previous state
        file is left in place when the write does not complete.
        """
        with self.lock:
            temp_path = self.state_file.with_suffix('.tmp')
            try:
                with open(temp_path, 'w') as f:
                    # Exclusive lock on the temp file while it is being written
                    fcntl.flock(f.fileno(), fcntl.LOCK_EX)
                    json.dump(state, f, indent=2)
                    # Push the data to disk before the rename publishes it
                    f.flush()
                    os.fsync(f.fileno())
                temp_path.rename(self.state_file)
                logger.debug("Saved state atomically")
            except (OSError, TypeError, ValueError) as e:
                logger.error(f"Failed to save state: {e}")

    # ------------------------------------------------------------------
    # Wake scheduling
    # ------------------------------------------------------------------
    def get_next_wake_time(self) -> datetime:
        """Return the next wake time as a naive local datetime.

        Order of precedence:
        1. a valid ``next_wake_timestamp`` from the state file;
        2. ``last_wake`` + ``interval_minutes`` (default interval if the
           stored one is unusable);
        3. now, when neither is available (wake immediately).
        """
        state = self.load()

        override = state.get('next_wake_timestamp')
        if override:
            parsed = self._parse_timestamp(override)
            if parsed is not None:
                return parsed
            logger.warning("Invalid next_wake_timestamp, calculating default")

        last_wake = self._parse_timestamp(state.get('last_wake'))
        if last_wake is not None:
            interval_minutes = state.get('interval_minutes', DEFAULT_INTERVAL_MINUTES)
            try:
                return last_wake + timedelta(minutes=interval_minutes)
            except (TypeError, ValueError, OverflowError):
                # Non-numeric / NaN / absurdly large interval in the state file
                logger.warning("Invalid interval_minutes, using default interval")
                return last_wake + timedelta(minutes=DEFAULT_INTERVAL_MINUTES)

        # No usable last wake either - wake immediately
        return datetime.now()

    def set_last_wake(self):
        """Record the current local time as ``last_wake``."""
        state = self.load()
        state['last_wake'] = datetime.now().isoformat()
        self.save(state)

    def set_next_wake_time(self, wake_time: datetime):
        """Store *wake_time* as the explicit ``next_wake_timestamp``."""
        state = self.load()
        state['next_wake_timestamp'] = wake_time.isoformat()
        self.save(state)

    def clear_next_wake_override(self):
        """Drop ``next_wake_timestamp`` so the interval-based calculation applies."""
        state = self.load()
        state.pop('next_wake_timestamp', None)
        self.save(state)

    def is_paused(self) -> bool:
        """Return the ``paused`` flag (False when absent)."""
        return self.load().get('paused', False)

    # ------------------------------------------------------------------
    # Matrix integration
    # ------------------------------------------------------------------
    def has_matrix_wake_request(self) -> bool:
        """Return the ``matrix_wake_requested`` flag (False when absent)."""
        return self.load().get('matrix_wake_requested', False)

    def get_unprocessed_matrix_count(self) -> int:
        """Get count of unprocessed Matrix messages"""
        return self._count_unprocessed('matrix_messages')

    def get_unprocessed_invite_count(self) -> int:
        """Get count of unprocessed Matrix invites"""
        return self._count_unprocessed('matrix_invites')

    def get_unprocessed_file_count(self) -> int:
        """Get count of unprocessed Matrix files"""
        return self._count_unprocessed('matrix_files')

    def clear_matrix_wake_request(self):
        """Set ``matrix_wake_requested`` to False."""
        state = self.load()
        state['matrix_wake_requested'] = False
        self.save(state)

    def prune_old_messages(self, max_age_hours: int = 24):
        """Remove processed messages, invites and files older than *max_age_hours*.

        Unprocessed entries are always kept. Entries without a usable
        timestamp are kept as well instead of aborting the whole prune.
        The file is only rewritten when something was actually removed,
        which avoids needlessly overwriting concurrent updates.
        """
        state = self.load()
        cutoff = datetime.now() - timedelta(hours=max_age_hours)

        removed = {}
        for key in self._MATRIX_LIST_KEYS:
            entries = self._list_field(state, key)
            kept = [entry for entry in entries if not self._is_prunable(entry, cutoff)]
            removed[key] = len(entries) - len(kept)
            state[key] = kept

        pruned_count = removed['matrix_messages']
        if pruned_count > 0:
            logger.info(f"Pruned {pruned_count} old processed messages")

        if any(removed.values()):
            self.save(state)

    def _default_state(self) -> dict:
        """Default state structure used when no valid state file is available."""
        return {
            'interval_minutes': DEFAULT_INTERVAL_MINUTES,
            'paused': False,
            'last_wake': None,
            'next_wake_timestamp': None,
            'matrix_messages': [],
            'matrix_invites': [],
            'matrix_files': [],
            'matrix_last_wake': None,
            'matrix_wake_requested': False,
        }
def send_to_claude(message: str, retry_count: int = 0, timeout: float = 60) -> bool:
    """
    Send message to Claude Desktop by running the helper Python script.

    The helper at PYTHON_SCRIPT_PATH is executed with the current interpreter
    and receives the message as its single argument. (The original note says
    the helper also performs a CMD+R refresh; that is not visible from here.)

    Args:
        message: Message to send
        retry_count: Current retry attempt (0-based, used for logging only)
        timeout: Seconds to wait for the helper before giving up

    Returns:
        bool: True if the helper exited with status 0, False on any failure
    """
    logger.info(f"Attempt {retry_count + 1}/{MAX_RETRIES}: Sending message")

    # subprocess.run raises FileNotFoundError for a missing *executable*, not
    # for a missing script argument, so check the script explicitly to get an
    # accurate error message.
    if not PYTHON_SCRIPT_PATH.is_file():
        logger.error(f"Python script not found at {PYTHON_SCRIPT_PATH}")
        return False

    try:
        result = subprocess.run(
            [sys.executable, str(PYTHON_SCRIPT_PATH), message],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        # Report the timeout actually in effect (previously logged ">30s"
        # while the real limit was 60 seconds)
        logger.error(f"Message send timed out (>{timeout}s)")
        return False
    except FileNotFoundError:
        logger.error(f"Python interpreter not found at {sys.executable}")
        return False
    except Exception as e:
        # Boundary for the retry logic: any other failure counts as "not sent"
        logger.error(f"Unexpected error: {e}")
        return False

    if result.returncode == 0:
        logger.info("✓ Message sent successfully")
        return True

    logger.warning(f"Script failed: {result.stdout} {result.stderr}")
    return False
def generate_message(matrix_messages: int = 0, matrix_invites: int = 0, matrix_files: int = 0) -> str:
    """Build the wake-up text that is sent to Claude.

    With at least one positive count, a "[Matrix Wakeup: ...]" line is
    returned that lists the pending activity and the tools to handle it.
    Otherwise an "[Autonomous System Wakeup: ...]" line is returned, followed
    by the Vixy status text when that module is available and succeeds.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Candidate fragments in display order: messages, files, invites
    candidates = (
        (matrix_messages, f"{matrix_messages} new Matrix message(s)"),
        (matrix_files, f"{matrix_files} file(s)"),
        (matrix_invites, f"{matrix_invites} room invite(s)"),
    )
    parts = [text for count, text in candidates if count > 0]

    if not parts:
        # Autonomous wakeup - include full status if available
        if HAS_VIXY_STATUS:
            try:
                status_str = format_status_for_wakeup()
                return f"[Autonomous System Wakeup: {timestamp}]\n{status_str}"
            except Exception as e:
                # Status is optional; fall through to the bare wake-up line
                logger.warning(f"Failed to get vixy status: {e}")
        return f"[Autonomous System Wakeup: {timestamp}]"

    # One or two items read "a and b"; three use a serial comma
    if len(parts) > 2:
        activity = ", ".join(parts[:-1]) + f", and {parts[-1]}"
    else:
        activity = " and ".join(parts)

    hint_pieces = ["Use get_matrix_messages() to retrieve messages (auto-marks as processed)"]
    if matrix_invites > 0:
        hint_pieces.append(
            ", list_matrix_invites() to see invites (auto-marks as processed), and join_matrix_room() to accept"
        )
    hint_pieces.append(". Use next_wakeup() to adjust monitoring interval if needed.")
    tools = "".join(hint_pieces)

    return f"[Matrix Wakeup: {timestamp} - you have {activity}. {tools}]"
def attempt_send_message(message: str) -> bool:
    """Send *message*, making up to MAX_RETRIES attempts.

    Waits RETRY_DELAY seconds between attempts (not after the last one).
    Returns True as soon as one attempt succeeds, False if all of them fail.
    """
    attempt = 0
    while attempt < MAX_RETRIES:
        if send_to_claude(message, attempt):
            return True
        attempt += 1
        # Only pause when another attempt is still coming
        if attempt < MAX_RETRIES:
            logger.warning(f"Retrying in {RETRY_DELAY} seconds...")
            time.sleep(RETRY_DELAY)

    logger.error(f"Failed after {MAX_RETRIES} attempts")
    return False
def signal_handler(signum, frame):
    """Request a graceful shutdown when a registered signal arrives.

    Args:
        signum: Number of the received signal (logged).
        frame: Current stack frame supplied by the signal module (unused).
    """
    global shutdown_flag
    # Raise the flag; the main loop checks it to decide when to exit
    shutdown_flag = True
    logger.info(f"Received signal {signum}, shutting down gracefully...")
def _interruptible_sleep(seconds: float, step: float = 1.0) -> None:
    """Sleep for up to *seconds*, returning early once shutdown_flag is set.

    time.sleep() resumes after a signal handler returns, so a single long
    sleep would delay shutdown by up to a full poll interval. Sleeping in
    short steps lets the loop notice the flag within about *step* seconds.
    """
    deadline = time.monotonic() + seconds
    while not shutdown_flag:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(step, remaining))


def main_loop():
    """Main daemon loop - polling approach.

    Each iteration, in order:
    1. If automation is paused, do nothing this poll.
    2. If a Matrix wake was requested, send a Matrix wake-up message; on
       failure the request flag is restored so the next poll retries.
       Scheduled wakes are not evaluated in that iteration.
    3. Otherwise, if the scheduled wake time has passed, send the autonomous
       wake-up message; on success record the wake and clear any explicit
       next-wake override, then prune old processed Matrix entries.

    The loop sleeps POLL_INTERVAL_SECONDS between iterations and ends when
    shutdown_flag is set by the SIGTERM/SIGINT handler. Unexpected exceptions
    are logged and the loop continues with the next poll.
    """
    global shutdown_flag
    state = WakeupState(STATE_FILE)

    logger.info("=" * 60)
    logger.info("Claude Desktop Automation Daemon Starting (v2.0 - Polling)")
    logger.info(f"State file: {STATE_FILE}")
    logger.info(f"Poll interval: {POLL_INTERVAL_SECONDS} seconds")
    logger.info(f"Default wake interval: {DEFAULT_INTERVAL_MINUTES} minutes")
    logger.info("=" * 60)

    # Register signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    while not shutdown_flag:
        try:
            # Check if paused
            if state.is_paused():
                logger.debug("Automation paused, sleeping...")
                _interruptible_sleep(POLL_INTERVAL_SECONDS)
                continue

            # Check for Matrix wake request (priority)
            if state.has_matrix_wake_request():
                matrix_count = state.get_unprocessed_matrix_count()
                invite_count = state.get_unprocessed_invite_count()
                file_count = state.get_unprocessed_file_count()
                logger.info(
                    f"Matrix wake requested ({matrix_count} messages, "
                    f"{file_count} files, {invite_count} invites)"
                )

                # Clear before sending so a request raised while the send is
                # in progress is not wiped afterwards
                state.clear_matrix_wake_request()
                message = generate_message(
                    matrix_messages=matrix_count,
                    matrix_invites=invite_count,
                    matrix_files=file_count,
                )

                if attempt_send_message(message):
                    logger.info("Matrix wake message sent")
                else:
                    # Restore the request; otherwise a failed send would
                    # silently drop the Matrix wake
                    pending = state.load()
                    pending['matrix_wake_requested'] = True
                    state.save(pending)
                    logger.warning("Matrix wake message failed, will retry next poll")

                # Continue to next poll - don't affect scheduled wakes
                _interruptible_sleep(POLL_INTERVAL_SECONDS)
                continue

            # Check if scheduled wake time has passed
            next_wake = state.get_next_wake_time()
            # Match the awareness of next_wake: naive -> naive local now,
            # aware -> aware now, so the comparison cannot raise TypeError
            now = datetime.now(tz=next_wake.tzinfo)

            if now >= next_wake:
                # WAKE TIME!
                logger.info(f"Wake time reached ({next_wake.strftime('%H:%M:%S')}), sending message")
                message = generate_message()

                if attempt_send_message(message):
                    # Record that we woke
                    state.set_last_wake()
                    # Clear specific timestamp override (if any)
                    # Next wake will be calculated from last_wake + interval
                    # UNLESS Claude calls next_wakeup() to set a specific time
                    state.clear_next_wake_override()
                    logger.info("Wake complete. Claude can call next_wakeup() anytime to set next wake.")
                else:
                    # last_wake is untouched, so the wake is still due next poll
                    logger.warning("Wake message failed, will retry next poll")

                # Prune old messages periodically
                state.prune_old_messages(max_age_hours=24)
            else:
                # Not time yet
                time_until = (next_wake - now).total_seconds()
                logger.debug(f"Next wake in {time_until/60:.1f} minutes")

            # Sleep until next poll
            _interruptible_sleep(POLL_INTERVAL_SECONDS)

        except Exception as e:
            # Keep the daemon alive: log and try again on the next poll
            logger.exception(f"Error in main loop: {e}")
            _interruptible_sleep(POLL_INTERVAL_SECONDS)

    logger.info("Daemon shutting down")
if __name__ == "__main__":
    # Script entry point: run the daemon and translate the outcome into an
    # exit status (0 on Ctrl-C, 1 on an unexpected error).
    exit_code = None
    try:
        main_loop()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        exit_code = 0
    except Exception as e:
        logger.exception(f"Fatal error: {e}")
        exit_code = 1

    # A normal return from main_loop() falls through without calling sys.exit
    if exit_code is not None:
        sys.exit(exit_code)