1.基本介绍

kafka是一个分布式消息系统中间件,主要在分布式环境下为各个系统提供消息传递服务。其最令人印象深刻的特点是高吞吐量、超强消息堆积、持久化能力、快速的消息get、put。

基本介绍环节对kafka的主要组成部分以及一些名字做一些解释。

PS: 我们这里部分设置细节以kafka.0.10.x为例

1.1 consumer,producer,broker

从上图可以看到kafka的核心组件是Producer、broker和consumer。其名次定义如下:

  1. Consumer:用于从Broker中取出/消费Message。
  2. Producer:用于往Broker中发送/生产Message。
  3. Broker:Kafka中使用Broker来接受Producer和Consumer的请求,并把Message持久化到本地磁盘。每个Cluster当中会选举出一个Broker来担任Controller,负责处理Partition的Leader选举,协调Partition迁移等工作。

注意:以上组件在分布式环境下均可以是多个,支持故障转移。同时ZK仅和broker和consumer相关。值得注意的是broker的设计是无状态的,消费的状态信息依靠消费者自己维护,通过一个offset偏移量。client和server之间通信采用TCP协议。

1.2 topic和partition

该图可以看到,消息是按照主题来提交到Partition当中的。Partition当中的消息是有序的,consumer从一个有序的分区消息队列中顺序获取消息。相关名次定义如下:

1.Topic:用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上。
2.Partition:是Kafka中横向扩展和一切并行化的基础,每个Topic都至少被切分为1个Partition。
3.offset:消息在Partition中的编号,编号顺序不跨Partition。

总结如下:
1.分区目的:Kafka中采用分区的设计有几个目的。一是可以处理更多的消息,不受单台服务器的限制。Topic拥有多个分区意味着它可以不受限的处理更多的数据。第二,分区可以作为并行处理的单元。
2.offset:由消费者控制offset,因此分区本身所在broker是无状态的。消费者可以自由控制offset,很灵活
3.同个分区内有序消费:每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分配了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。

1.3 partition复制

每个分区都有自己的镜像分区,来保证分区的高可用。其中一个称为leader。如果leader挂掉了,也会有相应的选举算法来选新的leader。

1.Replication:Kafka支持以Partition为单位对Message进行冗余备份,每个Partition都可以配置至少1个Replication(当仅1个Replication时即仅该Partition本身)。

2.Leader:每个Replication集合中的Partition都会选出一个唯一的Leader,所有的读写请求都由Leader处理。其他Replicas从Leader处把数据更新同步到本地,过程类似大家熟悉的MySQL中的Binlog同步。
3.ISR(In-Sync Replica):是Replicas的一个子集,表示目前Alive且与Leader能够“Catch-up”的Replicas集合。由于读写都是首先落到Leader上,所以一般来说通过同步机制从Leader上拉取数据的Replica都会和Leader有一些延迟(包括了延迟时间和延迟条数两个维度),任意一个超过阈值都会把该Replica踢出ISR。每个Partition都有它自己独立的ISR。

1.4 消费者组

消费者组提供两种消费TOPIC的方式:

  1. 只有一个消费者组:保证消费者组内负载均衡的读取消息
  2. 多个消费者组:每个消费者组理解为一个独立的个体看成订阅了tipic。下图可以看到分区上的消息会完整、负载均衡地广播给一个消费者组内的消费者

1.5 消息分发语义(消息可靠性问题)

对于kafka来说,以下两个方面来保障消息分发的可靠性:
消息发送的可靠性保障(producer)
消息消费的可靠性保障(consumer)

1.5.1 消息发送可靠性保证

  1. acks=0 --- producer不等待broker的acks。发送的消息可能丢失,但永远不会重发。
  2. acks=1 --- leader不等待其他follower同步,leader直接写log然后发送acks给producer。这种情况下会有重发现象,可靠性比only once好点,但是仍然会丢消息。例如leader挂了了,但是其他replication还没同步完成。
  3. acks=all --- leader等待所有follower同步完成才返回acks。消息可靠不丢失(丢了会重发),没收到ack会重发。

PS: 当acks=all的时候,就算你设置retries=0也依然会重发,详情见kafka client 官方API文档

文中查看KafkaProducer类,下面有如下解释

