.claude-plugin/plugins/redpanda-connect/skills/pipeline-assistant/resources/recipes/content-based-router.md
Pattern: Kafka Patterns - Content-Based Routing
Difficulty: Basic
Components: kafka_franz (input/output), mapping
Use Case: Route Kafka messages to different topics based on message content fields
The Content-Based Router pattern dynamically routes messages to various destinations based on message content. This recipe shows how to filter Kafka messages by examining payload fields and routing only matching messages to the output topic, while preserving partition keys, timestamps, and headers for ordering guarantees.
See content-based-router.yaml for the complete configuration.
Messages are examined using Bloblang to check specific fields:
if (this.marketid == "nyse") {
  root = this
} else {
  root = deleted() # Filter out non-matching messages
}
Only messages matching the condition are forwarded; others are silently dropped.
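In a pipeline config, the check above would live inside a mapping processor. The following is a minimal sketch, assuming the marketid field and the nyse value from the example. The single-expression form is an equivalent alternative to the if/else block, not necessarily what content-based-router.yaml contains.

```yaml
pipeline:
  processors:
    - mapping: |
        # Keep the message unchanged when marketid is "nyse"; otherwise
        # delete it, which removes it before it reaches the output.
        root = if this.marketid == "nyse" { this } else { deleted() }
```

A message without a marketid field compares as not equal to "nyse", so it is dropped as well.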
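Kafka-specific metadata (partition keys, timestamps, and headers) is preserved through the pipeline.
This is critical for maintaining ordering guarantees: Kafka orders records only within a partition, so related messages must keep landing on the same partition in the destination topic.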
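One way this preservation might be expressed on the output side is sketched below. It assumes the kafka_key and kafka_timestamp_unix metadata keys that the kafka_franz input attaches to each message, and the KAFKA_BROKER and DEST_TOPIC environment variables used elsewhere in this recipe. Field names can differ between Redpanda Connect versions, so treat it as an illustration rather than the recipe's exact configuration.

```yaml
output:
  kafka_franz:
    seed_brokers: [ "${KAFKA_BROKER}" ]
    topic: "${DEST_TOPIC}"
    # Reuse the source record's key (set by the kafka_franz input).
    key: '${! metadata("kafka_key") }'
    # Reuse the source record's timestamp, expressed in Unix seconds.
    timestamp: '${! metadata("kafka_timestamp_unix") }'
    # Forward metadata values as record headers. The ".*" pattern is an
    # assumption that forwards every key, including the kafka_* keys added
    # by the input; a narrower pattern is usually preferable.
    metadata:
      include_patterns: [ ".*" ]
```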
The output uses partitioner: "manual" to explicitly control which partition messages go to:
partitioner: "manual"
partition: "${!metadata(\"kafka_partition\")}"
This ensures messages maintain their source partition assignment.
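The kafka_partition value referenced above is metadata attached by the kafka_franz input to each consumed record. A minimal sketch of a matching input section follows, assuming the environment variable names from this recipe's test commands (KAFKA_BROKER, SOURCE_TOPIC, CONSUMER_GROUP):

```yaml
input:
  kafka_franz:
    seed_brokers: [ "${KAFKA_BROKER}" ]
    topics: [ "${SOURCE_TOPIC}" ]
    consumer_group: "${CONSUMER_GROUP}"
    # Redeliver messages that the output rejected instead of dropping them.
    auto_replay_nacks: true
```

Because the partition number is copied as-is, the destination topic needs at least as many partitions as the source topic; otherwise writes to the missing partitions would be expected to fail.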
Broker address supplied through an environment variable (${KAFKA_BROKER})
max_in_flight: 256 - High parallelism for throughput
idempotent_write: true - Prevents duplicates caused by producer retries
broker_write_max_bytes: 100MiB - Handles large messages
auto_replay_nacks: true - Retries failed messages

# Set environment variables
export KAFKA_BROKER=localhost:9092
export SOURCE_TOPIC=test_in
export DEST_TOPIC=topic_a
export CONSUMER_GROUP=test_cg
# Run the pipeline
rpk connect run content-based-router.yaml
# Produce test messages
echo '{"marketid":"nyse","symbol":"AAPL","price":150}' | rpk topic produce $SOURCE_TOPIC
echo '{"marketid":"nasdaq","symbol":"MSFT","price":300}' | rpk topic produce $SOURCE_TOPIC
echo '{"marketid":"nyse","symbol":"GOOGL","price":2800}' | rpk topic produce $SOURCE_TOPIC
# Check output topic (only NYSE messages should appear)
rpk topic consume $DEST_TOPIC
Multiple Destinations:
To route to several topics instead of filtering down to one, replace the filtering mapping processor with a switch output:
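output:
  switch:
    cases:
      - check: 'json("marketid") == "nyse"'
        output:
          kafka_franz:
            seed_brokers: [ "${KAFKA_BROKER}" ] # required by kafka_franz
            topic: topic_nyse
      - check: 'json("marketid") == "nasdaq"'
        output:
          kafka_franz:
            seed_brokers: [ "${KAFKA_BROKER}" ] # required by kafka_franz
            topic: topic_nasdaq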
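With a switch output, messages that match none of the cases also need a destination. A hedged sketch of one option is a final case without a check, which matches anything the earlier cases did not. The topic name topic_unrouted is hypothetical and chosen only for illustration.

```yaml
output:
  switch:
    cases:
      - check: 'json("marketid") == "nyse"'
        output:
          kafka_franz:
            seed_brokers: [ "${KAFKA_BROKER}" ]
            topic: topic_nyse
      # No check: this case always passes, so it catches everything the
      # cases above did not match. Cases are evaluated in order.
      - output:
          kafka_franz:
            seed_brokers: [ "${KAFKA_BROKER}" ]
            topic: topic_unrouted # hypothetical fallback topic
```

Using drop: {} as the fallback output instead would discard unmatched messages, mirroring the filter behavior of the single-topic version.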