Dheeraj Oruganty e346e83bf1
fix(02-use-cases): SRE-Agent Deployment (#179)
* Add missing credential_provider_name parameter to config.yaml.example

* Fix get_config function to properly parse YAML values with inline comments

* Enhance get_config to prevent copy-paste whitespace errors in AWS identifiers

* Improve LLM provider configuration and error handling with bedrock as default

* Add OpenAPI templating system and fix hardcoded regions

* Add backend template build instructions to README

* Delete old YAML files

* Fix Cognito setup with automation script and missing domain creation steps

* docs: Add EC2 instance port configuration documentation

- Document required inbound ports (443, 8011-8014)
- Include SSL/TLS security requirements
- Add AWS security group best practices
- Provide port usage summary table

* docs: Add hyperlinks to prerequisites in README

- Link EC2 port configuration documentation
- Link IAM role authentication setup
- Improve navigation to detailed setup instructions

* docs: Add BACKEND_API_KEY to configuration documentation

- Document gateway environment variables section
- Add BACKEND_API_KEY requirement for credential provider
- Include example .env file format for gateway directory
- Explain usage in create_gateway.sh script
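The gateway `.env` format described above might look like the following sketch — the value is a placeholder, and only the `BACKEND_API_KEY` variable name comes from the documentation; any other variables your gateway needs would sit alongside it:

```
# gateway/.env — sketch; replace the placeholder with your real backend API key
BACKEND_API_KEY=replace-with-your-backend-api-key
```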

* docs: Add BACKEND_API_KEY to deployment guide environment variables

- Include BACKEND_API_KEY in environment variables reference table
- Mark as required for gateway setup
- Provide quick reference alongside other required variables

* docs: Add BedrockAgentCoreFullAccess policy and trust policy documentation

- Document AWS managed policy BedrockAgentCoreFullAccess
- Add trust policy requirements for bedrock-agentcore.amazonaws.com
- Reorganize IAM permissions for better clarity
- Remove duplicate trust policy section
- Add IAM role requirement to deployment prerequisites
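A trust policy allowing `bedrock-agentcore.amazonaws.com` to assume the role follows the standard AWS service-principal shape; the sketch below illustrates that shape (check the repository's IAM documentation for the exact statement it ships):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "bedrock-agentcore.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
```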

* docs: Document role_name field in gateway config example

- Explain that role_name is used to create and manage the gateway
- Specify BedrockAgentCoreFullAccess policy requirement
- Note trust policy requirement for bedrock-agentcore.amazonaws.com
- Improve clarity for gateway configuration setup
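The `role_name` usage described above could look like this hypothetical config fragment — the surrounding keys and the role name itself are illustrative, not the shipped schema; consult `config.yaml.example` for the real layout:

```yaml
# Sketch of a gateway config entry — field names other than role_name are illustrative
gateway:
  # Role used to create and manage the gateway; must have the
  # BedrockAgentCoreFullAccess policy attached and a trust policy
  # for bedrock-agentcore.amazonaws.com
  role_name: sre-agent-gateway-role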

* docs: Add AWS IP address ranges for production security enhancement

- Document AWS IP ranges JSON download for restricting access
- Reference official AWS documentation for IP address ranges
- Provide security alternatives to 0.0.0.0/0 for production
- Include examples of restricted security group configurations
- Enable egress filtering and region-specific access control
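Restricting security groups to AWS-published ranges means filtering the documented `ip-ranges.json` schema (`prefixes` entries with `ip_prefix`, `region`, and `service` fields). A minimal Python sketch of that filtering step, using an illustrative inline sample rather than real downloaded data:

```python
# Sketch: filter AWS ip-ranges.json entries by region and service.
# The schema (prefixes/ip_prefix/region/service) matches AWS's published
# format; the sample data below is illustrative, not real AWS ranges.

def filter_prefixes(data: dict, region: str, service: str) -> list:
    """Return IPv4 prefixes matching the given region and service."""
    return [
        p["ip_prefix"]
        for p in data.get("prefixes", [])
        if p["region"] == region and p["service"] == service
    ]


sample = {
    "prefixes": [
        {"ip_prefix": "3.5.140.0/22", "region": "us-east-1", "service": "AMAZON"},
        {"ip_prefix": "13.34.0.0/16", "region": "eu-west-1", "service": "AMAZON"},
    ]
}

print(filter_prefixes(sample, "us-east-1", "AMAZON"))  # ['3.5.140.0/22']
```

In practice the `data` dict would come from downloading `https://ip-ranges.amazonaws.com/ip-ranges.json` and the resulting prefixes would feed security-group ingress rules instead of `0.0.0.0/0`.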

* style: Format Python code with black

- Reformat 14 Python files for consistent code style
- Apply PEP 8 formatting standards
- Improve code readability and maintainability

* docs: Update SRE agent prerequisites and setup documentation

- Convert prerequisites section to markdown table format
- Add SSL certificate provider examples (no-ip.com, letsencrypt.org)
- Add Identity Provider (IDP) requirement with setup_cognito.sh reference
- Clarify that all prerequisites must be completed before setup
- Add reference to domain name and cert paths needed for BACKEND_DOMAIN
- Remove Managing OpenAPI Specifications section (covered in use-case setup)
- Add Deployment Guide link to Development to Production section

Addresses issues #171 and #174

* fix: Replace 'AWS Bedrock' with 'Amazon Bedrock' in SRE agent files

- Updated error messages in llm_utils.py
- Updated comments in both .env.example files
- Ensures consistent naming convention across SRE agent codebase

---------

Co-authored-by: dheerajoruganty <dheo@amazon.com>
Co-authored-by: Amit Arora <aroraai@amazon.com>
2025-08-01 13:24:58 -04:00


import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List

from fastapi import (
    FastAPI,
    Query,
    Header,
    HTTPException,
    Depends,
)
from pydantic import BaseModel, Field
from enum import Enum
from fastapi.responses import JSONResponse
from retrieve_api_key import retrieve_api_key

# Configure logging with basicConfig
logging.basicConfig(
    level=logging.INFO,  # Set the log level to INFO
    # Define log message format
    format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)

app = FastAPI(title="Kubernetes Analysis API", version="1.0.0")

# Base path for fake data
DATA_PATH = Path(__file__).parent.parent / "data" / "k8s_data"

# API Key for authentication
CREDENTIAL_PROVIDER_NAME = "sre-agent-api-key-credential-provider"

# Retrieve API key from credential provider at startup
try:
    EXPECTED_API_KEY = retrieve_api_key(CREDENTIAL_PROVIDER_NAME)
    if not EXPECTED_API_KEY:
        logging.error("Failed to retrieve API key from credential provider")
        raise RuntimeError(
            "Cannot start server without valid API key from credential provider"
        )
except Exception as e:
    logging.error(f"Error retrieving API key: {e}")
    raise RuntimeError(f"Cannot start server: {e}") from e
def _validate_api_key(x_api_key: str = Header(None, alias="X-API-Key")):
    """Validate API key from header"""
    if not x_api_key or x_api_key != EXPECTED_API_KEY:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return x_api_key
def _parse_timestamp(timestamp_str: str) -> datetime:
    """Parse an ISO timestamp string into a timezone-aware datetime object"""
    try:
        # Normalize a trailing "Z" to an explicit UTC offset for fromisoformat
        if timestamp_str.endswith("Z"):
            timestamp_str = timestamp_str.replace("Z", "+00:00")
        parsed = datetime.fromisoformat(timestamp_str)
        # Assume UTC for naive timestamps so comparisons with aware datetimes work
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed
    except ValueError:
        # Fallback: assume current time if parsing fails
        return datetime.now(timezone.utc)
def _filter_events_by_time(events: list, since: Optional[str] = None) -> list:
    """Filter events to those at or after the since timestamp"""
    if not since:
        return events
    filtered_events = []
    since_dt = _parse_timestamp(since)
    for event in events:
        event_timestamp = event.get("timestamp")
        if not event_timestamp:
            continue
        try:
            event_dt = _parse_timestamp(event_timestamp)
            if event_dt >= since_dt:
                filtered_events.append(event)
        except (ValueError, TypeError):
            # Include events with unparseable timestamps rather than dropping them
            filtered_events.append(event)
    return filtered_events
# Pydantic Models
class PodStatus(str, Enum):
    """Pod status enumeration"""

    RUNNING = "Running"
    PENDING = "Pending"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"
    CRASHLOOPBACKOFF = "CrashLoopBackOff"


class PodPhase(str, Enum):
    """Pod phase enumeration"""

    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"


class ResourceUsage(BaseModel):
    """Resource usage information"""

    cpu: str = Field(..., description="CPU request/limit", example="250m")
    memory: str = Field(..., description="Memory request/limit", example="512Mi")
    cpu_utilization: str = Field(
        ..., description="CPU utilization percentage", example="75%"
    )
    memory_utilization: str = Field(
        ..., description="Memory utilization percentage", example="85%"
    )


class Pod(BaseModel):
    """Pod information model"""

    name: str = Field(
        ..., description="Pod name", example="web-app-deployment-5c8d7f9b6d-k2n8p"
    )
    namespace: str = Field(
        ..., description="Kubernetes namespace", example="production"
    )
    status: PodStatus = Field(..., description="Pod status")
    phase: PodPhase = Field(..., description="Pod phase")
    node: str = Field(..., description="Node where pod is running", example="node-1")
    created_at: str = Field(..., description="Pod creation timestamp")
    resource_usage: ResourceUsage = Field(..., description="Resource usage metrics")


class PodStatusResponse(BaseModel):
    """Response model for pod status endpoint"""

    pods: List[Pod] = Field(..., description="List of pods")


class DeploymentStatus(str, Enum):
    """Deployment status enumeration"""

    HEALTHY = "Healthy"
    DEGRADED = "Degraded"
    FAILED = "Failed"


class Deployment(BaseModel):
    """Deployment information model"""

    name: str = Field(..., description="Deployment name", example="web-app-deployment")
    namespace: str = Field(
        ..., description="Kubernetes namespace", example="production"
    )
    replicas: int = Field(..., description="Desired number of replicas", example=3)
    available_replicas: int = Field(
        ..., description="Number of available replicas", example=2
    )
    unavailable_replicas: int = Field(
        ..., description="Number of unavailable replicas", example=1
    )
    status: DeploymentStatus = Field(..., description="Deployment status")


class DeploymentStatusResponse(BaseModel):
    """Response model for deployment status endpoint"""

    deployments: List[Deployment] = Field(..., description="List of deployments")


class EventType(str, Enum):
    """Event type enumeration"""

    NORMAL = "Normal"
    WARNING = "Warning"
    ERROR = "Error"


class Event(BaseModel):
    """Kubernetes event model"""

    type: EventType = Field(..., description="Event type")
    reason: str = Field(..., description="Event reason", example="FailedScheduling")
    object: str = Field(
        ...,
        description="Kubernetes object reference",
        example="pod/web-app-deployment-5c8d7f9b6d-k2n8p",
    )
    message: str = Field(
        ...,
        description="Event message",
        example="0/3 nodes are available: 3 Insufficient memory",
    )
    timestamp: str = Field(..., description="Event timestamp")
    namespace: str = Field(
        ..., description="Kubernetes namespace", example="production"
    )
    count: int = Field(..., description="Number of occurrences", example=5)


class EventsResponse(BaseModel):
    """Response model for events endpoint"""

    events: List[Event] = Field(..., description="List of events")


class ErrorResponse(BaseModel):
    """Error response model"""

    error: str = Field(..., description="Error message")
    detail: Optional[str] = Field(None, description="Detailed error information")
@app.get("/pods/status", response_model=PodStatusResponse)
async def get_pod_status(
    namespace: Optional[str] = Query(
        None, description="Kubernetes namespace to filter pods"
    ),
    pod_name: Optional[str] = Query(None, description="Specific pod name to retrieve"),
    api_key: str = Depends(_validate_api_key),
):
    """
    Retrieve pod information from the Kubernetes cluster.

    This endpoint provides detailed information about pods including their status,
    resource usage, and location within the cluster. Results can be filtered by
    namespace and specific pod name.

    Args:
        namespace: Optional Kubernetes namespace to filter pods
        pod_name: Optional specific pod name to retrieve
        api_key: Required API key for authentication

    Returns:
        PodStatusResponse: List of pods with detailed status information

    Raises:
        HTTPException: 401 if API key is invalid
        HTTPException: 500 if data retrieval fails
    """
    try:
        with open(DATA_PATH / "pods.json", "r") as f:
            data = json.load(f)
        pods = data.get("pods", [])
        # Filter by namespace if provided
        if namespace:
            pods = [p for p in pods if p.get("namespace") == namespace]
        # Filter by pod name if provided
        if pod_name:
            pods = [p for p in pods if p.get("name") == pod_name]
        return PodStatusResponse(pods=pods)
    except Exception as e:
        logging.error(f"Error retrieving pod status: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/deployments/status", response_model=DeploymentStatusResponse)
async def get_deployment_status(
    namespace: Optional[str] = Query(None, description="Kubernetes namespace"),
    deployment_name: Optional[str] = Query(
        None, description="Specific deployment name"
    ),
    api_key: str = Depends(_validate_api_key),
):
    """
    Check deployment health and replica status.

    This endpoint provides comprehensive information about deployments including
    their current status, replica counts, and health metrics. Results can be
    filtered by namespace and specific deployment name.

    Args:
        namespace: Optional Kubernetes namespace to filter deployments
        deployment_name: Optional specific deployment name to retrieve
        api_key: Required API key for authentication

    Returns:
        DeploymentStatusResponse: List of deployments with health status

    Raises:
        HTTPException: 401 if API key is invalid
        HTTPException: 500 if data retrieval fails
    """
    try:
        with open(DATA_PATH / "deployments.json", "r") as f:
            data = json.load(f)
        deployments = data.get("deployments", [])
        if namespace:
            deployments = [d for d in deployments if d.get("namespace") == namespace]
        if deployment_name:
            deployments = [d for d in deployments if d.get("name") == deployment_name]
        return DeploymentStatusResponse(deployments=deployments)
    except Exception as e:
        logging.error(f"Error retrieving deployment status: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/events", response_model=EventsResponse)
async def get_cluster_events(
    since: Optional[str] = Query(
        None, description="Filter events since this timestamp"
    ),
    severity: Optional[str] = Query(
        None,
        enum=["Warning", "Error", "Normal"],
        description="Filter by event severity",
    ),
    api_key: str = Depends(_validate_api_key),
):
    """
    Fetch recent Kubernetes cluster events.

    This endpoint retrieves cluster events with filtering capabilities by timestamp
    and severity level. Events provide insights into cluster operations, scheduling
    decisions, and potential issues.

    Args:
        since: Optional ISO 8601 timestamp to filter events from
        severity: Optional severity filter (Warning, Error, Normal)
        api_key: Required API key for authentication

    Returns:
        EventsResponse: List of cluster events with timestamps and details

    Raises:
        HTTPException: 401 if API key is invalid
        HTTPException: 500 if data retrieval fails
    """
    try:
        with open(DATA_PATH / "events.json", "r") as f:
            data = json.load(f)
        events = data.get("events", [])
        if severity:
            events = [e for e in events if e.get("type") == severity]
        # Filter by since timestamp
        events = _filter_events_by_time(events, since)
        return EventsResponse(events=events)
    except Exception as e:
        logging.error(f"Error retrieving cluster events: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/resource_usage")
async def get_resource_usage(
    namespace: Optional[str] = Query(None, description="Filter by namespace"),
    resource_type: Optional[str] = Query(
        None, enum=["cpu", "memory", "pods"], description="Type of resource to monitor"
    ),
    api_key: str = Depends(_validate_api_key),
):
    """
    Monitor cluster resource consumption and utilization.

    This endpoint provides detailed metrics about resource usage across the cluster,
    including CPU, memory, and pod consumption. Data can be filtered by namespace
    and specific resource types.

    Args:
        namespace: Optional namespace to filter resource usage data
        resource_type: Optional resource type filter (cpu, memory, pods)
        api_key: Required API key for authentication

    Returns:
        Dict: Resource usage metrics with cluster and namespace breakdowns

    Raises:
        HTTPException: 401 if API key is invalid
        HTTPException: 500 if data retrieval fails
    """
    try:
        with open(DATA_PATH / "resource_usage.json", "r") as f:
            data = json.load(f)
        resource_usage = data.get("resource_usage", {})
        # Filter by namespace if provided
        if namespace and "namespace_usage" in resource_usage:
            namespace_data = resource_usage["namespace_usage"].get(namespace, {})
            if resource_type:
                return {
                    "resource_usage": {resource_type: namespace_data.get(resource_type)}
                }
            return {"resource_usage": {"namespace": namespace, "usage": namespace_data}}
        return {"resource_usage": resource_usage}
    except Exception as e:
        logging.error(f"Error retrieving resource usage: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/nodes/status")
async def get_node_status(
    node_name: Optional[str] = Query(None, description="Specific node name"),
    api_key: str = Depends(_validate_api_key),
):
    """
    Check cluster node health and status.

    This endpoint provides comprehensive information about cluster nodes including
    their health status, capacity, allocatable resources, and current usage.
    Results can be filtered by specific node name.

    Args:
        node_name: Optional specific node name to retrieve
        api_key: Required API key for authentication

    Returns:
        Dict: Node status information with health and resource metrics

    Raises:
        HTTPException: 401 if API key is invalid
        HTTPException: 500 if data retrieval fails
    """
    try:
        with open(DATA_PATH / "nodes.json", "r") as f:
            data = json.load(f)
        nodes = data.get("nodes", [])
        if node_name:
            nodes = [n for n in nodes if n.get("name") == node_name]
        return {"nodes": nodes}
    except Exception as e:
        logging.error(f"Error retrieving node status: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/")
async def health_check(api_key: str = Depends(_validate_api_key)):
    """
    Health check endpoint for the Kubernetes API service.

    This endpoint provides a simple health check to verify that the API service
    is running and accessible. It requires authentication like all other endpoints.

    Args:
        api_key: Required API key for authentication

    Returns:
        Dict: Service health status information

    Raises:
        HTTPException: 401 if API key is invalid
    """
    return {"status": "healthy", "service": "k8s-api"}
if __name__ == "__main__":
    import uvicorn
    import sys
    import argparse
    from pathlib import Path

    # Add parent directory to path to import config_utils
    sys.path.append(str(Path(__file__).parent.parent))
    from config_utils import get_server_port

    parser = argparse.ArgumentParser(description="K8s API Server")
    parser.add_argument(
        "--host",
        type=str,
        required=True,
        help="Host to bind to (REQUIRED - must match SSL certificate hostname if using SSL)",
    )
    parser.add_argument("--ssl-keyfile", type=str, help="Path to SSL private key file")
    parser.add_argument("--ssl-certfile", type=str, help="Path to SSL certificate file")
    parser.add_argument("--port", type=int, help="Port to bind to (overrides config)")
    args = parser.parse_args()

    port = args.port if args.port else get_server_port("k8s")

    # Configure SSL if both cert files are provided
    ssl_config = {}
    if args.ssl_keyfile and args.ssl_certfile:
        ssl_config = {
            "ssl_keyfile": args.ssl_keyfile,
            "ssl_certfile": args.ssl_certfile,
        }
        protocol = "HTTPS"
        logging.warning(
            f"⚠️ SSL CERTIFICATE HOSTNAME WARNING: Ensure your SSL certificate is valid for hostname '{args.host}'"
        )
        logging.warning(
            f"⚠️ If using self-signed certificates, generate with: openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj '/CN={args.host}'"
        )
    else:
        protocol = "HTTP"

    logging.info(f"Starting K8s server on {protocol}://{args.host}:{port}")
    uvicorn.run(app, host=args.host, port=port, **ssl_config)
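Every endpoint above validates the `X-API-Key` header via `_validate_api_key`, so any client must attach that header. A minimal client sketch that builds the authenticated request without sending it — the base URL, port, and key are placeholders; only the `/pods/status` path, `namespace` query parameter, and `X-API-Key` header name come from the server code above:

```python
# Sketch of an authenticated client request for the K8s Analysis API.
# Base URL and API key are placeholders; the request is built but not sent.
from typing import Optional
from urllib.parse import urlencode
from urllib.request import Request


def build_request(base_url: str, api_key: str, namespace: Optional[str] = None) -> Request:
    """Build a GET request for /pods/status with the required X-API-Key header."""
    url = f"{base_url}/pods/status"
    if namespace:
        url += "?" + urlencode({"namespace": namespace})
    return Request(url, headers={"X-API-Key": api_key})


req = build_request("https://localhost:8011", "example-key", namespace="production")
print(req.full_url)  # https://localhost:8011/pods/status?namespace=production
```

Sending it with `urllib.request.urlopen(req)` (or any HTTP client with the same header) would then return the `PodStatusResponse` JSON, or a 401 if the key does not match the one retrieved from the credential provider.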