1.介绍

本节主要是希望通过以下两方面来研究kafka在容错处理时对消费者消费延迟的影响:

  1. 当1个broker挂掉后,分区会重新平衡,这时候观察消费者的消费速率、延迟
  2. 当增加和删除分区时,数据也会重新平衡,这时候观察消费者的消费速率和延迟

使用版本:kafka 0.10.0.0
JVM配置信息:

-Xmx1G
-Xms1G
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:+DisableExplicitGC
-Djava.awt.headless=true
-Xloggc:/root/Downloads/kafka_2.11-0.10.0.0/bin/../logs/kafkaServer-gc.log
-verbose:gc
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
-XX:+PrintGCTimeStamps
-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.port=9999
-Dkafka.logs.dir=/root/Downloads/kafka_2.11-0.10.0.0/bin/../logs
-Dlog4j.configuration=file:./../config/log4j.properties 

测试集群信息:2台broker,1个topic,6个分区,2个replication

发送者发送的信息(5亿条比较小的记录):


消费者消费的主要代码(可以发现消费者只是单纯fetch消息,没有任何处理工作,因此处理的延迟肯定是很低的):

2. 基准测试

通过对kafka监控,我们首先观察不出现异常时,kafka的运行情况。

2.1 分区情况

可以看到数据均衡地存储在各个分区上

broker0的6个分区上,存放了3份replica0和3份replica1
broker1的6个分区上,存放了3分replica1和3分replica0

2.2 监控情况

2.2.1 producer发送字节数

2.2.2 生产者发送延迟

平均在40-50毫秒之间

2.2.3 接收消息总计

前面从非零开始是因为我之前也做了一些实验

2.2.4 数据量进出总计

前面从非零开始是因为我之前也做了一些实验

2.2.5 消费平均fetch和commit延迟

fetch如果拿不到数据有个超时时间,为500ms。平均的fetch延迟为36.53ms

2.2.6 消费者每秒平均消费的数据量

3. 消费者reblance影响

消费者组的消费者如果消费时间加上poll时间大于session超时时间,则会被踢出消费者组。
消费进行重新平衡(分区和消费者实例的对应关系重新分配),这时候如果等处理完再去拉消息就会取不到消息或者造成重复消费的异常问题。

这里我们采用1个topic,有6个分区和6个消费者实例。多线程消费代码如下:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Wan Kaiming on 2016/8/2
 * @version 1.0
 */
public class MyConsumer {

