1. 介绍

kafka-connect-hdfs是一个JAVA写的开源的kafka工具。用于负责从kafka抽取消息写入到hdfs,原理上使用了avro来做序列化。来自Confluent公司。安装了conflunt platform可以实现kafka消息持久化到JDBC或者HDFS。

kafka-connect-hdfs还集成了hive。针对每个kafka topic可以创建对应的外部HIVE 分区表,并且根据HDFS上的信息来更新HIVE分区表。

接下来的实验采用的环境如下:

ip 包含组件
10.45.10.31 kafka broker,kafka manager
10.45.10.33 hadoop-master(namenode),hadoop-slave-1(datanode)
10.45.10.34 kafka broker, confluent相关服务,hadoop-slave-2(datanode)
10.45.10.35 hadoop-slave-3(datanode)
10.45.9.139 zookeeper-server

重要提醒:如果遇到问题没法解决,一定要仔细查看confluent的官方文档(我用的是3.0.0)

2. 安装和使用教程

我们通过官方文档的例子来详细了解该工具的使用。

2.1 关于Confluent platform

经过研究发现是无法直接使用kafka-connect-hdfs的。为了使用该组件,必须完整安装Confluent platform

下载可以直接点击confluent platform下载

更多资料可以查看Confluent Quickstart的文档
Confluent Quickstart

接下来confluent的主目录均由$CONFLUENT_HOME表示

2.2 HDFS上新建topic和logs目录

为了保证connector能正确写入,需要提前新建topic和logs目录。这两个目录可以在配置文件中通过topics.dir和logs.dir来设置,如果没设置则默认值为/topics和/logs。一般都是没权限直接写根目录的,所以提起建好相应的HDFS目录(每个节点都需要建)。

在namenode上使用hadoop命令 :

hadoop fs -mkdir /logs
hadoop fs -mkdir /topics

2.3 secure HDFS and Hive metastore

如果开启了secure HDFS and HIVE metastore则需要额外配置一些内容,具体参考文档Secure HDFS and Hive Metastore这一章节。如果没有配置则可以跳过继续往下看。

2.4 启动schema-registry

注意,除了schema-registry请确保zk和kafka服务都已经启动。

编辑下confluent 主目录下的配置文件。这里有多个配置文件。其中schema-registry.properties是启动该服务所需要配置的文件

# 这个etc是confluent主目录下的etc,别搞错
vi $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties

主要修改下zookeeper的地址

# zookeeper 的地址
kafkastore.connection.url=10.45.9.139:2181
# schema registry本身需要的topic,与其他topic独立
kafkastore.topic=_schemas
# debug可以开启,方便排错
debug=true

在$CONFLUENT_HOME目录下使用以下命令启动该服务

 nohup ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &

启动后可以看到存在了SchemaRegistry相关的服务

一些schema注册相关的命令如下,可以作为参考(我们这里不需要执行注册schema,在之后调用命令的时候会完成该操作,注册test_hdfs-value这个 schema):

# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key/versions

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
     http://localhost:8081/subjects/Kafka-value/versions

# List all subjects
$ curl -X GET -i http://localhost:8081/subjects

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET -i http://localhost:8081/subjects/Kafka-value/versions

# Fetch a schema by globally unique id 1
$ curl -X GET -i http://localhost:8081/schemas/ids/1

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET -i http://localhost:8081/subjects/Kafka-value/versions/1

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET -i http://localhost:8081/subjects/Kafka-value/versions/latest

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest

# Get top level config
$ curl -X GET -i http://localhost:8081/config

# Update compatibility requirements globally
$ curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-value

2.5配置下kafka-connect-hdfs的配置文件

修改配置文件.

# 打开配置文件
vi $CONFLUENT_HOME/etc/kafka-connect-hdfs/quickstart-hdfs.properties

# 修改如下内容
# 该sink组件的名字,后续执行中会用到该名字
name=hdfs-sink
# 保持默认即可
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# 最大的sink数量,默认只让1个sink执行
tasks.max=1
# 指定将哪个topic的消息持久化到HDSF
topics=test_connect_hdfs
# HDFS的地址
hdfs.url=hdfs://10.45.10.33:9000
# 持久化到HDFS的批次大小。例如如果小于3条的话,consumer拿到数据并不会马上持久化到HDFS
flush.size=3

2.6 使用confluent提供的avro生产者向指定topic产生消息

首先先生成一个topic
如果采用命令来创建topic可以在$KAFKA_HOME中使用:

# 创建一个名为“test_hdfs”的Topic,只有一个分区和一个备份:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_hdfs
# 查看topics信息
 bin/kafka-topics.sh --list --zookeeper 10.45.9.139:2181

还可以使用kafka manger来创建,这个就比较简单了。kafka manager的安装使用可以参考kafka manager使用教程

我们采用10个分区来创建test_connect_hdfs主题

开始前请确保相关服务都已经开启,包括hadoop、kafka、zookeeper、SchemaRegistry

# 这里指定了用avro producer来向2个指定的broker上的test_connect_hdfs发送消息。同时还指定了schema的形式
./bin/kafka-avro-console-producer --broker-list 10.45.10.34:9092,10.45.10.31:9092 --topic test_connect_hdfs --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

# 然后可以输入符合schema格式要求的数据(根据配置信息可知,持久化到HDFS的批次大小是3条消息)
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

这时候还可以使用schema-registry的一些命令来查看,例如查看所有主题:
curl -X GET -i http://localhost:8081/subjects

发现与我们设定的topic相关的schema主题信息

2.7 使用confluent提供的avro消费者向指定topic拉消息持久化到HDFS

2.7.1 修改connect-avro-standalone.properties配置文件

confluent提供的avro消费者需要用到schema registry的一个配置文件。在启动消费者前先对该文件进行修改。在confluent主目录打开该文件:

# 打开
vi $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties

# 修改下bootstrap.servers即可(用于指定broker的地址)
bootstrap.servers=10.45.10.34:9092,10.45.10.31:9092

2.7.2 执行avro消费者

使用如下命令运行消费者即可

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties \
etc/kafka-connect-hdfs/quickstart-hdfs.properties

消费过程如下图,可以看到consumer同时也是一个sink task负责将数据store到hdfs

执行完毕后可以到http://10.45.10.33:50070/上查看结果

PS: 由于按批次处理,我没有产生太多消息,所以虽然设定了10个分区,这里并没有全部产生。

2.7.3 查看具体在HDFS的文件

在浏览器中无法查看该文件的内容。文件命名的方式是 topic+kafkaPartition+startOffset+endOoffset.format.

我们可以下载个avro工具来帮助我们查看文件内容。
首先下载avro-tools-1.8.1到namenode节点上。

然后使用以下命令提交 jar(我们提交给yarn而不是hadoop)

yarn jar avro-tools-1.8.1.jar tojson \
/topics/test_connect_hdfs/partition=2/test_connect_hdfs+2+0000000004+0000000006.avro

运行时会出现这样的错误

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.avro.io.EncoderFactory.jsonEncoder(Lorg/apache/avro/Schema;Ljava/io/OutputStream;Z)Lorg/apache/avro/io/JsonEncoder;

翻遍google发现只有一个人问了这个问题,还好有人回答了。出现原因是avro-tools一般在单机使用,放在hadoop这种分布式环境中有时候找不到类。详情见这里

避免这个问题的方式是在你的namenode上增加如下环境变量:

# 修改profile
vi /etc/profile
# 添加如下内容
export HADOOP_USER_CLASSPATH_FIRST=true
export HADOOP_CLASSPATH=avro-tools-1.8.1.jar

然后再重新读取下发现已经OK: