Eashan Kaushik 0c291a8485
customer support agent (#15)
* Adding customer support agent

* added scripts

* update from wheels

* readme final

* new arch
2025-07-16 10:02:29 -04:00

1083 lines
42 KiB
Python

# Copyright 2024 Amazon.com and its affiliates; all rights reserved.
# This file is AWS Content and may not be duplicated or distributed without permission
"""
This module contains a helper class for building and using Knowledge Bases for Amazon Bedrock.
The KnowledgeBasesForAmazonBedrock class provides a convenient interface for working with Knowledge Bases.
It includes methods for creating, updating, and invoking Knowledge Bases, as well as managing
IAM roles and OpenSearch Serverless.
"""
import json
import boto3
import time
import uuid
from botocore.exceptions import ClientError
from opensearchpy import (
OpenSearch,
RequestsHttpConnection,
AWSV4SignerAuth,
RequestError,
)
import pprint
from retrying import retry
import random
import yaml
import os
import argparse
valid_embedding_models = [
"cohere.embed-multilingual-v3",
"cohere.embed-english-v3",
"amazon.titan-embed-text-v1",
"amazon.titan-embed-text-v2:0",
]
pp = pprint.PrettyPrinter(indent=2)
def read_yaml_file(file_path: str):
"""
read and process a yaml file
Args:
file_path: the path to the yaml file
"""
with open(file_path, "r") as file:
try:
return yaml.safe_load(file)
except yaml.YAMLError as e:
print(f"Error reading YAML file: {e}")
return None
def interactive_sleep(seconds: int):
"""
Support functionality to induce an artificial 'sleep' to the code in order to wait for resources to be available
Args:
seconds (int): number of seconds to sleep for
"""
dots = ""
for i in range(seconds):
dots += "."
print(dots, end="\r")
time.sleep(1)
class KnowledgeBasesForAmazonBedrock:
"""
Support class that allows for:
- creation (or retrieval) of a Knowledge Base for Amazon Bedrock with all its pre-requisites
(including OSS, IAM roles and Permissions and S3 bucket)
- Ingestion of data into the Knowledge Base
- Deletion of all resources created
"""
def __init__(self, suffix=None):
"""
Class initializer
"""
boto3_session = boto3.session.Session()
self.region_name = boto3_session.region_name
self.iam_client = boto3_session.client("iam", region_name=self.region_name)
self.account_number = (
boto3.client("sts", region_name=self.region_name)
.get_caller_identity()
.get("Account")
)
if suffix is not None:
self.suffix = suffix
else:
self.suffix = str(uuid.uuid4())[:4]
self.identity = boto3.client(
"sts", region_name=self.region_name
).get_caller_identity()["Arn"]
self.aoss_client = boto3_session.client(
"opensearchserverless", region_name=self.region_name
)
self.s3_client = boto3.client("s3", region_name=self.region_name)
self.bedrock_agent_client = boto3.client(
"bedrock-agent", region_name=self.region_name
)
self.bedrock_agent_client = boto3.client(
"bedrock-agent", region_name=self.region_name
)
credentials = boto3.Session().get_credentials()
self.awsauth = AWSV4SignerAuth(credentials, self.region_name, "aoss")
self.oss_client = None
self.data_bucket_name = None
def create_or_retrieve_knowledge_base(
self,
kb_name: str,
kb_description: str = None,
data_bucket_name: str = None,
embedding_model: str = "amazon.titan-embed-text-v2:0",
):
"""
Function used to create a new Knowledge Base or retrieve an existent one
Args:
kb_name: Knowledge Base Name
kb_description: Knowledge Base Description
data_bucket_name: Name of s3 Bucket containing Knowledge Base Data
embedding_model: Name of Embedding model to be used on Knowledge Base creation
Returns:
kb_id: str - Knowledge base id
ds_id: str - Data Source id
"""
kb_id = None
ds_id = None
kbs_available = self.bedrock_agent_client.list_knowledge_bases(
maxResults=100,
)
for kb in kbs_available["knowledgeBaseSummaries"]:
if kb_name == kb["name"]:
kb_id = kb["knowledgeBaseId"]
if kb_id is not None:
ds_available = self.bedrock_agent_client.list_data_sources(
knowledgeBaseId=kb_id,
maxResults=100,
)
for ds in ds_available["dataSourceSummaries"]:
if kb_id == ds["knowledgeBaseId"]:
ds_id = ds["dataSourceId"]
if not data_bucket_name:
self.data_bucket_name = self._get_knowledge_base_s3_bucket(
kb_id, ds_id
)
print(f"Knowledge Base {kb_name} already exists.")
print(f"Retrieved Knowledge Base Id: {kb_id}")
print(f"Retrieved Data Source Id: {ds_id}")
else:
print(f"Creating KB {kb_name}")
# self.kb_name = kb_name
# self.kb_description = kb_description
if data_bucket_name is None:
kb_name_temp = kb_name.replace("_", "-")
data_bucket_name = f"{kb_name_temp}-{self.suffix}"
print(
f"KB bucket name not provided, creating a new one called: {data_bucket_name}"
)
if embedding_model not in valid_embedding_models:
valid_embeddings_str = str(valid_embedding_models)
raise ValueError(
f"Invalid embedding model. Your embedding model should be one of {valid_embeddings_str}"
)
# self.embedding_model = embedding_model
encryption_policy_name = f"{kb_name}-sp-{self.suffix}"
network_policy_name = f"{kb_name}-np-{self.suffix}"
access_policy_name = f"{kb_name}-ap-{self.suffix}"
kb_execution_role_name = (
f"AmazonBedrockExecutionRoleForKnowledgeBase_{self.suffix}"
)
fm_policy_name = (
f"AmazonBedrockFoundationModelPolicyForKnowledgeBase_{self.suffix}"
)
s3_policy_name = f"AmazonBedrockS3PolicyForKnowledgeBase_{self.suffix}"
oss_policy_name = f"AmazonBedrockOSSPolicyForKnowledgeBase_{self.suffix}"
vector_store_name = f"{kb_name}-{self.suffix}"
index_name = f"{kb_name}-index-{self.suffix}"
print(
"========================================================================================"
)
print(
f"Step 1 - Creating or retrieving {data_bucket_name} S3 bucket for Knowledge Base documents"
)
self.create_s3_bucket(data_bucket_name)
print(
"========================================================================================"
)
print(
f"Step 2 - Creating Knowledge Base Execution Role ({kb_execution_role_name}) and Policies"
)
bedrock_kb_execution_role = self.create_bedrock_kb_execution_role(
embedding_model,
data_bucket_name,
fm_policy_name,
s3_policy_name,
kb_execution_role_name,
)
print(
"========================================================================================"
)
print(f"Step 3 - Creating OSS encryption, network and data access policies")
encryption_policy, network_policy, access_policy = (
self.create_policies_in_oss(
encryption_policy_name,
vector_store_name,
network_policy_name,
bedrock_kb_execution_role,
access_policy_name,
)
)
print(
"========================================================================================"
)
print(
f"Step 4 - Creating OSS Collection (this step takes a couple of minutes to complete)"
)
host, collection, collection_id, collection_arn = self.create_oss(
vector_store_name, oss_policy_name, bedrock_kb_execution_role
)
# Build the OpenSearch client
self.oss_client = OpenSearch(
hosts=[{"host": host, "port": 443}],
http_auth=self.awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
timeout=300,
)
print(
"========================================================================================"
)
print(f"Step 5 - Creating OSS Vector Index")
self.create_vector_index(index_name)
print(
"========================================================================================"
)
print(f"Step 6 - Creating Knowledge Base")
knowledge_base, data_source = self.create_knowledge_base(
collection_arn,
index_name,
data_bucket_name,
embedding_model,
kb_name,
kb_description,
bedrock_kb_execution_role,
)
interactive_sleep(60)
print(
"========================================================================================"
)
kb_id = knowledge_base["knowledgeBaseId"]
ds_id = data_source["dataSourceId"]
return kb_id, ds_id
def create_s3_bucket(self, bucket_name: str):
"""
Check if bucket exists, and if not create S3 bucket for knowledge base data source
Args:
bucket_name: s3 bucket name
"""
self.data_bucket_name = bucket_name
try:
self.s3_client.head_bucket(Bucket=bucket_name)
print(f"Bucket {bucket_name} already exists - retrieving it!")
except ClientError as e:
print(f"Creating bucket {bucket_name}")
if self.region_name == "us-east-1":
self.s3_client.create_bucket(Bucket=bucket_name)
else:
self.s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={"LocationConstraint": self.region_name},
)
def upload_directory(self, s3_path, bucket_name):
"""
Upload files from a local path to s3
s3_path: local path of the document
bucket_name: bucket name
"""
for root, dirs, files in os.walk(s3_path):
for file in files:
file_to_upload = os.path.join(root, file)
print(f"uploading file {file_to_upload} to {bucket_name}")
self.s3_client.upload_file(file_to_upload, bucket_name, file)
def get_data_bucket_name(self):
"""
get the name of the data bucket
"""
return self.data_bucket_name
def _get_knowledge_base_s3_bucket(self, knowledge_base_id, data_source_id):
"""Get the s3 bucket associated with a knowledge base, if there is one"""
try:
# Get the data source details
response = self.bedrock_agent_client.get_data_source(
knowledgeBaseId=knowledge_base_id, dataSourceId=data_source_id
)
# Extract the S3 bucket information from the data source configuration
data_source_config = response["dataSource"]["dataSourceConfiguration"]
if data_source_config["type"] == "S3":
s3_config = data_source_config["s3Configuration"]
bucket_arn = s3_config["bucketArn"]
# Extract bucket name from ARN
bucket_name = bucket_arn.split(":")[-1]
return bucket_name
else:
return "Data source is not an S3 bucket"
except Exception as e:
print(f"Error retrieving data source information: {str(e)}")
return None
def create_bedrock_kb_execution_role(
self,
embedding_model: str,
bucket_name: str,
fm_policy_name: str,
s3_policy_name: str,
kb_execution_role_name: str,
):
"""
Create Knowledge Base Execution IAM Role and its required policies.
If role and/or policies already exist, retrieve them
Args:
embedding_model: the embedding model used by the knowledge base
bucket_name: the bucket name used by the knowledge base
fm_policy_name: the name of the foundation model access policy
s3_policy_name: the name of the s3 access policy
kb_execution_role_name: the name of the knowledge base execution role
Returns:
IAM role created
"""
foundation_model_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"bedrock:InvokeModel",
],
"Resource": [
f"arn:aws:bedrock:{self.region_name}::foundation-model/{embedding_model}"
],
}
],
}
s3_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:ListBucket"],
"Resource": [
f"arn:aws:s3:::{bucket_name}",
f"arn:aws:s3:::{bucket_name}/*",
],
"Condition": {
"StringEquals": {
"aws:ResourceAccount": f"{self.account_number}"
}
},
}
],
}
assume_role_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "bedrock.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
try:
# create policies based on the policy documents
fm_policy = self.iam_client.create_policy(
PolicyName=fm_policy_name,
PolicyDocument=json.dumps(foundation_model_policy_document),
Description="Policy for accessing foundation model",
)
except self.iam_client.exceptions.EntityAlreadyExistsException:
print(f"{fm_policy_name} already exists, retrieving it!")
fm_policy = self.iam_client.get_policy(
PolicyArn=f"arn:aws:iam::{self.account_number}:policy/{fm_policy_name}"
)
try:
s3_policy = self.iam_client.create_policy(
PolicyName=s3_policy_name,
PolicyDocument=json.dumps(s3_policy_document),
Description="Policy for reading documents from s3",
)
except self.iam_client.exceptions.EntityAlreadyExistsException:
print(f"{s3_policy_name} already exists, retrieving it!")
s3_policy = self.iam_client.get_policy(
PolicyArn=f"arn:aws:iam::{self.account_number}:policy/{s3_policy_name}"
)
# create bedrock execution role
try:
bedrock_kb_execution_role = self.iam_client.create_role(
RoleName=kb_execution_role_name,
AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
Description="Amazon Bedrock Knowledge Base Execution Role for accessing OSS and S3",
MaxSessionDuration=3600,
)
except self.iam_client.exceptions.EntityAlreadyExistsException:
print(f"{kb_execution_role_name} already exists, retrieving it!")
bedrock_kb_execution_role = self.iam_client.get_role(
RoleName=kb_execution_role_name
)
# fetch arn of the policies and role created above
s3_policy_arn = s3_policy["Policy"]["Arn"]
fm_policy_arn = fm_policy["Policy"]["Arn"]
# attach policies to Amazon Bedrock execution role
self.iam_client.attach_role_policy(
RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
PolicyArn=fm_policy_arn,
)
self.iam_client.attach_role_policy(
RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
PolicyArn=s3_policy_arn,
)
return bedrock_kb_execution_role
def create_oss_policy_attach_bedrock_execution_role(
self, collection_id: str, oss_policy_name: str, bedrock_kb_execution_role: str
):
"""
Create OpenSearch Serverless policy and attach it to the Knowledge Base Execution role.
If policy already exists, attaches it
Args:
collection_id: collection id
oss_policy_name: opensearch serverless policy name
bedrock_kb_execution_role: knowledge base execution role
Returns:
created: bool - boolean to indicate if role was created
"""
# define oss policy document
oss_policy_document = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["aoss:APIAccessAll"],
"Resource": [
f"arn:aws:aoss:{self.region_name}:{self.account_number}:collection/{collection_id}"
],
}
],
}
oss_policy_arn = f"arn:aws:iam::{self.account_number}:policy/{oss_policy_name}"
created = False
try:
self.iam_client.create_policy(
PolicyName=oss_policy_name,
PolicyDocument=json.dumps(oss_policy_document),
Description="Policy for accessing opensearch serverless",
)
created = True
except self.iam_client.exceptions.EntityAlreadyExistsException:
print(f"Policy {oss_policy_arn} already exists, updating it")
print("Opensearch serverless arn: ", oss_policy_arn)
self.iam_client.attach_role_policy(
RoleName=bedrock_kb_execution_role["Role"]["RoleName"],
PolicyArn=oss_policy_arn,
)
return created
def create_policies_in_oss(
self,
encryption_policy_name: str,
vector_store_name: str,
network_policy_name: str,
bedrock_kb_execution_role: str,
access_policy_name: str,
):
"""
Create OpenSearch Serverless encryption, network and data access policies.
If policies already exist, retrieve them
Args:
encryption_policy_name: name of the data encryption policy
vector_store_name: name of the vector store
network_policy_name: name of the network policy
bedrock_kb_execution_role: name of the knowledge base execution role
access_policy_name: name of the data access policy
Returns:
encryption_policy, network_policy, access_policy
"""
try:
encryption_policy = self.aoss_client.create_security_policy(
name=encryption_policy_name,
policy=json.dumps(
{
"Rules": [
{
"Resource": ["collection/" + vector_store_name],
"ResourceType": "collection",
}
],
"AWSOwnedKey": True,
}
),
type="encryption",
)
except self.aoss_client.exceptions.ConflictException:
print(f"{encryption_policy_name} already exists, retrieving it!")
encryption_policy = self.aoss_client.get_security_policy(
name=encryption_policy_name, type="encryption"
)
try:
network_policy = self.aoss_client.create_security_policy(
name=network_policy_name,
policy=json.dumps(
[
{
"Rules": [
{
"Resource": ["collection/" + vector_store_name],
"ResourceType": "collection",
}
],
"AllowFromPublic": True,
}
]
),
type="network",
)
except self.aoss_client.exceptions.ConflictException:
print(f"{network_policy_name} already exists, retrieving it!")
network_policy = self.aoss_client.get_security_policy(
name=network_policy_name, type="network"
)
try:
access_policy = self.aoss_client.create_access_policy(
name=access_policy_name,
policy=json.dumps(
[
{
"Rules": [
{
"Resource": ["collection/" + vector_store_name],
"Permission": [
"aoss:CreateCollectionItems",
"aoss:DeleteCollectionItems",
"aoss:UpdateCollectionItems",
"aoss:DescribeCollectionItems",
],
"ResourceType": "collection",
},
{
"Resource": ["index/" + vector_store_name + "/*"],
"Permission": [
"aoss:CreateIndex",
"aoss:DeleteIndex",
"aoss:UpdateIndex",
"aoss:DescribeIndex",
"aoss:ReadDocument",
"aoss:WriteDocument",
],
"ResourceType": "index",
},
],
"Principal": [
self.identity,
bedrock_kb_execution_role["Role"]["Arn"],
],
"Description": "Easy data policy",
}
]
),
type="data",
)
except self.aoss_client.exceptions.ConflictException:
print(f"{access_policy_name} already exists, retrieving it!")
access_policy = self.aoss_client.get_access_policy(
name=access_policy_name, type="data"
)
return encryption_policy, network_policy, access_policy
def create_oss(
self,
vector_store_name: str,
oss_policy_name: str,
bedrock_kb_execution_role: str,
):
"""
Create OpenSearch Serverless Collection. If already existent, retrieve
Args:
vector_store_name: name of the vector store
oss_policy_name: name of the opensearch serverless access policy
bedrock_kb_execution_role: name of the knowledge base execution role
"""
try:
collection = self.aoss_client.create_collection(
name=vector_store_name, type="VECTORSEARCH"
)
collection_id = collection["createCollectionDetail"]["id"]
collection_arn = collection["createCollectionDetail"]["arn"]
except self.aoss_client.exceptions.ConflictException:
collection = self.aoss_client.batch_get_collection(
names=[vector_store_name]
)["collectionDetails"][0]
pp.pprint(collection)
collection_id = collection["id"]
collection_arn = collection["arn"]
pp.pprint(collection)
# Get the OpenSearch serverless collection URL
host = collection_id + "." + self.region_name + ".aoss.amazonaws.com"
print(host)
# wait for collection creation
# This can take couple of minutes to finish
response = self.aoss_client.batch_get_collection(names=[vector_store_name])
# Periodically check collection status
while (response["collectionDetails"][0]["status"]) == "CREATING":
print("Creating collection...")
interactive_sleep(30)
response = self.aoss_client.batch_get_collection(names=[vector_store_name])
print("\nCollection successfully created:")
pp.pprint(response["collectionDetails"])
# create opensearch serverless access policy and attach it to Bedrock execution role
try:
created = self.create_oss_policy_attach_bedrock_execution_role(
collection_id, oss_policy_name, bedrock_kb_execution_role
)
if created:
# It can take up to a minute for data access rules to be enforced
print(
"Sleeping for a minute to ensure data access rules have been enforced"
)
interactive_sleep(60)
return host, collection, collection_id, collection_arn
except Exception as e:
print("Policy already exists")
pp.pprint(e)
def create_vector_index(self, index_name: str):
"""
Create OpenSearch Serverless vector index. If existent, ignore
Args:
index_name: name of the vector index
"""
body_json = {
"settings": {
"index.knn": "true",
"number_of_shards": 1,
"knn.algo_param.ef_search": 512,
"number_of_replicas": 0,
},
"mappings": {
"properties": {
"vector": {
"type": "knn_vector",
"dimension": 1024,
"method": {
"name": "hnsw",
"engine": "faiss",
"space_type": "l2",
},
},
"text": {"type": "text"},
"text-metadata": {"type": "text"},
}
},
}
# Create index
try:
response = self.oss_client.indices.create(
index=index_name, body=json.dumps(body_json)
)
print("\nCreating index:")
pp.pprint(response)
# index creation can take up to a minute
interactive_sleep(60)
except RequestError as e:
# you can delete the index if its already exists
# oss_client.indices.delete(index=index_name)
print(
f"Error while trying to create the index, with error {e.error}\nyou may unmark the delete above to "
f"delete, and recreate the index"
)
@retry(wait_random_min=1000, wait_random_max=2000, stop_max_attempt_number=7)
def create_knowledge_base(
self,
collection_arn: str,
index_name: str,
bucket_name: str,
embedding_model: str,
kb_name: str,
kb_description: str,
bedrock_kb_execution_role: str,
):
"""
Create Knowledge Base and its Data Source. If existent, retrieve
Args:
collection_arn: ARN of the opensearch serverless collection
index_name: name of the opensearch serverless index
bucket_name: name of the s3 bucket containing the knowledge base data
embedding_model: id of the embedding model used
kb_name: knowledge base name
kb_description: knowledge base description
bedrock_kb_execution_role: knowledge base execution role
Returns:
knowledge base object,
data source object
"""
opensearch_serverless_configuration = {
"collectionArn": collection_arn,
"vectorIndexName": index_name,
"fieldMapping": {
"vectorField": "vector",
"textField": "text",
"metadataField": "text-metadata",
},
}
# Ingest strategy - How to ingest data from the data source
chunking_strategy_configuration = {
"chunkingStrategy": "FIXED_SIZE",
"fixedSizeChunkingConfiguration": {
"maxTokens": 512,
"overlapPercentage": 20,
},
}
# The data source to ingest documents from, into the OpenSearch serverless knowledge base index
s3_configuration = {
"bucketArn": f"arn:aws:s3:::{bucket_name}",
# "inclusionPrefixes": [
# "policies"
# ], # you can use this if you want to create a KB using data within s3 prefixes.
}
# The embedding model used by Bedrock to embed ingested documents, and realtime prompts
embedding_model_arn = (
f"arn:aws:bedrock:{self.region_name}::foundation-model/{embedding_model}"
)
print(
str(
{
"type": "VECTOR",
"vectorKnowledgeBaseConfiguration": {
"embeddingModelArn": embedding_model_arn
},
}
)
)
try:
create_kb_response = self.bedrock_agent_client.create_knowledge_base(
name=kb_name,
description=kb_description,
roleArn=bedrock_kb_execution_role["Role"]["Arn"],
knowledgeBaseConfiguration={
"type": "VECTOR",
"vectorKnowledgeBaseConfiguration": {
"embeddingModelArn": embedding_model_arn
},
},
storageConfiguration={
"type": "OPENSEARCH_SERVERLESS",
"opensearchServerlessConfiguration": opensearch_serverless_configuration,
},
)
kb = create_kb_response["knowledgeBase"]
pp.pprint(kb)
except self.bedrock_agent_client.exceptions.ConflictException:
kbs = self.bedrock_agent_client.list_knowledge_bases(maxResults=100)
kb_id = None
for kb in kbs["knowledgeBaseSummaries"]:
if kb["name"] == kb_name:
kb_id = kb["knowledgeBaseId"]
response = self.bedrock_agent_client.get_knowledge_base(
knowledgeBaseId=kb_id
)
kb = response["knowledgeBase"]
pp.pprint(kb)
# Create a DataSource in KnowledgeBase
try:
create_ds_response = self.bedrock_agent_client.create_data_source(
name=kb_name,
description=kb_description,
knowledgeBaseId=kb["knowledgeBaseId"],
dataDeletionPolicy="RETAIN",
dataSourceConfiguration={
"type": "S3",
"s3Configuration": s3_configuration,
},
vectorIngestionConfiguration={
"chunkingConfiguration": chunking_strategy_configuration
},
)
ds = create_ds_response["dataSource"]
pp.pprint(ds)
except self.bedrock_agent_client.exceptions.ConflictException:
ds_id = self.bedrock_agent_client.list_data_sources(
knowledgeBaseId=kb["knowledgeBaseId"], maxResults=100
)["dataSourceSummaries"][0]["dataSourceId"]
get_ds_response = self.bedrock_agent_client.get_data_source(
dataSourceId=ds_id, knowledgeBaseId=kb["knowledgeBaseId"]
)
ds = get_ds_response["dataSource"]
pp.pprint(ds)
return kb, ds
def synchronize_data(self, kb_id, ds_id):
"""
Start an ingestion job to synchronize data from an S3 bucket to the Knowledge Base
and waits for the job to be completed
Args:
kb_id: knowledge base id
ds_id: data source id
"""
# ensure that the kb is available
i_status = ["CREATING", "DELETING", "UPDATING"]
while (
self.bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)[
"knowledgeBase"
]["status"]
in i_status
):
time.sleep(10)
# Start an ingestion job
start_job_response = self.bedrock_agent_client.start_ingestion_job(
knowledgeBaseId=kb_id, dataSourceId=ds_id
)
job = start_job_response["ingestionJob"]
pp.pprint(job)
# Get job
while job["status"] != "COMPLETE" and job["status"] != "FAILED":
get_job_response = self.bedrock_agent_client.get_ingestion_job(
knowledgeBaseId=kb_id,
dataSourceId=ds_id,
ingestionJobId=job["ingestionJobId"],
)
job = get_job_response["ingestionJob"]
interactive_sleep(5)
pp.pprint(job)
# interactive_sleep(40)
def get_kb(self, kb_id):
"""
Get KB details
Args:
kb_id: knowledge base id
"""
get_job_response = self.bedrock_agent_client.get_knowledge_base(
knowledgeBaseId=kb_id
)
return get_job_response
def delete_kb(
self,
kb_name: str,
delete_s3_bucket: bool = True,
delete_iam_roles_and_policies: bool = True,
delete_aoss: bool = True,
):
"""
Delete the Knowledge Base resources
Args:
kb_name: name of the knowledge base to delete
delete_s3_bucket (bool): boolean to indicate if s3 bucket should also be deleted
delete_iam_roles_and_policies (bool): boolean to indicate if IAM roles and Policies should also be deleted
delete_aoss: boolean to indicate if amazon opensearch serverless resources should also be deleted
"""
kbs_available = self.bedrock_agent_client.list_knowledge_bases(
maxResults=100,
)
kb_id = None
ds_id = None
for kb in kbs_available["knowledgeBaseSummaries"]:
if kb_name == kb["name"]:
kb_id = kb["knowledgeBaseId"]
kb_details = self.bedrock_agent_client.get_knowledge_base(knowledgeBaseId=kb_id)
kb_role = kb_details["knowledgeBase"]["roleArn"].split("/")[1]
collection_id = kb_details["knowledgeBase"]["storageConfiguration"][
"opensearchServerlessConfiguration"
]["collectionArn"].split("/")[1]
index_name = kb_details["knowledgeBase"]["storageConfiguration"][
"opensearchServerlessConfiguration"
]["vectorIndexName"]
encryption_policies = self.aoss_client.list_security_policies(
maxResults=100, type="encryption"
)
encryption_policy_name = None
for ep in encryption_policies["securityPolicySummaries"]:
if ep["name"].startswith(kb_name):
encryption_policy_name = ep["name"]
network_policies = self.aoss_client.list_security_policies(
maxResults=100, type="network"
)
network_policy_name = None
for np in network_policies["securityPolicySummaries"]:
if np["name"].startswith(kb_name):
network_policy_name = np["name"]
data_policies = self.aoss_client.list_access_policies(
maxResults=100, type="data"
)
access_policy_name = None
for dp in data_policies["accessPolicySummaries"]:
if dp["name"].startswith(kb_name):
access_policy_name = dp["name"]
ds_available = self.bedrock_agent_client.list_data_sources(
knowledgeBaseId=kb_id,
maxResults=100,
)
for ds in ds_available["dataSourceSummaries"]:
if kb_id == ds["knowledgeBaseId"]:
ds_id = ds["dataSourceId"]
ds_details = self.bedrock_agent_client.get_data_source(
dataSourceId=ds_id,
knowledgeBaseId=kb_id,
)
bucket_name = ds_details["dataSource"]["dataSourceConfiguration"][
"s3Configuration"
]["bucketArn"].replace("arn:aws:s3:::", "")
try:
self.bedrock_agent_client.delete_data_source(
dataSourceId=ds_id, knowledgeBaseId=kb_id
)
print("Data Source deleted successfully!")
except Exception as e:
print(e)
try:
self.bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=kb_id)
print("Knowledge Base deleted successfully!")
except Exception as e:
print(e)
if delete_aoss:
try:
self.oss_client.indices.delete(index=index_name)
print("OpenSource Serveless Index deleted successfully!")
except Exception as e:
print(e)
try:
self.aoss_client.delete_collection(id=collection_id)
print("OpenSource Collection Index deleted successfully!")
except Exception as e:
print(e)
try:
self.aoss_client.delete_access_policy(
type="data", name=access_policy_name
)
print("OpenSource Serveless access policy deleted successfully!")
except Exception as e:
print(e)
try:
self.aoss_client.delete_security_policy(
type="network", name=network_policy_name
)
print("OpenSource Serveless network policy deleted successfully!")
except Exception as e:
print(e)
try:
self.aoss_client.delete_security_policy(
type="encryption", name=encryption_policy_name
)
print("OpenSource Serveless encryption policy deleted successfully!")
except Exception as e:
print(e)
if delete_s3_bucket:
try:
self.delete_s3(bucket_name)
print("Knowledge Base S3 bucket deleted successfully!")
except Exception as e:
print(e)
if delete_iam_roles_and_policies:
try:
self.delete_iam_roles_and_policies(kb_role)
print("Knowledge Base Roles and Policies deleted successfully!")
except Exception as e:
print(e)
print("Resources deleted successfully!")
def delete_iam_roles_and_policies(self, kb_execution_role_name: str):
"""
Delete IAM Roles and policies used by the Knowledge Base
Args:
kb_execution_role_name: knowledge base execution role
"""
attached_policies = self.iam_client.list_attached_role_policies(
RoleName=kb_execution_role_name, MaxItems=100
)
policies_arns = []
for policy in attached_policies["AttachedPolicies"]:
policies_arns.append(policy["PolicyArn"])
for policy in policies_arns:
self.iam_client.detach_role_policy(
RoleName=kb_execution_role_name, PolicyArn=policy
)
self.iam_client.delete_policy(PolicyArn=policy)
self.iam_client.delete_role(RoleName=kb_execution_role_name)
return 0
def delete_s3(self, bucket_name: str):
"""
Delete the objects contained in the Knowledge Base S3 bucket.
Once the bucket is empty, delete the bucket
Args:
bucket_name: bucket name
"""
objects = self.s3_client.list_objects(Bucket=bucket_name)
if "Contents" in objects:
for obj in objects["Contents"]:
self.s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])
self.s3_client.delete_bucket(Bucket=bucket_name)
if __name__ == "__main__":
kb = KnowledgeBasesForAmazonBedrock()
smm_client = boto3.client("ssm")
current_dir = os.path.dirname(os.path.abspath(__file__))
# Example usage:
config_path = f"{current_dir}/prereqs_config.yaml"
data = read_yaml_file(config_path)
parser = argparse.ArgumentParser(description="Knowledge Base handler")
parser.add_argument(
"--mode",
required=True,
help="Knowledge Base helper model. One for: create or delete.",
)
args = parser.parse_args()
print(data)
if args.mode == "create":
kb_id, ds_id = kb.create_or_retrieve_knowledge_base(
data["knowledge_base_name"], data["knowledge_base_description"]
)
print(f"Knowledge Base ID: {kb_id}")
print(f"Data Source ID: {ds_id}")
kb.upload_directory(
f"{current_dir}/{data['kb_files_path']}", kb.get_data_bucket_name()
)
kb.synchronize_data(kb_id, ds_id)
smm_client.put_parameter(
Name="/app/customersupport/knowledge_base/knowledge_base_id",
Description=f"{data['knowledge_base_name']} kb id",
Value=kb_id,
Type="String",
Overwrite=True,
)
if args.mode == "delete":
kb.delete_kb(data["knowledge_base_name"])
smm_client.delete_parameter(
Name="/app/customersupport/knowledge_base/knowledge_base_id"
)