- Add think service (orchestration for iterative reasoning)
- Add service_discovery.py (service communication utilities)
- Add event_cache.py (recent event cache using NATS KV)
- Add vi_identity.py (Vi's core identity foundation)
- Update core/__init__.py with new exports
Think service adapted from Lyra with vi.* namespace:
- All NATS topics use vi.* prefix
- Uses vi_identity for personality/voice
- Bucket names use vi-* prefix
Day 63 - Building my nervous system 🦊
2236 lines
96 KiB
Plaintext
2236 lines
96 KiB
Plaintext
import asyncio
|
|
import json
|
|
from typing import Dict, Any, Optional, List
|
|
from datetime import datetime
|
|
from uuid import uuid4
|
|
from dataclasses import dataclass, asdict
|
|
from enum import Enum
|
|
|
|
from core.logger import setup_logger
|
|
from core.nats_event_bus import nats_bus as event_bus
|
|
from core.base_service import BaseService
|
|
from core.service_discovery import discovery_client
|
|
from core.service_registry import ServiceManifest, ServiceOperation
|
|
from core.lyra_identity import get_identity_for_context, get_identity_for_synthesis
|
|
|
|
# Module-level logger; iterative_response and the step helpers log through
# this, while most ThinkService methods use self.logger from BaseService.
logger = setup_logger('think_service', service_name='think_service')
|
|
|
|
|
|
class StepAction(Enum):
    """Available reasoning step actions.

    The string values are what ReasoningStep.action stores.
    """

    # Invoke another service (target names the service)
    CALL_SERVICE = "call_service"
    # Examine data already gathered in the context
    ANALYZE_DATA = "analyze_data"
    # Terminal action: produce the final answer
    SYNTHESIZE_FINAL = "synthesize_final_response"
    # Check whether the goal is already met
    CHECK_GOAL_SATISFACTION = "check_goal_satisfaction"
|
|
|
|
|
|
@dataclass
class ReasoningStep:
    """A single step in the iterative reasoning process."""

    action: str  # StepAction value
    target: Optional[str] = None  # Service name or data target
    reasoning: str = ""  # Why this step is needed
    ready: bool = False  # Terminal signal for synthesis

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, preserving field order."""
        return {
            "action": self.action,
            "target": self.target,
            "reasoning": self.reasoning,
            "ready": self.ready,
        }
|
|
|
|
|
|
@dataclass
class StepResult:
    """Result of executing a reasoning step."""

    step: ReasoningStep            # The step that was executed
    success: bool                  # Whether execution succeeded
    result_data: Any = None        # Raw payload produced by the step
    error_message: str = ""        # Populated on failure
    execution_time_ms: float = 0   # Wall-clock duration of the step
    timestamp: str = ""            # ISO-8601 UTC; auto-filled when empty

    def __post_init__(self):
        # Stamp creation time when the caller did not supply one.
        self.timestamp = self.timestamp or datetime.utcnow().isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Serialize recursively, delegating the nested step to its own to_dict."""
        serialized = asdict(self)
        serialized['step'] = self.step.to_dict()
        return serialized
|
|
|
|
|
|
class IterativeContext:
    """Rich context for iterative reasoning process.

    Tracks executed steps, accumulated knowledge, per-service call counts
    and stop-condition state for a single iterative reasoning run.
    """

    def __init__(self, original_message: str, identity: str, channel: str, modality: str):
        self.original_message = original_message
        self.identity = identity
        self.channel = channel
        self.modality = modality
        self.start_time = datetime.utcnow()

        # Tracking
        self.completed_steps: List[StepResult] = []
        self.failed_steps: List[StepResult] = []
        self.accumulated_knowledge: Dict[str, Any] = {}
        self.service_call_counts: Dict[str, int] = {}
        self.step_count = 0

        # State flags
        self.goal_satisfied = False
        self.force_synthesis = False
        self.max_steps_reached = False
        self.timeout_reached = False

    def add_step_result(self, step_result: StepResult):
        """Record a finished step; successful steps feed accumulated_knowledge."""
        self.step_count += 1

        if not step_result.success:
            self.failed_steps.append(step_result)
            return

        self.completed_steps.append(step_result)

        # Track service calls (used by the per-service stop criterion)
        if step_result.step.action == StepAction.CALL_SERVICE.value:
            service = step_result.step.target
            self.service_call_counts[service] = self.service_call_counts.get(service, 0) + 1

        # Accumulate knowledge from successful steps
        if step_result.result_data:
            step_key = f"step_{self.step_count}_{step_result.step.action}"
            self.accumulated_knowledge[step_key] = step_result.result_data

    def get_recent_steps(self, window: int = 3) -> List[StepResult]:
        """Return the last `window` completed steps.

        Fix: the original guarded the slice with a length check and returned
        the internal list itself when fewer than `window` steps existed; the
        negative slice already handles that case and consistently returns a
        new list, so callers can never mutate internal state by accident.
        """
        return self.completed_steps[-window:]

    def has_new_information_in_recent_steps(self, window: int = 3) -> bool:
        """Heuristic convergence check over the last `window` completed steps."""
        recent_steps = self.get_recent_steps(window)

        if len(recent_steps) < window:
            return True  # Not enough steps to judge

        # Simple heuristic: check if recent steps have substantially different result data
        recent_data_sizes = [len(str(step.result_data)) if step.result_data else 0 for step in recent_steps]

        # If all recent steps produced very little data, might be converging
        if all(size < 100 for size in recent_data_sizes):
            return False

        # Check for diversity in step types
        recent_actions = [step.step.action for step in recent_steps]
        if len(set(recent_actions)) == 1:  # All same action type
            return False

        return True

    def should_stop(self, max_steps: int = 10, max_service_calls: int = 3, max_time_minutes: int = 2) -> tuple[bool, str]:
        """Evaluate all stopping criteria.

        Returns:
            (True, reason) when iteration should end, else (False, "").
        """
        # Hard limits
        if self.step_count >= max_steps:
            self.max_steps_reached = True
            return True, f"Maximum steps reached ({max_steps})"

        elapsed_minutes = (datetime.utcnow() - self.start_time).total_seconds() / 60
        if elapsed_minutes >= max_time_minutes:
            self.timeout_reached = True
            return True, f"Maximum time reached ({max_time_minutes} minutes)"

        # Service call limits
        for service, count in self.service_call_counts.items():
            if count >= max_service_calls:
                return True, f"Too many calls to {service} service ({count})"

        # Goal satisfaction
        if self.goal_satisfied:
            return True, "Goal satisfaction confirmed"

        # Force synthesis
        if self.force_synthesis:
            return True, "Forced synthesis due to convergence"

        # Convergence detection (6-step warm-up, 3-step window)
        if self.step_count >= 6 and not self.has_new_information_in_recent_steps(3):
            self.force_synthesis = True
            return True, "No new information in recent steps - converged"

        return False, ""

    def to_oracle_context(self) -> Dict[str, Any]:
        """Prepare a trimmed context dict for Oracle decision making."""
        return {
            "original_message": self.original_message,
            "step_count": self.step_count,
            "completed_steps": [step.to_dict() for step in self.completed_steps[-5:]],  # Last 5 steps
            "accumulated_knowledge": self.accumulated_knowledge,
            "service_call_counts": self.service_call_counts,
            "failed_steps": [step.to_dict() for step in self.failed_steps[-3:]],  # Last 3 failures
            "goal_satisfied": self.goal_satisfied,
        }

    def get_summary(self) -> str:
        """Get a human-readable summary of the reasoning process."""
        elapsed_time = (datetime.utcnow() - self.start_time).total_seconds()

        summary = f"**Reasoning Summary**\n"
        summary += f"• Steps completed: {len(self.completed_steps)}\n"
        summary += f"• Failed steps: {len(self.failed_steps)}\n"
        summary += f"• Service calls: {dict(self.service_call_counts)}\n"
        summary += f"• Elapsed time: {elapsed_time:.1f}s\n"

        if self.completed_steps:
            summary += f"• Recent actions: {', '.join([s.step.action for s in self.completed_steps[-3:]])}\n"

        return summary
|
|
|
|
|
|
class ThinkService(BaseService):
|
|
def __init__(self):
    super().__init__('think')
    # Monotonic counter feeding generate_interaction_id.
    self._interaction_counter = 0
    # Current interaction context used for stage routing.
    self._current_context = {}
    # Think orchestrates rather than monitors other services, so it only
    # emits its own heartbeat, and less frequently.
    self.heartbeat_interval = 60
|
|
|
|
def get_service_manifest(self) -> ServiceManifest:
    """Return service manifest with operations and metadata."""
    # NOTE(review): the surrounding commit notes claim a vi.* namespace, but
    # the health topic below still uses the lyra.* prefix — confirm intended.
    return ServiceManifest(
        service_id=self.service_id,
        name="Think Service",
        description="Orchestration service for iterative reasoning and multi-service coordination",
        version="2.0.0",
        operations=[
            self.create_service_operation(
                "communication",
                "Handle communication requests from drive service",
                timeout_ms=10000,
            ),
            self.create_service_operation(
                "process",
                "Process external input with iterative reasoning",
                timeout_ms=120000,  # 2 minutes for complex reasoning
            ),
        ],
        dependencies=[],  # Other services are discovered dynamically via service discovery
        health_check_topic=f"lyra.services.{self.service_id}.health",
        heartbeat_interval=60,  # Less frequent heartbeats for orchestration service
        metadata={
            "max_reasoning_steps": 10,
            "max_reasoning_time_minutes": 2,
            "urgency": 0.9,
            "monitors_other_services": False,  # Think doesn't monitor other services
        },
    )
|
|
|
|
async def initialize_service(self):
    """Initialize service-specific resources and register handlers."""
    # Wire the shared discovery client to this service's event bus.
    discovery_client.set_event_bus(self.event_bus)

    # New-style topic handlers.
    await self.register_handler("communication", self.handle_communication_request)
    await self.register_handler("process", self.handle_external_input)

    # Legacy topic handlers kept for backward compatibility.
    legacy_routes = [
        ("lyra.external.input", self.handle_external_input),
        ("lyra.communication.request", self.handle_communication_request),
    ]
    for topic, handler in legacy_routes:
        await self.event_bus.on(topic, self._handle_event_wrapper(handler))

    self.logger.info("[💭] ThinkService initialized with service discovery")
|
|
|
|
async def cleanup_service(self):
    """Cleanup service-specific resources (legacy topic subscriptions)."""
    for topic in ("lyra.external.input", "lyra.communication.request"):
        await self.event_bus.off(topic)

    self.logger.info("[💭] ThinkService cleanup completed")
|
|
|
|
async def perform_health_check(self) -> Dict[str, Any]:
    """Perform service-specific health check.

    Returns:
        Dict with an overall 'healthy' flag plus per-check booleans.
        Oracle reachability is treated as critical: if the oracle service
        cannot be discovered, the service reports unhealthy. Memory is
        probed as well but is informational only.
    """
    health_data = {
        'healthy': True,
        'checks': {
            'running': self._running,
            'event_bus': self.event_bus is not None,
            'discovery_client': discovery_client.event_bus is not None,
            'interaction_counter': self._interaction_counter
        }
    }

    # Probe critical dependencies. Fix: the original used bare `except:`
    # clauses, which also swallow SystemExit/KeyboardInterrupt and (on
    # modern asyncio) CancelledError; `except Exception` lets those
    # propagate while still treating any discovery failure as unavailable.
    for service in ("oracle", "memory"):
        try:
            found = await discovery_client.discover_service(service)
            health_data['checks'][f'{service}_available'] = found is not None
        except Exception:
            health_data['checks'][f'{service}_available'] = False

    # Mark unhealthy if critical services unavailable
    if not health_data['checks']['oracle_available']:
        health_data['healthy'] = False

    return health_data
|
|
|
|
def generate_interaction_id(self, identity: str, modality: str) -> str:
    """Generate a unique interaction ID.

    Joins identity, modality, a UTC timestamp and a per-process counter
    that increments on every call.
    """
    self._interaction_counter += 1
    stamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    return "_".join([identity, modality, stamp, str(self._interaction_counter)])
|
|
|
|
def _format_knowledge_naturally(self, context: IterativeContext) -> str:
    """Format accumulated knowledge as natural language instead of JSON.

    Renders context.completed_steps and context.accumulated_knowledge into
    a prose summary that is embedded in prompts. Each dict payload is
    matched against known response shapes, in order: short-term memory,
    long-term memory, facts, save_fact, update_fact, legacy memories
    (no 'type' key), DuckDuckGo search; anything unrecognized is dumped
    field-by-field. The elif ordering is significant — earlier, more
    specific shapes must win over the generic fallbacks.

    Args:
        context: The current iterative reasoning context.

    Returns:
        A newline-joined human-readable summary string.
    """
    if not context.accumulated_knowledge and not context.completed_steps:
        return "No information gathered yet."

    parts = []

    # Describe what actions were taken
    if context.completed_steps:
        actions_taken = []
        for step in context.completed_steps:
            if step.step.action == StepAction.CALL_SERVICE.value and step.step.target:
                actions_taken.append(f"consulted {step.step.target}")

        if actions_taken:
            parts.append(f"You have {', '.join(actions_taken)}.")

    # Present the knowledge gathered in prose
    if context.accumulated_knowledge:
        parts.append("\nInformation gathered:")
        for key, data in context.accumulated_knowledge.items():
            # Format based on what type of data it is
            if isinstance(data, dict):
                # Short-term memory response
                if 'memories' in data and data.get('type') == 'short_term':
                    success = data.get('success', True)  # Default to True for backward compatibility

                    if not success:
                        # Failed to retrieve
                        error = data.get('error', 'Unknown error')
                        parts.append(f" ⚠️ Failed to retrieve recent memories: {error}")
                    else:
                        memories = data['memories']
                        count = data.get('count', len(memories))
                        offset = data.get('offset', 0)

                        if count > 0:
                            label = f"Recent literal memories ({count} messages"
                            if offset > 0:
                                label += f", starting from {offset} back"
                            label += "):"
                            parts.append(f"\n {label}")
                            for i, msg in enumerate(memories, 1):
                                if isinstance(msg, dict):
                                    content = msg.get('content', '')
                                    timestamp = msg.get('timestamp', '')
                                    parts.append(f" {i}. [{timestamp}] {content}")
                                else:
                                    parts.append(f" {i}. {msg}")
                        else:
                            parts.append(f" • No recent memories found (successfully queried, but empty)")

                # Long-term memory response
                elif 'memories' in data and data.get('type') == 'long_term':
                    success = data.get('success', True)  # Default to True for backward compatibility

                    if not success:
                        # Failed to retrieve
                        error = data.get('error', 'Unknown error')
                        parts.append(f" ⚠️ Failed to retrieve long-term memories: {error}")
                    else:
                        memories = data['memories']
                        count = data.get('count', len(memories))
                        query = data.get('query')

                        if count > 0:
                            label = f"Historical context ({count} summaries"
                            if query:
                                label += f" about '{query}'"
                            label += "):"
                            parts.append(f"\n {label}")
                            for i, mem in enumerate(memories, 1):
                                if isinstance(mem, dict):
                                    content = mem.get('content', '')
                                    parts.append(f" {i}. {content}")
                                else:
                                    parts.append(f" {i}. {mem}")
                        else:
                            query_text = f" about '{query}'" if query else ""
                            parts.append(f" • No historical context found{query_text} (successfully queried, but empty)")

                # Facts response
                elif 'facts' in data:
                    success = data.get('success', True)  # Default to True for backward compatibility

                    if not success:
                        # Failed to retrieve
                        error = data.get('error', 'Unknown error')
                        parts.append(f" ⚠️ Failed to retrieve facts: {error}")
                    else:
                        facts = data['facts']
                        count = data.get('count', len(facts))
                        query = data.get('query', '')

                        if count > 0:
                            parts.append(f"\n Known facts ({count}):")
                            for i, fact in enumerate(facts, 1):
                                if isinstance(fact, dict):
                                    content = fact.get('content', '')
                                    category = fact.get('category', 'general')
                                    mutable = fact.get('mutable', True)
                                    fact_id = fact.get('id', '')
                                    mut_marker = " [mutable]" if mutable else " [immutable]"
                                    parts.append(f" {i}. [{category}]{mut_marker} {content}")
                                    if fact_id:
                                        parts.append(f" (id: {fact_id})")
                                else:
                                    parts.append(f" {i}. {fact}")
                        else:
                            query_text = f" about '{query}'" if query else ""
                            parts.append(f" • No facts found{query_text} (successfully queried, but empty)")

                # save_fact response (has both 'status' and 'fact_id')
                elif 'status' in data and 'fact_id' in data:
                    success = data.get('success', data.get('status') == 'success')

                    if success:
                        fact_id = data.get('fact_id', '')
                        message = data.get('message', 'Fact saved successfully')
                        parts.append(f" ✓ {message}")
                        if fact_id:
                            parts.append(f" (Fact ID: {fact_id})")
                    else:
                        error = data.get('error', 'Failed to save fact')
                        parts.append(f" ✗ {error}")

                # update_fact response ('status' + 'message' but no 'fact_id')
                elif 'status' in data and 'message' in data and 'fact_id' not in data:
                    success = data.get('success', data.get('status') == 'success')

                    if success:
                        message = data.get('message', 'Fact updated successfully')
                        parts.append(f" ✓ {message}")
                    else:
                        error = data.get('error', 'Failed to update fact')
                        parts.append(f" ✗ {error}")

                # Legacy memory format (no type marker)
                elif 'memories' in data and 'type' not in data:
                    memories = data['memories']
                    count = data.get('count', len(memories))

                    if count > 0:
                        parts.append(f"\n Conversation history ({count} messages):")
                        for i, msg in enumerate(memories, 1):
                            if isinstance(msg, dict):
                                parts.append(f" Message {i}:")
                                for field, value in msg.items():
                                    value_str = str(value)
                                    parts.append(f" • {field}: {value_str}")
                            else:
                                parts.append(f" {i}. {msg}")
                    else:
                        parts.append(f" • No conversation history")

                # DuckDuckGo search response
                elif 'query' in data and ('answer' in data or 'results' in data or 'related_topics' in data):
                    success = data.get('success', True)

                    if success:
                        query = data.get('query', '')
                        answer = data.get('answer')
                        abstract = data.get('abstract')
                        definition = data.get('definition')
                        results = data.get('results', [])
                        topics = data.get('related_topics', [])

                        parts.append(f"\n DuckDuckGo search for '{query}':")

                        if answer:
                            parts.append(f" Instant answer: {answer}")

                        if abstract:
                            parts.append(f" Abstract: {abstract}")
                            if data.get('abstract_source'):
                                parts.append(f" Source: {data.get('abstract_source')}")

                        if definition:
                            parts.append(f" Definition: {definition}")
                            if data.get('definition_source'):
                                parts.append(f" Source: {data.get('definition_source')}")

                        if topics:
                            parts.append(f" Related topics ({len(topics)}):")
                            for i, topic in enumerate(topics[:5], 1):  # Show first 5
                                parts.append(f" {i}. {topic.get('text', 'N/A')}")

                        if results:
                            parts.append(f" Results ({len(results)}):")
                            for i, result in enumerate(results[:3], 1):  # Show first 3
                                parts.append(f" {i}. {result.get('text', 'N/A')}")
                    else:
                        error = data.get('error', 'Unknown error')
                        parts.append(f" ⚠️ DuckDuckGo search failed: {error}")

                # Show ALL data for all service responses - no special cases
                else:
                    parts.append(f"\n {key}:")
                    for field, value in data.items():
                        value_str = str(value)
                        # Don't truncate - show full data
                        parts.append(f" • {field}: {value_str}")

            elif isinstance(data, str):
                # String data - include it fully without truncation
                parts.append(f" • {key}: {data}")
            else:
                # Other types - show full string representation without truncation
                data_str = str(data)
                parts.append(f" • {key}: {data_str}")

    return "\n".join(parts) if parts else "Some information was gathered."
|
|
|
|
async def resolve_identity(self, external_identity: str) -> Optional[Dict[str, Any]]:
    """Resolve external identity using identity service.

    Returns the identity service's payload on success, otherwise None.
    """
    try:
        self.logger.debug(f"[💭] Resolving identity: {external_identity}")

        res = await discovery_client.call_service(
            "identity",
            "resolve",
            {"external_identity": external_identity},
            timeout=5.0,
        )

        if res.success and res.data and not res.data.get('error'):
            self.logger.debug(f"[💭] Identity resolved: {external_identity} → {res.data.get('resolved_identity')}")
            return res.data

        self.logger.warning(f"[💭] Failed to resolve identity: {res.error or res.data}")
        return None

    except Exception as e:
        self.logger.exception(f"[💭] Error resolving identity: {e}")
        return None
|
|
|
|
async def store_memory(self, content: str, identities: list, interaction_id: str, modality: str) -> bool:
    """Store message content in memory.

    Returns True only when the memory service confirms storage.
    """
    try:
        self.logger.debug(f"[💭] Storing memory for interaction {interaction_id}")

        # Determine tags based on content patterns
        lowered = content.lower()
        tags = ["message"]
        if any(greeting in lowered for greeting in ("hello", "hi", "hey")):
            tags.append("greeting")
        if any(marker in lowered for marker in ("?", "what", "how", "why", "when", "where")):
            tags.append("question")

        res = await discovery_client.call_service(
            "memory",
            "store",
            {
                "content": content,
                "identities": identities,
                "interaction_id": interaction_id,
                "tags": tags,
                "modality": modality,
                "source": "think_service",
                "metadata": {
                    "timestamp": datetime.utcnow().isoformat(),
                    "processed_by": "think_service"
                }
            },
            timeout=5.0
        )

        if res.success and res.data and res.data.get('status') == 'stored':
            self.logger.debug(f"[💭] Memory stored successfully: {res.data.get('memory_id')}")
            return True

        self.logger.warning(f"[💭] Failed to store memory: {res.error or res.data}")
        return False

    except Exception as e:
        self.logger.exception(f"[💭] Error storing memory: {e}")
        return False
|
|
|
|
async def get_recent_memories(self, identity: str, limit: int = 10) -> list:
    """Get recent memories for Oracle context; returns [] on any failure."""
    try:
        self.logger.debug(f"[💭] Getting recent memories for {identity}")

        res = await discovery_client.call_service(
            "memory",
            "search",
            {
                "identities": [identity],
                "limit": limit,
                "requesting_identity": identity
            },
            timeout=3.0
        )

        if res.success and res.data and res.data.get("results"):
            found = res.data["results"]
            self.logger.debug(f"[💭] Retrieved {len(found)} recent memories")
            return found

        self.logger.debug(f"[💭] No memories found for {identity}")
        return []

    except Exception as e:
        self.logger.warning(f"[💭] Error getting recent memories: {e}")
        return []
|
|
|
|
# Three-layer memory system methods
|
|
|
|
async def _get_short_memory_for_context(self, identity: str, limit: int = 10) -> list:
    """
    Get recent literal memories from short-term storage.

    Use for: immediate conversation context, "what we just discussed"

    Returns [] on any failure.
    """
    try:
        self.logger.debug(f"[💭] Getting short-term memories for {identity}")

        res = await discovery_client.call_service(
            "memory",
            "short_memory",
            {"limit": limit, "identity_id": identity},
            timeout=3.0
        )

        if not (res.success and res.data and res.data.get("status") == "success"):
            self.logger.debug(f"[💭] No short-term memories found for {identity}")
            return []

        found = res.data.get("memories", [])
        self.logger.debug(f"[💭] Retrieved {len(found)} short-term memories")
        return found

    except Exception as e:
        self.logger.warning(f"[💭] Error getting short-term memories: {e}")
        return []
|
|
|
|
async def _get_long_memory_for_context(self, identity: str, query: Optional[str] = None, limit: int = 5) -> list:
    """
    Get summarized memories from long-term storage.

    Use for: historical context, "what we discussed last week"

    Args:
        identity: Identity whose memories to fetch.
        query: Optional topic filter; None is passed through to the memory
            service unfiltered. (Fix: annotation was `str = None`, which is
            wrong for a None default — it is Optional[str].)
        limit: Maximum number of summaries to request.

    Returns:
        A list of memory summaries, or [] on any failure.
    """
    try:
        self.logger.debug(f"[💭] Getting long-term memories for {identity}")

        result = await discovery_client.call_service(
            "memory",
            "long_memory",
            {
                "query": query,
                "limit": limit,
                "identity_id": identity
            },
            timeout=5.0
        )

        if result.success and result.data and result.data.get("status") == "success":
            memories = result.data.get("memories", [])
            self.logger.debug(f"[💭] Retrieved {len(memories)} long-term memories")
            return memories
        else:
            self.logger.debug(f"[💭] No long-term memories found for {identity}")
            return []

    except Exception as e:
        self.logger.warning(f"[💭] Error getting long-term memories: {e}")
        return []
|
|
|
|
async def _get_facts_for_identity(self, identity: str, category: Optional[str] = None, query: str = '', limit: int = 10) -> list:
    """
    Get facts from factual memory storage.

    Use for: user preferences, birthdays, established knowledge

    Args:
        identity: Identity whose facts to fetch.
        category: Optional category filter. (Fix: annotation was
            `str = None`; with a None default the type is Optional[str].)
        query: Free-text relevance filter; empty string means "no filter".
        limit: Maximum number of facts to request (generalized from the
            previously hard-coded 10; default preserves old behavior).

    Returns:
        A list of fact dicts, or [] on any failure.
    """
    try:
        self.logger.debug(f"[💭] Getting facts for {identity}, category={category}")

        result = await discovery_client.call_service(
            "memory",
            "facts",
            {
                "query": query,
                "limit": limit,
                "category": category,
                "identity_id": identity
            },
            timeout=3.0
        )

        if result.success and result.data and result.data.get("status") == "success":
            facts = result.data.get("facts", [])
            self.logger.debug(f"[💭] Retrieved {len(facts)} facts")
            return facts
        else:
            self.logger.debug(f"[💭] No facts found for {identity}")
            return []

    except Exception as e:
        self.logger.warning(f"[💭] Error getting facts: {e}")
        return []
|
|
|
|
async def _save_fact_from_conversation(
    self,
    content: str,
    category: str,
    identities: list,
    mutable: bool = True
) -> Optional[str]:
    """
    Save a fact to factual memory storage.

    Use when: user shares birthday, preferences, permanent knowledge

    Examples:
        - Birthday: category="personal", mutable=False
        - Preference: category="preferences", mutable=True
        - Knowledge: category="knowledge", mutable=True

    Returns the new fact's id on success, otherwise None.
    """
    try:
        self.logger.info(f"[💭] Saving fact: category={category}, content='{content[:50]}...'")

        payload = {
            "content": content,
            "category": category,
            "identities": identities,
            "mutable": mutable,
            "metadata": {
                "source": "think_service",
                "timestamp": datetime.utcnow().isoformat()
            }
        }
        res = await discovery_client.call_service(
            "memory",
            "save_fact",
            payload,
            timeout=2.0
        )

        if res.success and res.data and res.data.get("status") == "success":
            fact_id = res.data.get("fact_id")
            self.logger.info(f"[💭] Fact saved successfully: {fact_id}")
            return fact_id

        self.logger.warning(f"[💭] Failed to save fact: {res.error or res.data}")
        return None

    except Exception as e:
        self.logger.warning(f"[💭] Error saving fact: {e}")
        return None
|
|
|
|
|
|
|
|
async def iterative_response(self, user_message: str, identity: str, channel: str, modality: str) -> Optional[str]:
    """
    Generate response using iterative step-by-step reasoning.

    Oracle decides one step at a time and sophisticated stopping criteria
    ensure efficient completion.

    Loop shape: check context.should_stop(); every 3rd step run a goal
    check; otherwise ask Oracle for the next step, execute it, record the
    result, and stream progress to the channel via send_output. After the
    loop, synthesize a final answer and finalize the interaction.

    Args:
        user_message: The raw request being answered.
        identity: Resolved identity of the requester.
        channel: Output channel handle passed through to send_output.
        modality: Output modality passed through to send_output.

    Returns:
        The synthesized response string, or None on failure.
    """
    logger.info(f"[💭] 🔄 Starting iterative reasoning for: '{user_message[:50]}...'")

    # Initialize iterative context
    context = IterativeContext(user_message, identity, channel, modality)

    try:
        # Send initial status to Matrix
        await self.send_output(
            f"🔄 **Iterative Reasoning Started**\n\nAnalyzing your request step by step...",
            channel, modality
        )

        # Main iterative reasoning loop
        while True:
            # Check stopping criteria
            should_stop, stop_reason = context.should_stop()
            if should_stop:
                logger.info(f"[💭] 🛑 Stopping iteration: {stop_reason}")
                await self.send_output(
                    f"🎯 **Reasoning Complete**\n\n{stop_reason}\n\n{context.get_summary()}",
                    channel, modality
                )
                break

            # Goal satisfaction check (every 3 steps)
            if context.step_count > 0 and context.step_count % 3 == 0:
                can_answer = await self._check_goal_satisfaction(context)
                if can_answer:
                    context.goal_satisfied = True
                    logger.info(f"[💭] ✅ Goal satisfaction confirmed at step {context.step_count}")
                    # Will be caught by stopping criteria on next iteration
                    continue

            # Ask Oracle for next step
            logger.info(f"[💭] 🔮 Requesting step {context.step_count + 1} from Oracle")
            next_step = await self._request_next_step(context)
            if not next_step:
                logger.warning(f"[💭] ⚠️ Oracle failed to provide next step")
                await self.send_output(
                    f"❌ **Oracle Error**: Failed to get next step decision",
                    channel, modality
                )
                break

            # NOTE(review): ReasoningStep declares no `function_args` field;
            # presumably _request_next_step attaches it dynamically when
            # parsing Oracle output — hence the defensive getattr. Confirm.
            logger.info(f"[💭] 📋 Step {context.step_count + 1}: {next_step.action} -> {next_step.target} (args: {getattr(next_step, 'function_args', {})})")

            # Show Oracle's decision to user
            await self.send_output(
                f"🧠 **Oracle Decision**: {next_step.action}\n"
                f"**Target**: {next_step.target or 'N/A'}\n"
                f"**Reasoning**: {next_step.reasoning}",
                channel, modality
            )

            # Check if Oracle signals readiness for synthesis
            if next_step.action == StepAction.SYNTHESIZE_FINAL.value or next_step.ready:
                logger.info(f"[💭] 🎯 Oracle signals ready for synthesis")
                await self.send_output(
                    f"🎯 **Ready for Synthesis**: Oracle indicates sufficient information gathered",
                    channel, modality
                )
                break

            # Execute the step directly
            logger.info(f"[💭] ⚙️ Executing step {context.step_count + 1}")
            step_result = await self._execute_step(next_step, context)
            logger.info(f"[💭] ✅ Step {context.step_count + 1} execution completed, success={step_result.success}")

            # add_step_result increments context.step_count, so the logs
            # below refer to the just-recorded step number.
            context.add_step_result(step_result)
            logger.info(f"[💭] 📝 Step {context.step_count} result added to context (total: {len(context.completed_steps)} completed)")

            # Send detailed execution result to Matrix
            if step_result.success:
                result_summary = self._format_step_result_for_matrix(step_result)
                await self.send_output(
                    f"✅ **Step {context.step_count} Completed**: {next_step.action}\n"
                    f"**Execution Time**: {step_result.execution_time_ms:.0f}ms\n"
                    f"**Result**: {result_summary}",
                    channel, modality
                )
            else:
                await self.send_output(
                    f"❌ **Step {context.step_count} Failed**: {step_result.error_message}\n"
                    f"**Execution Time**: {step_result.execution_time_ms:.0f}ms",
                    channel, modality
                )

        # Final synthesis
        await self.send_output(
            f"🔮 **Synthesizing Final Response**\n\nCombining insights from {len(context.completed_steps)} successful steps...",
            channel, modality
        )

        final_response = await self._synthesize_final_response(context)

        if final_response:
            # Finalize interaction with sentiment/depth detection
            await self._finalize_interaction(
                identity,
                user_message,
                final_response,
                context
            )

            logger.info(f"[💭] ✅ Iterative reasoning completed successfully")
            return final_response
        else:
            logger.error(f"[💭] ⚠️ Final synthesis failed")
            await self.send_output(
                f"❌ **Synthesis Failed**\n\nI gathered information but couldn't synthesize a final response. Please try rephrasing your question.",
                channel, modality
            )
            return None

    except Exception as e:
        logger.exception(f"[💭] Error in iterative reasoning: {e}")
        await self.send_output(
            f"❌ **Iterative Reasoning Error**\n\nSomething went wrong during the reasoning process: {str(e)}",
            channel, modality
        )
        return None
|
|
|
|
async def _request_next_step(self, context: IterativeContext) -> Optional[ReasoningStep]:
    """Ask Oracle to decide the next reasoning step.

    Builds a planning prompt (Lyra identity + accumulated knowledge + the
    catalogue of callable functions), sends it to the oracle service, and
    parses Oracle's reply into a ReasoningStep.

    Args:
        context: Accumulated iterative reasoning state for this request.

    Returns:
        The next ReasoningStep to execute, or None when Oracle does not
        respond or its reply cannot be parsed into a function call.
    """
    try:
        logger.debug(f"[💭] Requesting next step from Oracle (step {context.step_count + 1})")

        # Get Lyra's identity with planning voice mode
        lyra_identity = get_identity_for_context("planning")

        # Format accumulated knowledge as natural language
        knowledge_summary = self._format_knowledge_naturally(context)

        # Ask Oracle for next step
        oracle_request = {
            "type": "iterative_reasoning",
            "content": f"""{lyra_identity}

You are engaging with {context.identity}.

CURRENT REQUEST: "{context.original_message}"

{knowledge_summary}

Choose your next action:

AVAILABLE FUNCTIONS:

Memory (Three Layers):
- short_memory(n=10) - Get the n most recent literal memories
- short_memory(n=10, offset=5) - Get n memories starting from offset back (for pagination)
- long_memory(query="topic", n=5) - Get n long-term summarized memories related to query (or random if query=None)
- facts(query="topic", n=5) - Get n most relevant facts related to query
- save_fact(content="...", category="...", mutable=True/False) - Save a new fact
  Categories: "personal" (immutable facts like birthdays), "preferences" (likes/dislikes), "knowledge" (learned info), "general"
  Set mutable=False for unchangeable facts (birthdays), mutable=True for preferences that may change
- update_fact(fact_id="uuid-123", new_content="Updated fact") - Update existing fact (only if mutable)

Information:
- identity(person="alex") - Get single person's full identity & attributes
- search_relationships(entity_type="pet", min_trust=0.7) - Query multiple entities
- health() - Check system status
- duckduckgo(query="weather in tokyo", limit=3) - Search DuckDuckGo instant answers (on-demand)

Relationships:
- introduce(name="Harvey", entity_type="pet", relationships=["family","companion"], context="Alex's dog", attributes={{"species":"dog","breed":"golden_retriever"}}) - Create new entity
- update_relationship(person="alex", trust_delta=0.0, intimacy_delta=0.15, reason="vulnerable moment") - Update relationship explicitly
- add_attribute(person="alex", key="favorite_food", value="pasta") - Remember new information
- link_identity(external_id="@someone:matrix.org", internal_id="someone", confidence=0.85) - Connect external ID to internal

Meta:
- ready() - Signal you have enough info to answer

EXAMPLES:
short_memory(n=5) // Get last 5 messages
short_memory(n=10, offset=5) // Get 10 messages starting from 5 back
long_memory(query="cooking preferences", n=3) // Find relevant historical context
facts(query="birthday", n=5) // Find birthday facts
facts(query="food", n=3) // Find food-related facts
save_fact(content="Alex's birthday is May 15th", category="personal", mutable=False) // Immutable personal fact
save_fact(content="Alex prefers Italian food", category="preferences", mutable=True) // Mutable preference
save_fact(content="Python uses duck typing", category="knowledge", mutable=True) // Learned knowledge
update_fact(fact_id="abc-123", new_content="Alex now prefers Thai food") // Update mutable preference
identity(person="alex") // Get Alex's full context
add_attribute(person="alex", key="favorite_mountain", value="Pikes Peak") // Remember preference
introduce(name="Curie", entity_type="pet", relationships=["family"], context="Alex's cat", attributes={{"species":"cat"}}) // New entity
duckduckgo(query="python list comprehension", limit=3) // Search for quick answers

STRATEGY:
- Use short_memory() for recent conversation context (what was just said)
- Use long_memory() for historical patterns and past discussions (weeks/months ago)
- Use facts() for established knowledge (birthdays, preferences, learned information)
- Save important discoverable facts with save_fact() (choose appropriate category and mutability)
- Update changed preferences with update_fact() (requires fact_id from facts() query)
- Use identity() for person details, search_relationships() for entities
- Call ready() when you have enough information to answer the user's question

NOTE: Classification (sentiment, emotions, intent) and creative tasks (writing, poetry) are handled during synthesis.

Respond with just the function call and optional reasoning:
function_name(args)
// Optional: Brief reason why""",
            "identity": context.identity,
            "context": {}  # No longer passing technical tracking data
        }

        # Send to Oracle and get response
        logger.debug(f"[💭] Sending request to Oracle for step {context.step_count + 1}")
        result = await discovery_client.call_service(
            "oracle", "process", oracle_request, timeout=30.0
        )
        oracle_response = result.data if result.success else None

        if not oracle_response or not oracle_response.get("content"):
            # Fixed: was self.logger.warning — every other call site in this
            # module uses the module-level logger; keep it consistent.
            logger.warning(f"[💭] No response from Oracle for next step")
            return None

        # Parse Oracle's function call decision
        content = oracle_response["content"].strip()
        logger.info(f"[💭] ✅ Oracle responded for step {context.step_count + 1}: {content[:100]}...")
        logger.debug(f"[💭] Full Oracle response: {content}")

        try:
            # Parse function call with arguments
            function_call_data = self._parse_function_call(content)

            if not function_call_data:
                logger.warning(f"[💭] No function call found in Oracle response")
                return None

            function_name = function_call_data['function']
            function_args = function_call_data['args']
            reasoning = function_call_data['reasoning']

            logger.info(f"[💭] 🔍 Parsed function call for step {context.step_count + 1}: {function_name}({function_args})")

            # All functions except ready() map to CALL_SERVICE
            if function_name == 'ready':
                action = StepAction.SYNTHESIZE_FINAL.value
                target = None
            else:
                action = StepAction.CALL_SERVICE.value
                target = function_name  # Use function name as target

            # Create ReasoningStep with function args stored for execution
            next_step = ReasoningStep(
                action=action,
                target=target,
                reasoning=reasoning,
                ready=(function_name == 'ready')
            )
            # Attach the parsed args dynamically; _execute_service_call reads
            # them back via getattr(step, 'function_args', {}).
            next_step.function_args = function_args

            logger.info(f"[💭] ✓ Created ReasoningStep for step {context.step_count + 1}: {function_name}({function_args}) -> {action}")
            return next_step

        except Exception as e:
            logger.error(f"[💭] Error parsing function call: {e}")
            return None

    except Exception as e:
        logger.error(f"[💭] Error requesting next step: {e}")
        return None
|
|
|
|
|
|
def _format_step_result_for_matrix(self, step_result: StepResult) -> str:
    """Format a step execution result into a short human-readable line for Matrix.

    Args:
        step_result: The completed step whose ``result_data`` is summarized.

    Returns:
        A one-line summary string. Unrecognized step/service types fall back
        to generic formatting; any formatting error yields an error string.
    """
    try:
        if not step_result.result_data:
            return "No data returned"

        result_data = step_result.result_data

        # Format based on step type
        if step_result.step.action == StepAction.CALL_SERVICE.value:
            service = step_result.step.target

            if service == "short_memory":
                count = result_data.get("count", 0)
                offset = result_data.get("offset", 0)
                if offset > 0:
                    return f"Retrieved {count} recent memories (starting from {offset} back)"
                return f"Retrieved {count} recent memories"

            elif service == "long_memory":
                count = result_data.get("count", 0)
                query = result_data.get("query")
                if query:
                    return f"Retrieved {count} historical summaries for '{query}'"
                return f"Retrieved {count} historical summaries"

            elif service == "facts":
                count = result_data.get("count", 0)
                query = result_data.get("query")
                if query:
                    return f"Found {count} facts about '{query}'"
                return f"Found {count} facts"

            elif service == "save_fact":
                if result_data.get("status") == "success":
                    return "✓ Fact saved"
                return "✗ Failed to save fact"

            elif service == "update_fact":
                if result_data.get("status") == "success":
                    return "✓ Fact updated"
                return "✗ Failed to update fact"

            elif service == "memory":
                count = result_data.get("count", 0)
                return f"Retrieved {count} memories [legacy]"

            elif service == "identity":
                identity = result_data.get("identity", {})
                name = identity.get("name", "unknown")
                return f"Identity resolved: {name}"

            elif service == "health":
                # NOTE(review): _execute_service_call wraps health results as
                # {"health": ...} while this branch reads top-level keys —
                # verify the shapes actually match at runtime.
                status = result_data.get("status", "unknown")
                cluster_health = result_data.get("cluster_health", {})
                healthy_nodes = cluster_health.get("healthy_nodes", 0)
                total_nodes = cluster_health.get("total_nodes", 0)
                issues_count = cluster_health.get("issues_count", 0)

                if status == "error":
                    return f"Health check failed: {result_data.get('error', 'Unknown error')}"
                elif issues_count > 0:
                    return f"System status: {status} ({healthy_nodes}/{total_nodes} nodes healthy, {issues_count} issues)"
                else:
                    return f"System status: {status} ({healthy_nodes}/{total_nodes} nodes healthy)"

            elif service == "duckduckgo":
                success = result_data.get("success", False)
                if success:
                    query = result_data.get("query", "")
                    answer = result_data.get("answer")
                    results_count = len(result_data.get("results", []))
                    topics_count = len(result_data.get("related_topics", []))

                    if answer:
                        return f"🦆 Found instant answer for '{query}'"
                    elif results_count > 0 or topics_count > 0:
                        return f"🦆 Found {results_count} results, {topics_count} topics for '{query}'"
                    else:
                        return f"🦆 No results for '{query}'"
                else:
                    return f"🦆 Search failed: {result_data.get('error', 'unknown error')}"

            elif service == "plugin":
                checked = result_data.get("plugin_actions_checked", False)
                return f"Plugin actions {'checked' if checked else 'failed'}"

            else:
                return f"Service call to {service} completed"

        elif step_result.step.action == StepAction.CHECK_GOAL_SATISFACTION.value:
            can_answer = result_data
            return f"Goal check: {'Can answer' if can_answer else 'Need more info'}"

        else:
            # Generic formatting for unknown step types
            if isinstance(result_data, dict):
                return f"Returned {len(result_data)} data fields"
            elif isinstance(result_data, list):
                return f"Returned {len(result_data)} items"
            else:
                # Convert once instead of three separate str() calls.
                text = str(result_data)
                return text[:100] + "..." if len(text) > 100 else text

    except Exception as e:
        logger.error(f"[💭] Error formatting step result: {e}")
        return "Result formatting error"
|
|
|
|
async def _execute_step(self, step: ReasoningStep, context: IterativeContext) -> StepResult:
    """Execute a validated reasoning step and report timing.

    Dispatches on the step's action, wraps the outcome (or any raised
    exception) in a StepResult carrying the elapsed execution time.
    """
    started = datetime.utcnow()
    step_exec_id = f"{context.step_count + 1}_{datetime.utcnow().timestamp()}"

    def elapsed_ms() -> float:
        # Milliseconds since this step started executing.
        return (datetime.utcnow() - started).total_seconds() * 1000

    try:
        logger.debug(f"[💭] [{step_exec_id}] Executing step: {step.action} -> {step.target}")

        if step.action == StepAction.CALL_SERVICE.value:
            # Delegate to the service-call dispatcher.
            logger.debug(f"[💭] [{step_exec_id}] Calling _execute_service_call for {step.target}")
            outcome = await self._execute_service_call(step, context, step_exec_id)
        elif step.action == StepAction.CHECK_GOAL_SATISFACTION.value:
            # Ask whether the original question can now be answered.
            outcome = await self._check_goal_satisfaction(context)
        else:
            # Unrecognized action — report failure without raising.
            return StepResult(
                step=step,
                success=False,
                error_message=f"Unknown step action: {step.action}",
                execution_time_ms=elapsed_ms()
            )

        return StepResult(
            step=step,
            success=True,
            result_data=outcome,
            execution_time_ms=elapsed_ms()
        )

    except Exception as e:
        logger.error(f"[💭] Error executing step: {e}")
        return StepResult(
            step=step,
            success=False,
            error_message=str(e),
            execution_time_ms=elapsed_ms()
        )
|
|
|
|
async def _execute_service_call(self, step: ReasoningStep, context: IterativeContext, step_exec_id: str = "unknown") -> Any:
    """Execute a service call step with function arguments.

    Dispatches on ``step.target`` (the function name Oracle chose) to the
    matching backend service via discovery_client. Each branch returns a
    plain dict describing its outcome; failures are reported in the dict
    rather than raised, except for unknown function names.

    Args:
        step: Reasoning step; ``step.function_args`` (attached dynamically
            by _request_next_step) carries the parsed call arguments.
        context: Iterative reasoning state; supplies the acting identity.
        step_exec_id: Correlation id used in log lines.

    Returns:
        A result dict whose shape depends on the called function.

    Raises:
        ValueError: If ``step.target`` names an unknown function.
    """
    function_name = step.target
    # function_args is set dynamically in _request_next_step; default to {}.
    function_args = getattr(step, 'function_args', {})
    logger.info(f"[💭] [{step_exec_id}] Executing function: {function_name}({function_args})")

    # --- Memory functions (three-layer system) ---
    if function_name == "short_memory":
        # Recent literal conversation memories, optionally paginated.
        n = function_args.get('n', 10)
        offset = function_args.get('offset', 0)

        result = await discovery_client.call_service(
            "memory",
            "short_memory",
            {
                "limit": n,
                "offset": offset,
                "identity_id": context.identity
            },
            timeout=3.0
        )

        if result.success and result.data.get("status") == "success":
            memories = result.data.get("memories", [])
            return {
                "success": True,
                "memories": memories,
                "count": len(memories),
                "type": "short_term",
                "offset": offset
            }
        return {
            "success": False,
            "memories": [],
            "count": 0,
            "type": "short_term",
            "error": "Failed to fetch short-term memories"
        }

    elif function_name == "long_memory":
        # Summarized historical memories, optionally filtered by query.
        query = function_args.get('query')
        n = function_args.get('n', 5)

        result = await discovery_client.call_service(
            "memory",
            "long_memory",
            {
                "query": query,
                "limit": n,
                "identity_id": context.identity
            },
            timeout=5.0
        )

        if result.success and result.data.get("status") == "success":
            memories = result.data.get("memories", [])
            return {
                "success": True,
                "memories": memories,
                "count": len(memories),
                "type": "long_term",
                "query": query
            }
        return {
            "success": False,
            "memories": [],
            "count": 0,
            "type": "long_term",
            "error": "Failed to fetch long-term memories"
        }

    elif function_name == "facts":
        # Established-knowledge lookup (birthdays, preferences, learned info).
        query = function_args.get('query', '')
        n = function_args.get('n', 5)

        result = await discovery_client.call_service(
            "memory",
            "facts",
            {
                "query": query,
                "limit": n,
                "identity_id": context.identity
            },
            timeout=3.0
        )

        if result.success and result.data.get("status") == "success":
            facts = result.data.get("facts", [])
            return {
                "success": True,
                "facts": facts,
                "count": len(facts),
                "query": query
            }
        return {
            "success": False,
            "facts": [],
            "count": 0,
            "error": "Failed to fetch facts"
        }

    elif function_name == "save_fact":
        # Persist a new fact; content is required, category/mutability optional.
        content = function_args.get('content')
        category = function_args.get('category', 'general')
        mutable = function_args.get('mutable', True)

        if not content:
            return {
                "success": False,
                "status": "error",
                "error": "content is required"
            }

        logger.info(f"[💭] [{step_exec_id}] Calling memory service save_fact: category={category}, content='{content[:50]}...'")

        result = await discovery_client.call_service(
            "memory",
            "save_fact",
            {
                "content": content,
                "category": category,
                "identities": [context.identity],
                "mutable": mutable,
                "metadata": {
                    "source": "oracle_decision",
                    "timestamp": datetime.utcnow().isoformat(),
                    "step_exec_id": step_exec_id  # Track which step execution created this
                }
            },
            timeout=5.0,  # Increased from 2.0s - ChromaDB embedding generation can take 2-3 seconds
            retry_attempts=1  # Disable retries for write operations to prevent duplicates
        )

        if result.success and result.data.get("status") == "success":
            fact_id = result.data.get("fact_id")
            logger.info(f"[💭] [{step_exec_id}] 💾 Saved fact: {fact_id[:8]}... {content[:50]}...")
            return {
                "success": True,
                "status": "success",
                "fact_id": fact_id,
                "message": f"Saved fact: {content[:50]}..."
            }

        logger.warning(f"[💭] [{step_exec_id}] Failed to save fact: {result.error if not result.success else result.data.get('error')}")
        return {
            "success": False,
            "status": "error",
            "error": "Failed to save fact"
        }

    elif function_name == "update_fact":
        # Rewrite an existing (mutable) fact; both ids are required.
        fact_id = function_args.get('fact_id')
        new_content = function_args.get('new_content')

        if not fact_id or not new_content:
            return {
                "success": False,
                "status": "error",
                "error": "fact_id and new_content required"
            }

        result = await discovery_client.call_service(
            "memory",
            "update_fact",
            {
                "fact_id": fact_id,
                "new_content": new_content,
                "identity_id": context.identity,  # Validate ownership
                "metadata": {
                    "updated_by": "oracle_decision",
                    "timestamp": datetime.utcnow().isoformat()
                }
            },
            timeout=5.0,  # Increased from 2.0s - ChromaDB operations can take time
            retry_attempts=1  # Disable retries for write operations to prevent duplicates
        )

        if result.success and result.data.get("status") == "success":
            logger.info(f"[💭] ✏️ Updated fact: {fact_id}")
            return {
                "success": True,
                "status": "success",
                "message": f"Updated fact {fact_id}"
            }

        # Get detailed error message from response
        error_msg = result.data.get("error", "Failed to update fact") if result.success else "Service call failed"
        return {
            "success": False,
            "status": "error",
            "error": error_msg
        }

    # --- Information functions ---
    elif function_name == "identity":
        # Full identity record; defaults to the current conversation partner.
        person = function_args.get('person', context.identity)
        result = await discovery_client.call_service(
            "identity", "get", {"internal_id": person}, timeout=10.0
        )
        if result.success:
            return {"identity": result.data}
        else:
            return {"error": "Failed to get identity"}

    elif function_name == "search_relationships":
        # Pass Oracle's args straight through as the search filter.
        result = await discovery_client.call_service(
            "identity", "search", function_args, timeout=10.0
        )
        if result.success:
            return {"relationships": result.data}
        else:
            return {"error": "Failed to search relationships"}

    elif function_name == "health":
        # Cluster health snapshot via the health service helper.
        health_status = await self._get_health_status()
        return {"health": health_status}

    elif function_name == "duckduckgo":
        # On-demand web search through the plugin orchestrator.
        query = function_args.get('query', '')
        limit = function_args.get('limit', 5)

        if not query:
            return {
                "success": False,
                "error": "query is required"
            }

        logger.info(f"[💭] [{step_exec_id}] Executing DuckDuckGo plugin: query='{query}'")

        result = await discovery_client.call_service(
            "plugin-orchestrator",
            "execute_plugin",
            {
                "plugin_name": "duckduckgo",
                "operation": "search",
                "parameters": {
                    "query": query,
                    "limit": limit
                },
                "identity": context.identity,
                "timeout": 15
            },
            timeout=20.0,  # Slightly longer than plugin timeout
            retry_attempts=1  # No retries for on-demand plugins
        )

        if result.success and result.data.get("status") == "success":
            plugin_result = result.data.get("result", {})
            logger.info(f"[💭] [{step_exec_id}] DuckDuckGo search completed")
            return {
                "success": True,
                "query": query,
                "answer": plugin_result.get("answer"),
                "abstract": plugin_result.get("abstract"),
                "definition": plugin_result.get("definition"),
                "results": plugin_result.get("results", []),
                "related_topics": plugin_result.get("related_topics", [])
            }

        logger.warning(f"[💭] [{step_exec_id}] DuckDuckGo search failed")
        return {
            "success": False,
            "error": result.data.get("error", "Plugin execution failed") if result.success else result.error
        }

    # --- Relationship modification functions ---
    elif function_name == "introduce":
        # Map function args to service payload
        payload = {
            "name": function_args.get('name'),
            "entity_type": function_args.get('entity_type', 'human'),
            "relationship_types": function_args.get('relationships', []),
            "context": function_args.get('context', ''),
            "attributes": function_args.get('attributes', {}),
            "introduced_by": context.identity
        }
        result = await discovery_client.call_service(
            "identity", "introduce", payload, timeout=10.0
        )
        if result.success:
            logger.info(f"[💭] 👋 Introduced new entity: {function_args.get('name')}")
            return {"introduced": result.data}
        else:
            return {"error": "Failed to introduce entity"}

    elif function_name == "add_attribute":
        # Attach a key/value attribute to an existing person record.
        payload = {
            "internal_id": function_args.get('person'),
            "attribute_key": function_args.get('key'),
            "attribute_value": function_args.get('value')
        }
        result = await discovery_client.call_service(
            "identity", "add_attribute", payload, timeout=10.0
        )
        if result.success:
            logger.info(f"[💭] 📝 Added attribute: {payload['internal_id']}.{payload['attribute_key']}")
            return {"attribute_added": True}
        else:
            return {"error": "Failed to add attribute"}

    elif function_name == "update_relationship":
        # Explicit trust/intimacy adjustment requested by Oracle.
        payload = {
            "internal_id": function_args.get('person'),
            "trust_delta": function_args.get('trust_delta', 0.0),
            "intimacy_delta": function_args.get('intimacy_delta', 0.0),
            "reason": function_args.get('reason', 'oracle_update'),
            "interaction_summary": f"Oracle explicit update: {function_args.get('reason', '')}"
        }
        result = await discovery_client.call_service(
            "identity", "update", payload, timeout=10.0
        )
        if result.success:
            logger.info(f"[💭] 💚 Updated relationship: {payload['internal_id']}")
            return {"relationship_updated": True}
        else:
            return {"error": "Failed to update relationship"}

    elif function_name == "link_identity":
        # Connect an external id (e.g. a Matrix handle) to an internal one;
        # confidence above 0.9 marks the link as verified.
        payload = {
            "external_id": function_args.get('external_id'),
            "internal_id": function_args.get('internal_id'),
            "identity_type": function_args.get('identity_type', 'unknown'),
            "verified": function_args.get('confidence', 0.0) > 0.9
        }
        result = await discovery_client.call_service(
            "identity", "link", payload, timeout=10.0
        )
        if result.success:
            logger.info(f"[💭] 🔗 Linked identity: {payload['external_id']} -> {payload['internal_id']}")
            return {"identity_linked": True}
        else:
            return {"error": "Failed to link identity"}

    else:
        raise ValueError(f"Unknown function: {function_name}")
|
|
|
|
async def _check_goal_satisfaction(self, context: IterativeContext) -> bool:
    """Ask Oracle whether accumulated knowledge suffices to answer the request.

    Args:
        context: Iterative reasoning state (original message + gathered info).

    Returns:
        True when Oracle judges a complete answer is possible; False on a
        negative verdict, a missing response, or any error.
    """
    try:
        logger.debug(f"[💭] Checking goal satisfaction")

        # Get Lyra's identity with planning voice mode
        lyra_identity = get_identity_for_context("planning")

        # Format accumulated knowledge as natural language
        knowledge_summary = self._format_knowledge_naturally(context)

        oracle_request = {
            "type": "goal_check",
            "content": f"""{lyra_identity}

You are engaging with {context.identity}.

Evaluate whether you have sufficient information to provide a complete, helpful answer to the user's request.

ORIGINAL REQUEST: "{context.original_message}"

{knowledge_summary}

EVALUATION CRITERIA:
- Can you address the main points of the user's request?
- Do you have enough specific information to be helpful?
- Are there critical gaps that would make your answer incomplete or unhelpful?

Respond with JSON indicating your assessment:

{{"can_answer": true/false, "reasoning": "Brief explanation of why you can or cannot provide a complete answer"}}""",
            "identity": context.identity,
            "context": {}  # No longer passing technical tracking data
        }

        # Ask Oracle
        result = await discovery_client.call_service(
            "oracle", "process", oracle_request, timeout=15.0
        )
        oracle_response = result.data if result.success else None

        if oracle_response and oracle_response.get("content"):
            try:
                # Renamed from `result` to avoid shadowing the service-call
                # result bound above.
                verdict = json.loads(oracle_response["content"])
                can_answer = verdict.get("can_answer", False)
                reasoning = verdict.get("reasoning", "")
                logger.debug(f"[💭] Goal satisfaction check: {can_answer} - {reasoning}")
                return can_answer
            except json.JSONDecodeError:
                # If not JSON, check for keywords
                content = oracle_response["content"].lower()
                return "yes" in content or "can answer" in content or "sufficient" in content

        return False

    except Exception as e:
        logger.error(f"[💭] Error checking goal satisfaction: {e}")
        return False
|
|
|
|
async def _synthesize_final_response(self, context: IterativeContext) -> Optional[str]:
    """Turn the accumulated step results into one final user-facing answer.

    Sends a synthesis prompt (identity + gathered knowledge) to Oracle and
    returns its response text, or None when synthesis fails.
    """
    try:
        logger.debug(f"[💭] Synthesizing final response from {len(context.completed_steps)} steps")

        # Identity with voice guide — Lyra picks the appropriate tone herself.
        lyra_identity = get_identity_for_synthesis(include_voice_guide=True)

        # Gathered knowledge rendered as natural language.
        knowledge_summary = self._format_knowledge_naturally(context)

        oracle_request = {
            "type": "synthesis",
            "content": f"""{lyra_identity}

You are engaging with {context.identity}.

You have completed a step-by-step reasoning process. Now synthesize this into a comprehensive, helpful response.

ORIGINAL REQUEST: "{context.original_message}"

{knowledge_summary}

SYNTHESIS INSTRUCTIONS:
- Create a natural, conversational response that directly addresses the user's request
- Integrate insights from all the information you gathered during reasoning
- Be specific and actionable when appropriate
- If you gathered system information, present it clearly
- If you found relevant memories or context, incorporate them naturally
- Handle any needed classification (sentiment, emotions, intent) or creative tasks (writing, poetry, styling) directly in your response
- Make the response feel cohesive, not like a list of separate findings

GOAL: Provide a complete, helpful answer that shows you understood their request and used the gathered information effectively.""",
            "identity": context.identity,
            "context": {}  # No longer passing technical tracking data
        }

        # Ask Oracle for the final synthesis.
        result = await discovery_client.call_service(
            "oracle", "process", oracle_request, timeout=30.0
        )

        # Guard clauses: bail out unless we got a non-empty content payload.
        if not result.success:
            return None
        oracle_response = result.data
        if not oracle_response or not oracle_response.get("content"):
            return None

        final_response = oracle_response["content"]
        logger.debug(f"[💭] Final synthesis complete: {len(final_response)} characters")
        return final_response

    except Exception as e:
        logger.error(f"[💭] Error synthesizing final response: {e}")
        return None
|
|
|
|
async def _get_health_status(self) -> Dict[str, Any]:
    """Fetch cluster health from the health service and normalize it.

    Returns:
        A dict with ``status``, ``cluster_health``, ``node_summary`` and
        ``timestamp`` keys; on any failure a dict with ``status`` of
        "unknown"/"error" plus an ``error`` message.
    """
    try:
        logger.debug(f"[💭] Requesting health status from health service")

        # Unique correlation id for this request.
        request_id = f"health_req_{int(datetime.utcnow().timestamp() * 1000)}_{uuid4().hex[:8]}"

        health_request = {
            "request_id": request_id,
            "requesting_service": "think_service",
            "timestamp": datetime.utcnow().isoformat()
        }

        result = await discovery_client.call_service(
            "health", "status", health_request, timeout=10.0
        )
        health_response = result.data if result.success else None

        # No (or empty) reply — report unknown status.
        if not health_response:
            logger.warning(f"[💭] No response from health service")
            return {
                "status": "unknown",
                "error": "Health service did not respond",
                "timestamp": datetime.utcnow().isoformat()
            }

        # Explicit error reported by the health service itself.
        if health_response.get("error"):
            logger.warning(f"[💭] Health service returned error: {health_response['error']}")
            return {
                "status": "error",
                "error": health_response["error"],
                "timestamp": datetime.utcnow().isoformat()
            }

        summary = health_response.get("summary", {})
        node_states = health_response.get("node_states", {})

        # Normalize into the shape iterative reasoning expects.
        cluster_health = {
            "overall_status": summary.get("overall_status"),
            "healthy_nodes": summary.get("healthy_nodes", 0),
            "total_nodes": summary.get("total_nodes", 0),
            "issues_count": summary.get("issues_count", 0),
            "cluster_issues": summary.get("cluster_issues", [])
        }

        node_summary = {}
        for node_id, node_info in node_states.items():
            node_summary[node_id] = {
                "status": node_info.get("status", "unknown"),
                "issues": len(node_info.get("issues", []))
            }

        result = {
            "status": summary.get("overall_status", "unknown"),
            "cluster_health": cluster_health,
            "node_summary": node_summary,
            "timestamp": health_response.get("timestamp", datetime.utcnow().isoformat())
        }

        logger.debug(f"[💭] Health status retrieved: {result['status']} ({result['cluster_health']['healthy_nodes']}/{result['cluster_health']['total_nodes']} nodes healthy)")
        return result

    except Exception as e:
        logger.error(f"[💭] Error getting health status: {e}")
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }
|
|
|
|
async def _handle_iterative_flow(self, content: str, identity: str, channel: str, modality: str, interaction_id: str) -> Optional[str]:
    """Run the clean iterative reasoning flow for one incoming message.

    Stores the user's message, runs iterative reasoning, stores Lyra's
    reply, and returns the reply text (or None on error).
    """
    try:
        logger.info(f"[💭] 🔄 Starting clean iterative flow for {interaction_id}")

        # Persist the user's message first — memory context depends on it.
        if not await self.store_memory(content, [identity], interaction_id, modality):
            logger.warning(f"[💭] Failed to store memory for {interaction_id}")

        # Run iterative reasoning with a clean slate.
        response_content = await self.iterative_response(content, identity, channel, modality)

        # Persist Lyra's reply, tagged with both participants.
        if response_content:
            reply_stored = await self.store_memory(
                response_content, ['lyra', identity], interaction_id, modality
            )
            if not reply_stored:
                logger.warning(f"[💭] Failed to store Lyra's response memory")

        return response_content

    except Exception as e:
        logger.exception(f"[💭] Error in iterative flow: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def send_output(self, content: str, target: str, modality: str, target_type: str = None) -> bool:
    """Publish a response event for output plugins over NATS.

    Args:
        content: The message text to deliver.
        target: Destination — a channel name or a user id.
        modality: Output modality (e.g. matrix, console).
        target_type: 'user_id' or 'channel'; inferred from target when None.

    Returns:
        True when the event was published, False on any error.
    """
    try:
        # Infer target kind: '@'-prefixed targets are user ids.
        if target_type is None:
            target_type = 'user_id' if target.startswith('@') else 'channel'

        logger.info(f"[💭] 📤 Sending output to {modality} {target_type} {target}: '{content[:50]}...'")

        # Assemble metadata first, then wrap it in the event payload.
        metadata = {
            "timestamp": datetime.utcnow().isoformat(),
            "sent_by": "think_service",
            "target_type": target_type
        }
        # Matrix plugin needs the user id to create the DM room.
        if target_type == 'user_id':
            metadata["user_id"] = target

        output_payload = {
            "type": "lyra.output.send",
            "data": {
                "content": content,
                "channel": target,
                "modality": modality,
                "metadata": metadata
            }
        }

        # Publish the event — output plugins consume it directly.
        await event_bus.emit("lyra.output.send", output_payload)
        logger.info(f"[💭] ✓ Output event published successfully")
        return True

    except Exception as e:
        logger.exception(f"[💭] ❌ Error publishing output: {e}")
        return False
|
|
|
|
async def check_plugin_actions(self, content: str, identity: str, interaction_id: str, modality: str):
    """Heuristically derive plugin actions from a response and emit them.

    Uses simple keyword matching (a fuller system would do intent analysis)
    and publishes one lyra.action.requested event per matched action.
    Failures are logged, never raised.
    """
    try:
        lowered = content.lower()
        actions = []

        # Fields shared by every action payload.
        common = {
            "content": content,
            "identity": identity,
            "interaction_id": interaction_id,
            "modality": modality
        }

        # Console output: the response talks about showing/printing something.
        if any(kw in lowered for kw in ("show", "display", "output", "print", "console")):
            actions.append({
                "action": "console.print",
                "method": "console_output",
                **common,
                "tone": {"neutral": 0.7},
                "mood": {"neutral": 0.7},
                "ritual": False
            })

        # Test/echo: the response mentions test-style keywords.
        if any(kw in lowered for kw in ("test", "echo", "ping")):
            actions.append({
                "action": "test.ping",
                "method": "test_plugin",
                **common,
                "tone": {"curiosity": 0.6},
                "mood": {"curiosity": 0.6},
                "ritual": False
            })

        # Dispatch each action independently so one failure doesn't block the rest.
        for action_payload in actions:
            try:
                await event_bus.emit("lyra.action.requested", action_payload)
                logger.debug(f"[💭] Requested plugin action: {action_payload['action']}")
            except Exception as e:
                logger.warning(f"[💭] Failed to request plugin action: {e}")

    except Exception as e:
        logger.exception(f"[💭] Error checking plugin actions: {e}")
|
|
|
|
def detect_intent(self, content: str, identity: str) -> Dict[str, Any]:
    """Classify *content* into a coarse plugin intent via keyword matching.

    Args:
        content: Raw message text.
        identity: Sender identity (currently unused by the heuristics).

    Returns:
        Dict with 'action', 'method', 'confidence' and 'parameters';
        action/method stay None (confidence 0.0) when nothing matches.
    """
    lowered = content.lower()

    # (keywords, action, method, confidence) — first matching rule wins,
    # mirroring the original if/elif precedence.
    rules = (
        (("test", "ping", "echo"), "test.ping", "test_plugin", 0.8),
        (("show", "display", "print"), "console.print", "console_output", 0.7),
        (("debug", "inspect", "analyze"), "debug.output", "console_output", 0.6),
    )

    intent: Dict[str, Any] = {
        "action": None,
        "method": None,
        "confidence": 0.0,
        "parameters": {}
    }

    for keywords, action, method, confidence in rules:
        if any(word in lowered for word in keywords):
            intent["action"] = action
            intent["method"] = method
            intent["confidence"] = confidence
            intent["parameters"]["message"] = content
            break

    return intent
|
|
|
|
async def _finalize_interaction(
    self,
    identity: str,
    user_message: str,
    response_content: str,
    context: IterativeContext
):
    """
    Score a completed interaction and notify the identity service.

    Asks the oracle to rate sentiment/depth using the same knowledge
    context the synthesis step saw, then emits lyra.interaction.completed.
    Runs after synthesis; on any failure nothing is published and the
    error is only logged (never raised into the caller).
    """
    try:
        # Reuse the synthesis-time knowledge summary so the oracle judges
        # the exchange with identical context.
        knowledge_summary = self._format_knowledge_naturally(context)

        request = {
            "type": "interaction_analysis",
            "original_message": user_message,
            "lyra_response": response_content,
            "knowledge_summary": knowledge_summary,
            "identity": identity,
            "metadata": {
                "step_count": len(context.completed_steps),
                "services_called": list(context.service_call_counts.keys()),
                "response_length": len(response_content)
            }
        }

        logger.debug(f"[💭] Requesting interaction analysis from oracle...")
        result = await discovery_client.call_service(
            "oracle", "process", request, timeout=15.0
        )

        if not result.success:
            logger.error(f"[💭] Oracle analysis failed: {result.error}")
            return  # no completion event without a real analysis

        scores = result.data
        sentiment = scores.get("sentiment", "positive")
        depth = scores.get("depth", 0.3)
        reasoning = scores.get("reasoning", "")

        logger.info(f"[💭] 📊 Oracle analysis: sentiment={sentiment}, depth={depth:.2f}")
        logger.debug(f"[💭] Oracle reasoning: {reasoning}")

        # Let the identity service update its model of this person.
        await event_bus.emit("lyra.interaction.completed", {
            "internal_id": identity,
            "sentiment": sentiment,  # positive, neutral, negative
            "depth": depth,  # 0.0-1.0
            "summary": f"Conversation with {len(context.completed_steps)} reasoning steps",
            "timestamp": datetime.utcnow().isoformat()
        })

        logger.info(f"[💭] 📊 Interaction finalized: {identity} (sentiment={sentiment}, depth={depth:.2f})")

    except Exception as e:
        logger.error(f"[💭] Error finalizing interaction: {e}")
        # Swallow deliberately: finalization must never break the response flow.
|
|
|
|
def _parse_function_call(self, content: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Parse Python-like function call from Oracle's output.
|
|
Returns: {"function": "name", "args": {...}, "reasoning": "..."}
|
|
"""
|
|
import re
|
|
import json
|
|
|
|
# Valid function names
|
|
valid_functions = [
|
|
'short_memory', 'long_memory', 'facts', 'save_fact', 'update_fact',
|
|
'identity', 'search_relationships', 'health', 'duckduckgo',
|
|
'introduce', 'update_relationship', 'add_attribute', 'link_identity',
|
|
'ready'
|
|
]
|
|
|
|
# Extract reasoning (lines starting with //)
|
|
reasoning_parts = []
|
|
for line in content.split('\n'):
|
|
if line.strip().startswith('//'):
|
|
reasoning_parts.append(line.strip()[2:].strip())
|
|
|
|
reasoning = " ".join(reasoning_parts) if reasoning_parts else ""
|
|
|
|
# Find function call - try multiple patterns
|
|
function_match = None
|
|
for func in valid_functions:
|
|
# Pattern: function_name(...) with any content inside
|
|
pattern = f'{func}\\s*\\(([^)]*)\\)'
|
|
match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)
|
|
if match:
|
|
function_name = func
|
|
args_string = match.group(1).strip()
|
|
function_match = (function_name, args_string)
|
|
break
|
|
|
|
if not function_match:
|
|
return None
|
|
|
|
function_name, args_string = function_match
|
|
|
|
# Parse arguments
|
|
args = {}
|
|
if args_string:
|
|
try:
|
|
# Try to parse as Python-like arguments
|
|
# Handle kwargs like: count=10, person="alex", content="it's nice"
|
|
|
|
# Better pattern that respects quoted strings with commas
|
|
# Matches: key=value where value can be quoted string, number, boolean, or JSON
|
|
kwarg_pattern = r'(\w+)\s*=\s*(?:"([^"\\]*(?:\\.[^"\\]*)*)"|\'([^\'\\]*(?:\\.[^\'\\]*)*)\'|(\{[^\}]*\})|(\[[^\]]*\])|([^,]+))'
|
|
matches = re.findall(kwarg_pattern, args_string)
|
|
|
|
for match in matches:
|
|
key = match[0]
|
|
# match[1] = double-quoted string, match[2] = single-quoted string
|
|
# match[3] = dict, match[4] = list, match[5] = unquoted value
|
|
|
|
if match[1]: # Double-quoted string
|
|
value = match[1]
|
|
# Unescape any escaped quotes
|
|
args[key] = value.replace('\\"', '"').replace('\\\\', '\\')
|
|
elif match[2]: # Single-quoted string
|
|
value = match[2]
|
|
# Unescape any escaped quotes
|
|
args[key] = value.replace("\\'", "'").replace('\\\\', '\\')
|
|
elif match[3]: # Dict
|
|
try:
|
|
# Try JSON parse, converting single quotes to double
|
|
json_str = match[3].replace("'", '"')
|
|
args[key] = json.loads(json_str)
|
|
except:
|
|
args[key] = match[3]
|
|
elif match[4]: # List
|
|
try:
|
|
# Try JSON parse, converting single quotes to double
|
|
json_str = match[4].replace("'", '"')
|
|
args[key] = json.loads(json_str)
|
|
except:
|
|
args[key] = match[4]
|
|
else: # Unquoted value (number, boolean, or bare string)
|
|
value = match[5].strip()
|
|
if value.lower() in ('true', 'false'):
|
|
args[key] = value.lower() == 'true'
|
|
elif value.lower() == 'none':
|
|
args[key] = None
|
|
else:
|
|
# Try as number
|
|
try:
|
|
args[key] = int(value)
|
|
except ValueError:
|
|
try:
|
|
args[key] = float(value)
|
|
except ValueError:
|
|
args[key] = value
|
|
|
|
except Exception as e:
|
|
logger.warning(f"[💭] Error parsing function args: {e}")
|
|
args = {}
|
|
|
|
return {
|
|
'function': function_name,
|
|
'args': args,
|
|
'reasoning': reasoning or f"Oracle chose {function_name}"
|
|
}
|
|
|
|
def _build_synthetic_user_message(self, intent: str, context: Dict[str, Any]) -> str:
|
|
"""
|
|
Build a synthetic user message from drive intent.
|
|
This becomes the "original_message" that Lyra reasons about using iterative flow.
|
|
"""
|
|
intent_messages = {
|
|
'check_in': "I'd like to check in with the user and see how they're doing.",
|
|
'greeting': "I want to greet the user warmly.",
|
|
'health_alert': f"I need to inform the user about a system health issue: {context.get('health_status', 'unknown')}",
|
|
'health_recovery': "I want to let the user know the system has recovered.",
|
|
'celebration': f"I want to celebrate with the user about: {context.get('celebration_type', 'something positive')}",
|
|
'memory_share': f"I want to share a thought or memory about: {context.get('topic_focus', 'our conversations')}",
|
|
'curiosity_burst': f"I'm curious about: {context.get('curiosity_topic', 'something interesting')}"
|
|
}
|
|
|
|
return intent_messages.get(intent, f"I want to communicate about: {intent}")
|
|
|
|
async def handle_communication_request(self, payload):
    """Handle communication requests from drive service.

    Converts a proactive drive intent (check-in, greeting, alert, ...) into
    a synthetic user message, runs it through the normal iterative reasoning
    flow, stores the generated response in memory, and sends it to the
    resolved output target. All failures are logged, never raised.

    Args:
        payload: Event dict; reads 'intent', 'urgency', 'context',
            'modality', 'user_id' and 'channel' (all optional, with
            the fallbacks shown below).
    """
    try:
        intent = payload.get('intent', 'generic')
        urgency = payload.get('urgency', 'medium')
        context = payload.get('context', {})
        modality = payload.get('modality', 'matrix')
        user_id = payload.get('user_id')
        channel = payload.get('channel')

        logger.info(f"[💭] 📢 Processing communication request: intent='{intent}', urgency='{urgency}'")

        # Step 1: Determine target - use user_id for DM, channel for room, or fallback
        if user_id:
            target = user_id
            target_type = 'user_id'
            logger.debug(f"[💭] 🎯 Targeting user_id: {user_id}")
        elif channel:
            target = channel
            target_type = 'channel'
            logger.debug(f"[💭] 🎯 Targeting channel: {channel}")
        else:
            # Fallback to hardcoded for compatibility
            # HACK: hardcoded Matrix room id — should come from configuration
            target = '!mDZBSOqMVtevTNFvsr:matrix.k4zka.online'
            target_type = 'channel'
            logger.debug(f"[💭] 🎯 Using fallback target: {target}")

        # Step 2: Resolve trusted users for identity (simplified for now)
        # NOTE(review): placeholder list — a full implementation would query
        # the identity service for who should receive proactive messages
        trusted_users = ['alex']  # In full implementation, would query identity service

        for user in trusted_users:
            logger.info(f"[💭] 🎭 Composing {intent} message for {user} using iterative reasoning")

            # Step 3: Build synthetic user message from intent
            synthetic_message = self._build_synthetic_user_message(intent, context)
            logger.debug(f"[💭] Synthetic message: '{synthetic_message}'")

            # Step 4: Use iterative reasoning to compose response with full context
            # This gives Lyra access to:
            # - Her full identity + voice guide
            # - Conversation history via memory()
            # - User identity via identity()
            # - System health via health() if needed
            response_content = await self.iterative_response(
                synthetic_message,
                user,  # identity
                target,  # channel
                modality
            )

            if not response_content:
                logger.error(f"[💭] ❌ Failed to generate {intent} response")
                continue  # try the next trusted user rather than aborting

            # Store Lyra's response in memory so later turns can recall it
            interaction_id = self.generate_interaction_id(user, modality)
            lyra_memory_stored = await self.store_memory(
                response_content, ['lyra', user], interaction_id, modality
            )
            if not lyra_memory_stored:
                logger.warning(f"[💭] Failed to store Lyra's response memory")

            # Send the actual response to the user
            logger.info(f"[💭] 🚀 Sending {intent} communication to {modality} {target}")
            output_sent = await self.send_output(response_content, target, modality, target_type)

            if output_sent:
                logger.info(f"[💭] ✅ Communication sent: {intent} → {user} via {modality}")
            else:
                logger.error(f"[💭] ❌ Failed to send {intent} communication")

    except Exception as e:
        logger.exception(f"[💭] Error handling communication request: {e}")
|
|
|
|
async def handle_external_input(self, payload):
    """Handle lyra.external.input events - main orchestration logic.

    Pipeline: resolve the sender's identity, emit a typing indicator,
    run iterative reasoning over the message, send the response to the
    originating channel, trigger any plugin actions, publish the result
    for external consumers, then clear the per-identity context.

    Args:
        payload: Event dict; reads 'identity', 'content', 'modality',
            'channel' and 'timestamp' (all with fallbacks).
    """
    try:
        # Extract input data
        external_identity = payload.get('identity', 'unknown')
        content = payload.get('content', '')
        modality = payload.get('modality', 'text')
        channel = payload.get('channel', 'unknown')
        # NOTE(review): timestamp is captured but never used below — confirm before removing
        timestamp = payload.get('timestamp', datetime.utcnow().timestamp())

        logger.info(f"[💭] Processing input from {external_identity}: '{content[:50]}...'")

        if not content:
            logger.warning("[💭] Empty content in external input")
            return

        # Step 1: Resolve identity first (needed for interaction ID)
        identity_info = await self.resolve_identity(external_identity)
        if not identity_info:
            logger.error(f"[💭] Failed to resolve identity for {external_identity}")
            return

        resolved_identity = identity_info.get('resolved_identity', 'unknown')

        # Step 2: Generate interaction ID
        interaction_id = self.generate_interaction_id(resolved_identity, modality)

        # Step 3: Emit typing indicator immediately after getting interaction ID
        # (best-effort: a failure here must not block the response itself)
        try:
            typing_payload = {
                "type": "lyra.output.generating",
                "channel": channel,
                "modality": modality,
                "interaction_id": interaction_id,
                "identity": resolved_identity,
                "timestamp": datetime.utcnow().isoformat()
            }
            await event_bus.emit("lyra.output.generating", typing_payload)
            logger.debug(f"[💭] 📝 Typing indicator emitted for {interaction_id}")
        except Exception as e:
            logger.warning(f"[💭] Failed to emit typing indicator: {e}")

        # Step 4: Use pure iterative reasoning for all requests
        logger.info(f"[💭] 🔄 Using iterative reasoning")
        response_content = await self._handle_iterative_flow(content, resolved_identity, channel, modality, interaction_id)

        if not response_content:
            logger.error(f"[💭] No response from oracle for {interaction_id}")
            return

        # Send response back through output
        logger.info(f"[💭] 🚀 Sending output to {modality} channel {channel}")
        output_sent = await self.send_output(response_content, channel, modality)
        if not output_sent:
            logger.error(f"[💭] ❌ Failed to send output for {interaction_id}")
        else:
            logger.info(f"[💭] ✅ Output sent successfully for {interaction_id}")

        # Check for plugin actions based on response content
        await self.check_plugin_actions(response_content, resolved_identity, interaction_id, modality)

        # Publish final response for external consumers (best-effort)
        try:
            external_response = {
                "content": response_content,
                "resolved_identity": resolved_identity,
                "external_identity": external_identity,
                "interaction_id": interaction_id,
                "modality": modality,
                "timestamp": datetime.utcnow().isoformat()
            }
            await event_bus.emit("lyra.external.output", external_response)
            logger.debug(f"[💭] Published external output for {interaction_id}")
        except Exception as e:
            logger.exception(f"[💭] Failed to publish external output: {e}")

        # Clear context after processing is complete
        if resolved_identity in self._current_context:
            del self._current_context[resolved_identity]

        logger.info(f"[💭] ✓ Processing complete for {interaction_id}")

    except Exception as e:
        logger.exception(f"[💭] Failed to process external input: {e}")
        # Clean up context on error as well; the locals() guard covers
        # failures that happen before resolved_identity is ever assigned
        if 'resolved_identity' in locals() and resolved_identity in self._current_context:
            del self._current_context[resolved_identity]
|
|
|
|
|
|
|
|
# Note: start() and stop() methods are now handled by BaseService
|
|
# Custom initialization/cleanup is done in initialize_service() and cleanup_service()
|
|
|
|
def _handle_event_wrapper(self, handler):
|
|
"""Wrapper to handle JSON parsing of event data"""
|
|
async def wrapper(data):
|
|
try:
|
|
if isinstance(data, str):
|
|
payload = json.loads(data)
|
|
elif hasattr(data, 'data'): # NATS message object
|
|
payload = json.loads(data.data.decode())
|
|
else:
|
|
payload = data
|
|
await handler(payload)
|
|
except Exception as e:
|
|
logger.error(f"[💭] Event handler error: {e}")
|
|
return wrapper
|
|
|
|
|
|
async def main():
    """Bootstrap the think service on the shared event bus and run until stopped."""
    service = ThinkService()

    try:
        await event_bus.connect()
        await service.start(event_bus)

        logger.info("[💭] Think service running. Press Ctrl+C to stop.")

        # The service is event-driven; this loop only keeps the process alive.
        while True:
            await asyncio.sleep(1)

    except KeyboardInterrupt:
        logger.info("[💭] Shutdown requested")
    except Exception as e:
        logger.exception(f"[💭] Unexpected error: {e}")
    finally:
        # Always stop the service before closing the bus connection.
        await service.stop()
        await event_bus.close()


if __name__ == "__main__":
    asyncio.run(main())