""" Recent Event Cache using NATS KV Provides fast access to recent conversation events without querying Memory service. Events are stored in NATS KV with automatic TTL-based expiration. """ import json from datetime import datetime, timezone from typing import List, Dict, Any, Optional from dataclasses import dataclass, asdict from .logger import setup_logger from .nats_event_bus import nats_bus logger = setup_logger('event_cache') @dataclass class CachedEvent: """Represents a single cached event""" event_id: str timestamp: str # ISO 8601 format identity: str interaction_id: str event_type: str # 'user_message', 'vi_response', 'service_call' content: str metadata: Dict[str, Any] def to_dict(self) -> Dict[str, Any]: """Convert to dictionary""" return asdict(self) @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'CachedEvent': """Create from dictionary""" return cls(**data) def to_natural_language(self) -> str: """Convert event to natural language description""" event_time = datetime.fromisoformat(self.timestamp.replace('Z', '+00:00')) now = datetime.now(timezone.utc) diff = now - event_time if diff.total_seconds() < 60: time_ago = "just now" elif diff.total_seconds() < 3600: mins = int(diff.total_seconds() / 60) time_ago = f"{mins} minute{'s' if mins > 1 else ''} ago" else: hours = int(diff.total_seconds() / 3600) time_ago = f"{hours} hour{'s' if hours > 1 else ''} ago" if self.event_type == 'user_message': return f"[{time_ago}] {self.identity}: {self.content}" elif self.event_type == 'vi_response': return f"[{time_ago}] Vi: {self.content}" elif self.event_type == 'service_call': service = self.metadata.get('service', 'unknown') result = self.metadata.get('success', False) status = "✓" if result else "✗" return f"[{time_ago}] {status} Called {service}: {self.content}" else: return f"[{time_ago}] {self.event_type}: {self.content}" class RecentEventCache: """Manages recent event cache in NATS KV""" def __init__(self, bucket_name: str = "vi-recent-events", ttl_seconds: int = 1800): self.bucket_name = bucket_name self.ttl_seconds = ttl_seconds def _make_key(self, identity: str, timestamp: str, seq: int) -> str: """Generate KV key for event""" sanitized_timestamp = timestamp.replace(':', '-').replace('+00:00', 'Z').replace('+', '-') return f"event.{identity}.{sanitized_timestamp}.{seq:04d}" async def add_event( self, identity: str, interaction_id: str, event_type: str, content: str, metadata: Optional[Dict[str, Any]] = None ) -> str: """Add an event to the cache""" timestamp = datetime.now(timezone.utc).isoformat() event_id = f"{identity}_{int(datetime.now(timezone.utc).timestamp() * 1000)}" seq = await self._get_next_seq(identity, timestamp) event = CachedEvent( event_id=event_id, timestamp=timestamp, identity=identity, interaction_id=interaction_id, event_type=event_type, content=content, metadata=metadata or {} ) key = self._make_key(identity, timestamp, seq) value = json.dumps(event.to_dict()).encode() await nats_bus.kv_put(self.bucket_name, key, value, self.ttl_seconds) logger.debug(f"[Event Cache] Added {event_type} for {identity}: {key}") return event_id async def _get_next_seq(self, identity: str, timestamp: str) -> int: """Get next sequence number for this identity/timestamp""" sanitized_timestamp = timestamp.replace(':', '-').replace('+00:00', 'Z').replace('+', '-') prefix = f"event.{identity}.{sanitized_timestamp}." keys = await nats_bus.kv_keys(self.bucket_name, filter_prefix=prefix) return len(keys) async def get_recent_events( self, identity: str, limit: int = 10 ) -> List[CachedEvent]: """Get recent events for identity""" prefix = f"event.{identity}." keys = await nats_bus.kv_keys(self.bucket_name, filter_prefix=prefix) if not keys: logger.debug(f"[Event Cache] No events found for {identity}") return [] keys.sort(reverse=True) keys = keys[:limit] events = [] for key in keys: value = await nats_bus.kv_get(self.bucket_name, key) if value: try: data = json.loads(value.decode()) event = CachedEvent.from_dict(data) events.append(event) except Exception as e: logger.error(f"[Event Cache] Error parsing event {key}: {e}") logger.debug(f"[Event Cache] Retrieved {len(events)} events for {identity}") return events async def format_for_llm( self, identity: str, limit: int = 10 ) -> str: """Get recent events formatted for LLM context""" events = await self.get_recent_events(identity, limit) if not events: return "" lines = ["## Recent Conversation Context"] for event in reversed(events): lines.append(event.to_natural_language()) return "\n".join(lines) async def clear_for_identity(self, identity: str): """Clear all cached events for an identity""" prefix = f"event.{identity}." keys = await nats_bus.kv_keys(self.bucket_name, filter_prefix=prefix) for key in keys: await nats_bus.kv_delete(self.bucket_name, key) logger.info(f"[Event Cache] Cleared {len(keys)} events for {identity}") # Singleton instance event_cache = RecentEventCache()