1. 介绍

以前分享过一篇文章kafka原理以及设计实现思想,但是很多东西讲的还是不够深入。今天这篇文章主要分析下kafka的高可用原理。

文章主要内容转自Kafka设计解析(二)- Kafka High Availability (上),我只选择了一些自己尚未掌握的或者自己认为比较重要的内容作总结。

关于replication、ISR、分区复制、leader、controller等基本概念在以前的文章都已经说过了。这里就直接就针对某些概念进行更加细致的学习。

2. replication

2.1 replica复制算法

  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
  2. 将第i个Partition分配到第(i mod n)个Broker上
  3. 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上

2.2 broker活着的判定

kafka判定broker是否活着,通过以下2个方式:

  1. 和zk的session没有断(通过心跳来维系)
  2. follower能及时将leader消息复制过来,不能落后太多(例如默认lag超过4000就会踢出ISR)

2.3 所有replica都不工作的情况

如果所有副本都出问题,一般有两种选择:

  1. 等待ISR中的任一个Replica“活”过来,并且选它作为Leader(一致性好,但是可用性差)
  2. 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader(一致性差,但是可用性相比第一种方式好)

3. 分区leader选举方法——ZK vs controller

一般比较容易想到的一个方法是:所有Follower都在Zookeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。

该方法会存在3个问题:

  1. split-brain 这是由Zookeeper的特性引起的,虽然Zookeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致
  2. herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
  3. Zookeeper负载过重 每个Replica都要为此在Zookeeper上注册一个Watch,当集群规模增加到几千个Partition时Zookeeper负载会过重。

改进的方法——所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

4. HA相关ZK结构

 (本节所示Zookeeper结构中,实线框代表路径名是固定的,而虚线框代表路径名与业务相关)
  admin (该目录下znode只有在有相关操作时才会存在,操作结束时会将其删除)

4.1 preferred_replica_election数据结构

/admin/preferred_replica_election数据结构

Schema:
{
      "fields":[
         {
            "name":"version",
            "type":"int",
            "doc":"version id"
         },
         {
            "name":"partitions",
            "type":{
               "type":"array",
               "items":{
                  "fields":[
                     {
                        "name":"topic",
                        "type":"string",
                        "doc":"topic of the partition for which preferred replica election should be triggered"
                     },
                     {
                        "name":"partition",
                        "type":"int",
                        "doc":"the partition for which preferred replica election should be triggered"
                     }
                  ],
               }
               "doc":"an array of partitions for which preferred replica election should be triggered"
            }
         }
      ]
   }

   Example:     
   {
     "version": 1,
     "partitions":
        [
           {
               "topic": "topic1",
               "partition": 8         
           },
           {
               "topic": "topic2",
               "partition": 16        
           }
        ]            
   }

4.2 reassign_partitions数据结构

 /admin/reassign_partitions用于将一些Partition分配到不同的broker集合上。对于每个待重新分配的Partition,Kafka会在该znode上存储其所有的Replica和相应的Broker id。该znode由管理进程创建并且一旦重新分配成功它将会被自动移除。其数据结构如下

  Schema:
{
      "fields":[
         {
            "name":"version",
            "type":"int",
            "doc":"version id"
         },
         {
            "name":"partitions",
            "type":{
               "type":"array",
               "items":{
                  "fields":[
                     {
                        "name":"topic",
                        "type":"string",
                        "doc":"topic of the partition to be reassigned"
                     },
                     {
                        "name":"partition",
                        "type":"int",
                        "doc":"the partition to be reassigned"
                     },
                     {
                        "name":"replicas",
                        "type":"array",
                        "items":"int",
                        "doc":"a list of replica ids"
                     }
                  ],
               }
               "doc":"an array of partitions to be reassigned to new replicas"
            }
         }
      ]
   }

   Example:
   {
     "version": 1,
     "partitions":
        [
           {
               "topic": "topic3",
               "partition": 1,
               "replicas": [1, 2, 3]
           }
        ]            
   }

4.3 delete_topics数据结构

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "topics",
       "type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted"}
      } ]
}

Example:
{
  "version": 1,
  "topics": ["topic4", "topic5"]
}

4.4 brokers数据结构

broker(即/brokers/ids/[brokerId])存储“活着”的Broker信息。数据结构如下

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "host", "type": "string", "doc": "ip address or host name of the broker"},
      {"name": "port", "type": "int", "doc": "port of the broker"},
      {"name": "jmx_port", "type": "int", "doc": "port for jmx"}
    ]
}

