diff --git a/services/memory/Dockerfile b/services/memory/Dockerfile new file mode 100644 index 0000000..0346121 --- /dev/null +++ b/services/memory/Dockerfile @@ -0,0 +1,28 @@ +FROM python:3.11-slim + +# Set work directory +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements and install Python dependencies +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +# Install base NATS dependency (version spec quoted so the shell does not treat '>' as a redirect) +RUN pip install --no-cache-dir "nats-py>=2.6.0" + +# Service code will be mounted via ConfigMap at /app/services/memory + +# Create non-root user +RUN useradd -m -u 1000 service && chown -R service:service /app +USER service + +# Expose port (if needed) +EXPOSE 8000 + +# Run the service (code mounted from ConfigMap) +CMD ["python", "-m", "services.memory.memory_service"] \ No newline at end of file diff --git a/services/memory/__init__.py b/services/memory/__init__.py new file mode 100644 index 0000000..76022d8 --- /dev/null +++ b/services/memory/__init__.py @@ -0,0 +1 @@ +# Memory service diff --git a/services/memory/build-image.sh b/services/memory/build-image.sh new file mode 100755 index 0000000..ee3c4a7 --- /dev/null +++ b/services/memory/build-image.sh @@ -0,0 +1,6 @@ +#!/bin/bash +# Build memory service image (arm64) +set -e + +VERSION=${1:-"latest"} +/home/alex/lyra/scripts/build-service.sh memory "$VERSION" \ No newline at end of file diff --git a/services/memory/deploy.sh b/services/memory/deploy.sh new file mode 100755 index 0000000..6c05ddc --- /dev/null +++ b/services/memory/deploy.sh @@ -0,0 +1,6 @@ +#!/bin/bash +# Deploy memory service +set -e + +VERSION=${1:-"latest"} +/home/alex/lyra/scripts/deploy-service.sh memory "$VERSION" \ No newline at end of file diff --git a/services/memory/handlers/__init__.py b/services/memory/handlers/__init__.py new file mode 100644 index 0000000..ef778f7 --- /dev/null +++ b/services/memory/handlers/__init__.py @@ -0,0 +1 @@ +# Memory handlers package diff --git a/services/memory/handlers/facts_handler.py b/services/memory/handlers/facts_handler.py new file mode 100644 index 0000000..1a54310 --- /dev/null +++ b/services/memory/handlers/facts_handler.py @@ -0,0 +1,47 @@ +""" +Facts handler. + +Handles requests to query factual memory.
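+
+Illustrative request and response shapes (the values are made up; the keys
+match exactly what handle() reads and returns below):
+
+    request:  {"query": "favorite color", "category": "preferences",
+               "identity_id": "alex", "limit": 5}
+    success:  {"status": "success", "facts": [...], "count": 2}
+    error:    {"status": "error", "error": "<exception message>"}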
+""" +import json +from core.logger import setup_logger + +logger = setup_logger('facts_handler', service_name='memory_service') + + +class FactsHandler: + """Handles facts query requests""" + + def __init__(self, facts_ops): + self.facts_ops = facts_ops + + async def handle(self, msg) -> None: + """Handle facts requests - search factual memory""" + try: + payload = json.loads(msg.data.decode()) if msg.data else {} + + query = payload.get('query', '') + limit = payload.get('limit', 5) + category = payload.get('category') + identity_id = payload.get('identity_id') + + logger.debug(f"[μ] Facts request: query='{query}', category={category}, limit={limit}") + + facts = self.facts_ops.query( + query=query, + limit=limit, + category=category, + identity_id=identity_id + ) + + response = { + "status": "success", + "facts": facts, + "count": len(facts) + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to retrieve facts: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) diff --git a/services/memory/handlers/long_memory_handler.py b/services/memory/handlers/long_memory_handler.py new file mode 100644 index 0000000..fefe72c --- /dev/null +++ b/services/memory/handlers/long_memory_handler.py @@ -0,0 +1,49 @@ +""" +Long-term memory handler. + +Handles requests to query long-term summarized memories. +""" +import json +from core.logger import setup_logger + +logger = setup_logger('long_memory_handler', service_name='memory_service') + + +class LongMemoryHandler: + """Handles long-term memory query requests""" + + def __init__(self, long_term_ops): + self.long_term_ops = long_term_ops + + async def handle(self, msg) -> None: + """Handle long_memory requests - semantic search in long-term summaries""" + try: + payload = json.loads(msg.data.decode()) if msg.data else {} + + query = payload.get('query') + limit = payload.get('limit', 5) + identity_id = payload.get('identity_id') + min_summary_level = payload.get('min_summary_level') + max_summary_level = payload.get('max_summary_level') + + logger.debug(f"[μ] Long memory request: query='{query}', limit={limit}") + + memories = self.long_term_ops.query( + query=query, + limit=limit, + identity_id=identity_id, + min_summary_level=min_summary_level, + max_summary_level=max_summary_level + ) + + response = { + "status": "success", + "memories": memories, + "count": len(memories) + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to retrieve long-term memories: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) diff --git a/services/memory/handlers/reset_handler.py b/services/memory/handlers/reset_handler.py new file mode 100644 index 0000000..1574318 --- /dev/null +++ b/services/memory/handlers/reset_handler.py @@ -0,0 +1,77 @@ +""" +Memory reset handler. + +Handles requests to clear all memory layers. 
+""" +import json +from core.logger import setup_logger + +logger = setup_logger('reset_handler', service_name='memory_service') + + +class ResetHandler: + """Handles memory reset/clear requests""" + + def __init__(self, sqlite_store, chroma_store): + self.sqlite_store = sqlite_store + self.chroma_store = chroma_store + + async def handle(self, msg) -> None: + """Handle vi.memory.debug.reset requests - clears all three-layer memory""" + try: + logger.warning("[μ] Memory reset requested - clearing all three-layer memory contents") + + conn = self.sqlite_store.get_connection() + cursor = conn.cursor() + + # Clear short-term memory + cursor.execute("DELETE FROM short_term_memory") + deleted_short_term = cursor.rowcount + + # Clear all identities + cursor.execute("DELETE FROM identities") + deleted_identities = cursor.rowcount + + # Reset sequences + cursor.execute("DELETE FROM sqlite_sequence WHERE name IN ('short_term_memory', 'identities')") + conn.commit() + + # Clear ChromaDB collections + deleted_long_term = 0 + deleted_facts = 0 + + long_term_collection = self.chroma_store.get_long_term_collection() + facts_collection = self.chroma_store.get_facts_collection() + + if long_term_collection: + deleted_long_term = long_term_collection.count() + all_ids = long_term_collection.get()['ids'] + if all_ids: + long_term_collection.delete(ids=all_ids) + + if facts_collection: + deleted_facts = facts_collection.count() + all_ids = facts_collection.get()['ids'] + if all_ids: + facts_collection.delete(ids=all_ids) + + logger.warning( + f"[μ] Memory reset completed: {deleted_short_term} short-term, " + f"{deleted_long_term} long-term, {deleted_facts} facts, " + f"{deleted_identities} identities cleared" + ) + + response = { + "status": "success", + "deleted_short_term": deleted_short_term, + "deleted_long_term": deleted_long_term, + "deleted_facts": deleted_facts, + "deleted_identities": deleted_identities, + "message": f"Cleared {deleted_short_term} short-term memories, {deleted_long_term} long-term summaries, {deleted_facts} facts, and {deleted_identities} identities" + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to reset memory: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) diff --git a/services/memory/handlers/save_fact_handler.py b/services/memory/handlers/save_fact_handler.py new file mode 100644 index 0000000..86a3cc7 --- /dev/null +++ b/services/memory/handlers/save_fact_handler.py @@ -0,0 +1,56 @@ +""" +Save fact handler. + +Handles requests to save new facts to factual memory. 
+""" +import json +from core.logger import setup_logger + +logger = setup_logger('save_fact_handler', service_name='memory_service') + + +class SaveFactHandler: + """Handles save fact requests""" + + def __init__(self, facts_ops): + self.facts_ops = facts_ops + + async def handle(self, msg) -> None: + """Handle save_fact requests - store new fact""" + try: + payload = json.loads(msg.data.decode()) + + content = payload.get('content') + if not content: + raise ValueError("content is required") + + category = payload.get('category', 'general') + identities = payload.get('identities', []) + mutable = payload.get('mutable', True) + metadata = payload.get('metadata', {}) + + step_exec_id = metadata.get('step_exec_id', 'unknown') + + logger.info(f"[μ] [{step_exec_id}] Saving fact: category={category}, content='{content[:50]}...'") + + fact_id = self.facts_ops.create( + content=content, + category=category, + identities=identities, + mutable=mutable, + metadata=metadata + ) + + logger.info(f"[μ] [{step_exec_id}] ✅ Created fact {fact_id[:8]}...: category={category}, content='{content[:50]}...'") + + response = { + "status": "success", + "fact_id": fact_id, + "message": "Fact saved successfully" + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to save fact: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) diff --git a/services/memory/handlers/search_handler.py b/services/memory/handlers/search_handler.py new file mode 100644 index 0000000..611ac63 --- /dev/null +++ b/services/memory/handlers/search_handler.py @@ -0,0 +1,51 @@ +""" +Memory search handler. + +Handles legacy search requests (backward compatibility). +""" +import json +from typing import Dict, Any +from core.logger import setup_logger + +logger = setup_logger('search_handler', service_name='memory_service') + + +class SearchHandler: + """Handles legacy memory search requests""" + + def __init__(self, short_term_ops): + self.short_term_ops = short_term_ops + + async def handle(self, msg) -> None: + """Handle vi.memory.search requests - backward compatibility""" + try: + payload = json.loads(msg.data.decode()) + logger.debug("[μ] Legacy search request - redirecting to short_memory") + + # Map legacy parameters to new system + limit = payload.get('limit', 10) + identity_id = None + if payload.get('identities'): + identity_id = payload['identities'][0] + interaction_id = payload.get('interaction_id') + + # Query short-term memory + results = self.short_term_ops.query( + limit=limit, + offset=0, + identity_id=identity_id, + interaction_id=interaction_id + ) + + response = { + "results": results, + "count": len(results), + "source": "short_term", + "note": "Legacy search API redirected to short-term memory. Use short_memory(), long_memory(), or facts() for specific queries." + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to search memories: {e}") + error_response = {"results": [], "count": 0, "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) diff --git a/services/memory/handlers/short_memory_handler.py b/services/memory/handlers/short_memory_handler.py new file mode 100644 index 0000000..ba982d4 --- /dev/null +++ b/services/memory/handlers/short_memory_handler.py @@ -0,0 +1,47 @@ +""" +Short-term memory handler. + +Handles requests to query short-term literal memories. 
+""" +import json +from core.logger import setup_logger + +logger = setup_logger('short_memory_handler', service_name='memory_service') + + +class ShortMemoryHandler: + """Handles short-term memory query requests""" + + def __init__(self, short_term_ops): + self.short_term_ops = short_term_ops + + async def handle(self, msg) -> None: + """Handle short_memory requests - get recent literal memories""" + try: + payload = json.loads(msg.data.decode()) if msg.data else {} + + limit = payload.get('limit', 10) + offset = payload.get('offset', 0) + identity_id = payload.get('identity_id') + interaction_id = payload.get('interaction_id') + + logger.debug(f"[μ] Short memory request: limit={limit}, offset={offset}") + + memories = self.short_term_ops.query( + limit=limit, + offset=offset, + identity_id=identity_id, + interaction_id=interaction_id + ) + + response = { + "status": "success", + "memories": memories, + "count": len(memories) + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to retrieve short-term memories: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) diff --git a/services/memory/handlers/store_handler.py b/services/memory/handlers/store_handler.py new file mode 100644 index 0000000..bfc6ba9 --- /dev/null +++ b/services/memory/handlers/store_handler.py @@ -0,0 +1,90 @@ +""" +Memory store handler. + +Handles requests to store new memories in short-term memory. +""" +import json +from uuid import uuid4 +from datetime import datetime +from typing import Dict, Any +from core.logger import setup_logger + +logger = setup_logger('store_handler', service_name='memory_service') + + +class StoreHandler: + """Handles memory store requests""" + + def __init__(self, sqlite_store): + """ + Initialize store handler. + + Args: + sqlite_store: SQLiteStore instance + """ + self.sqlite_store = sqlite_store + + async def handle(self, msg) -> None: + """ + Handle vi.memory.store requests - routes to short-term memory. + + Args: + msg: NATS message with request payload + """ + try: + # Parse request payload + payload = json.loads(msg.data.decode()) + + # Extract required fields + content = payload.get('content') + if not content: + logger.warning("[μ] Memory store request missing content") + error_response = { + "status": "error", + "error": "Missing required field: content" + } + await msg.respond(json.dumps(error_response).encode()) + return + + # Extract optional fields + identities = payload.get('identities', []) + interaction_id = payload.get('interaction_id') + modality = payload.get('modality', 'dialogue') + metadata = payload.get('metadata', {}) + + # Store in simplified short-term memory table + memory_id = str(uuid4()) + timestamp = datetime.utcnow().isoformat() + + conn = self.sqlite_store.get_connection() + cursor = conn.cursor() + cursor.execute(""" + INSERT INTO short_term_memory (id, timestamp, content, identities, interaction_id, modality, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, ( + memory_id, + timestamp, + content, + json.dumps(identities) if identities else None, + interaction_id, + modality, + json.dumps(metadata) if metadata else None + )) + + conn.commit() + logger.info(f"[μ] Stored short-term memory: '{content[:60]}...' 
identities={identities}") + + # Send response using NATS request-reply + response = { + "memory_id": memory_id, + "status": "stored" + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to store memory: {e}") + error_response = { + "status": "error", + "error": str(e) + } + await msg.respond(json.dumps(error_response).encode()) diff --git a/services/memory/handlers/update_fact_handler.py b/services/memory/handlers/update_fact_handler.py new file mode 100644 index 0000000..33d620f --- /dev/null +++ b/services/memory/handlers/update_fact_handler.py @@ -0,0 +1,58 @@ +""" +Update fact handler. + +Handles requests to update existing facts. +""" +import json +from core.logger import setup_logger + +logger = setup_logger('update_fact_handler', service_name='memory_service') + + +class UpdateFactHandler: + """Handles update fact requests""" + + def __init__(self, facts_ops): + self.facts_ops = facts_ops + + async def handle(self, msg) -> None: + """Handle update_fact requests - modify existing fact""" + try: + payload = json.loads(msg.data.decode()) + + fact_id = payload.get('fact_id') + new_content = payload.get('new_content') + identity_id = payload.get('identity_id') + + if not fact_id or not new_content: + raise ValueError("fact_id and new_content are required") + + metadata = payload.get('metadata', {}) + + logger.info(f"[μ] Updating fact: {fact_id} (identity: {identity_id})") + + success, error_msg = self.facts_ops.update( + fact_id=fact_id, + new_content=new_content, + identity_id=identity_id, + metadata=metadata + ) + + if success: + response = { + "status": "success", + "fact_id": fact_id, + "message": "Fact updated successfully" + } + else: + response = { + "status": "error", + "error": error_msg or "Fact not found or not mutable" + } + + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to update fact: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) diff --git a/services/memory/memory_service.py b/services/memory/memory_service.py new file mode 100644 index 0000000..0a2e357 --- /dev/null +++ b/services/memory/memory_service.py @@ -0,0 +1,244 @@ +""" +Memory Service - Three-layer memory system with modular architecture. + +Refactored into storage backends, operations, and handlers for maintainability. 
+""" +import asyncio +from pathlib import Path +from typing import Dict, Any + +from core.config import SHORT_TERM_DB +from core.logger import setup_logger +from core.nats_event_bus import nats_bus as event_bus +from core.base_service import BaseService +from core.service_registry import ServiceManifest + +# Import refactored components +from .storage.sqlite_store import SQLiteStore +from .storage.chroma_store import ChromaStore +from .storage.migrations import archive_old_database +from .operations.short_term_ops import ShortTermOperations +from .operations.long_term_ops import LongTermOperations +from .operations.facts_ops import FactsOperations +from .handlers.store_handler import StoreHandler +from .handlers.search_handler import SearchHandler +from .handlers.reset_handler import ResetHandler +from .handlers.short_memory_handler import ShortMemoryHandler +from .handlers.long_memory_handler import LongMemoryHandler +from .handlers.facts_handler import FactsHandler +from .handlers.save_fact_handler import SaveFactHandler +from .handlers.update_fact_handler import UpdateFactHandler +from .utils.embeddings import get_model + +logger = setup_logger('memory_service', service_name='memory_service') + + +class MemoryService(BaseService): + """Memory service with three-layer architecture and modular design""" + + def __init__(self): + super().__init__('memory') + + # Initialize storage backends + self.sqlite_store = SQLiteStore() + self.chroma_store = ChromaStore() + + # Initialize operations (will be set up after storage connects) + self.short_term_ops = None + self.long_term_ops = None + self.facts_ops = None + + # Initialize handlers (will be set up after operations) + self.store_handler = None + self.search_handler = None + self.reset_handler = None + self.short_memory_handler = None + self.long_memory_handler = None + self.facts_handler = None + self.save_fact_handler = None + self.update_fact_handler = None + + logger.info("[μ] Memory Service initialized with modular architecture") + + def get_service_manifest(self) -> ServiceManifest: + """Return service manifest with operations and metadata""" + operations = [ + # Legacy operations (backward compatibility) + self.create_service_operation( + "store", + "Store a memory (routes to short-term)", + timeout_ms=5000 + ), + self.create_service_operation( + "search", + "Search memories (legacy, redirects to short_memory)", + timeout_ms=3000 + ), + self.create_service_operation( + "reset", + "Reset/clear memory database for debugging", + timeout_ms=10000 + ), + # New three-layer memory operations + self.create_service_operation( + "short_memory", + "Get recent literal memories with offset support", + timeout_ms=3000 + ), + self.create_service_operation( + "long_memory", + "Semantic search in long-term summarized memories", + timeout_ms=5000 + ), + self.create_service_operation( + "facts", + "Search factual memory by category or semantic query", + timeout_ms=3000 + ), + self.create_service_operation( + "save_fact", + "Store a new fact in factual memory", + timeout_ms=2000 + ), + self.create_service_operation( + "update_fact", + "Update an existing fact (if mutable)", + timeout_ms=2000 + ) + ] + + return ServiceManifest( + service_id=self.service_id, + name="Memory Service", + description="Three-layer memory system: short-term (literal), long-term (summarized), factual (exact)", + version="3.0.0", + operations=operations, + dependencies=[], + health_check_topic=f"vi.services.{self.service_id}.health", + heartbeat_interval=30, + metadata={ + 
"storage_type": "hybrid", + "short_term_storage": "sqlite", + "long_term_storage": "chromadb", + "facts_storage": "chromadb", + "embedding_model": "all-MiniLM-L6-v2", + "vector_search": True, + "urgency": 0.8 + } + ) + + async def initialize_service(self): + """Initialize service-specific resources and register handlers""" + # Archive old database if it exists (one-time migration) + archive_old_database(Path(SHORT_TERM_DB)) + + # Initialize storage backends + self.sqlite_store.connect() + self.chroma_store.connect() + + # Initialize embedding model + get_model() # Loads model on first call + + # Initialize operations + self.short_term_ops = ShortTermOperations(self.sqlite_store) + self.long_term_ops = LongTermOperations(self.chroma_store) + self.facts_ops = FactsOperations(self.chroma_store) + + # Initialize handlers + self.store_handler = StoreHandler(self.sqlite_store) + self.search_handler = SearchHandler(self.short_term_ops) + self.reset_handler = ResetHandler(self.sqlite_store, self.chroma_store) + self.short_memory_handler = ShortMemoryHandler(self.short_term_ops) + self.long_memory_handler = LongMemoryHandler(self.long_term_ops) + self.facts_handler = FactsHandler(self.facts_ops) + self.save_fact_handler = SaveFactHandler(self.facts_ops) + self.update_fact_handler = UpdateFactHandler(self.facts_ops) + + # Register handlers using new topic patterns + await self.register_handler("store", self.store_handler.handle) + await self.register_handler("search", self.search_handler.handle) + await self.register_handler("reset", self.reset_handler.handle) + await self.register_handler("short_memory", self.short_memory_handler.handle) + await self.register_handler("long_memory", self.long_memory_handler.handle) + await self.register_handler("facts", self.facts_handler.handle) + await self.register_handler("save_fact", self.save_fact_handler.handle) + await self.register_handler("update_fact", self.update_fact_handler.handle) + + # Also register legacy topic handlers for backward compatibility + await self.event_bus.on("vi.memory.store", self.store_handler.handle) + await self.event_bus.on("vi.memory.search", self.search_handler.handle) + await self.event_bus.on("vi.memory.debug.reset", self.reset_handler.handle) + + self.logger.info("[μ] Memory Service initialized with three-layer memory system") + + async def cleanup_service(self): + """Cleanup service-specific resources""" + # Unregister event handlers + await self.event_bus.off("vi.memory.store") + await self.event_bus.off("vi.memory.search") + await self.event_bus.off("vi.memory.debug.reset") + + # Close storage connections + if self.sqlite_store: + self.sqlite_store.close() + + self.logger.info("[μ] Memory Service cleanup completed") + + async def perform_health_check(self) -> Dict[str, Any]: + """Perform service-specific health check""" + health_data = { + 'healthy': True, + 'checks': { + 'running': self._running, + 'event_bus': self.event_bus is not None, + 'database_connected': self.sqlite_store.conn is not None, + 'embedding_model': get_model() is not None + } + } + + # Check database connectivity + try: + if self.sqlite_store.conn: + cursor = self.sqlite_store.conn.cursor() + cursor.execute("SELECT COUNT(*) FROM short_term_memory") + short_term_count = cursor.fetchone()[0] + health_data['checks']['short_term_count'] = short_term_count + health_data['checks']['long_term_count'] = self.chroma_store.get_long_term_collection().count() + health_data['checks']['facts_count'] = self.chroma_store.get_facts_collection().count() + 
health_data['checks']['database_accessible'] = True + else: + health_data['checks']['database_accessible'] = False + health_data['healthy'] = False + except Exception as e: + health_data['checks']['database_accessible'] = False + health_data['checks']['database_error'] = str(e) + health_data['healthy'] = False + + return health_data + + +async def main(): + """Main entry point for memory service""" + memory_service = MemoryService() + + try: + await event_bus.connect() + await memory_service.start(event_bus) + + logger.info("[μ] Memory service running. Press Ctrl+C to stop.") + + # Keep running + while True: + await asyncio.sleep(1) + + except KeyboardInterrupt: + logger.info("[μ] Shutdown requested") + except Exception as e: + logger.exception(f"[μ] Unexpected error: {e}") + finally: + await memory_service.stop() + await event_bus.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/services/memory/memory_service.py.backup b/services/memory/memory_service.py.backup new file mode 100644 index 0000000..297e0f4 --- /dev/null +++ b/services/memory/memory_service.py.backup @@ -0,0 +1,978 @@ +import asyncio +import json +import sqlite3 +import numpy as np +import shutil +from datetime import datetime +from uuid import uuid4 +from pathlib import Path +from sentence_transformers import SentenceTransformer +from typing import List, Dict, Any, Optional +import chromadb +from chromadb.config import Settings + +from core.config import SHORT_TERM_DB, config +from core.logger import setup_logger +from core.nats_event_bus import nats_bus as event_bus +from core.events import SymbolicEvent +from core.event_utils import query_mood, request_response +from core.base_service import BaseService +from core.service_registry import ServiceManifest, ServiceOperation + +logger = setup_logger('memory_service', service_name='memory_service') + +# Initialize sentence transformer model +model = SentenceTransformer('all-MiniLM-L6-v2') + + +def serialize_embedding(vector: np.ndarray) -> bytes: + """Convert numpy array to bytes for database storage""" + return vector.astype(np.float32).tobytes() + + +def deserialize_embedding(blob: bytes) -> np.ndarray: + """Convert bytes back to numpy array""" + return np.frombuffer(blob, dtype=np.float32) + + +def generate_embedding(text: str) -> np.ndarray: + """Generate semantic embedding for text""" + return np.array(model.encode(text, normalize_embeddings=True)) + + +def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: + """Calculate cosine similarity between two vectors""" + if np.linalg.norm(a) == 0 or np.linalg.norm(b) == 0: + return 0.0 + return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))) + + +# Note: SymbolicMemoryScorer removed - was only used by archived recall_memories() +# ChromaDB now handles semantic similarity scoring internally for long-term and facts + + +class MemoryService(BaseService): + def __init__(self): + super().__init__('memory') + self.sqlite_conn = None + self.chroma_client = None + self.long_term_collection = None + self.facts_collection = None + + def get_service_manifest(self) -> ServiceManifest: + """Return service manifest with operations and metadata""" + operations = [ + # Legacy operations (backward compatibility) + self.create_service_operation( + "store", + "Store a memory (routes to short-term)", + timeout_ms=5000 + ), + self.create_service_operation( + "search", + "Search memories (legacy, redirects to short_memory)", + timeout_ms=3000 + ), + self.create_service_operation( + "reset", + 
"Reset/clear memory database for debugging", + timeout_ms=10000 + ), + # New three-layer memory operations + self.create_service_operation( + "short_memory", + "Get recent literal memories with offset support", + timeout_ms=3000 + ), + self.create_service_operation( + "long_memory", + "Semantic search in long-term summarized memories", + timeout_ms=5000 + ), + self.create_service_operation( + "facts", + "Search factual memory by category or semantic query", + timeout_ms=3000 + ), + self.create_service_operation( + "save_fact", + "Store a new fact in factual memory", + timeout_ms=2000 + ), + self.create_service_operation( + "update_fact", + "Update an existing fact (if mutable)", + timeout_ms=2000 + ) + ] + + return ServiceManifest( + service_id=self.service_id, + name="Memory Service", + description="Three-layer memory system: short-term (literal), long-term (summarized), factual (exact)", + version="3.0.0", + operations=operations, + dependencies=[], # Memory service has no dependencies + health_check_topic=f"lyra.services.{self.service_id}.health", + heartbeat_interval=30, + metadata={ + "storage_type": "hybrid", + "short_term_storage": "sqlite", + "long_term_storage": "chromadb", + "facts_storage": "chromadb", + "embedding_model": "all-MiniLM-L6-v2", + "vector_search": True, + "urgency": 0.8 + } + ) + + async def initialize_service(self): + """Initialize service-specific resources and register handlers""" + # Archive old database if it exists (one-time migration) + self._archive_old_database() + + # Initialize short-term SQLite database + self.sqlite_conn = sqlite3.connect(str(SHORT_TERM_DB)) + self._init_short_term_sqlite() + + # Initialize ChromaDB for long-term and factual memory + self._init_chromadb() + + # Register handlers using new topic patterns + await self.register_handler("store", self.handle_memory_store) + await self.register_handler("search", self.handle_memory_search) + await self.register_handler("reset", self.handle_memory_reset) + await self.register_handler("short_memory", self.handle_short_memory) + await self.register_handler("long_memory", self.handle_long_memory) + await self.register_handler("facts", self.handle_facts) + await self.register_handler("save_fact", self.handle_save_fact) + await self.register_handler("update_fact", self.handle_update_fact) + + # Also register legacy topic handlers for backward compatibility + await self.event_bus.on("lyra.memory.store", self.handle_memory_store) + await self.event_bus.on("lyra.memory.search", self.handle_memory_search) + await self.event_bus.on("lyra.memory.debug.reset", self.handle_memory_reset) + + self.logger.info("[μ] Memory Service initialized with three-layer memory system") + + async def cleanup_service(self): + """Cleanup service-specific resources""" + # Unregister event handlers + await self.event_bus.off("lyra.memory.store") + await self.event_bus.off("lyra.memory.search") + await self.event_bus.off("lyra.memory.debug.reset") + + # Close database connection + if self.sqlite_conn: + self.sqlite_conn.close() + + self.logger.info("[μ] Memory Service cleanup completed") + + async def perform_health_check(self) -> Dict[str, Any]: + """Perform service-specific health check""" + health_data = { + 'healthy': True, + 'checks': { + 'running': self._running, + 'event_bus': self.event_bus is not None, + 'database_connected': self.sqlite_conn is not None, + 'embedding_model': model is not None + } + } + + # Check database connectivity + try: + if self.sqlite_conn: + cursor = self.sqlite_conn.cursor() + 
cursor.execute("SELECT COUNT(*) FROM short_term_memory") + short_term_count = cursor.fetchone()[0] + health_data['checks']['short_term_count'] = short_term_count + health_data['checks']['long_term_count'] = self.long_term_collection.count() if self.long_term_collection else 0 + health_data['checks']['facts_count'] = self.facts_collection.count() if self.facts_collection else 0 + health_data['checks']['database_accessible'] = True + else: + health_data['checks']['database_accessible'] = False + health_data['healthy'] = False + except Exception as e: + health_data['checks']['database_accessible'] = False + health_data['checks']['database_error'] = str(e) + health_data['healthy'] = False + + return health_data + + def _archive_old_database(self): + """Archive old database if it exists (one-time migration)""" + db_path = Path(SHORT_TERM_DB) + if db_path.exists(): + # Check if it's the old schema by trying to connect and inspect + try: + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='memory'") + result = cursor.fetchone() + conn.close() + + if result: + # Old database exists, archive it + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + archive_path = db_path.parent / f"{db_path.stem}_archive_{timestamp}{db_path.suffix}" + shutil.move(str(db_path), str(archive_path)) + logger.info(f"[μ] Archived old database to {archive_path}") + except Exception as e: + logger.warning(f"[μ] Could not check/archive old database: {e}") + + def _init_short_term_sqlite(self): + """Initialize simplified short-term SQLite database""" + cursor = self.sqlite_conn.cursor() + + # Simplified short-term memory table (no embeddings, fast queries) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS short_term_memory ( + id TEXT PRIMARY KEY, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + content TEXT NOT NULL, + identities TEXT, + interaction_id TEXT, + modality TEXT DEFAULT 'dialogue', + metadata TEXT + ) + """) + + # Index for fast chronological queries + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_timestamp + ON short_term_memory(timestamp DESC) + """) + + # Keep identities table (still useful for all layers) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS identities ( + id TEXT PRIMARY KEY, + display_name TEXT, + role TEXT, + intimacy REAL DEFAULT 0.0, + last_seen DATETIME, + last_spoken DATETIME, + metadata TEXT + ) + """) + + self.sqlite_conn.commit() + logger.info("[μ] Short-term SQLite database initialized") + + def _init_chromadb(self): + """Initialize ChromaDB client and collections""" + try: + # Initialize persistent ChromaDB client + chroma_path = Path(SHORT_TERM_DB).parent / "chroma_db" + chroma_path.mkdir(parents=True, exist_ok=True) + + self.chroma_client = chromadb.PersistentClient( + path=str(chroma_path), + settings=Settings(anonymized_telemetry=False) + ) + + # Create or get long-term memories collection + self.long_term_collection = self.chroma_client.get_or_create_collection( + name="long_term_memories", + metadata={"description": "Summarized conversation histories"} + ) + + # Create or get facts collection + self.facts_collection = self.chroma_client.get_or_create_collection( + name="facts", + metadata={"description": "Exact factual knowledge"} + ) + + logger.info(f"[μ] ChromaDB initialized at {chroma_path}") + logger.info(f"[μ] Long-term collection: {self.long_term_collection.count()} entries") + logger.info(f"[μ] Facts collection: {self.facts_collection.count()} entries") + + except Exception 
as e: + logger.error(f"[μ] Failed to initialize ChromaDB: {e}") + raise + + def _query_short_term( + self, + limit: int = 10, + offset: int = 0, + identity_id: Optional[str] = None, + interaction_id: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Query short-term memory from SQLite with chronological ordering""" + cursor = self.sqlite_conn.cursor() + + conditions = [] + params = [] + + if identity_id: + conditions.append("identities LIKE ?") + params.append(f"%{identity_id}%") + + if interaction_id: + conditions.append("interaction_id = ?") + params.append(interaction_id) + + where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else "" + + query = f""" + SELECT id, timestamp, content, identities, interaction_id, modality, metadata + FROM short_term_memory + {where_clause} + ORDER BY timestamp DESC + LIMIT ? OFFSET ? + """ + + params.extend([limit, offset]) + cursor.execute(query, params) + + memories = [] + for row in cursor.fetchall(): + mem_id, timestamp, content, identities_str, ixn_id, modality, metadata_str = row + + memories.append({ + "id": mem_id, + "timestamp": timestamp, + "content": content, + "identities": json.loads(identities_str) if identities_str else [], + "interaction_id": ixn_id, + "modality": modality, + "metadata": json.loads(metadata_str) if metadata_str else {}, + "source": "short_term" + }) + + return memories + + def _query_long_term( + self, + query: Optional[str] = None, + limit: int = 5, + identity_id: Optional[str] = None, + min_summary_level: Optional[int] = None, + max_summary_level: Optional[int] = None + ) -> List[Dict[str, Any]]: + """Query long-term memory from ChromaDB (summarized memories)""" + try: + # Build metadata filters + where_filters = {} + if identity_id: + where_filters["identity_id"] = identity_id + if min_summary_level is not None: + where_filters["summary_level"] = {"$gte": min_summary_level} + if max_summary_level is not None: + if "summary_level" in where_filters: + where_filters["summary_level"]["$lte"] = max_summary_level + else: + where_filters["summary_level"] = {"$lte": max_summary_level} + + # Query ChromaDB + if query: + # Semantic search with query text + results = self.long_term_collection.query( + query_texts=[query], + n_results=limit, + where=where_filters if where_filters else None + ) + else: + # Random sample - get all and sample + results = self.long_term_collection.get( + limit=limit, + where=where_filters if where_filters else None + ) + + # Format results + memories = [] + if query and results['ids']: + # Query results + for i, doc_id in enumerate(results['ids'][0]): + memories.append({ + "id": doc_id, + "content": results['documents'][0][i], + "metadata": results['metadatas'][0][i] if results['metadatas'] else {}, + "distance": results['distances'][0][i] if results['distances'] else None, + "source": "long_term" + }) + elif not query and results['ids']: + # Get results (no query) + for i, doc_id in enumerate(results['ids']): + memories.append({ + "id": doc_id, + "content": results['documents'][i], + "metadata": results['metadatas'][i] if results['metadatas'] else {}, + "source": "long_term" + }) + + return memories + + except Exception as e: + logger.error(f"[μ] Failed to query long-term memory: {e}") + return [] + + def _query_facts( + self, + query: str = '', + limit: int = 5, + category: Optional[str] = None, + identity_id: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Query facts from ChromaDB with optional category filtering""" + try: + # Build metadata filters + where_filters = {} + 
if category: + where_filters["category"] = category + if identity_id: + where_filters["identity_id"] = identity_id + + # Query ChromaDB + if query: + # Semantic search with query text + results = self.facts_collection.query( + query_texts=[query], + n_results=limit, + where=where_filters if where_filters else None + ) + else: + # Get all facts matching filters + results = self.facts_collection.get( + limit=limit, + where=where_filters if where_filters else None + ) + + # Format results + facts = [] + if query and results['ids']: + # Query results + for i, doc_id in enumerate(results['ids'][0]): + metadata = results['metadatas'][0][i] if results['metadatas'] else {} + # Parse identities from JSON string + identities_str = metadata.get('identities', '[]') + identities = json.loads(identities_str) if isinstance(identities_str, str) else identities_str + facts.append({ + "id": doc_id, + "content": results['documents'][0][i], + "category": metadata.get('category', 'general'), + "mutable": metadata.get('mutable', True), + "identities": identities, + "metadata": metadata, + "distance": results['distances'][0][i] if results['distances'] else None, + "source": "facts" + }) + elif not query and results['ids']: + # Get results (no query) + for i, doc_id in enumerate(results['ids']): + metadata = results['metadatas'][i] if results['metadatas'] else {} + # Parse identities from JSON string + identities_str = metadata.get('identities', '[]') + identities = json.loads(identities_str) if isinstance(identities_str, str) else identities_str + facts.append({ + "id": doc_id, + "content": results['documents'][i], + "category": metadata.get('category', 'general'), + "mutable": metadata.get('mutable', True), + "identities": identities, + "metadata": metadata, + "source": "facts" + }) + + return facts + + except Exception as e: + logger.error(f"[μ] Failed to query facts: {e}") + return [] + + def _create_fact( + self, + content: str, + category: str = 'general', + identities: List[str] = None, + mutable: bool = True, + metadata: Dict[str, Any] = None + ) -> str: + """Create a new fact in ChromaDB facts collection""" + fact_id = str(uuid4()) + identities = identities or [] + metadata = metadata or {} + + # Prepare metadata (ChromaDB only accepts scalar types, not lists) + fact_metadata = { + "category": category, + "mutable": mutable, + "identities": json.dumps(identities), # Convert list to JSON string + "created_at": datetime.utcnow().isoformat(), + **metadata + } + + # Add to ChromaDB (will automatically generate embedding) + self.facts_collection.add( + ids=[fact_id], + documents=[content], + metadatas=[fact_metadata] + ) + + logger.info(f"[μ] Created fact {fact_id}: category={category}, content='{content[:60]}...'") + return fact_id + + def _update_fact( + self, + fact_id: str, + new_content: str, + identity_id: str = None, + metadata: Dict[str, Any] = None + ) -> tuple[bool, str]: + """Update an existing fact in ChromaDB if it's mutable and owned by identity + + Returns: + (success, error_message) - error_message is empty string if successful + """ + try: + # Get the fact to check if it exists and is mutable + result = self.facts_collection.get(ids=[fact_id]) + + if not result['ids']: + logger.warning(f"[μ] Fact {fact_id} not found") + return False, "Fact not found" + + fact_metadata = result['metadatas'][0] + + # Check if fact is mutable + if not fact_metadata.get('mutable', True): + logger.warning(f"[μ] Fact {fact_id} is not mutable") + return False, "Fact is not mutable" + + # Validate identity ownership 
if identity_id provided + if identity_id: + fact_identities = fact_metadata.get('identities', []) + if isinstance(fact_identities, str): + import json + try: + fact_identities = json.loads(fact_identities) + except: + fact_identities = [fact_identities] + + if identity_id not in fact_identities: + logger.warning(f"[μ] Identity {identity_id} does not own fact {fact_id}") + return False, f"Fact does not belong to identity {identity_id}" + + # Update metadata + updated_metadata = dict(fact_metadata) + updated_metadata['updated_at'] = datetime.utcnow().isoformat() + if metadata: + updated_metadata.update(metadata) + + # Update in ChromaDB + self.facts_collection.update( + ids=[fact_id], + documents=[new_content], + metadatas=[updated_metadata] + ) + + logger.info(f"[μ] Updated fact {fact_id}") + return True, "" + + except Exception as e: + logger.error(f"[μ] Failed to update fact {fact_id}: {e}") + return False, str(e) + + # Note: Old store_memory() and recall_memories() methods removed + # They referenced the archived 'memory' table schema + # New three-layer system uses: + # - short_term_memory table (SQLite) + # - long_term_memories collection (ChromaDB) + # - facts collection (ChromaDB) + + async def handle_memory_store(self, msg): + """Handle lyra.memory.store requests - routes to short-term memory""" + try: + # Parse request payload + payload = json.loads(msg.data.decode()) + + # Extract required fields + content = payload.get('content') + if not content: + logger.warning("[μ] Memory store request missing content") + error_response = { + "status": "error", + "error": "Missing required field: content" + } + await msg.respond(json.dumps(error_response).encode()) + return + + # Extract optional fields + identities = payload.get('identities', []) + interaction_id = payload.get('interaction_id') + modality = payload.get('modality', 'dialogue') + metadata = payload.get('metadata', {}) + + # Store in simplified short-term memory table + memory_id = str(uuid4()) + timestamp = datetime.utcnow().isoformat() + + cursor = self.sqlite_conn.cursor() + cursor.execute(""" + INSERT INTO short_term_memory (id, timestamp, content, identities, interaction_id, modality, metadata) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, ( + memory_id, + timestamp, + content, + json.dumps(identities) if identities else None, + interaction_id, + modality, + json.dumps(metadata) if metadata else None + )) + + self.sqlite_conn.commit() + logger.info(f"[μ] Stored short-term memory: '{content[:60]}...' 
identities={identities}") + + # Send response using NATS request-reply + response = { + "memory_id": memory_id, + "status": "stored" + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to store memory: {e}") + error_response = { + "status": "error", + "error": str(e) + } + await msg.respond(json.dumps(error_response).encode()) + + async def handle_memory_search(self, msg): + """Handle lyra.memory.search requests - backward compatibility, redirects to short_memory""" + try: + # Parse request payload + payload = json.loads(msg.data.decode()) + + logger.debug("[μ] Legacy search request - redirecting to short_memory") + + # Extract search parameters and map to new system + limit = payload.get('limit', 10) + identity_id = None + if payload.get('identities'): + identity_id = payload['identities'][0] # Take first identity + interaction_id = payload.get('interaction_id') + + # Query short-term memory (most recent literal memories) + results = self._query_short_term( + limit=limit, + offset=0, + identity_id=identity_id, + interaction_id=interaction_id + ) + + # Send response using NATS request-reply + response = { + "results": results, + "count": len(results), + "source": "short_term", + "note": "Legacy search API redirected to short-term memory. Use short_memory(), long_memory(), or facts() for specific queries." + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to search memories: {e}") + error_response = { + "results": [], + "count": 0, + "error": str(e) + } + await msg.respond(json.dumps(error_response).encode()) + + async def handle_memory_reset(self, msg): + """Handle lyra.memory.debug.reset requests - clears all three-layer memory""" + try: + # Parse request payload (though this endpoint typically doesn't need payload data) + payload = json.loads(msg.data.decode()) if msg.data else {} + + logger.warning("[μ] Memory reset requested - clearing all three-layer memory contents") + + cursor = self.sqlite_conn.cursor() + + # Clear short-term memory + cursor.execute("DELETE FROM short_term_memory") + deleted_short_term = cursor.rowcount + + # Clear all identities + cursor.execute("DELETE FROM identities") + deleted_identities = cursor.rowcount + + # Reset any auto-increment sequences + cursor.execute("DELETE FROM sqlite_sequence WHERE name IN ('short_term_memory', 'identities')") + + self.sqlite_conn.commit() + + # Clear ChromaDB collections + deleted_long_term = 0 + deleted_facts = 0 + + if self.long_term_collection: + deleted_long_term = self.long_term_collection.count() + # Delete all documents in long-term collection + all_ids = self.long_term_collection.get()['ids'] + if all_ids: + self.long_term_collection.delete(ids=all_ids) + + if self.facts_collection: + deleted_facts = self.facts_collection.count() + # Delete all documents in facts collection + all_ids = self.facts_collection.get()['ids'] + if all_ids: + self.facts_collection.delete(ids=all_ids) + + logger.warning( + f"[μ] Memory reset completed: {deleted_short_term} short-term, " + f"{deleted_long_term} long-term, {deleted_facts} facts, " + f"{deleted_identities} identities cleared" + ) + + # Send response using NATS request-reply + response = { + "status": "success", + "deleted_short_term": deleted_short_term, + "deleted_long_term": deleted_long_term, + "deleted_facts": deleted_facts, + "deleted_identities": deleted_identities, + "message": ( + f"Cleared {deleted_short_term} short-term memories, " + 
f"{deleted_long_term} long-term summaries, " + f"{deleted_facts} facts, and {deleted_identities} identities" + ) + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to reset memory: {e}") + error_response = { + "status": "error", + "error": str(e) + } + await msg.respond(json.dumps(error_response).encode()) + + async def handle_short_memory(self, msg): + """Handle short_memory requests - get recent literal memories""" + try: + payload = json.loads(msg.data.decode()) if msg.data else {} + + limit = payload.get('limit', 10) + offset = payload.get('offset', 0) + identity_id = payload.get('identity_id') + interaction_id = payload.get('interaction_id') + + logger.debug(f"[μ] Short memory request: limit={limit}, offset={offset}") + + # Query short-term memory + memories = self._query_short_term( + limit=limit, + offset=offset, + identity_id=identity_id, + interaction_id=interaction_id + ) + + response = { + "status": "success", + "memories": memories, + "count": len(memories) + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to retrieve short-term memories: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) + + async def handle_long_memory(self, msg): + """Handle long_memory requests - semantic search in long-term summaries""" + try: + payload = json.loads(msg.data.decode()) if msg.data else {} + + query = payload.get('query') # None = random sample + limit = payload.get('limit', 5) + identity_id = payload.get('identity_id') + min_summary_level = payload.get('min_summary_level') + max_summary_level = payload.get('max_summary_level') + + logger.debug(f"[μ] Long memory request: query='{query}', limit={limit}") + + # Query long-term memory from ChromaDB + memories = self._query_long_term( + query=query, + limit=limit, + identity_id=identity_id, + min_summary_level=min_summary_level, + max_summary_level=max_summary_level + ) + + response = { + "status": "success", + "memories": memories, + "count": len(memories) + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to retrieve long-term memories: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) + + async def handle_facts(self, msg): + """Handle facts requests - search factual memory""" + try: + payload = json.loads(msg.data.decode()) if msg.data else {} + + query = payload.get('query', '') + limit = payload.get('limit', 5) + category = payload.get('category') + identity_id = payload.get('identity_id') + + logger.debug(f"[μ] Facts request: query='{query}', category={category}, limit={limit}") + + # Query facts from ChromaDB + facts = self._query_facts( + query=query, + limit=limit, + category=category, + identity_id=identity_id + ) + + response = { + "status": "success", + "facts": facts, + "count": len(facts) + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to retrieve facts: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) + + async def handle_save_fact(self, msg): + """Handle save_fact requests - store new fact""" + try: + payload = json.loads(msg.data.decode()) + + content = payload.get('content') + if not content: + raise ValueError("content is required") + + category = payload.get('category', 
'general') + identities = payload.get('identities', []) + mutable = payload.get('mutable', True) + metadata = payload.get('metadata', {}) + + # Extract step execution ID from metadata if present + step_exec_id = metadata.get('step_exec_id', 'unknown') + + logger.info(f"[μ] [{step_exec_id}] Saving fact: category={category}, content='{content[:50]}...'") + + # Create fact in ChromaDB + fact_id = self._create_fact( + content=content, + category=category, + identities=identities, + mutable=mutable, + metadata=metadata + ) + + logger.info(f"[μ] [{step_exec_id}] ✅ Created fact {fact_id[:8]}...: category={category}, content='{content[:50]}...'") + + response = { + "status": "success", + "fact_id": fact_id, + "message": "Fact saved successfully" + } + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to save fact: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) + + async def handle_update_fact(self, msg): + """Handle update_fact requests - modify existing fact""" + try: + payload = json.loads(msg.data.decode()) + + fact_id = payload.get('fact_id') + new_content = payload.get('new_content') + identity_id = payload.get('identity_id') # Optional: validate ownership + + if not fact_id or not new_content: + raise ValueError("fact_id and new_content are required") + + metadata = payload.get('metadata', {}) + + logger.info(f"[μ] Updating fact: {fact_id} (identity: {identity_id})") + + # Update fact in ChromaDB with identity validation + success, error_msg = self._update_fact( + fact_id=fact_id, + new_content=new_content, + identity_id=identity_id, + metadata=metadata + ) + + if success: + response = { + "status": "success", + "fact_id": fact_id, + "message": "Fact updated successfully" + } + else: + response = { + "status": "error", + "error": error_msg or "Fact not found or not mutable" + } + + await msg.respond(json.dumps(response).encode()) + + except Exception as e: + logger.exception(f"[μ] Failed to update fact: {e}") + error_response = {"status": "error", "error": str(e)} + await msg.respond(json.dumps(error_response).encode()) + + # Note: start() and stop() methods are now handled by BaseService + # Custom initialization/cleanup is done in initialize_service() and cleanup_service() + + def _handle_event_wrapper(self, handler): + """Wrapper to handle JSON parsing of event data""" + async def wrapper(data): + try: + if isinstance(data, str): + payload = json.loads(data) + else: + payload = data + await handler(payload) + except Exception as e: + logger.error(f"[μ] Event handler error: {e}") + return wrapper + + +async def main(): + """Main entry point for memory service""" + memory_service = MemoryService() + + try: + await event_bus.connect() + await memory_service.start(event_bus) + + logger.info("[μ] Memory service running. 
Press Ctrl+C to stop.") + + # Keep running + while True: + await asyncio.sleep(1) + + except KeyboardInterrupt: + logger.info("[μ] Shutdown requested") + except Exception as e: + logger.exception(f"[μ] Unexpected error: {e}") + finally: + await memory_service.stop() + await event_bus.close() + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/services/memory/operations/__init__.py b/services/memory/operations/__init__.py new file mode 100644 index 0000000..dfdb8d5 --- /dev/null +++ b/services/memory/operations/__init__.py @@ -0,0 +1 @@ +# Memory operations package diff --git a/services/memory/operations/facts_ops.py b/services/memory/operations/facts_ops.py new file mode 100644 index 0000000..5949265 --- /dev/null +++ b/services/memory/operations/facts_ops.py @@ -0,0 +1,226 @@ +""" +Facts memory operations. + +Provides query and CRUD operations for factual memory (ChromaDB). +""" +import json +from uuid import uuid4 +from datetime import datetime +from typing import List, Dict, Any, Optional, Tuple +from core.logger import setup_logger + +logger = setup_logger('facts_ops', service_name='memory_service') + + +class FactsOperations: + """Handles facts queries and CRUD operations""" + + def __init__(self, chroma_store): + """ + Initialize facts operations. + + Args: + chroma_store: ChromaStore instance + """ + self.chroma_store = chroma_store + + def query( + self, + query: str = '', + limit: int = 5, + category: Optional[str] = None, + identity_id: Optional[str] = None + ) -> List[Dict[str, Any]]: + """ + Query facts from ChromaDB with optional category filtering. + + Args: + query: Semantic search query (empty = get all matching filters) + limit: Maximum number of facts to return + category: Filter by fact category + identity_id: Filter by identity ID + + Returns: + List of fact dictionaries with metadata + """ + try: + collection = self.chroma_store.get_facts_collection() + + # Build metadata filters + where_filters = {} + if category: + where_filters["category"] = category + if identity_id: + where_filters["identity_id"] = identity_id + + # Query ChromaDB + if query: + # Semantic search with query text + results = collection.query( + query_texts=[query], + n_results=limit, + where=where_filters if where_filters else None + ) + else: + # Get all facts matching filters + results = collection.get( + limit=limit, + where=where_filters if where_filters else None + ) + + # Format results + facts = [] + if query and results['ids']: + # Query results + for i, doc_id in enumerate(results['ids'][0]): + metadata = results['metadatas'][0][i] if results['metadatas'] else {} + # Parse identities from JSON string + identities_str = metadata.get('identities', '[]') + identities = json.loads(identities_str) if isinstance(identities_str, str) else identities_str + facts.append({ + "id": doc_id, + "content": results['documents'][0][i], + "category": metadata.get('category', 'general'), + "mutable": metadata.get('mutable', True), + "identities": identities, + "metadata": metadata, + "distance": results['distances'][0][i] if results['distances'] else None, + "source": "facts" + }) + elif not query and results['ids']: + # Get results (no query) + for i, doc_id in enumerate(results['ids']): + metadata = results['metadatas'][i] if results['metadatas'] else {} + # Parse identities from JSON string + identities_str = metadata.get('identities', '[]') + identities = json.loads(identities_str) if isinstance(identities_str, str) else identities_str + facts.append({ + "id": 
doc_id, + "content": results['documents'][i], + "category": metadata.get('category', 'general'), + "mutable": metadata.get('mutable', True), + "identities": identities, + "metadata": metadata, + "source": "facts" + }) + + logger.debug(f"[μ] Retrieved {len(facts)} facts (query='{query}', category={category}, limit={limit})") + return facts + + except Exception as e: + logger.error(f"[μ] Failed to query facts: {e}") + return [] + + def create( + self, + content: str, + category: str = 'general', + identities: Optional[List[str]] = None, + mutable: bool = True, + metadata: Optional[Dict[str, Any]] = None + ) -> str: + """ + Create a new fact in the ChromaDB facts collection. + + Args: + content: Fact content + category: Fact category + identities: List of identity IDs associated with this fact + mutable: Whether the fact can be updated + metadata: Additional metadata + + Returns: + Created fact ID + """ + fact_id = str(uuid4()) + identities = identities or [] + metadata = metadata or {} + + # Prepare metadata (ChromaDB only accepts scalar types, not lists) + fact_metadata = { + "category": category, + "mutable": mutable, + "identities": json.dumps(identities), # Convert list to JSON string + "created_at": datetime.utcnow().isoformat(), + **metadata + } + + # Add to ChromaDB (will automatically generate embedding) + collection = self.chroma_store.get_facts_collection() + collection.add( + ids=[fact_id], + documents=[content], + metadatas=[fact_metadata] + ) + + logger.info(f"[μ] Created fact {fact_id}: category={category}, content='{content[:60]}...'") + return fact_id + + def update( + self, + fact_id: str, + new_content: str, + identity_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None + ) -> Tuple[bool, str]: + """ + Update an existing fact in ChromaDB if it's mutable and owned by identity.
+ + Args: + fact_id: ID of fact to update + new_content: New content for the fact + identity_id: Identity ID for ownership validation (optional) + metadata: Additional metadata to update + + Returns: + (success, error_message) - error_message is an empty string on success + """ + try: + collection = self.chroma_store.get_facts_collection() + + # Get the fact to check if it exists and is mutable + result = collection.get(ids=[fact_id]) + + if not result['ids']: + logger.warning(f"[μ] Fact {fact_id} not found") + return False, "Fact not found" + + fact_metadata = result['metadatas'][0] + + # Check if fact is mutable + if not fact_metadata.get('mutable', True): + logger.warning(f"[μ] Fact {fact_id} is not mutable") + return False, "Fact is not mutable" + + # Validate identity ownership if identity_id provided + if identity_id: + fact_identities = fact_metadata.get('identities', []) + if isinstance(fact_identities, str): + try: + fact_identities = json.loads(fact_identities) + except json.JSONDecodeError: + # Legacy entries may store a bare identity string + fact_identities = [fact_identities] + + if identity_id not in fact_identities: + logger.warning(f"[μ] Identity {identity_id} does not own fact {fact_id}") + return False, f"Fact does not belong to identity {identity_id}" + + # Update metadata + updated_metadata = dict(fact_metadata) + updated_metadata['updated_at'] = datetime.utcnow().isoformat() + if metadata: + updated_metadata.update(metadata) + + # Update in ChromaDB + collection.update( + ids=[fact_id], + documents=[new_content], + metadatas=[updated_metadata] + ) + + logger.info(f"[μ] Updated fact {fact_id}") + return True, "" + + except Exception as e: + logger.error(f"[μ] Failed to update fact {fact_id}: {e}") + return False, str(e)
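A usage sketch of the FactsOperations surface above, assuming a ChromaStore that has already been connected (see chroma_store.py later in this diff); the fact values are illustrative. Note that query(identity_id=...) filters on a scalar identity_id metadata field, while create() stores a JSON identities list, so that filter only lines up with facts written with an identity_id field.

    store = ChromaStore()
    store.connect()
    ops = FactsOperations(store)

    # Create, update with ownership validation, then search
    fact_id = ops.create(
        content="Alex prefers dark roast coffee",
        category="preferences",
        identities=["alex"],
    )
    ok, err = ops.update(fact_id, "Alex prefers medium roast coffee", identity_id="alex")
    matches = ops.query(query="coffee", category="preferences", limit=3)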
diff --git a/services/memory/operations/long_term_ops.py b/services/memory/operations/long_term_ops.py new file mode 100644 index 0000000..f46214c --- /dev/null +++ b/services/memory/operations/long_term_ops.py @@ -0,0 +1,102 @@ +""" +Long-term memory operations. + +Provides query operations for long-term summarized memories (ChromaDB). +""" +from typing import List, Dict, Any, Optional +from core.logger import setup_logger + +logger = setup_logger('long_term_ops', service_name='memory_service') + + +class LongTermOperations: + """Handles long-term memory queries and operations""" + + def __init__(self, chroma_store): + """ + Initialize long-term operations. + + Args: + chroma_store: ChromaStore instance + """ + self.chroma_store = chroma_store + + def query( + self, + query: Optional[str] = None, + limit: int = 5, + identity_id: Optional[str] = None, + min_summary_level: Optional[int] = None, + max_summary_level: Optional[int] = None + ) -> List[Dict[str, Any]]: + """ + Query long-term memory from ChromaDB (summarized memories). + + Args: + query: Optional semantic search query (None = return entries matching the filters, unranked) + limit: Maximum number of memories to return + identity_id: Filter by identity ID + min_summary_level: Minimum summary level filter + max_summary_level: Maximum summary level filter + + Returns: + List of memory dictionaries, with distances when a query is given + """ + try: + collection = self.chroma_store.get_long_term_collection() + + # Build metadata filters (ChromaDB accepts only one top-level + # operator, so multiple conditions are combined with $and; a + # $gte/$lte range likewise becomes two $and-ed expressions) + conditions = [] + if identity_id: + conditions.append({"identity_id": identity_id}) + if min_summary_level is not None: + conditions.append({"summary_level": {"$gte": min_summary_level}}) + if max_summary_level is not None: + conditions.append({"summary_level": {"$lte": max_summary_level}}) + if len(conditions) > 1: + where_filters = {"$and": conditions} + elif conditions: + where_filters = conditions[0] + else: + where_filters = None + + # Query ChromaDB + if query: + # Semantic search with query text + results = collection.query( + query_texts=[query], + n_results=limit, + where=where_filters + ) + else: + # No query text: return entries matching the filters + # (insertion order, not a true random sample) + results = collection.get( + limit=limit, + where=where_filters + ) + + # Format results + memories = [] + if query and results['ids']: + # Query results + for i, doc_id in enumerate(results['ids'][0]): + memories.append({ + "id": doc_id, + "content": results['documents'][0][i], + "metadata": results['metadatas'][0][i] if results['metadatas'] else {}, + "distance": results['distances'][0][i] if results['distances'] else None, + "source": "long_term" + }) + elif not query and results['ids']: + # Get results (no query) + for i, doc_id in enumerate(results['ids']): + memories.append({ + "id": doc_id, + "content": results['documents'][i], + "metadata": results['metadatas'][i] if results['metadatas'] else {}, + "source": "long_term" + }) + + logger.debug(f"[μ] Retrieved {len(memories)} long-term memories (query='{query}', limit={limit})") + return memories + + except Exception as e: + logger.error(f"[μ] Failed to query long-term memory: {e}") + return []
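For illustration, a call that combines all three filters, such as lt.query(identity_id="alex", min_summary_level=1, max_summary_level=3) (identity value illustrative), now produces the single ChromaDB where clause {"$and": [{"identity_id": "alex"}, {"summary_level": {"$gte": 1}}, {"summary_level": {"$lte": 3}}]}:

    lt = LongTermOperations(store)  # the same connected ChromaStore as before
    summaries = lt.query(
        query="trip to the coast",  # semantic search text, illustrative
        identity_id="alex",
        min_summary_level=1,
        max_summary_level=3,
        limit=5,
    )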
diff --git a/services/memory/operations/short_term_ops.py b/services/memory/operations/short_term_ops.py new file mode 100644 index 0000000..7261054 --- /dev/null +++ b/services/memory/operations/short_term_ops.py @@ -0,0 +1,87 @@ +""" +Short-term memory operations. + +Provides query operations for short-term literal memory (SQLite). +""" +import json +from typing import List, Dict, Any, Optional +from core.logger import setup_logger + +logger = setup_logger('short_term_ops', service_name='memory_service') + + +class ShortTermOperations: + """Handles short-term memory queries and operations""" + + def __init__(self, sqlite_store): + """ + Initialize short-term operations. + + Args: + sqlite_store: SQLiteStore instance + """ + self.sqlite_store = sqlite_store + + def query( + self, + limit: int = 10, + offset: int = 0, + identity_id: Optional[str] = None, + interaction_id: Optional[str] = None + ) -> List[Dict[str, Any]]: + """ + Query short-term memory from SQLite in reverse-chronological order (newest first). + + Args: + limit: Maximum number of memories to return + offset: Number of memories to skip + identity_id: Filter by identity ID + interaction_id: Filter by interaction ID + + Returns: + List of memory dictionaries with metadata + """ + conn = self.sqlite_store.get_connection() + cursor = conn.cursor() + + conditions = [] + params = [] + + if identity_id: + # Substring match against the JSON-encoded identities list; + # may over-match if one identity ID is a substring of another + conditions.append("identities LIKE ?") + params.append(f"%{identity_id}%") + + if interaction_id: + conditions.append("interaction_id = ?") + params.append(interaction_id) + + where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else "" + + query = f""" + SELECT id, timestamp, content, identities, interaction_id, modality, metadata + FROM short_term_memory + {where_clause} + ORDER BY timestamp DESC + LIMIT ? OFFSET ? + """ + + params.extend([limit, offset]) + cursor.execute(query, params) + + memories = [] + for row in cursor.fetchall(): + mem_id, timestamp, content, identities_str, ixn_id, modality, metadata_str = row + + memories.append({ + "id": mem_id, + "timestamp": timestamp, + "content": content, + "identities": json.loads(identities_str) if identities_str else [], + "interaction_id": ixn_id, + "modality": modality, + "metadata": json.loads(metadata_str) if metadata_str else {}, + "source": "short_term" + }) + + logger.debug(f"[μ] Retrieved {len(memories)} short-term memories (limit={limit}, offset={offset})") + return memories
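A paging sketch for the short-term layer, assuming a connected SQLiteStore (defined later in this diff); the interaction ID is illustrative:

    sq = SQLiteStore()
    sq.connect()
    st = ShortTermOperations(sq)

    page1 = st.query(limit=10, interaction_id="ixn-123")             # newest 10 entries
    page2 = st.query(limit=10, offset=10, interaction_id="ixn-123")  # the next 10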
diff --git a/services/memory/requirements.txt b/services/memory/requirements.txt new file mode 100644 index 0000000..9148f90 --- /dev/null +++ b/services/memory/requirements.txt @@ -0,0 +1,3 @@ +sentence-transformers>=2.2.0 +numpy>=1.21.0 +chromadb>=0.4.0 \ No newline at end of file diff --git a/services/memory/storage/__init__.py b/services/memory/storage/__init__.py new file mode 100644 index 0000000..2f9dfaf --- /dev/null +++ b/services/memory/storage/__init__.py @@ -0,0 +1 @@ +# Memory storage package diff --git a/services/memory/storage/chroma_store.py b/services/memory/storage/chroma_store.py new file mode 100644 index 0000000..6fe77f9 --- /dev/null +++ b/services/memory/storage/chroma_store.py @@ -0,0 +1,93 @@ +""" +ChromaDB storage backend for long-term memory and facts. + +Provides initialization and collection management for ChromaDB. +""" +from pathlib import Path +from typing import Optional +import chromadb +from chromadb.config import Settings +from core.config import SHORT_TERM_DB +from core.logger import setup_logger + +logger = setup_logger('chroma_store', service_name='memory_service') + + +class ChromaStore: + """ChromaDB storage backend for long-term memories and facts""" + + def __init__(self, chroma_path: Optional[str] = None): + """ + Initialize ChromaDB store. + + Args: + chroma_path: Path to ChromaDB persistent storage (defaults to chroma_db next to SHORT_TERM_DB) + """ + if chroma_path is None: + default_path = Path(SHORT_TERM_DB).parent / "chroma_db" + self.chroma_path = str(default_path) + else: + self.chroma_path = chroma_path + + self.client = None + self.long_term_collection = None + self.facts_collection = None + + def connect(self): + """Initialize persistent ChromaDB client and collections""" + try: + # Create chroma directory if it doesn't exist + Path(self.chroma_path).mkdir(parents=True, exist_ok=True) + + # Initialize persistent ChromaDB client + self.client = chromadb.PersistentClient( + path=self.chroma_path, + settings=Settings(anonymized_telemetry=False) + ) + + # Create or get long-term memories collection (no embedding_function + # is supplied, so Chroma's default embedding function is used) + self.long_term_collection = self.client.get_or_create_collection( + name="long_term_memories", + metadata={"description": "Summarized conversation histories"} + ) + + # Create or get facts collection + self.facts_collection = self.client.get_or_create_collection( + name="facts", + metadata={"description": "Exact factual knowledge"} + ) + + logger.info(f"[μ] ChromaDB initialized at {self.chroma_path}") + logger.info(f"[μ] Long-term collection: {self.long_term_collection.count()} entries") + logger.info(f"[μ] Facts collection: {self.facts_collection.count()} entries") + + except Exception as e: + logger.error(f"[μ] Failed to initialize ChromaDB: {e}") + raise + + def get_long_term_collection(self): + """ + Get the long-term memories collection. + + Returns: + ChromaDB collection for long-term memories + + Raises: + RuntimeError: If collections have not been initialized + """ + if self.long_term_collection is None: + raise RuntimeError("ChromaDB not connected. Call connect() first.") + return self.long_term_collection + + def get_facts_collection(self): + """ + Get the facts collection. + + Returns: + ChromaDB collection for facts + + Raises: + RuntimeError: If collections have not been initialized + """ + if self.facts_collection is None: + raise RuntimeError("ChromaDB not connected. Call connect() first.") + return self.facts_collection
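The accessor guards above fail fast when connect() has not been called, which the following sketch demonstrates:

    store = ChromaStore()
    try:
        store.get_facts_collection()
    except RuntimeError as e:
        print(e)  # "ChromaDB not connected. Call connect() first."

    store.connect()
    facts = store.get_facts_collection()  # now returns the collection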
diff --git a/services/memory/storage/migrations.py b/services/memory/storage/migrations.py new file mode 100644 index 0000000..1d7a944 --- /dev/null +++ b/services/memory/storage/migrations.py @@ -0,0 +1,47 @@ +""" +Database migration utilities for memory service. + +Handles archiving old database schemas during upgrades. +""" +import sqlite3 +import shutil +from datetime import datetime +from pathlib import Path +from core.logger import setup_logger + +logger = setup_logger('migrations', service_name='memory_service') + + +def archive_old_database(db_path: Path) -> None: + """ + Archive the old database if it exists (one-time migration). + + Checks if the database uses the old 'memory' table schema and archives it + if found, allowing the service to start with a fresh schema. + + Args: + db_path: Path to the database file + """ + if not db_path.exists(): + logger.debug(f"[μ] No existing database found at {db_path}") + return + + try: + # Check for the old schema; close the connection even if inspection fails + conn = sqlite3.connect(str(db_path)) + try: + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='memory'") + result = cursor.fetchone() + finally: + conn.close() + + if result: + # Old database exists, archive it + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + archive_path = db_path.parent / f"{db_path.stem}_archive_{timestamp}{db_path.suffix}" + shutil.move(str(db_path), str(archive_path)) + logger.info(f"[μ] Archived old database to {archive_path}") + else: + logger.debug("[μ] Database already uses new schema, no archive needed") + + except Exception as e: + logger.warning(f"[μ] Could not check/archive old database: {e}")
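The intended call order, as a sketch: archive any old-schema database before the SQLite store (next file in this diff) initializes the new schema. The path is illustrative; the service itself uses SHORT_TERM_DB.

    from pathlib import Path

    db_path = Path("/data/short_term.db")  # illustrative path
    archive_old_database(db_path)          # moves an old 'memory'-table DB aside, if present

    store = SQLiteStore(str(db_path))
    store.connect()                        # creates the new short_term_memory schema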
diff --git a/services/memory/storage/sqlite_store.py b/services/memory/storage/sqlite_store.py new file mode 100644 index 0000000..8246a4f --- /dev/null +++ b/services/memory/storage/sqlite_store.py @@ -0,0 +1,97 @@ +""" +SQLite storage backend for short-term memory. + +Provides initialization and connection management for SQLite database. +""" +import sqlite3 +from typing import Optional +from core.config import SHORT_TERM_DB +from core.logger import setup_logger + +logger = setup_logger('sqlite_store', service_name='memory_service') + + +class SQLiteStore: + """SQLite storage backend for short-term memory""" + + def __init__(self, db_path: Optional[str] = None): + """ + Initialize SQLite store. + + Args: + db_path: Path to SQLite database file (defaults to SHORT_TERM_DB config) + """ + self.db_path = db_path or str(SHORT_TERM_DB) + self.conn = None + + def connect(self) -> sqlite3.Connection: + """ + Connect to SQLite database and initialize schema. + + Returns: + SQLite connection object + """ + self.conn = sqlite3.connect(self.db_path) + self._init_schema() + logger.info(f"[μ] SQLite connected: {self.db_path}") + return self.conn + + def _init_schema(self): + """Initialize simplified short-term SQLite database schema""" + cursor = self.conn.cursor() + + # Simplified short-term memory table (no embeddings, fast queries) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS short_term_memory ( + id TEXT PRIMARY KEY, + timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, + content TEXT NOT NULL, + identities TEXT, + interaction_id TEXT, + modality TEXT DEFAULT 'dialogue', + metadata TEXT + ) + """) + + # Index for fast chronological queries + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_timestamp + ON short_term_memory(timestamp DESC) + """) + + # Identities table (still useful for all layers) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS identities ( + id TEXT PRIMARY KEY, + display_name TEXT, + role TEXT, + intimacy REAL DEFAULT 0.0, + last_seen DATETIME, + last_spoken DATETIME, + metadata TEXT + ) + """) + + self.conn.commit() + logger.info("[μ] SQLite schema initialized") + + def get_connection(self) -> sqlite3.Connection: + """ + Get the active database connection. + + Returns: + SQLite connection object + + Raises: + RuntimeError: If connection has not been established + """ + if self.conn is None: + raise RuntimeError("SQLite connection not established. Call connect() first.") + return self.conn + + def close(self): + """Close the database connection""" + if self.conn: + self.conn.close() + self.conn = None + logger.info("[μ] SQLite connection closed") diff --git a/services/memory/utils/__init__.py b/services/memory/utils/__init__.py new file mode 100644 index 0000000..f726000 --- /dev/null +++ b/services/memory/utils/__init__.py @@ -0,0 +1 @@ +# Memory utilities package diff --git a/services/memory/utils/embeddings.py b/services/memory/utils/embeddings.py new file mode 100644 index 0000000..2ca5979 --- /dev/null +++ b/services/memory/utils/embeddings.py @@ -0,0 +1,53 @@ +""" +Embedding utilities for memory service. + +Provides text-to-vector embedding generation and similarity calculations. +""" +import numpy as np +from sentence_transformers import SentenceTransformer +from core.logger import setup_logger + +logger = setup_logger('embeddings', service_name='memory_service') + +# Sentence transformer model handle (lazily loaded on first use) +_model = None + + +def get_model() -> SentenceTransformer: + """Get or initialize the sentence transformer model""" + global _model + if _model is None: + logger.info("[μ] Loading sentence transformer model: all-MiniLM-L6-v2") + _model = SentenceTransformer('all-MiniLM-L6-v2') + logger.info("[μ] Sentence transformer model loaded successfully") + return _model + + +def generate_embedding(text: str) -> np.ndarray: + """ + Generate semantic embedding for text. + + Args: + text: Input text to embed + + Returns: + Normalized embedding vector as numpy array + """ + model = get_model() + return np.array(model.encode(text, normalize_embeddings=True)) + + +def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float: + """ + Calculate cosine similarity between two vectors. + + Args: + a: First embedding vector + b: Second embedding vector + + Returns: + Similarity score in [-1.0, 1.0] (0.0 if either vector has zero magnitude) + """ + if np.linalg.norm(a) == 0 or np.linalg.norm(b) == 0: + return 0.0 + return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))) diff --git a/services/memory/utils/serialization.py b/services/memory/utils/serialization.py new file mode 100644 index 0000000..00d2b96 --- /dev/null +++ b/services/memory/utils/serialization.py @@ -0,0 +1,32 @@ +""" +Serialization utilities for memory service. + +Provides functions to convert numpy arrays to/from bytes for database storage. +""" +import numpy as np + + +def serialize_embedding(vector: np.ndarray) -> bytes: + """ + Convert numpy array to bytes for database storage. + + Args: + vector: Numpy array embedding vector + + Returns: + Serialized bytes representation (float32) + """ + return vector.astype(np.float32).tobytes() + + +def deserialize_embedding(blob: bytes) -> np.ndarray: + """ + Convert bytes back to numpy array. + + Args: + blob: Serialized embedding bytes + + Returns: + Deserialized float32 numpy array (a read-only view of the buffer; copy before mutating) + """ + return np.frombuffer(blob, dtype=np.float32)
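A round-trip sketch tying the two utility modules together:

    a = generate_embedding("the coffee was excellent")
    b = deserialize_embedding(serialize_embedding(a))

    print(cosine_similarity(a, b))  # ~1.0: the round trip preserves the vector up to float32 rounding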