1. 介绍

以前分享过一篇文章kafka原理以及设计实现思想,但是很多东西讲的还是不够深入。今天这篇文章主要分析下消费者的具体设计和原理。

文章主要内容转自Kafka设计解析(四)- Kafka Consumer设计解析,我只选择了一些自己尚未掌握的或者自己认为比较重要的内容作总结。

2. consumer rebalance算法

这里引用下参考资料的内容:

2.1 old consumer

较老的consumer的相关元信息是保存在zk上的。其结构如下(图片来自参考文献的博客):

0.8.2.1版本的Kafka的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。每个Consumer被创建时会触发Consumer Group的Rebalance,具体启动流程如下:

  1. High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为/consumers/[consumer group]/ids/[consumer id]
  2. 在/consumers/[consumer group]/ids上注册Watch
  3. 在/brokers/ids上注册Watch
  4. 如果Consumer通过Topic Filter创建消息流,则它会同时在/brokers/topics上也创建Watch
  5. 强制自己在其Consumer Group内启动Rebalance流程

  在这种策略下,每一个Consumer或者Broker的增加或者减少都会触发Consumer Rebalance。因为每个Consumer只负责调整自己所消费的Partition,为了保证整个Consumer Group的一致性,当一个Consumer触发了Rebalance时,该Consumer Group内的其它所有其它Consumer也应该同时触发Rebalance。

该方式有如下缺陷:

  1. Herd effect(羊群效应):任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance
  2. Split Brain(脑裂问题):每个Consumer分别单独通过Zookeeper判断哪些Broker和Consumer 宕机了,那么不同Consumer在同一时刻从Zookeeper“看”到的View就可能不一样,这是由Zookeeper的特性决定的,这就会造成不正确的Reblance尝试。
  3. 调整结果不可控:所有的Consumer都并不知道其它Consumer的Rebalance是否成功,这可能会导致Kafka工作在一个不正确的状态。

后续new consumer api则将相关元数据存储到一个kafka topic中来避免以上问题。

2.2 new consumer

新消费者的设计主要考虑以下因素

2.2.1 简化消费者客户端

 部分用户希望开发和使用non-java的客户端。现阶段使用non-java发SimpleConsumer比较方便,但想开发High Level Consumer并不容易。因为High Level Consumer需要实现一些复杂但必不可少的失败探测和Rebalance。如果能将消费者客户端更精简,使依赖最小化,将会极大的方便non-java用户实现自己的Consumer。

2.2.2 中心Coordinator

如上文所述,当前版本的High Level Consumer存在Herd Effect和Split Brain的问题。如果将失败探测和Rebalance的逻辑放到一个高可用的中心Coordinator,那么这两个问题即可解决。同时还可大大减少Zookeeper的负载,有利于Kafka Broker的Scale Out。

2.2.3 允许手工管理offset

2.2.4 Rebalance后触发用户指定的回调

 一些应用可能会在内存中为每个Partition维护一些状态,Rebalance时,它们可能需要将该状态持久化。因此该需求希望支持用户实现并指定一些可插拔的并在Rebalance时触发的回调。如果用户使用手动的Offset管理,那该需求可方便得由用户实现,而如果用户希望使用Kafka提供的自动Offset管理,则需要Kafka提供该回调机制。

2.2.5 非阻塞式Consumer API

该需求源于那些实现高层流处理操作,如filter by, group by, join等,的系统。现阶段的阻塞式Consumer几乎不可能实现Join操作。

3. new consumer设计和实现

前面说了old consumer会有羊群效应和脑裂等问题。接下来看看new consume如何通过中心coordinator来实现rebalance.

成功Rebalance的结果是,被订阅的所有Topic的每一个Partition将会被Consumer Group内的一个(有且仅有一个)Consumer拥有。每一个Broker将被选举为某些Consumer Group的Coordinator。某个Cosnumer Group的Coordinator负责在该Consumer Group的成员变化或者所订阅的Topic的Partititon变化时协调Rebalance操作。

3.1 consumer rebalance过程

  1. Consumer启动时,先向Broker列表中的任意一个Broker发送ConsumerMetadataRequest,并通过ConsumerMetadataResponse获取它所在Group的Coordinator信息。ConsumerMetadataRequest和ConsumerMetadataResponse的结构如下
