Sundar Raghavan 1c7e1861c1
Add Competitive Intelligence Agent with Bedrock AgentCore SDK (#221)
* Add Enterprise Web Intelligence Agent with Bedrock AgentCore Tools SDK
Automated browser navigation with CDP enhancements
Live viewing with DCV integration
Session recording and replay capabilities
LLM-powered content extraction
Code Interpreter for dynamic analysis
Comprehensive documentation and examples

* Moved project from old-folder to new-folder and updated files

* Refactor: Rename to enterprise-web-intelligence-agent and add Strands implementation alongside LangGraph
2025-08-27 15:17:28 -07:00

840 lines
34 KiB
Python

"""Browser automation tools using BedrockAgentCore SDK with Playwright and CDP enhancements."""
import asyncio
import uuid
import json
from typing import Dict, List, Optional, Any
from datetime import datetime
import boto3
from playwright.async_api import async_playwright, Browser, Page, BrowserContext, CDPSession
from langchain_core.messages import HumanMessage
from rich.console import Console
# Import from BedrockAgentCore SDK
from bedrock_agentcore.tools.browser_client import BrowserClient
from bedrock_agentcore._utils.endpoints import get_control_plane_endpoint
console = Console()
class BrowserTools:
"""Enhanced browser automation tools with CDP capabilities."""
def __init__(self, config):
self.config = config
self.browser_client = None
self.browser_id = None
self.playwright = None
self.browser = None
self.context = None
self.page = None
self.cdp_session = None
self.recording_path = None
self.llm = None
self._screenshots_taken = []
self._discovered_apis = []
self._performance_metrics = {}
def create_browser_with_recording(self) -> str:
"""Create a browser with recording configuration using Control Plane API."""
console.print("[cyan]🔧 Creating browser with recording configuration...[/cyan]")
# Create control plane client
control_plane_url = get_control_plane_endpoint(self.config.region)
control_client = boto3.client(
"bedrock-agentcore-control",
region_name=self.config.region,
endpoint_url=control_plane_url
)
# Create browser with recording
browser_name = f"competitive_intel_{uuid.uuid4().hex[:8]}"
console.print(f" Browser name: {browser_name}")
console.print(f" S3 location: s3://{self.config.s3_bucket}/{self.config.s3_prefix}")
console.print(f" Role ARN: {self.config.recording_role_arn}")
response = control_client.create_browser(
name=browser_name,
executionRoleArn=self.config.recording_role_arn,
networkConfiguration={
"networkMode": "PUBLIC"
},
recording={
"enabled": True,
"s3Location": {
"bucket": self.config.s3_bucket,
"prefix": self.config.s3_prefix
}
}
)
self.browser_id = response["browserId"]
# NEW: Store the structured recording configuration
self.recording_config = response.get("recording", {})
# Build recording path for display (but keep structured config)
s3_location = self.recording_config.get("s3Location", {})
self.recording_path = f"s3://{s3_location.get('bucket')}/{s3_location.get('prefix')}"
console.print(f"✅ Browser created: {self.browser_id}")
console.print(f"📹 Recording to: {self.recording_path}")
return self.browser_id
async def initialize_browser_session(self, llm):
"""Initialize browser session with enhanced CDP capabilities."""
self.llm = llm
# Create BrowserClient from SDK
self.browser_client = BrowserClient(region=self.config.region)
self.browser_client.identifier = self.browser_id
# Start a session
session_id = self.browser_client.start(
identifier=self.browser_id,
name=f"competitive_intel_session_{datetime.now().strftime('%Y%m%d-%H%M%S')}",
session_timeout_seconds=self.config.browser_session_timeout
)
console.print(f"✅ Session started: {session_id}")
# Get WebSocket headers
ws_url, headers = self.browser_client.generate_ws_headers()
console.print(f"[dim]WebSocket URL: {ws_url}[/dim]")
# Wait for browser initialization
console.print("[yellow]⏳ Waiting for browser to initialize...[/yellow]")
await asyncio.sleep(10)
# Initialize Playwright with CDP
console.print("[cyan]🎭 Connecting Playwright with CDP support...[/cyan]")
self.playwright = await async_playwright().start()
# Connect to the browser via CDP
self.browser = await self.playwright.chromium.connect_over_cdp(
ws_url,
headers=headers
)
# Get context and page
self.context = self.browser.contexts[0]
self.page = self.context.pages[0]
# Create CDP session for advanced features
try:
self.cdp_session = await self.context.new_cdp_session(self.page)
await self._setup_cdp_domains()
console.print("✅ CDP session initialized")
except Exception as e:
console.print(f"[yellow]⚠️ CDP setup partial: {e}[/yellow]")
self.cdp_session = None
# Set up network interception
await self._setup_network_interception()
console.print("✅ Playwright connected with enhancements")
# Set recording path
self.recording_path = f"s3://{self.config.s3_bucket}/{self.config.s3_prefix}{session_id}/"
console.print(f"📹 Recording to: {self.recording_path}")
return self.page
async def _setup_cdp_domains(self):
"""Enable CDP domains for advanced features."""
if not self.cdp_session:
return
try:
# Enable required CDP domains
await self.cdp_session.send("Network.enable")
await self.cdp_session.send("Performance.enable")
console.print("[dim]✅ CDP domains enabled[/dim]")
except Exception as e:
console.print(f"[yellow]⚠️ Some CDP domains failed: {e}[/yellow]")
async def _setup_network_interception(self):
"""Set up network request interception for API discovery."""
def handle_response(response):
"""Handle network responses to discover APIs."""
try:
url = response.url
# Skip ad networks and analytics
skip_domains = [
'doubleclick.net', 'googletagmanager.com',
'google-analytics.com', 'facebook.com',
'twitter.com', 'linkedin.com', 'pinterest.com',
'amazon-adsystem.com', 'googleadservices.com'
]
if any(domain in url.lower() for domain in skip_domains):
return
# Track relevant APIs
if any(keyword in url.lower() for keyword in ['api', 'price', 'pricing', 'tier', 'plan']):
self._discovered_apis.append({
'url': url[:100], # Truncate long URLs
'status': response.status,
'timestamp': datetime.now().isoformat()
})
if len(self._discovered_apis) <= 5: # Limit console output
console.print(f"[dim]🔍 Found API: {url[:60]}...[/dim]")
except:
pass
# Set up response handler
self.page.on("response", handle_response)
async def navigate_to_url(self, url: str) -> Dict:
"""Navigate to URL with enhanced visual feedback."""
try:
console.print(f"[cyan]🌐 Navigating to: {url}[/cyan]")
# Navigate with proper timeout
await self.page.goto(url, wait_until="domcontentloaded", timeout=60000)
# Wait for dynamic content
await self.page.wait_for_timeout(3000)
# Get page metrics if CDP is available
if self.cdp_session:
try:
metrics = await self.cdp_session.send("Performance.getMetrics")
self._performance_metrics = {m['name']: m['value'] for m in metrics.get('metrics', [])}
except:
pass
title = await self.page.title()
return {
"status": "success",
"url": url,
"title": title,
"metrics": self._performance_metrics,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
console.print(f"[red]❌ Navigation error: {e}[/red]")
return {"status": "error", "url": url, "error": str(e)}
async def analyze_forms_and_inputs(self) -> Dict:
"""NEW: Analyze forms and input fields on the page."""
console.print("[cyan]📝 Analyzing forms and inputs...[/cyan]")
try:
# Find all forms on the page
forms_data = await self.page.evaluate("""
() => {
const forms = Array.from(document.querySelectorAll('form'));
return {
forms: forms.map(form => ({
action: form.action,
method: form.method,
id: form.id,
className: form.className,
inputs: Array.from(form.querySelectorAll('input, select, textarea')).map(input => ({
type: input.type || input.tagName.toLowerCase(),
name: input.name,
id: input.id,
placeholder: input.placeholder,
required: input.required,
value: input.type === 'password' ? '[hidden]' : input.value
}))
})),
total_inputs: document.querySelectorAll('input, select, textarea').length,
has_file_upload: document.querySelectorAll('input[type="file"]').length > 0,
has_password_field: document.querySelectorAll('input[type="password"]').length > 0
};
}
""")
console.print(f"[green]Found {len(forms_data['forms'])} forms with {forms_data['total_inputs']} inputs[/green]")
if forms_data['has_file_upload']:
console.print("[yellow]📎 Page has file upload capability[/yellow]")
if forms_data['has_password_field']:
console.print("[yellow]🔐 Page has authentication form[/yellow]")
return {
"status": "success",
**forms_data
}
except Exception as e:
console.print(f"[yellow]⚠️ Form analysis error: {e}[/yellow]")
return {"status": "error", "error": str(e)}
async def handle_authentication(self, username: str, password: str, form_selector: Optional[str] = None) -> Dict:
"""NEW: Handle authentication on login pages."""
console.print("[cyan]🔐 Handling authentication...[/cyan]")
try:
# Find login form
if not form_selector:
# Common selectors for login forms
possible_selectors = [
'form[action*="login"]',
'form[action*="signin"]',
'form#loginForm',
'form.login-form',
'form'
]
for selector in possible_selectors:
form = await self.page.query_selector(selector)
if form:
form_selector = selector
break
if not form_selector:
return {"status": "error", "error": "No login form found"}
# Fill in credentials
await self.page.fill('input[type="email"], input[type="text"], input[name*="user"]', username)
await self.page.fill('input[type="password"]', password)
# Submit form
await self.page.click('button[type="submit"], input[type="submit"]')
# Wait for navigation or response
await self.page.wait_for_timeout(3000)
# Check if login was successful (simple heuristic)
current_url = self.page.url
return {
"status": "success",
"logged_in": "login" not in current_url.lower(),
"current_url": current_url
}
except Exception as e:
console.print(f"[red]❌ Authentication error: {e}[/red]")
return {"status": "error", "error": str(e)}
async def upload_file_to_form(self, file_path: str, selector: str = 'input[type="file"]') -> Dict:
"""NEW: Upload a file to a form."""
console.print(f"[cyan]📤 Uploading file: {file_path}[/cyan]")
try:
# Find file input
file_input = await self.page.query_selector(selector)
if not file_input:
return {"status": "error", "error": "No file input found"}
# Set the file
await file_input.set_input_files(file_path)
# Wait for any upload progress
await self.page.wait_for_timeout(2000)
return {
"status": "success",
"file_uploaded": file_path
}
except Exception as e:
console.print(f"[red]❌ File upload error: {e}[/red]")
return {"status": "error", "error": str(e)}
async def explore_multi_page_workflow(self, target_pages: List[str]) -> List[Dict]:
"""NEW: Explore multiple pages in a workflow."""
console.print(f"[cyan]🔄 Exploring {len(target_pages)} additional pages...[/cyan]")
explored_pages = []
base_url = self.page.url
for target in target_pages:
try:
# Try to find and navigate to the page
console.print(f"[dim]Looking for: {target}[/dim]")
# Look for links containing the target keyword
link_found = False
selectors = [
f'a[href*="{target}"]',
f'a:has-text("{target}")',
f'nav a:has-text("{target}")',
f'[class*="menu"] a:has-text("{target}")'
]
for selector in selectors:
try:
link = await self.page.query_selector(selector)
if link:
await link.click()
await self.page.wait_for_load_state("domcontentloaded")
await self.page.wait_for_timeout(2000)
# Capture information about this page
page_info = {
"target": target,
"url": self.page.url,
"title": await self.page.title(),
"found": True,
"timestamp": datetime.now().isoformat()
}
# Take a screenshot
await self.take_annotated_screenshot(f"Explored - {target}")
explored_pages.append(page_info)
console.print(f"[green]✅ Found and explored: {target}[/green]")
link_found = True
# Go back to base URL for next exploration
await self.page.goto(base_url, wait_until="domcontentloaded")
break
except:
continue
if not link_found:
explored_pages.append({
"target": target,
"found": False,
"timestamp": datetime.now().isoformat()
})
console.print(f"[yellow]⚠️ Could not find: {target}[/yellow]")
except Exception as e:
console.print(f"[yellow]⚠️ Error exploring {target}: {e}[/yellow]")
explored_pages.append({
"target": target,
"found": False,
"error": str(e)
})
return explored_pages
async def execute_javascript_analysis(self, custom_script: Optional[str] = None) -> Dict:
"""NEW: Execute custom JavaScript for advanced analysis."""
console.print("[cyan]⚡ Executing JavaScript analysis...[/cyan]")
try:
if custom_script:
result = await self.page.evaluate(custom_script)
else:
# Default analysis script
result = await self.page.evaluate("""
() => {
// Analyze page structure
const analysis = {
// Count different element types
tables: document.querySelectorAll('table').length,
forms: document.querySelectorAll('form').length,
images: document.querySelectorAll('img').length,
videos: document.querySelectorAll('video').length,
iframes: document.querySelectorAll('iframe').length,
// Check for specific technologies
hasReact: window.React !== undefined,
hasJQuery: window.jQuery !== undefined,
hasAngular: window.angular !== undefined,
// Page metrics
documentHeight: document.documentElement.scrollHeight,
viewportHeight: window.innerHeight,
// Interactive elements
buttons: document.querySelectorAll('button').length,
links: document.querySelectorAll('a').length,
// Meta information
metaDescription: document.querySelector('meta[name="description"]')?.content,
metaKeywords: document.querySelector('meta[name="keywords"]')?.content
};
return analysis;
}
""")
console.print(f"[green]JavaScript analysis complete[/green]")
return {
"status": "success",
"analysis": result
}
except Exception as e:
console.print(f"[red]❌ JavaScript execution error: {e}[/red]")
return {"status": "error", "error": str(e)}
async def intelligent_scroll_and_discover(self) -> List[Dict]:
"""Perform intelligent scrolling to discover content sections."""
console.print("[cyan]🔍 Discovering page content...[/cyan]")
discovered_sections = []
try:
# Get page height
page_height = await self.page.evaluate("document.body.scrollHeight")
viewport_height = await self.page.evaluate("window.innerHeight")
# Calculate scroll positions (0%, 25%, 50%, 75%, 100%)
scroll_positions = [0, 0.25, 0.5, 0.75, 1.0]
for position in scroll_positions:
current_position = int(page_height * position)
# Smooth scroll
await self.page.evaluate(f"window.scrollTo({{top: {current_position}, behavior: 'smooth'}})")
await asyncio.sleep(1) # Pause to load content
# Look for important sections at this position
important_selectors = [
('[class*="pric"]', 'Pricing'),
('[class*="tier"]', 'Tiers'),
('[class*="plan"]', 'Plans'),
('[class*="feature"]', 'Features'),
('table', 'Table'),
('form', 'Form'),
('[class*="testimonial"]', 'Testimonials'),
('[class*="faq"]', 'FAQ')
]
for selector, label in important_selectors:
try:
elements = await self.page.query_selector_all(selector)
if elements:
discovered_sections.append({
'selector': selector,
'label': label,
'count': len(elements),
'position': position
})
console.print(f"[dim]Found: {label} ({len(elements)} elements)[/dim]")
except:
pass
# Scroll back to top
await self.page.evaluate("window.scrollTo({top: 0, behavior: 'smooth'})")
await asyncio.sleep(1)
except Exception as e:
console.print(f"[yellow]⚠️ Discovery error: {e}[/yellow]")
return discovered_sections
async def smart_navigation(self, target: str) -> bool:
"""Try to navigate to specific page sections (pricing, features, etc)."""
console.print(f"[cyan]🎯 Looking for {target} page...[/cyan]")
nav_patterns = {
"pricing": ["pricing", "price", "plans", "cost", "subscription"],
"features": ["features", "capabilities", "benefits", "solutions"],
"docs": ["docs", "documentation", "api", "developers"],
"about": ["about", "company", "team", "story"]
}
keywords = nav_patterns.get(target.lower(), [target.lower()])
for keyword in keywords:
try:
# Try to find and click a link
selectors = [
f'a[href*="{keyword}"]',
f'a:has-text("{keyword}")',
f'nav a:has-text("{keyword}")',
]
for selector in selectors:
try:
element = await self.page.query_selector(selector)
if element:
await element.click()
await self.page.wait_for_load_state("domcontentloaded")
console.print(f"[green]✅ Found and clicked {target} link[/green]")
return True
except:
continue
except:
continue
console.print(f"[yellow]⚠️ Could not find {target} link[/yellow]")
return False
async def extract_pricing_info(self) -> Dict:
"""Extract pricing information with visual feedback."""
try:
console.print("[cyan]💰 Extracting pricing information...[/cyan]")
# First do intelligent scroll to find pricing sections
discovered = await self.intelligent_scroll_and_discover()
# Find pricing elements
pricing_selectors = [
'[class*="price"], [class*="Price"]',
'[class*="pricing"], [class*="Pricing"]',
'[class*="tier"], [class*="Tier"]',
'[class*="plan"], [class*="Plan"]',
]
found_elements = []
for selector in pricing_selectors:
try:
elements = await self.page.query_selector_all(selector)
for element in elements[:5]:
text = await element.text_content()
if text and len(text.strip()) > 0:
found_elements.append(text.strip())
except:
pass
# Get text content - LIMIT TO PREVENT TOKEN OVERFLOW
text_content = await self.page.evaluate("() => document.body.innerText")
# Truncate to avoid token limits
max_chars = 10000
if len(text_content) > max_chars:
text_content = text_content[:max_chars]
console.print(f"[yellow]⚠️ Truncated content to {max_chars} chars[/yellow]")
# Option 1: Use boto3 directly for LLM calls
import boto3
import json
bedrock_client = boto3.client(
"bedrock-runtime",
region_name=self.llm._client.meta.region_name if hasattr(self.llm, '_client') else "us-west-2"
)
extraction_prompt = f"""
Analyze this webpage and extract pricing information.
URL: {self.page.url}
Found elements: {json.dumps(found_elements[:20])}
Text (truncated):
{text_content}
Extract: prices, tiers, features per tier, billing cycles.
Return as concise JSON.
"""
# Format request for Bedrock
native_request = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048,
"temperature": 0.3,
"messages": [
{
"role": "user",
"content": [{"type": "text", "text": extraction_prompt}]
}
]
}
try:
response = bedrock_client.invoke_model(
modelId=self.config.llm_model_id,
body=json.dumps(native_request)
)
model_response = json.loads(response["body"].read())
response_text = model_response["content"][0]["text"]
return {
"status": "success",
"data": response_text,
"visual_elements": found_elements[:20],
"discovered_sections": discovered,
"url": self.page.url,
"extracted_at": datetime.now().isoformat()
}
except Exception as llm_error:
console.print(f"[yellow]⚠️ LLM error, using fallback extraction: {llm_error}[/yellow]")
# Fallback: Return structured data without LLM
return {
"status": "partial",
"data": {
"visual_elements": found_elements[:20],
"page_text_sample": text_content[:500],
"discovered_sections": discovered
},
"url": self.page.url,
"extracted_at": datetime.now().isoformat()
}
except Exception as e:
console.print(f"[red]❌ Extraction error: {e}[/red]")
return {"status": "error", "error": str(e)}
async def extract_product_features(self) -> Dict:
"""Extract product features from the page."""
try:
console.print("[cyan]🔍 Extracting product features...[/cyan]")
# Get text content - LIMITED
text_content = await self.page.evaluate("() => document.body.innerText")
max_chars = 8000
if len(text_content) > max_chars:
text_content = text_content[:max_chars]
# Use boto3 directly
import boto3
import json
bedrock_client = boto3.client(
"bedrock-runtime",
region_name=self.llm._client.meta.region_name if hasattr(self.llm, '_client') else "us-west-2"
)
extraction_prompt = f"""
Extract key product features from this page.
URL: {self.page.url}
Content:
{text_content}
List top 10 features as JSON. Be concise.
"""
native_request = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048,
"temperature": 0.3,
"messages": [
{
"role": "user",
"content": [{"type": "text", "text": extraction_prompt}]
}
]
}
try:
response = bedrock_client.invoke_model(
modelId=self.config.llm_model_id,
body=json.dumps(native_request)
)
model_response = json.loads(response["body"].read())
response_text = model_response["content"][0]["text"]
return {
"status": "success",
"data": response_text,
"url": self.page.url,
"extracted_at": datetime.now().isoformat()
}
except Exception as llm_error:
console.print(f"[yellow]⚠️ LLM error, using fallback: {llm_error}[/yellow]")
# Fallback without LLM
return {
"status": "partial",
"data": {"page_text_sample": text_content[:500]},
"url": self.page.url,
"error": str(llm_error)
}
except Exception as e:
console.print(f"[red]❌ Feature extraction error: {e}[/red]")
return {"status": "error", "error": str(e)}
async def take_annotated_screenshot(self, description: str = "") -> Dict:
"""Take screenshot with annotation overlay."""
try:
console.print(f"[cyan]📸 Taking screenshot: {description}[/cyan]")
# Add annotation to the page (safe way without innerHTML)
if description:
await self.page.evaluate(f"""
() => {{
const annotation = document.createElement('div');
annotation.id = 'screenshot-annotation';
annotation.style.cssText = 'position: fixed; top: 10px; left: 10px; background: rgba(0,0,0,0.8); color: white; padding: 10px; border-radius: 8px; z-index: 99999; font-family: monospace;';
annotation.textContent = '{description} - {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}';
document.body.appendChild(annotation);
}}
""")
# Take screenshot
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshot_path = f"screenshot_{timestamp}.png"
await self.page.screenshot(path=screenshot_path, full_page=False)
# Remove annotation
if description:
await self.page.evaluate("""
() => {
const annotation = document.getElementById('screenshot-annotation');
if (annotation) annotation.remove();
}
""")
screenshot_info = {
"description": description,
"timestamp": datetime.now().isoformat(),
"url": self.page.url,
"path": screenshot_path
}
self._screenshots_taken.append(screenshot_info)
# Clean up local file
import os
if os.path.exists(screenshot_path):
os.remove(screenshot_path)
return {
"status": "success",
"screenshot": screenshot_info,
"total_screenshots": len(self._screenshots_taken)
}
except Exception as e:
console.print(f"[yellow]⚠️ Screenshot error: {e}[/yellow]")
return {"status": "error", "error": str(e)}
async def capture_performance_metrics(self) -> Dict:
"""Capture performance metrics using CDP."""
if not self.cdp_session:
return {}
try:
metrics = await self.cdp_session.send("Performance.getMetrics")
return {m['name']: m['value'] for m in metrics.get('metrics', [])}
except:
return {}
def take_control(self):
"""Take manual control of the browser."""
if self.browser_client:
console.print("[yellow]🎮 Taking manual control...[/yellow]")
self.browser_client.take_control()
console.print("✅ Manual control enabled")
def release_control(self):
"""Release manual control."""
if self.browser_client:
console.print("[yellow]🤖 Releasing control...[/yellow]")
self.browser_client.release_control()
console.print("✅ Automation restored")
async def cleanup(self):
"""Clean up browser resources."""
if self.cdp_session:
try:
await self.cdp_session.detach()
except:
pass
if self.browser:
console.print("[yellow]🎭 Closing browser...[/yellow]")
await self.browser.close()
if self.playwright:
console.print("[yellow]🎭 Stopping Playwright...[/yellow]")
await self.playwright.stop()
if self.browser_client:
console.print("[yellow]🛑 Stopping session...[/yellow]")
self.browser_client.stop()
console.print("✅ Cleanup complete")