Statistics

librdkafka may be configured to emit internal metrics at a fixed interval by setting the statistics.interval.ms configuration property to a value > 0 and registering a stats_cb (or similar, depending on language).

The stats are provided as a JSON object string.
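Since each emit arrives as one JSON object string, a callback usually only needs to decode it. The following is a minimal sketch using Python's standard library; the function name `on_stats` and the truncated sample payload are hypothetical, and how the callback is registered depends on the language binding in use.

```python
import json


def on_stats(stats_json: str) -> dict:
    """Decode one stats emit (a JSON object string) into a dict."""
    stats = json.loads(stats_json)
    if not isinstance(stats, dict):
        raise ValueError("expected a JSON object at the top level")
    return stats


# Hypothetical, heavily truncated payload for illustration only.
sample = '{"name": "rdkafka#producer-1", "type": "producer", "msg_cnt": 22710}'
stats = on_stats(sample)
print(stats["name"], stats["type"], stats["msg_cnt"])
```

Keeping the callback this small leaves any aggregation or export to code that runs outside the client's callback path.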

Note: The metrics returned may not be completely consistent between brokers, toppars and totals, due to the internal asynchronous nature of librdkafka. E.g., the top level tx total may be less than the sum of the broker tx values which it represents.
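To illustrate the note, the sketch below compares the top-level tx counter with the sum of the per-broker tx counters. The helper name `broker_tx_sum` is hypothetical, and the sample values are taken from the example output in this document. A small non-zero difference should be treated as expected rather than as an error.

```python
import json


def broker_tx_sum(stats: dict) -> int:
    """Sum the per-broker 'tx' counters of one stats emit."""
    return sum(b.get("tx", 0) for b in stats.get("brokers", {}).values())


# Truncated sample: only the fields needed for the comparison.
sample = json.loads(
    '{"tx": 631, "brokers": {'
    '"localhost:9092/2": {"tx": 320}, '
    '"localhost:9093/3": {"tx": 310}, '
    '"localhost:9094/4": {"tx": 1}}}'
)

# Positive or negative values are both possible because the totals and the
# per-broker values are not sampled atomically.
diff = sample["tx"] - broker_tx_sum(sample)
print("top-level tx minus broker sum:", diff)
```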

General structure

All fields that contain sizes are in bytes unless otherwise noted.

{
 <Top-level fields>
 "brokers": {
    <brokers fields>,
    "toppars": { <toppars fields> }
 },
 "topics": {
   <topic fields>,
   "partitions": {
     <partitions fields>
   }
 }
[, "cgrp": { <cgrp fields> } ]
[, "eos": { <eos fields> } ]
}
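The nesting above can be walked with ordinary dictionary iteration. This is a sketch only: `iter_partitions` is a hypothetical helper and the sample dict is a truncated stand-in for a decoded stats object.

```python
from typing import Iterator, Tuple


def iter_partitions(stats: dict) -> Iterator[Tuple[str, int, dict]]:
    """Yield (topic name, partition id, partition fields) for every partition."""
    for topic_name, topic in stats.get("topics", {}).items():
        # Partition keys are JSON object keys, so they arrive as strings.
        for part_key, part in topic.get("partitions", {}).items():
            yield topic_name, int(part_key), part


sample = {
    "topics": {
        "test": {
            "partitions": {
                "0": {"partition": 0, "leader": 3},
                "-1": {"partition": -1, "leader": -1},
            }
        }
    }
}

for topic_name, partition_id, fields in iter_partitions(sample):
    print(topic_name, partition_id, fields["leader"])
```

The optional cgrp and eos objects can be read with `stats.get("cgrp")` and `stats.get("eos")`, which return `None` when the object is absent.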

Field type

Fields are represented as follows:

  • string - UTF8 string.
  • int - Integer counter (64 bits wide). Ever increasing.
  • int gauge - Integer gauge (64 bits wide). Will be reset to 0 on each stats emit.
  • object - Nested JSON object.
  • bool - true or false.
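Because int counters only ever increase, a rate is obtained by differencing two consecutive emits. The sketch below assumes both emits come from the same client instance and uses the top-level ts field (microseconds) as the time base; `counter_rate` is a hypothetical helper, and the sample values are made up.

```python
from typing import Optional


def counter_rate(prev: dict, curr: dict, field: str) -> Optional[float]:
    """Return the per-second rate of an int counter between two emits.

    Returns None when no rate can be derived (no elapsed time, or the
    counter went backwards, for example after a client restart).
    """
    elapsed_us = curr["ts"] - prev["ts"]
    delta = curr[field] - prev[field]
    if elapsed_us <= 0 or delta < 0:
        return None
    return delta * 1_000_000 / elapsed_us


# Made-up emits two seconds apart.
prev = {"ts": 1_000_000, "txmsgs": 100}
curr = {"ts": 3_000_000, "txmsgs": 700}
rate = counter_rate(prev, curr, "txmsgs")
print("messages per second:", rate)
```

Fields of type int gauge describe the state at emit time, so they are read directly rather than differenced.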

Top-level

| Field | Type | Example | Description |
| ----- | ---- | ------- | ----------- |
| name | string | "rdkafka#producer-1" | Handle instance name |
| client_id | string | "rdkafka" | The configured (or default) client.id |
| type | string | "producer" | Instance type (producer or consumer) |
| ts | int | 12345678912345 | librdkafka's internal monotonic clock (microseconds) |
| time | int | | Wall clock time in seconds since the epoch |
| age | int | | Time since this client instance was created (microseconds) |
| replyq | int gauge | | Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll() |
| msg_cnt | int gauge | | Current number of messages in producer queues |
| msg_size | int gauge | | Current total size of messages in producer queues |
| msg_max | int | | Threshold: maximum number of messages allowed on the producer queues |
| msg_size_max | int | | Threshold: maximum total size of messages allowed on the producer queues |
| tx | int | | Total number of requests sent to Kafka brokers |
| tx_bytes | int | | Total number of bytes transmitted to Kafka brokers |
| rx | int | | Total number of responses received from Kafka brokers |
| rx_bytes | int | | Total number of bytes received from Kafka brokers |
| txmsgs | int | | Total number of messages transmitted (produced) to Kafka brokers |
| txmsg_bytes | int | | Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers |
| rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers. |
| rxmsg_bytes | int | | Total number of message bytes (including framing) received from Kafka brokers |
| simple_cnt | int gauge | | Internal tracking of legacy vs new consumer API state |
| metadata_cache_cnt | int gauge | | Number of topics in the metadata cache. |
| brokers | object | | Dict of brokers, key is broker name, value is object. See brokers below |
| topics | object | | Dict of topics, key is topic name, value is object. See topics below |
| cgrp | object | | Consumer group metrics. See cgrp below |
| eos | object | | EOS / Idempotent producer state and metrics. See eos below |
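One use of the top-level gauges is to see how close the producer queues are to their thresholds. The sketch below divides msg_cnt by msg_max and msg_size by msg_size_max; `queue_utilization` is a hypothetical helper, and the sample values come from the example output in this document.

```python
from typing import Tuple


def queue_utilization(stats: dict) -> Tuple[float, float]:
    """Return (message-count ratio, byte-size ratio) of the producer queues."""

    def ratio(current: int, limit: int) -> float:
        # Guard against a missing or zero threshold.
        return current / limit if limit > 0 else 0.0

    return (
        ratio(stats.get("msg_cnt", 0), stats.get("msg_max", 0)),
        ratio(stats.get("msg_size", 0), stats.get("msg_size_max", 0)),
    )


sample = {
    "msg_cnt": 22710,
    "msg_max": 500000,
    "msg_size": 704010,
    "msg_size_max": 1073741824,
}
cnt_ratio, size_ratio = queue_utilization(sample)
print(f"count: {cnt_ratio:.2%}, size: {size_ratio:.4%}")
```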

brokers

Per broker statistics.

| Field | Type | Example | Description |
| ----- | ---- | ------- | ----------- |
| name | string | "example.com:9092/13" | Broker hostname, port and broker id |
| nodeid | int | 13 | Broker id (-1 for bootstraps) |
| nodename | string | "example.com:9092" | Broker hostname |
| source | string | "configured" | Broker source (learned, configured, internal, logical) |
| state | string | "UP" | Broker state (INIT, DOWN, CONNECT, AUTH, APIVERSION_QUERY, AUTH_HANDSHAKE, UP, UPDATE) |
| stateage | int gauge | | Time since last broker state change (microseconds) |
| outbuf_cnt | int gauge | | Number of requests awaiting transmission to broker |
| outbuf_msg_cnt | int gauge | | Number of messages awaiting transmission to broker |
| waitresp_cnt | int gauge | | Number of requests in-flight to broker awaiting response |
| waitresp_msg_cnt | int gauge | | Number of messages in-flight to broker awaiting response |
| tx | int | | Total number of requests sent |
| txbytes | int | | Total number of bytes sent |
| txerrs | int | | Total number of transmission errors |
| txretries | int | | Total number of request retries |
| txidle | int | | Microseconds since last socket send (or -1 if no sends yet for current connection). |
| req_timeouts | int | | Total number of requests timed out |
| rx | int | | Total number of responses received |
| rxbytes | int | | Total number of bytes received |
| rxerrs | int | | Total number of receive errors |
| rxcorriderrs | int | | Total number of unmatched correlation ids in response (typically for timed out requests) |
| rxpartial | int | | Total number of partial MessageSets received. The broker may return partial responses if the full MessageSet could not fit in the remaining Fetch response size. |
| rxidle | int | | Microseconds since last socket receive (or -1 if no receives yet for current connection). |
| req | object | | Request type counters. Object key is the request name, value is the number of requests sent. |
| zbuf_grow | int | | Total number of decompression buffer size increases |
| buf_grow | int | | Total number of buffer size increases (deprecated, unused) |
| wakeups | int | | Broker thread poll loop wakeups |
| connects | int | | Number of connection attempts, including successful and failed, and name resolution failures. |
| disconnects | int | | Number of disconnects (triggered by broker, network, load-balancer, etc.). |
| int_latency | object | | Internal producer queue latency in microseconds. See Window stats below |
| outbuf_latency | object | | Internal request queue latency in microseconds. This is the time between when a request is enqueued on the transmit (outbuf) queue and when it is written to the TCP socket. Additional buffering and latency may be incurred by the TCP stack and network. See Window stats below |
| rtt | object | | Broker latency / round-trip time in microseconds. See Window stats below |
| throttle | object | | Broker throttling time in milliseconds. See Window stats below |
| toppars | object | | Partitions handled by this broker handle. Key is "topic-partition". See brokers.toppars below |
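The per-broker fields lend themselves to a simple health summary. The sketch below lists brokers whose state is not UP and totals waitresp_cnt across brokers; `broker_health` is a hypothetical helper and the sample is made up. Whether a broker outside the UP state is a problem depends on the deployment, so this is only a starting point.

```python
from typing import List, Tuple


def broker_health(stats: dict) -> Tuple[List[str], int]:
    """Return (names of brokers not in state UP, total requests awaiting a response)."""
    not_up = []
    in_flight = 0
    for name, broker in stats.get("brokers", {}).items():
        if broker.get("state") != "UP":
            not_up.append(name)
        in_flight += broker.get("waitresp_cnt", 0)
    return sorted(not_up), in_flight


# Made-up sample with one healthy and one disconnected broker.
sample = {
    "brokers": {
        "localhost:9092/2": {"state": "UP", "waitresp_cnt": 2},
        "localhost:9093/3": {"state": "DOWN", "waitresp_cnt": 0},
    }
}
not_up, in_flight = broker_health(sample)
print("not up:", not_up, "in-flight requests:", in_flight)
```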

Window stats

Rolling window statistics. The values are in microseconds unless otherwise stated.

| Field | Type | Example | Description |
| ----- | ---- | ------- | ----------- |
| min | int gauge | | Smallest value |
| max | int gauge | | Largest value |
| avg | int gauge | | Average value |
| sum | int gauge | | Sum of values |
| cnt | int gauge | | Number of values sampled |
| stddev | int gauge | | Standard deviation (based on histogram) |
| hdrsize | int gauge | | Memory size of Hdr Histogram |
| p50 | int gauge | | 50th percentile |
| p75 | int gauge | | 75th percentile |
| p90 | int gauge | | 90th percentile |
| p95 | int gauge | | 95th percentile |
| p99 | int gauge | | 99th percentile |
| p99_99 | int gauge | | 99.99th percentile |
| outofrange | int gauge | | Values skipped due to out of histogram range |
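Window values reported in microseconds are often easier to read in milliseconds. The sketch below converts a few fields of one window object; `window_ms` is a hypothetical helper, and the sample is the rtt window of the first broker in the example output. It should not be applied to throttle, which is already reported in milliseconds, or to batchsize and batchcnt, which are not durations.

```python
from typing import Dict, Iterable


def window_ms(
    window: dict,
    fields: Iterable[str] = ("min", "avg", "p50", "p99", "max"),
) -> Dict[str, float]:
    """Convert selected microsecond fields of a window object to milliseconds."""
    return {name: window[name] / 1000.0 for name in fields if name in window}


# rtt window from the example output (truncated to the fields used here).
rtt = {"min": 1580, "max": 3389, "avg": 2349, "p50": 2319, "p99": 3391, "cnt": 34}
rtt_ms = window_ms(rtt)
print(rtt_ms)
```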

brokers.toppars

Topic partition assigned to broker.

| Field | Type | Example | Description |
| ----- | ---- | ------- | ----------- |
| topic | string | "mytopic" | Topic name |
| partition | int | 3 | Partition id |

topics

| Field | Type | Example | Description |
| ----- | ---- | ------- | ----------- |
| topic | string | "myatopic" | Topic name |
| age | int gauge | | Age of client's topic object (milliseconds) |
| metadata_age | int gauge | | Age of metadata from broker for this topic (milliseconds) |
| batchsize | object | | Batch sizes in bytes. See Window stats. |
| batchcnt | object | | Batch message counts. See Window stats. |
| partitions | object | | Partitions dict, key is partition id. See partitions below. |

partitions

| Field | Type | Example | Description |
| ----- | ---- | ------- | ----------- |
| partition | int | 3 | Partition Id (-1 for internal UA/UnAssigned partition) |
| broker | int | | The id of the broker that messages are currently being fetched from |
| leader | int | | Current leader broker id |
| desired | bool | | Partition is explicitly desired by application |
| unknown | bool | | Partition not seen in topic metadata from broker |
| msgq_cnt | int gauge | | Number of messages waiting to be produced in first-level queue |
| msgq_bytes | int gauge | | Number of bytes in msgq_cnt |
| xmit_msgq_cnt | int gauge | | Number of messages ready to be produced in transmit queue |
| xmit_msgq_bytes | int gauge | | Number of bytes in xmit_msgq |
| fetchq_cnt | int gauge | | Number of pre-fetched messages in fetch queue |
| fetchq_size | int gauge | | Bytes in fetchq |
| fetch_state | string | "active" | Consumer fetch state for this partition (none, stopping, stopped, offset-query, offset-wait, active). |
| query_offset | int gauge | | Current/Last logical offset query |
| next_offset | int gauge | | Next offset to fetch |
| app_offset | int gauge | | Offset of last message passed to application + 1 |
| stored_offset | int gauge | | Offset to be committed |
| stored_leader_epoch | int | | Partition leader epoch of stored offset |
| committed_offset | int gauge | | Last committed offset |
| committed_leader_epoch | int | | Partition leader epoch of committed offset |
| eof_offset | int gauge | | Last PARTITION_EOF signaled offset |
| lo_offset | int gauge | | Partition's low watermark offset on broker |
| hi_offset | int gauge | | Partition's high watermark offset on broker |
| ls_offset | int gauge | | Partition's last stable offset on broker, or same as hi_offset if broker version is less than 0.11.0.0. |
| consumer_lag | int gauge | | Difference between (hi_offset or ls_offset) and committed_offset. hi_offset is used when isolation.level=read_uncommitted, otherwise ls_offset. |
| consumer_lag_stored | int gauge | | Difference between (hi_offset or ls_offset) and stored_offset. See consumer_lag and stored_offset. |
| leader_epoch | int | | Last known partition leader epoch, or -1 if unknown. |
| txmsgs | int | | Total number of messages transmitted (produced) |
| txbytes | int | | Total number of bytes transmitted for txmsgs |
| rxmsgs | int | | Total number of messages consumed, not including ignored messages (due to offset, etc). |
| rxbytes | int | | Total number of bytes received for rxmsgs |
| msgs | int | | Total number of messages received (consumer, same as rxmsgs), or total number of messages produced (possibly not yet transmitted) (producer). |
| rx_ver_drops | int | | Dropped outdated messages |
| msgs_inflight | int gauge | | Current number of messages in-flight to/from broker |
| next_ack_seq | int gauge | | Next expected acked sequence (idempotent producer) |
| next_err_seq | int gauge | | Next expected errored sequence (idempotent producer) |
| acked_msgid | int | | Last acked internal message id (idempotent producer) |
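A common use of the partition fields is to total consumer_lag across a client's partitions. The sketch below is written under two assumptions that are not stated in the table: that a negative consumer_lag means the lag is not known (the example output shows -1 for a producer), and that the internal -1 partition should be skipped. `total_consumer_lag` is a hypothetical helper and the sample values are made up.

```python
def total_consumer_lag(stats: dict) -> int:
    """Sum consumer_lag over all real partitions with a known lag."""
    total = 0
    for topic in stats.get("topics", {}).values():
        for part in topic.get("partitions", {}).values():
            if part.get("partition", -1) < 0:
                continue  # internal UA/UnAssigned partition
            lag = part.get("consumer_lag", -1)
            if lag >= 0:  # assumption: negative means the lag is unknown
                total += lag
    return total


# Made-up sample: two real partitions and the internal -1 partition.
sample = {
    "topics": {
        "test": {
            "partitions": {
                "0": {"partition": 0, "consumer_lag": 12},
                "1": {"partition": 1, "consumer_lag": -1},
                "-1": {"partition": -1, "consumer_lag": 500},
            }
        }
    }
}
lag = total_consumer_lag(sample)
print("total consumer lag:", lag)
```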

cgrp

| Field | Type | Example | Description |
| ----- | ---- | ------- | ----------- |
| state | string | "up" | Local consumer group handler's state. |
| stateage | int gauge | | Time elapsed since last state change (milliseconds). |
| join_state | string | "assigned" | Local consumer group handler's join state. |
| rebalance_age | int gauge | | Time elapsed since last rebalance (assign or revoke) (milliseconds). |
| rebalance_cnt | int | | Total number of rebalances (assign or revoke). |
| rebalance_reason | string | | Last rebalance reason, or empty string. |
| assignment_size | int gauge | | Current assignment's partition count. |

eos

| Field | Type | Example | Description |
| ----- | ---- | ------- | ----------- |
| idemp_state | string | "Assigned" | Current idempotent producer id state. |
| idemp_stateage | int gauge | | Time elapsed since last idemp_state change (milliseconds). |
| txn_state | string | "InTransaction" | Current transactional producer state. |
| txn_stateage | int gauge | | Time elapsed since last txn_state change (milliseconds). |
| txn_may_enq | bool | | Transactional state allows enqueuing (producing) new messages. |
| producer_id | int gauge | | The currently assigned Producer ID (or -1). |
| producer_epoch | int gauge | | The current epoch (or -1). |
| epoch_cnt | int | | The number of Producer ID assignments since start. |

Example output

This (prettified) example output is from a short-lived producer using the following command: rdkafka_performance -b localhost -P -t test -T 1000 -Y 'cat >> stats.json'.

Note: this output is prettified using jq; the JSON object emitted by librdkafka does not contain line breaks.

{
  "name": "rdkafka#producer-1",
  "client_id": "rdkafka",
  "type": "producer",
  "ts": 5016483227792,
  "time": 1527060869,
  "replyq": 0,
  "msg_cnt": 22710,
  "msg_size": 704010,
  "msg_max": 500000,
  "msg_size_max": 1073741824,
  "simple_cnt": 0,
  "metadata_cache_cnt": 1,
  "brokers": {
    "localhost:9092/2": {
      "name": "localhost:9092/2",
      "nodeid": 2,
      "nodename": "localhost:9092",
      "source": "learned",
      "state": "UP",
      "stateage": 9057234,
      "outbuf_cnt": 0,
      "outbuf_msg_cnt": 0,
      "waitresp_cnt": 0,
      "waitresp_msg_cnt": 0,
      "tx": 320,
      "txbytes": 84283332,
      "txerrs": 0,
      "txretries": 0,
      "req_timeouts": 0,
      "rx": 320,
      "rxbytes": 15708,
      "rxerrs": 0,
      "rxcorriderrs": 0,
      "rxpartial": 0,
      "zbuf_grow": 0,
      "buf_grow": 0,
      "wakeups": 591067,
      "int_latency": {
        "min": 86,
        "max": 59375,
        "avg": 23726,
        "sum": 5694616664,
        "stddev": 13982,
        "p50": 28031,
        "p75": 36095,
        "p90": 39679,
        "p95": 43263,
        "p99": 48639,
        "p99_99": 59391,
        "outofrange": 0,
        "hdrsize": 11376,
        "cnt": 240012
      },
      "rtt": {
        "min": 1580,
        "max": 3389,
        "avg": 2349,
        "sum": 79868,
        "stddev": 474,
        "p50": 2319,
        "p75": 2543,
        "p90": 3183,
        "p95": 3199,
        "p99": 3391,
        "p99_99": 3391,
        "outofrange": 0,
        "hdrsize": 13424,
        "cnt": 34
      },
      "throttle": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 17520,
        "cnt": 34
      },
      "toppars": {
        "test-1": {
          "topic": "test",
          "partition": 1
        }
      }
    },
    "localhost:9093/3": {
      "name": "localhost:9093/3",
      "nodeid": 3,
      "nodename": "localhost:9093",
      "source": "learned",
      "state": "UP",
      "stateage": 9057209,
      "outbuf_cnt": 0,
      "outbuf_msg_cnt": 0,
      "waitresp_cnt": 0,
      "waitresp_msg_cnt": 0,
      "tx": 310,
      "txbytes": 84301122,
      "txerrs": 0,
      "txretries": 0,
      "req_timeouts": 0,
      "rx": 310,
      "rxbytes": 15104,
      "rxerrs": 0,
      "rxcorriderrs": 0,
      "rxpartial": 0,
      "zbuf_grow": 0,
      "buf_grow": 0,
      "wakeups": 607956,
      "int_latency": {
        "min": 82,
        "max": 58069,
        "avg": 23404,
        "sum": 5617432101,
        "stddev": 14021,
        "p50": 27391,
        "p75": 35839,
        "p90": 39679,
        "p95": 42751,
        "p99": 48639,
        "p99_99": 58111,
        "outofrange": 0,
        "hdrsize": 11376,
        "cnt": 240016
      },
      "rtt": {
        "min": 1704,
        "max": 3572,
        "avg": 2493,
        "sum": 87289,
        "stddev": 559,
        "p50": 2447,
        "p75": 2895,
        "p90": 3375,
        "p95": 3407,
        "p99": 3583,
        "p99_99": 3583,
        "outofrange": 0,
        "hdrsize": 13424,
        "cnt": 35
      },
      "throttle": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 17520,
        "cnt": 35
      },
      "toppars": {
        "test-0": {
          "topic": "test",
          "partition": 0
        }
      }
    },
    "localhost:9094/4": {
      "name": "localhost:9094/4",
      "nodeid": 4,
      "nodename": "localhost:9094",
      "source": "learned",
      "state": "UP",
      "stateage": 9057207,
      "outbuf_cnt": 0,
      "outbuf_msg_cnt": 0,
      "waitresp_cnt": 0,
      "waitresp_msg_cnt": 0,
      "tx": 1,
      "txbytes": 25,
      "txerrs": 0,
      "txretries": 0,
      "req_timeouts": 0,
      "rx": 1,
      "rxbytes": 272,
      "rxerrs": 0,
      "rxcorriderrs": 0,
      "rxpartial": 0,
      "zbuf_grow": 0,
      "buf_grow": 0,
      "wakeups": 4,
      "int_latency": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 11376,
        "cnt": 0
      },
      "rtt": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 13424,
        "cnt": 0
      },
      "throttle": {
        "min": 0,
        "max": 0,
        "avg": 0,
        "sum": 0,
        "stddev": 0,
        "p50": 0,
        "p75": 0,
        "p90": 0,
        "p95": 0,
        "p99": 0,
        "p99_99": 0,
        "outofrange": 0,
        "hdrsize": 17520,
        "cnt": 0
      },
      "toppars": {}
    }
  },
  "topics": {
    "test": {
      "topic": "test",
      "metadata_age": 9060,
      "batchsize": {
        "min": 99,
        "max": 391805,
        "avg": 272593,
        "sum": 18808985,
        "stddev": 180408,
        "p50": 393215,
        "p75": 393215,
        "p90": 393215,
        "p95": 393215,
        "p99": 393215,
        "p99_99": 393215,
        "outofrange": 0,
        "hdrsize": 14448,
        "cnt": 69
      },
      "batchcnt": {
        "min": 1,
        "max": 10000,
        "avg": 6956,
        "sum": 480028,
        "stddev": 4608,
        "p50": 10047,
        "p75": 10047,
        "p90": 10047,
        "p95": 10047,
        "p99": 10047,
        "p99_99": 10047,
        "outofrange": 0,
        "hdrsize": 8304,
        "cnt": 69
      },
      "partitions": {
        "0": {
          "partition": 0,
          "broker": 3,
          "leader": 3,
          "desired": false,
          "unknown": false,
          "msgq_cnt": 1,
          "msgq_bytes": 31,
          "xmit_msgq_cnt": 0,
          "xmit_msgq_bytes": 0,
          "fetchq_cnt": 0,
          "fetchq_size": 0,
          "fetch_state": "none",
          "query_offset": 0,
          "next_offset": 0,
          "app_offset": -1001,
          "stored_offset": -1001,
          "commited_offset": -1001,
          "committed_offset": -1001,
          "eof_offset": -1001,
          "lo_offset": -1001,
          "hi_offset": -1001,
          "consumer_lag": -1,
          "txmsgs": 2150617,
          "txbytes": 66669127,
          "rxmsgs": 0,
          "rxbytes": 0,
          "msgs": 2160510,
          "rx_ver_drops": 0
        },
        "1": {
          "partition": 1,
          "broker": 2,
          "leader": 2,
          "desired": false,
          "unknown": false,
          "msgq_cnt": 0,
          "msgq_bytes": 0,
          "xmit_msgq_cnt": 0,
          "xmit_msgq_bytes": 0,
          "fetchq_cnt": 0,
          "fetchq_size": 0,
          "fetch_state": "none",
          "query_offset": 0,
          "next_offset": 0,
          "app_offset": -1001,
          "stored_offset": -1001,
          "commited_offset": -1001,
          "committed_offset": -1001,
          "eof_offset": -1001,
          "lo_offset": -1001,
          "hi_offset": -1001,
          "consumer_lag": -1,
          "txmsgs": 2150136,
          "txbytes": 66654216,
          "rxmsgs": 0,
          "rxbytes": 0,
          "msgs": 2159735,
          "rx_ver_drops": 0
        },
        "-1": {
          "partition": -1,
          "broker": -1,
          "leader": -1,
          "desired": false,
          "unknown": false,
          "msgq_cnt": 0,
          "msgq_bytes": 0,
          "xmit_msgq_cnt": 0,
          "xmit_msgq_bytes": 0,
          "fetchq_cnt": 0,
          "fetchq_size": 0,
          "fetch_state": "none",
          "query_offset": 0,
          "next_offset": 0,
          "app_offset": -1001,
          "stored_offset": -1001,
          "commited_offset": -1001,
          "committed_offset": -1001,
          "eof_offset": -1001,
          "lo_offset": -1001,
          "hi_offset": -1001,
          "consumer_lag": -1,
          "txmsgs": 0,
          "txbytes": 0,
          "rxmsgs": 0,
          "rxbytes": 0,
          "msgs": 1177,
          "rx_ver_drops": 0
        }
      }
    }
  },
  "tx": 631,
  "tx_bytes": 168584479,
  "rx": 631,
  "rx_bytes": 31084,
  "txmsgs": 4300753,
  "txmsg_bytes": 133323343,
  "rxmsgs": 0,
  "rxmsg_bytes": 0
}