ConsumerMetadataRequest
{
  GroupId                => String
}

ConsumerMetadataResponse
{
  ErrorCode              => int16
  Coordinator            => Broker
}
  1. Consumer连接到Coordinator并发送HeartbeatRequest,如果返回的HeartbeatResponse没有任何错误码,Consumer继续fetch数据。若其中包含IllegalGeneration错误码,即说明Coordinator已经发起了Rebalance操作,此时Consumer停止fetch数据,commit offset,并发送JoinGroupRequest给它的Coordinator,并在JoinGroupResponse中获得它应该拥有的所有Partition列表和它所属的Group的新的Generation ID。此时Rebalance完成,Consumer开始fetch数据。相应Request和Response结构如下
HeartbeatRequest
{
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
}

HeartbeatResponse
{
  ErrorCode              => int16
}

JoinGroupRequest
{
  GroupId                     => String
  SessionTimeout              => int32
  Topics                      => [String]
  ConsumerId                  => String
  PartitionAssignmentStrategy => String
}

JoinGroupResponse
{
  ErrorCode              => int16
  GroupGenerationId      => int32
  ConsumerId             => String
  PartitionsToOwn        => [TopicName [Partition]]
}
TopicName => String
Partition => int32

3.2 consumer rebalance状态机

  1. Down:Consumer处于停止状态
  2. Start up & discover coordinator:Consumer检测其所在Group的Coordinator。一旦它检测到Coordinator,即向其发送JoinGroupRequest。
  3. Part of a group:该状态下,Consumer已经是该Group的成员,并周期性发送HeartbeatRequest。如HeartbeatResponse包含IllegalGeneration错误码,则转换到Stopped Consumption状态。若连接丢失,HeartbeatResponse包含NotCoordinatorForGroup错误码,则转换到Rediscover coordinator状态。
  4. Rediscover coordinator:该状态下,Consumer不停止消费而是尝试通过发送ConsumerMetadataRequest来探测新的Coordinator,并且等待直到获得无错误码的响应。
  5. Stopped consumption:该状态下,Consumer停止消费并提交offset,直到它再次加入Group。

3.3 故障检测机制

3.3.1 consumer角度来看故障检测

  Consumer成功加入Group后,Consumer和相应的Coordinator同时开始故障探测程序。Consumer向Coordinator发起周期性的Heartbeat(HeartbeatRequest)并等待响应,该周期为 session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms内未收到HeartbeatResponse,或者发现相应的Socket channel断开,它即认为Coordinator已宕机并启动Coordinator探测程序。若Coordinator在session.timeout.ms内没有收到一次HeartbeatRequest,则它将该Consumer标记为宕机状态并为其所在Group触发一次Rebalance操作。

  Coordinator Failover过程中,Consumer可能会在新的Coordinator完成Failover过程之前或之后发现新的Coordinator并向其发送HeatbeatRequest。对于后者,新的Cooodinator可能拒绝该请求,致使该Consumer重新探测Coordinator并发起新的连接请求。如果该Consumer向新的Coordinator发送连接请求太晚,新的Coordinator可能已经在此之前将其标记为宕机状态而将之视为新加入的Consumer并触发一次Rebalance操作。

3.3.2 coordinator角度来看故障检测

  1. 稳定状态下,Coordinator通过上述故障探测机制跟踪其所管理的每个Group下的每个Consumer的健康状态。
  2. 刚启动时或选举完成后,Coordinator从Zookeeper读取它所管理的Group列表及这些Group的成员列表(最新版的应该是从__consumer_offset这个topic里面取读取了)。如果没有获取到Group成员信息,它不会做任何事情直到某个Group中有成员注册进来。
  3. 在Coordinator完成加载其管理的Group列表及其相应的成员信息之前,它将为HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests返回CoordinatorStartupNotComplete错误码。此时,Consumer会重新发送请求。
  4. Coordinator会跟踪被其所管理的任何Consumer Group注册的Topic的Partition的变化,并为该变化触发Rebalance操作。创建新的Topic也可能触发Rebalance,因为Consumer可以在Topic被创建之前就已经订阅它了。
  5. Coordinator发起Rebalance操作流程如下所示(最新版本的0.10.x话,下图需要改动,因为不依赖ZK了)。


