#!/usr/bin/env python3
"""
Matrix Integration for Claude Desktop Automation

Monitors Matrix rooms for messages and images, queuing them for Claude to
process. Integrates with automation_daemon.py to trigger Claude wakes when
new messages arrive.

Features:
- Async Matrix sync loop using matrix-nio
- Message and image queuing to shared state file
- 2-minute rate limiting between Matrix-triggered wakes
- Room whitelist for selective monitoring
- Image download and compression (reuses patterns from image-watch-mcp)
- Secure credential management
"""

import asyncio
import base64
import fcntl
import hashlib
import json
import logging
import os
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional

try:
    from nio import (
        AsyncClient,
        MatrixRoom,
        RoomMessageText,
        RoomMessageImage,
        RoomMessageFile,
        RoomMessageAudio,
        InviteEvent,
        LoginResponse,
        SyncResponse,
        DownloadResponse,
        DownloadError,
    )
except ImportError:
    raise ImportError(
        "matrix-nio not installed. Install with: pip3 install matrix-nio\n"
        "Note: Without [e2e] extra, only unencrypted rooms are supported"
    )

try:
    from PIL import Image
except ImportError:
    raise ImportError("Pillow not installed. Install with: pip3 install Pillow")

# Configuration
STATE_FILE = Path.home() / ".claude-automation-state.json"
CREDENTIALS_FILE = Path.home() / ".matrix-credentials.json"
MATRIX_DATA_DIR = Path.home() / ".matrix-data"
LOG_FILE = Path("/tmp/matrix-integration.log")

# Matrix wake rate limiting
MIN_MATRIX_WAKE_INTERVAL_SECONDS = 120  # 2 minutes

# Image compression (same as image-watch-mcp)
# Target for base64-encoded size (checked directly, not estimated)
MAX_IMAGE_SIZE_MB = 1.0

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class MatrixCredentials:
    """Manages Matrix credentials and configuration"""

    def __init__(self, credentials_file: Path):
        self.credentials_file = credentials_file

    def load(self) -> Dict[str, Any]:
        """Load credentials from JSON file.

        Returns the parsed credentials dict. Raises FileNotFoundError if the
        file is missing and ValueError if any required field is absent.
        """
        if not self.credentials_file.exists():
            raise FileNotFoundError(
                f"Credentials file not found: {self.credentials_file}\n"
                f"Run setup_matrix.sh to create credentials"
            )

        try:
            with open(self.credentials_file, 'r') as f:
                creds = json.load(f)

            # Validate required fields
            required = ['homeserver', 'user_id', 'access_token', 'device_id']
            missing = [field for field in required if field not in creds]
            if missing:
                raise ValueError(f"Missing required credentials: {missing}")

            logger.info(f"Loaded credentials for {creds['user_id']}")
            return creds
        except Exception as e:
            logger.error(f"Failed to load credentials: {e}")
            raise

    def save(self, homeserver: str, user_id: str, access_token: str,
             device_id: str):
        """Save credentials to JSON file (chmod 600)"""
        try:
            creds = {
                'homeserver': homeserver,
                'user_id': user_id,
                'access_token': access_token,
                'device_id': device_id,
                'room_whitelist': [],  # Empty by default
            }

            with open(self.credentials_file, 'w') as f:
                json.dump(creds, f, indent=2)

            # Secure permissions: the file contains an access token
            self.credentials_file.chmod(0o600)
            logger.info(f"Saved credentials for {user_id}")
        except Exception as e:
            logger.error(f"Failed to save credentials: {e}")
            raise