acks在producer配置文件中配置
0.10版的kafka参数配置具体见apache kafka官网

1.5.2 消息消费可靠性保障

消费者的可靠性保障(关键是保存offset的时机):

  1. 至多一次(at most once):读取消息->保存offset->处理消息。处理消息时崩溃则会丢失消息,因为此时offset已经改变了。
  2. 至少一次(at least once):读取消息->处理消息->保存offset。保存offset失败,会造成重复消费,但是不会丢消息。如果重读消费时幂等操作,那就不会出现重复消息了。前面2个步骤失败可以在offset位置重新消费。
  3. 有且仅有一次(exactly once):保存offset和处理消息这两个环节采用two-phase commit(2PC)。但是,在Kafka中,一种更简单的方法就是可以把offset和处理后的结果一起存储。有点把处理结果和offset做成原子性的感觉。这样可以避免重复消费。

Kafka可以从自己的偏移量仓库读取偏移量操作偏移量。

1.5.3 broker消息发布可靠性保障

当发送者由于网络问题导致重发,这时候可能会产生消息重复消费。当然消费者可以自己做处理来避免重复消费,例如全局唯一ID。关于这个问题官方文档是这么说的(4.6节):

也就是说broker在分发消息的时候暂还不支持exactly once的分发语义。

2. 组件实现和设计原理

2.1 分区

之所以设计分区的概念是从以下几个角度来考虑的:

2.1.1 灵活性(负载均衡控制、灵活消费)

  1. Kafka允许Partition在集群内的Broker之间任意移动,以此来均衡可能存在的数据倾斜问题。
  2. Partition支持自定义的分区算法,例如可以将同一个Key的所有消息都路由到同一个Partition上去。
  3. 同时Leader也可以在In-Sync的Replica中迁移。由于针对某一个Partition的所有读写请求都是只由Leader来处理,所以Kafka会尽量把Leader均匀的分散到集群的各个节点上,以免造成网络流量过于集中。
  4. 分区有偏移量的概念。消费者通过控制偏移量,可以灵活的消费消息。

2.1.2 并发性

任意Partition在某一个时刻只能被一个Consumer Group内的一个Consumer消费(反过来一个Consumer则可以同时消费多个Partition),Kafka非常简洁的Offset机制最小化了Broker和Consumer之间的交互,这使Kafka并不会像同类其他消息队列一样,随着下游Consumer数目的增加而成比例的降低性能。此外,如果多个Consumer恰巧都是消费时间序上很相近的数据,可以达到很高的PageCache命中率,因而Kafka可以非常高效的支持高并发读操作,实践中基本可以达到单机网卡上限。

2.1.3 高可用

分区采用leader-follower的组织架构来保证高可用

2.1.4 分区有序消费

kafka中每个分区都是一个顺序、不可变的消息队列。提供一个分区内顺序消费的语义

2.2 消费者

2.2.1 high-level api vs low-level api

Consumer API分为High level和Low level两种。前一种重度依赖Zookeeper,所以性能差一些且不自由,但是超省心。第二种不依赖Zookeeper服务,无论从自由度和性能上都有更好的表现,但是所有的异常(Leader迁移、Offset越界、Broker宕机等)和Offset的维护都需要自行处理。

总结:
high level api: zookeeper自动管理offset,自动获取last offset,包括leader迁移、broker宕机都自动化管理
low level api :手动管理offset、leader迁移、broker宕机的事情

在kafka 0.9.x以后,虽然仍然支持使用这两种API,但是建议还是使用新的new consumer(消除了这两类API使用的区别,一套API可以自由选择按照low level来使用还是high level来使用)来代替这两种consumer api。详情见kafka官方文档的2.2节

2.2.2 为什么kafka是pull模型

消费者应该从broker中pull数据还是broker应该向消费者push数据,在这方面,kafka遵循比较传统的设计,大多数消息系统,生产者推消息到broker,消费者从broker拉取消息,一些logging-centric的系统,比如 Scribe 和Apache Flume ,采用非常不同的push模式。事实上,push模式和pull模式各有优劣。push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
基于pull模式的另一个优点是,它有助于积极的批处理的数据发送到消费者。基于push模式必须选择要么立即发送请求或者积累更多的数据,稍后发送它,无论消费者是否能立刻处理它,如果是低延迟,这将导致短时间只发送一条消息,不用缓存,这是实在是一种浪费,基于pull的设计解决这个问题,消费者总是pull在日志的当前位置之后pull所有可用的消息(或配置一些大size),所以消费者可设置消费多大的量,也不会引入不必要的等待时间。

