Amit Arora cdb450260a
feat(02-usecases): add observability support and documentation improvements (#220)
* feat(sre-agent): add OpenTelemetry observability and tracing

- Add OpenTelemetry tracing to supervisor and memory tools
- Configure OTEL collector with Jaeger backend via docker-compose
- Add trace context propagation between supervisor and workers
- Include run-with-tracing.sh helper script for easy tracing setup
- Update blog post with comprehensive observability section
- Add presentation slides for SRE agent capabilities

* docs(sre-agent): replace mermaid diagram with architecture image

- Replace inline mermaid diagram with external architecture PNG image
- Add detailed component descriptions for AgentCore integration
- Image shows complete flow from customer to AgentCore services

* feat(sre-agent): add assets table with demo video and AI podcast links

- Add assets section with clickable links to demo video and AI-generated podcast
- Include descriptions for each asset to help users understand the content
- Position table prominently after the use case details for visibility

* docs(sre-agent): update blog post with latest code snippets and improvements

- Update Dockerfile snippet to include OpenTelemetry instrumentation
- Update invoke_agent_runtime.py snippet with timeout config and memory personalization
- Remove verbose real-time agent execution traces section while keeping key insights
- Simplify cleanup section to show only essential command
- Ensure all code snippets match latest implementation

* style(sre-agent): apply ruff formatting to Python files

- Format code with ruff formatter for consistent style
- Fix whitespace and indentation issues
- Apply standard Python formatting conventions
- Ensure code adheres to project style guidelines

* chore(sre-agent): remove slide files from docs

- Remove presentation slide markdown files
- Clean up docs directory structure
2025-08-08 09:22:15 -04:00

314 lines
10 KiB
Python

#!/usr/bin/env python3
import asyncio
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict
from fastapi import FastAPI, HTTPException
from langchain_core.messages import HumanMessage
from langchain_core.tools import BaseTool
from pydantic import BaseModel
from .agent_state import AgentState
from .constants import SREConstants
# Import logging config
from .logging_config import configure_logging
from .multi_agent_langgraph import create_multi_agent_system
# Configure logging from the DEBUG environment variable as early as possible,
# so debug mode also works when this module is imported rather than run via
# __main__. Skip if a handler is already installed (e.g. configured by the CLI).
if not logging.getLogger().handlers:
    _debug_flag = os.getenv("DEBUG", "false").lower() in ("true", "1", "yes")
    configure_logging(_debug_flag)
class PingEndpointFilter(logging.Filter):
    """Logging filter that drops uvicorn access-log lines for GET /ping.

    Keeps the access log readable by suppressing the high-frequency
    health-check requests; every other record passes through untouched.
    """

    def filter(self, record):
        # Hide only access-log entries produced for the /ping health check.
        if hasattr(record, "getMessage") and '"GET /ping HTTP/' in record.getMessage():
            return False
        return True
# Configure uvicorn access logger to filter out ping requests
uvicorn_logger = logging.getLogger("uvicorn.access")
uvicorn_logger.addFilter(PingEndpointFilter())

# Module-level logger for this runtime module.
logger = logging.getLogger(__name__)

# Simple FastAPI app
app = FastAPI(title="SRE Agent Runtime", version="1.0.0")

# Simple request/response models
class InvocationRequest(BaseModel):
    """Request body for POST /invocations."""

    # Free-form input payload; the endpoint reads the keys "prompt"
    # (required), "session_id", and "user_id" from it.
    input: Dict[str, Any]
class InvocationResponse(BaseModel):
    """Response body for POST /invocations."""

    # Output payload; the endpoint fills in "message", "timestamp", "model".
    output: Dict[str, Any]
# Global variables for agent state, lazily populated by initialize_agent().
agent_graph = None
tools: list[BaseTool] = []
async def initialize_agent():
    """Initialize the SRE agent system using the same method as the CLI.

    Resolves the LLM provider from the LLM_PROVIDER environment variable
    (default "bedrock"), builds the multi-agent graph via
    create_multi_agent_system, and stores the graph and tool list in the
    module-level globals. Idempotent: returns immediately once initialized.

    Raises:
        Exception: re-raises any failure from create_multi_agent_system after
            logging it (known LLM provider errors also print a provider-switch
            hint to stdout).
    """
    global agent_graph, tools

    if agent_graph is not None:
        return  # Already initialized

    # Resolve and validate the provider BEFORE entering the try block so the
    # except branch below can always reference it — previously `provider` was
    # assigned inside the try, so a very early failure could raise
    # UnboundLocalError when building the hint message.
    provider = os.getenv("LLM_PROVIDER", "bedrock").lower()
    if provider not in ("anthropic", "bedrock"):
        logger.warning(f"Invalid provider '{provider}', defaulting to 'bedrock'")
        provider = "bedrock"

    try:
        logger.info("Initializing SRE Agent system...")
        logger.info(f"Environment LLM_PROVIDER: {os.getenv('LLM_PROVIDER', 'NOT_SET')}")
        logger.info(f"Using LLM provider: {provider}")
        logger.info(f"Calling create_multi_agent_system with provider: {provider}")

        # Create multi-agent system using the same function as the CLI.
        agent_graph, tools = await create_multi_agent_system(provider)

        logger.info(
            f"SRE Agent system initialized successfully with {len(tools)} tools"
        )
    except Exception as e:
        # Imported lazily to avoid a circular import at module load time.
        from .llm_utils import LLMAccessError, LLMAuthenticationError, LLMProviderError

        if isinstance(e, (LLMAuthenticationError, LLMAccessError, LLMProviderError)):
            logger.error(f"LLM Provider Error: {e}")
            print(f"\n{type(e).__name__}:")
            print(str(e))
            print("\n💡 Set LLM_PROVIDER environment variable to switch providers:")
            other_provider = "anthropic" if provider == "bedrock" else "bedrock"
            print(f" export LLM_PROVIDER={other_provider}")
        else:
            logger.error(f"Failed to initialize SRE Agent system: {e}")
        raise
@app.on_event("startup")
async def startup_event():
    """Initialize agent on startup.

    NOTE(review): @app.on_event is deprecated in recent FastAPI versions in
    favor of lifespan handlers; behavior here is unchanged.
    """
    await initialize_agent()
@app.post("/invocations", response_model=InvocationResponse)
async def invoke_agent(request: InvocationRequest):
    """Main agent invocation endpoint.

    Expects request.input to contain a "prompt" key (required) plus optional
    "session_id" and "user_id". Streams the multi-agent graph to completion,
    logging key routing events, and returns the aggregated final response.

    Raises:
        HTTPException: 400 when no prompt is supplied; 500 on any other
            processing failure.
    """
    logger.info("Received invocation request")

    try:
        # Lazily initialize in case the startup hook has not run yet.
        await initialize_agent()

        # Extract user prompt
        user_prompt = request.input.get("prompt", "")
        if not user_prompt:
            raise HTTPException(
                status_code=400,
                detail="No prompt found in input. Please provide a 'prompt' key in the input.",
            )
        logger.info(f"Processing query: {user_prompt}")

        # Extract session_id and user_id from request
        session_id = request.input.get("session_id", "")
        user_id = request.input.get("user_id", "default_user")
        logger.info(f"Session ID: {session_id}, User ID: {user_id}")

        # Create initial state exactly like the CLI does
        initial_state: AgentState = {
            "messages": [HumanMessage(content=user_prompt)],
            "next": "supervisor",
            "agent_results": {},
            "current_query": user_prompt,
            "metadata": {},
            "requires_collaboration": False,
            "agents_invoked": [],
            "final_response": None,
            "auto_approve_plan": True,  # Always auto-approve plans in runtime mode
            "session_id": session_id,  # Required for memory retrieval
            "user_id": user_id,  # Required for user personalization
        }

        # Process through the agent graph exactly like the CLI
        final_response = ""
        logger.info("Starting agent graph execution")
        async for event in agent_graph.astream(initial_state):
            for node_name, node_output in event.items():
                logger.info(f"Processing node: {node_name}")

                # Log key events from each node
                if node_name == "supervisor":
                    next_agent = node_output.get("next", "")
                    metadata = node_output.get("metadata", {})
                    logger.info(f"Supervisor routing to: {next_agent}")
                    if metadata.get("routing_reasoning"):
                        logger.info(
                            f"Routing reasoning: {metadata['routing_reasoning']}"
                        )
                elif node_name in [
                    "kubernetes_agent",
                    "logs_agent",
                    "metrics_agent",
                    "runbooks_agent",
                ]:
                    # (the unused agent_results local was removed here)
                    logger.info(f"{node_name} completed with results")
                # Capture final response from aggregate node
                elif node_name == "aggregate":
                    final_response = node_output.get("final_response", "")
                    logger.info("Aggregate node completed, final response captured")

        if not final_response:
            logger.warning("No final response received from agent graph")
            final_response = (
                "I encountered an issue processing your request. Please try again."
            )
        else:
            logger.info(f"Final response length: {len(final_response)} characters")

        # Simple response format
        response_data = {
            "message": final_response,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "model": SREConstants.app.agent_model_name,
        }

        logger.info("Successfully processed agent request")
        logger.info("Returning invocation response")
        return InvocationResponse(output=response_data)

    except HTTPException:
        # Re-raise intentional HTTP errors (e.g. the 400 above) untouched.
        raise
    except Exception as e:
        logger.error(f"Agent processing failed: {e}")
        logger.exception("Full exception details:")
        # Chain the original exception so the 500's traceback shows the cause.
        raise HTTPException(
            status_code=500, detail=f"Agent processing failed: {str(e)}"
        ) from e
@app.get("/ping")
async def ping():
    """Health check endpoint (its access-log lines are filtered out above)."""
    payload = {"status": "healthy"}
    return payload
async def invoke_sre_agent_async(prompt: str, provider: str = "anthropic") -> str:
    """
    Programmatic interface to invoke SRE agent.

    Args:
        prompt: The user prompt/query
        provider: LLM provider ("anthropic" or "bedrock")

    Returns:
        The agent's response as a string
    """
    try:
        # Build a fresh multi-agent system; the tool list is not needed here
        # (previously bound to `tools`, shadowing the module-level global).
        graph, _tools = await create_multi_agent_system(provider=provider)

        # NOTE(review): unlike the /invocations endpoint, this state omits
        # session_id/user_id/auto_approve_plan — confirm the graph tolerates
        # their absence for programmatic callers.
        initial_state: AgentState = {
            "messages": [HumanMessage(content=prompt)],
            "next": "supervisor",
            "agent_results": {},
            "current_query": prompt,
            "metadata": {},
            "requires_collaboration": False,
            "agents_invoked": [],
            "final_response": None,
        }

        # Execute and get final response; only the aggregate node carries it.
        final_response = ""
        async for event in graph.astream(initial_state):
            for node_name, node_output in event.items():
                if node_name == "aggregate":
                    final_response = node_output.get("final_response", "")

        return final_response or "I encountered an issue processing your request."
    except Exception as e:
        logger.error(f"Agent invocation failed: {e}")
        raise
def invoke_sre_agent(prompt: str, provider: str = "anthropic") -> str:
    """
    Synchronous wrapper for invoke_sre_agent_async.

    Args:
        prompt: The user prompt/query
        provider: LLM provider ("anthropic" or "bedrock")

    Returns:
        The agent's response as a string
    """
    # Drive the async implementation on a fresh event loop.
    coro = invoke_sre_agent_async(prompt, provider)
    return asyncio.run(coro)
if __name__ == "__main__":
    import argparse

    import uvicorn

    # Command-line interface for running the runtime directly.
    arg_parser = argparse.ArgumentParser(description="SRE Agent Runtime")
    arg_parser.add_argument(
        "--provider",
        choices=["anthropic", "bedrock"],
        default=os.getenv("LLM_PROVIDER", "bedrock"),
        help="LLM provider to use (default: bedrock)",
    )
    arg_parser.add_argument("--host", default="0.0.0.0", help="Host to bind to")
    arg_parser.add_argument("--port", type=int, default=8080, help="Port to bind to")
    arg_parser.add_argument(
        "--debug",
        action="store_true",
        help="Enable debug logging and trace output",
    )
    cli_args = arg_parser.parse_args()

    # Configure logging based on the debug flag before emitting any messages.
    from .logging_config import configure_logging

    debug_enabled = configure_logging(cli_args.debug)

    # Propagate effective settings to code that reads the environment.
    os.environ["LLM_PROVIDER"] = cli_args.provider
    os.environ["DEBUG"] = "true" if debug_enabled else "false"

    logger.info(f"Starting SRE Agent Runtime with provider: {cli_args.provider}")
    if debug_enabled:
        logger.info("Debug logging enabled")

    uvicorn.run(app, host=cli_args.host, port=cli_args.port)