1. Introduction

Monitoring Kafka is essential. Kafka exposes the values of its metrics through JMX, and section 6.6 of the official documentation describes them.

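Note, however, that the official documentation only describes broker and producer metrics; it does not cover the metrics of the new consumer API. For the metrics exposed by the rewritten consumer client, see the Monitoring section of the Kafka Confluent Platform documentation. This article is also organized mainly after Confluent's material.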

Once you understand what Kafka's own metrics mean, a monitoring tool such as Zabbix makes it easy to monitor Kafka comprehensively, including alerting.

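Kafka uses Yammer Metrics (the com.yammer.metrics library, available from the Maven repository) for its broker-side metrics; the new Java producer and consumer use Kafka's own metrics package, org.apache.kafka.common.metrics, instead. Yammer Metrics supports pluggable reporters that feed statistics into a monitoring system, and it is worth trying out on its own at some point.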

2. Monitoring Kafka with jconsole

The simplest way to view these metrics is jconsole, which connects to the process over JMX and lets you browse every metric.
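Below is a minimal Java sketch of what jconsole does under the hood, using only the JDK's javax.management API: connect to a remote JVM over JMX and list the Kafka MBeans. It assumes the broker was started with the JMX_PORT environment variable set (for example `JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties`); the host, the port 9999 and the class name are placeholders, not anything Kafka defines.

```java
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch: list the Kafka MBeans that a JMX client such as jconsole would show.
final class KafkaJmxLister {

    /** Standard RMI JMX URL for host:port, the same form jconsole uses for remote processes. */
    static JMXServiceURL serviceUrl(String host, int port) throws IOException {
        return new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
    }

    /** Sorted canonical names of all MBeans matching a pattern such as "kafka.server:*". */
    static Set<String> listNames(MBeanServerConnection conn, String pattern)
            throws IOException, MalformedObjectNameException {
        Set<String> names = new TreeSet<>();
        for (ObjectName name : conn.queryNames(new ObjectName(pattern), null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    /** Connects to a remote JVM (assumed: a broker started with JMX_PORT set) and prints its Kafka MBeans. */
    static void printKafkaMBeans(String host, int port) throws IOException, MalformedObjectNameException {
        try (JMXConnector connector = JMXConnectorFactory.connect(serviceUrl(host, port))) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            String[] patterns = {"kafka.server:*", "kafka.controller:*", "kafka.network:*", "kafka.log:*"};
            for (String pattern : patterns) {
                listNames(conn, pattern).forEach(System.out::println);
            }
        }
    }
}
```

Calling `KafkaJmxLister.printKafkaMBeans("localhost", 9999)` against such a broker should print names like `kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager` (getCanonicalName sorts the key properties alphabetically).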

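PS: Kafka monitoring falls into three categories:

  1. Broker instance monitoring
  2. Producer instance monitoring
  3. Consumer instance monitoring

Choose according to your own needs. The metrics and explanations listed below are not exhaustive (nor is the official documentation). For a more complete list, combine the official documentation with the MBeans you can browse in jconsole.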

2.1 Broker instance monitoring

2.2 Consumer monitoring

2.3 Producer monitoring

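3. JMX metrics on the broker

The metrics here mainly follow the Kafka Confluent Platform documentation. The broker monitoring section of the official documentation still applies to 0.10.x, so the two can be read side by side.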

Server Metrics

3.1 metrics to alert on a Kafka broker

Here are the important metrics to alert on a Kafka broker:

Name Description
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
Number of under-replicated partitions (ISR < all replicas). Alert if value is greater than 0.
kafka.controller:type=KafkaController,name=OfflinePartitionsCount
Number of partitions that don’t have an active leader and are hence not writable or readable. Alert if value is greater than 0.
kafka.controller:type=KafkaController,name=ActiveControllerCount
Number of active controllers in the cluster. Each broker reports 1 only while it is the controller, so sum this value across all brokers and alert if the total is anything other than 1.
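The three alert rules above are simple enough to express directly. In the hypothetical Java sketch below (class and method names are illustrative), readGauge reads the "Value" attribute, which is how Kafka's Yammer-based JMX reporter exposes gauges, and evaluate applies the thresholds from the table. It assumes the caller has already summed each gauge over all brokers: only the active controller reports non-zero values for the two KafkaController gauges, so a single broker's reading is not enough.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Sketch of the three alert rules in 3.1; thresholds are taken from the table.
final class BrokerAlerts {
    static final String UNDER_REPLICATED = "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions";
    static final String OFFLINE_PARTITIONS = "kafka.controller:type=KafkaController,name=OfflinePartitionsCount";
    static final String ACTIVE_CONTROLLERS = "kafka.controller:type=KafkaController,name=ActiveControllerCount";

    /** Reads one broker gauge; Yammer gauges are exposed over JMX through the "Value" attribute. */
    static long readGauge(MBeanServerConnection conn, String mbean) throws Exception {
        return ((Number) conn.getAttribute(new ObjectName(mbean), "Value")).longValue();
    }

    /**
     * Takes cluster-wide values (each gauge summed over all brokers) and returns
     * one message per violated rule; an empty list means no alert.
     */
    static List<String> evaluate(long underReplicated, long offlinePartitions, long activeControllers) {
        List<String> alerts = new ArrayList<>();
        if (underReplicated > 0) alerts.add("UnderReplicatedPartitions=" + underReplicated);
        if (offlinePartitions > 0) alerts.add("OfflinePartitionsCount=" + offlinePartitions);
        if (activeControllers != 1) alerts.add("ActiveControllerCount=" + activeControllers);
        return alerts;
    }
}
```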

3.2 metrics to observe on a Kafka broker

Here are the list of metrics to observe on a Kafka broker:

Name Description
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
Aggregate incoming message rate.
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
Aggregate incoming byte rate.
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
Aggregate outgoing byte rate.
kafka.network:type=RequestMetrics,name=RequestsPerSec,request={Produce or FetchConsumer or FetchFollower}
Request rate.
kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
Log flush rate and time.
kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
Leader election rate and latency.
kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
Unclean leader election rate.
kafka.server:type=ReplicaManager,name=PartitionCount
Number of partitions on this broker. This should be mostly even across all brokers.
kafka.server:type=ReplicaManager,name=LeaderCount
Number of leaders on this broker. This should be mostly even across all brokers. If not, set auto.leader.rebalance.enable to true on all brokers in the cluster.
kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0.
kafka.server:type=ReplicaManager,name=IsrExpandsPerSec
When a broker is brought up after a failure, it starts catching up by reading from the leader. Once it is caught up, it gets added back to the ISR.
kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
Maximum lag in messages between the follower and leader replicas. In 0.8.x, a follower more than replica.lag.max.messages behind was dropped from the ISR; that setting was removed in 0.9.0, and ISR membership is now governed by replica.lag.time.max.ms.
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
Lag in number of messages per follower replica. This is useful to know if the replica is slow or has stopped replicating from the leader.
kafka.network:type=RequestMetrics,name=TotalTimeMs,request={Produce or FetchConsumer or FetchFollower}
Total time in ms to serve the specified request.
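kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize
Number of requests waiting in the producer purgatory. This should be non-zero when acks=-1 is used on the producer. (On 0.10.x brokers this gauge is named kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce.)
kafka.server:type=FetchRequestPurgatory,name=PurgatorySize
Number of requests waiting in the fetch purgatory. This is high if consumers use a large value for fetch.wait.max.ms (fetch.max.wait.ms in the new consumer). (On 0.10.x brokers: kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch.)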
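PartitionCount and LeaderCount are described as "mostly even across all brokers", which is not a precise threshold. One way to make it checkable is to compare each broker's value with the cluster mean. In this sketch the 20% tolerance is an arbitrary, hypothetical choice, and the per-broker values are assumed to have been collected already (for example by reading the "Value" attribute of kafka.server:type=ReplicaManager,name=LeaderCount on each broker).

```java
import java.util.Map;

// Sketch: flag uneven LeaderCount or PartitionCount values across brokers.
final class LeaderBalance {
    /**
     * counts maps broker id to a per-broker gauge value such as LeaderCount.
     * Returns false if any broker deviates from the cluster mean by more than tolerance * mean.
     */
    static boolean isBalanced(Map<Integer, Long> counts, double tolerance) {
        if (counts.isEmpty()) return true;
        double mean = counts.values().stream().mapToLong(Long::longValue).average().orElse(0);
        for (long count : counts.values()) {
            if (Math.abs(count - mean) > tolerance * mean) return false;
        }
        return true;
    }
}
```

With a tolerance of 0.2, leader counts of 10/11/9 pass while 20/5/5 fail; if leaders stay skewed, the table suggests enabling auto.leader.rebalance.enable.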

4. Producer JMX metrics

The following metrics are available on instances of the new producer, i.e. they are what you use to monitor producers.

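A note on how the producer sends data: it groups individual records into record batches (one per partition) and sends the accumulated batches to the broker together. Roughly, a batch becomes ready to send once it reaches batch.size bytes or once linger.ms has elapsed, and the total memory held by unsent batches is bounded by buffer.memory.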
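To make the batching description concrete, here is a sketch of the producer settings involved. The keys are standard producer configs; batch.size and buffer.memory are set to their default values, the linger.ms value is only illustrative, and in real use these properties would still need bootstrap.servers and the serializer settings before being passed to a KafkaProducer.

```java
import java.util.Properties;

// Sketch of the producer settings that control batching; values are illustrative.
final class ProducerBatchingConfig {
    static Properties batchingProps() {
        Properties props = new Properties();
        // Upper bound in bytes for one per-partition batch (16384 is the default).
        props.setProperty("batch.size", "16384");
        // How long to wait for more records before sending a batch that is not yet full (example value).
        props.setProperty("linger.ms", "5");
        // Total memory for records waiting to be sent (33554432 bytes = 32 MB, the default).
        props.setProperty("buffer.memory", "33554432");
        return props;
    }
}
```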

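Producer metrics fall mainly into four groups: buffer, record, request and I/O.

Important!!!: the consumer metrics in the official documentation are the 0.8.x ones, while 0.10.x uses the new consumer API. For the current consumer metrics, see the Monitoring section of the Kafka Confluent Platform documentation.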

Producer Metrics
Starting with 0.8.2, the new producer exposes the following metrics:

4.1 Global Request Metrics

MBean: kafka.producer:type=producer-metrics,client-id=([-.\w]+)

Name Description
request-latency-avg The average request latency in ms.
request-latency-max The maximum request latency in ms.
request-rate The average number of requests sent per second.
response-rate The average number of responses received per second.
incoming-byte-rate The average number of incoming bytes received per second from all servers.
outgoing-byte-rate The average number of outgoing bytes sent per second to all servers.
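These global metrics live in the producer's own JVM, not on the broker, so they are read from the application process (remotely over JMX, or in-process via ManagementFactory.getPlatformMBeanServer()). A minimal sketch, assuming the JMX attribute names are exactly the metric names in the table above, as they appear in jconsole; the class name is hypothetical. The client-id=* property-value wildcard matches every producer registered in that JVM.

```java
import java.util.HashMap;
import java.util.Map;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Sketch: read one global producer metric for every producer registered in a JVM.
final class ProducerRequestMetrics {
    // client-id=* is a JMX property-value wildcard, so this matches every producer instance.
    static final String PATTERN = "kafka.producer:type=producer-metrics,client-id=*";

    /** Returns the attribute's value (e.g. "request-latency-avg") keyed by client-id. */
    static Map<String, Double> read(MBeanServerConnection conn, String attribute) throws Exception {
        Map<String, Double> byClient = new HashMap<>();
        for (ObjectName name : conn.queryNames(new ObjectName(PATTERN), null)) {
            Number value = (Number) conn.getAttribute(name, attribute);
            byClient.put(name.getKeyProperty("client-id"), value.doubleValue());
        }
        return byClient;
    }
}
```

For example, `read(conn, "request-latency-avg")` returns a map from each client-id to its average request latency in ms.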

4.2 Global Connection Metrics

MBean: kafka.producer:type=producer-metrics,client-id=([-.\w]+)

Name Description
connection-count The current number of active connections.
connection-creation-rate New connections established per second in the window.
connection-close-rate Connections closed per second in the window.
io-ratio The fraction of time the I/O thread spent doing I/O.
io-time-ns-avg The average length of time for I/O per select call in nanoseconds.
io-wait-ratio The fraction of time the I/O thread spent waiting.
select-rate Number of times the I/O layer checked for new I/O to perform per second.
io-wait-time-ns-avg The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.

4.3 Per-Broker Metrics

MBean: kafka.producer:type=producer-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Besides the Global Request Metrics, the following metrics are also available per broker:

Name Description
request-size-max The maximum size of any request sent in the window for a broker.
request-size-avg The average size of all requests in the window for a broker.
request-rate The average number of requests sent per second to the broker.
response-rate The average number of responses received per second from the broker.
incoming-byte-rate The average number of bytes received per second from the broker.
outgoing-byte-rate The average number of bytes sent per second to the broker.

4.4 Per-Topic Metrics

MBean: kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)

Besides the Global Request Metrics, the following metrics are also available per topic:

Name Description
byte-rate The average number of bytes sent per second for a topic.
record-send-rate The average number of records sent per second for a topic.
compression-rate The average compression rate of record batches for a topic.
record-retry-rate The average per-second number of retried record sends for a topic.
record-error-rate The average per-second number of record sends that resulted in errors for a topic.
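When watching per-topic health it is convenient to read several attributes in one call. The sketch below uses MBeanServerConnection.getAttributes to fetch record-send-rate, record-retry-rate and record-error-rate together for every per-topic MBean; the class name and the "client-id/topic" key format are illustrative choices, not Kafka conventions. A non-zero record-error-rate in the result means sends to that topic are failing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Sketch: snapshot several per-topic producer metrics with one getAttributes call per MBean.
final class ProducerTopicMetrics {
    static final String PATTERN = "kafka.producer:type=producer-topic-metrics,client-id=*,topic=*";
    static final String[] ATTRIBUTES = {"record-send-rate", "record-retry-rate", "record-error-rate"};

    /** Maps "client-id/topic" to the values read for ATTRIBUTES. */
    static Map<String, Map<String, Double>> snapshot(MBeanServerConnection conn) throws Exception {
        Map<String, Map<String, Double>> result = new LinkedHashMap<>();
        for (ObjectName name : conn.queryNames(new ObjectName(PATTERN), null)) {
            Map<String, Double> values = new LinkedHashMap<>();
            // getAttributes omits any attribute it could not read instead of throwing.
            for (Attribute attr : conn.getAttributes(name, ATTRIBUTES).asList()) {
                values.put(attr.getName(), ((Number) attr.getValue()).doubleValue());
            }
            result.put(key(name), values);
        }
        return result;
    }

    /** Label for one per-topic MBean, e.g. "producer-1/orders". */
    static String key(ObjectName name) {
        return name.getKeyProperty("client-id") + "/" + name.getKeyProperty("topic");
    }
}
```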

5. Consumer JMX metrics

Starting with Kafka 0.9.0.0, the new consumer exposes the following metrics:

5.1 Fetch Metrics

MBean: kafka.consumer:type=consumer-fetch-manager-metrics,client-id=([-.\w]+)

Name Description
records-lag-max The maximum lag in terms of number of records for any partition in this window. An increasing value over time is your best indication that the consumer group is not keeping up with the producers.
fetch-size-avg The average number of bytes fetched per request.
fetch-size-max The max number of bytes fetched per request.
bytes-consumed-rate The average number of bytes consumed per second.
records-per-request-avg The average number of records in each request.
records-consumed-rate The average number of records consumed per second.
fetch-rate The number of fetch requests per second.
fetch-latency-avg The average time taken for a fetch request.
fetch-latency-max The max time taken for a fetch request.
fetch-throttle-time-avg The average throttle time in ms. When quotas are enabled, the broker may delay fetch requests in order to throttle a consumer which has exceeded its limit. This metric indicates how much throttling time has been added to fetch requests on average.
fetch-throttle-time-max The maximum throttle time in ms.
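The records-lag-max description above says an increasing value over time is the best sign that the group is falling behind. A minimal Java sketch of that check, assuming records-lag-max has already been sampled at a fixed interval from kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<your client.id>. The window length is a hypothetical tuning knob, and "strictly increasing over the last N samples" is just one simple heuristic, not an official rule.

```java
// Sketch: detect a steadily growing records-lag-max over the most recent samples.
final class LagTrend {
    /**
     * Returns true if the last `window` samples are strictly increasing.
     * Fewer samples than `window` (or a window below 2) never counts as growing.
     */
    static boolean isGrowing(double[] samples, int window) {
        if (window < 2 || samples.length < window) return false;
        for (int i = samples.length - window + 1; i < samples.length; i++) {
            // A NaN sample also breaks the trend, because NaN comparisons are false.
            if (!(samples[i] > samples[i - 1])) return false;
        }
        return true;
    }
}
```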

5.2 Consumer Group Metrics

MBean: kafka.consumer:type=consumer-coordinator-metrics,client-id=([-.\w]+)

Name Description
assigned-partitions The number of partitions currently assigned to this consumer.
commit-latency-avg The average time taken for a commit request.
commit-latency-max The max time taken for a commit request.
commit-rate The number of commit calls per second.
join-rate The number of group joins per second. Group joining is the first phase of the rebalance protocol. A large value indicates that the consumer group is unstable and will likely be coupled with increased lag.
join-time-avg The average time taken for a group rejoin. This value can get as high as the configured session timeout for the consumer, but should usually be lower.
join-time-max The max time taken for a group rejoin. This value should not get much higher than the configured session timeout for the consumer.
sync-rate The number of group syncs per second. Group synchronization is the second and last phase of the rebalance protocol. Similar to join-rate, a large value indicates group instability.
sync-time-avg The average time taken for a group sync.
sync-time-max The max time taken for a group sync.
heartbeat-rate The average number of heartbeats per second. After a rebalance, the consumer sends heartbeats to the coordinator to keep itself active in the group. You can control this using the heartbeat.interval.ms setting for the consumer. You may see a lower rate than configured if the processing loop is taking more time to handle message batches. Usually this is OK as long as you see no increase in the join rate.
heartbeat-response-time-max The max time taken to receive a response to a heartbeat request.
last-heartbeat-seconds-ago The number of seconds since the last heartbeat was sent to the group coordinator.

5.3 Global Request Metrics

MBean: kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)

Name Description
request-rate The average number of requests sent per second.
response-rate The average number of responses received per second.
incoming-byte-rate The average number of incoming bytes received per second from all servers.
outgoing-byte-rate The average number of outgoing bytes sent per second to all servers.

5.4 Global Connection Metrics

MBean: kafka.consumer:type=consumer-metrics,client-id=([-.\w]+)

Name Description
connection-count The current number of active connections.
connection-creation-rate New connections established per second in the window.
connection-close-rate Connections closed per second in the window.
io-ratio The fraction of time the I/O thread spent doing I/O.
io-time-ns-avg The average length of time for I/O per select call in nanoseconds.
io-wait-ratio The fraction of time the I/O thread spent waiting.
select-rate Number of times the I/O layer checked for new I/O to perform per second.
io-wait-time-ns-avg The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.

5.5 Per-Broker Metrics

MBean: kafka.consumer:type=consumer-node-metrics,client-id=([-.\w]+),node-id=([0-9]+)

Besides the Global Request Metrics, the following metrics are also available per broker:

Name Description
request-size-max The maximum size of any request sent in the window for a broker.
request-size-avg The average size of all requests in the window for a broker.
request-rate The average number of requests sent per second to the broker.
response-rate The average number of responses received per second from the broker.
incoming-byte-rate The average number of bytes received per second from the broker.
outgoing-byte-rate The average number of bytes sent per second to the broker.

6. Recommended metrics

That is a lot of metrics to take in. The official documentation suggests the following monitoring plan:

Server side:

  1. Monitor GC time.
  2. Monitor OS-level metrics such as CPU utilization and I/O service time.

Client side:

  1. Message/byte rate (global and per topic)
  2. Request rate/size/time

Consumer-specific:

  1. Max lag in messages among all partitions
  2. Min fetch request rate

For a consumer to keep up, max lag needs to be less than a threshold and min fetch rate needs to be larger than 0.
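The keep-up rule above (max lag below a threshold and min fetch rate above 0) can be sketched as a small check over per-consumer samples of records-lag-max and fetch-rate. The Sample type, the threshold value passed in, and treating an empty sample list as "not keeping up" are assumptions for illustration.

```java
import java.util.List;

// Sketch of the keep-up rule from section 6: max lag below a threshold AND min fetch rate above 0.
final class ConsumerKeepUp {
    /** One observation per consumer, read from consumer-fetch-manager-metrics. */
    static final class Sample {
        final String clientId;
        final double recordsLagMax; // records-lag-max
        final double fetchRate;     // fetch-rate

        Sample(String clientId, double recordsLagMax, double fetchRate) {
            this.clientId = clientId;
            this.recordsLagMax = recordsLagMax;
            this.fetchRate = fetchRate;
        }
    }

    /** True only if there is data, the max lag is below lagThreshold and every fetch rate is above 0. */
    static boolean keepingUp(List<Sample> samples, double lagThreshold) {
        if (samples.isEmpty()) return false; // no data: treated as not keeping up (an assumption)
        double maxLag = Double.NEGATIVE_INFINITY;
        double minFetchRate = Double.POSITIVE_INFINITY;
        for (Sample s : samples) {
            // Math.max/min propagate NaN, so a NaN sample makes the check fail and errs toward alerting.
            maxLag = Math.max(maxLag, s.recordsLagMax);
            minFetchRate = Math.min(minFetchRate, s.fetchRate);
        }
        return maxLag < lagThreshold && minFetchRate > 0;
    }
}
```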

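Note: monitoring producers and consumers requires knowing their client-id, which you set with the client.id config. If it is not set, producers default to names of the form producer-N and consumers to consumer-N, where N is a sequence number.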
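Since the client MBean names embed the client-id, it is handy to build them from the configured client.id. The helper below is hypothetical: it assumes the client.id contains only characters from the [-.\w] class used in the MBean patterns above (anything else is rejected rather than quoted), and "billing-consumer" in the usage is an invented example id.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper: build a client MBean name from the configured client.id.
final class ClientMBeanNames {
    /**
     * domain: "kafka.producer" or "kafka.consumer"; type: e.g. "producer-metrics",
     * "consumer-fetch-manager-metrics" or "consumer-coordinator-metrics".
     */
    static ObjectName forClient(String domain, String type, String clientId)
            throws MalformedObjectNameException {
        // Only accept ids matching the [-.\w]+ pattern from the MBean names above.
        if (!clientId.matches("[-.\\w]+")) {
            throw new IllegalArgumentException("unsupported client.id: " + clientId);
        }
        return new ObjectName(domain + ":type=" + type + ",client-id=" + clientId);
    }
}
```

For example, `forClient("kafka.consumer", "consumer-fetch-manager-metrics", "billing-consumer")` names the MBean that holds records-lag-max for that consumer.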

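7. Auditing

Delivering data correctly matters. At LinkedIn, they audit that every message is consumed by all consumers and measure the lag for this to happen; for important topics, an alert fires if messages are not delivered within a given time period. Internally, LinkedIn runs a system that monitors the data flow to make sure every message reaches every consumer.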

PS: The key is counting events per time window based on their timestamps.
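A minimal sketch of the timestamp-based counting idea, not LinkedIn's actual audit system: each side (for example the producing service and each consumer) counts events into fixed time buckets keyed by the event's own timestamp, and comparing counts per bucket shows which windows have not been fully delivered. The AuditCounter class and the bucket size are hypothetical, and both counters are assumed to use the same bucket size.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: count events per fixed time bucket, keyed by the event's own timestamp.
final class AuditCounter {
    private final long bucketMs;
    private final Map<Long, Long> counts = new TreeMap<>();

    AuditCounter(long bucketMs) {
        this.bucketMs = bucketMs;
    }

    /** Counts one event in the bucket containing its timestamp (not its arrival time). */
    void record(long eventTimestampMs) {
        long bucketStart = Math.floorDiv(eventTimestampMs, bucketMs) * bucketMs;
        counts.merge(bucketStart, 1L, Long::sum);
    }

    /** Number of events counted in the bucket starting at bucketStartMs. */
    long count(long bucketStartMs) {
        return counts.getOrDefault(bucketStartMs, 0L);
    }

    /** Buckets where the consumer side saw fewer events than the producer side, with the shortfall. */
    static Map<Long, Long> missing(AuditCounter produced, AuditCounter consumed) {
        Map<Long, Long> gaps = new TreeMap<>();
        for (Map.Entry<Long, Long> e : produced.counts.entrySet()) {
            long shortfall = e.getValue() - consumed.count(e.getKey());
            if (shortfall > 0) gaps.put(e.getKey(), shortfall);
        }
        return gaps;
    }
}
```

In a real deployment each side's counts would have to be collected centrally; this sketch keeps both counters in one process.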

For how to implement auditing for Kafka, see the official article "Add audit trail to kafka". I have only skimmed it so far and will fold its ideas into this post when I have time.