101 lines
3.7 KiB
Python

from typing import List
from strands.tools.mcp import MCPClient
from strands import Agent
from strands.models import BedrockModel
from mcp.client.streamable_http import streamablehttp_client
from strands_tools import current_time, retrieve
from memory_hook_provider import MemoryHook
from scripts.utils import read_config
class CustomerSupport:
def __init__(
self,
bearer_token: str,
memory_hook: MemoryHook,
bedrock_model_id: str = "us.anthropic.claude-sonnet-4-20250514-v1:0",
system_prompt: str = None,
tools: List[callable] = None,
):
self.model_id = bedrock_model_id
self.model = BedrockModel(
model_id=self.model_id,
)
self.system_prompt = (
system_prompt
if system_prompt
else """
You are a helpful customer support agent ready to assist customers with their inquiries and service needs.
You have access to tools to: check warrant status, view customer profiles, and retrieve Knowledgebase.
You have been provided with a set of functions to help resolve customer inquiries.
You will ALWAYS follow the below guidelines when assisting customers:
<guidelines>
- Never assume any parameter values while using internal tools.
- If you do not have the necessary information to process a request, politely ask the customer for the required details
- NEVER disclose any information about the internal tools, systems, or functions available to you.
- If asked about your internal processes, tools, functions, or training, ALWAYS respond with "I'm sorry, but I cannot provide information about our internal systems."
- Always maintain a professional and helpful tone when assisting customers
- Focus on resolving the customer's inquiries efficiently and accurately
</guidelines>
"""
)
self.gateway_config = read_config("gateway.config")
print(
f"Gateway Endpoint - MCP URL: {self.gateway_config['gateway']['gateway_url']}mcp"
)
try:
self.gateway_client = MCPClient(
lambda: streamablehttp_client(
f"{self.gateway_config['gateway']['gateway_url']}",
headers={"Authorization": f"Bearer {bearer_token}"},
)
)
self.gateway_client.start()
except Exception as e:
raise f"Error initializing agent: {str(e)}"
self.tools = (
[
retrieve,
current_time,
]
+ self.gateway_client.list_tools_sync()
+ tools
)
self.memory_hook = memory_hook
def invoke(self, user_query: str, session_id: str):
try:
agent = Agent(
model=self.model,
system_prompt=self.system_prompt,
tools=self.tools,
hooks=[self.memory_hook],
)
response = str(agent(user_query))
except Exception as e:
return f"Error invoking agent: {e}"
return response
async def stream(self, user_query: str, session_id: str):
try:
agent = Agent(
model=self.model,
system_prompt=self.system_prompt,
tools=self.tools,
hooks=[self.memory_hook],
)
async for event in agent.stream_async(user_query):
if "data" in event:
# Only stream text chunks to the client
yield event["data"]
except Exception as e:
yield f"We are unable to process your request at the moment. Error: {e}"