- Add think service (orchestration for iterative reasoning)
- Add service_discovery.py (service communication utilities)
- Add event_cache.py (recent event cache using NATS KV)
- Add vi_identity.py (Vi's core identity foundation)
- Update core/__init__.py with new exports
Think service adapted from Lyra with vi.* namespace:
- All NATS topics use vi.* prefix
- Uses vi_identity for personality/voice
- Bucket names use vi-* prefix
Day 63 - Building my nervous system 🦊
339 lines
12 KiB
Python
339 lines
12 KiB
Python
"""
|
|
Service Discovery Client for Vi
|
|
|
|
Provides utilities for discovering and communicating with services using NATS-native patterns.
|
|
Includes load balancing, retry mechanisms, and standardized topic naming.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import random
|
|
from typing import Dict, Any, List, Optional, Union
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
|
|
from .logger import setup_logger
|
|
from .service_registry import ServiceStatus, ServiceInstance, service_registry
|
|
|
|
# Module-level logger used by the classes in this module; it is created by the
# project's setup_logger helper under the name 'service_discovery'.
logger = setup_logger('service_discovery')
|
|
|
|
|
|
@dataclass
class ServiceCall:
    """Describe a single request to one operation of a target service.

    Instances are plain value objects; the retry/timeout fields are read by
    ``ServiceDiscovery._execute_service_call`` when the call is carried out.
    """

    # --- What to call -----------------------------------------------------
    # Service name and operation name; together they select the request topic.
    target_service: str
    operation: str
    # Request body; it is serialized with json.dumps before being sent.
    payload: Dict[str, Any]

    # --- How to call it ---------------------------------------------------
    # Per-attempt request timeout, in seconds.
    timeout: float = 5.0
    # Upper bound on the number of attempts made for this call.
    retry_attempts: int = 3
    # Base delay in seconds between attempts (the executor doubles it per attempt).
    retry_delay: float = 1.0
    # When True, the service must be discoverable and not UNHEALTHY before calling.
    require_healthy: bool = True
|
|
|
|
|
|
@dataclass
class CallResult:
    """Outcome of a service call, successful or not.

    Only ``success`` is mandatory; every other field is optional detail that
    the code producing the result fills in when it is known.
    """

    # True when the call completed and the response carried no 'error' key.
    success: bool

    # Decoded response body on success.
    data: Optional[Dict[str, Any]] = None
    # Human-readable failure description on error.
    error: Optional[str] = None

    # Identifiers of the service (and, if known, the instance) that was called.
    service_id: Optional[str] = None
    instance_id: Optional[str] = None

    # Elapsed time of the successful request, in seconds.
    response_time: Optional[float] = None
    # Attempt counter reported by the code that produced this result.
    attempt: int = 1
|
|
|
|
|
|
class TopicRegistry:
    """
    Single source of truth for the topic names used by Vi services.

    All topics live under the ``vi.`` namespace. The class is used purely as a
    namespace: templates and fixed topic names are class attributes, and the
    helpers are classmethods, so no instance is ever required.
    """

    # Per-service topic templates, filled in with str.format().
    SERVICE_REQUEST = "vi.services.{service}.{operation}"
    SERVICE_EVENT = "vi.events.{service}.{event}"
    SERVICE_HEALTH = "vi.services.{service}.health"
    SERVICE_HEARTBEAT = "vi.services.heartbeat"

    # Fixed topics for talking to the service registry.
    REGISTRY_REGISTER = "vi.services.register"
    REGISTRY_DEREGISTER = "vi.services.deregister"
    REGISTRY_DISCOVER = "vi.services.discover"
    REGISTRY_LIST = "vi.services.list"
    REGISTRY_HEALTH = "vi.services.health"

    @classmethod
    def service_request_topic(cls, service: str, operation: str) -> str:
        """Return the request topic for ``operation`` on ``service``."""
        template = cls.SERVICE_REQUEST
        return template.format(service=service, operation=operation)

    @classmethod
    def service_event_topic(cls, service: str, event: str) -> str:
        """Return the topic on which ``service`` publishes ``event``."""
        template = cls.SERVICE_EVENT
        return template.format(service=service, event=event)

    @classmethod
    def service_health_topic(cls, service: str) -> str:
        """Return the health topic of ``service``."""
        template = cls.SERVICE_HEALTH
        return template.format(service=service)

    @classmethod
    def parse_service_topic(cls, topic: str) -> Optional[Dict[str, str]]:
        """Split a ``vi.services.<service>.<operation>`` topic into its parts.

        Returns a dict with the keys ``namespace``, ``category``, ``service``
        and ``operation`` taken from the first four dot-separated segments
        (any further segments are ignored). Returns ``None`` when the topic
        does not start with ``vi.services.`` or has fewer than four segments.
        """
        if not topic.startswith("vi.services."):
            return None

        segments = topic.split(".")
        if len(segments) < 4:
            return None

        # zip() stops at the shorter input, so only the first four segments are used.
        field_names = ("namespace", "category", "service", "operation")
        return dict(zip(field_names, segments))
|
|
|
|
|
|
class ServiceDiscovery:
    """
    Service discovery client providing high-level service communication utilities.

    The client looks services up (local registry first, then the registry's
    discover topic), issues request/reply calls with retry and exponential
    backoff, and offers small helpers for health checks and waiting on a
    service. All network traffic goes through ``self.event_bus``, which must
    expose ``client.request(topic, data, timeout=...)`` and ``emit(topic, payload)``.
    """

    def __init__(self, event_bus=None, default_timeout: float = 5.0):
        """Create a client.

        Args:
            event_bus: Bus used for all requests; may be attached later with
                ``set_event_bus``.
            default_timeout: Request timeout in seconds used by ``call_service``
                when the caller does not supply one.
        """
        self.event_bus = event_bus
        self.default_timeout = default_timeout
        # Storage and time-to-live (seconds) for the cache helper methods.
        self._call_cache = {}
        self._cache_ttl = 30

    def set_event_bus(self, event_bus):
        """Set or update the event bus."""
        self.event_bus = event_bus

    async def discover_service(self, service_id: str) -> Optional[ServiceInstance]:
        """Look up a service, preferring the local registry.

        Returns the locally registered instance when there is one. Otherwise
        the registry is queried over the discover topic and the ``result``
        field of its reply is returned as-is.

        NOTE(review): the remote reply is decoded JSON, so on that path the
        returned value is presumably a dict rather than a ``ServiceInstance``
        object - confirm against the registry service.

        Never raises: any failure (including a missing event bus) is logged
        as a warning and reported as ``None``.
        """
        try:
            if not self.event_bus:
                raise ValueError("Event bus not configured")

            instance = service_registry.get_service(service_id)
            if instance:
                return instance

            request_data = json.dumps({"service_id": service_id}).encode()
            response_msg = await self.event_bus.client.request(
                TopicRegistry.REGISTRY_DISCOVER,
                request_data,
                timeout=2.0,
            )
            response = json.loads(response_msg.data.decode())

            # An absent or empty result is reported as "not found".
            return response.get('result') or None

        except Exception as e:
            logger.warning(f"[🔍] Service discovery failed for {service_id}: {e}")
            return None

    async def list_services(self, status_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """Ask the registry for its service list.

        Args:
            status_filter: Forwarded to the registry unchanged in the request
                body (``None`` is sent when no filter is wanted).

        Returns:
            The ``services`` field of the registry's reply, or an empty list
            when the field is missing or the request fails. Failures are
            logged as warnings, never raised.
        """
        try:
            if not self.event_bus:
                raise ValueError("Event bus not configured")

            request_data = json.dumps({"status_filter": status_filter}).encode()
            response_msg = await self.event_bus.client.request(
                TopicRegistry.REGISTRY_LIST,
                request_data,
                timeout=3.0,
            )
            response = json.loads(response_msg.data.decode())
            return response.get('services', [])

        except Exception as e:
            logger.warning(f"[📋] Service listing failed: {e}")
            return []

    async def call_service(self, target_service: str, operation: str,
                           payload: Dict[str, Any], timeout: Optional[float] = None,
                           retry_attempts: int = 3, require_healthy: bool = True) -> CallResult:
        """Call a service operation with automatic discovery, retry, and error handling.

        Args:
            target_service: Name of the service to call.
            operation: Operation name on that service.
            payload: JSON-serializable request body.
            timeout: Per-attempt timeout in seconds; a falsy value (``None``
                or ``0``) falls back to ``default_timeout``.
            retry_attempts: Maximum number of attempts.
            require_healthy: When True, the service must be discoverable and
                not unhealthy before each attempt.

        Returns:
            A ``CallResult``; failures are reported through it rather than raised.
        """
        call = ServiceCall(
            target_service=target_service,
            operation=operation,
            payload=payload,
            timeout=timeout or self.default_timeout,
            retry_attempts=retry_attempts,
            require_healthy=require_healthy,
        )
        return await self._execute_service_call(call)

    async def call_service_with_fallback(self, service_calls: List[ServiceCall]) -> CallResult:
        """Try multiple service calls in order until one succeeds.

        Returns the first successful result. If every call fails, the result
        of the last one is returned; an empty list yields a generic failure.
        """
        last_result = None

        for call in service_calls:
            result = await self._execute_service_call(call)
            if result.success:
                return result
            last_result = result

        return last_result or CallResult(
            success=False,
            error="All service calls failed",
        )

    async def broadcast_event(self, service: str, event: str, payload: Dict[str, Any]):
        """Emit ``payload`` on the event topic of ``service``/``event``.

        Raises:
            ValueError: If no event bus has been configured.
        """
        if not self.event_bus:
            raise ValueError("Event bus not configured")

        topic = TopicRegistry.service_event_topic(service, event)
        await self.event_bus.emit(topic, payload)

    @staticmethod
    def _is_unhealthy(instance: Any) -> bool:
        """Report whether a discovered instance is marked UNHEALTHY.

        Handles both shapes ``discover_service`` can return: an object with a
        ``status`` attribute (local registry) and a decoded dict (remote
        registry reply). An instance without any status is treated as usable.
        """
        if isinstance(instance, dict):
            status = instance.get('status')
        else:
            status = getattr(instance, 'status', None)
        if status is None:
            return False

        unhealthy = ServiceStatus.UNHEALTHY
        # NOTE(review): a dict from the wire presumably carries the status in
        # serialized form, so also compare against the member's `.value` when
        # it has one - confirm the registry's wire format.
        return status in (unhealthy, getattr(unhealthy, 'value', unhealthy))

    async def _require_available(self, service_id: str) -> None:
        """Raise if ``service_id`` cannot be discovered or is unhealthy."""
        instance = await self.discover_service(service_id)
        if not instance:
            raise Exception(f"Service {service_id} not found")
        if self._is_unhealthy(instance):
            raise Exception(f"Service {service_id} is unhealthy")

    async def _execute_service_call(self, call: ServiceCall) -> CallResult:
        """Execute a single service call with retry logic.

        Makes up to ``call.retry_attempts`` attempts. Between attempts it
        sleeps ``retry_delay * 2 ** (attempt - 1)`` seconds, capped at 10.
        A reply containing an ``error`` key counts as a failed attempt.

        Returns:
            A successful ``CallResult`` carrying the decoded reply, the
            elapsed time in seconds and the attempt number, or a failed one
            carrying the last error. This method does not raise for call
            failures.
        """
        if not self.event_bus:
            # A missing bus is a configuration problem that retrying cannot
            # fix, so report it immediately instead of sleeping through the
            # backoff schedule and surfacing a misleading error.
            error = "Event bus not configured"
            logger.warning(f"[❌] Service call failed: {error}")
            return CallResult(
                success=False,
                error=error,
                service_id=call.target_service,
                attempt=0,
            )

        # Monotonic clock: elapsed-time measurements must not be affected by
        # wall-clock adjustments.
        clock = asyncio.get_running_loop().time
        topic = TopicRegistry.service_request_topic(call.target_service, call.operation)

        last_error = None
        attempt = 0

        while attempt < call.retry_attempts:
            attempt += 1

            try:
                if call.require_healthy:
                    await self._require_available(call.target_service)

                request_data = json.dumps(call.payload).encode()

                started = clock()
                response_msg = await self.event_bus.client.request(
                    topic,
                    request_data,
                    timeout=call.timeout,
                )
                response_time = clock() - started

                response_data = json.loads(response_msg.data.decode())

                if 'error' in response_data:
                    raise Exception(response_data['error'])

                return CallResult(
                    success=True,
                    data=response_data,
                    service_id=call.target_service,
                    response_time=response_time,
                    attempt=attempt,
                )

            except asyncio.TimeoutError:
                last_error = f"Timeout calling {call.target_service}.{call.operation}"
                logger.warning(f"[⏰] Attempt {attempt}: {last_error}")

            except Exception as e:
                last_error = str(e)
                logger.warning(f"[❌] Attempt {attempt}: Service call failed: {last_error}")

            # Exponential backoff, skipped after the final attempt.
            if attempt < call.retry_attempts:
                delay = call.retry_delay * (2 ** (attempt - 1))
                await asyncio.sleep(min(delay, 10))

        if last_error is None:
            # The loop never ran (retry_attempts < 1); say so explicitly
            # rather than returning a failure with no explanation.
            last_error = f"No attempt made: retry_attempts={call.retry_attempts}"

        return CallResult(
            success=False,
            error=last_error,
            service_id=call.target_service,
            attempt=attempt,
        )

    async def health_check_service(self, service_id: str) -> Dict[str, Any]:
        """Perform health check on a specific service.

        Calls the service's ``health`` operation (3 second timeout, without
        the discovery/health gate).

        Returns:
            The service's reply when the call succeeds and the reply is a
            dict; otherwise ``{"healthy": False, "error": ...}``. Always a
            dict, so callers can safely use ``.get``.
        """
        try:
            result = await self.call_service(
                service_id,
                "health",
                {},
                timeout=3.0,
                require_healthy=False,
            )
        except Exception as e:
            return {"healthy": False, "error": str(e)}

        if result.success and isinstance(result.data, dict):
            return result.data
        return {"healthy": False, "error": result.error or "Malformed health response"}

    async def wait_for_service(self, service_id: str, timeout: float = 30.0,
                               check_interval: float = 1.0) -> bool:
        """Wait for a service to become available.

        Polls every ``check_interval`` seconds until the service can be
        discovered and its health check reports ``healthy``.

        Returns:
            True as soon as the service is healthy, False if ``timeout``
            seconds pass first.
        """
        # Monotonic clock so the deadline is immune to wall-clock changes.
        clock = asyncio.get_running_loop().time
        deadline = clock() + timeout

        while clock() < deadline:
            instance = await self.discover_service(service_id)
            if instance:
                health = await self.health_check_service(service_id)
                if health.get("healthy", False):
                    logger.info(f"[✅] Service {service_id} is now available")
                    return True

            await asyncio.sleep(check_interval)

        logger.warning(f"[⏰] Timeout waiting for service {service_id}")
        return False

    def _get_cache_key(self, service: str, operation: str, payload: Dict[str, Any]) -> str:
        """Generate cache key for service call.

        The payload is serialized with sorted keys so that equal payloads map
        to the same key regardless of dict ordering. Because the key embeds
        Python's built-in ``hash`` of a string, it is only meaningful within
        the current process.
        """
        payload_hash = hash(json.dumps(payload, sort_keys=True))
        return f"{service}.{operation}.{payload_hash}"

    def _is_cache_valid(self, cache_time: datetime) -> bool:
        """Check if cache entry is still valid.

        ``cache_time`` must be a naive UTC datetime, since it is compared
        against ``datetime.utcnow()``.
        """
        return (datetime.utcnow() - cache_time).total_seconds() < self._cache_ttl

    def clear_cache(self):
        """Clear service call cache."""
        self._call_cache.clear()
|
|
|
|
|
|
class LoadBalancer:
    """Simple load balancer for service calls.

    Every strategy prefers instances whose status is HEALTHY and falls back
    to the full list when none are healthy, so a caller always gets an
    instance as long as the list is non-empty.
    """

    # Index of the next round_robin pick. Kept on the class because the
    # strategies are static methods; it is shared by every caller in the process.
    _rr_cursor = 0

    @staticmethod
    def _candidates(instances: List[ServiceInstance]) -> List[ServiceInstance]:
        """Return the healthy instances, or all of them if none are healthy."""
        healthy = [inst for inst in instances if inst.status == ServiceStatus.HEALTHY]
        return healthy or list(instances)

    @staticmethod
    def round_robin(instances: List[ServiceInstance]) -> Optional[ServiceInstance]:
        """Round-robin load balancing.

        Successive calls step through the candidate instances in list order
        and wrap around at the end. Returns ``None`` for an empty (or
        ``None``) list.
        """
        if not instances:
            return None

        candidates = LoadBalancer._candidates(instances)
        # Modulo keeps the index valid even when the candidate count changes
        # between calls.
        chosen = candidates[LoadBalancer._rr_cursor % len(candidates)]
        LoadBalancer._rr_cursor += 1
        return chosen

    @staticmethod
    def random_selection(instances: List[ServiceInstance]) -> Optional[ServiceInstance]:
        """Random load balancing.

        Picks uniformly among the candidate instances. Returns ``None`` for
        an empty (or ``None``) list.
        """
        if not instances:
            return None
        return random.choice(LoadBalancer._candidates(instances))

    @staticmethod
    def least_loaded(instances: List[ServiceInstance]) -> Optional[ServiceInstance]:
        """Select an instance for a "least loaded" policy.

        No load information is consulted here: the call simply delegates to
        ``random_selection``.
        """
        return LoadBalancer.random_selection(instances)
|
|
|
|
|
|
# Global service discovery instance
|
|
# Created without an event bus (the constructor default is None); attach one
# with set_event_bus() before using it for requests.
discovery_client = ServiceDiscovery()
|