Files
vi/core/nats_event_bus.py
Alex Kazaiev e2d24a66f1 Add core service infrastructure
- NATS event bus (pub/sub, JetStream, KV storage)
- Service registry with health monitoring
- Base service class with lifecycle management
- Config system
- Logger with Vi formatting

Adapted from Lyra's patterns, namespace changed to vi.*

🦊💕
2026-01-02 13:04:26 -06:00

210 lines
7.2 KiB
Python

"""
NATS Event Bus for Vi
Provides pub/sub messaging, JetStream durable queues, and KV storage.
"""
import json
from nats.aio.client import Client as NATS
from nats.errors import TimeoutError as NatsTimeoutError
from .config import config
from .logger import logger
class NatsEventBus:
def __init__(self):
self.nc = NATS()
self.subscriptions = {}
self._connected = False
self._js = None
async def connect(self, url=None):
if self._connected:
return
url = url or config.nats_url
logger.info(f"[NATS] Connecting to {url}")
try:
await self.nc.connect(
servers=[url],
connect_timeout=config.nats_connection_timeout,
max_reconnect_attempts=config.nats_max_reconnect_attempts,
)
self._connected = True
self._js = self.nc.jetstream()
logger.info("[NATS] Connected successfully")
except Exception as e:
logger.error(f"[NATS] Connection failed: {e}")
raise
async def emit(self, topic, payload):
if not self._connected:
logger.error("[NATS] Cannot emit - EventBus is not connected")
raise RuntimeError("EventBus is not connected")
try:
await self.nc.publish(topic, json.dumps(payload).encode())
logger.debug(f"[NATS] Emitted to {topic}")
except Exception as e:
logger.error(f"[NATS] Failed to emit to {topic}: {e}")
raise
async def on(self, topic, handler):
if not self._connected:
logger.error("[NATS] Cannot subscribe - EventBus is not connected")
raise RuntimeError("EventBus is not connected")
async def wrapper(msg):
try:
data = msg.data.decode()
logger.debug(f"[NATS] Received message on {topic}")
except Exception as e:
logger.error(f"[NATS] Failed to decode message on {topic}: {e}")
return
try:
await handler(msg)
except Exception as e:
logger.error(f"[NATS] Handler error for {topic}: {e}")
try:
sub = await self.nc.subscribe(topic, cb=wrapper)
self.subscriptions[topic] = sub
logger.info(f"[NATS] Subscribed to {topic}")
except Exception as e:
logger.error(f"[NATS] Failed to subscribe to {topic}: {e}")
raise
async def off(self, topic):
sub = self.subscriptions.pop(topic, None)
if sub:
try:
await sub.unsubscribe()
logger.info(f"[NATS] Unsubscribed from {topic}")
except Exception as e:
logger.error(f"[NATS] Failed to unsubscribe from {topic}: {e}")
async def jetstream_pull(self, stream_name, consumer_name, timeout=1.0):
"""Pull next message from JetStream consumer"""
if not self._connected:
raise RuntimeError("EventBus is not connected")
if not self._js:
raise RuntimeError("JetStream not initialized")
try:
sub = await self._js.pull_subscribe(
subject="",
durable=consumer_name,
stream=stream_name,
)
msgs = await sub.fetch(batch=1, timeout=timeout)
return msgs[0] if msgs else None
except NatsTimeoutError:
return None
except Exception as e:
logger.error(f"[NATS] JetStream pull error: {e}")
raise
async def kv_get_bucket(self, bucket_name: str, ttl_seconds: int = 1800):
"""Get or create a NATS KV bucket"""
if not self._connected:
raise RuntimeError("EventBus is not connected")
if not self._js:
raise RuntimeError("JetStream not initialized")
try:
kv = await self._js.key_value(bucket=bucket_name)
logger.debug(f"[NATS KV] Using existing bucket: {bucket_name}")
return kv
except Exception:
try:
kv = await self._js.create_key_value(
bucket=bucket_name,
ttl=ttl_seconds
)
logger.info(f"[NATS KV] Created bucket: {bucket_name} (TTL: {ttl_seconds}s)")
return kv
except Exception as e:
logger.error(f"[NATS KV] Failed to create bucket {bucket_name}: {e}")
raise
async def kv_put(self, bucket_name: str, key: str, value: bytes, ttl_seconds: int = 1800):
"""Put a value in a KV bucket"""
try:
kv = await self.kv_get_bucket(bucket_name, ttl_seconds)
await kv.put(key, value)
logger.debug(f"[NATS KV] Put: {bucket_name}/{key}")
except Exception as e:
logger.error(f"[NATS KV] Put error for {bucket_name}/{key}: {e}")
raise
async def kv_get(self, bucket_name: str, key: str):
"""Get a value from a KV bucket"""
try:
kv = await self.kv_get_bucket(bucket_name)
entry = await kv.get(key)
logger.debug(f"[NATS KV] Get: {bucket_name}/{key}")
return entry.value if entry else None
except Exception as e:
logger.error(f"[NATS KV] Get error for {bucket_name}/{key}: {e}")
return None
async def kv_keys(self, bucket_name: str, filter_prefix: str = None):
"""List keys in a KV bucket"""
try:
kv = await self.kv_get_bucket(bucket_name)
keys = await kv.keys()
if filter_prefix:
keys = [k for k in keys if k.startswith(filter_prefix)]
logger.debug(f"[NATS KV] Listed {len(keys)} keys from {bucket_name}")
return keys
except Exception as e:
logger.error(f"[NATS KV] Keys error for {bucket_name}: {e}")
return []
async def kv_delete(self, bucket_name: str, key: str):
"""Delete a key from a KV bucket"""
try:
kv = await self.kv_get_bucket(bucket_name)
await kv.delete(key)
logger.debug(f"[NATS KV] Deleted: {bucket_name}/{key}")
except Exception as e:
logger.error(f"[NATS KV] Delete error for {bucket_name}/{key}: {e}")
raise
async def close(self):
if not self._connected:
return
logger.info("[NATS] Closing connection")
try:
await self.nc.drain()
self._connected = False
self.subscriptions.clear()
self._js = None
logger.info("[NATS] Connection closed successfully")
except Exception as e:
logger.error(f"[NATS] Error closing connection: {e}")
@property
def is_connected(self):
return self._connected
@property
def client(self):
"""Access to underlying NATS client for request-reply operations"""
if not self._connected:
raise RuntimeError("EventBus is not connected - cannot access NATS client")
return self.nc
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
nats_bus = NatsEventBus()