"""Main LangGraph agent for competitive intelligence gathering.""" import asyncio import sys from pathlib import Path from typing import Dict, List, TypedDict, Annotated, Optional, Any from datetime import datetime import langgraph import langgraph.graph as lg_graph StateGraph = lg_graph.StateGraph END = lg_graph.END from langchain_core.messages import HumanMessage, SystemMessage from langchain_aws import ChatBedrockConverse from rich.console import Console from rich.panel import Panel from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn sys.path.insert(0, str(Path(__file__).parent)) from utils.imports import setup_interactive_tools_import paths = setup_interactive_tools_import() from interactive_tools.browser_viewer import BrowserViewerServer # Import tools from config import AgentConfig from browser_tools import BrowserTools from analysis_tools import AnalysisTools console = Console() class CompetitiveIntelState(TypedDict): """State for the competitive intelligence agent.""" messages: Annotated[List, "append"] competitors: List[Dict] current_competitor_index: int competitor_data: Dict analysis_results: Dict report: Optional[str] recording_path: Optional[str] error: Optional[str] total_screenshots: int discovered_apis: List[Dict] performance_metrics: Dict session_data: Optional[Dict] # For session persistence parallel_mode: bool # For parallel processing class CompetitiveIntelligenceAgent: """LangGraph agent for competitive intelligence gathering.""" def __init__(self, config: AgentConfig): self.config = config self.browser_tools = BrowserTools(config) self.analysis_tools = AnalysisTools(config) self.llm = None self.graph = None self.browser_viewer = None self.parallel_browser_sessions = [] # For parallel processing async def initialize(self, resume_session_id: Optional[str] = None): """Initialize the agent and its tools with optional session resume.""" console.print(Panel( "[bold cyan]๐ŸŽฏ Competitive Intelligence Agent[/bold cyan]\n\n" "[bold]Powered by Amazon Bedrock AgentCore[/bold]\n\n" "Features:\n" "โ€ข ๐ŸŒ Automated browser navigation\n" "โ€ข ๐Ÿ“Š Real-time API and network analysis\n" "โ€ข ๐ŸŽฏ Intelligent content extraction\n" "โ€ข ๐Ÿ“ธ Screenshot capture\n" "โ€ข ๐Ÿ“น Full session recording to S3\n" "โ€ข ๐Ÿ”„ Multi-tool orchestration\n" "โ€ข โšก Parallel processing support\n", title="Initializing", border_style="blue" )) # Check if we're resuming a session if resume_session_id: console.print(f"[cyan]๐Ÿ”„ Resuming session: {resume_session_id}[/cyan]") session_data = await self.resume_session(resume_session_id) if session_data: console.print("[green]โœ… Previous session data loaded[/green]") # Initialize browser with recording self.browser_tools.create_browser_with_recording() # Initialize LLM self.llm = ChatBedrockConverse( model_id=self.config.llm_model_id, region_name=self.config.region ) console.print(f"โœ… LLM initialized: {self.config.llm_model_id}") # Initialize browser session with CDP await self.browser_tools.initialize_browser_session(self.llm) # Initialize code interpreter self.analysis_tools.initialize() # Start browser live viewer if self.browser_tools.browser_client: console.print("\n[cyan]๐Ÿ–ฅ๏ธ Starting live browser viewer...[/cyan]") self.browser_viewer = BrowserViewerServer( self.browser_tools.browser_client, port=self.config.live_view_port ) viewer_url = self.browser_viewer.start(open_browser=True) console.print(f"[green]โœ… Live viewer: {viewer_url}[/green]") console.print("[dim]You can take/release control in the viewer[/dim]") # Build the graph self._build_graph() console.print("\n[green]โœ… Agent initialized successfully![/green]") console.print(f"[cyan]๐Ÿ“น Recording to: {self.browser_tools.recording_path}[/cyan]") async def resume_session(self, session_id: str) -> Optional[Dict]: """Resume a previous analysis session using Code Interpreter persistence.""" try: console.print("[cyan]๐Ÿ“‚ Loading previous session data...[/cyan]") # Use Code Interpreter to load session data session_data = self.analysis_tools.load_session_state(session_id) if session_data and session_data.get('status') == 'success': return session_data.get('data') else: console.print("[yellow]โš ๏ธ No previous session data found[/yellow]") return None except Exception as e: console.print(f"[yellow]โš ๏ธ Could not resume session: {e}[/yellow]") return None def _build_graph(self): """Build the LangGraph workflow.""" workflow = StateGraph(CompetitiveIntelState) # Add nodes workflow.add_node("analyze_competitor", self.analyze_competitor) workflow.add_node("intelligent_analysis", self.intelligent_multi_tool_analysis) workflow.add_node("process_data", self.process_data) workflow.add_node("generate_report", self.generate_report) # Set entry point workflow.set_entry_point("analyze_competitor") # Conditional edge to loop through competitors workflow.add_conditional_edges( "analyze_competitor", self.should_continue_analyzing, { "continue": "analyze_competitor", "analyze": "intelligent_analysis", "process": "process_data" } ) workflow.add_edge("intelligent_analysis", "process_data") workflow.add_edge("process_data", "generate_report") workflow.add_edge("generate_report", END) self.graph = workflow.compile() async def analyze_competitor(self, state: CompetitiveIntelState) -> CompetitiveIntelState: """Analyze a single competitor with enhanced features.""" competitors = state["competitors"] current_index = state.get("current_competitor_index", 0) if current_index >= len(competitors): return state competitor = competitors[current_index] console.print(f"\n[bold blue]๐Ÿ” Analyzing Competitor {current_index + 1}/{len(competitors)}: {competitor['name']}[/bold blue]") # Progress tracking with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), console=console ) as progress: task = progress.add_task(f"Analyzing {competitor['name']}...", total=10) competitor_data = {} try: # Step 1: Navigate progress.update(task, description="Navigating to website...", advance=1) nav_result = await self.browser_tools.navigate_to_url(competitor['url']) competitor_data['navigation'] = nav_result # Step 2: Take initial screenshot progress.update(task, description="Taking homepage screenshot...", advance=1) try: await self.browser_tools.take_annotated_screenshot(f"{competitor['name']} - Homepage") except Exception as e: console.print(f"[yellow]โš ๏ธ Screenshot error: {e}[/yellow]") # Step 3: Intelligent discovery progress.update(task, description="Discovering page sections...", advance=1) try: discovered_sections = await self.browser_tools.intelligent_scroll_and_discover() competitor_data['discovered_sections'] = discovered_sections console.print(f"[green]Found {len(discovered_sections)} key sections[/green]") except Exception as e: console.print(f"[yellow]โš ๏ธ Discovery error: {e}[/yellow]") competitor_data['discovered_sections'] = [] # Step 4: Try to navigate to pricing page progress.update(task, description="Looking for pricing page...", advance=1) try: found_pricing = await self.browser_tools.smart_navigation("pricing") if found_pricing: await asyncio.sleep(3) # Let page load await self.browser_tools.take_annotated_screenshot(f"{competitor['name']} - Pricing Page") except Exception as e: console.print(f"[yellow]โš ๏ธ Navigation error: {e}[/yellow]") # Step 5: Advanced form interaction (NEW) progress.update(task, description="Checking for interactive elements...", advance=1) try: form_data = await self.browser_tools.analyze_forms_and_inputs() competitor_data['interactive_elements'] = form_data except Exception as e: console.print(f"[yellow]โš ๏ธ Form analysis error: {e}[/yellow]") # Step 6: Extract pricing progress.update(task, description="Extracting pricing information...", advance=1) try: pricing_result = await self.browser_tools.extract_pricing_info() competitor_data['pricing'] = pricing_result except Exception as e: console.print(f"[yellow]โš ๏ธ Pricing extraction error: {e}[/yellow]") competitor_data['pricing'] = {"status": "error", "error": str(e)} # Step 7: Extract features progress.update(task, description="Extracting product features...", advance=1) try: features_result = await self.browser_tools.extract_product_features() competitor_data['features'] = features_result except Exception as e: console.print(f"[yellow]โš ๏ธ Feature extraction error: {e}[/yellow]") competitor_data['features'] = {"status": "error", "error": str(e)} # Step 8: Multi-page workflow (NEW) progress.update(task, description="Exploring additional pages...", advance=1) try: additional_pages = await self.browser_tools.explore_multi_page_workflow( ["features", "docs", "api", "about"] ) competitor_data['additional_pages'] = additional_pages except Exception as e: console.print(f"[yellow]โš ๏ธ Multi-page exploration error: {e}[/yellow]") # Step 9: Capture performance metrics progress.update(task, description="Capturing performance metrics...", advance=1) try: metrics = await self.browser_tools.capture_performance_metrics() competitor_data['performance_metrics'] = metrics except Exception as e: console.print(f"[yellow]โš ๏ธ Metrics error: {e}[/yellow]") # Step 10: Save session state (NEW) progress.update(task, description="Saving session state...", advance=1) try: self.analysis_tools.save_session_state( f"competitor_{current_index}", competitor_data ) except Exception as e: console.print(f"[yellow]โš ๏ธ Session save error: {e}[/yellow]") except Exception as e: console.print(f"[red]โŒ Critical error analyzing {competitor['name']}: {e}[/red]") competitor_data = { "status": "error", "error": str(e), "url": competitor['url'] } # Store results all_competitor_data = state.get("competitor_data", {}) all_competitor_data[competitor['name']] = { "url": competitor['url'], "timestamp": datetime.now().isoformat(), **competitor_data, "apis_discovered": len(self.browser_tools._discovered_apis), "screenshots_taken": len(self.browser_tools._screenshots_taken), "status": "success" if competitor_data.get("navigation", {}).get("status") == "success" else "error" } # Analyze this competitor's data console.print(f"[cyan]๐Ÿ“Š Running analysis for {competitor['name']}...[/cyan]") try: analysis_result = self.analysis_tools.analyze_competitor_data( competitor['name'], all_competitor_data[competitor['name']] ) except Exception as e: console.print(f"[yellow]โš ๏ธ Analysis error: {e}[/yellow]") analysis_result = {"status": "error", "error": str(e)} console.print(f"[green]โœ… Completed: {competitor['name']}[/green]") console.print(f" โ€ข Discovered {len(competitor_data.get('discovered_sections', []))} sections") console.print(f" โ€ข Found {len(self.browser_tools._discovered_apis)} API endpoints") console.print(f" โ€ข Took {len(self.browser_tools._screenshots_taken)} screenshots") console.print(f" โ€ข Explored {len(competitor_data.get('additional_pages', []))} additional pages") # Update state return { **state, "current_competitor_index": current_index + 1, "competitor_data": all_competitor_data, "total_screenshots": state.get("total_screenshots", 0) + len(self.browser_tools._screenshots_taken), "discovered_apis": state.get("discovered_apis", []) + self.browser_tools._discovered_apis, "messages": state["messages"] + [ HumanMessage(content=f"Analyzed {competitor['name']}: {analysis_result}") ] } async def intelligent_multi_tool_analysis(self, state: CompetitiveIntelState) -> CompetitiveIntelState: """NEW: Intelligent analysis that orchestrates browser and code interpreter together.""" console.print("\n[bold cyan]๐Ÿค– Running Intelligent Multi-Tool Analysis...[/bold cyan]") competitor_data = state.get("competitor_data", {}) # Step 1: Use Code Interpreter to analyze patterns console.print("[cyan]Step 1: Analyzing data patterns with Code Interpreter...[/cyan]") pattern_analysis = self.analysis_tools.analyze_pricing_patterns(competitor_data) # Step 2: Based on analysis, browser performs targeted actions if pattern_analysis.get('missing_data'): console.print("[cyan]Step 2: Browser collecting missing data points...[/cyan]") for competitor_name, missing_items in pattern_analysis['missing_data'].items(): if 'pricing_tiers' in missing_items: # Browser goes back to find more detailed pricing console.print(f"[yellow]Revisiting {competitor_name} for detailed pricing...[/yellow]") # This would navigate back if needed # Step 3: Code Interpreter processes combined data console.print("[cyan]Step 3: Processing combined insights...[/cyan]") combined_insights = self.analysis_tools.generate_competitive_insights( competitor_data, pattern_analysis ) # Step 4: Use AWS CLI in Code Interpreter to save results console.print("[cyan]Step 4: Using AWS CLI to archive results...[/cyan]") aws_result = self.analysis_tools.save_to_s3_with_aws_cli( combined_insights, self.config.s3_bucket, f"{self.config.s3_prefix}analysis/" ) return { **state, "analysis_results": { **state.get("analysis_results", {}), "pattern_analysis": pattern_analysis, "combined_insights": combined_insights, "aws_storage": aws_result } } async def analyze_competitors_parallel(self, competitors: List[Dict]) -> Dict: """Analyze multiple competitors in parallel with statistics.""" console.print("\n[bold cyan]โšก Starting Parallel Analysis Mode[/bold cyan]") console.print(f"Analyzing {len(competitors)} competitors simultaneously...") # Add timing for performance comparison start_time = datetime.now() # Create tasks for parallel execution tasks = [] for i, competitor in enumerate(competitors): task = self._analyze_single_competitor_async(competitor, i) tasks.append(task) # Execute all tasks in parallel results = await asyncio.gather(*tasks, return_exceptions=True) # Aggregate results all_competitor_data = {} total_apis = [] total_screenshots = 0 parallel_sessions = [] for i, result in enumerate(results): if isinstance(result, Exception): console.print(f"[red]Error analyzing {competitors[i]['name']}: {result}[/red]") all_competitor_data[competitors[i]['name']] = { "status": "error", "error": str(result) } else: competitor_name = competitors[i]['name'] all_competitor_data[competitor_name] = result['data'] total_apis.extend(result.get('apis', [])) total_screenshots += result.get('screenshots', 0) # Track session information for replaying if 'browser_session_id' in result['data'] and result['data']['browser_session_id']: parallel_sessions.append({ "name": competitor_name, "session_id": result['data']['browser_session_id'], "recording_path": result['data'].get('recording_path') }) # Calculate execution time end_time = datetime.now() duration = (end_time - start_time).total_seconds() console.print(f"\n[green]โœ… Parallel analysis complete![/green]") console.print(f" โ€ข Successfully analyzed: {sum(1 for d in all_competitor_data.values() if d.get('status') != 'error')}/{len(competitors)}") console.print(f" โ€ข Total APIs discovered: {len(total_apis)}") console.print(f" โ€ข Total screenshots: {total_screenshots}") console.print(f" โ€ข Execution time: {duration:.2f} seconds") console.print(f" โ€ข Average time per competitor: {duration/len(competitors):.2f} seconds") return { "competitor_data": all_competitor_data, "discovered_apis": total_apis, "total_screenshots": total_screenshots, "parallel_sessions": parallel_sessions, "execution_stats": { "total_duration": duration, "avg_duration_per_competitor": duration/len(competitors), "concurrent_sessions": len(competitors) } } async def _analyze_single_competitor_async(self, competitor: Dict, index: int) -> Dict: """Helper method for parallel competitor analysis.""" console.print(f"[cyan]๐Ÿ”„ Starting parallel analysis for {competitor['name']}...[/cyan]") # Create a new browser session for this competitor browser_session = BrowserTools(self.config) browser_id = browser_session.create_browser_with_recording() session = await browser_session.initialize_browser_session(self.llm) # Track the browser session for potential cleanup self.parallel_browser_sessions.append(browser_session) try: # Navigate and analyze await browser_session.navigate_to_url(competitor['url']) # Collect data pricing = await browser_session.extract_pricing_info() features = await browser_session.extract_product_features() sections = await browser_session.intelligent_scroll_and_discover() # Take screenshots screenshot_result = await browser_session.take_annotated_screenshot(f"{competitor['name']} - Parallel Analysis") result = { "data": { "url": competitor['url'], "timestamp": datetime.now().isoformat(), "pricing": pricing, "features": features, "sections": sections, "status": "success", "browser_session_id": browser_session.browser_client.session_id if browser_session.browser_client else None, "recording_path": browser_session.recording_path }, "apis": browser_session._discovered_apis, "screenshots": len(browser_session._screenshots_taken) } console.print(f"[green]โœ… Completed parallel analysis for {competitor['name']}[/green]") return result except Exception as e: console.print(f"[red]Error in parallel analysis for {competitor['name']}: {e}[/red]") raise e finally: # Cleanup the browser session await browser_session.cleanup() def should_continue_analyzing(self, state: CompetitiveIntelState) -> str: """Determine if we should continue to the next competitor.""" current_index = state.get("current_competitor_index", 0) total_competitors = len(state["competitors"]) if current_index < total_competitors: return "continue" elif state.get("competitor_data"): return "analyze" # Go to intelligent analysis else: return "process" async def process_data(self, state: CompetitiveIntelState) -> CompetitiveIntelState: """Process all collected data and create visualizations.""" console.print("\n[bold yellow]๐Ÿ“Š Processing all competitor data...[/bold yellow]") competitor_data = state.get("competitor_data", {}) # Create comparison visualization console.print("[cyan]Creating visualizations...[/cyan]") viz_result = self.analysis_tools.create_comparison_visualization(competitor_data) # Save final session state console.print("[cyan]Saving final session state...[/cyan]") session_id = datetime.now().strftime("%Y%m%d_%H%M%S") # Create a safe copy of state for serialization serializable_state = { "competitor_data": competitor_data, "total_screenshots": state.get("total_screenshots", 0), "discovered_apis": state.get("discovered_apis", []), "timestamp": datetime.now().isoformat(), "parallel_mode": state.get("parallel_mode", False), # Don't include full messages to avoid serialization issues "message_count": len(state.get("messages", [])) if "messages" in state else 0 } # Save session state with serializable content self.analysis_tools.save_session_state(f"final_{session_id}", serializable_state) return { **state, "analysis_results": { "visualization": viz_result, "total_competitors": len(competitor_data), "successful_analyses": sum(1 for d in competitor_data.values() if d.get('status') == 'success'), "total_apis_discovered": len(state.get("discovered_apis", [])), "session_id": session_id } } async def generate_report(self, state: CompetitiveIntelState) -> CompetitiveIntelState: """Generate the final report.""" console.print("\n[bold green]๐Ÿ“„ Generating final report...[/bold green]") # Generate comprehensive report report_result = self.analysis_tools.generate_final_report( state.get("competitor_data", {}), state.get("analysis_results", {}) ) # Get recording path recording_path = self.browser_tools.recording_path # Summary panel console.print("\n") console.print(Panel( f"[bold green]โœ… Analysis Complete![/bold green]\n\n" f"๐Ÿ“Š Competitors analyzed: {len(state['competitors'])}\n" f"๐Ÿ“ธ Screenshots taken: {state.get('total_screenshots', 0)}\n" f"๐Ÿ” APIs discovered: {len(state.get('discovered_apis', []))}\n" f"๐Ÿ“„ Report: {report_result.get('report_path', 'N/A')}\n" f"๐Ÿ“น Recording: {recording_path}\n" f"๐Ÿ’พ Session ID: {state['analysis_results'].get('session_id', 'N/A')}\n" f"โšก Mode: {'Parallel' if state.get('parallel_mode', False) else 'Sequential'}\n" + (f"โฑ๏ธ Total execution: {state.get('execution_stats', {}).get('total_duration', 0):.2f}s" if state.get('execution_stats') else ""), title="Summary", border_style="green" )) return { **state, "report": report_result.get("report_content", ""), "recording_path": recording_path, "messages": state["messages"] + [ {"type": "human", "content": f"Report generated: {report_result.get('output', '')}"} ] } async def run(self, competitors: List[Dict], parallel: bool = False, force_parallel: bool = False) -> Dict: """Run the competitive intelligence analysis.""" try: # For live view, we need to warn but allow forcing parallel mode if parallel and self.browser_viewer and len(competitors) > 1 and not force_parallel: console.print("[yellow]โš ๏ธ Live viewing is active - parallel mode will disable live view[/yellow]") if not force_parallel: console.print("[yellow]Switching to sequential mode to maintain visibility...[/yellow]") parallel = False if parallel and len(competitors) > 1: # Use parallel mode for multiple competitors console.print("[bold cyan]Using parallel processing mode[/bold cyan]") if self.browser_viewer: console.print("[yellow]โš ๏ธ Live view will be limited during parallel execution[/yellow]") parallel_results = await self.analyze_competitors_parallel(competitors) # Create state with parallel results initial_state: CompetitiveIntelState = { "messages": [{"type": "system", "content": "Parallel competitive intelligence analysis"}], "competitors": competitors, "current_competitor_index": len(competitors), "competitor_data": parallel_results["competitor_data"], "analysis_results": {}, "report": None, "recording_path": self.browser_tools.recording_path, "error": None, "total_screenshots": parallel_results["total_screenshots"], "discovered_apis": parallel_results["discovered_apis"], "performance_metrics": {}, "session_data": None, "parallel_mode": True, "execution_stats": parallel_results.get("execution_stats", {}) } # Run only the processing and report generation final_state = await self.graph.ainvoke(initial_state) else: # Use sequential mode initial_state: CompetitiveIntelState = { "messages": [{"type": "system", "content": "Starting competitive intelligence analysis"}], "competitors": competitors, "current_competitor_index": 0, "competitor_data": {}, "analysis_results": {}, "report": None, "recording_path": None, "error": None, "total_screenshots": 0, "discovered_apis": [], "performance_metrics": {}, "session_data": None, "parallel_mode": False } # Run the full graph console.print("\n[cyan]๐Ÿš€ Starting analysis workflow...[/cyan]") final_state = await self.graph.ainvoke(initial_state) return { "success": True, "report": final_state["report"], "recording_path": final_state["recording_path"], "analysis_results": final_state["analysis_results"], "apis_discovered": final_state.get("discovered_apis", []), "session_id": final_state["analysis_results"].get("session_id"), "parallel_mode": final_state.get("parallel_mode", False), "parallel_sessions": parallel_results.get("parallel_sessions", []) if parallel else [], "execution_stats": final_state.get("execution_stats", {}) } except Exception as e: console.print(f"[red]โŒ Agent error: {e}[/red]") import traceback traceback.print_exc() return {"success": False, "error": str(e)} async def cleanup(self): """Clean up agent resources.""" console.print("\n[yellow]๐Ÿงน Cleaning up...[/yellow]") # Cleanup browser await self.browser_tools.cleanup() # Cleanup any parallel browser sessions for session in self.parallel_browser_sessions: try: await session.cleanup() except: pass # Cleanup code interpreter self.analysis_tools.cleanup() console.print("[green]โœ… Cleanup complete[/green]") async def cleanup_resources(state: CompetitiveIntelState): """Clean up all resources to prevent ongoing costs.""" cleanup_report = { "browsers_closed": 0, "s3_objects_deleted": 0, "errors": [] } try: # 1. Stop BedrockAgentCore browsers if 'browser_tools' in state: browser_tools = state['browser_tools'] if browser_tools.browser_id: control_client = boto3.client( "bedrock-agentcore-control", region_name=state['config'].region, endpoint_url=get_control_plane_endpoint(state['config'].region) ) try: control_client.delete_browser(browserId=browser_tools.browser_id) cleanup_report["browsers_closed"] += 1 except Exception as e: cleanup_report["errors"].append(str(e)) # 2. Clean up parallel browser sessions for session in state.get('parallel_browser_sessions', []): if session.browser_id: try: control_client.delete_browser(browserId=session.browser_id) cleanup_report["browsers_closed"] += 1 except: pass # 3. Stop Code Interpreter if 'analysis_tools' in state: try: state['analysis_tools'].cleanup() except: pass # 4. Delete recordings if requested if state.get('delete_recordings'): s3_client = boto3.client('s3') recording_path = state.get('recording_path', '') if recording_path.startswith('s3://'): parts = recording_path.replace('s3://', '').split('/', 1) bucket = parts[0] prefix = parts[1] if len(parts) > 1 else '' try: paginator = s3_client.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=bucket, Prefix=prefix) for page in pages: if 'Contents' in page: for obj in page['Contents']: s3_client.delete_object(Bucket=bucket, Key=obj['Key']) cleanup_report["s3_objects_deleted"] += 1 except Exception as e: cleanup_report["errors"].append(f"S3: {str(e)}") except Exception as e: cleanup_report["errors"].append(str(e)) return cleanup_report