# Parallel and Fan-Out

Execute multiple nodes concurrently and collect their results.
```python
from google.adk.workflow import JoinNode, Workflow, node
from google.adk.workflow._parallel_worker import ParallelWorker
```
## Fan-out

Send output to multiple nodes simultaneously using tuple syntax:
```python
def analyze_text(node_input: str) -> str:
    return f"Analysis: {node_input}"

def translate_text(node_input: str) -> str:
    return f"Translation: {node_input}"

def summarize_text(node_input: str) -> str:
    return f"Summary: {node_input}"

agent = Workflow(
    name="fan_out",
    edges=[
        ('START', (analyze_text, translate_text, summarize_text)),
    ],
)
```
Each branch receives the same input and runs concurrently.
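The fan-out semantics can be sketched in plain Python, with no ADK dependency (illustrative only; `asyncio.gather` stands in for the workflow engine's scheduler):

```python
import asyncio

def analyze_text(node_input: str) -> str:
    return f"Analysis: {node_input}"

def translate_text(node_input: str) -> str:
    return f"Translation: {node_input}"

def summarize_text(node_input: str) -> str:
    return f"Summary: {node_input}"

async def fan_out(node_input, branches):
    # Every branch receives the same input; gather runs them
    # concurrently and returns results in branch order.
    return await asyncio.gather(
        *(asyncio.to_thread(branch, node_input) for branch in branches)
    )

results = asyncio.run(
    fan_out("hello", [analyze_text, translate_text, summarize_text])
)
# results == ["Analysis: hello", "Translation: hello", "Summary: hello"]
```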
## Fan-in with JoinNode

Collect outputs from multiple branches before continuing:
```python
join = JoinNode(name="collect_results")

agent = Workflow(
    name="fan_out_fan_in",
    edges=[
        ('START', (analyze_text, translate_text, summarize_text)),
        ((analyze_text, translate_text, summarize_text), join),
        (join, final_processor),  # final_processor is defined below
    ],
)
```
`JoinNode` outputs a dictionary mapping predecessor names to their outputs:
```python
# JoinNode output:
# {
#     "analyze_text": "Analysis: hello",
#     "translate_text": "Translation: hello",
#     "summarize_text": "Summary: hello",
# }
def final_processor(node_input: dict) -> str:
    analysis = node_input["analyze_text"]
    translation = node_input["translate_text"]
    summary = node_input["summarize_text"]
    return f"Combined: {analysis}, {translation}, {summary}"
```
**Serialization warning:** `JoinNode` stores partial inputs in session state while waiting. If predecessors are LLM agents without `output_schema`, the stored values are `types.Content` objects, which are not JSON-serializable. This causes a `TypeError` with SQLite/database session services. Fix: set `output_schema` on LLM agents feeding into a `JoinNode`.
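The failure is ordinary `json` behavior and can be reproduced without ADK; the `Content` class below is a stand-in for `types.Content`, not the real type:

```python
import json

class Content:
    # Stand-in for types.Content: an arbitrary object
    # with no JSON representation.
    def __init__(self, text: str):
        self.text = text

pending = {"analyze_text": Content("Analysis: hello")}
try:
    json.dumps(pending)  # what a database session service must do
except TypeError:
    print("partial JoinNode input is not JSON-serializable")

# With output_schema, the stored value is validated plain data instead:
json.dumps({"analyze_text": {"text": "Analysis: hello"}})  # serializes fine
```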
## ParallelWorker

Apply the same node to each item in a list concurrently:
```python
def process_item(node_input: int) -> int:
    return node_input * 2

parallel = ParallelWorker(node(process_item))

def produce_list(node_input: str) -> list:
    return [1, 2, 3, 4, 5]

agent = Workflow(
    name="parallel_processing",
    edges=[
        ('START', produce_list),
        (produce_list, parallel),
    ],
)
# Output: [2, 4, 6, 8, 10]
```
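A plain-Python sketch of the list-mapping behavior (a thread pool stands in for the cloned workers; not the real implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def process_item(node_input: int) -> int:
    return node_input * 2

def parallel_map(worker, items):
    # One concurrent invocation per list item; map preserves input order.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(worker, items))

doubled = parallel_map(process_item, [1, 2, 3, 4, 5])
# doubled == [2, 4, 6, 8, 10]
```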
Each item is processed by a clone of the worker node, named `{parent_name}__{index}` (e.g., `process_item__0`). Clones are created with `rerun_on_resume=True`.

The `@node(parallel_worker=True)` decorator is shorthand for the manual wrapping:

```python
@node(parallel_worker=True)
def process_item(node_input: int) -> int:
    return node_input * 2

# Equivalent to: ParallelWorker(FunctionNode(process_item))
```
Set `parallel_worker=True` directly on an `Agent`:

```python
from google.adk import Agent

explain_topic = Agent(
    name="explain_topic",
    instruction='Explain how this topic relates to the original topic: "{topic}".',
    output_schema=TopicExplanation,  # Pydantic schema defined elsewhere
    parallel_worker=True,  # each list item processed by a cloned agent
)
```
```python
agent = Workflow(
    name="parallel_analysis",
    edges=[
        ('START', process_input, find_related_topics, explain_topic, aggregate),
    ],
)
```
Or wrap manually:

```python
parallel_analyzer = ParallelWorker(analyzer)
```
**Do NOT use `parallel_worker=True` on fan-out nodes.** Fan-out edges `(a, (b, c, d))` already run nodes in parallel. Adding `parallel_worker=True` makes the node expect a list input and iterate over it; if it receives a single value or `None`, it produces no output and the `JoinNode` gets nothing.
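An illustrative model of why the combination fails (a simplified sketch of parallel-worker semantics, not the real implementation):

```python
def parallel_worker(node_input):
    # A parallel worker treats its input as a list of work items.
    if not isinstance(node_input, list):
        return []  # single value or None: nothing to iterate over
    return [item * 2 for item in node_input]

parallel_worker([1, 2, 3])  # intended use after a list-producing node: [2, 4, 6]
parallel_worker(None)       # fan-out branch input: [] -- the JoinNode gets nothing
```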
## Fan-out into a shared downstream node

Fan-out branches can all feed a single downstream node; the downstream node is then triggered once per branch:
```python
from typing import Any

async def send_message(node_input: Any):
    # Event is assumed to be imported from the ADK event types.
    yield Event(message=f"Triggered for input: {node_input}")

agent = Workflow(
    name="root_agent",
    edges=[(
        "START",
        (make_uppercase, count_characters, reverse_string),
        send_message,
    )],
    input_schema=str,
)
```
This differs from `JoinNode`: here `send_message` fires three times (once per branch), while `JoinNode` waits for all branches and fires once with a merged dict.
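The difference can be sketched in plain Python (illustrative; the branch functions are assumed implementations of the names above):

```python
import asyncio

def make_uppercase(s: str) -> str:
    return s.upper()

def count_characters(s: str) -> int:
    return len(s)

def reverse_string(s: str) -> str:
    return s[::-1]

messages = []

def send_message(node_input):
    # Direct downstream edge: fires once per completed branch.
    messages.append(f"Triggered for input: {node_input}")

async def run(text: str) -> dict:
    branches = [make_uppercase, count_characters, reverse_string]
    outputs = await asyncio.gather(
        *(asyncio.to_thread(b, text) for b in branches)
    )
    for out in outputs:
        send_message(out)  # three separate triggers
    # JoinNode behavior: wait for all branches, fire once with a merged dict.
    return {b.__name__: out for b, out in zip(branches, outputs)}

merged = asyncio.run(run("abc"))
# len(messages) == 3
# merged == {"make_uppercase": "ABC", "count_characters": 3, "reverse_string": "cba"}
```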
## Diamond shape

Fan-out then fan-in:
```python
def splitter(node_input: str) -> str:
    return node_input

def branch_a(node_input: str) -> str:
    return f"A: {node_input}"

def branch_b(node_input: str) -> str:
    return f"B: {node_input}"

join = JoinNode(name="merge")

def combiner(node_input: dict) -> str:
    return f"Combined: {node_input['branch_a']} + {node_input['branch_b']}"

agent = Workflow(
    name="diamond",
    edges=[
        ('START', splitter),
        (splitter, (branch_a, branch_b)),
        ((branch_a, branch_b), join),
        (join, combiner),
    ],
)
```
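The diamond's dataflow reduces to ordinary function composition; a minimal sketch of what the engine computes:

```python
def splitter(x: str) -> str:
    return x

def branch_a(x: str) -> str:
    return f"A: {x}"

def branch_b(x: str) -> str:
    return f"B: {x}"

def combiner(joined: dict) -> str:
    return f"Combined: {joined['branch_a']} + {joined['branch_b']}"

def run_diamond(x: str) -> str:
    s = splitter(x)
    joined = {"branch_a": branch_a(s), "branch_b": branch_b(s)}  # JoinNode dict
    return combiner(joined)

result = run_diamond("hi")
# result == "Combined: A: hi + B: hi"
```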
## SequentialAgent and ParallelAgent

Convenience subclasses for common patterns:
```python
from google.adk.agents.parallel_agent import ParallelAgent
from google.adk.agents.sequential_agent import SequentialAgent

# Sequential: runs sub_agents in order
pipeline = SequentialAgent(
    name="pipeline",
    sub_agents=[writer_agent, reviewer_agent, editor_agent],
)
# Equivalent to: START -> writer -> reviewer -> editor

# Parallel: runs sub_agents concurrently
parallel = ParallelAgent(
    name="concurrent",
    sub_agents=[analyzer_agent, translator_agent, summarizer_agent],
)
# Equivalent to: START -> (analyzer, translator, summarizer)
```
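The sequential equivalence is plain function chaining; a sketch with stub callables standing in for the agents (the stub names and behavior are hypothetical):

```python
def writer_agent(x: str) -> str:
    return f"draft({x})"

def reviewer_agent(x: str) -> str:
    return f"review({x})"

def editor_agent(x: str) -> str:
    return f"edit({x})"

def run_sequential(sub_agents, node_input):
    # SequentialAgent semantics: each sub-agent consumes
    # its predecessor's output.
    for agent in sub_agents:
        node_input = agent(node_input)
    return node_input

final = run_sequential([writer_agent, reviewer_agent, editor_agent], "topic")
# final == "edit(review(draft(topic)))"
```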