mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
459 lines
21 KiB
Python
459 lines
21 KiB
Python
|
#!/usr/bin/env python3
|
||
|
"""
|
||
|
AWS Operations Agent Configuration Manager
|
||
|
Single source of truth for all project configuration
|
||
|
Replaces .env file with centralized JSON configuration
|
||
|
"""
|
||
|
import json
|
||
|
import os
|
||
|
from pathlib import Path
|
||
|
from typing import Dict, List, Any, Optional
|
||
|
|
||
|
class BedrockAgentCoreConfigManager:
|
||
|
"""Manages AWS Operations Agent Gateway configuration from centralized JSON file"""
|
||
|
|
||
|
def __init__(self, config_file: str = None):
|
||
|
"""Initialize configuration manager"""
|
||
|
if config_file:
|
||
|
self.config_file = Path(config_file)
|
||
|
else:
|
||
|
# Default to bedrock-agentcore-config.json in same directory as this script
|
||
|
self.config_file = Path(__file__).parent / "bedrock-agentcore-config.json"
|
||
|
|
||
|
self.config = self._load_config()
|
||
|
|
||
|
def _load_config(self) -> Dict[str, Any]:
|
||
|
"""Load configuration from JSON file"""
|
||
|
try:
|
||
|
with open(self.config_file, 'r') as f:
|
||
|
return json.load(f)
|
||
|
except FileNotFoundError:
|
||
|
raise FileNotFoundError(f"Configuration file not found: {self.config_file}")
|
||
|
except json.JSONDecodeError as e:
|
||
|
raise ValueError(f"Invalid JSON in configuration file: {str(e)}")
|
||
|
|
||
|
def validate_config(self) -> bool:
|
||
|
"""Validate that required configuration sections exist"""
|
||
|
required_sections = ['aws', 'bedrock_agentcore', 'okta', 'environments', 'tool_schemas']
|
||
|
|
||
|
for section in required_sections:
|
||
|
if section not in self.config:
|
||
|
print(f"❌ Missing required configuration section: {section}")
|
||
|
return False
|
||
|
|
||
|
# Validate active endpoint type is available
|
||
|
active_endpoint = self.config['bedrock_agentcore'].get('active_endpoint', 'beta_endpoints')
|
||
|
if active_endpoint not in self.config['bedrock_agentcore']:
|
||
|
available_keys = [k for k in self.config['bedrock_agentcore'].keys() if k.endswith('_endpoints')]
|
||
|
print(f"❌ Invalid active endpoint '{active_endpoint}'. Available endpoint keys: {', '.join(available_keys)}")
|
||
|
return False
|
||
|
|
||
|
# Validate environments have required fields
|
||
|
for env_name, env_config in self.config['environments'].items():
|
||
|
required_env_fields = ['aws_profile', 'aws_region', 'aws_account']
|
||
|
for field in required_env_fields:
|
||
|
if field not in env_config:
|
||
|
print(f"❌ Missing required field '{field}' in environment '{env_name}'")
|
||
|
return False
|
||
|
|
||
|
return True
|
||
|
|
||
|
# Project Configuration
|
||
|
def get_project_name(self) -> str:
|
||
|
"""Get project name"""
|
||
|
return self.config.get('project', {}).get('name', 'lambda-adaptor-bedrock-agentcore')
|
||
|
|
||
|
def get_default_environment(self) -> str:
|
||
|
"""Get default environment"""
|
||
|
return self.config.get('project', {}).get('default_environment', 'dev')
|
||
|
|
||
|
# AWS Configuration
|
||
|
def get_aws_config(self, environment: str = 'dev') -> Dict[str, str]:
|
||
|
"""Get AWS configuration for specified environment"""
|
||
|
env_config = self.config['environments'].get(environment, {})
|
||
|
|
||
|
return {
|
||
|
'profile': env_config.get('aws_profile', self.config['aws']['default_profile']),
|
||
|
'region': env_config.get('aws_region', self.config['aws']['default_region']),
|
||
|
'account': env_config.get('aws_account', self.config['aws']['default_account'])
|
||
|
}
|
||
|
|
||
|
def get_lambda_role_arn(self, environment: str = 'dev') -> str:
|
||
|
"""Get Lambda execution role ARN for environment"""
|
||
|
env_config = self.config['environments'].get(environment, {})
|
||
|
return env_config.get('lambda_role_arn', f"arn:aws:iam::{self.get_aws_config(environment)['account']}:role/lambda-execution-role")
|
||
|
|
||
|
# Bedrock AgentCore Configuration
|
||
|
def get_bedrock_agentcore_endpoints(self, endpoint_override: str = None) -> Dict[str, str]:
|
||
|
"""Get Bedrock AgentCore API endpoints based on active_endpoint setting or override"""
|
||
|
# Use override if provided, otherwise use active_endpoint from config
|
||
|
endpoint_key = endpoint_override or self.config['bedrock_agentcore'].get('active_endpoint', 'beta_endpoints')
|
||
|
|
||
|
# Return the endpoints for the specified key, fallback to beta_endpoints if not found
|
||
|
return self.config['bedrock_agentcore'].get(endpoint_key, self.config['bedrock_agentcore'].get('beta_endpoints', {}))
|
||
|
|
||
|
def get_active_endpoint_type(self) -> str:
|
||
|
"""Get the currently active endpoint type (clean name without '_endpoints')"""
|
||
|
active_endpoint = self.config['bedrock_agentcore'].get('active_endpoint', 'beta_endpoints')
|
||
|
# Remove '_endpoints' suffix for display purposes
|
||
|
return active_endpoint.replace('_endpoints', '') if active_endpoint.endswith('_endpoints') else active_endpoint
|
||
|
|
||
|
def get_available_endpoint_types(self) -> List[str]:
|
||
|
"""Get all available endpoint types from configuration (clean names without '_endpoints')"""
|
||
|
endpoint_types = []
|
||
|
for key in self.config['bedrock_agentcore'].keys():
|
||
|
if key.endswith('_endpoints'):
|
||
|
endpoint_type = key.replace('_endpoints', '')
|
||
|
endpoint_types.append(endpoint_type)
|
||
|
return sorted(endpoint_types)
|
||
|
|
||
|
def is_valid_endpoint_type(self, endpoint_type: str) -> bool:
|
||
|
"""Check if the given endpoint type is valid/available"""
|
||
|
# Handle both clean names (beta) and full keys (beta_endpoints)
|
||
|
if endpoint_type.endswith('_endpoints'):
|
||
|
return endpoint_type in self.config['bedrock_agentcore']
|
||
|
else:
|
||
|
return f"{endpoint_type}_endpoints" in self.config['bedrock_agentcore']
|
||
|
|
||
|
def get_bedrock_agentcore_service_name(self) -> str:
|
||
|
"""Get Bedrock AgentCore service name"""
|
||
|
return self.config['bedrock_agentcore'].get('service_name', 'bedrock-agentcore-control')
|
||
|
|
||
|
def get_bedrock_agentcore_role_arn(self, environment: str = 'dev') -> str:
|
||
|
"""Get Bedrock AgentCore Gateway role ARN for environment"""
|
||
|
env_config = self.config['environments'].get(environment, {})
|
||
|
aws_config = self.get_aws_config(environment)
|
||
|
role_name = env_config.get('bedrock_agentcore_role_name', f"{environment}-bedrock-agentcore-gateway-role")
|
||
|
return f"arn:aws:iam::{aws_config['account']}:role/{role_name}"
|
||
|
|
||
|
def get_mcp_endpoint_url(self, gateway_id: str = None, endpoint_override: str = None) -> str:
|
||
|
"""Get MCP endpoint URL from the active endpoint configuration"""
|
||
|
endpoints = self.get_bedrock_agentcore_endpoints(endpoint_override)
|
||
|
return endpoints.get('gateway_url', '')
|
||
|
|
||
|
def get_mcp_gateway_url(self, gateway_id: str = None, endpoint_override: str = None) -> str:
|
||
|
"""Get MCP gateway URL from the active endpoint configuration"""
|
||
|
return self.get_mcp_endpoint_url(gateway_id, endpoint_override)
|
||
|
|
||
|
def update_gateway_info_from_response(self, gateway_response: dict) -> bool:
|
||
|
"""
|
||
|
Update the active endpoint with gateway information from create_gateway response
|
||
|
|
||
|
Args:
|
||
|
gateway_response: The response from create_gateway API call
|
||
|
|
||
|
Returns:
|
||
|
bool: True if config was updated successfully
|
||
|
"""
|
||
|
try:
|
||
|
gateway_id = gateway_response.get('gatewayId')
|
||
|
gateway_url = gateway_response.get('gatewayUrl')
|
||
|
|
||
|
if not gateway_id or not gateway_url:
|
||
|
print("❌ Missing gatewayId or gatewayUrl in response")
|
||
|
return False
|
||
|
|
||
|
# Get the active endpoint type
|
||
|
active_endpoint = self.config['bedrock_agentcore'].get('active_endpoint', 'production_endpoints')
|
||
|
|
||
|
# Update the config
|
||
|
if active_endpoint in self.config['bedrock_agentcore']:
|
||
|
old_gateway_id = self.config['bedrock_agentcore'][active_endpoint].get('gateway_id', 'Not set')
|
||
|
old_gateway_url = self.config['bedrock_agentcore'][active_endpoint].get('gateway_url', 'Not set')
|
||
|
|
||
|
self.config['bedrock_agentcore'][active_endpoint]['gateway_id'] = gateway_id
|
||
|
self.config['bedrock_agentcore'][active_endpoint]['gateway_url'] = gateway_url
|
||
|
|
||
|
# Save the updated config back to file
|
||
|
with open(self.config_file, 'w') as f:
|
||
|
json.dump(self.config, f, indent=2)
|
||
|
|
||
|
print(f"\n📝 Config Updated:")
|
||
|
print(f" Active Endpoint: {active_endpoint}")
|
||
|
print(f" Old Gateway ID: {old_gateway_id}")
|
||
|
print(f" New Gateway ID: {gateway_id}")
|
||
|
print(f" Old Gateway URL: {old_gateway_url}")
|
||
|
print(f" New Gateway URL: {gateway_url}")
|
||
|
|
||
|
return True
|
||
|
else:
|
||
|
print(f"❌ Active endpoint '{active_endpoint}' not found in config")
|
||
|
return False
|
||
|
|
||
|
except Exception as e:
|
||
|
print(f"❌ Failed to update config: {str(e)}")
|
||
|
return False
|
||
|
|
||
|
def clear_gateway_info(self, gateway_id: str = None) -> bool:
|
||
|
"""
|
||
|
Clear gateway information from the active endpoint configuration
|
||
|
Called when a gateway is deleted
|
||
|
|
||
|
Args:
|
||
|
gateway_id: Optional gateway ID to verify we're clearing the right gateway
|
||
|
|
||
|
Returns:
|
||
|
bool: True if config was updated successfully
|
||
|
"""
|
||
|
try:
|
||
|
# Get the active endpoint type
|
||
|
active_endpoint = self.config['bedrock_agentcore'].get('active_endpoint', 'production_endpoints')
|
||
|
|
||
|
# Update the config
|
||
|
if active_endpoint in self.config['bedrock_agentcore']:
|
||
|
current_gateway_id = self.config['bedrock_agentcore'][active_endpoint].get('gateway_id', 'Not set')
|
||
|
current_gateway_url = self.config['bedrock_agentcore'][active_endpoint].get('gateway_url', 'Not set')
|
||
|
|
||
|
# If gateway_id is provided, verify it matches before clearing
|
||
|
if gateway_id and current_gateway_id != gateway_id:
|
||
|
print(f"⚠️ Gateway ID mismatch: config has '{current_gateway_id}', trying to clear '{gateway_id}'")
|
||
|
print(" Clearing anyway...")
|
||
|
|
||
|
# Remove gateway info from config
|
||
|
if 'gateway_id' in self.config['bedrock_agentcore'][active_endpoint]:
|
||
|
del self.config['bedrock_agentcore'][active_endpoint]['gateway_id']
|
||
|
if 'gateway_url' in self.config['bedrock_agentcore'][active_endpoint]:
|
||
|
del self.config['bedrock_agentcore'][active_endpoint]['gateway_url']
|
||
|
|
||
|
# Save the updated config back to file
|
||
|
with open(self.config_file, 'w') as f:
|
||
|
json.dump(self.config, f, indent=2)
|
||
|
|
||
|
print(f"\n📝 Config Cleared:")
|
||
|
print(f" Active Endpoint: {active_endpoint}")
|
||
|
print(f" Cleared Gateway ID: {current_gateway_id}")
|
||
|
print(f" Cleared Gateway URL: {current_gateway_url}")
|
||
|
|
||
|
return True
|
||
|
else:
|
||
|
print(f"❌ Active endpoint '{active_endpoint}' not found in config")
|
||
|
return False
|
||
|
|
||
|
except Exception as e:
|
||
|
print(f"❌ Failed to clear config: {str(e)}")
|
||
|
return False
|
||
|
|
||
|
# Okta Configuration
|
||
|
def get_okta_config(self) -> Dict[str, str]:
|
||
|
"""Get Okta configuration"""
|
||
|
return self.config['okta']
|
||
|
|
||
|
def get_okta_discovery_url(self) -> str:
|
||
|
"""Get Okta OIDC discovery URL"""
|
||
|
return self.config['okta']['discovery_url']
|
||
|
|
||
|
def get_okta_audience(self) -> str:
|
||
|
"""Get Okta audience"""
|
||
|
return self.config['okta']['audience']
|
||
|
|
||
|
def get_okta_authorizer_config(self) -> Dict[str, Any]:
|
||
|
"""Get Okta authorizer configuration for Bedrock AgentCore Gateway"""
|
||
|
okta_config = self.get_okta_config()
|
||
|
return {
|
||
|
"customJWTAuthorizer": {
|
||
|
"allowedAudience": [okta_config.get('audience', 'api://default')],
|
||
|
"discoveryUrl": okta_config.get('discovery_url', '')
|
||
|
}
|
||
|
}
|
||
|
|
||
|
# Bedrock Configuration
|
||
|
def get_bedrock_config(self) -> Dict[str, str]:
|
||
|
"""Get Bedrock configuration"""
|
||
|
return self.config.get('bedrock', {
|
||
|
'model_id': 'us.anthropic.claude-3-7-sonnet-20250219-v1:0',
|
||
|
'region': 'us-west-2'
|
||
|
})
|
||
|
|
||
|
def get_bedrock_model_id(self) -> str:
|
||
|
"""Get Bedrock model ID"""
|
||
|
return self.get_bedrock_config()['model_id']
|
||
|
|
||
|
def get_bedrock_region(self) -> str:
|
||
|
"""Get Bedrock region"""
|
||
|
return self.get_bedrock_config()['region']
|
||
|
|
||
|
# Lambda Configuration
|
||
|
def get_lambda_arn(self, environment: str = 'dev', function_name: str = None) -> str:
|
||
|
"""Get Lambda function ARN from config or generate it"""
|
||
|
env_config = self.config['environments'].get(environment, {})
|
||
|
|
||
|
# First try to get from config
|
||
|
if 'lambda_arn' in env_config:
|
||
|
return env_config['lambda_arn']
|
||
|
|
||
|
# Fallback to generating ARN if not in config
|
||
|
aws_config = self.get_aws_config(environment)
|
||
|
if not function_name:
|
||
|
# Use default naming convention
|
||
|
function_name = f"{environment}-bedrock-agentcore-hello-world"
|
||
|
|
||
|
return f"arn:aws:lambda:{aws_config['region']}:{aws_config['account']}:function:{function_name}"
|
||
|
|
||
|
def get_lambda_target_config(self, lambda_arn: str) -> Dict[str, Any]:
|
||
|
"""Get Lambda target configuration with tool schemas"""
|
||
|
return {
|
||
|
'mcp': {
|
||
|
'lambda': {
|
||
|
'lambdaArn': lambda_arn,
|
||
|
'toolSchema': {
|
||
|
'inlinePayload': self.get_tool_schemas()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
def get_credential_provider_config(self) -> List[Dict[str, str]]:
|
||
|
"""Get credential provider configuration"""
|
||
|
return [{'credentialProviderType': 'GATEWAY_IAM_ROLE'}]
|
||
|
|
||
|
# Tool Schemas
|
||
|
def get_tool_schemas(self) -> List[Dict[str, Any]]:
|
||
|
"""Get all tool schemas"""
|
||
|
return self.config.get('tool_schemas', [])
|
||
|
|
||
|
def get_tool_schema(self, tool_name: str) -> Optional[Dict[str, Any]]:
|
||
|
"""Get specific tool schema by name"""
|
||
|
for tool in self.get_tool_schemas():
|
||
|
if tool.get('name') == tool_name:
|
||
|
return tool
|
||
|
return None
|
||
|
|
||
|
def get_tool_count(self) -> int:
|
||
|
"""Get total number of tools"""
|
||
|
return len(self.get_tool_schemas())
|
||
|
|
||
|
def get_tool_names(self) -> List[str]:
|
||
|
"""Get list of all tool names"""
|
||
|
return [tool.get('name', 'unknown') for tool in self.get_tool_schemas()]
|
||
|
|
||
|
# Environment Management
|
||
|
def get_environments(self) -> List[str]:
|
||
|
"""Get list of available environments"""
|
||
|
return list(self.config.get('environments', {}).keys())
|
||
|
|
||
|
def get_environment_config(self, environment: str) -> Dict[str, Any]:
|
||
|
"""Get full configuration for specific environment"""
|
||
|
return self.config['environments'].get(environment, {})
|
||
|
|
||
|
# Utility Methods
|
||
|
def get_gateway_description(self, environment: str = 'dev') -> str:
|
||
|
"""Generate gateway description"""
|
||
|
tool_count = self.get_tool_count()
|
||
|
project_name = self.get_project_name()
|
||
|
return f"{project_name.title()} Gateway for {environment} environment - {tool_count} tools available"
|
||
|
|
||
|
def get_target_description(self, environment: str = 'dev') -> str:
|
||
|
"""Generate target description"""
|
||
|
tool_count = self.get_tool_count()
|
||
|
tool_names = ', '.join(self.get_tool_names())
|
||
|
return f"Lambda Target for {environment} environment - {tool_count} tools: {tool_names}"
|
||
|
|
||
|
def print_config_summary(self, environment: str = 'dev'):
|
||
|
"""Print configuration summary"""
|
||
|
print(f"\n📋 Configuration Summary")
|
||
|
print("=" * 50)
|
||
|
print(f"Project: {self.get_project_name()}")
|
||
|
print(f"Environment: {environment}")
|
||
|
print(f"AWS Profile: {self.get_aws_config(environment)['profile']}")
|
||
|
print(f"AWS Region: {self.get_aws_config(environment)['region']}")
|
||
|
print(f"AWS Account: {self.get_aws_config(environment)['account']}")
|
||
|
available_endpoints = ', '.join(self.get_available_endpoint_types())
|
||
|
print(f"Bedrock AgentCore Endpoint: {self.get_bedrock_agentcore_endpoints()['control_plane']} (active: {self.get_active_endpoint_type()})")
|
||
|
print(f"Available Endpoints: {available_endpoints}")
|
||
|
print(f"Bedrock AgentCore Role: {self.get_bedrock_agentcore_role_arn(environment)}")
|
||
|
print(f"Lambda Role: {self.get_lambda_role_arn(environment)}")
|
||
|
print(f"Okta Domain: {self.get_okta_config()['domain']}")
|
||
|
print(f"Tools Available: {self.get_tool_count()}")
|
||
|
print(f"Tool Names: {', '.join(self.get_tool_names())}")
|
||
|
|
||
|
# Legacy .env compatibility methods (for migration)
|
||
|
def get_env_equivalent(self, env_var: str, environment: str = 'dev') -> Optional[str]:
|
||
|
"""Get configuration value equivalent to .env variable (for migration)"""
|
||
|
env_mapping = {
|
||
|
'AWS_REGION': lambda: self.get_aws_config(environment)['region'],
|
||
|
'AWS_ACCOUNT_ID': lambda: self.get_aws_config(environment)['account'],
|
||
|
'LAMBDA_ROLE_ARN': lambda: self.get_lambda_role_arn(environment),
|
||
|
'OKTA_DOMAIN': lambda: self.get_okta_config()['domain'],
|
||
|
'OKTA_CLIENT_ID': lambda: self.get_okta_config()['client_id'],
|
||
|
'OKTA_REDIRECT_URI': lambda: self.get_okta_config().get('redirect_uri'),
|
||
|
'OKTA_AUDIENCE': lambda: self.get_okta_config()['audience'],
|
||
|
'BEDROCK_MODEL_ID': lambda: self.get_bedrock_model_id(),
|
||
|
'BEDROCK_REGION': lambda: self.get_bedrock_region(),
|
||
|
'BEDROCK_AGENTCORE_ENDPOINT': lambda: self.get_bedrock_agentcore_endpoints().get('legacy_endpoint'),
|
||
|
'BEDROCK_AGENTCORE_SERVICE_NAME': lambda: self.get_bedrock_agentcore_service_name(),
|
||
|
'PROJECT_NAME': lambda: self.get_project_name(),
|
||
|
'ENVIRONMENT': lambda: environment
|
||
|
}
|
||
|
|
||
|
if env_var in env_mapping:
|
||
|
try:
|
||
|
return env_mapping[env_var]()
|
||
|
except (KeyError, TypeError):
|
||
|
return None
|
||
|
return None
|
||
|
|
||
|
def validate_required_settings(self, environment: str = 'dev') -> List[str]:
|
||
|
"""Validate required settings and return list of missing ones"""
|
||
|
missing = []
|
||
|
|
||
|
# Check Okta configuration
|
||
|
okta_config = self.get_okta_config()
|
||
|
if not okta_config.get('domain'):
|
||
|
missing.append('okta.domain')
|
||
|
if not okta_config.get('client_id'):
|
||
|
missing.append('okta.client_id')
|
||
|
|
||
|
# Check AWS configuration
|
||
|
aws_config = self.get_aws_config(environment)
|
||
|
if not aws_config.get('account'):
|
||
|
missing.append(f'environments.{environment}.aws_account')
|
||
|
|
||
|
# Check Lambda role
|
||
|
try:
|
||
|
self.get_lambda_role_arn(environment)
|
||
|
except:
|
||
|
missing.append(f'environments.{environment}.lambda_role_arn')
|
||
|
|
||
|
return missing
|
||
|
|
||
|
|
||
|
# Convenience function for easy import
|
||
|
def get_config_manager(config_file: str = None) -> BedrockAgentCoreConfigManager:
|
||
|
"""Get configuration manager instance"""
|
||
|
return BedrockAgentCoreConfigManager(config_file)
|
||
|
|
||
|
|
||
|
# For backward compatibility
|
||
|
def get_config(environment: str = "dev", config_file: str = None) -> Dict[str, Any]:
|
||
|
"""Get configuration for environment"""
|
||
|
manager = get_config_manager(config_file)
|
||
|
return {
|
||
|
'environment': environment,
|
||
|
'aws': manager.get_aws_config(environment),
|
||
|
'bedrock_agentcore': manager.get_bedrock_agentcore_endpoints(),
|
||
|
'okta': manager.get_okta_config(),
|
||
|
'tool_schemas': manager.get_tool_schemas()
|
||
|
}
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
# Test configuration manager
|
||
|
manager = BedrockAgentCoreConfigManager()
|
||
|
|
||
|
print("🧪 Testing Bedrock AgentCore Configuration Manager")
|
||
|
print("=" * 50)
|
||
|
|
||
|
# Validate configuration
|
||
|
manager.validate_config()
|
||
|
|
||
|
# Print summary
|
||
|
manager.print_config_summary("dev")
|
||
|
|
||
|
# Test .env compatibility
|
||
|
print(f"\n🔄 Testing .env compatibility:")
|
||
|
print(f"AWS_REGION: {manager.get_env_equivalent('AWS_REGION', 'dev')}")
|
||
|
print(f"OKTA_DOMAIN: {manager.get_env_equivalent('OKTA_DOMAIN', 'dev')}")
|
||
|
print(f"PROJECT_NAME: {manager.get_env_equivalent('PROJECT_NAME', 'dev')}")
|
||
|
|
||
|
print("\n✅ Configuration Manager Test Complete")
|