mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
379 lines
14 KiB
Python
379 lines
14 KiB
Python
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from fastapi import (
|
|
FastAPI,
|
|
Query,
|
|
Path as PathParam,
|
|
Header,
|
|
HTTPException,
|
|
Depends,
|
|
)
|
|
from fastapi.responses import JSONResponse
|
|
|
|
from retrieve_api_key import retrieve_api_key
|
|
|
|
# Configure logging with basicConfig
|
|
logging.basicConfig(
|
|
level=logging.INFO, # Set the log level to INFO
|
|
# Define log message format
|
|
format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
|
|
)
|
|
|
|
app = FastAPI(title="DevOps Runbooks API", version="1.0.0")
|
|
|
|
DATA_PATH = Path(__file__).parent.parent / "data" / "runbooks_data"
|
|
|
|
# API Key for authentication
|
|
CREDENTIAL_PROVIDER_NAME = "sre-agent-api-key-credential-provider"
|
|
|
|
# Retrieve API key from credential provider at startup
|
|
try:
|
|
EXPECTED_API_KEY = retrieve_api_key(CREDENTIAL_PROVIDER_NAME)
|
|
if not EXPECTED_API_KEY:
|
|
logging.error("Failed to retrieve API key from credential provider")
|
|
raise RuntimeError("Cannot start server without valid API key from credential provider")
|
|
except Exception as e:
|
|
logging.error(f"Error retrieving API key: {e}")
|
|
raise RuntimeError(f"Cannot start server: {e}") from e
|
|
|
|
|
|
def _validate_api_key(x_api_key: str = Header(None, alias="X-API-Key")):
|
|
"""Validate API key from header"""
|
|
if not x_api_key or x_api_key != EXPECTED_API_KEY:
|
|
raise HTTPException(status_code=401, detail="Invalid or missing API key")
|
|
return x_api_key
|
|
|
|
|
|
@app.get("/runbooks/search")
|
|
async def search_runbooks(
|
|
incident_type: Optional[str] = Query(
|
|
None,
|
|
enum=["performance", "availability", "security", "deployment"],
|
|
description="Type of incident",
|
|
),
|
|
keyword: Optional[str] = Query(
|
|
None, description="Search keyword in runbook content"
|
|
),
|
|
severity: Optional[str] = Query(
|
|
None,
|
|
enum=["low", "medium", "high", "critical"],
|
|
description="Incident severity level",
|
|
),
|
|
api_key: str = Depends(_validate_api_key),
|
|
):
|
|
"""Search runbooks by incident type/keyword"""
|
|
try:
|
|
logging.info(
|
|
f"🔍 RUNBOOKS API: search_runbooks called - incident_type={incident_type}, keyword={keyword}, severity={severity}"
|
|
)
|
|
|
|
with open(DATA_PATH / "incident_playbooks.json", "r") as f:
|
|
data = json.load(f)
|
|
|
|
runbooks = data.get("playbooks", [])
|
|
original_count = len(runbooks)
|
|
|
|
if incident_type:
|
|
runbooks = [r for r in runbooks if r.get("incident_type") == incident_type]
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Filtered by incident_type '{incident_type}': {len(runbooks)} runbooks"
|
|
)
|
|
|
|
if severity:
|
|
runbooks = [r for r in runbooks if r.get("severity") == severity]
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Filtered by severity '{severity}': {len(runbooks)} runbooks"
|
|
)
|
|
|
|
if keyword:
|
|
runbooks = [
|
|
r
|
|
for r in runbooks
|
|
if keyword.lower() in r.get("title", "").lower()
|
|
or keyword.lower() in r.get("description", "").lower()
|
|
or any(keyword.lower() in step.lower() for step in r.get("steps", []))
|
|
]
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Filtered by keyword '{keyword}': {len(runbooks)} runbooks"
|
|
)
|
|
|
|
response_data = {"runbooks": runbooks}
|
|
|
|
# Log detailed response
|
|
logging.info(
|
|
f"📤 RUNBOOKS API: Returning {len(runbooks)} runbooks out of {original_count} total"
|
|
)
|
|
for i, runbook in enumerate(runbooks):
|
|
logging.info(
|
|
f" 📖 Runbook {i+1}: {runbook.get('title', 'No title')} (ID: {runbook.get('id', 'No ID')})"
|
|
)
|
|
steps = runbook.get("steps", [])
|
|
logging.info(f" Steps count: {len(steps)}")
|
|
for j, step in enumerate(steps[:3]): # Show first 3 steps for brevity
|
|
logging.info(f" Step {j+1}: {step}")
|
|
if len(steps) > 3:
|
|
logging.info(f" ... and {len(steps) - 3} more steps")
|
|
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Full response data: {json.dumps(response_data, indent=2)}"
|
|
)
|
|
return response_data
|
|
except Exception as e:
|
|
logging.error(f"❌ Error searching runbooks: {str(e)}")
|
|
return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
|
|
|
@app.get("/runbooks/playbook/{playbook_id}")
|
|
async def get_incident_playbook(
|
|
playbook_id: str = PathParam(..., description="Playbook ID"),
|
|
api_key: str = Depends(_validate_api_key),
|
|
):
|
|
"""Retrieve specific incident playbooks"""
|
|
try:
|
|
logging.info(
|
|
f"🔍 RUNBOOKS API: get_incident_playbook called for playbook_id='{playbook_id}'"
|
|
)
|
|
|
|
with open(DATA_PATH / "incident_playbooks.json", "r") as f:
|
|
data = json.load(f)
|
|
|
|
playbooks = data.get("playbooks", [])
|
|
|
|
for playbook in playbooks:
|
|
if playbook.get("id") == playbook_id:
|
|
logging.info(
|
|
f"📖 RUNBOOKS API: Found playbook '{playbook.get('title', 'No title')}'"
|
|
)
|
|
steps = playbook.get("steps", [])
|
|
logging.info(f"📝 RUNBOOKS API: Playbook has {len(steps)} steps:")
|
|
for i, step in enumerate(steps):
|
|
logging.info(f" Step {i+1}: {step}")
|
|
|
|
logging.info(
|
|
f"📤 RUNBOOKS API: Returning complete playbook data: {json.dumps(playbook, indent=2)}"
|
|
)
|
|
return playbook
|
|
|
|
logging.warning(f"❌ RUNBOOKS API: Playbook '{playbook_id}' not found")
|
|
return JSONResponse(status_code=404, content={"error": "Playbook not found"})
|
|
except Exception as e:
|
|
logging.error(f"❌ Error retrieving playbook: {str(e)}")
|
|
return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
|
|
|
@app.get("/runbooks/troubleshooting")
|
|
async def get_troubleshooting_guide(
|
|
category: Optional[str] = Query(
|
|
None,
|
|
enum=["kubernetes", "performance", "networking", "database"],
|
|
description="Troubleshooting category",
|
|
),
|
|
issue_type: Optional[str] = Query(None, description="Specific issue type"),
|
|
api_key: str = Depends(_validate_api_key),
|
|
):
|
|
"""Fetch step-by-step troubleshooting guides"""
|
|
try:
|
|
logging.info(
|
|
f"🔍 RUNBOOKS API: get_troubleshooting_guide called - category={category}, issue_type={issue_type}"
|
|
)
|
|
|
|
with open(DATA_PATH / "troubleshooting_guides.json", "r") as f:
|
|
data = json.load(f)
|
|
|
|
guides = data.get("guides", [])
|
|
original_count = len(guides)
|
|
|
|
if category:
|
|
guides = [g for g in guides if g.get("category") == category]
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Filtered by category '{category}': {len(guides)} guides"
|
|
)
|
|
|
|
if issue_type:
|
|
guides = [
|
|
g
|
|
for g in guides
|
|
if issue_type.lower() in g.get("title", "").lower()
|
|
or issue_type.lower() in g.get("id", "").lower()
|
|
]
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Filtered by issue_type '{issue_type}': {len(guides)} guides"
|
|
)
|
|
|
|
response_data = {"guides": guides}
|
|
|
|
# Log detailed response
|
|
logging.info(
|
|
f"📤 RUNBOOKS API: Returning {len(guides)} guides out of {original_count} total"
|
|
)
|
|
for i, guide in enumerate(guides):
|
|
logging.info(
|
|
f" 📖 Guide {i+1}: {guide.get('title', 'No title')} (ID: {guide.get('id', 'No ID')})"
|
|
)
|
|
steps = guide.get("steps", [])
|
|
logging.info(f" Steps count: {len(steps)}")
|
|
for j, step in enumerate(steps[:3]): # Show first 3 steps for brevity
|
|
logging.info(f" Step {j+1}: {step}")
|
|
if len(steps) > 3:
|
|
logging.info(f" ... and {len(steps) - 3} more steps")
|
|
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Full response data: {json.dumps(response_data, indent=2)}"
|
|
)
|
|
return response_data
|
|
except Exception as e:
|
|
logging.error(f"❌ Error retrieving troubleshooting guides: {str(e)}")
|
|
return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
|
|
|
@app.get("/runbooks/escalation")
|
|
async def get_escalation_procedures(
|
|
severity: Optional[str] = Query(
|
|
None,
|
|
enum=["low", "medium", "high", "critical"],
|
|
description="Incident severity",
|
|
),
|
|
incident_type: Optional[str] = Query(None, description="Type of incident"),
|
|
api_key: str = Depends(_validate_api_key),
|
|
):
|
|
"""Retrieve escalation procedures"""
|
|
try:
|
|
with open(DATA_PATH / "escalation_procedures.json", "r") as f:
|
|
data = json.load(f)
|
|
|
|
procedures = data.get("escalation_procedures", [])
|
|
|
|
if severity:
|
|
procedures = [p for p in procedures if p.get("severity") == severity]
|
|
|
|
if incident_type:
|
|
procedures = [
|
|
p
|
|
for p in procedures
|
|
if incident_type.lower() in p.get("title", "").lower()
|
|
or any(
|
|
incident_type.lower() in condition.lower()
|
|
for condition in p.get("trigger_conditions", [])
|
|
)
|
|
]
|
|
|
|
return {"escalation_procedures": procedures}
|
|
except Exception as e:
|
|
logging.error(f"Error retrieving escalation procedures: {str(e)}")
|
|
return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
|
|
|
@app.get("/runbooks/resolutions")
|
|
async def get_common_resolutions(
|
|
issue: str = Query(..., description="Issue or error type"),
|
|
service: Optional[str] = Query(None, description="Affected service"),
|
|
api_key: str = Depends(_validate_api_key),
|
|
):
|
|
"""Fetch common resolution steps"""
|
|
try:
|
|
logging.info(
|
|
f"🔍 RUNBOOKS API: get_common_resolutions called - issue='{issue}', service={service}"
|
|
)
|
|
|
|
with open(DATA_PATH / "common_resolutions.json", "r") as f:
|
|
data = json.load(f)
|
|
|
|
resolutions = data.get("resolutions", [])
|
|
original_count = len(resolutions)
|
|
|
|
# Filter by issue
|
|
matching_resolutions = []
|
|
for resolution in resolutions:
|
|
if (
|
|
issue.lower() in resolution.get("issue", "").lower()
|
|
or issue.lower() in resolution.get("id", "").lower()
|
|
or any(
|
|
issue.lower() in symptom.lower()
|
|
for symptom in resolution.get("symptoms", [])
|
|
)
|
|
):
|
|
matching_resolutions.append(resolution)
|
|
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Found {len(matching_resolutions)} matching resolutions for issue '{issue}'"
|
|
)
|
|
|
|
# If service specified, prioritize resolutions that mention the service
|
|
if service and matching_resolutions:
|
|
# This is a simple implementation - in real world might have service-specific resolutions
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Service filter '{service}' applied (basic implementation)"
|
|
)
|
|
|
|
response_data = {"resolutions": matching_resolutions}
|
|
|
|
# Log detailed response
|
|
logging.info(
|
|
f"📤 RUNBOOKS API: Returning {len(matching_resolutions)} resolutions out of {original_count} total"
|
|
)
|
|
for i, resolution in enumerate(matching_resolutions):
|
|
logging.info(
|
|
f" 📖 Resolution {i+1}: {resolution.get('issue', 'No issue title')} (ID: {resolution.get('id', 'No ID')})"
|
|
)
|
|
steps = resolution.get("steps", [])
|
|
logging.info(f" Steps count: {len(steps)}")
|
|
for j, step in enumerate(steps[:3]): # Show first 3 steps for brevity
|
|
logging.info(f" Step {j+1}: {step}")
|
|
if len(steps) > 3:
|
|
logging.info(f" ... and {len(steps) - 3} more steps")
|
|
|
|
logging.info(
|
|
f"📋 RUNBOOKS API: Full response data: {json.dumps(response_data, indent=2)}"
|
|
)
|
|
return response_data
|
|
except Exception as e:
|
|
logging.error(f"❌ Error retrieving common resolutions: {str(e)}")
|
|
return JSONResponse(status_code=500, content={"error": str(e)})
|
|
|
|
|
|
@app.get("/")
|
|
async def health_check(api_key: str = Depends(_validate_api_key)):
|
|
"""Health check endpoint"""
|
|
return {"status": "healthy", "service": "runbooks-api"}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
import sys
|
|
import argparse
|
|
from pathlib import Path
|
|
|
|
# Add parent directory to path to import config_utils
|
|
sys.path.append(str(Path(__file__).parent.parent))
|
|
from config_utils import get_server_port
|
|
|
|
parser = argparse.ArgumentParser(description="Runbooks API Server")
|
|
parser.add_argument("--host", type=str, required=True,
|
|
help="Host to bind to (REQUIRED - must match SSL certificate hostname if using SSL)")
|
|
parser.add_argument("--ssl-keyfile", type=str, help="Path to SSL private key file")
|
|
parser.add_argument("--ssl-certfile", type=str, help="Path to SSL certificate file")
|
|
parser.add_argument("--port", type=int, help="Port to bind to (overrides config)")
|
|
|
|
args = parser.parse_args()
|
|
|
|
port = args.port if args.port else get_server_port("runbooks")
|
|
|
|
# Configure SSL if both cert files are provided
|
|
ssl_config = {}
|
|
if args.ssl_keyfile and args.ssl_certfile:
|
|
ssl_config = {
|
|
"ssl_keyfile": args.ssl_keyfile,
|
|
"ssl_certfile": args.ssl_certfile
|
|
}
|
|
protocol = "HTTPS"
|
|
logging.warning(f"⚠️ SSL CERTIFICATE HOSTNAME WARNING: Ensure your SSL certificate is valid for hostname '{args.host}'")
|
|
logging.warning(f"⚠️ If using self-signed certificates, generate with: openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes -subj '/CN={args.host}'")
|
|
else:
|
|
protocol = "HTTP"
|
|
|
|
logging.info(f"Starting Runbooks server on {protocol}://{args.host}:{port}")
|
|
uvicorn.run(app, host=args.host, port=port, **ssl_config)
|