**Pattern:** Monitoring - Custom Metrics
**Difficulty:** Basic
**Components:** stdin, metric processor, prometheus
**Use Case:** Emit custom application metrics to Prometheus for monitoring and alerting
This recipe demonstrates how to add custom Prometheus metrics to your Redpanda Connect pipelines. The example tracks JSON validation errors as a counter metric, which can be scraped by Prometheus and used for alerting. This pattern is essential for building observable data pipelines.
See custom-metrics.yaml for the complete configuration.
The `metric` processor emits metrics during message processing:

```yaml
- metric:
    type: counter_by
    name: json_error_count
    value: 1
    labels:
      pipeline: "json_validation"
      error_type: "invalid_json"
```
`counter_by` increments the counter by the specified value. The `metrics` section configures how metrics are exposed:
```yaml
metrics:
  prometheus: {} # Default HTTP endpoint on :4195/stats
  mapping: |
    # Filter which metrics to expose
    if this != "json_error_count" { deleted() }
```
The mapping drops Redpanda Connect's internal metrics, exposing only the custom counter.
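Because `json_error_count` is a Prometheus counter, you can alert on its rate rather than its absolute value. A sketch of a Prometheus alerting rule built on this metric (the rule name, threshold, and durations are illustrative):

```yaml
groups:
  - name: pipeline_alerts
    rules:
      - alert: JsonValidationErrors
        # Fire when validation errors exceed 0.1/s averaged over 5 minutes
        expr: rate(json_error_count[5m]) > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "JSON validation errors in pipeline {{ $labels.pipeline }}"
```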
Redpanda Connect supports multiple metric types:

- `counter` - Monotonically increasing (e.g., total messages)
- `counter_by` - Increment by a specified value
- `gauge` - Current value (e.g., queue depth)
- `timing` - Duration tracking

```bash
# Run the pipeline
rpk connect run custom-metrics.yaml

# In another terminal, send test data
echo '{"valid":"json"}' | nc localhost 8080
echo 'invalid json' | nc localhost 8080
echo '{"more":"data"}' | nc localhost 8080

# Check metrics endpoint
curl -s http://localhost:4195/stats | grep json_error_count

# Expected output (after one error):
# json_error_count{error_type="invalid_json",label="emit_error_metric",path="root.pipeline.processors.1",pipeline="json_validation"} 1
```
**Gauge Metric (Current Value):**

```yaml
- metric:
    type: gauge
    name: queue_depth
    value: ${!json("queue_size")}
```
**Timing Metric (Duration):**

```yaml
- metric:
    type: timing
    name: processing_duration_ms
    value: ${!json("duration")}
```
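By default the `prometheus` exporter publishes timing metrics as summaries; it can emit histograms instead, which aggregate better across multiple instances. A sketch using the exporter's histogram options (the bucket boundaries are illustrative and should match your latency profile):

```yaml
metrics:
  prometheus:
    use_histogram_timing: true
    histogram_buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5]
```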
**Dynamic Labels:**

```yaml
- metric:
    type: counter_by
    name: messages_by_topic
    value: 1
    labels:
      topic: ${!metadata("kafka_topic")}
```
For distributed deployments with multiple pipeline instances:

```yaml
- metric:
    type: counter_by
    name: messages_processed
    value: 1
    labels:
      instance_id: "${HOSTNAME}"
      stream_id: "${STREAM_ID}"
      pipeline: "production"
```

```yaml
metrics:
  prometheus:
    push_url: "http://pushgateway:9091"
    push_interval: "10s"
    push_job_name: "redpanda_connect"
```
Pushing to a Pushgateway enables per-instance metric attribution via labels, and lets Prometheus collect metrics from short-lived instances it cannot scrape directly.
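On the Prometheus side, the Pushgateway is scraped like any other target. A minimal scrape configuration sketch (the target address assumes the `pushgateway:9091` endpoint used above; `honor_labels: true` preserves the pipeline's own `instance_id` and `pipeline` labels instead of overwriting them):

```yaml
scrape_configs:
  - job_name: pushgateway
    honor_labels: true
    static_configs:
      - targets: ["pushgateway:9091"]
```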
Track pipeline health with multiple metric types:

```yaml
pipeline:
  processors:
    # Track throughput
    - metric:
        type: counter_by
        name: messages_total
        value: 1
    # Track processing time
    - metric:
        type: timing
        name: processing_latency_ms
        value: ${!timestamp_unix_milli() - json("timestamp")}
    # Track queue depth
    - metric:
        type: gauge
        name: backlog_size
        value: ${!json("queue_size")}
    # Track error rate (check must be a boolean query,
    # so test for the presence of the error metadata)
    - switch:
        - check: meta("error") != null
          processors:
            - metric:
                type: counter_by
                name: errors_total
                value: 1
                labels:
                  error_type: ${!meta("error_type")}
```
Combine multiple metrics for comprehensive observability.
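Once these metrics are flowing, Prometheus recording rules can precompute the rates and ratios dashboards usually need. A sketch using the metric names from the example above (the rule names are illustrative):

```yaml
groups:
  - name: redpanda_connect_health
    rules:
      # Per-second throughput over a 5-minute window
      - record: pipeline:messages_total:rate5m
        expr: rate(messages_total[5m])
      # Error ratio: errors per processed message
      - record: pipeline:error_ratio:rate5m
        expr: rate(errors_total[5m]) / rate(messages_total[5m])
```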