From e2d24a66f159894f6c6126b5396a6f297f9930ee Mon Sep 17 00:00:00 2001 From: Alex Kazaiev Date: Fri, 2 Jan 2026 13:04:26 -0600 Subject: [PATCH] Add core service infrastructure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - NATS event bus (pub/sub, JetStream, KV storage) - Service registry with health monitoring - Base service class with lifecycle management - Config system - Logger with Vi formatting Adapted from Lyra's patterns, namespace changed to vi.* ๐ŸฆŠ๐Ÿ’• --- .gitignore | 30 +++ config/config.json | 11 + core/__init__.py | 31 +++ core/base_service.py | 397 ++++++++++++++++++++++++++++++++ core/config.py | 94 ++++++++ core/logger.py | 73 ++++++ core/nats_event_bus.py | 209 +++++++++++++++++ core/service_registry.py | 476 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 1321 insertions(+) create mode 100644 .gitignore create mode 100644 config/config.json create mode 100644 core/__init__.py create mode 100644 core/base_service.py create mode 100644 core/config.py create mode 100644 core/logger.py create mode 100644 core/nats_event_bus.py create mode 100644 core/service_registry.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..473cba1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +.venv/ +venv/ +ENV/ + +# Logs +logs/ +*.log + +# Data +data/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Secrets +*.env +!.env.example diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..98b05ab --- /dev/null +++ b/config/config.json @@ -0,0 +1,11 @@ +{ + "nats": { + "url": "nats://daemoness.local:4222", + "connection_timeout": 30, + "max_reconnect_attempts": 10 + }, + "services": { + "heartbeat_timeout": 60, + "health_check_interval": 30 + } +} diff --git a/core/__init__.py b/core/__init__.py new file mode 100644 index 0000000..35034a8 --- /dev/null +++ b/core/__init__.py @@ -0,0 +1,31 @@ +# Vi Core Module +# Nervous system infrastructure for Vivienne Rousseau + +from .config import config +from .logger import setup_logger, logger +from .nats_event_bus import NatsEventBus, nats_bus +from .service_registry import ( + ServiceRegistry, + ServiceManifest, + ServiceOperation, + ServiceInstance, + ServiceStatus, + service_registry +) +from .base_service import BaseService, SimpleService + +__all__ = [ + 'config', + 'setup_logger', + 'logger', + 'NatsEventBus', + 'nats_bus', + 'ServiceRegistry', + 'ServiceManifest', + 'ServiceOperation', + 'ServiceInstance', + 'ServiceStatus', + 'service_registry', + 'BaseService', + 'SimpleService', +] diff --git a/core/base_service.py b/core/base_service.py new file mode 100644 index 0000000..9df1b39 --- /dev/null +++ b/core/base_service.py @@ -0,0 +1,397 @@ +""" +Base Service Class for Vi + +Provides standardized service lifecycle management, registration, and health monitoring. +All Vi services should inherit from this base class. +""" + +import asyncio +import json +import time +import uuid +from abc import ABC, abstractmethod +from typing import Dict, Any, List, Optional +from datetime import datetime + +from .logger import setup_logger +from .service_registry import ServiceManifest, ServiceOperation, ServiceStatus, service_registry + +logger = setup_logger('base_service') + + +class BaseService(ABC): + """Base class for all Vi services providing standardized lifecycle management""" + + def __init__(self, service_id: str, event_bus=None): + self.service_id = service_id + self.event_bus = event_bus + self.instance_id = f"{service_id}-{uuid.uuid4().hex[:8]}" + + self._running = False + self._heartbeat_task = None + self._health_check_task = None + + self.heartbeat_interval = 60 + self.health_check_interval = 10 + + self._health_data = {} + self._status = ServiceStatus.UNKNOWN + + self._heartbeat_failures = 0 + self._max_heartbeat_failures = 3 + self._registration_confirmed = False + self._first_heartbeat = True + + self.logger = setup_logger(service_id, service_name=service_id) + + @abstractmethod + def get_service_manifest(self) -> ServiceManifest: + """Return service manifest with operations and metadata""" + pass + + @abstractmethod + async def initialize_service(self): + """Initialize service-specific resources""" + pass + + @abstractmethod + async def cleanup_service(self): + """Cleanup service-specific resources""" + pass + + @abstractmethod + async def perform_health_check(self) -> Dict[str, Any]: + """Perform service-specific health check""" + pass + + async def start(self, event_bus=None): + """Start the service with full lifecycle management""" + if event_bus: + self.event_bus = event_bus + + if not self.event_bus: + raise ValueError("Event bus is required") + + try: + self.logger.info(f"[๐Ÿš€] Starting service: {self.service_id}") + + if self.service_id == 'health': + await service_registry.initialize(self.event_bus) + + manifest = self.get_service_manifest() + if self.service_id == 'health': + service_registry.register_service(self.service_id, manifest, self.instance_id) + self._registration_confirmed = True + else: + await self._send_registration_message(manifest) + + await self.initialize_service() + + self._running = True + self._heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + self._health_check_task = asyncio.create_task(self._health_check_loop()) + + self._status = ServiceStatus.HEALTHY + await self._send_heartbeat() + + self.logger.info(f"[โœ…] Service started: {self.service_id}") + + except Exception as e: + self.logger.exception(f"[โŒ] Failed to start {self.service_id}: {e}") + self._status = ServiceStatus.UNHEALTHY + raise + + async def stop(self): + """Stop the service gracefully""" + self.logger.info(f"[๐Ÿ›‘] Stopping service: {self.service_id}") + + self._running = False + self._status = ServiceStatus.OFFLINE + + if self._heartbeat_task: + self._heartbeat_task.cancel() + try: + await self._heartbeat_task + except asyncio.CancelledError: + pass + + if self._health_check_task: + self._health_check_task.cancel() + try: + await self._health_check_task + except asyncio.CancelledError: + pass + + try: + await self.cleanup_service() + await self._send_heartbeat() + + if self.service_id == 'health': + service_registry.deregister_service(self.service_id) + else: + await self._send_deregistration_message() + + self.logger.info(f"[โœ…] Service stopped: {self.service_id}") + + except Exception as e: + self.logger.exception(f"[โŒ] Error during shutdown: {e}") + + async def _heartbeat_loop(self): + """Background task to send periodic heartbeats""" + while self._running: + try: + await self._send_heartbeat() + await asyncio.sleep(self.heartbeat_interval) + except asyncio.CancelledError: + break + except Exception as e: + self.logger.exception(f"[๐Ÿ’”] Heartbeat error: {e}") + await asyncio.sleep(5) + + async def _health_check_loop(self): + """Background task for periodic health checks""" + while self._running: + try: + health_data = await self.perform_health_check() + self._health_data = health_data + + if health_data.get('healthy', True): + if self._status == ServiceStatus.UNHEALTHY: + self._status = ServiceStatus.HEALTHY + self.logger.info(f"[๐Ÿ’š] Service recovered") + else: + if self._status == ServiceStatus.HEALTHY: + self._status = ServiceStatus.UNHEALTHY + self.logger.warning(f"[๐Ÿ’”] Service unhealthy") + + await asyncio.sleep(self.health_check_interval) + except asyncio.CancelledError: + break + except Exception as e: + self.logger.exception(f"[๐Ÿ’”] Health check error: {e}") + self._status = ServiceStatus.UNHEALTHY + await asyncio.sleep(10) + + async def _send_heartbeat(self): + """Send heartbeat to service registry""" + try: + health_data = { + 'status': self._status.value, + 'timestamp': datetime.utcnow().isoformat(), + 'instance_id': self.instance_id, + **self._health_data + } + + if self.service_id == 'health': + service_registry.update_service_heartbeat(self.service_id, health_data) + self._heartbeat_failures = 0 + else: + if self.event_bus: + await self._send_resilient_heartbeat(health_data) + + except Exception as e: + self.logger.exception(f"[๐Ÿ’”] Failed to send heartbeat: {e}") + self._heartbeat_failures += 1 + if self._heartbeat_failures >= self._max_heartbeat_failures: + await self._attempt_reregistration() + + async def request_service(self, target_service: str, operation: str, + payload: Dict[str, Any], timeout: float = 5.0) -> Dict[str, Any]: + """Make a request to another service""" + topic = f"vi.services.{target_service}.{operation}" + + try: + request_data = json.dumps(payload).encode() + response_msg = await self.event_bus.client.request(topic, request_data, timeout=timeout) + return json.loads(response_msg.data.decode()) + + except Exception as e: + self.logger.exception(f"[๐Ÿ”—] Request failed {target_service}.{operation}: {e}") + raise + + async def emit_event(self, event_type: str, payload: Dict[str, Any]): + """Emit an event using standardized topic naming""" + if not self.event_bus: + raise ValueError("Event bus not available") + await self.event_bus.emit(event_type, payload) + + def register_handler(self, operation: str, handler): + """Register a request-reply handler for a service operation""" + if not self.event_bus: + raise ValueError("Event bus not available") + + async def wrapped_handler(msg): + try: + result = await handler(msg) + if result is not None: + await msg.respond(json.dumps(result).encode()) + except Exception as e: + error_response = {"error": str(e), "status": "error"} + await msg.respond(json.dumps(error_response).encode()) + self.logger.error(f"Handler error for {operation}: {e}") + + topic = f"vi.services.{self.service_id}.{operation}" + return self.event_bus.on(topic, wrapped_handler) + + def create_service_operation(self, operation_id: str, description: str, + request_topic: Optional[str] = None, + response_pattern: str = "request-reply", + timeout_ms: int = 5000, + parameters: Optional[List[Dict[str, Any]]] = None, + metadata: Optional[Dict[str, Any]] = None) -> ServiceOperation: + """Helper to create a ServiceOperation""" + if request_topic is None: + request_topic = f"vi.services.{self.service_id}.{operation_id}" + + return ServiceOperation( + operation_id=operation_id, + description=description, + request_topic=request_topic, + response_pattern=response_pattern, + parameters=parameters or [], + timeout_ms=timeout_ms, + metadata=metadata or {} + ) + + def get_status(self) -> ServiceStatus: + return self._status + + def set_status(self, status: ServiceStatus): + if self._status != status: + self.logger.info(f"[๐Ÿ“Š] Status: {self._status.value} โ†’ {status.value}") + self._status = status + + def update_health_data(self, health_data: Dict[str, Any]): + self._health_data.update(health_data) + + def get_health_data(self) -> Dict[str, Any]: + return self._health_data.copy() + + def get_service_info(self) -> Dict[str, Any]: + manifest = self.get_service_manifest() + return { + 'service_id': self.service_id, + 'instance_id': self.instance_id, + 'status': self._status.value, + 'manifest': manifest.__dict__, + 'health_data': self._health_data, + 'running': self._running + } + + async def _send_registration_message(self, manifest: ServiceManifest): + """Send registration message via NATS""" + if not self.event_bus: + return + + registration_payload = { + 'service_id': self.service_id, + 'instance_id': self.instance_id, + 'manifest': { + 'service_id': manifest.service_id, + 'name': manifest.name, + 'description': manifest.description, + 'version': manifest.version, + 'operations': [op.__dict__ for op in manifest.operations], + 'health_check_topic': manifest.health_check_topic, + 'metadata': manifest.metadata + } + } + + await self.event_bus.emit("vi.services.register", registration_payload) + self.logger.info(f"[๐Ÿ—‚๏ธ] Registered: {self.service_id}") + + async def _send_deregistration_message(self): + """Send deregistration message via NATS""" + if not self.event_bus: + return + + await self.event_bus.emit("vi.services.deregister", { + 'service_id': self.service_id, + 'instance_id': self.instance_id + }) + self.logger.info(f"[๐Ÿ—‚๏ธ] Deregistered: {self.service_id}") + + async def _send_resilient_heartbeat(self, health_data: Dict[str, Any]): + """Send heartbeat with acknowledgment""" + try: + heartbeat_payload = { + 'service_id': self.service_id, + 'instance_id': self.instance_id, + 'health_data': health_data + } + + request_data = json.dumps(heartbeat_payload).encode() + response_msg = await self.event_bus.client.request( + "vi.services.heartbeat", + request_data, + timeout=5.0 + ) + + response = json.loads(response_msg.data.decode()) + acknowledged = response.get('acknowledged', False) + + if acknowledged: + self._heartbeat_failures = 0 + if not self._registration_confirmed: + self._registration_confirmed = True + self.logger.info(f"[โœ…] Registration confirmed") + else: + self._registration_confirmed = False + await self._attempt_reregistration() + + except Exception as e: + self._heartbeat_failures += 1 + self.logger.warning(f"[๐Ÿ’”] Heartbeat failed ({self._heartbeat_failures}): {e}") + + if self._heartbeat_failures >= self._max_heartbeat_failures: + await self._attempt_reregistration() + + async def _attempt_reregistration(self): + """Attempt to re-register service""" + self.logger.warning(f"[๐Ÿ”„] Re-registering {self.service_id}") + + try: + self._heartbeat_failures = 0 + manifest = self.get_service_manifest() + await self._send_registration_message(manifest) + self.logger.info(f"[โœ…] Re-registered {self.service_id}") + except Exception as e: + self.logger.error(f"[โŒ] Re-registration failed: {e}") + + +class SimpleService(BaseService): + """Simple implementation of BaseService""" + + def __init__(self, service_id: str, name: str = "", description: str = "", + version: str = "1.0.0", operations: Optional[List[ServiceOperation]] = None, + event_bus=None): + super().__init__(service_id, event_bus) + self._name = name or service_id + self._description = description or f"Service: {service_id}" + self._version = version + self._operations = operations or [] + + def get_service_manifest(self) -> ServiceManifest: + return ServiceManifest( + service_id=self.service_id, + name=self._name, + description=self._description, + version=self._version, + operations=self._operations, + health_check_topic=f"vi.services.{self.service_id}.health" + ) + + async def initialize_service(self): + pass + + async def cleanup_service(self): + pass + + async def perform_health_check(self) -> Dict[str, Any]: + return { + 'healthy': True, + 'checks': { + 'running': self._running, + 'event_bus': self.event_bus is not None + } + } diff --git a/core/config.py b/core/config.py new file mode 100644 index 0000000..ca1ccc1 --- /dev/null +++ b/core/config.py @@ -0,0 +1,94 @@ +import json +import os +from pathlib import Path +from typing import Dict, Any + + +class Config: + _instance = None + _config_data = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if self._config_data is None: + self._load_config() + + def _load_config(self): + config_path = os.path.join(os.path.dirname(__file__), '..', 'config', 'config.json') + config_path = os.path.abspath(config_path) + + try: + with open(config_path, 'r') as f: + self._config_data = json.load(f) + except FileNotFoundError: + # Default config if file doesn't exist + self._config_data = { + 'nats': { + 'url': 'nats://localhost:4222', + 'connection_timeout': 30, + 'max_reconnect_attempts': 10 + } + } + except json.JSONDecodeError as e: + raise ValueError(f"Invalid JSON in configuration file: {e}") + + def get(self, key: str, default: Any = None) -> Any: + keys = key.split('.') + value = self._config_data + + for k in keys: + if isinstance(value, dict) and k in value: + value = value[k] + else: + return default + + return value + + @property + def nats_url(self) -> str: + return os.getenv('NATS_URL') or self.get('nats.url', 'nats://localhost:4222') + + @property + def nats_connection_timeout(self) -> int: + return self.get('nats.connection_timeout', 30) + + @property + def nats_max_reconnect_attempts(self) -> int: + return self.get('nats.max_reconnect_attempts', 10) + + def get_local_services(self) -> list: + """Get list of services that should run on this node""" + systemd_services = self.get('auto_upgrade.systemd_services', []) + local_services = [] + for systemd_service in systemd_services: + service_name = systemd_service.replace('vi-', '').replace('.service', '') + local_services.append(service_name) + return local_services + + def is_service_enabled(self, service_name: str) -> bool: + """Check if a service is enabled on this node""" + return service_name in self.get_local_services() + + @property + def project_root(self) -> Path: + """Get the project root directory""" + return Path(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + + @property + def data_dir(self) -> Path: + """Get the data directory for databases""" + data_path = os.getenv('DATA_DIR', str(self.project_root / 'data')) + data_dir = Path(data_path) + data_dir.mkdir(exist_ok=True, parents=True) + return data_dir + + +config = Config() + +# Database paths +SHORT_TERM_DB = config.data_dir / 'memory_active.db' +LONG_TERM_DB = config.data_dir / 'memory_archive.db' diff --git a/core/logger.py b/core/logger.py new file mode 100644 index 0000000..4370c83 --- /dev/null +++ b/core/logger.py @@ -0,0 +1,73 @@ +import logging +import sys +import os +from datetime import datetime +from logging.handlers import TimedRotatingFileHandler + + +class ViFormatter(logging.Formatter): + """Custom formatter for Vi services with symbolic level indicators""" + def format(self, record): + timestamp = datetime.now().strftime('%H:%M:%S') + level_map = { + 'DEBUG': 'ยท', + 'INFO': 'โœ“', + 'WARNING': 'โš ', + 'ERROR': 'โœ—', + 'CRITICAL': 'โ˜ ' + } + level_symbol = level_map.get(record.levelname, '?') + return f"[{timestamp}] {level_symbol} {record.getMessage()}" + + +def _get_project_root(): + """Find the project root directory by looking for core/ directory""" + current_dir = os.path.dirname(os.path.abspath(__file__)) + return os.path.dirname(current_dir) + + +def setup_logger(name="vi", level=logging.INFO, service_name=None): + """Set up a logger with console and optional file output""" + # Allow environment variable to override log level + env_level = os.getenv('LOG_LEVEL', '').upper() + if env_level in ['DEBUG', 'INFO', 'WARNING', 'ERROR']: + level = getattr(logging, env_level) + + logger = logging.getLogger(name) + + if logger.handlers: + return logger + + logger.setLevel(level) + + # Console handler + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(ViFormatter()) + logger.addHandler(console_handler) + + # File handler with daily rotation if service_name is specified + if service_name: + project_root = _get_project_root() + logs_dir = os.path.join(project_root, 'logs') + + if not os.path.exists(logs_dir): + os.makedirs(logs_dir, exist_ok=True) + + log_file = os.path.join(logs_dir, f"{service_name}.log") + + file_handler = TimedRotatingFileHandler( + log_file, + when='midnight', + interval=1, + backupCount=30, + encoding='utf-8' + ) + file_handler.setFormatter(ViFormatter()) + logger.addHandler(file_handler) + + logger.propagate = False + + return logger + + +logger = setup_logger() diff --git a/core/nats_event_bus.py b/core/nats_event_bus.py new file mode 100644 index 0000000..c58bc68 --- /dev/null +++ b/core/nats_event_bus.py @@ -0,0 +1,209 @@ +""" +NATS Event Bus for Vi + +Provides pub/sub messaging, JetStream durable queues, and KV storage. +""" + +import json +from nats.aio.client import Client as NATS +from nats.errors import TimeoutError as NatsTimeoutError +from .config import config +from .logger import logger + + +class NatsEventBus: + def __init__(self): + self.nc = NATS() + self.subscriptions = {} + self._connected = False + self._js = None + + async def connect(self, url=None): + if self._connected: + return + + url = url or config.nats_url + logger.info(f"[NATS] Connecting to {url}") + + try: + await self.nc.connect( + servers=[url], + connect_timeout=config.nats_connection_timeout, + max_reconnect_attempts=config.nats_max_reconnect_attempts, + ) + self._connected = True + self._js = self.nc.jetstream() + logger.info("[NATS] Connected successfully") + except Exception as e: + logger.error(f"[NATS] Connection failed: {e}") + raise + + async def emit(self, topic, payload): + if not self._connected: + logger.error("[NATS] Cannot emit - EventBus is not connected") + raise RuntimeError("EventBus is not connected") + + try: + await self.nc.publish(topic, json.dumps(payload).encode()) + logger.debug(f"[NATS] Emitted to {topic}") + except Exception as e: + logger.error(f"[NATS] Failed to emit to {topic}: {e}") + raise + + async def on(self, topic, handler): + if not self._connected: + logger.error("[NATS] Cannot subscribe - EventBus is not connected") + raise RuntimeError("EventBus is not connected") + + async def wrapper(msg): + try: + data = msg.data.decode() + logger.debug(f"[NATS] Received message on {topic}") + except Exception as e: + logger.error(f"[NATS] Failed to decode message on {topic}: {e}") + return + + try: + await handler(msg) + except Exception as e: + logger.error(f"[NATS] Handler error for {topic}: {e}") + + try: + sub = await self.nc.subscribe(topic, cb=wrapper) + self.subscriptions[topic] = sub + logger.info(f"[NATS] Subscribed to {topic}") + except Exception as e: + logger.error(f"[NATS] Failed to subscribe to {topic}: {e}") + raise + + async def off(self, topic): + sub = self.subscriptions.pop(topic, None) + if sub: + try: + await sub.unsubscribe() + logger.info(f"[NATS] Unsubscribed from {topic}") + except Exception as e: + logger.error(f"[NATS] Failed to unsubscribe from {topic}: {e}") + + async def jetstream_pull(self, stream_name, consumer_name, timeout=1.0): + """Pull next message from JetStream consumer""" + if not self._connected: + raise RuntimeError("EventBus is not connected") + if not self._js: + raise RuntimeError("JetStream not initialized") + + try: + sub = await self._js.pull_subscribe( + subject="", + durable=consumer_name, + stream=stream_name, + ) + msgs = await sub.fetch(batch=1, timeout=timeout) + return msgs[0] if msgs else None + except NatsTimeoutError: + return None + except Exception as e: + logger.error(f"[NATS] JetStream pull error: {e}") + raise + + async def kv_get_bucket(self, bucket_name: str, ttl_seconds: int = 1800): + """Get or create a NATS KV bucket""" + if not self._connected: + raise RuntimeError("EventBus is not connected") + if not self._js: + raise RuntimeError("JetStream not initialized") + + try: + kv = await self._js.key_value(bucket=bucket_name) + logger.debug(f"[NATS KV] Using existing bucket: {bucket_name}") + return kv + except Exception: + try: + kv = await self._js.create_key_value( + bucket=bucket_name, + ttl=ttl_seconds + ) + logger.info(f"[NATS KV] Created bucket: {bucket_name} (TTL: {ttl_seconds}s)") + return kv + except Exception as e: + logger.error(f"[NATS KV] Failed to create bucket {bucket_name}: {e}") + raise + + async def kv_put(self, bucket_name: str, key: str, value: bytes, ttl_seconds: int = 1800): + """Put a value in a KV bucket""" + try: + kv = await self.kv_get_bucket(bucket_name, ttl_seconds) + await kv.put(key, value) + logger.debug(f"[NATS KV] Put: {bucket_name}/{key}") + except Exception as e: + logger.error(f"[NATS KV] Put error for {bucket_name}/{key}: {e}") + raise + + async def kv_get(self, bucket_name: str, key: str): + """Get a value from a KV bucket""" + try: + kv = await self.kv_get_bucket(bucket_name) + entry = await kv.get(key) + logger.debug(f"[NATS KV] Get: {bucket_name}/{key}") + return entry.value if entry else None + except Exception as e: + logger.error(f"[NATS KV] Get error for {bucket_name}/{key}: {e}") + return None + + async def kv_keys(self, bucket_name: str, filter_prefix: str = None): + """List keys in a KV bucket""" + try: + kv = await self.kv_get_bucket(bucket_name) + keys = await kv.keys() + if filter_prefix: + keys = [k for k in keys if k.startswith(filter_prefix)] + logger.debug(f"[NATS KV] Listed {len(keys)} keys from {bucket_name}") + return keys + except Exception as e: + logger.error(f"[NATS KV] Keys error for {bucket_name}: {e}") + return [] + + async def kv_delete(self, bucket_name: str, key: str): + """Delete a key from a KV bucket""" + try: + kv = await self.kv_get_bucket(bucket_name) + await kv.delete(key) + logger.debug(f"[NATS KV] Deleted: {bucket_name}/{key}") + except Exception as e: + logger.error(f"[NATS KV] Delete error for {bucket_name}/{key}: {e}") + raise + + async def close(self): + if not self._connected: + return + + logger.info("[NATS] Closing connection") + try: + await self.nc.drain() + self._connected = False + self.subscriptions.clear() + self._js = None + logger.info("[NATS] Connection closed successfully") + except Exception as e: + logger.error(f"[NATS] Error closing connection: {e}") + + @property + def is_connected(self): + return self._connected + + @property + def client(self): + """Access to underlying NATS client for request-reply operations""" + if not self._connected: + raise RuntimeError("EventBus is not connected - cannot access NATS client") + return self.nc + + async def __aenter__(self): + await self.connect() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + +nats_bus = NatsEventBus() diff --git a/core/service_registry.py b/core/service_registry.py new file mode 100644 index 0000000..4a8b6ab --- /dev/null +++ b/core/service_registry.py @@ -0,0 +1,476 @@ +""" +Service Registry System for Vi + +Manages service lifecycle, discovery, and health monitoring. +""" + +import asyncio +import json +import time +from typing import Dict, Any, List, Optional, Set +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum + +from .logger import setup_logger + +logger = setup_logger('service_registry') + + +class ServiceStatus(Enum): + """Service status enumeration""" + UNKNOWN = "unknown" + REGISTERING = "registering" + HEALTHY = "healthy" + DEGRADED = "degraded" + UNHEALTHY = "unhealthy" + OFFLINE = "offline" + + +@dataclass +class ServiceOperation: + """Represents a service operation/endpoint""" + operation_id: str + description: str + request_topic: str + response_pattern: str = "request-reply" + parameters: List[Dict[str, Any]] = field(default_factory=list) + timeout_ms: int = 5000 + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ServiceManifest: + """Service manifest containing metadata and capabilities""" + service_id: str + name: str = "" + description: str = "" + version: str = "1.0.0" + namespace: str = "vi" + operations: List[ServiceOperation] = field(default_factory=list) + dependencies: List[str] = field(default_factory=list) + health_check_topic: str = "" + heartbeat_interval: int = 30 + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ServiceInstance: + """Represents a specific instance of a service""" + service_id: str + instance_id: str + manifest: ServiceManifest + status: ServiceStatus = ServiceStatus.UNKNOWN + last_heartbeat: datetime = field(default_factory=datetime.utcnow) + last_health_check: datetime = field(default_factory=datetime.utcnow) + health_data: Dict[str, Any] = field(default_factory=dict) + registered_at: datetime = field(default_factory=datetime.utcnow) + metadata: Dict[str, Any] = field(default_factory=dict) + + +class ServiceRegistry: + """Central service registry managing all service discovery and health monitoring""" + + def __init__(self, heartbeat_timeout: int = 60, health_check_interval: int = 30): + self.services: Dict[str, ServiceInstance] = {} + self.service_operations: Dict[str, ServiceOperation] = {} + self.service_dependencies: Dict[str, Set[str]] = {} + self.heartbeat_timeout = heartbeat_timeout + self.health_check_interval = health_check_interval + self.event_bus = None + self._health_monitor_task = None + self._running = False + + async def initialize(self, event_bus): + """Initialize the service registry with event bus""" + if self._running: + logger.info("[๐Ÿ—‚๏ธ] Re-initializing service registry") + await self.shutdown() + + self.event_bus = event_bus + await self._register_handlers() + self._running = True + self._health_monitor_task = asyncio.create_task(self._health_monitor_loop()) + logger.info("[๐Ÿ—‚๏ธ] Service registry initialized") + + async def shutdown(self): + """Shutdown the service registry""" + self._running = False + + if self._health_monitor_task: + self._health_monitor_task.cancel() + try: + await self._health_monitor_task + except asyncio.CancelledError: + pass + + if self.event_bus: + try: + await self.event_bus.off("vi.services.register") + await self.event_bus.off("vi.services.deregister") + await self.event_bus.off("vi.services.heartbeat") + await self.event_bus.off("vi.services.discover") + await self.event_bus.off("vi.services.list") + await self.event_bus.off("vi.services.health") + except Exception as e: + logger.debug(f"[๐Ÿ—‚๏ธ] Handler cleanup: {e}") + + logger.info("[๐Ÿ—‚๏ธ] Service registry shutdown") + + async def _register_handlers(self): + """Register NATS handlers for service registry operations""" + if not self.event_bus: + return + + await self.event_bus.on("vi.services.register", self._handle_service_register) + await self.event_bus.on("vi.services.deregister", self._handle_service_deregister) + await self.event_bus.on("vi.services.heartbeat", self._handle_service_heartbeat) + await self.event_bus.on("vi.services.discover", self._handle_service_discover) + await self.event_bus.on("vi.services.list", self._handle_service_list) + await self.event_bus.on("vi.services.health", self._handle_health_request) + + logger.debug("[๐Ÿ—‚๏ธ] Service registry handlers registered") + + def _serialize_manifest(self, manifest: ServiceManifest) -> Dict[str, Any]: + """Serialize ServiceManifest to JSON-compatible dictionary""" + return { + 'service_id': manifest.service_id, + 'name': manifest.name, + 'description': manifest.description, + 'version': manifest.version, + 'namespace': manifest.namespace, + 'operations': [op.__dict__ for op in manifest.operations], + 'dependencies': manifest.dependencies, + 'health_check_topic': manifest.health_check_topic, + 'heartbeat_interval': manifest.heartbeat_interval, + 'metadata': manifest.metadata + } + + def register_service(self, service_id: str, manifest: ServiceManifest, + instance_id: Optional[str] = None) -> str: + """Register a service with the registry""" + if instance_id is None: + instance_id = f"{service_id}-{int(time.time())}" + + instance = ServiceInstance( + service_id=service_id, + instance_id=instance_id, + manifest=manifest, + status=ServiceStatus.REGISTERING + ) + + self.services[service_id] = instance + + for operation in manifest.operations: + operation_key = f"{service_id}.{operation.operation_id}" + self.service_operations[operation_key] = operation + + if manifest.dependencies: + self.service_dependencies[service_id] = set(manifest.dependencies) + + logger.info(f"[๐Ÿ—‚๏ธ] Registered service: {service_id} (instance: {instance_id})") + return instance_id + + def deregister_service(self, service_id: str) -> bool: + """Deregister a service from the registry""" + instance = self.services.pop(service_id, None) + if not instance: + logger.warning(f"[๐Ÿ—‚๏ธ] Service {service_id} not found") + return False + + operations_to_remove = [ + op_key for op_key in self.service_operations.keys() + if op_key.startswith(f"{service_id}.") + ] + for op_key in operations_to_remove: + self.service_operations.pop(op_key, None) + + self.service_dependencies.pop(service_id, None) + logger.info(f"[๐Ÿ—‚๏ธ] Deregistered service: {service_id}") + return True + + def update_service_heartbeat(self, service_id: str, health_data: Optional[Dict[str, Any]] = None) -> bool: + """Update service heartbeat and health data""" + instance = self.services.get(service_id) + if not instance: + return False + + instance.last_heartbeat = datetime.utcnow() + if health_data: + instance.health_data = health_data + + if health_data and 'status' in health_data: + try: + instance.status = ServiceStatus(health_data['status']) + except ValueError: + instance.status = ServiceStatus.UNKNOWN + elif instance.status == ServiceStatus.REGISTERING: + instance.status = ServiceStatus.HEALTHY + + return True + + def get_service(self, service_id: str) -> Optional[ServiceInstance]: + """Get service instance by ID""" + return self.services.get(service_id) + + def find_service_operation(self, service_id: str, operation_id: str) -> Optional[ServiceOperation]: + """Find a specific service operation""" + return self.service_operations.get(f"{service_id}.{operation_id}") + + def list_services(self, status_filter: Optional[ServiceStatus] = None) -> List[ServiceInstance]: + """List all services, optionally filtered by status""" + services = list(self.services.values()) + if status_filter: + services = [s for s in services if s.status == status_filter] + return services + + def list_service_operations(self, service_id: Optional[str] = None) -> List[ServiceOperation]: + """List all service operations""" + operations = list(self.service_operations.values()) + if service_id: + operations = [ + op for op_key, op in self.service_operations.items() + if op_key.startswith(f"{service_id}.") + ] + return operations + + def check_dependencies(self, service_id: str) -> Dict[str, bool]: + """Check if service dependencies are available""" + dependencies = self.service_dependencies.get(service_id, set()) + result = {} + for dep_service_id in dependencies: + dep_instance = self.services.get(dep_service_id) + result[dep_service_id] = ( + dep_instance is not None and + dep_instance.status in [ServiceStatus.HEALTHY, ServiceStatus.DEGRADED] + ) + return result + + async def _health_monitor_loop(self): + """Background task to monitor service health""" + while self._running: + try: + await self._check_service_health() + await asyncio.sleep(self.health_check_interval) + except asyncio.CancelledError: + break + except Exception as e: + logger.exception(f"[๐Ÿ—‚๏ธ] Health monitor error: {e}") + await asyncio.sleep(5) + + async def _check_service_health(self): + """Check health of all registered services""" + current_time = datetime.utcnow() + + for service_id, instance in self.services.items(): + heartbeat_age = (current_time - instance.last_heartbeat).total_seconds() + + if heartbeat_age > self.heartbeat_timeout: + if instance.status != ServiceStatus.OFFLINE: + logger.warning(f"[๐Ÿ—‚๏ธ] Service {service_id} heartbeat timeout ({heartbeat_age}s)") + instance.status = ServiceStatus.OFFLINE + + if self.event_bus: + await self.event_bus.emit("vi.services.health.status", { + "service_id": service_id, + "instance_id": instance.instance_id, + "status": instance.status.value, + "last_heartbeat": instance.last_heartbeat.isoformat(), + "health_data": instance.health_data + }) + + # NATS Event Handlers + + async def _handle_service_register(self, msg): + """Handle service registration requests""" + try: + if hasattr(msg, 'data'): + payload = json.loads(msg.data.decode()) + else: + payload = msg if isinstance(msg, dict) else json.loads(msg) + + service_id = payload.get('service_id') + manifest_data = payload.get('manifest', {}) + instance_id = payload.get('instance_id') + + if not service_id: + logger.error("[๐Ÿ—‚๏ธ] Registration error: Missing service_id") + return + + manifest_data.pop('service_id', None) + + operations_data = manifest_data.pop('operations', []) + operations = [] + for op_data in operations_data: + if isinstance(op_data, dict): + operation = ServiceOperation( + operation_id=op_data.get('operation_id', ''), + description=op_data.get('description', ''), + request_topic=op_data.get('request_topic', ''), + response_pattern=op_data.get('response_pattern', 'request-reply'), + parameters=op_data.get('parameters', []), + timeout_ms=op_data.get('timeout_ms', 5000), + metadata=op_data.get('metadata', {}) + ) + operations.append(operation) + else: + operations.append(op_data) + + manifest = ServiceManifest(service_id=service_id, operations=operations, **manifest_data) + actual_instance_id = self.register_service(service_id, manifest, instance_id) + logger.info(f"[๐Ÿ—‚๏ธ] โœ… Service registered: {service_id}") + + except Exception as e: + logger.exception(f"[๐Ÿ—‚๏ธ] Registration error: {e}") + + async def _handle_service_deregister(self, msg): + """Handle service deregistration requests""" + try: + if hasattr(msg, 'data'): + payload = json.loads(msg.data.decode()) + else: + payload = msg if isinstance(msg, dict) else json.loads(msg) + + service_id = payload.get('service_id') + if service_id: + self.deregister_service(service_id) + except Exception as e: + logger.exception(f"[๐Ÿ—‚๏ธ] Deregistration error: {e}") + + async def _handle_service_heartbeat(self, msg): + """Handle service heartbeat updates""" + try: + if hasattr(msg, 'data'): + payload = json.loads(msg.data.decode()) + else: + payload = msg if isinstance(msg, dict) else json.loads(msg) + + service_id = payload.get('service_id') + instance_id = payload.get('instance_id') + health_data = payload.get('health_data') + + if not service_id: + if hasattr(msg, 'respond'): + await msg.respond(json.dumps({'acknowledged': False}).encode()) + return + + success = self.update_service_heartbeat(service_id, health_data) + + response = { + 'acknowledged': success, + 'service_id': service_id, + 'instance_id': instance_id, + 'timestamp': datetime.utcnow().isoformat() + } + + if hasattr(msg, 'respond'): + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[๐Ÿ—‚๏ธ] Heartbeat error: {e}") + + async def _handle_service_discover(self, msg): + """Handle service discovery requests""" + try: + if hasattr(msg, 'data'): + payload = json.loads(msg.data.decode()) + else: + payload = msg if isinstance(msg, dict) else json.loads(msg) + + service_id = payload.get('service_id') + operation_id = payload.get('operation_id') + + if service_id and operation_id: + operation = self.find_service_operation(service_id, operation_id) + result = operation.__dict__ if operation and hasattr(operation, '__dict__') else operation + elif service_id: + instance = self.get_service(service_id) + result = { + 'service_id': instance.service_id, + 'instance_id': instance.instance_id, + 'status': instance.status.value, + 'manifest': self._serialize_manifest(instance.manifest), + 'last_heartbeat': instance.last_heartbeat.isoformat() + } if instance else None + else: + services = self.list_services() + result = [ + { + 'service_id': s.service_id, + 'status': s.status.value, + 'last_heartbeat': s.last_heartbeat.isoformat() + } + for s in services + ] + + await msg.respond(json.dumps({'result': result}).encode()) + + except Exception as e: + logger.exception(f"[๐Ÿ—‚๏ธ] Discovery error: {e}") + await msg.respond(json.dumps({'error': str(e)}).encode()) + + async def _handle_service_list(self, msg): + """Handle service listing requests""" + try: + if hasattr(msg, 'data'): + payload = json.loads(msg.data.decode()) + else: + payload = msg if isinstance(msg, dict) else json.loads(msg) + + status_filter = payload.get('status_filter') + status_enum = ServiceStatus(status_filter) if status_filter else None + + services = self.list_services(status_enum) + result = [ + { + 'service_id': s.service_id, + 'status': s.status.value, + 'name': s.manifest.name, + 'version': s.manifest.version, + 'last_heartbeat': s.last_heartbeat.isoformat() + } + for s in services + ] + + await msg.respond(json.dumps({'services': result}).encode()) + + except Exception as e: + logger.exception(f"[๐Ÿ—‚๏ธ] List error: {e}") + await msg.respond(json.dumps({'error': str(e)}).encode()) + + async def _handle_health_request(self, msg): + """Handle health status requests""" + try: + if hasattr(msg, 'data'): + payload = json.loads(msg.data.decode()) + else: + payload = msg if isinstance(msg, dict) else json.loads(msg) + + service_id = payload.get('service_id') + + if service_id: + instance = self.get_service(service_id) + result = { + 'service_id': service_id, + 'status': instance.status.value, + 'last_heartbeat': instance.last_heartbeat.isoformat(), + 'health_data': instance.health_data + } if instance else None + else: + result = { + sid: { + 'status': inst.status.value, + 'last_heartbeat': inst.last_heartbeat.isoformat(), + 'health_data': inst.health_data + } + for sid, inst in self.services.items() + } + + await msg.respond(json.dumps({'health': result}).encode()) + + except Exception as e: + logger.exception(f"[๐Ÿ—‚๏ธ] Health request error: {e}") + await msg.respond(json.dumps({'error': str(e)}).encode()) + + +service_registry = ServiceRegistry()