mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
- Replace all previous product references with AgentCore terminology - Add SSL certificate support to backend servers with mandatory host parameter - Update OpenAPI specs to use HTTPS with configurable domain placeholders - Add comprehensive SSL setup documentation in README - Update URLs from development to production endpoints - Add EC2 instance metadata commands for IP retrieval - Include sed command for bulk domain replacement in OpenAPI specs
172 lines
5.3 KiB
Python
172 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Cognito Token Generator
|
|
|
|
This script generates OAuth2 access tokens from Amazon Cognito using client credentials
|
|
and saves them to a .access_token file for use with AgentCore Gateway.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, Any
|
|
|
|
import requests
|
|
import dotenv
|
|
|
|
|
|
# Configure the root logger when this module is loaded.
logging.basicConfig(
    level=logging.INFO,  # Emit records at INFO severity and above
    # Comma-separated fields: timestamp, "p" + process id, {file:line}, level, message
    format="%(asctime)s,p%(process)s,{%(filename)s:%(lineno)d},%(levelname)s,%(message)s",
)
|
|
|
|
|
|
def _get_cognito_token(
    cognito_domain_url: str,
    client_id: str,
    client_secret: str,
    audience: str = "MCPGateway",
    timeout: float = 30.0,
) -> Dict[str, Any]:
    """
    Get OAuth2 token from Amazon Cognito or Auth0 using client credentials grant type.

    Args:
        cognito_domain_url: The full Cognito/Auth0 domain URL. A URL containing
            "auth0.com" selects the Auth0 request format; any other URL is
            treated as Cognito.
        client_id: The App Client ID
        client_secret: The App Client Secret
        audience: The audience for the token (default: MCPGateway). Only sent
            in the Auth0 request.
        timeout: Seconds to wait for the token endpoint before giving up
            (default: 30).

    Returns:
        The parsed JSON token response; expected to contain access_token,
        expires_in, token_type.

    Raises:
        requests.exceptions.RequestException: If the request cannot be sent,
            times out, or the endpoint answers with an error status.
    """
    is_auth0 = "auth0.com" in cognito_domain_url
    base_url = cognito_domain_url.rstrip("/")

    if is_auth0:
        # Auth0 expects a JSON body on /oauth/token
        url = f"{base_url}/oauth/token"
        request_kwargs = {
            "headers": {"Content-Type": "application/json"},
            "json": {
                "client_id": client_id,
                "client_secret": client_secret,
                "audience": audience,
                "grant_type": "client_credentials",
                "scope": "invoke:gateway",
            },
        }
    else:
        # Cognito expects form-encoded data on /oauth2/token
        url = f"{base_url}/oauth2/token"
        request_kwargs = {
            "headers": {"Content-Type": "application/x-www-form-urlencoded"},
            "data": {
                "grant_type": "client_credentials",
                "client_id": client_id,
                "client_secret": client_secret,
            },
        }

    # Pre-bind so the error handler can tell "no response received" apart from
    # "error response received"; requests.post may raise before assigning.
    response = None
    try:
        # A timeout prevents the script from hanging on an unresponsive endpoint
        response = requests.post(url, timeout=timeout, **request_kwargs)
        response.raise_for_status()  # Raise exception for bad status codes
        token_response = response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error getting token: {e}")
        if response is not None and response.text:
            logging.error(f"Response: {response.text}")
        raise

    provider_type = "Auth0" if is_auth0 else "Cognito"
    logging.info(f"Successfully obtained {provider_type} access token")
    return token_response
|
|
|
|
|
|
def _save_access_token(
    token_response: Dict[str, Any], output_file: str = ".access_token"
) -> None:
    """
    Save the access token to a file that only the current user can read.

    Args:
        token_response: Token response from Cognito; must contain "access_token"
        output_file: Output file path

    Raises:
        KeyError: If token_response has no "access_token" entry.
        OSError: If the file cannot be written.
    """
    access_token = token_response["access_token"]

    token_path = Path(output_file)
    try:
        # The token is a bearer credential: create the file owner-only, and
        # tighten a pre-existing file, BEFORE the secret is written into it.
        token_path.touch(mode=0o600, exist_ok=True)
        token_path.chmod(0o600)
    except OSError as e:
        # Best effort: still attempt the write so behavior matches the
        # previous implementation on filesystems without permission support.
        logging.warning(f"Could not restrict permissions on {output_file}: {e}")

    token_path.write_text(access_token)
    logging.info(f"Access token saved to {output_file}")
    logging.info(
        f"Token expires in {token_response.get('expires_in', 'unknown')} seconds"
    )
|
|
|
|
|
|
def generate_and_save_token(audience: str = "MCPGateway") -> None:
    """
    Fetch a token using credentials from the environment and write it to disk.

    The .env file is loaded first; COGNITO_DOMAIN, COGNITO_CLIENT_ID and
    COGNITO_CLIENT_SECRET are then read from the environment.

    Args:
        audience: The audience for the token (default: MCPGateway)

    Raises:
        ValueError: If any required environment variable is unset or empty.
    """
    # Pull in values defined in a .env file
    dotenv.load_dotenv()

    # Look up each required setting; tuple order fixes the order in error messages
    required_names = ("COGNITO_DOMAIN", "COGNITO_CLIENT_ID", "COGNITO_CLIENT_SECRET")
    settings = {name: os.environ.get(name) for name in required_names}

    # Unset and empty values are both treated as missing
    absent = [name for name in required_names if not settings[name]]
    if absent:
        logging.error(
            f"Missing required environment variables: {', '.join(absent)}"
        )
        logging.error("Please set these variables in your .env file")
        raise ValueError(f"Missing environment variables: {', '.join(absent)}")

    logging.info("Generating Cognito access token from environment variables...")

    # Request the token, then persist it to the default output file
    fetched = _get_cognito_token(
        cognito_domain_url=settings["COGNITO_DOMAIN"],
        client_id=settings["COGNITO_CLIENT_ID"],
        client_secret=settings["COGNITO_CLIENT_SECRET"],
        audience=audience,
    )
    _save_access_token(fetched)

    logging.info("Token generation completed successfully!")
|
|
|
|
|
|
def main():
    """
    Main function to generate and save Cognito token.

    Parses the --audience command-line option and runs token generation.
    Any failure is logged and the process exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description="Generate OAuth2 access tokens from Cognito/Auth0"
    )
    parser.add_argument(
        "--audience", default="MCPGateway", help="Token audience (default: MCPGateway)"
    )

    args = parser.parse_args()

    try:
        generate_and_save_token(audience=args.audience)
    except Exception as e:
        logging.error(f"Token generation failed: {e}")
        # Raise SystemExit directly: the `exit` builtin is installed by the
        # `site` module for interactive use and is not guaranteed to exist
        # (e.g. `python -S` or frozen builds).
        raise SystemExit(1)
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script,
# not when this file is imported as a module.
if __name__ == "__main__":
    main()
|