Back to Mastra

Reference: Run.stream() | Streaming

docs/src/content/en/reference/streaming/workflows/stream.mdx

2025-12-185.5 KB
Original Source

Run.stream()

The .stream() method enables real-time streaming of responses from a workflow. It returns a ReadableStream of events directly.

Usage example

typescript
const run = await workflow.createRun()

const stream = await run.stream({
  inputData: {
    value: 'initial data',
  },
})

for await (const chunk of stream) {
  console.log(chunk)
}

Parameters

<PropertiesTable content={[ { name: 'inputData', type: 'z.infer<TInput>', description: "Input data that matches the workflow's input schema", isOptional: true, }, { name: 'requestContext', type: 'RequestContext', description: 'Request Context data to use during workflow execution', isOptional: true, }, { name: 'tracingContext', type: 'TracingContext', isOptional: true, description: 'Tracing context for creating child spans and adding metadata.', properties: [ { parameters: [ { name: 'currentSpan', type: 'Span', isOptional: true, description: 'Current span for creating child spans and adding metadata.', }, ], }, ], }, { name: 'tracingOptions', type: 'TracingOptions', isOptional: true, description: 'Options for Tracing configuration.', properties: [ { parameters: [ { name: 'metadata', type: 'Record<string, any>', isOptional: true, description: 'Metadata to add to the root trace span.', }, ], }, { parameters: [ { name: 'requestContextKeys', type: 'string[]', isOptional: true, description: "Additional RequestContext keys to extract as metadata for this trace. Supports dot notation for nested values (e.g., 'user.id').", }, ], }, { parameters: [ { name: 'traceId', type: 'string', isOptional: true, description: 'Trace ID to use for this execution (1-32 hexadecimal characters). If provided, this trace will be part of the specified trace.', }, ], }, { parameters: [ { name: 'parentSpanId', type: 'string', isOptional: true, description: 'Parent span ID to use for this execution (1-16 hexadecimal characters). If provided, the root span will be created as a child of this span.', }, ], }, { parameters: [ { name: 'tags', type: 'string[]', isOptional: true, description: 'Tags to apply to this trace. String labels for categorizing and filtering traces.', }, ], }, ], }, { name: 'closeOnSuspend', type: 'boolean', description: 'Whether to close the stream when the workflow is suspended, or to keep the stream open until the workflow is finished (by success or error). Default value is true.', isOptional: true, }, ]} />

Returns

Returns a WorkflowRunOutput object that implements the async iterable interface (can be used directly in for await...of loops) and provides access to the stream and workflow execution results.

<PropertiesTable content={[ { name: 'fullStream', type: 'ReadableStream<WorkflowStreamEvent>', description: 'A ReadableStream of workflow events that you can iterate over to track progress in real-time. You can also iterate over the WorkflowRunOutput object directly.', }, { name: 'result', type: 'Promise<WorkflowResult<TState, TInput, TOutput, TSteps>>', description: 'A promise that resolves to the final workflow result', }, { name: 'status', type: 'WorkflowRunStatus', description: "The current workflow run status ('running', 'suspended', 'success', 'failed', 'canceled', or 'tripwire')", }, { name: 'usage', type: 'Promise<{ inputTokens: number; outputTokens: number; totalTokens: number, reasoningTokens?: number, cachedInputTokens?: number }>', description: 'A promise that resolves to token usage statistics', }, ]} />

Extended usage example

typescript
const run = await workflow.createRun()

const stream = run.stream({
  inputData: {
    value: 'initial data',
  },
})

// Iterate over stream events (you can iterate over stream directly or use stream.fullStream)
for await (const chunk of stream) {
  console.log(chunk)
}

// Access the final result
const result = await stream.result
console.log('Final result:', result)

// Access token usage
const usage = await stream.usage
console.log('Token usage:', usage)

// Check current status
console.log('Status:', stream.status)

Stream events

The stream emits various event types during workflow execution. Each event has a type field and a payload containing relevant data:

  • workflow-start: Workflow execution begins
  • workflow-step-start: A step begins execution
  • workflow-step-output: Custom output from a step
  • workflow-step-progress: A foreach step reports per-iteration progress (includes completedCount, totalCount, currentIndex, iterationStatus, and optional iterationOutput)
  • workflow-step-result: A step completes with results
  • workflow-finish: Workflow execution completes with usage statistics