mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
346 lines · 13 KiB · Python

"""
Unified S3 DataSource consolidating all fixes: correct path construction,
robust timestamp parsing, and fallback events for incomplete recordings.

Save this as: competitive-intelligence-agent/utils/s3_datasource.py
"""
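
# Expected S3 layout (illustrative sketch; the exact key names depend on how
# the browser recorder was configured):
#
#   s3://<bucket>/<prefix>/<session_id>/metadata.json
#   s3://<bucket>/<prefix>/<session_id>/batch-*.gz   # gzipped JSON Lines of rrweb events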

import json
import time
import tempfile
import shutil
import gzip
import io
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional

import boto3
from rich.console import Console

console = Console()


class UnifiedS3DataSource:
    """
    Unified S3 data source that combines all fixes:

    - Correct S3 path construction without doubling the session ID
    - Proper timestamp parsing from metadata
    - Fallback event creation when recordings are incomplete
    - Support for both direct session access and session discovery
    """

    def __init__(self, bucket: str, prefix: str, session_id: Optional[str] = None):
        """
        Initialize the S3 data source.

        Args:
            bucket: S3 bucket name
            prefix: S3 prefix (without the session ID)
            session_id: Optional session ID; if not provided, discovery is attempted
        """
        self.s3_client = boto3.client('s3')
        self.bucket = bucket
        self.prefix = prefix.rstrip('/')
        self.session_id = session_id
        self.temp_dir = Path(tempfile.mkdtemp(prefix='bedrock_agentcore_replay_'))

        # Fix: build the full prefix without duplicating the session ID
        if session_id:
            if prefix and not prefix.endswith(session_id):
                # Only append session_id if the prefix doesn't already end with it
                self.full_prefix = f"{prefix}/{session_id}"
            elif prefix:
                # Prefix already contains the session_id; don't duplicate it
                self.full_prefix = prefix
            else:
                # No prefix, just use the session_id
                self.full_prefix = session_id
        else:
            # Try to discover the session from the prefix
            self.session_id = self._discover_session()
            if self.session_id:
                self.full_prefix = f"{prefix}/{self.session_id}" if prefix else self.session_id
            else:
                self.full_prefix = prefix

        console.print("[cyan]S3 DataSource initialized[/cyan]")
        console.print(f"  Bucket: {bucket}")
        console.print(f"  Prefix: {prefix}")
        console.print(f"  Session: {self.session_id}")
        console.print(f"  Full path: s3://{bucket}/{self.full_prefix}/")
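
    # Resulting paths for some hypothetical inputs:
    #   prefix="replays/agent", session_id="sess-123"          -> "replays/agent/sess-123"
    #   prefix="replays/agent/sess-123", session_id="sess-123" -> "replays/agent/sess-123"
    #   prefix="", session_id="sess-123"                       -> "sess-123"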

    def cleanup(self):
        """Clean up temporary files."""
        if self.temp_dir.exists():
            try:
                shutil.rmtree(self.temp_dir)
            except OSError:
                # Best-effort cleanup; a leftover temp dir is not fatal
                pass

    def _discover_session(self) -> Optional[str]:
        """Discover the latest session ID under the S3 prefix."""
        try:
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket,
                Prefix=self.prefix,
                Delimiter='/'
            )

            if 'CommonPrefixes' in response:
                # Collect all session directories
                sessions = []
                for prefix_info in response['CommonPrefixes']:
                    session_path = prefix_info['Prefix'].rstrip('/')
                    session_id = session_path.split('/')[-1]
                    sessions.append(session_id)

                if sessions:
                    # Return the lexicographically last session as the latest
                    latest = sorted(sessions)[-1]
                    console.print(f"[green]Discovered session: {latest}[/green]")
                    return latest

            # Alternative: look for metadata.json files
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket,
                Prefix=self.prefix
            )

            if 'Contents' in response:
                for obj in response['Contents']:
                    if 'metadata.json' in obj['Key']:
                        # Extract the session ID: it is the path component
                        # that follows the last component of the prefix
                        parts = obj['Key'].split('/')
                        for i, part in enumerate(parts):
                            if i > 0 and parts[i - 1] == self.prefix.split('/')[-1]:
                                console.print(f"[green]Found session from metadata: {part}[/green]")
                                return part

        except Exception as e:
            console.print(f"[yellow]Could not discover session: {e}[/yellow]")

        return None
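
    # Note: list_objects_v2 returns at most 1000 keys per call, so discovery
    # assumes the listing fits in one page. For larger buckets, a paginator
    # sketch would look like:
    #
    #   paginator = self.s3_client.get_paginator('list_objects_v2')
    #   for page in paginator.paginate(Bucket=self.bucket, Prefix=self.prefix):
    #       ...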

    def list_recordings(self) -> List[Dict]:
        """List recordings, applying the timestamp parsing fixes."""
        recordings = []

        if not self.session_id:
            console.print("[yellow]No session ID available[/yellow]")
            return recordings

        try:
            # Fetch metadata
            metadata = self._get_metadata()

            # Parse the start timestamp (handles ISO strings, seconds, and ms)
            timestamp = self._parse_timestamp(metadata)

            # Get duration, tolerating either metadata key
            duration = metadata.get('duration', 0) or metadata.get('durationMs', 0) or 0

            # Get event count, tolerating either metadata key
            event_count = metadata.get('eventCount', 0) or metadata.get('totalEvents', 0) or 0

            # Create the recording entry
            recordings.append({
                'id': self.session_id,
                'sessionId': self.session_id,
                'timestamp': timestamp,
                'date': datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S'),
                'events': event_count,
                'duration': duration
            })

        except Exception as e:
            console.print(f"[yellow]Error listing recordings: {e}[/yellow]")
            # Return a fallback recording instead of failing
            recordings.append(self._create_fallback_recording())

        return recordings
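
    # A successful entry looks roughly like this (values are illustrative):
    #   {'id': 'sess-123', 'sessionId': 'sess-123', 'timestamp': 1736937000000,
    #    'date': '2025-01-15 10:30:00', 'events': 412, 'duration': 93000}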

    def download_recording(self, recording_id: str) -> Optional[Dict]:
        """Download and process a recording from S3."""
        console.print(f"[cyan]Downloading recording: {recording_id}[/cyan]")

        recording_dir = self.temp_dir / recording_id
        recording_dir.mkdir(exist_ok=True)

        try:
            # Get metadata
            metadata = self._get_metadata()

            # List all objects in the session directory
            response = self.s3_client.list_objects_v2(
                Bucket=self.bucket,
                Prefix=self.full_prefix
            )

            if 'Contents' not in response:
                console.print("[yellow]No files found in session[/yellow]")
                return self._create_fallback_recording_data()

            # Find batch files
            batch_files = [
                obj['Key'] for obj in response['Contents']
                if obj['Key'].endswith('.gz') or 'batch-' in obj['Key']
            ]

            console.print(f"Found {len(batch_files)} batch files")

            # Process batch files
            all_events = []
            for key in batch_files:
                try:
                    console.print(f"[dim]Processing: {key.split('/')[-1]}[/dim]")
                    response = self.s3_client.get_object(Bucket=self.bucket, Key=key)

                    # Read and decompress
                    with gzip.GzipFile(fileobj=io.BytesIO(response['Body'].read())) as gz:
                        content = gz.read().decode('utf-8')

                    # Parse JSON Lines: one rrweb event per line
                    for line in content.splitlines():
                        if line.strip():
                            try:
                                event = json.loads(line)
                                # Keep only structurally valid events
                                if 'type' in event and 'timestamp' in event:
                                    all_events.append(event)
                            except json.JSONDecodeError:
                                continue

                except Exception as e:
                    console.print(f"[yellow]Error processing {key}: {e}[/yellow]")

            console.print(f"[green]✅ Loaded {len(all_events)} events[/green]")

            # rrweb needs at least a Meta event plus a FullSnapshot to replay,
            # so fewer than two events means falling back
            if len(all_events) < 2:
                console.print("[yellow]Insufficient events, using fallback[/yellow]")
                return self._create_fallback_recording_data()

            return {
                'metadata': metadata,
                'events': all_events
            }

        except Exception as e:
            console.print(f"[red]Error downloading recording: {e}[/red]")
            return self._create_fallback_recording_data()
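
    # Each line in a batch file is one rrweb event, e.g. (illustrative):
    #   {"type": 3, "timestamp": 1736937000123, "data": {"source": 1, ...}}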

    def _get_metadata(self) -> Dict:
        """Get metadata from S3 with error handling."""
        try:
            metadata_key = f"{self.full_prefix}/metadata.json"
            response = self.s3_client.get_object(Bucket=self.bucket, Key=metadata_key)
            metadata = json.loads(response['Body'].read().decode('utf-8'))
            console.print("[dim]✅ Retrieved metadata[/dim]")
            return metadata
        except Exception as e:
            console.print(f"[yellow]No metadata found: {e}[/yellow]")
            return {}
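
    # metadata.json shape (illustrative; only the keys this class reads):
    #   {"startTime": "2025-01-15T10:30:00Z", "duration": 93000, "eventCount": 412}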

    def _parse_timestamp(self, metadata: Dict) -> int:
        """Parse the start timestamp from metadata, handling all edge cases."""
        # Default to the current time
        timestamp = int(time.time() * 1000)

        if 'startTime' in metadata:
            start_time = metadata['startTime']

            try:
                if isinstance(start_time, str):
                    # Handle ISO 8601 format, including a trailing 'Z'
                    dt = datetime.fromisoformat(start_time.replace('Z', '+00:00'))
                    timestamp = int(dt.timestamp() * 1000)
                elif isinstance(start_time, (int, float)):
                    timestamp = int(start_time)
                    # Heuristic: values below 1e12 (~Sep 2001 in ms) are
                    # assumed to be seconds; promote them to milliseconds
                    if timestamp < 1000000000000:
                        timestamp = timestamp * 1000
            except Exception as e:
                console.print(f"[yellow]Could not parse timestamp: {e}[/yellow]")

        return timestamp
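
    # Examples of accepted startTime values (illustrative):
    #   "2025-01-15T10:30:00Z" -> 1736937000000 (ms)
    #   1736937000             -> treated as seconds, promoted to 1736937000000
    #   1736937000000          -> already in ms, used as-is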

    def _create_fallback_recording(self) -> Dict:
        """Create a fallback recording entry."""
        return {
            'id': self.session_id or 'unknown',
            'sessionId': self.session_id or 'unknown',
            'timestamp': int(time.time() * 1000),
            'date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'events': 0,
            'duration': 0
        }

    def _create_fallback_recording_data(self) -> Dict:
        """
        Create fallback recording data with minimal events when the actual
        recording is unavailable.

        Purpose:

        1. **Graceful degradation**: When S3 recordings are incomplete or
           corrupted, this ensures the replay viewer doesn't crash outright.

        2. **Development/testing**: During development, recordings might not
           be available yet. This allows testing the replay viewer interface.

        3. **Partial failures**: If a recording upload partially fails
           (network issues, S3 permissions), users can still open the viewer.

        4. **User experience**: Instead of showing an error, we show a minimal
           page with a message explaining that the recording is unavailable.

        5. **Debugging**: Helps identify when recordings fail; if users see
           the fallback message, they know to check S3 uploads and permissions.

        Returns:
            Dict containing minimal valid rrweb events that render a page
            showing an informative message about the recording being
            unavailable.
        """
        timestamp = int(time.time() * 1000)

        # Create minimal valid events for the rrweb player. rrweb expects a
        # Meta event (type 4) followed by a FullSnapshot (type 2) whose
        # serialized nodes each carry a unique id.
        events = [
            {
                "type": 4,  # Meta event
                "timestamp": timestamp,
                "data": {
                    "href": "https://example.com",
                    "width": 1280,
                    "height": 720
                }
            },
            {
                "type": 2,  # FullSnapshot event
                "timestamp": timestamp + 100,
                "data": {
                    "node": {
                        "type": 0,  # Document node
                        "id": 1,
                        "childNodes": [{
                            "type": 2,  # Element node
                            "id": 2,
                            "tagName": "html",
                            "attributes": {},
                            "childNodes": [{
                                "type": 2,
                                "id": 3,
                                "tagName": "body",
                                "attributes": {
                                    "style": "font-family: sans-serif; padding: 40px; text-align: center;"
                                },
                                "childNodes": [{
                                    "type": 3,  # Text node
                                    "id": 4,
                                    "textContent": "Recording data not available - this may be due to upload delays or permissions issues"
                                }]
                            }]
                        }]
                    },
                    "initialOffset": {"left": 0, "top": 0}
                }
            }
        ]

        return {
            'metadata': {'fallback': True},
            'events': events
        }
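

# Minimal usage sketch (assumes AWS credentials are configured; the bucket and
# prefix below are placeholders):
if __name__ == "__main__":
    source = UnifiedS3DataSource(
        bucket="my-recordings-bucket",
        prefix="replays/agent",
    )
    try:
        for rec in source.list_recordings():
            console.print(rec)
            data = source.download_recording(rec['id'])
            if data:
                console.print(f"Loaded {len(data['events'])} events")
    finally:
        source.cleanup()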