2.3 生产者

2.3.1 生产者上的优化方法

  1. 数据重排序、MessageSet等手段来使得消息批量顺序写入
  2. 数据压缩
  3. 异步发送
  4. 负载均衡

2.3.2 消息丢失问题

不过Kafka采用MessageSet也导致在可用性上一定程度的妥协。每次发送数据时,Producer都是send()之后就认为已经发送出去了,但其实大多数情况下消息还在内存的MessageSet当中,尚未发送到网络,这时候如果Producer挂掉,那就会出现丢数据的情况。
解决办法:
采用网络中的ack机制。当然这种是可选的。通过配置acks的值来控制。

  1. acks=0:关闭ack,以全速发送
  2. acks=1:消息只需要被Leader接收并确认即可,其他的Replica可以进行异步拉取无需立即进行确认,在保证可靠性的同时又不会把效率拉得很低
  3. acks=all:消息要Commit到该Partition的ISR集合中的所有Replica后,才可以返回ack,消息的发送会更安全,而整个过程的延迟会随着Replica的数量正比增长,这里就需要根据不同的需求做相应的优化。

3. kafka如何做到大吞吐量、强大消息堆积能力等特性

3.1 依赖OS文件系统的页缓存

当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小。
总结:依赖OS的页缓存能大量减少IO,高效利用内存来作为缓存

3.2 为什么不使用JVM缓存数据

JVM为我们提供了强大的GC能力,同时也引入了一些问题不适用与Kafka的设计。

  1. 如果在Heap内管理缓存,JVM的GC线程会频繁扫描Heap空间,带来不必要的开销。如果Heap过大,执行一次Full GC对系统的可用性来说将是极大的挑战。
  2. 所有在在JVM内的对象都不免带有一个Object Overhead(千万不可小视),内存的有效空间利用率会因此降低。
  3. 所有的In-Process Cache在OS中都有一份同样的PageCache。所以通过将缓存只放在PageCache,可以至少让可用缓存空间翻倍。
  4. 如果Kafka重启,所有的In-Process Cache都会失效,而OS管理的PageCache依然可以继续使用。

总结:利用OS来缓存,内存利用率高!

3.3 顺序IO以及常量时间get、put消息

顺序IO:只采用顺序IO不仅可以利用RAID技术带来很高的吞吐量,同时可以利用队列来提供常量时间的get和put。这样获取消息的效率也就是O(1)了。这种设计方法使得消息访问速度和消息堆积的量剥离了联系。而且操作系统对顺序IO都会进行优化,提升整体顺序IO的性能

3.4 sendfile技术(零拷贝)

传统网络IO流程:

  1. OS 从硬盘把数据读到内核区的PageCache。
  2. 用户进程把数据从内核区Copy到用户区。
  3. 然后用户进程再把数据写入到Socket,数据流入内核区的Socket Buffer上。
  4. OS 再把数据从Buffer中Copy到网卡的Buffer上,这样完成一次发送。

整个过程共经历两次Context Switch,四次System Call。同一份数据在内核Buffer与用户Buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是Sendfile所解决的问题,经过Sendfile优化后,整个I/O过程就变成了下面这个样子。

3.5 消息压缩

Producer支持End-to-End的压缩。数据在本地压缩后放到网络上传输,在Broker一般不解压(除非指定要Deep-Iteration),直至消息被Consume之后在客户端解压。
当然用户也可以选择自己在应用层上做压缩和解压的工作(毕竟Kafka目前支持的压缩算法有限,只有GZIP和Snappy),不过这样做反而会意外的降低效率!!!! Kafka的End-to-End压缩与MessageSet配合在一起工作效果最佳,上面的做法直接割裂了两者间联系。至于道理其实很简单,压缩算法中一条基本的原理“重复的数据量越多,压缩比越高”。无关于消息体的内容,无关于消息体的数量,大多数情况下输入数据量大一些会取得更好的压缩比。

