docs/design/jet/019-memory-management.md
Since: 5.0
Memory management applies to operations that accumulate a potentially large number of records, such as sorting and grouping.
Without a means to control memory consumption, these operations could
lead to OutOfMemoryErrors and destabilize the entire cluster.
Two options were considered:

1. Control very precisely (with byte-level granularity) how much memory each operator uses at any given time. All operators on a member share a memory pool that they acquire memory from and release it to.
2. Control the number of records Processors can hold on to. A record can
   mean different things for different operations: for sorting it's an
   individual item, for grouping it's a distinct key. The limit applies
   to each Processor instance separately, hence the effective limit on the
   records accumulated by each cluster member depends on the vertex's
   localParallelism and the number of jobs in the cluster.

The second option can still lead to OutOfMemoryErrors due to variable object sizes, since the same number of records can occupy very different amounts of memory. We've chosen it nevertheless because it's simple. We might reconsider if it turns out that the current solution is not enough.
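The per-member effect of the second option's per-Processor limit can be estimated roughly. This is an upper bound under assumed notation that does not come from the design itself. A member runs jobs $j$, and each job has a set $V_j$ of accumulating vertices. Vertex $v$ has local parallelism $p_v$ (so $p_v$ Processor instances run on the member), and each Processor of job $j$ may hold at most $L_j$ records:

$$
R_{\text{member}} \le \sum_{j} \left( L_j \sum_{v \in V_j} p_v \right)
$$

For example, take two jobs with $L_j = 10^6$, each having a single accumulating vertex with $p_v = 8$. A member may then hold up to $2 \cdot 8 \cdot 10^6 = 1.6 \cdot 10^7$ records. This bounds the record count but not the bytes those records occupy.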
Allow configuring maxProcessorAccumulatedRecords for the member:
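```java
import static com.hazelcast.internal.util.Preconditions.checkPositive;

public class InstanceConfig {
    // Member-level default: effectively no limit.
    private long maxProcessorAccumulatedRecords = Long.MAX_VALUE;

    public void setMaxProcessorAccumulatedRecords(long maxProcessorAccumulatedRecords) {
        // Only positive values are accepted at the member level.
        checkPositive(maxProcessorAccumulatedRecords, "maxProcessorAccumulatedRecords must be a positive number");
        this.maxProcessorAccumulatedRecords = maxProcessorAccumulatedRecords;
    }
}
```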
```yaml
jet:
  instance:
    max-processor-accumulated-records: 1000000000
```
```xml
<jet>
    <max-processor-accumulated-records>1000000000</max-processor-accumulated-records>
</jet>
```
maxProcessorAccumulatedRecords can also be configured for the job:
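```java
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import static com.hazelcast.internal.util.Preconditions.checkTrue;

public class JobConfig implements IdentifiedDataSerializable {
    // -1 means "not set": the member's InstanceConfig value applies.
    private long maxProcessorAccumulatedRecords = -1;

    public JobConfig setMaxProcessorAccumulatedRecords(long maxProcessorAccumulatedRecords) {
        checkTrue(maxProcessorAccumulatedRecords > 0 || maxProcessorAccumulatedRecords == -1,
                "maxProcessorAccumulatedRecords must be a positive number or -1");
        this.maxProcessorAccumulatedRecords = maxProcessorAccumulatedRecords;
        return this;
    }
    // IdentifiedDataSerializable methods and other members omitted.
}
```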
To preserve backward compatibility, the default value of
maxProcessorAccumulatedRecords is -1 for JobConfig and
Long.MAX_VALUE for InstanceConfig. JobConfig's value, if set
(that is, not -1), takes precedence over InstanceConfig's.
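The precedence rule can be sketched as a small helper. This assumes -1 is the only "not set" value. The class and method names are hypothetical and not part of the Jet API.

```java
/**
 * Hypothetical helper illustrating how the job-level and member-level
 * settings combine. Not the actual Hazelcast Jet implementation.
 */
final class AccumulationLimits {

    /** Sentinel meaning "not set on the JobConfig" (the JobConfig default). */
    static final long NOT_SET = -1;

    private AccumulationLimits() {
    }

    /**
     * Returns the limit a Processor of the job should use: the JobConfig
     * value if it was set, otherwise the InstanceConfig value.
     */
    static long effectiveLimit(long jobValue, long instanceValue) {
        return jobValue == NOT_SET ? instanceValue : jobValue;
    }
}
```

With both defaults in place the effective limit is Long.MAX_VALUE, which in practice means no limit, so jobs that configure nothing behave as before. A job-level value wins even when it is larger than the member-level one.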
Processors can read the limit via
ProcessorMetaSupplier.Context.maxProcessorAccumulatedRecords().
Each Processor tracks the number of records it has accumulated and throws
com.hazelcast.jet.impl.memory.AccumulationLimitExceededException if the
limit is exceeded, which fails the job.
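The per-Processor tracking can be sketched in a minimal, self-contained form. It assumes a list buffer for sorting and a hash map for grouping, and shows how the meaning of a record differs: the sorting buffer counts every item, while the grouper counts only distinct keys. All class names are hypothetical. LimitExceededException merely stands in for AccumulationLimitExceededException, whose constructors are not described here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for com.hazelcast.jet.impl.memory.AccumulationLimitExceededException.
class LimitExceededException extends RuntimeException {
    LimitExceededException(long limit) {
        super("Exceeded the limit of " + limit + " accumulated records");
    }
}

// Sorting: every accumulated item counts as one record.
final class SortingBuffer<T extends Comparable<? super T>> {
    private final long maxRecords;
    private final List<T> items = new ArrayList<>();

    SortingBuffer(long maxRecords) {
        this.maxRecords = maxRecords;
    }

    void accept(T item) {
        // Adding one more item would exceed the limit.
        if (items.size() >= maxRecords) {
            throw new LimitExceededException(maxRecords);
        }
        items.add(item);
    }

    List<T> sorted() {
        List<T> copy = new ArrayList<>(items);
        Collections.sort(copy);
        return copy;
    }
}

// Grouping: only distinct keys count; a repeated key updates its existing entry.
final class CountingGrouper<K> {
    private final long maxRecords;
    private final Map<K, Long> counts = new HashMap<>();

    CountingGrouper(long maxRecords) {
        this.maxRecords = maxRecords;
    }

    void accept(K key) {
        // Only a key not seen before adds a record.
        if (!counts.containsKey(key) && counts.size() >= maxRecords) {
            throw new LimitExceededException(maxRecords);
        }
        counts.merge(key, 1L, Long::sum);
    }

    Map<K, Long> result() {
        return Collections.unmodifiableMap(counts);
    }
}
```

In a real Processor, the limit would come from ProcessorMetaSupplier.Context.maxProcessorAccumulatedRecords(). The thrown exception would then propagate out of the Processor and fail the job.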