.agents/skills/adk-architecture/references/interfaces/workflow.md
Workflow is a graph-based orchestration node. It extends BaseNode
and implements _run_impl() as a scheduling loop that drives static
graph nodes and tracks dynamic nodes spawned by ctx.run_node().
Workflow manages two kinds of child nodes:

- Static graph nodes: defined in the Workflow's edges, compiled into a
  WorkflowGraph. Scheduled by the orchestration loop via triggers
  and asyncio.Tasks. Tracked in _LoopState.nodes by node name.
- Dynamic nodes: spawned by ctx.run_node() from inside a graph node's
  _run_impl. Tracked in _LoopState.dynamic_nodes by full node_path.
  Managed by DynamicNodeScheduler.

Static and dynamic nodes share the same _LoopState.interrupt_ids
set, so the Workflow sees a unified view of all pending interrupts.
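The keying scheme above can be sketched as a small dataclass. This is a hypothetical stand-in, not the real _LoopState (which holds richer per-node state); the point is only the keys: name for static nodes, full path for dynamic nodes, and one shared interrupt set.

```python
from dataclasses import dataclass, field

# Hypothetical sketch of _LoopState's keying scheme; real fields differ.
@dataclass
class LoopStateSketch:
    nodes: dict = field(default_factory=dict)          # static: keyed by node name
    dynamic_nodes: dict = field(default_factory=dict)  # dynamic: keyed by full node_path
    interrupt_ids: set = field(default_factory=set)    # shared by both kinds

state = LoopStateSketch()
state.nodes['graph_node_a'] = 'RUNNING'
state.dynamic_nodes['wf/graph_node_a/dynamic_child'] = 'WAITING'
state.interrupt_ids.add('fc-1')  # one unified view of pending interrupts
```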
A graph node is a regular BaseNode placed in a Workflow's edges.
The Workflow wraps it in a NodeRunner, creates a child Context, and
reads ctx.output, ctx.route, and ctx.interrupt_ids after it
completes.
Output — two paths. At most one output per execution; the Workflow reads it to pass downstream.

```python
# Yield (persisted immediately)
async def _run_impl(self, *, ctx, node_input):
    yield compute(node_input)

# ctx (deferred until node end)
async def _run_impl(self, *, ctx, node_input):
    ctx.output = compute(node_input)
    return
    yield  # unreachable, but makes this an async generator
```
Routing — two paths. The Workflow uses the route to select conditional edges.

```python
# Yield (persisted immediately)
async def _run_impl(self, *, ctx, node_input):
    yield Event(route='approve' if node_input > 0.8 else 'reject')

# ctx (deferred until node end)
async def _run_impl(self, *, ctx, node_input):
    ctx.route = 'approve' if node_input > 0.8 else 'reject'
    yield node_input
```
State — two paths. ctx.state deltas are flushed onto the next yielded Event, or a final Event at node end.

```python
# Yield (persisted immediately)
async def _run_impl(self, *, ctx, node_input):
    yield Event(state={'count': 1})

# ctx (flushed onto next/final Event)
async def _run_impl(self, *, ctx, node_input):
    ctx.state['count'] = 1
    yield result
```
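The flush rule can be simulated end to end. Event and Ctx here are minimal stand-ins (the real classes carry more fields): deltas accumulate on ctx.state, ride out on the next yielded Event, and a final Event is synthesized if the node ends with deltas still pending.

```python
import asyncio

class Event:  # stand-in for the real Event class
    def __init__(self, content=None, state=None):
        self.content, self.state = content, state or {}

class Ctx:  # stand-in: only the field this sketch touches
    def __init__(self):
        self.state = {}  # deltas written by the node

async def run_with_flush(node, ctx, node_input):
    """Attach pending ctx.state deltas to the next yielded Event."""
    events = []
    async for ev in node(ctx=ctx, node_input=node_input):
        if not isinstance(ev, Event):
            ev = Event(content=ev)
        if ctx.state:                 # flush deltas onto this event
            ev.state.update(ctx.state)
            ctx.state = {}
        events.append(ev)
    if ctx.state:                     # node ended without yielding again:
        events.append(Event(state=ctx.state))  # synthesize a final Event
    return events

async def node(*, ctx, node_input):
    ctx.state['count'] = 1
    yield 'result'

events = asyncio.run(run_with_flush(node, Ctx(), None))
```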
Interrupts — yield only (ctx.interrupt_ids is read-only). The
Workflow marks the node WAITING and propagates the interrupt IDs
upward. On resume, if rerun_on_resume=True (the default for Workflow),
the node is re-executed with ctx.resume_inputs populated.

```python
async def _run_impl(self, *, ctx, node_input):
    if ctx.resume_inputs and 'fc-1' in ctx.resume_inputs:
        yield f'approved: {ctx.resume_inputs["fc-1"]}'
        return
    yield Event(long_running_tool_ids={'fc-1'})
```
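The interrupt/resume contract can be simulated without the framework. Event and Ctx below are hypothetical stand-ins: the first execution surfaces the interrupt ID, and the re-execution on resume (with resume_inputs populated) takes the early-return branch.

```python
import asyncio

class Event:  # stand-in for the real Event class
    def __init__(self, content=None, long_running_tool_ids=None):
        self.content = content
        self.long_running_tool_ids = long_running_tool_ids or set()

class Ctx:  # stand-in: only the fields this sketch touches
    def __init__(self, resume_inputs=None):
        self.resume_inputs = resume_inputs or {}
        self.interrupt_ids = set()

async def node(*, ctx, node_input):
    if ctx.resume_inputs and 'fc-1' in ctx.resume_inputs:
        yield Event(content=f'approved: {ctx.resume_inputs["fc-1"]}')
        return
    yield Event(long_running_tool_ids={'fc-1'})

async def run_once(ctx):
    async for ev in node(ctx=ctx, node_input=None):
        ctx.interrupt_ids |= ev.long_running_tool_ids  # Workflow collects interrupts
        last = ev
    return last

# First run: the node interrupts; the Workflow would mark it WAITING.
ctx1 = Ctx()
asyncio.run(run_once(ctx1))

# Resume: rerun_on_resume re-executes the node with resume_inputs set.
ctx2 = Ctx(resume_inputs={'fc-1': 'yes'})
ev = asyncio.run(run_once(ctx2))
```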
A graph node can spawn child nodes at runtime:

```python
class Orchestrator(BaseNode):
    rerun_on_resume: bool = True  # required

    async def _run_impl(self, *, ctx, node_input):
        result = await ctx.run_node(some_node, input_data)
        yield f'child returned: {result}'
```
Any node that calls ctx.run_node() must set rerun_on_resume = True.
Without this, the Workflow cannot re-execute the node on resume to
let it re-acquire its dynamic children's results.

Dynamic nodes are tracked by full node_path, not by name alone.
The path is parent_path/child_name:

```
wf/graph_node_a/dynamic_child        ← dynamic node under graph_node_a
wf/graph_node_a/dynamic_child/inner  ← transitive dynamic node
```
The child_name comes from either the name parameter on
ctx.run_node(node, name='explicit') or the node's own name field
(the default). Each unique node_path is tracked exactly once in
_LoopState.dynamic_nodes, which is what enables output caching and
resume keyed by path.
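Path construction and dedup reduce to a few lines. register() is a hypothetical helper; the real scheduler tracks richer state than a plain dict, but the parent_path/child_name keying is the same.

```python
# Hypothetical sketch of node_path keying and once-per-path tracking.
def node_path(parent_path: str, child_name: str) -> str:
    return f'{parent_path}/{child_name}'

dynamic_nodes: dict[str, str] = {}

def register(parent_path: str, child_name: str) -> str:
    path = node_path(parent_path, child_name)
    dynamic_nodes.setdefault(path, 'FRESH')  # each unique path tracked once
    return path

p1 = register('wf/graph_node_a', 'dynamic_child')
p2 = register('wf/graph_node_a/dynamic_child', 'inner')  # transitive child
p3 = register('wf/graph_node_a', 'dynamic_child')        # same path: deduped
```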
When ctx.run_node() is called, the scheduler checks three cases:

- Fresh — no prior events for this node_path. Execute via
  NodeRunner and record output or interrupts in _LoopState.
- Completed — prior events show the node produced output. Return the
  cached output immediately; no re-execution.
- Waiting — prior events show the node was interrupted:
  - Interrupts still unresolved: they are re-recorded (in
    _LoopState.interrupt_ids) and ctx.run_node() raises
    NodeInterruptedError in the caller.
  - Interrupts resolved: re-execute the node with resume_inputs
    built from the resolved function responses.

State reconstruction is lazy: the scheduler scans session events
only on the first ctx.run_node() call for a given path, not
upfront. This avoids scanning for dynamic nodes that won't be
re-invoked.
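The three cases collapse to a dispatch over previously recorded state. This sketch simplifies heavily: a dict of per-path records stands in for lazily scanned session events, a resolved-ID set stands in for resume_inputs, and RuntimeError stands in for NodeInterruptedError.

```python
# Simplified dispatch sketch; names and record shape are illustrative.
def run_node(path, records, resolved_interrupts, execute):
    rec = records.get(path)
    if rec is None:                       # Fresh: no prior events
        records[path] = {'status': 'COMPLETED', 'output': execute()}
        return records[path]['output']
    if rec['status'] == 'COMPLETED':      # Completed: cached output, no re-run
        return rec['output']
    # Waiting: interrupted on a previous run
    pending = rec['interrupt_ids'] - resolved_interrupts
    if pending:                           # still unresolved: surface the interrupt
        raise RuntimeError(f'NodeInterruptedError: {pending}')  # stand-in
    rec.update(status='COMPLETED', output=execute())  # resolved: re-execute
    return rec['output']

records = {'wf/a/child': {'status': 'WAITING', 'interrupt_ids': {'fc-1'}}}
fresh = run_node('wf/a/new', records, set(), lambda: 'fresh-out')
resumed = run_node('wf/a/child', records, {'fc-1'}, lambda: 'resumed-out')
cached = run_node('wf/a/child', records, set(), lambda: 'never-called')

try:  # Waiting with unresolved interrupts re-raises
    run_node('wf/a/x', {'wf/a/x': {'status': 'WAITING', 'interrupt_ids': {'fc-9'}}},
             set(), lambda: None)
    raised = False
except RuntimeError:
    raised = True
```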
When a dynamic child interrupts:

1. DynamicNodeScheduler._record_result sets the child's status
   to WAITING and adds its interrupt IDs to _LoopState.interrupt_ids.
2. ctx.run_node() checks child_ctx.interrupt_ids. If non-empty,
   it propagates them to the calling node's ctx._interrupt_ids
   and raises NodeInterruptedError.
3. NodeRunner catches NodeInterruptedError in _execute_node and
   records the interrupt on the calling node's Context.
4. _handle_completion sees the interrupt and marks the graph node
   as WAITING.

On resume, the Workflow re-executes the graph node (because
rerun_on_resume=True). The graph node calls ctx.run_node()
again, which hits the scheduler. The scheduler lazily scans events,
finds the resolved function response, and either returns cached
output or re-executes the dynamic child with resume_inputs.
ctx.run_node(node, use_as_output=True) makes the dynamic child's
output count as the calling node's output:

```python
class Delegator(BaseNode):
    rerun_on_resume: bool = True

    async def _run_impl(self, *, ctx, node_input):
        # child's output becomes this node's output
        await ctx.run_node(worker, node_input, use_as_output=True)
```
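The once-per-execution rule is essentially a flag check. A hedged sketch with a stand-in Ctx (delegate_output is a hypothetical helper; the real bookkeeping lives inside ctx.run_node()):

```python
class Ctx:  # minimal stand-in for the real Context
    def __init__(self):
        self._output_delegated = False
        self.output = None

def delegate_output(ctx, child_output):
    """Sketch of the use_as_output=True bookkeeping."""
    if ctx._output_delegated:
        raise ValueError('use_as_output=True may only be used once per execution')
    ctx._output_delegated = True
    ctx.output = child_output

ctx = Ctx()
delegate_output(ctx, 'worker result')   # first delegation succeeds
try:
    delegate_output(ctx, 'second result')  # second one raises
    second_rejected = False
except ValueError:
    second_rejected = True
```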
Under the hood:

- ctx._output_delegated is set to True on the parent.
- The child's events carry event.node_info.output_for with the
  ancestor paths.
- Only one use_as_output=True per execution (a second call raises
  ValueError).

A dynamic node can itself call ctx.run_node(), creating a
transitive chain:
```python
class Outer(BaseNode):
    rerun_on_resume: bool = True

    async def _run_impl(self, *, ctx, node_input):
        result = await ctx.run_node(Inner(name='inner'), 'data')
        yield result

class Inner(BaseNode):
    rerun_on_resume: bool = True

    async def _run_impl(self, *, ctx, node_input):
        sub = await ctx.run_node(Leaf(name='leaf'), node_input)
        yield f'inner got: {sub}'
```
This works because:

- Each level contributes a segment to the node_path:
  wf/graph_node/outer/inner/leaf.
- Each Workflow has its own DynamicNodeScheduler and _LoopState.
  A nested Workflow creates a new scheduler, so dynamic nodes within
  it are scoped to that inner Workflow, not mixed with the outer
  Workflow's state.
Workflow sets ctx.event_author = self.name at the start of
_run_impl. This propagates to all child Contexts via NodeRunner.
All events emitted by children carry this author, giving the UI
consistent attribution.
An inner orchestration node (nested Workflow, SingleAgentReactNode)
overrides event_author with its own name, so events are attributed
to the nearest orchestration ancestor.
```
_run_impl
├─ SETUP: resume from events OR seed start triggers
├─ ctx._schedule_dynamic_node_internal = DynamicNodeScheduler
├─ LOOP:
│   ├─ _schedule_ready_nodes → pop triggers, create NodeRunners
│   ├─ asyncio.wait(FIRST_COMPLETED)
│   └─ _handle_completion → update state, buffer downstream
├─ await dynamic_pending_tasks
├─ _collect_remaining_interrupts
└─ FINALIZE: set ctx.output or ctx._interrupt_ids
```
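The loop's core pattern — schedule whatever is ready, wait for the first task to finish, handle it, repeat — is plain asyncio. Everything framework-specific (triggers, NodeRunner, _LoopState) is stubbed in this sketch; `downstream` stands in for the compiled graph's edges.

```python
import asyncio

async def work(name):
    await asyncio.sleep(0)  # stand-in for a NodeRunner execution

async def run_graph(ready, downstream):
    """Sketch of the scheduling loop: 'ready' seeds start triggers,
    'downstream' maps a finished node to the nodes it unblocks."""
    finished, running = [], set()
    while ready or running:
        while ready:  # _schedule_ready_nodes: pop triggers, create tasks
            name = ready.pop()
            running.add(asyncio.create_task(work(name), name=name))
        done, running = await asyncio.wait(
            running, return_when=asyncio.FIRST_COMPLETED)
        for task in done:  # _handle_completion: record, buffer downstream
            finished.append(task.get_name())
            ready.extend(downstream.get(task.get_name(), []))
    return finished

order = asyncio.run(run_graph(['start'], {'start': ['a', 'b'], 'a': ['end']}))
```

Completion order between parallel siblings ('a' vs 'b') is not deterministic, which is exactly why the real loop buffers downstream triggers instead of assuming an order.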
Key behaviors:

- max_concurrency limits parallel graph nodes. Dynamic nodes are
  excluded (they run inline; throttling them would deadlock).
- Terminal output is tagged with output_for. Only one terminal node
  may produce output.
- On resume (ctx.resume_inputs is non-empty), the Workflow
  reconstructs static node states from session events:
  - A waiting node whose interrupts are all resolved is re-run with
    its resume_inputs.
  - A node with rerun_on_resume=True (e.g., a nested Workflow) is
    re-run with partial resume_inputs so it can dispatch resolved
    grandchildren internally; remaining interrupts propagate back up.
  - A node with rerun_on_resume=False stays WAITING until all of its
    interrupts are resolved, then is re-run with the complete
    resume_inputs.
- Dynamic node state is not scanned upfront — it's lazily
  reconstructed by DynamicNodeScheduler when ctx.run_node() is
  called during the re-execution.
- Set rerun_on_resume = True if your node calls ctx.run_node().
  The Workflow must be able to re-execute your node so it can
  re-acquire dynamic children's results.
- Use deterministic names for dynamic children. The name parameter
  on ctx.run_node() determines the node_path, which is the
  dedup/resume key. Non-deterministic names break resume.
- Always await ctx.run_node() directly. Do not wrap it in
  asyncio.create_task() — the task won't be tracked by the
  scheduler, errors are swallowed, and cancellation on interrupt
  won't work.
- Yield output after all dynamic children complete. If your node
  calls ctx.run_node() and then yields, the output is emitted only
  after all children finish. This is the expected pattern.
- Handle NodeInterruptedError only if you need custom logic.
  Normally ctx.run_node() raises NodeInterruptedError when a child
  interrupts, and NodeRunner catches it automatically. Catch it
  yourself only if you need to clean up or adjust state before the
  interrupt propagates.
- Don't set ctx.event_author unless your node is an orchestration
  node (like Workflow or SingleAgentReactNode). The Workflow sets
  it for you and it propagates to all descendants.
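The "never wrap ctx.run_node() in asyncio.create_task()" rule above is observable in plain asyncio, with no framework involved: a fire-and-forget task traps its own exception instead of propagating it to the caller.

```python
import asyncio

async def failing_child():
    raise RuntimeError('boom')

async def fire_and_forget():
    # Anti-pattern: nothing awaits this task, so its exception is
    # trapped inside the task instead of propagating to the caller.
    task = asyncio.create_task(failing_child())
    while not task.done():
        await asyncio.sleep(0)
    return task.exception() is not None  # error sits unraised in the task

swallowed = asyncio.run(fire_and_forget())

async def awaited_directly():
    # Correct pattern: awaiting propagates the failure to the caller,
    # the way awaiting ctx.run_node() surfaces child failures.
    await failing_child()

try:
    asyncio.run(awaited_directly())
    propagated = False
except RuntimeError:
    propagated = True
```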