1. 前言

市面上主流的kafka监控工具有:

  1. Kafka Web Conslole
  2. Kafka Manager
  3. KafkaOffsetMonitor、

这些工具都是开源的。三者的对比可以参看kafka三种工具比较

我查看了github上三者工具的更新时间,发现kafka manager的作者维护的比较情况。虽然WIKI上说kafka manager没说支持0.10,但是通过调研后发现,直接使用于kafka 0.10版本也是没有问题的。本文就介绍下如何使用kafka manager

2. kafka manager功能简介

kafka manager支持以下功能:

  1. 管理多个kafka集群
  2. 方便的集群状态监控 (包括topics, consumers, offsets, brokers, replica distribution, partition distribution)
  3. 方便选择你想要的分区副本
  4. 配置分区任务,包括选择使用哪些brokers
  5. 可以对分区任务重分配
  6. 提供不同的选项来创建topic (0.8.1.1 has different configs than 0.8.2+,不同版本配置上有所不同)
  7. 删除topic(仅仅支持 0.8.2以上版本并且注意在broker config中设置delete.topic.enable=true in broker config)
  8. Topic list会指明哪些topic被删除 (在0.8.2以上版本适用)
  9. 批量产生分区任务并且和多个topic和brokers关联
  10. 批量运行多个主题对应的多个分区
  11. 向已经存在的主题中添加分区
  12. 对已经存在的topic修改配置
  13. 可选地在broker level和topic level的度量中启用JMX polling功能
  14. 可选地过滤在ZK上没有 ids/ owners/ & offsets/ directories的consumer

3. 下载安装

3.1 环境要求

Kafka 0.8.1.1 or 0.8.2.+ or 0.9.0.+
Java 8+
sbt 0.13.x

3.2 安装scala插件

这里我们使用idea和sbt插件来完成kafka-manager的安装

直接在设置里面搜索scala安装即可,科学上网建议采用xxnet

3.3 使用repox加载SBT下载依赖

一开始用公司的nexus私服尝试了下,发现不行。不代理ivy格式的文件。还好后来找到了这个开源工具,提供给了依赖的私服下载。使用办法可以查看入门指南

在使用repox工作前,可以做以下准备工作
1.下载sbt并且安装。
下载安装好后,修改如下两个文件:

sbtconfig.txt增加如下内容:

# Set the java args to high

-Xmx512M

-XX:MaxPermSize=256m

-XX:ReservedCodeCacheSize=128m

# Set the extra SBT options

-Dsbt.ivy.home=D:/sbt/.ivy2
-Dsbt.global.base=D:/sbt/.sbt
-Dsbt.repository.config=D:/sbt/conf/repo.properties
-Dsbt.log.format=true

repo.properties可以增加如下内容

[repositories]
local
#  这里可以添加下自己的NEXUS库或者其他可用的公有仓库
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
sonatype-oss-releases
maven-central
sonatype-oss-snapshots
  1. 下载nodejs(包括了npm工具)
  2. 下载bower,npm install -g bower
  3. 按照上面的入门指南操作

上述中的bower依赖于node.js和npm,因此需要先安装node.js。 sbt将会在target/scala-2.11/目录下生成 repox-assembly-$VERSION.jar

运行使用以下命令

$ java -Xmx512m -jar repox-assembly-$VERSION.jar

注意repox也需要先编译,这个过程很慢。有时候发现一直在下载的包很慢,可以手动去maven仓库下载,然后扔到本地的sbt仓库。

Repox web admin 默认密码为zhimakaimen.

通过访问127.0.0.1:8087访问repox

这里的仓库信息可以使用repox提供的,直接点击右上角的进行导入操作

仓库信息可以从repox社区公服导入:


这个即为需要导入的文件

3.4 从github下载工程并且创建sbt工程

设置下自定义的sbt还有VM参数,然后倒入SBT工程就可以了。慢慢等待吧,用了repox确实快了不少。

PS:如果sbt出现了java.lang.NoClassDefFoundError: sdt/testing/Framework 可以考虑删除sbt安装目录下的.sbt目录就可以了。这个错误貌似是多个项目同时使用sbt时发生了一些冲突。

导入工程后,在SBT主目录下执行以下命令编译:

