53 lines
1.9 KiB
Python
Raw Permalink Normal View History

from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search
from google.genai import types
import asyncio
import os
# Adapted from https://google.github.io/adk-docs/tools/built-in-tools/#google-search
# Identifiers passed to the session service and the runner below.
APP_NAME = "google_search_agent"
USER_ID = "user1234"  # used when the invocation payload has no "user_id"

# Agent definition.
# Add your GEMINI_API_KEY (presumably supplied via the environment -- confirm).
# NOTE(review): the agent is named "openai_agent" although it uses a Gemini
# model; this looks like a copy-paste leftover -- confirm before renaming.
root_agent = Agent(
    name="openai_agent",
    model="gemini-2.0-flash",
    description="Agent to answer questions using Google Search.",
    instruction="I can answer your questions by searching the internet. Just ask me anything!",
    # google_search is a pre-built tool that lets the agent perform Google searches.
    tools=[google_search],
)
# Session and Runner
async def setup_session_and_runner(user_id, session_id):
    """Create an in-memory session and a runner bound to ``root_agent``.

    Args:
        user_id: Id of the user the session is created for.
        session_id: Id under which the session is created.

    Returns:
        A ``(session, runner)`` tuple; both share the same session service.
    """
    # A new InMemorySessionService is built on every call, so session state
    # is presumably not shared between separate calls -- confirm if needed.
    service = InMemorySessionService()
    created_session = await service.create_session(
        app_name=APP_NAME,
        user_id=user_id,
        session_id=session_id,
    )
    return created_session, Runner(
        agent=root_agent,
        app_name=APP_NAME,
        session_service=service,
    )
# Agent Interaction
async def call_agent_async(query, user_id, session_id):
    """Send ``query`` to the agent and return the text of its final response.

    Args:
        query: User prompt, wrapped into a single-part ``user`` message.
        user_id: Id of the user on whose behalf the agent runs.
        session_id: Id of the session the run is attached to.

    Returns:
        The text of the first part of the last final-response event, or
        ``None`` when the run yields no final response carrying content.
    """
    content = types.Content(role='user', parts=[types.Part(text=query)])
    # Only the runner is needed here; the session object itself is not used.
    _, runner = await setup_session_and_runner(user_id, session_id)

    # Start from None so a run without a usable final response returns None
    # instead of raising UnboundLocalError at the return statement.
    final_response = None
    events = runner.run_async(user_id=user_id, session_id=session_id, new_message=content)
    async for event in events:
        if not event.is_final_response():
            continue
        # Guard against a final event without content or parts before
        # indexing into it (the ADK documentation examples do the same check).
        if event.content and event.content.parts:
            final_response = event.content.parts[0].text
            print("Agent Response: ", final_response)
    return final_response
from bedrock_agentcore.runtime import BedrockAgentCoreApp

app = BedrockAgentCoreApp()


@app.entrypoint
def agent_invocation(payload, context):
    """Entrypoint registered on ``app``: run the agent for one invocation.

    Args:
        payload: Mapping read with ``.get``; ``"prompt"`` falls back to a
            built-in sample question and ``"user_id"`` falls back to
            ``USER_ID``.
        context: Invocation context; only ``context.session_id`` is read.

    Returns:
        Whatever ``call_agent_async`` returns for this prompt.
    """
    prompt = payload.get("prompt", "what is Bedrock Agentcore Runtime?")
    user_id = payload.get("user_id", USER_ID)
    pending = call_agent_async(prompt, user_id, context.session_id)
    # The entrypoint is synchronous, so the coroutine is driven to completion
    # on its own event loop here.
    return asyncio.run(pending)


# Called unconditionally at module level (there is no __main__ guard).
app.run()