mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
- Complete serverless AI-powered AWS operations platform - Multi-Lambda architecture with Function URL deployment - Bedrock AgentCore Gateway integration with MCP protocol - 20 AWS service tools for comprehensive operations - Dual authentication: AWS SigV4 + Okta JWT - Natural language interface with streaming responses - DynamoDB conversation persistence - Docker-based MCP Tool Lambda with Strands framework - Production-ready with enterprise security patterns - Comprehensive documentation and setup guides - Read-only operations by default with write enablement guide - Interactive client with CLI interface - Complete Okta OAuth2 PKCE setup - Management scripts for gateway and target operations - Sanitized configuration with dummy data for public sharing
185 lines
7.8 KiB
Python
185 lines
7.8 KiB
Python
"""
Strands-based MCP Client for BAC Gateway
Replaces custom MCP client with Strands SDK patterns
"""

import inspect
import logging
import os
from typing import Dict, Any, List, Optional

import jwt
from mcp.client.streamable_http import streamablehttp_client
from strands.tools.mcp.mcp_client import MCPClient

# Module-level logger, named after this module so log records can be
# filtered per module by the application's logging configuration.
logger = logging.getLogger(__name__)


class StrandsMCPClient:
    """
    Strands-based MCP client for BAC Gateway.

    Holds the gateway URL and bearer token, owns one Strands ``MCPClient``
    built from them, and keeps a cache of the gateway's tools converted to
    plain ``{"name", "description", "inputSchema"}`` dictionaries.

    Whenever the underlying client is replaced or dropped (URL change, new
    token, ``close``) the previous client is stopped and the tools cache is
    cleared, so a started client is never left running unreferenced.
    """

    def __init__(self, gateway_url: Optional[str] = None):
        """
        Create the wrapper; no connection is made here.

        Args:
            gateway_url: BAC Gateway URL. May be omitted and supplied later
                through ``update_gateway_url``; there is no environment
                fallback for it.
        """
        # BAC Gateway URL - must be provided dynamically, no environment fallback
        self.gateway_url = gateway_url
        self._ready = bool(gateway_url)
        if self._ready:
            logger.info(f"StrandsMCPClient initialized with Gateway URL: {self.gateway_url}")
        else:
            logger.warning("BAC Gateway URL not provided - MCP tools will be unavailable until URL is set")

        self.auth_token: Optional[str] = None
        self.mcp_client = None
        self._tools_cache: List[Dict[str, Any]] = []
        # NOTE(review): falling back to a hard-coded secret is unsafe if this
        # value is ever used to sign or verify tokens. It is not used inside
        # this class; kept unchanged for callers that may read the attribute.
        self.jwt_signature_secret = os.environ.get("JWT_SIGNATURE_SECRET", "default-secret")

    def _reset_client(self) -> None:
        """Drop the current MCP client (stopping it if possible) and clear the tools cache."""
        client, self.mcp_client = self.mcp_client, None
        self._tools_cache = []
        if client is None:
            return

        # Assumes the Strands MCPClient exposes stop(exc_type, exc_val, exc_tb)
        # as the counterpart of start() - TODO confirm against the pinned
        # Strands version. Clients without a callable ``stop`` are just dropped.
        stop = getattr(client, "stop", None)
        if not callable(stop):
            return
        try:
            stop(None, None, None)
        except Exception as e:
            # Best effort: a failed shutdown must not break the caller.
            logger.warning(f"Error stopping Strands MCP client: {str(e)}")

    @staticmethod
    def _tool_name(tool: Any) -> Any:
        """Return the best available name for ``tool``, falling back to ``str(tool)``."""
        return (
            getattr(tool, 'name', None)
            # presumably Strands agent tools expose ``tool_name`` - verify
            or getattr(tool, 'tool_name', None)
            or getattr(tool, '_name', None)
            or str(tool)
        )

    @staticmethod
    async def _list_remote_tools(client: Any) -> Any:
        """
        List tools from ``client``.

        Uses ``client.list_tools()`` when it exists (awaiting the result if it
        is awaitable), otherwise falls back to ``client.list_tools_sync()``,
        the method ``get_mcp_tools_for_agent`` also relies on.
        """
        list_tools = getattr(client, "list_tools", None)
        if callable(list_tools):
            listed = list_tools()
            if inspect.isawaitable(listed):
                listed = await listed
            return listed
        return client.list_tools_sync()

    def update_gateway_url(self, gateway_url: str):
        """
        Update the BAC Gateway URL dynamically.

        An empty value is ignored and an unchanged value is a no-op. A new URL
        marks the client ready and discards the existing MCP client and tools
        cache so they are rebuilt against the new gateway.
        """
        if not gateway_url:
            logger.warning("Empty gateway URL provided, ignoring update")
            return
        if gateway_url == self.gateway_url:
            logger.debug("Gateway URL unchanged, no update needed")
            return

        logger.info(f"Updating BAC Gateway URL from {self.gateway_url} to {gateway_url}")
        self.gateway_url = gateway_url
        self._ready = True
        # Reset client to force re-initialization with new URL
        self._reset_client()

    async def initialize(self):
        """Initialize Strands MCP client (marks it ready; no connection is made here)."""
        logger.info(f"Initializing Strands MCP client with BAC Gateway: {self.gateway_url}")
        # NOTE(review): this marks the client ready even when no gateway URL
        # is set, unlike __init__. Left as-is because callers may depend on
        # it - confirm the intended readiness semantics.
        self._ready = True
        logger.info("Strands MCP client initialized successfully")

    def is_ready(self) -> bool:
        """Check if MCP client is ready."""
        return self._ready

    async def set_auth_token(self, token: str):
        """
        Set OAuth authentication token and create MCP client.

        Stores ``token``, then - if a gateway URL is set - replaces any
        existing MCP client with a new one using the token as a Bearer
        credential, starts it and refreshes the tools cache. Errors are logged
        rather than raised; on failure ``mcp_client`` is left as ``None``.
        """
        self.auth_token = token
        logger.info(f"Setting auth token for Gateway URL: {self.gateway_url}")

        if not self.gateway_url:
            logger.error("Cannot create MCP client: Gateway URL not set")
            return

        # Stop the client built for the previous token before replacing it.
        self._reset_client()

        try:
            # Capture the values now so the transport factory does not read
            # attributes that may have changed by the time it is invoked.
            url = self.gateway_url
            # Use only Bearer token, no JWT token to match working direct calls
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
                "Accept": "application/json",
            }
            self.mcp_client = MCPClient(lambda: streamablehttp_client(url=url, headers=headers))

            # Start the MCP client (synchronous method)
            self.mcp_client.start()

            # Fetch available tools
            await self._fetch_tools()

            logger.info(f"Strands MCP client connected to {self.gateway_url} with {len(self._tools_cache)} tools")
        except Exception as e:
            logger.error(f"Error setting auth token and creating MCP client: {str(e)}")
            self._reset_client()

    async def get_available_tools(self) -> List[Dict]:
        """Get list of available MCP tools (empty when not ready or no client exists)."""
        if not self.is_ready() or not self.mcp_client:
            return []
        return self._tools_cache

    async def _fetch_tools(self):
        """
        Fetch available tools using Strands MCP client.

        Rebuilds the tools cache as a list of dictionaries with ``name``,
        ``description`` and ``inputSchema`` keys. A tool that cannot be
        converted is skipped with a warning; a failure to list tools is logged
        and leaves the cache as it was.
        """
        client = self.mcp_client
        if not client:
            logger.warning("MCP client not initialized")
            return

        try:
            tools = await self._list_remote_tools(client)

            # Convert Strands tools to our expected format
            converted: List[Dict[str, Any]] = []
            for tool in tools or []:
                try:
                    # Handle different tool attribute patterns
                    converted.append({
                        "name": self._tool_name(tool),
                        "description": (
                            getattr(tool, 'description', None)
                            or getattr(tool, '_description', None)
                            or "No description available"
                        ),
                        "inputSchema": (
                            getattr(tool, 'inputSchema', None)
                            or getattr(tool, 'input_schema', None)
                            or {"type": "object"}
                        ),
                    })
                except Exception as tool_error:
                    logger.warning(f"Error processing tool {tool}: {str(tool_error)}")

            self._tools_cache = converted
            logger.info(f"Fetched {len(self._tools_cache)} tools from BAC Gateway via Strands")
        except Exception as e:
            logger.error(f"Error fetching tools via Strands: {str(e)}")

    def get_mcp_tools_for_agent(self) -> List:
        """
        Get MCP tools in Strands format for agent integration.

        Returns the tool objects exactly as the MCP client provides them (via
        ``list_tools_sync()`` or, failing that, its ``tools`` attribute), or
        an empty list when no client exists or an error occurs.
        """
        client = self.mcp_client
        if not client:
            logger.warning("MCP client not available for agent tools")
            return []

        try:
            # Try to get tools from the MCP client
            if hasattr(client, 'list_tools_sync'):
                tools = client.list_tools_sync()
                logger.info("Got tools using list_tools_sync method")
            elif hasattr(client, 'tools'):
                tools = client.tools
                logger.info("Got tools using tools attribute")
            else:
                logger.warning("MCP client doesn't have expected tool methods")
                return []

            # Log detailed information about the first 5 tools
            logger.info(f"Retrieved {len(tools)} tools from MCP client")
            for i, tool in enumerate(tools[:5]):
                tool_name = self._tool_name(tool)
                logger.info(f"Tool {i}: name={tool_name}, type={type(tool).__name__}, length={len(tool_name)}")
                if '___' in tool_name:
                    prefix, operation = tool_name.split('___', 1)
                    logger.info(f" - Tool name has prefix: prefix={prefix}, operation={operation}")

            logger.info(f"Returning {len(tools)} Strands MCP tools for agent")
            logger.debug(f"Tool types: {[type(tool).__name__ for tool in tools[:3]]}")  # Log first 3 tool types

            return tools
        except Exception as e:
            logger.error(f"Error getting MCP tools for agent: {str(e)}")
            return []

    async def close(self):
        """Close the MCP client, stopping it if it exposes ``stop``; safe to call repeatedly."""
        if self.mcp_client:
            logger.info("Closing Strands MCP client")
        self._reset_client()