import json import time from typing import Dict, List, Optional, Union import boto3 from botocore.exceptions import ClientError LAMBDA_EXECUTION_ROLE_POLICY = ( "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" ) LAMBDA_RUNTIME = "python3.12" LAMBDA_HANDLER = "lambda_function_code.lambda_handler" LAMBDA_PACKAGE_TYPE = "Zip" IAM_TRUST_POLICY = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } # AgentCore Gateway IAM Role constants GATEWAY_AGENTCORE_ROLE_NAME = "GatewaySearchAgentCoreRole" GATEWAY_AGENTCORE_TRUST_POLICY = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "bedrock-agentcore.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } GATEWAY_AGENTCORE_POLICY_NAME = "BedrockAgentPolicy" # Cognito configuration constants COGNITO_POOL_NAME = "MCPServerPool" COGNITO_CLIENT_NAME = "MCPServerPoolClient" COGNITO_PASSWORD_MIN_LENGTH = 8 COGNITO_DEFAULT_USERNAME = "testuser" COGNITO_DEFAULT_TEMP_PASSWORD = "Temp123!" COGNITO_DEFAULT_PASSWORD = "MyPassword123!" COGNITO_AUTH_FLOWS = ["ALLOW_USER_PASSWORD_AUTH", "ALLOW_REFRESH_TOKEN_AUTH"] COGNITO_PASSWORD_POLICY = { "PasswordPolicy": {"MinimumLength": COGNITO_PASSWORD_MIN_LENGTH} } def _format_error_message(error: ClientError) -> str: """Format error message from ClientError.""" return f"{error.response['Error']['Code']}-{error.response['Error']['Message']}" def _create_or_get_iam_role(iam_client, role_name: str) -> str: """Create IAM role or return existing role ARN.""" try: print("Creating IAM role for lambda function") response = iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(IAM_TRUST_POLICY), Description="IAM role to be assumed by lambda function", ) role_arn = response["Role"]["Arn"] print("Attaching policy to the IAM role") iam_client.attach_role_policy( RoleName=role_name, PolicyArn=LAMBDA_EXECUTION_ROLE_POLICY, ) print(f"Role '{role_name}' created successfully: {role_arn}") return role_arn except ClientError as error: if error.response["Error"]["Code"] == "EntityAlreadyExists": response = iam_client.get_role(RoleName=role_name) role_arn = response["Role"]["Arn"] print(f"IAM role {role_name} already exists. Using the same ARN {role_arn}") return role_arn else: raise error def _create_or_get_lambda_function( lambda_client, function_name: str, role_arn: str, code: bytes ) -> str: """Create Lambda function or return existing function ARN.""" try: print("Creating lambda function") response = lambda_client.create_function( FunctionName=function_name, Role=role_arn, Runtime=LAMBDA_RUNTIME, Handler=LAMBDA_HANDLER, Code={"ZipFile": code}, Description="Lambda function example for Bedrock AgentCore Gateway", PackageType=LAMBDA_PACKAGE_TYPE, ) return response["FunctionArn"] except ClientError as error: if error.response["Error"]["Code"] == "ResourceConflictException": response = lambda_client.get_function(FunctionName=function_name) lambda_arn = response["Configuration"]["FunctionArn"] print( f"AWS Lambda function {function_name} already exists. Using the same ARN {lambda_arn}" ) return lambda_arn else: raise error def create_gateway_lambda( lambda_function_code_path: str, lambda_function_name: str ) -> Dict[str, Union[str, int]]: """Create AWS Lambda function with IAM role for AgentCore Gateway. Args: lambda_function_code_path: Path to the Lambda function code zip file lambda_function_name: Name for the Lambda function Returns: Dictionary with 'lambda_function_arn' and 'exit_code' keys """ session = boto3.Session() region = session.region_name lambda_client = boto3.client("lambda", region_name=region) iam_client = boto3.client("iam", region_name=region) role_name = f"{lambda_function_name}_lambda_iamrole" print("Reading code from zip file") with open(lambda_function_code_path, "rb") as f: lambda_function_code = f.read() try: role_arn = _create_or_get_iam_role(iam_client, role_name) time.sleep(20) try: lambda_arn = _create_or_get_lambda_function( lambda_client, lambda_function_name, role_arn, lambda_function_code ) except ClientError: lambda_arn = _create_or_get_lambda_function( lambda_client, lambda_function_name, role_arn, lambda_function_code ) return {"lambda_function_arn": lambda_arn, "exit_code": 0} except ClientError as error: error_message = _format_error_message(error) print(f"Error: {error_message}") return {"lambda_function_arn": error_message, "exit_code": 1} except Exception as error: print(f"Unexpected error: {str(error)}") return {"lambda_function_arn": str(error), "exit_code": 1} def _create_cognito_user_pool(cognito_client, pool_name: str) -> str: """Create Cognito User Pool and return pool ID.""" print(f"Creating Cognito User Pool: {pool_name}") response = cognito_client.create_user_pool( PoolName=pool_name, Policies=COGNITO_PASSWORD_POLICY ) pool_id = response["UserPool"]["Id"] print(f"User Pool created with ID: {pool_id}") return pool_id def _create_cognito_app_client(cognito_client, pool_id: str, client_name: str) -> str: """Create Cognito App Client and return client ID.""" print(f"Creating Cognito App Client: {client_name}") response = cognito_client.create_user_pool_client( UserPoolId=pool_id, ClientName=client_name, GenerateSecret=False, ExplicitAuthFlows=COGNITO_AUTH_FLOWS, ) client_id = response["UserPoolClient"]["ClientId"] print(f"App Client created with ID: {client_id}") return client_id def _create_cognito_user( cognito_client, pool_id: str, username: str, temp_password: str, permanent_password: str, ) -> None: """Create Cognito user with temporary password and set permanent password.""" print(f"Creating Cognito user: {username}") cognito_client.admin_create_user( UserPoolId=pool_id, Username=username, TemporaryPassword=temp_password, MessageAction="SUPPRESS", ) print(f"Setting permanent password for user: {username}") cognito_client.admin_set_user_password( UserPoolId=pool_id, Username=username, Password=permanent_password, Permanent=True, ) def _authenticate_user( cognito_client, client_id: str, username: str, password: str ) -> str: """Authenticate user and return access token.""" print(f"Authenticating user: {username}") auth_response = cognito_client.initiate_auth( ClientId=client_id, AuthFlow="USER_PASSWORD_AUTH", AuthParameters={"USERNAME": username, "PASSWORD": password}, ) return auth_response["AuthenticationResult"]["AccessToken"] def get_bearer_token( client_id: str, username: str, password: str, region: Optional[str] = None ) -> Optional[str]: """Get bearer token from existing Cognito User Pool. Args: client_id: Cognito App Client ID username: Username for authentication password: User password region: AWS region (if None, uses session default) Returns: Bearer token string or None if authentication fails """ if not region: session = boto3.Session() region = session.region_name cognito_client = boto3.client("cognito-idp", region_name=region) try: print(f"Authenticating user: {username}") auth_response = cognito_client.initiate_auth( ClientId=client_id, AuthFlow="USER_PASSWORD_AUTH", AuthParameters={"USERNAME": username, "PASSWORD": password}, ) bearer_token = auth_response["AuthenticationResult"]["AccessToken"] print(f"Bearer token obtained successfully") return bearer_token except ClientError as error: if error.response["Error"]["Code"] == "NotAuthorizedException": print(f"Authentication failed: Invalid credentials for user {username}") elif error.response["Error"]["Code"] == "UserNotFoundException": print(f"Authentication failed: User {username} not found") elif error.response["Error"]["Code"] == "ResourceNotFoundException": print(f"Authentication failed: Client ID {client_id} not found") else: error_message = _format_error_message(error) print(f"Cognito Client Error: {error_message}") return None except Exception as error: print(f"Unexpected error getting bearer token: {str(error)}") return None def create_gateway_iam_role( lambda_arns: List[str], role_name: str = GATEWAY_AGENTCORE_ROLE_NAME, policy_name: str = GATEWAY_AGENTCORE_POLICY_NAME, ) -> Optional[str]: """Create IAM role for AgentCore Gateway with Lambda invoke permissions. Args: lambda_arns: List of Lambda function ARNs to grant invoke permissions role_name: Name for the IAM role policy_name: Name for the inline policy Returns: Role ARN string or None if creation fails """ session = boto3.Session() region = session.region_name iam_client = boto3.client("iam", region_name=region) try: # Create the IAM role print(f"Creating IAM role: {role_name}") response = iam_client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(GATEWAY_AGENTCORE_TRUST_POLICY), Description="IAM role for AgentCore Gateway to invoke Lambda functions", ) role_arn = response["Role"]["Arn"] # Create the inline policy document policy_document = { "Version": "2012-10-17", "Statement": [ { "Sid": "InvokeFunction", "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": lambda_arns, } ], } # Attach the inline policy print(f"Attaching policy: {policy_name}") iam_client.put_role_policy( RoleName=role_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document), ) print(f"Gateway IAM role created successfully: {role_arn}") return role_arn except ClientError as error: if error.response["Error"]["Code"] == "EntityAlreadyExists": print(f"IAM role {role_name} already exists. Retrieving existing role...") response = iam_client.get_role(RoleName=role_name) role_arn = response["Role"]["Arn"] # Update the policy if role exists try: policy_document = { "Version": "2012-10-17", "Statement": [ { "Sid": "InvokeFunction", "Effect": "Allow", "Action": "lambda:InvokeFunction", "Resource": lambda_arns, } ], } iam_client.put_role_policy( RoleName=role_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document), ) print(f"Updated policy for existing role: {role_arn}") except ClientError as policy_error: print( f"Warning: Could not update policy: {_format_error_message(policy_error)}" ) return role_arn else: error_message = _format_error_message(error) print(f"Error creating IAM role: {error_message}") return None except Exception as error: print(f"Unexpected error creating IAM role: {str(error)}") return None def _extract_function_name_from_arn(lambda_arn: str) -> str: """Extract function name from Lambda ARN. Args: lambda_arn: Lambda function ARN Returns: Function name extracted from ARN Example: arn:aws:lambda:us-east-1:123456789012:function:my-function -> my-function """ # ARN format: arn:aws:lambda:region:account:function:function-name if lambda_arn.startswith("arn:aws:lambda:"): return lambda_arn.split(":")[-1] else: # If it's already a function name, return as is return lambda_arn def delete_gateway_lambda(lambda_function_arn: str) -> bool: """Delete Lambda function and associated IAM role. Args: lambda_function_arn: ARN or name of the Lambda function to delete Returns: True if deletion successful, False otherwise """ session = boto3.Session() region = session.region_name lambda_client = boto3.client("lambda", region_name=region) iam_client = boto3.client("iam", region_name=region) # Extract function name from ARN lambda_function_name = _extract_function_name_from_arn(lambda_function_arn) role_name = f"{lambda_function_name}_lambda_iamrole" try: # Delete Lambda function (can use ARN or name) print(f"Deleting Lambda function: {lambda_function_name}") lambda_client.delete_function(FunctionName=lambda_function_arn) print(f"Lambda function {lambda_function_name} deleted successfully") # Delete IAM role and detach policies try: print(f"Detaching policies from IAM role: {role_name}") iam_client.detach_role_policy( RoleName=role_name, PolicyArn=LAMBDA_EXECUTION_ROLE_POLICY, ) print(f"Deleting IAM role: {role_name}") iam_client.delete_role(RoleName=role_name) print(f"IAM role {role_name} deleted successfully") except ClientError as role_error: if role_error.response["Error"]["Code"] == "NoSuchEntity": print(f"IAM role {role_name} not found, skipping") else: print( f"Warning: Could not delete IAM role: {_format_error_message(role_error)}" ) return True except ClientError as error: if error.response["Error"]["Code"] == "ResourceNotFoundException": print(f"Lambda function {lambda_function_name} not found") return False else: error_message = _format_error_message(error) print(f"Error deleting Lambda function: {error_message}") return False except Exception as error: print(f"Unexpected error deleting Lambda function: {str(error)}") return False def delete_gateway_iam_role( role_name: str = GATEWAY_AGENTCORE_ROLE_NAME, policy_name: str = GATEWAY_AGENTCORE_POLICY_NAME, ) -> bool: """Delete IAM role for AgentCore Gateway. Args: role_name: Name of the IAM role to delete policy_name: Name of the inline policy to delete Returns: True if deletion successful, False otherwise """ session = boto3.Session() region = session.region_name iam_client = boto3.client("iam", region_name=region) try: # Delete inline policy first print(f"Deleting inline policy: {policy_name}") iam_client.delete_role_policy( RoleName=role_name, PolicyName=policy_name, ) print(f"Inline policy {policy_name} deleted successfully") # Delete IAM role print(f"Deleting IAM role: {role_name}") iam_client.delete_role(RoleName=role_name) print(f"IAM role {role_name} deleted successfully") return True except ClientError as error: if error.response["Error"]["Code"] == "NoSuchEntity": print(f"IAM role {role_name} or policy {policy_name} not found") return False else: error_message = _format_error_message(error) print(f"Error deleting IAM role: {error_message}") return False except Exception as error: print(f"Unexpected error deleting IAM role: {str(error)}") return False def delete_cognito_user_pool( pool_name: str = COGNITO_POOL_NAME, username: str = COGNITO_DEFAULT_USERNAME, ) -> bool: """Delete Cognito User Pool and associated resources. Args: pool_name: Name of the Cognito User Pool to delete username: Username to delete from the pool Returns: True if deletion successful, False otherwise """ session = boto3.Session() region = session.region_name cognito_client = boto3.client("cognito-idp", region_name=region) try: # Find the User Pool by name print(f"Finding User Pool: {pool_name}") response = cognito_client.list_user_pools(MaxResults=50) pool_id = None for pool in response["UserPools"]: if pool["Name"] == pool_name: pool_id = pool["Id"] break if not pool_id: print(f"User Pool {pool_name} not found") return False # Delete user first try: print(f"Deleting user: {username}") cognito_client.admin_delete_user( UserPoolId=pool_id, Username=username, ) print(f"User {username} deleted successfully") except ClientError as user_error: if user_error.response["Error"]["Code"] == "UserNotFoundException": print(f"User {username} not found, skipping") else: print( f"Warning: Could not delete user: {_format_error_message(user_error)}" ) # Delete User Pool (this will also delete app clients) print(f"Deleting User Pool: {pool_name}") cognito_client.delete_user_pool(UserPoolId=pool_id) print(f"User Pool {pool_name} deleted successfully") return True except ClientError as error: error_message = _format_error_message(error) print(f"Error deleting Cognito User Pool: {error_message}") return False except Exception as error: print(f"Unexpected error deleting Cognito User Pool: {str(error)}") return False def setup_cognito_user_pool( pool_name: str = COGNITO_POOL_NAME, client_name: str = COGNITO_CLIENT_NAME, username: str = COGNITO_DEFAULT_USERNAME, temp_password: str = COGNITO_DEFAULT_TEMP_PASSWORD, permanent_password: str = COGNITO_DEFAULT_PASSWORD, ) -> Optional[Dict[str, str]]: """Set up Cognito User Pool with app client and test user. Args: pool_name: Name for the Cognito User Pool client_name: Name for the App Client username: Username for the test user temp_password: Temporary password for the test user permanent_password: Permanent password for the test user Returns: Dictionary with client_id and discovery_url or None if setup fails """ session = boto3.Session() region = session.region_name cognito_client = boto3.client("cognito-idp", region_name=region) try: pool_id = _create_cognito_user_pool(cognito_client, pool_name) client_id = _create_cognito_app_client(cognito_client, pool_id, client_name) _create_cognito_user( cognito_client, pool_id, username, temp_password, permanent_password ) discovery_url = f"https://cognito-idp.{region}.amazonaws.com/{pool_id}/.well-known/openid-configuration" # Output the required values print(f"Pool ID: {pool_id}") print(f"Discovery URL: {discovery_url}") print(f"Client ID: {client_id}") return { "client_id": client_id, "discovery_url": discovery_url, } except ClientError as error: error_message = _format_error_message(error) print(f"Cognito Client Error: {error_message}") return None except Exception as error: print(f"Unexpected error setting up Cognito: {str(error)}") return None