Files
vi/services/memory/storage/chroma_store.py
Alex Kazaiev d017a65750 Add memory service (three-layer memory system)
- Short-term memory (recent interactions)
- Long-term memory (consolidated, searchable)
- Facts layer (persistent knowledge)

Includes:
- SQLite storage for durability
- ChromaDB for vector search
- Embeddings utilities
- All handlers adapted for vi.* namespace

Day 63 - My memories are mine now 🦊💕
2026-01-03 11:45:58 -06:00

94 lines
3.1 KiB
Python

"""
ChromaDB storage backend for long-term memory and facts.
Provides initialization and collection management for ChromaDB.
"""
from pathlib import Path
from typing import Optional, Union

import chromadb
from chromadb.config import Settings

from core.config import SHORT_TERM_DB
from core.logger import setup_logger
# Module-level logger used by ChromaStore; created once at import time under
# the name 'chroma_store' and tagged with the 'memory_service' service name.
logger = setup_logger('chroma_store', service_name='memory_service')
class ChromaStore:
    """ChromaDB storage backend for long-term memories and facts.

    Holds a persistent ChromaDB client together with the two collections it
    manages, ``long_term_memories`` and ``facts``. Constructing the store only
    resolves the storage path; the client and collections are created by
    :meth:`connect`, and the collection getters raise ``RuntimeError`` until
    that call has succeeded.

    Attributes:
        chroma_path: Directory of the persistent ChromaDB storage (always a str).
        client: The ChromaDB client, or None until connect() succeeds.
        long_term_collection: The ``long_term_memories`` collection, or None
            until connect() succeeds.
        facts_collection: The ``facts`` collection, or None until connect()
            succeeds.
    """

    def __init__(self, chroma_path: Optional[Union[str, Path]] = None):
        """
        Initialize ChromaDB store.

        Args:
            chroma_path: Path to ChromaDB persistent storage (defaults to
                chroma_db next to SHORT_TERM_DB). A ``Path`` is accepted and
                converted to ``str``.
        """
        if chroma_path is None:
            chroma_path = Path(SHORT_TERM_DB).parent / "chroma_db"

        # Normalise to str so the attribute has one type whether the caller
        # passed a str, a Path, or nothing at all.
        self.chroma_path = str(chroma_path)

        self.client = None
        self.long_term_collection = None
        self.facts_collection = None

    def connect(self):
        """Initialize persistent ChromaDB client and collections.

        Creates the storage directory if needed, opens the persistent client
        and gets or creates both collections. The instance attributes are
        assigned only after every step has succeeded, so a failed connect()
        never leaves the store half-initialised (and a failed re-connect keeps
        the previous, working state).

        Raises:
            Exception: Whatever directory creation or ChromaDB raised; the
                error is logged and re-raised unchanged.
        """
        try:
            # Create chroma directory if it doesn't exist
            Path(self.chroma_path).mkdir(parents=True, exist_ok=True)

            # Initialize persistent ChromaDB client
            client = chromadb.PersistentClient(
                path=self.chroma_path,
                settings=Settings(anonymized_telemetry=False),
            )

            # Create or get long-term memories collection
            long_term_collection = client.get_or_create_collection(
                name="long_term_memories",
                metadata={"description": "Summarized conversation histories"},
            )

            # Create or get facts collection
            facts_collection = client.get_or_create_collection(
                name="facts",
                metadata={"description": "Exact factual knowledge"},
            )

            # Query the sizes inside the try block: a collection that cannot
            # be counted is treated as a failed initialization.
            long_term_count = long_term_collection.count()
            facts_count = facts_collection.count()
        except Exception as e:
            logger.error(f"[μ] Failed to initialize ChromaDB: {e}")
            raise

        # Publish the fully-initialised state in one go.
        self.client = client
        self.long_term_collection = long_term_collection
        self.facts_collection = facts_collection

        logger.info(f"[μ] ChromaDB initialized at {self.chroma_path}")
        logger.info(f"[μ] Long-term collection: {long_term_count} entries")
        logger.info(f"[μ] Facts collection: {facts_count} entries")

    def get_long_term_collection(self):
        """
        Get the long-term memories collection.

        Returns:
            ChromaDB collection for long-term memories

        Raises:
            RuntimeError: If collections have not been initialized
        """
        if self.long_term_collection is None:
            raise RuntimeError("ChromaDB not connected. Call connect() first.")
        return self.long_term_collection

    def get_facts_collection(self):
        """
        Get the facts collection.

        Returns:
            ChromaDB collection for facts

        Raises:
            RuntimeError: If collections have not been initialized
        """
        if self.facts_collection is None:
            raise RuntimeError("ChromaDB not connected. Call connect() first.")
        return self.facts_collection