- Add think service (orchestration for iterative reasoning)
- Add service_discovery.py (service communication utilities)
- Add event_cache.py (recent event cache using NATS KV)
- Add vi_identity.py (Vi's core identity foundation)
- Update core/__init__.py with new exports
Think service adapted from Lyra with vi.* namespace:
- All NATS topics use vi.* prefix
- Uses vi_identity for personality/voice
- Bucket names use vi-* prefix
Day 63 - Building my nervous system 🦊
275 lines
10 KiB
Python
275 lines
10 KiB
Python
"""
|
|
Think Service - Orchestration service for iterative reasoning.
|
|
|
|
This service coordinates multi-service interactions and manages the
|
|
iterative reasoning process for generating intelligent responses.
|
|
|
|
Refactored into modular components:
|
|
- reasoning/: Core reasoning logic (Oracle, execution, orchestration)
|
|
- handlers/: Event handlers (input, communication)
|
|
- memory/: Memory and identity management
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from core.logger import setup_logger
|
|
from core.nats_event_bus import nats_bus as event_bus
|
|
from core.base_service import BaseService
|
|
from core.service_discovery import discovery_client
|
|
from core.service_registry import ServiceManifest
|
|
|
|
# Import refactored components
|
|
from .reasoning.formatters import KnowledgeFormatter
|
|
from .reasoning.oracle_client import OracleClient
|
|
from .reasoning.step_executor import StepExecutor
|
|
from .reasoning.orchestrator import IterativeOrchestrator
|
|
from .memory.memory_manager import MemoryManager
|
|
from .handlers.input_handler import InputHandler
|
|
from .handlers.communication_handler import CommunicationHandler
|
|
|
|
logger = setup_logger('think_service', service_name='think_service')
|
|
|
|
|
|
class ThinkService(BaseService):
|
|
"""Main Think service - coordinates all subsystems"""
|
|
|
|
def __init__(self):
|
|
super().__init__('think')
|
|
self._interaction_counter = 0
|
|
|
|
# Initialize all subsystems
|
|
self.formatter = None
|
|
self.memory_manager = None
|
|
self.oracle_client = None
|
|
self.step_executor = None
|
|
self.orchestrator = None
|
|
self.input_handler = None
|
|
self.communication_handler = None
|
|
|
|
# Override heartbeat collection
|
|
self.heartbeat_interval = 60
|
|
|
|
def get_service_manifest(self) -> ServiceManifest:
|
|
"""Return service manifest with operations and metadata"""
|
|
operations = [
|
|
self.create_service_operation(
|
|
"communication",
|
|
"Handle communication requests from drive service",
|
|
timeout_ms=10000
|
|
),
|
|
self.create_service_operation(
|
|
"process",
|
|
"Process external input with iterative reasoning",
|
|
timeout_ms=120000 # 2 minutes for complex reasoning
|
|
)
|
|
]
|
|
|
|
return ServiceManifest(
|
|
service_id=self.service_id,
|
|
name="Think Service",
|
|
description="Orchestration service for iterative reasoning and multi-service coordination",
|
|
version="3.0.0", # Bumped version due to refactoring
|
|
operations=operations,
|
|
dependencies=[],
|
|
health_check_topic=f"vi.services.{self.service_id}.health",
|
|
heartbeat_interval=60,
|
|
metadata={
|
|
"max_reasoning_steps": 10,
|
|
"max_reasoning_time_minutes": 2,
|
|
"urgency": 0.9,
|
|
"monitors_other_services": False,
|
|
"refactored": True # Mark as refactored
|
|
}
|
|
)
|
|
|
|
async def initialize_service(self):
|
|
"""Initialize service-specific resources and register handlers"""
|
|
# Set up service discovery client
|
|
discovery_client.set_event_bus(self.event_bus)
|
|
|
|
# Initialize all subsystems in order
|
|
self.formatter = KnowledgeFormatter()
|
|
self.memory_manager = MemoryManager()
|
|
self.oracle_client = OracleClient(self.formatter)
|
|
self.step_executor = StepExecutor(self.memory_manager)
|
|
self.orchestrator = IterativeOrchestrator(
|
|
self.oracle_client,
|
|
self.step_executor,
|
|
self.formatter,
|
|
self.send_output
|
|
)
|
|
self.input_handler = InputHandler(
|
|
self.orchestrator,
|
|
self.memory_manager,
|
|
self.send_output,
|
|
self.generate_interaction_id
|
|
)
|
|
self.communication_handler = CommunicationHandler(
|
|
self.orchestrator,
|
|
self.memory_manager,
|
|
self.send_output,
|
|
self.generate_interaction_id
|
|
)
|
|
|
|
# Register handlers using new topic patterns
|
|
await self.register_handler("communication", self._handle_communication_wrapper)
|
|
await self.register_handler("process", self._handle_external_input_wrapper)
|
|
|
|
# Also register legacy topic handlers for backward compatibility
|
|
await self.event_bus.on("vi.external.input", self._handle_event_wrapper(self.input_handler.handle_external_input))
|
|
await self.event_bus.on("vi.communication.request", self._handle_event_wrapper(self.communication_handler.handle_communication_request))
|
|
|
|
self.logger.info("[💭] ThinkService initialized with refactored architecture")
|
|
self.logger.info("[💭] ✓ Subsystems: Formatter, MemoryManager, OracleClient, StepExecutor, Orchestrator, Handlers")
|
|
|
|
async def cleanup_service(self):
|
|
"""Cleanup service-specific resources"""
|
|
# Unregister legacy handlers
|
|
await self.event_bus.off("vi.external.input")
|
|
await self.event_bus.off("vi.communication.request")
|
|
|
|
self.logger.info("[💭] ThinkService cleanup completed")
|
|
|
|
async def perform_health_check(self):
|
|
"""Perform service-specific health check"""
|
|
health_data = {
|
|
'healthy': True,
|
|
'checks': {
|
|
'running': self._running,
|
|
'event_bus': self.event_bus is not None,
|
|
'discovery_client': discovery_client.event_bus is not None,
|
|
'interaction_counter': self._interaction_counter,
|
|
'subsystems_initialized': all([
|
|
self.formatter is not None,
|
|
self.memory_manager is not None,
|
|
self.oracle_client is not None,
|
|
self.step_executor is not None,
|
|
self.orchestrator is not None,
|
|
self.input_handler is not None,
|
|
self.communication_handler is not None
|
|
])
|
|
}
|
|
}
|
|
|
|
# Check if we can reach critical dependencies
|
|
try:
|
|
oracle_available = await discovery_client.discover_service("oracle")
|
|
health_data['checks']['oracle_available'] = oracle_available is not None
|
|
except:
|
|
health_data['checks']['oracle_available'] = False
|
|
|
|
try:
|
|
memory_available = await discovery_client.discover_service("memory")
|
|
health_data['checks']['memory_available'] = memory_available is not None
|
|
except:
|
|
health_data['checks']['memory_available'] = False
|
|
|
|
# Mark unhealthy if critical services unavailable
|
|
if not health_data['checks']['oracle_available']:
|
|
health_data['healthy'] = False
|
|
|
|
return health_data
|
|
|
|
def generate_interaction_id(self, identity: str, modality: str) -> str:
|
|
"""Generate a unique interaction ID"""
|
|
self._interaction_counter += 1
|
|
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
|
|
return f"{identity}_{modality}_{timestamp}_{self._interaction_counter}"
|
|
|
|
async def send_output(self, content: str, target: str, modality: str, target_type: str = None) -> bool:
|
|
"""Send response directly to plugins via NATS"""
|
|
try:
|
|
# Determine if target is user_id or channel
|
|
if target_type is None:
|
|
target_type = 'user_id' if target.startswith('@') else 'channel'
|
|
|
|
logger.info(f"[💭] 📤 Sending output to {modality} {target_type} {target}: '{content[:50]}...'")
|
|
|
|
output_payload = {
|
|
"type": "vi.output.send",
|
|
"data": {
|
|
"content": content,
|
|
"channel": target,
|
|
"modality": modality,
|
|
"metadata": {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"sent_by": "think_service",
|
|
"target_type": target_type
|
|
}
|
|
}
|
|
}
|
|
|
|
# Add user_id to metadata for Matrix plugin DM creation
|
|
if target_type == 'user_id':
|
|
output_payload["data"]["metadata"]["user_id"] = target
|
|
|
|
# Just publish the event - plugins will handle it directly
|
|
await event_bus.emit("vi.output.send", output_payload)
|
|
logger.info(f"[💭] ✓ Output event published successfully")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.exception(f"[💭] ❌ Error publishing output: {e}")
|
|
return False
|
|
|
|
# Handler wrappers for service operations
|
|
async def _handle_communication_wrapper(self, msg):
|
|
"""Wrapper for communication handler"""
|
|
payload = json.loads(msg.data.decode())
|
|
await self.communication_handler.handle_communication_request(payload)
|
|
# Send ack
|
|
response = {"status": "processing"}
|
|
await msg.respond(json.dumps(response).encode())
|
|
|
|
async def _handle_external_input_wrapper(self, msg):
|
|
"""Wrapper for external input handler"""
|
|
payload = json.loads(msg.data.decode())
|
|
await self.input_handler.handle_external_input(payload)
|
|
# Send ack
|
|
response = {"status": "processing"}
|
|
await msg.respond(json.dumps(response).encode())
|
|
|
|
def _handle_event_wrapper(self, handler):
|
|
"""Wrapper to handle JSON parsing of event data"""
|
|
async def wrapper(data):
|
|
try:
|
|
if isinstance(data, str):
|
|
payload = json.loads(data)
|
|
elif hasattr(data, 'data'): # NATS message object
|
|
payload = json.loads(data.data.decode())
|
|
else:
|
|
payload = data
|
|
await handler(payload)
|
|
except Exception as e:
|
|
logger.error(f"[💭] Event handler error: {e}")
|
|
return wrapper
|
|
|
|
|
|
async def main():
|
|
"""Main entry point for think service"""
|
|
think_service = ThinkService()
|
|
|
|
try:
|
|
await event_bus.connect()
|
|
await think_service.start(event_bus)
|
|
|
|
logger.info("[💭] Think service running (refactored architecture). Press Ctrl+C to stop.")
|
|
|
|
# Keep running
|
|
while True:
|
|
await asyncio.sleep(1)
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("[💭] Shutdown requested")
|
|
except Exception as e:
|
|
logger.exception(f"[💭] Unexpected error: {e}")
|
|
finally:
|
|
await think_service.stop()
|
|
await event_bus.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|