Apache Pulsar brokers maintain an in-memory entry cache to reduce latency and load on BookKeeper and tiered storage (S3) by serving frequently accessed message data directly from memory.
Key concepts:
- ManagedLedgerImpl: Manages storage for a single topic partition. Each ManagedLedgerImpl instance has its own EntryCache instance.
- EntryCache: Stores message entries (payloads) in memory. The default implementation is RangeEntryCacheImpl, which uses a RangeCache internally.
- RangeCache: A specialized cache that stores entries mapped by their Position (ledgerId, entryId). It supports range-based operations for retrieval and expiration.
- EntryCacheManager (RangeEntryCacheManagerImpl): A global component that limits the total size of all entry caches in a broker. When the total size exceeds a threshold, it triggers eviction.

The existing eviction mechanisms are:
- Timestamp-based eviction: Entries older than a threshold are evicted (managedLedgerCacheEvictionTimeThresholdMillis). This is currently handled periodically by ManagedLedgerFactoryImpl iterating over all managed ledgers.
- Size-based eviction: Entries are evicted when the total cache size exceeds a limit (managedLedgerCacheSizeMB). The current implementation resides in EntryCacheDefaultEvictionPolicy, which selects a subset of larger caches and proportionally evicts entries from each entry cache to keep the total cache size under the limit.
- Cursor-position-based eviction: Controlled by cacheEvictionByMarkDeletedPosition.

The broker cache serves various read patterns, including tailing reads, catch-up reads, and replay queue reads in Key_Shared subscriptions.
The current Pulsar broker entry cache implementation and its eviction mechanisms face several challenges that impact performance, efficiency, and predictability:
Inefficient and Flawed Size-Based Eviction:
The EntryCacheDefaultEvictionPolicy (the current default for size-based eviction) does not guarantee the removal of the oldest entries globally. It sorts individual EntryCache instances by their size, selects a percentage of the largest caches, and then asks each of them to evict a proportional amount of data. This can lead to newer entries being evicted from large, active caches while older, less relevant entries remain in smaller or less active caches, resulting in suboptimal cache utilization and potentially lower hit rates.
Inefficient and Incorrect Timestamp-Based Eviction:
The existing timestamp-based eviction mechanism, triggered by ManagedLedgerFactoryImpl, has significant performance and correctness issues:
- Performance: The mechanism iterates over all ManagedLedgerImpl instances and their respective EntryCache instances periodically (every 10 ms by default). In brokers with a large number of topics, this frequent and exhaustive iteration leads to high CPU utilization and memory pressure.
- Correctness: The per-cache eviction logic (RangeCache.evictLEntriesBeforeTimestamp) often assumes that entries within a single RangeCache are primarily ordered by timestamp due to typical append-only workloads. This assumption breaks down with mixed read patterns such as catch-up reads, or when entries are inserted out of their natural position order (the Key_Shared subscription replay queue scenario), potentially leading to incorrect eviction decisions or inefficient scanning.

Limited Cache Scope and Effectiveness for Diverse Read Patterns:
The original RangeCache was primarily designed with tailing reads in mind. While support for caching backlogged cursors and replay queue reads was added later, the eviction algorithms were not holistically updated to effectively manage mixed read patterns (tailing, catch-up, replays in Key_Shared). This can lead to:
Foundation for Advanced Caching Strategies Needed: The current cache architecture makes it difficult to implement more intelligent caching strategies that could further optimize for common Pulsar use cases, such as efficiently handling fan-out to multiple shared consumers or retaining entries expected to be read by several cursors.
Addressing these issues is crucial for improving broker performance, reducing operational costs (lower BookKeeper load), and providing a more robust caching layer that can adapt to diverse workloads.
The refactoring aims to make cache eviction more robust, performant, and predictable. The "expected read count" strategy is an attempt to make the cache more aware of Pulsar's specific consumption patterns.
- Replace EntryCacheDefaultEvictionPolicy for size-based eviction with a centralized, insertion-order aware mechanism.
- Introduce a RangeCacheRemovalQueue that tracks all cached entries globally in approximate insertion order.
- Add a configuration option (cacheEvictionByExpectedReadCount) to enable the expected read count strategy.
- Extend an entry's TTL by managedLedgerCacheEvictionTimeThresholdMillis during the eviction check if it has been accessed since the last check.
- Control this TTL extension with the managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed configuration option.

The proposed solution involves two main components: a refactored eviction mechanism that uses a centralized removal queue, and a new cache strategy based on expected read count, complemented by LRU-like behavior.
RangeCacheRemovalQueue
A new component, RangeCacheRemovalQueue, will be introduced at the EntryCacheManager level.
- When an entry is inserted into a RangeEntryCacheImpl (the per-ledger cache), a lightweight wrapper for this entry (RangeCacheEntryWrapper) is also added to the global RangeCacheRemovalQueue. This queue maintains entries in their global insertion order (FIFO). The wrapper is already necessary in RangeEntryCacheImpl to prevent consistency issues. The current internal wrapper class is refactored to a top-level class so that it can be used with the removal queue.
- A periodic task (running on ManagedLedgerFactoryImpl's cacheEvictionExecutor) will process the RangeCacheRemovalQueue. It will iterate from the head of the queue, removing entries whose timestampNanos is older than cacheEvictionTimeThresholdNanos. Since the queue is insertion-ordered, this process can often stop early once it encounters an entry that is not expired.
- When the EntryCacheManager detects that the total cache size exceeds evictionTriggerThresholdPercent * maxSize, it will trigger an eviction cycle. This cycle will also process the RangeCacheRemovalQueue from the head, removing the oldest entries (regardless of which specific ledger they belong to) until the cache size is brought down to cacheEvictionWatermark * maxSize.
- Entries that still have remaining expected reads are requeued instead of removed (moved to the tail of the removalQueue). Requeued entries are reconsidered in subsequent eviction passes and eventually expire once the limit set by managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes is reached. This prevents premature eviction of entries that are likely to be read again soon.
- A similar requeue mechanism applies to recently accessed entries when managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed is set to true. If the entry has been accessed since the last eviction check, it will be requeued to the end of the removalQueue and its TTL will be extended by managedLedgerCacheEvictionTimeThresholdMillis. This is similar to LRU cache behavior.

This centralized approach replaces the distributed, per-cache iteration for timestamp eviction and the less precise EntryCacheDefaultEvictionPolicy for size eviction, leading to more globally optimal and efficient eviction decisions.
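The queue processing described above can be sketched roughly as follows. This is a simplified, single-threaded illustration and not Pulsar's implementation: it uses java.util.ArrayDeque rather than a JCTools MPSC queue, and the class, field and method names (RemovalQueueSketch, CachedEntry, evictExpired, evictToSize) are hypothetical. It assumes that requeueing resets the entry's timestamp and that each pass visits every queued entry at most once.

```java
import java.util.ArrayDeque;
import java.util.function.Function;

/** Hypothetical single-threaded sketch of a global FIFO removal queue (not Pulsar's actual code). */
final class RemovalQueueSketch {
    enum EvictionResult { REMOVE, REQUEUE, STOP }

    /** Illustrative stand-in for a cached entry wrapper. */
    static final class CachedEntry {
        final long sizeBytes;
        long timestampNanos; // insertion time, or time of the last TTL extension
        int expectedReads;   // reads still expected from active cursors
        int requeueCount;    // how many times the TTL has been extended

        CachedEntry(long sizeBytes, long timestampNanos, int expectedReads) {
            this.sizeBytes = sizeBytes;
            this.timestampNanos = timestampNanos;
            this.expectedReads = expectedReads;
        }
    }

    private final ArrayDeque<CachedEntry> queue = new ArrayDeque<>();
    private final long ttlNanos;
    private final int maxRequeues;
    private long totalSizeBytes;

    RemovalQueueSketch(long ttlNanos, int maxRequeues) {
        this.ttlNanos = ttlNanos;
        this.maxRequeues = maxRequeues;
    }

    void add(CachedEntry entry) {
        queue.addLast(entry);
        totalSizeBytes += entry.sizeBytes;
    }

    int entryCount() {
        return queue.size();
    }

    long totalSizeBytes() {
        return totalSizeBytes;
    }

    /** Timestamp pass: stops at the first non-expired entry because later entries are younger. */
    int evictExpired(long nowNanos) {
        return process(nowNanos, e -> {
            if (nowNanos - e.timestampNanos < ttlNanos) {
                return EvictionResult.STOP;
            }
            return canRequeue(e) ? EvictionResult.REQUEUE : EvictionResult.REMOVE;
        });
    }

    /** Size pass: removes from the head until the total size is at or below the target. */
    int evictToSize(long targetSizeBytes, long nowNanos) {
        return process(nowNanos, e -> {
            if (totalSizeBytes <= targetSizeBytes) {
                return EvictionResult.STOP;
            }
            return canRequeue(e) ? EvictionResult.REQUEUE : EvictionResult.REMOVE;
        });
    }

    private boolean canRequeue(CachedEntry e) {
        return e.expectedReads > 0 && e.requeueCount < maxRequeues;
    }

    /** Visits each queued entry at most once per pass so that requeued entries cannot loop forever. */
    private int process(long nowNanos, Function<CachedEntry, EvictionResult> predicate) {
        int removed = 0;
        for (int remaining = queue.size(); remaining > 0; remaining--) {
            CachedEntry head = queue.peekFirst();
            EvictionResult result = predicate.apply(head);
            if (result == EvictionResult.STOP) {
                break;
            }
            queue.pollFirst();
            if (result == EvictionResult.REQUEUE) {
                head.requeueCount++;
                head.timestampNanos = nowNanos; // assumption: requeueing restarts the TTL
                queue.addLast(head);
            } else {
                totalSizeBytes -= head.sizeBytes;
                removed++;
            }
        }
        return removed;
    }
}
```

In this sketch a single loop serves both eviction modes, and only the predicate differs. Bounding each pass by the queue length at the start of the pass is a design choice of the sketch that keeps requeued entries from being revisited within the same pass.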
This FIFO queue-based solution is influenced by the S3FIFO cache eviction algorithm (algorithm explanation), although it is not an implementation of S3FIFO. It accomplishes a similar goal as S3FIFO, but for the managed ledger broker cache. The goal of S3FIFO is to quickly remove entries that are unlikely to be accessed in the future. Similarly, the PIP-430 broker cache removal queue aims to quickly and efficiently remove entries that would be unlikely to be accessed in the future. The reason why S3FIFO is not implemented in broker cache is that in broker caching, the "expected read count" concept is used to determine whether an entry is likely to be accessed in the future. This information is not available in generic cache implementations, and in the broker cache this information can be used to keep entries in the cache longer that are likely to be accessed in the future.
The S3FIFO algorithm explanation blog post states: "While the eviction algorithms so far have been centered around LRU, I believe modern eviction algorithms should be designed with FIFO queues." This is one of the reasons why the PIP-430 broker cache removal queue is also based on a FIFO queue.
This new strategy aims to improve cache hit rates by retaining entries that are likely to be read by multiple active consumers/cursors.
- EntryReadCountHandler: Each cached entry (EntryImpl) will be associated with an EntryReadCountHandlerImpl. This handler maintains an expectedReadCount (an atomically updated integer).
- When a new entry is added (OpAddEntry), its expectedReadCount is initialized to the number of active cursors.
- When an entry is read from storage into the cache (RangeEntryCacheImpl.readFromStorage), the expectedReadCount is initialized based on the active cursors currently positioned before or at the entry being added. This information is sourced from ActiveManagedCursorContainer.getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor).
- When an entry copy created for dispatch is released, the expectedReadCount is decremented (via EntryReadCountHandler.markRead()).
- The expectedReadCount on a cached entry is incremented when the entry is added to the replay queue. This de-prioritizes the removal of the entry from the cache during size-based eviction, so that the entry is more likely to still be cached when the Key_Shared consumer is available to read it.
- The RangeCacheRemovalQueue's size-based eviction logic will consult EntryImpl.hasExpectedReads(). This method returns true if expectedReadCount has been defined and is greater than 0.
- cacheEvictionByExpectedReadCount (boolean, default: true): Enables the new eviction strategy based on expected read count. When true, entries with expectedReadCount > 0 are less likely to be evicted by size-based eviction unless they also meet timestamp expiration.
- managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes (Integer, default: 5): Maximum number of times the cache can extend the TTL of an entry that has remaining expected reads. Only takes effect when cacheEvictionByExpectedReadCount is enabled.
- managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed (boolean, default: true): Controls whether recently accessed entries in the managed ledger cache should have their lifetime extended before cache eviction.

This strategy allows the cache to be more intelligent about retaining entries that have higher utility, especially in fan-out scenarios or with slightly lagging consumers in Key_Shared subscriptions. The replay queue is also used in Shared subscriptions when more entries are read than can be consumed by the available consumers. This strategy will also avoid cache misses in those scenarios.
RangeCacheRemovalQueue
- removalQueue: An org.jctools.queues.MpscUnboundedArrayQueue<RangeCacheEntryWrapper> that holds entries in insertion order in a FIFO queue.
- An EvictionPredicate determines the EvictionResult for each entry:
  - REMOVE: Entry is evicted.
  - REQUEUE: Entry is moved to the tail of the removalQueue (which extends the TTL by managedLedgerCacheEvictionTimeThresholdMillis).
  - STOP: Stops processing further entries in the current pass (e.g., for timestamp eviction when a non-expired entry is found, or for size eviction when enough space is freed).
  - MISSING: Entry was already removed from cache by other means.
- evictLEntriesBeforeTimestamp(long timestampNanos): Uses a predicate to remove entries whose timestampNanos is older than the provided timestamp. It requeues an entry if extendTTLOfRecentlyAccessed is enabled and the entry has been accessed since the last eviction check, or if the entry has remaining expected reads and its requeue count is less than the configured maximum.
- evictLeastAccessedEntries(long sizeToFree, long expirationTimestampNanos): Uses a predicate to remove the least accessed entries until the specified size is freed. It considers extendTTLOfRecentlyAccessed and the expected read count when determining eviction eligibility.

EntryImpl Modifications and EntryReadCountHandler
The values stored inside a RangeCache are EntryImpl instances (they implement ReferenceCountedEntry). EntryImpl has been modified to hold a reference to an EntryReadCountHandler instance, which handles the state related to the read count. EntryReadCountHandler is used as an abstraction and integration point between RangeCacheRemovalQueue, EntryImpl and the Managed Ledger layer. The implementation of EntryReadCountHandler holds the state in a single volatile int field.
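
    public interface EntryReadCountHandler {
        // Returns the current expected read count.
        int getExpectedReadCount();

        // Increments the expected read count.
        void incrementExpectedReadCount();

        // Decrements the expected read count after a read.
        void markRead();

        // True while at least one read is still expected.
        default boolean hasExpectedReads() {
            return getExpectedReadCount() >= 1;
        }
    }
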
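An implementation of this interface that keeps its state in a single volatile int field could look roughly like the following. This is a minimal sketch under assumptions: the class name ReadCountHandlerSketch is hypothetical, the interface is repeated only to keep the example self-contained, and the sketch does not clamp the counter at zero (whether the real implementation does is not stated here).

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Repeated from the text above so that the example compiles on its own.
interface EntryReadCountHandler {
    int getExpectedReadCount();
    void incrementExpectedReadCount();
    void markRead();
    default boolean hasExpectedReads() {
        return getExpectedReadCount() >= 1;
    }
}

/** Hypothetical sketch; the actual EntryReadCountHandlerImpl may differ in detail. */
final class ReadCountHandlerSketch implements EntryReadCountHandler {
    // The updater gives atomic read-modify-write access to the volatile int field
    // without allocating a separate AtomicInteger per cached entry.
    private static final AtomicIntegerFieldUpdater<ReadCountHandlerSketch> UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(ReadCountHandlerSketch.class, "expectedReadCount");

    private volatile int expectedReadCount;

    ReadCountHandlerSketch(int initialExpectedReadCount) {
        this.expectedReadCount = initialExpectedReadCount;
    }

    @Override
    public int getExpectedReadCount() {
        return expectedReadCount;
    }

    @Override
    public void incrementExpectedReadCount() {
        UPDATER.incrementAndGet(this);
    }

    @Override
    public void markRead() {
        // In this sketch the value may drop below zero if markRead is called too often.
        UPDATER.decrementAndGet(this);
    }
}
```

For example, a handler created with an initial count of 2 reports hasExpectedReads() as true until markRead() has been called twice, and becomes true again after incrementExpectedReadCount(), which corresponds to an entry being added to the replay queue.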
EntryReadCountHandler interface:
- int getExpectedReadCount(): Returns the current expected read count.
- void incrementExpectedReadCount(): Increments the count.
- void markRead(): Decrements the count.

EntryReadCountHandlerImpl class:
- Uses an AtomicIntegerFieldUpdater for expectedReadCount.

ActiveManagedCursorContainer and ActiveManagedCursorContainerImpl
To efficiently track the number of active cursors at or before a given position, a new ActiveManagedCursorContainer interface is introduced. The ActiveManagedCursorContainerImpl class implements the tracking using a sorted linked list, which allows O(1) access to the number of cursors at a given position. ActiveManagedCursorContainerImpl is only used when cacheEvictionByExpectedReadCount is enabled. In other cases, the previous ManagedCursorContainer implementation is used, renamed to ManagedCursorContainerImpl so that the implementation is decoupled from the interface. JMH benchmarks show that the new ActiveManagedCursorContainerImpl is currently more efficient than the previous ManagedCursorContainerImpl for all use cases. This points to a performance improvement opportunity for ManagedCursorContainerImpl, but that is not a priority for this PIP.
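To make the "number of cursors at the same position or before" query concrete, here is a deliberately naive sketch. It is not the sorted linked list used by ActiveManagedCursorContainerImpl and it does not provide O(1) lookups: it uses a java.util.TreeMap and sums counts on each query. The names CursorCountSketch, Pos, addCursor, moveCursor and countAtSamePositionOrBefore are hypothetical.

```java
import java.util.TreeMap;

/** Naive, hypothetical sketch of counting cursors at or before a position. */
final class CursorCountSketch {
    /** Position ordered by ledgerId first, then entryId. */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Number of cursors whose read position equals the key.
    private final TreeMap<Pos, Integer> cursorsAtPosition = new TreeMap<>();

    void addCursor(Pos readPosition) {
        cursorsAtPosition.merge(readPosition, 1, Integer::sum);
    }

    void moveCursor(Pos from, Pos to) {
        // Drop the mapping when the last cursor leaves the old position.
        cursorsAtPosition.computeIfPresent(from, (pos, count) -> count == 1 ? null : count - 1);
        addCursor(to);
    }

    /** Sums the cursors at all positions up to and including the given one. */
    int countAtSamePositionOrBefore(Pos position) {
        int total = 0;
        for (int count : cursorsAtPosition.headMap(position, true).values()) {
            total += count;
        }
        return total;
    }
}
```

The query cost here grows with the number of distinct cursor positions, which is the kind of overhead a purpose-built container is meant to avoid.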
ManagedLedgerImpl
OpAddEntry:

    if (!(ml instanceof ShadowManagedLedgerImpl)) {
        int activeCursorCount = ml.getActiveCursors().size();
        if (activeCursorCount > 0) {
            int expectedReadCount = 0;
            if (ml.getConfig().isCacheEvictionByExpectedReadCount()) {
                // For newly added entries, all active cursors are effectively "before" the added entry for future reads.
                expectedReadCount = activeCursorCount;
            }
            EntryImpl entry = EntryImpl.create(ledgerId, entryId, data, expectedReadCount);
            entry.setDecreaseReadCountOnRelease(false); // Cache owns the primary read count handling now
            ml.entryCache.insert(entry);
            entry.release();
        }
    }

ManagedLedgerImpl.asyncReadEntry:
- An expectedReadCount parameter (an IntSupplier) is passed down to RangeEntryCacheImpl.asyncReadEntry0.
- The IntSupplier would resolve to opReadEntry.cursor.getNumberOfCursorsAtSamePositionOrBefore() when cacheEvictionByExpectedReadCount is true.

RangeEntryCacheImpl.readFromStorage:
- Accepts an IntSupplier expectedReadCount.
- When creating an EntryImpl from a LedgerEntry, it passes expectedReadCount.getAsInt() to EntryImpl.create.

EntryImpl.release():
- A new flag, decreaseReadCountOnRelease (default false), is added to EntryImpl.
- When EntryImpl.create(Entry other) (copy constructor for dispatch) is called, this flag is set to true on the copy.
- In EntryImpl.beforeDeallocate(): if decreaseReadCountOnRelease is true and readCountHandler is not null, readCountHandler.markRead() is called.

This ensures that the read count on the original cached entry's handler is decremented only when an entry copy created for dispatch is released.

RangeEntryCacheManagerImpl
doCacheEviction: Called by the global eviction scheduler.
- Calls evictionHandler.invalidateEntriesBeforeTimestampNanos. This would also have to take the separate managedLedgerCacheEvictionTimeThresholdMillis and managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes settings into account.
- Calls doEvictToWatermarkWhenOverThreshold() to ensure that the cache size is within limits.

triggerEvictionWhenNeeded(): Called after an entry is added.
- Schedules doEvictToWatermarkWhenOverThreshold() on cacheEvictionExecutor.

doEvictToWatermarkWhenOverThreshold():
- Calculates sizeToEvict.
- Calls evictionHandler.evictEntries(sizeToEvict, expirationTimestampNanosForNonEvictableEntries).

The calculation of the entry cache size is not accurate unless the managedLedgerCacheCopyEntries setting is set to true. This occurs because when the entry payload is read from BookKeeper or received from a Pulsar client publisher, it is "sliced" from an underlying buffer. The actual memory consumption can be higher when the other entries of the shared underlying buffer have been released and a single remaining entry retains the complete underlying buffer.
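The retention effect of slicing can be illustrated with an analogy based on java.nio.ByteBuffer. This is not Netty's ByteBuf API and not broker code. The class and method names are hypothetical, and the 1 MB buffer and 1 kB entry sizes are arbitrary example values.

```java
import java.nio.ByteBuffer;

/** Analogy using java.nio.ByteBuffer instead of Netty's ByteBuf; all names are illustrative. */
final class SharedBufferSketch {
    /** Returns a view of entryLength bytes at the given offset of the shared receive buffer. */
    static ByteBuffer sliceEntry(ByteBuffer receiveBuffer, int offset, int entryLength) {
        ByteBuffer view = receiveBuffer.duplicate();
        view.limit(offset + entryLength);
        view.position(offset);
        // The slice shares the backing array, so the whole array stays reachable
        // for as long as the slice is referenced (for example from a cache).
        return view.slice();
    }

    /** Copies the entry into its own right-sized buffer, similar in spirit to copying entries on insert. */
    static ByteBuffer copyEntry(ByteBuffer slice) {
        ByteBuffer copy = ByteBuffer.allocate(slice.remaining());
        copy.put(slice.duplicate()); // duplicate() leaves the position of the slice untouched
        copy.flip();
        return copy;
    }
}
```

With a 1 MB receive buffer and a 1 kB entry, the slice reports a capacity of 1024 bytes while its backing array is still the full 1 MB, whereas the copy is backed by a 1024-byte array. A size calculation based on the readable bytes of a slice therefore underestimates retained memory in the way described above.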
For entries received from a Pulsar client publisher, the receive buffer size adapts between 1 kB and 1 MB, starting at 16 kB. This is currently not configurable and is set in code in the BrokerService.defaultServerBootstrap method.

    bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
            new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));

The actual underlying buffer size will also depend on Netty's LengthFieldBasedFrameDecoder (extends ByteToMessageDecoder) logic, which will cumulate buffers by copying until the complete entry has been read. In that case, the underlying buffer size can be larger than 1MB. The cumulated buffer will be shared across multiple entries since entry buffers are sliced from the cumulated buffer.
For entries received from BookKeeper, it's similar. However, BookKeeper client defaults to Netty's default channel config settings for RCVBUF_ALLOCATOR, which has a maximum buffer size of 64kB.
It would be useful to make the Pulsar broker side RCVBUF_ALLOCATOR parameters configurable so that they can be tuned to a smaller size. It is very unlikely that the 1 MB maximum size improves performance significantly. There is a performance tradeoff when messages are large and buffers have to be merged by copying in the LengthFieldBasedFrameDecoder. To avoid this overhead, LengthFieldBasedFrameDecoder could use composite buffers for merging by setting the cumulator property to ByteToMessageDecoder.COMPOSITE_CUMULATOR. However, that could have a negative performance impact on code that expects the complete payload to be a single contiguous buffer.
Since in many cases, the underlying buffer where the slice is taken for a single entry is shared with entries that have been published or retrieved together, it is also common that these entries will get evicted together. Making the RCVBUF_ALLOCATOR settings configurable in the Pulsar broker is a sufficient mitigation for the problem. In cases where shared buffers add a lot of overhead of consumed memory, it will be possible to reduce it by setting the maximum size for AdaptiveRecvByteBufAllocator to 64kB, with the tradeoff of possible unnecessary buffer copies for messages exceeding 64kB. In BookKeeper server, there are settings byteBufAllocatorSizeInitial, byteBufAllocatorSizeMin and byteBufAllocatorSizeMax to configure the AdaptiveRecvByteBufAllocator parameters. The naming of these parameters in BookKeeper isn't optimal since the settings are specifically about AdaptiveRecvByteBufAllocator parameters and not "byteBufAllocator" parameters.
The proposal would be to add these configuration parameters to broker.conf for controlling Broker's AdaptiveRecvByteBufAllocator parameters:

    # Netty adaptive receive buffer allocator's minimum size
    brokerAdaptiveRecvByteBufAllocatorMinimumSize=1024
    # Netty adaptive receive buffer allocator's initial size
    brokerAdaptiveRecvByteBufAllocatorInitialSize=16384
    # Netty adaptive receive buffer allocator's maximum size
    # Tune this to a lower value to reduce the overhead of entries cached in the broker cache due to shared underlying buffers
    brokerAdaptiveRecvByteBufAllocatorMaximumSize=1048576

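As a rough illustration of how these three settings could be read and sanity-checked before being handed to the allocator, consider the sketch below. The class RecvBufAllocatorSettingsSketch and its methods are hypothetical and not part of Pulsar. The ordering check (minimum > 0, initial >= minimum, maximum >= initial) reflects the constraint that Netty's AdaptiveRecvByteBufAllocator constructor is understood to enforce, and is stated here as an assumption.

```java
import java.util.Properties;

/** Hypothetical holder for the three proposed settings; not part of Pulsar. */
final class RecvBufAllocatorSettingsSketch {
    final int minimum;
    final int initial;
    final int maximum;

    private RecvBufAllocatorSettingsSketch(int minimum, int initial, int maximum) {
        this.minimum = minimum;
        this.initial = initial;
        this.maximum = maximum;
    }

    /** Reads the settings, falling back to the values that are currently hard-coded. */
    static RecvBufAllocatorSettingsSketch fromProperties(Properties conf) {
        int minimum = intSetting(conf, "brokerAdaptiveRecvByteBufAllocatorMinimumSize", 1024);
        int initial = intSetting(conf, "brokerAdaptiveRecvByteBufAllocatorInitialSize", 16 * 1024);
        int maximum = intSetting(conf, "brokerAdaptiveRecvByteBufAllocatorMaximumSize", 1024 * 1024);
        // Assumed constraint: 0 < minimum <= initial <= maximum.
        if (minimum <= 0 || initial < minimum || maximum < initial) {
            throw new IllegalArgumentException(
                    "Expected 0 < minimum <= initial <= maximum but got "
                            + minimum + ", " + initial + ", " + maximum);
        }
        return new RecvBufAllocatorSettingsSketch(minimum, initial, maximum);
    }

    private static int intSetting(Properties conf, String key, int defaultValue) {
        String value = conf.getProperty(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim());
    }
}
```

Lowering only the maximum to 65536, for example, keeps the default minimum and initial sizes valid, while a maximum below the initial size is rejected early instead of failing when the server bootstrap is created.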
New Configuration: broker.conf
- cacheEvictionByExpectedReadCount (boolean): Enables the new eviction strategy based on expected read count. When true, entries with expectedReadCount > 0 are less likely to be evicted by size-based eviction unless they also meet timestamp expiration.
  - Default: true.
- managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes (int): Maximum number of times the cache can extend the TTL of an entry that has remaining expected reads. Only takes effect when cacheEvictionByExpectedReadCount is enabled. This helps optimize cache efficiency for scenarios like:
  - Key_Shared subscription replays
  - Catch-up reads for lagging consumers
  - Consumers temporarily falling behind the tail

  Entries with remaining expected reads will have their TTL extended up to this many times before being eligible for eviction. The TTL will be extended by managedLedgerCacheEvictionTimeThresholdMillis each time.
  - Default: 5.
- managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed (boolean): Controls whether recently accessed entries in the managed ledger cache should have their lifetime extended before cache eviction. When enabled, an entry that has been accessed since the last eviction check has its TTL extended by managedLedgerCacheEvictionTimeThresholdMillis.
  - Default: true.
- managedLedgerContinueCachingAddedEntriesAfterLastActiveCursorLeavesMillis (long):
  - … cacheEvictionByExpectedReadCount is enabled.
  - … managedLedgerCacheEvictionTimeThresholdMillis.

Modified Behavior of Existing Configurations:
- managedLedgerCacheEvictionTimeThresholdMillis (long): Controls the time-to-live (TTL) for entries in the managed ledger (broker) cache. The TTL can be extended in two ways:
  - When cacheEvictionByExpectedReadCount is enabled: TTL is extended for entries with remaining expected reads. The maximum number of extensions is controlled by managedLedgerCacheEvictionExtendTTLOfEntriesWithRemainingExpectedReadsMaxTimes.
  - When managedLedgerCacheEvictionExtendTTLOfRecentlyAccessed is enabled: TTL is extended for entries accessed since the last expiration check.
  - Default: 1000.
- managedLedgerCacheSizeMB: Still applies. The new size-based eviction will use the RangeCacheRemovalQueue.
- cacheEvictionByMarkDeletedPosition: If cacheEvictionByExpectedReadCount is true, this setting's direct influence on preserving entries is diminished, as expectedReadCount provides more granular control. However, mark-delete position updates still occur. If cacheEvictionByExpectedReadCount is false, this setting functions as before (though the underlying eviction for "up to slowest reader" now also uses the central queue).
- managedLedgerCursorBackloggedThreshold, managedLedgerMinimumBacklogCursorsForCaching, managedLedgerMinimumBacklogEntriesForCaching, managedLedgerMaxBacklogBetweenCursorsForCaching: These settings primarily determine whether a cursor is considered "active" for caching purposes (i.e., ManagedCursorImpl.isCacheReadEntry()).
  - When cacheEvictionByExpectedReadCount is true, these settings are ignored since they would conflict with the expected read count implementation and would not benefit the caching algorithm.
  - The implementation of managedLedgerCursorBackloggedThreshold and related configuration parameters relies on periodic checks of checkBackloggedCursors and checkCursorsToCacheEntries, which occur once per minute during statistics updates.
  - The cacheEvictionByExpectedReadCount solution eliminates the need for these settings, since the efficient eviction algorithm can adapt to cases where far-behind backlogged cursors cause entries to be cached longer than necessary. The size-based eviction will remove entries in a prioritized manner, ensuring that entries with a positive expected read count are retained longer, while entries with no expected reads are evicted more aggressively.

Existing broker cache metrics will continue to function, reflecting the behavior of the new eviction system and broker cache strategy.
There are no compatibility concerns since the broker cache is handled at runtime, in a single broker.
Simple LRU (Least Recently Used) for Global Cache:
Priority Queue for RangeCacheRemovalQueue:
- A priority queue ordered by expectedReadCount (ascending) and then timestamp (ascending) could be used.
- The fact that expectedReadCount can change dynamically for entries already in the queue adds significant complexity. Updates would require O(log N) or O(N) operations depending on the queue implementation and how updates are handled. The current MPSC queue with a "stash" for non-evictable items offers a simpler, lower-overhead approach for the initial refactoring. Performance of MPSC queues is very high for enqueue/dequeue. The "stash" handles the de-prioritization of eviction for entries with positive read counts without complex queue reordering. This can be revisited if the stash mechanism proves insufficient.