diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..8b6444d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +# Vi dependencies +nats-py>=2.6.0 +vllm>=0.4.0 +torch>=2.0.0 diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..a5318d0 --- /dev/null +++ b/services/__init__.py @@ -0,0 +1 @@ +# Vi Services diff --git a/services/oracle/__init__.py b/services/oracle/__init__.py new file mode 100644 index 0000000..c23a5fa --- /dev/null +++ b/services/oracle/__init__.py @@ -0,0 +1,4 @@ +# Oracle Service - LLM wrapper for Vi +from .oracle_service import OracleService, oracle_service + +__all__ = ['OracleService', 'oracle_service'] diff --git a/services/oracle/llm/__init__.py b/services/oracle/llm/__init__.py new file mode 100644 index 0000000..e34e1f2 --- /dev/null +++ b/services/oracle/llm/__init__.py @@ -0,0 +1,4 @@ +# LLM module for Oracle service +from .llm_manager import LLMManager + +__all__ = ['LLMManager'] diff --git a/services/oracle/llm/generator.py b/services/oracle/llm/generator.py new file mode 100644 index 0000000..81b551f --- /dev/null +++ b/services/oracle/llm/generator.py @@ -0,0 +1,69 @@ +""" +Text generation using vLLM. +""" + +from typing import Optional +from core.logger import setup_logger + +logger = setup_logger('text_generator', service_name='oracle_service') + + +class TextGenerator: + """Text generation with vLLM""" + + def __init__(self, llm_model, sampling_config: dict): + self.llm = llm_model + self.sampling_config = sampling_config + + def generate( + self, + prompt: str, + max_tokens: int = None, + temperature: float = None, + top_p: float = None, + top_k: int = None, + min_p: float = None + ) -> Optional[str]: + """Generate text using vLLM""" + if not self.llm: + logger.error("[✺] LLM not initialized") + return None + + try: + params = self.sampling_config.copy() + + if max_tokens is not None: + params["max_new_tokens"] = max_tokens + if temperature is not None: + params["temperature"] = temperature + if top_p is not None: + params["top_p"] = top_p + if top_k is not None: + params["top_k"] = top_k + if min_p is not None: + params["min_p"] = min_p + + from vllm import SamplingParams + + sampling_params = SamplingParams( + temperature=params["temperature"], + top_p=params["top_p"], + top_k=params.get("top_k", -1), + min_p=params.get("min_p", 0.0), + max_tokens=params["max_new_tokens"], + repetition_penalty=params["repetition_penalty"] + ) + + outputs = self.llm.generate([prompt], sampling_params) + + if outputs and outputs[0].outputs: + raw_text = outputs[0].outputs[0].text + logger.info(f"[✺] Generated {len(raw_text)} chars") + return raw_text.strip() + else: + logger.warning("[✺] Empty output") + return "" + + except Exception as e: + logger.error(f"[✺] Generation failed: {e}") + return None diff --git a/services/oracle/llm/llm_manager.py b/services/oracle/llm/llm_manager.py new file mode 100644 index 0000000..9b01561 --- /dev/null +++ b/services/oracle/llm/llm_manager.py @@ -0,0 +1,127 @@ +""" +LLM Manager for Vi's Oracle service. + +Coordinates model loading and text generation. 
+""" + +import time +from typing import Optional, Dict, Any +from core.logger import setup_logger + +from .model_loader import ModelLoader +from .generator import TextGenerator + +logger = setup_logger('llm_manager', service_name='oracle_service') + + +class LLMManager: + """High-level LLM manager""" + + def __init__(self, model_path: str = None): + self.model_loader = ModelLoader(model_path) + self.generator = None + + # Sampling config for Qwen3 thinking mode + self.thinking_mode_config = { + "temperature": 0.6, + "top_p": 0.95, + "top_k": 20, + "min_p": 0.0, + "max_new_tokens": 8192, + "repetition_penalty": 1.1, + "do_sample": True, + } + + self.non_thinking_mode_config = { + "temperature": 0.7, + "top_p": 0.8, + "top_k": 20, + "min_p": 0.0, + "max_new_tokens": 8192, + "repetition_penalty": 1.1, + "do_sample": True, + } + + self.sampling_config = self.thinking_mode_config.copy() + + @property + def is_loaded(self) -> bool: + return self.model_loader.is_loaded + + @property + def model_path(self) -> str: + return self.model_loader.model_path + + @property + def model_name(self) -> Optional[str]: + return self.model_loader.model_name + + @property + def backend_type(self) -> Optional[str]: + return self.model_loader.backend_type + + async def load_model(self, model_path: Optional[str] = None) -> bool: + """Load model and initialize generator""" + success = await self.model_loader.load_model(model_path) + + if success and self.model_loader.llm: + self.generator = TextGenerator(self.model_loader.llm, self.sampling_config) + logger.info("[✺] TextGenerator initialized") + + return success + + async def unload_model(self): + """Unload model""" + self.generator = None + await self.model_loader.unload_model() + + def get_model_info(self) -> Dict[str, Any]: + """Get model information""" + return self.model_loader.get_model_info() + + async def generate_response( + self, + prompt: str, + temperature: float = None, + max_tokens: int = None, + enable_thinking: bool = True + ) -> Optional[str]: + """Generate a response using the loaded model""" + try: + if not self.is_loaded: + logger.warning("[✺] Model not loaded") + if not await self.load_model(): + return "I'm having trouble thinking right now." + + mode_config = self.thinking_mode_config if enable_thinking else self.non_thinking_mode_config + + if temperature is None: + temperature = mode_config["temperature"] + if max_tokens is None: + max_tokens = mode_config["max_new_tokens"] + + logger.info(f"[✺] Generating - temp: {temperature}, max_tokens: {max_tokens}") + + start_time = time.time() + + raw_text = self.generator.generate( + prompt, + max_tokens=max_tokens, + temperature=temperature, + top_p=mode_config["top_p"], + top_k=mode_config["top_k"], + min_p=mode_config["min_p"] + ) + + elapsed = time.time() - start_time + + if raw_text: + logger.info(f"[✺] Generated {len(raw_text)} chars in {elapsed:.2f}s") + return raw_text.strip() + else: + logger.warning("[✺] Empty response") + return "" + + except Exception as e: + logger.error(f"[✺] Generation failed: {e}") + return "I encountered an error while thinking." diff --git a/services/oracle/llm/model_loader.py b/services/oracle/llm/model_loader.py new file mode 100644 index 0000000..49f5476 --- /dev/null +++ b/services/oracle/llm/model_loader.py @@ -0,0 +1,164 @@ +""" +Model loading for vLLM backend. 
+""" + +import os +from typing import Optional, Dict, Any +from core.logger import setup_logger + +logger = setup_logger('model_loader', service_name='oracle_service') + + +class ModelLoader: + """Handles model loading with vLLM""" + + def __init__(self, model_path: str = None): + if model_path is None: + model_path = os.getenv('ORACLE_MODEL_PATH', "/data/models/Qwen3-32B-GPTQ-Int4") + self.model_path = model_path + + if model_path.startswith(('/', './', '../')): + self.model_cache_dir = os.path.dirname(model_path) + else: + self.model_cache_dir = os.getenv('HF_HOME', "/data/models") + + self.llm = None + self.model_name: Optional[str] = None + self.backend_type: Optional[str] = None + self.is_loaded = False + + async def load_model(self, model_path: Optional[str] = None) -> bool: + """Load model using vLLM""" + try: + target_model = model_path or self.model_path + + logger.info(f"[✺] Loading model: {target_model}") + + if self.is_loaded and self.model_name == target_model: + logger.info("[✺] Model already loaded") + return True + + if not self._validate_model_path(target_model): + logger.error(f"[✺] Model not found: {target_model}") + return False + + return await self._load_vllm(target_model) + + except Exception as e: + logger.exception(f"[✺] Failed to load model: {e}") + self.is_loaded = False + return False + + def _validate_model_path(self, model_path: str) -> bool: + """Check if model exists locally""" + if not model_path.startswith('/'): + return False + + if not os.path.exists(model_path) or not os.path.isdir(model_path): + return False + + # Check for config.json + if not os.path.exists(os.path.join(model_path, 'config.json')): + return False + + # Check for model files + has_model = ( + os.path.exists(os.path.join(model_path, 'model.safetensors')) or + os.path.exists(os.path.join(model_path, 'pytorch_model.bin')) or + os.path.exists(os.path.join(model_path, 'model.safetensors.index.json')) + ) + + return has_model + + async def _load_vllm(self, model_path: str) -> bool: + """Load with vLLM""" + try: + from vllm import LLM + import torch + + if not torch.cuda.is_available(): + logger.warning("[✺] CUDA not available") + return False + + gpu_name = torch.cuda.get_device_name(0) + gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3 + logger.info(f"[✺] GPU: {gpu_name}, {gpu_memory:.1f}GB") + + is_gptq = 'GPTQ' in model_path or 'gptq' in model_path.lower() + max_context = int(os.getenv("MAX_MODEL_LEN", "8192")) + + vllm_config = { + "model": model_path, + "gpu_memory_utilization": float(os.getenv("GPU_MEMORY_UTILIZATION", "0.90")), + "max_model_len": max_context, + "tensor_parallel_size": 1, + "trust_remote_code": False, + "download_dir": self.model_cache_dir, + "disable_log_stats": True, + "max_num_seqs": 16, + "swap_space": 4, + } + + if is_gptq: + vllm_config["dtype"] = "float16" + vllm_config["quantization"] = "gptq_marlin" + logger.info("[✺] GPTQ mode enabled") + + logger.info(f"[✺] Initializing vLLM - max_context: {max_context}") + + self.llm = LLM(**vllm_config) + self.backend_type = "vllm" + self.model_name = model_path + self.is_loaded = True + + logger.info(f"[✺] Model loaded: {model_path}") + return True + + except Exception as e: + logger.error(f"[✺] vLLM failed: {e}") + return False + + async def unload_model(self): + """Unload model""" + try: + if self.llm is not None: + del self.llm + self.llm = None + + self.model_name = None + self.backend_type = None + self.is_loaded = False + + try: + import torch + if torch.cuda.is_available(): + 
torch.cuda.empty_cache() + except ImportError: + pass + + logger.info("[✺] Model unloaded") + + except Exception as e: + logger.exception(f"[✺] Unload error: {e}") + + def get_model_info(self) -> Dict[str, Any]: + """Get model info""" + info = { + "model_name": self.model_name, + "model_path": self.model_path, + "backend_type": self.backend_type, + "is_loaded": self.is_loaded, + } + + try: + import torch + if torch.cuda.is_available(): + info["gpu_available"] = True + info["gpu_memory_total"] = torch.cuda.get_device_properties(0).total_memory / 1024**3 + info["gpu_memory_reserved"] = torch.cuda.memory_reserved(0) / 1024**3 + else: + info["gpu_available"] = False + except ImportError: + info["gpu_available"] = False + + return info diff --git a/services/oracle/oracle_service.py b/services/oracle/oracle_service.py new file mode 100644 index 0000000..e44ea56 --- /dev/null +++ b/services/oracle/oracle_service.py @@ -0,0 +1,179 @@ +""" +Oracle Service - LLM-powered reasoning service for Vi. + +Provides the core LLM interface for Vi's nervous system. +""" +import asyncio +import json +from typing import Dict, Any + +from core.logger import setup_logger +from core.nats_event_bus import nats_bus as event_bus +from core.base_service import BaseService +from core.service_registry import ServiceManifest + +from .llm.llm_manager import LLMManager + +logger = setup_logger('oracle_service', service_name='oracle_service') + + +class OracleService(BaseService): + """Oracle service with LLM integration""" + + def __init__(self): + super().__init__('oracle') + self.llm_manager = LLMManager() + logger.info("[✺] Oracle Service initialized") + + def get_service_manifest(self) -> ServiceManifest: + """Return service manifest""" + operations = [ + self.create_service_operation( + "process", + "Process LLM requests", + timeout_ms=60000 # 60 seconds for LLM + ), + self.create_service_operation( + "generate", + "Simple text generation", + timeout_ms=60000 + ) + ] + + return ServiceManifest( + service_id=self.service_id, + name="Oracle Service", + description="LLM-powered reasoning for Vi", + version="1.0.0", + operations=operations, + dependencies=[], + health_check_topic=f"vi.services.{self.service_id}.health", + heartbeat_interval=30, + metadata={ + "llm_backend": self.llm_manager.backend_type or "vllm", + "model_path": self.llm_manager.model_path, + } + ) + + async def initialize_service(self): + """Initialize service resources""" + self.logger.info("[✺] Loading LLM model...") + success = await self.llm_manager.load_model() + + if not success: + self.logger.error("[✺] Failed to load LLM model") + raise RuntimeError("LLM model loading failed") + + model_info = self.llm_manager.get_model_info() + self.logger.info(f"[✺] LLM loaded - Backend: {model_info['backend_type']}, Model: {model_info['model_name']}") + + # Register handlers + await self.register_handler("process", self.handle_process) + await self.register_handler("generate", self.handle_generate) + + self.logger.info("[✺] Oracle Service ready") + + async def cleanup_service(self): + """Cleanup resources""" + if self.llm_manager.is_loaded: + self.logger.info("[✺] Unloading LLM model...") + await self.llm_manager.unload_model() + self.logger.info("[✺] Oracle Service cleanup complete") + + async def perform_health_check(self) -> Dict[str, Any]: + """Health check""" + return { + 'healthy': self.llm_manager.is_loaded, + 'checks': { + 'running': self._running, + 'event_bus': self.event_bus is not None, + 'llm_loaded': self.llm_manager.is_loaded, + 'backend_type': 
self.llm_manager.backend_type or 'unknown', + 'model_name': self.llm_manager.model_name or 'unknown' + } + } + + async def handle_process(self, msg): + """Handle vi.services.oracle.process requests""" + try: + payload = json.loads(msg.data.decode()) + + prompt = payload.get("prompt", "") + temperature = payload.get("temperature") + max_tokens = payload.get("max_tokens") + enable_thinking = payload.get("enable_thinking", True) + + logger.info(f"[✺] Processing request - prompt: {len(prompt)} chars") + + response = await self.llm_manager.generate_response( + prompt=prompt, + temperature=temperature, + max_tokens=max_tokens, + enable_thinking=enable_thinking + ) + + result = { + "content": response, + "status": "success" + } + + return result + + except Exception as e: + logger.error(f"[✺] Process error: {e}") + return { + "content": "Error processing request", + "status": "error", + "error": str(e) + } + + async def handle_generate(self, msg): + """Simple generation endpoint""" + try: + payload = json.loads(msg.data.decode()) + prompt = payload.get("prompt", "") + + response = await self.llm_manager.generate_response(prompt=prompt) + + return { + "content": response, + "status": "success" + } + + except Exception as e: + logger.error(f"[✺] Generate error: {e}") + return { + "content": "", + "status": "error", + "error": str(e) + } + + +# Global instance +oracle_service = OracleService() + + +async def main(): + """Main entry point""" + try: + await event_bus.connect() + logger.info("[✺] Connected to NATS") + + await oracle_service.start(event_bus) + logger.info("[✺] Oracle service running. Ctrl+C to stop.") + + while True: + await asyncio.sleep(1) + + except KeyboardInterrupt: + logger.info("[✺] Shutdown requested") + except Exception as e: + logger.error(f"[✺] Error: {e}", exc_info=True) + finally: + await oracle_service.stop() + await event_bus.close() + logger.info("[✺] Shutdown complete") + + +if __name__ == "__main__": + asyncio.run(main())
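
A minimal client sketch for exercising the new service, assuming the BaseService plumbing maps the registered "process" operation onto the NATS subject vi.services.oracle.process (as the handle_process docstring suggests) and sends the handler's return dict back as a JSON-encoded reply over request/reply; the NATS URL, the 1024-token cap, and the ask_oracle helper are illustrative and not part of this change:

    import asyncio
    import json

    import nats  # nats-py, already listed in requirements.txt


    async def ask_oracle(prompt: str) -> str:
        # Placeholder URL; point this at the deployment's NATS server.
        nc = await nats.connect("nats://localhost:4222")
        try:
            payload = {
                "prompt": prompt,
                "temperature": 0.6,      # matches the thinking-mode default
                "max_tokens": 1024,      # illustrative cap
                "enable_thinking": True,
            }
            # 60 s mirrors the timeout_ms declared for the "process" operation.
            reply = await nc.request(
                "vi.services.oracle.process",
                json.dumps(payload).encode(),
                timeout=60,
            )
            result = json.loads(reply.data.decode())
            if result.get("status") != "success":
                raise RuntimeError(result.get("error", "oracle request failed"))
            return result["content"]
        finally:
            await nc.close()


    if __name__ == "__main__":
        print(asyncio.run(ask_oracle("Summarize the Oracle service in one sentence.")))

The payload and response fields mirror handle_process (prompt, temperature, max_tokens, enable_thinking in; content, status, and optional error out); only the transport wiring above is assumed.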