Back to Mastra

Reference: BatchPartsProcessor | Processors

docs/src/content/en/reference/processors/batch-parts-processor.mdx

2025-12-182.9 KB
Original Source

BatchPartsProcessor

The BatchPartsProcessor is an output processor that batches multiple stream parts together to reduce the frequency of emissions during streaming. This processor is useful for reducing network overhead, improving user experience by consolidating small text chunks, and optimizing streaming performance by controlling when parts are emitted to the client.

Usage example

typescript
import { BatchPartsProcessor } from '@mastra/core/processors'

const processor = new BatchPartsProcessor({
  batchSize: 5,
  maxWaitTime: 100,
  emitOnNonText: true,
})

Constructor parameters

<PropertiesTable content={[ { name: 'options', type: 'Options', description: 'Configuration options for batching stream parts', isOptional: true, properties: [ { type: 'Options', parameters: [ { name: 'batchSize', type: 'number', description: 'Number of parts to batch together before emitting', isOptional: true, default: '5', }, { name: 'maxWaitTime', type: 'number', description: "Maximum time to wait before emitting a batch (in milliseconds). If set, will emit the current batch even if it hasn't reached batchSize", isOptional: true, default: 'undefined (no timeout)', }, { name: 'emitOnNonText', type: 'boolean', description: 'Whether to emit immediately when a non-text part is encountered', isOptional: true, default: 'true', }, ], }, ], }, ]} />

Returns

<PropertiesTable content={[ { name: 'id', type: 'string', description: "Processor identifier set to 'batch-parts'", isOptional: false, }, { name: 'name', type: 'string', description: 'Optional processor display name', isOptional: true, }, { name: 'processOutputStream', type: '(args: { part: ChunkType; streamParts: ChunkType[]; state: Record<string, any>; abort: (reason?: string) => never }) => Promise<ChunkType | null>', description: 'Processes streaming output parts to batch them together', isOptional: false, }, { name: 'flush', type: '(state?: BatchPartsState) => ChunkType | null', description: 'Force flush any remaining batched parts when the stream ends', isOptional: false, }, ]} />

Extended usage example

typescript
import { Agent } from '@mastra/core/agent'
import { BatchPartsProcessor } from '@mastra/core/processors'

export const agent = new Agent({
  name: 'batched-agent',
  instructions: 'You are a helpful assistant',
  model: 'openai/gpt-5.4',
  outputProcessors: [
    new BatchPartsProcessor({
      batchSize: 5,
      maxWaitTime: 100,
      emitOnNonText: true,
    }),
  ],
})