Amit Arora 0172468fd4 Update SRE Agent with AgentCore migration and SSL support
- Replace all previous product references with AgentCore terminology
- Add SSL certificate support to backend servers with mandatory host parameter
- Update OpenAPI specs to use HTTPS with configurable domain placeholders
- Add comprehensive SSL setup documentation in README
- Update URLs from development to production endpoints
- Add EC2 instance metadata commands for IP retrieval
- Include sed command for bulk domain replacement in OpenAPI specs
2025-07-15 01:37:38 +00:00

172 lines
5.3 KiB
Python

#!/usr/bin/env python3
"""
Cognito Token Generator
This script generates OAuth2 access tokens from Amazon Cognito using client credentials
and saves them to a .access_token file for use with AgentCore Gateway.
"""
import argparse
import os
import logging
from pathlib import Path
from typing import Dict, Any
import requests
import dotenv
# Configure logging with basicConfig
logging.basicConfig(
level=logging.INFO, # Set the log level to INFO
# Define log message format
format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)
def _get_cognito_token(
cognito_domain_url: str,
client_id: str,
client_secret: str,
audience: str = "MCPGateway",
) -> Dict[str, Any]:
"""
Get OAuth2 token from Amazon Cognito or Auth0 using client credentials grant type.
Args:
cognito_domain_url: The full Cognito/Auth0 domain URL
client_id: The App Client ID
client_secret: The App Client Secret
audience: The audience for the token (default: MCPGateway)
Returns:
Token response containing access_token, expires_in, token_type
"""
# Construct the token endpoint URL
if "auth0.com" in cognito_domain_url:
url = f"{cognito_domain_url.rstrip('/')}/oauth/token"
# Use JSON format for Auth0
headers = {"Content-Type": "application/json"}
data = {
"client_id": client_id,
"client_secret": client_secret,
"audience": audience,
"grant_type": "client_credentials",
"scope": "invoke:gateway",
}
# Send as JSON for Auth0
response_method = lambda: requests.post(url, headers=headers, json=data)
else:
# Cognito format
url = f"{cognito_domain_url.rstrip('/')}/oauth2/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
}
# Send as form data for Cognito
response_method = lambda: requests.post(url, headers=headers, data=data)
try:
# Make the request
response = response_method()
response.raise_for_status() # Raise exception for bad status codes
provider_type = "Auth0" if "auth0.com" in cognito_domain_url else "Cognito"
logging.info(f"Successfully obtained {provider_type} access token")
return response.json()
except requests.exceptions.RequestException as e:
logging.error(f"Error getting token: {e}")
if hasattr(response, "text") and response.text:
logging.error(f"Response: {response.text}")
raise
def _save_access_token(
token_response: Dict[str, Any], output_file: str = ".access_token"
) -> None:
"""
Save the access token to a file.
Args:
token_response: Token response from Cognito
output_file: Output file path
"""
access_token = token_response["access_token"]
Path(output_file).write_text(access_token)
logging.info(f"Access token saved to {output_file}")
logging.info(
f"Token expires in {token_response.get('expires_in', 'unknown')} seconds"
)
def generate_and_save_token(audience: str = "MCPGateway") -> None:
"""
Generate Cognito token using environment variables and save to file.
Args:
audience: The audience for the token (default: MCPGateway)
"""
# Load environment variables from .env file
dotenv.load_dotenv()
# Get required environment variables
cognito_domain_url = os.environ.get("COGNITO_DOMAIN")
client_id = os.environ.get("COGNITO_CLIENT_ID")
client_secret = os.environ.get("COGNITO_CLIENT_SECRET")
# Validate that all required variables are present
if not all([cognito_domain_url, client_id, client_secret]):
missing_vars = []
if not cognito_domain_url:
missing_vars.append("COGNITO_DOMAIN")
if not client_id:
missing_vars.append("COGNITO_CLIENT_ID")
if not client_secret:
missing_vars.append("COGNITO_CLIENT_SECRET")
logging.error(
f"Missing required environment variables: {', '.join(missing_vars)}"
)
logging.error("Please set these variables in your .env file")
raise ValueError(f"Missing environment variables: {', '.join(missing_vars)}")
logging.info("Generating Cognito access token from environment variables...")
# Generate token
token_response = _get_cognito_token(
cognito_domain_url=cognito_domain_url,
client_id=client_id,
client_secret=client_secret,
audience=audience,
)
# Save token to file
_save_access_token(token_response)
logging.info("Token generation completed successfully!")
def main():
"""Main function to generate and save Cognito token."""
parser = argparse.ArgumentParser(
description="Generate OAuth2 access tokens from Cognito/Auth0"
)
parser.add_argument(
"--audience", default="MCPGateway", help="Token audience (default: MCPGateway)"
)
args = parser.parse_args()
try:
generate_and_save_token(audience=args.audience)
except Exception as e:
logging.error(f"Token generation failed: {e}")
exit(1)
if __name__ == "__main__":
main()