class MatrixState:
    """Manages Matrix state in shared automation state file"""

    def __init__(self, state_file: Path):
        self.state_file = state_file

    def load(self) -> Dict[str, Any]:
        """Load full automation state, falling back to defaults on any error."""
        if not self.state_file.exists():
            return self._default_state()

        try:
            with open(self.state_file, 'r') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Failed to load state: {e}")
            return self._default_state()

    def save(self, state: Dict[str, Any]):
        """Save full automation state atomically with file locking.

        Writers serialize on a stable ``.lock`` file (locking the temp file
        itself would be useless — each writer would lock its own new inode),
        then write to a temp file and atomically rename it into place.
        """
        try:
            lock_path = self.state_file.with_suffix('.lock')
            temp_path = self.state_file.with_suffix('.tmp')
            with open(lock_path, 'w') as lock_f:
                fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX)
                with open(temp_path, 'w') as f:
                    json.dump(state, f, indent=2)
                    f.flush()
                    os.fsync(f.fileno())
                # Atomic on POSIX: readers see either the old or new file
                temp_path.rename(self.state_file)
        except Exception as e:
            logger.error(f"Failed to save state: {e}")

    def add_message(self, room_id: str, room_name: str, sender: str,
                    message: str, timestamp: datetime, event_id: str):
        """Add a text message to the queue"""
        state = self.load()

        if 'matrix_messages' not in state:
            state['matrix_messages'] = []

        state['matrix_messages'].append({
            'type': 'text',
            'room_id': room_id,
            'room_name': room_name,
            'sender': sender,
            'message': message,
            'timestamp': timestamp.isoformat(),
            'event_id': event_id,
            'processed': False,
        })

        self.save(state)
        logger.info(f"Queued text message from {sender} in {room_name}")

    def add_image(self, room_id: str, room_name: str, sender: str,
                  image_data: bytes, filename: str, timestamp: datetime,
                  event_id: str):
        """Add an image message to the queue (base64 encoded)"""
        state = self.load()

        if 'matrix_messages' not in state:
            state['matrix_messages'] = []

        # Encode image to base64 so it can live in the JSON state file
        image_b64 = base64.b64encode(image_data).decode('utf-8')

        state['matrix_messages'].append({
            'type': 'image',
            'room_id': room_id,
            'room_name': room_name,
            'sender': sender,
            'filename': filename,
            'image_data': image_b64,
            'timestamp': timestamp.isoformat(),
            'event_id': event_id,
            'processed': False,
        })

        self.save(state)
        logger.info(f"Queued image from {sender} in {room_name}: {filename}")

    def add_audio(self, room_id: str, room_name: str, sender: str,
                  audio_data: bytes, filename: str, timestamp: datetime,
                  event_id: str):
        """Add an audio/voice message to the queue (base64 encoded)"""
        state = self.load()

        if 'matrix_messages' not in state:
            state['matrix_messages'] = []

        # Encode audio to base64 so it can live in the JSON state file
        audio_b64 = base64.b64encode(audio_data).decode('utf-8')

        state['matrix_messages'].append({
            'type': 'audio',
            'room_id': room_id,
            'room_name': room_name,
            'sender': sender,
            'filename': filename,
            'audio_data': audio_b64,
            'timestamp': timestamp.isoformat(),
            'event_id': event_id,
            'processed': False,
        })

        self.save(state)
        logger.info(f"Queued audio from {sender} in {room_name}: {filename}")

    def add_invite(self, room_id: str, room_name: str, inviter: str,
                   timestamp: datetime):
        """Add a room invite to the queue (deduplicated by room_id)."""
        state = self.load()

        if 'matrix_invites' not in state:
            state['matrix_invites'] = []

        # Check if invite already queued
        for invite in state['matrix_invites']:
            if invite['room_id'] == room_id and not invite.get('processed', False):
                logger.debug(f"Invite to {room_id} already queued")
                return

        state['matrix_invites'].append({
            'room_id': room_id,
            'room_name': room_name,
            'inviter': inviter,
            'timestamp': timestamp.isoformat(),
            'processed': False,
        })

        self.save(state)
        logger.info(f"Queued invite to {room_name} from {inviter}")

    def add_file(self, room_id: str, room_name: str, sender: str,
                 filename: str, filesize: int, mimetype: str, mxc_url: str,
                 timestamp: datetime, event_id: str):
        """Add a file message to the queue (metadata only, not the content)."""
        state = self.load()

        if 'matrix_files' not in state:
            state['matrix_files'] = []

        state['matrix_files'].append({
            'room_id': room_id,
            'room_name': room_name,
            'sender': sender,
            'filename': filename,
            'filesize': filesize,
            'mimetype': mimetype,
            'mxc_url': mxc_url,
            'timestamp': timestamp.isoformat(),
            'event_id': event_id,
            'processed': False,
        })

        self.save(state)
        logger.info(f"Queued file from {sender} in {room_name}: {filename}")

    def should_wake_for_matrix(self) -> bool:
        """Check if enough time has passed since last Matrix wake"""
        state = self.load()
        last_wake_str = state.get('matrix_last_wake')

        if not last_wake_str:
            return True

        try:
            last_wake = datetime.fromisoformat(last_wake_str)
            elapsed = (datetime.now() - last_wake).total_seconds()
            return elapsed >= MIN_MATRIX_WAKE_INTERVAL_SECONDS
        except ValueError:
            # Corrupt timestamp — err on the side of allowing a wake
            return True

    def set_matrix_wake(self):
        """Record that we triggered a Matrix wake"""
        state = self.load()
        state['matrix_last_wake'] = datetime.now().isoformat()
        self.save(state)
        logger.info("Set matrix_last_wake timestamp")

    def _default_state(self) -> Dict[str, Any]:
        """Default state structure"""
        return {
            'interval_minutes': 60,
            'paused': False,
            'last_wake': None,
            'next_wake_timestamp': None,
            'matrix_messages': [],
            'matrix_invites': [],
            'matrix_files': [],
            'matrix_last_wake': None,
            'matrix_wake_requested': False,
        }


