**Pattern**: Error Handling - Dead Letter Queue (DLQ)
**Difficulty**: Basic
**Components**: `stdin`, `file`, `switch`, `mapping`, `log`
**Use Case**: Route invalid or malformed messages to a dead letter queue for later analysis
This recipe demonstrates the fundamental Dead Letter Queue (DLQ) pattern for handling invalid messages. Messages are validated for JSON format, and those that fail validation are written to a separate file (the DLQ) instead of causing pipeline failures. This pattern is essential for building resilient data pipelines that can handle malformed data gracefully.
See dlq-basic.yaml for the complete configuration.
The pipeline validates each message and sets metadata flags to track validation status:
- `@json_error = true` - Message failed validation
- `@json_error = false` - Message passed validation

The `switch` output component routes messages based on the `@json_error` metadata:
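The validation step in dlq-basic.yaml likely follows this shape (a sketch, assuming `parse_json()` with `catch()` as the validation mechanism; the `origin_value` metadata field is illustrative):

```yaml
pipeline:
  processors:
    - mapping: |
        # Attempt to parse the payload as JSON; catch() yields null on failure
        let parsed = content().string().parse_json().catch(null)
        if $parsed == null {
          meta json_error = true
          meta origin_value = content().string()
        } else {
          root = $parsed
          meta json_error = false
        }
```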
Invalid messages are written to a file (json_error_dlq.txt) for later processing:
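A sketch of the corresponding `switch` output (the DLQ file path comes from the recipe; the pass-through `stdout` case for valid messages is an assumption for illustration):

```yaml
output:
  switch:
    cases:
      # Failed messages go to the DLQ file, one per line
      - check: "@json_error"
        output:
          file:
            path: ./json_error_dlq.txt
            codec: lines
      # A case without a check matches everything else (valid messages)
      - output:
          stdout: {}
```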
The pipeline maintains a counter of invalid messages in an in-memory cache:
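One way to maintain that counter, sketched with a `memory` cache resource, a `branch`, and the `cache` processor (the `error_cache` label and `error_count` key are assumptions; the branch leaves the original message untouched):

```yaml
cache_resources:
  - label: error_cache
    memory:
      init_values:
        error_count: "0"

pipeline:
  processors:
    - branch:
        # Only count messages flagged as invalid; deleted() skips the branch
        request_map: 'root = if @json_error { "" } else { deleted() }'
        processors:
          - cache:
              resource: error_cache
              operator: get
              key: error_count
          - mapping: 'root = this.number() + 1'
          - cache:
              resource: error_cache
              operator: set
              key: error_count
              value: '${! content() }'
```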
```bash
# Run the pipeline
rpk connect run dlq-basic.yaml

# Test with valid JSON
echo '{"name":"John","age":30}' | rpk connect run dlq-basic.yaml

# Test with invalid JSON (will go to DLQ)
echo 'not valid json' | rpk connect run dlq-basic.yaml
echo '{"incomplete":' | rpk connect run dlq-basic.yaml

# Check DLQ file
cat json_error_dlq.txt
```
Handle AVRO schema validation and encoding errors:
```yaml
pipeline:
  processors:
    - mapping: |
        # Try AVRO encoding with schema; catch(null) swallows the error
        let result = this.encode("avro", schema_id: "${SCHEMA_ID}").catch(null)
        if $result == null {
          meta avro_error = true
          meta error_text = "AVRO encoding failed"
          meta origin_value = content().string()
        } else {
          root = $result
          meta avro_error = false
        }

output:
  switch:
    cases:
      - check: "@avro_error"
        output:
          file:
            path: ./avro_error_dlq.txt
```
Catch errors from any processor and route to DLQ:
```yaml
pipeline:
  processors:
    - try:
        - http:
            url: https://api.example.com
            verb: POST
    - catch:
        - mapping: |
            meta processor_error = true
            meta error_text = "HTTP request failed: " + error()
            meta origin_value = content().string()
```

Note that `catch` is its own processor following the `try` block, not a field of it: it runs only for messages that failed in a preceding processor.
The `catch` block flags every processor error with metadata, and the `switch` output then routes those flagged messages to the DLQ.
Add configurable error limits with tolerance:
```yaml
cache_resources:
  - label: error_cache
    memory:
      init_values:
        error_count: "0"
        error_threshold: "100"         # Stop after 100 errors
        error_tolerance_percent: "5"   # Or a 5% error rate

pipeline:
  processors:
    - switch:
        - check: 'json("error_count") > json("error_threshold")'
          processors:
            - log:
                level: ERROR
                message: "Error threshold exceeded, stopping pipeline"
            - crash: 'Too many errors'
```
This pattern supports both an absolute error limit and a percentage-based error tolerance.