Example:
{
    "jmx_port":-1,
    "host":"node1",
    "version":1,
    "port":9092
}

4.5 brokers/topics数据结构

 topic注册信息(/brokers/topics/[topic]),存储该Topic的所有Partition的所有Replica所在的Broker id,第一个Replica即为Preferred Replica,对一个给定的Partition,它在同一个Broker上最多只有一个Replica,因此Broker id可作为Replica id。数据结构如下

Schema:
{ "fields" :
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "partitions",
       "type": {"type": "map",
                "values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
                "doc": "a map from partition id to replica list"},
      }
    ]
}
Example:
{
    "version":1,
    "partitions":
        {"12":[6],
        "8":[2],
        "4":[6],
        "11":[5],
        "9":[3],
        "5":[7],
        "10":[4],
        "6":[8],
        "1":[3],
        "0":[2],
        "2":[4],
        "7":[1],
        "3":[5]}
}

4.6 /brokers/topics/[topic]/partitions数据结构

 partition state(/brokers/topics/[topic]/partitions/[partitionId]/state) 结构如下

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "isr",
       "type": {"type": "array",
                "items": "int",
                "doc": "an array of the id of replicas in isr"}
      },
      {"name": "leader", "type": "int", "doc": "id of the leader replica"},
      {"name": "controller_epoch", "type": "int", "doc": "epoch of the controller that last updated the leader and isr info"},
      {"name": "leader_epoch", "type": "int", "doc": "epoch of the leader"}
    ]
}

Example:
{
    "controller_epoch":29,
    "leader":2,
    "version":1,
    "leader_epoch":48,
    "isr":[2]
}

PS:
Controller epoch:
此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;

4.7 controller数据结构

/controller -> int (broker id of the controller)存储当前controller的信息

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "brokerid", "type": "int", "doc": "broker id of the controller"}
    ]
}
Example:
{
    "version":1,
  "brokerid":8
}

 /controller_epoch -> int (epoch)直接以整数形式存储controller epoch,而非像其它znode一样以JSON字符串形式存储。      

5 各组件failover过程

5.1 broker failover过程

  1. Controller在Zookeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的znode会自动被删除,Zookeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker
  2. Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition
  3. 对set_p中的每一个Partition
      3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR
      3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
      3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1

  4. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
      Broker failover顺序图如下所示。

LeaderAndIsrRequest结构如下

LeaderAndIsrResponse结构如下

5.2 Controller Failover

Controller也需要Failover。每个Broker都会在Controller Path (/controller)上注册一个Watch。当前Controller失败时,对应的Controller Path会自动消失(因为它是Ephemeral Node),此时该Watch被fire,所有“活”着的Broker都会去竞选成为新的Controller(创建新的Controller Path),但是只会有一个竞选成功(这点由Zookeeper保证)。竞选成功者即为新的Leader,竞选失败者则重新在新的Controller Path上注册Watch。因为Zookeeper的Watch是一次性的,被fire一次之后即失效,所以需要重新注册。

Broker成功竞选为新Controller后会触发KafkaController.onControllerFailover方法,并在该方法中完成如下操作:

  1. 读取并增加Controller Epoch。
  2. 在ReassignedPartitions Path(/admin/reassign_partitions)上注册Watch。
  3. 在PreferredReplicaElection Path(/admin/preferred_replica_election)上注册Watch。
  4. 通过partitionStateMachine在Broker Topics Patch(/brokers/topics)上注册Watch。
  5. 若delete.topic.enable设置为true(默认值是false),则partitionStateMachine在Delete Topic Patch(/admin/delete_topics)上注册Watch。
  6. 通过replicaStateMachine在Broker Ids Patch(/brokers/ids)上注册Watch。
  7. 初始化ControllerContext对象,设置当前所有Topic,“活”着的Broker列表,所有Partition的Leader及ISR等。
  8. 启动replicaStateMachine和partitionStateMachine。
  9. 将brokerState状态设置为RunningAsController。
  10. 将每个Partition的Leadership信息发送给所有“活”着的Broker。
  11. 若auto.leader.rebalance.enable配置为true(默认值是true),则启动partition-rebalance线程。
  12. 若delete.topic.enable设置为true且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

6. 其他过程

