Sundar Raghavan 1c7e1861c1
Add Competitive Intelligence Agent with Bedrock AgentCore SDK (#221)
* Add Enterprise Web Intelligence Agent with Bedrock AgentCore Tools SDK
Automated browser navigation with CDP enhancements
Live viewing with DCV integration
Session recording and replay capabilities
LLM-powered content extraction
Code Interpreter for dynamic analysis
Comprehensive documentation and examples

* Moved project from old-folder to new-folder and updated files

* Refactor: Rename to enterprise-web-intelligence-agent and add Strands implementation alongside LangGraph
2025-08-27 15:17:28 -07:00

945 lines
34 KiB
Python

"""Analysis tools using BedrockAgentCore SDK's CodeInterpreter."""
import json
from typing import Dict, List, Any
from rich.console import Console
from datetime import datetime
# Import from BedrockAgentCore SDK
from bedrock_agentcore.tools.code_interpreter_client import CodeInterpreter
console = Console()
class AnalysisTools:
    """Data-analysis tools backed by the BedrockAgentCore CodeInterpreter sandbox."""

    def __init__(self, config):
        # config must expose `region` and `code_session_timeout`
        # (both are read elsewhere in this class).
        self.config = config
        # Sandboxed interpreter client; the session is started lazily
        # by initialize(), not here.
        self.code_interpreter = CodeInterpreter(config.region)
        # Tracks whether start() succeeded so cleanup() is a no-op otherwise.
        self.session_active = False
def _extract_output(self, result: Dict) -> str:
"""Extract output from CodeInterpreter result."""
# Handle the streaming response format from SDK
if "stream" in result:
for event in result.get("stream", []):
if "result" in event:
result_data = event["result"]
if "structuredContent" in result_data:
return result_data["structuredContent"].get("stdout", "")
elif "content" in result_data:
for item in result_data["content"]:
if item.get("type") == "text":
return item.get("text", "")
# Fallback to direct result format
elif "structuredContent" in result:
return result["structuredContent"].get("stdout", "")
elif "content" in result:
for item in result["content"]:
if item.get("type") == "text":
return item.get("text", "")
return ""
def initialize(self):
    """Start a CodeInterpreter session and bootstrap the analysis sandbox.

    Creates the output directories, configures plotting defaults, and
    probes for the AWS CLI inside the sandbox. Returns the session id
    reported by the SDK.
    """
    console.print("[cyan]🔧 Initializing CodeInterpreter...[/cyan]")

    # Start session using the SDK method.
    session_id = self.code_interpreter.start(
        name="competitive_intel_analysis",
        session_timeout_seconds=self.config.code_session_timeout,
    )
    self.session_active = True
    console.print(f"✅ CodeInterpreter session: {session_id}")

    # Script executed *inside* the sandbox to set up the environment.
    setup_code = """
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
from datetime import datetime
import os
import subprocess

# Create directories for outputs
os.makedirs('analysis', exist_ok=True)
os.makedirs('reports', exist_ok=True)
os.makedirs('visualizations', exist_ok=True)
os.makedirs('data', exist_ok=True)
os.makedirs('sessions', exist_ok=True)  # For session persistence

# Configure matplotlib
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

print("Analysis environment ready!")
print(f"Working directory: {os.getcwd()}")
print(f"Available directories: {', '.join([d for d in os.listdir('.') if os.path.isdir(d)])}")

# Test AWS CLI availability
try:
    result = subprocess.run(['aws', '--version'], capture_output=True, text=True)
    print(f"AWS CLI available: {result.stdout.strip()}")
except Exception as e:
    print(f"AWS CLI not available: {e}")
"""

    # Run the bootstrap; keep prior context (clearContext=False) so later
    # invocations can rely on the created directories and imports.
    bootstrap = self.code_interpreter.invoke("executeCode", {
        "code": setup_code,
        "language": "python",
        "clearContext": False,
    })
    console.print(f"[dim]{self._extract_output(bootstrap)}[/dim]")
    return session_id
def save_session_state(self, session_name: str, data: Dict) -> Dict:
    """Persist a JSON snapshot of `data` inside the sandbox for later resumption.

    The data is made JSON-serializable first (LangChain messages collapsed),
    then written to sessions/<name>_data.json with a sibling metadata file.
    Returns a status dict; failures are reported, never raised.
    """
    try:
        console.print(f"[cyan]💾 Saving session state: {session_name}[/cyan]")
        # Strip anything json.dumps can't handle before embedding it below.
        serializable_data = self._make_serializable(data)
        save_code = f"""
import json
import os
from datetime import datetime

session_data = {json.dumps(serializable_data)}
session_name = "{session_name}"

# Create session metadata
session_metadata = {{
    "session_name": session_name,
    "saved_at": datetime.now().isoformat(),
    "data_size": len(json.dumps(session_data))
}}

# Save session data
os.makedirs('sessions', exist_ok=True)
session_file = f'sessions/{{session_name}}_data.json'
with open(session_file, 'w') as f:
    json.dump(session_data, f, indent=2)

# Save metadata
metadata_file = f'sessions/{{session_name}}_metadata.json'
with open(metadata_file, 'w') as f:
    json.dump(session_metadata, f, indent=2)

print(f"Session saved: {{session_name}}")
print(f"Data file: {{session_file}} ({{os.path.getsize(session_file)}} bytes)")
print(f"Metadata file: {{metadata_file}}")

# List all saved sessions
all_sessions = [f.replace('_metadata.json', '') for f in os.listdir('sessions') if f.endswith('_metadata.json')]
print(f"Total saved sessions: {{len(all_sessions)}}")
"""
        response = self.code_interpreter.invoke("executeCode", {
            "code": save_code,
            "language": "python",
        })
        return {
            "status": "success",
            "session_name": session_name,
            "output": self._extract_output(response),
        }
    except Exception as e:
        console.print(f"[red]❌ Session save error: {e}[/red]")
        return {"status": "error", "error": str(e)}
def _make_serializable(self, data):
"""Convert data structure to be JSON serializable, handling LangChain messages."""
if data is None:
return None
if isinstance(data, dict):
return {k: self._make_serializable(v) for k, v in data.items()}
elif isinstance(data, list):
return [self._make_serializable(item) for item in data]
# Handle LangChain message types
elif hasattr(data, 'content') and hasattr(data, 'type'):
# This is likely a LangChain message object
return {
"content": data.content,
"type": getattr(data, "type", "message")
}
# Add handling for other custom objects as needed
else:
try:
# Try to serialize with json to test if it's serializable
json.dumps(data)
return data
except (TypeError, OverflowError):
# If we can't serialize it, convert to string representation
return str(data)
def load_session_state(self, session_name: str) -> Dict:
    """Load a previously saved session state from the sandbox.

    Returns the parsed payload dict ({"status": "success", "data", "metadata"}
    or {"status": "not_found"}), or {"status": "error", ...} when execution
    or parsing fails. Never raises.
    """
    try:
        console.print(f"[cyan]📂 Loading session state: {session_name}[/cyan]")
        load_code = f"""
import json
import os

session_name = "{session_name}"

# Check if session exists
data_file = f'sessions/{{session_name}}_data.json'
metadata_file = f'sessions/{{session_name}}_metadata.json'

if not os.path.exists(data_file):
    print(f"Session not found: {{session_name}}")
    print(json.dumps({{"status": "not_found"}}))
else:
    # Load session data
    with open(data_file, 'r') as f:
        session_data = json.load(f)
    # Load metadata
    if os.path.exists(metadata_file):
        with open(metadata_file, 'r') as f:
            metadata = json.load(f)
    else:
        metadata = {{}}
    result = {{
        "status": "success",
        "data": session_data,
        "metadata": metadata
    }}
    print(json.dumps(result))
"""
        result = self.code_interpreter.invoke("executeCode", {
            "code": load_code,
            "language": "python"
        })
        output = self._extract_output(result)
        # BUG FIX: the not-found branch prints a human-readable line before
        # the JSON payload, so json.loads() on the whole output always
        # failed there. Parse the last JSON-looking line instead (same
        # strategy as save_to_s3_with_aws_cli).
        for line in reversed(output.strip().splitlines()):
            line = line.strip()
            if line.startswith('{'):
                try:
                    return json.loads(line)
                except json.JSONDecodeError:
                    break
        return {"status": "error", "output": output}
    except Exception as e:
        console.print(f"[red]❌ Session load error: {e}[/red]")
        return {"status": "error", "error": str(e)}
def save_to_s3_with_aws_cli(self, data: Dict, bucket: str, prefix: str) -> Dict:
    """Upload `data` to S3 by running the AWS CLI inside the sandbox.

    The data is embedded (as JSON) into a generated script that writes it
    to a timestamped local file, uploads it with `aws s3 cp`, verifies the
    prefix listing, and prints a one-line JSON status object last; that
    object is parsed and returned here.

    Returns {"status": "success"|"error", ...}; never raises.
    """
    try:
        console.print(f"[cyan]☁️ Saving to S3 using AWS CLI...[/cyan]")
        aws_cli_code = f"""
import json
import subprocess
import os
from datetime import datetime

# Prepare data
data = {json.dumps(data)}
bucket = "{bucket}"
prefix = "{prefix}"

# Save data locally first
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
local_file = f'analysis/competitive_analysis_{{timestamp}}.json'
with open(local_file, 'w') as f:
    json.dump(data, f, indent=2)
print(f"Saved locally: {{local_file}}")

# Use AWS CLI to upload to S3
s3_key = f"{{prefix}}competitive_analysis_{{timestamp}}.json"
cmd = [
    'aws', 's3', 'cp',
    local_file,
    f's3://{{bucket}}/{{s3_key}}',
    '--region', '{self.config.region}'
]

try:
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if result.returncode == 0:
        print(f"Successfully uploaded to S3: s3://{{bucket}}/{{s3_key}}")
        # List files in the S3 prefix to verify
        list_cmd = ['aws', 's3', 'ls', f's3://{{bucket}}/{{prefix}}', '--region', '{self.config.region}']
        list_result = subprocess.run(list_cmd, capture_output=True, text=True, timeout=30)
        print("Files in S3 prefix:")
        print(list_result.stdout)
        output = {{
            "status": "success",
            "s3_path": f"s3://{{bucket}}/{{s3_key}}",
            "local_file": local_file,
            "file_size": os.path.getsize(local_file)
        }}
    else:
        print(f"AWS CLI error: {{result.stderr}}")
        output = {{
            "status": "error",
            "error": result.stderr
        }}
except subprocess.TimeoutExpired:
    print("AWS CLI command timed out")
    output = {{
        "status": "error",
        "error": "Command timed out"
    }}
except Exception as e:
    print(f"Error: {{e}}")
    output = {{
        "status": "error",
        "error": str(e)
    }}

print(json.dumps(output))
"""
        result = self.code_interpreter.invoke("executeCode", {
            "code": aws_cli_code,
            "language": "python"
        })
        output = self._extract_output(result)
        # The status object is the last JSON line; scan from the end so
        # earlier diagnostic prints don't confuse the parser.
        # (FIX: narrowed the former bare `except:` to JSONDecodeError so
        # unrelated errors are no longer silently swallowed.)
        for line in reversed(output.strip().split('\n')):
            if line.strip().startswith('{'):
                try:
                    return json.loads(line)
                except json.JSONDecodeError:
                    break
        return {"status": "success", "output": output}
    except Exception as e:
        console.print(f"[red]❌ AWS CLI error: {e}[/red]")
        return {"status": "error", "error": str(e)}
def analyze_pricing_patterns(self, competitor_data: Dict) -> Dict:
    """Classify competitors by pricing-data availability via the sandbox.

    Embeds `competitor_data` as JSON into a generated script that buckets
    each competitor by whether pricing extraction succeeded and prints the
    resulting analysis as JSON, which is parsed and returned.

    Returns the analysis dict, {"raw_output": ...} when the sandbox output
    is not JSON, or {"status": "error", ...} on failure.
    """
    try:
        console.print("[cyan]🔍 Analyzing pricing patterns...[/cyan]")
        analysis_code = f"""
import json
import pandas as pd

competitor_data = {json.dumps(competitor_data)}

# Analyze what data we have and what's missing
analysis = {{
    "competitors_with_pricing": [],
    "competitors_without_pricing": [],
    "missing_data": {{}},
    "patterns": []
}}

for name, data in competitor_data.items():
    if data.get('pricing', {{}}).get('status') == 'success':
        analysis["competitors_with_pricing"].append(name)
    else:
        analysis["competitors_without_pricing"].append(name)
        analysis["missing_data"][name] = ["pricing_tiers", "subscription_details"]

# Identify patterns
if len(analysis["competitors_with_pricing"]) > 0:
    analysis["patterns"].append("Pricing data available for analysis")

print(json.dumps(analysis, indent=2))
"""
        result = self.code_interpreter.invoke("executeCode", {
            "code": analysis_code,
            "language": "python"
        })
        output = self._extract_output(result)
        # The whole stdout is the pretty-printed JSON object, so parsing
        # the full string works. (FIX: narrowed the former bare `except:`
        # to JSONDecodeError so real errors are no longer swallowed.)
        try:
            return json.loads(output)
        except json.JSONDecodeError:
            return {"raw_output": output}
    except Exception as e:
        console.print(f"[red]❌ Pattern analysis error: {e}[/red]")
        return {"status": "error", "error": str(e)}
def generate_competitive_insights(self, competitor_data: Dict, pattern_analysis: Dict) -> Dict:
    """Combine browser data and pattern analysis into an insights summary.

    Runs a sandbox script that tallies data completeness and emits findings
    and recommendations as JSON; the parsed dict is returned, falling back
    to {"raw_output": ...} when the output isn't JSON.
    """
    try:
        console.print("[cyan]💡 Generating competitive insights...[/cyan]")
        insights_code = f"""
import json
from datetime import datetime

competitor_data = {json.dumps(competitor_data)}
pattern_analysis = {json.dumps(pattern_analysis)}

insights = {{
    "generated_at": datetime.now().isoformat(),
    "total_competitors": len(competitor_data),
    "data_completeness": {{
        "with_pricing": len(pattern_analysis.get("competitors_with_pricing", [])),
        "without_pricing": len(pattern_analysis.get("competitors_without_pricing", []))
    }},
    "key_findings": [],
    "recommendations": []
}}

# Generate findings
if pattern_analysis.get("competitors_with_pricing"):
    insights["key_findings"].append(
        f"Successfully extracted pricing from {{len(pattern_analysis['competitors_with_pricing'])}} competitors"
    )

if pattern_analysis.get("missing_data"):
    insights["recommendations"].append(
        "Consider manual review for competitors with missing data"
    )

print(json.dumps(insights, indent=2))
"""
        response = self.code_interpreter.invoke("executeCode", {
            "code": insights_code,
            "language": "python"
        })
        raw = self._extract_output(response)
        try:
            return json.loads(raw)
        except:  # noqa: E722 -- deliberately broad: mirror original fallback
            return {"raw_output": raw}
    except Exception as e:
        console.print(f"[red]❌ Insights generation error: {e}[/red]")
        return {"status": "error", "error": str(e)}
def analyze_competitor_data(self, competitor_name: str, data: Dict) -> Dict:
    """Analyze one competitor's scraped data inside the sandbox.

    The generated script summarizes what was collected, persists raw/
    pricing/features files under analysis/, and prints the summary as
    pretty-printed JSON plus a file inventory.

    Returns {"status": "success", "analysis": <dict or raw_output>,
    "competitor": name} or {"status": "error", ...}.
    """
    try:
        console.print(f"[cyan]📊 Analyzing {competitor_name}...[/cyan]")
        analysis_code = f"""
import json
import pandas as pd
import os  # Make sure os is imported
from datetime import datetime

# Load competitor data
competitor_data = {json.dumps(data)}
competitor_name = "{competitor_name}"

# Create analysis summary
analysis = {{
    "competitor": competitor_name,
    "analyzed_at": datetime.now().isoformat(),
    "data_points_collected": {{
        "has_pricing": bool(competitor_data.get('pricing', {{}}).get('data')),
        "has_features": bool(competitor_data.get('features', {{}}).get('data')),
        "navigation_success": competitor_data.get('navigation', {{}}).get('status') == 'success',
        "screenshots_taken": competitor_data.get('screenshots_taken', 0),
        "pages_explored": len(competitor_data.get('additional_pages', [])),
        "forms_found": len(competitor_data.get('interactive_elements', {{}}).get('forms', []))
    }}
}}

# Save raw data
with open(f'analysis/{{competitor_name.replace(" ", "_")}}_raw_data.json', 'w') as f:
    json.dump(competitor_data, f, indent=2)

# Extract and analyze pricing if available
if competitor_data.get('pricing', {{}}).get('data'):
    pricing_data = competitor_data['pricing']['data']
    analysis['pricing_analysis'] = {{
        'data_length': len(str(pricing_data)),
        'extracted_successfully': True
    }}
    # Save pricing data
    with open(f'analysis/{{competitor_name.replace(" ", "_")}}_pricing.txt', 'w') as f:
        f.write(str(pricing_data))

# Extract and analyze features if available
if competitor_data.get('features', {{}}).get('data'):
    features_data = competitor_data['features']['data']
    analysis['features_analysis'] = {{
        'data_length': len(str(features_data)),
        'extracted_successfully': True
    }}
    # Save features data
    with open(f'analysis/{{competitor_name.replace(" ", "_")}}_features.txt', 'w') as f:
        f.write(str(features_data))

print(json.dumps(analysis, indent=2))

# Define created_files variable before using it
created_files = []
for file in ['raw_data.json', 'pricing.txt', 'features.txt']:
    file_path = f'analysis/{{competitor_name.replace(" ", "_")}}_{{file}}'
    if os.path.exists(file_path):
        created_files.append(file_path)

print(f"\\nCreated {{len(created_files)}} analysis files for {{competitor_name}}:")
for file in created_files:
    print(f" - {{file}}")
"""
        # Execute using SDK
        result = self.code_interpreter.invoke("executeCode", {
            "code": analysis_code,
            "language": "python"
        })
        output = self._extract_output(result)
        # BUG FIX: the sandbox prints the analysis with indent=2 (multi-line
        # JSON), so the old line-by-line json.loads() always failed on the
        # lone "{" line and the bare except forced raw_output every time.
        # Decode the first JSON object embedded in the output instead.
        analysis_result = {"raw_output": output}
        start = output.find('{')
        if start != -1:
            try:
                decoded, _ = json.JSONDecoder().raw_decode(output[start:])
                analysis_result = decoded
            except json.JSONDecodeError:
                pass
        return {
            "status": "success",
            "analysis": analysis_result,
            "competitor": competitor_name
        }
    except Exception as e:
        console.print(f"[red]❌ Analysis error: {e}[/red]")
        return {"status": "error", "error": str(e)}
def create_comparison_visualization(self, all_competitors_data: Dict) -> Dict:
"""Build a 2x2 matplotlib dashboard and a CSV comparison matrix in the sandbox.

Embeds `all_competitors_data` as JSON into a generated script that plots
navigation success, data-collection counts, and pages explored, writes
visualizations/competitive_analysis_dashboard.png and
analysis/comparison_matrix.csv, and verifies the files exist.

Returns {"status": "success", "output", "files_created"} or
{"status": "error", ...}; never raises.

NOTE(review): in the summary-table rows below, both branches of the
Nav/Pricing conditionals produce empty strings -- the success/failure
glyphs appear to have been lost; confirm the intended markers.
"""
try:
console.print("[cyan]📈 Creating visualizations...[/cyan]")
# Sandbox script: doubled braces {{ }} are literal in the generated code;
# single braces interpolate here (the embedded competitor data).
viz_code = f"""
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import glob
from datetime import datetime
# Load all competitor data
all_data = {json.dumps(all_competitors_data)}
# Create figure
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Competitive Intelligence Analysis - ' + datetime.now().strftime('%Y-%m-%d'), fontsize=16)
# Prepare summary data
competitors = list(all_data.keys())
success_rates = []
data_collected = []
screenshots = []
pages_explored = []
for comp in competitors:
comp_data = all_data[comp]
# Success rate
success = 1 if comp_data.get('status') == 'success' else 0
success_rates.append(success)
# Data collection
has_pricing = 1 if comp_data.get('pricing', {{}}).get('status') == 'success' else 0
has_features = 1 if comp_data.get('features', {{}}).get('status') == 'success' else 0
data_collected.append(has_pricing + has_features)
# Screenshots
screenshots.append(comp_data.get('screenshots_taken', 0))
# Pages explored
pages_explored.append(len(comp_data.get('additional_pages', [])))
# Plot 1: Success Rate
ax1 = axes[0, 0]
ax1.bar(competitors, success_rates, color='green', alpha=0.7)
ax1.set_title('Navigation Success Rate')
ax1.set_ylabel('Success (1) / Failure (0)')
ax1.set_ylim(0, 1.2)
# Plot 2: Data Collection
ax2 = axes[0, 1]
ax2.bar(competitors, data_collected, color='blue', alpha=0.7)
ax2.set_title('Data Points Collected')
ax2.set_ylabel('Count (Pricing + Features)')
ax2.set_ylim(0, 2.5)
# Plot 3: Pages Explored
ax3 = axes[1, 0]
ax3.bar(competitors, pages_explored, color='orange', alpha=0.7)
ax3.set_title('Additional Pages Explored')
ax3.set_ylabel('Page Count')
# Plot 4: Summary Table
ax4 = axes[1, 1]
ax4.axis('off')
# Create summary table
summary_data = []
for comp in competitors:
comp_data = all_data[comp]
summary_data.append([
comp,
'' if comp_data.get('status') == 'success' else '',
'' if comp_data.get('pricing', {{}}).get('status') == 'success' else '',
str(len(comp_data.get('additional_pages', [])))
])
table = ax4.table(cellText=summary_data,
colLabels=['Competitor', 'Nav', 'Pricing', 'Pages'],
cellLoc='center',
loc='center')
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 2)
plt.tight_layout()
plt.savefig('visualizations/competitive_analysis_dashboard.png', dpi=300, bbox_inches='tight')
plt.close()
# Create detailed comparison matrix
comparison_df = pd.DataFrame({{
'Competitor': competitors,
'Navigation': ['Success' if all_data[c].get('status') == 'success' else 'Failed' for c in competitors],
'Pricing Data': ['Collected' if all_data[c].get('pricing', {{}}).get('status') == 'success' else 'Missing' for c in competitors],
'Features Data': ['Collected' if all_data[c].get('features', {{}}).get('status') == 'success' else 'Missing' for c in competitors],
'Screenshots': [all_data[c].get('screenshots_taken', 0) for c in competitors],
'Pages Explored': [len(all_data[c].get('additional_pages', [])) for c in competitors]
}})
comparison_df.to_csv('analysis/comparison_matrix.csv', index=False)
print(f"Visualizations created successfully!")
print(f"Dashboard: visualizations/competitive_analysis_dashboard.png")
print(f"Matrix: analysis/comparison_matrix.csv")
print(f"Analyzed {{len(competitors)}} competitors")
# Verify files were created
created_files = []
for file_pattern in ['visualizations/*.png', 'analysis/*.csv']:
for file_path in glob.glob(file_pattern):
if os.path.isfile(file_path):
created_files.append(file_path)
print(f"\\nVerified {{len(created_files)}} files were created:")
for file in created_files:
print(f" - {{file}}")
"""
# Execute using SDK
result = self.code_interpreter.invoke("executeCode", {
"code": viz_code,
"language": "python"
})
output = self._extract_output(result)
# The paths below are the well-known outputs written by the script above.
return {
"status": "success",
"output": output,
"files_created": [
"visualizations/competitive_analysis_dashboard.png",
"analysis/comparison_matrix.csv"
]
}
except Exception as e:
console.print(f"[red]❌ Visualization error: {e}[/red]")
return {"status": "error", "error": str(e)}
def _extract_file_content(self, result: Dict) -> str:
"""Extract file content from readFiles result."""
for event in result.get("stream", []):
if "result" in event:
result_data = event["result"]
if "content" in result_data:
for item in result_data["content"]:
if item.get("type") == "text":
return item.get("text", "")
return ""
def generate_final_report(self, all_data: Dict, analysis_results: Dict) -> Dict:
"""Generate the final competitive intelligence report (markdown).

Belt-and-braces strategy:
1. Build `direct_report` locally first, as a guaranteed fallback.
2. Run a small sandbox script to verify file creation works at all.
3. Generate and save the full report inside the sandbox, echoing its
   content between REPORT_CONTENT_BEGIN/END markers so it can be
   recovered from stdout even if file retrieval fails.
4. List all files present in the sandbox output directories.

Returns {"status": "success", "report_content", "output", "report_path"}
or an error dict that still carries a minimal fallback report.
"""
try:
console.print("[cyan]📄 Generating final report...[/cyan]")
# Create the report string directly here in case all else fails
direct_report = f"# Competitive Intelligence Report\n\n"
direct_report += f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
direct_report += f"**Session ID:** {analysis_results.get('session_id', 'N/A')}\n\n"
direct_report += f"## Executive Summary\n\n"
direct_report += f"This report analyzes {len(all_data)} competitor websites.\n\n"
# Add sections for each competitor to direct report
for competitor, data in all_data.items():
direct_report += f"### {competitor}\n\n"
direct_report += f"**Website:** {data.get('url', 'N/A')}\n"
direct_report += f"**Pages Explored:** {len(data.get('additional_pages', []))}\n\n"
# Add pricing section if available
if data.get('pricing', {}).get('data'):
direct_report += f"#### Pricing Information\n\n"
pricing_data = str(data['pricing'].get('data', ''))[:500]
direct_report += f"{pricing_data}...\n\n"
# Add features section if available
if data.get('features', {}).get('data'):
direct_report += f"#### Product Features\n\n"
features_data = str(data['features'].get('data', ''))[:500]
direct_report += f"{features_data}...\n\n"
direct_report += "---\n\n"
# Sandbox smoke test: ensure reports/ exists and a file can be written
# before attempting the real report.
# Ensure directory exists before creating report file
exec_code = """
import os
os.makedirs('reports', exist_ok=True)
# Create a simple report to verify file creation works
with open('reports/competitive_intelligence_report.md', 'w') as f:
f.write("# Test Report\\n\\nThis is a test report to verify file creation.")
# Check if the file was created
if os.path.exists('reports/competitive_intelligence_report.md'):
print("Test report file created successfully")
# Read it back to verify
with open('reports/competitive_intelligence_report.md', 'r') as f:
content = f.read()
print(f"File content length: {len(content)}")
else:
print("Failed to create test report file")
"""
# Execute the test first
console.print("[yellow]Testing file creation capability...[/yellow]")
test_result = self.code_interpreter.invoke("executeCode", {
"code": exec_code,
"language": "python"
})
test_output = self._extract_output(test_result)
console.print(f"[dim]Test output: {test_output}[/dim]")
# report_code is an f-string: single-brace spans (summary statistics,
# timestamps, json.dumps(all_data)) are evaluated HERE at generation
# time; double-brace {{...}} spans run inside the sandbox.
# NOTE(review): {json.dumps(all_data)} is injected as a Python literal
# into the sandbox script -- JSON true/false/null are not valid Python,
# so boolean/None values in all_data would break it; confirm upstream
# data shapes.
# Now create the full report
report_code = f'''
import json
import os
from datetime import datetime
# Create directories
os.makedirs('reports', exist_ok=True)
# Generate markdown report
report_content = """# Competitive Intelligence Report
**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Session ID:** {analysis_results.get('session_id', 'N/A')}
## Executive Summary
This report analyzes {len(all_data)} competitor websites, examining their pricing strategies and product features.
## Analysis Results
### Overall Statistics
- Total Competitors Analyzed: {len(all_data)}
- Successful Site Visits: {sum(1 for d in all_data.values() if d.get('status') == 'success')}
- Pricing Data Collected: {sum(1 for d in all_data.values() if d.get('pricing', {}).get('status') == 'success')}
- Feature Data Collected: {sum(1 for d in all_data.values() if d.get('features', {}).get('status') == 'success')}
- Total Screenshots: {sum(d.get('screenshots_taken', 0) for d in all_data.values())}
- Total Pages Explored: {sum(len(d.get('additional_pages', [])) for d in all_data.values())}
## Detailed Competitor Analysis
"""
# Add sections for each competitor
for competitor, data in {json.dumps(all_data)}.items():
report_content += f"### {{competitor}}\\n\\n"
report_content += f"**Website:** {{data.get('url', 'N/A')}} \\n"
report_content += f"**Status:** {{data.get('status', 'Unknown')}} \\n"
report_content += f"**Analysis Time:** {{data.get('timestamp', 'N/A')}} \\n"
report_content += f"**Screenshots Taken:** {{data.get('screenshots_taken', 0)}}\\n"
report_content += f"**Pages Explored:** {{len(data.get('additional_pages', []))}}\\n\\n"
# Add pricing section
if data.get('pricing', {{}}).get('status') == 'success':
pricing_data_text = str(data['pricing'].get('data', 'No data'))
if len(pricing_data_text) > 500:
pricing_data_text = pricing_data_text[:500] + '...'
report_content += "#### Pricing Information\\n\\n"
report_content += pricing_data_text + "\\n\\n"
# Add features section
if data.get('features', {{}}).get('status') == 'success':
features_data_text = str(data['features'].get('data', 'No data'))
if len(features_data_text) > 500:
features_data_text = features_data_text[:500] + '...'
report_content += "#### Product Features\\n\\n"
report_content += features_data_text + "\\n\\n"
report_content += "---\\n\\n"
# Add final sections
report_content += """## Session Recording
All browser interactions have been recorded for compliance and review purposes.
The recording includes all navigation, data extraction, and screenshot activities.
---
*End of Report*
"""
# Save the report file
with open('reports/competitive_intelligence_report.md', 'w') as f:
f.write(report_content)
print(f"Report saved to: reports/competitive_intelligence_report.md")
print(f"Report length: {{len(report_content)}} characters")
# Print the entire report content for direct capture
print("\\nREPORT_CONTENT_BEGIN")
print(report_content)
print("REPORT_CONTENT_END")
'''
# Execute the code to generate the report
console.print("[cyan]Creating report file...[/cyan]")
result = self.code_interpreter.invoke("executeCode", {
"code": report_code,
"language": "python"
})
output = self._extract_output(result)
console.print(f"[dim]Report generation output: {output[:200]}...[/dim]")
# Extract the report content from the output
report_content = None
if "REPORT_CONTENT_BEGIN" in output and "REPORT_CONTENT_END" in output:
try:
start_marker = "REPORT_CONTENT_BEGIN"
end_marker = "REPORT_CONTENT_END"
start_pos = output.find(start_marker) + len(start_marker)
end_pos = output.find(end_marker)
if start_pos > 0 and end_pos > start_pos:
report_content = output[start_pos:end_pos].strip()
console.print(f"[green]✅ Extracted report content ({len(report_content)} chars)[/green]")
except Exception as e:
console.print(f"[yellow]Error extracting report from markers: {e}[/yellow]")
# If we couldn't extract the report from output, use our direct report
if not report_content or len(report_content) < 100: # Sanity check
console.print("[yellow]Using direct report as fallback...[/yellow]")
report_content = direct_report
# Also try to save it directly
# NOTE(review): direct_report is embedded verbatim inside a """...""" in
# the generated script -- content containing triple quotes or trailing
# backslashes would break it; confirm acceptable for this data.
try:
save_code = f'''
import os
os.makedirs('reports', exist_ok=True)
with open('reports/competitive_intelligence_report.md', 'w') as f:
f.write("""{direct_report}""")
print("Direct report saved successfully")
'''
self.code_interpreter.invoke("executeCode", {
"code": save_code,
"language": "python"
})
except Exception as e:
console.print(f"[yellow]Failed to save direct report: {e}[/yellow]")
# Track files created during report generation
file_tracking_code = """
import os
created_files = []
# Check common directories
for dir_name in ['reports', 'analysis', 'visualizations', 'data', 'sessions']:
if os.path.exists(dir_name):
for file in os.listdir(dir_name):
file_path = os.path.join(dir_name, file)
if os.path.isfile(file_path):
created_files.append(file_path)
# List all created files
print("\\nFiles created:")
for file in sorted(created_files):
print(f" - {file}")
"""
track_result = self.code_interpreter.invoke("executeCode", {
"code": file_tracking_code,
"language": "python"
})
track_output = self._extract_output(track_result)
console.print(f"[dim]{track_output}[/dim]")
return {
"status": "success",
"report_content": report_content,
"output": output,
"report_path": "reports/competitive_intelligence_report.md"
}
except Exception as e:
console.print(f"[red]❌ Report generation error: {e}[/red]")
import traceback
traceback.print_exc()
# Create minimal report as fallback
fallback_report = f"# Competitive Intelligence Report\n\n"
fallback_report += f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
fallback_report += f"**Error encountered during report generation:** {str(e)}\n\n"
fallback_report += f"Analyzed {len(all_data)} competitors.\n\n"
return {
"status": "error",
"error": str(e),
"report_content": fallback_report,
"report_path": "reports/competitive_intelligence_report.md"
}
def cleanup(self):
    """Stop the CodeInterpreter session if one is active (safe to call twice)."""
    if not self.session_active:
        return
    console.print("[yellow]🧹 Cleaning up CodeInterpreter...[/yellow]")
    try:
        self.code_interpreter.stop()
    except Exception as e:
        # Best-effort teardown: report but never raise during cleanup.
        console.print(f"[yellow]Warning: Error cleaning up CodeInterpreter: {e}[/yellow]")
    else:
        self.session_active = False
        console.print("✅ CodeInterpreter cleaned up")