Flink exposes a metric system that allows gathering and exposing metrics to external systems.
You can access the metric system from any user function that extends [RichFunction]({{< ref "docs/dev/datastream/user_defined_functions" >}}#rich-functions) by calling getRuntimeContext().getMetricGroup().
This method returns a MetricGroup object on which you can create and register new metrics.
Flink supports Counters, Gauges, Histograms and Meters.
A Counter is used to count something. The current value can be in- or decremented using inc()/inc(long n) or dec()/dec(long n).
You can create and register a Counter by calling counter(String name) on a MetricGroup.
{{< tabs "9612d275-bdda-4322-a01f-ae6da805e917" >}} {{< tab "Java" >}}
public class MyMapper extends RichMapFunction<String, String> {
    private transient Counter counter;

    @Override
    public void open(OpenContext ctx) {
        this.counter = getRuntimeContext()
            .getMetricGroup()
            .counter("myCounter");
    }

    @Override
    public String map(String value) throws Exception {
        this.counter.inc();
        return value;
    }
}
{{< /tab >}} {{< tab "Python" >}}
class MyMapper(MapFunction):
    def __init__(self):
        self.counter = None

    def open(self, runtime_context: RuntimeContext):
        self.counter = runtime_context \
            .get_metrics_group() \
            .counter("my_counter")

    def map(self, value: str):
        self.counter.inc()
        return value
{{< /tab >}} {{< /tabs >}}
Alternatively you can also use your own Counter implementation:
{{< tabs "e2de1ea4-fad3-4619-b4ba-fe41af1bd25f" >}} {{< tab "Java" >}}
public class MyMapper extends RichMapFunction<String, String> {
    private transient Counter counter;

    @Override
    public void open(OpenContext ctx) {
        this.counter = getRuntimeContext()
            .getMetricGroup()
            .counter("myCustomCounter", new CustomCounter());
    }

    @Override
    public String map(String value) throws Exception {
        this.counter.inc();
        return value;
    }
}
{{< /tab >}} {{< tab "Python" >}}
Still not supported in Python API.
{{< /tab >}} {{< /tabs >}}
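The `CustomCounter` class used above is not defined in this section. As a rough sketch of the state such an implementation has to keep, the following plain-Python model mirrors the inc()/inc(long n) and dec()/dec(long n) operations described earlier. It is an illustration only: the class and the `get_count` accessor are assumptions made for this example, and the class cannot be registered with PyFlink.

```python
class CustomCounter:
    """Illustrative counter model; not a Flink or PyFlink class."""

    def __init__(self, start: int = 0) -> None:
        self._count = start

    def inc(self, n: int = 1) -> None:
        # Covers both inc() and inc(n).
        self._count += n

    def dec(self, n: int = 1) -> None:
        # Covers both dec() and dec(n).
        self._count -= n

    def get_count(self) -> int:
        return self._count


counter = CustomCounter()
counter.inc()      # 1
counter.inc(4)     # 5
counter.dec(2)     # 3
print(counter.get_count())  # prints 3
```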
A Gauge provides a value of any type on demand. In order to use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface.
There is no restriction for the type of the returned value.
You can register a gauge by calling gauge(String name, Gauge gauge) on a MetricGroup.
{{< tabs "1457e63d-28c4-4dbd-b742-582fe88706bf" >}} {{< tab "Java" >}}
public class MyMapper extends RichMapFunction<String, String> {
    private transient int valueToExpose = 0;

    @Override
    public void open(OpenContext ctx) {
        getRuntimeContext()
            .getMetricGroup()
            .gauge("MyGauge", new Gauge<Integer>() {
                @Override
                public Integer getValue() {
                    return valueToExpose;
                }
            });
    }

    @Override
    public String map(String value) throws Exception {
        valueToExpose++;
        return value;
    }
}
{{< /tab >}} {{< tab "Python" >}}
class MyMapper(MapFunction):
    def __init__(self):
        self.value_to_expose = 0

    def open(self, runtime_context: RuntimeContext):
        runtime_context \
            .get_metrics_group() \
            .gauge("my_gauge", lambda: self.value_to_expose)

    def map(self, value: str):
        self.value_to_expose += 1
        return value
{{< /tab >}} {{< /tabs >}}
Note that reporters will turn the exposed object into a String, which means that a meaningful toString() implementation is required.
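To illustrate why the string representation matters, here is a small stand-alone sketch. `QueueStats` and `report` are hypothetical names invented for this example; `report` only imitates what a reporter does (evaluate the gauge, then stringify the result) and is not part of any Flink API.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class QueueStats:
    """Hypothetical value type exposed through a gauge."""
    pending: int
    capacity: int

    def __str__(self) -> str:
        # Without this method the reporter would see the generated repr instead.
        return f"{self.pending}/{self.capacity}"


def report(gauge: Callable[[], Any]) -> str:
    """Stand-in for a reporter: evaluates the gauge on demand and stringifies it."""
    return str(gauge())


stats = QueueStats(pending=3, capacity=10)
gauge = lambda: stats   # the value is read when the gauge is evaluated
stats.pending = 7       # changed after the gauge was created
print(report(gauge))    # prints 7/10
```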
A Histogram measures the distribution of long values.
You can register one by calling histogram(String name, Histogram histogram) on a MetricGroup.
{{< tabs "f00bd80e-ce30-497c-aa1f-89f3b5f653a0" >}} {{< tab "Java" >}}
public class MyMapper extends RichMapFunction<Long, Long> {
    private transient Histogram histogram;

    @Override
    public void open(OpenContext ctx) {
        this.histogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("myHistogram", new MyHistogram());
    }

    @Override
    public Long map(Long value) throws Exception {
        this.histogram.update(value);
        return value;
    }
}
{{< /tab >}} {{< tab "Python" >}}
Still not supported in Python API.
{{< /tab >}} {{< /tabs >}}
Flink does not provide a default implementation for Histogram, but offers a {{< gh_link file="flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java" name="Wrapper" >}} that allows usage of Codahale/DropWizard histograms.
To use this wrapper add the following dependency in your pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>{{< version >}}</version>
</dependency>
You can then register a Codahale/DropWizard histogram like this:
{{< tabs "bb87937e-afd3-40c3-9ef2-95bce0cbaeb7" >}} {{< tab "Java" >}}
public class MyMapper extends RichMapFunction<Long, Long> {
    private transient Histogram histogram;

    @Override
    public void open(OpenContext ctx) {
        com.codahale.metrics.Histogram dropwizardHistogram =
            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

        this.histogram = getRuntimeContext()
            .getMetricGroup()
            .histogram("myHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));
    }

    @Override
    public Long map(Long value) throws Exception {
        this.histogram.update(value);
        return value;
    }
}
{{< /tab >}} {{< tab "Python" >}}
Still not supported in Python API.
{{< /tab >}} {{< /tabs >}}
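For intuition about what a histogram backed by a sliding window keeps track of, the following plain-Python sketch retains only the most recent values, in the spirit of the `SlidingWindowReservoir(500)` used above. It is a simplified model, not the DropWizard or Flink implementation: the class name, the method names, and the nearest-rank quantile rule are assumptions made for this example.

```python
import math
from collections import deque


class SlidingWindowHistogram:
    """Illustrative histogram over the most recent `window_size` values."""

    def __init__(self, window_size: int) -> None:
        self._window = deque(maxlen=window_size)  # old values fall out automatically
        self._count = 0                           # total number of updates ever seen

    def update(self, value: int) -> None:
        self._window.append(value)
        self._count += 1

    def get_count(self) -> int:
        return self._count

    def quantile(self, q: float) -> int:
        """Nearest-rank quantile over the retained window."""
        if not self._window:
            raise ValueError("no values recorded yet")
        ordered = sorted(self._window)
        rank = max(1, math.ceil(q * len(ordered)))
        return ordered[rank - 1]


histogram = SlidingWindowHistogram(window_size=3)
for value in [1, 2, 3, 4, 5]:
    histogram.update(value)

print(histogram.get_count())    # 5 updates in total
print(histogram.quantile(0.5))  # median of the retained window [3, 4, 5] -> 4
```

Note that the update count keeps growing while the statistics only reflect the retained window.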
A Meter measures an average throughput. An occurrence of an event can be registered with the markEvent() method. The occurrence of multiple events at the same time can be registered with the markEvent(long n) method.
You can register a meter by calling meter(String name, Meter meter) on a MetricGroup.
{{< tabs "39036212-06d1-4efe-bab3-d821aa11f6fe" >}} {{< tab "Java" >}}
public class MyMapper extends RichMapFunction<Long, Long> {
    private transient Meter meter;

    @Override
    public void open(OpenContext ctx) {
        this.meter = getRuntimeContext()
            .getMetricGroup()
            .meter("myMeter", new MyMeter());
    }

    @Override
    public Long map(Long value) throws Exception {
        this.meter.markEvent();
        return value;
    }
}
{{< /tab >}} {{< tab "Python" >}}
class MyMapperMeter(MapFunction):
    def __init__(self):
        self.meter = None

    def open(self, runtime_context: RuntimeContext):
        # an average rate of events per second over 120s, default is 60s.
        self.meter = runtime_context \
            .get_metrics_group() \
            .meter("my_meter", time_span_in_seconds=120)

    def map(self, value: str):
        self.meter.mark_event()
        return value
{{< /tab >}} {{< /tabs >}}
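To make "average rate of events per second over a time span" concrete, here is a minimal stand-alone sketch of a meter. It is one possible way to model the idea and is not how Flink computes the rate internally; `SlidingMeter`, its `clock` parameter, and `get_rate` are names assumed for this example. The clock is injectable so the behaviour can be shown deterministically.

```python
import time
from collections import deque
from typing import Callable, Deque, Tuple


class SlidingMeter:
    """Illustrative meter: events per second averaged over a fixed time span."""

    def __init__(self, time_span_in_seconds: int = 60,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._span = time_span_in_seconds
        self._clock = clock
        self._events: Deque[Tuple[float, int]] = deque()  # (timestamp, event count)
        self._count = 0

    def mark_event(self, n: int = 1) -> None:
        self._events.append((self._clock(), n))
        self._count += n

    def get_count(self) -> int:
        return self._count

    def get_rate(self) -> float:
        # Drop events that are older than the time span, then average the rest.
        cutoff = self._clock() - self._span
        while self._events and self._events[0][0] <= cutoff:
            self._events.popleft()
        return sum(n for _, n in self._events) / self._span


fake_now = [0.0]  # hypothetical clock controlled by the example
meter = SlidingMeter(time_span_in_seconds=120, clock=lambda: fake_now[0])
meter.mark_event()        # one event at t = 0 s
fake_now[0] = 60.0
meter.mark_event(239)     # 239 events at t = 60 s
print(meter.get_rate())   # 240 events / 120 s -> 2.0
```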
Flink offers a {{< gh_link file="flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java" name="Wrapper" >}} that allows usage of Codahale/DropWizard meters.
To use this wrapper add the following dependency in your pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>{{< version >}}</version>
</dependency>
You can then register a Codahale/DropWizard meter like this:
{{< tabs "9cc57972-cf86-401e-a394-ee97efd816f2" >}} {{< tab "Java" >}}
public class MyMapper extends RichMapFunction<Long, Long> {
    private transient Meter meter;

    @Override
    public void open(OpenContext ctx) {
        com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();

        this.meter = getRuntimeContext()
            .getMetricGroup()
            .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
    }

    @Override
    public Long map(Long value) throws Exception {
        this.meter.markEvent();
        return value;
    }
}
{{< /tab >}} {{< tab "Python" >}}
Still not supported in Python API.
{{< /tab >}} {{< /tabs >}}
Every metric is assigned an identifier and a set of key-value pairs under which the metric will be reported.
The identifier is based on 3 components: a user-defined name when registering the metric, an optional user-defined scope and a system-provided scope.
For example, if A.B is the system scope, C.D the user scope and E the name, then the identifier for the metric will be A.B.C.D.E.
You can configure which delimiter to use for the identifier (default: .) by setting the metrics.scope.delimiter key in the [Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}).
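The composition rule can be written down in a few lines. The function below is only a sketch of the rule as stated here (system scope, then user scope, then name, joined by the delimiter); `metric_identifier` is a hypothetical helper, not a Flink API.

```python
from typing import Sequence


def metric_identifier(system_scope: Sequence[str],
                      user_scope: Sequence[str],
                      name: str,
                      delimiter: str = ".") -> str:
    """Join system scope, user scope and metric name with the delimiter."""
    return delimiter.join([*system_scope, *user_scope, name])


print(metric_identifier(["A", "B"], ["C", "D"], "E"))                 # prints A.B.C.D.E
print(metric_identifier(["A", "B"], ["C", "D"], "E", delimiter="_"))  # prints A_B_C_D_E
```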
You can define a user scope by calling MetricGroup#addGroup(String name), MetricGroup#addGroup(int name) or MetricGroup#addGroup(String key, String value).
These methods affect what MetricGroup#getMetricIdentifier and MetricGroup#getScopeComponents return.
{{< tabs "8ba6943e-ab5d-45ce-8a73-091a01370eaf" >}} {{< tab "Java" >}}
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetrics")
    .counter("myCounter");

counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue")
    .counter("myCounter");
{{< /tab >}} {{< tab "Python" >}}
counter = runtime_context \
    .get_metrics_group() \
    .add_group("my_metrics") \
    .counter("my_counter")

counter = runtime_context \
    .get_metrics_group() \
    .add_group("my_metrics_key", "my_metrics_value") \
    .counter("my_counter")
{{< /tab >}} {{< /tabs >}}
The system scope contains context information about the metric, for example in which task it was registered or what job that task belongs to.
Which context information should be included can be configured by setting the following keys in the [Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}). Each of these keys expects a format string that may contain constants (e.g. "taskmanager") and variables (e.g. "<task_id>"), which will be replaced at runtime.
metrics.scope.jm
metrics.scope.jm-job
metrics.scope.tm
metrics.scope.tm-job
metrics.scope.task
metrics.scope.operator
There are no restrictions on the number or order of variables. Variables are case sensitive.
The default scope for operator metrics will result in an identifier akin to localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
If you also want to include the task name but omit the task manager information you can specify the following format:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
This could create the identifier localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric.
Note that for this format string an identifier clash can occur should the same job be run multiple times concurrently, which can lead to inconsistent metric data. As such, it is advised to either use format strings that provide a certain degree of uniqueness by including IDs (e.g. <job_id>) or assign unique names to jobs and operators.
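The substitution and the clash described above can be reproduced with a small stand-alone sketch. `expand_scope_format` is a hypothetical helper written for this example and not Flink's implementation; leaving unknown variables untouched and the job IDs `job-a` and `job-b` are assumptions of the sketch.

```python
import re

_VARIABLE = re.compile(r"<[^<>]+>")


def expand_scope_format(scope_format: str, variables: dict) -> str:
    """Replace each <variable> in the format string; lookups are case sensitive."""
    return _VARIABLE.sub(lambda m: variables.get(m.group(0), m.group(0)), scope_format)


run_a = {
    "<host>": "localhost",
    "<job_id>": "job-a",
    "<job_name>": "MyJob",
    "<task_name>": "MySource_->_MyOperator",
    "<operator_name>": "MyOperator",
    "<subtask_index>": "0",
}
run_b = {**run_a, "<job_id>": "job-b"}  # same job submitted a second time

fmt = "<host>.<job_name>.<task_name>.<operator_name>.<subtask_index>"
print(expand_scope_format(fmt, run_a) + ".MyMetric")
# prints localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric

# Both runs map to the same scope, so their metrics would clash.
print(expand_scope_format(fmt, run_a) == expand_scope_format(fmt, run_b))  # prints True

# Including <job_id> makes the two scopes distinct.
fmt_with_id = "<host>.<job_id>.<task_name>.<operator_name>.<subtask_index>"
print(expand_scope_format(fmt_with_id, run_a) == expand_scope_format(fmt_with_id, run_b))  # prints False
```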
Important: For the Batch API, <operator_id> is always equal to <task_id>.
You can define a user variable by calling MetricGroup#addGroup(String key, String value).
This method affects what MetricGroup#getMetricIdentifier, MetricGroup#getScopeComponents and MetricGroup#getAllVariables() return.
Important: User variables cannot be used in scope formats.
{{< tabs "66c0ba7f-adc3-4a8b-831f-b0126ea2de81" >}} {{< tab "Java" >}}
counter = getRuntimeContext()
    .getMetricGroup()
    .addGroup("MyMetricsKey", "MyMetricsValue")
    .counter("myCounter");
{{< /tab >}} {{< tab "Python" >}}
counter = runtime_context \
    .get_metrics_group() \
    .add_group("my_metrics_key", "my_metrics_value") \
    .counter("my_counter")
{{< /tab >}} {{< /tabs >}}
You can define custom variables that will be assigned to all metrics reported by a given operator
using Transformation.addMetricVariable. For example:
{{< tabs "32c0ba7f-3acd-831f-4a8b-a2de81b0126e" >}} {{< tab "Java" >}}
fooSource =
    execEnv.fromSource(
            kafkaSource,
            getWatermarkStrategy(),
            "KafkaSource-Foo")
        .addMetricVariable("table_name", "Foo");

barSource =
    execEnv.fromSource(
            kafkaSource,
            getWatermarkStrategy(),
            "KafkaSource-Bar")
        .addMetricVariable("table_name", "Bar");
{{< /tab >}} {{< /tabs >}}
This assigns the table_name variable, with the respective values Foo and Bar,
to all metrics reported by each KafkaSource, such as numRecordsOut or currentOutputWatermark.
If supported by your chosen metric reporter, these additional variables are then converted to
labels or tags.
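As a rough picture of what "converted to labels" can mean, the sketch below renders a metric name together with its variables in a label syntax loosely modelled on the Prometheus text format. `to_labelled_metric` is a hypothetical helper; how a real reporter names and escapes labels depends on that reporter.

```python
def to_labelled_metric(name: str, variables: dict) -> str:
    """Render a metric name with its variables as key="value" labels."""
    labels = ",".join(f'{key}="{value}"' for key, value in sorted(variables.items()))
    return f"{name}{{{labels}}}"


print(to_labelled_metric("numRecordsOut", {"table_name": "Foo"}))
# prints numRecordsOut{table_name="Foo"}
print(to_labelled_metric("numRecordsOut", {"table_name": "Bar"}))
# prints numRecordsOut{table_name="Bar"}
```

With labels like these, the same metric from both sources can be told apart without encoding the table name into the metric name.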
For information on how to set up Flink's metric reporters please take a look at the [metric reporters documentation]({{< ref "docs/deployment/metric_reporters" >}}).
By default, Flink gathers several metrics that provide deep insights into the current state. This section is a reference of all these metrics.
The tables below generally feature 5 columns:
The "Scope" column describes which scope format is used to generate the system scope. For example, if the cell contains "Operator" then the scope format for "metrics.scope.operator" is used. If the cell contains multiple values, separated by a slash, then the metrics are reported multiple times for different entities, for example for both JobManagers and TaskManagers.
The (optional) "Infix" column describes which infix is appended to the system scope.
The "Metrics" column lists the names of all metrics that are registered for the given scope and infix.
The "Description" column provides information as to what a given metric is measuring.
The "Type" column describes which metric type is used for the measurement.
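Note that all dots in the infix/metric name columns are still subject to the "metrics.scope.delimiter" setting.
Thus, in order to infer the metric identifier, take the scope format named in the "Scope" column, append the value in the "Infix" column if present, and append the metric name, applying the configured delimiter throughout.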
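As a rough illustration of that inference, the sketch below assembles an identifier from an already expanded system scope, an optional infix, and a metric name taken from the tables. `identifier_from_table` is a hypothetical helper, and the scope values `localhost`, `taskmanager` and `1234` are example values only.

```python
from typing import List


def identifier_from_table(scope_components: List[str],
                          infix: str,
                          metric: str,
                          delimiter: str = ".") -> str:
    """Combine system scope, optional infix and metric name using one delimiter."""
    parts = list(scope_components)
    if infix:
        parts.extend(infix.split("."))   # dots in the infix follow the delimiter setting
    parts.extend(metric.split("."))      # so do dots in the metric name
    return delimiter.join(parts)


scope = ["localhost", "taskmanager", "1234"]
print(identifier_from_table(scope, "Status.JVM.Memory", "Heap.Used"))
# prints localhost.taskmanager.1234.Status.JVM.Memory.Heap.Used
print(identifier_from_table(scope, "Status.JVM.Memory", "Heap.Used", delimiter="_"))
# prints localhost_taskmanager_1234_Status_JVM_Memory_Heap_Used
```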
The memory-related metrics require Oracle's memory management (also included in OpenJDK's Hotspot implementation) to be in place. Some metrics might not be exposed when using other JVM implementations (e.g. IBM's J9).
<table class="table table-bordered"> <thead> <tr> <th class="text-left" style="width: 18%">Scope</th> <th class="text-left" style="width: 22%">Infix</th> <th class="text-left" style="width: 20%">Metrics</th> <th class="text-left" style="width: 32%">Description</th> <th class="text-left" style="width: 8%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="17"><strong>Job-/TaskManager</strong></th> <td rowspan="15">Status.JVM.Memory</td> <td>Heap.Used</td> <td>The amount of heap memory currently used (in bytes).</td> <td>Gauge</td> </tr> <tr> <td>Heap.Committed</td> <td>The amount of heap memory guaranteed to be available to the JVM (in bytes).</td> <td>Gauge</td> </tr> <tr> <td>Heap.Max</td> <td>The maximum amount of heap memory that can be used for memory management (in bytes). This value is not necessarily equal to the maximum value specified through -Xmx or
the equivalent Flink configuration parameter. Some GC algorithms allocate heap memory that won't
be available to the user code and is therefore not exposed through the heap metrics.</td>
<td>Gauge</td>
</tr>
<tr>
<td>NonHeap.Used</td>
<td>The amount of non-heap memory currently used (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>NonHeap.Committed</td>
<td>The amount of non-heap memory guaranteed to be available to the JVM (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>NonHeap.Max</td>
<td>The maximum amount of non-heap memory that can be used for memory management (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>Metaspace.Used</td>
<td>The amount of memory currently used in the Metaspace memory pool (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>Metaspace.Committed</td>
<td>The amount of memory guaranteed to be available to the JVM in the Metaspace memory pool (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>Metaspace.Max</td>
<td>The maximum amount of memory that can be used in the Metaspace memory pool (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>Direct.Count</td>
<td>The number of buffers in the direct buffer pool.</td>
<td>Gauge</td>
</tr>
<tr>
<td>Direct.MemoryUsed</td>
<td>The amount of memory used by the JVM for the direct buffer pool (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>Direct.TotalCapacity</td>
<td>The total capacity of all buffers in the direct buffer pool (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>Mapped.Count</td>
<td>The number of buffers in the mapped buffer pool.</td>
<td>Gauge</td>
</tr>
<tr>
<td>Mapped.MemoryUsed</td>
<td>The amount of memory used by the JVM for the mapped buffer pool (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>Mapped.TotalCapacity</td>
<td>The total capacity of all buffers in the mapped buffer pool (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td rowspan="2">Status.Flink.Memory</td>
<td>Managed.Used</td>
<td>The amount of managed memory currently used.</td>
<td>Gauge</td>
</tr>
<tr>
<td>Managed.Total</td>
<td>The total amount of managed memory.</td>
<td>Gauge</td>
</tr>
</tbody>
</table>
{{< hint warning >}} Deprecated: use Default shuffle service metrics {{< /hint >}}
<table class="table table-bordered"> <thead> <tr> <th class="text-left" style="width: 18%">Scope</th> <th class="text-left" style="width: 22%">Infix</th> <th class="text-left" style="width: 22%">Metrics</th> <th class="text-left" style="width: 30%">Description</th> <th class="text-left" style="width: 8%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="2"><strong>TaskManager</strong></th> <td rowspan="2">Status.Network</td> <td>AvailableMemorySegments</td> <td>The number of unused memory segments.</td> <td>Gauge</td> </tr> <tr> <td>TotalMemorySegments</td> <td>The number of allocated memory segments.</td> <td>Gauge</td> </tr> <tr> <th rowspan="10">Task</th> <td rowspan="6">buffers</td> <td>inputQueueLength</td> <td>The number of queued input buffers. (ignores LocalInputChannels which are using blocking subpartitions)</td> <td>Gauge</td> </tr> <tr> <td>outputQueueLength</td> <td>The number of queued output buffers.</td> <td>Gauge</td> </tr> <tr> <td>inPoolUsage</td> <td>An estimate of the input buffers usage. (ignores LocalInputChannels)</td> <td>Gauge</td> </tr> <tr> <td>inputFloatingBuffersUsage</td> <td>An estimate of the floating input buffers usage. (ignores LocalInputChannels)</td> <td>Gauge</td> </tr> <tr> <td>inputExclusiveBuffersUsage</td> <td>An estimate of the exclusive input buffers usage. (ignores LocalInputChannels)</td> <td>Gauge</td> </tr> <tr> <td>outPoolUsage</td> <td>An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used.</td> <td>Gauge</td> </tr> <tr> <td rowspan="4">Network.<Input|Output>.<gate|partition> <strong>(only available if <tt>taskmanager.network.detailed-metrics</tt> config option is set)</strong></td>
<td>totalQueueLen</td>
<td>Total number of queued buffers in all input/output channels.</td>
<td>Gauge</td>
</tr>
<tr>
<td>minQueueLen</td>
<td>Minimum number of queued buffers in all input/output channels.</td>
<td>Gauge</td>
</tr>
<tr>
<td>maxQueueLen</td>
<td>Maximum number of queued buffers in all input/output channels.</td>
<td>Gauge</td>
</tr>
<tr>
<td>avgQueueLen</td>
<td>Average number of queued buffers in all input/output channels.</td>
<td>Gauge</td>
</tr>
</tbody>
</table>
Metrics related to data exchange between task executors using netty network communication.
<table class="table table-bordered"> <thead> <tr> <th class="text-left" style="width: 18%">Scope</th> <th class="text-left" style="width: 22%">Infix</th> <th class="text-left" style="width: 22%">Metrics</th> <th class="text-left" style="width: 30%">Description</th> <th class="text-left" style="width: 8%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="7"><strong>TaskManager</strong></th> <td rowspan="7">Status.Shuffle.Netty</td> <td>AvailableMemorySegments</td> <td>The number of unused memory segments.</td> <td>Gauge</td> </tr> <tr> <td>UsedMemorySegments</td> <td>The number of used memory segments.</td> <td>Gauge</td> </tr> <tr> <td>TotalMemorySegments</td> <td>The number of allocated memory segments.</td> <td>Gauge</td> </tr> <tr> <td>AvailableMemory</td> <td>The amount of unused memory in bytes.</td> <td>Gauge</td> </tr> <tr> <td>UsedMemory</td> <td>The amount of used memory in bytes.</td> <td>Gauge</td> </tr> <tr> <td>TotalMemory</td> <td>The amount of allocated memory in bytes.</td> <td>Gauge</td> </tr> <tr> <td>RequestedMemoryUsage</td> <td>Experimental: The usage of the network memory. Shows (as a percentage) the total amount of memory requested by all subtasks. It can exceed 100% because not all requested memory is required for a subtask to make progress. However, if usage exceeds 100%, throughput can suffer greatly; consider increasing the available network memory or decreasing the configured size of the network buffer pools.</td> <td>Gauge</td> </tr> <tr> <th rowspan="20">Task</th> <td rowspan="5">Shuffle.Netty.Input.Buffers</td> <td>inputQueueLength</td> <td>The number of queued input buffers.</td> <td>Gauge</td> </tr> <tr> <td>inputQueueSize</td> <td>The real size of queued input buffers in bytes. The size for local input channels is always `0` since the local channel takes records directly from the output queue.</td> <td>Gauge</td> </tr> <tr> <td>inPoolUsage</td> <td>An estimate of the input buffers usage. (ignores LocalInputChannels)</td> <td>Gauge</td> </tr> <tr> <td>inputFloatingBuffersUsage</td> <td>An estimate of the floating input buffers usage. (ignores LocalInputChannels)</td> <td>Gauge</td> </tr> <tr> <td>inputExclusiveBuffersUsage</td> <td>An estimate of the exclusive input buffers usage. (ignores LocalInputChannels)</td> <td>Gauge</td> </tr> <tr> <td rowspan="3">Shuffle.Netty.Output.Buffers</td> <td>outputQueueLength</td> <td>The number of queued output buffers.</td> <td>Gauge</td> </tr> <tr> <td>outputQueueSize</td> <td>The real size of queued output buffers in bytes.</td> <td>Gauge</td> </tr> <tr> <td>outPoolUsage</td> <td>An estimate of the output buffers usage. The pool usage can be > 100% if overdraft buffers are being used.</td> <td>Gauge</td> </tr> <tr> <td rowspan="4">Shuffle.Netty.<Input|Output>.<gate|partition> <strong>(only available if <tt>taskmanager.network.detailed-metrics</tt> config option is set)</strong></td>
<td>totalQueueLen</td>
<td>Total number of queued buffers in all input/output channels.</td>
<td>Gauge</td>
</tr>
<tr>
<td>minQueueLen</td>
<td>Minimum number of queued buffers in all input/output channels.</td>
<td>Gauge</td>
</tr>
<tr>
<td>maxQueueLen</td>
<td>Maximum number of queued buffers in all input/output channels.</td>
<td>Gauge</td>
</tr>
<tr>
<td>avgQueueLen</td>
<td>Average number of queued buffers in all input/output channels.</td>
<td>Gauge</td>
</tr>
<tr>
<td rowspan="8">Shuffle.Netty.Input</td>
<td>numBytesInLocal</td>
<td>The total number of bytes this task has read from a local source.</td>
<td>Counter</td>
</tr>
<tr>
<td>numBytesInLocalPerSecond</td>
<td>The number of bytes this task reads from a local source per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>numBytesInRemote</td>
<td>The total number of bytes this task has read from a remote source.</td>
<td>Counter</td>
</tr>
<tr>
<td>numBytesInRemotePerSecond</td>
<td>The number of bytes this task reads from a remote source per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>numBuffersInLocal</td>
<td>The total number of network buffers this task has read from a local source.</td>
<td>Counter</td>
</tr>
<tr>
<td>numBuffersInLocalPerSecond</td>
<td>The number of network buffers this task reads from a local source per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>numBuffersInRemote</td>
<td>The total number of network buffers this task has read from a remote source.</td>
<td>Counter</td>
</tr>
<tr>
<td>numBuffersInRemotePerSecond</td>
<td>The number of network buffers this task reads from a remote source per second.</td>
<td>Meter</td>
</tr>
</tbody>
</table>
The metrics in this table are available for each of the following job states: INITIALIZING, CREATED, RUNNING, RESTARTING, CANCELLING, FAILING. Whether these metrics are reported depends on the [metrics.job.status.enable]({{< ref "docs/deployment/config" >}}#metrics-job-status-enable) setting.
<span class="label label-info">Evolving</span> The semantics of these metrics may change in later releases.
<table class="table table-bordered"> <thead> <tr> <th class="text-left" style="width: 18%">Scope</th> <th class="text-left" style="width: 26%">Metrics</th> <th class="text-left" style="width: 48%">Description</th> <th class="text-left" style="width: 8%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="3"><strong>Job (only available on JobManager)</strong></th> <td><jobStatus>State</td> <td>For a given state, return 1 if the job is currently in that state, otherwise return 0.</td> <td>Gauge</td> </tr> <tr> <td><jobStatus>Time</td> <td>For a given state, if the job is currently in that state, return the time (in milliseconds) since the job transitioned into that state, otherwise return 0.</td> <td>Gauge</td> </tr> <tr> <td><jobStatus>TimeTotal</td> <td>For a given state, return how much time (in milliseconds) the job has spent in that state in total.</td> <td>Gauge</td> </tr> </tbody> </table>{{< hint info >}} <span class="label label-info">Experimental</span>
While the job is in the RUNNING state the metrics in this table provide additional details on what the job is currently doing. Whether these metrics are reported depends on the [metrics.job.status.enable]({{< ref "docs/deployment/config" >}}#metrics-job-status-enable) setting.
<table class="table table-bordered table-inline"> <thead> <tr> <th class="text-left" style="width: 18%">Scope</th> <th class="text-left" style="width: 26%">Metrics</th> <th class="text-left" style="width: 48%">Description</th> <th class="text-left" style="width: 8%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="3"><strong>Job (only available on JobManager)</strong></th> <td>deployingState</td> <td>Return 1 if the job is currently deploying* tasks, otherwise return 0.</td> <td>Gauge</td> </tr> <tr> <td>deployingTime</td> <td>Return the time (in milliseconds) since the job has started deploying* tasks, otherwise return 0.</td> <td>Gauge</td> </tr> <tr> <td>deployingTimeTotal</td> <td>Return how much time (in milliseconds) the job has spent deploying* tasks in total.</td> <td>Gauge</td> </tr> </tbody> </table>*A job is considered to be deploying tasks when:
{{< /hint >}}
Note that for failed checkpoints, metrics are updated on a best-effort basis and may not be accurate.
<table class="table table-bordered"> <thead> <tr> <th class="text-left" style="width: 18%">Scope</th> <th class="text-left" style="width: 26%">Metrics</th> <th class="text-left" style="width: 48%">Description</th> <th class="text-left" style="width: 8%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="12"><strong>Job (only available on JobManager)</strong></th> <td>lastCheckpointDuration</td> <td>The time it took to complete the last checkpoint (in milliseconds).</td> <td>Gauge</td> </tr> <tr> <td>lastCheckpointSize</td> <td>The checkpointed size of the last checkpoint (in bytes). This metric can differ from lastCheckpointFullSize if incremental checkpoints or the changelog are enabled.</td> <td>Gauge</td> </tr> <tr> <td>lastCompletedCheckpointId</td> <td>The identifier of the last completed checkpoint.</td> <td>Gauge</td> </tr> <tr> <td>lastCheckpointCompletedTimestamp</td> <td>The timestamp of the last completed checkpoint (in milliseconds).</td> <td>Gauge</td> </tr> <tr> <td>lastCheckpointFullSize</td> <td>The full size of the last checkpoint (in bytes).</td> <td>Gauge</td> </tr> <tr> <td>lastCheckpointMetadataSize</td> <td>The metadata file size of the last checkpoint (in bytes).</td> <td>Gauge</td> </tr> <tr> <td>lastCheckpointExternalPath</td> <td>The path where the last external checkpoint was stored.</td> <td>Gauge</td> </tr> <tr> <td>lastCheckpointRestoreTimestamp</td> <td>Timestamp when the last checkpoint was restored at the coordinator (in milliseconds).</td> <td>Gauge</td> </tr> <tr> <td>numberOfInProgressCheckpoints</td> <td>The number of in progress checkpoints.</td> <td>Gauge</td> </tr> <tr> <td>numberOfCompletedCheckpoints</td> <td>The number of successfully completed checkpoints.</td> <td>Gauge</td> </tr> <tr> <td>numberOfFailedCheckpoints</td> <td>The number of failed checkpoints.</td> <td>Gauge</td> </tr> <tr> <td>totalNumberOfCheckpoints</td> <td>The number of total checkpoints (in progress, completed, failed).</td> <td>Gauge</td> </tr> 
<tr> <th rowspan="2"><strong>Task</strong></th> <td>checkpointAlignmentTime</td> <td>The time (in nanoseconds) that the last barrier alignment took to complete, or how long the current alignment has taken so far. This is the time between receiving the first and the last checkpoint barrier. You can find more information in the [Monitoring State and Checkpoints section]({{< ref "docs/ops/state/large_state_tuning" >}}#monitoring-state-and-checkpoints)</td> <td>Gauge</td> </tr> <tr> <td>checkpointStartDelayNanos</td> <td>The time (in nanoseconds) that elapsed between the creation of the last checkpoint and the time when the checkpointing process was started by this task. This delay shows how long it takes for the first checkpoint barrier to reach the task. A high value indicates back-pressure. If only a specific task has a long start delay, the most likely reason is data skew.</td> <td>Gauge</td> </tr> <tr> <th rowspan="4"><strong>Job (only available on TaskManager)</strong></th> <td>fileMerging.logicalFileCount</td> <td>The number of logical files of the file merging mechanism.</td> <td>Gauge</td> </tr> <tr> <td>fileMerging.logicalFileSize</td> <td>The total size of logical files of the file merging mechanism on one task manager for one job.</td> <td>Gauge</td> </tr> <tr> <td>fileMerging.physicalFileCount</td> <td>The number of physical files of the file merging mechanism.</td> <td>Gauge</td> </tr> <tr> <td>fileMerging.physicalFileSize</td> <td>The total size of physical files of the file merging mechanism on one task manager for one job, usually larger than <samp>fileMerging.logicalFileSize</samp>.</td> <td>Gauge</td> </tr> </tbody> </table>
Certain RocksDB native metrics are available but disabled by default; you can find the full documentation [here]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics).
Certain ForSt native metrics are available but disabled by default; you can find the full documentation [here]({{< ref "docs/deployment/config" >}}#forst-native-metrics).
Besides that, we support the following metrics:
<table class="table table-bordered"> <thead> <tr> <th class="text-left" style="width: 15%">Scope</th> <th class="text-left" style="width: 15%">Infix</th> <th class="text-left" style="width: 15%">Metrics</th> <th class="text-left" style="width: 50%">Description</th> <th class="text-left" style="width: 5%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="6"><strong>Task/Operator</strong></th> <td rowspan="6">forst.fileCache</td> <td>hit</td> <td>The hit count of ForSt state backend cache.</td> <td>Counter</td> </tr> <tr> <td>miss</td> <td>The miss count of ForSt state backend cache.</td> <td>Counter</td> </tr> <tr> <td>usedBytes</td> <td>The bytes cached in ForSt state backend cache.</td> <td>Gauge</td> </tr> <tr> <td>remainingBytes</td> <td>The remaining space in the volume for the configured cache. Only available when 'state.backend.forst.cache.reserve-size' is set above 0. </td> <td>Gauge</td> </tr> <tr> <td>lru.evict</td> <td>The number of cache files that are evicted from LRU.</td> <td>Gauge</td> </tr> <tr> <td>lru.loadback</td> <td>The number of cache files that are loaded back from remote storage into the LRU. </td> <td>Gauge</td> </tr> </tbody> </table>
Note that the metrics are only available via reporters.
<table class="table table-bordered"> <thead> <tr> <th class="text-left" style="width: 18%">Scope</th> <th class="text-left" style="width: 26%">Metrics</th> <th class="text-left" style="width: 48%">Description</th> <th class="text-left" style="width: 8%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="8"><strong>Job (only available on TaskManager)</strong></th> <td>numberOfUploadRequests</td> <td>Total number of upload requests made</td> <td>Counter</td> </tr> <tr> <td>numberOfUploadFailures</td> <td>Total number of failed upload requests (request may be retried after the failure)</td> <td>Counter</td> </tr> <tr> <td>attemptsPerUpload</td> <td>The number of attempts per upload</td> <td>Histogram</td> </tr> <tr> <td>totalAttemptsPerUpload</td> <td>The distribution of the total count of attempts per upload</td> <td>Histogram</td> </tr> <tr> <td>uploadBatchSizes</td> <td>The number of upload tasks (coming from one or more writers, i.e. backends/tasks) that were grouped together and form a single upload resulting in a single file</td> <td>Histogram</td> </tr> <tr> <td>uploadLatenciesNanos</td> <td>The latency distribution of uploads</td> <td>Histogram</td> </tr> <tr> <td>uploadSizes</td> <td>The size distribution of uploads</td> <td>Histogram</td> </tr> <tr> <td>uploadQueueSize</td> <td>Current size of upload queue. Queue items can be packed together and form a single upload.</td> <td>Gauge</td> </tr> <tr> <th rowspan="8"><strong>Task/Operator</strong></th> <td>startedMaterialization</td> <td>The number of started materializations.</td> <td>Counter</td> </tr> <tr> <td>completedMaterialization</td> <td>The number of successfully completed materializations.</td> <td>Counter</td> </tr> <tr> <td>failedMaterialization</td> <td>The number of failed materializations.</td> <td>Counter</td> </tr> <tr> <td>lastDurationOfMaterialization</td> <td>The duration of the last materialization (in milliseconds).</td> <td>Gauge</td> </tr> <tr> <td>lastFullSizeOfMaterialization</td> <td>The full size of the materialization part of the last reported checkpoint (in bytes).</td> <td>Gauge</td> </tr> <tr> <td>lastIncSizeOfMaterialization</td> <td>The incremental size of the materialization part of the last reported checkpoint (in bytes).</td> <td>Gauge</td> </tr> <tr> <td>lastFullSizeOfNonMaterialization</td> <td>The full size of the non-materialization part of the last reported checkpoint (in bytes).</td> <td>Gauge</td> </tr> <tr> <td>lastIncSizeOfNonMaterialization</td> <td>The incremental size of the non-materialization part of the last reported checkpoint (in bytes).</td> <td>Gauge</td> </tr> </tbody> </table>
System resources reporting is disabled by default. When metrics.system-resource
is enabled, the additional metrics listed below will be available on the JobManager and TaskManager.
System resource metrics are updated periodically and present average values over a
configured interval (metrics.system-resource-probing-interval).
System resources reporting requires an optional dependency to be present on the
classpath (for example, placed in Flink's lib directory):

- com.github.oshi:oshi-core:6.1.5 (licensed under MIT license)

Including its transitive dependencies:

- net.java.dev.jna:jna-platform:jar:5.10.0
- net.java.dev.jna:jna:jar:5.10.0

Failures in this regard will be reported as warning messages, such as NoClassDefFoundError,
logged by SystemResourcesMetricsInitializer during startup.
The following metrics can be used to measure the effectiveness of speculative execution.
<table class="table table-bordered"> <thead> <tr> <th class="text-left" style="width: 18%">Scope</th> <th class="text-left" style="width: 26%">Metrics</th> <th class="text-left" style="width: 48%">Description</th> <th class="text-left" style="width: 8%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="2"><strong>Job (only available on JobManager)</strong></th> <td>numSlowExecutionVertices</td> <td>The current number of slow execution vertices.</td> <td>Gauge</td> </tr> <tr> <td>numEffectiveSpeculativeExecutions</td> <td>The number of effective speculative executions, i.e. speculative execution attempts that finish earlier than their original attempts.</td> <td>Counter</td> </tr> </tbody> </table>

Flink can track the latency of records travelling through the system. This feature is disabled by default.
To enable latency tracking, set latencyTrackingInterval to a positive number in either the
[Flink configuration]({{< ref "docs/deployment/config" >}}#metrics-latency-interval) or ExecutionConfig.
Once per latencyTrackingInterval, the sources emit a special record called a LatencyMarker.
The marker carries the timestamp at which it was emitted at the source.
Latency markers cannot overtake regular user records, so if records are queuing up in front of an operator,
the queuing time adds to the latency tracked by the marker.
Note that latency markers do not account for the time user records spend inside operators, because the markers bypass them. In particular, the markers do not account for the time records spend in, for example, window buffers. Only when operators cannot accept new records, so that records queue up, does the latency measured by the markers reflect that.
The LatencyMarkers are used to derive a distribution of the latency between the sources of the topology and each
downstream operator. These distributions are reported as histogram metrics. The granularity of these distributions can
be controlled in the [Flink configuration]({{< ref "docs/deployment/config" >}}#metrics-latency-interval). At the highest
granularity, subtask, Flink derives the latency distribution between every source subtask and every downstream
subtask, which results in a number of histograms that is quadratic in the parallelism.
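
To get a feel for how quickly the histogram count grows at subtask granularity, the pairing described above can be sketched as simple arithmetic. This is only an estimate under stated assumptions: the function name is hypothetical, the job is assumed to have a single source operator, and one histogram is assumed per (source subtask, downstream subtask) pair.

```python
def latency_histogram_count(source_parallelism, downstream_parallelisms):
    """Estimate the histogram count at 'subtask' granularity.

    Assumption: one histogram per (source subtask, downstream subtask)
    pair, summed over all downstream operators of a single source.
    """
    return sum(source_parallelism * p for p in downstream_parallelisms)


# One source followed by three operators, all running at the same parallelism.
for parallelism in (4, 16, 64):
    count = latency_histogram_count(parallelism, [parallelism] * 3)
    print(f"parallelism={parallelism} -> {count} histograms")
```

Increasing the parallelism by a factor of 4 increases the estimate by a factor of 16, which is the quadratic growth the text refers to.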
Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting up an automated clock synchronisation service (like NTP) to avoid false latency results.
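
The effect of unsynchronized clocks can be illustrated with a small calculation. The sketch below is not Flink code; the function and the numbers are hypothetical. It only assumes that a latency is obtained by subtracting the marker's emit timestamp (taken on the source machine) from the arrival time (taken on the operator machine).

```python
def measured_latency_ms(emit_ts_source_clock, arrival_ts_operator_clock):
    """Latency as seen by the operator: local arrival time minus marker timestamp."""
    return arrival_ts_operator_clock - emit_ts_source_clock


TRUE_LATENCY_MS = 20
EMIT_TS = 1_000_000  # timestamp written into the marker, on the source's clock

# Operator clock runs 150 ms ahead of the source clock: latency is overstated.
ahead = measured_latency_ms(EMIT_TS, EMIT_TS + TRUE_LATENCY_MS + 150)

# Operator clock runs 150 ms behind the source clock: latency even turns negative.
behind = measured_latency_ms(EMIT_TS, EMIT_TS + TRUE_LATENCY_MS - 150)

print(ahead, behind)
```

Any clock offset between the two machines is added directly to the measured value, which is why the offset should be small compared with the latencies being measured.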
<span class="label label-danger">Warning</span> Enabling latency metrics can significantly impact the performance
of the cluster (in particular for subtask granularity). It is highly recommended to only use them for debugging
purposes.
Flink can also track keyed state access latency for the standard Flink state backends and for custom state backends that extend AbstractStateBackend. This feature is disabled by default.
To enable this feature, set state.latency-track.keyed-state-enabled to true in the [Flink configuration]({{< ref "docs/deployment/config" >}}#state-backends-latency-tracking-options).
Once keyed state access latency tracking is enabled, Flink samples the state access latency on every N-th access, where N is defined by state.latency-track.sample-interval.
This option defaults to 100. A smaller value yields more accurate results but has a higher performance impact, because accesses are sampled more frequently.
Because these latency metrics are histograms, state.latency-track.history-size controls the maximum number of recorded values kept in the history; it defaults to 128.
A larger value requires more memory but provides more accurate results.
<span class="label label-danger">Warning</span> Enabling state-access-latency metrics may impact the performance. It is recommended to only use them for debugging purposes.
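
The interplay between the sample interval and the history size can be sketched as follows. This is a minimal illustration, not Flink's implementation: the class and method names are hypothetical, and only the two defaults (100 and 128) come from the text above.

```python
import time
from collections import deque


class SampledLatencyTracker:
    """Times every N-th access and keeps a bounded history of samples."""

    def __init__(self, sample_interval=100, history_size=128):
        self.sample_interval = sample_interval
        # A deque with maxlen silently drops the oldest sample when full.
        self.history = deque(maxlen=history_size)
        self._accesses_since_sample = 0

    def should_sample(self):
        """Return True on every sample_interval-th call."""
        self._accesses_since_sample += 1
        if self._accesses_since_sample >= self.sample_interval:
            self._accesses_since_sample = 0
            return True
        return False

    def record(self, latency_nanos):
        self.history.append(latency_nanos)


tracker = SampledLatencyTracker()
state = {}  # stand-in for a keyed state
for i in range(20_000):
    if tracker.should_sample():
        start = time.perf_counter_ns()
        state[i % 10] = i
        tracker.record(time.perf_counter_ns() - start)
    else:
        state[i % 10] = i  # unsampled accesses skip the timing overhead

print(len(tracker.history))
```

With 20,000 accesses and an interval of 100, only 200 accesses pay the timing cost, and the history retains the most recent 128 of those samples. A smaller interval or a larger history trades overhead and memory for accuracy, as described above.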
Flink can also track the keyed state key/value size for the standard Flink state backends and for custom state backends that extend AbstractStateBackend. This feature is disabled by default.
To enable this feature, set state.size-track.keyed-state-enabled to true in the [Flink configuration]({{< ref "docs/deployment/config" >}}#state-backends-size-tracking-options).
Once keyed state key/value size tracking is enabled, Flink samples the state size on every N-th access, where N is defined by state.size-track.sample-interval.
This option defaults to 100. A smaller value yields more accurate results but has a higher performance impact, because accesses are sampled more frequently.
Because these key/value size metrics are histograms, state.size-track.history-size controls the maximum number of recorded values kept in the history; it defaults to 128.
A larger value requires more memory but provides more accurate results.
<span class="label label-danger">Warning</span> Enabling state-size metrics may impact the performance. It is recommended to only use them for debugging purposes. If state.ttl is enabled, the size of the value will include the size of the TTL-related timestamp. The value size of AggregatingState is not accounted for because AggregatingState returns a result processed by a user-defined AggregateFunction, whereas currently, only the actual stored data size in the state can be tracked.
Metrics can be queried through the [Monitoring REST API]({{< ref "docs/ops/rest_api" >}}).
Below is a list of available endpoints, with a sample JSON response. All endpoints follow the form of the sample URL http://hostname:8081/jobmanager/metrics; below we list only the path part of the URLs.
Values in angle brackets are variables; for example, http://hostname:8081/jobs/<jobid>/metrics would be requested as http://hostname:8081/jobs/7684be6004e4e955c2a558a9bc463f65/metrics.
Request metrics for a specific entity:

- /jobmanager/metrics
- /taskmanagers/<taskmanagerid>/metrics
- /jobs/<jobid>/metrics
- /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

Request metrics aggregated across all entities of the respective type:

- /taskmanagers/metrics
- /jobs/metrics
- /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
- /jobs/<jobid>/vertices/<vertexid>/jm-operator-metrics

Request metrics aggregated over a subset of all entities of the respective type:

- /taskmanagers/metrics?taskmanagers=A,B,C
- /jobs/metrics?jobs=D,E,F
- /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3

<span class="label label-danger">Warning</span> Metric names can contain special characters that you need to escape when querying metrics.
For example, "a_+_b" would be escaped to "a_%2B_b".
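
One way to apply this escaping from a client is standard percent-encoding. The sketch below uses Python's urllib.parse.quote with no characters marked as safe; the helper name is hypothetical. Note that this also encodes characters beyond the ones Flink requires (a space, for example), which is assumed to be harmless for a query parameter value.

```python
from urllib.parse import quote


def escape_metric_name(name):
    """Percent-encode a metric name for use in a metrics query string."""
    # safe="" makes quote() encode "/" as well, which it leaves alone by default.
    return quote(name, safe="")


print(escape_metric_name("a_+_b"))
```

Letters, digits, and the characters "_", ".", "-" and "~" pass through unchanged, so typical metric names are unaffected.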
List of characters that should be escaped:
<table class="table table-bordered"> <thead> <tr> <th>Character</th> <th>Escape Sequence</th> </tr> </thead> <tbody> <tr> <td>#</td> <td>%23</td> </tr> <tr> <td>$</td> <td>%24</td> </tr> <tr> <td>&</td> <td>%26</td> </tr> <tr> <td>+</td> <td>%2B</td> </tr> <tr> <td>/</td> <td>%2F</td> </tr> <tr> <td>;</td> <td>%3B</td> </tr> <tr> <td>=</td> <td>%3D</td> </tr> <tr> <td>?</td> <td>%3F</td> </tr> <tr> <td>@</td> <td>%40</td> </tr> </tbody> </table>

Request a list of available metrics:
GET /jobmanager/metrics
[
{
"id": "metric1"
},
{
"id": "metric2"
}
]
Request the values for specific (unaggregated) metrics:
GET /taskmanagers/ABCDE/metrics?get=metric1,metric2
[
{
"id": "metric1",
"value": "34"
},
{
"id": "metric2",
"value": "2"
}
]
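
Note that the unaggregated values in the sample response are JSON strings ("34"), not numbers. A client that wants to compute with them has to convert them first. The following sketch parses the sample response shown above; the helper name is hypothetical, and the fallback for non-numeric values is an assumption about how a client might want to treat them.

```python
import json

# The sample response from above, as it would arrive over HTTP.
SAMPLE_RESPONSE = """
[
  {"id": "metric1", "value": "34"},
  {"id": "metric2", "value": "2"}
]
"""


def metrics_to_dict(body):
    """Map metric id -> value, converting numeric strings to float."""
    result = {}
    for entry in json.loads(body):
        raw = entry["value"]
        try:
            result[entry["id"]] = float(raw)
        except ValueError:
            result[entry["id"]] = raw  # keep non-numeric values as strings
    return result


print(metrics_to_dict(SAMPLE_RESPONSE))
```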
Request aggregated values for specific metrics:
GET /taskmanagers/metrics?get=metric1,metric2
[
{
"id": "metric1",
"min": 1,
"max": 34,
"avg": 15,
"sum": 45
},
{
"id": "metric2",
"min": 2,
"max": 14,
"avg": 7,
"sum": 16
}
]
Request specific aggregated values for specific metrics:
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
[
{
"id": "metric1",
"min": 1,
"max": 34
},
{
"id": "metric2",
"min": 2,
"max": 14
}
]
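
The query strings used in the requests above can be assembled programmatically. This is a minimal sketch: the helper name is hypothetical, and http://hostname:8081 is the placeholder host used throughout this section. It only builds the URL and does not send a request.

```python
from urllib.parse import quote

BASE_URL = "http://hostname:8081"  # placeholder host from the examples above


def build_metrics_query(path, metrics, aggregations=()):
    """Build a metrics request path with `get` and optional `agg` parameters."""
    # Escape each name individually so the separating commas stay literal.
    query = "get=" + ",".join(quote(m, safe="") for m in metrics)
    if aggregations:
        query += "&agg=" + ",".join(aggregations)
    return f"{path}?{query}"


print(BASE_URL + build_metrics_query(
    "/taskmanagers/metrics", ["metric1", "metric2"], ["min", "max"]))
```

Omitting the aggregations argument yields the plain `get=` form, which on the aggregating endpoints returns all of min, max, avg and sum, as in the earlier sample response.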
Metrics that were gathered for each task or operator can also be visualized in the Dashboard. On the main page for a
job, select the Metrics tab. After selecting one of the tasks in the top graph you can select metrics to display using
the Add Metric drop-down menu.

- Task metrics are listed as <subtask_index>.<metric_name>.
- Operator metrics are listed as <subtask_index>.<operator_name>.<metric_name>.

Each metric will be visualized as a separate graph, with the x-axis representing time and the y-axis the measured value. All graphs are automatically updated every 10 seconds, and continue to do so when navigating to another page.
There is no limit on the number of visualized metrics; however, only numeric metrics can be visualized.
{{< top >}}