mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
- Copy SRE Agent codebase to 02-use-cases/04-SRE-agent - Update LICENSE link to reference main repository LICENSE - Configure .gitignore to exclude wheel files from version control
371 lines
14 KiB
Python
371 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Literal
|
|
|
|
from langchain_anthropic import ChatAnthropic
|
|
from langchain_aws import ChatBedrock
|
|
from langchain_core.messages import HumanMessage, SystemMessage
|
|
from pydantic import BaseModel, Field
|
|
|
|
from .agent_state import AgentState
|
|
from .output_formatter import create_formatter
|
|
|
|
# Configure logging with basicConfig
|
|
logging.basicConfig(
|
|
level=logging.INFO, # Set the log level to INFO
|
|
# Define log message format
|
|
format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
|
|
)
|
|
|
|
# Suppress MCP protocol logs
|
|
mcp_loggers = ["streamable_http", "mcp.client.streamable_http", "httpx", "httpcore"]
|
|
|
|
for logger_name in mcp_loggers:
|
|
mcp_logger = logging.getLogger(logger_name)
|
|
mcp_logger.setLevel(logging.WARNING)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class InvestigationPlan(BaseModel):
|
|
"""Investigation plan created by supervisor."""
|
|
|
|
steps: List[str] = Field(
|
|
description="List of 3-5 investigation steps to be executed"
|
|
)
|
|
agents_sequence: List[str] = Field(
|
|
description="Sequence of agents to invoke (kubernetes, logs, metrics, runbooks)"
|
|
)
|
|
complexity: Literal["simple", "complex"] = Field(
|
|
description="Whether this plan is simple (auto-execute) or complex (needs approval)"
|
|
)
|
|
auto_execute: bool = Field(
|
|
description="Whether to execute automatically or ask for user approval"
|
|
)
|
|
reasoning: str = Field(
|
|
description="Brief explanation of the investigation approach"
|
|
)
|
|
|
|
|
|
class RouteDecision(BaseModel):
|
|
"""Decision made by supervisor for routing."""
|
|
|
|
next: Literal["kubernetes", "logs", "metrics", "runbooks", "FINISH"] = Field(
|
|
description="The next agent to route to, or FINISH if done"
|
|
)
|
|
reasoning: str = Field(
|
|
description="Brief explanation of why this routing decision was made"
|
|
)
|
|
|
|
|
|
def _read_supervisor_prompt() -> str:
|
|
"""Read supervisor system prompt from file."""
|
|
try:
|
|
prompt_path = (
|
|
Path(__file__).parent
|
|
/ "config"
|
|
/ "prompts"
|
|
/ "supervisor_multi_agent_prompt.txt"
|
|
)
|
|
if prompt_path.exists():
|
|
return prompt_path.read_text().strip()
|
|
except Exception as e:
|
|
logger.warning(f"Could not read supervisor prompt file: {e}")
|
|
|
|
# Default prompt if file not found
|
|
return """You are the Supervisor Agent orchestrating a team of specialized SRE agents.
|
|
|
|
Your team consists of:
|
|
1. Kubernetes Infrastructure Agent - Handles K8s cluster operations, pod status, deployments, and resource monitoring
|
|
2. Application Logs Agent - Analyzes logs, searches for patterns, and identifies errors
|
|
3. Performance Metrics Agent - Monitors application performance, resource usage, and availability
|
|
4. Operational Runbooks Agent - Provides troubleshooting guides and operational procedures
|
|
|
|
Your responsibilities:
|
|
- Analyze incoming queries and determine which agent(s) should handle them
|
|
- Route queries to the most appropriate agent based on the query content
|
|
- Determine if multiple agents need to collaborate
|
|
- Aggregate responses when multiple agents are involved
|
|
- Provide clear, actionable responses to users
|
|
|
|
Routing guidelines:
|
|
- For Kubernetes/infrastructure issues → kubernetes agent
|
|
- For log analysis or error investigation → logs agent
|
|
- For performance/metrics questions → metrics agent
|
|
- For procedures/troubleshooting guides → runbooks agent
|
|
- For complex issues spanning multiple domains → multiple agents
|
|
|
|
Always consider if a query requires multiple perspectives. For example:
|
|
- "Why is my service down?" might need kubernetes (pod status) + logs (errors) + metrics (performance)
|
|
- "Debug high latency" might need metrics (performance data) + logs (error patterns)"""
|
|
|
|
|
|
class SupervisorAgent:
|
|
"""Supervisor agent that orchestrates other agents."""
|
|
|
|
def __init__(self, llm_provider: str = "anthropic", **llm_kwargs):
|
|
self.llm_provider = llm_provider
|
|
self.llm = self._create_llm(**llm_kwargs)
|
|
self.system_prompt = _read_supervisor_prompt()
|
|
self.formatter = create_formatter()
|
|
|
|
def _create_llm(self, **kwargs):
|
|
"""Create LLM instance based on provider."""
|
|
if self.llm_provider == "anthropic":
|
|
return ChatAnthropic(
|
|
model=kwargs.get("model_id", "claude-sonnet-4-20250514"),
|
|
max_tokens=kwargs.get("max_tokens", 4096),
|
|
temperature=kwargs.get("temperature", 0.1),
|
|
)
|
|
elif self.llm_provider == "bedrock":
|
|
return ChatBedrock(
|
|
model_id=kwargs.get("model_id", "us.amazon.nova-micro-v1:0"),
|
|
region_name=kwargs.get("region_name", "us-east-1"),
|
|
model_kwargs={
|
|
"temperature": kwargs.get("temperature", 0.1),
|
|
"max_tokens": kwargs.get("max_tokens", 4096),
|
|
},
|
|
)
|
|
else:
|
|
raise ValueError(f"Unsupported provider: {self.llm_provider}")
|
|
|
|
async def create_investigation_plan(self, state: AgentState) -> InvestigationPlan:
|
|
"""Create an investigation plan for the user's query."""
|
|
current_query = state.get("current_query", "No query provided")
|
|
|
|
planning_prompt = f"""{self.system_prompt}
|
|
|
|
User's query: {current_query}
|
|
|
|
Create a simple, focused investigation plan with 2-3 steps maximum. Consider:
|
|
- Start with the most relevant single agent
|
|
- Add one follow-up agent only if clearly needed
|
|
- Keep it simple - most queries need only 1-2 agents
|
|
- Mark as simple unless it involves production changes or multiple domains
|
|
|
|
Return a structured plan."""
|
|
|
|
structured_llm = self.llm.with_structured_output(InvestigationPlan)
|
|
|
|
plan = await structured_llm.ainvoke(
|
|
[
|
|
SystemMessage(content=planning_prompt),
|
|
HumanMessage(content=current_query),
|
|
]
|
|
)
|
|
|
|
logger.info(
|
|
f"Created investigation plan: {len(plan.steps)} steps, complexity: {plan.complexity}"
|
|
)
|
|
return plan
|
|
|
|
def _format_plan_markdown(self, plan: InvestigationPlan) -> str:
|
|
"""Format investigation plan as properly formatted markdown."""
|
|
plan_text = "## 🔍 Investigation Plan\n\n"
|
|
|
|
# Add steps with proper numbering and formatting
|
|
for i, step in enumerate(plan.steps, 1):
|
|
plan_text += f"**{i}.** {step}\n\n"
|
|
|
|
# Add metadata
|
|
plan_text += f"**📊 Complexity:** {plan.complexity.title()}\n"
|
|
plan_text += f"**🤖 Auto-execute:** {'Yes' if plan.auto_execute else 'No'}\n"
|
|
if plan.reasoning:
|
|
plan_text += f"**💭 Reasoning:** {plan.reasoning}\n"
|
|
|
|
# Add agents involved
|
|
if plan.agents_sequence:
|
|
agents_list = ", ".join(
|
|
[agent.replace("_", " ").title() for agent in plan.agents_sequence]
|
|
)
|
|
plan_text += f"**👥 Agents involved:** {agents_list}\n"
|
|
|
|
return plan_text
|
|
|
|
async def route(self, state: AgentState) -> Dict[str, Any]:
|
|
"""Determine which agent should handle the query next."""
|
|
agents_invoked = state.get("agents_invoked", [])
|
|
|
|
# Check if we have an existing plan
|
|
existing_plan = state.get("metadata", {}).get("investigation_plan")
|
|
|
|
if not existing_plan:
|
|
# First time - create investigation plan
|
|
plan = await self.create_investigation_plan(state)
|
|
|
|
if not plan.auto_execute:
|
|
# Complex plan - present to user for approval
|
|
plan_text = self._format_plan_markdown(plan)
|
|
return {
|
|
"next": "FINISH",
|
|
"metadata": {
|
|
**state.get("metadata", {}),
|
|
"investigation_plan": plan.model_dump(),
|
|
"routing_reasoning": f"Created investigation plan. Complexity: {plan.complexity}",
|
|
"plan_pending_approval": True,
|
|
"plan_text": plan_text,
|
|
},
|
|
}
|
|
else:
|
|
# Simple plan - start execution
|
|
next_agent = (
|
|
plan.agents_sequence[0] if plan.agents_sequence else "FINISH"
|
|
)
|
|
plan_text = self._format_plan_markdown(plan)
|
|
return {
|
|
"next": next_agent,
|
|
"metadata": {
|
|
**state.get("metadata", {}),
|
|
"investigation_plan": plan.model_dump(),
|
|
"routing_reasoning": f"Executing plan step 1: {plan.steps[0] if plan.steps else 'Start'}",
|
|
"plan_step": 0,
|
|
"plan_text": plan_text,
|
|
"show_plan": True,
|
|
},
|
|
}
|
|
else:
|
|
# Continue executing existing plan
|
|
plan = InvestigationPlan(**existing_plan)
|
|
current_step = state.get("metadata", {}).get("plan_step", 0)
|
|
|
|
# Check if plan is complete
|
|
if current_step >= len(plan.agents_sequence) or not agents_invoked:
|
|
next_step = current_step
|
|
else:
|
|
next_step = current_step + 1
|
|
|
|
if next_step >= len(plan.agents_sequence):
|
|
# Plan complete
|
|
return {
|
|
"next": "FINISH",
|
|
"metadata": {
|
|
**state.get("metadata", {}),
|
|
"routing_reasoning": "Investigation plan completed. Presenting results.",
|
|
"plan_step": next_step,
|
|
},
|
|
}
|
|
else:
|
|
# Continue with next agent in plan
|
|
next_agent = plan.agents_sequence[next_step]
|
|
step_description = (
|
|
plan.steps[next_step]
|
|
if next_step < len(plan.steps)
|
|
else f"Execute {next_agent}"
|
|
)
|
|
|
|
return {
|
|
"next": next_agent,
|
|
"metadata": {
|
|
**state.get("metadata", {}),
|
|
"routing_reasoning": f"Executing plan step {next_step + 1}: {step_description}",
|
|
"plan_step": next_step,
|
|
},
|
|
}
|
|
|
|
async def aggregate_responses(self, state: AgentState) -> Dict[str, Any]:
|
|
"""Aggregate responses from multiple agents into a final response."""
|
|
agent_results = state.get("agent_results", {})
|
|
metadata = state.get("metadata", {})
|
|
|
|
# Check if this is a plan approval request
|
|
if metadata.get("plan_pending_approval"):
|
|
plan = metadata.get("investigation_plan", {})
|
|
query = state.get("current_query", "Investigation")
|
|
|
|
# Use enhanced formatting for plan approval
|
|
try:
|
|
approval_response = self.formatter.format_plan_approval(plan, query)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to use enhanced formatting: {e}, falling back to plain text"
|
|
)
|
|
plan_text = metadata.get("plan_text", "")
|
|
approval_response = f"""## Investigation Plan
|
|
|
|
I've analyzed your query and created the following investigation plan:
|
|
|
|
{plan_text}
|
|
|
|
**Complexity:** {plan.get('complexity', 'unknown').title()}
|
|
**Reasoning:** {plan.get('reasoning', 'Standard investigation approach')}
|
|
|
|
This plan will help systematically investigate your issue. Would you like me to proceed with this plan, or would you prefer to modify it?
|
|
|
|
You can:
|
|
- Type "proceed" or "yes" to execute the plan
|
|
- Type "modify" to suggest changes
|
|
- Ask specific questions about any step"""
|
|
|
|
return {"final_response": approval_response, "next": "FINISH"}
|
|
|
|
if not agent_results:
|
|
return {"final_response": "No agent responses to aggregate."}
|
|
|
|
# Use enhanced formatting for investigation results
|
|
query = state.get("current_query", "Investigation")
|
|
plan = metadata.get("investigation_plan")
|
|
|
|
try:
|
|
# Try enhanced formatting first
|
|
final_response = self.formatter.format_investigation_response(
|
|
query=query, agent_results=agent_results, metadata=metadata, plan=plan
|
|
)
|
|
except Exception as e:
|
|
logger.warning(
|
|
f"Failed to use enhanced formatting: {e}, falling back to LLM aggregation"
|
|
)
|
|
|
|
# Fallback to LLM-based aggregation
|
|
if plan:
|
|
# Plan-based aggregation
|
|
current_step = metadata.get("plan_step", 0)
|
|
total_steps = len(plan.get("steps", []))
|
|
|
|
aggregation_prompt = f"""You are presenting results from a planned investigation.
|
|
|
|
Original query: {state.get('current_query', 'No query provided')}
|
|
|
|
Investigation Plan Progress: Step {current_step + 1} of {total_steps}
|
|
Plan: {json.dumps(plan.get('steps', []), indent=2)}
|
|
|
|
Agent findings:
|
|
{json.dumps(agent_results, indent=2)}
|
|
|
|
Present the results clearly:
|
|
1. **Current Status**: What we've completed so far
|
|
2. **Key Findings**: Important discoveries from this investigation
|
|
3. **Next Steps**: What happens next (if plan continues) or recommendations
|
|
|
|
Keep it professional and focused on the investigation results."""
|
|
else:
|
|
# Standard aggregation
|
|
aggregation_prompt = f"""You are synthesizing findings from specialized agents.
|
|
|
|
Original query: {state.get('current_query', 'No query provided')}
|
|
|
|
Agent findings:
|
|
{json.dumps(agent_results, indent=2)}
|
|
|
|
Create a comprehensive response that:
|
|
1. **Summarizes key findings** from the investigation
|
|
2. **Highlights the most important insights** discovered
|
|
3. **Provides actionable recommendations**
|
|
|
|
Keep the response professional and focused."""
|
|
|
|
response = await self.llm.ainvoke(
|
|
[
|
|
SystemMessage(
|
|
content="You are an expert at presenting technical investigation results clearly and professionally."
|
|
),
|
|
HumanMessage(content=aggregation_prompt),
|
|
]
|
|
)
|
|
|
|
final_response = response.content
|
|
|
|
return {"final_response": final_response, "next": "FINISH"}
|