1. 介绍
kafka-connect-hdfs是一个JAVA写的开源的kafka工具。用于负责从kafka抽取消息写入到hdfs,原理上使用了avro来做序列化。来自Confluent公司。安装了conflunt platform可以实现kafka消息持久化到JDBC或者HDFS。
kafka-connect-hdfs还集成了hive。针对每个kafka topic可以创建对应的外部HIVE 分区表,并且根据HDFS上的信息来更新HIVE分区表。
接下来的实验采用的环境如下:
ip | 包含组件 |
---|---|
10.45.10.31 | kafka broker,kafka manager |
10.45.10.33 | hadoop-master(namenode),hadoop-slave-1(datanode) |
10.45.10.34 | kafka broker, confluent相关服务,hadoop-slave-2(datanode) |
10.45.10.35 | hadoop-slave-3(datanode) |
10.45.9.139 | zookeeper-server |
2. 安装和使用教程
我们通过官方文档的例子来详细了解该工具的使用。
2.1 关于Confluent platform
经过研究发现是无法直接使用kafka-connect-hdfs的。为了使用该组件,必须完整安装Confluent platform
下载可以直接点击confluent platform下载
更多资料可以查看Confluent Quickstart的文档
接下来confluent的主目录均由$CONFLUENT_HOME表示
2.2 HDFS上新建topic和logs目录
为了保证connector能正确写入,需要提前新建topic和logs目录。这两个目录可以在配置文件中通过topics.dir和logs.dir来设置,如果没设置则默认值为/topics和/logs。一般都是没权限直接写根目录的,所以提起建好相应的HDFS目录(每个节点都需要建)。
在namenode上使用hadoop命令 :
hadoop fs -mkdir /logs
hadoop fs -mkdir /topics
2.3 secure HDFS and Hive metastore
如果开启了secure HDFS and HIVE metastore则需要额外配置一些内容,具体参考文档Secure HDFS and Hive Metastore这一章节。如果没有配置则可以跳过继续往下看。
2.4 启动schema-registry
注意,除了schema-registry请确保zk和kafka服务都已经启动。
编辑下confluent 主目录下的配置文件。这里有多个配置文件。其中schema-registry.properties是启动该服务所需要配置的文件
# 这个etc是confluent主目录下的etc,别搞错
vi $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
主要修改下zookeeper的地址
# zookeeper 的地址
kafkastore.connection.url=10.45.9.139:2181
# schema registry本身需要的topic,与其他topic独立
kafkastore.topic=_schemas
# debug可以开启,方便排错
debug=true
在$CONFLUENT_HOME目录下使用以下命令启动该服务
nohup ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
启动后可以看到存在了SchemaRegistry相关的服务
一些schema注册相关的命令如下,可以作为参考(我们这里不需要执行注册schema,在之后调用命令的时候会完成该操作,注册test_hdfs-value这个 schema):
# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key/versions
# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-value/versions
# List all subjects
$ curl -X GET -i http://localhost:8081/subjects
# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET -i http://localhost:8081/subjects/Kafka-value/versions
# Fetch a schema by globally unique id 1
$ curl -X GET -i http://localhost:8081/schemas/ids/1
# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET -i http://localhost:8081/subjects/Kafka-value/versions/1
# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET -i http://localhost:8081/subjects/Kafka-value/versions/latest
# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/subjects/Kafka-key
# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"string\"}"}' \
http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
# Get top level config
$ curl -X GET -i http://localhost:8081/config
# Update compatibility requirements globally
$ curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://localhost:8081/config
# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/Kafka-value
2.5配置下kafka-connect-hdfs的配置文件
修改配置文件.
# 打开配置文件
vi $CONFLUENT_HOME/etc/kafka-connect-hdfs/quickstart-hdfs.properties
# 修改如下内容
# 该sink组件的名字,后续执行中会用到该名字
name=hdfs-sink
# 保持默认即可
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
# 最大的sink数量,默认只让1个sink执行
tasks.max=1
# 指定将哪个topic的消息持久化到HDSF
topics=test_connect_hdfs
# HDFS的地址
hdfs.url=hdfs://10.45.10.33:9000
# 持久化到HDFS的批次大小。例如如果小于3条的话,consumer拿到数据并不会马上持久化到HDFS
flush.size=3
2.6 使用confluent提供的avro生产者向指定topic产生消息
首先先生成一个topic
如果采用命令来创建topic可以在$KAFKA_HOME中使用:
# 创建一个名为“test_hdfs”的Topic,只有一个分区和一个备份:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_hdfs
# 查看topics信息
bin/kafka-topics.sh --list --zookeeper 10.45.9.139:2181
还可以使用kafka manger来创建,这个就比较简单了。kafka manager的安装使用可以参考kafka manager使用教程
我们采用10个分区来创建test_connect_hdfs主题
开始前请确保相关服务都已经开启,包括hadoop、kafka、zookeeper、SchemaRegistry
# 这里指定了用avro producer来向2个指定的broker上的test_connect_hdfs发送消息。同时还指定了schema的形式
./bin/kafka-avro-console-producer --broker-list 10.45.10.34:9092,10.45.10.31:9092 --topic test_connect_hdfs --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
# 然后可以输入符合schema格式要求的数据(根据配置信息可知,持久化到HDFS的批次大小是3条消息)
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}
这时候还可以使用schema-registry的一些命令来查看,例如查看所有主题:
curl -X GET -i http://localhost:8081/subjects
发现与我们设定的topic相关的schema主题信息
2.7 使用confluent提供的avro消费者向指定topic拉消息持久化到HDFS
2.7.1 修改connect-avro-standalone.properties配置文件
confluent提供的avro消费者需要用到schema registry的一个配置文件。在启动消费者前先对该文件进行修改。在confluent主目录打开该文件:
# 打开
vi $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties
# 修改下bootstrap.servers即可(用于指定broker的地址)
bootstrap.servers=10.45.10.34:9092,10.45.10.31:9092
2.7.2 执行avro消费者
使用如下命令运行消费者即可
./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties \
etc/kafka-connect-hdfs/quickstart-hdfs.properties
消费过程如下图,可以看到consumer同时也是一个sink task负责将数据store到hdfs
执行完毕后可以到http://10.45.10.33:50070/上查看结果
PS: 由于按批次处理,我没有产生太多消息,所以虽然设定了10个分区,这里并没有全部产生。
2.7.3 查看具体在HDFS的文件
在浏览器中无法查看该文件的内容。文件命名的方式是 topic+kafkaPartition+startOffset+endOoffset.format.
我们可以下载个avro工具来帮助我们查看文件内容。
首先下载avro-tools-1.8.1到namenode节点上。
然后使用以下命令提交 jar(我们提交给yarn而不是hadoop)
yarn jar avro-tools-1.8.1.jar tojson \
/topics/test_connect_hdfs/partition=2/test_connect_hdfs+2+0000000004+0000000006.avro
运行时会出现这样的错误
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.avro.io.EncoderFactory.jsonEncoder(Lorg/apache/avro/Schema;Ljava/io/OutputStream;Z)Lorg/apache/avro/io/JsonEncoder;
翻遍google发现只有一个人问了这个问题,还好有人回答了。出现原因是avro-tools一般在单机使用,放在hadoop这种分布式环境中有时候找不到类。详情见这里
避免这个问题的方式是在你的namenode上增加如下环境变量:
# 修改profile
vi /etc/profile
# 添加如下内容
export HADOOP_USER_CLASSPATH_FIRST=true
export HADOOP_CLASSPATH=avro-tools-1.8.1.jar
然后再重新读取下发现已经OK:
亲,按照上面的步骤到2.7.2 执行avro消费者的时候失败了:
ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone:122)
java.lang.NoClassDefFoundError: io/confluent/connect/storage/StorageSinkConnectorConfig
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:96)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)