Files
claude-automation/matrix_integration.py
Vixy 81c18c219c Initial commit: claude-automation 🦊
Autonomous wakeup system for Claude Desktop.
Built with love, Day 44.

Components:
- automation_daemon_v2.py - Main polling daemon
- send_to_claude.py - AppleScript wrapper for sending messages
- matrix_mcp.py - Matrix integration MCP
- wakeup_mcp.py - Wakeup control MCP
- matrix_integration.py - Matrix bridge

Originally built by Alex, adopted and maintained by Vixy 💕
2025-12-15 20:18:38 -06:00

781 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Matrix Integration for Claude Desktop Automation
Monitors Matrix rooms for messages and images, queuing them for Claude to process.
Integrates with automation_daemon.py to trigger Claude wakes when new messages arrive.
Features:
- Async Matrix sync loop using matrix-nio
- Message and image queuing to shared state file
- 2-minute rate limiting between Matrix-triggered wakes
- Room whitelist for selective monitoring
- Image download and compression (reuses patterns from image-watch-mcp)
- Secure credential management
"""
import asyncio
import json
import logging
import fcntl
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List, Dict, Any
from io import BytesIO
import hashlib
try:
from nio import (
AsyncClient,
MatrixRoom,
RoomMessageText,
RoomMessageImage,
RoomMessageFile,
RoomMessageAudio,
InviteEvent,
LoginResponse,
SyncResponse,
DownloadResponse,
DownloadError,
)
except ImportError:
raise ImportError(
"matrix-nio not installed. Install with: pip3 install matrix-nio\n"
"Note: Without [e2e] extra, only unencrypted rooms are supported"
)
try:
from PIL import Image
except ImportError:
raise ImportError("Pillow not installed. Install with: pip3 install Pillow")
# Filesystem locations used by the Matrix bridge.
STATE_FILE = Path.home().joinpath(".claude-automation-state.json")
CREDENTIALS_FILE = Path.home().joinpath(".matrix-credentials.json")
MATRIX_DATA_DIR = Path.home().joinpath(".matrix-data")
LOG_FILE = Path("/tmp/matrix-integration.log")

# Minimum number of seconds between two Matrix-triggered wakes (2 minutes).
MIN_MATRIX_WAKE_INTERVAL_SECONDS = 2 * 60

# Image compression target (same as image-watch-mcp): size in MB of the
# base64-encoded image, measured on the real encoding rather than estimated.
MAX_IMAGE_SIZE_MB = 1.0

# Log to both the log file and the console; configured once at import time.
logging.basicConfig(
    handlers=[logging.FileHandler(LOG_FILE), logging.StreamHandler()],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class MatrixCredentials:
    """Manages Matrix credentials and configuration.

    The credentials live in a single JSON file holding ``homeserver``,
    ``user_id``, ``access_token`` and ``device_id``, plus a ``room_whitelist``
    list that ``save`` initialises to empty.
    """

    # Keys that must be present for load() to accept the file.
    REQUIRED_FIELDS = ('homeserver', 'user_id', 'access_token', 'device_id')

    def __init__(self, credentials_file: Path):
        self.credentials_file = credentials_file

    def load(self) -> Dict[str, Any]:
        """Load and validate credentials from the JSON file.

        Returns:
            The parsed credentials mapping.

        Raises:
            FileNotFoundError: The credentials file does not exist.
            ValueError: The file is not valid JSON, is not a JSON object, or
                lacks one of ``REQUIRED_FIELDS``.
            OSError: The file could not be read.
        """
        if not self.credentials_file.exists():
            raise FileNotFoundError(
                f"Credentials file not found: {self.credentials_file}\n"
                f"Run setup_matrix.sh to create credentials"
            )
        try:
            with open(self.credentials_file, 'r') as f:
                creds = json.load(f)
            # A payload such as `null` or a bare string would otherwise fail
            # with an unrelated TypeError in the membership test below.
            if not isinstance(creds, dict):
                raise ValueError(
                    f"Credentials file must contain a JSON object, "
                    f"got {type(creds).__name__}"
                )
            # Validate required fields
            missing = [field for field in self.REQUIRED_FIELDS if field not in creds]
            if missing:
                raise ValueError(f"Missing required credentials: {missing}")
            logger.info(f"Loaded credentials for {creds['user_id']}")
            return creds
        except Exception as e:
            # Log for the operator, then let the caller decide what to do.
            logger.error(f"Failed to load credentials: {e}")
            raise

    def save(self, homeserver: str, user_id: str, access_token: str, device_id: str):
        """Save credentials to the JSON file with owner-only permissions (0600).

        The whitelist is always written as an empty list, so an existing
        ``room_whitelist`` in the file is replaced.

        Raises:
            OSError: The file could not be created or written.
        """
        try:
            creds = {
                'homeserver': homeserver,
                'user_id': user_id,
                'access_token': access_token,
                'device_id': device_id,
                'room_whitelist': [],  # Empty by default
            }
            # Create the file as 0600 from the start: a plain open() would
            # leave the access token readable under the process umask until a
            # later chmod ran.
            fd = os.open(
                self.credentials_file,
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(fd, 'w') as f:
                # A pre-existing file keeps its old mode on open, so tighten it
                # before any secret is written.
                os.fchmod(f.fileno(), 0o600)
                json.dump(creds, f, indent=2)
            logger.info(f"Saved credentials for {user_id}")
        except Exception as e:
            logger.error(f"Failed to save credentials: {e}")
            raise
class MatrixState:
    """Manages Matrix state in the shared automation state file.

    The state is one JSON object. Every mutator does a full
    load / modify / save cycle on it. Queued entries carry a ``processed``
    flag that this class always writes as ``False``.
    """

    def __init__(self, state_file: Path):
        self.state_file = state_file

    def load(self) -> Dict[str, Any]:
        """Load the full automation state.

        Returns:
            The parsed state, or ``_default_state()`` when the file is
            missing, unreadable, not valid JSON, or not a JSON object.
        """
        if not self.state_file.exists():
            return self._default_state()
        try:
            with open(self.state_file, 'r') as f:
                state = json.load(f)
        except Exception as e:
            logger.error(f"Failed to load state: {e}")
            return self._default_state()
        # Callers index the state by key, so anything but an object is unusable.
        if not isinstance(state, dict):
            logger.error(
                f"Failed to load state: expected a JSON object, "
                f"got {type(state).__name__}"
            )
            return self._default_state()
        return state

    def save(self, state: Dict[str, Any]):
        """Save the full automation state atomically.

        The state is written to a uniquely named temporary file in the same
        directory, flushed to disk, then moved over the state file. A shared
        fixed-name temp file would let two writers truncate each other's
        output before either could lock it.

        The replaced state file carries the owner-only permissions of the
        temporary file. Failures are logged, not raised.
        """
        import tempfile

        temp_name = None
        try:
            fd, temp_name = tempfile.mkstemp(
                dir=str(self.state_file.parent),
                prefix=self.state_file.name + '.',
                suffix='.tmp',
            )
            with os.fdopen(fd, 'w') as f:
                json.dump(state, f, indent=2)
                f.flush()
                os.fsync(f.fileno())
            os.replace(temp_name, self.state_file)
            temp_name = None  # Moved into place; nothing left to clean up
        except Exception as e:
            logger.error(f"Failed to save state: {e}")
        finally:
            # Do not leave a partial temp file behind after a failed write.
            if temp_name is not None:
                try:
                    os.unlink(temp_name)
                except OSError:
                    pass

    def _enqueue(self, queue_key: str, entry: Dict[str, Any]) -> None:
        """Append ``entry`` to the list stored under ``queue_key`` and save."""
        state = self.load()
        state.setdefault(queue_key, []).append(entry)
        self.save(state)

    def add_message(self, room_id: str, room_name: str, sender: str,
                    message: str, timestamp: datetime, event_id: str):
        """Add a text message to the ``matrix_messages`` queue."""
        self._enqueue('matrix_messages', {
            'type': 'text',
            'room_id': room_id,
            'room_name': room_name,
            'sender': sender,
            'message': message,
            'timestamp': timestamp.isoformat(),
            'event_id': event_id,
            'processed': False,
        })
        logger.info(f"Queued text message from {sender} in {room_name}")

    def add_image(self, room_id: str, room_name: str, sender: str,
                  image_data: bytes, filename: str, timestamp: datetime, event_id: str):
        """Add an image to the ``matrix_messages`` queue (base64 encoded)."""
        import base64

        self._enqueue('matrix_messages', {
            'type': 'image',
            'room_id': room_id,
            'room_name': room_name,
            'sender': sender,
            'filename': filename,
            'image_data': base64.b64encode(image_data).decode('utf-8'),
            'timestamp': timestamp.isoformat(),
            'event_id': event_id,
            'processed': False,
        })
        logger.info(f"Queued image from {sender} in {room_name}: {filename}")

    def add_audio(self, room_id: str, room_name: str, sender: str,
                  audio_data: bytes, filename: str, timestamp: datetime, event_id: str):
        """Add an audio/voice message to the ``matrix_messages`` queue (base64 encoded)."""
        import base64

        self._enqueue('matrix_messages', {
            'type': 'audio',
            'room_id': room_id,
            'room_name': room_name,
            'sender': sender,
            'filename': filename,
            'audio_data': base64.b64encode(audio_data).decode('utf-8'),
            'timestamp': timestamp.isoformat(),
            'event_id': event_id,
            'processed': False,
        })
        logger.info(f"Queued audio from {sender} in {room_name}: {filename}")

    def add_invite(self, room_id: str, room_name: str, inviter: str, timestamp: datetime):
        """Add a room invite to the ``matrix_invites`` queue.

        Does nothing when an unprocessed invite for the same room is already
        queued.
        """
        state = self.load()
        invites = state.setdefault('matrix_invites', [])
        already_queued = any(
            invite.get('room_id') == room_id and not invite.get('processed', False)
            for invite in invites
        )
        if already_queued:
            logger.debug(f"Invite to {room_id} already queued")
            return
        invites.append({
            'room_id': room_id,
            'room_name': room_name,
            'inviter': inviter,
            'timestamp': timestamp.isoformat(),
            'processed': False,
        })
        self.save(state)
        logger.info(f"Queued invite to {room_name} from {inviter}")

    def add_file(self, room_id: str, room_name: str, sender: str,
                 filename: str, filesize: int, mimetype: str, mxc_url: str,
                 timestamp: datetime, event_id: str):
        """Add file metadata (not the content) to the ``matrix_files`` queue."""
        self._enqueue('matrix_files', {
            'room_id': room_id,
            'room_name': room_name,
            'sender': sender,
            'filename': filename,
            'filesize': filesize,
            'mimetype': mimetype,
            'mxc_url': mxc_url,
            'timestamp': timestamp.isoformat(),
            'event_id': event_id,
            'processed': False,
        })
        logger.info(f"Queued file from {sender} in {room_name}: {filename}")

    def should_wake_for_matrix(self) -> bool:
        """Check if enough time has passed since the last Matrix wake.

        Returns:
            ``True`` when no wake is recorded, when the recorded value cannot
            be interpreted, or when at least
            ``MIN_MATRIX_WAKE_INTERVAL_SECONDS`` have elapsed.
        """
        state = self.load()
        last_wake_str = state.get('matrix_last_wake')
        if not last_wake_str:
            return True
        try:
            last_wake = datetime.fromisoformat(last_wake_str)
            elapsed = (datetime.now() - last_wake).total_seconds()
        except (ValueError, TypeError):
            # ValueError: malformed string. TypeError: non-string value, or a
            # timezone-aware timestamp subtracted from the naive now().
            return True
        return elapsed >= MIN_MATRIX_WAKE_INTERVAL_SECONDS

    def set_matrix_wake(self):
        """Record the current local time as the last Matrix-triggered wake."""
        state = self.load()
        state['matrix_last_wake'] = datetime.now().isoformat()
        self.save(state)
        logger.info("Set matrix_last_wake timestamp")

    def _default_state(self) -> Dict[str, Any]:
        """Return a fresh default state structure."""
        return {
            'interval_minutes': 60,
            'paused': False,
            'last_wake': None,
            'next_wake_timestamp': None,
            'matrix_messages': [],
            'matrix_invites': [],
            'matrix_files': [],
            'matrix_last_wake': None,
            'matrix_wake_requested': False,
        }
class MatrixMonitor:
    """
    Monitors Matrix rooms for messages, images, audio, files and invites.

    Integrates with automation_daemon.py by:
    1. Queuing activity to the shared state file
    2. Setting the matrix_wake_requested flag for the daemon to trigger Claude
    3. Respecting the minimum interval between Matrix-triggered wakes
    """

    # Long-poll timeout passed to every sync request, in milliseconds.
    SYNC_TIMEOUT_MS = 30000
    # Pause before retrying after a failed sync, in seconds.
    SYNC_RETRY_DELAY_SECONDS = 10
    # Lifetime of a typing notification, in milliseconds (30 seconds).
    TYPING_TIMEOUT_MS = 30000
    # Bounding box for the last-resort downscale in _smart_compress.
    FALLBACK_IMAGE_BOX = (800, 600)
    # State queues whose unprocessed entries count as pending activity.
    PENDING_QUEUES = ('matrix_messages', 'matrix_invites', 'matrix_files')

    def __init__(self, credentials_file: Path = CREDENTIALS_FILE,
                 state_file: Path = STATE_FILE):
        """Load credentials and build a configured (not yet syncing) client.

        Raises:
            FileNotFoundError, ValueError: Propagated from
                ``MatrixCredentials.load`` when credentials are missing or
                incomplete.
        """
        self.credentials_file = credentials_file
        self.state_file = state_file
        self.creds_manager = MatrixCredentials(credentials_file)
        self.state_manager = MatrixState(state_file)
        self.client: Optional[AsyncClient] = None
        self.shutdown_flag = False
        # Nothing is queued until the initial sync has completed, so history
        # delivered by that first sync is ignored.
        self.initial_sync_done = False
        # Load credentials
        self.creds = self.creds_manager.load()
        # An empty whitelist means "monitor every room".
        self.room_whitelist = set(self.creds.get('room_whitelist', []))
        # Setup Matrix client
        self._setup_client()

    def _setup_client(self):
        """Initialize the Matrix client from stored credentials and register callbacks."""
        # Create data directory for E2EE store
        MATRIX_DATA_DIR.mkdir(exist_ok=True)
        self.client = AsyncClient(
            homeserver=self.creds['homeserver'],
            user=self.creds['user_id'],
            store_path=str(MATRIX_DATA_DIR),
        )
        # Restore session from credentials
        self.client.access_token = self.creds['access_token']
        self.client.device_id = self.creds['device_id']
        self.client.user_id = self.creds['user_id']  # CRITICAL: Set user_id for session restore
        # Register event callbacks
        self.client.add_event_callback(self._on_message_text, RoomMessageText)
        self.client.add_event_callback(self._on_message_image, RoomMessageImage)
        self.client.add_event_callback(self._on_message_audio, RoomMessageAudio)
        self.client.add_event_callback(self._on_message_file, RoomMessageFile)
        self.client.add_event_callback(self._on_invite, InviteEvent)
        logger.info(f"Matrix client configured for {self.creds['user_id']}")

    @staticmethod
    def _event_time(event) -> datetime:
        """Convert an event's millisecond ``server_timestamp`` to a naive local datetime."""
        return datetime.fromtimestamp(event.server_timestamp / 1000)

    @classmethod
    def _has_pending_activity(cls, state: Dict[str, Any]) -> bool:
        """Return True when any queue in ``PENDING_QUEUES`` holds an unprocessed entry."""
        return any(
            not entry.get('processed', False)
            for queue in cls.PENDING_QUEUES
            for entry in state.get(queue, [])
        )

    async def _on_message_text(self, room: MatrixRoom, event: RoomMessageText):
        """Handle incoming text messages: filter, queue, then maybe request a wake."""
        # Skip messages from ourselves
        logger.debug(f"Message from {event.sender}, bot user_id={self.client.user_id}, match={event.sender == self.client.user_id}")
        if event.sender == self.client.user_id:
            logger.info(f"✓ Skipping own message: {event.body[:30]}... (sender={event.sender})")
            return
        # Skip historical messages during initial sync
        if not self.initial_sync_done:
            logger.debug(f"Skipping historical message during initial sync: {event.body[:30]}...")
            return
        # Check room whitelist
        if self.room_whitelist and room.room_id not in self.room_whitelist:
            logger.debug(f"Ignoring message from non-whitelisted room: {room.display_name}")
            return
        logger.info(f"New message in {room.display_name} from {event.sender}: {event.body[:50]}...")
        # Queue message
        self.state_manager.add_message(
            room_id=room.room_id,
            room_name=room.display_name or room.room_id,
            sender=event.sender,
            message=event.body,
            timestamp=self._event_time(event),
            event_id=event.event_id,
        )
        # Trigger wake if rate limit allows
        await self._maybe_trigger_wake()

    async def _on_message_image(self, room: MatrixRoom, event: RoomMessageImage):
        """Handle incoming image messages: download, compress, queue, maybe wake."""
        # Skip messages from ourselves
        logger.debug(f"Image from {event.sender}, bot user_id={self.client.user_id}, match={event.sender == self.client.user_id}")
        if event.sender == self.client.user_id:
            logger.info(f"✓ Skipping own image message (sender={event.sender})")
            return
        # Skip historical messages during initial sync
        if not self.initial_sync_done:
            logger.debug("Skipping historical image during initial sync")
            return
        # Check room whitelist
        if self.room_whitelist and room.room_id not in self.room_whitelist:
            logger.debug(f"Ignoring image from non-whitelisted room: {room.display_name}")
            return
        logger.info(f"New image in {room.display_name} from {event.sender}: {event.body}")
        # Download and compress image
        try:
            image_data = await self._download_and_compress_image(event.url)
            if image_data:
                self.state_manager.add_image(
                    room_id=room.room_id,
                    room_name=room.display_name or room.room_id,
                    sender=event.sender,
                    image_data=image_data,
                    filename=event.body,
                    timestamp=self._event_time(event),
                    event_id=event.event_id,
                )
                # Trigger wake if rate limit allows
                await self._maybe_trigger_wake()
            else:
                logger.warning(f"Failed to download/compress image: {event.body}")
        except Exception as e:
            # A bad image must not take down the sync loop.
            logger.error(f"Error processing image: {e}")

    async def _on_message_audio(self, room: MatrixRoom, event: RoomMessageAudio):
        """Handle incoming audio/voice messages: download, queue, maybe wake."""
        # Skip messages from ourselves
        logger.debug(f"Audio from {event.sender}, bot user_id={self.client.user_id}")
        if event.sender == self.client.user_id:
            logger.info(f"✓ Skipping own audio message (sender={event.sender})")
            return
        # Skip historical messages during initial sync
        if not self.initial_sync_done:
            logger.debug("Skipping historical audio during initial sync")
            return
        # Check room whitelist
        if self.room_whitelist and room.room_id not in self.room_whitelist:
            logger.debug(f"Ignoring audio from non-whitelisted room: {room.display_name}")
            return
        logger.info(f"New audio in {room.display_name} from {event.sender}: {event.body}")
        # Download audio file
        try:
            response = await self.client.download(event.url)
            if isinstance(response, DownloadError):
                logger.error(f"Audio download failed: {response.message}")
                return
            # Queue audio message (similar to image, but type="audio").
            # NOTE(review): the whole payload is stored uncompressed in the
            # state file; consider a size cap.
            self.state_manager.add_audio(
                room_id=room.room_id,
                room_name=room.display_name or room.room_id,
                sender=event.sender,
                audio_data=response.body,
                filename=event.body,
                timestamp=self._event_time(event),
                event_id=event.event_id,
            )
            # Trigger wake if rate limit allows
            await self._maybe_trigger_wake()
        except Exception as e:
            logger.error(f"Error processing audio: {e}")

    async def _download_and_compress_image(self, mxc_url: str) -> Optional[bytes]:
        """
        Download a Matrix image and compress it to fit Claude's 1MB limit.

        Returns:
            Compressed JPEG bytes, or ``None`` when the download or the
            compression fails (the failure is logged).
        """
        try:
            # Download image from Matrix
            response = await self.client.download(mxc_url)
            if isinstance(response, DownloadError):
                logger.error(f"Download failed: {response.message}")
                return None
            # Compress image
            return await self._smart_compress(response.body)
        except Exception as e:
            logger.error(f"Failed to download image from {mxc_url}: {e}")
            return None

    async def _smart_compress(self, image_data: bytes,
                              target_size_mb: float = MAX_IMAGE_SIZE_MB) -> bytes:
        """
        Compress an image to fit under the target size with quality adaptation.

        Strategy: try JPEG qualities 85/75/65/55 at full resolution, then
        quality 75 at 75%/50%/25% scale, and finally fall back to a quality-50
        JPEG that fits inside ``FALLBACK_IMAGE_BOX``.

        Args:
            image_data: Raw image bytes
            target_size_mb: Target size in MB for base64-encoded output (default 1.0MB)

        Returns:
            Compressed JPEG bytes. Every return except the last-resort one is
            verified to be under the target when base64 encoded.

        Raises:
            Exception: Whatever Pillow raises for undecodable input (logged
                and re-raised).

        NOTE: We check the ACTUAL base64 size, not an estimate.
        """
        try:
            import base64

            def encode_jpeg(picture, quality: int) -> bytes:
                buffer = BytesIO()
                picture.save(buffer, format='JPEG', quality=quality, optimize=True)
                return buffer.getvalue()

            def sizes_mb(jpeg_bytes: bytes):
                # (raw MB, base64 MB) measured on the real encoding
                raw = len(jpeg_bytes) / (1024 * 1024)
                encoded = len(base64.b64encode(jpeg_bytes)) / (1024 * 1024)
                return raw, encoded

            # Load image
            img = Image.open(BytesIO(image_data))
            # Convert RGBA to RGB if needed (JPEG has no alpha channel);
            # transparent areas are flattened onto white.
            if img.mode == 'RGBA':
                background = Image.new('RGB', img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[3])
                img = background
            elif img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')

            # Try progressive quality reduction
            for quality in [85, 75, 65, 55]:
                img_bytes = encode_jpeg(img, quality)
                raw_size_mb, base64_size_mb = sizes_mb(img_bytes)
                if base64_size_mb <= target_size_mb:
                    logger.info(
                        f"✓ Compressed to {img.width}x{img.height} @ quality {quality}: "
                        f"{raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)"
                    )
                    return img_bytes

            # If still too large, resize
            logger.warning(f"Quality reduction insufficient, resizing from {img.width}x{img.height}")
            for scale in [0.75, 0.5, 0.25]:
                new_width = int(img.width * scale)
                new_height = int(img.height * scale)
                resized = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
                img_bytes = encode_jpeg(resized, 75)
                raw_size_mb, base64_size_mb = sizes_mb(img_bytes)
                if base64_size_mb <= target_size_mb:
                    logger.info(
                        f"✓ Resized to {new_width}x{new_height}: "
                        f"{raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)"
                    )
                    return img_bytes

            # Last resort: use heavily compressed version. thumbnail() keeps
            # the aspect ratio and never enlarges, unlike a forced resize to a
            # fixed 800x600, which distorts non-4:3 images.
            logger.warning("Could not compress to target, using minimum quality")
            final_img = img.copy()
            final_img.thumbnail(self.FALLBACK_IMAGE_BOX, Image.Resampling.LANCZOS)
            return encode_jpeg(final_img, 50)
        except Exception as e:
            logger.error(f"Compression failed: {e}")
            raise

    async def _on_message_file(self, room: MatrixRoom, event: RoomMessageFile):
        """Handle incoming file messages: queue metadata only, then maybe wake."""
        # Skip messages from ourselves
        logger.debug(f"File from {event.sender}, bot user_id={self.client.user_id}, match={event.sender == self.client.user_id}")
        if event.sender == self.client.user_id:
            logger.info(f"✓ Skipping own file message (sender={event.sender})")
            return
        # Skip historical messages during initial sync
        if not self.initial_sync_done:
            logger.debug("Skipping historical file during initial sync")
            return
        # Check room whitelist
        if self.room_whitelist and room.room_id not in self.room_whitelist:
            logger.debug(f"Ignoring file from non-whitelisted room: {room.display_name}")
            return
        logger.info(f"New file in {room.display_name} from {event.sender}: {event.body}")
        # Size and MIME type come from the event's optional content.info block.
        info = event.source.get('content', {}).get('info', {})
        # Queue file metadata (don't download yet)
        self.state_manager.add_file(
            room_id=room.room_id,
            room_name=room.display_name or room.room_id,
            sender=event.sender,
            filename=event.body,
            filesize=info.get('size', 0),
            mimetype=info.get('mimetype', 'application/octet-stream'),
            mxc_url=event.url,
            timestamp=self._event_time(event),
            event_id=event.event_id,
        )
        # Trigger wake if rate limit allows
        await self._maybe_trigger_wake()

    async def _on_invite(self, room: MatrixRoom, event: InviteEvent):
        """Handle room invites: queue the invite, then maybe request a wake."""
        # Skip historical invites during initial sync
        if not self.initial_sync_done:
            logger.debug("Skipping historical invite during initial sync")
            return
        logger.info(f"Received invite to {room.display_name or room.room_id} from {event.sender}")
        # Queue invite - InviteEvent doesn't have server_timestamp, use current time
        self.state_manager.add_invite(
            room_id=room.room_id,
            room_name=room.display_name or room.room_id,
            inviter=event.sender,
            timestamp=datetime.now(),
        )
        # Trigger wake if rate limit allows
        await self._maybe_trigger_wake()

    async def _maybe_trigger_wake(self):
        """Trigger a Claude wake if the rate limit allows, otherwise only log."""
        if self.state_manager.should_wake_for_matrix():
            logger.info("Rate limit OK, triggering Claude wake for Matrix activity")
            # Set typing indicators in rooms with unprocessed messages
            await self._set_typing_indicators()
            self.state_manager.set_matrix_wake()
            # Set flag that automation_daemon.py will check
            state = self.state_manager.load()
            state['matrix_wake_requested'] = True
            self.state_manager.save(state)
        else:
            logger.info("Rate limit active, queuing activity without immediate wake")

    async def _set_typing_indicators(self):
        """Set typing indicators in rooms with unprocessed activity (best effort)."""
        try:
            state = self.state_manager.load()
            # Every room with an unprocessed message, invite or file
            rooms_with_activity = {
                entry['room_id']
                for queue in self.PENDING_QUEUES
                for entry in state.get(queue, [])
                if not entry.get('processed', False)
            }
            # Set typing indicator for each room (30 seconds)
            for room_id in rooms_with_activity:
                try:
                    await self.client.room_typing(
                        room_id, typing_state=True, timeout=self.TYPING_TIMEOUT_MS
                    )
                    logger.debug(f"Set typing indicator in {room_id}")
                except Exception as e:
                    # One failing room must not block the others.
                    logger.warning(f"Failed to set typing in {room_id}: {e}")
            if rooms_with_activity:
                logger.info(f"Set typing indicators in {len(rooms_with_activity)} rooms")
        except Exception as e:
            logger.error(f"Failed to set typing indicators: {e}")

    async def sync_loop(self):
        """Main sync loop - runs until ``shutdown()`` is requested.

        Performs an initial sync whose events are ignored, then syncs
        continuously. After each successful sync it re-checks the queues so
        activity that arrived during the cooldown still triggers a wake once
        the rate limit allows. The client is closed on exit.
        """
        logger.info("=" * 60)
        logger.info("Matrix Monitor Starting")
        logger.info(f"User: {self.creds['user_id']}")
        logger.info(f"Homeserver: {self.creds['homeserver']}")
        logger.info(f"Room whitelist: {self.room_whitelist or 'All rooms'}")
        logger.info("=" * 60)
        try:
            # Initial sync. Only a successful response may flip
            # initial_sync_done: after a failed attempt the next sync is still
            # the initial one, and its history must stay ignored.
            logger.info("Performing initial sync (historical messages will be ignored)...")
            while not self.shutdown_flag:
                response = await self.client.sync(timeout=self.SYNC_TIMEOUT_MS, full_state=True)
                if isinstance(response, SyncResponse):
                    self.initial_sync_done = True  # Now start queueing new messages
                    break
                logger.warning(f"Initial sync returned unexpected type: {type(response)}, retrying")
                await asyncio.sleep(self.SYNC_RETRY_DELAY_SECONDS)

            if self.initial_sync_done:
                logger.info("Initial sync complete - now monitoring for NEW messages only")
                logger.info(f"Bot user ID: {self.client.user_id} - messages from this user will be SKIPPED")

            # Continuous sync loop
            while not self.shutdown_flag:
                try:
                    response = await self.client.sync(timeout=self.SYNC_TIMEOUT_MS)
                    if isinstance(response, SyncResponse):
                        # Sync successful, callbacks already fired.
                        # Check if we have queued items that can now trigger a wake
                        # (in case they arrived during the cooldown period).
                        state = self.state_manager.load()
                        if (self._has_pending_activity(state)
                                and not state.get('matrix_wake_requested', False)):
                            # Queued items and no wake pending: check the rate limit now
                            await self._maybe_trigger_wake()
                    else:
                        logger.warning(f"Sync returned unexpected type: {type(response)}")
                        # Back off so a persistent error response is not retried in a tight loop
                        await asyncio.sleep(self.SYNC_RETRY_DELAY_SECONDS)
                except Exception as e:
                    logger.error(f"Error in sync loop: {e}")
                    await asyncio.sleep(self.SYNC_RETRY_DELAY_SECONDS)  # Back off on errors
        except Exception as e:
            logger.exception(f"Fatal error in sync loop: {e}")
        finally:
            logger.info("Matrix monitor shutting down")
            await self.client.close()

    def shutdown(self):
        """Request graceful shutdown; the sync loops exit after the current sync returns."""
        logger.info("Shutdown requested")
        self.shutdown_flag = True
async def main():
    """Main entry point for testing: run a MatrixMonitor until it stops."""
    monitor = MatrixMonitor()
    try:
        await monitor.sync_loop()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        monitor.shutdown()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C usually surfaces here, out of asyncio.run(), rather than
        # inside the coroutine. Handle it so the script exits without a
        # traceback.
        logger.info("Interrupted by user")