./sbt clean dist

执行完毕后,生成的包会在kafka-manager/target/universal 下。该压缩包可以直接部署后使用

4.部署kafka-manaer

将编译打包后的kafka-manager在需要部署的机器上解压出来。

进入conf中配置下application.conf中的kafka-manager.zkhosts来指定下自己的zk-server

启动kafka-manager,这里采用nohup配合&来后台运行(使用nohup,如果账户退出的话,该进程不会被关掉)。通过-Dhttp.port可以指定端口,默认为9000

nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9001 &

通过浏览器访问该机器的9001端口可以访问kafka-manager的web管理界面

5. 配置kafka-manager

5.1 配置JMX(JAVA管理扩展)

为了监控Kafka,我们建议配置下JMX,使得监控能够提供更加详细的内容。kafka是用scala写的,而scala依赖JVM,所以用JMX来监控是理所当然的。

修改bin/kafka-server-start.sh,可以在堆信息配置那里添加JMX_PORT参数。我们这里使JMX端口为9999。

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

然后再启动kafka

nohup sh kafka-server-start.sh ../config/server.properties &

开启JMX之后除了可以使得jconsole监控更加详细的kafka信息。JMX监控kafka的使用可以参考我其他的kafka系列文章

用visualVM可以来监控kafka manager的JVM

5.2 新建集群

刚进去只是个空空的管理界面,首先需要新建一个集群。

建完集群之后可以查看集群的状态信息

还可以进行一些日志查看以及增删主题等操作。

虽说kafka-manager官方没说支持最新的0.10半本的kafka,但是现在用起来没什么问题。

5.3 使用体验

我们拿otter来获取实时信息给kafka,然后来体验下kafka manager提供的功能。在我们的实验中,我们采用2个broker(分别在不同的NODE上)和3个consumer(在一台机器上)

5.3.1 使用logkafka做日志收集

logkafka貌似是国人开发的一个开源工具,被集成在kakfa-manager。详情见logkafka说明

具体的安装教程参见logkafka安装

把玩了一下,觉得如果要做日志收集还是上logstash或者flume这种成熟点的系统比较好。

5.3.2 往kafka写消息

这时候尝试往kafka写入1万条数据库记录看看效果。数据表结构为:

5.4 broker信息

broker提供了一些准实时的监控(监控会有个采样间隔,表示的是某个时间段内的吞吐信息)。红圈部分可以查看broker fetch消息失败的情况。其他地方主要是查看一个吞吐量信息。

5.5 topic信息

5.6 分区信息

可以查看分区上offset的信息

可以修改分区和broker的绑定关系

6. kafka manager with 0.10.x supported

官方现在提供的kafka manager过过用在0.10.x的kafka集群上会出问题。例如在复制的时候,由于kafka manager使用的仍然是0.9的client来实现的,可能会有如下报错(在kafka manager日志当中),造成kafka manager不断的产生日志,然后导致磁盘资源耗尽。日志报错内容如下:
可见是由于协议的缘故导致找不到指定的域。

2016-09-01 21:32:31,015 - [WARN] - from kafka.manager.actor.cluster.KafkaManagedOffsetCache in KafkaManagedOffsetCache 
Failed to process a message from offset topic!
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading array of size 680815, only 448 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:439) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:908) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) ~[org.apache.kafka.kafka-clients-0.9.0.1.jar:na]
    at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$3.apply(KafkaStateActor.scala:253) [kafka-manager.kafka-manager-1.3.1.6-sans-externalized.jar:na]
    at kafka.manager.actor.cluster.KafkaManagedOffsetCache$$anonfun$run$3.apply(KafkaStateActor.scala:235) [kafka-manager.kafka-manager-1.3.1.6-sans-externalized.jar:na]
    at scala.util.Success.foreach(Try.scala:236) [org.scala-lang.scala-library-2.11.8.jar:na]
    at kafka.manager.actor.cluster.KafkaManagedOffsetCache.run(KafkaStateActor.scala:235) [kafka-manager.kafka-manager-1.3.1.6-sans-externalized.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]

不过值得高兴的是,也有细心的网友提了pull request。大家可以按照前面所说的编译方法直接clonekafka manager for 0.10.x