class MatrixMonitor:
    """
    Monitors Matrix rooms for messages and images.

    Integrates with automation_daemon.py by:
    1. Queuing messages to shared state file
    2. Setting matrix_wake_requested flag for daemon to trigger Claude
    3. Respecting 2-minute rate limit between wakes
    """

    def __init__(self, credentials_file: Path = CREDENTIALS_FILE,
                 state_file: Path = STATE_FILE):
        self.credentials_file = credentials_file
        self.state_file = state_file
        self.creds_manager = MatrixCredentials(credentials_file)
        self.state_manager = MatrixState(state_file)
        self.client: Optional[AsyncClient] = None
        self.shutdown_flag = False
        # Skip queueing messages until initial sync completes
        self.initial_sync_done = False

        # Load credentials
        self.creds = self.creds_manager.load()
        self.room_whitelist = set(self.creds.get('room_whitelist', []))

        # Setup Matrix client
        self._setup_client()

    def _setup_client(self):
        """Initialize Matrix client with credentials"""
        # Create data directory for E2EE store
        MATRIX_DATA_DIR.mkdir(exist_ok=True)

        self.client = AsyncClient(
            homeserver=self.creds['homeserver'],
            user=self.creds['user_id'],
            store_path=str(MATRIX_DATA_DIR),
        )

        # Restore session from credentials
        self.client.access_token = self.creds['access_token']
        self.client.device_id = self.creds['device_id']
        # CRITICAL: Set user_id for session restore
        self.client.user_id = self.creds['user_id']

        # Register event callbacks
        self.client.add_event_callback(self._on_message_text, RoomMessageText)
        self.client.add_event_callback(self._on_message_image, RoomMessageImage)
        self.client.add_event_callback(self._on_message_audio, RoomMessageAudio)
        self.client.add_event_callback(self._on_message_file, RoomMessageFile)
        self.client.add_event_callback(self._on_invite, InviteEvent)

        logger.info(f"Matrix client configured for {self.creds['user_id']}")

    def _should_handle(self, room: MatrixRoom, sender: str, kind: str) -> bool:
        """Common guard shared by all message handlers.

        Rejects (in order): our own messages, historical events replayed
        during the initial sync, and events from non-whitelisted rooms
        (an empty whitelist means all rooms are monitored).
        """
        logger.debug(
            f"{kind} from {sender}, bot user_id={self.client.user_id}, "
            f"match={sender == self.client.user_id}"
        )
        if sender == self.client.user_id:
            logger.info(f"✓ Skipping own {kind} (sender={sender})")
            return False

        if not self.initial_sync_done:
            logger.debug(f"Skipping historical {kind} during initial sync")
            return False

        if self.room_whitelist and room.room_id not in self.room_whitelist:
            logger.debug(
                f"Ignoring {kind} from non-whitelisted room: {room.display_name}"
            )
            return False

        return True

    @staticmethod
    def _event_time(event) -> datetime:
        """Convert a Matrix event's millisecond server timestamp to datetime."""
        return datetime.fromtimestamp(event.server_timestamp / 1000)

    async def _on_message_text(self, room: MatrixRoom, event: RoomMessageText):
        """Handle incoming text messages"""
        if not self._should_handle(room, event.sender, "message"):
            return

        logger.info(
            f"New message in {room.display_name} from {event.sender}: "
            f"{event.body[:50]}..."
        )

        # Queue message
        self.state_manager.add_message(
            room_id=room.room_id,
            room_name=room.display_name or room.room_id,
            sender=event.sender,
            message=event.body,
            timestamp=self._event_time(event),
            event_id=event.event_id,
        )

        # Trigger wake if rate limit allows
        await self._maybe_trigger_wake()

    async def _on_message_image(self, room: MatrixRoom, event: RoomMessageImage):
        """Handle incoming image messages"""
        if not self._should_handle(room, event.sender, "image"):
            return

        logger.info(
            f"New image in {room.display_name} from {event.sender}: {event.body}"
        )

        # Download and compress image
        try:
            image_data = await self._download_and_compress_image(event.url)
            if image_data:
                self.state_manager.add_image(
                    room_id=room.room_id,
                    room_name=room.display_name or room.room_id,
                    sender=event.sender,
                    image_data=image_data,
                    filename=event.body,
                    timestamp=self._event_time(event),
                    event_id=event.event_id,
                )
                # Trigger wake if rate limit allows
                await self._maybe_trigger_wake()
            else:
                logger.warning(
                    f"Image rejected (too large or download failed): {event.body}"
                )
        except Exception as e:
            logger.error(f"Error processing image: {e}")

    async def _on_message_audio(self, room: MatrixRoom, event: RoomMessageAudio):
        """Handle incoming audio/voice messages"""
        if not self._should_handle(room, event.sender, "audio"):
            return

        logger.info(
            f"New audio in {room.display_name} from {event.sender}: {event.body}"
        )

        # Download audio file
        try:
            response = await self.client.download(event.url)
            if isinstance(response, DownloadError):
                logger.error(f"Audio download failed: {response.message}")
                return

            # Queue audio message (similar to image, but type="audio")
            self.state_manager.add_audio(
                room_id=room.room_id,
                room_name=room.display_name or room.room_id,
                sender=event.sender,
                audio_data=response.body,
                filename=event.body,
                timestamp=self._event_time(event),
                event_id=event.event_id,
            )

            # Trigger wake if rate limit allows
            await self._maybe_trigger_wake()
        except Exception as e:
            logger.error(f"Error processing audio: {e}")

    async def _download_and_compress_image(self, mxc_url: str) -> Optional[bytes]:
        """
        Download and compress Matrix image to fit Claude's 1MB limit.
        Reuses compression logic from image-watch-mcp.

        Returns compressed JPEG bytes, or None on download failure or if
        the image cannot be compressed under the size limit.
        """
        try:
            # Download image from Matrix
            response = await self.client.download(mxc_url)
            if isinstance(response, DownloadError):
                logger.error(f"Download failed: {response.message}")
                return None

            # Compress image
            image_bytes = await self._smart_compress(response.body)
            return image_bytes
        except Exception as e:
            logger.error(f"Failed to download image from {mxc_url}: {e}")
            return None

    async def _smart_compress(self, image_data: bytes,
                              target_size_mb: float = MAX_IMAGE_SIZE_MB
                              ) -> Optional[bytes]:
        """
        Compress image to fit under target size with quality adaptation.

        Args:
            image_data: Raw image bytes
            target_size_mb: Target size in MB for base64-encoded output
                (default 1.0MB)

        Returns:
            Compressed JPEG bytes that will be under target when base64
            encoded, or None if even the most aggressive settings exceed
            the target.

        NOTE: We check the ACTUAL base64 size, not an estimate.
        """
        try:
            # Load image
            img = Image.open(BytesIO(image_data))

            # Convert RGBA to RGB if needed (JPEG has no alpha channel)
            if img.mode == 'RGBA':
                background = Image.new('RGB', img.size, (255, 255, 255))
                background.paste(img, mask=img.split()[3])
                img = background
            elif img.mode not in ('RGB', 'L'):
                img = img.convert('RGB')

            # Try progressive quality reduction
            for quality in [85, 75, 65, 55]:
                buffer = BytesIO()
                img.save(buffer, format='JPEG', quality=quality, optimize=True)
                img_bytes = buffer.getvalue()

                # Check ACTUAL base64 size instead of estimating
                base64_encoded = base64.b64encode(img_bytes)
                base64_size_mb = len(base64_encoded) / (1024 * 1024)
                raw_size_mb = len(img_bytes) / (1024 * 1024)

                if base64_size_mb <= target_size_mb:
                    logger.info(
                        f"✓ Compressed to {img.width}x{img.height} @ quality {quality}: "
                        f"{raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)"
                    )
                    return img_bytes

            # If still too large, resize
            logger.warning(
                f"Quality reduction insufficient, resizing from {img.width}x{img.height}"
            )
            for scale in [0.75, 0.5, 0.25]:
                new_width = int(img.width * scale)
                new_height = int(img.height * scale)
                resized = img.resize((new_width, new_height),
                                     Image.Resampling.LANCZOS)

                buffer = BytesIO()
                resized.save(buffer, format='JPEG', quality=75, optimize=True)
                img_bytes = buffer.getvalue()

                # Check ACTUAL base64 size instead of estimating
                base64_encoded = base64.b64encode(img_bytes)
                base64_size_mb = len(base64_encoded) / (1024 * 1024)
                raw_size_mb = len(img_bytes) / (1024 * 1024)

                if base64_size_mb <= target_size_mb:
                    logger.info(
                        f"✓ Resized to {new_width}x{new_height}: "
                        f"{raw_size_mb:.2f}MB raw → {base64_size_mb:.2f}MB base64 (actual)"
                    )
                    return img_bytes

            # Last resort: use heavily compressed version
            logger.warning("Could not compress to target, using minimum quality")
            buffer = BytesIO()
            final_img = img.resize((800, 600), Image.Resampling.LANCZOS)
            final_img.save(buffer, format='JPEG', quality=50, optimize=True)
            final_data = buffer.getvalue()

            # Final size validation - reject if still too large
            final_base64 = base64.b64encode(final_data)
            final_size_mb = len(final_base64) / (1024 * 1024)

            if final_size_mb > target_size_mb:
                logger.error(
                    f"Image still too large after all compression attempts: "
                    f"{final_size_mb:.2f}MB base64 (limit: {target_size_mb}MB)"
                )
                return None

            return final_data
        except Exception as e:
            logger.error(f"Compression failed: {e}")
            raise

    async def _on_message_file(self, room: MatrixRoom, event: RoomMessageFile):
        """Handle incoming file messages"""
        if not self._should_handle(room, event.sender, "file"):
            return

        logger.info(
            f"New file in {room.display_name} from {event.sender}: {event.body}"
        )

        # Queue file metadata (don't download yet)
        info = event.source.get('content', {}).get('info', {})
        self.state_manager.add_file(
            room_id=room.room_id,
            room_name=room.display_name or room.room_id,
            sender=event.sender,
            filename=event.body,
            filesize=info.get('size', 0),
            mimetype=info.get('mimetype', 'application/octet-stream'),
            mxc_url=event.url,
            timestamp=self._event_time(event),
            event_id=event.event_id,
        )

        # Trigger wake if rate limit allows
        await self._maybe_trigger_wake()

    async def _on_invite(self, room: MatrixRoom, event: InviteEvent):
        """Handle room invites"""
        # Skip historical invites during initial sync
        if not self.initial_sync_done:
            logger.debug(f"Skipping historical invite during initial sync")
            return

        logger.info(
            f"Received invite to {room.display_name or room.room_id} "
            f"from {event.sender}"
        )

        # Queue invite - InviteEvent doesn't have server_timestamp, use current time
        self.state_manager.add_invite(
            room_id=room.room_id,
            room_name=room.display_name or room.room_id,
            inviter=event.sender,
            timestamp=datetime.now(),
        )

        # Trigger wake if rate limit allows
        await self._maybe_trigger_wake()

    async def _maybe_trigger_wake(self):
        """Trigger Claude wake if rate limit allows"""
        if self.state_manager.should_wake_for_matrix():
            logger.info("Rate limit OK, triggering Claude wake for Matrix activity")

            # Set typing indicators in rooms with unprocessed messages
            await self._set_typing_indicators()

            self.state_manager.set_matrix_wake()

            # Set flag that automation_daemon.py will check
            state = self.state_manager.load()
            state['matrix_wake_requested'] = True
            self.state_manager.save(state)
        else:
            logger.info("Rate limit active, queuing activity without immediate wake")

    async def _set_typing_indicators(self):
        """Set typing indicators in rooms with unprocessed messages"""
        try:
            state = self.state_manager.load()

            # Get all rooms with unprocessed activity
            rooms_with_activity = set()

            # Check messages
            for msg in state.get('matrix_messages', []):
                if not msg.get('processed', False):
                    rooms_with_activity.add(msg['room_id'])

            # Check invites
            for inv in state.get('matrix_invites', []):
                if not inv.get('processed', False):
                    rooms_with_activity.add(inv['room_id'])

            # Check files
            for file in state.get('matrix_files', []):
                if not file.get('processed', False):
                    rooms_with_activity.add(file['room_id'])

            # Set typing indicator for each room (30 seconds)
            for room_id in rooms_with_activity:
                try:
                    await self.client.room_typing(room_id, typing_state=True,
                                                  timeout=30000)
                    logger.debug(f"Set typing indicator in {room_id}")
                except Exception as e:
                    logger.warning(f"Failed to set typing in {room_id}: {e}")

            if rooms_with_activity:
                logger.info(
                    f"Set typing indicators in {len(rooms_with_activity)} rooms"
                )
        except Exception as e:
            logger.error(f"Failed to set typing indicators: {e}")

    async def sync_loop(self):
        """Main sync loop - runs continuously"""
        logger.info("=" * 60)
        logger.info("Matrix Monitor Starting")
        logger.info(f"User: {self.creds['user_id']}")
        logger.info(f"Homeserver: {self.creds['homeserver']}")
        logger.info(f"Room whitelist: {self.room_whitelist or 'All rooms'}")
        logger.info("=" * 60)

        try:
            # Initial sync
            logger.info("Performing initial sync (historical messages will be ignored)...")
            await self.client.sync(timeout=30000, full_state=True)
            self.initial_sync_done = True  # Now start queueing new messages
            logger.info(f"Initial sync complete - now monitoring for NEW messages only")
            logger.info(
                f"Bot user ID: {self.client.user_id} - messages from this user will be SKIPPED"
            )

            # Continuous sync loop
            while not self.shutdown_flag:
                try:
                    response = await self.client.sync(timeout=30000)

                    if isinstance(response, SyncResponse):
                        # Sync successful, callbacks already fired
                        # Check if we have queued items that can now trigger a wake
                        # (in case they arrived during cooldown period)
                        state = self.state_manager.load()
                        has_unprocessed = any(
                            not item.get('processed', False)
                            for key in ('matrix_messages', 'matrix_invites',
                                        'matrix_files')
                            for item in state.get(key, [])
                        )
                        if has_unprocessed and not state.get('matrix_wake_requested', False):
                            # Queued items and no wake pending; check if rate
                            # limit allows a wake now
                            await self._maybe_trigger_wake()
                    else:
                        logger.warning(f"Sync returned unexpected type: {type(response)}")
                except Exception as e:
                    logger.error(f"Error in sync loop: {e}")
                    await asyncio.sleep(10)  # Back off on errors

        except Exception as e:
            logger.exception(f"Fatal error in sync loop: {e}")
        finally:
            logger.info("Matrix monitor shutting down")
            await self.client.close()

    def shutdown(self):
        """Request graceful shutdown"""
        logger.info("Shutdown requested")
        self.shutdown_flag = True


async def main():
    """Main entry point for testing"""
    monitor = MatrixMonitor()
    try:
        await monitor.sync_loop()
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
        monitor.shutdown()


if __name__ == "__main__":
    asyncio.run(main())