Sundar Raghavan 1c7e1861c1
Add Competitive Intelligence Agent with Bedrock AgentCore SDK (#221)
* Add Enterprise Web Intelligence Agent with Bedrock AgentCore Tools SDK
  - Automated browser navigation with CDP enhancements
  - Live viewing with DCV integration
  - Session recording and replay capabilities
  - LLM-powered content extraction
  - Code Interpreter for dynamic analysis
  - Comprehensive documentation and examples

* Moved project from old-folder to new-folder and updated files

* Refactor: Rename to enterprise-web-intelligence-agent and add Strands implementation alongside LangGraph
2025-08-27 15:17:28 -07:00


"""
Unified S3 DataSource combining all fixes from your working code.
Save this as: competitive-intelligence-agent/utils/s3_datasource.py
"""
import os
import sys
import json
import time
import tempfile
import shutil
import gzip
import io
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import boto3
from rich.console import Console
console = Console()
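
# Expected S3 layout, inferred from the access patterns in this module; the
# exact batch-file naming is an assumption and may vary by recorder config:
#
#   s3://<bucket>/<prefix>/<session_id>/metadata.json
#   s3://<bucket>/<prefix>/<session_id>/batch-000001.gz   <- gzipped JSON Lines,
#   s3://<bucket>/<prefix>/<session_id>/batch-000002.gz      one rrweb event/line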
class UnifiedS3DataSource:
"""
Unified S3 data source that combines all fixes:
- Correct S3 path construction without doubling session ID
- Proper timestamp parsing from metadata
- Fallback event creation when recordings are incomplete
- Support for both direct session access and discovery
"""
def __init__(self, bucket: str, prefix: str, session_id: Optional[str] = None):
"""
Initialize S3 data source.
Args:
bucket: S3 bucket name
prefix: S3 prefix (without session ID)
            session_id: Optional session ID. If omitted, the latest session is
                discovered from the prefix.
"""
self.s3_client = boto3.client('s3')
self.bucket = bucket
self.prefix = prefix.rstrip('/')
self.session_id = session_id
self.temp_dir = Path(tempfile.mkdtemp(prefix='bedrock_agentcore_replay_'))
        # Build the full prefix, avoiding a doubled session ID. Use the
        # rstripped self.prefix so a trailing slash never doubles up.
        if session_id:
            # Only append session_id if the prefix doesn't already end with it
            if self.prefix and not self.prefix.endswith(session_id):
                self.full_prefix = f"{self.prefix}/{session_id}"
            elif self.prefix:
                # Prefix already contains the session ID; don't duplicate it
                self.full_prefix = self.prefix
            else:
                # No prefix, just use the session ID
                self.full_prefix = session_id
        else:
            # Try to discover the latest session under the prefix
            self.session_id = self._discover_session()
            if self.session_id:
                self.full_prefix = f"{self.prefix}/{self.session_id}" if self.prefix else self.session_id
            else:
                self.full_prefix = self.prefix
console.print(f"[cyan]S3 DataSource initialized[/cyan]")
console.print(f" Bucket: {bucket}")
console.print(f" Prefix: {prefix}")
console.print(f" Session: {self.session_id}")
console.print(f" Full path: s3://{bucket}/{self.full_prefix}/")
    def cleanup(self):
        """Remove the temporary directory used for downloaded recordings."""
        if self.temp_dir.exists():
            try:
                shutil.rmtree(self.temp_dir)
            except OSError:
                # Best-effort cleanup; a leftover temp dir is harmless
                pass
def _discover_session(self) -> Optional[str]:
"""Discover the latest session ID from S3 prefix"""
try:
            # The trailing slash makes Delimiter='/' return the session
            # directories as CommonPrefixes instead of the prefix itself
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket,
                Prefix=f"{self.prefix}/" if self.prefix else "",
                Delimiter='/'
            )
if 'CommonPrefixes' in response:
# Get all session directories
sessions = []
for prefix_info in response['CommonPrefixes']:
session_path = prefix_info['Prefix'].rstrip('/')
session_id = session_path.split('/')[-1]
sessions.append(session_id)
                if sessions:
                    # Lexicographic sort; assumes session IDs sort chronologically
                    # (e.g. timestamp-prefixed IDs), so the last entry is latest
                    latest = sorted(sessions)[-1]
console.print(f"[green]Discovered session: {latest}[/green]")
return latest
            # Alternative: look for metadata.json keys (checks only the first
            # page of results; use a paginator if the prefix holds >1000 keys)
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket,
                Prefix=self.prefix
            )
if 'Contents' in response:
for obj in response['Contents']:
if 'metadata.json' in obj['Key']:
# Extract session ID from path
parts = obj['Key'].split('/')
for i, part in enumerate(parts):
if i > 0 and parts[i-1] == self.prefix.split('/')[-1]:
console.print(f"[green]Found session from metadata: {part}[/green]")
return part
except Exception as e:
console.print(f"[yellow]Could not discover session: {e}[/yellow]")
return None
def list_recordings(self) -> List[Dict]:
"""List recordings with all timestamp parsing fixes"""
recordings = []
if not self.session_id:
console.print("[yellow]No session ID available[/yellow]")
return recordings
try:
# Fetch metadata
metadata = self._get_metadata()
            # Parse the start timestamp (handles ISO strings, seconds, and ms)
timestamp = self._parse_timestamp(metadata)
# Get duration
duration = metadata.get('duration', 0) or metadata.get('durationMs', 0) or 0
# Get event count
event_count = metadata.get('eventCount', 0) or metadata.get('totalEvents', 0) or 0
# Create recording entry
recordings.append({
'id': self.session_id,
'sessionId': self.session_id,
'timestamp': timestamp,
'date': datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S'),
'events': event_count,
'duration': duration
})
except Exception as e:
console.print(f"[yellow]Error listing recordings: {e}[/yellow]")
# Return fallback recording
recordings.append(self._create_fallback_recording())
return recordings
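
    # Shape of each entry returned by list_recordings() (illustrative values):
    #   {'id': 'abc123', 'sessionId': 'abc123', 'timestamp': 1724698800000,
    #    'date': '2024-08-26 19:00:00', 'events': 312, 'duration': 45000}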
def download_recording(self, recording_id: str) -> Optional[Dict]:
"""Download and process recording from S3"""
console.print(f"[cyan]Downloading recording: {recording_id}[/cyan]")
recording_dir = self.temp_dir / recording_id
recording_dir.mkdir(exist_ok=True)
try:
# Get metadata
metadata = self._get_metadata()
            # List all objects in the session directory (the trailing slash
            # avoids matching sibling sessions sharing a leading substring)
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket,
                Prefix=f"{self.full_prefix}/"
            )
if 'Contents' not in response:
console.print(f"[yellow]No files found in session[/yellow]")
return self._create_fallback_recording_data()
# Find batch files
batch_files = [
obj['Key'] for obj in response['Contents']
if obj['Key'].endswith('.gz') or 'batch-' in obj['Key']
]
console.print(f"Found {len(batch_files)} batch files")
# Process batch files
all_events = []
for key in batch_files:
try:
console.print(f"[dim]Processing: {key.split('/')[-1]}[/dim]")
response = self.s3_client.get_object(Bucket=self.bucket, Key=key)
# Read and decompress
with gzip.GzipFile(fileobj=io.BytesIO(response['Body'].read())) as gz:
content = gz.read().decode('utf-8')
# Parse JSON lines
for line in content.splitlines():
if line.strip():
try:
event = json.loads(line)
# Validate event structure
if 'type' in event and 'timestamp' in event:
all_events.append(event)
except json.JSONDecodeError:
continue
except Exception as e:
console.print(f"[yellow]Error processing {key}: {e}[/yellow]")
console.print(f"[green]✅ Loaded {len(all_events)} events[/green]")
# If no events or too few, create fallback
if len(all_events) < 2:
console.print("[yellow]Insufficient events, using fallback[/yellow]")
return self._create_fallback_recording_data()
return {
'metadata': metadata,
'events': all_events
}
except Exception as e:
console.print(f"[red]Error downloading recording: {e}[/red]")
return self._create_fallback_recording_data()
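
    # download_recording() returns {'metadata': {...}, 'events': [...]}, where
    # each event is an rrweb event dict; on any failure it degrades to the
    # synthetic data from _create_fallback_recording_data() instead of raising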
def _get_metadata(self) -> Dict:
"""Get metadata from S3 with error handling"""
try:
metadata_key = f"{self.full_prefix}/metadata.json"
response = self.s3_client.get_object(Bucket=self.bucket, Key=metadata_key)
metadata = json.loads(response['Body'].read().decode('utf-8'))
console.print(f"[dim]✅ Retrieved metadata[/dim]")
return metadata
except Exception as e:
console.print(f"[yellow]No metadata found: {e}[/yellow]")
return {}
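
    # Sample metadata.json shape this module probes for (a recorder may emit
    # only a subset of these fields; values are illustrative):
    #   {"startTime": "2024-08-26T19:00:00Z", "durationMs": 45000, "eventCount": 312}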
    def _parse_timestamp(self, metadata: Dict) -> int:
        """Parse the recording start time from metadata, normalized to epoch milliseconds."""
# Default to current time
timestamp = int(time.time() * 1000)
if 'startTime' in metadata:
start_time = metadata['startTime']
try:
if isinstance(start_time, str):
# Handle ISO format
dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
timestamp = int(dt.timestamp() * 1000)
elif isinstance(start_time, (int, float)):
timestamp = int(start_time)
                    # Heuristic: values below 1e12 are epoch seconds
                    # (1_000_000_000_000 ms ≈ 2001-09-09), so scale to ms
                    if timestamp < 1000000000000:
                        timestamp = timestamp * 1000
except Exception as e:
console.print(f"[yellow]Could not parse timestamp: {e}[/yellow]")
return timestamp
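
    # Worked examples for _parse_timestamp (hypothetical inputs):
    #   {"startTime": "2024-08-26T19:00:00Z"} -> 1724698800000  (ISO string)
    #   {"startTime": 1724698800}             -> 1724698800000  (seconds, scaled x1000)
    #   {"startTime": 1724698800000}          -> 1724698800000  (already milliseconds)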
def _create_fallback_recording(self) -> Dict:
"""Create fallback recording entry"""
return {
'id': self.session_id or 'unknown',
'sessionId': self.session_id or 'unknown',
'timestamp': int(time.time() * 1000),
'date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'events': 0,
'duration': 0
}
def _create_fallback_recording_data(self) -> Dict:
"""
Create fallback recording data with minimal events when actual recording is unavailable.
Purpose:
1. **Graceful Degradation**: When S3 recordings are incomplete or corrupted,
this ensures the replay viewer doesn't crash completely.
2. **Development/Testing**: During development, recordings might not be
available yet. This allows testing the replay viewer interface.
3. **Partial Failures**: If recording upload partially fails (network issues,
S3 permissions), users can still access the replay viewer.
4. **User Experience**: Instead of showing an error, we show a minimal
interface with a message explaining the recording is unavailable.
5. **Debugging**: Helps identify when recordings fail - if users see the
fallback message, they know to check S3 uploads and permissions.
Returns:
Dict containing minimal valid rrweb events that create a page showing
an informative message about the recording being unavailable.
"""
timestamp = int(time.time() * 1000)
        # Minimal valid events for the rrweb player. In rrweb's EventType enum
        # Meta = 4 and FullSnapshot = 2; the original labels were swapped
        events = [
            {
                "type": 4,  # EventType.Meta
                "timestamp": timestamp,
                "data": {
                    "href": "https://example.com",
                    "width": 1280,
                    "height": 720
                }
            },
            {
                "type": 2,  # EventType.FullSnapshot
                "timestamp": timestamp + 100,
                "data": {
                    # Serialized DOM; rrweb node types: 0 = Document,
                    # 2 = Element, 3 = Text. Each node needs a unique id
                    "node": {
                        "type": 0,
                        "id": 1,
                        "childNodes": [{
                            "type": 2,
                            "id": 2,
                            "tagName": "html",
                            "attributes": {},
                            "childNodes": [{
                                "type": 2,
                                "id": 3,
                                "tagName": "body",
                                "attributes": {
                                    "style": "font-family: sans-serif; padding: 40px; text-align: center;"
                                },
                                "childNodes": [{
                                    "type": 3,
                                    "id": 4,
                                    "textContent": "Recording data not available - this may be due to upload delays or permissions issues"
                                }]
                            }]
                        }]
                    },
                    "initialOffset": {"left": 0, "top": 0}
                }
            }
        ]
return {
'metadata': {'fallback': True},
'events': events
}
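
# --- Usage sketch; bucket and prefix below are placeholders, not real resources ---
if __name__ == "__main__":
    source = UnifiedS3DataSource(
        bucket="my-agentcore-recordings",   # hypothetical bucket
        prefix="browser-sessions",          # hypothetical prefix
        session_id=None,                    # None -> discover the latest session
    )
    try:
        for rec in source.list_recordings():
            console.print(f"{rec['date']}  {rec['id']}  events={rec['events']}")
            data = source.download_recording(rec['id'])
            if data:
                console.print(
                    f"Loaded {len(data['events'])} events "
                    f"(fallback={data['metadata'].get('fallback', False)})"
                )
    finally:
        source.cleanup()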