Dheeraj Oruganty e346e83bf1
fix(02-use-cases): SRE-Agent Deployment (#179)
* Add missing credential_provider_name parameter to config.yaml.example

* Fix get_config function to properly parse YAML values with inline comments

* Enhanced get_config to prevent copy-paste whitespace errors in AWS identifiers

* Improve LLM provider configuration and error handling with bedrock as default

* Add OpenAPI templating system and fix hardcoded regions

* Add backend template build to README

* Delete old YAML files

* Fix Cognito setup with automation script and missing domain creation steps

* docs: Add EC2 instance port configuration documentation

- Document required inbound ports (443, 8011-8014)
- Include SSL/TLS security requirements
- Add AWS security group best practices
- Provide port usage summary table

* docs: Add hyperlinks to prerequisites in README

- Link EC2 port configuration documentation
- Link IAM role authentication setup
- Improve navigation to detailed setup instructions

* docs: Add BACKEND_API_KEY to configuration documentation

- Document gateway environment variables section
- Add BACKEND_API_KEY requirement for credential provider
- Include example .env file format for gateway directory
- Explain usage in create_gateway.sh script

* docs: Add BACKEND_API_KEY to deployment guide environment variables

- Include BACKEND_API_KEY in environment variables reference table
- Mark as required for gateway setup
- Provide quick reference alongside other required variables

* docs: Add BedrockAgentCoreFullAccess policy and trust policy documentation

- Document AWS managed policy BedrockAgentCoreFullAccess
- Add trust policy requirements for bedrock-agentcore.amazonaws.com
- Reorganize IAM permissions for better clarity
- Remove duplicate trust policy section
- Add IAM role requirement to deployment prerequisites

* docs: Document role_name field in gateway config example

- Explain that role_name is used to create and manage the gateway
- Specify BedrockAgentCoreFullAccess policy requirement
- Note trust policy requirement for bedrock-agentcore.amazonaws.com
- Improve clarity for gateway configuration setup

* docs: Add AWS IP address ranges for production security enhancement

- Document AWS IP ranges JSON download for restricting access
- Reference official AWS documentation for IP address ranges
- Provide security alternatives to 0.0.0.0/0 for production
- Include examples of restricted security group configurations
- Enable egress filtering and region-specific access control

* style: Format Python code with black

- Reformat 14 Python files for consistent code style
- Apply PEP 8 formatting standards
- Improve code readability and maintainability

* docs: Update SRE agent prerequisites and setup documentation

- Convert prerequisites section to markdown table format
- Add SSL certificate provider examples (no-ip.com, letsencrypt.org)
- Add Identity Provider (IDP) requirement with setup_cognito.sh reference
- Clarify that all prerequisites must be completed before setup
- Add reference to domain name and cert paths needed for BACKEND_DOMAIN
- Remove Managing OpenAPI Specifications section (covered in use-case setup)
- Add Deployment Guide link to Development to Production section

Addresses issues #171 and #174

* fix: Replace 'AWS Bedrock' with 'Amazon Bedrock' in SRE agent files

- Updated error messages in llm_utils.py
- Updated comments in both .env.example files
- Ensures consistent naming convention across SRE agent codebase

---------

Co-authored-by: dheerajoruganty <dheo@amazon.com>
Co-authored-by: Amit Arora <aroraai@amazon.com>
2025-08-01 13:24:58 -04:00

375 lines
15 KiB
Python

#!/usr/bin/env python3
import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Literal
from langchain_anthropic import ChatAnthropic
from langchain_aws import ChatBedrock
from .llm_utils import create_llm_with_error_handling
from langchain_core.messages import HumanMessage, SystemMessage
from pydantic import BaseModel, Field
from .agent_state import AgentState
from .constants import SREConstants
from .output_formatter import create_formatter
from .prompt_loader import prompt_loader
# Root logging setup: INFO level, one comma-separated record per line carrying
# timestamp, process id, source file and line, level name, and message.
logging.basicConfig(
    format=(
        "%(asctime)s,p%(process)s,"
        "{%(filename)s:%(lineno)d},"
        "%(levelname)s,%(message)s"
    ),
    level=logging.INFO,
)

# HTTP and MCP protocol loggers are left at their default verbosity so their
# output is available while debugging. Uncomment the snippet below to raise
# them to WARNING when that output is too noisy.
# mcp_loggers = ["streamable_http", "mcp.client.streamable_http", "httpx", "httpcore"]
#
# for logger_name in mcp_loggers:
#     mcp_logger = logging.getLogger(logger_name)
#     mcp_logger.setLevel(logging.WARNING)

# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
class InvestigationPlan(BaseModel):
    """Investigation plan created by supervisor."""

    # NOTE(review): this model is handed to `with_structured_output`, so the
    # class docstring and every `description` below are presumably surfaced to
    # the LLM as schema text -- treat them as runtime strings, not comments.

    # Ordered investigation steps. `SupervisorAgent.route` reads
    # `steps[i]` to describe the step it is about to execute.
    steps: List[str] = Field(
        ...,
        description="List of 3-5 investigation steps to be executed",
    )

    # Agent names in execution order. `SupervisorAgent.route` indexes into
    # this list with the current plan step to choose the next agent.
    agents_sequence: List[str] = Field(
        ...,
        description="Sequence of agents to invoke (kubernetes, logs, metrics, runbooks)",
    )

    # Coarse difficulty label; shown in the formatted plan and in log output.
    complexity: Literal["simple", "complex"] = Field(
        ...,
        description="Whether this plan is simple (auto-execute) or complex (needs approval)",
    )

    # When False (and the caller has not set `auto_approve_plan`), the
    # supervisor stops and presents the plan for approval instead of running it.
    auto_execute: bool = Field(
        ...,
        description="Whether to execute automatically or ask for user approval",
    )

    # Free-text justification; included in the formatted plan when non-empty.
    reasoning: str = Field(
        ...,
        description="Brief explanation of the investigation approach",
    )
class RouteDecision(BaseModel):
    """Decision made by supervisor for routing."""

    # Target of the next hop: one of the four agent names, or the sentinel
    # "FINISH" when no further agent should run.
    next: Literal["kubernetes", "logs", "metrics", "runbooks", "FINISH"] = Field(
        ...,
        description="The next agent to route to, or FINISH if done",
    )

    # Short justification for the chosen target.
    reasoning: str = Field(
        ...,
        description="Brief explanation of why this routing decision was made",
    )
# Built-in supervisor prompt, used whenever the prompt file is missing,
# unreadable, or blank.
_DEFAULT_SUPERVISOR_PROMPT = """You are the Supervisor Agent orchestrating a team of specialized SRE agents.
Your team consists of:
1. Kubernetes Infrastructure Agent - Handles K8s cluster operations, pod status, deployments, and resource monitoring
2. Application Logs Agent - Analyzes logs, searches for patterns, and identifies errors
3. Performance Metrics Agent - Monitors application performance, resource usage, and availability
4. Operational Runbooks Agent - Provides troubleshooting guides and operational procedures
Your responsibilities:
- Analyze incoming queries and determine which agent(s) should handle them
- Route queries to the most appropriate agent based on the query content
- Determine if multiple agents need to collaborate
- Aggregate responses when multiple agents are involved
- Provide clear, actionable responses to users
Routing guidelines:
- For Kubernetes/infrastructure issues → kubernetes agent
- For log analysis or error investigation → logs agent
- For performance/metrics questions → metrics agent
- For procedures/troubleshooting guides → runbooks agent
- For complex issues spanning multiple domains → multiple agents
Always consider if a query requires multiple perspectives. For example:
- "Why is my service down?" might need kubernetes (pod status) + logs (errors) + metrics (performance)
- "Debug high latency" might need metrics (performance data) + logs (error patterns)"""


def _read_supervisor_prompt() -> str:
    """Return the supervisor system prompt.

    The prompt is read from
    ``config/prompts/supervisor_multi_agent_prompt.txt`` next to this module
    and stripped of surrounding whitespace.

    Returns:
        The file contents when the file exists, is readable as UTF-8, and is
        not blank; otherwise the built-in default prompt. This function does
        not raise for file-system or decoding problems.
    """
    prompt_path = (
        Path(__file__).parent
        / "config"
        / "prompts"
        / "supervisor_multi_agent_prompt.txt"
    )
    try:
        # Decode explicitly as UTF-8: the prompt text contains non-ASCII
        # characters (e.g. "→"), so relying on the platform default encoding
        # can fail or garble the text on non-UTF-8 locales.
        prompt = prompt_path.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        # A missing file is the expected "not customised" case: fall back
        # without logging anything.
        return _DEFAULT_SUPERVISOR_PROMPT
    except (OSError, UnicodeError) as e:
        logger.warning(f"Could not read supervisor prompt file: {e}")
        return _DEFAULT_SUPERVISOR_PROMPT

    if not prompt:
        # A blank file would otherwise produce an empty system prompt.
        logger.warning(
            f"Supervisor prompt file {prompt_path} is empty, using default prompt"
        )
        return _DEFAULT_SUPERVISOR_PROMPT
    return prompt
class SupervisorAgent:
    """Supervisor agent that orchestrates other agents.

    The supervisor asks the LLM for an ``InvestigationPlan``, walks the plan's
    ``agents_sequence`` one step per ``route`` call, and finally turns the
    collected agent results into a single response in ``aggregate_responses``.
    """

    def __init__(self, llm_provider: str = "bedrock", **llm_kwargs):
        """Create the LLM, load the system prompt, and build the formatter.

        Args:
            llm_provider: Provider name forwarded to the LLM factory and to
                the output formatter.
            **llm_kwargs: Extra keyword arguments forwarded to the LLM factory.
        """
        self.llm_provider = llm_provider
        self.llm = self._create_llm(**llm_kwargs)
        self.system_prompt = _read_supervisor_prompt()
        self.formatter = create_formatter(llm_provider=llm_provider)

    def _create_llm(self, **kwargs):
        """Create LLM instance with improved error handling."""
        return create_llm_with_error_handling(self.llm_provider, **kwargs)

    @staticmethod
    def _dump_results(agent_results: Any) -> str:
        """Render agent results as indented JSON without ever raising.

        Values that JSON cannot encode are stringified; if encoding still
        fails (for example because of non-string-like dict keys) the plain
        ``str()`` of the object is returned instead.
        """
        try:
            return json.dumps(agent_results, indent=2, default=str)
        except (TypeError, ValueError):
            return str(agent_results)

    async def create_investigation_plan(self, state: AgentState) -> InvestigationPlan:
        """Create an investigation plan for the user's query.

        Args:
            state: Agent state; only ``current_query`` is read.

        Returns:
            The structured plan produced by the LLM.
        """
        # `or` also covers a key that is present but None/empty, matching how
        # aggregate_responses reads the same field.
        current_query = state.get("current_query") or "No query provided"
        planning_prompt = f"""{self.system_prompt}
User's query: {current_query}
Create a simple, focused investigation plan with 2-3 steps maximum. Consider:
- Start with the most relevant single agent
- Add one follow-up agent only if clearly needed
- Keep it simple - most queries need only 1-2 agents
- Mark as simple unless it involves production changes or multiple domains
Return a structured plan."""
        structured_llm = self.llm.with_structured_output(InvestigationPlan)
        plan = await structured_llm.ainvoke(
            [
                SystemMessage(content=planning_prompt),
                HumanMessage(content=current_query),
            ]
        )
        logger.info(
            f"Created investigation plan: {len(plan.steps)} steps, complexity: {plan.complexity}"
        )
        return plan

    def _format_plan_markdown(self, plan: InvestigationPlan) -> str:
        """Format investigation plan as properly formatted markdown.

        Emits a heading, one numbered paragraph per step, then complexity,
        auto-execute flag, and (when non-empty) reasoning and agent list.
        """
        parts = ["## 🔍 Investigation Plan\n\n"]
        parts.extend(
            f"**{i}.** {step}\n\n" for i, step in enumerate(plan.steps, 1)
        )
        parts.append(f"**📊 Complexity:** {plan.complexity.title()}\n")
        parts.append(
            f"**🤖 Auto-execute:** {'Yes' if plan.auto_execute else 'No'}\n"
        )
        if plan.reasoning:
            parts.append(f"**💭 Reasoning:** {plan.reasoning}\n")
        if plan.agents_sequence:
            # "some_agent" -> "Some Agent"
            agents_list = ", ".join(
                agent.replace("_", " ").title() for agent in plan.agents_sequence
            )
            parts.append(f"**👥 Agents involved:** {agents_list}\n")
        return "".join(parts)

    async def route(self, state: AgentState) -> Dict[str, Any]:
        """Determine which agent should handle the query next.

        On the first call (no plan in ``metadata``) a plan is created and is
        either presented for approval (``next`` = "FINISH" with
        ``plan_pending_approval``) or started at its first agent. On later
        calls the stored plan is advanced by one step.

        Args:
            state: Agent state; reads ``metadata``, ``agents_invoked``,
                ``auto_approve_plan`` and (via plan creation) ``current_query``.

        Returns:
            A state update with ``next`` (agent name or "FINISH") and an
            updated ``metadata`` dict.
        """
        # Tolerate a metadata key that is present but None.
        metadata = state.get("metadata") or {}
        agents_invoked = state.get("agents_invoked", [])
        existing_plan = metadata.get("investigation_plan")

        if not existing_plan:
            # First time - create investigation plan
            plan = await self.create_investigation_plan(state)
            plan_text = self._format_plan_markdown(plan)
            # Caller-level override; defaults to False if not set.
            auto_approve = state.get("auto_approve_plan", False)

            if not plan.auto_execute and not auto_approve:
                # Plan needs approval - stop here and present it to the user.
                return {
                    "next": "FINISH",
                    "metadata": {
                        **metadata,
                        "investigation_plan": plan.model_dump(),
                        "routing_reasoning": f"Created investigation plan. Complexity: {plan.complexity}",
                        "plan_pending_approval": True,
                        "plan_text": plan_text,
                    },
                }

            # Auto-executable plan - start with the first agent (if any).
            next_agent = (
                plan.agents_sequence[0] if plan.agents_sequence else "FINISH"
            )
            return {
                "next": next_agent,
                "metadata": {
                    **metadata,
                    "investigation_plan": plan.model_dump(),
                    "routing_reasoning": f"Executing plan step 1: {plan.steps[0] if plan.steps else 'Start'}",
                    "plan_step": 0,
                    "plan_text": plan_text,
                    "show_plan": True,
                },
            }

        # Continue executing existing plan
        # NOTE(review): `plan_pending_approval` is not cleared here; presumably
        # the caller resets it once the user approves -- confirm, otherwise
        # aggregate_responses keeps returning the approval prompt.
        plan = InvestigationPlan(**existing_plan)
        current_step = metadata.get("plan_step", 0)

        # Stay on the current step when it is already past the end of the
        # plan or when no agent has run yet (the step at `current_step` has
        # not been executed); otherwise advance to the following step.
        if current_step >= len(plan.agents_sequence) or not agents_invoked:
            next_step = current_step
        else:
            next_step = current_step + 1

        if next_step >= len(plan.agents_sequence):
            # Plan complete
            return {
                "next": "FINISH",
                "metadata": {
                    **metadata,
                    "routing_reasoning": "Investigation plan completed. Presenting results.",
                    "plan_step": next_step,
                },
            }

        # Continue with next agent in plan
        next_agent = plan.agents_sequence[next_step]
        # The plan may list fewer step descriptions than agents.
        step_description = (
            plan.steps[next_step]
            if next_step < len(plan.steps)
            else f"Execute {next_agent}"
        )
        return {
            "next": next_agent,
            "metadata": {
                **metadata,
                "routing_reasoning": f"Executing plan step {next_step + 1}: {step_description}",
                "plan_step": next_step,
            },
        }

    async def aggregate_responses(self, state: AgentState) -> Dict[str, Any]:
        """Aggregate responses from multiple agents into a final response.

        Three outcomes, in order of precedence:

        1. ``metadata["plan_pending_approval"]`` is set: return the plan
           approval prompt.
        2. No agent results: return a fixed "nothing to aggregate" message.
        3. Otherwise format the results with the formatter, falling back to
           an LLM-written summary if the formatter raises.

        Args:
            state: Agent state; reads ``agent_results``, ``metadata``,
                ``current_query`` and ``auto_approve_plan``.

        Returns:
            A state update with ``final_response`` and ``next`` = "FINISH".
        """
        agent_results = state.get("agent_results", {})
        # Tolerate a metadata key that is present but None.
        metadata = state.get("metadata") or {}

        # Check if this is a plan approval request
        if metadata.get("plan_pending_approval"):
            plan = metadata.get("investigation_plan", {})
            query = state.get("current_query", "Investigation") or "Investigation"
            # Use enhanced formatting for plan approval
            try:
                approval_response = self.formatter.format_plan_approval(plan, query)
            except Exception as e:
                logger.warning(
                    f"Failed to use enhanced formatting: {e}, falling back to plain text"
                )
                plan_text = metadata.get("plan_text", "")
                approval_response = f"""## Investigation Plan
I've analyzed your query and created the following investigation plan:
{plan_text}
**Complexity:** {plan.get('complexity', 'unknown').title()}
**Reasoning:** {plan.get('reasoning', 'Standard investigation approach')}
This plan will help systematically investigate your issue. Would you like me to proceed with this plan, or would you prefer to modify it?
You can:
- Type "proceed" or "yes" to execute the plan
- Type "modify" to suggest changes
- Ask specific questions about any step"""
            return {"final_response": approval_response, "next": "FINISH"}

        if not agent_results:
            # Include "next" like every other return path of this method.
            return {
                "final_response": "No agent responses to aggregate.",
                "next": "FINISH",
            }

        # Use enhanced formatting for investigation results
        query = state.get("current_query", "Investigation") or "Investigation"
        plan = metadata.get("investigation_plan")
        try:
            # Try enhanced formatting first
            final_response = self.formatter.format_investigation_response(
                query=query, agent_results=agent_results, metadata=metadata, plan=plan
            )
        except Exception as e:
            logger.warning(
                f"Failed to use enhanced formatting: {e}, falling back to LLM aggregation"
            )
            # Serialise once, tolerantly: the last-resort prompt below must
            # not fail for the same reason the templated prompt did.
            agent_results_json = self._dump_results(agent_results)

            # Fallback to LLM-based aggregation
            try:
                # Get system message from prompt loader
                system_prompt = prompt_loader.load_prompt(
                    "supervisor_aggregation_system"
                )
                # Prepare template variables
                query = (
                    state.get("current_query", "No query provided")
                    or "No query provided"
                )
                auto_approve_plan = state.get("auto_approve_plan", False) or False

                if plan is not None:
                    # Plan-based aggregation: include progress and the steps.
                    current_step = metadata.get("plan_step", 0)
                    plan_steps = plan.get("steps", [])
                    aggregation_prompt = (
                        prompt_loader.get_supervisor_aggregation_prompt(
                            is_plan_based=True,
                            query=query,
                            agent_results=agent_results_json,
                            auto_approve_plan=auto_approve_plan,
                            # plan_step is 0-based; the prompt shows 1-based.
                            current_step=current_step + 1,
                            total_steps=len(plan_steps),
                            plan=json.dumps(plan_steps, indent=2),
                        )
                    )
                else:
                    aggregation_prompt = (
                        prompt_loader.get_supervisor_aggregation_prompt(
                            is_plan_based=False,
                            query=query,
                            agent_results=agent_results_json,
                            auto_approve_plan=auto_approve_plan,
                        )
                    )
            except Exception as e:
                logger.error(f"Error loading aggregation prompts: {e}")
                # Fallback to simple prompt
                system_prompt = "You are an expert at presenting technical investigation results clearly and professionally."
                aggregation_prompt = f"Summarize these findings: {agent_results_json}"

            response = await self.llm.ainvoke(
                [
                    SystemMessage(content=system_prompt),
                    HumanMessage(content=aggregation_prompt),
                ]
            )
            final_response = response.content

        return {"final_response": final_response, "next": "FINISH"}