#!/usr/bin/env python3 """Run the Competitive Intelligence Agent with Strands Framework.""" import asyncio import os import sys from pathlib import Path from typing import Dict, List, TypedDict, Annotated, Optional, Any # Add parent directory to path parent_dir = str(Path(__file__).parent.parent) sys.path.append(parent_dir) from utils.imports import setup_interactive_tools_import paths = setup_interactive_tools_import() from rich.console import Console from rich.prompt import Prompt, Confirm from rich.table import Table from rich.panel import Panel from shared.utils.s3_datasource import UnifiedS3DataSource from config import AgentConfig from agent import CompetitiveIntelligenceAgent from interactive_tools.live_view_sessionreplay.session_replay_viewer import SessionReplayViewer #from ..shared.utils.s3_datasource import UnifiedS3DataSource #from ..shared.utils.imports import setup_interactive_tools_import console = Console() def get_bedrock_agentcore_single() -> List[Dict]: """Analyze AWS Bedrock AgentCore pricing.""" return [ { "name": "AWS Bedrock AgentCore", "url": "https://aws.amazon.com/bedrock/agentcore/pricing/" } ] def get_bedrock_vs_vertex() -> List[Dict]: """Compare AWS Bedrock AgentCore with Google Vertex AI.""" return [ { "name": "AWS Bedrock AgentCore", "url": "https://aws.amazon.com/bedrock/agentcore/pricing/" }, { "name": "Google Vertex AI", "url": "https://cloud.google.com/vertex-ai/pricing" } ] def get_custom_competitors() -> List[Dict]: """Get custom competitors from user input with explicit analysis options.""" competitors = [] console.print("\n[bold]Enter competitors to analyze:[/bold]") console.print("[dim]Press Enter with empty name to finish[/dim]\n") while True: name = Prompt.ask("Competitor name", default="") if not name: break url = Prompt.ask(f"URL for {name}") # Let user specify what to analyze console.print("\n[cyan]What would you like to analyze?[/cyan]") console.print("1. Pricing information") console.print("2. Product features") console.print("3. API documentation") console.print("4. Company/About information") console.print("5. All of the above") analysis_choice = Prompt.ask( "Select options (comma-separated, e.g., 1,2,3)", default="1,2" ) analyze = [] if "1" in analysis_choice: analyze.extend(["pricing", "tiers"]) if "2" in analysis_choice: analyze.extend(["features", "capabilities"]) if "3" in analysis_choice: analyze.extend(["api", "docs", "developer"]) if "4" in analysis_choice: analyze.extend(["about", "company", "team"]) if "5" in analysis_choice: analyze = ["pricing", "tiers", "features", "capabilities", "api", "docs", "about", "company"] # Ask for specific URLs (optional) additional_urls = {} if Confirm.ask("Do you have specific URLs for pricing/docs pages?", default=False): if "pricing" in analyze: pricing_url = Prompt.ask("Pricing page URL (optional)", default="") if pricing_url: additional_urls["pricing_url"] = pricing_url if "api" in analyze or "docs" in analyze: docs_url = Prompt.ask("API/Docs URL (optional)", default="") if docs_url: additional_urls["docs_url"] = docs_url competitors.append({ "name": name, "url": url, "analyze": analyze, "additional_urls": additional_urls, "auto_discover": True }) console.print(f"[green]āœ“ Added {name} - will analyze: {', '.join(analyze)}[/green]\n") return competitors def show_competitors_table(competitors: List[Dict]): """Display competitors in a table.""" table = Table(title="Competitors to Analyze", title_style="bold cyan") table.add_column("#", style="cyan", width=4) table.add_column("Name", style="magenta") table.add_column("URL", style="blue") for i, comp in enumerate(competitors, 1): table.add_row( str(i), comp['name'], comp['url'][:50] + "..." if len(comp['url']) > 50 else comp['url'] ) console.print(table) async def view_replay(recording_config: Any, config: AgentConfig): """ Start the session replay viewer using the recording configuration. Args: recording_config: Either a dict with S3Location or a string path config: Agent configuration """ try: console.print("\n[cyan]šŸŽ­ Starting session replay viewer...[/cyan]") # Handle both structured config and legacy string format if isinstance(recording_config, dict): # New structured format from API if 's3Location' in recording_config: s3_location = recording_config['s3Location'] bucket = s3_location.get('bucket') prefix = s3_location.get('prefix', '').rstrip('/') else: # Direct dict with bucket and prefix bucket = recording_config.get('bucket') prefix = recording_config.get('prefix', '').rstrip('/') # Extract session ID from prefix prefix_parts = prefix.split('/') session_id = prefix_parts[-1] if prefix_parts else 'unknown' elif isinstance(recording_config, str): # Legacy string format (s3://bucket/prefix/session_id/) console.print("[yellow]āš ļø Using legacy S3 path format[/yellow]") parts = recording_config.replace("s3://", "").rstrip("/").split("/") bucket = parts[0] prefix = "/".join(parts[1:-1]) if len(parts) > 2 else "" session_id = parts[-1] if len(parts) > 1 else "unknown" else: raise ValueError(f"Invalid recording configuration format: {type(recording_config)}") console.print(f"[dim]Bucket: {bucket}[/dim]") console.print(f"[dim]Prefix: {prefix}[/dim]") console.print(f"[dim]Session: {session_id}[/dim]") # Wait for recordings to be uploaded console.print("ā³ Waiting for recordings to be uploaded to S3 (30 seconds)...") await asyncio.sleep(30) # Use the unified S3 data source data_source = UnifiedS3DataSource( bucket=bucket, prefix=prefix, session_id=session_id ) # Start replay viewer console.print(f"šŸŽ¬ Starting session replay viewer for: {session_id}") viewer = SessionReplayViewer( data_source=data_source, port=config.replay_viewer_port ) viewer.start() except Exception as e: console.print(f"[red]āŒ Error starting replay viewer: {e}[/red]") import traceback traceback.print_exc() async def main(): """Main function to run the agent.""" console.print(Panel( "[bold cyan]šŸŽÆ Competitive Intelligence Agent[/bold cyan]\n\n" "[bold]Powered by Strands Framework & Amazon Bedrock[/bold]\n\n" "Migration from LangGraph → Strands āœ…\n\n" "All Features Preserved:\n" "• šŸ” Automated browser navigation with CDP\n" "• šŸ“Š Intelligent content extraction with LLM\n" "• šŸ“ø Screenshot capture with annotations\n" "• šŸ“¹ Full session recording to S3\n" "• šŸŽ­ Session replay capability\n" "• šŸ¤– Claude 3.5 Sonnet for analysis\n" "• ⚔ Parallel processing support\n" "• šŸ’¾ Session persistence & resume\n" "• ā˜ļø AWS CLI integration\n" "• šŸ“ Advanced form analysis\n" "• 🌐 Multi-page workflows", title="Welcome - Strands Edition", border_style="blue" )) # Load configuration config = AgentConfig() # Validate configuration if not config.validate(): console.print("[red]āŒ Configuration validation failed[/red]") console.print("Please set the required environment variables") return # Show configuration console.print("\n[bold]Configuration:[/bold]") console.print(f" Region: {config.region}") console.print(f" Model: {config.llm_model_id}") console.print(f" S3 Bucket: {config.s3_bucket}") console.print(f" Role ARN: {config.recording_role_arn}") console.print() # Check for resume option resume_session = None if Confirm.ask("Do you want to resume a previous session?", default=False): resume_session = Prompt.ask("Enter session ID to resume") # Get competitors console.print("\n[bold]Select analysis option:[/bold]") console.print("1. šŸŽÆ AWS Bedrock AgentCore Pricing Only") console.print("2. šŸ†š Compare Bedrock AgentCore vs Vertex AI") console.print("3. āœļø Custom competitors") choice = Prompt.ask("Select option", choices=["1", "2", "3"], default="1") if choice == "1": competitors = get_bedrock_agentcore_single() elif choice == "2": competitors = get_bedrock_vs_vertex() else: competitors = get_custom_competitors() if not competitors: console.print("[yellow]No competitors entered. Exiting.[/yellow]") return # Show competitors show_competitors_table(competitors) # Ask for processing mode parallel_mode = False if len(competitors) > 1: parallel_mode = Confirm.ask( f"\n⚔ Use parallel processing for {len(competitors)} competitors?", default=False ) if not Confirm.ask("\nProceed with analysis?", default=True): console.print("[yellow]Analysis cancelled.[/yellow]") return # Create and run agent agent = CompetitiveIntelligenceAgent(config) try: # Initialize with optional session resume await agent.initialize(resume_session_id=resume_session) # Show what to watch for watch_panel = Panel( "[bold yellow]šŸ‘ļø Watch the Live Browser Viewer![/bold yellow]\n\n" "[bold]The browser will automatically:[/bold]\n" "• Navigate to each competitor's pricing page\n" "• Scroll through pages to discover content\n" "• Extract pricing information and features\n" "• Take annotated screenshots\n" "• Generate a comprehensive report\n\n" f"[bold]Mode:[/bold] {'⚔ Parallel' if parallel_mode else 'šŸ”„ Sequential'}\n\n" "[dim]Framework: Strands (migrated from LangGraph)[/dim]", border_style="yellow" ) console.print(watch_panel) console.print("\n[cyan]Starting automated analysis in 5 seconds...[/cyan]") await asyncio.sleep(5) # Run analysis results = await agent.run(competitors, parallel=parallel_mode) if results["success"]: # Show results summary results_panel = Panel( f"[bold green]āœ… Analysis Complete![/bold green]\n\n" f"[bold]Key Findings:[/bold]\n" f"šŸ“Š Competitors analyzed: {len(competitors)}\n" f"🌐 API endpoints discovered: {len(results.get('apis_discovered', []))}\n" f"šŸ“„ Report generated: Yes\n" f"šŸ“¹ Session recorded: Yes\n" f"šŸ’¾ Session ID: {results.get('session_id', 'N/A')}\n" f"⚔ Processing mode: {'Parallel' if parallel_mode else 'Sequential'}\n\n" f"[dim]Framework: Strands[/dim]", border_style="green" ) console.print(results_panel) # Show report preview if results.get("report"): console.print("\n[bold]Report Preview:[/bold]") console.print("-" * 60) preview = results['report'][:1500] console.print(preview + "..." if len(results['report']) > 1500 else preview) console.print("-" * 60) # Ask about replay #if results.get("recording_path"): # if Confirm.ask("\nView session replay?", default=True): # await view_replay(results["recording_path"], config) # if results.get("recording_config") or results.get("recording_path"): replay_prompt = Panel( "[bold cyan]šŸŽ¬ Session Recording Available![/bold cyan]\n\n" "Your entire analysis session has been recorded.\n" "You can replay it to:\n" "• Review the extraction process\n" "• Share findings with stakeholders\n" "• Debug any issues\n" "• Create training materials", border_style="cyan" ) console.print(replay_prompt) if Confirm.ask("\nView session replay?", default=True): # Use recording_config if available, fallback to recording_path recording_data = results.get("recording_config") or results.get("recording_path") await view_replay(recording_data, config) else: console.print(f"\n[red]Analysis failed: {results.get('error', 'Unknown error')}[/red]") except KeyboardInterrupt: console.print("\n[yellow]Analysis interrupted by user[/yellow]") except Exception as e: console.print(f"\n[red]Unexpected error: {e}[/red]") import traceback traceback.print_exc() finally: await agent.cleanup() console.print("\n[green]āœ… Agent shutdown complete[/green]") if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: console.print("\n[yellow]Interrupted by user[/yellow]") except Exception as e: console.print(f"\n[red]Unexpected error: {e}[/red]") import traceback traceback.print_exc()