Advanced Concept
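Building a custom agent by directly implementing _run_async_impl (or its equivalent in other languages) gives you powerful control, but it is more complex than using the predefined LlmAgent or standard WorkflowAgent types. We recommend understanding those foundational agent types thoroughly before tackling custom orchestration logic.
Custom agents
Custom agents provide the greatest flexibility in the Agent Development Kit (ADK): you inherit directly from BaseAgent and implement your own control flow, which lets you define arbitrary orchestration logic. This goes beyond the predefined patterns of SequentialAgent, LoopAgent, and ParallelAgent, so you can build highly specific and complex agent workflows.
Introduction: Beyond Predefined Workflows
What is a Custom Agent?
A custom agent is essentially any class you create that inherits from google.adk.agents.BaseAgent and implements its core execution logic in the asynchronous _run_async_impl method. You have complete control over how this method calls other agents (sub-agents), manages state, and handles events.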
Note
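The name of the method that implements an agent's core asynchronous logic varies slightly by SDK language (for example, runAsyncImpl in Java and _run_async_impl in Python). See the API documentation for each language for details.
Why use custom agents?
The standard Workflow Agents (SequentialAgent, LoopAgent, ParallelAgent) cover common orchestration patterns, but you need a custom agent when your requirements include:
- Conditional Logic: Running different sub-agents or taking different paths based on runtime conditions or the results of previous steps.
- Complex State Management: Implementing logic for maintaining and updating state throughout the workflow that goes beyond simple sequential passing.
- External Integrations: Calling external APIs, databases, or custom libraries directly within the orchestration flow.
- Dynamic Agent Selection: Choosing which sub-agent (or sub-agents) to run next based on a dynamic evaluation of the situation or input.
- Unique Workflow Patterns: Implementing orchestration logic that does not fit the standard sequential, parallel, or loop structures.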

Implementing Custom Logic
The core of any custom agent is the method in which you define its unique asynchronous behavior. This method lets you orchestrate sub-agents and manage the flow of execution.
In Python, this is the _run_async_impl method.
- Signature: async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
- Asynchronous Generator: It must be an async def function and return an AsyncGenerator. This allows it to yield events produced by sub-agents or its own logic back to the runner.
- ctx (InvocationContext): Provides access to crucial runtime information, most importantly ctx.session.state, which is the primary way to share data between steps orchestrated by your custom agent.
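The async-generator shape described above can be tried without ADK installed. The following is a minimal, stdlib-only sketch: `Event`, `Ctx`, `run_impl`, and `collect` are hypothetical stand-ins invented for illustration, not ADK's classes or methods.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncGenerator, Dict, List


@dataclass
class Event:
    """Hypothetical stand-in for an ADK event."""
    author: str
    text: str


@dataclass
class Ctx:
    """Hypothetical stand-in for InvocationContext; it only carries shared state."""
    state: Dict[str, object] = field(default_factory=dict)


async def run_impl(ctx: Ctx) -> AsyncGenerator[Event, None]:
    # An `async def` that contains `yield` is an async generator:
    # the consumer receives each event as soon as it is produced.
    yield Event("orchestrator", "started")
    ctx.state["step"] = 1  # share data with later steps through state
    yield Event("orchestrator", "finished")


async def collect(ctx: Ctx) -> List[str]:
    # The consuming side (the role the runner plays) iterates with `async for`.
    return [event.text async for event in run_impl(ctx)]
```

Calling `asyncio.run(collect(Ctx()))` drives the generator to completion and gathers the text of each event in order.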
In Java, the heart of any custom agent is the runAsyncImpl method, which you override from BaseAgent.
- Signature: protected Flowable<Event> runAsyncImpl(InvocationContext ctx)
- Reactive Stream (Flowable): It must return an io.reactivex.rxjava3.core.Flowable<Event>. This Flowable represents a stream of events that will be produced by the custom agent's logic, often by combining or transforming multiple Flowables from sub-agents.
- ctx (InvocationContext): Provides access to crucial runtime information, most importantly ctx.session().state(), which is a java.util.concurrent.ConcurrentMap<String, Object>. This is the primary way to share data between steps orchestrated by your custom agent.
Key capabilities within the core asynchronous method:
- Calling sub-agents: In Python, you invoke sub-agents (typically stored as instance attributes such as self.my_llm_agent) through their run_async method and yield their events.
- Managing state: Read from and write to the session state dictionary (ctx.session.state) to pass data between sub-agent calls or to make decisions:

    # Read data set by a previous agent
    previous_result = ctx.session.state.get("some_key")

    # Make a decision based on state
    if previous_result == "some_value":
        pass  # ... call a specific sub-agent ...
    else:
        pass  # ... call another sub-agent ...

    # Store a result for a later step (often done via a sub-agent's output_key)
    # ctx.session.state["my_custom_result"] = "calculated_value"

- Implementing control flow: Use standard Python constructs (if/elif/else, for/while loops, try/except) to build sophisticated, conditional, or iterative workflows involving your sub-agents.
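As a rough illustration of how these three capabilities combine, here is a stdlib-only sketch. `StubAgent`, `Orchestrator`, and `run` are hypothetical names, and passing a plain `dict` in place of the invocation context is a simplification made for this example; in ADK the sub-agent's run_async receives the context object instead.

```python
import asyncio
from typing import AsyncGenerator, Dict, List


class StubAgent:
    """Hypothetical sub-agent: writes one value to state and emits one event."""

    def __init__(self, name: str, key: str, value: str) -> None:
        self.name, self.key, self.value = name, key, value

    async def run_async(self, state: Dict[str, str]) -> AsyncGenerator[str, None]:
        state[self.key] = self.value
        yield f"{self.name}: wrote {self.key}"


class Orchestrator:
    """Hypothetical custom agent that picks its second step from state."""

    def __init__(self, classifier: StubAgent, on_match: StubAgent, fallback: StubAgent) -> None:
        self.classifier, self.on_match, self.fallback = classifier, on_match, fallback

    async def run_async(self, state: Dict[str, str]) -> AsyncGenerator[str, None]:
        # Calling a sub-agent: iterate its stream and pass each event up.
        async for event in self.classifier.run_async(state):
            yield event
        # Managing state + control flow: branch on what the previous step stored.
        if state.get("some_key") == "some_value":
            chosen = self.on_match
        else:
            chosen = self.fallback
        async for event in chosen.run_async(state):
            yield event


async def run(orchestrator: Orchestrator, state: Dict[str, str]) -> List[str]:
    return [event async for event in orchestrator.run_async(state)]
```

The branch is evaluated only after the first sub-agent's stream is exhausted, which is why the state it wrote is already visible to the `if`.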
In Java, the same capabilities are expressed with reactive streams:
- Calling sub-agents: You invoke sub-agents (typically stored as instance fields or objects) through their asynchronous run method and return their event streams. You typically chain the Flowables from sub-agents with RxJava operators such as concatWith, flatMapPublisher, or concatArray.

    // Example: Running one sub-agent
    // return someSubAgent.runAsync(ctx);

    // Example: Running sub-agents sequentially
    Flowable<Event> firstAgentEvents = someSubAgent1.runAsync(ctx)
        .doOnNext(event -> System.out.println("Event from agent 1: " + event.id()));

    Flowable<Event> secondAgentEvents = Flowable.defer(() ->
        someSubAgent2.runAsync(ctx)
            .doOnNext(event -> System.out.println("Event from agent 2: " + event.id()))
    );

    return firstAgentEvents.concatWith(secondAgentEvents);

  Flowable.defer() is often used for later stages whose execution depends on the completion or state of an earlier stage.
- Managing state: Read from and write to the session state to pass data between sub-agent calls or to make decisions. The session state is a java.util.concurrent.ConcurrentMap<String, Object> obtained through ctx.session().state().

    // Read data set by a previous agent
    Object previousResult = ctx.session().state().get("some_key");

    // Make a decision based on state
    if ("some_value".equals(previousResult)) {
        // ... logic to include a specific sub-agent's Flowable ...
    } else {
        // ... logic to include another sub-agent's Flowable ...
    }

    // Store a result for a later step (often done via a sub-agent's output_key)
    // ctx.session().state().put("my_custom_result", "calculated_value");

- Implementing control flow: Use standard language constructs (if/else, loops, try/catch) combined with reactive operators (RxJava) to build sophisticated workflows.
  - Conditional: Use Flowable.defer() to choose which Flowable to subscribe to based on a condition, or filter() to filter events within a stream.
  - Iterative: Use operators such as repeat() or retry(), or structure the Flowable chain so that parts of it are invoked recursively under a condition (usually managed with flatMapPublisher or concatMap).
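Managing Sub-Agents and State
Typically, a custom agent orchestrates other agents (such as LlmAgent, LoopAgent, and so on).
- Initialization: You usually pass instances of these sub-agents into your custom agent's constructor and store them as instance fields or attributes (for example, this.story_generator = story_generator_instance or self.story_generator = story_generator_instance). This makes them accessible within the custom agent's core asynchronous execution logic (such as the _run_async_impl method).
- Sub-agents list: When initializing BaseAgent through its super() constructor, pass a sub_agents list. This list tells the ADK framework which agents are part of this custom agent's immediate hierarchy. It matters for framework features such as lifecycle management, introspection, and potentially future routing capabilities, even if your core execution logic (_run_async_impl) calls the agents directly via self.xxx_agent. Include every agent that your custom logic invokes directly at the top level.
- State: As mentioned, ctx.session.state is the standard way for sub-agents (especially LlmAgents using output_key) to communicate results back to the orchestrator, and for the orchestrator to pass necessary inputs down.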
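One way to picture why the sub-agents list matters: a framework can only walk, inspect, or look up agents that were declared in the hierarchy. The sketch below is purely illustrative and is not how ADK implements this; `AgentNode`, `find`, and `build_tree` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class AgentNode:
    """Hypothetical stand-in for an agent that records its declared sub-agents."""
    name: str
    sub_agents: List["AgentNode"] = field(default_factory=list)

    def find(self, name: str) -> Optional["AgentNode"]:
        # Depth-first search over the declared hierarchy only.
        if self.name == name:
            return self
        for child in self.sub_agents:
            found = child.find(name)
            if found is not None:
                return found
        return None


def build_tree() -> AgentNode:
    # The orchestrator declares only the agents it calls directly at the top level;
    # nested agents are reachable through the workflow agent that owns them.
    review_loop = AgentNode("ReviewLoop", [AgentNode("Critic"), AgentNode("Reviser")])
    return AgentNode("Orchestrator", [AgentNode("Writer"), review_loop])
```

An agent that is stored as an attribute but never declared would be invisible to a traversal like `find`, which is the kind of gap the list is meant to prevent.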
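Design Pattern Example: StoryFlowAgent
The following design pattern example illustrates the power of custom agents: a multi-stage content generation workflow with conditional logic.
Goal: Build a system that generates a story, iteratively refines it through critique and revision, performs final checks, and, crucially, regenerates the story if the final tone check fails.
Why custom? The core requirement that calls for a custom agent here is the conditional regeneration based on the tone check. The standard workflow agents have no built-in conditional branching based on the outcome of a sub-agent's task, so the orchestrator needs custom logic (if tone == "negative": ...).
Part 1: Simplified Custom Agent Initialization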
We define the StoryFlowAgent inheriting from BaseAgent. In __init__, we store the necessary sub-agents (passed in) as instance attributes and tell the BaseAgent framework about the top-level agents this custom agent will directly orchestrate.
class StoryFlowAgent(BaseAgent):
    """
    Custom agent for a story generation and refinement workflow.

    This agent orchestrates a sequence of LLM agents to generate a story,
    critique it, revise it, check grammar and tone, and potentially
    regenerate the story if the tone is negative.
    """

    # --- Field Declarations for Pydantic ---
    # Declare the agents passed during initialization as class attributes with type hints
    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent

    loop_agent: LoopAgent
    sequential_agent: SequentialAgent

    # model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
    model_config = {"arbitrary_types_allowed": True}

    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):
        """
        Initializes the StoryFlowAgent.

        Args:
            name: The name of the agent.
            story_generator: An LlmAgent to generate the initial story.
            critic: An LlmAgent to critique the story.
            reviser: An LlmAgent to revise the story based on criticism.
            grammar_check: An LlmAgent to check the grammar.
            tone_check: An LlmAgent to analyze the tone.
        """
        # Create internal agents *before* calling super().__init__
        loop_agent = LoopAgent(
            name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
        )
        sequential_agent = SequentialAgent(
            name="PostProcessing", sub_agents=[grammar_check, tone_check]
        )

        # Define the sub_agents list for the framework
        sub_agents_list = [
            story_generator,
            loop_agent,
            sequential_agent,
        ]

        # Pydantic will validate and assign them based on the class annotations.
        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=sub_agents_list,  # Pass the sub_agents list directly
        )
We define the StoryFlowAgentExample by extending BaseAgent. In its constructor, we store the necessary sub-agent instances (passed as parameters) as instance fields. These top-level sub-agents, which this custom agent will directly orchestrate, are also passed to the super constructor of BaseAgent as a list.
private final LlmAgent storyGenerator;
private final LoopAgent loopAgent;
private final SequentialAgent sequentialAgent;
public StoryFlowAgentExample(
String name, LlmAgent storyGenerator, LoopAgent loopAgent, SequentialAgent sequentialAgent) {
super(
name,
"Orchestrates story generation, critique, revision, and checks.",
List.of(storyGenerator, loopAgent, sequentialAgent),
null,
null);
this.storyGenerator = storyGenerator;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
Part 2: Defining the Custom Execution Logic
This method orchestrates the sub-agents using standard Python async/await and control flow.
    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        """
        Implements the custom orchestration logic for the story workflow.
        Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
        """
        logger.info(f"[{self.name}] Starting story generation workflow.")

        # 1. Initial Story Generation
        logger.info(f"[{self.name}] Running StoryGenerator...")
        async for event in self.story_generator.run_async(ctx):
            logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        # Check if story was generated before proceeding
        if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
            logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
            return  # Stop processing if initial story failed

        logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")

        # 2. Critic-Reviser Loop
        logger.info(f"[{self.name}] Running CriticReviserLoop...")
        # Use the loop_agent instance attribute assigned during init
        async for event in self.loop_agent.run_async(ctx):
            logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")

        # 3. Sequential Post-Processing (Grammar and Tone Check)
        logger.info(f"[{self.name}] Running PostProcessing...")
        # Use the sequential_agent instance attribute assigned during init
        async for event in self.sequential_agent.run_async(ctx):
            logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        # 4. Tone-Based Conditional Logic
        tone_check_result = ctx.session.state.get("tone_check_result")
        logger.info(f"[{self.name}] Tone check result: {tone_check_result}")

        if tone_check_result == "negative":
            logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
            async for event in self.story_generator.run_async(ctx):
                logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
                yield event
        else:
            logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")
            pass

        logger.info(f"[{self.name}] Workflow finished.")
- The initial story_generator runs. Its output is expected to be in ctx.session.state["current_story"].
- The loop_agent runs, which internally calls the critic and reviser sequentially for max_iterations times. They read/write current_story and criticism from/to the state.
- The sequential_agent runs, calling grammar_check then tone_check, reading current_story and writing grammar_suggestions and tone_check_result to the state.
- Custom Part: The if statement checks the tone_check_result from the state. If it's "negative", the story_generator is called again, overwriting the current_story in the state. Otherwise, the flow ends.
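The four stages can be traced without any LLM calls. The sketch below is a stdlib-only simulation, not ADK code: `run_step`, `story_flow`, `collect_events`, and the `steps` dictionary of plain functions are hypothetical stand-ins that mimic agents writing to state through an output key.

```python
import asyncio
from typing import AsyncGenerator, Callable, Dict, List

State = Dict[str, str]
Step = Callable[[State], str]  # hypothetical stub "agent": reads state, returns its output


async def run_step(name: str, output_key: str, step: Step, state: State) -> AsyncGenerator[str, None]:
    # Mimics an agent with an output key: store the result, then emit one event.
    state[output_key] = step(state)
    yield name


async def story_flow(state: State, steps: Dict[str, Step], max_iterations: int = 2) -> AsyncGenerator[str, None]:
    # 1. Initial generation
    async for e in run_step("StoryGenerator", "current_story", steps["generate"], state):
        yield e
    if not state.get("current_story"):
        return  # abort: nothing to refine
    # 2. Critic-reviser loop
    for _ in range(max_iterations):
        async for e in run_step("Critic", "criticism", steps["critique"], state):
            yield e
        async for e in run_step("Reviser", "current_story", steps["revise"], state):
            yield e
    # 3. Sequential post-processing
    async for e in run_step("GrammarCheck", "grammar_suggestions", steps["grammar"], state):
        yield e
    async for e in run_step("ToneCheck", "tone_check_result", steps["tone"], state):
        yield e
    # 4. The custom part: regenerate only when the tone check says "negative"
    if state.get("tone_check_result") == "negative":
        async for e in run_step("StoryGenerator (Regen)", "current_story", steps["generate"], state):
            yield e


async def collect_events(state: State, steps: Dict[str, Step]) -> List[str]:
    return [e async for e in story_flow(state, steps)]
```

With a `tone` stub that returns "negative", the event list ends with "StoryGenerator (Regen)" and current_story holds the freshly generated text, so the earlier revisions are discarded, matching the overwrite described in the last bullet.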
The runAsyncImpl method orchestrates the sub-agents using RxJava's Flowable streams and operators for asynchronous control flow.
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
// Implements the custom orchestration logic for the story workflow.
// Uses the sub-agent fields assigned in the constructor (e.g., storyGenerator).
logger.log(Level.INFO, () -> String.format("[%s] Starting story generation workflow.", name()));
// Stage 1. Initial Story Generation
Flowable<Event> storyGenFlow = runStage(storyGenerator, invocationContext, "StoryGenerator");
// Stage 2: Critic-Reviser Loop (runs after story generation completes)
Flowable<Event> criticReviserFlow = Flowable.defer(() -> {
if (!isStoryGenerated(invocationContext)) {
logger.log(Level.SEVERE,() ->
String.format("[%s] Failed to generate initial story. Aborting after StoryGenerator.",
name()));
return Flowable.empty(); // Stop further processing if no story
}
logger.log(Level.INFO, () ->
String.format("[%s] Story state after generator: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(loopAgent, invocationContext, "CriticReviserLoop");
});
// Stage 3: Post-Processing (runs after critic-reviser loop completes)
Flowable<Event> postProcessingFlow = Flowable.defer(() -> {
logger.log(Level.INFO, () ->
String.format("[%s] Story state after loop: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(sequentialAgent, invocationContext, "PostProcessing");
});
// Stage 4: Conditional Regeneration (runs after post-processing completes)
Flowable<Event> conditionalRegenFlow = Flowable.defer(() -> {
String toneCheckResult = (String) invocationContext.session().state().get("tone_check_result");
logger.log(Level.INFO, () -> String.format("[%s] Tone check result: %s", name(), toneCheckResult));
if ("negative".equalsIgnoreCase(toneCheckResult)) {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is negative. Regenerating story...", name()));
return runStage(storyGenerator, invocationContext, "StoryGenerator (Regen)");
} else {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is not negative. Keeping current story.", name()));
return Flowable.empty(); // No regeneration needed
}
});
return Flowable.concatArray(storyGenFlow, criticReviserFlow, postProcessingFlow, conditionalRegenFlow)
.doOnComplete(() -> logger.log(Level.INFO, () -> String.format("[%s] Workflow finished.", name())));
}
// Helper method for a single agent run stage with logging
private Flowable<Event> runStage(BaseAgent agentToRun, InvocationContext ctx, String stageName) {
logger.log(Level.INFO, () -> String.format("[%s] Running %s...", name(), stageName));
return agentToRun
.runAsync(ctx)
.doOnNext(event ->
logger.log(Level.INFO,() ->
String.format("[%s] Event from %s: %s", name(), stageName, event.toJson())))
.doOnError(err ->
logger.log(Level.SEVERE,
String.format("[%s] Error in %s", name(), stageName), err))
.doOnComplete(() ->
logger.log(Level.INFO, () ->
String.format("[%s] %s finished.", name(), stageName)));
}
- The initial storyGenerator.runAsync(invocationContext) Flowable is executed. Its output is expected to be in invocationContext.session().state().get("current_story").
- The loopAgent's Flowable runs next (due to Flowable.concatArray and Flowable.defer). The LoopAgent internally calls the critic and reviser sub-agents sequentially for up to maxIterations. They read/write current_story and criticism from/to the state.
- Then, the sequentialAgent's Flowable executes. It calls grammar_check then tone_check, reading current_story and writing grammar_suggestions and tone_check_result to the state.
- Custom Part: After the sequentialAgent completes, logic within a Flowable.defer checks the "tone_check_result" from invocationContext.session().state(). If it's "negative", the storyGenerator Flowable is conditionally concatenated and executed again, overwriting "current_story". Otherwise, an empty Flowable is used, and the overall workflow proceeds to completion.
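Part 3: Defining the LLM Sub-Agents
These are standard LlmAgent definitions, each responsible for a specific task. Their output_key parameter is crucial for placing results into session.state, where other agents or the custom orchestrator can access them.
Direct State Injection in Instructions
Note the story_generator's instruction. The {var} syntax is a placeholder. Before the instruction is sent to the LLM, the ADK framework automatically replaces a placeholder such as {topic} with the value of session.state['topic']. This templating in the instruction is the recommended way to provide context to an agent. For more details, see the State documentation.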
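To make the placeholder idea concrete, here is a small stdlib-only sketch of the substitution. It is not ADK's implementation: `inject_state` is a hypothetical helper, and raising KeyError for a missing key is an assumption made for this example, so the framework's handling of missing keys or special placeholder forms may differ.

```python
import re
from typing import Dict


def inject_state(instruction: str, state: Dict[str, object]) -> str:
    """Replace each {key} placeholder with str(state[key]) (hypothetical helper)."""

    def substitute(match: "re.Match[str]") -> str:
        key = match.group(1)
        if key not in state:
            # Assumption for this sketch: fail loudly on an unknown placeholder.
            raise KeyError(f"state has no value for placeholder {{{key}}}")
        return str(state[key])

    # \w+ restricts placeholders to simple identifier-like names.
    return re.sub(r"\{(\w+)\}", substitute, instruction)
```

For example, `inject_state("Write a short story on the following topic: {topic}", {"topic": "a brave kitten"})` returns the instruction with the topic filled in, which is the text the model would then receive in this simplified picture.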
GEMINI_2_FLASH = "gemini-2.0-flash"  # Define model constant

# --- Define the individual LLM agents ---
story_generator = LlmAgent(
    name="StoryGenerator",
    model=GEMINI_2_FLASH,
    instruction="""You are a story writer. Write a short story (around 100 words), on the following topic: {topic}""",
    input_schema=None,
    output_key="current_story",  # Key for storing output in session state
)

critic = LlmAgent(
    name="Critic",
    model=GEMINI_2_FLASH,
    instruction="""You are a story critic. Review the story provided: {{current_story}}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
    input_schema=None,
    output_key="criticism",  # Key for storing criticism in session state
)

reviser = LlmAgent(
    name="Reviser",
    model=GEMINI_2_FLASH,
    instruction="""You are a story reviser. Revise the story provided: {{current_story}}, based on the criticism in
{{criticism}}. Output only the revised story.""",
    input_schema=None,
    output_key="current_story",  # Overwrites the original story
)

grammar_check = LlmAgent(
    name="GrammarCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a grammar checker. Check the grammar of the story provided: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
    input_schema=None,
    output_key="grammar_suggestions",
)

tone_check = LlmAgent(
    name="ToneCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a tone analyzer. Analyze the tone of the story provided: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
    input_schema=None,
    output_key="tone_check_result",  # This agent's output determines the conditional flow
)
// --- Define the individual LLM agents ---
LlmAgent storyGenerator =
LlmAgent.builder()
.name("StoryGenerator")
.model(MODEL_NAME)
.description("Generates the initial story.")
.instruction(
"""
You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic: {topic}
""")
.inputSchema(null)
.outputKey("current_story") // Key for storing output in session state
.build();
LlmAgent critic =
LlmAgent.builder()
.name("Critic")
.model(MODEL_NAME)
.description("Critiques the story.")
.instruction(
"""
You are a story critic. Review the story: {current_story}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.
""")
.inputSchema(null)
.outputKey("criticism") // Key for storing criticism in session state
.build();
LlmAgent reviser =
LlmAgent.builder()
.name("Reviser")
.model(MODEL_NAME)
.description("Revises the story based on criticism.")
.instruction(
"""
You are a story reviser. Revise the story: {current_story}, based on the criticism: {criticism}. Output only the revised story.
""")
.inputSchema(null)
.outputKey("current_story") // Overwrites the original story
.build();
LlmAgent grammarCheck =
LlmAgent.builder()
.name("GrammarCheck")
.model(MODEL_NAME)
.description("Checks grammar and suggests corrections.")
.instruction(
"""
You are a grammar checker. Check the grammar of the story: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.
""")
.outputKey("grammar_suggestions")
.build();
LlmAgent toneCheck =
LlmAgent.builder()
.name("ToneCheck")
.model(MODEL_NAME)
.description("Analyzes the tone of the story.")
.instruction(
"""
You are a tone analyzer. Analyze the tone of the story: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.
""")
.outputKey("tone_check_result") // This agent's output determines the conditional flow
.build();
LoopAgent loopAgent =
LoopAgent.builder()
.name("CriticReviserLoop")
.description("Iteratively critiques and revises the story.")
.subAgents(critic, reviser)
.maxIterations(2)
.build();
SequentialAgent sequentialAgent =
SequentialAgent.builder()
.name("PostProcessing")
.description("Performs grammar and tone checks sequentially.")
.subAgents(grammarCheck, toneCheck)
.build();
Part 4: Instantiating and Running the Custom Agent
Finally, you instantiate your StoryFlowAgent and use the Runner as usual.
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
    name="StoryFlowAgent",
    story_generator=story_generator,
    critic=critic,
    reviser=reviser,
    grammar_check=grammar_check,
    tone_check=tone_check,
)

INITIAL_STATE = {"topic": "a brave kitten exploring a haunted house"}

# --- Setup Runner and Session ---
async def setup_session_and_runner():
    session_service = InMemorySessionService()
    session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID, state=INITIAL_STATE)
    logger.info(f"Initial session state: {session.state}")
    runner = Runner(
        agent=story_flow_agent,  # Pass the custom orchestrator agent
        app_name=APP_NAME,
        session_service=session_service
    )
    return session_service, runner

# --- Function to Interact with the Agent ---
async def call_agent_async(user_input_topic: str):
    """
    Sends a new topic to the agent (overwriting the initial one if needed)
    and runs the workflow.
    """
    session_service, runner = await setup_session_and_runner()

    current_session = await session_service.get_session(app_name=APP_NAME,
                                                        user_id=USER_ID,
                                                        session_id=SESSION_ID)
    if not current_session:
        logger.error("Session not found!")
        return

    current_session.state["topic"] = user_input_topic
    logger.info(f"Updated session state topic to: {user_input_topic}")

    content = types.Content(role='user', parts=[types.Part(text=f"Generate a story about: {user_input_topic}")])
    events = runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    final_response = "No final response captured."
    async for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
            final_response = event.content.parts[0].text

    print("\n--- Agent Interaction Result ---")
    print("Agent Final Response: ", final_response)

    final_session = await session_service.get_session(app_name=APP_NAME,
                                                      user_id=USER_ID,
                                                      session_id=SESSION_ID)
    print("Final Session State:")
    import json
    print(json.dumps(final_session.state, indent=2))
    print("-------------------------------\n")

# --- Run the Agent ---
# Note: In Colab, you can directly use 'await' at the top level.
# If running this code as a standalone Python script, you'll need to use asyncio.run() or manage the event loop.
await call_agent_async("a lonely robot finding a friend in a junkyard")
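Outside a notebook, the top-level await above is a syntax error in a regular script. A minimal sketch of the asyncio.run() alternative mentioned in the comment follows; the `call_agent_async` defined here is a hypothetical stub standing in for the real function so that the snippet runs on its own, and `main` is a name chosen for this example.

```python
import asyncio


async def call_agent_async(user_input_topic: str) -> str:
    # Hypothetical stub: the real function runs the agent workflow instead.
    await asyncio.sleep(0)
    return f"story about {user_input_topic}"


def main() -> str:
    # asyncio.run() creates an event loop, runs the coroutine to completion,
    # and closes the loop. It must not be called from inside a running loop.
    return asyncio.run(call_agent_async("a lonely robot finding a friend in a junkyard"))


if __name__ == "__main__":
    print(main())
```

Keeping the entry point in a synchronous `main` means the same module can still be imported from a notebook, where the coroutine can be awaited directly.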
// --- Function to Interact with the Agent ---
// Sends a new topic to the agent (overwriting the initial one if needed)
// and runs the workflow.
public static void runAgent(StoryFlowAgentExample agent, String userTopic) {
// --- Setup Runner and Session ---
InMemoryRunner runner = new InMemoryRunner(agent);
Map<String, Object> initialState = new HashMap<>();
initialState.put("topic", "a brave kitten exploring a haunted house");
Session session =
runner
.sessionService()
.createSession(APP_NAME, USER_ID, new ConcurrentHashMap<>(initialState), SESSION_ID)
.blockingGet();
logger.log(Level.INFO, () -> String.format("Initial session state: %s", session.state()));
session.state().put("topic", userTopic); // Update the state in the retrieved session
logger.log(Level.INFO, () -> String.format("Updated session state topic to: %s", userTopic));
Content userMessage = Content.fromParts(Part.fromText("Generate a story about: " + userTopic));
// Use the modified session object for the run
Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);
final String[] finalResponse = {"No final response captured."};
eventStream.blockingForEach(
event -> {
if (event.finalResponse() && event.content().isPresent()) {
String author = event.author() != null ? event.author() : "UNKNOWN_AUTHOR";
Optional<String> textOpt =
event
.content()
.flatMap(Content::parts)
.filter(parts -> !parts.isEmpty())
.map(parts -> parts.get(0).text().orElse(""));
logger.log(Level.INFO, () ->
String.format("Potential final response from [%s]: %s", author, textOpt.orElse("N/A")));
textOpt.ifPresent(text -> finalResponse[0] = text);
}
});
System.out.println("\n--- Agent Interaction Result ---");
System.out.println("Agent Final Response: " + finalResponse[0]);
// Retrieve session again to see the final state after the run
Session finalSession =
runner
.sessionService()
.getSession(APP_NAME, USER_ID, SESSION_ID, Optional.empty())
.blockingGet();
assert finalSession != null;
System.out.println("Final Session State:" + finalSession.state());
System.out.println("-------------------------------\n");
}
(Note: For the full runnable code, including imports and execution logic, see the full code example below.)
Full Code Example
Storyflow Agent
# Full runnable code for the StoryFlowAgent example
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import AsyncGenerator
from typing_extensions import override
from google.adk.agents import LlmAgent, BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.invocation_context import InvocationContext
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from pydantic import BaseModel, Field
# --- Constants ---
APP_NAME = "story_app"
USER_ID = "12345"
SESSION_ID = "123344"
GEMINI_2_FLASH = "gemini-2.0-flash"
# --- Configure Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Custom Orchestrator Agent ---
class StoryFlowAgent(BaseAgent):
    """
    Custom agent for a story generation and refinement workflow.

    This agent orchestrates a sequence of LLM agents to generate a story,
    critique it, revise it, check grammar and tone, and potentially
    regenerate the story if the tone is negative.
    """

    # --- Field Declarations for Pydantic ---
    # Declare the agents passed during initialization as class attributes with type hints
    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent

    loop_agent: LoopAgent
    sequential_agent: SequentialAgent

    # model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
    model_config = {"arbitrary_types_allowed": True}

    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):
        """
        Initializes the StoryFlowAgent.

        Args:
            name: The name of the agent.
            story_generator: An LlmAgent to generate the initial story.
            critic: An LlmAgent to critique the story.
            reviser: An LlmAgent to revise the story based on criticism.
            grammar_check: An LlmAgent to check the grammar.
            tone_check: An LlmAgent to analyze the tone.
        """
        # Create internal agents *before* calling super().__init__
        loop_agent = LoopAgent(
            name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
        )
        sequential_agent = SequentialAgent(
            name="PostProcessing", sub_agents=[grammar_check, tone_check]
        )

        # Define the sub_agents list for the framework
        sub_agents_list = [
            story_generator,
            loop_agent,
            sequential_agent,
        ]

        # Pydantic will validate and assign them based on the class annotations.
        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=sub_agents_list,  # Pass the sub_agents list directly
        )

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        """
        Implements the custom orchestration logic for the story workflow.
        Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
        """
        logger.info(f"[{self.name}] Starting story generation workflow.")

        # 1. Initial Story Generation
        logger.info(f"[{self.name}] Running StoryGenerator...")
        async for event in self.story_generator.run_async(ctx):
            logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        # Check if story was generated before proceeding
        if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
            logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
            return  # Stop processing if initial story failed

        logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")

        # 2. Critic-Reviser Loop
        logger.info(f"[{self.name}] Running CriticReviserLoop...")
        # Use the loop_agent instance attribute assigned during init
        async for event in self.loop_agent.run_async(ctx):
            logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")

        # 3. Sequential Post-Processing (Grammar and Tone Check)
        logger.info(f"[{self.name}] Running PostProcessing...")
        # Use the sequential_agent instance attribute assigned during init
        async for event in self.sequential_agent.run_async(ctx):
            logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        # 4. Tone-Based Conditional Logic
        tone_check_result = ctx.session.state.get("tone_check_result")
        logger.info(f"[{self.name}] Tone check result: {tone_check_result}")

        if tone_check_result == "negative":
            logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
            async for event in self.story_generator.run_async(ctx):
                logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
                yield event
        else:
            logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")
            pass

        logger.info(f"[{self.name}] Workflow finished.")
# --- Define the individual LLM agents ---
story_generator = LlmAgent(
    name="StoryGenerator",
    model=GEMINI_2_FLASH,
    instruction="""You are a story writer. Write a short story (around 100 words), on the following topic: {topic}""",
    input_schema=None,
    output_key="current_story",  # Key for storing output in session state
)

critic = LlmAgent(
    name="Critic",
    model=GEMINI_2_FLASH,
    instruction="""You are a story critic. Review the story provided: {{current_story}}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
    input_schema=None,
    output_key="criticism",  # Key for storing criticism in session state
)

reviser = LlmAgent(
    name="Reviser",
    model=GEMINI_2_FLASH,
    instruction="""You are a story reviser. Revise the story provided: {{current_story}}, based on the criticism in
{{criticism}}. Output only the revised story.""",
    input_schema=None,
    output_key="current_story",  # Overwrites the original story
)

grammar_check = LlmAgent(
    name="GrammarCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a grammar checker. Check the grammar of the story provided: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
    input_schema=None,
    output_key="grammar_suggestions",
)

tone_check = LlmAgent(
    name="ToneCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a tone analyzer. Analyze the tone of the story provided: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
    input_schema=None,
    output_key="tone_check_result",  # This agent's output determines the conditional flow
)
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
    name="StoryFlowAgent",
    story_generator=story_generator,
    critic=critic,
    reviser=reviser,
    grammar_check=grammar_check,
    tone_check=tone_check,
)
INITIAL_STATE = {"topic": "a brave kitten exploring a haunted house"}
# --- Setup Runner and Session ---
async def setup_session_and_runner():
    session_service = InMemorySessionService()
    session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID, state=INITIAL_STATE)
    logger.info(f"Initial session state: {session.state}")
    runner = Runner(
        agent=story_flow_agent,  # Pass the custom orchestrator agent
        app_name=APP_NAME,
        session_service=session_service,
    )
    return session_service, runner
# --- Function to Interact with the Agent ---
async def call_agent_async(user_input_topic: str):
    """
    Sends a new topic to the agent (overwriting the initial one if needed)
    and runs the workflow.
    """
    session_service, runner = await setup_session_and_runner()
    current_session = await session_service.get_session(app_name=APP_NAME,
                                                        user_id=USER_ID,
                                                        session_id=SESSION_ID)
    if not current_session:
        logger.error("Session not found!")
        return
    # Note: a session service may hand back a copy from get_session(); in that
    # case this assignment is local and the stored topic stays at its initial value.
    current_session.state["topic"] = user_input_topic
    logger.info(f"Updated session state topic to: {user_input_topic}")
    content = types.Content(role='user', parts=[types.Part(text=f"Generate a story about: {user_input_topic}")])
    events = runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content)
    final_response = "No final response captured."
    async for event in events:
        # Keep the most recent final response; several sub-agents may emit one.
        if event.is_final_response() and event.content and event.content.parts:
            logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
            final_response = event.content.parts[0].text
    print("\n--- Agent Interaction Result ---")
    print("Agent Final Response: ", final_response)
    final_session = await session_service.get_session(app_name=APP_NAME,
                                                      user_id=USER_ID,
                                                      session_id=SESSION_ID)
    print("Final Session State:")
    import json
    print(json.dumps(final_session.state, indent=2))
    print("-------------------------------\n")
# --- Run the Agent ---
# Note: In Colab, you can directly use 'await' at the top level.
# If running this code as a standalone Python script, you'll need to use asyncio.run() or manage the event loop.
await call_agent_async("a lonely robot finding a friend in a junkyard")
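The top-level `await` above only works in notebook-style environments. As a minimal sketch of the standalone-script alternative mentioned in the comment, the snippet below wraps the call in `asyncio.run()`. The `call_agent_async` defined here is a hypothetical stand-in that only echoes the topic, so the snippet runs without ADK installed; in a real script it would be the function defined above.

```python
import asyncio


async def call_agent_async(user_input_topic: str) -> str:
    """Hypothetical stand-in for the real call_agent_async defined above."""
    await asyncio.sleep(0)  # Yield to the event loop once, as an awaited agent call would.
    return f"Generate a story about: {user_input_topic}"


def main() -> str:
    # asyncio.run() creates an event loop, runs the coroutine to completion,
    # and closes the loop. Call it from synchronous code only.
    return asyncio.run(call_agent_async("a lonely robot finding a friend in a junkyard"))


if __name__ == "__main__":
    print(main())
```

Keeping the `asyncio.run()` call behind `if __name__ == "__main__":` lets the module still be imported (for example from a notebook) without starting an event loop.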
// Full runnable code for the StoryFlowAgent example
import com.google.adk.agents.LlmAgent;
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.InvocationContext;
import com.google.adk.agents.LoopAgent;
import com.google.adk.agents.SequentialAgent;
import com.google.adk.events.Event;
import com.google.adk.runner.InMemoryRunner;
import com.google.adk.sessions.Session;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.reactivex.rxjava3.core.Flowable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
public class StoryFlowAgentExample extends BaseAgent {
// --- Constants ---
private static final String APP_NAME = "story_app";
private static final String USER_ID = "user_12345";
private static final String SESSION_ID = "session_123344";
private static final String MODEL_NAME = "gemini-2.0-flash"; // Ensure this model is available
private static final Logger logger = Logger.getLogger(StoryFlowAgentExample.class.getName());
private final LlmAgent storyGenerator;
private final LoopAgent loopAgent;
private final SequentialAgent sequentialAgent;
public StoryFlowAgentExample(
String name, LlmAgent storyGenerator, LoopAgent loopAgent, SequentialAgent sequentialAgent) {
super(
name,
"Orchestrates story generation, critique, revision, and checks.",
List.of(storyGenerator, loopAgent, sequentialAgent),
null,
null);
this.storyGenerator = storyGenerator;
this.loopAgent = loopAgent;
this.sequentialAgent = sequentialAgent;
}
public static void main(String[] args) {
// --- Define the individual LLM agents ---
LlmAgent storyGenerator =
LlmAgent.builder()
.name("StoryGenerator")
.model(MODEL_NAME)
.description("Generates the initial story.")
.instruction(
"""
You are a story writer. Write a short story (around 100 words) about a cat,
based on the topic: {topic}
""")
.inputSchema(null)
.outputKey("current_story") // Key for storing output in session state
.build();
LlmAgent critic =
LlmAgent.builder()
.name("Critic")
.model(MODEL_NAME)
.description("Critiques the story.")
.instruction(
"""
You are a story critic. Review the story: {current_story}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.
""")
.inputSchema(null)
.outputKey("criticism") // Key for storing criticism in session state
.build();
LlmAgent reviser =
LlmAgent.builder()
.name("Reviser")
.model(MODEL_NAME)
.description("Revises the story based on criticism.")
.instruction(
"""
You are a story reviser. Revise the story: {current_story}, based on the criticism: {criticism}. Output only the revised story.
""")
.inputSchema(null)
.outputKey("current_story") // Overwrites the original story
.build();
LlmAgent grammarCheck =
LlmAgent.builder()
.name("GrammarCheck")
.model(MODEL_NAME)
.description("Checks grammar and suggests corrections.")
.instruction(
"""
You are a grammar checker. Check the grammar of the story: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.
""")
.outputKey("grammar_suggestions")
.build();
LlmAgent toneCheck =
LlmAgent.builder()
.name("ToneCheck")
.model(MODEL_NAME)
.description("Analyzes the tone of the story.")
.instruction(
"""
You are a tone analyzer. Analyze the tone of the story: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.
""")
.outputKey("tone_check_result") // This agent's output determines the conditional flow
.build();
LoopAgent loopAgent =
LoopAgent.builder()
.name("CriticReviserLoop")
.description("Iteratively critiques and revises the story.")
.subAgents(critic, reviser)
.maxIterations(2)
.build();
SequentialAgent sequentialAgent =
SequentialAgent.builder()
.name("PostProcessing")
.description("Performs grammar and tone checks sequentially.")
.subAgents(grammarCheck, toneCheck)
.build();
StoryFlowAgentExample storyFlowAgentExample =
new StoryFlowAgentExample(APP_NAME, storyGenerator, loopAgent, sequentialAgent);
// --- Run the Agent ---
runAgent(storyFlowAgentExample, "a lonely robot finding a friend in a junkyard");
}
// --- Function to Interact with the Agent ---
// Sends a new topic to the agent (overwriting the initial one if needed)
// and runs the workflow.
public static void runAgent(StoryFlowAgentExample agent, String userTopic) {
// --- Setup Runner and Session ---
InMemoryRunner runner = new InMemoryRunner(agent);
Map<String, Object> initialState = new HashMap<>();
initialState.put("topic", "a brave kitten exploring a haunted house");
Session session =
runner
.sessionService()
.createSession(APP_NAME, USER_ID, new ConcurrentHashMap<>(initialState), SESSION_ID)
.blockingGet();
logger.log(Level.INFO, () -> String.format("Initial session state: %s", session.state()));
session.state().put("topic", userTopic); // Update the state in the retrieved session
logger.log(Level.INFO, () -> String.format("Updated session state topic to: %s", userTopic));
Content userMessage = Content.fromParts(Part.fromText("Generate a story about: " + userTopic));
// Use the modified session object for the run
Flowable<Event> eventStream = runner.runAsync(USER_ID, session.id(), userMessage);
final String[] finalResponse = {"No final response captured."};
eventStream.blockingForEach(
event -> {
if (event.finalResponse() && event.content().isPresent()) {
String author = event.author() != null ? event.author() : "UNKNOWN_AUTHOR";
Optional<String> textOpt =
event
.content()
.flatMap(Content::parts)
.filter(parts -> !parts.isEmpty())
.map(parts -> parts.get(0).text().orElse(""));
logger.log(Level.INFO, () ->
String.format("Potential final response from [%s]: %s", author, textOpt.orElse("N/A")));
textOpt.ifPresent(text -> finalResponse[0] = text);
}
});
System.out.println("\n--- Agent Interaction Result ---");
System.out.println("Agent Final Response: " + finalResponse[0]);
// Retrieve session again to see the final state after the run
Session finalSession =
runner
.sessionService()
.getSession(APP_NAME, USER_ID, SESSION_ID, Optional.empty())
.blockingGet();
assert finalSession != null;
System.out.println("Final Session State:" + finalSession.state());
System.out.println("-------------------------------\n");
}
private boolean isStoryGenerated(InvocationContext ctx) {
Object currentStoryObj = ctx.session().state().get("current_story");
return currentStoryObj != null && !String.valueOf(currentStoryObj).isEmpty();
}
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext invocationContext) {
// Implements the custom orchestration logic for the story workflow.
// Uses the sub-agent fields assigned in the constructor (e.g., storyGenerator).
logger.log(Level.INFO, () -> String.format("[%s] Starting story generation workflow.", name()));
// Stage 1. Initial Story Generation
Flowable<Event> storyGenFlow = runStage(storyGenerator, invocationContext, "StoryGenerator");
// Stage 2: Critic-Reviser Loop (runs after story generation completes)
Flowable<Event> criticReviserFlow = Flowable.defer(() -> {
if (!isStoryGenerated(invocationContext)) {
logger.log(Level.SEVERE,() ->
String.format("[%s] Failed to generate initial story. Aborting after StoryGenerator.",
name()));
return Flowable.empty(); // Skip this stage; later stages in the concat chain are still subscribed
}
logger.log(Level.INFO, () ->
String.format("[%s] Story state after generator: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(loopAgent, invocationContext, "CriticReviserLoop");
});
// Stage 3: Post-Processing (runs after critic-reviser loop completes)
Flowable<Event> postProcessingFlow = Flowable.defer(() -> {
logger.log(Level.INFO, () ->
String.format("[%s] Story state after loop: %s",
name(), invocationContext.session().state().get("current_story")));
return runStage(sequentialAgent, invocationContext, "PostProcessing");
});
// Stage 4: Conditional Regeneration (runs after post-processing completes)
Flowable<Event> conditionalRegenFlow = Flowable.defer(() -> {
String toneCheckResult = (String) invocationContext.session().state().get("tone_check_result");
logger.log(Level.INFO, () -> String.format("[%s] Tone check result: %s", name(), toneCheckResult));
if ("negative".equalsIgnoreCase(toneCheckResult)) {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is negative. Regenerating story...", name()));
return runStage(storyGenerator, invocationContext, "StoryGenerator (Regen)");
} else {
logger.log(Level.INFO, () ->
String.format("[%s] Tone is not negative. Keeping current story.", name()));
return Flowable.empty(); // No regeneration needed
}
});
return Flowable.concatArray(storyGenFlow, criticReviserFlow, postProcessingFlow, conditionalRegenFlow)
.doOnComplete(() -> logger.log(Level.INFO, () -> String.format("[%s] Workflow finished.", name())));
}
// Helper method for a single agent run stage with logging
private Flowable<Event> runStage(BaseAgent agentToRun, InvocationContext ctx, String stageName) {
logger.log(Level.INFO, () -> String.format("[%s] Running %s...", name(), stageName));
return agentToRun
.runAsync(ctx)
.doOnNext(event ->
logger.log(Level.INFO,() ->
String.format("[%s] Event from %s: %s", name(), stageName, event.toJson())))
.doOnError(err ->
logger.log(Level.SEVERE,
String.format("[%s] Error in %s", name(), stageName), err))
.doOnComplete(() ->
logger.log(Level.INFO, () ->
String.format("[%s] %s finished.", name(), stageName)));
}
@Override
protected Flowable<Event> runLiveImpl(InvocationContext invocationContext) {
return Flowable.error(new UnsupportedOperationException("runLive not implemented."));
}
}
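The Java version wraps each later stage in `Flowable.defer` so that session state is read only once the previous stage has completed. A minimal Python sketch of the same lazy, state-driven sequencing follows, using plain async generators and a dict in place of session state. Every name in it (`generate`, `tone_check`, `workflow`, `collect`) is illustrative and not an ADK API, and the tone is passed in as an argument so the result is deterministic.

```python
import asyncio
from typing import AsyncGenerator, Dict, List


async def generate(state: Dict[str, str], label: str) -> AsyncGenerator[str, None]:
    # Stand-in for a story-generating sub-agent: writes to shared state and emits one event.
    state["current_story"] = f"story ({label})"
    yield f"event from {label}"


async def tone_check(state: Dict[str, str], tone: str) -> AsyncGenerator[str, None]:
    # Stand-in for the tone-check agent; a real one would derive the tone from the story.
    state["tone_check_result"] = tone
    yield "event from ToneCheck"


async def workflow(state: Dict[str, str], tone: str) -> AsyncGenerator[str, None]:
    async for event in generate(state, "StoryGenerator"):
        yield event
    # This check runs only after the first stage is exhausted, which is the
    # laziness that Flowable.defer provides in the Java code. Unlike an empty
    # stage in a concat chain, returning here ends the whole workflow.
    if not state.get("current_story"):
        return
    async for event in tone_check(state, tone):
        yield event
    # Conditional stage: regenerate only when the recorded tone is negative.
    if state.get("tone_check_result", "").strip().lower() == "negative":
        async for event in generate(state, "StoryGenerator (Regen)"):
            yield event


async def collect(tone: str) -> List[str]:
    state: Dict[str, str] = {}
    return [event async for event in workflow(state, tone)]


if __name__ == "__main__":
    print(asyncio.run(collect("negative")))
```

Because an async generator body only advances when it is iterated, the state lookups happen in stage order without any explicit deferral, which is why the Python custom agent can use ordinary `if` statements between its `async for` loops.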