1. 介绍

本文假设已经成功配置过flume,能够令数据成功持久化到HDFS。如果还不会的,可以参看我的文章使用flume将avro源的数据写入HDFS

本节接下来将介绍如何从kafka source到hdfs sink的操作

2. flume配置

注意在这个过程当中, flume承担consumer的角色。如果存在多个消费者,注意把他们配置在同一个消费者组中,以免出问题!!

# ------------------- 定义数据流----------------------
# source的名字
agent.sources = kafkaSource
# channels的名字,建议按照type来命名
agent.channels = memoryChannel
# sink的名字,建议按照目标来命名
agent.sinks = hdfsSink

# 指定source使用的channel名字
agent.sources.kafkaSource.channels = memoryChannel
# 指定sink需要使用的channel的名字,注意这里是channel
agent.sinks.hdfsSink.channel = memoryChannel

#-------- kafkaSource相关配置-----------------
# 定义消息源类型
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
# 定义kafka所在zk的地址
agent.sources.kafkaSource.zookeeperConnect = 10.45.9.139:2181
# 配置消费的kafka topic
agent.sources.kafkaSource.topic = my-topic
# 配置消费者组的id
agent.sources.kafkaSource.groupId = flume
# 消费超时时间,参照如下写法可以配置其他所有kafka的consumer选项。注意格式从kafka.xxx开始是consumer的配置属性
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100



#------- memoryChannel相关配置-------------------------
# channel类型
agent.channels.memoryChannel.type = memory
# channel存储的事件容量
agent.channels.memoryChannel.capacity=1000
# 事务容量
agent.channels.memoryChannel.transactionCapacity=100

#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://10.45.10.33:9000/flume

3. 启动kafka发送消息

随便写个producer给对应的topic发消息即可:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author Wan Kaiming on 2016/8/1
 * @version 1.0
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        //broker地址 这里用域名,记得修改本地的hosts文件
        props.put("bootstrap.servers", "mysql1:9092");
        //消息可靠性语义
        props.put("acks", "all");
        //请求broker失败进行重试的次数,避免由于请求broker失败造成的消息重复
        props.put("retries", 0);
        //按批发送,每批的消息数量
        props.put("batch.size", 16384);
        //防止来不及发送,延迟一点点时间,使得能够批量发送消息
        props.put("linger.ms", 1);
        //缓冲大小,bytes
        props.put("buffer.memory", 33554432);
        //key的序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //value的序列化类
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //创建一个Producer对象,加载配置上下文信息
        Producer<String, String> producer = new KafkaProducer<String,String>(props);


        producer.send(new ProducerRecord<String, String>("my-topic", "hello", "world"));

//        while(true){
//            //调用send方法进行发送。send方法将消息加到缓存,异步发送
//            producer.send(new ProducerRecord<String, String>("my-topic", "hello", "world"));
//        }

        //producer.close();
    }
}

PS:我配置的HDFS进行刷盘的批次大小为3,所以要记得运行该程序三遍才能在HDFS上看到数据:

在HDFS上查看