
Queue Consumers

.agents/skills/datadog-query-recipes/references/queue-consumers.md



Use this reference when a task asks which Langfuse queues exist, whether a consumer is running, how much work a queue has, or how to query queue processor spans.

Source of Truth

  • Queue names and job names: packages/shared/src/server/queues.ts (QueueName, QueueJobs).
  • Queue producer classes and shard naming: packages/shared/src/server/redis/*.ts.
  • Worker consumer registration and feature gates: worker/src/app.ts.
  • Worker consumer env vars: worker/src/env.ts (QUEUE_CONSUMER_*_IS_ENABLED plus feature-specific gates).
  • Worker registration, request/error counters, wait/processing time, and sampled old-style depth metrics: worker/src/queues/workerManager.ts.
  • Queue depth background reporter: worker/src/features/queue-metrics-runner/index.ts.
  • Metric name conversion: packages/shared/src/server/instrumentation/index.ts (convertQueueNameToMetricName).
  • Sharded queue registry: worker/src/queues/shardedQueueRegistry.ts.
  • BullMQ tracing setup: worker/src/instrumentation.ts (BullMQInstrumentation).

Queue Inventory

Current QueueName values:

| Queue | Notes |
| --- | --- |
| trace-upsert | Sharded. Registers all TraceUpsertQueue shards. |
| trace-delete | Delete traces from storage. |
| project-delete | Project deletion cleanup. |
| evaluation-execution-queue | Sharded eval execution. |
| secondary-evaluation-execution-queue | Sharded secondary eval execution. |
| llm-as-a-judge-execution-queue | Sharded observation-based eval execution. |
| dataset-run-item-upsert-queue | Dataset run item upserts. |
| batch-export-queue | Batch exports. |
| otel-ingestion-queue | Sharded OTel ingestion. |
| secondary-otel-ingestion-queue | Sharded secondary OTel ingestion. |
| ingestion-queue | Sharded single-event ingestion. |
| secondary-ingestion-queue | Sharded secondary single-event ingestion. |
| cloud-usage-metering-queue | Cloud-only, Stripe-gated. |
| cloud-spend-alert-queue | Cloud-only, Stripe-gated. |
| cloud-free-tier-usage-threshold-queue | Cloud-only, Stripe-gated. |
| experiment-create-queue | Experiment creation. |
| posthog-integration-queue | Schedules PostHog integration jobs. |
| posthog-integration-processing-queue | Processes PostHog projects. |
| mixpanel-integration-queue | Schedules Mixpanel integration jobs. |
| mixpanel-integration-processing-queue | Processes Mixpanel projects. |
| blobstorage-integration-queue | Schedules blob storage jobs. |
| blobstorage-integration-processing-queue | Processes blob storage projects. |
| core-data-s3-export-queue | Cloud export feature gate. |
| metering-data-postgres-export-queue | Cloud export feature gate. |
| data-retention-queue | Schedules data retention jobs. |
| data-retention-processing-queue | Processes data retention projects. |
| batch-action-queue | Batch actions. |
| create-eval-queue | Eval job creation. |
| score-delete | Score deletion cleanup. |
| dataset-delete-queue | Dataset deletion cleanup. |
| dead-letter-retry-queue | Dead letter retry worker. |
| webhook-queue | Webhook delivery. |
| entity-change-queue | Entity change propagation. |
| event-propagation-queue | Experiment event propagation gate. |
| notification-queue | Notifications. |

Sharded queues use the base queue name for shard 0 and append -1, -2, and so on for additional shards. The sharded base queues are:

  • trace-upsert
  • evaluation-execution-queue
  • secondary-evaluation-execution-queue
  • llm-as-a-judge-execution-queue
  • otel-ingestion-queue
  • secondary-otel-ingestion-queue
  • ingestion-queue
  • secondary-ingestion-queue
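The naming rule can be expressed as a tiny helper, for example to enumerate every shard name you expect to see in span resource names. This is a minimal sketch assuming shard indices start at 0 and that you already know the shard count (configured elsewhere, not shown here); shardQueueNames is a hypothetical name, and the authoritative naming lives in the producer classes under packages/shared/src/server/redis/.

```typescript
// Hypothetical helper: lists the queue names for a sharded base queue,
// assuming shard 0 uses the bare base name and shard i > 0 appends "-i".
function shardQueueNames(baseQueue: string, shardCount: number): string[] {
  if (shardCount < 1 || Math.floor(shardCount) !== shardCount) {
    throw new Error(`shardCount must be a positive integer, got ${shardCount}`);
  }
  const names: string[] = [];
  for (let shard = 0; shard < shardCount; shard++) {
    names.push(shard === 0 ? baseQueue : `${baseQueue}-${shard}`);
  }
  return names;
}

// Example: a deployment assumed to run three ingestion shards.
console.log(shardQueueNames("ingestion-queue", 3).join(", "));
// → ingestion-queue, ingestion-queue-1, ingestion-queue-2
```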

Consumer Gates

Consumers are registered in worker/src/app.ts. Some gates register several queues, or every shard of a sharded queue.

| Gate | Queues registered |
| --- | --- |
| QUEUE_CONSUMER_TRACE_UPSERT_QUEUE_IS_ENABLED | trace-upsert shards |
| QUEUE_CONSUMER_CREATE_EVAL_QUEUE_IS_ENABLED | create-eval-queue |
| LANGFUSE_S3_CORE_DATA_EXPORT_IS_ENABLED | core-data-s3-export-queue |
| LANGFUSE_POSTGRES_METERING_DATA_EXPORT_IS_ENABLED | metering-data-postgres-export-queue |
| QUEUE_CONSUMER_TRACE_DELETE_QUEUE_IS_ENABLED | trace-delete |
| QUEUE_CONSUMER_SCORE_DELETE_QUEUE_IS_ENABLED | score-delete |
| QUEUE_CONSUMER_DATASET_DELETE_QUEUE_IS_ENABLED | dataset-delete-queue |
| QUEUE_CONSUMER_PROJECT_DELETE_QUEUE_IS_ENABLED | project-delete |
| QUEUE_CONSUMER_DATASET_RUN_ITEM_UPSERT_QUEUE_IS_ENABLED | dataset-run-item-upsert-queue |
| QUEUE_CONSUMER_EVAL_EXECUTION_QUEUE_IS_ENABLED | evaluation-execution-queue shards, llm-as-a-judge-execution-queue shards |
| QUEUE_CONSUMER_EVAL_EXECUTION_SECONDARY_QUEUE_IS_ENABLED | secondary-evaluation-execution-queue shards |
| QUEUE_CONSUMER_BATCH_EXPORT_QUEUE_IS_ENABLED | batch-export-queue |
| QUEUE_CONSUMER_BATCH_ACTION_QUEUE_IS_ENABLED | batch-action-queue |
| QUEUE_CONSUMER_OTEL_INGESTION_QUEUE_IS_ENABLED | otel-ingestion-queue shards |
| QUEUE_CONSUMER_OTEL_INGESTION_SECONDARY_QUEUE_IS_ENABLED | secondary-otel-ingestion-queue shards |
| QUEUE_CONSUMER_INGESTION_QUEUE_IS_ENABLED | ingestion-queue shards |
| QUEUE_CONSUMER_INGESTION_SECONDARY_QUEUE_IS_ENABLED | secondary-ingestion-queue shards |
| QUEUE_CONSUMER_CLOUD_USAGE_METERING_QUEUE_IS_ENABLED plus STRIPE_SECRET_KEY | cloud-usage-metering-queue |
| QUEUE_CONSUMER_CLOUD_SPEND_ALERT_QUEUE_IS_ENABLED plus STRIPE_SECRET_KEY | cloud-spend-alert-queue |
| QUEUE_CONSUMER_FREE_TIER_USAGE_THRESHOLD_QUEUE_IS_ENABLED plus cloud region and Stripe gates | cloud-free-tier-usage-threshold-queue |
| QUEUE_CONSUMER_EXPERIMENT_CREATE_QUEUE_IS_ENABLED | experiment-create-queue |
| QUEUE_CONSUMER_POSTHOG_INTEGRATION_QUEUE_IS_ENABLED | posthog-integration-queue, posthog-integration-processing-queue |
| QUEUE_CONSUMER_MIXPANEL_INTEGRATION_QUEUE_IS_ENABLED | mixpanel-integration-queue, mixpanel-integration-processing-queue |
| QUEUE_CONSUMER_BLOB_STORAGE_INTEGRATION_QUEUE_IS_ENABLED | blobstorage-integration-queue, blobstorage-integration-processing-queue |
| QUEUE_CONSUMER_DATA_RETENTION_QUEUE_IS_ENABLED | data-retention-queue, data-retention-processing-queue |
| QUEUE_CONSUMER_DEAD_LETTER_RETRY_QUEUE_IS_ENABLED | dead-letter-retry-queue |
| QUEUE_CONSUMER_WEBHOOK_QUEUE_IS_ENABLED | webhook-queue |
| QUEUE_CONSUMER_ENTITY_CHANGE_QUEUE_IS_ENABLED | entity-change-queue |
| QUEUE_CONSUMER_EVENT_PROPAGATION_QUEUE_IS_ENABLED plus events-table experiment gate | event-propagation-queue |
| QUEUE_CONSUMER_NOTIFICATION_QUEUE_IS_ENABLED | notification-queue |
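To reason about which consumers a given worker deployment registers, the gate table can be modelled as data. This is a hedged sketch covering only four rows. It assumes a flag counts as enabled only when it equals the string "true", and that the Stripe gate is met when STRIPE_SECRET_KEY is non-empty; the real parsing and defaults live in worker/src/env.ts and may differ. GateRule, RULES, and registeredQueues are hypothetical names.

```typescript
// Hypothetical model of a few rows from the gate table above. Parsing is an
// assumption: a flag counts as enabled only when it equals the string "true",
// and the Stripe gate counts as met when STRIPE_SECRET_KEY is non-empty.
type Env = Record<string, string | undefined>;

interface GateRule {
  flag: string; // QUEUE_CONSUMER_*_IS_ENABLED variable
  needsStripe: boolean; // also requires STRIPE_SECRET_KEY
  queues: string[]; // queues (sharded ones by base name) registered by the gate
}

const RULES: GateRule[] = [
  { flag: "QUEUE_CONSUMER_INGESTION_QUEUE_IS_ENABLED", needsStripe: false, queues: ["ingestion-queue"] },
  {
    flag: "QUEUE_CONSUMER_EVAL_EXECUTION_QUEUE_IS_ENABLED",
    needsStripe: false,
    queues: ["evaluation-execution-queue", "llm-as-a-judge-execution-queue"],
  },
  {
    flag: "QUEUE_CONSUMER_POSTHOG_INTEGRATION_QUEUE_IS_ENABLED",
    needsStripe: false,
    queues: ["posthog-integration-queue", "posthog-integration-processing-queue"],
  },
  {
    flag: "QUEUE_CONSUMER_CLOUD_USAGE_METERING_QUEUE_IS_ENABLED",
    needsStripe: true,
    queues: ["cloud-usage-metering-queue"],
  },
];

// Returns the queues this worker would consume under the modelled rules.
function registeredQueues(env: Env): string[] {
  const stripeConfigured = (env["STRIPE_SECRET_KEY"] ?? "") !== "";
  const result: string[] = [];
  for (const rule of RULES) {
    if (env[rule.flag] === "true" && (!rule.needsStripe || stripeConfigured)) {
      result.push(...rule.queues);
    }
  }
  return result;
}

// The metering flag alone is not enough without a Stripe key.
console.log(
  registeredQueues({
    QUEUE_CONSUMER_INGESTION_QUEUE_IS_ENABLED: "true",
    QUEUE_CONSUMER_CLOUD_USAGE_METERING_QUEUE_IS_ENABLED: "true",
  }).join(", "),
);
// → ingestion-queue
```

Extending RULES with the remaining rows, including the cloud-region condition on the free-tier queue, makes it easy to compare what two env files would register.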

Query Consumer Spans

Start with aggregate spans on worker services:

```text
env:<env> (service:worker OR service:worker-cpu) operation_name:bullmq.process
```

Then group by resource_name, queue facets such as bullmq.queue or messaging.*, and error fields. Facet names can differ between Datadog sites, so inspect one sample span before relying on a specific facet.

Queue-specific starter query:

```text
env:<env> (service:worker OR service:worker-cpu) operation_name:bullmq.process (resource_name:"process otel-ingestion-queue" OR resource_name:"Worker.run otel-ingestion-queue" OR bullmq.queue:otel-ingestion-queue)
```

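For sharded queues, query the base queue and its shard suffixes with a wildcard:

```text
env:<env> (service:worker OR service:worker-cpu) operation_name:bullmq.process resource_name:"*otel-ingestion-queue*"
```

The wildcard also matches any queue whose name contains the base name, here secondary-otel-ingestion-queue and its shards. To keep only the primary queue, add -resource_name:"*secondary-otel-ingestion-queue*".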
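When checking many queues, it can help to generate these searches rather than edit them by hand. The sketch below assembles the starter query for a non-sharded queue and switches to the resource_name wildcard for sharded ones. It also excludes other sharded queues whose names contain the requested one, because a substring wildcard such as "*ingestion-queue*" would otherwise match otel-ingestion-queue and both secondary ingestion queues too. spanQuery and SHARDED_BASES are hypothetical names, and the facet names should still be confirmed against a sample span.

```typescript
// Hypothetical Datadog span-search builder. Service, operation and facet names
// follow the starter queries above; confirm them on a sample span first.
const SHARDED_BASES: string[] = [
  "trace-upsert",
  "evaluation-execution-queue",
  "secondary-evaluation-execution-queue",
  "llm-as-a-judge-execution-queue",
  "otel-ingestion-queue",
  "secondary-otel-ingestion-queue",
  "ingestion-queue",
  "secondary-ingestion-queue",
];

function spanQuery(env: string, queue: string): string {
  const prefix = `env:${env} (service:worker OR service:worker-cpu) operation_name:bullmq.process`;
  if (SHARDED_BASES.indexOf(queue) === -1) {
    // Non-sharded queue: match the exact resource names or the queue facet.
    return `${prefix} (resource_name:"process ${queue}" OR resource_name:"Worker.run ${queue}" OR bullmq.queue:${queue})`;
  }
  // Sharded queue: wildcard over the base and its -N shards, minus longer
  // sharded queue names that contain this one as a substring.
  const exclusions = SHARDED_BASES.filter((other) => other !== queue && other.indexOf(queue) !== -1)
    .map((other) => ` -resource_name:"*${other}*"`)
    .join("");
  return `${prefix} resource_name:"*${queue}*"${exclusions}`;
}

console.log(spanQuery("prod", "otel-ingestion-queue"));
// → env:prod (service:worker OR service:worker-cpu) operation_name:bullmq.process resource_name:"*otel-ingestion-queue*" -resource_name:"*secondary-otel-ingestion-queue*"
```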

If a queue file wraps its handler with instrumentAsync, also search for the domain-specific resource name it sets. Examples:

| Subsystem | Resource name |
| --- | --- |
| PostHog project processing | process posthog-integration-project |
| Mixpanel project processing | process mixpanel-integration-project |
| Blob storage project processing | process blob-storage-project |
| Data retention project processing | process data-retention-project |
| Event propagation | process event-propagation |
| Cloud usage metering | process cloud-usage-metering |
| Free-tier usage threshold | process cloud-free-tier-usage-threshold |

Useful aggregations:

  • Count by env, service, and resource_name.
  • Count by queue facet and status.
  • Count by error.type and error.message.
  • p50, p95, and p99 duration by queue or shard.
  • Count by messaging.bullmq.job.input.projectId when the processor attaches project IDs to the current span.

Query Queue Metrics

Metric base names come from convertQueueNameToMetricName(queueName):

```text
langfuse.queue.<queue-name-with-hyphens-replaced-by-underscores-and-trailing-_queue-removed>
```

Examples:

| Queue | Metric base |
| --- | --- |
| ingestion-queue | langfuse.queue.ingestion |
| otel-ingestion-queue | langfuse.queue.otel_ingestion |
| secondary-otel-ingestion-queue | langfuse.queue.secondary_otel_ingestion |
| evaluation-execution-queue | langfuse.queue.evaluation_execution |
| trace-upsert | langfuse.queue.trace_upsert |
| batch-export-queue | langfuse.queue.batch_export |
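The conversion rule can be sketched in TypeScript and checked against the examples table. This is a reimplementation from the description above, not the source of convertQueueNameToMetricName; toMetricBase is a hypothetical name, and if the two ever disagree, packages/shared/src/server/instrumentation/index.ts wins. It is meant for the base queue names listed in the table.

```typescript
// Sketch of the documented rule: replace hyphens with underscores, drop a
// trailing "_queue", and prefix the result with "langfuse.queue.".
function toMetricBase(queueName: string): string {
  const underscored = queueName.replace(/-/g, "_");
  return `langfuse.queue.${underscored.replace(/_queue$/, "")}`;
}

for (const name of ["ingestion-queue", "secondary-otel-ingestion-queue", "trace-upsert"]) {
  console.log(`${name} -> ${toMetricBase(name)}`);
}
// ingestion-queue -> langfuse.queue.ingestion
// secondary-otel-ingestion-queue -> langfuse.queue.secondary_otel_ingestion
// trace-upsert -> langfuse.queue.trace_upsert
```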

Prefer the newer tagged metrics:

```text
<metric_base>.depth{env:<env>,type:waiting}
<metric_base>.depth{env:<env>,type:failed}
<metric_base>.depth{env:<env>,type:active}
<metric_base>.rate{env:<env>,type:request}
<metric_base>.rate{env:<env>,type:failed}
<metric_base>.rate{env:<env>,type:error}
<metric_base>.time{env:<env>,type:wait}
<metric_base>.time{env:<env>,type:processing}
```

For sharded queues, filter by the shard tag when it is present. The depth runner also emits shard:all, which carries the aggregate depth across all shards.
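Combining a metric base with these tags is mechanical, so a helper can produce the whole set for one queue. A sketch, assuming the metric names and type tags listed above; taggedMetricQueries and TAGGED_METRICS are hypothetical names. Because only depth is documented to carry shard:all, the optional shard tag is attached to depth queries only.

```typescript
// Hypothetical expansion of one metric base into the tagged queries above.
const TAGGED_METRICS: Array<[suffix: string, type: string]> = [
  ["depth", "waiting"],
  ["depth", "failed"],
  ["depth", "active"],
  ["rate", "request"],
  ["rate", "failed"],
  ["rate", "error"],
  ["time", "wait"],
  ["time", "processing"],
];

function taggedMetricQueries(metricBase: string, env: string, depthShard?: string): string[] {
  return TAGGED_METRICS.map(([suffix, type]) => {
    const tags = [`env:${env}`, `type:${type}`];
    // Only depth is documented to carry shard:all, so the shard tag is
    // attached to depth queries only.
    if (suffix === "depth" && depthShard !== undefined) tags.push(`shard:${depthShard}`);
    return `${metricBase}.${suffix}{${tags.join(",")}}`;
  });
}

const queries = taggedMetricQueries("langfuse.queue.otel_ingestion", "prod", "all");
console.log(queries[0]); // langfuse.queue.otel_ingestion.depth{env:prod,type:waiting,shard:all}
console.log(queries[3]); // langfuse.queue.otel_ingestion.rate{env:prod,type:request}
```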

Backward-compatible metrics may still appear:

```text
<metric_base>.length
<metric_base>.dlq_length
<metric_base>.active
<metric_base>.request
<metric_base>.failed
<metric_base>.error
<metric_base>.wait_time
<metric_base>.processing_time
```
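If a dashboard still uses the backward-compatible names, a lookup table helps translate between the two sets. The pairing below is inferred only from the order in which the two lists appear in this reference (for example, depth{type:failed} with .dlq_length). Treat it as an assumption to confirm in worker/src/queues/workerManager.ts; LEGACY_NAME and legacyMetric are hypothetical names.

```typescript
// Assumed pairing between tagged metrics and backward-compatible names,
// inferred from list order only. Confirm in workerManager.ts before use.
const LEGACY_NAME: Record<string, string> = {
  "depth/waiting": "length",
  "depth/failed": "dlq_length",
  "depth/active": "active",
  "rate/request": "request",
  "rate/failed": "failed",
  "rate/error": "error",
  "time/wait": "wait_time",
  "time/processing": "processing_time",
};

// Returns the legacy metric name for a tagged metric, or undefined if unmapped.
function legacyMetric(metricBase: string, suffix: string, type: string): string | undefined {
  const legacy = LEGACY_NAME[`${suffix}/${type}`];
  return legacy === undefined ? undefined : `${metricBase}.${legacy}`;
}

console.log(legacyMetric("langfuse.queue.ingestion", "depth", "failed"));
// → langfuse.queue.ingestion.dlq_length
```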

ClickhouseWriter, which buffers internal writes outside BullMQ, emits langfuse.queue.clickhouse_writer.* metrics, but it is not a QueueName consumer.

Consumer Running Checklist

To establish whether a consumer is running in production:

  1. Check queue depth metrics for waiting, failed, and active counts.
  2. Check rate{type:request}, or the older .request metric, for recent processing.
  3. Search BullMQ processor spans on worker and worker-cpu.
  4. Search worker logs for the queue name or processor-specific log prefix.
  5. If all signals are empty, verify the relevant QUEUE_CONSUMER_*_IS_ENABLED gate and any feature-specific gates in worker/src/app.ts.
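The checklist reads naturally as a short decision procedure. This sketch encodes it as a heuristic over a hypothetical ConsumerSignals record that you fill in after running the checks; the field names and verdict strings are illustrative, not part of any Langfuse or Datadog API, and stale or delayed metrics can still mislead it.

```typescript
// Hypothetical record of what the checks above found for one queue; fill it in
// by hand or from your own tooling. Field names are illustrative only.
interface ConsumerSignals {
  depthMetricsSeen: boolean; // step 1: any depth{type:waiting|failed|active} series
  recentRequests: boolean; // step 2: rate{type:request} or .request above zero
  processorSpans: boolean; // step 3: bullmq.process spans on worker or worker-cpu
  logLines: boolean; // step 4: logs mentioning the queue or processor prefix
}

// Heuristic verdict following the checklist order.
function assessConsumer(s: ConsumerSignals): string {
  if (s.recentRequests || s.processorSpans) {
    return "running: jobs were processed recently";
  }
  if (s.depthMetricsSeen) {
    // The metrics runner only polls queues with registered workers, so depth
    // data suggests a registered consumer that is idle or stuck.
    return "registered but not processing: compare waiting and active depth";
  }
  if (s.logLines) {
    return "inconclusive: logs only, check which service emitted them";
  }
  return "no signals: check consumer and feature gates, metrics settings, and Datadog site";
}

console.log(
  assessConsumer({ depthMetricsSeen: true, recentRequests: false, processorSpans: false, logLines: false }),
);
// → registered but not processing: compare waiting and active depth
```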

The queue metrics runner only polls queues with registered workers. Missing depth metrics can mean the consumer is not registered on that worker, queue metrics are disabled, or the data is on the other Datadog site.