.agents/skills/review-logging-patterns/references/drain-pipeline.md
The drain pipeline wraps any adapter to add batching, retry with backoff, and buffer overflow protection. Use it in production to reduce network overhead and handle transient failures.
It replaces hand-rolled batching built on `setInterval` and arrays.
// server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>()
  const drain = pipeline(createAxiomDrain())
  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})
Important: Always call `drain.flush()` in the server's `close` hook. Without it, events still in the buffer are lost when the process exits.
const pipeline = createDrainPipeline<DrainContext>({
  batch: {
    size: 50, // Max events per batch (default: 50)
    intervalMs: 5000, // Max wait before flushing partial batch (default: 5000)
  },
  retry: {
    maxAttempts: 3, // Total attempts including first (default: 3)
    backoff: 'exponential', // 'exponential' | 'linear' | 'fixed' (default: 'exponential')
    initialDelayMs: 1000, // Base delay for first retry (default: 1000)
    maxDelayMs: 30000, // Upper bound for any retry delay (default: 30000)
  },
  maxBufferSize: 1000, // Max buffered events; oldest dropped on overflow (default: 1000)
  onDropped: (events, error) => {
    // Called when events are dropped (overflow or retry exhaustion)
    console.error(`[evlog] Dropped ${events.length} events:`, error?.message)
  },
})
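How `retry.maxAttempts` and `onDropped` fit together can be sketched with a plain retry loop. This is an illustration under assumptions, not evlog's implementation: `sendWithRetry`, `delayMs`, and the injectable `sleep` are hypothetical names, and the real pipeline may schedule retries differently.

```typescript
interface RetryOptions<T> {
  maxAttempts: number // total attempts, including the first
  delayMs: (retry: number) => number // delay before the n-th retry (1-based)
  sleep?: (ms: number) => Promise<void> // injectable for tests
  onDropped?: (events: T[], error?: Error) => void
}

// Hypothetical helper, not part of evlog: try to send a batch, retrying on failure.
async function sendWithRetry<T>(
  batch: T[],
  send: (batch: T[]) => Promise<void>,
  opts: RetryOptions<T>,
): Promise<boolean> {
  const sleep =
    opts.sleep ?? ((ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms)))
  let lastError: Error | undefined

  for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
    try {
      await send(batch)
      return true // delivered
    } catch (err) {
      lastError = err instanceof Error ? err : new Error(String(err))
      // Wait only if another attempt will follow.
      if (attempt < opts.maxAttempts) await sleep(opts.delayMs(attempt))
    }
  }

  // Every attempt failed: report the batch as dropped and give up.
  opts.onDropped?.(batch, lastError)
  return false
}
```

With `maxAttempts: 3`, a batch is sent at most three times, so at most two delays are ever waited before `onDropped` fires.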
1. `drain(ctx)` pushes a single event into the buffer.
2. When `buffer.length >= batch.size`, the batch is flushed immediately.
3. After `intervalMs`, whatever is buffered gets flushed.
4. The wrapped drain function receives `T[]` (always an array).
5. After `maxAttempts` failures, `onDropped` is called and the batch is discarded.
6. When the buffer exceeds `maxBufferSize`, the oldest event is dropped and `onDropped` is called.

| Strategy | Delay Pattern | Best For |
|---|---|---|
| `exponential` | 1s, 2s, 4s, 8s... | Default. Transient failures needing recovery time |
| `linear` | 1s, 2s, 3s, 4s... | Predictable delay growth |
| `fixed` | 1s, 1s, 1s, 1s... | Rate-limited APIs with known cooldown |
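The delay patterns in the table can be reproduced with a small helper. This is a sketch under the assumption that each delay is derived from `initialDelayMs` and capped at `maxDelayMs`; `retryDelayMs` is a hypothetical name, and evlog's internal formula may differ.

```typescript
type Backoff = 'exponential' | 'linear' | 'fixed'

// Hypothetical helper, not part of evlog: delay before the n-th retry (1-based).
function retryDelayMs(
  backoff: Backoff,
  retry: number,
  initialDelayMs = 1000,
  maxDelayMs = 30000,
): number {
  const uncapped =
    backoff === 'exponential'
      ? initialDelayMs * 2 ** (retry - 1) // doubles on every retry
      : backoff === 'linear'
        ? initialDelayMs * retry // grows by one base delay per retry
        : initialDelayMs // 'fixed': always the base delay
  return Math.min(uncapped, maxDelayMs) // never exceed the configured upper bound
}
```

Under these assumptions, the cap only matters for long exponential sequences: with the defaults, the sixth exponential retry would be 32s uncapped and is limited to 30s.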
const drain = pipeline(myDrainFn)
drain(ctx) // Push a single event (synchronous, non-blocking)
await drain.flush() // Force-flush all buffered events
drain.pending // Number of events currently buffered (readonly)
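The buffering rules behind `drain(ctx)`, `drain.flush()`, and `drain.pending` can be illustrated with a toy in-memory version. This is a simplified sketch, not evlog's implementation: `createToyPipeline` and its options are hypothetical stand-ins, it has no interval timer or retries, and a failed batch is dropped right away.

```typescript
interface ToyOptions<T> {
  batchSize: number // assumed to be >= 1
  maxBufferSize: number
  onDropped?: (events: T[]) => void
}

interface ToyDrain<T> {
  (event: T): void
  flush(): Promise<void>
  readonly pending: number
}

// Hypothetical toy pipeline: size-triggered flush and drop-oldest overflow only.
function createToyPipeline<T>(opts: ToyOptions<T>) {
  return (send: (batch: T[]) => Promise<void>): ToyDrain<T> => {
    const buffer: T[] = []
    let queue: Promise<void> = Promise.resolve() // serializes flushes

    const flush = (): Promise<void> => {
      queue = queue.then(async () => {
        // Send everything buffered, in chunks of at most batchSize.
        while (buffer.length > 0) {
          const batch = buffer.splice(0, opts.batchSize)
          try {
            await send(batch)
          } catch {
            opts.onDropped?.(batch) // no retry in this toy version
          }
        }
      })
      return queue
    }

    const drain = ((event: T): void => {
      if (buffer.length >= opts.maxBufferSize) {
        const oldest = buffer.shift() as T // drop the oldest event on overflow
        opts.onDropped?.([oldest])
      }
      buffer.push(event)
      if (buffer.length >= opts.batchSize) void flush() // fire and forget
    }) as ToyDrain<T>

    drain.flush = flush
    Object.defineProperty(drain, 'pending', { get: () => buffer.length })
    return drain
  }
}
```

Pushing stays synchronous because the send is scheduled rather than awaited, which is why events can pile up and overflow while earlier batches are still waiting to go out.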
const axiom = createAxiomDrain()
const otlp = createOTLPDrain()
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(async (batch) => {
  await Promise.allSettled([axiom(batch), otlp(batch)])
})
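`Promise.allSettled` never rejects, so a failing destination in the fan-out above goes unnoticed. Assuming the pipeline treats a rejected drain promise as a failed batch (worth checking against evlog's documentation), a hypothetical helper such as `fanOut` could report individual failures and reject only when every destination fails.

```typescript
type BatchDrain<T> = (batch: T[]) => Promise<void>

// Hypothetical helper, not part of evlog: send one batch to several drains.
function fanOut<T>(
  drains: BatchDrain<T>[],
  onFailure: (index: number, reason: unknown) => void = () => {},
): BatchDrain<T> {
  return async (batch) => {
    const results = await Promise.allSettled(drains.map(d => d(batch)))
    let failures = 0
    results.forEach((result, index) => {
      if (result.status === 'rejected') {
        failures++
        onFailure(index, result.reason) // report which destination failed
      }
    })
    // Reject only if nothing was delivered. Rejecting after a partial success
    // could make a retry duplicate events at the destinations that succeeded.
    if (drains.length > 0 && failures === drains.length) {
      throw new Error(`All ${failures} drains failed`)
    }
  }
}
```

Used with the drains above, this would read `pipeline(fanOut([axiom, otlp], (i, reason) => console.error(i, reason)))`.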
const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 100 } })
const drain = pipeline(async (batch) => {
  await fetch('https://your-service.com/logs', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(batch.map(ctx => ctx.event)),
  })
})
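`fetch` resolves even when the server answers with a 4xx or 5xx status, so the custom drain above would treat an HTTP error as a successful delivery. A possible variant, assuming the pipeline retries when the drain function throws, checks `response.ok`. `postBatch`, `FetchLike`, and the injectable `fetchImpl` parameter are hypothetical names introduced for this sketch.

```typescript
// Minimal shape of fetch needed here, so a fake can be injected in tests.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number }>

// Hypothetical helper, not part of evlog: POST a batch and throw on HTTP errors.
async function postBatch(
  fetchImpl: FetchLike,
  url: string,
  events: unknown[],
): Promise<void> {
  const response = await fetchImpl(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(events),
  })
  // fetch only rejects on network errors; surface 4xx/5xx explicitly.
  if (!response.ok) {
    throw new Error(`Log endpoint responded with HTTP ${response.status}`)
  }
}
```

Inside the drain function this would be called as `await postBatch(fetch, 'https://your-service.com/logs', batch.map(ctx => ctx.event))`.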
const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 10, intervalMs: 30000 }, // Flush every 30s or 10 events
})
// ❌ No retry, no overflow protection, no flush on shutdown
const batch: WideEvent[] = []
setInterval(() => {
  if (batch.length > 0) fetch(...)
}, 5000)
Transform to:
// ✅ Use the pipeline
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(async (batch) => { await fetch(...) })
nitroApp.hooks.hook('close', () => drain.flush())
// ❌ Buffered events lost on process exit
nitroApp.hooks.hook('evlog:drain', drain)
Fix:
// ✅ Always flush on close
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
- `drain.flush()` is called in the server's `close` hook
- `onDropped` callback logs or reports dropped events
- `maxBufferSize` is set to prevent memory leaks under load