    //记录多线程消费的所有记录数
    private static AtomicInteger totalRecordsCount = new AtomicInteger(0);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        MyConsumer myConsumer = new MyConsumer();
        //myConsumer.runAuto();
        //myConsumer.runMannual();
        myConsumer.runAutoWithMultiThreads();

    }


    /**
     *  自动提交offset,多线程消费
     */

    private void runAutoWithMultiThreads() throws ExecutionException, InterruptedException {
        Properties props = new Properties();

        //broker的地址
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //唯一标识消费者组的ID
        props.put("group.id", "test_new");
        //开启自动提交offset
        props.put("enable.auto.commit", "true");
        //自动提交offset间隔1s
        props.put("auto.commit.interval.ms", "1000");
        //消费会话超时时间10s(做测试)
        props.put("session.timeout.ms", "10000");
        //key的反序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //value的反序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //创建多个consumer实例,并且全部订阅同一个topic
        List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
        for(int i=0;i<6;i++){
            //指定clientID方便监控
            String clientID = "test_consumer_" + Integer.toString(i);
            props.put("client.id", clientID);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("my-topic-new"));
            consumerList.add(consumer);
        }



        ExecutorService executorService = Executors.newFixedThreadPool(6);


        //启动6个消费线程

        Future future = null;
        for(int i=0;i<6;i++) {

            future =  executorService.submit(new ConsumerRunnable(consumerList.get(i)));

        }

        executorService.shutdown();

        while(future.get() == null){
            System.out.println("消费者总计消费"+totalRecordsCount.get());
        }
    }



    //创建消费类,用于执行多线程
    class ConsumerRunnable implements Runnable{

        KafkaConsumer<String, String> consumer;

        public ConsumerRunnable(KafkaConsumer<String, String> consumer){
            this.consumer=consumer;
        }

        @Override
        public void run() {
            int count = 0;

            //长连接
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("pid = %d,offset = %d, key = %s, value = %s\n", Thread.currentThread().getId(), record.offset(), record.key(), record.value());

                    count++;
                    MyConsumer.totalRecordsCount.addAndGet(1);
                }

                System.out.println("线程"+Thread.currentThread().getId()+"消费记录:"+count+"条");

                //消费完毕后休眠20秒或者20ms
                try {
                    Thread.currentThread().sleep(20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }



    /**
     * 自动提交offset
     */
    private void runAuto(){

        Properties props = new Properties();
        //指定clientID方便监控
        props.put("client.id", "test_consumer_1");
        //broker的地址
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //唯一标识消费者组的ID
        props.put("group.id", "test_new");
        //开启自动提交offset
        props.put("enable.auto.commit", "true");
        //自动提交offset间隔1s
        props.put("auto.commit.interval.ms", "1000");
        //消费会话超时时间30s
        props.put("session.timeout.ms", "30000");
        //key的反序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //value的反序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //多个主题用逗号分隔
        consumer.subscribe(Arrays.asList("my-topic-new"));

        String tempStr;

        //长连接
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                tempStr = record.key() + record.value();
            }

            //消费完毕后休眠10秒
            try {
                Thread.currentThread().sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }


    /**
     * 手动提交offset
     */
    private void runMannual(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.45.10.31:9092,10.45.10.34:9092");
        props.put("group.id", "test");
        //不开启自动提交
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic-new"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {

                //打印结果
                for(ConsumerRecord<String,String> record:buffer){
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                }

                //同步提交
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}

生产者代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;

/**
 * @author Wan Kaiming on 2016/8/1
 * @version 1.0
 */
public class MyProducer {


    public static void main(String[] args) {

        //统计时间
        System.out.println("程序开始时间戳信息:"+new Date());
        final long startTime=System.currentTimeMillis();

        Properties props = new Properties();

        //指定clientID方便监控
        props.put("client.id", "test_producer_1");

        //broker地址 这里用域名,记得修改本地的hosts文件
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //消息可靠性语义
        props.put("acks", "all");
        //请求broker失败进行重试的次数,避免由于请求broker失败造成的消息重复
        props.put("retries", 3);
        //按批发送,每批的消息数量
        //props.put("batch.size", 16384);
        props.put("batch.size", 16384);
        //防止来不及发送,延迟一点点时间,使得能够批量发送消息
        props.put("linger.ms", 1);
        //缓冲大小,bytes
        props.put("buffer.memory", 33554432);
        //key的序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //value的序列化类
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //创建一个Producer对象,加载配置上下文信息
        Producer<String, String> producer = new KafkaProducer<String,String>(props);



        for(int i=0;i<100;i++){
            producer.send(new ProducerRecord<String, String>("my-topic-new", Integer.toString(i), "one,two,three,four"));
        }







        producer.close();

        final long endTime=System.currentTimeMillis();
        float excTime=(float)(endTime-startTime)/1000;
        System.out.println("执行时间:"+excTime+"s");
        System.out.println("当前时间为:"+ new Date());



    }
}

3.1 正常消费结果

当我们正常消费的时候(设置sleep时间位20ms,保证不会超过超时会话超时时间),此时的执行结果如下。可以发现程序正常消费,且没有重复和异常。不过可以发现,kafka多个消费者消费的时候,并不是完全均衡的。这个也可以理解,毕竟是并发消费。

3.2 异常消费结果

如果设置sleep为20秒,则结果如下

警告内容如下:

WARN  o.a.kafka.clients.consumer.internals.ConsumerCoordinator - Auto offset commit failed for group test_new: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

此外也可以看到kafka manager上消费的异常:

观察消费结果会发现存在重复消费、消费实例被间歇性阻塞(主要原因就是由于不断的reblance导致)、消费者普遍出现明显lag这些问题。

针对这个问题,这里可以给出以下解决方案:

  1. 消费者取到消息后交给异步线程写到一个阻塞队列。处理线程从阻塞队列中取消息处理。该方法的缺点就是阻塞队列本身占用内存资源,不宜过大。优点是对kafka本身的性能不造成影响

  2. 设置kafka的超时时间稍微长点(默认30秒)或者设置poll的时间稍微小点使得poll的时间加上处理时间小于会话超时时间。这样做优点是消费者不需要额外在引入一个消费队列。缺点是,不合理的设置过程的超时时间或者过小的批次容易影响kafka性能。

个人觉得还是2方法稍微好些,不引入系统额外的复杂度。

4. broker挂掉的影响

4.1 broker挂掉对消费者影响

broker挂掉之后,分区自然也要重新平衡,这时候会对消费产生什么影响呢?让我们来跑下实验。

实验过程:

  1. 生产者发送N条信息
  2. 生产者发送完毕后马上kill一个broker
  3. 观察消费者状态(应该是阻塞了)。此处消费者每次消费完一条记录休眠2秒,方便我们有时间KILL BROKER。此外设置poll为1毫秒,方便执行休眠
  4. 重启broker并观察消费者是否继续消费,消费的条数和正确性怎样

4.1.1 发送消息条数为10时的过程

发送前状况:所有leader负载均衡,ISR集合包含2个server,没有处在同步中的分区

发送后,KILL掉broker后状态:分区进行重新平衡,leader重新选举为0,ISR集合只有一个SERVER,所有分区处于under replicated状态

看剩下的BROKER上也可以看到消费者的重新组织

结果:此时consumer仍然成功消费到所有消息,无重复,无丢失。

4.1.2 不同消息发送量下的实验结果

消息条数 消费状况
10 正确消费
1000 正确消费
5万 正确消费
300万 正确消费

结论:只要发送者能够正确发完,在ack=all的时候,kill一个broker不会对kafka造成影响。

4.2 broker挂掉对生产者影响

生产者发送过程中(消费者保持正常消费),kill broker之后生产者可能会有如下的警告,但是因为设置了重发,重新发送后会发送到那个正常的broker上(retries可以设置)。观察消费者端的消费情况可以发现消费者稍微消费了几条数据后,稍微被阻塞了一会,但是很快又能回复消费正确拿到所有数据。

2016-08-31 18:03:36.628 [kafka-producer-network-thread | test_producer_1] WARN  org.apache.kafka.clients.producer.internals.Sender - Got error produce response with correlation id 2 on topic-partition my-topic-new2-3, retrying (2 attempts left). Error: NETWORK_EXCEPTION

4.3 broker挂掉后对分区leader和ISR的影响

在正常运行时,所有leader都是平衡的,而且ISR集合也是包含所有SERVER,所有分区都不处于under replicated状态

当其中一个broker被kill后,其变化如下(PS:此时处于under replicated状态,这个在kafka manager上可以看)

此时已经很快的选举好了新leader,然后尝试重启server,重启后可以发现ISR集合很快同步完成(此时在kafka manager上的Under Replicated状态又变为false),但是leader还没有平衡。leader的平衡有参数leader.imbalance.check.interval.seconds来控制,默认为300秒,因此需要等待5分钟才会leader重新平衡

5分钟后查看