Uriel Ramirez 1c2fb802a0
Data Analyst Assistant Solution with Amazon Bedrock AgentCore (#108)

Co-authored-by: Uriel Ramirez <beralfon@amazon.com>
2025-07-22 06:34:07 -04:00

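"""Load the video game sales dataset into the Aurora PostgreSQL database.

The script uploads the source CSV to S3, creates the target table through
the RDS Data API, imports the rows with the aws_s3 extension, and then
removes the uploaded file from the bucket.
"""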
import boto3
import os
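
# Resolve the AWS Region from the default session; it is passed to
# aws_commons.create_s3_uri() when importing from S3 below.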
session = boto3.session.Session()
region = session.region_name
# Environment variables
data_source_bucket_name = os.environ["DATA_SOURCE_BUCKET_NAME"]
aurora_serverless_db_cluster_arn = os.environ["AURORA_SERVERLESS_DB_CLUSTER_ARN"]
secret_arn = os.environ["SECRET_ARN"]
database_name = "video_games_sales"
# File path variables
local_file_path = "resources/database/video_games_sales_no_headers.csv"
s3_file_name = "video_games_sales_no_headers.csv"
try:
    # Upload file to S3
    s3_client = boto3.client("s3")
    s3_client.upload_file(
        local_file_path,
        data_source_bucket_name,
        s3_file_name,
    )
    print(f"File '{s3_file_name}' uploaded to bucket '{data_source_bucket_name}'")
    # RDS data client
    client = boto3.client("rds-data")

    # Create table
    query1 = """ CREATE TABLE video_games_sales_units (
        title TEXT,
        console TEXT,
        genre TEXT,
        publisher TEXT,
        developer TEXT,
        critic_score NUMERIC(3,1),
        total_sales NUMERIC(4,2),
        na_sales NUMERIC(4,2),
        jp_sales NUMERIC(4,2),
        pal_sales NUMERIC(4,2),
        other_sales NUMERIC(4,2),
        release_date DATE
    ); """
    response = client.execute_statement(
        resourceArn=aurora_serverless_db_cluster_arn,
        secretArn=secret_arn,
        sql=query1,
        database=database_name,
    )
    print("Query: " + query1)
    print("Query response: " + str(response))
    # Create AWS S3 extension
    query2 = "CREATE EXTENSION aws_s3 CASCADE;"
    response = client.execute_statement(
        resourceArn=aurora_serverless_db_cluster_arn,
        secretArn=secret_arn,
        sql=query2,
        database=database_name,
    )
    print("-----------------------------------------")
    print("Query: " + query2)
    print("Query response: " + str(response))
    # Import data from S3
    query3 = f"""
    SELECT aws_s3.table_import_from_s3(
        'video_games_sales_units',
        'title,console,genre,publisher,developer,critic_score,total_sales,na_sales,jp_sales,pal_sales,other_sales,release_date',
        'DELIMITER ''|''',
        aws_commons.create_s3_uri('{data_source_bucket_name}', '{s3_file_name}', '{region}')
    ); """
    response = client.execute_statement(
        resourceArn=aurora_serverless_db_cluster_arn,
        secretArn=secret_arn,
        sql=query3,
        database=database_name,
    )
    print("-----------------------------------------")
    print("Query: " + query3)
    print("Query response: " + str(response))
    # Delete the uploaded file from S3 now that the import has finished.
    # If the bucket has versioning enabled, every object version and delete
    # marker must be removed individually.
    versions_response = s3_client.list_object_versions(
        Bucket=data_source_bucket_name, Prefix=s3_file_name
    )

    # Collect all versions of the object
    delete_markers = []
    if "Versions" in versions_response:
        for version in versions_response["Versions"]:
            delete_markers.append(
                {"Key": s3_file_name, "VersionId": version["VersionId"]}
            )

    # Collect all delete markers
    if "DeleteMarkers" in versions_response:
        for marker in versions_response["DeleteMarkers"]:
            delete_markers.append(
                {"Key": s3_file_name, "VersionId": marker["VersionId"]}
            )

    # Execute the delete operation if there are versions to delete
    if delete_markers:
        s3_client.delete_objects(
            Bucket=data_source_bucket_name,
            Delete={"Objects": delete_markers, "Quiet": False},
        )

    print("-----------------------------------------")
    print("Database created successfully!")
except Exception as e:
    print(f"Error: {e}")