6.1 创建/删除Topic过程

  1. Controller在Zookeeper的/brokers/topics节点上注册Watch,一旦某个Topic被创建或删除,则Controller会通过Watch得到新创建/删除的Topic的Partition/Replica分配。
  2. 对于删除Topic操作,Topic工具会将该Topic名字存于/admin/delete_topics。若delete.topic.enable为true,则Controller注册在/admin/delete_topics上的Watch被fire,Controller通过回调向对应的Broker发送StopReplicaRequest;若为false则Controller不会在/admin/delete_topics上注册Watch,也就不会对该事件作出反应,此时Topic操作只被记录而不会被执行。
    对于创建Topic操作,Controller从/brokers/ids读取当前所有可用的Broker列表,对于set_p中的每一个Partition:
      3.1 从分配给该Partition的所有Replica(称为AR)中任选一个可用的Broker作为新的Leader,并将AR设置为新的ISR(因为该Topic是新创建的,所以AR中所有的Replica都没有数据,可认为它们都是同步的,也即都在ISR中,任意一个Replica都可作为Leader)
      3.2 将新的Leader和ISR写入/brokers/topics/[topic]/partitions/[partition]

  3. 直接通过RPC向相关的Broker发送LeaderAndISRRequest。
      创建Topic顺序图如下所示。

6.2 broker响应请求流程

  Broker通过kafka.network.SocketServer及相关模块接受各种请求并作出响应。整个网络通信模块基于Java NIO开发,并采用Reactor模式,其中包含1个Acceptor负责接受客户请求,N个Processor负责读写数据,M个Handler处理业务逻辑。

  Acceptor的主要职责是监听并接受客户端(请求发起方,包括但不限于Producer,Consumer,Controller,Admin Tool)的连接请求,并建立和客户端的数据传输通道,然后为该客户端指定一个Processor,至此它对该客户端该次请求的任务就结束了,它可以去响应下一个客户端的连接请求了。其核心代码如下。

Processor主要负责从客户端读取数据并将响应返回给客户端,它本身并不处理具体的业务逻辑,并且其内部维护了一个队列来保存分配给它的所有SocketChannel。Processor的run方法会循环从队列中取出新的SocketChannel并将其SelectionKey.OP_READ注册到selector上,然后循环处理已就绪的读(请求)和写(响应)。Processor读取完数据后,将其封装成Request对象并将其交给RequestChannel。

  RequestChannel是Processor和KafkaRequestHandler交换数据的地方,它包含一个队列requestQueue用来存放Processor加入的Request,KafkaRequestHandler会从里面取出Request来处理;同时它还包含一个respondQueue,用来存放KafkaRequestHandler处理完Request后返还给客户端的Response。
  Processor会通过processNewResponses方法依次将requestChannel中responseQueue保存的Response取出,并将对应的SelectionKey.OP_WRITE事件注册到selector上。当selector的select方法返回时,对检测到的可写通道,调用write方法将Response返回给客户端。
  KafkaRequestHandler循环从RequestChannel中取Request并交给kafka.server.KafkaApis处理具体的业务逻辑。

6.3 LeaderAndIsrRequest响应过程

 对于收到的LeaderAndIsrRequest(那些准备同步副本的broker收到该请求),Broker主要通过ReplicaManager的becomeLeaderOrFollower处理,流程如下:

  1. 若请求中controllerEpoch小于当前最新的controllerEpoch,则直接返回ErrorMapping.StaleControllerEpochCode。
  2. 对于请求中partitionStateInfos中的每一个元素,即((topic, partitionId), partitionStateInfo):
      2.1 若partitionStateInfo中的leader epoch大于当前ReplicManager中存储的(topic, partitionId)对应的partition的leader epoch,则:
        2.1.1 若当前brokerid(或者说replica id)在partitionStateInfo中,则将该partition及partitionStateInfo存入一个名为partitionState的HashMap中
        2.1.2否则说明该Broker不在该Partition分配的Replica list中,将该信息记录于log中
      2.2否则将相应的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中

  3. 筛选出partitionState中Leader与当前Broker ID相等的所有记录存入partitionsTobeLeader中,其它记录存入partitionsToBeFollower中。

  4. 若partitionsTobeLeader不为空,则对其执行makeLeaders方。

  5. 若partitionsToBeFollower不为空,则对其执行makeFollowers方法。

  6. 若highwatermak线程还未启动,则将其启动,并将hwThreadInitialized设为true。

  7. 关闭所有Idle状态的Fetcher。
      LeaderAndIsrRequest处理过程如下图所示

对于上面第二点,我补充几句方便理解: 第二点的意思就是,某个分区,如果当前的broker-id和其partitionStateInfo一致的话,则将其加入partitionState中。该对象保存的都是需要保存在该broker上的分区(可能是follower也可能是leader,后面再做进一步leader的筛选)。

