mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
* Add multi-region support for SRE-Agent
  - Add AWS region configuration parameter to agent_config.yaml
  - Update gateway main.py to validate region matches endpoint URL
  - Modify SRE agent to read region from config and pass through function chain
  - Update memory client and LLM creation to use configurable region
  - Fixes hardcoded us-east-1 region dependencies
  Closes #245
* Move architecture file to docs/ and improve setup instructions
  - Move sre_agent_architecture.md to docs/ folder for better organization
  - Update graph export code to generate architecture file in docs/ folder
  - Add automatic docs directory creation if it doesn't exist
  - Improve README setup instructions:
    - Fix .env.example copy path to use sre_agent folder
    - Add note that Amazon Bedrock users don't need to modify .env
    - Add START_API_BACKEND variable to conditionally start backend servers
      (useful for workshop environments where backends are already running)
* Improve gateway configuration documentation and setup instructions
  - Update config.yaml.example to use REGION placeholder instead of hardcoded us-east-1
  - Add gateway configuration step to README setup instructions
  - Document .cognito_config file in auth.md automated setup section
  - Remove duplicate credential_provider_name from config.yaml.example
  - Update configuration.md to include .cognito_config in files overview
  - Add clear instructions to copy and edit gateway/config.yaml before creating gateway
* Improve IAM role guidance and region handling
  - Add clear guidance about IAM role options in gateway/config.yaml.example
  - Explain that testing can use current EC2/notebook role
  - Recommend dedicated role for production deployments
  - Add aws sts get-caller-identity command to help users find their role
  - Update deployment scripts to use AWS_REGION env var as fallback
  - Scripts now follow: CLI arg -> AWS_REGION env var -> us-east-1 default
* Remove unnecessary individual Cognito ID files
  - Remove creation of .cognito_user_pool_id file
  - Remove creation of .cognito_client_id file
  - Keep only .cognito_config as the single source of truth
  - Simplifies configuration management
* Implement region fallback logic for SRE Agent
  - Added region fallback chain: agent_config.yaml -> AWS_REGION env -> us-east-1
  - Modified agent_config.yaml to comment out region parameter to enable fallback
  - Updated multi_agent_langgraph.py with comprehensive fallback implementation
  - Added logging to show which region source is being used
  - Ensures flexible region configuration without breaking existing deployments
  - Maintains backward compatibility while adding multi-region support
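The last commit listed above introduces a region fallback chain of agent_config.yaml -> AWS_REGION environment variable -> us-east-1. A minimal illustrative sketch of that resolution order (not part of the repository file below; the agent_config dict is a hypothetical stand-in for the parsed agent_config.yaml):

    # Illustrative sketch only; names here are assumptions, not the project's API.
    import os

    def resolve_region(agent_config: dict) -> str:
        region = agent_config.get("region")  # 1. region from agent_config.yaml, if set
        if region:
            return region
        env_region = os.getenv("AWS_REGION")  # 2. AWS_REGION environment variable
        if env_region:
            return env_region
        return "us-east-1"  # 3. final default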
898 lines
36 KiB
Python
#!/usr/bin/env python3

import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional

from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel, Field, field_validator

from .agent_state import AgentState
from .constants import SREConstants
from .llm_utils import create_llm_with_error_handling
from .memory import create_conversation_memory_manager
from .memory.client import SREMemoryClient
from .memory.config import _load_memory_config
from .memory.hooks import MemoryHookProvider
from .memory.tools import create_memory_tools
from .output_formatter import create_formatter
from .prompt_loader import prompt_loader

def _get_user_from_env() -> str:
    """Get user_id from environment variable.

    Returns:
        user_id from USER_ID environment variable or default
    """
    user_id = os.getenv("USER_ID")
    if user_id:
        logger.info(f"Using user_id from environment: {user_id}")
        return user_id
    else:
        # Fallback to default user_id
        default_user_id = SREConstants.agents.default_user_id
        logger.warning(
            f"USER_ID not set in environment, using default: {default_user_id}"
        )
        return default_user_id


def _get_session_from_env(mode: str) -> str:
    """Get session_id from environment variable or generate one.

    Args:
        mode: "interactive" or "prompt" for auto-generation prefix

    Returns:
        session_id from SESSION_ID environment variable or auto-generated
    """
    session_id = os.getenv("SESSION_ID")
    if session_id:
        logger.info(f"Using session_id from environment: {session_id}")
        return session_id
    else:
        # Auto-generate session_id
        auto_session_id = f"{mode}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        logger.info(
            f"SESSION_ID not set in environment, auto-generated: {auto_session_id}"
        )
        return auto_session_id

# Configure logging with basicConfig
logging.basicConfig(
    level=logging.INFO,  # Set the log level to INFO
    # Define log message format
    format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)

# HTTP and MCP protocol logs are left enabled for debugging.
# Uncomment the following lines to suppress these logs if needed.
# mcp_loggers = ["streamable_http", "mcp.client.streamable_http", "httpx", "httpcore"]
#
# for logger_name in mcp_loggers:
#     mcp_logger = logging.getLogger(logger_name)
#     mcp_logger.setLevel(logging.WARNING)

logger = logging.getLogger(__name__)

def _json_serializer(obj):
    """JSON serializer for objects not serializable by default json code."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

class InvestigationPlan(BaseModel):
    """Investigation plan created by supervisor."""

    steps: List[str] = Field(
        description="List of 3-5 investigation steps to be executed"
    )

    @field_validator("steps", mode="before")
    @classmethod
    def validate_steps(cls, v):
        """Convert string steps to list if needed."""
        if isinstance(v, str):
            # Split by numbered lines and clean up
            import re

            lines = v.strip().split("\n")
            steps = []
            for line in lines:
                line = line.strip()
                if line:
                    # Remove numbering like "1.", "2.", etc.
                    clean_line = re.sub(r"^\d+\.\s*", "", line)
                    if clean_line:
                        steps.append(clean_line)
            return steps
        return v

    agents_sequence: List[str] = Field(
        description="Sequence of agents to invoke (kubernetes_agent, logs_agent, metrics_agent, runbooks_agent)"
    )
    complexity: Literal["simple", "complex"] = Field(
        description="Whether this plan is simple (auto-execute) or complex (needs approval)"
    )
    auto_execute: bool = Field(
        description="Whether to execute automatically or ask for user approval"
    )
    reasoning: str = Field(
        description="Brief explanation of the investigation approach"
    )

class RouteDecision(BaseModel):
    """Decision made by supervisor for routing."""

    next: Literal[
        "kubernetes_agent", "logs_agent", "metrics_agent", "runbooks_agent", "FINISH"
    ] = Field(description="The next agent to route to, or FINISH if done")
    reasoning: str = Field(
        description="Brief explanation of why this routing decision was made"
    )

def _read_supervisor_prompt() -> str:
    """Read supervisor system prompt from file."""
    try:
        prompt_path = (
            Path(__file__).parent
            / "config"
            / "prompts"
            / "supervisor_multi_agent_prompt.txt"
        )
        if prompt_path.exists():
            return prompt_path.read_text().strip()
    except Exception as e:
        logger.warning(f"Could not read supervisor prompt file: {e}")

    # Fallback to supervisor fallback prompt file
    try:
        fallback_path = (
            Path(__file__).parent
            / "config"
            / "prompts"
            / "supervisor_fallback_prompt.txt"
        )
        if fallback_path.exists():
            return fallback_path.read_text().strip()
    except Exception as e:
        logger.warning(f"Could not read supervisor fallback prompt file: {e}")

    # Final hardcoded fallback if files not found
    return (
        "You are the Supervisor Agent orchestrating a team of specialized SRE agents."
    )

def _read_planning_prompt() -> str:
    """Read planning prompt from file."""
    try:
        prompt_path = (
            Path(__file__).parent
            / "config"
            / "prompts"
            / "supervisor_planning_prompt.txt"
        )
        if prompt_path.exists():
            return prompt_path.read_text().strip()
    except Exception as e:
        logger.warning(f"Could not read planning prompt file: {e}")

    # Fallback planning prompt
    return """Create a simple, focused investigation plan with 2-3 steps maximum.
Create the plan in JSON format with these fields:
- steps: List of 3-5 investigation steps
- agents_sequence: List of agents to invoke (kubernetes_agent, logs_agent, metrics_agent, runbooks_agent)
- complexity: "simple" or "complex"
- auto_execute: true or false
- reasoning: Brief explanation of the investigation approach"""

class SupervisorAgent:
    """Supervisor agent that orchestrates other agents with memory capabilities."""

    def __init__(
        self,
        llm_provider: str = "bedrock",
        force_delete_memory: bool = False,
        **llm_kwargs,
    ):
        self.llm_provider = llm_provider
        self.llm = self._create_llm(**llm_kwargs)
        self.system_prompt = _read_supervisor_prompt()
        self.formatter = create_formatter(llm_provider=llm_provider)

        # Initialize memory system
        self.memory_config = _load_memory_config()
        if self.memory_config.enabled:
            # Use region from llm_kwargs if provided for bedrock
            memory_region = (
                llm_kwargs.get("region_name", self.memory_config.region)
                if llm_provider == "bedrock"
                else self.memory_config.region
            )
            self.memory_client = SREMemoryClient(
                memory_name=self.memory_config.memory_name,
                region=memory_region,
                force_delete=force_delete_memory,
            )
            self.memory_hooks = MemoryHookProvider(self.memory_client)
            self.conversation_manager = create_conversation_memory_manager(
                self.memory_client
            )

            # Create memory tools for supervisor agent
            self.memory_tools = create_memory_tools(self.memory_client)

            # Create react agent with memory tools for supervised planning
            self.planning_agent = create_react_agent(self.llm, self.memory_tools)
            logger.info(
                f"Memory system initialized for supervisor agent with {len(self.memory_tools)} memory tools"
            )
        else:
            self.memory_client = None
            self.memory_hooks = None
            self.conversation_manager = None
            self.memory_tools = []
            self.planning_agent = None
            logger.info("Memory system disabled")

    def _create_llm(self, **kwargs):
        """Create LLM instance with improved error handling."""
        return create_llm_with_error_handling(self.llm_provider, **kwargs)

    async def retrieve_memory(
        self,
        memory_type: str,
        query: str,
        actor_id: str,
        max_results: int = 5,
        session_id: Optional[str] = None,
    ) -> str:
        """Retrieve information from long-term memory using the retrieve_memory tool."""
        if not self.memory_tools:
            return "Memory system not enabled"

        # Find the retrieve_memory tool
        retrieve_tool = None
        for tool in self.memory_tools:
            if tool.name == "retrieve_memory":
                retrieve_tool = tool
                break

        if not retrieve_tool:
            return "Retrieve memory tool not available"

        try:
            logger.info(
                f"Supervisor using retrieve_memory tool: type={memory_type}, query='{query}', actor_id={actor_id}"
            )
            result = retrieve_tool._run(
                memory_type=memory_type,
                query=query,
                actor_id=actor_id,
                max_results=max_results,
                session_id=session_id,
            )
            return result
        except Exception as e:
            logger.error(f"Error retrieving memory: {e}", exc_info=True)
            return f"Error retrieving memory: {str(e)}"

    async def create_investigation_plan(self, state: AgentState) -> InvestigationPlan:
        """Create an investigation plan for the user's query with memory context."""
        current_query = state.get("current_query", "No query provided")
        user_id = state.get("user_id", SREConstants.agents.default_user_id)
        incident_id = state.get("incident_id")

        # Update memory tools with the current user_id
        if self.memory_tools:
            from .memory.tools import update_memory_tools_user_id

            update_memory_tools_user_id(self.memory_tools, user_id)
            logger.info(f"Updated memory tools with user_id: {user_id}")

        # Use user_id as actor_id for investigation memory retrieval (consistent with storage)
        actor_id = state.get(
            "user_id", state.get("actor_id", SREConstants.agents.default_actor_id)
        )
        session_id = state.get("session_id")

        # Retrieve memory context if memory system is enabled
        memory_context_text = ""
        if self.memory_client:
            try:
                logger.info(
                    f"Retrieving memory context for user_id={user_id}, query='{current_query}'"
                )

                # Get memory context from hooks
                if not session_id:
                    raise ValueError(
                        "session_id is required for memory retrieval but not found in state"
                    )

                memory_context = self.memory_hooks.on_investigation_start(
                    query=current_query,
                    user_id=user_id,
                    actor_id=actor_id,
                    session_id=session_id,
                    incident_id=incident_id,
                )

                # Store memory context in state
                state["memory_context"] = memory_context

                # Log user preferences for debugging (they're stored in memory_context)
                user_prefs = memory_context.get("user_preferences", [])
                logger.debug(
                    f"Stored {len(user_prefs)} user preferences in memory_context during planning"
                )
                logger.debug(
                    f"User preferences being stored in memory_context: {user_prefs}"
                )

                # Format memory context for prompt
                pref_count = len(memory_context.get("user_preferences", []))
                infrastructure_by_agent = memory_context.get(
                    "infrastructure_by_agent", {}
                )
                total_knowledge = sum(
                    len(memories) for memories in infrastructure_by_agent.values()
                )
                investigation_count = len(memory_context.get("past_investigations", []))

                if memory_context.get("user_preferences"):
                    memory_context_text += f"\nRelevant User Preferences:\n{json.dumps(memory_context['user_preferences'], indent=2, default=_json_serializer)}\n"

                if infrastructure_by_agent:
                    memory_context_text += (
                        "\nRelevant Infrastructure Knowledge (organized by agent):\n"
                    )
                    for agent_id, agent_memories in infrastructure_by_agent.items():
                        memory_context_text += (
                            f"\n From {agent_id} ({len(agent_memories)} items):\n"
                        )
                        memory_context_text += f"{json.dumps(agent_memories, indent=4, default=_json_serializer)}\n"

                if memory_context.get("past_investigations"):
                    memory_context_text += f"\nSimilar Past Investigations:\n{json.dumps(memory_context['past_investigations'], indent=2, default=_json_serializer)}\n"

                logger.info(
                    f"Retrieved memory context for planning: {pref_count} preferences, {total_knowledge} knowledge items from {len(infrastructure_by_agent)} agents, {investigation_count} past investigations"
                )

                if pref_count + total_knowledge + investigation_count == 0:
                    logger.info(
                        "No relevant memories found - this may be the first interaction or a new topic"
                    )

            except Exception as e:
                logger.error(f"Failed to retrieve memory context: {e}", exc_info=True)
                memory_context_text = ""

        # Enhanced planning prompt that instructs the agent to use memory tools
        planning_instructions = _read_planning_prompt()
        # Replace placeholders manually to avoid issues with JSON braces in the prompt
        formatted_planning_instructions = planning_instructions.replace(
            "{user_id}", user_id
        )
        if session_id:
            formatted_planning_instructions = formatted_planning_instructions.replace(
                "{session_id}", session_id
            )

        planning_prompt = f"""{self.system_prompt}

User's query: {current_query}
{memory_context_text}

{formatted_planning_instructions}"""

        if self.planning_agent and self.memory_tools:
            # Use planning agent with memory tools
            try:
                # Create messages for the planning agent
                messages = [
                    SystemMessage(content=planning_prompt),
                    HumanMessage(
                        content=f"Create an investigation plan for: {current_query}"
                    ),
                ]

                # Use the planning agent with memory tools
                plan_response = await self.planning_agent.ainvoke(
                    {"messages": messages}
                )

                # Extract the final message content
                if plan_response and "messages" in plan_response:
                    final_message = plan_response["messages"][-1]
                    plan_text = final_message.content

                    # Always log the complete planning agent response
                    logger.info(f"Planning agent original response: {plan_text}")

                    # Try to extract JSON from the response
                    import re

                    # Look for JSON in the response - try multiple patterns
                    json_patterns = [
                        r'\{[^{}]*"steps"[^{}]*"agents_sequence"[^{}]*"complexity"[^{}]*"auto_execute"[^{}]*"reasoning"[^{}]*\}',  # Specific pattern for our structure
                        r'\{.*?"steps".*?\}',  # Broader pattern
                        r"\{.*\}",  # Most general pattern
                    ]

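                    # Note: the patterns above are ordered from most specific to most
                    # general; the first pattern that matches the response is used.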
                    json_content = None
                    for pattern in json_patterns:
                        json_match = re.search(pattern, plan_text, re.DOTALL)
                        if json_match:
                            json_content = json_match.group()
                            logger.info(
                                f"Extracted JSON content using pattern: {json_content}"
                            )
                            break

                    if json_content:
                        try:
                            # Clean up the JSON content
                            json_content = json_content.strip()
                            plan_json = json.loads(json_content)
                            logger.info(f"Successfully parsed JSON: {plan_json}")
                            plan = InvestigationPlan(**plan_json)
                            logger.info(
                                "Successfully created InvestigationPlan from JSON"
                            )
                        except json.JSONDecodeError as e:
                            logger.error(f"JSON decode error: {e}")
                            logger.error(f"Failed JSON content: {json_content}")
                            logger.warning(
                                "Could not parse JSON from planning agent response, using fallback"
                            )
                            plan = InvestigationPlan(
                                steps=[
                                    "Investigate the reported issue",
                                    "Analyze findings and provide recommendations",
                                ],
                                agents_sequence=["metrics_agent", "logs_agent"],
                                complexity="simple",
                                auto_execute=True,
                                reasoning="Default investigation plan due to JSON parsing error",
                            )
                        except Exception as e:
                            logger.error(f"Error creating InvestigationPlan: {e}")
                            logger.error(f"Plan JSON was: {plan_json}")
                            logger.warning(
                                "Could not create InvestigationPlan from parsed JSON, using fallback"
                            )
                            plan = InvestigationPlan(
                                steps=[
                                    "Investigate the reported issue",
                                    "Analyze findings and provide recommendations",
                                ],
                                agents_sequence=["metrics_agent", "logs_agent"],
                                complexity="simple",
                                auto_execute=True,
                                reasoning="Default investigation plan due to validation error",
                            )
                    else:
                        # Fallback to basic plan if JSON parsing fails
                        logger.warning(
                            "Could not find JSON pattern in planning agent response, using fallback"
                        )
                        logger.warning(f"Response content was: {plan_text}")
                        plan = InvestigationPlan(
                            steps=[
                                "Investigate the reported issue",
                                "Analyze findings and provide recommendations",
                            ],
                            agents_sequence=["metrics_agent", "logs_agent"],
                            complexity="simple",
                            auto_execute=True,
                            reasoning="Default investigation plan due to no JSON found",
                        )
                else:
                    raise ValueError("No response from planning agent")
            except Exception as e:
                logger.error(
                    f"Error using planning agent with memory tools: {e}", exc_info=True
                )
                # Fallback to structured output without tools
                structured_llm = self.llm.with_structured_output(InvestigationPlan)
                plan = await structured_llm.ainvoke(
                    [
                        SystemMessage(content=planning_prompt),
                        HumanMessage(content=current_query),
                    ]
                )
        else:
            # Fallback to structured output without memory tools
            structured_llm = self.llm.with_structured_output(InvestigationPlan)
            plan = await structured_llm.ainvoke(
                [
                    SystemMessage(content=planning_prompt),
                    HumanMessage(content=current_query),
                ]
            )

        logger.info(
            f"Created investigation plan: {len(plan.steps)} steps, complexity: {plan.complexity}"
        )

        # Store conversation in memory
        if self.conversation_manager and user_id and session_id:
            try:
                # Get supervisor display name with fallback
                supervisor_name = getattr(SREConstants.agents, "supervisor", None)
                if supervisor_name:
                    supervisor_display_name = supervisor_name.display_name
                else:
                    supervisor_display_name = "Supervisor Agent"

                messages_to_store = [
                    (current_query, "USER"),
                    (
                        f"[Agent: {supervisor_display_name}]\nInvestigation Plan:\n{self._format_plan_markdown(plan)}",
                        "ASSISTANT",
                    ),
                ]

                success = self.conversation_manager.store_conversation_batch(
                    messages=messages_to_store,
                    user_id=user_id,
                    session_id=session_id,
                    agent_name=supervisor_display_name,
                )

                if success:
                    logger.info("Supervisor: Successfully stored planning conversation")
                else:
                    logger.warning("Supervisor: Failed to store planning conversation")

            except Exception as e:
                logger.error(
                    f"Supervisor: Error storing planning conversation: {e}",
                    exc_info=True,
                )

        return plan

    def _format_plan_markdown(self, plan: InvestigationPlan) -> str:
        """Format investigation plan as properly formatted markdown."""
        plan_text = "## 🔍 Investigation Plan\n\n"

        # Add steps with proper numbering and formatting
        for i, step in enumerate(plan.steps, 1):
            plan_text += f"**{i}.** {step}\n\n"

        # Add metadata
        plan_text += f"**📊 Complexity:** {plan.complexity.title()}\n"
        plan_text += f"**🤖 Auto-execute:** {'Yes' if plan.auto_execute else 'No'}\n"
        if plan.reasoning:
            plan_text += f"**💭 Reasoning:** {plan.reasoning}\n"

        # Add agents involved
        if plan.agents_sequence:
            agents_list = ", ".join(
                [agent.replace("_", " ").title() for agent in plan.agents_sequence]
            )
            plan_text += f"**👥 Agents involved:** {agents_list}\n"

        return plan_text

    async def route(self, state: AgentState) -> Dict[str, Any]:
        """Determine which agent should handle the query next."""
        agents_invoked = state.get("agents_invoked", [])

        # Check if we have an existing plan
        existing_plan = state.get("metadata", {}).get("investigation_plan")

        if not existing_plan:
            # First time - create investigation plan
            plan = await self.create_investigation_plan(state)

            # Check if we should auto-approve the plan (defaults to False if not set)
            auto_approve = state.get("auto_approve_plan", False)

            if not plan.auto_execute and not auto_approve:
                # Complex plan - present to user for approval
                plan_text = self._format_plan_markdown(plan)
                return {
                    "next": "FINISH",
                    "metadata": {
                        **state.get("metadata", {}),
                        "investigation_plan": plan.model_dump(),
                        "routing_reasoning": f"Created investigation plan. Complexity: {plan.complexity}",
                        "plan_pending_approval": True,
                        "plan_text": plan_text,
                    },
                    # Preserve memory context in state
                    "memory_context": state.get("memory_context", {}),
                }
            else:
                # Simple plan - start execution
                next_agent = (
                    plan.agents_sequence[0] if plan.agents_sequence else "FINISH"
                )
                plan_text = self._format_plan_markdown(plan)
                return {
                    "next": next_agent,
                    "metadata": {
                        **state.get("metadata", {}),
                        "investigation_plan": plan.model_dump(),
                        "routing_reasoning": f"Executing plan step 1: {plan.steps[0] if plan.steps else 'Start'}",
                        "plan_step": 0,
                        "plan_text": plan_text,
                        "show_plan": True,
                    },
                    # Preserve memory context in state
                    "memory_context": state.get("memory_context", {}),
                }
        else:
            # Continue executing existing plan
            plan = InvestigationPlan(**existing_plan)
            current_step = state.get("metadata", {}).get("plan_step", 0)

            # Advance to the next step only once at least one agent has been invoked
            if current_step >= len(plan.agents_sequence) or not agents_invoked:
                next_step = current_step
            else:
                next_step = current_step + 1

            if next_step >= len(plan.agents_sequence):
                # Plan complete
                return {
                    "next": "FINISH",
                    "metadata": {
                        **state.get("metadata", {}),
                        "routing_reasoning": "Investigation plan completed. Presenting results.",
                        "plan_step": next_step,
                    },
                    # Preserve memory context in state
                    "memory_context": state.get("memory_context", {}),
                }
            else:
                # Continue with next agent in plan
                next_agent = plan.agents_sequence[next_step]
                step_description = (
                    plan.steps[next_step]
                    if next_step < len(plan.steps)
                    else f"Execute {next_agent}"
                )

                return {
                    "next": next_agent,
                    "metadata": {
                        **state.get("metadata", {}),
                        "routing_reasoning": f"Executing plan step {next_step + 1}: {step_description}",
                        "plan_step": next_step,
                    },
                    # Preserve memory context in state
                    "memory_context": state.get("memory_context", {}),
                }

    async def aggregate_responses(self, state: AgentState) -> Dict[str, Any]:
        """Aggregate responses from multiple agents into a final response."""
        agent_results = state.get("agent_results", {})
        metadata = state.get("metadata", {})

        # Check if this is a plan approval request
        if metadata.get("plan_pending_approval"):
            plan = metadata.get("investigation_plan", {})
            query = state.get("current_query", "Investigation") or "Investigation"

            # Use enhanced formatting for plan approval
            try:
                approval_response = self.formatter.format_plan_approval(plan, query)
            except Exception as e:
                logger.warning(
                    f"Failed to use enhanced formatting: {e}, falling back to plain text"
                )
                plan_text = metadata.get("plan_text", "")
                approval_response = f"""## Investigation Plan

I've analyzed your query and created the following investigation plan:

{plan_text}

**Complexity:** {plan.get("complexity", "unknown").title()}
**Reasoning:** {plan.get("reasoning", "Standard investigation approach")}

This plan will help systematically investigate your issue. Would you like me to proceed with this plan, or would you prefer to modify it?

You can:
- Type "proceed" or "yes" to execute the plan
- Type "modify" to suggest changes
- Ask specific questions about any step"""

            return {"final_response": approval_response, "next": "FINISH"}

        if not agent_results:
            return {"final_response": "No agent responses to aggregate."}

        # Use enhanced formatting for investigation results
        query = state.get("current_query", "Investigation") or "Investigation"
        plan = metadata.get("investigation_plan")

        # Get user preferences from memory_context (not directly from state)
        user_preferences = []
        if "memory_context" in state:
            memory_ctx = state["memory_context"]
            user_preferences = memory_ctx.get("user_preferences", [])
            logger.debug(
                f"Memory context found with {len(user_preferences)} user preferences"
            )
        else:
            logger.debug("No memory_context found in state")

        logger.info(
            f"Retrieved user preferences from memory_context for aggregation: {len(user_preferences)} items"
        )
        logger.debug(f"Full state keys available: {list(state.keys())}")

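        # Prefer the deterministic formatter for the final response; the LLM-based
        # aggregation below only runs if the formatter raises an exception.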
        try:
            # Try enhanced formatting first
            final_response = self.formatter.format_investigation_response(
                query=query,
                agent_results=agent_results,
                metadata=metadata,
                plan=plan,
                user_preferences=user_preferences,
            )
        except Exception as e:
            logger.warning(
                f"Failed to use enhanced formatting: {e}, falling back to LLM aggregation"
            )

            # Fallback to LLM-based aggregation
            try:
                # Get system message from prompt loader
                system_prompt = prompt_loader.load_prompt(
                    "supervisor_aggregation_system"
                )

                # Determine if this is plan-based or standard aggregation
                is_plan_based = plan is not None

                # Prepare template variables
                query = (
                    state.get("current_query", "No query provided")
                    or "No query provided"
                )
                agent_results_json = json.dumps(
                    agent_results, indent=2, default=_json_serializer
                )
                auto_approve_plan = state.get("auto_approve_plan", False) or False

                # Use the user_preferences we already retrieved
                user_preferences_json = (
                    json.dumps(user_preferences, indent=2, default=_json_serializer)
                    if user_preferences
                    else ""
                )

                if is_plan_based:
                    current_step = metadata.get("plan_step", 0)
                    total_steps = len(plan.get("steps", []))
                    plan_json = json.dumps(
                        plan.get("steps", []), indent=2, default=_json_serializer
                    )

                    aggregation_prompt = (
                        prompt_loader.get_supervisor_aggregation_prompt(
                            is_plan_based=True,
                            query=query,
                            agent_results=agent_results_json,
                            auto_approve_plan=auto_approve_plan,
                            current_step=current_step + 1,
                            total_steps=total_steps,
                            plan=plan_json,
                            user_preferences=user_preferences_json,
                        )
                    )
                else:
                    aggregation_prompt = (
                        prompt_loader.get_supervisor_aggregation_prompt(
                            is_plan_based=False,
                            query=query,
                            agent_results=agent_results_json,
                            auto_approve_plan=auto_approve_plan,
                            user_preferences=user_preferences_json,
                        )
                    )

            except Exception as e:
                logger.error(f"Error loading aggregation prompts: {e}")
                # Fallback to simple prompt
                system_prompt = "You are an expert at presenting technical investigation results clearly and professionally."
                aggregation_prompt = f"Summarize these findings: {json.dumps(agent_results, indent=2, default=_json_serializer)}"

            response = await self.llm.ainvoke(
                [
                    SystemMessage(content=system_prompt),
                    HumanMessage(content=aggregation_prompt),
                ]
            )

            final_response = response.content

        # Store final response conversation in memory
        user_id = state.get("user_id")
        session_id = state.get("session_id")
        if (
            self.conversation_manager
            and user_id
            and session_id
            and not metadata.get("plan_pending_approval")
        ):
            try:
                # Store the final aggregated response
                # Get supervisor display name with fallback
                supervisor_name = getattr(SREConstants.agents, "supervisor", None)
                if supervisor_name:
                    supervisor_display_name = supervisor_name.display_name
                else:
                    supervisor_display_name = "Supervisor Agent"

                messages_to_store = [
                    (
                        f"[Agent: {supervisor_display_name}]\n{final_response}",
                        "ASSISTANT",
                    )
                ]

                success = self.conversation_manager.store_conversation_batch(
                    messages=messages_to_store,
                    user_id=user_id,
                    session_id=session_id,
                    agent_name=supervisor_display_name,
                )

                if success:
                    logger.info(
                        "Supervisor: Successfully stored final response conversation"
                    )
                else:
                    logger.warning(
                        "Supervisor: Failed to store final response conversation"
                    )

            except Exception as e:
                logger.error(
                    f"Supervisor: Error storing final response conversation: {e}",
                    exc_info=True,
                )

        # Save investigation summary to memory if enabled
        if self.memory_client and not metadata.get("plan_pending_approval"):
            try:
                incident_id = state.get("incident_id", "auto-generated")
                agents_used = state.get("agents_invoked", [])
                logger.debug(
                    f"Saving investigation summary for incident_id={incident_id}, agents_used={agents_used}"
                )

                # Use user_id as actor_id for investigation summaries (consistent with conversation memory)
                actor_id = state.get(
                    "user_id",
                    state.get("actor_id", SREConstants.agents.default_actor_id),
                )
                self.memory_hooks.on_investigation_complete(
                    state=state, final_response=final_response, actor_id=actor_id
                )
                logger.info(
                    f"Saved investigation summary to memory for incident {incident_id}"
                )
            except Exception as e:
                logger.error(
                    f"Failed to save investigation summary: {e}", exc_info=True
                )

        return {"final_response": final_response, "next": "FINISH"}