1. Introduction

This section describes how to write Kafka messages into Alluxio.

Before reading on, please make sure you have already read my earlier post:
Alluxio read/write HDFS code example

2. Main code

Because I have run many other tests before, the code contains some unrelated helper functions, so please keep that in mind!

2.1 Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Date;
import java.util.Properties;

/**
 * @author Wan Kaiming on 2016/8/1
 * @version 1.0
 */
public class MyProducer {


    public static void main(String[] args) {

        //track the elapsed time
        System.out.println("Program start time: "+new Date());
        final long startTime=System.currentTimeMillis();

        Properties props = new Properties();

        //set a client.id to make monitoring easier
        props.put("client.id", "test_producer_1");

        //broker addresses; host names are used here, so remember to update your local hosts file
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //acknowledgement semantics for message durability
        props.put("acks", "all");
        //number of retries when a request to the broker fails; note that retries can introduce duplicate messages
        props.put("retries", 3);
        //batch size per partition, in bytes
        props.put("batch.size", 16384);
        //wait a short while before sending so that records can be batched together
        props.put("linger.ms", 1);
        //total producer buffer memory, in bytes
        props.put("buffer.memory", 33554432);
        //serializer class for keys
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //serializer class for values
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //create a Producer with the above configuration
        Producer<String, String> producer = new KafkaProducer<String,String>(props);



        for(int i=0;i<10;i++){

            //sleep 2 seconds between sends
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            producer.send(new ProducerRecord<String, String>("my-topic-new", Integer.toString(i), "one,two,three,four"));
        }





        producer.close();

        final long endTime=System.currentTimeMillis();
        float excTime=(float)(endTime-startTime)/1000;
        System.out.println("执行时间:"+excTime+"s");
        System.out.println("当前时间为:"+ new Date());



    }
}

2.2 Consumer

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author Wan Kaiming on 2016/8/2
 * @version 1.0
 */
public class MyConsumer {

    //total number of records consumed across all threads
    private static AtomicLong totalRecordsCount = new AtomicLong(0);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        MyConsumer myConsumer = new MyConsumer();
        //myConsumer.runAuto();
        //myConsumer.runMannual();
        myConsumer.runAutoWithMultiThreads();

    }


    /**
     *  Auto-commit offsets, multi-threaded consumption
     */

    private void runAutoWithMultiThreads() throws ExecutionException, InterruptedException {
        Properties props = new Properties();

        //broker addresses
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //ID that uniquely identifies the consumer group
        props.put("group.id", "test_new");
        //enable automatic offset commits
        props.put("enable.auto.commit", "true");
        //auto-commit offsets every 1s
        props.put("auto.commit.interval.ms", "1000");
        //consumer session timeout, set to 30s here
        props.put("session.timeout.ms", "30000");
        //deserializer class for keys
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //deserializer class for values
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //create several consumer instances, all subscribing to the same topic
        List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
        for(int i=0;i<6;i++){
            //set a client.id to make monitoring easier
            String clientID = "test_consumer_" + Integer.toString(i);
            props.put("client.id", clientID);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("my-topic-new"));
            consumerList.add(consumer);
        }



        ExecutorService executorService = Executors.newFixedThreadPool(6);


        //start 6 consumer threads

        Future future = null;
        for(int i=0;i<6;i++) {

            future =  executorService.submit(new ConsumerRunnable(consumerList.get(i)));

        }

        executorService.shutdown();

        while(future.get() == null){
            System.out.println("消费者总计消费"+totalRecordsCount.get());
        }
    }



    //consumer task class, executed by each thread
    class ConsumerRunnable implements Runnable{

        KafkaConsumer<String, String> consumer;

        public ConsumerRunnable(KafkaConsumer<String, String> consumer){
            this.consumer=consumer;
        }

        @Override
        public void run() {
            int count = 0;

            String tempStr;
            //poll loop (long-lived connection)
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1);
                for (ConsumerRecord<String, String> record : records) {

                    count++;
                    tempStr="offset = "+ record.offset()+", key = "+record.key()+", value = "+record.value()+", count = "+count;
                    System.out.println(tempStr);
                    MyConsumer.totalRecordsCount.addAndGet(1);
                    //System.out.print(MyConsumer.totalRecordsCount.get()+" ");
//
//                    try {
//                        KafkaUtils.flushToDisk(Long.toString(MyConsumer.totalRecordsCount.get())+" ","consumer_output.txt");
//                    } catch (IOException e) {
//                        e.printStackTrace();
//                    }

                    try {

                        hdfsWriter(record);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }


                }

                //System.out.println("线程"+Thread.currentThread().getId()+"消费记录:"+count+"条");

                //消费完毕后休眠20秒或者20ms
                try {
                    Thread.currentThread().sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }



    /**
     * Auto-commit offsets
     */
    private void runAuto(){

        Properties props = new Properties();
        //set a client.id to make monitoring easier
        props.put("client.id", "test_consumer_12");
        //broker addresses
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //ID that uniquely identifies the consumer group
        props.put("group.id", "test_new");
        //enable automatic offset commits
        props.put("enable.auto.commit", "true");
        //auto-commit offsets every 1s
        props.put("auto.commit.interval.ms", "1000");
        //consumer session timeout, 30s
        props.put("session.timeout.ms", "30000");
        //deserializer class for keys
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //deserializer class for values
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //multiple topics can be subscribed by listing them, separated by commas
        consumer.subscribe(Arrays.asList("my-topic-new"));

        String tempStr;

        //poll loop (long-lived connection)
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                tempStr = record.key() + record.value();
            }

            //sleep 10 seconds after each poll
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }


    /**
     * Manually commit offsets
     */
    private void runMannual(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.45.10.31:9092,10.45.10.34:9092");
        props.put("group.id", "test");
        //disable automatic offset commits
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic-new"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {

                //print the results
                for(ConsumerRecord<String,String> record:buffer){
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                }

                //commit offsets synchronously
                consumer.commitSync();
                buffer.clear();
            }
        }
    }


    /**
     *     Write the consumed record to HDFS.
     *     Since the records are already key-value pairs, SequenceFile is used.
     */
    private void hdfsWriter(ConsumerRecord<String, String> record) throws IOException {
        Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");

        //path of the target file on the file system
        String uri = "/linecount/kafka_input_file.txt";

        Path path = new Path(uri);



        //use the existing helper class to persist the Kafka record
        RWHDFSTest.seqFileWriterForKafka(conf,path,record);

    }

    private void hdfsReader() throws IOException {
        Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");

        //path of the target file on the file system
        String uri = "/linecount/kafka_input_file.txt";

        Path path = new Path(uri);



        //use the existing helper class to read the Kafka data back
        RWHDFSTest.seqFileReader(conf,path);
    }

}

2.3 I/O utility class for HDFS and Alluxio

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.net.URI;

/**
 *
 * A demo of reading from and writing to HDFS
 * @author Wan Kaiming on 2016/9/14
 * @version 1.0
 */
public class RWHDFSTest {

    //sample values to be stored on HDFS
    private static String[] MYVALUE = {
            "One, two, buckle my shoe",
            "Three, four, shut the door",
            "Five, six, pick up sticks",
            "Seven, eight, lay them straight",
            "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws IOException {

        Configuration conf = initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");


        //path of the target file on the file system
        String uri = "/linecount/kafka_input_file.txt";

        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();


        //write to HDFS as a SequenceFile
        //seqFileWriter(conf,path,key,value);
        //read the SequenceFile back from HDFS
        seqFileReader(conf,path);
        //normalFileWriter(conf,path,uri);
        //normalFileReader(conf, path,uri);

    }


    //writer for Kafka records; the key is fixed to IntWritable and the value to Text
    public static void seqFileWriterForKafka(Configuration conf, Path path,ConsumerRecord<String, String> record) throws IOException {
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
        SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());

        //RECORD compression: only the values are compressed

        SequenceFile.Writer  writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));



        try {

                key.set(Integer.valueOf(record.key()));
                value.set(record.value());
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                //append to the file
                writer.append(key, value);


        }finally {
            IOUtils.closeStream(writer);
        }
    }





    //SequenceFile writer demo
    public static void seqFileWriter(Configuration conf, Path path, IntWritable key,Text value) throws IOException {



        SequenceFile.Writer writer = null;
        SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
        SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());

        //RECORD compression: only the values are compressed

        writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));



        try {
            for (int i = 0; i < 100; i++) {
                key.set(1 + i);
                value.set(MYVALUE[i % MYVALUE.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                //append to the file
                writer.append(key, value);
            }

        }finally {
            IOUtils.closeStream(writer);
        }

    }

    //SequenceFile reader demo; also works for the Kafka data stored on HDFS
    public static void seqFileReader(Configuration conf,Path path) throws IOException {

        SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(path);
        //SequenceFile.Reader.Option lenOption = SequenceFile.Reader.length(1000);  //read only the first 1000 bytes, for testing

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, fileOption);

            //instantiate key/value objects of the types stored in the file
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

            long position = reader.getPosition();

            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : ""; //print "*" when a sync marker was seen
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
                        value);
                //position = reader.getPosition(); // beginning of next record
            }

        }finally {
            IOUtils.closeStream(reader);
        }

    }

    //write arbitrary data (serialization still happens underneath)
    public static void normalFileWriter(Configuration conf,Path path,String uri) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        FSDataOutputStream fsDataOutputStream = fileSystem.create(path);
        fsDataOutputStream.writeChars("hello world!");
        fsDataOutputStream.close();
        fileSystem.close();
    }

    //read arbitrary data
    public static void normalFileReader(Configuration conf,Path path,String uri)throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream fsDataInputStream = fileSystem.open(path);

        //read in 1 KB chunks
        byte[] buffer = new byte[1024];
        //number of bytes read
        int len = fsDataInputStream.read(buffer);
        while (len != -1) {
            System.out.write(buffer, 0, len);
            len = fsDataInputStream.read(buffer);
        }
    }

    //common initialization
    public static Configuration initialConfig(String userName,String FileSystemurl,String yarnAddress){
        //operate as the specified HDFS user
        System.setProperty("HADOOP_USER_NAME", userName);

        Configuration conf = new Configuration();

        //set the file system (HDFS/Alluxio) and YARN addresses
        conf.set("fs.defaultFS",FileSystemurl);
        conf.set("yarn.resourcemanager.hostname",yarnAddress);

        return conf;
    }

}

3. Experiment results

Producer:


Consumer:

Attempted read result:

4. Problem analysis

Attentive readers will have noticed something: why does reading the file back return only a single record, when Kafka clearly wrote multiple records to Alluxio?

A closer look shows that the record read back is the last one written. The reason is that the code above does not append to the file in Alluxio; instead, every record is written as a brand-new file.

Why is that? If we look carefully at the consumer output, there are in fact errors:

From the output above we can conclude:

  1. On the Hadoop side we still access the file in an append-style way, so an error is raised if the file does not exist.
  2. Alluxio only allows writing to a file that does not yet exist (this can be worked around with the client interface that Alluxio provides; see the sketch after this list).
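
To illustrate the second point, here is a minimal sketch (not part of the original code) of working around the write-once restriction through the Hadoop-compatible client: check for the existing file, delete it, then create it again and write the fresh contents. The master address and path are just the placeholder values used elsewhere in this post.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OverwriteOnAlluxio {
    public static void main(String[] args) throws IOException {
        //same placeholder Alluxio master address as in the rest of this post
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "alluxio://10.45.10.33:19998");

        FileSystem fs = FileSystem.get(URI.create("alluxio://10.45.10.33:19998"), conf);
        Path path = new Path("/linecount/kafka_input_file.txt");

        //Alluxio files are write-once: remove any existing file first,
        //then create a brand-new one and write the new contents
        if (fs.exists(path)) {
            fs.delete(path, false);
        }
        FSDataOutputStream out = fs.create(path);
        out.writeChars("rewritten contents");
        out.close();
        fs.close();
    }
}

Deleting and rewriting keeps only the latest contents, of course, which is exactly why the batched write shown later in this section buffers the records before writing them out in one go.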

So how do we solve this? Stay tuned for the next installment!

Hello folks, let me give you the answer. Take a look at the Alluxio official documentation on the Filesystem Client API: it states that Alluxio uses a write-once mechanism, which is rather inconvenient here. My workaround is as follows: have the Kafka consumer fetch records in batches, and once a batch reaches a certain size, write it to Alluxio in a single call through the FileSystem interface. The modified KafkaConsumer and RWHDFSTest code is shown below:

  1. KafkaConsumer.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @author Wan Kaiming on 2016/8/2
 * @version 1.0
 */
public class MyConsumer {

    //total number of records consumed across all threads
    private static AtomicLong totalRecordsCount = new AtomicLong(0);

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        MyConsumer myConsumer = new MyConsumer();
        //myConsumer.runAuto();
        //myConsumer.runMannual();
        myConsumer.runAutoWithMultiThreads();

    }


    /**
     *  Auto-commit offsets, multi-threaded consumption
     */

    private void runAutoWithMultiThreads() throws ExecutionException, InterruptedException {
        Properties props = new Properties();

        //broker addresses
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //ID that uniquely identifies the consumer group
        props.put("group.id", "test_new");
        //enable automatic offset commits
        props.put("enable.auto.commit", "true");
        //auto-commit offsets every 1s
        props.put("auto.commit.interval.ms", "1000");
        //consumer session timeout, set to 30s here
        props.put("session.timeout.ms", "30000");
        //deserializer class for keys
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //deserializer class for values
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //create several consumer instances, all subscribing to the same topic
        List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
        for(int i=0;i<6;i++){
            //set a client.id to make monitoring easier
//            String clientID = "test_consumer_" + Integer.toString(i);
//            props.put("client.id", clientID);
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList("my-topic-new"));
            consumerList.add(consumer);
        }



        ExecutorService executorService = Executors.newFixedThreadPool(6);


        //start 6 consumer threads

        Future future = null;
        for(int i=0;i<6;i++) {

            future =  executorService.submit(new ConsumerRunnable(consumerList.get(i)));

        }

        executorService.shutdown();

        while(future.get() == null){
            System.out.println("消费者总计消费"+totalRecordsCount.get());
        }
    }



    //consumer task class, executed by each thread
    class ConsumerRunnable implements Runnable{

        KafkaConsumer<String, String> consumer;

        public ConsumerRunnable(KafkaConsumer<String, String> consumer){
            this.consumer=consumer;
        }

        @Override
        public void run() {
            int count = 0;

            String tempStr;


            //because Alluxio is write-once, records are buffered and written in batches
            final int minBatchSize = 10;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();


            //poll loop (long-lived connection)
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {

                    count++;
                    tempStr="offset = "+ record.offset()+", key = "+record.key()+", value = "+record.value()+", count = "+count;
                    System.out.println(tempStr);
                    MyConsumer.totalRecordsCount.addAndGet(1);
                    //System.out.print(MyConsumer.totalRecordsCount.get()+" ");
//
//                    try {
//                        KafkaUtils.flushToDisk(Long.toString(MyConsumer.totalRecordsCount.get())+" ","consumer_output.txt");
//                    } catch (IOException e) {
//                        e.printStackTrace();
//                    }

                    //add the record to the buffer
                    buffer.add(record);
                    if(buffer.size() >= minBatchSize) {
                        try {
                            hdfsWriter(buffer);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                }

                //System.out.println("线程"+Thread.currentThread().getId()+"消费记录:"+count+"条");

                //消费完毕后休眠20秒或者20ms
                try {
                    Thread.currentThread().sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }

        }
    }



    /**
     * Auto-commit offsets
     */
    private void runAuto(){

        Properties props = new Properties();
        //set a client.id to make monitoring easier
        props.put("client.id", "test_consumer_12");
        //broker addresses
        props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
        //ID that uniquely identifies the consumer group
        props.put("group.id", "test_new");
        //enable automatic offset commits
        props.put("enable.auto.commit", "true");
        //auto-commit offsets every 1s
        props.put("auto.commit.interval.ms", "1000");
        //consumer session timeout, 30s
        props.put("session.timeout.ms", "30000");
        //deserializer class for keys
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //deserializer class for values
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //multiple topics can be subscribed by listing them, separated by commas
        consumer.subscribe(Arrays.asList("my-topic-new"));

        String tempStr;

        //poll loop (long-lived connection)
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                tempStr = record.key() + record.value();
            }

            //sleep 10 seconds after each poll
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }


    /**
     * Manually commit offsets
     */
    private void runMannual(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.45.10.31:9092,10.45.10.34:9092");
        props.put("group.id", "test");
        //disable automatic offset commits
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-topic-new"));
        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {

                //print the results
                for(ConsumerRecord<String,String> record:buffer){
                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                }

                //commit offsets synchronously
                consumer.commitSync();
                buffer.clear();
            }
        }
    }


    /**
     *     Write the buffered batch of consumed records to the file system in a single call.
     */
    private void hdfsWriter(List<ConsumerRecord<String, String>> buffer) throws IOException {
        Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");

        //path of the target file on the file system
        String uri = "/linecount/kafka_input_file.txt";

        Path path = new Path(uri);



        //use the existing helper class to persist the batch of Kafka records
        RWHDFSTest.normalFileWriterForKafka(conf,path,uri,buffer);

    }

    private void hdfsReader() throws IOException {
        Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");

        //path of the target file on the file system
        String uri = "/linecount/kafka_input_file.txt";

        Path path = new Path(uri);


        //use the existing helper class to read the Kafka data back
        RWHDFSTest.seqFileReader(conf,path);
    }

}
  2. RWHDFSTest.java

The main change is the addition of the normalFileWriterForKafka method.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.net.URI;
import java.util.List;

/**
 *
 * A demo of reading from and writing to HDFS
 * @author Wan Kaiming on 2016/9/14
 * @version 1.0
 */
public class RWHDFSTest {

    //sample values to be stored on HDFS
    private static String[] MYVALUE = {
            "One, two, buckle my shoe",
            "Three, four, shut the door",
            "Five, six, pick up sticks",
            "Seven, eight, lay them straight",
            "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws IOException {

        Configuration conf = initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");


        //path of the target file on the file system
        String uri = "/linecount/kafka_input_file.txt";

        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();


        //write to HDFS as a SequenceFile
        //seqFileWriter(conf,path,key,value);
        //read the SequenceFile back from HDFS
        //seqFileReader(conf,path);
        //normalFileWriter(conf,path,uri);
        normalFileReader(conf, path,uri);

    }


    //writer for Kafka records; the key is fixed to IntWritable and the value to Text (note: with Alluxio, this SequenceFile-based writer can only write a single key-value pair)
    public static void seqFileWriterForKafka(Configuration conf, Path path,ConsumerRecord<String, String> record) throws IOException {
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
        SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());

        //RECORD compression: only the values are compressed

        SequenceFile.Writer  writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));



        try {

            key.set(Integer.valueOf(record.key()));
            value.set(record.value());
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                    value);
            //append to the file
            writer.append(key, value);


        }finally {
            IOUtils.closeStream(writer);
        }
    }





    //SequenceFile writer demo
    public static void seqFileWriter(Configuration conf, Path path, IntWritable key,Text value) throws IOException {



        SequenceFile.Writer writer = null;
        SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
        SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());

        //RECORD compression: only the values are compressed

        writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));



        try {
            for (int i = 0; i < 100; i++) {
                key.set(1 + i);
                value.set(MYVALUE[i % MYVALUE.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                //append to the file
                writer.append(key, value);
            }

        }finally {
            IOUtils.closeStream(writer);
        }

    }

    //SequenceFile reader demo; also works for the Kafka data stored on HDFS
    public static void seqFileReader(Configuration conf,Path path) throws IOException {

        SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(path);
        //SequenceFile.Reader.Option lenOption = SequenceFile.Reader.length(1000);  //read only the first 1000 bytes, for testing

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, fileOption);

            //instantiate key/value objects of the types stored in the file
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

            long position = reader.getPosition();

            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : ""; //print "*" when a sync marker was seen
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
                        value);
                //position = reader.getPosition(); // beginning of next record
            }

        }finally {
            IOUtils.closeStream(reader);
        }

    }

    //write arbitrary data (serialization still happens underneath)
    public static void normalFileWriter(Configuration conf,Path path,String uri) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        FSDataOutputStream fsDataOutputStream = fileSystem.create(path);
        fsDataOutputStream.writeChars("hello world!");
        fsDataOutputStream.close();
        fileSystem.close();
    }


    //write a batch of Kafka messages to Alluxio (for testing only)
    public static void normalFileWriterForKafka(Configuration conf,Path path,String uri,List<ConsumerRecord<String, String>> buffer) throws IOException{
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        FSDataOutputStream fsDataOutputStream = fileSystem.create(path);
        fsDataOutputStream.writeChars(buffer.toString());
        fsDataOutputStream.close();
        fileSystem.close();
    }

    //read arbitrary data
    public static void normalFileReader(Configuration conf,Path path,String uri)throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream fsDataInputStream = fileSystem.open(path);

        //read in 1 KB chunks
        byte[] buffer = new byte[1024];
        //number of bytes read
        int len = fsDataInputStream.read(buffer);
        while (len != -1) {
            System.out.write(buffer, 0, len);
            len = fsDataInputStream.read(buffer);
        }
    }

    //common initialization
    public static Configuration initialConfig(String userName,String FileSystemurl,String yarnAddress){
        //operate as the specified HDFS user
        System.setProperty("HADOOP_USER_NAME", userName);

        Configuration conf = new Configuration();

        //set the file system (HDFS/Alluxio) and YARN addresses
        conf.set("fs.defaultFS",FileSystemurl);
        conf.set("yarn.resourcemanager.hostname",yarnAddress);

        return conf;
    }

}

In addition, here are two more approaches:

  1. When writing data to Alluxio, first write it to HDFS, and then periodically load the HDFS files into Alluxio's memory space (a sketch of this approach follows the list).
  2. Use the key-value store (note that this feature is still experimental and disabled by default). The key-value store library lets you append key-value pairs through the Alluxio API; it is also possible to …
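
For the first approach, here is a minimal sketch, again with placeholder addresses, and under the assumption that HDFS is mounted as Alluxio's under storage so that the same path is visible through both the hdfs:// and alluxio:// schemes. Appending is unproblematic on plain HDFS, and reading the file back through the alluxio:// path caches its blocks in Alluxio memory (the default read type in Alluxio 1.x is CACHE_PROMOTE); the CLI equivalent of the second step is the "alluxio fs load" shell command.

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsThenLoadIntoAlluxio {

    //placeholder addresses; replace with your own NameNode and Alluxio master
    private static final String HDFS_URI = "hdfs://10.45.10.33:9000";
    private static final String ALLUXIO_URI = "alluxio://10.45.10.33:19998";

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path("/linecount/kafka_input_file.txt");

        //step 1: accumulate the consumed Kafka records directly on HDFS, where appending is allowed
        FileSystem hdfs = FileSystem.get(URI.create(HDFS_URI), conf);
        FSDataOutputStream out = hdfs.exists(path) ? hdfs.append(path) : hdfs.create(path);
        out.writeChars("offset=42, key=1, value=one,two,three,four\n");
        out.close();
        hdfs.close();

        //step 2: periodically pull the file into Alluxio memory by reading it through the alluxio:// path;
        //with the default CACHE_PROMOTE read type the blocks are cached as they are read
        FileSystem alluxio = FileSystem.get(URI.create(ALLUXIO_URI), conf);
        FSDataInputStream in = alluxio.open(path);
        byte[] buffer = new byte[1024];
        while (in.read(buffer) != -1) {
            //discard the bytes; reading them is enough to load the file into Alluxio
        }
        in.close();
        alluxio.close();
    }
}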