""" NATS Event Bus for Vi Provides pub/sub messaging, JetStream durable queues, and KV storage. """ import json from nats.aio.client import Client as NATS from nats.errors import TimeoutError as NatsTimeoutError from .config import config from .logger import logger class NatsEventBus: def __init__(self): self.nc = NATS() self.subscriptions = {} self._connected = False self._js = None async def connect(self, url=None): if self._connected: return url = url or config.nats_url logger.info(f"[NATS] Connecting to {url}") try: await self.nc.connect( servers=[url], connect_timeout=config.nats_connection_timeout, max_reconnect_attempts=config.nats_max_reconnect_attempts, ) self._connected = True self._js = self.nc.jetstream() logger.info("[NATS] Connected successfully") except Exception as e: logger.error(f"[NATS] Connection failed: {e}") raise async def emit(self, topic, payload): if not self._connected: logger.error("[NATS] Cannot emit - EventBus is not connected") raise RuntimeError("EventBus is not connected") try: await self.nc.publish(topic, json.dumps(payload).encode()) logger.debug(f"[NATS] Emitted to {topic}") except Exception as e: logger.error(f"[NATS] Failed to emit to {topic}: {e}") raise async def on(self, topic, handler): if not self._connected: logger.error("[NATS] Cannot subscribe - EventBus is not connected") raise RuntimeError("EventBus is not connected") async def wrapper(msg): try: data = msg.data.decode() logger.debug(f"[NATS] Received message on {topic}") except Exception as e: logger.error(f"[NATS] Failed to decode message on {topic}: {e}") return try: await handler(msg) except Exception as e: logger.error(f"[NATS] Handler error for {topic}: {e}") try: sub = await self.nc.subscribe(topic, cb=wrapper) self.subscriptions[topic] = sub logger.info(f"[NATS] Subscribed to {topic}") except Exception as e: logger.error(f"[NATS] Failed to subscribe to {topic}: {e}") raise async def off(self, topic): sub = self.subscriptions.pop(topic, None) if sub: try: await sub.unsubscribe() logger.info(f"[NATS] Unsubscribed from {topic}") except Exception as e: logger.error(f"[NATS] Failed to unsubscribe from {topic}: {e}") async def jetstream_pull(self, stream_name, consumer_name, timeout=1.0): """Pull next message from JetStream consumer""" if not self._connected: raise RuntimeError("EventBus is not connected") if not self._js: raise RuntimeError("JetStream not initialized") try: sub = await self._js.pull_subscribe( subject="", durable=consumer_name, stream=stream_name, ) msgs = await sub.fetch(batch=1, timeout=timeout) return msgs[0] if msgs else None except NatsTimeoutError: return None except Exception as e: logger.error(f"[NATS] JetStream pull error: {e}") raise async def kv_get_bucket(self, bucket_name: str, ttl_seconds: int = 1800): """Get or create a NATS KV bucket""" if not self._connected: raise RuntimeError("EventBus is not connected") if not self._js: raise RuntimeError("JetStream not initialized") try: kv = await self._js.key_value(bucket=bucket_name) logger.debug(f"[NATS KV] Using existing bucket: {bucket_name}") return kv except Exception: try: kv = await self._js.create_key_value( bucket=bucket_name, ttl=ttl_seconds ) logger.info(f"[NATS KV] Created bucket: {bucket_name} (TTL: {ttl_seconds}s)") return kv except Exception as e: logger.error(f"[NATS KV] Failed to create bucket {bucket_name}: {e}") raise async def kv_put(self, bucket_name: str, key: str, value: bytes, ttl_seconds: int = 1800): """Put a value in a KV bucket""" try: kv = await self.kv_get_bucket(bucket_name, ttl_seconds) await kv.put(key, value) logger.debug(f"[NATS KV] Put: {bucket_name}/{key}") except Exception as e: logger.error(f"[NATS KV] Put error for {bucket_name}/{key}: {e}") raise async def kv_get(self, bucket_name: str, key: str): """Get a value from a KV bucket""" try: kv = await self.kv_get_bucket(bucket_name) entry = await kv.get(key) logger.debug(f"[NATS KV] Get: {bucket_name}/{key}") return entry.value if entry else None except Exception as e: logger.error(f"[NATS KV] Get error for {bucket_name}/{key}: {e}") return None async def kv_keys(self, bucket_name: str, filter_prefix: str = None): """List keys in a KV bucket""" try: kv = await self.kv_get_bucket(bucket_name) keys = await kv.keys() if filter_prefix: keys = [k for k in keys if k.startswith(filter_prefix)] logger.debug(f"[NATS KV] Listed {len(keys)} keys from {bucket_name}") return keys except Exception as e: logger.error(f"[NATS KV] Keys error for {bucket_name}: {e}") return [] async def kv_delete(self, bucket_name: str, key: str): """Delete a key from a KV bucket""" try: kv = await self.kv_get_bucket(bucket_name) await kv.delete(key) logger.debug(f"[NATS KV] Deleted: {bucket_name}/{key}") except Exception as e: logger.error(f"[NATS KV] Delete error for {bucket_name}/{key}: {e}") raise async def close(self): if not self._connected: return logger.info("[NATS] Closing connection") try: await self.nc.drain() self._connected = False self.subscriptions.clear() self._js = None logger.info("[NATS] Connection closed successfully") except Exception as e: logger.error(f"[NATS] Error closing connection: {e}") @property def is_connected(self): return self._connected @property def client(self): """Access to underlying NATS client for request-reply operations""" if not self._connected: raise RuntimeError("EventBus is not connected - cannot access NATS client") return self.nc async def __aenter__(self): await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() nats_bus = NatsEventBus()