185 lines
7.8 KiB
Python
Raw Permalink Normal View History

"""
Strands-based MCP Client for BAC Gateway
Replaces custom MCP client with Strands SDK patterns
"""
import os
import logging
import jwt
from typing import Dict, Any, List, Optional
from strands.tools.mcp.mcp_client import MCPClient
from mcp.client.streamable_http import streamablehttp_client
logger = logging.getLogger(__name__)
class StrandsMCPClient:
"""
Strands-based MCP client for BAC Gateway
Uses Strands SDK patterns for MCP communication
"""
def __init__(self, gateway_url: Optional[str] = None):
# BAC Gateway URL - must be provided dynamically, no environment fallback
self.gateway_url = gateway_url
if not self.gateway_url:
logger.warning("BAC Gateway URL not provided - MCP tools will be unavailable until URL is set")
self._ready = False
else:
self._ready = True
logger.info(f"StrandsMCPClient initialized with Gateway URL: {self.gateway_url}")
self.auth_token = None
self.mcp_client = None
self._tools_cache = []
self.jwt_signature_secret = os.environ.get("JWT_SIGNATURE_SECRET", "default-secret")
def update_gateway_url(self, gateway_url: str):
"""Update the BAC Gateway URL dynamically"""
if gateway_url and gateway_url != self.gateway_url:
logger.info(f"Updating BAC Gateway URL from {self.gateway_url} to {gateway_url}")
self.gateway_url = gateway_url
self._ready = True
# Reset client to force re-initialization with new URL
self.mcp_client = None
self._tools_cache = []
elif gateway_url == self.gateway_url:
logger.debug("Gateway URL unchanged, no update needed")
else:
logger.warning("Empty gateway URL provided, ignoring update")
async def initialize(self):
"""Initialize Strands MCP client"""
try:
logger.info(f"Initializing Strands MCP client with BAC Gateway: {self.gateway_url}")
self._ready = True
logger.info("Strands MCP client initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Strands MCP client: {str(e)}")
def is_ready(self) -> bool:
"""Check if MCP client is ready"""
return self._ready
async def set_auth_token(self, token: str):
"""Set OAuth authentication token and create MCP client"""
try:
self.auth_token = token
logger.info(f"Setting auth token for Gateway URL: {self.gateway_url}")
if not self.gateway_url:
logger.error("Cannot create MCP client: Gateway URL not set")
return
# Create Strands MCP client with same authentication as direct MCP calls
# Use only Bearer token, no JWT token to match working direct calls
self.mcp_client = MCPClient(lambda: streamablehttp_client(
url=self.gateway_url,
headers={
"Authorization": f"Bearer {self.auth_token}",
"Content-Type": "application/json",
"Accept": "application/json"
}
))
# Start the MCP client (synchronous method)
self.mcp_client.start()
# Fetch available tools
await self._fetch_tools()
logger.info(f"Strands MCP client connected to {self.gateway_url} with {len(self._tools_cache)} tools")
except Exception as e:
logger.error(f"Error setting auth token and creating MCP client: {str(e)}")
self.mcp_client = None
async def get_available_tools(self) -> List[Dict]:
"""Get list of available MCP tools"""
if not self.is_ready() or not self.mcp_client:
return []
return self._tools_cache
async def _fetch_tools(self):
"""Fetch available tools using Strands MCP client"""
try:
if not self.mcp_client:
logger.warning("MCP client not initialized")
return
# Use Strands MCP client to list tools (async version)
tools = await self.mcp_client.list_tools()
# Convert Strands tools to our expected format
self._tools_cache = []
for tool in tools:
try:
# Handle different tool attribute patterns
tool_name = getattr(tool, 'name', None) or getattr(tool, '_name', None) or str(tool)
tool_description = getattr(tool, 'description', None) or getattr(tool, '_description', None) or "No description available"
tool_schema = getattr(tool, 'inputSchema', None) or getattr(tool, 'input_schema', None) or {"type": "object"}
tool_def = {
"name": tool_name,
"description": tool_description,
"inputSchema": tool_schema
}
self._tools_cache.append(tool_def)
except Exception as tool_error:
logger.warning(f"Error processing tool {tool}: {str(tool_error)}")
continue
logger.info(f"Fetched {len(self._tools_cache)} tools from BAC Gateway via Strands")
except Exception as e:
logger.error(f"Error fetching tools via Strands: {str(e)}")
def get_mcp_tools_for_agent(self) -> List:
"""Get MCP tools in Strands format for agent integration"""
if not self.mcp_client:
logger.warning("MCP client not available for agent tools")
return []
try:
# Return Strands MCP tools directly
# The MCP client should provide tools that Strands Agent can use
tools = []
# Try to get tools from the MCP client
if hasattr(self.mcp_client, 'list_tools_sync'):
tools = self.mcp_client.list_tools_sync()
logger.info("Got tools using list_tools_sync method")
elif hasattr(self.mcp_client, 'tools'):
tools = self.mcp_client.tools
logger.info("Got tools using tools attribute")
else:
logger.warning("MCP client doesn't have expected tool methods")
return []
# Log detailed information about each tool
logger.info(f"Retrieved {len(tools)} tools from MCP client")
for i, tool in enumerate(tools[:5]): # Log first 5 tools
tool_name = getattr(tool, 'name', None) or getattr(tool, '_name', None) or str(tool)
logger.info(f"Tool {i}: name={tool_name}, type={type(tool).__name__}, length={len(tool_name)}")
if '___' in tool_name:
prefix, operation = tool_name.split('___', 1)
logger.info(f" - Tool name has prefix: prefix={prefix}, operation={operation}")
logger.info(f"Returning {len(tools)} Strands MCP tools for agent")
logger.debug(f"Tool types: {[type(tool).__name__ for tool in tools[:3]]}") # Log first 3 tool types
return tools
except Exception as e:
logger.error(f"Error getting MCP tools for agent: {str(e)}")
return []
async def close(self):
"""Close the MCP client"""
try:
if self.mcp_client:
# Strands MCP client cleanup
logger.info("Closing Strands MCP client")
# Note: Strands MCPClient may not have explicit close method
self.mcp_client = None
except Exception as e:
logger.error(f"Error closing Strands MCP client: {str(e)}")