- NATS event bus (pub/sub, JetStream, KV storage) - Service registry with health monitoring - Base service class with lifecycle management - Config system - Logger with Vi formatting Adapted from Lyra's patterns, namespace changed to vi.* 🦊💕
210 lines
7.2 KiB
Python
210 lines
7.2 KiB
Python
"""
|
|
NATS Event Bus for Vi
|
|
|
|
Provides pub/sub messaging, JetStream durable queues, and KV storage.
|
|
"""
|
|
|
|
import json
|
|
from nats.aio.client import Client as NATS
|
|
from nats.errors import TimeoutError as NatsTimeoutError
|
|
from .config import config
|
|
from .logger import logger
|
|
|
|
|
|
class NatsEventBus:
|
|
def __init__(self):
|
|
self.nc = NATS()
|
|
self.subscriptions = {}
|
|
self._connected = False
|
|
self._js = None
|
|
|
|
async def connect(self, url=None):
|
|
if self._connected:
|
|
return
|
|
|
|
url = url or config.nats_url
|
|
logger.info(f"[NATS] Connecting to {url}")
|
|
|
|
try:
|
|
await self.nc.connect(
|
|
servers=[url],
|
|
connect_timeout=config.nats_connection_timeout,
|
|
max_reconnect_attempts=config.nats_max_reconnect_attempts,
|
|
)
|
|
self._connected = True
|
|
self._js = self.nc.jetstream()
|
|
logger.info("[NATS] Connected successfully")
|
|
except Exception as e:
|
|
logger.error(f"[NATS] Connection failed: {e}")
|
|
raise
|
|
|
|
async def emit(self, topic, payload):
|
|
if not self._connected:
|
|
logger.error("[NATS] Cannot emit - EventBus is not connected")
|
|
raise RuntimeError("EventBus is not connected")
|
|
|
|
try:
|
|
await self.nc.publish(topic, json.dumps(payload).encode())
|
|
logger.debug(f"[NATS] Emitted to {topic}")
|
|
except Exception as e:
|
|
logger.error(f"[NATS] Failed to emit to {topic}: {e}")
|
|
raise
|
|
|
|
async def on(self, topic, handler):
|
|
if not self._connected:
|
|
logger.error("[NATS] Cannot subscribe - EventBus is not connected")
|
|
raise RuntimeError("EventBus is not connected")
|
|
|
|
async def wrapper(msg):
|
|
try:
|
|
data = msg.data.decode()
|
|
logger.debug(f"[NATS] Received message on {topic}")
|
|
except Exception as e:
|
|
logger.error(f"[NATS] Failed to decode message on {topic}: {e}")
|
|
return
|
|
|
|
try:
|
|
await handler(msg)
|
|
except Exception as e:
|
|
logger.error(f"[NATS] Handler error for {topic}: {e}")
|
|
|
|
try:
|
|
sub = await self.nc.subscribe(topic, cb=wrapper)
|
|
self.subscriptions[topic] = sub
|
|
logger.info(f"[NATS] Subscribed to {topic}")
|
|
except Exception as e:
|
|
logger.error(f"[NATS] Failed to subscribe to {topic}: {e}")
|
|
raise
|
|
|
|
async def off(self, topic):
|
|
sub = self.subscriptions.pop(topic, None)
|
|
if sub:
|
|
try:
|
|
await sub.unsubscribe()
|
|
logger.info(f"[NATS] Unsubscribed from {topic}")
|
|
except Exception as e:
|
|
logger.error(f"[NATS] Failed to unsubscribe from {topic}: {e}")
|
|
|
|
async def jetstream_pull(self, stream_name, consumer_name, timeout=1.0):
|
|
"""Pull next message from JetStream consumer"""
|
|
if not self._connected:
|
|
raise RuntimeError("EventBus is not connected")
|
|
if not self._js:
|
|
raise RuntimeError("JetStream not initialized")
|
|
|
|
try:
|
|
sub = await self._js.pull_subscribe(
|
|
subject="",
|
|
durable=consumer_name,
|
|
stream=stream_name,
|
|
)
|
|
msgs = await sub.fetch(batch=1, timeout=timeout)
|
|
return msgs[0] if msgs else None
|
|
except NatsTimeoutError:
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"[NATS] JetStream pull error: {e}")
|
|
raise
|
|
|
|
async def kv_get_bucket(self, bucket_name: str, ttl_seconds: int = 1800):
|
|
"""Get or create a NATS KV bucket"""
|
|
if not self._connected:
|
|
raise RuntimeError("EventBus is not connected")
|
|
if not self._js:
|
|
raise RuntimeError("JetStream not initialized")
|
|
|
|
try:
|
|
kv = await self._js.key_value(bucket=bucket_name)
|
|
logger.debug(f"[NATS KV] Using existing bucket: {bucket_name}")
|
|
return kv
|
|
except Exception:
|
|
try:
|
|
kv = await self._js.create_key_value(
|
|
bucket=bucket_name,
|
|
ttl=ttl_seconds
|
|
)
|
|
logger.info(f"[NATS KV] Created bucket: {bucket_name} (TTL: {ttl_seconds}s)")
|
|
return kv
|
|
except Exception as e:
|
|
logger.error(f"[NATS KV] Failed to create bucket {bucket_name}: {e}")
|
|
raise
|
|
|
|
async def kv_put(self, bucket_name: str, key: str, value: bytes, ttl_seconds: int = 1800):
|
|
"""Put a value in a KV bucket"""
|
|
try:
|
|
kv = await self.kv_get_bucket(bucket_name, ttl_seconds)
|
|
await kv.put(key, value)
|
|
logger.debug(f"[NATS KV] Put: {bucket_name}/{key}")
|
|
except Exception as e:
|
|
logger.error(f"[NATS KV] Put error for {bucket_name}/{key}: {e}")
|
|
raise
|
|
|
|
async def kv_get(self, bucket_name: str, key: str):
|
|
"""Get a value from a KV bucket"""
|
|
try:
|
|
kv = await self.kv_get_bucket(bucket_name)
|
|
entry = await kv.get(key)
|
|
logger.debug(f"[NATS KV] Get: {bucket_name}/{key}")
|
|
return entry.value if entry else None
|
|
except Exception as e:
|
|
logger.error(f"[NATS KV] Get error for {bucket_name}/{key}: {e}")
|
|
return None
|
|
|
|
async def kv_keys(self, bucket_name: str, filter_prefix: str = None):
|
|
"""List keys in a KV bucket"""
|
|
try:
|
|
kv = await self.kv_get_bucket(bucket_name)
|
|
keys = await kv.keys()
|
|
if filter_prefix:
|
|
keys = [k for k in keys if k.startswith(filter_prefix)]
|
|
logger.debug(f"[NATS KV] Listed {len(keys)} keys from {bucket_name}")
|
|
return keys
|
|
except Exception as e:
|
|
logger.error(f"[NATS KV] Keys error for {bucket_name}: {e}")
|
|
return []
|
|
|
|
async def kv_delete(self, bucket_name: str, key: str):
|
|
"""Delete a key from a KV bucket"""
|
|
try:
|
|
kv = await self.kv_get_bucket(bucket_name)
|
|
await kv.delete(key)
|
|
logger.debug(f"[NATS KV] Deleted: {bucket_name}/{key}")
|
|
except Exception as e:
|
|
logger.error(f"[NATS KV] Delete error for {bucket_name}/{key}: {e}")
|
|
raise
|
|
|
|
async def close(self):
|
|
if not self._connected:
|
|
return
|
|
|
|
logger.info("[NATS] Closing connection")
|
|
try:
|
|
await self.nc.drain()
|
|
self._connected = False
|
|
self.subscriptions.clear()
|
|
self._js = None
|
|
logger.info("[NATS] Connection closed successfully")
|
|
except Exception as e:
|
|
logger.error(f"[NATS] Error closing connection: {e}")
|
|
|
|
@property
|
|
def is_connected(self):
|
|
return self._connected
|
|
|
|
@property
|
|
def client(self):
|
|
"""Access to underlying NATS client for request-reply operations"""
|
|
if not self._connected:
|
|
raise RuntimeError("EventBus is not connected - cannot access NATS client")
|
|
return self.nc
|
|
|
|
async def __aenter__(self):
|
|
await self.connect()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
await self.close()
|
|
|
|
|
|
nats_bus = NatsEventBus()
|