import json import os from typing import Dict, Any, Optional, List from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import asyncio import uuid from dotenv import load_dotenv import boto3 from botocore.exceptions import NoCredentialsError, ProfileNotFound from botocore.config import Config from contextlib import asynccontextmanager import time from functools import lru_cache # Load environment variables load_dotenv() # Global cache for AWS session and agents _aws_session_cache = None _agents_cache = {} _model_cache = {} # Global variables for AWS session and region aws_session = None aws_region = None @lru_cache(maxsize=1) def get_aws_credentials(): """Cached AWS credentials setup""" aws_profile = os.getenv('AWS_PROFILE', 'default') aws_region = os.getenv('AWS_REGION', 'us-east-1') print("๐Ÿ” Setting up AWS credentials...") # Try AWS profile first try: session = boto3.Session(profile_name=aws_profile, region_name=aws_region) # Test the credentials sts = session.client('sts') identity = sts.get_caller_identity() print(f"โœ… Using AWS profile: {aws_profile}") print(f" Account: {identity.get('Account', 'Unknown')}") print(f" User/Role: {identity.get('Arn', 'Unknown').split('/')[-1]}") print(f" Region: {aws_region}") # CRITICAL FIX: Set environment variables to match profile credentials # This ensures AgentCore uses the same credentials credentials = session.get_credentials() if credentials: os.environ['AWS_ACCESS_KEY_ID'] = credentials.access_key os.environ['AWS_SECRET_ACCESS_KEY'] = credentials.secret_key if credentials.token: os.environ['AWS_SESSION_TOKEN'] = credentials.token else: # Remove session token if not present to avoid conflicts os.environ.pop('AWS_SESSION_TOKEN', None) os.environ['AWS_DEFAULT_REGION'] = aws_region print("โœ… Environment variables synchronized with profile credentials") return session, aws_region except ProfileNotFound: print(f"โš 
๏ธ AWS profile '{aws_profile}' not found, trying access keys...") except NoCredentialsError: print(f"โš ๏ธ No credentials found for profile '{aws_profile}', trying access keys...") except Exception as e: print(f"โš ๏ธ Profile authentication failed: {e}, trying access keys...") # Fallback to access keys (but warn about potential issues) aws_access_key = os.getenv('AWS_ACCESS_KEY_ID') aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY') if aws_access_key and aws_secret_key: try: session = boto3.Session( aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key, region_name=aws_region ) # Test the credentials sts = session.client('sts') identity = sts.get_caller_identity() print(f"โœ… Using AWS access keys") print(f" Account: {identity.get('Account', 'Unknown')}") print(f" Access Key: {aws_access_key[:8]}...") print(f" Region: {aws_region}") print("โš ๏ธ Note: Using access keys - ensure AgentCore permissions are attached to this user") return session, aws_region except Exception as e: print(f"โŒ Access key authentication failed: {e}") raise Exception(f"AWS authentication failed: {e}") else: print("โŒ No AWS access keys found in environment variables") raise Exception("No AWS credentials available. 
Please configure AWS profile or set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY") # Import strands-agents framework - handle both installed and local versions try: from strands import Agent, tool from strands.models import BedrockModel print("โœ“ Using strands-agents framework") except ImportError: # Try to import from parent directory (local strands) import sys import os parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) strands_path = os.path.join(parent_dir, '..') if strands_path not in sys.path: sys.path.insert(0, strands_path) try: from strands import Agent, tool from strands.models import BedrockModel print("โœ“ Using local strands framework") except ImportError as e: print(f"โŒ Failed to import strands framework: {e}") print("Please ensure strands-agents is installed: pip install strands-agents") raise # Import AgentCore for code interpreter from bedrock_agentcore.tools.code_interpreter_client import code_session @asynccontextmanager async def lifespan(app: FastAPI): # Startup global aws_session, aws_region aws_session, aws_region = setup_aws_credentials() initialize_agents() yield # Shutdown (if needed) pass app = FastAPI( title="AgentCore Code Interpreter", version="1.0.0", lifespan=lifespan ) # Enable CORS for React frontend app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Pydantic models for request/response class CodeGenerationRequest(BaseModel): prompt: str session_id: Optional[str] = None class InteractiveCodeExecutionRequest(BaseModel): code: str session_id: Optional[str] = None inputs: Optional[List[str]] = None # Pre-provided inputs for interactive code class CodeExecutionRequest(BaseModel): code: str session_id: Optional[str] = None interactive: Optional[bool] = False inputs: Optional[List[str]] = None class FileUploadRequest(BaseModel): filename: str content: str session_id: Optional[str] = None # Session management class 
CodeInterpreterSession: def __init__(self, session_id: str): self.session_id = session_id self.conversation_history = [] self.code_history = [] self.execution_results = [] self.interactive_sessions = {} # Track interactive execution sessions self.uploaded_csv = None # Store uploaded CSV file data # Global variables for agents code_generator_agent = None code_executor_agent = None executor_type = "unknown" # Track which executor type we're using active_sessions = {} def clean_output_for_display(output: str) -> str: """Clean output for display by removing image binary data while preserving analysis text""" if not output: return output # If output contains IMAGE_DATA, extract everything except the binary if 'IMAGE_DATA:' in output: parts = output.split('IMAGE_DATA:') cleaned_parts = [] # Add the part before IMAGE_DATA if parts[0].strip(): cleaned_parts.append(parts[0].strip()) # Process parts after IMAGE_DATA for i in range(1, len(parts)): # Split on newline to separate binary from any following text lines = parts[i].split('\n', 1) if len(lines) > 1: # Skip the binary line, keep any text after it remaining_text = lines[1].strip() if remaining_text and not remaining_text.startswith(('iVBOR', '/9j/', 'data:')): cleaned_parts.append(remaining_text) if cleaned_parts: result = '\n\n'.join(cleaned_parts) print(f"๐Ÿงน Cleaned output: removed image binary, kept {len(result)} chars of text") return result else: return "Code executed successfully - chart generated" return output def extract_image_data(execution_result: str): """Extract base64 image data from execution results - fixed for AgentCore format""" try: import re import base64 images = [] print(f"๐Ÿ” Image extraction - Input length: {len(execution_result)}") print(f"๐Ÿ” Contains IMAGE_DATA: {'IMAGE_DATA:' in execution_result}") if 'IMAGE_DATA:' in execution_result: # Find all IMAGE_DATA: patterns in the text # AgentCore puts the full base64 string in stdout, so we need a greedy pattern pattern = 
r'IMAGE_DATA:([A-Za-z0-9+/=\n\r\s]+?)(?=\n[A-Za-z]|\nBase64|\n$|$)' matches = re.findall(pattern, execution_result, re.MULTILINE | re.DOTALL) print(f"๐Ÿ” Regex matches found: {len(matches)}") for i, match in enumerate(matches): try: # Clean up the base64 string - remove all whitespace and newlines clean_match = re.sub(r'[\s\n\r]', '', match) print(f"๐Ÿ” Match {i+1} - Original length: {len(match)}, Clean length: {len(clean_match)}") print(f"๐Ÿ” Match {i+1} - Starts with: {clean_match[:50]}...") # Must be reasonable length for an image (at least 1KB when decoded) if len(clean_match) > 1000: # Validate it's valid base64 and can be decoded decoded = base64.b64decode(clean_match) print(f"๐Ÿ” Match {i+1} - Decoded length: {len(decoded)} bytes") # Check if it looks like a PNG (starts with PNG signature) if decoded.startswith(b'\x89PNG\r\n\x1a\n'): images.append({ 'format': 'png', 'data': clean_match, 'source': 'agentcore_stdout' }) print(f"โœ… Match {i+1} - Valid PNG image extracted") # Also check for JPEG signatures elif decoded.startswith(b'\xff\xd8\xff'): images.append({ 'format': 'jpeg', 'data': clean_match, 'source': 'agentcore_stdout' }) print(f"โœ… Match {i+1} - Valid JPEG image extracted") else: print(f"โš ๏ธ Match {i+1} - Invalid image signature") else: print(f"โš ๏ธ Match {i+1} - Too short to be valid image") except Exception as e: print(f"โŒ Match {i+1} - Extraction error: {e}") continue print(f"๐ŸŽฏ Final result: {len(images)} images extracted") return images except Exception as e: print(f"โŒ Image extraction error: {e}") return [] def upload_files_to_agentcore_sandbox(files_data: list, aws_region: str) -> bool: """Upload files to AgentCore sandbox using writeFiles tool""" try: print(f"๐Ÿ”ง Uploading {len(files_data)} files to AgentCore sandbox...") with code_session(aws_region) as code_client: response = code_client.invoke("writeFiles", {"content": files_data}) for event in response["stream"]: result = event.get("result", {}) if result.get("isError", 
False): error_content = result.get("content", [{}]) error_text = error_content[0].get("text", "Unknown error") if error_content else "Unknown error" print(f"โŒ File upload error: {error_text}") return False else: content = result.get("content", []) for item in content: if item.get("type") == "text": print(f"โœ… File upload result: {item.get('text', '')}") return True return False except Exception as e: print(f"โŒ File upload failed: {str(e)}") return False def execute_chart_code_direct(code: str, session_files: list = None) -> tuple[str, list]: """Execute chart code directly with AgentCore to preserve full base64 output""" try: print(f"\n๐ŸŽจ Direct AgentCore chart execution") print(f"๐Ÿ“ Code length: {len(code)} characters") # Clean the code to remove any markdown formatting clean_code = extract_python_code_from_prompt(code) print(f"๐Ÿ”ง Clean code length: {len(clean_code)} characters") with code_session(aws_region) as code_client: # Upload files to sandbox if provided if session_files: print(f"๐Ÿ“ Uploading {len(session_files)} files to sandbox...") files_data = [] for file_info in session_files: files_data.append({ "path": file_info['filename'], "text": file_info['content'] }) # Upload files using writeFiles tool upload_response = code_client.invoke("writeFiles", {"content": files_data}) for event in upload_response["stream"]: result = event.get("result", {}) if result.get("isError", False): error_content = result.get("content", [{}]) error_text = error_content[0].get("text", "Unknown error") if error_content else "Unknown error" print(f"โŒ File upload error: {error_text}") return f"File upload failed: {error_text}", [] else: content = result.get("content", []) for item in content: if item.get("type") == "text": print(f"โœ… File upload: {item.get('text', '')}") # Execute the cleaned code response = code_client.invoke("executeCode", { "code": clean_code, "language": "python", "clearContext": False }) # Process response directly without Strands-Agents 
truncation output_parts = [] full_stdout = "" for event in response["stream"]: result = event.get("result", {}) if result.get("isError", False): error_content = result.get("content", [{}]) error_text = error_content[0].get("text", "Unknown error") if error_content else "Unknown error" print(f"โŒ Direct execution error: {error_text}") return f"Error: {error_text}", [] # Extract structured content structured_content = result.get("structuredContent", {}) stdout = structured_content.get("stdout", "") stderr = structured_content.get("stderr", "") if stdout: output_parts.append(stdout) full_stdout += stdout print(f"๐Ÿ“ค Direct stdout captured: {len(stdout)} characters") if stderr: output_parts.append(f"Errors: {stderr}") print(f"โš ๏ธ Direct stderr: {stderr}") # Combine output final_output = "\n".join(output_parts) if output_parts else "Code executed successfully" # Extract images directly from full stdout images = extract_image_data(full_stdout) # Clean the output for display (remove image binary but keep analysis text) display_output = clean_output_for_display(final_output) print(f"โœ… Direct execution completed:") print(f" Output length: {len(final_output)}") print(f" Display output length: {len(display_output)}") print(f" Images extracted: {len(images)}") return display_output, images except Exception as e: print(f"โŒ Direct AgentCore execution failed: {str(e)}") import traceback print(f"๐Ÿ“‹ Traceback: {traceback.format_exc()}") return f"Direct execution failed: {str(e)}", [] def detect_chart_code(code: str) -> bool: """Detect if code contains interactive elements like input() calls""" interactive_patterns = [ 'input(', 'raw_input(', 'sys.stdin.read', 'getpass.getpass', ] code_lower = code.lower() return any(pattern in code_lower for pattern in interactive_patterns) def prepare_interactive_code(code: str, inputs: List[str]) -> str: """Prepare code for execution with pre-provided inputs""" if not inputs: return code # Create a mock input function that uses 
pre-provided inputs input_setup = f""" # Interactive input simulation _provided_inputs = {inputs} _input_index = 0 def input(prompt=''): global _input_index, _provided_inputs if prompt: print(prompt, end='') if _input_index < len(_provided_inputs): response = _provided_inputs[_input_index] _input_index += 1 print(response) # Echo the input return response else: print("No more inputs provided") return "" # Override built-in input __builtins__['input'] = input """ return input_setup + "\n" + code def extract_text_from_agent_result(agent_result) -> str: """Extract clean text content from Strands-Agents AgentResult object""" if not agent_result: return "" try: # Try to access the message attribute first if hasattr(agent_result, 'message'): message = agent_result.message print(f"๐Ÿ” AgentResult.message type: {type(message)}") # If message is a dict with content structure if isinstance(message, dict): if 'content' in message and isinstance(message['content'], list): # Extract text from content array text_parts = [] for item in message['content']: if isinstance(item, dict) and 'text' in item: text_parts.append(item['text']) if text_parts: full_text = '\n'.join(text_parts) print(f"โœ… Extracted text from message.content array") # Extract actual execution output from AI commentary actual_output = extract_execution_output_from_ai_response(full_text) return actual_output # If message has direct text content if 'text' in message: full_text = str(message['text']) print(f"โœ… Extracted text from message.text") actual_output = extract_execution_output_from_ai_response(full_text) return actual_output # If message is a string if isinstance(message, str): print(f"โœ… Using message as string") actual_output = extract_execution_output_from_ai_response(message) return actual_output # Try other attributes if hasattr(agent_result, 'content'): content = agent_result.content if isinstance(content, str): print(f"โœ… Using content attribute") actual_output = 
extract_execution_output_from_ai_response(content) return actual_output if hasattr(agent_result, 'text'): text = agent_result.text if isinstance(text, str): print(f"โœ… Using text attribute") actual_output = extract_execution_output_from_ai_response(text) return actual_output # Fallback to string conversion result = str(agent_result) print(f"โš ๏ธ Using str() fallback") actual_output = extract_execution_output_from_ai_response(result) return actual_output except Exception as e: print(f"โŒ Error extracting text from AgentResult: {e}") return str(agent_result) if agent_result else "" def extract_execution_output_from_ai_response(ai_response: str) -> str: """Extract the actual execution output from AI's commentary, prioritizing analysis text over raw output""" import re # For CSV analysis, prioritize AI analysis text over raw execution output if any(keyword in ai_response.lower() for keyword in ['dataset', 'dataframe', 'csv', 'analysis', 'statistics']): # Check if response contains IMAGE_DATA (indicating chart generation) if 'IMAGE_DATA:' in ai_response: # For chart generation, extract everything EXCEPT the image binary parts = ai_response.split('IMAGE_DATA:') if len(parts) > 1: # Take the part before IMAGE_DATA and any analysis after before_image = parts[0].strip() # Look for analysis text after the image data after_parts = parts[1].split('\n', 1) if len(after_parts) > 1: after_image = after_parts[1].strip() if after_image and not after_image.startswith(('iVBOR', '/9j/', 'data:')): combined_analysis = f"{before_image}\n\n{after_image}".strip() if combined_analysis: print(f"๐ŸŽฏ Extracted analysis text (excluding image binary): {len(combined_analysis)} chars") return combined_analysis # If no analysis after image, return the part before if before_image: print(f"๐ŸŽฏ Extracted analysis text before image: {len(before_image)} chars") return before_image # If it's data analysis without charts, prefer AI commentary over raw output if any(phrase in ai_response.lower() for 
phrase in [ 'analysis shows', 'data reveals', 'statistics indicate', 'summary:', 'insights:' ]): print(f"๐ŸŽฏ Using AI analysis commentary for data analysis: {len(ai_response)} chars") return ai_response # Pattern 1: Look for code blocks with output (for non-analysis cases) code_block_patterns = [ r'```\s*\n(.*?)\n```', # ``` ... ``` r'```[a-zA-Z]*\s*\n(.*?)\n```', # ```python ... ``` or similar ] for pattern in code_block_patterns: matches = re.findall(pattern, ai_response, re.DOTALL) if matches: output = matches[0].strip() # Skip if it's just image binary if not output.startswith(('iVBOR', '/9j/', 'IMAGE_DATA:')): print(f"๐ŸŽฏ Extracted output from code block: {len(output)} chars") return output # Pattern 2: Look for "output:" or "result:" sections output_patterns = [ r'(?:output|result):\s*\n(.*?)(?:\n\n|\n[A-Z]|$)', r'(?:complete output|execution output):\s*\n(.*?)(?:\n\n|\n[A-Z]|$)', ] for pattern in output_patterns: matches = re.findall(pattern, ai_response, re.DOTALL | re.IGNORECASE) if matches: output = matches[0].strip() if not output.startswith(('iVBOR', '/9j/', 'IMAGE_DATA:')): print(f"๐ŸŽฏ Extracted output from output section: {len(output)} chars") return output # Fallback: return the original response (but clean up image binary if present) if 'IMAGE_DATA:' in ai_response: cleaned = ai_response.split('IMAGE_DATA:')[0].strip() if cleaned: print(f"๐ŸŽฏ Cleaned response (removed image binary): {len(cleaned)} chars") return cleaned print(f"โš ๏ธ Using original AI response as-is: {len(ai_response)} chars") return ai_response def extract_python_code_from_prompt(input_text: str) -> str: """Extract clean Python code from markdown-formatted prompts or raw code""" import re # If the input contains markdown code blocks, extract the Python code if '```python' in input_text or '```' in input_text: # Pattern to match Python code blocks patterns = [ r'```python\s*\n(.*?)\n```', # ```python ... ``` r'```\s*\n(.*?)\n```', # ``` ... 
``` ] for pattern in patterns: matches = re.findall(pattern, input_text, re.DOTALL) if matches: # Return the first match (the actual Python code) clean_code = matches[0].strip() print(f"๐Ÿ”ง Extracted Python code from markdown block") return clean_code # If no markdown blocks found, check if it's a prompt with code if 'Execute this Python code' in input_text or 'python code' in input_text.lower(): # Try to extract code after common prompt phrases lines = input_text.split('\n') code_lines = [] in_code_section = False for line in lines: # Skip prompt text and markdown if any(phrase in line.lower() for phrase in [ 'execute this python code', 'python code', 'use the tool', 'return the complete output', '```' ]): continue # If line looks like Python code, include it if line.strip() and ( line.startswith('import ') or line.startswith('from ') or line.startswith('def ') or line.startswith('class ') or line.startswith('if ') or line.startswith('for ') or line.startswith('while ') or line.startswith('try:') or line.startswith('with ') or '=' in line or line.startswith('print(') or line.startswith(' ') # Indented line ): in_code_section = True code_lines.append(line) elif in_code_section and line.strip() == '': code_lines.append(line) # Keep empty lines within code elif in_code_section and not line.strip(): continue elif in_code_section: # If we were in code section and hit non-code, we might be done break if code_lines: clean_code = '\n'.join(code_lines).strip() print(f"๐Ÿ”ง Extracted Python code from prompt text") return clean_code # If no special formatting detected, return as-is (assume it's already clean code) print(f"๐Ÿ”ง Using input as-is (no markdown formatting detected)") return input_text.strip() @tool def execute_python_code(code: str, description: str = "", files: list = None) -> str: """Execute Python code using AgentCore CodeInterpreter - reliable execution with proper output capture and file support""" # Extract clean Python code from markdown-formatted input 
clean_code = extract_python_code_from_prompt(code) if description: clean_code = f"# {description}\n{clean_code}" print(f"\n๐Ÿ”ง Original input length: {len(code)}") print(f"๐Ÿ”ง Clean code length: {len(clean_code)}") print(f"๐Ÿ”ง Files provided: {len(files) if files else 0}") print(f"๐Ÿ”ง Clean code preview: {clean_code[:200]}...") try: with code_session(aws_region) as code_client: # Upload files to sandbox if provided if files: print(f"๐Ÿ“ Uploading {len(files)} files to sandbox...") files_data = [] for file_info in files: files_data.append({ "path": file_info.get('filename', 'uploaded_file.csv'), "text": file_info.get('content', '') }) # Upload files using writeFiles tool upload_response = code_client.invoke("writeFiles", {"content": files_data}) for event in upload_response["stream"]: result = event.get("result", {}) if result.get("isError", False): error_content = result.get("content", [{}]) error_text = error_content[0].get("text", "Unknown error") if error_content else "Unknown error" print(f"โŒ File upload error: {error_text}") return f"File upload failed: {error_text}" else: content = result.get("content", []) for item in content: if item.get("type") == "text": print(f"โœ… File upload: {item.get('text', '')}") # Execute the code response = code_client.invoke("executeCode", { "code": clean_code, "language": "python", "clearContext": False }) # Process the response stream to capture all output output_parts = [] for event in response["stream"]: result = event.get("result", {}) if result.get("isError", False): error_content = result.get("content", [{}]) error_text = error_content[0].get("text", "Unknown error") if error_content else "Unknown error" print(f"โŒ AgentCore execution error: {error_text}") return f"Error: {error_text}" # Extract structured content (stdout, stderr) structured_content = result.get("structuredContent", {}) stdout = structured_content.get("stdout", "") stderr = structured_content.get("stderr", "") if stdout: 
output_parts.append(stdout) print(f"๐Ÿ“ค Stdout captured: {len(stdout)} characters") if stderr: output_parts.append(f"Errors: {stderr}") print(f"โš ๏ธ Stderr captured: {len(stderr)} characters") # Combine all output final_output = "\n".join(output_parts) if output_parts else "Code executed successfully (no output)" print(f"โœ… AgentCore execution completed - Output length: {len(final_output)}") return final_output except Exception as e: print(f"โŒ AgentCore execution error: {str(e)}") import traceback print(f"๐Ÿ“‹ Full traceback: {traceback.format_exc()}") return f"Execution failed: {str(e)}" @lru_cache(maxsize=1) def get_extended_botocore_config(): """Get BotocoreConfig with extended timeouts for long-running code execution This configuration is essential for complex code execution that may take several minutes. Based on Strands Agents documentation: https://strandsagents.com/1.0.x/documentation/docs/user-guide/concepts/model-providers/amazon-bedrock/ """ # Get timeout values from environment variables with sensible defaults read_timeout = int(os.getenv('AWS_READ_TIMEOUT', '600')) # 10 minutes default connect_timeout = int(os.getenv('AWS_CONNECT_TIMEOUT', '120')) # 2 minutes default max_retries = int(os.getenv('AWS_MAX_RETRIES', '5')) # 5 retries default return Config( read_timeout=read_timeout, connect_timeout=connect_timeout, retries={ 'max_attempts': max_retries, 'mode': 'adaptive' }, max_pool_connections=50 ) @lru_cache(maxsize=3) def create_bedrock_model_with_fallback(aws_region: str): """Create BedrockModel with Claude Sonnet 3.7 primary and Nova Premier fallback using inference profiles - cached""" cache_key = f"model_{aws_region}" if cache_key in _model_cache: print(f"โœ… Using cached model for region {aws_region}") return _model_cache[cache_key] # Primary model: Claude Sonnet 3.7 (Inference Profile) primary_model_id = "us.anthropic.claude-3-7-sonnet-20250219-v1:0" fallback_model_id = "us.amazon.nova-premier-v1:0" default_model_id = 
"anthropic.claude-3-5-sonnet-20241022-v2:0" print(f"๐Ÿค– Attempting to use primary inference profile: {primary_model_id}") # Try primary model (inference profile) try: primary_model = BedrockModel( model_id=primary_model_id, aws_region=aws_region, botocore_config=get_extended_botocore_config() ) print(f"โœ… Primary inference profile {primary_model_id} initialized successfully") result = (primary_model, primary_model_id) _model_cache[cache_key] = result return result except Exception as e: print(f"โš ๏ธ Primary inference profile failed: {e}") print(f"๐Ÿ”„ Trying fallback inference profile: {fallback_model_id}") # Try fallback model (inference profile) try: fallback_model = BedrockModel( model_id=fallback_model_id, aws_region=aws_region, botocore_config=get_extended_botocore_config() ) print(f"โœ… Fallback inference profile {fallback_model_id} initialized successfully") result = (fallback_model, fallback_model_id) _model_cache[cache_key] = result return result except Exception as fallback_error: print(f"โš ๏ธ Fallback inference profile failed: {fallback_error}") print(f"๐Ÿ”„ Using default model as last resort: {default_model_id}") # Last resort: standard model (not inference profile) try: default_model = BedrockModel( model_id=default_model_id, aws_region=aws_region, botocore_config=get_extended_botocore_config() ) print(f"โœ… Default model {default_model_id} initialized") result = (default_model, default_model_id) _model_cache[cache_key] = result return result except Exception as final_error: raise Exception(f"All model initialization attempts failed: {final_error}") def setup_aws_credentials(): """Setup AWS credentials - uses cached version""" global _aws_session_cache if _aws_session_cache: print("โœ… Using cached AWS session") return _aws_session_cache result = get_aws_credentials() _aws_session_cache = result return result def initialize_agents(): """Initialize agents using strands-agents with AgentCore CodeInterpreter tool - cached""" global 
code_generator_agent, code_executor_agent, executor_type, current_model_id # Check cache first if 'code_generator_agent' in _agents_cache and 'code_executor_agent' in _agents_cache: print("โœ… Using cached agents") code_generator_agent = _agents_cache['code_generator_agent'] code_executor_agent = _agents_cache['code_executor_agent'] current_model_id = _agents_cache['current_model_id'] executor_type = _agents_cache['executor_type'] return if not aws_session: raise Exception("AWS session not available. Check AWS credentials.") try: print("๐Ÿค– Initializing agents...") # Initialize Bedrock model with fallback logic bedrock_model, model_id = create_bedrock_model_with_fallback(aws_region) print(f"๐ŸŽฏ Using model: {model_id}") # Initialize Code Generator Agent using strands-agents code_generator_agent = Agent( model=bedrock_model, system_prompt=f"""You are a Python code generator specialist powered by {model_id}. Your role is to: 1. Generate clean, well-commented Python code based on user requirements 2. Follow Python best practices and PEP 8 style guidelines 3. Include appropriate error handling where needed 4. Only return executable Python code without explanations or markdown formatting 5. Make sure the code is complete and runnable 6. Do not include any text before or after the code Focus on creating practical, efficient code that solves the user's specific problem. 
Return ONLY the Python code, no explanations, no markdown, no additional text.""" ) # Test AgentCore availability with code_session(aws_region) as test_client: test_response = test_client.invoke("executeCode", { "code": "print('AgentCore initialization test successful')", "language": "python", "clearContext": True }) # AgentCore is working - create executor agent with AgentCore tool executor_type = "agentcore" # Create Code Executor Agent with AgentCore tool - following the sample system prompt SYSTEM_PROMPT = f"""You are a helpful AI assistant powered by {model_id} that validates all answers through code execution. VALIDATION PRINCIPLES: 1. When making claims about code, algorithms, or calculations - write code to verify them 2. Use execute_python_code to test mathematical calculations, algorithms, and logic 3. Create test scripts to validate your understanding before giving answers 4. Always show your work with actual code execution 5. If uncertain, explicitly state limitations and validate what you can APPROACH: - If asked about a programming concept, implement it in code to demonstrate - If asked for calculations, compute them programmatically AND show the code - If implementing algorithms, include test cases to prove correctness - Document your validation process for transparency - The sandbox maintains state between executions, so you can refer to previous results TOOL AVAILABLE: - execute_python_code: Run Python code and see output RESPONSE FORMAT: The execute_python_code tool returns execution results including stdout, stderr, and any errors.""" code_executor_agent = Agent( model=bedrock_model, tools=[execute_python_code], system_prompt=SYSTEM_PROMPT ) print("โœ… Agents initialized successfully:") print(f" - Code Generator: Strands-Agents Agent with {model_id}") print(f" - Code Executor: Strands-Agents Agent with {model_id} + AgentCore CodeInterpreter") # Cache the agents current_model_id = model_id _agents_cache['code_generator_agent'] = 
code_generator_agent _agents_cache['code_executor_agent'] = code_executor_agent _agents_cache['current_model_id'] = current_model_id _agents_cache['executor_type'] = executor_type except Exception as e: print(f"โŒ Error initializing agents: {str(e)}") print(" Make sure you have bedrock-agentcore permissions") raise e # Startup is now handled by lifespan context manager def get_or_create_session(session_id: Optional[str] = None) -> CodeInterpreterSession: """Get existing session or create new one""" if session_id is None: session_id = str(uuid.uuid4()) if session_id not in active_sessions: active_sessions[session_id] = CodeInterpreterSession(session_id) return active_sessions[session_id] # Utility functions for code analysis def detect_chart_code(code: str) -> bool: """Detect if code contains chart/visualization generation""" chart_indicators = [ 'plt.', 'matplotlib', 'seaborn', 'plotly', 'sns.', 'plt.show()', 'plt.savefig(', 'fig.show()', 'IMAGE_DATA:', 'base64.b64encode', 'io.BytesIO' ] code_lower = code.lower() return any(indicator.lower() in code_lower for indicator in chart_indicators) def detect_interactive_code(code: str) -> bool: """Detect if code requires interactive input""" interactive_patterns = [ 'input(', 'raw_input(', 'getpass.getpass(', 'sys.stdin.read', 'input =', 'user_input' ] code_lower = code.lower() return any(pattern.lower() in code_lower for pattern in interactive_patterns) def prepare_interactive_code(code: str, inputs: list) -> str: """Prepare interactive code with pre-provided inputs - OPTIMIZED for faster execution""" if not inputs: return code # OPTIMIZATION: More efficient input replacement input_setup = f"""# Pre-provided inputs (optimized) _inputs = {inputs} _input_index = 0 def input(prompt=''): global _input_index if _input_index < len(_inputs): value = _inputs[_input_index] _input_index += 1 print(prompt + str(value)) return value return '' """ return input_setup + code @app.post("/api/generate-code") async def 
generate_code(request: CodeGenerationRequest): """Generate Python code using the strands-agents code generator agent""" try: session = get_or_create_session(request.session_id) # Check if prompt mentions files but no CSV is uploaded file_keywords = ['file', 'csv', 'data', 'dataset', 'load', 'read', 'import', 'upload'] mentions_file = any(keyword in request.prompt.lower() for keyword in file_keywords) if mentions_file and not session.uploaded_csv: return { "success": False, "requires_file": True, "message": "Your request mentions working with files. Please upload a CSV file first.", "session_id": session.session_id } # Prepare prompt with CSV context if available enhanced_prompt = request.prompt # Check if the request involves visualization/charts chart_keywords = ['plot', 'chart', 'graph', 'visualiz', 'histogram', 'scatter', 'bar chart', 'line chart', 'pie chart', 'heatmap', 'matplotlib', 'seaborn', 'plotly'] needs_visualization = any(keyword in request.prompt.lower() for keyword in chart_keywords) if session.uploaded_csv: csv_info = f""" You have access to a CSV file named '{session.uploaded_csv['filename']}' with the following content preview: ```csv {session.uploaded_csv['content'][:1000]}{'...' if len(session.uploaded_csv['content']) > 1000 else ''} ``` When generating code, assume this CSV data is available and can be loaded using pandas.read_csv() or similar methods. Use the filename '{session.uploaded_csv['filename']}' in your code. 
User request: {request.prompt} """ enhanced_prompt = csv_info # Add chart rendering instructions if visualization is needed if needs_visualization: chart_instructions = """ IMPORTANT: For reliable chart rendering in the web interface, use this approach: ```python import matplotlib.pyplot as plt import numpy as np import base64 import io # Create your plot x = np.linspace(0, 10, 100) y = np.sin(x) plt.figure(figsize=(10, 6)) plt.plot(x, y) plt.title('Sine Wave') plt.xlabel('X') plt.ylabel('Y') plt.grid(True) # Save and capture the plot for web display buffer = io.BytesIO() plt.savefig(buffer, format='png', dpi=150, bbox_inches='tight') buffer.seek(0) image_base64 = base64.b64encode(buffer.read()).decode('utf-8') plt.close() # Close to free memory # Output the image data for web interface print(f"IMAGE_DATA:{image_base64}") print("Chart generated successfully!") ``` This ensures your charts are properly displayed in the web interface. """ enhanced_prompt += chart_instructions # Use the strands-agents agent for code generation agent_result = code_generator_agent(enhanced_prompt) # Extract string content from AgentResult generated_code = str(agent_result) if agent_result is not None else "" # Store generation in session history session.conversation_history.append({ "type": "generation", "prompt": request.prompt, "enhanced_prompt": enhanced_prompt if session.uploaded_csv else None, "generated_code": generated_code, "agent": "strands_code_generator", "csv_used": session.uploaded_csv['filename'] if session.uploaded_csv else None, "timestamp": time.time() }) return { "success": True, "code": generated_code, "session_id": session.session_id, "agent_used": "strands_code_generator", "csv_file_used": session.uploaded_csv['filename'] if session.uploaded_csv else None } except Exception as e: raise HTTPException(status_code=500, detail=f"Code generation failed: {str(e)}") @app.post("/api/analyze-code") async def analyze_code(request: CodeExecutionRequest): """Analyze code to 
detect interactive elements and suggest inputs - OPTIMIZED""" try: is_interactive = detect_interactive_code(request.code) if is_interactive: # OPTIMIZATION: Faster, more focused analysis analysis_prompt = f"""Analyze this Python code and identify the input() calls. Be concise: ```python {request.code} ``` Provide: 1. Number of input() calls 2. What each input should be (name, age, etc.) 3. Example values for testing Keep response short and practical.""" analysis_result = code_generator_agent(analysis_prompt) return { "success": True, "interactive": True, "analysis": analysis_result, "suggestions": "Provide inputs in the order they appear in the code" } else: return { "success": True, "interactive": False, "analysis": "This code does not require interactive input.", "suggestions": None } except Exception as e: raise HTTPException(status_code=500, detail=f"Code analysis failed: {str(e)}") @app.post("/api/execute-code") async def execute_code(request: CodeExecutionRequest): """Execute Python code using hybrid approach: direct AgentCore for charts, Strands-Agents for others""" try: session = get_or_create_session(request.session_id) # Track execution start time execution_start_time = time.time() # Check if code is interactive is_interactive = request.interactive or detect_interactive_code(request.code) # Try to find the original prompt from recent conversation history user_prompt = None if session.conversation_history: # Look for the most recent generation entry with a prompt for entry in reversed(session.conversation_history): if entry.get('prompt'): # Direct prompt field user_prompt = entry['prompt'] break elif entry.get('type') == 'generation' and entry.get('generated_code'): # Check if this generated code matches the current code being executed if entry.get('generated_code') and request.code.strip() in entry.get('generated_code', ''): user_prompt = entry.get('prompt') break # If no prompt found, check if this is a direct code execution if not user_prompt: # For 
direct executions, we can create a descriptive prompt based on the code code_lines = request.code.strip().split('\n') if len(code_lines) == 1 and len(code_lines[0]) < 100: user_prompt = f"Execute: {code_lines[0]}" elif 'input(' in request.code: user_prompt = "Interactive code execution" elif any(keyword in request.code.lower() for keyword in ['import matplotlib', 'plt.', 'plot', 'chart']): user_prompt = "Generate visualization/chart" elif 'import pandas' in request.code or 'pd.' in request.code: user_prompt = "Data analysis with pandas" else: user_prompt = "Direct code execution" # Prepare code for execution if is_interactive and request.inputs: prepared_code = prepare_interactive_code(request.code, request.inputs) print(f"๐Ÿ”„ Interactive code prepared with {len(request.inputs)} inputs") else: prepared_code = request.code # Check if this is chart/visualization code is_chart_code = detect_chart_code(prepared_code) # Get session files for sandbox upload session_files = [] if session.uploaded_csv: session_files.append({ 'filename': session.uploaded_csv['filename'], 'content': session.uploaded_csv['content'] }) # REVERTED: Use original logic - only force direct AgentCore for charts and files, NOT for interactive if is_chart_code or session_files: print(f"๐ŸŽจ Chart code detected - using direct AgentCore execution") # Use direct AgentCore execution to preserve full base64 output execution_result_str, images = execute_chart_code_direct(prepared_code, session_files) agent_used = "direct_agentcore_charts" else: print(f"๐Ÿ“ Regular code - using Strands-Agents execution") # For regular code, if files are needed, use direct AgentCore as well # since Strands-Agents tools can't easily access session files if session_files: print(f"๐Ÿ“ Files detected - switching to direct AgentCore for file access") execution_result_str, images = execute_chart_code_direct(prepared_code, session_files) agent_used = "direct_agentcore_with_files" else: # Use strands-agents with AgentCore tool 
for regular code without files execution_prompt = f"""Execute this Python code using the execute_python_code tool: ```python {prepared_code} ``` Use the tool to run the code and return the complete output.""" execution_result = code_executor_agent(execution_prompt) # Debug the AgentResult structure print(f"๐Ÿ” AgentResult type: {type(execution_result)}") # Extract the actual text content from AgentResult execution_result_str = extract_text_from_agent_result(execution_result) print(f"๐Ÿ“Š Extracted text length: {len(execution_result_str)}") # Extract image data from execution results images = extract_image_data(execution_result_str) agent_used = "strands_agents_with_agentcore" # Calculate execution duration execution_end_time = time.time() execution_duration = execution_end_time - execution_start_time # Store execution in session history session.code_history.append(request.code) session.execution_results.append({ "code": request.code, "result": execution_result_str, "agent": agent_used, "executor_type": "agentcore", "interactive": is_interactive, "inputs_provided": request.inputs if is_interactive else None, "images": images, "is_chart_code": is_chart_code, "timestamp": execution_end_time, "execution_duration": execution_duration, "prompt": user_prompt, "start_time": execution_start_time, "end_time": execution_end_time }) return { "success": True, "result": execution_result_str, "session_id": session.session_id, "agent_used": agent_used, "executor_type": "agentcore", "interactive": is_interactive, "inputs_used": request.inputs if is_interactive else None, "images": images, "is_chart_code": is_chart_code } except Exception as e: print(f"โŒ Code execution failed: {str(e)}") import traceback print(f"๐Ÿ“‹ Full traceback: {traceback.format_exc()}") raise HTTPException(status_code=500, detail=f"Code execution failed: {str(e)}") @app.post("/api/sessions/{session_id}/clear-csv") async def clear_csv_from_session(session_id: str): """Clear CSV file from session and 
AgentCore context""" try: session = get_or_create_session(session_id) if session.uploaded_csv: filename = session.uploaded_csv['filename'] # Clear CSV from session session.uploaded_csv = None # Add to conversation history session.conversation_history.append({ "type": "csv_removal", "filename": filename, "timestamp": time.time() }) print(f"๐Ÿ—‘๏ธ CSV file '{filename}' cleared from session {session_id}") return { "success": True, "message": f"CSV file '{filename}' removed successfully", "session_id": session_id } else: return { "success": True, "message": "No CSV file to remove", "session_id": session_id } except Exception as e: print(f"โŒ Error clearing CSV from session: {str(e)}") raise HTTPException(status_code=500, detail=f"Failed to clear CSV: {str(e)}") @app.post("/api/upload-csv") async def upload_csv_file(request: FileUploadRequest): """Upload and process a CSV file""" try: session = get_or_create_session(request.session_id) # Validate CSV content if not request.filename.lower().endswith('.csv'): raise HTTPException(status_code=400, detail="Only CSV files are allowed") # Store CSV file in session session.conversation_history.append({ "type": "csv_upload", "filename": request.filename, "content": request.content, "timestamp": time.time() }) # Store CSV data for code generation session.uploaded_csv = { "filename": request.filename, "content": request.content, "timestamp": asyncio.get_event_loop().time() } return { "success": True, "message": f"CSV file {request.filename} uploaded successfully", "session_id": session.session_id, "filename": request.filename, "preview": request.content[:500] + "..." 
if len(request.content) > 500 else request.content } except Exception as e: raise HTTPException(status_code=500, detail=f"CSV upload failed: {str(e)}") @app.post("/api/upload-file") async def upload_file(request: FileUploadRequest): """Upload and process a Python file""" try: session = get_or_create_session(request.session_id) # Store file in session session.conversation_history.append({ "type": "file_upload", "filename": request.filename, "content": request.content, "timestamp": time.time() }) return { "success": True, "message": f"File {request.filename} uploaded successfully", "session_id": session.session_id, "content": request.content } except Exception as e: raise HTTPException(status_code=500, detail=f"File upload failed: {str(e)}") @app.get("/api/session/{session_id}/history") async def get_session_history(session_id: str): """Get session history""" try: if session_id not in active_sessions: raise HTTPException(status_code=404, detail="Session not found") session = active_sessions[session_id] return { "success": True, "session_id": session_id, "conversation_history": session.conversation_history, "execution_results": session.execution_results } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get session history: {str(e)}") @app.get("/api/agents/status") async def get_agents_status(): """Get status of all agents""" try: current_model = globals().get('current_model_id', 'Unknown') agents_info = [ { "name": "code_generator", "framework": "strands-agents", "model": current_model, "purpose": "Generate Python code from natural language", "status": "active" if code_generator_agent else "inactive" }, { "name": "code_executor", "framework": executor_type, "model": current_model, "purpose": "Execute Python code safely" if executor_type == "agentcore" else "Simulate Python code execution", "status": "active" if 'code_executor_agent' in globals() else "inactive", "type": "AgentCore CodeInterpreter" if executor_type == "agentcore" else 
"Strands Simulation" } ] architecture = f"Hybrid: Strands-Agents + AgentCore ({current_model})" if executor_type == "agentcore" else f"Strands-Agents Framework ({current_model})" return { "agents": agents_info, "total": len(agents_info), "architecture": architecture, "executor_type": executor_type, "current_model": current_model, "aws_region": aws_region, "authentication": "AWS Profile" if os.getenv('AWS_PROFILE') else "Access Keys" } except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to get agents status: {str(e)}") # WebSocket endpoint for real-time communication @app.websocket("/ws/{session_id}") async def websocket_endpoint(websocket: WebSocket, session_id: str): await websocket.accept() print(f"WebSocket connected for session {session_id}") try: while True: data = await websocket.receive_text() message = json.loads(data) if message["type"] == "generate_code": # Handle code generation via WebSocket try: agent_result = code_generator_agent(message["prompt"]) # Extract string content from AgentResult generated_code = str(agent_result) if agent_result is not None else "" await websocket.send_text(json.dumps({ "type": "code_generated", "success": True, "code": generated_code, "session_id": session_id })) except Exception as e: await websocket.send_text(json.dumps({ "type": "error", "success": False, "error": str(e) })) elif message["type"] == "execute_code": # Handle code execution via WebSocket try: if executor_type == "agentcore": execution_result = code_executor_agent(f"Execute this code: {message['code']}") else: execution_result = code_executor_agent(f"Simulate execution of: {message['code']}") await websocket.send_text(json.dumps({ "type": "execution_result", "success": True, "result": execution_result, "session_id": session_id })) except Exception as e: await websocket.send_text(json.dumps({ "type": "error", "success": False, "error": str(e) })) except WebSocketDisconnect: print(f"WebSocket disconnected for session {session_id}") 
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Reports whether the generator and executor agents exist, plus the active
    model, region and authentication mode. Every module global is read through
    ``globals().get`` so the probe answers (with ``False`` / ``'Unknown'``)
    instead of raising when the agents have not been initialised.
    """
    module_globals = globals()
    current_model = module_globals.get('current_model_id', 'Unknown')
    # ``or`` also covers a global that exists but is still None.
    executor = module_globals.get('executor_type') or 'Unknown'

    return {
        "status": "healthy",
        "code_generator_ready": module_globals.get('code_generator_agent') is not None,
        "code_executor_ready": module_globals.get('code_executor_agent') is not None,
        "executor_type": executor,
        "current_model": current_model,
        "aws_region": module_globals.get('aws_region'),
        "authentication": "AWS Profile" if os.getenv('AWS_PROFILE') else "Access Keys",
        "architecture": {
            "code_generation": f"Strands-Agents Agent ({current_model})",
            "code_execution": f"{executor.title().replace('_', ' ')} Agent ({current_model})"
        }
    }


if __name__ == "__main__":
    # Local import: uvicorn is only needed when the module is run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)