"""
Service Registry System for Vi

Manages service lifecycle, discovery, and health monitoring.
"""

import asyncio
import json
import time
from typing import Dict, Any, List, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

from .logger import setup_logger

logger = setup_logger('service_registry')


class ServiceStatus(Enum):
    """Service status enumeration"""

    UNKNOWN = "unknown"
    REGISTERING = "registering"
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    OFFLINE = "offline"


@dataclass
class ServiceOperation:
    """Represents a service operation/endpoint"""

    operation_id: str
    description: str
    request_topic: str
    response_pattern: str = "request-reply"
    parameters: List[Dict[str, Any]] = field(default_factory=list)
    timeout_ms: int = 5000
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ServiceManifest:
    """Service manifest containing metadata and capabilities"""

    service_id: str
    name: str = ""
    description: str = ""
    version: str = "1.0.0"
    namespace: str = "vi"
    operations: List[ServiceOperation] = field(default_factory=list)
    # Service ids this service depends on; see ServiceRegistry.check_dependencies.
    dependencies: List[str] = field(default_factory=list)
    health_check_topic: str = ""
    # Not read by the registry itself (presumably seconds -- confirm with clients).
    heartbeat_interval: int = 30
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ServiceInstance:
    """Represents a specific instance of a service"""

    service_id: str
    instance_id: str
    manifest: ServiceManifest
    status: ServiceStatus = ServiceStatus.UNKNOWN
    # Timestamps are naive datetimes produced by datetime.utcnow().
    last_heartbeat: datetime = field(default_factory=datetime.utcnow)
    last_health_check: datetime = field(default_factory=datetime.utcnow)
    health_data: Dict[str, Any] = field(default_factory=dict)
    registered_at: datetime = field(default_factory=datetime.utcnow)
    metadata: Dict[str, Any] = field(default_factory=dict)


