docs/src/content/en/reference/streaming/agents/stream.mdx
import { MODEL_SETTINGS_OBJECT } from "@site/src/components/ModelSettingsProperties";
The .stream() method enables real-time streaming of responses from an agent with enhanced capabilities and format flexibility. This method accepts messages and optional streaming options, providing a next-generation streaming experience with support for both Mastra's native format and AI SDK v5+ compatibility.
const stream = await agent.stream('message for agent')
:::info
Model Compatibility: This method is designed for V2 models. V1 models should use the .streamLegacy() method. The framework automatically detects your model version and will throw an error if there's a mismatch.
:::
<PropertiesTable
content={[
{
name: 'messages',
type: 'string | string[] | CoreMessage[] | AiMessageType[] | UIMessageWithMetadata[]',
description:
'The messages to send to the agent. Can be a single string, array of strings, or structured message objects.',
},
{
name: 'options',
type: 'AgentExecutionOptions<Output, Format>',
isOptional: true,
description: 'Optional configuration for the streaming process.',
properties: [
{
type: 'AgentExecutionOptions<Output, Format>',
parameters: [
{
name: 'maxSteps',
type: 'number',
isOptional: true,
description: 'Maximum number of steps to run during execution.',
},
{
name: 'scorers',
type: "MastraScorers | Record<string, { scorer: MastraScorer['name']; sampling?: ScoringSamplingConfig }>",
isOptional: true,
description: 'Evaluation scorers to run on the execution results.',
properties: [
{
parameters: [
{
name: 'scorer',
type: 'string',
isOptional: false,
description: 'Name of the scorer to use.',
},
],
},
{
parameters: [
{
name: 'sampling',
type: 'ScoringSamplingConfig',
isOptional: true,
description: 'Sampling configuration for the scorer.',
properties: [
{
parameters: [
{
name: 'type',
type: "'none' | 'ratio'",
isOptional: false,
description:
"Type of sampling strategy. Use 'none' to disable sampling or 'ratio' for percentage-based sampling.",
},
],
},
{
parameters: [
{
name: 'rate',
type: 'number',
isOptional: true,
description: "Sampling rate (0-1). Required when type is 'ratio'.",
},
],
},
],
},
],
},
],
},
{
name: 'onIterationComplete',
type: '(context: IterationCompleteContext) => { continue?: boolean; feedback?: string } | void | Promise<{ continue?: boolean; feedback?: string } | void>',
isOptional: true,
description:
'Callback function called after each iteration completes. Use this to monitor progress, provide feedback to guide the agent, or stop execution early. The callback receives context about the iteration including the current text, tool calls, and finish reason.',
properties: [
{
parameters: [
{
name: 'context.iteration',
type: 'number',
description: 'Current iteration number (1-based).',
},
],
},
{
parameters: [
{
name: 'context.maxIterations',
type: 'number | undefined',
description: 'Maximum iterations allowed (if set).',
},
],
},
{
parameters: [
{
name: 'context.text',
type: 'string',
description: 'The text response from this iteration.',
},
],
},
{
parameters: [
{
name: 'context.isFinal',
type: 'boolean',
description: 'Whether this is the final iteration.',
},
],
},
{
parameters: [
{
name: 'context.finishReason',
type: 'string',
description: "Reason why this iteration finished (e.g., 'stop', 'length', 'tool-calls').",
},
],
},
{
parameters: [
{
name: 'context.toolCalls',
type: 'ToolCall[]',
description: 'Tool calls made in this iteration.',
},
],
},
{
parameters: [
{
name: 'context.messages',
type: 'MastraDBMessage[]',
description: 'All messages accumulated so far.',
},
],
},
{
parameters: [
{
name: 'return.continue',
type: 'boolean',
isOptional: true,
description: 'Set to false to stop execution early.',
},
],
},
{
parameters: [
{
name: 'return.feedback',
type: 'string',
isOptional: true,
description: "Feedback message to guide the agent's next iteration.",
},
],
},
],
},
{
name: 'isTaskComplete',
type: 'IsTaskCompleteConfig',
isOptional: true,
description:
"Task completion scoring configuration that validates whether the task is complete. Uses Mastra's evaluation scorers to automatically check if the agent's response satisfies the completion criteria.",
properties: [
{
parameters: [
{
name: 'scorers',
type: 'MastraScorer[]',
description:
'Array of scorers that evaluate task completion. Each scorer returns 0 (failed) or 1 (passed).',
},
],
},
{
parameters: [
{
name: 'strategy',
type: "'all' | 'any'",
isOptional: true,
defaultValue: "'all'",
description:
"Strategy for combining scorer results. 'all' requires all scorers to pass, 'any' requires at least one.",
},
],
},
{
parameters: [
{
name: 'onComplete',
type: '(result: IsTaskCompleteRunResult) => void | Promise<void>',
isOptional: true,
description:
'Callback called when the task completion check finishes. Receives the result with individual scorer scores.',
},
],
},
{
parameters: [
{
name: 'parallel',
type: 'boolean',
isOptional: true,
defaultValue: 'true',
description: 'Whether to run scorers in parallel.',
},
],
},
{
parameters: [
{
name: 'timeout',
type: 'number',
isOptional: true,
defaultValue: '600000',
description: 'Maximum time in milliseconds to wait for all scorers to complete.',
},
],
},
],
},
{
name: 'delegation',
type: 'DelegationConfig',
isOptional: true,
description:
'Configuration for subagent delegation. Use this to control and monitor when the agent delegates tasks to other agents, including the ability to modify, reject delegations, and provide feedback to guide the supervisor.',
properties: [
{
parameters: [
{
name: 'onDelegationStart',
type: '(context: DelegationStartContext) => DelegationStartResult | void | Promise<DelegationStartResult | void>',
isOptional: true,
description:
'Called before delegating to a subagent. Use this to modify the delegation parameters or reject the delegation entirely.',
},
],
},
{
parameters: [
{
name: 'onDelegationComplete',
type: '(context: DelegationCompleteContext) => { feedback?: string } | void | Promise<{ feedback?: string } | void>',
isOptional: true,
description:
"Called after a subagent delegation completes. The context includes a bail() method to stop further execution, and you can return { feedback } to guide the supervisor's next action. Feedback is saved to supervisor memory as an assistant message.",
},
],
},
{
parameters: [
{
name: 'messageFilter',
type: '(context: MessageFilterContext) => MastraDBMessage[] | Promise<MastraDBMessage[]>',
isOptional: true,
description:
'Callback function called before delegating to a subagent. Use this to filter the messages that are passed to the subagent.',
},
],
},
],
},
{
name: 'tracingContext',
type: 'TracingContext',
isOptional: true,
description: 'Tracing context for span hierarchy and metadata.',
},
{
name: 'returnScorerData',
type: 'boolean',
isOptional: true,
description: 'Whether to return detailed scoring data in the response.',
},
{
name: 'onChunk',
type: '(chunk: ChunkType) => Promise<void> | void',
isOptional: true,
description: 'Callback function called for each chunk during streaming.',
},
{
name: 'onError',
type: '({ error }: { error: Error | string }) => Promise<void> | void',
isOptional: true,
description: 'Callback function called when an error occurs during streaming.',
},
{
name: 'onAbort',
type: '(event: any) => Promise<void> | void',
isOptional: true,
description: 'Callback function called when the stream is aborted.',
},
{
name: 'abortSignal',
type: 'AbortSignal',
isOptional: true,
description:
"Signal object that allows you to abort the agent's execution. When the signal is aborted, all ongoing operations will be terminated.",
},
{
name: 'activeTools',
type: 'Array<keyof ToolSet> | undefined',
isOptional: true,
description: 'Array of active tool names that can be used during execution.',
},
{
name: 'prepareStep',
type: 'PrepareStepFunction<any>',
isOptional: true,
description: 'Callback function called before each step of multi-step execution.',
},
{
name: 'context',
type: 'ModelMessage[]',
isOptional: true,
description: 'Additional context messages to provide to the agent.',
},
{
name: 'structuredOutput',
type: 'StructuredOutputOptions<S extends ZodTypeAny = ZodTypeAny>',
isOptional: true,
description: 'Options to fine tune your structured output generation.',
properties: [
{
parameters: [
{
name: 'schema',
type: 'z.ZodSchema<S>',
isOptional: false,
description: 'Zod schema defining the expected output structure.',
},
],
},
{
parameters: [
{
name: 'model',
type: 'MastraLanguageModel',
isOptional: true,
description:
'Language model to use for structured output generation. If provided, enables the agent to respond in multi step with tool calls, text, and structured output',
},
],
},
{
parameters: [
{
name: 'errorStrategy',
type: "'strict' | 'warn' | 'fallback'",
isOptional: true,
description:
"Strategy for handling schema validation errors. 'strict' throws errors, 'warn' logs warnings, 'fallback' uses fallback values.",
},
],
},
{
parameters: [
{
name: 'fallbackValue',
type: '<S extends ZodTypeAny>',
isOptional: true,
description: "Fallback value to use when schema validation fails and errorStrategy is 'fallback'.",
},
],
},
{
parameters: [
{
name: 'instructions',
type: 'string',
isOptional: true,
description: 'Additional instructions for the structured output model.',
},
],
},
{
parameters: [
{
name: 'jsonPromptInjection',
type: 'boolean',
isOptional: true,
description:
'Injects system prompt into the main agent instructing it to return structured output, useful for when a model does not natively support structured outputs.',
},
],
},
{
parameters: [
{
name: 'providerOptions',
type: 'ProviderOptions',
isOptional: true,
description:
"Provider-specific options passed to the internal structuring agent. Use this to control model behavior like reasoning effort for thinking models (e.g., { openai: { reasoningEffort: 'low' } }).",
},
],
},
],
},
{
name: 'outputProcessors',
type: 'Processor[]',
isOptional: true,
description:
'Overrides the output processors set on the agent. Output processors that can modify or validate messages from the agent before they are returned to the user. Must implement either (or both) of the processOutputResult and processOutputStream functions.',
},
{
name: 'includeRawChunks',
type: 'boolean',
isOptional: true,
description: 'Whether to include raw chunks in the stream output (not available on all model providers).',
},
{
name: 'inputProcessors',
type: 'Processor[]',
isOptional: true,
description:
'Overrides the input processors set on the agent. Input processors that can modify or validate messages before they are processed by the agent. Must implement the processInput function.',
},
{
name: 'instructions',
type: 'string',
isOptional: true,
description:
"Custom instructions that override the agent's default instructions for this specific generation. Useful for dynamically modifying agent behavior without creating a new agent instance.",
},
{
name: 'system',
type: 'string | string[] | CoreSystemMessage | SystemModelMessage | CoreSystemMessage[] | SystemModelMessage[]',
isOptional: true,
description:
"Custom system message(s) to include in the prompt. Can be a single string, message object, or array of either. System messages provide additional context or behavior instructions that supplement the agent's main instructions.",
},
{
name: 'output',
type: 'Zod schema | JsonSchema7',
isOptional: true,
description:
'Deprecated. Use structuredOutput without a model to achieve the same thing. Defines the expected structure of the output. Can be a JSON Schema object or a Zod schema.',
},
{
name: 'memory',
type: 'object',
isOptional: true,
description: 'Configuration for memory. This is the preferred way to manage memory.',
properties: [
{
parameters: [
{
name: 'thread',
type: 'string | { id: string; metadata?: Record<string, any>, title?: string }',
isOptional: false,
description:
'The conversation thread, as a string ID or an object with an id and optional metadata.',
},
],
},
{
parameters: [
{
name: 'resource',
type: 'string',
isOptional: false,
description: 'Identifier for the user or resource associated with the thread.',
},
],
},
{
parameters: [
{
name: 'options',
type: 'MemoryConfig',
isOptional: true,
description:
'Configuration for memory behavior including lastMessages, readOnly, semanticRecall, and workingMemory.',
},
],
},
],
},
{
name: 'onFinish',
type: 'StreamTextOnFinishCallback<any> | StreamObjectOnFinishCallback<OUTPUT>',
isOptional: true,
description: 'Callback function called when streaming completes. Receives the final result.',
},
{
name: 'onStepFinish',
type: 'StreamTextOnStepFinishCallback<any> | never',
isOptional: true,
description:
'Callback function called after each execution step. Receives step details as a JSON string. Unavailable for structured output',
},
{
name: 'telemetry',
type: 'TelemetrySettings',
isOptional: true,
description: 'Settings for OTLP telemetry collection during streaming (not Tracing).',
properties: [
{
parameters: [
{
name: 'isEnabled',
type: 'boolean',
isOptional: true,
description: 'Enable or disable telemetry. Disabled by default while experimental.',
},
],
},
{
parameters: [
{
name: 'recordInputs',
type: 'boolean',
isOptional: true,
description:
'Enable or disable input recording. Enabled by default. You might want to disable input recording to avoid recording sensitive information.',
},
],
},
{
parameters: [
{
name: 'recordOutputs',
type: 'boolean',
isOptional: true,
description:
'Enable or disable output recording. Enabled by default. You might want to disable output recording to avoid recording sensitive information.',
},
],
},
{
parameters: [
{
name: 'functionId',
type: 'string',
isOptional: true,
description: 'Identifier for this function. Used to group telemetry data by function.',
},
],
},
],
},
MODEL_SETTINGS_OBJECT,
{
name: 'toolChoice',
type: "'auto' | 'none' | 'required' | { type: 'tool'; toolName: string }",
isOptional: true,
defaultValue: "'auto'",
description: 'Controls how the agent uses tools during streaming.',
properties: [
{
parameters: [
{
name: "'auto'",
type: 'string',
description: 'Let the model decide whether to use tools (default).',
},
],
},
{
parameters: [
{
name: "'none'",
type: 'string',
description: 'Do not use any tools.',
},
],
},
{
parameters: [
{
name: "'required'",
type: 'string',
description: 'Require the model to use at least one tool.',
},
],
},
{
parameters: [
{
name: "{ type: 'tool'; toolName: string }",
type: 'object',
description: 'Require the model to use a specific tool by name.',
},
],
},
],
},
{
name: 'toolsets',
type: 'ToolsetsInput',
isOptional: true,
description: 'Additional toolsets to make available to the agent during streaming.',
},
{
name: 'clientTools',
type: 'ToolsInput',
isOptional: true,
description:
"Tools that are executed on the 'client' side of the request. These tools do not have execute functions in the definition.",
},
{
name: 'savePerStep',
type: 'boolean',
isOptional: true,
description: 'Save messages incrementally after each stream step completes (default: false).',
},
{
name: 'requireToolApproval',
type: 'boolean',
isOptional: true,
description:
'When true, all tool calls require explicit approval before execution. The stream will emit tool-call-approval chunks and pause until approveToolCall() or declineToolCall() is called.',
},
{
name: 'autoResumeSuspendedTools',
type: 'boolean',
isOptional: true,
description:
"When true, automatically resumes suspended tools when the user sends a new message on the same thread. The agent extracts resumeData from the user's message based on the tool's resumeSchema. Requires memory to be configured.",
},
{
name: 'toolCallConcurrency',
type: 'number',
isOptional: true,
description:
'Maximum number of tool calls to execute concurrently. Defaults to 1 when approval may be required, otherwise 10.',
},
{
name: 'providerOptions',
type: 'Record<string, Record<string, JSONValue>>',
isOptional: true,
description:
"Additional provider-specific options that are passed through to the underlying LLM provider. The structure is { providerName: { optionKey: value } }. For example: { openai: { reasoningEffort: 'high' }, anthropic: { maxTokens: 1000 } }.",
properties: [
{
parameters: [
{
name: 'openai',
type: 'Record<string, JSONValue>',
isOptional: true,
description: "OpenAI-specific options. Example: { reasoningEffort: 'high' }",
},
],
},
{
parameters: [
{
name: 'anthropic',
type: 'Record<string, JSONValue>',
isOptional: true,
description: 'Anthropic-specific options. Example: { maxTokens: 1000 }',
},
],
},
{
parameters: [
{
name: 'google',
type: 'Record<string, JSONValue>',
isOptional: true,
description: 'Google-specific options. Example: { safetySettings: [...] }',
},
],
},
{
parameters: [
{
name: '[providerName]',
type: 'Record<string, JSONValue>',
isOptional: true,
description:
'Other provider-specific options. The key is the provider name and the value is a record of provider-specific options.',
},
],
},
],
},
{
name: 'runId',
type: 'string',
isOptional: true,
description: 'Unique ID for this generation run. Useful for tracking and debugging purposes.',
},
{
name: 'requestContext',
type: 'RequestContext',
isOptional: true,
description: 'Request Context for dependency injection and contextual information.',
},
{
name: 'tracingContext',
type: 'TracingContext',
isOptional: true,
description:
"Tracing context for creating child spans and adding metadata. Automatically injected when using Mastra's tracing system.",
properties: [
{
parameters: [
{
name: 'currentSpan',
type: 'Span',
isOptional: true,
description:
'Current span for creating child spans and adding metadata. Use this to create custom child spans or update span attributes during execution.',
},
],
},
],
},
{
name: 'tracingOptions',
type: 'TracingOptions',
isOptional: true,
description: 'Options for Tracing configuration.',
properties: [
{
parameters: [
{
name: 'metadata',
type: 'Record<string, any>',
isOptional: true,
description:
'Metadata to add to the root trace span. Useful for adding custom attributes like user IDs, session IDs, or feature flags.',
},
],
},
{
parameters: [
{
name: 'requestContextKeys',
type: 'string[]',
isOptional: true,
description:
"Additional RequestContext keys to extract as metadata for this trace. Supports dot notation for nested values (e.g., 'user.id').",
},
],
},
{
parameters: [
{
name: 'traceId',
type: 'string',
isOptional: true,
description:
'Trace ID to use for this execution (1-32 hexadecimal characters). If provided, this trace will be part of the specified trace.',
},
],
},
{
parameters: [
{
name: 'parentSpanId',
type: 'string',
isOptional: true,
description:
'Parent span ID to use for this execution (1-16 hexadecimal characters). If provided, the root span will be created as a child of this span.',
},
],
},
{
parameters: [
{
name: 'tags',
type: 'string[]',
isOptional: true,
description: 'Tags to apply to this trace. String labels for categorizing and filtering traces.',
},
],
},
],
},
],
},
],
},
]}
/>
<PropertiesTable content={[ { name: 'stream', type: 'MastraModelOutput<Output>', description: 'Returns a MastraModelOutput instance that provides access to the streaming output.', }, { name: 'traceId', type: 'string', isOptional: true, description: 'The trace ID associated with this execution when Tracing is enabled. Use this to correlate logs and debug execution flow.', }, { name: 'spanId', type: 'string', isOptional: true, description: 'The root span ID associated with this execution when Tracing is enabled. Use this for span-level lookup and correlation.', }, ]} />
import { stepCountIs } from 'ai-v5'
const stream = await agent.stream('Tell me a story', {
stopWhen: stepCountIs(3), // Stop after 3 steps
modelSettings: {
temperature: 0.7,
},
})
// Access text stream
for await (const chunk of stream.textStream) {
console.log(chunk)
}
// or access full stream
for await (const chunk of stream.fullStream) {
console.log(chunk)
}
// Get full text after streaming
const fullText = await stream.text
To use the stream with AI SDK v5 (and later), you can convert it using our utility function toAISdkStream.
import { stepCountIs, createUIMessageStreamResponse } from 'ai'
import { toAISdkStream } from '@mastra/ai-sdk'
const stream = await agent.stream('Tell me a story', {
stopWhen: stepCountIs(3), // Stop after 3 steps
modelSettings: {
temperature: 0.7,
},
})
// In an API route for frontend integration
return createUIMessageStreamResponse({
stream: toAISdkStream(stream, { from: 'agent' }),
})
All callback functions are now available as top-level properties for a cleaner API experience.
const stream = await agent.stream('Tell me a story', {
onFinish: result => {
console.log('Streaming finished:', result)
},
onStepFinish: step => {
console.log('Step completed:', step)
},
onChunk: chunk => {
console.log('Received chunk:', chunk)
},
onError: ({ error }) => {
console.error('Streaming error:', error)
},
onAbort: event => {
console.log('Stream aborted:', event)
},
})
// Process the stream
for await (const chunk of stream.textStream) {
console.log(chunk)
}
import { z } from 'zod'
import { stepCountIs } from 'ai'
await agent.stream('message for agent', {
stopWhen: stepCountIs(3), // Stop after 3 steps
modelSettings: {
temperature: 0.7,
},
memory: {
thread: 'user-123',
resource: 'test-app',
},
toolChoice: 'auto',
// Structured output with better DX
structuredOutput: {
schema: z.object({
sentiment: z.enum(['positive', 'negative', 'neutral']),
confidence: z.number(),
}),
model: 'openai/gpt-5.4',
errorStrategy: 'warn',
},
// Output processors for streaming response validation
outputProcessors: [
new ModerationProcessor({ model: 'openrouter/openai/gpt-oss-safeguard-20b' }),
new BatchPartsProcessor({ maxBatchSize: 3, maxWaitTime: 100 }),
],
})
Opt into OpenAI Responses WebSocket streaming via providerOptions.openai.transport. This only applies to streaming calls and is currently supported for direct OpenAI models (for example, openai/gpt-4o). If WebSocket streaming is unavailable, Mastra falls back to HTTP streaming. By default, Mastra closes the WebSocket when the stream finishes.
const stream = await agent.stream('Hello', {
providerOptions: {
openai: {
transport: 'websocket', // 'websocket' | 'fetch' | 'auto'
websocket: {
url: 'wss://api.openai.com/v1/responses',
closeOnFinish: true, // default
},
},
},
})
To keep the connection open after the stream finishes, set closeOnFinish: false and close it manually.
const stream = await agent.stream('Hello', {
providerOptions: {
openai: {
transport: 'websocket',
websocket: { closeOnFinish: false },
},
},
})
// Later, when you're done with the connection:
stream.transport?.close()