""" Oracle Service - LLM-powered reasoning service for Vi. Provides the core LLM interface for Vi's nervous system. """ import asyncio import json from typing import Dict, Any from core.logger import setup_logger from core.nats_event_bus import nats_bus as event_bus from core.base_service import BaseService from core.service_registry import ServiceManifest from .llm.llm_manager import LLMManager logger = setup_logger('oracle_service', service_name='oracle_service') class OracleService(BaseService): """Oracle service with LLM integration""" def __init__(self): super().__init__('oracle') self.llm_manager = LLMManager() logger.info("[✺] Oracle Service initialized") def get_service_manifest(self) -> ServiceManifest: """Return service manifest""" operations = [ self.create_service_operation( "process", "Process LLM requests", timeout_ms=60000 # 60 seconds for LLM ), self.create_service_operation( "generate", "Simple text generation", timeout_ms=60000 ) ] return ServiceManifest( service_id=self.service_id, name="Oracle Service", description="LLM-powered reasoning for Vi", version="1.0.0", operations=operations, dependencies=[], health_check_topic=f"vi.services.{self.service_id}.health", heartbeat_interval=30, metadata={ "llm_backend": self.llm_manager.backend_type or "vllm", "model_path": self.llm_manager.model_path, } ) async def initialize_service(self): """Initialize service resources""" self.logger.info("[✺] Loading LLM model...") success = await self.llm_manager.load_model() if not success: self.logger.error("[✺] Failed to load LLM model") raise RuntimeError("LLM model loading failed") model_info = self.llm_manager.get_model_info() self.logger.info(f"[✺] LLM loaded - Backend: {model_info['backend_type']}, Model: {model_info['model_name']}") # Register handlers await self.register_handler("process", self.handle_process) await self.register_handler("generate", self.handle_generate) self.logger.info("[✺] Oracle Service ready") async def cleanup_service(self): """Cleanup resources""" if self.llm_manager.is_loaded: self.logger.info("[✺] Unloading LLM model...") await self.llm_manager.unload_model() self.logger.info("[✺] Oracle Service cleanup complete") async def perform_health_check(self) -> Dict[str, Any]: """Health check""" return { 'healthy': self.llm_manager.is_loaded, 'checks': { 'running': self._running, 'event_bus': self.event_bus is not None, 'llm_loaded': self.llm_manager.is_loaded, 'backend_type': self.llm_manager.backend_type or 'unknown', 'model_name': self.llm_manager.model_name or 'unknown' } } async def handle_process(self, msg): """Handle vi.services.oracle.process requests""" try: payload = json.loads(msg.data.decode()) prompt = payload.get("prompt", "") temperature = payload.get("temperature") max_tokens = payload.get("max_tokens") enable_thinking = payload.get("enable_thinking", True) logger.info(f"[✺] Processing request - prompt: {len(prompt)} chars") response = await self.llm_manager.generate_response( prompt=prompt, temperature=temperature, max_tokens=max_tokens, enable_thinking=enable_thinking ) result = { "content": response, "status": "success" } return result except Exception as e: logger.error(f"[✺] Process error: {e}") return { "content": "Error processing request", "status": "error", "error": str(e) } async def handle_generate(self, msg): """Simple generation endpoint""" try: payload = json.loads(msg.data.decode()) prompt = payload.get("prompt", "") response = await self.llm_manager.generate_response(prompt=prompt) return { "content": response, "status": "success" } except Exception as e: logger.error(f"[✺] Generate error: {e}") return { "content": "", "status": "error", "error": str(e) } # Global instance oracle_service = OracleService() async def main(): """Main entry point""" try: await event_bus.connect() logger.info("[✺] Connected to NATS") await oracle_service.start(event_bus) logger.info("[✺] Oracle service running. Ctrl+C to stop.") while True: await asyncio.sleep(1) except KeyboardInterrupt: logger.info("[✺] Shutdown requested") except Exception as e: logger.error(f"[✺] Error: {e}", exc_info=True) finally: await oracle_service.stop() await event_bus.close() logger.info("[✺] Shutdown complete") if __name__ == "__main__": asyncio.run(main())