4. kafka分区repelication实现高可用原理

kafka每个主题分区的复制日志跨多个可配置的服务器(可设置 topic-by-topic 的复制因子),允许自动故障转到这些副本,当集群服务器发生故障时,消息仍可用。
kafka通过分区的复制,来实现高可用。当leader挂了,可以重新选举新的leader来保证消费的高可用.

4.1 选举算法(选日志最完整的作为新leader)

和ZK不同,不采用议员投票(Quorum)的方式,而是选取复制日志最完整的节点作为leader。这里相比Quorum就需要一些额外的操作,比如判断到底怎样才算是“日志最完整”,这样就需要一些额外的开销。
kafka采用了一种稍微不同的方法选择quorum集,而不是多数投票,kafka动态维护一组同步副本(ISR),就是以后的leader,只有这个组的成员才又资格当选leader,kafka副本写入不被认为是已提交,直到所有的同步副本已经接收才认为。这组ISR保存在zookeeper,正因为如此,在ISR中的任何副本都有资格当选leader,这是kafka的使用模型,有多个分区和确保leader平衡是很重要的一个重要因素。有了这个模型,ISR和f+1副本,kafka的主题可以容忍f失败而不会丢失已提交的消息。
这种投票表决的方式有一个非常好的特性:仅依赖速度最快的服务器,也就是说,如果复制因子为三个,由最快的一个来确定。

如何定义一个节点是活着:
与大多数分布式系统自动处理失败需要精确的定义一个节点什么是“活着”,对于kafka的节点活着有2个条件:

  1. 一个节点必须能维持与zookeeper的会话(通过zookeeper的心跳机制)

  2. 如果它是一个slave,它必须复制写入的leader并且不能落后"太多"

我们让节点满足这2个条件为“同步”,以避免分不清楚是“活着”还是“故障”。leader跟踪“同步”节点。如果一个follower死掉,被卡住了,或落后,leader将从同步副本列表中移除它。卡住和落后的副本规则是通过replica.lag.time.max.ms配置控制。
当所有同步副本、分区已经应用自己的日志,消息才被认为是“已提交”,只有已提交的消息给消费者,这意味着消费者不必担心会看到如果leader失败时可能丢失的消息。生产者,另一方面,可以选择要么等待消息“已提交”要么不等,取决于他们的偏好延迟还是耐久性。可通过设置生产者的 request.required.acks 。

kafka提供担保,在任何时候,只要至少有一个同步副本活着,已提交的消息就不会丢失。

4.2 为什么不采用类似zookeeper的quorum方法

多数投票的缺点是,它不需要通过很多次的失败来让你没有候选人,容忍一次失败需要3个数据副本,容忍2个故障需要5个数据副本。实际的系统以我们的经验只能容忍单个故障的冗余是不够的,但是如果5个数据副本,每个写5次,5倍的磁盘空间要求,1/5的吞吐量,对于大数据量不实用,这可能是quorum算法更通常在共享集群配置。如zookeeper,用于主数据存储不太常见。例如,在HDFS namenode的高可用性特性是建立在majority-vote-based日报,但这更昂贵的方法不用于数据本身。

总结:zk的quorum选举适用在共享集群配置而不是主数据存储。因为其吞吐量低,容忍故障所需要的冗余副本比较多

5. 使用kafka时候的心得

5.1 分区

  1. Partition的数量尽量提前预分配,虽然可以在后期动态增加Partition,但是会冒着可能破坏Message Key和Partition之间对应关系的风险。
  2. Replica的数量不要过多,如果条件允许尽量把Replica集合内的Partition分别调整到不同的Rack。
  3. 尽一切努力保证每次停Broker时都可以Clean Shutdown,否则问题就不仅仅是恢复服务所需时间长,还可能出现数据损坏或其他很诡异的问题。
  4. 单机分区数不宜过多,否则会造成发端到端延迟变长。如果比较重视延迟,建议分区数的值小于100乘以broker数量再乘以复制因子。该公式来自于confluent的文章:How to choose the number of topics/partitions in a Kafka cluster?
  5. 虽然跨分区不能保证全局有序消费,但是一般只要按照消息有序的KEY散列到不同的分区上,然后由多个不同的消费者并发消费。最后做排序也很简单。因为每个分区的消费都是有序的。如果一定要一开始就做到全局严格有序,可以只用一个分,当然效率会低不少。

