pip/pip-401.md
Pulsar Functions and Sources currently hard-code batching as enabled and set batchingMaxPublishDelay to 10ms. Only the batch-builder
can be configured for now, which does not suit all use cases and leaves users unable to tune batching.
Support setting batching configurations for Pulsar Functions and Sources, to make batching more flexible and suitable for users.
Allow users to enable or disable batching and to set batching configurations for Pulsar Functions and Sources.
Add BatchingSpec with the fields below to Function.proto, and add it as a new field batchingSpec to the ProducerSpec message:

- bool enabled
- int32 batchingMaxPublishDelayMs
- int32 roundRobinRouterBatchingPartitionSwitchFrequency
- int32 batchingMaxMessages
- int32 batchingMaxBytes
- string batchBuilder

Add BatchingConfig with the fields below, and add it as a new field batchingConfig to the ProducerConfig:

- bool enabled
- int batchingMaxPublishDelayMs
- int roundRobinRouterBatchingPartitionSwitchFrequency
- int batchingMaxMessages
- int batchingMaxBytes
- String batchBuilder

The related logic will also be added:

- Convert the batchingSpec field of the ProducerSpec from FunctionDetails to the batchingConfig field of the ProducerConfig, and vice versa.

To keep compatibility, when the batchingSpec of the ProducerSpec is null while creating the ProducerConfig from the ProducerSpec,
the batchingConfig field falls back to BatchingConfig(enabled=true, batchingMaxPublishDelayMs=10).
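
The conversion and the compatibility fallback described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code the PIP adds: the two nested classes are hypothetical plain-Java stand-ins for the generated BatchingSpec protobuf class and the BatchingConfig class, the method names toConfig and toSpec are invented for this example, and a Java null stands in for "batchingSpec not set". Fields that the fallback does not mention are left at 0 or null here, which is also an assumption.

```java
public class BatchingConversionSketch {

    // Hypothetical stand-in for the generated BatchingSpec protobuf class.
    static final class BatchingSpec {
        final boolean enabled;
        final int batchingMaxPublishDelayMs;
        final int roundRobinRouterBatchingPartitionSwitchFrequency;
        final int batchingMaxMessages;
        final int batchingMaxBytes;
        final String batchBuilder;

        BatchingSpec(boolean enabled, int delayMs, int switchFrequency,
                     int maxMessages, int maxBytes, String batchBuilder) {
            this.enabled = enabled;
            this.batchingMaxPublishDelayMs = delayMs;
            this.roundRobinRouterBatchingPartitionSwitchFrequency = switchFrequency;
            this.batchingMaxMessages = maxMessages;
            this.batchingMaxBytes = maxBytes;
            this.batchBuilder = batchBuilder;
        }
    }

    // Hypothetical stand-in for the BatchingConfig held by ProducerConfig.
    static final class BatchingConfig {
        final boolean enabled;
        final int batchingMaxPublishDelayMs;
        final int roundRobinRouterBatchingPartitionSwitchFrequency;
        final int batchingMaxMessages;
        final int batchingMaxBytes;
        final String batchBuilder;

        BatchingConfig(boolean enabled, int delayMs, int switchFrequency,
                       int maxMessages, int maxBytes, String batchBuilder) {
            this.enabled = enabled;
            this.batchingMaxPublishDelayMs = delayMs;
            this.roundRobinRouterBatchingPartitionSwitchFrequency = switchFrequency;
            this.batchingMaxMessages = maxMessages;
            this.batchingMaxBytes = maxBytes;
            this.batchBuilder = batchBuilder;
        }
    }

    // Spec -> config. A missing spec yields the pre-existing behavior:
    // batching enabled with a 10 ms max publish delay.
    static BatchingConfig toConfig(BatchingSpec spec) {
        if (spec == null) {
            // Assumption: fields not named in the fallback stay at 0 / null.
            return new BatchingConfig(true, 10, 0, 0, 0, null);
        }
        return new BatchingConfig(spec.enabled, spec.batchingMaxPublishDelayMs,
                spec.roundRobinRouterBatchingPartitionSwitchFrequency,
                spec.batchingMaxMessages, spec.batchingMaxBytes, spec.batchBuilder);
    }

    // Config -> spec (the "vice versa" direction). A missing config maps to
    // a missing spec, so a later toConfig call applies the fallback again.
    static BatchingSpec toSpec(BatchingConfig config) {
        if (config == null) {
            return null;
        }
        return new BatchingSpec(config.enabled, config.batchingMaxPublishDelayMs,
                config.roundRobinRouterBatchingPartitionSwitchFrequency,
                config.batchingMaxMessages, config.batchingMaxBytes, config.batchBuilder);
    }

    public static void main(String[] args) {
        BatchingConfig fallback = toConfig(null);
        System.out.println(fallback.enabled + " " + fallback.batchingMaxPublishDelayMs); // true 10

        BatchingConfig custom = toConfig(new BatchingSpec(true, 100, 10, 1000, 0, null));
        System.out.println(custom.batchingMaxPublishDelayMs + " " + custom.batchingMaxMessages); // 100 1000
    }
}
```

Keeping the fallback inside the spec-to-config step means functions and sources created before this change, whose stored ProducerSpec has no batchingSpec, would continue to produce with the same batching behavior as before.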
After the changes, users can pass the batching configurations when creating functions and sources, for example with CLI arguments:
./bin/pulsar-admin functions create \
--tenant public \
--namespace default \
--name test-java \
--className org.apache.pulsar.functions.api.examples.ExclamationFunction \
--inputs persistent://public/default/test-java-input \
--producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
--jar /pulsar/examples/api-examples.jar
./bin/pulsar-admin sources create \
--name data-generator-source \
--source-type data-generator \
--destination-topic-name persistent://public/default/data-source-topic \
--producer-config '{"batchingConfig": {"enabled": true, "batchingMaxPublishDelayMs": 100, "roundRobinRouterBatchingPartitionSwitchFrequency": 10, "batchingMaxMessages": 1000}}' \
--source-config '{"sleepBetweenMessages": "1000"}'
Users can also use the function config file to set the batching configs for functions:
tenant: "public"
namespace: "default"
name: "test-java"
jar: "/pulsar/examples/api-examples.jar"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["persistent://public/default/test-java-input"]
output: "persistent://public/default/test-java-output"
autoAck: true
parallelism: 1
producerConfig:
  batchingConfig:
    enabled: true
    batchingMaxPublishDelayMs: 100
    roundRobinRouterBatchingPartitionSwitchFrequency: 10
    batchingMaxMessages: 1000
Users can likewise use the source config file to set the batching configs for sources:
tenant: "public"
namespace: "default"
name: "data-generator-source"
topicName: "persistent://public/default/data-source-topic"
archive: "builtin://data-generator"
parallelism: 1
configs:
  sleepBetweenMessages: "5000"
producerConfig:
  batchingConfig:
    enabled: true
    batchingMaxPublishDelayMs: 100
    roundRobinRouterBatchingPartitionSwitchFrequency: 10
    batchingMaxMessages: 1000
No changes are needed to revert to the previous version.
No other changes are needed to upgrade to the new version.
None