mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
* Data Analyst Assistant Solution with Amazon Bedrock AgentCore * Data Analyst Assistant Solution with Amazon Bedrock AgentCore * Data Analyst Assistant Solution with Amazon Bedrock AgentCore * Data Analyst Assistant Solution with Amazon Bedrock AgentCore * Data Analyst Assistant Solution with Amazon Bedrock AgentCore * Data Analyst Assistant Solution with Amazon Bedrock AgentCore * Data Analyst Assistant Solution with Amazon Bedrock AgentCore --------- Co-authored-by: Uriel Ramirez <beralfon@amazon.com>
229 lines
8.9 KiB
Python
229 lines
8.9 KiB
Python
"""
|
|
Strands Data Analyst Assistant - Main Application
|
|
|
|
This application provides a data analyst assistant powered by Amazon Bedrock and uses
|
|
the Amazon RDS Data API to execute SQL queries against an Aurora Serverless PostgreSQL database.
|
|
It leverages Bedrock Agent Core for agent functionality and memory management.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
from uuid import uuid4
|
|
import os
|
|
|
|
# Bedrock Agent Core imports
|
|
from bedrock_agentcore import BedrockAgentCoreApp
|
|
from bedrock_agentcore.memory import MemoryClient
|
|
from strands import Agent, tool
|
|
from strands_tools import current_time
|
|
from strands.models import BedrockModel
|
|
|
|
# Custom module imports
|
|
from src.MemoryHookProvider import MemoryHookProvider
|
|
from src.tools import get_tables_information, load_file_content
|
|
from src.rds_data_api_utils import run_sql_query
|
|
from src.utils import save_raw_query_result, read_messages_by_session, save_agent_interactions
|
|
from src.ssm_utils import get_ssm_parameter
|
|
|
|
# Configure process-wide logging and the logger used by this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("personal-agent")

# Resolve the memory ID and build the memory client at import time.
# Any exception raised here is logged and re-raised, so the module fails to load.
try:
    memory_id = get_ssm_parameter("MEMORY_ID")

    # A missing or whitespace-only value is treated as "memory not created yet"
    if not (memory_id and memory_id.strip()):
        error_msg = "Memory ID from SSM is empty. Memory has not been created yet."
        logger.error(error_msg)
        raise ValueError(error_msg)

    logger.info(f"Retrieved memory ID from SSM: {memory_id}")

    # Memory client shared by every invocation handled by this process
    client = MemoryClient(region_name='us-west-2', environment="prod")
except Exception as e:
    logger.error(f"Error retrieving memory ID from SSM: {e}")
    raise  # Propagate so execution stops instead of running without memory

# Bedrock Agent Core application object; the entrypoint below is registered on it
app = BedrockAgentCoreApp()
|
|
|
|
def load_system_prompt():
    """
    Return the system prompt for the data analyst assistant.

    The prompt is requested from ``instructions.txt`` through ``load_file_content``.
    A short built-in prompt is passed as ``default_content`` so that helper has a
    fallback to use when the file is not available.

    Returns:
        str: The system prompt to use for the data analyst assistant
    """
    default_prompt = (
        "You are a helpful Data Analyst Assistant who can help with data analysis tasks.\n"
        "You can process data, interpret statistics, and provide insights based on data."
    )
    return load_file_content("instructions.txt", default_content=default_prompt)


# Resolved once at import time; each invocation substitutes its own timezone into it
DATA_ANALYST_SYSTEM_PROMPT = load_system_prompt()
|
|
|
|
def create_execute_sql_query_tool(user_prompt: str, prompt_uuid: str):
    """
    Build an ``execute_sql_query`` tool bound to one user interaction.

    The returned tool runs a SQL statement through ``run_sql_query`` and then hands
    the outcome to ``save_raw_query_result`` together with the prompt and UUID
    captured here, so each stored result can be traced back to its interaction.

    Args:
        user_prompt (str): The original user prompt/question
        prompt_uuid (str): Unique identifier for tracking this interaction

    Returns:
        function: The configured SQL query execution tool
    """

    # NOTE: the docstring of the inner function is kept verbatim because the
    # @tool decorator may expose it to the model as the tool description.
    @tool
    def execute_sql_query(sql_query: str, description: str) -> str:
        """
        Execute an SQL query against a database and return results for data analysis

        Args:
            sql_query: The SQL query to execute
            description: Concise explanation of the SQL query

        Returns:
            str: JSON string containing the query results or error message
        """
        try:
            # run_sql_query returns a JSON document as text
            query_response = json.loads(run_sql_query(sql_query))

            # Errors reported by the query layer are passed through untouched
            if "error" in query_response:
                return json.dumps(query_response)

            message = query_response.get("message", "")

            # "message" is only included in the result when it is not the empty string
            result = {"result": query_response.get("result", [])}
            if message != "":
                result["message"] = message

            # Persist the raw result for later reference
            save_status = save_raw_query_result(
                prompt_uuid,
                user_prompt,
                sql_query,
                description,
                result,
                message,
            )

            # A failed save does not hide the data; it is flagged in the response
            if not save_status["success"]:
                result["saved"] = False
                result["save_error"] = save_status["error"]

            return json.dumps(result)

        except Exception as e:
            # Any failure is reported to the caller as a JSON error payload
            return json.dumps({"error": f"Unexpected error: {str(e)}"})

    return execute_sql_query
|
|
|
|
def _tool_use_progress_text(content_item):
    """
    Map one assistant content item to the text streamed while a tool is in use.

    Args:
        content_item: One element of an assistant message's ``content`` list.

    Returns:
        str | None: ``" <description>.\\n\\n"`` for ``execute_sql_query`` calls,
        ``"\\n\\n"`` for ``get_tables_information`` / ``current_time`` calls (and for
        an ``execute_sql_query`` call whose input has no description), or ``None``
        when the item produces no streamed text.
    """
    if not isinstance(content_item, dict):
        return None
    tool_use = content_item.get("toolUse")
    if not isinstance(tool_use, dict):
        return None

    # Safe lookups: a malformed tool call must not abort the whole stream
    tool_name = tool_use.get("name")
    if tool_name == "execute_sql_query" and "input" in tool_use:
        tool_input = tool_use["input"]
        description = tool_input.get("description") if isinstance(tool_input, dict) else None
        if description is None:
            return "\n\n"
        return f" {description}.\n\n"
    if tool_name in ("get_tables_information", "current_time"):
        return "\n\n"
    return None


@app.entrypoint
async def agent_invocation(payload):
    """
    Main handler for agent invocation with streaming response.

    This function processes incoming requests, initializes the agent with appropriate tools,
    streams the response back to the client, and saves the agent interactions once
    streaming has finished.

    Expected payload structure:
    {
        "prompt": "Your data analysis question",
        "bedrock_model_id": "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        "prompt_uuid": "optional-uuid",
        "user_timezone": "US/Pacific",
        "session_id": "optional-session-id",
        "user_id": "optional-user-id",
        "last_k_turns": "optional-number-of-conversation-turns"
    }

    Args:
        payload (dict): Request body; every key is optional and falls back to the
            defaults read below.

    Yields:
        str: Response chunks for streaming. If anything fails, a single
        ``"Error: ..."`` chunk is yielded instead of raising.
    """
    try:
        # Extract parameters from payload
        user_message = payload.get("prompt", "No prompt found in input, please guide customer to create a json payload with prompt key")
        bedrock_model_id = payload.get("bedrock_model_id", "us.anthropic.claude-3-7-sonnet-20250219-v1:0")
        prompt_uuid = payload.get("prompt_uuid", str(uuid4()))
        user_timezone = payload.get("user_timezone", "US/Pacific")
        session_id = payload.get("session_id", str(uuid4()))
        user_id = payload.get("user_id", "guest")
        last_k_turns = int(payload.get("last_k_turns", 20))

        print("Request received: ")
        print(f"Prompt: {user_message}")
        print(f"Prompt UUID: {prompt_uuid}")
        print(f"User Timezone: {user_timezone}")
        print(f"Session ID: {session_id}")
        print(f"User ID: {user_id}")
        print(f"Last K Turns: {last_k_turns}")

        # Stored history is only used to compute the index at which this turn's
        # messages start; it is not passed to the Agent constructor.
        message_history = read_messages_by_session(session_id)
        starting_message_id = len(message_history)
        print(f"Agent Interactions length: {len(message_history)}")
        print(f"Agent Interactions: {message_history}")

        # Create Bedrock model instance
        bedrock_model = BedrockModel(model_id=bedrock_model_id)

        # Prepare system prompt with user's timezone
        system_prompt = DATA_ANALYST_SYSTEM_PROMPT.replace("{timezone}", user_timezone)

        # Create the agent with memory hooks and tools
        agent = Agent(
            model=bedrock_model,
            system_prompt=system_prompt,
            hooks=[MemoryHookProvider(client, memory_id, user_id, session_id, last_k_turns)],
            tools=[get_tables_information, current_time, create_execute_sql_query_tool(user_message, prompt_uuid)],
            callback_handler=None
        )

        # Stream the response to the client
        async for event in agent.stream_async(user_message):
            is_assistant_message = (
                "message" in event
                and "content" in event["message"]
                and "role" in event["message"]
                and event["message"]["role"] == "assistant"
            )
            if is_assistant_message:
                # Completed assistant messages only contribute tool-use progress text
                for content_item in event["message"]["content"]:
                    progress_text = _tool_use_progress_text(content_item)
                    if progress_text is not None:
                        yield progress_text
            elif "data" in event:
                # Incremental text produced by the model
                yield event["data"]

        # Save detailed agent interactions after streaming is complete
        save_agent_interactions(session_id, prompt_uuid, starting_message_id, agent.messages)

    except Exception as e:
        # Report the failure to the client as a final chunk rather than raising
        error_message = f"Error: {str(e)}"
        print(error_message)
        yield error_message
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: announce startup, then hand control to the app
    print("Starting Data Analyst Assistant with Bedrock Agent Core")
    app.run()