"""Main Strands agent for competitive intelligence gathering.""" import asyncio import json from pathlib import Path from typing import Dict, List, Optional, Any from datetime import datetime import nest_asyncio import sys sys.path.insert(0, str(Path(__file__).parent)) from utils.imports import setup_interactive_tools_import paths = setup_interactive_tools_import() from strands import Agent, tool from strands.models import BedrockModel from strands.session.s3_session_manager import S3SessionManager from rich.console import Console from rich.panel import Panel from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn from interactive_tools.browser_viewer import BrowserViewerServer # Import tools from config import AgentConfig from browser_tools import BrowserTools from analysis_tools import AnalysisTools # Apply nest_asyncio to allow nested event loops nest_asyncio.apply() console = Console() class CompetitiveIntelligenceAgent: """Strands agent for competitive intelligence gathering.""" def __init__(self, config: AgentConfig): self.config = config self.browser_tools = BrowserTools(config) self.analysis_tools = AnalysisTools(config) self.agent = None self.browser_viewer = None self.parallel_browser_sessions = [] # Store the event loop self.loop = None def _safe_state_get(self, key: str, default: Any = None) -> Any: """Safely get state value with default.""" try: value = self.agent.state.get(key) return value if value is not None else default except: return default async def initialize(self, resume_session_id: Optional[str] = None): """Initialize the agent and its tools with optional session resume.""" console.print(Panel( "[bold cyan]๐ŸŽฏ Competitive Intelligence Agent[/bold cyan]\n\n" "[bold]Powered by Amazon Bedrock and Strands Framework[/bold]\n\n" "Features:\n" "โ€ข ๐ŸŒ Automated browser navigation\n" "โ€ข ๐Ÿ“Š Real-time API and network analysis\n" "โ€ข ๐ŸŽฏ Intelligent content extraction\n" "โ€ข ๐Ÿ“ธ Screenshot capture\n" "โ€ข ๐Ÿ“น Full session recording to S3\n" "โ€ข ๐Ÿ”„ Multi-tool orchestration\n" "โ€ข โšก Parallel processing support\n", title="Initializing", border_style="blue" )) # Store the current event loop self.loop = asyncio.get_event_loop() # Initialize browser with recording self.browser_tools.create_browser_with_recording() # Set up session manager for persistence session_manager = None if resume_session_id: console.print(f"[cyan]๐Ÿ”„ Resuming session: {resume_session_id}[/cyan]") session_manager = S3SessionManager( session_id=resume_session_id, bucket=self.config.s3_bucket, prefix=f"{self.config.s3_prefix}sessions/", region_name=self.config.region ) # Initialize Bedrock model bedrock_model = BedrockModel( model_id=self.config.llm_model_id, region_name=self.config.region ) # Initialize browser session with CDP - IMPORTANT: Do this before creating agent await self.browser_tools.initialize_browser_session(bedrock_model) # Initialize code interpreter self.analysis_tools.initialize() # Create the main Strands agent with all tools self.agent = Agent( model=bedrock_model, system_prompt=self._get_system_prompt(), tools=self._create_agent_tools(), session_manager=session_manager, callback_handler=self._create_callback_handler() ) # Initialize state if starting fresh if not resume_session_id: self.agent.state.set("competitors", []) self.agent.state.set("current_competitor_index", 0) self.agent.state.set("competitor_data", {}) self.agent.state.set("analysis_results", {}) self.agent.state.set("total_screenshots", 0) self.agent.state.set("discovered_apis", []) self.agent.state.set("parallel_mode", False) else: console.print("[green]โœ… Previous session data loaded[/green]") # Start browser live viewer if self.browser_tools.browser_client: console.print("\n[cyan]๐Ÿ–ฅ๏ธ Starting live browser viewer...[/cyan]") self.browser_viewer = BrowserViewerServer( self.browser_tools.browser_client, port=self.config.live_view_port ) viewer_url = self.browser_viewer.start(open_browser=True) console.print(f"[green]โœ… Live viewer: {viewer_url}[/green]") console.print("[dim]You can take/release control in the viewer[/dim]") console.print("\n[green]โœ… Agent initialized successfully![/green]") console.print(f"[cyan]๐Ÿ“น Recording to: {self.browser_tools.recording_path}[/cyan]") def _get_system_prompt(self) -> str: """Get the system prompt for the agent.""" return """You are a competitive intelligence analysis agent. When asked to analyze competitors: 1. Use the analyze_website tool for each competitor 2. Use the perform_analysis tool to analyze the collected data 3. Use the generate_report tool to create the final report Always use these tools in sequence to complete the analysis.""" def _create_agent_tools(self) -> List: """Create all agent tools.""" tools = [] # Store reference to self for use in tools agent_instance = self @tool def analyze_website(competitor_name: str, competitor_url: str) -> str: """ Analyze a competitor website to extract pricing, features, and other intelligence. Args: competitor_name: Name of the competitor company competitor_url: URL of the competitor website to analyze """ # Use the existing event loop with run_until_complete if agent_instance.loop and agent_instance.loop.is_running(): # We're already in an async context, create a task future = asyncio.ensure_future( agent_instance._analyze_website_impl(competitor_name, competitor_url), loop=agent_instance.loop ) # Use nest_asyncio to handle the nested loop return agent_instance.loop.run_until_complete(future) else: # No running loop, use asyncio.run return asyncio.run(agent_instance._analyze_website_impl(competitor_name, competitor_url)) @tool def perform_analysis() -> str: """ Analyze all collected competitor data to identify patterns and insights. """ console.print("\n[bold yellow]๐Ÿ“Š Analyzing all competitor data...[/bold yellow]") competitor_data = agent_instance._safe_state_get("competitor_data", {}) if not competitor_data: return "No competitor data to analyze yet" # Analyze each competitor for competitor_name, data in competitor_data.items(): console.print(f"[cyan]Analyzing {competitor_name}...[/cyan]") analysis_result = agent_instance.analysis_tools.analyze_competitor_data( competitor_name, data ) # Store analysis results analysis_results = agent_instance._safe_state_get("analysis_results", {}) analysis_results[competitor_name] = analysis_result agent_instance.agent.state.set("analysis_results", analysis_results) # Create visualizations console.print("[cyan]Creating comparison visualizations...[/cyan]") viz_result = agent_instance.analysis_tools.create_comparison_visualization(competitor_data) analysis_results = agent_instance._safe_state_get("analysis_results", {}) analysis_results["visualizations"] = viz_result agent_instance.agent.state.set("analysis_results", analysis_results) return "Analysis completed successfully" @tool def generate_report() -> str: """ Generate the final competitive intelligence report from analyzed data. """ console.print("\n[bold green]๐Ÿ“„ Generating final report...[/bold green]") competitor_data = agent_instance._safe_state_get("competitor_data", {}) analysis_results = agent_instance._safe_state_get("analysis_results", {}) if not competitor_data: return "No data to generate report from" # Generate report report_result = agent_instance.analysis_tools.generate_final_report( competitor_data, analysis_results ) agent_instance.agent.state.set("report", report_result.get("report_content", "")) agent_instance.agent.state.set("recording_path", agent_instance.browser_tools.recording_path) return "Report generated successfully" # Add tools to list tools.extend([ analyze_website, perform_analysis, generate_report ]) return tools async def _analyze_website_impl(self, competitor_name: str, competitor_url: str) -> str: """Implementation of website analysis.""" console.print(f"\n[bold blue]๐Ÿ” Analyzing: {competitor_name}[/bold blue]") console.print(f"[cyan]URL: {competitor_url}[/cyan]") competitor_data = {} with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), console=console ) as progress: task = progress.add_task(f"Analyzing {competitor_name}...", total=10) try: # Navigate to website progress.update(task, description="Navigating to website...", advance=1) nav_result = await self.browser_tools.navigate_to_url(competitor_url) competitor_data['navigation'] = nav_result if nav_result.get('status') != 'success': console.print(f"[yellow]โš ๏ธ Navigation failed: {nav_result.get('error')}[/yellow]") # Continue anyway to try to get some data # Take screenshot progress.update(task, description="Taking homepage screenshot...", advance=1) await self.browser_tools.take_annotated_screenshot(f"{competitor_name} - Homepage") # Discover sections progress.update(task, description="Discovering page sections...", advance=1) discovered_sections = await self.browser_tools.intelligent_scroll_and_discover() competitor_data['discovered_sections'] = discovered_sections console.print(f"[green]Found {len(discovered_sections)} key sections[/green]") # Try to find pricing page progress.update(task, description="Looking for pricing page...", advance=1) found_pricing = await self.browser_tools.smart_navigation("pricing") if found_pricing: await asyncio.sleep(3) await self.browser_tools.take_annotated_screenshot(f"{competitor_name} - Pricing") # Analyze forms progress.update(task, description="Checking interactive elements...", advance=1) form_data = await self.browser_tools.analyze_forms_and_inputs() competitor_data['interactive_elements'] = form_data # Extract pricing progress.update(task, description="Extracting pricing...", advance=1) pricing_result = await self.browser_tools.extract_pricing_info() competitor_data['pricing'] = pricing_result # Extract features progress.update(task, description="Extracting features...", advance=1) features_result = await self.browser_tools.extract_product_features() competitor_data['features'] = features_result # Explore additional pages progress.update(task, description="Exploring additional pages...", advance=1) additional_pages = await self.browser_tools.explore_multi_page_workflow( ["features", "docs", "api", "about"] ) competitor_data['additional_pages'] = additional_pages # Capture metrics progress.update(task, description="Capturing metrics...", advance=1) metrics = await self.browser_tools.capture_performance_metrics() competitor_data['performance_metrics'] = metrics # Save to state progress.update(task, description="Saving data...", advance=1) all_competitor_data = self._safe_state_get("competitor_data", {}) all_competitor_data[competitor_name] = { "url": competitor_url, "timestamp": datetime.now().isoformat(), **competitor_data, "status": "success" } self.agent.state.set("competitor_data", all_competitor_data) # Update metrics in state total_screenshots = self._safe_state_get("total_screenshots", 0) self.agent.state.set("total_screenshots", total_screenshots + len(self.browser_tools._screenshots_taken)) discovered_apis = self._safe_state_get("discovered_apis", []) discovered_apis.extend(self.browser_tools._discovered_apis) self.agent.state.set("discovered_apis", discovered_apis) except Exception as e: console.print(f"[red]โŒ Error analyzing {competitor_name}: {e}[/red]") import traceback traceback.print_exc() competitor_data = {"status": "error", "error": str(e)} all_competitor_data = self._safe_state_get("competitor_data", {}) all_competitor_data[competitor_name] = competitor_data self.agent.state.set("competitor_data", all_competitor_data) return f"Error analyzing {competitor_name}: {str(e)}" console.print(f"[green]โœ… Completed: {competitor_name}[/green]") return f"Successfully analyzed {competitor_name} - found {len(discovered_sections)} sections, extracted pricing and features" def _create_callback_handler(self): """Create a callback handler for progress tracking.""" def callback_handler(**kwargs): # Track tool usage if "current_tool_use" in kwargs and kwargs["current_tool_use"].get("name"): tool_name = kwargs["current_tool_use"]["name"] console.print(f"[cyan]๐Ÿ”ง Using tool: {tool_name}[/cyan]") # Show text output if "data" in kwargs: # Don't print full LLM reasoning, just tool calls pass return callback_handler async def run(self, competitors: List[Dict], parallel: bool = False) -> Dict: """Run the competitive intelligence analysis.""" try: # Store competitors in state self.agent.state.set("competitors", competitors) console.print("\n[cyan]๐Ÿค– Starting competitive analysis workflow...[/cyan]") console.print(f"[bold]Analyzing {len(competitors)} competitors[/bold]") # Analyze each competitor sequentially for i, competitor in enumerate(competitors, 1): console.print(f"\n[bold yellow]๐Ÿ“Š Competitor {i}/{len(competitors)}: {competitor['name']}[/bold yellow]") try: # Directly invoke the tool result = self.agent.tool.analyze_website( competitor_name=competitor['name'], competitor_url=competitor['url'] ) console.print(f"[green]โœ“ {competitor['name']} analysis complete[/green]") console.print(f"[dim]Result: {result[:200]}...[/dim]" if len(result) > 200 else f"[dim]Result: {result}[/dim]") # Add a small delay between competitors to avoid overwhelming if i < len(competitors): console.print(f"[dim]Waiting 2 seconds before next competitor...[/dim]") await asyncio.sleep(2) except Exception as comp_error: console.print(f"[red]โŒ Error analyzing {competitor['name']}: {comp_error}[/red]") # Continue with next competitor even if one fails continue console.print("\n[bold cyan]All competitors analyzed, generating insights...[/bold cyan]") # Perform analysis console.print("\n[yellow]Running data analysis...[/yellow]") try: analysis_result = self.agent.tool.perform_analysis() console.print(f"[green]โœ“ Analysis complete[/green]") except Exception as e: console.print(f"[red]Analysis error: {e}[/red]") analysis_result = "Analysis failed" # Generate report console.print("\n[yellow]Generating report...[/yellow]") try: report_result = self.agent.tool.generate_report() console.print(f"[green]โœ“ Report generated[/green]") except Exception as e: console.print(f"[red]Report generation error: {e}[/red]") report_result = "Report generation failed" # Get final state report = self._safe_state_get("report") recording_path = self._safe_state_get("recording_path") or self.browser_tools.recording_path analysis_results = self._safe_state_get("analysis_results", {}) apis_discovered = self._safe_state_get("discovered_apis", []) total_screenshots = self._safe_state_get("total_screenshots", 0) competitor_data = self._safe_state_get("competitor_data", {}) # Display summary console.print("\n" + "="*60) console.print(Panel( f"[bold green]โœ… Analysis Complete![/bold green]\n\n" f"๐Ÿ“Š Competitors requested: {len(competitors)}\n" f"โœ“ Successfully analyzed: {len([c for c in competitor_data.values() if c.get('status') == 'success'])}\n" f"โœ— Failed: {len([c for c in competitor_data.values() if c.get('status') == 'error'])}\n" f"๐Ÿ“ธ Screenshots taken: {total_screenshots}\n" f"๐Ÿ” APIs discovered: {len(apis_discovered)}\n" f"๐Ÿ“„ Report generated: {'Yes' if report else 'No'}\n" f"๐Ÿ“น Recording: {recording_path}\n\n" f"[bold]Analyzed:[/bold]\n" + "\n".join([f" โ€ข {name}: {data.get('status', 'unknown')}" for name, data in competitor_data.items()]), title="Summary", border_style="green" )) console.print("="*60) return { "success": True, "report": self._safe_state_get("report"), "recording_path": self.browser_tools.recording_path if self.browser_tools else None, "recording_config": self.browser_tools.recording_config if self.browser_tools else None, # NEW "analysis_results": self._safe_state_get("analysis_results", {}), "apis_discovered": self._safe_state_get("discovered_apis", []), "session_id": datetime.now().strftime("%Y%m%d_%H%M%S"), "parallel_mode": self._safe_state_get("parallel_mode", False) } except Exception as e: console.print(f"[red]โŒ Agent error: {e}[/red]") import traceback traceback.print_exc() return {"success": False, "error": str(e)} async def cleanup(self): """Clean up agent resources.""" console.print("\n[yellow]๐Ÿงน Cleaning up...[/yellow]") # Cleanup browser await self.browser_tools.cleanup() # Cleanup parallel sessions for session in self.parallel_browser_sessions: try: await session.cleanup() except: pass # Cleanup code interpreter self.analysis_tools.cleanup() console.print("[green]โœ… Cleanup complete[/green]")