pip/pip-439.md
Apache Pulsar transactions enable atomic operations across multiple topics, allowing producers to send messages and consumers to acknowledge messages as a single unit of work. This provides the foundation for exactly-once processing semantics in streaming applications.
Pulsar's transaction system consists of four key components:
- Transaction Coordinator (TC): A broker module that manages transaction lifecycles, allocates transaction IDs, and orchestrates the commit/abort process.
- Transaction Log: A persistent topic storing transaction metadata and state changes, enabling recovery after failures.
- Transaction Buffer: Temporarily stores messages produced within transactions, making them visible to consumers only after commit.
- Pending Acknowledge State: Tracks message acknowledgments within transactions, preventing conflicts between competing transactions.
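To make the Transaction Buffer's visibility rule concrete, here is a deliberately simplified in-memory model. It is not Pulsar's implementation, and `ToyTopic` and its methods are hypothetical names. Messages written under a transaction are held aside and appended to the consumer-visible log only when that transaction commits. If the transaction aborts, they are discarded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a topic with a transaction buffer.
// It illustrates visibility semantics only and is not Pulsar code.
class ToyTopic {
    // Messages that consumers can read.
    private final List<String> visible = new ArrayList<>();
    // Messages produced under a still-open transaction, keyed by transaction ID.
    private final Map<Long, List<String>> buffered = new HashMap<>();

    // Non-transactional publish: the message is visible immediately.
    void publish(String msg) {
        visible.add(msg);
    }

    // Transactional publish: the message is held until the transaction ends.
    void publish(long txnId, String msg) {
        buffered.computeIfAbsent(txnId, id -> new ArrayList<>()).add(msg);
    }

    // Commit makes all of the transaction's messages visible together.
    void commit(long txnId) {
        List<String> msgs = buffered.remove(txnId);
        if (msgs != null) {
            visible.addAll(msgs);
        }
    }

    // Abort drops the transaction's messages; consumers never see them.
    void abort(long txnId) {
        buffered.remove(txnId);
    }

    // Snapshot of what a consumer could read right now.
    List<String> read() {
        return new ArrayList<>(visible);
    }
}
```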
Transactions follow a defined lifecycle:
Pulsar transactions provide:
Pulsar Functions is a lightweight compute framework integrated with Apache Pulsar that enables stream processing without managing infrastructure. Key characteristics include:
Functions operate on a per-message basis, making them ideal for implementing stream processing with exactly-once semantics when combined with transactions.
Currently, Pulsar Functions cannot publish to multiple topics transactionally, which is a significant limitation for use cases requiring atomic multi-topic publishing. For instance, if a function processes an input message and needs to publish related updates to several output topics, there's no guarantee that all operations will succeed atomically.
This limitation prevents building robust stream processing applications that require exactly-once semantics across multiple input and output topics. Without transaction support in Functions, developers must implement their own error handling and retry mechanisms, which can be complex and error-prone.
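Adding transaction support to Pulsar Functions would make message processing atomic: consuming an input message and publishing all of its resulting output messages would either all take effect or none would.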
The proposed solution introduces managed transaction wrapping for Pulsar Functions through configuration settings. When enabled, each function execution will be automatically wrapped in a transaction without requiring code changes to the function implementation.
The general flow will be:
`transactionMode: MANAGED`

This approach provides transaction support in a way that is transparent to function implementers, requiring only configuration changes rather than code changes.
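The per-execution flow can be sketched without any Pulsar dependencies. `Txn`, `TxnClient` and `ManagedTxnRunner` are hypothetical stand-ins for the client transaction API and the function instance runtime. The sketch only shows the intended ordering. It opens a transaction and runs the unchanged user function. It then publishes the output and acknowledges the input under that transaction. Finally it commits, or aborts if anything throws.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical stand-ins for the client's transaction API (not Pulsar classes).
interface Txn {
    void commit();
    void abort();
}

interface TxnClient {
    Txn newTransaction(long timeoutMs);
}

// Sketch of a runtime wrapper that runs one function execution inside one
// transaction. The user function itself contains no transaction code.
class ManagedTxnRunner<I, O> {
    private final TxnClient client;
    private final long timeoutMs;
    private final Function<I, O> userFunction;
    private final BiConsumer<Txn, O> publishOutput; // publish the result under txn
    private final BiConsumer<Txn, I> ackInput;      // acknowledge the input under txn

    ManagedTxnRunner(TxnClient client, long timeoutMs, Function<I, O> userFunction,
                     BiConsumer<Txn, O> publishOutput, BiConsumer<Txn, I> ackInput) {
        this.client = client;
        this.timeoutMs = timeoutMs;
        this.userFunction = userFunction;
        this.publishOutput = publishOutput;
        this.ackInput = ackInput;
    }

    void process(I input) {
        Txn txn = client.newTransaction(timeoutMs);
        try {
            O output = userFunction.apply(input);
            publishOutput.accept(txn, output);
            ackInput.accept(txn, input);
        } catch (RuntimeException e) {
            // Any failure aborts, so neither the output nor the ack takes effect.
            txn.abort();
            throw e;
        }
        // Reached only on success: commit makes output and ack effective together.
        txn.commit();
    }
}
```

Committing outside the `try` block is deliberate. With that placement, a failed commit is not followed by an abort call on the same transaction.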
We will update the FunctionConfig to include transaction-related settings through a new TransactionConfig class:
```java
public enum TransactionMode {
    OFF,
    MANAGED
}

public class TransactionConfig {
    private TransactionMode transactionMode = TransactionMode.OFF;
    private Long transactionTimeoutMs = 60000L;
    private Integer transactionBatchingMaxEntries = 1;
    private Long transactionBatchingQuietPeriodMs = 100L;
    // Getters and setters...
}

public class FunctionConfig {
    // Existing fields...
    private TransactionConfig transaction = new TransactionConfig();
    // Getter and setter...
}
```
We also need to update the protobuf definition for FunctionDetails to include these fields:
```protobuf
message TransactionSpec {
    enum TransactionMode {
        OFF = 0;
        MANAGED = 1;
    }
    TransactionMode transactionMode = 1;
    int64 transactionTimeoutMs = 2;
    int64 transactionBatchingMaxEntries = 3;
    int64 transactionBatchingQuietPeriodMs = 4;
}

message FunctionDetails {
    // Other existing fields...
    TransactionSpec transaction = 24;
}
```
`ContextImpl` is updated so that output messages created through the context are bound to the managed transaction:

```java
class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable {
    // Existing fields...

    // Transaction set by the runtime for the execution in progress (null when transactions are off)
    private Transaction currentTransaction;

    // Finds the proper transaction to tie to the current function execution (sync/async)
    private Transaction getManagedTransaction() {
        // implementation...
    }

    // Existing methods...

    public void setCurrentTransaction(Transaction transaction) {
        this.currentTransaction = transaction;
    }

    @Override
    public <T> TypedMessageBuilder<T> newOutputMessage(String topicName, Schema<T> schema)
            throws PulsarClientException {
        MessageBuilderImpl<T> messageBuilder = new MessageBuilderImpl<>();
        TypedMessageBuilder<T> typedMessageBuilder;
        Producer<T> producer = getProducer(topicName, schema);
        Transaction managedTransaction = getManagedTransaction();
        // Branch on the transaction actually used below, not on the raw field
        if (managedTransaction != null) {
            if (schema != null) {
                // Uses the new API that supports both schema and transaction
                typedMessageBuilder = producer.newMessage(schema, managedTransaction);
            } else {
                typedMessageBuilder = producer.newMessage(managedTransaction);
            }
        } else if (schema != null) {
            typedMessageBuilder = producer.newMessage(schema);
        } else {
            typedMessageBuilder = producer.newMessage();
        }
        messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
        return messageBuilder;
    }
}
```
It's important to note that Pulsar Functions supports asynchronous processing, where functions can return CompletableFuture objects. This proposal ensures that transaction support works seamlessly with both synchronous and asynchronous functions.
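For an asynchronous function, the transaction has to outlive the `process` call. The sketch below ties the transaction's outcome to the `CompletableFuture` the function returns. `AsyncTxn` and `AsyncManagedTxn` are hypothetical names, not Pulsar classes. The sketch assumes commit and abort are themselves asynchronous, as they are in the Pulsar client. Nothing is committed while the future is pending. A normal completion commits, and an exceptional completion aborts.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical async transaction handle (stand-in for the client's Transaction).
interface AsyncTxn {
    CompletableFuture<Void> commit();
    CompletableFuture<Void> abort();
}

class AsyncManagedTxn {
    // Keeps the transaction open until the function's future completes, then
    // commits on success or aborts on failure. The returned future completes
    // only after the commit or abort has finished.
    static <T> CompletableFuture<T> completeWithTxn(AsyncTxn txn, CompletableFuture<T> result) {
        CompletableFuture<T> done = new CompletableFuture<>();
        result.whenComplete((value, error) -> {
            if (error == null) {
                txn.commit().whenComplete((ignored, commitError) -> {
                    if (commitError == null) {
                        done.complete(value);
                    } else {
                        done.completeExceptionally(commitError);
                    }
                });
            } else {
                // Surface the function's original failure once the abort has been attempted.
                txn.abort().whenComplete((ignored, abortError) -> done.completeExceptionally(error));
            }
        });
        return done;
    }
}
```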
For asynchronous functions:

- When a function returns a `CompletableFuture`, the transaction is maintained until the future completes.

To optimize performance and reduce the overhead on the Transaction Coordinator, this proposal introduces transaction batching. Transaction batching allows multiple incoming messages to be processed within the same transaction, reducing the total number of transactions created.
Transaction batching is distinct from Pulsar's message batching. While message batching combines multiple messages into a single "batch message" for efficient network transfer, transaction batching processes multiple incoming messages (or batch messages) within the scope of a single transaction.
Key benefits of transaction batching include:
Transaction batching is controlled by two main parameters:
- `transactionBatchingMaxEntries`: The maximum number of entries (incoming messages or batch messages) to process within a single transaction before committing it and starting a new one.
- `transactionBatchingQuietPeriodMs`: The maximum amount of time to wait for additional messages before committing a transaction if `transactionBatchingMaxEntries` is not reached.

batchingMaxPublishDelay value)

Transaction support will be configured using the new `TransactionConfig` class:
```java
TransactionConfig txnConfig = new TransactionConfig();
txnConfig.setTransactionMode(TransactionMode.MANAGED);
txnConfig.setTransactionTimeoutMs(30000L);          // 30 seconds
txnConfig.setTransactionBatchingMaxEntries(10);     // Process up to 10 entries per transaction
txnConfig.setTransactionBatchingQuietPeriodMs(1L);  // Commit after 1 ms of inactivity

FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTransaction(txnConfig);
// Other configuration...
```
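The two batching parameters imply a simple commit rule. `TxnBatchPolicy` below is a hypothetical sketch of that rule, not the proposed implementation. It assumes the quiet period is measured from the most recently processed entry and is checked by a periodic tick. Time is passed in explicitly so the behaviour is deterministic. The returned strings reuse the `reason` label values listed for `pulsar_function_txn_batch_commit_reason`.

```java
// Hypothetical sketch of the transaction-batching commit rule (not Pulsar code).
class TxnBatchPolicy {
    private final int maxEntries;      // transactionBatchingMaxEntries
    private final long quietPeriodMs;  // transactionBatchingQuietPeriodMs
    private int entries = 0;           // entries in the currently open transaction
    private long lastEntryAtMs;        // time the last entry was processed

    TxnBatchPolicy(int maxEntries, long quietPeriodMs) {
        this.maxEntries = maxEntries;
        this.quietPeriodMs = quietPeriodMs;
    }

    // Called after an entry is processed; returns a commit reason, or null to keep batching.
    String onEntry(long nowMs) {
        entries++;
        lastEntryAtMs = nowMs;
        return entries >= maxEntries ? commit("max_entries") : null;
    }

    // Called periodically; commits an open batch that has been idle for the quiet period.
    String onTick(long nowMs) {
        if (entries > 0 && nowMs - lastEntryAtMs >= quietPeriodMs) {
            return commit("quiet_period");
        }
        return null;
    }

    // Called when the function instance shuts down with a batch still open.
    String onClose() {
        return entries > 0 ? commit("function_close") : null;
    }

    int openEntries() {
        return entries;
    }

    // Resets the batch; a real runtime would commit the transaction here and start a new one.
    private String commit(String reason) {
        entries = 0;
        return reason;
    }
}
```

With the default `transactionBatchingMaxEntries = 1`, every call to `onEntry` returns `"max_entries"`, which corresponds to one transaction per incoming entry.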
The Functions worker configuration will include a setting to enable or disable the `pulsar_function_txn_latency` metric:
```yaml
# Transaction metrics configuration
transactionMetrics:
  # Enable/disable specific transaction metrics individually
  txnLatency: false  # Transaction duration histograms (high cardinality)
  # ... disable any future metric deemed too expensive to enable by default
```
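The intent of the `txnLatency` switch can be shown with a small hypothetical recorder. `TxnMetricsRecorder` is not the Pulsar metrics API. Its list of raw samples stands in for a real histogram. The counters are always maintained, while the expensive latency histogram is fed only when the worker setting enables it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-instance recorder; illustrates gating only and is not the
// Pulsar metrics implementation. Not thread-safe.
class TxnMetricsRecorder {
    private final boolean txnLatencyEnabled; // mirrors transactionMetrics.txnLatency
    long created;
    long committed;
    long aborted;
    // Stand-in for the latency histogram: raw durations in milliseconds.
    final List<Long> latencySamplesMs = new ArrayList<>();

    TxnMetricsRecorder(boolean txnLatencyEnabled) {
        this.txnLatencyEnabled = txnLatencyEnabled;
    }

    void onCreated() {
        created++;
    }

    void onFinished(boolean wasCommitted, long durationMs) {
        if (wasCommitted) {
            committed++;
        } else {
            aborted++;
        }
        // The histogram is only fed when the worker setting enables it.
        if (txnLatencyEnabled) {
            latencySamplesMs.add(durationMs);
        }
    }
}
```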
The Pulsar Admin CLI will be updated to support these new configuration options:
```shell
$ pulsar-admin functions create \
    --auto-transactions-enabled true \
    --transaction-timeout-ms 30000 \
    --transaction-batching-max-entries 10 \
    --transaction-batching-quiet-period-ms 1 \
    ... other options ...
```
Similarly, the CLI will support updating these options with the update command.
The following new metrics will be added to track transaction usage in functions:
| Metric | Description | Labels |
|---|---|---|
| `pulsar_function_txn_created_total` | Counter tracking the total number of transactions created by functions | `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster` |
| `pulsar_function_txn_committed_total` | Counter tracking successfully committed transactions | `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster` |
| `pulsar_function_txn_aborted_total` | Counter tracking aborted transactions | `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster` |
| `pulsar_function_txn_timeout_total` | Counter tracking transactions that timed out | `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster` |
| `pulsar_function_txn_latency` | Histogram of transaction duration from creation to commit/abort (configurable in the functions worker) | `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster` |
| `pulsar_function_txn_batch_size` | Histogram of transaction batch sizes (number of entries processed per transaction) | `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster` |
| `pulsar_function_txn_batch_commit_reason` | Counter tracking transaction batch commits by reason | `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster`, `reason` (`"max_entries"`, `"quiet_period"`, `"function_close"`) |
| `pulsar_function_txn_entries_per_second` | Gauge tracking the rate of entries processed in transactions | `tenant`, `namespace`, `name` (function name), `instance_id`, `cluster` |

To monitor the transaction functionality in Pulsar Functions, users should:
Example alert thresholds:
The proposed transaction support doesn't introduce new security concerns as it builds on top of existing Pulsar transaction mechanisms. All security aspects of transactions, including authentication and authorization, are inherited from the Pulsar client's transaction implementation.
Functions will only be able to create transactions and operate on topics they already have permission to access. No additional permissions are required beyond what's already needed for the function to operate.
The proposed changes are backward compatible with existing Pulsar Functions:
- The default `transactionMode` is `OFF`, so existing functions will continue to operate without transaction support.

To enable transaction support for existing functions:

- Set `transactionMode: MANAGED` and optionally configure `transactionTimeoutMs`.

When downgrading to a version that doesn't support the transaction features, several steps need to be taken to ensure compatibility:

- Set `transactionMode: OFF` before downgrading.

Instead of automatic transaction wrapping, we could expose explicit transaction APIs in the `Context` interface:
```java
Transaction newTransaction();
void commitTransaction(Transaction txn);
void abortTransaction(Transaction txn);
```
This approach gives function authors more control but requires code changes to use transactions. We rejected this approach to keep the programming model simpler and avoid breaking the Function interface.
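To illustrate the code changes this alternative would impose, here is a hypothetical function written against it. `ExplicitTxn` is a placeholder rather than the Pulsar client's `Transaction`, and `publish(...)` is an assumed helper. Neither exists in the Functions API. Every function author would have to repeat the begin/commit/abort handling shown below.

```java
// Hypothetical types for illustration only; ExplicitTxn is a placeholder rather
// than the Pulsar client's Transaction, and publish(...) is an assumed helper.
interface ExplicitTxn {}

interface ExplicitTxnContext {
    ExplicitTxn newTransaction();
    void commitTransaction(ExplicitTxn txn);
    void abortTransaction(ExplicitTxn txn);
    void publish(String topic, String value, ExplicitTxn txn);
}

// Under the rejected alternative, each function carries this boilerplate itself.
class FanOutFunction {
    void process(String input, ExplicitTxnContext ctx) {
        ExplicitTxn txn = ctx.newTransaction();
        try {
            ctx.publish("billing", input, txn);
            ctx.publish("shipping", input, txn);
        } catch (RuntimeException e) {
            // The author must remember to abort on every failure path.
            ctx.abortTransaction(txn);
            throw e;
        }
        ctx.commitTransaction(txn);
    }
}
```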
The implementation of transaction support in Pulsar Functions should be considered a stepping stone toward a more comprehensive exactly-once processing model for Pulsar's stream processing capabilities. Future work may include transaction support in Pulsar IO connectors.