Eashan Kaushik 88e19eddc9
customer-support-assistant v1 (#103)
* feat(customer-support): updated code

* Delete 02-use-cases/customer-support-assistant/Dockerfile

Signed-off-by: Eashan Kaushik <50113394+EashanKaushik@users.noreply.github.com>

* Update .gitignore

Signed-off-by: Eashan Kaushik <50113394+EashanKaushik@users.noreply.github.com>

---------

Signed-off-by: Eashan Kaushik <50113394+EashanKaushik@users.noreply.github.com>
2025-07-21 11:34:00 -04:00

167 lines
5.0 KiB
Python

#!/usr/bin/python
import base64
import hashlib
from typing import Any, Optional
import webbrowser
import urllib
import json
from urllib.parse import urlencode
import requests
import uuid
import sys
import os
import click
import logging
# Ensure we can import local utilities
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from scripts.utils import get_aws_region, read_config, get_ssm_parameter
def generate_pkce_pair():
code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode("utf-8").rstrip("=")
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
.decode("utf-8")
.rstrip("=")
)
return code_verifier, code_challenge
def invoke_endpoint(
agent_arn: str,
payload,
session_id: str,
bearer_token: Optional[str],
endpoint_name: str = "DEFAULT",
) -> Any:
escaped_arn = urllib.parse.quote(agent_arn, safe="")
url = f"https://bedrock-agentcore.{get_aws_region()}.amazonaws.com/runtimes/{escaped_arn}/invocations"
headers = {
"Authorization": f"Bearer {bearer_token}",
"Content-Type": "application/json",
"X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": session_id,
}
try:
body = json.loads(payload) if isinstance(payload, str) else payload
except json.JSONDecodeError:
body = {"payload": payload}
try:
response = requests.post(
url,
params={"qualifier": endpoint_name},
headers=headers,
json=body,
timeout=100,
stream=True,
)
logger = logging.getLogger("bedrock_agentcore.stream")
logger.setLevel(logging.INFO)
last_data = False
# for line in response.text.splitlines():
# # line = line.strip()
# if line.startswith("data:"):
# last_data = True
# content = line[6:]
# print(content, end="")
# elif line: # lines without "data:" that still have text
# if last_data:
# print("\n" + line, end="")
# last_data = False
for line in response.iter_lines(chunk_size=1):
if line:
line = line.decode("utf-8")
# print(line)
if line.startswith("data: "):
last_data = True
line = line[6:].replace('"', "")
print(line, end="")
elif line:
if last_data:
print("\n" + line.replace('"', ""), end="")
last_data = False
# print({"response": "\n".join(content)})
except requests.exceptions.RequestException as e:
print("Failed to invoke agent endpoint:", str(e))
raise
@click.command()
@click.argument("agent_name")
@click.option("--prompt", "-p", default="Hello", help="Prompt to send to the agent")
def main(agent_name: str, prompt: str):
"""CLI tool to invoke a Bedrock agent by name."""
runtime_config = read_config(".bedrock_agentcore.yaml")
print(runtime_config)
if agent_name not in runtime_config["agents"]:
print(f"❌ Agent '{agent_name}' not found in config.")
sys.exit(1)
code_verifier, code_challenge = generate_pkce_pair()
state = str(uuid.uuid4())
client_id = get_ssm_parameter("/app/customersupport/agentcore/web_client_id")
cognito_domain = get_ssm_parameter("/app/customersupport/agentcore/cognito_domain")
redirect_uri = "https://example.com/auth/callback"
login_params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": redirect_uri,
"scope": "email openid profile",
"code_challenge_method": "S256",
"code_challenge": code_challenge,
"state": state,
}
login_url = f"{cognito_domain}/oauth2/authorize?{urlencode(login_params)}"
print("🔐 Open the following URL in a browser to authenticate:")
print(login_url)
webbrowser.open(login_url)
auth_code = input("📥 Paste the `code` from the redirected URL: ").strip()
token_url = get_ssm_parameter("/app/customersupport/agentcore/cognito_token_url")
response = requests.post(
token_url,
data={
"grant_type": "authorization_code",
"client_id": client_id,
"code": auth_code,
"redirect_uri": redirect_uri,
"code_verifier": code_verifier,
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
if response.status_code != 200:
print(f"❌ Failed to exchange code: {response.text}")
sys.exit(1)
access_token = response.json()["access_token"]
print("✅ Access token acquired.")
agent_arn = runtime_config["agents"][agent_name]["bedrock_agentcore"]["agent_arn"]
invoke_endpoint(
agent_arn=agent_arn,
payload=json.dumps({"prompt": prompt, "actor_id": "DEFAULT"}),
bearer_token=access_token,
session_id=str(uuid.uuid4()),
)
if __name__ == "__main__":
main()