6.4 broker启动过程

Broker启动后首先根据其ID在Zookeeper的/brokers/idszonde下创建临时子节点(Ephemeral node),创建成功后Controller的ReplicaStateMachine注册其上的Broker Change Watch会被fire,从而通过回调KafkaController.onBrokerStartup方法完成以下步骤:

  1. 向所有新启动的Broker发送UpdateMetadataRequest,其定义如下。

  1. 将新启动的Broker上的所有Replica设置为OnlineReplica状态,同时这些Broker会为这些Partition启动high watermark线程。
  2. 通过partitionStateMachine触发OnlinePartitionStateChange。

6.5 Partition重新分配流程

管理工具发出重新分配Partition请求后,会将相应信息写到/admin/reassign_partitions上,而该操作会触发ReassignedPartitionsIsrChangeListener,从而通过执行回调函数KafkaController.onPartitionReassignment来完成以下操作:

  1. 将Zookeeper中的AR(Current Assigned Replicas)更新为OAR(Original list of replicas for partition) + RAR(Reassigned replicas)。
  2. 强制更新Zookeeper中的leader epoch,向AR中的每个Replica发送LeaderAndIsrRequest。
  3. 将RAR - OAR中的Replica设置为NewReplica状态。
  4. 等待直到RAR中所有的Replica都与其Leader同步。
  5. 将RAR中所有的Replica都设置为OnlineReplica状态。
  6. 将Cache中的AR设置为RAR。
    7.若Leader不在RAR中,则从RAR中重新选举出一个新的Leader并发送LeaderAndIsrRequest。若新的Leader不是从RAR中选举而出,则还要增加Zookeeper中的leader epoch。

  7. 将OAR - RAR中的所有Replica设置为OfflineReplica状态,该过程包含两部分。第一,将Zookeeper上ISR中的OAR - RAR移除并向Leader发送LeaderAndIsrRequest从而通知这些Replica已经从ISR中移除;第二,向OAR - RAR中的Replica发送StopReplicaRequest从而停止不再分配给该Partition的Replica。

  8. 将OAR - RAR中的所有Replica设置为NonExistentReplica状态从而将其从磁盘上删除。

  9. 将Zookeeper中的AR设置为RAR。

  10. 删除/admin/reassign_partition。

注意:最后一步才将Zookeeper中的AR更新,因为这是唯一一个持久存储AR的地方,如果Controller在这一步之前crash,新的Controller仍然能够继续完成该过程。
  以下是Partition重新分配的案例,OAR = {1,2,3},RAR = {4,5,6},Partition重新分配过程中Zookeeper中的AR和Leader/ISR路径如下

AR leader/isr step
{1,2,3} 1/{1,2,3} (initial state)
{1,2,3,4,5,6} 1/{1,2,3} (step 2)
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
{1,2,3,4,5,6} 4/{4,5,6} (step 8)

{4,5,6} 4/{4,5,6} (step 10)

6.6 Follower从Leader Fetch数据流程

Follower通过向Leader发送FetchRequest获取消息,FetchRequest结构如下


从FetchRequest的结构可以看出,每个Fetch请求都要指定最大等待时间和最小获取字节数,以及由TopicAndPartition和PartitionFetchInfo构成的Map。实际上,Follower从Leader数据和Consumer从Broker Fetch数据,都是通过FetchRequest请求完成,所以在FetchRequest结构中,其中一个字段是clientID,并且其默认值是ConsumerConfig.DefaultClientId。

Leader收到Fetch请求后,Kafka通过KafkaApis.handleFetchRequest响应该请求,响应过程如下:

  1. replicaManager根据请求读出数据存入dataRead中。
  2. 如果该请求来自Follower则更新其相应的LEO(log end offset)以及相应Partition的High Watermark
  3. 根据dataRead算出可读消息长度(单位为字节)并存入bytesReadable中。
  4. 满足下面4个条件中的1个,则立即将相应的数据返回
  • Fetch请求不希望等待,即fetchRequest.macWait <= 0
  • Fetch请求不要求一定能取到消息,即fetchRequest.numPartitions <= 0,也即requestInfo为空
  • 有足够的数据可供返回,即bytesReadable >= fetchRequest.minBytes
  • 读取数据时发生异常

Leader通过以FetchResponse的形式将消息返回给Follower,FetchResponse结构如下

参考资料:

  1. Kafka设计解析(二)- Kafka High Availability (上)