pip/pip-442.md
Apache Pulsar brokers provide commands for clients to discover topics within a namespace and watch for topic updates. These commands are critical for client operations but currently lack memory limits and flow control mechanisms, creating potential memory and stability issues at scale.
Pulsar brokers already implement comprehensive memory management for most operations through several key configurations:
Message Publishing Memory Limits:

- maxMessagePublishBufferSizeInMB (default: 50% of direct memory): Limits memory used for buffering messages during publishing, providing backpressure when producers exceed broker capacity.

Managed Ledger Memory Limits:

- managedLedgerMaxReadsInFlightSizeInMB (default: 0, disabled): Controls memory allocation for concurrent read operations from BookKeeper, preventing excessive memory usage during high read loads. This limit extends to cover buffers that were read from BookKeeper and are waiting in channel outbound buffers to be written to client sockets.
- managedLedgerCacheSizeMB (default: 20% of direct memory): Limits cache memory for recently read ledger entries, ensuring predictable memory usage for read caching. This limit extends to cover buffers that were read from the cache and are waiting in channel outbound buffers to be written to client sockets.

Additional Memory Controls:

- maxConcurrentLookupRequest (default: 50000): Limits concurrent topic lookup requests. The unit of this limit is the number of requests; it is not expressed in memory size.
- maxConcurrentTopicLoadRequest (default: 5000): Controls concurrent topic loading operations. The unit of this limit is the number of requests; it is not expressed in memory size.

These existing limits effectively bound memory usage for message handling, storage operations, and most broker functions. However, there is a significant gap in memory management for topic discovery operations.
Major unbounded memory allocation in Pulsar brokers occurs during topic listing operations:
- CommandGetTopicsOfNamespace / CommandGetTopicsOfNamespaceResponse
- CommandWatchTopicList / CommandWatchTopicListSuccess & CommandWatchTopicUpdate

These operations can allocate arbitrary amounts of memory based on namespace size, with no limits or backpressure mechanisms.
Topic Discovery Commands:
- CommandGetTopicsOfNamespace: Binary protocol command that retrieves all topics within a namespace
- CommandGetTopicsOfNamespaceResponse: Response containing the list of topics
- CommandWatchTopicList: Command to establish a watch for topic list changes
- CommandWatchTopicListSuccess: Initial response confirming watch establishment and returning the current topic list
- CommandWatchTopicUpdate: Notifications sent when topics are added or removed

Current Implementation Flow:
The getTopicsOfNamespace request follows this path:
1. CommandGetTopicsOfNamespace arrives via the binary protocol
2. ServerCnx.handleGetTopicsOfNamespace() (broker) or LookupProxyHandler.handleGetTopicsOfNamespace() (proxy)
3. NamespaceService.getListOfUserTopics() orchestrates:
   - TopicResources
   - TopicList.filterSystemTopic()
   - inProgressQueryUserTopics to prevent duplicate queries

Unlike other broker operations that have memory limits, topic listing operations create unbounded memory allocation scenarios:
Memory Allocation Points:
Scale Impact:
The lack of memory limits for topic listing commands creates the final significant gap in Pulsar's otherwise comprehensive memory management system:
Memory Management Consistency: While all other broker operations have memory limits and backpressure mechanisms, topic listing operations remain unbounded, creating an inconsistent and unpredictable memory profile.
Broker Memory Exhaustion Risk: Large clusters can trigger OutOfMemoryErrors when multiple clients simultaneously request topic lists, causing broker crashes and service disruption despite other memory controls being in place.
Proxy Memory Exhaustion Risk: Proxies are also impacted for CommandGetTopicsOfNamespace since the request is forwarded to a broker and the response is deserialized and reserialized without limits.
Unpredictable Resource Usage: Operators cannot reliably predict or limit total broker or proxy memory consumption due to this unbounded allocation path, undermining capacity planning and resource management.
Performance Degradation: Even without OOM, large topic list operations cause GC pressure and latency spikes affecting all broker operations, counteracting the stability provided by other memory limits.
CommandGetTopicsOfNamespace and CommandWatchTopicList commands

The solution introduces an AsyncDualMemoryLimiter that acts as a memory-aware semaphore for topic listing operations, completing Pulsar's memory management framework:
Memory Tracking: Before processing requests or sending responses, the system estimates memory requirements and acquires permits from the limiter. An initial permit is acquired with a fixed estimate (1KB), then updated to reflect the actual memory usage after the topic list is retrieved.
Dual Memory Pools: Separate tracking for heap memory (topic list assembly) and direct memory (network buffers) with independent limits, since topic listing operations use both types of memory.
Asynchronous Backpressure: When memory limits are reached, requests queue with configurable timeouts rather than failing immediately, providing graceful degradation similar to managedLedgerMaxReadsInFlightSizeInMB behavior. When the queue is completely full, requests are rejected.
Graceful Degradation: The system continues processing within memory limits rather than crashing, with clear metrics indicating when memory-based throttling occurs.
Release Guarantees: Memory permits are released after response transmission completes or on request failure, preventing memory leaks and ensuring accurate memory tracking.
Cancellation Support: The implementation supports cancellation of permit requests when the client connection is closed, preventing unnecessary queueing and resource allocation.
This is an abstraction for a generic asynchronous semaphore. The memory limiter implementation uses this abstraction to implement separate limiters for heap and direct memory.
public interface AsyncSemaphore {
/**
* Acquire permits from the semaphore.
* Returned future completes when permits are available.
* It will complete exceptionally with AsyncSemaphore.PermitAcquireTimeoutException on timeout
* and exceptionally with AsyncSemaphore.PermitAcquireQueueFullException when queue full
*
* @param permits number of permits to acquire
* @param isCancelled supplier that returns true if acquisition should be cancelled
* @return CompletableFuture that completes with permit when available
*/
CompletableFuture<AsyncSemaphorePermit> acquire(long permits, BooleanSupplier isCancelled);
/**
* Update a previously acquired permit to a new permit count, acquiring additional permits or releasing excess permits as needed.
* Returns a future that completes when permits are available.
* It will complete exceptionally with AsyncSemaphore.PermitAcquireTimeoutException on timeout
* and exceptionally with AsyncSemaphore.PermitAcquireQueueFullException when queue full
*
* @param permit previously acquired permit to update
* @param newPermits new number of permits to update to
* @param isCancelled supplier that returns true if update should be cancelled
* @return CompletableFuture that completes with permit when available
*/
CompletableFuture<AsyncSemaphorePermit> update(AsyncSemaphorePermit permit, long newPermits,
BooleanSupplier isCancelled);
/**
* Release previously acquired permit.
* Must be called to prevent permit leaks.
*
* @param permit permit to release
*/
void release(AsyncSemaphorePermit permit);
/**
* Get the number of available permits.
*/
long getAvailablePermits();
/**
* Get the number of acquired permits.
*/
long getAcquiredPermits();
/**
* Get the current size of queued requests.
*/
int getQueueSize();
}
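To make the intended semantics concrete, here is a minimal, self-contained sketch of the acquire/queue/release behaviour described above, including rejection when the wait queue is full. The `SimpleAsyncSemaphore` class is a hypothetical illustration, not the proposed implementation; the real AsyncSemaphore additionally supports timeouts and cancellation via the `isCancelled` supplier.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the AsyncSemaphore semantics: acquire() completes
// immediately when permits are free, queues the request otherwise, and
// rejects outright when the wait queue is full; release() wakes queued
// waiters in FIFO order.
public class SimpleAsyncSemaphore {
    private final long maxPermits;
    private final int maxQueueSize;
    private long acquired;
    private final Queue<Waiter> queue = new ArrayDeque<>();

    private record Waiter(long permits, CompletableFuture<Long> future) {}

    public SimpleAsyncSemaphore(long maxPermits, int maxQueueSize) {
        this.maxPermits = maxPermits;
        this.maxQueueSize = maxQueueSize;
    }

    public synchronized CompletableFuture<Long> acquire(long permits) {
        // FIFO fairness: only bypass the queue when nobody is waiting.
        if (queue.isEmpty() && acquired + permits <= maxPermits) {
            acquired += permits;
            return CompletableFuture.completedFuture(permits);
        }
        if (queue.size() >= maxQueueSize) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("permit acquire queue full"));
        }
        CompletableFuture<Long> f = new CompletableFuture<>();
        queue.add(new Waiter(permits, f));
        return f;
    }

    public synchronized void release(long permits) {
        acquired -= permits;
        // Hand freed permits to queued waiters in FIFO order.
        while (!queue.isEmpty() && acquired + queue.peek().permits() <= maxPermits) {
            Waiter w = queue.poll();
            acquired += w.permits();
            w.future().complete(w.permits());
        }
    }

    public synchronized long getAvailablePermits() { return maxPermits - acquired; }
    public synchronized long getAcquiredPermits() { return acquired; }
    public synchronized int getQueueSize() { return queue.size(); }
}
```

The queue-full rejection mirrors the PermitAcquireQueueFullException behaviour described in the interface contract.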
This is an abstraction for an asynchronous memory semaphore that tracks separate limits for heap and direct memory.
public interface AsyncDualMemoryLimiter {
enum LimitType {
HEAP_MEMORY, // For heap memory allocation
DIRECT_MEMORY // For direct memory allocation
}
/**
* Acquire permits for the specified memory size.
* Returned future completes when memory permits are available.
* It will complete exceptionally with AsyncSemaphore.PermitAcquireTimeoutException on timeout
* and exceptionally with AsyncSemaphore.PermitAcquireQueueFullException when queue full
*
* @param memorySize the size of memory to acquire permits for
* @param limitType the type of memory limit (HEAP_MEMORY or DIRECT_MEMORY)
* @param isCancelled supplier that returns true if acquisition should be cancelled
* @return CompletableFuture that completes with permit when available
*/
CompletableFuture<AsyncDualMemoryLimiterPermit> acquire(long memorySize, LimitType limitType,
BooleanSupplier isCancelled);
/**
* Update a previously acquired permit to a new memory size, acquiring additional permits or releasing excess permits as needed.
* Returns a future that completes when permits are available.
* It will complete exceptionally with AsyncSemaphore.PermitAcquireTimeoutException on timeout
* and exceptionally with AsyncSemaphore.PermitAcquireQueueFullException when queue full
* The provided permit is released when the permits are successfully acquired and the returned updated
* permit replaces the old instance.
*
* @param permit the previously acquired permit to update
* @param newMemorySize the new memory size to update to
* @param isCancelled supplier that returns true if update should be cancelled
* @return CompletableFuture that completes with permit when available
*/
CompletableFuture<AsyncDualMemoryLimiterPermit> update(AsyncDualMemoryLimiterPermit permit, long newMemorySize,
BooleanSupplier isCancelled);
/**
* Release previously acquired permit.
* Must be called to prevent memory permit leaks.
*
* @param permit the permit to release
*/
void release(AsyncDualMemoryLimiterPermit permit);
/**
* Execute the specified function with acquired permits and release the permits after the returned future completes.
* @param memorySize memory size to acquire permits for
* @param limitType memory limit type to acquire permits for
* @param isCancelled supplier that returns true if acquisition should be cancelled
* @param function function to execute with acquired permits
* @param permitAcquireErrorHandler handler invoked when permit acquisition fails
* @return result of the function
* @param <T> type of the CompletableFuture returned by the function
*/
default <T> CompletableFuture<T> withAcquiredPermits(long memorySize, LimitType limitType,
BooleanSupplier isCancelled,
Function<AsyncDualMemoryLimiterPermit,
CompletableFuture<T>> function,
Function<Throwable, CompletableFuture<T>>
permitAcquireErrorHandler) {
return AsyncDualMemoryLimiterUtil.withPermitsFuture(acquire(memorySize, limitType, isCancelled), function,
permitAcquireErrorHandler, this::release);
}
/**
* Execute the specified function with updated permits and release the permits after the returned future completes.
* @param initialPermit initial permit to update
* @param newMemorySize new memory size to update to
* @param isCancelled supplier that returns true if the update should be cancelled
* @param function function to execute with updated permits
* @param permitAcquireErrorHandler handler invoked when permit acquisition fails
* @return result of the function
* @param <T> type of the CompletableFuture returned by the function
*/
default <T> CompletableFuture<T> withUpdatedPermits(AsyncDualMemoryLimiterPermit initialPermit, long newMemorySize,
BooleanSupplier isCancelled,
Function<AsyncDualMemoryLimiterPermit,
CompletableFuture<T>> function,
Function<Throwable, CompletableFuture<T>>
permitAcquireErrorHandler) {
return AsyncDualMemoryLimiterUtil.withPermitsFuture(update(initialPermit, newMemorySize, isCancelled), function,
permitAcquireErrorHandler, this::release);
}
}
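A compact, self-contained sketch of the dual-pool accounting and the acquire → update → release lifecycle may help here. The `DualMemoryLimiterSketch` class and its fail-fast behaviour are illustrative assumptions, not the proposed AsyncDualMemoryLimiterImpl; the real limiter queues requests with timeouts instead of failing immediately.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Illustrative dual-pool accounting: one usage counter per LimitType.
// update() resizes a reservation in place; per the interface contract,
// the old permit remains valid when the new size cannot be granted.
public class DualMemoryLimiterSketch {
    public enum LimitType { HEAP_MEMORY, DIRECT_MEMORY }

    public record Permit(LimitType type, long size) {}

    private final Map<LimitType, Long> limit = new EnumMap<>(LimitType.class);
    private final Map<LimitType, Long> used = new EnumMap<>(LimitType.class);

    public DualMemoryLimiterSketch(long maxHeapBytes, long maxDirectBytes) {
        limit.put(LimitType.HEAP_MEMORY, maxHeapBytes);
        limit.put(LimitType.DIRECT_MEMORY, maxDirectBytes);
        used.put(LimitType.HEAP_MEMORY, 0L);
        used.put(LimitType.DIRECT_MEMORY, 0L);
    }

    public synchronized CompletableFuture<Permit> acquire(long memorySize, LimitType type) {
        if (used.get(type) + memorySize > limit.get(type)) {
            // Sketch fails fast; the proposed limiter queues with a timeout here.
            return CompletableFuture.failedFuture(
                    new IllegalStateException(type + " limit reached"));
        }
        used.merge(type, memorySize, Long::sum);
        return CompletableFuture.completedFuture(new Permit(type, memorySize));
    }

    public synchronized CompletableFuture<Permit> update(Permit permit, long newMemorySize) {
        long delta = newMemorySize - permit.size();
        if (used.get(permit.type()) + delta > limit.get(permit.type())) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException(permit.type() + " limit reached"));
        }
        used.merge(permit.type(), delta, Long::sum);
        return CompletableFuture.completedFuture(new Permit(permit.type(), newMemorySize));
    }

    public synchronized void release(Permit permit) {
        used.merge(permit.type(), -permit.size(), Long::sum);
    }

    public synchronized long getUsed(LimitType type) { return used.get(type); }
}
```

This mirrors the intended call pattern from the proposal: acquire with the fixed initial estimate, update to the actual topic-list size once it is known, then release after the response is written.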
A utility class provides helper methods for common patterns:
public class AsyncDualMemoryLimiterUtil {
/**
* Execute a function with acquired permits and ensure permits are released after completion.
* This method handles the lifecycle of permits - acquisition, usage, and release, including error cases.
*
* @param permitsFuture Future that will complete with the required permits
* @param function Function to execute once permits are acquired that returns a CompletableFuture
* @param permitAcquireErrorHandler Handler for permit acquisition errors that returns a CompletableFuture
* @param releaser Consumer that handles releasing the permits
* @param <T> The type of result returned by the function
* @return CompletableFuture that completes with the result of the function execution
*/
public static <T> CompletableFuture<T> withPermitsFuture(
CompletableFuture<AsyncDualMemoryLimiterPermit>
permitsFuture,
Function<AsyncDualMemoryLimiterPermit,
CompletableFuture<T>> function,
Function<Throwable, CompletableFuture<T>>
permitAcquireErrorHandler,
Consumer<AsyncDualMemoryLimiterPermit> releaser) {
// implementation omitted from PIP document
}
/**
* Acquires permits and writes the command as a response to the channel.
* Releases the permits after the response has been written to the socket or if the write fails.
*
* @param ctx the channel handler context used for writing the response
* @param dualMemoryLimiter the memory limiter used to acquire and release memory permits
* @param isCancelled supplier that indicates if the permit acquisition should be cancelled
* @param command the base command to serialize and write to the channel
* @param permitAcquireErrorHandler handler for errors that occur during permit acquisition
* @return a future that completes when the command has been written to the channel's outbound buffer
*/
public static CompletableFuture<Void> acquireDirectMemoryPermitsAndWriteAndFlush(ChannelHandlerContext ctx,
AsyncDualMemoryLimiter
dualMemoryLimiter,
BooleanSupplier isCancelled,
BaseCommand command,
Consumer<Throwable>
permitAcquireErrorHandler
) {
// implementation omitted from PIP document
}
}
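Since the PIP omits the body of withPermitsFuture, the following is a plausible sketch of the lifecycle it must guarantee, generified over the permit type (the generic signature and helper are assumptions, not the actual implementation): run the function once permits arrive, route acquisition failures to the dedicated handler, and always release the permit exactly once after the work completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

public final class WithPermitsSketch {
    // Sketch of the permit lifecycle helper. Acquisition failures go to the
    // error handler (nothing was acquired, so nothing is released); once the
    // function runs, the permit is released on success, failure, or a
    // synchronous throw.
    public static <P, T> CompletableFuture<T> withPermitsFuture(
            CompletableFuture<P> permitsFuture,
            Function<P, CompletableFuture<T>> function,
            Function<Throwable, CompletableFuture<T>> permitAcquireErrorHandler,
            Consumer<P> releaser) {
        CompletableFuture<T> result = new CompletableFuture<>();
        permitsFuture.whenComplete((permit, acquireError) -> {
            if (acquireError != null) {
                // Timeout or queue-full: delegate instead of failing hard.
                permitAcquireErrorHandler.apply(acquireError)
                        .whenComplete((r, t) -> complete(result, r, t));
                return;
            }
            CompletableFuture<T> work;
            try {
                work = function.apply(permit);
            } catch (Throwable t) {
                releaser.accept(permit); // function threw synchronously
                result.completeExceptionally(t);
                return;
            }
            work.whenComplete((r, t) -> {
                releaser.accept(permit); // always release after the work completes
                complete(result, r, t);
            });
        });
        return result;
    }

    private static <T> void complete(CompletableFuture<T> f, T r, Throwable t) {
        if (t != null) {
            f.completeExceptionally(t);
        } else {
            f.complete(r);
        }
    }
}
```

The key design point is that release happens via the `Consumer` only on the path where acquisition succeeded, which is what makes the "must be called to prevent permit leaks" contract enforceable from one place.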
1. Heap Memory Limiting (Post-Retrieval) - Broker
In ServerCnx.handleGetTopicsOfNamespace, the implementation uses the helper methods:
private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName namespaceName, long requestId,
CommandGetTopicsOfNamespace.Mode mode,
Optional<String> topicsPattern, Optional<String> topicsHash,
Semaphore lookupSemaphore) {
BooleanSupplier isPermitRequestCancelled = () -> !ctx().channel().isActive();
maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE,
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> {
return getBrokerService().pulsar().getNamespaceService().getListOfUserTopics(namespaceName, mode)
.thenAccept(topics -> {
long actualSize = topics.stream().mapToInt(String::length).sum();
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits -> {
boolean filterTopics = false;
// filter system topic
List<String> filteredTopics = topics;
if (enableSubscriptionPatternEvaluation && topicsPattern.isPresent()) {
if (topicsPattern.get().length() <= maxSubscriptionPatternLength) {
filterTopics = true;
filteredTopics = TopicList.filterTopics(filteredTopics, topicsPattern.get(),
topicsPatternImplementation);
} else {
log.info("[{}] Subscription pattern provided [{}] was longer than "
+ "maximum {}.", remoteAddress, topicsPattern.get(),
maxSubscriptionPatternLength);
}
}
String hash = TopicList.calculateHash(filteredTopics);
boolean hashUnchanged = topicsHash.isPresent() && topicsHash.get().equals(hash);
if (hashUnchanged) {
filteredTopics = Collections.emptyList();
}
if (log.isDebugEnabled()) {
log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace "
+ "[//{}] by {}, size:{}", remoteAddress, namespace,
requestId,
topics.size());
}
commandSender.sendGetTopicsOfNamespaceResponse(filteredTopics, hash, filterTopics,
!hashUnchanged, requestId, ex -> {
log.warn("[{}] Failed to acquire direct memory permits for "
+ "GetTopicsOfNamespace: {}", remoteAddress, ex.getMessage());
commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
"Cannot acquire permits for direct memory");
});
return CompletableFuture.completedFuture(null);
}, t -> {
log.warn("[{}] Failed to acquire heap memory permits for "
+ "GetTopicsOfNamespace: {}", remoteAddress, t.getMessage());
writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests,
"Failed due to heap memory limit exceeded"));
return CompletableFuture.completedFuture(null);
});
}).whenComplete((__, ___) -> {
lookupSemaphore.release();
}).exceptionally(ex -> {
log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", remoteAddress,
namespace, requestId);
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)),
ex.getMessage());
return null;
});
}, t -> {
log.warn("[{}] Failed to acquire initial heap memory permits for GetTopicsOfNamespace: {}",
remoteAddress, t.getMessage());
writeAndFlush(Commands.newError(requestId, ServerError.TooManyRequests,
"Failed due to heap memory limit exceeded"));
lookupSemaphore.release();
return CompletableFuture.completedFuture(null);
});
}
2. Direct Memory Limiting (Pre-Serialization) - Broker
Modified PulsarCommandSenderImpl:
@Override
public void sendGetTopicsOfNamespaceResponse(List<String> topics, String topicsHash,
boolean filtered, boolean changed, long requestId,
Consumer<Throwable> permitAcquireErrorHandler) {
BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, topicsHash,
filtered, changed, requestId);
safeIntercept(command, cnx);
acquireDirectMemoryPermitsAndWriteAndFlush(cnx.ctx(), maxTopicListInFlightLimiter, () -> !cnx.isActive(),
command, permitAcquireErrorHandler);
}
The utility method implementation:
public static CompletableFuture<Void> acquireDirectMemoryPermitsAndWriteAndFlush(ChannelHandlerContext ctx,
AsyncDualMemoryLimiter
dualMemoryLimiter,
BooleanSupplier isCancelled,
BaseCommand command,
Consumer<Throwable>
permitAcquireErrorHandler
) {
// Calculate serialized size before acquiring permits
int serializedSize = command.getSerializedSize();
// Acquire permits
return dualMemoryLimiter.acquire(serializedSize, AsyncDualMemoryLimiter.LimitType.DIRECT_MEMORY, isCancelled)
.whenComplete((permits, t) -> {
if (t != null) {
permitAcquireErrorHandler.accept(t);
return;
}
try {
// Serialize the response
ByteBuf outBuf = Commands.serializeWithPrecalculatedSerializedSize(command, serializedSize);
// Write the response
ctx.writeAndFlush(outBuf).addListener(future -> {
// Release permits after the response has been written to the socket
dualMemoryLimiter.release(permits);
});
} catch (Throwable e) {
// Return permits if an exception occurs before writeAndFlush is called successfully
dualMemoryLimiter.release(permits);
throw e;
}
}).thenApply(__ -> null);
}
3. Watch Command Memory Control - Broker
Similar memory limiting patterns apply to watch commands in TopicListService:
public void sendTopicListUpdate(long watcherId, String topicsHash,
List<String> deletedTopics, List<String> newTopics) {
connection.getCommandSender().sendWatchTopicListUpdate(
watcherId, newTopics, deletedTopics, topicsHash,
t -> {
log.warn("[{}] Cannot acquire direct memory permits for sending topic list update",
connection.toString(), t);
});
}
4. Proxy Memory Control
On the Pulsar Proxy side in LookupProxyHandler:
private void internalPerformGetTopicsOfNamespace(long clientRequestId, String namespaceName, ClientCnx clientCnx,
ByteBuf command, long requestId) {
BooleanSupplier isPermitRequestCancelled = () -> !proxyConnection.ctx().channel().isActive();
maxTopicListInFlightLimiter.withAcquiredPermits(INITIAL_TOPIC_LIST_HEAP_PERMITS_SIZE,
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> {
return clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", clientAddress, namespaceName,
t.getMessage());
writeAndFlush(Commands.newError(clientRequestId, getServerError(t), t.getMessage()));
} else {
long actualSize =
r.getNonPartitionedOrPartitionTopics().stream().mapToInt(String::length).sum();
maxTopicListInFlightLimiter.withUpdatedPermits(initialPermits, actualSize,
isPermitRequestCancelled, permits -> {
return handleWritingGetTopicsResponse(clientRequestId, r, isPermitRequestCancelled);
}, t2 -> {
log.warn("[{}] Failed to acquire actual heap memory permits for "
+ "GetTopicsOfNamespace: {}", clientAddress, t2.getMessage());
writeAndFlush(Commands.newError(clientRequestId, ServerError.TooManyRequests,
"Failed due to heap memory limit exceeded"));
return CompletableFuture.completedFuture(null);
});
}
}).thenApply(__ -> null);
}, t -> {
log.warn("[{}] Failed to acquire initial heap memory permits for GetTopicsOfNamespace: {}",
clientAddress, t.getMessage());
writeAndFlush(Commands.newError(clientRequestId, ServerError.TooManyRequests,
"Failed due to heap memory limit exceeded"));
return CompletableFuture.completedFuture(null);
}).exceptionally(ex -> {
writeAndFlush(Commands.newError(clientRequestId, getServerError(ex), ex.getMessage()));
return null;
});
}
private CompletableFuture<Void> handleWritingGetTopicsResponse(long clientRequestId, GetTopicsResult r,
BooleanSupplier isCancelled) {
BaseCommand responseCommand = Commands.newGetTopicsOfNamespaceResponseCommand(
r.getNonPartitionedOrPartitionTopics(), r.getTopicsHash(), r.isFiltered(),
r.isChanged(), clientRequestId);
return acquireDirectMemoryPermitsAndWriteAndFlush(proxyConnection.ctx(), maxTopicListInFlightLimiter,
isCancelled, responseCommand, t -> {
log.warn("[{}] Failed to acquire actual direct memory permits for GetTopicsOfNamespace: {}",
clientAddress, t.getMessage());
writeAndFlush(Commands.newError(clientRequestId, ServerError.TooManyRequests,
"Failed due to direct memory limit exceeded"));
});
}
The TopicListMemoryLimiter class extends AsyncDualMemoryLimiterImpl and adds Prometheus and OpenTelemetry metrics:
public class TopicListMemoryLimiter extends AsyncDualMemoryLimiterImpl {
private final CollectorRegistry collectorRegistry;
private final Gauge heapMemoryUsedBytes;
private final Gauge heapMemoryLimitBytes;
// ... other Prometheus metrics
private final ObservableDoubleGauge otelHeapMemoryUsedGauge;
// ... other OpenTelemetry metrics
public TopicListMemoryLimiter(CollectorRegistry collectorRegistry,
String prometheusPrefix,
Meter openTelemetryMeter,
long maxHeapMemory, int maxHeapQueueSize,
long heapTimeoutMillis, long maxDirectMemory,
int maxDirectQueueSize, long directTimeoutMillis) {
super(maxHeapMemory, maxHeapQueueSize, heapTimeoutMillis,
maxDirectMemory, maxDirectQueueSize, directTimeoutMillis);
// Initialize Prometheus metrics
this.heapMemoryUsedBytes = Gauge.build(
prometheusPrefix + "topic_list_heap_memory_used_bytes",
"Current heap memory used by topic listings")
.create()
.setChild(new Gauge.Child() {
@Override
public double get() {
return getLimiter(LimitType.HEAP_MEMORY).getAcquiredPermits();
}
})
.register(collectorRegistry);
// Initialize OpenTelemetry metrics
this.otelHeapMemoryUsedGauge = openTelemetryMeter
.gaugeBuilder("topic.list.heap.memory.used")
.setUnit("By")
.setDescription("Current heap memory used by topic listings")
.buildWithCallback(measurement -> {
measurement.record(getLimiter(LimitType.HEAP_MEMORY).getAcquiredPermits());
});
// ... initialize other metrics
}
@Override
protected void recordHeapWaitTime(long waitTimeNanos) {
if (waitTimeNanos == Long.MAX_VALUE) {
heapTimeoutTotal.inc();
otelHeapTimeoutTotal.add(1);
} else {
heapWaitTimeMs.observe(TimeUnit.NANOSECONDS.toMillis(waitTimeNanos));
otelHeapWaitTime.record(waitTimeNanos / 1_000_000_000.0d);
}
}
@Override
protected void recordDirectWaitTime(long waitTimeNanos) {
// Similar implementation for direct memory
}
}
broker.conf/proxy.conf additions to complete the memory management configuration set:
# Maximum heap memory for inflight topic list operations (MB)
# Default: 100 MB (supports ~1M topic names assuming 100 bytes each)
maxTopicListInFlightHeapMemSizeMB=100
# Maximum direct memory for inflight topic list responses (MB)
# Default: 100 MB (network buffers for serialized responses)
maxTopicListInFlightDirectMemSizeMB=100
# Timeout for acquiring heap memory permits (milliseconds)
# Default: 25000 (25 seconds)
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000
# Maximum queue size for heap memory permit requests
# Default: 10000 (prevent unbounded queueing)
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000
# Timeout for acquiring direct memory permits (milliseconds)
# Default: 25000 (25 seconds)
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000
# Maximum queue size for direct memory permit requests
# Default: 10000 (prevent unbounded queueing)
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000
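For illustration, a small holder showing how the MB-denominated settings above translate into byte-denominated limiter arguments, and how the sizing comment works out (100 MB at ~100 bytes per topic name supports roughly one million names). The `TopicListLimiterSettings` class and its method names are assumptions for illustration, not actual Pulsar configuration code.

```java
// Hypothetical settings holder; field names mirror the broker.conf keys above.
public class TopicListLimiterSettings {
    int maxTopicListInFlightHeapMemSizeMB = 100;
    int maxTopicListInFlightDirectMemSizeMB = 100;
    long heapPermitsAcquireTimeoutMillis = 25_000;
    int heapPermitsAcquireQueueSize = 10_000;
    long directPermitsAcquireTimeoutMillis = 25_000;
    int directPermitsAcquireQueueSize = 10_000;

    // Limiters account in bytes, so MB settings are scaled up.
    long maxHeapMemoryBytes() {
        return maxTopicListInFlightHeapMemSizeMB * 1024L * 1024L;
    }

    long maxDirectMemoryBytes() {
        return maxTopicListInFlightDirectMemSizeMB * 1024L * 1024L;
    }

    // Rough capacity estimate behind the default: bytes / average name length.
    long approxTopicNameCapacity(int avgTopicNameBytes) {
        return maxHeapMemoryBytes() / avgTopicNameBytes;
    }
}
```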
New metrics under pulsar_broker_topic_list_/pulsar_proxy_topic_list_ prefix, complementing existing memory metrics:
| Metric Name | Type | Description | Labels |
|---|---|---|---|
| heap_memory_used_bytes | Gauge | Current heap memory used by topic listings | cluster |
| heap_memory_limit_bytes | Gauge | Configured heap memory limit | cluster |
| direct_memory_used_bytes | Gauge | Current direct memory used by topic listings | cluster |
| direct_memory_limit_bytes | Gauge | Configured direct memory limit | cluster |
| heap_queue_size | Gauge | Current heap memory limiter queue size | cluster |
| heap_queue_max_size | Gauge | Maximum heap memory limiter queue size | cluster |
| direct_queue_size | Gauge | Current direct memory limiter queue size | cluster |
| direct_queue_max_size | Gauge | Maximum direct memory limiter queue size | cluster |
| heap_wait_time_ms | Summary | Wait time for heap memory permits (quantiles: 0.5, 0.95, 0.99, 1.0) | cluster |
| direct_wait_time_ms | Summary | Wait time for direct memory permits (quantiles: 0.5, 0.95, 0.99, 1.0) | cluster |
| heap_timeout_total | Counter | Total heap memory permit timeouts | cluster |
| direct_timeout_total | Counter | Total direct memory permit timeouts | cluster |
OpenTelemetry equivalents are also provided with similar naming under the topic.list.* namespace.
No changes to REST API.
No protocol changes. Existing commands continue to work with added server-side memory limits and backpressure.
Operators should monitor the following metrics alongside existing memory management metrics and set up alerts:
Memory Utilization Alert:
heap_memory_used_bytes / heap_memory_limit_bytes > 0.8

Queue Saturation Alert:
heap_queue_size / heap_queue_max_size > 0.9

Timeout Rate Alert:
rate(heap_timeout_total[5m]) > 1

P99 Wait Time Alert:
heap_wait_time_ms{quantile="0.99"} > 10000

These alerts should be configured alongside existing memory alerts for managedLedgerCacheSizeMB, maxMessagePublishBufferSizeInMB, and other memory limits to provide comprehensive memory management visibility.
The memory limiting mechanism introduces new denial-of-service protections:
Resource Exhaustion Protection: The limits prevent bad clients from triggering OOM by requesting large topic lists repeatedly, completing the broker's defense against memory-based attacks.
Fair Queueing: The queue size limits prevent bad clients from monopolizing memory permits and blocking legitimate requests.
Cancellation Support: The implementation includes cancellation support to prevent resource waste when clients disconnect, preventing malicious clients from queueing many requests and then disconnecting.
Multi-tenancy Isolation: Consider per-tenant memory limits in future iterations to prevent one tenant from consuming all available topic listing memory permits, similar to how other memory limits could benefit from tenant isolation.
managedLedgerCacheSizeMB or similar