1. 介绍

以前分享过一篇文章kafka原理以及设计实现思想,但是很多东西讲的还是不够深入。今天这篇文章主要分析下生产者的具体设计和原理。

这篇文章参考了很多其他资料,使用的版本也较老,基本上是0.8的版本。后续有时间我会更新0.10中新的变化。

2. kafka发送流程

kafka的发送流程可以简单概括为如下的图。这幅图我们可以分为三部分来理解。中间的(深蓝色矩形)部分的流程是发送的核心流程(同步发送);左边(淡蓝色)是异步发送时相关的额外流程,右边(黄色)是客户端更新元信息相关的流程。简单概括为:

  1. 同步发送流程
  2. 异步发送流程
  3. 更新元信息流程

后续我们将对这三个流程分别进行分析。

2.1 创建生产者相关实例

一般在生产者客户端代码中我们使用如下这样的代码来创建一个生产者。

Produce p = new Producer(new ProducerConfig());

实际上在运行该代码后,我们是启动了三个实例,同时也初始化了ProducerConfig类完成了生产者的配置。三个实例如下:

  1. Producer
  2. DefaultEventHandler
  3. ProducerPool: 连接不同kafka broker的生产者池,连接个数有broker.list参数决定

2.2 同步发送流程

  1. Producer实例调用其send方法
  2. 本质是调用了Handler的handle(message)
  3. handler序列化消息
  4. handler调用dispatchSerializedData方法来调度序列化后的消息
  5. dispatchSerializedData方法调用partitionAndCollate方法对topic的message进行分组(根据获取的leaderBrokerId元数据来对消息分组)
  6. 从生产者池中获取不同broker对应的生产者,来真正的发送消息

2.2 异步发送流程

异步发送可以结合同步发送的流程来看。异步发送流程就是在同步发送流程前面多进行了一些额外的流程,来达到异步批量发送的目的。

额外增加的流程为:

  1. 根据生产者API,采用异步方式,则先将消息写入一个阻塞队列
  2. DefaultEventHandler定期向阻塞队列拉去消息
  3. 后面和同步发送流程相同

大家可以结合下图的流程来理解同步发送流程和异步发送流程(区别可以看到就是多了一个阻塞队列):

  1. 同步发送流程

  2. 异步发送流程

2.3 更新元信息流程

我们可以看到生产者并不是通过ZK来获取元信息的。这里注意获取元信息的时机:

  1. DefaultEventHandler中的handle方法在初次使用时会初始化BrokerPartitionInfo类调用其updateInfo方法来更新元数据信息
  2. handler处理时还会定期调用updateInfo方法来更新元数据信息

下面我们来看下更新元信息相关的代码:

1. DefaultEventHandler类

DefaultEventHandler类: 初始化实例化BrokerPartitionInfo类,然后定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分代码如下:

  def handle(events: Seq[KeyedMessage[K,V]]) {
    ......
    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
      if (topicMetadataRefreshInterval >= 0 &&
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
          //首次进入循环,满足该条件会进行一次元信息更新
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
      if (outstandingProduceRequests.size > 0) {
        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
        //休眠时间,多长时间刷新一次
        Thread.sleep(config.retryBackoffMs)
        // 生产者定期请求刷新最新topics的broker元数据信息
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
        .....
      }
    }
  }

2. BrokerPartitionInfo的updateInfo方法代码

 def updateInfo(topics: Set[String], correlationId: Int) {
    var topicsMetadata: Seq[TopicMetadata] = Nil
    //根据topics列表,meta.broker.list,其他配置参数,correlationId表示请求次数,一个计数器参数而已
    //创建一个topicMetadataRequest,并随机的选取传入的broker信息中任何一个去取metadata,直到取到为止
    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
    topicsMetadata = topicMetadataResponse.topicsMetadata
    // throw partition specific exception
    topicsMetadata.foreach(tmd =>{
      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
      if(tmd.errorCode == ErrorMapping.NoError) {
        topicPartitionInfo.put(tmd.topic, tmd)
      } else
        warn("Error while fetching metadata[%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
      tmd.partitionsMetadata.foreach(pmd =>{
        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
          warn("Error while fetching metadata %s for topic partition[%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
      })
    })
    producerPool.updateProducer(topicsMetadata)
  }

3. ClientUtils.fetchTopicMetadata方法代码

def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {
    var fetchMetaDataSucceeded: Boolean = false
    var i: Int = 0
    val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)
    var topicMetadataResponse: TopicMetadataResponse = null
    var t: Throwable = null
    val shuffledBrokers = Random.shuffle(brokers) //生成随机数
 while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
      info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))
      try {
        topicMetadataResponse = producer.send(topicMetadataRequest)
        fetchMetaDataSucceeded = true
      }
      catch {
        case e: Throwable =>
          warn("Fetching topic metadata with correlation id %d for topics [%s] from broker [%s] failed"
            .format(correlationId, topics, shuffledBrokers(i).toString), e)
          t = e
      } finally {
        i = i + 1
        producer.close()
      }
    }
    if(!fetchMetaDataSucceeded) {
      throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)
    } else {
      debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
    }
    return topicMetadataResponse
  }



//ProducerPool的updateProducer
def updateProducer(topicMetadata: Seq[TopicMetadata]) {
    val newBrokers = new collection.mutable.HashSet[Broker]
    topicMetadata.foreach(tmd => {
      tmd.partitionsMetadata.foreach(pmd => {
        if(pmd.leader.isDefined)
          newBrokers+=(pmd.leader.get)
      })
    })
    lock synchronized {
      newBrokers.foreach(b => {
        if(syncProducers.contains(b.id)){
          syncProducers(b.id).close()
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
        } else
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      })
    }
  }

以下几点需要注意:

  1. 客户端调用send, 才会新建SyncProducer,只有调用send才会去定期刷新metadata
  2. 在每次取metadata时,kafka会新建一个SyncProducer去取metadata,逻辑处理完后再close。
  3. 每个SyncProducer实例化对象会建立一个socket连接。根据当前SyncProducer(一个Broker的连接)取得的最新的完整的metadata,刷新ProducerPool中到broker的所有连接.刷新会直接重新把到每个broker的socket连接重建,意味着在这之后的第一个请求会有几百毫秒的延迟。
  4. Kafka的集群中如果某个partition所在的broker挂了,可以检查错误后重启重新加入集群,手动做rebalance,producer的连接会再次断掉,直到rebalance完成,那么刷新后取到的连接着中就会有这个新加入的broker。

3. producer如何分区

分区相关的代码如下:

/** 
     * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. 
     * See {@link #send(ProducerRecord, Callback)} for details. 
     */  
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {  
  
            //每次在dosend的时候都会去为当前的日志选择partition
            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());  
  
}  
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {  
        Integer partition = record.partition();  
if (partition != null) {  
            List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());  
            int lastPartition = partitions.size() - 1;  
            // they have given us a partition, use it  
            if (partition < 0 || partition > lastPartition) {  
                throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));  
            }  
            return partition;  
        }  
        return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,  
            cluster);  
    }  

发送者分区方法:如果在发送的时候已指定好partition,并且partition在Custer中存储的partition列表中,则以指定的partition为主。否则,则使用自定的partition class的分区函数进行分区。

参考资料:

  1. kafka源码解析之十六生产者流程(客户端如何向topic发送数据)
  2. apache kafka源码分析-Producer分析
  3. kafka-producer端-系统设计关注点的源码探究