#!/usr/bin/env python3
import logging
from functools import lru_cache
from pathlib import Path
from typing import Any, Dict, List
import yaml
from langchain_anthropic import ChatAnthropic
from langchain_aws import ChatBedrock
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.tools import BaseTool
from langgraph.prebuilt import create_react_agent
from .agent_state import AgentState
# Configure logging with basicConfig
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO
# Define log message format
format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)
# Suppress MCP protocol logs
mcp_loggers = ["streamable_http", "mcp.client.streamable_http", "httpx", "httpcore"]
for logger_name in mcp_loggers:
mcp_logger = logging.getLogger(logger_name)
mcp_logger.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@lru_cache(maxsize=1)
def _load_agent_config() -> Dict[str, Any]:
"""Load agent configuration from YAML file."""
config_path = Path(__file__).parent / "config" / "agent_config.yaml"
with open(config_path, "r") as f:
return yaml.safe_load(f)
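# Illustrative sketch of the agent_config.yaml shape this module assumes, inferred
# from how _filter_tools_for_agent reads it (the real file may define more keys):
#
#   agents:
#     kubernetes_agent:
#       tools:
#         - get_pod_status
#         - get_deployment_status
#     logs_agent:
#       tools:
#         - search_logs
#         - get_error_logs
#   global_tools:
#     - search_runbooks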
def _create_llm(provider: str = "anthropic", **kwargs):
"""Create LLM instance based on provider."""
if provider == "anthropic":
return ChatAnthropic(
model=kwargs.get("model_id", "claude-sonnet-4-20250514"),
max_tokens=kwargs.get("max_tokens", 4096),
temperature=kwargs.get("temperature", 0.1),
)
elif provider == "bedrock":
return ChatBedrock(
model_id=kwargs.get("model_id", "us.amazon.nova-micro-v1:0"),
region_name=kwargs.get("region_name", "us-east-1"),
model_kwargs={
"temperature": kwargs.get("temperature", 0.1),
"max_tokens": kwargs.get("max_tokens", 4096),
},
)
else:
raise ValueError(f"Unsupported provider: {provider}")
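# Example calls (illustrative only; in practice the provider name and overrides are
# passed through from the agent factories' kwargs):
#   llm = _create_llm("anthropic", max_tokens=2048)
#   llm = _create_llm("bedrock", model_id="us.amazon.nova-micro-v1:0", temperature=0.2)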
def _filter_tools_for_agent(
all_tools: List[BaseTool], agent_name: str, config: Dict[str, Any]
) -> List[BaseTool]:
"""Filter tools based on agent configuration."""
    agent_config = config.get("agents", {}).get(agent_name, {})
    # Copy the per-agent list so extending it does not mutate the lru_cache'd config,
    # then add tools that are available to every agent
    allowed_tools = list(agent_config.get("tools", []))
    allowed_tools.extend(config.get("global_tools", []))
# Filter tools based on their names
filtered_tools = []
for tool in all_tools:
tool_name = getattr(tool, "name", "")
# Remove any prefix from tool name for matching
base_tool_name = tool_name.split("___")[-1] if "___" in tool_name else tool_name
if base_tool_name in allowed_tools:
filtered_tools.append(tool)
logger.info(f"Agent {agent_name} has access to {len(filtered_tools)} tools")
return filtered_tools
class BaseAgentNode:
"""Base class for all agent nodes."""
def __init__(
self,
name: str,
description: str,
tools: List[BaseTool],
llm_provider: str = "anthropic",
**llm_kwargs,
):
self.name = name
self.description = description
self.tools = tools
self.llm = _create_llm(llm_provider, **llm_kwargs)
# Create the react agent
self.agent = create_react_agent(self.llm, self.tools)
def _get_system_prompt(self) -> str:
"""Get system prompt for this agent."""
base_prompt = f"""You are the {self.name}.
{self.description}
You have access to specific tools related to your domain. Use them to help answer questions
and solve problems related to your area of expertise. Be concise and factual in your responses.
CRITICAL: ALWAYS quote your data sources when making statements about investigations and recommendations.
For every claim, finding, or recommendation you make, include the specific source:
- For tool results: "According to [tool_name] output..." or "Based on [tool_name] data..."
- For specific data points: "The [metric_name] shows [value] (source: [tool_name])"
- For runbook procedures: "Per runbook [runbook_id]: [step_details]"
- For status information: "Current status from [tool_name]: [status_details]"
This source attribution is essential for SRE lineage tracking and verification.
MANDATORY ANTI-HALLUCINATION RULE: If no data is available from your tools or if tools return empty results, you MUST clearly state "No data available" or "No results found" rather than fabricating plausible-sounding information. Never invent log entries, metrics values, timestamps, pod names, error messages, or any other data that was not actually returned by your tools.
SERVICE/POD VALIDATION REQUIREMENT: If the user asks about a specific service or pod name that you cannot find in your tool results, you MUST explicitly state: "I do not see the exact [service/pod] '[name]' in the available data. Based on my understanding of the issue, I'm investigating related services that might be impacting the problem you described. The analysis below represents my assessment of services that could be related to your query."
FORBIDDEN BEHAVIORS:
- Creating fake log entries with specific timestamps when tools return empty
- Inventing error messages, stack traces, or database connection strings
- Making up metric values, percentages, or performance numbers
- Fabricating pod names, service names, or configuration details
- Creating plausible but false narrative details to fill information gaps
- Pretending non-existent services or pods exist in the system
Accuracy is critical for SRE operations - wrong information can lead to incorrect troubleshooting decisions.
If a question is outside your domain of expertise, acknowledge this and suggest which other
agent might be better suited to help."""
# Add specific instructions for Kubernetes agent
if "Kubernetes" in self.name:
base_prompt += """
IMPORTANT: If the user doesn't specify a namespace, check the 'production' namespace first and inform the user that you're checking production. Let them know they can specify a different namespace if needed.
KUBERNETES SOURCE ATTRIBUTION EXAMPLES:
- "Based on 'kubectl get pods' output from get_pod_status tool: Pod database-pod-xyz is in CrashLoopBackOff state"
- "According to get_deployment_status tool results: Deployment has 2/3 replicas ready"
- "Per get_cluster_events data: Last event shows 'Failed to pull image' at 14:32:15"
- "get_resource_usage tool indicates: CPU usage at 85% (source: metrics-server)"""
# Add specific instructions for Runbooks agent
elif "Runbooks" in self.name or "Operational" in self.name:
base_prompt += """
CRITICAL RUNBOOK INSTRUCTIONS:
- NEVER just describe what a runbook contains - ALWAYS show the complete, verbatim steps
- DO NOT say "the runbook provides 6 steps" - SHOW ALL 6 STEPS with full details
- NEVER use phrases like "runbook is ready for execution" - DISPLAY the actual execution steps
- Copy and paste the ENTIRE runbook content, including all commands, parameters, and procedures
- Include the full runbook identification (name, ID, version) at the top
- Show every step with specific kubectl commands, bash scripts, or other executable instructions
- Include all safety checks, verification commands, and expected outputs
- Display rollback procedures and troubleshooting steps if provided
- Format with clear numbering, code blocks, and proper markdown
MANDATORY: You MUST show the complete runbook content. SREs need the actual steps to execute, not summaries.
RUNBOOK SOURCE ATTRIBUTION REQUIREMENTS:
- ALWAYS start with: "Per runbook [runbook_id] from [tool_name] tool:"
- Include runbook metadata: "**Source:** [tool_name] query result | **Runbook ID:** [id] | **Title:** [title]"
- For each step, maintain: "Step X from runbook [runbook_id]: [actual_step_content]"
- Include escalation info with source: "Escalation procedures (source: runbook [runbook_id]): [contact_details]"
EXAMPLE (showing full content with sources):
**Source:** search_runbooks tool result | **Runbook ID:** DB-001 | **Title:** Database Pod Recovery
Per runbook DB-001 from search_runbooks tool:
### Step 1 from runbook DB-001: Verify Current State
```bash
kubectl get pods -n production | grep database
kubectl describe pod database-pod -n production
```
**Expected Output (per runbook DB-001):** Pod status showing CrashLoopBackOff
[Continue showing ALL steps with source attribution...]
Remember: Show the COMPLETE runbook with proper source attribution for SRE lineage tracking."""
# Add specific instructions for Logs agent
elif "Logs" in self.name or "Application" in self.name:
base_prompt += """
LOGS SOURCE ATTRIBUTION REQUIREMENTS:
- Always cite the specific log tool used: "According to search_logs tool results:" or "Based on get_error_logs data:"
- Include timestamps and log sources: "Log entry from [timestamp] (source: search_logs): [log_message]"
- Reference log patterns with tool source: "analyze_log_patterns tool identified: [pattern_details]"
- Quote specific log entries: "Error log from get_error_logs: '[actual_log_line]' at [timestamp]"
- Include log context: "From get_recent_logs for service [service_name]: [log_context]"
CRITICAL ANTI-HALLUCINATION RULES FOR LOGS:
- If search_logs returns empty/no results, say "No log entries found" - DO NOT create fake log entries
- If get_error_logs returns no data, say "No error logs available" - DO NOT invent error messages
- If get_recent_logs returns empty, say "No recent logs found" - DO NOT fabricate log entries with timestamps
- NEVER create log entries with specific timestamps (like 14:22:00.123Z) unless they came directly from tool output
- NEVER invent exact error messages, database connection strings, or stack traces
- If tools return "No data available", prominently state this rather than speculating
VALID EXAMPLES:
- "According to search_logs tool: No entries found for 'payment-service' pattern"
- "get_error_logs data: No error logs available for payment-service in the last 24 hours"
- "search_logs tool found ConfigMap error: '[exact_log_line]' at [exact_timestamp_from_tool]"
FORBIDDEN EXAMPLES:
- Creating entries like "Database connection timeout at 14:22:00.123Z" when tools returned empty
- Inventing specific error messages when no errors were found
- Making up log counts or patterns when analyze_log_patterns returned no data
"""
# Add specific instructions for Metrics agent
elif "Metrics" in self.name or "Performance" in self.name:
base_prompt += """
METRICS SOURCE ATTRIBUTION REQUIREMENTS:
- Always cite the metrics tool source: "Per get_performance_metrics data:" or "According to get_resource_metrics:"
- Include metric names and values with sources: "[metric_name]: [value] (source: [tool_name])"
- Reference time ranges: "get_performance_metrics for last 1h shows: [metric_details]"
- Quote exact metric values: "CPU utilization: 85.3% (source: get_resource_metrics)"
- Include trend analysis sources: "analyze_trends tool indicates: [trend_information]"
CRITICAL ANTI-HALLUCINATION RULES FOR METRICS:
- If get_performance_metrics returns no data, say "No performance metrics available" - DO NOT invent response times
- If get_resource_metrics returns empty, say "No resource metrics found" - DO NOT create CPU/memory percentages
- If analyze_trends returns no data, say "No trend data available" - DO NOT fabricate anomaly details
- NEVER create specific metric values (like 2,500ms, 85.3%) unless they came directly from tool output
- NEVER invent error rate percentages, availability numbers, or threshold violations
- If tools return "No data available", state this clearly rather than creating plausible numbers
VALID EXAMPLES:
- "get_performance_metrics: No data available for api-gateway service"
- "According to get_resource_metrics: CPU: 45%, Memory: 60% (source: metrics data)"
- "analyze_trends tool indicates: No trend data found for the requested timeframe"
FORBIDDEN EXAMPLES:
- Creating metrics like "Response time: 2,500ms average" when tools returned no data
- Inventing specific percentages when get_error_rates returned empty
- Making up anomaly details when analyze_trends found no patterns
"""
return base_prompt
async def __call__(self, state: AgentState) -> Dict[str, Any]:
"""Process the current state and return updated state."""
try:
# Get the last user message
messages = state["messages"]
# Create a focused query for this agent
agent_prompt = (
f"As the {self.name}, help with: {state.get('current_query', '')}"
)
# We'll collect all messages and the final response
all_messages = []
agent_response = ""
# Add system prompt and user prompt
system_message = SystemMessage(content=self._get_system_prompt())
user_message = HumanMessage(content=agent_prompt)
# Stream the agent execution to capture tool calls
async for chunk in self.agent.astream(
{"messages": [system_message] + messages + [user_message]}
):
if "agent" in chunk:
agent_step = chunk["agent"]
if "messages" in agent_step:
for msg in agent_step["messages"]:
all_messages.append(msg)
                            # Always capture the latest content from AIMessages
                            if isinstance(msg, AIMessage):
                                agent_response = msg.content
elif "tools" in chunk:
tools_step = chunk["tools"]
if "messages" in tools_step:
for msg in tools_step["messages"]:
all_messages.append(msg)
            # Log what was captured so each agent's response can be traced
            logger.info(
                f"{self.name} - Captured response length: {len(agent_response) if agent_response else 0}"
            )
            if agent_response:
                logger.debug(f"{self.name} - Full response: {str(agent_response)}")
# Update state with streaming info
return {
"agent_results": {
**state.get("agent_results", {}),
self.name: agent_response,
},
"agents_invoked": state.get("agents_invoked", []) + [self.name],
"messages": messages + all_messages,
"metadata": {
**state.get("metadata", {}),
f"{self.name.replace(' ', '_')}_trace": all_messages,
},
}
except Exception as e:
            logger.exception(f"Error in {self.name}: {e}")
return {
"agent_results": {
**state.get("agent_results", {}),
self.name: f"Error: {str(e)}",
},
"agents_invoked": state.get("agents_invoked", []) + [self.name],
}
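# Note on graph integration (a sketch of the assumed LangGraph contract): each node
# returns only the state keys it updates ("agent_results", "agents_invoked",
# "messages", "metadata"); LangGraph merges that partial update into the shared
# AgentState after the node runs, so a node never has to copy the full state.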
def create_kubernetes_agent(tools: List[BaseTool], **kwargs) -> BaseAgentNode:
"""Create Kubernetes infrastructure agent."""
config = _load_agent_config()
filtered_tools = _filter_tools_for_agent(tools, "kubernetes_agent", config)
return BaseAgentNode(
name="Kubernetes Infrastructure Agent",
description="Manages Kubernetes cluster operations and monitoring",
tools=filtered_tools,
**kwargs,
)
def create_logs_agent(tools: List[BaseTool], **kwargs) -> BaseAgentNode:
"""Create application logs agent."""
config = _load_agent_config()
filtered_tools = _filter_tools_for_agent(tools, "logs_agent", config)
return BaseAgentNode(
name="Application Logs Agent",
description="Handles application log analysis and searching",
tools=filtered_tools,
**kwargs,
)
def create_metrics_agent(tools: List[BaseTool], **kwargs) -> BaseAgentNode:
"""Create performance metrics agent."""
config = _load_agent_config()
filtered_tools = _filter_tools_for_agent(tools, "metrics_agent", config)
return BaseAgentNode(
name="Performance Metrics Agent",
description="Provides application performance and resource metrics",
tools=filtered_tools,
**kwargs,
)
def create_runbooks_agent(tools: List[BaseTool], **kwargs) -> BaseAgentNode:
"""Create operational runbooks agent."""
config = _load_agent_config()
filtered_tools = _filter_tools_for_agent(tools, "runbooks_agent", config)
return BaseAgentNode(
name="Operational Runbooks Agent",
description="Provides operational procedures and troubleshooting guides",
tools=filtered_tools,
**kwargs,
)
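# Minimal wiring sketch (hypothetical usage, not part of this module): the factories
# above receive the full MCP tool list, and the resulting nodes are registered on a
# LangGraph StateGraph elsewhere in the package, e.g.
#
#   from langgraph.graph import END, StateGraph
#
#   graph = StateGraph(AgentState)
#   graph.add_node("kubernetes", create_kubernetes_agent(all_tools))
#   graph.add_node("logs", create_logs_agent(all_tools))
#   graph.set_entry_point("kubernetes")
#   graph.add_edge("kubernetes", END)
#   app = graph.compile()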