1. 介绍

原文来自linkedin的一篇PPTproducer-performance-tuning-for-apache-kafka

2. 本文的一些前提

  1. 讨论的kafka版本为0.10.0
  2. 没有broker端的再压缩
  3. 消息都有8字节的时间戳介绍信息

3. 优化目标

给定一个要发送的数据集,在满足持久性、有序性的前提下优化以下两点:

  1. 吞吐量
  2. 延迟

优化专注于优化平均性能,这样对所有的producer都有效。

4. kafka producer原理回顾

4.1 生产者的关键配置

  • batch.size: 基于大小的batching策略
  • linger.ms: 基于时间的batching策略
  • compression.type:压缩的速度上lz4=snappy<gzip。
  • max.in.flight.requests.per.connection (affects ordering,设置为1可以保证有序性,但是发送性能会受影响。不为1的时候,如果发生消息重发则会乱序)
  • acks (affects durability)

PS: 更大的批次,意味着更好的压缩率、更高的吞吐量。但是负面影响,就是延迟会高些。

4.2 生产者发送原理

这个之前在kafka生产者原理详解一文中做了一些分析。现在来看看kafka的 committer如何来分析的发送者原理的。其分析相对更加简明扼要。

发送者发送消息的过程简单概括为:

  1. 序列化
  2. 根据topic的元信息对数据进行分区
  3. 分区数据经过压缩器处理后放入batch,产生M和CB。分区数据按照batch在Record Accumulator里面组织(used和callback)。一个batch对象本身会占用一些空间,图上的used和callbacks。

假设现在Record Accumulator中已经包含了如下的数据:

当一个batch准备完毕后,用户线程就可以去执行具体的发送操作了。当满足以下条件之一时,我们认为一个batch是已经“准备完毕的”:

  1. batch.size达到了
  2. linger.ms时间达到了
  3. 发现同一个broker的其他batch已经完毕了
  4. flush()和close()方法被调用

用户线程获取batch的过程如下:

  1. 从batch队列中轮询获取batch
  2. 将batch根据leader broker来分组
  3. 将分完组的batches发送给broker
  4. 如果max.in.flight.requests.per.connection>1则在管道中排队

PS: 接下来的说明,都假设max.in.flight.requests.per.connection=1

5. 生产者调优

5.1 调优工具

生产者调优,主要可以利用kafka-producer-perf-test.sh(org.apache.kafka.tools.ProducerPerformance)。通过测试不同的配置来对比发送效率。

使用方法例子:

./kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic
becket_test_3_replicas_1_partition --throughput 1000000 --producer-props bootstrap.
servers=localhost:9092 max.in.flight.requests.per.connection=1 batch.size=100000
compression.type=lz4

PS: kafka 0.8的版本还支持thread-num等选项,现在0.10.1中还没有,不过已经有issue在解决了。相信马上会有了。详情见:KAFKA-3554

3554修复后会有如下功能:

  1. --num-threads: 发送消息的线程数
  2. --value-bound: The range of the random integer in the messages. This option is useful when compression is used.Different integer range simulates different compression ratio.
  3. producer metrics: 在使用ProducerPerformance的时候,还会打印一系列metrics。

关于第三点,是以前没有的特性。这个对生产者调优十分重要。使用ProducerPerformance的时候,打印的度量信息有:

  • Select_Rate_Avg (The rate that the sender thread runs to check if it can send some messages)
  • Request_Rate_Avg
  • Request_Latency_Avg (Not including the callback execution time)
  • Request_Size_Avg (After compression)
  • Batch_Size_Avg (After compression)
  • Records_Per_Request_Avg
  • Record_Queue_Time_Avg
  • Compression_Rate_Avg

PS:以上度量信息,需要至少1分钟运行时间才能保证稳定。

使用例子:

./kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic
becket_test_3_replicas_4_partition --throughput 100000 --num-threads 1 --value-bound 50000
--producer-props bootstrap.servers=localhost:9092 compression.type=gzip max.in.flight.
requests.per.connection=1

5.2 用于调优的几个公式

5.2.1 吞吐量计算公式

吞吐量可以用以下公式估算:
throughput_Avg(平均吞吐量) ~= Request_Rate_Avg (平均请求速率)* Request_Size_Avg(平均请求大小) / Compression_Rate_Avg (压缩率)
估算的实际值会比实际值大一些,因为会有一些request overhead没有考虑进去。

5.2.2 request_size_avg计算

平均请求大小的计算公式为:

Request_Size_Avg(平均请求大小) = Records_Per_Request_Avg (每个请求的消息数)Record_Size (消息大小) Compression_Rate_Avg(压缩率) +Request_Overhead

request overhead取决于:

  1. topic和分区数量
  2. 一般都是从几十字节到几百字节

5.2.3 Request_Rate_Upper_Limit

5.2.4 平均延迟计算公式

5.3 调优工具使用示例

假设我们使用以下的生产者来测试:

5.2.1 测试结果分析

根据得到的结果,我们发现吞吐量为9.96MB/s,远远小于我们实际的网络带宽1Gbps。

request_rate_avg和理论上限差距不大,而压缩率又是固定的。所以我们的目标为增大request_size_avg来增加吞吐量。增加吞吐量的方式主要有:

linger.ms与batch size、压缩率以及吞吐量和延迟之间的关系:

5.2.2 batching与压缩时间和吞吐量的关系

上图看出来,batching增大之后,吞吐量反而变差了,而且压缩率也只有少量增长。这种原因主要是:增大batch会显著增加压缩的耗时。

相关测试:

总结: 一般我们说增大批次,都有利于增加吞吐量(减少了网络IO次数)。但是这里之所以行不通是因为增大批次带来的好处无法抵消压缩时间的增长。从上图的实验结果可以看到,采用16KB或者索性采用较大的256KB都是可以的。避免采用处在中间的batch size

5.2.3 线程数与吞吐量的关系

可见:发送者的线程数,不是越多越好,因为线程数过多会影响延迟,而且有时候会产生负面效果。但是一般线程数小于topic分区数都是没啥问题的。

5.2.4 优化

通过增加分区数、线程数、batch size,使得吞吐量得到改善:

5.2.5 关于寻找吞吐量瓶颈的方式

5.3 acks=-1(all)时的延迟调优

5.3.1 原理回顾

acks=all的时候,瓶颈很有肯能发生在replication time。

高水位线的值变更需要等待下一次fetch过来之后才变更。所有ProduceRequest里面的高水位线全部抵达当前offset了,才会返回ProduceResponse。

第二个fetch过来的时候,partition0的高水位线移动到当前offset

假设broker1只有1个replication线程,则replicaiton time为

5.3.2 replication time优化

显而易见的是增加num.replica.fetchers,从而使得并发的fetch来做复制。这样的Replication time则为:

设置多少的replica fetchers合理?一般按照官方的生产建议设置成4就好了。

5.4 生产者RTT时间长优化

5.4.1 场景描述

有个跨洋的pipeline

5.4.2 优化方案

现有情况的计算,发现确实吞吐量比较低。

解决办法是增加send和 recieve buffer。下图可以看到增大吞吐量之后,最多能达到20MB/s的吞吐量。