# Pattern: Stateful Processing - Counter with Threshold

- **Difficulty:** Intermediate
- **Components:** `stdin`, `cache`, `mapping`, `switch`
- **Use Case:** Track error counts in memory and implement a circuit breaker pattern that stops the pipeline when a threshold is exceeded
This recipe demonstrates stateful counting using an in-memory cache. The pattern tracks JSON validation errors and implements a circuit breaker that stops the pipeline once errors exceed a threshold. This is useful for building resilient pipelines that fail fast when data quality degrades.
See stateful-counter.yaml for the complete configuration.
The cache resource maintains state across messages:
```yaml
cache_resources:
  - label: error_cache
    memory:
      compaction_interval: '' # Never expire
      init_values:
        error_count: 0 # Initialize counter
```
State persists for the pipeline's lifetime but is lost on restart.
The counter is updated using three cache operations, wrapped in a branch processor so the cache bookkeeping stays separate from the main message flow.
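A minimal sketch of the update, assuming the cache processor's `get` and `set` operators plus a `mapping` increment (the exact wiring lives in stateful-counter.yaml; the interpolations here are illustrative):

```yaml
- branch:
    # With no result_map, the branch result is discarded and the
    # original message continues through the pipeline unchanged.
    processors:
      # 1. Fetch the current count (replaces the branch's payload)
      - cache:
          resource: error_cache
          operator: get
          key: error_count
      # 2. Increment it (cache values are raw bytes, so parse first)
      - mapping: 'root = content().string().number() + 1'
      # 3. Write the new count back
      - cache:
          resource: error_cache
          operator: set
          key: error_count
          value: '${! content() }'
```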
After updating the counter, check if threshold is exceeded:
```yaml
- switch:
    - check: json("error_count") > 3
      processors:
        - crash: 'Pipeline failed due to error threshold'
```
This implements fail-fast behavior when data quality is poor.
The branch processor runs the cache operations without modifying the main message payload.
```shell
# Run the pipeline
rpk connect run stateful-counter.yaml

# Send valid JSON (should pass)
echo '{"test":"valid"}' | rpk connect run stateful-counter.yaml

# The counter lives in memory, so it resets between runs. To trip the
# circuit breaker, send the invalid payloads within a single run:
printf '%s\n' 'invalid' '{broken' 'nope' 'error4' | rpk connect run stateful-counter.yaml

# The fourth error exceeds the threshold and the pipeline stops with:
# "Pipeline failed due to error threshold"
```
**Persistent Counter with Redis:**

```yaml
cache_resources:
  - label: error_cache
    redis:
      url: ${REDIS_URL}
      default_ttl: "24h"
```
**Per-Topic Counters:**

```yaml
- cache:
    resource: error_cache
    operator: get
    key: ${!metadata("kafka_topic")}_error_count
```
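The matching update then writes the incremented value back under the same interpolated key. A sketch, assuming the same increment-via-`mapping` approach (the `value` interpolation is illustrative, not taken from the recipe):

```yaml
# Increment the raw counter value fetched above
- mapping: 'root = content().string().number() + 1'
# Write it back under the same per-topic key
- cache:
    resource: error_cache
    operator: set
    key: ${!metadata("kafka_topic")}_error_count
    value: '${! content() }'
```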
**Windowed Counters:**

```yaml
cache_resources:
  - label: error_cache
    memory:
      compaction_interval: "1h" # Reset hourly
```
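Note that compaction only purges entries whose TTL has lapsed, so to control the window explicitly you would likely pair it with an entry TTL. A sketch, assuming the memory cache's `default_ttl` field:

```yaml
cache_resources:
  - label: error_cache
    memory:
      default_ttl: "1h"         # counter entries expire after an hour
      compaction_interval: "1h" # expired entries are purged hourly
```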