from google.adk.agents import Agent from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService from google.adk.tools import google_search from google.genai import types import asyncio import os # adapted form https://google.github.io/adk-docs/tools/built-in-tools/#google-search APP_NAME="google_search_agent" USER_ID="user1234" # Agent Definition # Add your GEMINI_API_KEY root_agent = Agent( model="gemini-2.0-flash", name="openai_agent", description="Agent to answer questions using Google Search.", instruction="I can answer your questions by searching the internet. Just ask me anything!", # google_search is a pre-built tool which allows the agent to perform Google searches. tools=[google_search] ) # Session and Runner async def setup_session_and_runner(user_id, session_id): session_service = InMemorySessionService() session = await session_service.create_session(app_name=APP_NAME, user_id=user_id, session_id=session_id) runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service) return session, runner # Agent Interaction async def call_agent_async(query, user_id, session_id): content = types.Content(role='user', parts=[types.Part(text=query)]) session, runner = await setup_session_and_runner(user_id, session_id) events = runner.run_async(user_id=user_id, session_id=session_id, new_message=content) async for event in events: if event.is_final_response(): final_response = event.content.parts[0].text print("Agent Response: ", final_response) return final_response from bedrock_agentcore.runtime import BedrockAgentCoreApp app = BedrockAgentCoreApp() @app.entrypoint def agent_invocation(payload, context): return asyncio.run(call_agent_async(payload.get("prompt", "what is Bedrock Agentcore Runtime?"), payload.get("user_id",USER_ID), context.session_id)) app.run()