Coordinator状态机(0.9.x)

  1. Down:Coordinator不再担任之前负责的Consumer Group的Coordinator
  2. Catch up:该状态下,Coordinator竞选成功,但还未能做好服务相应请求的准备。
  3. Ready:该状态下,新竞选出来的Coordinator已经完成从Zookeeper中加载它所负责管理的所有Group的metadata(0.10.x是从__consumer_offset中获取,注意!!),并可开始接收相应的请求。
  4. Prepare for rebalance:该状态下,Coordinator在所有HeartbeatResponse中返回IllegalGeneration错误码,并等待所有Consumer向其发送JoinGroupRequest后转到Rebalancing状态。
  5. Rebalancing:该状态下,Coordinator已经收到了JoinGroupRequest请求,并增加其Group Generation ID,分配Consumer ID,分配Partition。Rebalance成功后,它会等待接收包含新的Consumer Generation ID的HeartbeatRequest,并转至Ready状态。

3.3.3 Coordinator Failover

如前文所述,Rebalance操作需要经历如下几个阶段

  1. Topic/Partition的改变或者新Consumer的加入或者已有Consumer停止,触发Coordinator注册在Zookeeper上的watch,Coordinator收到通知准备发起Rebalance操作。
  2. Coordinator通过在HeartbeatResponse中返回IllegalGeneration错误码发起Rebalance操作。
  3. Consumer发送JoinGroupRequest
  4. Coordinator在Zookeeper中增加Group的Generation ID并将新的Partition分配情况写入Zookeeper
  5. Coordinator发送JoinGroupResponse

在这个过程中的每个阶段,Coordinator都可能出现故障。下面给出Rebalance不同阶段中Coordinator的Failover处理方式。

  1. 如果Coordinator的故障发生在第一阶段,即它收到Notification并未来得及作出响应,则新的Coordinator将从Zookeeper读取Group的metadata,包含这些Group订阅的Topic列表和之前的Partition分配。如果某个Group所订阅的Topic数或者某个Topic的Partition数与之前的Partition分配不一致,亦或者某个Group连接到新的Coordinator的Consumer数与之前Partition分配中的不一致,新的Coordinator会发起Rebalance操作。
  2. 如果失败发生在阶段2,它可能对部分而非全部Consumer发出带错误码的HeartbeatResponse。与第上面第一种情况一样,新的Coordinator会检测到Rebalance的必要性并发起一次Rebalance操作。如果Rebalance是由Consumer的失败所触发并且Cosnumer在Coordinator的Failover完成前恢复,新的Coordinator不会为此发起新的Rebalance操作。
  3. 如果Failure发生在阶段3,新的Coordinator可能只收到部分而非全部Consumer的JoinGroupRequest。Failover完成后,它可能收到部分Consumer的HeartRequest及另外部分Consumer的JoinGroupRequest。与第1种情况类似,它将发起新一轮的Rebalance操作。
  4. 如果Failure发生在阶段4,即它将新的Group Generation ID和Group成员信息写入Zookeeper后。新的Generation ID和Group成员信息以一个原子操作一次性写入Zookeeper。Failover完成后,Consumer会发送HeartbeatRequest给新的Coordinator,并包含旧的Generation ID。此时新的Coordinator通过在HeartbeatResponse中返回IllegalGeneration错误码发起新的一轮Rebalance。这也解释了为什么每次HeartbeatRequest中都需要包含Generation ID和Consumer ID。
  5. 如果Failure发生在阶段5,旧的Coordinator可能只向Group中的部分Consumer发送了JoinGroupResponse。收到JoinGroupResponse的Consumer在下次向已经失效的Coordinator发送HeartbeatRequest或者提交Offset时会检测到它已经失败。此时,它将检测新的Coordinator并向其发送带有新的Generation ID 的HeartbeatRequest。而未收到JoinGroupResponse的Consumer将检测新的Coordinator并向其发送JoinGroupRequest,这将促使新的Coordinator发起新一轮的Rebalance。

参考资料:

  1. Kafka设计解析(四)- Kafka Consumer设计解析