class ServiceRegistry:
    """Central service registry managing all service discovery and health monitoring.

    State is kept in three dictionaries:

    * ``services``: service_id -> ServiceInstance (one instance per id).
    * ``service_operations``: ``"<service_id>.<operation_id>"`` -> operation.
    * ``service_dependencies``: service_id -> set of required service ids.
    """

    def __init__(self, heartbeat_timeout: int = 60, health_check_interval: int = 30):
        """Create an empty registry.

        Args:
            heartbeat_timeout: Seconds without a heartbeat after which the
                health monitor marks a service OFFLINE.
            health_check_interval: Seconds the health monitor sleeps between
                two passes over the registered services.
        """
        self.services: Dict[str, ServiceInstance] = {}
        self.service_operations: Dict[str, ServiceOperation] = {}
        self.service_dependencies: Dict[str, Set[str]] = {}
        self.heartbeat_timeout = heartbeat_timeout
        self.health_check_interval = health_check_interval
        self.event_bus = None
        self._health_monitor_task = None
        self._running = False

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def initialize(self, event_bus):
        """Initialize the service registry with event bus.

        Subscribes the registry handlers on ``event_bus`` and starts the
        background health monitor. Calling it on a running registry shuts the
        previous setup down first.
        """
        if self._running:
            logger.info("[🗂️] Re-initializing service registry")
            await self.shutdown()

        self.event_bus = event_bus
        await self._register_handlers()

        self._running = True
        self._health_monitor_task = asyncio.create_task(self._health_monitor_loop())
        logger.info("[🗂️] Service registry initialized")

    async def shutdown(self):
        """Shutdown the service registry.

        Stops the health monitor task and unsubscribes every registry topic.
        Unsubscription is best-effort: a failure on one topic is logged and
        does not prevent the remaining topics from being released.
        """
        self._running = False

        # Detach the handle first so a repeated shutdown() does not await a
        # task that has already been dealt with.
        monitor_task = self._health_monitor_task
        self._health_monitor_task = None
        if monitor_task:
            monitor_task.cancel()
            try:
                await monitor_task
            except asyncio.CancelledError:
                pass

        if self.event_bus:
            for topic in self._handler_table():
                try:
                    await self.event_bus.off(topic)
                except Exception as e:
                    logger.debug(f"[🗂️] Handler cleanup: {e}")

        logger.info("[🗂️] Service registry shutdown")

    def _handler_table(self) -> Dict[str, Any]:
        """Return the topic -> handler mapping, in subscription order."""
        return {
            "vi.services.register": self._handle_service_register,
            "vi.services.deregister": self._handle_service_deregister,
            "vi.services.heartbeat": self._handle_service_heartbeat,
            "vi.services.discover": self._handle_service_discover,
            "vi.services.list": self._handle_service_list,
            "vi.services.health": self._handle_health_request,
        }

    async def _register_handlers(self):
        """Register NATS handlers for service registry operations"""
        if not self.event_bus:
            return

        for topic, handler in self._handler_table().items():
            await self.event_bus.on(topic, handler)

        logger.debug("[🗂️] Service registry handlers registered")

    # ------------------------------------------------------------------
    # Registry operations
    # ------------------------------------------------------------------

    def _serialize_manifest(self, manifest: ServiceManifest) -> Dict[str, Any]:
        """Serialize ServiceManifest to JSON-compatible dictionary"""
        return {
            'service_id': manifest.service_id,
            'name': manifest.name,
            'description': manifest.description,
            'version': manifest.version,
            'namespace': manifest.namespace,
            # Copy so callers cannot mutate the operation through the result.
            'operations': [dict(op.__dict__) for op in manifest.operations],
            'dependencies': manifest.dependencies,
            'health_check_topic': manifest.health_check_topic,
            'heartbeat_interval': manifest.heartbeat_interval,
            'metadata': manifest.metadata,
        }

    @staticmethod
    def _operation_keys(service_id: str, manifest: ServiceManifest) -> List[str]:
        """Return the ``service_operations`` keys owned by ``manifest``.

        Keys are derived from the manifest instead of a ``startswith`` scan
        because a prefix match on ``"vi."`` would also hit the operations of
        a service called ``"vi.memory"``.
        """
        keys = (f"{service_id}.{op.operation_id}" for op in manifest.operations)
        # dict.fromkeys de-duplicates while preserving first-seen order.
        return list(dict.fromkeys(keys))

    def _drop_service_indexes(self, service_id: str, manifest: ServiceManifest) -> None:
        """Remove the operation and dependency entries recorded for a service."""
        for op_key in self._operation_keys(service_id, manifest):
            self.service_operations.pop(op_key, None)
        self.service_dependencies.pop(service_id, None)

    def register_service(self, service_id: str, manifest: ServiceManifest,
                         instance_id: Optional[str] = None) -> str:
        """Register a service with the registry.

        Registering an id that is already present replaces the previous
        instance, including the operations and dependencies it had indexed.

        Args:
            service_id: Registry key of the service.
            manifest: Manifest describing operations and dependencies.
            instance_id: Optional explicit instance id; when omitted one is
                generated from the service id and the current Unix time.

        Returns:
            The instance id under which the service was registered.

        Raises:
            AttributeError: If an entry of ``manifest.operations`` has no
                ``operation_id``; the registry is left untouched in that case.
        """
        if instance_id is None:
            instance_id = f"{service_id}-{int(time.time())}"

        # Build the new index up front so an invalid operation aborts the
        # registration before any state has been modified.
        new_operations = {
            f"{service_id}.{operation.operation_id}": operation
            for operation in manifest.operations
        }

        previous = self.services.get(service_id)
        if previous is not None:
            # Otherwise operations/dependencies removed from the manifest
            # would stay discoverable after a re-registration.
            self._drop_service_indexes(service_id, previous.manifest)

        self.services[service_id] = ServiceInstance(
            service_id=service_id,
            instance_id=instance_id,
            manifest=manifest,
            status=ServiceStatus.REGISTERING,
        )
        self.service_operations.update(new_operations)

        if manifest.dependencies:
            self.service_dependencies[service_id] = set(manifest.dependencies)

        logger.info(f"[🗂️] Registered service: {service_id} (instance: {instance_id})")
        return instance_id

    def deregister_service(self, service_id: str) -> bool:
        """Deregister a service from the registry.

        Returns:
            True if the service was registered and has been removed, False if
            the id was unknown.
        """
        instance = self.services.pop(service_id, None)
        if not instance:
            logger.warning(f"[🗂️] Service {service_id} not found")
            return False

        self._drop_service_indexes(service_id, instance.manifest)

        logger.info(f"[🗂️] Deregistered service: {service_id}")
        return True

    def update_service_heartbeat(self, service_id: str,
                                 health_data: Optional[Dict[str, Any]] = None) -> bool:
        """Update service heartbeat and health data.

        Args:
            service_id: Id of the service sending the heartbeat.
            health_data: Optional health payload. A ``'status'`` entry is
                mapped onto ServiceStatus; an unrecognised value yields
                ServiceStatus.UNKNOWN.

        Returns:
            False if the service is not registered, True otherwise.
        """
        instance = self.services.get(service_id)
        if not instance:
            return False

        instance.last_heartbeat = datetime.utcnow()

        if health_data:
            instance.health_data = health_data

        if health_data and 'status' in health_data:
            try:
                instance.status = ServiceStatus(health_data['status'])
            except ValueError:
                instance.status = ServiceStatus.UNKNOWN
        elif instance.status in (ServiceStatus.REGISTERING, ServiceStatus.OFFLINE):
            # A heartbeat without an explicit status proves the service is
            # alive. OFFLINE is included so a service that the monitor timed
            # out becomes available again once its heartbeats resume.
            instance.status = ServiceStatus.HEALTHY

        return True

    def get_service(self, service_id: str) -> Optional[ServiceInstance]:
        """Get service instance by ID"""
        return self.services.get(service_id)

    def find_service_operation(self, service_id: str,
                               operation_id: str) -> Optional[ServiceOperation]:
        """Find a specific service operation"""
        return self.service_operations.get(f"{service_id}.{operation_id}")

    def list_services(self,
                      status_filter: Optional[ServiceStatus] = None) -> List[ServiceInstance]:
        """List all services, optionally filtered by status"""
        services = list(self.services.values())
        if status_filter is not None:
            services = [s for s in services if s.status == status_filter]
        return services

    def list_service_operations(self,
                                service_id: Optional[str] = None) -> List[ServiceOperation]:
        """List all service operations.

        Args:
            service_id: When given, only the operations registered for that
                service are returned (empty list for an unknown service).
        """
        if not service_id:
            return list(self.service_operations.values())

        instance = self.services.get(service_id)
        if instance is None:
            return []

        return [
            self.service_operations[op_key]
            for op_key in self._operation_keys(service_id, instance.manifest)
            if op_key in self.service_operations
        ]

    def check_dependencies(self, service_id: str) -> Dict[str, bool]:
        """Check if service dependencies are available.

        Returns:
            Mapping of dependency service id -> True when that service is
            registered with status HEALTHY or DEGRADED.
        """
        available = (ServiceStatus.HEALTHY, ServiceStatus.DEGRADED)
        result = {}
        for dep_service_id in self.service_dependencies.get(service_id, set()):
            dep_instance = self.services.get(dep_service_id)
            result[dep_service_id] = (
                dep_instance is not None and dep_instance.status in available
            )
        return result

    # ------------------------------------------------------------------
    # Health monitoring
    # ------------------------------------------------------------------

    async def _health_monitor_loop(self):
        """Background task to monitor service health"""
        while self._running:
            try:
                await self._check_service_health()
                await asyncio.sleep(self.health_check_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.exception(f"[🗂️] Health monitor error: {e}")
                # Short back-off so a persistent failure does not spin.
                await asyncio.sleep(5)

    async def _check_service_health(self):
        """Check health of all registered services.

        A service whose last heartbeat is older than ``heartbeat_timeout``
        seconds is marked OFFLINE and a ``vi.services.health.status`` event is
        emitted for the transition.
        """
        current_time = datetime.utcnow()

        # Iterate over a snapshot: emit() yields to the event loop, during
        # which register/deregister handlers may resize self.services.
        for service_id, instance in list(self.services.items()):
            heartbeat_age = (current_time - instance.last_heartbeat).total_seconds()
            if heartbeat_age <= self.heartbeat_timeout:
                continue
            if instance.status == ServiceStatus.OFFLINE:
                continue

            logger.warning(f"[🗂️] Service {service_id} heartbeat timeout ({heartbeat_age}s)")
            instance.status = ServiceStatus.OFFLINE

            # NOTE(review): the event is emitted once per transition to
            # OFFLINE; confirm this matches the intended nesting.
            if self.event_bus:
                await self.event_bus.emit("vi.services.health.status", {
                    "service_id": service_id,
                    "instance_id": instance.instance_id,
                    "status": instance.status.value,
                    "last_heartbeat": instance.last_heartbeat.isoformat(),
                    "health_data": instance.health_data
                })

    # ------------------------------------------------------------------
    # NATS Event Handlers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_payload(msg):
        """Decode an incoming message into its payload.

        Accepts an object with a bytes ``data`` attribute holding JSON, an
        already-decoded dict, or a JSON string.
        """
        if hasattr(msg, 'data'):
            return json.loads(msg.data.decode())
        return msg if isinstance(msg, dict) else json.loads(msg)

    @staticmethod
    async def _respond(msg, body: Dict[str, Any]) -> bool:
        """Send ``body`` as a JSON reply if ``msg`` supports replies.

        Returns:
            True if a reply was sent, False if ``msg`` has no ``respond``
            (e.g. a plain dict delivered in-process).
        """
        if not hasattr(msg, 'respond'):
            return False
        await msg.respond(json.dumps(body).encode())
        return True

    async def _respond_error(self, msg, error: Exception) -> None:
        """Best-effort error reply; never raises out of a handler."""
        try:
            await self._respond(msg, {'error': str(error)})
        except Exception as reply_error:
            logger.debug(f"[🗂️] Could not send error reply: {reply_error}")

    async def _handle_service_register(self, msg):
        """Handle service registration requests.

        Expected payload keys: ``service_id`` (required), ``manifest``
        (optional dict of ServiceManifest fields) and ``instance_id``
        (optional). Failures are logged; no reply is sent.
        """
        try:
            payload = self._parse_payload(msg)

            service_id = payload.get('service_id')
            instance_id = payload.get('instance_id')

            if not service_id:
                logger.error("[🗂️] Registration error: Missing service_id")
                return

            # Work on a copy: the pops below must not alter a dict payload
            # that the caller still holds.
            manifest_data = dict(payload.get('manifest') or {})
            manifest_data.pop('service_id', None)
            operations_data = manifest_data.pop('operations', None) or []

            operations = []
            for op_data in operations_data:
                if isinstance(op_data, dict):
                    operations.append(ServiceOperation(
                        operation_id=op_data.get('operation_id', ''),
                        description=op_data.get('description', ''),
                        request_topic=op_data.get('request_topic', ''),
                        response_pattern=op_data.get('response_pattern', 'request-reply'),
                        parameters=op_data.get('parameters', []),
                        timeout_ms=op_data.get('timeout_ms', 5000),
                        metadata=op_data.get('metadata', {})
                    ))
                else:
                    # Already-built operation objects are passed through.
                    operations.append(op_data)

            manifest = ServiceManifest(service_id=service_id, operations=operations,
                                       **manifest_data)
            self.register_service(service_id, manifest, instance_id)

            logger.info(f"[🗂️] ✅ Service registered: {service_id}")

        except Exception as e:
            logger.exception(f"[🗂️] Registration error: {e}")

    async def _handle_service_deregister(self, msg):
        """Handle service deregistration requests"""
        try:
            payload = self._parse_payload(msg)

            service_id = payload.get('service_id')
            if service_id:
                self.deregister_service(service_id)

        except Exception as e:
            logger.exception(f"[🗂️] Deregistration error: {e}")

    async def _handle_service_heartbeat(self, msg):
        """Handle service heartbeat updates.

        Replies (when the message supports it) with an acknowledgement that
        reports whether the service was known to the registry.
        """
        try:
            payload = self._parse_payload(msg)

            service_id = payload.get('service_id')
            instance_id = payload.get('instance_id')
            health_data = payload.get('health_data')

            if not service_id:
                await self._respond(msg, {'acknowledged': False})
                return

            success = self.update_service_heartbeat(service_id, health_data)

            await self._respond(msg, {
                'acknowledged': success,
                'service_id': service_id,
                'instance_id': instance_id,
                'timestamp': datetime.utcnow().isoformat()
            })

        except Exception as e:
            logger.exception(f"[🗂️] Heartbeat error: {e}")

    async def _handle_service_discover(self, msg):
        """Handle service discovery requests.

        * ``service_id`` + ``operation_id``: the matching operation or None.
        * ``service_id`` only: details of that service or None.
        * neither: a summary of every registered service.
        """
        try:
            payload = self._parse_payload(msg)

            service_id = payload.get('service_id')
            operation_id = payload.get('operation_id')

            if service_id and operation_id:
                operation = self.find_service_operation(service_id, operation_id)
                if operation and hasattr(operation, '__dict__'):
                    result = operation.__dict__
                else:
                    result = operation
            elif service_id:
                instance = self.get_service(service_id)
                result = {
                    'service_id': instance.service_id,
                    'instance_id': instance.instance_id,
                    'status': instance.status.value,
                    'manifest': self._serialize_manifest(instance.manifest),
                    'last_heartbeat': instance.last_heartbeat.isoformat()
                } if instance else None
            else:
                result = [
                    {
                        'service_id': s.service_id,
                        'status': s.status.value,
                        'last_heartbeat': s.last_heartbeat.isoformat()
                    }
                    for s in self.list_services()
                ]

            await self._respond(msg, {'result': result})

        except Exception as e:
            logger.exception(f"[🗂️] Discovery error: {e}")
            await self._respond_error(msg, e)

    async def _handle_service_list(self, msg):
        """Handle service listing requests.

        An optional ``status_filter`` payload entry must be a ServiceStatus
        value; an invalid value is reported back as an error.
        """
        try:
            payload = self._parse_payload(msg)

            status_filter = payload.get('status_filter')
            status_enum = ServiceStatus(status_filter) if status_filter else None

            result = [
                {
                    'service_id': s.service_id,
                    'status': s.status.value,
                    'name': s.manifest.name,
                    'version': s.manifest.version,
                    'last_heartbeat': s.last_heartbeat.isoformat()
                }
                for s in self.list_services(status_enum)
            ]

            await self._respond(msg, {'services': result})

        except Exception as e:
            logger.exception(f"[🗂️] List error: {e}")
            await self._respond_error(msg, e)

    async def _handle_health_request(self, msg):
        """Handle health status requests.

        With a ``service_id`` the reply describes that service (or None if it
        is unknown); without one it maps every service id to its health.
        """
        try:
            payload = self._parse_payload(msg)

            service_id = payload.get('service_id')

            if service_id:
                instance = self.get_service(service_id)
                result = {
                    'service_id': service_id,
                    'status': instance.status.value,
                    'last_heartbeat': instance.last_heartbeat.isoformat(),
                    'health_data': instance.health_data
                } if instance else None
            else:
                result = {
                    sid: {
                        'status': inst.status.value,
                        'last_heartbeat': inst.last_heartbeat.isoformat(),
                        'health_data': inst.health_data
                    }
                    for sid, inst in self.services.items()
                }

            await self._respond(msg, {'health': result})

        except Exception as e:
            logger.exception(f"[🗂️] Health request error: {e}")
            await self._respond_error(msg, e)


service_registry = ServiceRegistry()