pip/pip-453.md
The pulsar-metadata module provides two abstractions for interacting with metadata stores:
MetadataStore: the wrapper on the actual underlying metadata store (e.g. ZooKeeper), which has caches for value and children of a given key.MetadataCache<T>: a typed cache layer on top of MetadataStore, which performs serialization and deserialization of data between T and byte[].The MetadataStore instance is unique in each broker, and is shared by multiple MetadataCache<T> instances.
However, a single thread whose name starts with the metadata store name (e.g. ZK-MetadataStore) is used by implementations of them. This thread is used in the following tasks:
put.AbstractMetadataStore#accept, which calls accept methods of all MetadataCache instances and all listeners registered by MetadataStore#registerListener.metadataStoreBatchingMaxDelayMillis config (default: 5 ms).It should be noted that MetadataCache executes the compute sensitive tasks like serialization in the MetadataStore callback. When the number of metadata operations grows, this thread is easy to be overwhelmed. It also affects the topic loading, which involves many metadata operations, this thread can be overwhelmed and block other tasks. For example, in a production environment, it's observed that the pulsar_batch_metadata_store_queue_wait_time metric is high (100 ms), which should be close to 5 ms normally (configured by metadataStoreBatchingMaxDelayMillis).
The single thread model is inefficient when there are many metadata operations. For example, when a broker is down and the topics owned by this broker will be transferred to the new owner broker. Since the new owner broker might never owned them before, even the MetadataCache caches are cold, which results in many metadata operations. However, the CPU-bound tasks like serialization and deserialization are executed in the MetadataStore thread, which makes it easy to be overwhelmed. This affects the topic loading time and the overall performance of the broker.
In a production environment, there is a case when the metadata operation rate increased suddenly, the pulsar_batch_metadata_store_queue_wait_time_ms_bucket metric increased to ~100 ms, which is a part of the total latency of a metadata operation. As a result, the total P99 get latency (pulsar_metadata_store_ops_latency_ms_bucket{type="get"}) increased to 2 seconds.
The 3rd task in the previous section is scheduled via scheduleAtFixedRate, which means if the task is not executed in time (5 ms by default), the task will be executed immediately again in a short time, which also burdens the single metadata store thread.
Improve the existing thread model to handle various tasks on metadata store, which could avoid a single thread being overwhelmed when there are many metadata operations.
Actually the batching mechanism introduced by #13043 is harmful. The flush method, which is responsible to send a batch of metadata operations to broker, is called in the metadata store thread rather than the caller thread. The trade-off of the higher throughput is the lower latency. The benefit is limited because in most time the metadata operation rate is not so high. See this test report for more details.
This proposal doesn't intend to change the existing batching mechanism or disable it by default. It only improves the threading model to avoid the single thread being overwhelmed.
Additionally, some code paths execute the compute intensive tasks in the metadata store thread directly (e.g. store.get(path).thenApply(/* ... */)), this proposal does not aim at changing them to asynchronous methods (e.g. thenApplyAsync).
Create 4 sets of threads:
<name>-event: the original metadata store thread, which is now only responsible to handle notifications. This executor won't be a ScheduledExecutorService anymore.<name>-scheduler: a single thread, which is used to schedule tasks like flushing and retrying failed operations.<name>-batch-flusher: a single thread, which is used to schedule the flushing task at a fixed rate. It won't be created if metadataStoreBatchingEnabled is false.<name>-worker: a fixed thread pool shared by all MetadataCache instances to execute compute intensive tasks like serialization and deserialization. The same path will be handled by the same thread to keep the processing order on the same path.Regarding the callbacks, don't switch to a different thread. This change is not breaking because the underlying metadata store usually executes the callback in a single thread (e.g. <name>-EventThread in ZooKeeper) like the single thread in the current implementation. The caller should be responsible to manage worker threads on the metadata operation result if the callback is compute intensive.
The only concern is that introducing a new thread to execute callbacks allows waiting for the future of metadata store APIs in the callback. After this change, the following use case could be a dead lock:
metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());;
Add a configurations to specify the number of worker threads for MetadataCache:
@FieldContext(
category = CATEGORY_SERVER,
doc = "The number of threads uses for serializing and deserializing data to and from the metadata store"
)
private int metadataStoreSerDesThreads = 1;
Use 1 as the default value since the serialization and deserialization tasks are not frequent. This separated thread pool is mainly added to avoid blocking the metadata store callback thread.
The pulsar_batch_metadata_store_executor_queue_size metric will be removed because the <name>-batch-flusher thread won't execute other tasks except for flushing.