"""
Step execution engine for iterative reasoning.

This module executes reasoning steps by calling appropriate services
and executing Oracle-requested functions.
"""

import time
from typing import Any
from datetime import datetime

from core.logger import setup_logger
from core.service_discovery import discovery_client
from core.event_cache import event_cache
from .models import ReasoningStep, StepResult, StepAction, IterativeContext
from .todo_manager import TodoManager


class StepExecutor:
    """Executes reasoning steps and service calls.

    Each Oracle-requested function name is routed to a private
    ``_execute_*`` handler. Handlers return plain dicts; most failures are
    reported inside the dict (an ``error`` key, usually with
    ``success: False``) rather than raised.
    """

    def __init__(self, memory_manager, logger_name: str = 'step_executor'):
        """Store collaborators and create the logger and todo manager.

        Args:
            memory_manager: Object exposing an awaitable
                ``get_health_status()`` (used by the ``health`` function).
            logger_name: Name passed to ``setup_logger``.
        """
        self.logger = setup_logger(logger_name, service_name='think_service')
        self.memory_manager = memory_manager
        self.todo_manager = TodoManager()

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _function_args(step) -> dict:
        """Return the step's function arguments, never ``None``.

        A step whose ``function_args`` attribute is missing *or* set to
        ``None`` yields an empty dict so handlers can call ``.get`` safely.
        """
        return getattr(step, 'function_args', None) or {}

    @staticmethod
    def _response_data(result) -> dict:
        """Return ``result.data`` when it is a dict, otherwise an empty dict."""
        data = getattr(result, 'data', None)
        return data if isinstance(data, dict) else {}

    @classmethod
    def _status_ok(cls, result) -> bool:
        """True when the call succeeded and the payload says ``status == "success"``."""
        return bool(result.success) and cls._response_data(result).get("status") == "success"

    @staticmethod
    def _elapsed_ms(started: float) -> float:
        """Milliseconds elapsed since ``started`` (a ``time.perf_counter()`` reading)."""
        return (time.perf_counter() - started) * 1000

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    async def execute_step(self, step: ReasoningStep, context: IterativeContext) -> StepResult:
        """Execute a validated reasoning step.

        Args:
            step: The step to run; ``step.action`` selects the branch and
                ``step.target`` names the function for service calls.
            context: Current iterative context (supplies ``step_count`` and
                ``identity``).

        Returns:
            A ``StepResult``. ``success`` is ``False`` for an unknown action
            or when the handler raised; otherwise ``True`` with the handler's
            return value in ``result_data`` (which may itself describe a
            failed service call).
        """
        # Monotonic clock: durations must not be affected by wall-clock jumps.
        started = time.perf_counter()
        step_exec_id = f"{context.step_count + 1}_{datetime.utcnow().timestamp()}"

        try:
            self.logger.debug(f"[💭] [{step_exec_id}] Executing step: {step.action} -> {step.target}")

            if step.action == StepAction.CALL_SERVICE.value:
                # Execute service call
                self.logger.debug(f"[💭] [{step_exec_id}] Calling _execute_service_call for {step.target}")
                result_data = await self._execute_service_call(step, context, step_exec_id)

                # Cache service call event for LLM context (best effort)
                await self._cache_service_call_event(step, context, step_exec_id, result_data)

            elif step.action == StepAction.CHECK_GOAL_SATISFACTION.value:
                # This is now handled by oracle_client, shouldn't reach here
                result_data = False

            else:
                # Unknown action
                return StepResult(
                    step=step,
                    success=False,
                    error_message=f"Unknown step action: {step.action}",
                    execution_time_ms=self._elapsed_ms(started)
                )

            return StepResult(
                step=step,
                success=True,
                result_data=result_data,
                execution_time_ms=self._elapsed_ms(started)
            )

        except Exception as e:
            self.logger.error(f"[💭] Error executing step: {e}")
            return StepResult(
                step=step,
                success=False,
                error_message=str(e),
                execution_time_ms=self._elapsed_ms(started)
            )

    async def _cache_service_call_event(self, step: ReasoningStep, context: IterativeContext,
                                        step_exec_id: str, result_data: Any) -> None:
        """Record a human-readable summary of a service call in the event cache.

        Best effort: any exception is logged as a warning and swallowed so a
        cache problem never fails the step itself.
        """
        try:
            service_name = step.target
            function_args = self._function_args(step)

            # Create human-readable summary
            content = f"Called {service_name}({function_args})"
            success = True
            if isinstance(result_data, dict):
                # Several handlers report failure with only an "error" key,
                # so the absence of "success" must not be read as success.
                success = result_data.get('success', 'error' not in result_data)
                if 'error' in result_data:
                    content += f" - Error: {result_data['error']}"
                elif 'count' in result_data:
                    content += f" - Found {result_data['count']} results"
                elif 'message' in result_data:
                    content += f" - {result_data['message']}"

            await event_cache.add_event(
                identity=context.identity,
                interaction_id=step_exec_id,
                event_type='service_call',
                content=content,
                metadata={
                    'service': service_name,
                    'success': success,
                    'function_args': function_args
                }
            )
            self.logger.debug(f"[💭] 📝 Cached service call event for {context.identity}")
        except Exception as e:
            self.logger.warning(f"[💭] Failed to cache service call event: {e}")

    async def _execute_service_call(self, step: ReasoningStep, context: IterativeContext,
                                    step_exec_id: str = "unknown") -> Any:
        """Execute a service call step with function arguments.

        Routes ``step.target`` to the matching ``_execute_*`` handler.

        Raises:
            ValueError: If ``step.target`` is not a known function name.
        """
        function_name = step.target
        function_args = self._function_args(step)

        self.logger.info(f"[💭] [{step_exec_id}] Executing function: {function_name}({function_args})")

        # Route to appropriate handler. Lambdas defer coroutine creation so
        # only the selected handler is ever instantiated.
        handlers = {
            "short_memory": lambda: self._execute_short_memory(function_args, context),
            "long_memory": lambda: self._execute_long_memory(function_args, context),
            "facts": lambda: self._execute_facts(function_args, context),
            "save_fact": lambda: self._execute_save_fact(function_args, context, step_exec_id),
            "update_fact": lambda: self._execute_update_fact(function_args, context, step_exec_id),
            "identity": lambda: self._execute_identity(function_args, context),
            "search_relationships": lambda: self._execute_search_relationships(function_args),
            "health": lambda: self._execute_health(),
            "duckduckgo": lambda: self._execute_duckduckgo(function_args, context, step_exec_id),
            "introduce": lambda: self._execute_introduce(function_args, context),
            "add_attribute": lambda: self._execute_add_attribute(function_args),
            "update_relationship": lambda: self._execute_update_relationship(function_args),
            "link_identity": lambda: self._execute_link_identity(function_args),
            "todo_create": lambda: self._execute_todo_create(function_args, context),
            "todo_update": lambda: self._execute_todo_update(function_args, context),
            "todo_list": lambda: self._execute_todo_list(context),
            "todo_complete": lambda: self._execute_todo_complete(function_args, context),
        }

        # isinstance guard keeps the ValueError contract for unhashable targets.
        handler = handlers.get(function_name) if isinstance(function_name, str) else None
        if handler is None:
            raise ValueError(f"Unknown function: {function_name}")
        return await handler()

    # ------------------------------------------------------------------
    # Memory service handlers
    # ------------------------------------------------------------------

    async def _execute_short_memory(self, args: dict, context: IterativeContext) -> dict:
        """Execute short_memory function.

        Reads ``n`` (default 10) and ``offset`` (default 0) from ``args`` and
        returns the memories reported by the memory service with their count.
        """
        n = args.get('n', 10)
        offset = args.get('offset', 0)

        result = await discovery_client.call_service(
            "memory",
            "short_memory",
            {
                "limit": n,
                "offset": offset,
                "identity_id": context.identity
            },
            timeout=3.0
        )

        if self._status_ok(result):
            memories = self._response_data(result).get("memories", [])
            return {
                "success": True,
                "memories": memories,
                "count": len(memories),
                "type": "short_term",
                "offset": offset
            }

        return {
            "success": False,
            "memories": [],
            "count": 0,
            "type": "short_term",
            "error": "Failed to fetch short-term memories"
        }

    async def _execute_long_memory(self, args: dict, context: IterativeContext) -> dict:
        """Execute long_memory function.

        Reads ``query`` and ``n`` (default 5) from ``args`` and returns the
        memories reported by the memory service with their count.
        """
        query = args.get('query')
        n = args.get('n', 5)

        result = await discovery_client.call_service(
            "memory",
            "long_memory",
            {
                "query": query,
                "limit": n,
                "identity_id": context.identity
            },
            timeout=5.0
        )

        if self._status_ok(result):
            memories = self._response_data(result).get("memories", [])
            return {
                "success": True,
                "memories": memories,
                "count": len(memories),
                "type": "long_term",
                "query": query
            }

        return {
            "success": False,
            "memories": [],
            "count": 0,
            "type": "long_term",
            "error": "Failed to fetch long-term memories"
        }

    async def _execute_facts(self, args: dict, context: IterativeContext) -> dict:
        """Execute facts function.

        Reads ``query`` (default ``''``) and ``n`` (default 5) from ``args``
        and returns the facts reported by the memory service with their count.
        """
        query = args.get('query', '')
        n = args.get('n', 5)

        result = await discovery_client.call_service(
            "memory",
            "facts",
            {
                "query": query,
                "limit": n,
                "identity_id": context.identity
            },
            timeout=3.0
        )

        if self._status_ok(result):
            facts = self._response_data(result).get("facts", [])
            return {
                "success": True,
                "facts": facts,
                "count": len(facts),
                "query": query
            }

        return {
            "success": False,
            "facts": [],
            "count": 0,
            "error": "Failed to fetch facts"
        }

    async def _execute_save_fact(self, args: dict, context: IterativeContext, step_exec_id: str) -> dict:
        """Execute save_fact function.

        Requires ``content`` in ``args``; ``category`` defaults to
        ``'general'`` and ``mutable`` to ``True``. Returns a dict with
        ``success``/``status`` and, on success, the ``fact_id`` reported by
        the memory service.
        """
        content = args.get('content')
        category = args.get('category', 'general')
        mutable = args.get('mutable', True)

        if not content:
            return {
                "success": False,
                "status": "error",
                "error": "content is required"
            }

        # str() so a non-string content cannot break the log/message preview.
        preview = str(content)[:50]

        self.logger.info(f"[💭] [{step_exec_id}] Calling memory service save_fact: category={category}, content='{preview}...'")

        result = await discovery_client.call_service(
            "memory",
            "save_fact",
            {
                "content": content,
                "category": category,
                "identities": [context.identity],
                "mutable": mutable,
                "metadata": {
                    "source": "oracle_decision",
                    "timestamp": datetime.utcnow().isoformat(),
                    "step_exec_id": step_exec_id
                }
            },
            timeout=5.0,  # Increased - ChromaDB embedding can take 2-3 seconds
            retry_attempts=1  # Disable retries for write operations
        )

        data = self._response_data(result)

        if self._status_ok(result):
            fact_id = data.get("fact_id")
            # A missing or non-string fact_id must not raise here: the write
            # already happened, and an exception would report it as failed.
            fact_label = str(fact_id)[:8]
            self.logger.info(f"[💭] [{step_exec_id}] 💾 Saved fact: {fact_label}... {preview}...")
            return {
                "success": True,
                "status": "success",
                "fact_id": fact_id,
                "message": f"Saved fact: {preview}..."
            }

        self.logger.warning(f"[💭] [{step_exec_id}] Failed to save fact: {result.error if not result.success else data.get('error')}")
        return {
            "success": False,
            "status": "error",
            "error": "Failed to save fact"
        }

    async def _execute_update_fact(self, args: dict, context: IterativeContext, step_exec_id: str) -> dict:
        """Execute update_fact function.

        Requires ``fact_id`` and ``new_content`` in ``args``. ``step_exec_id``
        is accepted for signature parity with ``_execute_save_fact`` and is
        not used.
        """
        fact_id = args.get('fact_id')
        new_content = args.get('new_content')

        if not fact_id or not new_content:
            return {
                "success": False,
                "status": "error",
                "error": "fact_id and new_content required"
            }

        result = await discovery_client.call_service(
            "memory",
            "update_fact",
            {
                "fact_id": fact_id,
                "new_content": new_content,
                "identity_id": context.identity,
                "metadata": {
                    "updated_by": "oracle_decision",
                    "timestamp": datetime.utcnow().isoformat()
                }
            },
            timeout=5.0,
            retry_attempts=1
        )

        if self._status_ok(result):
            self.logger.info(f"[💭] ✏️ Updated fact: {fact_id}")
            return {
                "success": True,
                "status": "success",
                "message": f"Updated fact {fact_id}"
            }

        if result.success:
            error_msg = self._response_data(result).get("error", "Failed to update fact")
        else:
            error_msg = "Service call failed"
        return {
            "success": False,
            "status": "error",
            "error": error_msg
        }

    # ------------------------------------------------------------------
    # Identity service handlers
    # ------------------------------------------------------------------

    async def _execute_identity(self, args: dict, context: IterativeContext) -> dict:
        """Execute identity function.

        Looks up ``args['person']``, defaulting to the context identity.
        """
        person = args.get('person', context.identity)

        result = await discovery_client.call_service(
            "identity",
            "get",
            {"internal_id": person},
            timeout=10.0
        )

        if result.success:
            return {"identity": result.data}
        return {"error": "Failed to get identity"}

    async def _execute_search_relationships(self, args: dict) -> dict:
        """Execute search_relationships function.

        ``args`` is forwarded unchanged as the identity service search payload.
        """
        result = await discovery_client.call_service(
            "identity",
            "search",
            args,
            timeout=10.0
        )

        if result.success:
            return {"relationships": result.data}
        return {"error": "Failed to search relationships"}

    async def _execute_health(self) -> dict:
        """Execute health function by asking the memory manager for its status."""
        health_status = await self.memory_manager.get_health_status()
        return {"health": health_status}

    # ------------------------------------------------------------------
    # Plugin handlers
    # ------------------------------------------------------------------

    async def _execute_duckduckgo(self, args: dict, context: IterativeContext, step_exec_id: str) -> dict:
        """Execute duckduckgo function.

        Requires ``query`` in ``args``; ``limit`` defaults to 5. On success
        returns the answer/abstract/definition/results/related_topics fields
        taken from the plugin result.
        """
        query = args.get('query', '')
        limit = args.get('limit', 5)

        if not query:
            return {
                "success": False,
                "error": "query is required"
            }

        self.logger.info(f"[💭] [{step_exec_id}] Executing DuckDuckGo plugin: query='{query}'")

        result = await discovery_client.call_service(
            "plugin-orchestrator",
            "execute_plugin",
            {
                "plugin_name": "duckduckgo",
                "operation": "search",
                "parameters": {
                    "query": query,
                    "limit": limit
                },
                "identity": context.identity,
                "timeout": 60  # Increased from 15s to 60s for Job pod startup + API call
            },
            timeout=75.0,  # NATS timeout (must be > plugin timeout)
            retry_attempts=1
        )

        data = self._response_data(result)

        if self._status_ok(result):
            # "result" may be present but null; treat that like an empty result.
            plugin_result = data.get("result") or {}
            self.logger.info(f"[💭] [{step_exec_id}] DuckDuckGo search completed")
            return {
                "success": True,
                "query": query,
                "answer": plugin_result.get("answer"),
                "abstract": plugin_result.get("abstract"),
                "definition": plugin_result.get("definition"),
                "results": plugin_result.get("results", []),
                "related_topics": plugin_result.get("related_topics", [])
            }

        self.logger.warning(f"[💭] [{step_exec_id}] DuckDuckGo search failed")
        return {
            "success": False,
            "error": data.get("error", "Plugin execution failed") if result.success else result.error
        }

    # ------------------------------------------------------------------
    # Identity mutation handlers
    # ------------------------------------------------------------------

    async def _execute_introduce(self, args: dict, context: IterativeContext) -> dict:
        """Execute introduce function.

        Builds the introduce payload from ``args`` and records the context
        identity as ``introduced_by``.
        """
        payload = {
            "name": args.get('name'),
            "entity_type": args.get('entity_type', 'human'),
            "relationship_types": args.get('relationships', []),
            "context": args.get('context', ''),
            "attributes": args.get('attributes', {}),
            "introduced_by": context.identity
        }

        result = await discovery_client.call_service(
            "identity",
            "introduce",
            payload,
            timeout=10.0
        )

        if result.success:
            self.logger.info(f"[💭] 👋 Introduced new entity: {args.get('name')}")
            return {"introduced": result.data}
        return {"error": "Failed to introduce entity"}

    async def _execute_add_attribute(self, args: dict) -> dict:
        """Execute add_attribute function using ``person``, ``key`` and ``value`` from ``args``."""
        payload = {
            "internal_id": args.get('person'),
            "attribute_key": args.get('key'),
            "attribute_value": args.get('value')
        }

        result = await discovery_client.call_service(
            "identity",
            "add_attribute",
            payload,
            timeout=10.0
        )

        if result.success:
            self.logger.info(f"[💭] 📝 Added attribute: {payload['internal_id']}.{payload['attribute_key']}")
            return {"attribute_added": True}
        return {"error": "Failed to add attribute"}

    async def _execute_update_relationship(self, args: dict) -> dict:
        """Execute update_relationship function.

        Sends ``trust_delta``/``intimacy_delta`` (default 0.0) and ``reason``
        (default ``'oracle_update'``) for ``args['person']``.
        """
        payload = {
            "internal_id": args.get('person'),
            "trust_delta": args.get('trust_delta', 0.0),
            "intimacy_delta": args.get('intimacy_delta', 0.0),
            "reason": args.get('reason', 'oracle_update'),
            "interaction_summary": f"Oracle explicit update: {args.get('reason', '')}"
        }

        result = await discovery_client.call_service(
            "identity",
            "update",
            payload,
            timeout=10.0
        )

        if result.success:
            self.logger.info(f"[💭] 💚 Updated relationship: {payload['internal_id']}")
            return {"relationship_updated": True}
        return {"error": "Failed to update relationship"}

    async def _execute_link_identity(self, args: dict) -> dict:
        """Execute link_identity function.

        The link is marked ``verified`` only when ``confidence`` exceeds 0.9;
        a missing or ``None`` confidence counts as 0.0.
        """
        # "or 0.0" covers an explicit None, which would otherwise raise on ">".
        confidence = args.get('confidence') or 0.0
        payload = {
            "external_id": args.get('external_id'),
            "internal_id": args.get('internal_id'),
            "identity_type": args.get('identity_type', 'unknown'),
            "verified": confidence > 0.9
        }

        result = await discovery_client.call_service(
            "identity",
            "link",
            payload,
            timeout=10.0
        )

        if result.success:
            self.logger.info(f"[💭] 🔗 Linked identity: {payload['external_id']} -> {payload['internal_id']}")
            return {"identity_linked": True}
        return {"error": "Failed to link identity"}

    # ------------------------------------------------------------------
    # Todo handlers
    # ------------------------------------------------------------------

    async def _execute_todo_create(self, args: dict, context: IterativeContext) -> dict:
        """Execute todo_create function.

        Requires ``content``. The active form is read from ``activeForm`` or
        ``active_form`` and falls back to ``content``; ``status`` defaults to
        ``'pending'``.
        """
        content = args.get('content')
        active_form = args.get('activeForm', args.get('active_form', content))
        status = args.get('status', 'pending')

        if not content:
            return {
                "success": False,
                "error": "content is required"
            }

        try:
            todo = await self.todo_manager.create(
                interaction_id=context.interaction_id,
                content=content,
                active_form=active_form,
                status=status
            )

            self.logger.info(f"[💭] ✓ Created todo {todo.todo_id}: {content}")
            return {
                "success": True,
                "todo_id": todo.todo_id,
                "content": content,
                "status": status,
                "message": f"Created todo: {content}"
            }
        except Exception as e:
            self.logger.error(f"[💭] Error creating todo: {e}")
            return {
                "success": False,
                "error": f"Failed to create todo: {str(e)}"
            }

    async def _execute_todo_update(self, args: dict, context: IterativeContext) -> dict:
        """Execute todo_update function.

        Requires ``todo_id``; ``status``, ``content`` and the active form are
        passed through to the todo manager as given (possibly ``None``).
        """
        todo_id = args.get('todo_id')
        status = args.get('status')
        content = args.get('content')
        active_form = args.get('activeForm', args.get('active_form'))

        if not todo_id:
            return {
                "success": False,
                "error": "todo_id is required"
            }

        try:
            result = await self.todo_manager.update(
                interaction_id=context.interaction_id,
                todo_id=todo_id,
                status=status,
                content=content,
                active_form=active_form
            )

            if result:
                self.logger.info(f"[💭] ✏️ Updated todo {todo_id}")
                return {
                    "success": True,
                    "todo_id": todo_id,
                    "message": f"Updated todo {todo_id}"
                }
            return {
                "success": False,
                "error": f"Todo {todo_id} not found"
            }
        except Exception as e:
            self.logger.error(f"[💭] Error updating todo: {e}")
            return {
                "success": False,
                "error": f"Failed to update todo: {str(e)}"
            }

    async def _execute_todo_list(self, context: IterativeContext) -> dict:
        """Execute todo_list function.

        Returns every todo of the current interaction as plain dicts, plus
        the todo manager's summary and the todo count.
        """
        try:
            todos = await self.todo_manager.list(context.interaction_id)
            summary = await self.todo_manager.get_summary(context.interaction_id)

            todo_list = [
                {
                    "todo_id": t.todo_id,
                    "content": t.content,
                    "active_form": t.active_form,
                    "status": t.status
                }
                for t in todos
            ]

            self.logger.info(f"[💭] 📋 Listed {len(todos)} todos")
            return {
                "success": True,
                "todos": todo_list,
                "summary": summary,
                "count": len(todos)
            }
        except Exception as e:
            self.logger.error(f"[💭] Error listing todos: {e}")
            return {
                "success": False,
                "error": f"Failed to list todos: {str(e)}",
                "todos": []
            }

    async def _execute_todo_complete(self, args: dict, context: IterativeContext) -> dict:
        """Execute todo_complete function. Requires ``todo_id`` in ``args``."""
        todo_id = args.get('todo_id')

        if not todo_id:
            return {
                "success": False,
                "error": "todo_id is required"
            }

        try:
            result = await self.todo_manager.complete(
                interaction_id=context.interaction_id,
                todo_id=todo_id
            )

            if result:
                self.logger.info(f"[💭] ✓ Completed todo {todo_id}")
                return {
                    "success": True,
                    "todo_id": todo_id,
                    "message": f"Completed todo {todo_id}"
                }
            return {
                "success": False,
                "error": f"Todo {todo_id} not found"
            }
        except Exception as e:
            self.logger.error(f"[💭] Error completing todo: {e}")
            return {
                "success": False,
                "error": f"Failed to complete todo: {str(e)}"
            }