.agents/skills/apm-integrations/references/async-iterator-pattern.md
CRITICAL: If you are working with async iterators or async generators (methods like `stream()`, `*generate()`, or anything returning `Promise<AsyncIterable>`), you MUST read and follow this entire document. The AsyncIterator pattern requires TWO plugins and has specific implementation requirements.
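Use `kind: 'AsyncIterator'` in your Orchestrion config when the target method returns one of the following types or is an async generator:

- `Promise<AsyncIterable<T>>`
- `Promise<AsyncIterableIterator<T>>`
- `Promise<IterableReadableStream<T>>`
- `async *methodName()` (async generator)

Examples: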
```javascript
// These ALL need kind: 'AsyncIterator'
async stream(input) { /* returns Promise<AsyncIterable> */ }
async *generate() { /* async generator */ }
async getStream() { /* returns Promise<ReadableStream> */ }
```
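When it is unclear whether a method falls into one of these categories, a quick runtime probe can help. The sketch below is only an exploration aid: `isAsyncIterable`, `needsAsyncIteratorKind`, and the `Example` class are hypothetical names, not part of Orchestrion or dd-trace. It checks whether the (awaited) return value exposes `Symbol.asyncIterator`.

```javascript
// Hypothetical helper for exploration only; not part of Orchestrion or dd-trace.
function isAsyncIterable (value) {
  return value != null && typeof value[Symbol.asyncIterator] === 'function'
}

// Illustrative class mirroring the method shapes listed above.
class Example {
  // Returns Promise<AsyncIterable>: needs kind 'AsyncIterator'
  async stream () {
    return (async function * () { yield 'a' })()
  }

  // Async generator: needs kind 'AsyncIterator'
  async * generate () {
    yield 'a'
  }

  // Resolves to a plain array: does NOT need kind 'AsyncIterator'
  async list () {
    return ['a']
  }
}

// Awaits the call result (a no-op for async generator objects) and probes it.
async function needsAsyncIteratorKind (call) {
  return isAsyncIterable(await call())
}
```

For instance, `needsAsyncIteratorKind(() => new Example().stream())` resolves to `true`, while the same call with `list()` resolves to `false`.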
When kind: 'AsyncIterator' is used, Orchestrion automatically creates TWO channels:
- Base channel: `tracing:orchestrion:{package}:{channelName}:*`
- Next channel: `tracing:orchestrion:{package}:{channelName}_next:*` (one event per iteration, i.e. per `next()` call; the stream is complete when `result.done === true`)

You MUST create TWO plugins to handle both channels. See the complete LangGraph example below for the full implementation pattern.
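The naming scheme above can be sketched as a small helper. This is an illustration, not Orchestrion code: `channelPrefixes` and `expandChannel` are hypothetical names, and the sketch assumes that the `:*` wildcard stands for the standard Node.js `TracingChannel` events (`start`, `end`, `asyncStart`, `asyncEnd`, `error`).

```javascript
// Event suffixes of a Node.js TracingChannel (assumed to be what ":*" stands for).
const TRACING_EVENTS = ['start', 'end', 'asyncStart', 'asyncEnd', 'error']

// Hypothetical helper: builds the two channel prefixes for one config entry.
function channelPrefixes (packageName, channelName) {
  const base = `tracing:orchestrion:${packageName}:${channelName}`
  return { base, next: `${base}_next` }
}

// Expands a prefix into concrete diagnostics channel names.
function expandChannel (prefix) {
  return TRACING_EVENTS.map(event => `${prefix}:${event}`)
}

const { base, next } = channelPrefixes('@langchain/langgraph', 'Pregel_stream')
console.log(base) // tracing:orchestrion:@langchain/langgraph:Pregel_stream
console.log(next) // tracing:orchestrion:@langchain/langgraph:Pregel_stream_next
console.log(expandChannel(next)[3]) // tracing:orchestrion:@langchain/langgraph:Pregel_stream_next:asyncEnd
```

The two prefixes are exactly the values each plugin needs for its `static prefix`.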
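Main plugin (base channel):

- `static prefix`: uses `channelName` from config exactly as-is
- `bindStart()`: Creates span via `this.startSpan()`

Next plugin (next channel):

- `static prefix`: appends `_next` to `channelName`
- `static id`: same value as the main plugin
- `bindStart()`: Returns inherited store (NO new span)
- `asyncEnd()`: Finishes span ONLY when `ctx.result.done === true`
- `error()`: Finishes span immediately on error

Both plugins MUST be: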
- Exported together: `module.exports = [StreamPlugin, NextStreamPlugin]`

```javascript
// packages/datadog-instrumentations/src/helpers/rewriter/instrumentations/langgraph.js
module.exports = [
  {
    module: {
      name: '@langchain/langgraph',
      versionRange: '>=1.2.0',
      filePath: 'dist/pregel/index.js'
    },
    functionQuery: {
      methodName: 'stream',
      className: 'Pregel',
      kind: 'AsyncIterator' // ← Critical
    },
    channelName: 'Pregel_stream'
  }
]
```
```javascript
// packages/datadog-plugin-langchain-langgraph/src/tracing.js
const { TracingPlugin } = require('../../dd-trace/src/plugins/tracing')

// Main plugin: handles the base channel and creates the span.
class StreamPlugin extends TracingPlugin {
  static id = 'langgraph'
  static prefix = 'tracing:orchestrion:@langchain/langgraph:Pregel_stream'

  bindStart (ctx) {
    const input = ctx.arguments?.[0]
    this.startSpan('langgraph.stream', {
      service: this.config.service,
      kind: 'internal',
      component: 'langgraph',
      meta: {
        'langgraph.input': JSON.stringify(input)
      }
    }, ctx)
    return ctx.currentStore
  }
}

// Next plugin: handles the _next channel and finishes the span.
class NextStreamPlugin extends StreamPlugin {
  static id = 'langgraph'
  static prefix = 'tracing:orchestrion:@langchain/langgraph:Pregel_stream_next'

  bindStart (ctx) {
    return ctx.currentStore // Inherit span from StreamPlugin
  }

  asyncEnd (ctx) {
    const span = ctx.currentStore?.span
    if (!span) return
    // Only the final next() result (done === true) ends the span
    if (ctx.result?.done === true) {
      // Note: on the final result, value is the iterator's return value,
      // which is often undefined, so this tag frequently ends up as 0
      span.setTag('langgraph.chunks', ctx.result.value?.length || 0)
      span.finish()
    }
  }

  error (ctx) {
    const span = ctx.currentStore?.span
    if (span) {
      this.addError(ctx?.error, span)
      span.finish()
    }
  }
}

module.exports = [StreamPlugin, NextStreamPlugin]
```
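To make the lifecycle easier to reason about, the sketch below reproduces it without dd-trace or Orchestrion. Everything in it is a stand-in: `FakeSpan` is not the dd-trace span API, and `traceAsyncIterable` is a hypothetical wrapper that folds both plugins into one function. It starts a span when the stream is created, does nothing on intermediate `next()` results, and finishes the span only on `done === true` or on error.

```javascript
// Stand-in for a tracer span; NOT the dd-trace span API.
class FakeSpan {
  constructor (name) {
    this.name = name
    this.tags = {}
    this.finished = false
  }

  setTag (key, value) { this.tags[key] = value }
  finish () { this.finished = true }
}

// Hypothetical wrapper that mimics the two-plugin lifecycle in one place.
function traceAsyncIterable (name, iterable) {
  const span = new FakeSpan(name) // "base channel": the span starts here
  const iterator = iterable[Symbol.asyncIterator]()
  let chunks = 0

  const traced = {
    [Symbol.asyncIterator] () { return this },
    async next (...args) { // "next channel": runs once per next() call
      try {
        const result = await iterator.next(...args)
        if (result.done === true) {
          span.setTag('chunks', chunks)
          span.finish() // finish ONLY when iteration is complete
        } else {
          chunks++
        }
        return result
      } catch (err) {
        span.setTag('error', err)
        span.finish() // finish immediately on error
        throw err
      }
    }
  }
  return { span, traced }
}

async function demo () {
  const source = (async function * () { yield 'a'; yield 'b' })()
  const { span, traced } = traceAsyncIterable('demo.stream', source)
  const seen = []
  for await (const chunk of traced) {
    seen.push(chunk) // span.finished is still false inside the loop
  }
  return { seen, span }
}
```

Counting chunks across `next()` calls, as done here, is one way to record a chunk total, since the final result rarely carries the chunks itself. The sketch does not handle a consumer that stops iterating early, which this document does not cover.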
When testing AsyncIterator instrumentation:
```javascript
it('should trace stream() method with AsyncIterator', async () => {
  const result = await myLib.stream(input)

  // Iterate through results
  const chunks = []
  for await (const chunk of result) {
    chunks.push(chunk)
  }

  // Verify span exists and finished
  await agent.assertSomeTraces(traces => {
    const span = traces[0][0]
    expect(span.name).to.equal('mylib.stream')
    expect(span.meta.component).to.equal('mylib')
    // Span should be complete after iteration finishes
  })
})
```
When implementing AsyncIterator instrumentation:
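- Orchestrion config uses `kind: 'AsyncIterator'`
- Next plugin prefix ends with the `_next` suffix
- Both plugins use the same `static id`
- Main plugin `bindStart()` creates the span
- Next plugin `bindStart()` returns the inherited store
- Next plugin checks `result.done === true` before finishing span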
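Several of the items above are purely structural and can be checked mechanically. The following is a minimal sketch of such a check, assuming the config and plugin shapes used in the LangGraph example; `checkAsyncIteratorSetup` is a hypothetical helper and not part of dd-trace, and the `Main`/`Next` classes are bare stand-ins rather than real `TracingPlugin` subclasses.

```javascript
// Hypothetical lint-style helper; not part of dd-trace or Orchestrion.
function checkAsyncIteratorSetup (config, plugins) {
  const problems = []
  const [Main, Next] = plugins
  const base = `tracing:orchestrion:${config.module.name}:${config.channelName}`

  if (config.functionQuery.kind !== 'AsyncIterator') {
    problems.push("config kind must be 'AsyncIterator'")
  }
  if (plugins.length !== 2) problems.push('expected exactly two exported plugins')
  if (Main?.prefix !== base) problems.push(`main prefix should be ${base}`)
  if (Next?.prefix !== `${base}_next`) problems.push(`next prefix should be ${base}_next`)
  if (Main?.id !== Next?.id) problems.push('both plugins must share the same static id')
  return problems
}

// Stand-ins shaped like the LangGraph example in this document.
const config = {
  module: { name: '@langchain/langgraph' },
  functionQuery: { methodName: 'stream', className: 'Pregel', kind: 'AsyncIterator' },
  channelName: 'Pregel_stream'
}

class Main {
  static id = 'langgraph'
  static prefix = 'tracing:orchestrion:@langchain/langgraph:Pregel_stream'
}

class Next {
  static id = 'langgraph'
  static prefix = 'tracing:orchestrion:@langchain/langgraph:Pregel_stream_next'
}

const problems = checkAsyncIteratorSetup(config, [Main, Next])
console.log(problems.length) // 0
```

The behavioral items (what `bindStart()` returns, when the span finishes) cannot be verified this way and still need the integration test shown earlier.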