Pipeline Doctor

.agents/skills/ingestion-pipeline-doctor-nodejs/SKILL.md

Quick reference for PostHog's ingestion pipeline framework and its convention-checking agents.

Architecture overview

The ingestion pipeline processes events through a typed, composable step chain:

```text
Kafka message
  → messageAware()
    → parse headers/body
    → sequentially() for preprocessing
    → filterMap() to enrich context (e.g., team lookup)
    → teamAware()
      → groupBy(token:distinctId)
        → concurrently() for per-entity processing
      → gather()
      → pipeBatch() for batch operations
      → handleIngestionWarnings()
    → handleResults()
  → handleSideEffects()
  → build()
```

See nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts for the real implementation.
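
The core idea of the chain above (typed steps whose results decide what runs next) can be sketched in a few lines. This is a hypothetical model, not the framework's API: Result, Step, compose, parseBody, and lookupTeam are invented names, and the sketch assumes that a non-ok result bypasses every remaining step.

```typescript
// Hypothetical types for this sketch only; the real step and result types
// live in nodejs/src/ingestion/pipelines/steps.ts and results.ts.
type Result<T> = { type: 'ok'; value: T } | { type: 'drop'; reason: string }
type Step<I, O> = (input: I) => Promise<Result<O>>

// Chain two steps. Assumption: a non-ok result skips every later step.
function compose<A, B, C>(first: Step<A, B>, second: Step<B, C>): Step<A, C> {
  return async (input: A): Promise<Result<C>> => {
    const result = await first(input)
    if (result.type !== 'ok') {
      return result
    }
    return second(result.value)
  }
}

interface ParsedEvent {
  event: string
  token: string
}

// Example step 1: parse the message body, dropping malformed JSON.
const parseBody: Step<string, ParsedEvent> = async (raw) => {
  try {
    // Unvalidated cast, kept short for the sketch.
    return { type: 'ok', value: JSON.parse(raw) as ParsedEvent }
  } catch {
    return { type: 'drop', reason: 'invalid_json' }
  }
}

// Example step 2: enrich with a team ID (the "team lookup" stage), dropping unknown tokens.
const teamIdsByToken = new Map<string, number>([['phc_example', 1]])
const lookupTeam: Step<ParsedEvent, ParsedEvent & { teamId: number }> = async (event) => {
  const teamId = teamIdsByToken.get(event.token)
  if (teamId === undefined) {
    return { type: 'drop', reason: 'unknown_token' }
  }
  return { type: 'ok', value: { ...event, teamId } }
}

// Inferred as Step<string, ParsedEvent & { teamId: number }>.
const pipeline = compose(parseBody, lookupTeam)
```

Calling pipeline('not json') resolves to a drop result with reason invalid_json, and lookupTeam never runs.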

Key file locations

| What | Where |
| --- | --- |
| Step type | nodejs/src/ingestion/pipelines/steps.ts |
| Result types | nodejs/src/ingestion/pipelines/results.ts |
| Doc-test chapters | nodejs/src/ingestion/pipelines/docs/*.test.ts |
| Joined pipeline | nodejs/src/ingestion/analytics/joined-ingestion-pipeline.ts |
| Doctor agents | .claude/agents/ingestion/ |
| Test helpers | nodejs/src/ingestion/pipelines/docs/helpers.ts |

Which agent to use

| Concern | Agent | When to use |
| --- | --- | --- |
| Step structure | pipeline-step-doctor | Factory pattern, type extension, config injection, naming |
| Result handling | pipeline-result-doctor | ok/dlq/drop/redirect, side effects, ingestion warnings |
| Composition | pipeline-composition-doctor | Builder chain, concurrency, grouping, branching, retries |
| Testing | pipeline-testing-doctor | Test helpers, assertions, fake timers, doc-test style |

Quick convention reference

Steps: Factory function that returns a named inner function. Generic <T extends Input> so a step extends its input type instead of replacing it. No any. Config injected via closure.
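
A minimal sketch of the step convention, assuming a simplified result type. createTeamLookupStep, TeamLookupConfig, and TokenInput are hypothetical names for illustration; the real step type is defined in steps.ts. The step is synchronous here only to keep the example short.

```typescript
// Simplified result type for this sketch only.
type StepResult<T> = { type: 'ok'; value: T } | { type: 'drop'; reason: string }

// The minimal shape this step needs; callers may pass anything that extends it.
interface TokenInput {
  token: string
}

// Hypothetical config, captured once by the factory's closure.
interface TeamLookupConfig {
  teamIdsByToken: ReadonlyMap<string, number>
}

// Factory: receives config and returns a named inner function (the step itself).
function createTeamLookupStep(config: TeamLookupConfig) {
  // Generic T keeps every field the caller already has and adds teamId on top.
  return function teamLookupStep<T extends TokenInput>(input: T): StepResult<T & { teamId: number }> {
    const teamId = config.teamIdsByToken.get(input.token)
    if (teamId === undefined) {
      return { type: 'drop', reason: 'unknown_token' }
    }
    return { type: 'ok', value: { ...input, teamId } }
  }
}

// Usage: extra fields such as distinctId stay typed on the output.
const teamLookupStep = createTeamLookupStep({ teamIdsByToken: new Map([['phc_example', 1]]) })
const result = teamLookupStep({ token: 'phc_example', distinctId: 'user-1' })
if (result.type === 'ok') {
  console.log(result.value.distinctId, result.value.teamId)
}
```

Because the inner function is generic, later steps keep the fields they rely on without any casts, and naming the inner function gives it a meaningful name in stack traces.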

Results: Use ok(), dlq(), drop(), redirect() constructors. Side effects as promises in ok(value, [effects]). Warnings as third parameter.
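
The result conventions can be modeled as a discriminated union with one constructor per outcome. The field names below (sideEffects, warnings, topic, and the IngestionWarning shape) are assumptions for this sketch, not the definitions in results.ts, and normalizeEventName is a hypothetical step.

```typescript
// Hypothetical shapes for this sketch; the real constructors live in
// nodejs/src/ingestion/pipelines/results.ts and may differ.
interface IngestionWarning {
  type: string
  details: Record<string, unknown>
}

type PipelineResult<T> =
  | { type: 'ok'; value: T; sideEffects: Promise<unknown>[]; warnings: IngestionWarning[] }
  | { type: 'dlq'; reason: string; error?: unknown }
  | { type: 'drop'; reason: string }
  | { type: 'redirect'; reason: string; topic: string }

// One constructor per outcome, so steps never build result objects by hand.
function ok<T>(value: T, sideEffects: Promise<unknown>[] = [], warnings: IngestionWarning[] = []): PipelineResult<T> {
  return { type: 'ok', value, sideEffects, warnings }
}
function dlq<T>(reason: string, error?: unknown): PipelineResult<T> {
  return { type: 'dlq', reason, error }
}
function drop<T>(reason: string): PipelineResult<T> {
  return { type: 'drop', reason }
}
function redirect<T>(reason: string, topic: string): PipelineResult<T> {
  return { type: 'redirect', reason, topic }
}

// Type guard: narrows to the ok variant so callers can read .value safely.
function isOk<T>(result: PipelineResult<T>): result is Extract<PipelineResult<T>, { type: 'ok' }> {
  return result.type === 'ok'
}

// Hypothetical step: drops empty names, warns when it trims whitespace,
// and hands back an async write as a side effect instead of awaiting it.
function normalizeEventName(event: { name: string }): PipelineResult<{ name: string }> {
  const trimmed = event.name.trim()
  if (trimmed === '') {
    return drop('empty_event_name')
  }
  const warnings: IngestionWarning[] =
    trimmed === event.name ? [] : [{ type: 'event_name_trimmed', details: { original: event.name } }]
  const auditWrite: Promise<void> = Promise.resolve() // stand-in for real async I/O
  return ok({ name: trimmed }, [auditWrite], warnings)
}
```

Returning the write as a promise inside ok() leaves it to the pipeline to await side effects later (presumably in handleSideEffects()) rather than blocking the step.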

Composition: messageAware wraps the pipeline. handleResults goes inside messageAware; handleSideEffects goes after it. groupBy + concurrently for per-entity work. gather before batch steps.
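
The grouping rule can be illustrated with plain promises: events for the same token:distinctId run one after another, different entities run in parallel, and gather flattens the groups back into one batch. groupByEntity, processConcurrently, and gather here are hypothetical stand-ins for the builder's groupBy(), concurrently(), and gather(); this sketch uses unbounded Promise.all, which the real concurrently() may not.

```typescript
interface IncomingEvent {
  token: string
  distinctId: string
  seq: number
}

// Stand-in for groupBy(token:distinctId): arrival order is kept within each group.
function groupByEntity(events: IncomingEvent[]): Map<string, IncomingEvent[]> {
  const groups = new Map<string, IncomingEvent[]>()
  for (const event of events) {
    const key = `${event.token}:${event.distinctId}`
    const group = groups.get(key)
    if (group) {
      group.push(event)
    } else {
      groups.set(key, [event])
    }
  }
  return groups
}

// Stand-in for concurrently(): groups run in parallel, but events inside one
// group run one at a time, so two events for the same person never race.
async function processConcurrently<R>(
  groups: Map<string, IncomingEvent[]>,
  processOne: (event: IncomingEvent) => Promise<R>,
): Promise<R[][]> {
  return Promise.all(
    Array.from(groups.values()).map(async (group) => {
      const results: R[] = []
      for (const event of group) {
        results.push(await processOne(event))
      }
      return results
    }),
  )
}

// Stand-in for gather(): flatten per-group output into one batch for batch-level steps.
function gather<R>(perGroup: R[][]): R[] {
  return perGroup.reduce<R[]>((batch, group) => batch.concat(group), [])
}
```

If processOne just labels each event as distinctId:seq, the events a:1, b:2, a:3 (one token) come out of gather as a:1, a:3, b:2: order is preserved within each entity, not across entities.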

Testing: Step tests call the factory directly. Use the consumeAll()/collectBatches() helpers. Fake timers for timer-dependent async code. Type guards for result assertions. No any.
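
A sketch of a step test in that style: the factory is called directly with its config, and type guards narrow the result before any field is read, so no casts or any are needed. createBotFilterStep, isOkResult, and isDropResult are hypothetical, and expectTrue stands in for the test framework's assertions so the snippet runs on its own.

```typescript
// Simplified result type for this sketch only.
type StepResult<T> = { type: 'ok'; value: T } | { type: 'drop'; reason: string }

// Type guards narrow the union, so assertions read .value or .reason without casts.
function isOkResult<T>(result: StepResult<T>): result is { type: 'ok'; value: T } {
  return result.type === 'ok'
}
function isDropResult<T>(result: StepResult<T>): result is { type: 'drop'; reason: string } {
  return result.type === 'drop'
}

// Hypothetical step under test, following the factory convention.
function createBotFilterStep(config: { botUserAgents: string[] }) {
  return function botFilterStep<T extends { userAgent: string }>(input: T): StepResult<T> {
    if (config.botUserAgents.some((agent) => input.userAgent.includes(agent))) {
      return { type: 'drop', reason: 'bot_user_agent' }
    }
    return { type: 'ok', value: input }
  }
}

// Minimal assertion helper so the sketch runs without a test framework.
function expectTrue(condition: boolean, message: string): asserts condition {
  if (!condition) {
    throw new Error(message)
  }
}

// The test calls the factory directly with its config; no pipeline is built.
function testBotFilterStep(): void {
  const step = createBotFilterStep({ botUserAgents: ['Googlebot'] })

  const passed = step({ userAgent: 'Mozilla/5.0', distinctId: 'user-1' })
  expectTrue(isOkResult(passed), 'expected an ok result')
  expectTrue(passed.value.distinctId === 'user-1', 'input fields should pass through unchanged')

  const dropped = step({ userAgent: 'Mozilla/5.0 (compatible; Googlebot/2.1)', distinctId: 'bot-1' })
  expectTrue(isDropResult(dropped), 'expected a drop result')
  expectTrue(dropped.reason === 'bot_user_agent', 'unexpected drop reason')
}

testBotFilterStep()
```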

Running all doctors

Ask Claude to "run all pipeline doctors on my recent changes" to get a comprehensive review across all 4 concern areas.