5.2 消费者

强烈推荐使用Low level API,虽然繁琐一些,但是目前只有这个API可以对Error数据进行自定义处理,尤其是处理Broker异常或由于Unclean Shutdown导致的Corrupted Data时,否则无法Skip只能等着“坏消息”在Broker上被Rotate掉,在此期间该Replica将会一直处于不可用状态。

5.3 生产者

Producer的线程不要配置过多,尤其是在Mirror或者Migration中使用的时候,会加剧目标集群Partition消息乱序的情况(如果你的应用场景对消息顺序很敏感的话)。
0.8版本的request.required.acks默认是0(同0.7)。

6. kafka使用心得

6.1 kafka数据中心

最佳实践:kafka集群最好部署在相同局域网的环境里,不要部署在不同的网络环境里。跨数据中心延迟大,大大影响kafka、zk写入效率以及分区复制效率。

解决方案:部署需要跨多个数据中心的数据通道建议每个数据中心作为一个独立的KAFKA集群部署。多个数据中心之间采用镜像同步复制。

操作办法:使用kafka提供的mirror maker tool

 > bin/kafka-mirror-maker.sh
       --consumer.config consumer-1.properties --consumer.config consumer-2.properties
       --producer.config producer.properties --whitelist my-topic

PS:白名单支持JAVA STYLE的正则

6.2 硬件

CPU和内存:Linkdin采用双路四核Intel Xeon,24GB内存。足够的内存来缓冲活跃的读和写。

磁盘吞吐量:磁盘越多越好。Linkdin采用8*7200 rpm SATA驱动器(经常强制刷新建议上SAS)

6.3 JAVA相关

最佳实践:建议采用最新的JDK8(7当然也支持)

Linkedin配置参考:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m 
-XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35 
-XX:G1HeapRegionSize=16M 
-XX:MinMetaspaceFreeRatio=50 
-XX:MaxMetaspaceFreeRatio=80

Linkedin一个kafka商用集群,峰值时的数据:

60 brokers
50k partitions (replication factor 2)
800k messages/sec in
300 MB/sec inbound, 1 GB/sec+ outbound

90%的broker GC暂停时间为21ms左右。每秒进行的young GC小于1次

6.4 操作系统

最佳实践:

  1. 使用LINUX,WINDOWS的支持不是最好。
  2. 提升文件描述符的数量从而支持大量会话和连接
  3. 增大socket buffer保证数据中心之间高性能数据传输

6.5 关于磁盘

Kafka的复制支持RAID。选择使用raid需要做一些权衡:
缺点:

  1. 负载失衡:如果你配置多个数据目录分区,将会被循环分配数据目录,每个分区将完全在一个数据目录。如果数据没有被分区之间很好的平衡,可能导致磁盘之间负载失衡
  2. 为较大的写入吞吐量做优化,会减少可用磁盘空间

优点:

  1. 容忍磁盘故障,有磁盘冗余

6.5 关于安全

在发布的0.9.0.0,kafka增加了许多功能,可以单独也可以一起使用,目前支持以下的安全措施。

  1. 使用SSL或SASL(Kerberos),验证连接到broker上的客户端(生产者或消费者)、其他的broker和工具。也可以在最新的0.10.0.0中使用SASL/PLAIN.
  2. 从broker连接到Zookeeper的身份验证。
  3. broker和client之间的数据传输,broker之间,或使用SSL的broker和工具之间的数据加密(注意,当SSL时,性能会降低,其幅度取决于CPU类型和JVM)。
  4. client的read/write操作验证。
  5. 验证是插拔的,支持外部认证服务集成。

值得注意的是,安全是可选的 - 支持非安全集群,以及混合认证,未经认证,加密和非加密的客户端。下面的指南介绍如何配置和使用client和broker的安全特性。

参考资料:

  1. kafka入门介绍
  2. kafka官网
  3. rocketmq vs kafka(部分观点觉得有问题,比如kafka不支持消息重发)
  4. kafka高性能吞吐揭秘