mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
* feat(customer-support): updated code * Delete 02-use-cases/customer-support-assistant/Dockerfile Signed-off-by: Eashan Kaushik <50113394+EashanKaushik@users.noreply.github.com> * Update .gitignore Signed-off-by: Eashan Kaushik <50113394+EashanKaushik@users.noreply.github.com> --------- Signed-off-by: Eashan Kaushik <50113394+EashanKaushik@users.noreply.github.com>
167 lines
5.0 KiB
Python
167 lines
5.0 KiB
Python
#!/usr/bin/python
|
|
|
|
import base64
|
|
import hashlib
|
|
from typing import Any, Optional
|
|
import webbrowser
|
|
import urllib
|
|
import json
|
|
from urllib.parse import urlencode
|
|
import requests
|
|
import uuid
|
|
import sys
|
|
import os
|
|
import click
|
|
import logging
|
|
|
|
# Ensure we can import local utilities
|
|
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
|
from scripts.utils import get_aws_region, read_config, get_ssm_parameter
|
|
|
|
|
|
def generate_pkce_pair():
|
|
code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode("utf-8").rstrip("=")
|
|
code_challenge = (
|
|
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
|
|
.decode("utf-8")
|
|
.rstrip("=")
|
|
)
|
|
return code_verifier, code_challenge
|
|
|
|
|
|
def invoke_endpoint(
|
|
agent_arn: str,
|
|
payload,
|
|
session_id: str,
|
|
bearer_token: Optional[str],
|
|
endpoint_name: str = "DEFAULT",
|
|
) -> Any:
|
|
escaped_arn = urllib.parse.quote(agent_arn, safe="")
|
|
url = f"https://bedrock-agentcore.{get_aws_region()}.amazonaws.com/runtimes/{escaped_arn}/invocations"
|
|
|
|
headers = {
|
|
"Authorization": f"Bearer {bearer_token}",
|
|
"Content-Type": "application/json",
|
|
"X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": session_id,
|
|
}
|
|
|
|
try:
|
|
body = json.loads(payload) if isinstance(payload, str) else payload
|
|
except json.JSONDecodeError:
|
|
body = {"payload": payload}
|
|
|
|
try:
|
|
response = requests.post(
|
|
url,
|
|
params={"qualifier": endpoint_name},
|
|
headers=headers,
|
|
json=body,
|
|
timeout=100,
|
|
stream=True,
|
|
)
|
|
logger = logging.getLogger("bedrock_agentcore.stream")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
last_data = False
|
|
# for line in response.text.splitlines():
|
|
# # line = line.strip()
|
|
# if line.startswith("data:"):
|
|
# last_data = True
|
|
# content = line[6:]
|
|
# print(content, end="")
|
|
# elif line: # lines without "data:" that still have text
|
|
# if last_data:
|
|
# print("\n" + line, end="")
|
|
# last_data = False
|
|
|
|
for line in response.iter_lines(chunk_size=1):
|
|
if line:
|
|
line = line.decode("utf-8")
|
|
# print(line)
|
|
if line.startswith("data: "):
|
|
last_data = True
|
|
line = line[6:].replace('"', "")
|
|
print(line, end="")
|
|
elif line:
|
|
if last_data:
|
|
print("\n" + line.replace('"', ""), end="")
|
|
last_data = False
|
|
|
|
# print({"response": "\n".join(content)})
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print("Failed to invoke agent endpoint:", str(e))
|
|
raise
|
|
|
|
|
|
@click.command()
|
|
@click.argument("agent_name")
|
|
@click.option("--prompt", "-p", default="Hello", help="Prompt to send to the agent")
|
|
def main(agent_name: str, prompt: str):
|
|
"""CLI tool to invoke a Bedrock agent by name."""
|
|
runtime_config = read_config(".bedrock_agentcore.yaml")
|
|
|
|
print(runtime_config)
|
|
|
|
if agent_name not in runtime_config["agents"]:
|
|
print(f"❌ Agent '{agent_name}' not found in config.")
|
|
sys.exit(1)
|
|
|
|
code_verifier, code_challenge = generate_pkce_pair()
|
|
state = str(uuid.uuid4())
|
|
|
|
client_id = get_ssm_parameter("/app/customersupport/agentcore/web_client_id")
|
|
cognito_domain = get_ssm_parameter("/app/customersupport/agentcore/cognito_domain")
|
|
redirect_uri = "https://example.com/auth/callback"
|
|
|
|
login_params = {
|
|
"response_type": "code",
|
|
"client_id": client_id,
|
|
"redirect_uri": redirect_uri,
|
|
"scope": "email openid profile",
|
|
"code_challenge_method": "S256",
|
|
"code_challenge": code_challenge,
|
|
"state": state,
|
|
}
|
|
|
|
login_url = f"{cognito_domain}/oauth2/authorize?{urlencode(login_params)}"
|
|
|
|
print("🔐 Open the following URL in a browser to authenticate:")
|
|
print(login_url)
|
|
webbrowser.open(login_url)
|
|
|
|
auth_code = input("📥 Paste the `code` from the redirected URL: ").strip()
|
|
|
|
token_url = get_ssm_parameter("/app/customersupport/agentcore/cognito_token_url")
|
|
response = requests.post(
|
|
token_url,
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"client_id": client_id,
|
|
"code": auth_code,
|
|
"redirect_uri": redirect_uri,
|
|
"code_verifier": code_verifier,
|
|
},
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
)
|
|
|
|
if response.status_code != 200:
|
|
print(f"❌ Failed to exchange code: {response.text}")
|
|
sys.exit(1)
|
|
|
|
access_token = response.json()["access_token"]
|
|
print("✅ Access token acquired.")
|
|
|
|
agent_arn = runtime_config["agents"][agent_name]["bedrock_agentcore"]["agent_arn"]
|
|
|
|
invoke_endpoint(
|
|
agent_arn=agent_arn,
|
|
payload=json.dumps({"prompt": prompt, "actor_id": "DEFAULT"}),
|
|
bearer_token=access_token,
|
|
session_id=str(uuid.uuid4()),
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|