1. Introduction
This section describes how to write Kafka messages into Alluxio.
Before reading this article, please make sure you have already read my earlier articles listed below:
2. Main Code
Note that this code was written alongside many other tests, so it still contains some additional helper methods and interfaces; please keep that in mind.
2.1 producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Date;
import java.util.Properties;
/**
* @author Wan Kaiming on 2016/8/1
* @version 1.0
*/
public class MyProducer {
public static void main(String[] args) {
//record elapsed time
System.out.println("Program start time: " + new Date());
final long startTime=System.currentTimeMillis();
Properties props = new Properties();
//set a client.id to make monitoring easier
props.put("client.id", "test_producer_1");
//broker addresses; host names are used here, so remember to update the local hosts file
props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
//message durability semantics
props.put("acks", "all");
//number of retries when a request to the broker fails (note that retries can introduce duplicate messages)
props.put("retries", 3);
//batch size in bytes for batched sends
props.put("batch.size", 16384);
//linger briefly so that messages can be accumulated and sent in batches
props.put("linger.ms", 1);
//total producer buffer size in bytes
props.put("buffer.memory", 33554432);
//serializer class for keys
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//serializer class for values
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//create a Producer instance with the configuration above
Producer<String, String> producer = new KafkaProducer<String,String>(props);
for(int i=0;i<10;i++){
//sleep 2 seconds before sending each message
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
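//send() is asynchronous: it returns a Future<RecordMetadata>, which this demo does not check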
producer.send(new ProducerRecord<String, String>("my-topic-new", Integer.toString(i), "one,two,three,four"));
}
producer.close();
final long endTime=System.currentTimeMillis();
float excTime=(float)(endTime-startTime)/1000;
System.out.println("Elapsed time: " + excTime + "s");
System.out.println("Current time: " + new Date());
}
}
2.2 consumer
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author Wan Kaiming on 2016/8/2
* @version 1.0
*/
public class MyConsumer {
//total number of records consumed across all threads
private static AtomicLong totalRecordsCount = new AtomicLong(0);
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyConsumer myConsumer = new MyConsumer();
//myConsumer.runAuto();
//myConsumer.runMannual();
myConsumer.runAutoWithMultiThreads();
}
/**
* Auto-commit offsets, multi-threaded consumption
*/
private void runAutoWithMultiThreads() throws ExecutionException, InterruptedException {
Properties props = new Properties();
//broker addresses
props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
//ID that uniquely identifies the consumer group
props.put("group.id", "test_new");
//enable automatic offset commits
props.put("enable.auto.commit", "true");
//commit offsets automatically every 1s
props.put("auto.commit.interval.ms", "1000");
//consumer session timeout of 30s
props.put("session.timeout.ms", "30000");
//deserializer class for keys
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//deserializer class for values
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//create several consumer instances that all subscribe to the same topic
List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
for(int i=0;i<6;i++){
//set a client.id to make monitoring easier
String clientID = "test_consumer_" + Integer.toString(i);
props.put("client.id", clientID);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("my-topic-new"));
consumerList.add(consumer);
}
ExecutorService executorService = Executors.newFixedThreadPool(6);
//start 6 consumer threads
Future future = null;
for(int i=0;i<6;i++) {
future = executorService.submit(new ConsumerRunnable(consumerList.get(i)));
}
executorService.shutdown();
//report progress from the main thread; future.get() would block forever here because the consumer threads never return
while (!executorService.isTerminated()) {
System.out.println("Total records consumed: " + totalRecordsCount.get());
Thread.sleep(5000);
}
}
//consumer task executed by the worker threads
class ConsumerRunnable implements Runnable{
KafkaConsumer<String, String> consumer;
public ConsumerRunnable(KafkaConsumer<String, String> consumer){
this.consumer=consumer;
}
@Override
public void run() {
int count = 0;
String tempStr;
//poll in a long-running loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1);
for (ConsumerRecord<String, String> record : records) {
count++;
tempStr="offset = "+ record.offset()+", key = "+record.key()+", value = "+record.value()+", count = "+count;
System.out.println(tempStr);
MyConsumer.totalRecordsCount.addAndGet(1);
//System.out.print(MyConsumer.totalRecordsCount.get()+" ");
//
// try {
// KafkaUtils.flushToDisk(Long.toString(MyConsumer.totalRecordsCount.get())+" ","consumer_output.txt");
// } catch (IOException e) {
// e.printStackTrace();
// }
try {
hdfsWriter(record);
} catch (IOException e) {
e.printStackTrace();
}
}
//System.out.println("Thread " + Thread.currentThread().getId() + " consumed " + count + " records");
//sleep 2 seconds after each poll
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* Auto-commit offsets
*/
private void runAuto(){
Properties props = new Properties();
//set a client.id to make monitoring easier
props.put("client.id", "test_consumer_12");
//broker addresses
props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
//ID that uniquely identifies the consumer group
props.put("group.id", "test_new");
//enable automatic offset commits
props.put("enable.auto.commit", "true");
//commit offsets automatically every 1s
props.put("auto.commit.interval.ms", "1000");
//consumer session timeout of 30s
props.put("session.timeout.ms", "30000");
//deserializer class for keys
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//deserializer class for values
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//to subscribe to multiple topics, list them separated by commas
consumer.subscribe(Arrays.asList("my-topic-new"));
String tempStr;
//poll in a long-running loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
tempStr = record.key() + record.value();
}
//sleep 10 seconds after each poll
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Manual offset commits
*/
private void runMannual(){
Properties props = new Properties();
props.put("bootstrap.servers", "10.45.10.31:9092,10.45.10.34:9092");
props.put("group.id", "test");
//disable automatic offset commits
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("my-topic-new"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//print the buffered records
for(ConsumerRecord<String,String> record:buffer){
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
//commit offsets synchronously
consumer.commitSync();
buffer.clear();
}
}
}
/**
* Persist the consumed record to HDFS.
* Since the records are already key-value pairs, SequenceFile is used.
*/
private void hdfsWriter(ConsumerRecord<String, String> record) throws IOException {
Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");
//path of the target file on the file system (Alluxio/HDFS)
String uri = "/linecount/kafka_input_file.txt";
Path path = new Path(uri);
//persist the Kafka record using the RWHDFSTest helper class
RWHDFSTest.seqFileWriterForKafka(conf,path,record);
}
private void hdfsReader() throws IOException {
Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");
//path of the target file on the file system (Alluxio/HDFS)
String uri = "/linecount/kafka_input_file.txt";
Path path = new Path(uri);
//read the Kafka data back using the RWHDFSTest helper class
RWHDFSTest.seqFileReader(conf,path);
}
}
2.3 I/O utility class for HDFS and Alluxio
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
import java.net.URI;
/**
*
* A demo of reading from and writing to HDFS
* @author Wan Kaiming on 2016/9/14
* @version 1.0
*/
public class RWHDFSTest {
//sample values to store on HDFS
private static String[] MYVALUE = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};
public static void main(String[] args) throws IOException {
Configuration conf = initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");
//path of the target file on the file system (Alluxio/HDFS)
String uri = "/linecount/kafka_input_file.txt";
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
//write to HDFS as a SequenceFile
//seqFileWriter(conf,path,key,value);
//read the SequenceFile back from HDFS
seqFileReader(conf,path);
//normalFileWriter(conf,path,uri);
//normalFileReader(conf, path,uri);
}
//method for writing Kafka records; the key is fixed to IntWritable and the value to Text
public static void seqFileWriterForKafka(Configuration conf, Path path,ConsumerRecord<String, String> record) throws IOException {
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());
//RECORD compression: only the values are compressed
SequenceFile.Writer writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));
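//note: a new writer is created for every record, so on Alluxio each call recreates the file instead of appending (see section 4)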
try {
key.set(Integer.valueOf(record.key()));
value.set(record.value());
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
value);
//append the key-value pair to the writer
writer.append(key, value);
}finally {
IOUtils.closeStream(writer);
}
}
//demo: write data with SequenceFile.Writer
public static void seqFileWriter(Configuration conf, Path path, IntWritable key,Text value) throws IOException {
SequenceFile.Writer writer = null;
SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());
//RECORD compression: only the values are compressed
writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));
try {
for (int i = 0; i < 100; i++) {
key.set(1 + i);
value.set(MYVALUE[i % MYVALUE.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
value);
//append the key-value pair to the writer
writer.append(key, value);
}
}finally {
IOUtils.closeStream(writer);
}
}
//demo: read data written with SequenceFile.Writer; also works for reading the Kafka data stored on HDFS
public static void seqFileReader(Configuration conf,Path path) throws IOException {
SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(path);
//SequenceFile.Reader.Option lenOption = SequenceFile.Reader.length(1000); //read only the first 1000 bytes for testing
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(conf, fileOption);
//instantiate the key and value objects to read into
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : ""; //mark records that follow a sync point with *
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
value);
//position = reader.getPosition(); // beginning of next record
}
}finally {
IOUtils.closeStream(reader);
}
}
//write arbitrary data (serialization still happens under the hood)
public static void normalFileWriter(Configuration conf,Path path,String uri) throws IOException {
FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
FSDataOutputStream fsDataOutputStream = fileSystem.create(path);
fsDataOutputStream.writeChars("hello world!");
fsDataOutputStream.close();
fileSystem.close();
}
//read arbitrary data
public static void normalFileReader(Configuration conf,Path path,String uri)throws IOException {
FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
FSDataInputStream fsDataInputStream = fileSystem.open(path);
//read in 1 KB chunks
byte[] buffer = new byte[1024];
//number of bytes returned by the last read
int len = fsDataInputStream.read(buffer);
while (len != -1) {
System.out.write(buffer, 0, len);
len = fsDataInputStream.read(buffer);
}
}
//perform some initialization
public static Configuration initialConfig(String userName,String FileSystemurl,String yarnAddress){
//operate as the given user (root in this article)
System.setProperty("HADOOP_USER_NAME", userName);
Configuration conf = new Configuration();
//set the default file system (HDFS/Alluxio) and the YARN ResourceManager address
conf.set("fs.defaultFS",FileSystemurl);
conf.set("yarn.resourcemanager.hostname",yarnAddress);
return conf;
}
}
3. Experimental Results
Producer output:
Consumer output:
Attempting to read the result back:
4. Problem Analysis
Careful readers will have noticed the problem: why does reading the file back return only a single record, when Kafka clearly wrote multiple records to Alluxio?
A closer look shows that the record read back is the last one that was written. This is because the Kafka consumer above does not append to the file in Alluxio; every record is written as a brand-new file.
Why is that? If we look carefully at the consumer output, there is actually an error:
From the output above we can conclude:
- On the Hadoop side the file is still accessed with append semantics, so an error is raised if the file does not exist
- Alluxio only allows a file to be written if it does not already exist (this can actually be worked around with the client interface that Alluxio provides; a minimal sketch of that idea follows this list)
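As a minimal sketch of that parenthetical workaround (my own illustration, not code from the original project): delete the file if it already exists before writing, so the write-once constraint no longer gets in the way. It reuses the Hadoop-compatible FileSystem API and the initialConfig helper already used in this article; the class name OverwriteDemo and the sample content are made up for the example.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
public class OverwriteDemo {
public static void main(String[] args) throws IOException {
Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");
String uri = "/linecount/kafka_input_file.txt";
Path path = new Path(uri);
FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
//delete the old file first, because an Alluxio file can only be written once
if (fileSystem.exists(path)) {
fileSystem.delete(path, false);
}
//now the file can be created and written from scratch
FSDataOutputStream out = fileSystem.create(path);
out.writeChars("replacement content");
out.close();
fileSystem.close();
}
}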
So how can this be solved? Stay tuned!
Hello again, folks, here is the answer. Take a look at the Alluxio official documentation on the Filesystem Client API: it explains that Alluxio uses a write-once model, which is rather inconvenient. Here is the workaround I used: have the Kafka consumer fetch records in batches, and once a batch reaches a certain size, write it to Alluxio in a single call through the FileSystem interface. The modified consumer (MyConsumer) and RWHDFSTest code is shown below:
- MyConsumer.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author Wan Kaiming on 2016/8/2
* @version 1.0
*/
public class MyConsumer {
//total number of records consumed across all threads
private static AtomicLong totalRecordsCount = new AtomicLong(0);
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyConsumer myConsumer = new MyConsumer();
//myConsumer.runAuto();
//myConsumer.runMannual();
myConsumer.runAutoWithMultiThreads();
}
/**
* Auto-commit offsets, multi-threaded consumption
*/
private void runAutoWithMultiThreads() throws ExecutionException, InterruptedException {
Properties props = new Properties();
//broker addresses
props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
//ID that uniquely identifies the consumer group
props.put("group.id", "test_new");
//enable automatic offset commits
props.put("enable.auto.commit", "true");
//commit offsets automatically every 1s
props.put("auto.commit.interval.ms", "1000");
//consumer session timeout of 30s
props.put("session.timeout.ms", "30000");
//deserializer class for keys
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//deserializer class for values
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//create several consumer instances that all subscribe to the same topic
List<KafkaConsumer<String, String>> consumerList = new ArrayList<>();
for(int i=0;i<6;i++){
//set a client.id to make monitoring easier
// String clientID = "test_consumer_" + Integer.toString(i);
// props.put("client.id", clientID);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("my-topic-new"));
consumerList.add(consumer);
}
ExecutorService executorService = Executors.newFixedThreadPool(6);
//start 6 consumer threads
Future future = null;
for(int i=0;i<6;i++) {
future = executorService.submit(new ConsumerRunnable(consumerList.get(i)));
}
executorService.shutdown();
//report progress from the main thread; future.get() would block forever here because the consumer threads never return
while (!executorService.isTerminated()) {
System.out.println("Total records consumed: " + totalRecordsCount.get());
Thread.sleep(5000);
}
}
//consumer task executed by the worker threads
class ConsumerRunnable implements Runnable{
KafkaConsumer<String, String> consumer;
public ConsumerRunnable(KafkaConsumer<String, String> consumer){
this.consumer=consumer;
}
@Override
public void run() {
int count = 0;
String tempStr;
//because Alluxio files are write-once, records are buffered here and written in batches
final int minBatchSize = 10;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
//poll in a long-running loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
count++;
tempStr="offset = "+ record.offset()+", key = "+record.key()+", value = "+record.value()+", count = "+count;
System.out.println(tempStr);
MyConsumer.totalRecordsCount.addAndGet(1);
//System.out.print(MyConsumer.totalRecordsCount.get()+" ");
//
// try {
// KafkaUtils.flushToDisk(Long.toString(MyConsumer.totalRecordsCount.get())+" ","consumer_output.txt");
// } catch (IOException e) {
// e.printStackTrace();
// }
//add the record to the buffer
buffer.add(record);
if(buffer.size() >= minBatchSize) {
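//the buffer is not cleared after a flush, so each flush rewrites the Alluxio file with every record this thread has consumed so far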
try {
hdfsWriter(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
}
//System.out.println("Thread " + Thread.currentThread().getId() + " consumed " + count + " records");
//sleep 2 seconds after each poll
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* Auto-commit offsets
*/
private void runAuto(){
Properties props = new Properties();
//set a client.id to make monitoring easier
props.put("client.id", "test_consumer_12");
//broker addresses
props.put("bootstrap.servers", "mysql1:9092,mysql4:9092");
//ID that uniquely identifies the consumer group
props.put("group.id", "test_new");
//enable automatic offset commits
props.put("enable.auto.commit", "true");
//commit offsets automatically every 1s
props.put("auto.commit.interval.ms", "1000");
//consumer session timeout of 30s
props.put("session.timeout.ms", "30000");
//deserializer class for keys
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//deserializer class for values
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//to subscribe to multiple topics, list them separated by commas
consumer.subscribe(Arrays.asList("my-topic-new"));
String tempStr;
//poll in a long-running loop
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
tempStr = record.key() + record.value();
}
//sleep 10 seconds after each poll
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* Manual offset commits
*/
private void runMannual(){
Properties props = new Properties();
props.put("bootstrap.servers", "10.45.10.31:9092,10.45.10.34:9092");
props.put("group.id", "test");
//disable automatic offset commits
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("my-topic-new"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
//print the buffered records
for(ConsumerRecord<String,String> record:buffer){
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
//commit offsets synchronously
consumer.commitSync();
buffer.clear();
}
}
}
/**
* Persist the buffered records to HDFS/Alluxio.
* In this version the whole batch is written in one call via normalFileWriterForKafka.
*/
private void hdfsWriter(List<ConsumerRecord<String, String>> buffer) throws IOException {
Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");
//path of the target file on the file system (Alluxio/HDFS)
String uri = "/linecount/kafka_input_file.txt";
Path path = new Path(uri);
//persist the buffered Kafka records using the RWHDFSTest helper class
RWHDFSTest.normalFileWriterForKafka(conf,path,uri,buffer);
}
private void hdfsReader() throws IOException {
Configuration conf = RWHDFSTest.initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");
//path of the target file on the file system (Alluxio/HDFS)
String uri = "/linecount/kafka_input_file.txt";
Path path = new Path(uri);
//read the Kafka data back using the RWHDFSTest helper class
RWHDFSTest.seqFileReader(conf,path);
}
}
- RWHDFSTest.java
The main change is the addition of the normalFileWriterForKafka method.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
import java.net.URI;
import java.util.List;
/**
*
* A demo of reading from and writing to HDFS
* @author Wan Kaiming on 2016/9/14
* @version 1.0
*/
public class RWHDFSTest {
//sample values to store on HDFS
private static String[] MYVALUE = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};
public static void main(String[] args) throws IOException {
Configuration conf = initialConfig("root", "alluxio://10.45.10.33:19998", "10.45.10.33");
//path of the target file on the file system (Alluxio/HDFS)
String uri = "/linecount/kafka_input_file.txt";
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
//write to HDFS as a SequenceFile
//seqFileWriter(conf,path,key,value);
//read the SequenceFile back from HDFS
//seqFileReader(conf,path);
//normalFileWriter(conf,path,uri);
normalFileReader(conf, path,uri);
}
//method for writing Kafka records; the key is fixed to IntWritable and the value to Text (note: with Alluxio, the SequenceFile interface can only write a single key-value pair)
public static void seqFileWriterForKafka(Configuration conf, Path path,ConsumerRecord<String, String> record) throws IOException {
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());
//RECORD compression: only the values are compressed
SequenceFile.Writer writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));
try {
key.set(Integer.valueOf(record.key()));
value.set(record.value());
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
value);
//append the key-value pair to the writer
writer.append(key, value);
}finally {
IOUtils.closeStream(writer);
}
}
//demo: write data with SequenceFile.Writer
public static void seqFileWriter(Configuration conf, Path path, IntWritable key,Text value) throws IOException {
SequenceFile.Writer writer = null;
SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());
//RECORD compression: only the values are compressed
writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));
try {
for (int i = 0; i < 100; i++) {
key.set(1 + i);
value.set(MYVALUE[i % MYVALUE.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
value);
//append the key-value pair to the writer
writer.append(key, value);
}
}finally {
IOUtils.closeStream(writer);
}
}
//demo: read data written with SequenceFile.Writer; also works for reading the Kafka data stored on HDFS
public static void seqFileReader(Configuration conf,Path path) throws IOException {
SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(path);
//SequenceFile.Reader.Option lenOption = SequenceFile.Reader.length(1000); //read only the first 1000 bytes for testing
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(conf, fileOption);
//instantiate the key and value objects to read into
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : ""; //mark records that follow a sync point with *
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
value);
//position = reader.getPosition(); // beginning of next record
}
}finally {
IOUtils.closeStream(reader);
}
}
//write arbitrary data (serialization still happens under the hood)
public static void normalFileWriter(Configuration conf,Path path,String uri) throws IOException {
FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
FSDataOutputStream fsDataOutputStream = fileSystem.create(path);
fsDataOutputStream.writeChars("hello world!");
fsDataOutputStream.close();
fileSystem.close();
}
//write the buffered Kafka messages to the target file (for testing only)
public static void normalFileWriterForKafka(Configuration conf,Path path,String uri,List<ConsumerRecord<String, String>> buffer) throws IOException{
FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
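//create(path) overwrites any existing file by default, so each flush replaces the previous file contents with the full buffer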
FSDataOutputStream fsDataOutputStream = fileSystem.create(path);
fsDataOutputStream.writeChars(buffer.toString());
fsDataOutputStream.close();
fileSystem.close();
}
//read arbitrary data
public static void normalFileReader(Configuration conf,Path path,String uri)throws IOException {
FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
FSDataInputStream fsDataInputStream = fileSystem.open(path);
//read in 1 KB chunks
byte[] buffer = new byte[1024];
//number of bytes returned by the last read
int len = fsDataInputStream.read(buffer);
while (len != -1) {
System.out.write(buffer, 0, len);
len = fsDataInputStream.read(buffer);
}
}
//perform some initialization
public static Configuration initialConfig(String userName,String FileSystemurl,String yarnAddress){
//operate as the given user (root in this article)
System.setProperty("HADOOP_USER_NAME", userName);
Configuration conf = new Configuration();
//set the default file system (HDFS/Alluxio) and the YARN ResourceManager address
conf.set("fs.defaultFS",FileSystemurl);
conf.set("yarn.resourcemanager.hostname",yarnAddress);
return conf;
}
}
In addition, here are two other possible approaches:
- When writing data destined for Alluxio, first write it to HDFS, then periodically load the HDFS files into Alluxio's memory space (a sketch of this idea follows the list).
- Use the key-value store (this feature is still in beta and is disabled by default). Through the key-value store you can append key-value pairs using the Alluxio API. It is also possible to
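As a rough sketch of the first approach (again my own illustration, with assumptions): write the data to HDFS through the ordinary Hadoop FileSystem API, then read the same path back through the alluxio:// scheme. Assuming that HDFS path is mounted as Alluxio's under storage and a caching read type is in effect, the read pulls the file into Alluxio memory, similar in effect to running the alluxio fs load shell command. The HDFS NameNode address and the class name are placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
public class HdfsThenAlluxioLoad {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
String file = "/linecount/kafka_input_file.txt";
//step 1: persist the data to HDFS first (the NameNode address is a placeholder)
FileSystem hdfs = FileSystem.get(URI.create("hdfs://10.45.10.33:9000"), conf);
FSDataOutputStream out = hdfs.create(new Path(file));
out.writeChars("one,two,three,four");
out.close();
hdfs.close();
//step 2: read the same path back through Alluxio; with HDFS mounted as the under storage
//and a caching read type, this loads the file into Alluxio memory
FileSystem alluxio = FileSystem.get(URI.create("alluxio://10.45.10.33:19998"), conf);
FSDataInputStream in = alluxio.open(new Path(file));
byte[] buf = new byte[1024];
while (in.read(buf) != -1) {
//nothing to do with the bytes; reading them is enough to warm the Alluxio cache
}
in.close();
alluxio.close();
}
}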