1. 介绍
采用hadoop 2.7.2的API
2. 代码
我们使用 Maven 工程。如果不清楚需要哪些依赖,请参考我的文章《idea 中调试 hadoop mapreduce 程序(windows)》。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.net.URI;
/**
 * Demo of reading and writing files on HDFS, both as Hadoop
 * {@code SequenceFile}s (key/value records) and as plain byte streams.
 *
 * @author Wan Kaiming on 2016/9/14
 * @version 1.0
 */
public class RWHDFSTest {
    // Sample values to be stored on HDFS as SequenceFile record values.
    private static final String[] MYVALUE = {
            "One, two, buckle my shoe",
            "Three, four, shut the door",
            "Five, six, pick up sticks",
            "Seven, eight, lay them straight",
            "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws IOException {
        // Perform all HDFS operations as the root user.
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        // Addresses of the HDFS namenode and the YARN resource manager.
        conf.set("fs.defaultFS", "hdfs://10.45.10.33:8020");
        conf.set("yarn.resourcemanager.hostname", "10.45.10.33");
        // Path of the file to access on HDFS.
        String uri = "/linecount/random_file.txt";
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        // Write to HDFS as a SequenceFile.
        //seqFileWriter(conf, path, key, value);
        // Read the SequenceFile back from HDFS.
        //seqFileReader(conf, path);
        //normalFileWriter(conf, path, uri);
        normalFileReader(conf, path, uri);
    }

    /**
     * Writes 100 key/value records to a SequenceFile on HDFS.
     * Only the values are compressed (RECORD compression type).
     *
     * @param conf  Hadoop configuration pointing at the target cluster
     * @param path  destination file on HDFS
     * @param key   reusable key instance; also determines the key class
     * @param value reusable value instance; also determines the value class
     * @throws IOException if the file cannot be created or written
     */
    private static void seqFileWriter(Configuration conf, Path path, IntWritable key, Text value) throws IOException {
        SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
        SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());
        // RECORD compression compresses each value individually; the writer is
        // closed automatically by try-with-resources even if append() fails.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, fileOption, keyClassOption,
                valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD))) {
            for (int i = 0; i < 100; i++) {
                key.set(1 + i);
                value.set(MYVALUE[i % MYVALUE.length]);
                // Print the current file offset before appending each record.
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                writer.append(key, value);
            }
        }
    }

    /**
     * Reads records back from a SequenceFile on HDFS and prints each record
     * together with its starting byte offset; records that immediately follow
     * a sync point are marked with "*".
     *
     * @param conf Hadoop configuration pointing at the source cluster
     * @param path SequenceFile to read on HDFS
     * @throws IOException if the file cannot be opened or read
     */
    private static void seqFileReader(Configuration conf, Path path) throws IOException {
        SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(path);
        // Restrict reading to the first 1000 bytes of the file for this test.
        SequenceFile.Reader.Option lenOption = SequenceFile.Reader.length(1000);
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, fileOption, lenOption)) {
            // Instantiate key/value holders of whatever classes the file declares.
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : ""; // "*" when a sync marker preceded this record
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
                        value);
                position = reader.getPosition(); // beginning of next record
            }
        }
    }

    /**
     * Writes arbitrary data to HDFS as a plain stream (the stream still
     * serializes the characters under the hood).
     *
     * @param conf Hadoop configuration pointing at the target cluster
     * @param path destination file on HDFS
     * @param uri  URI used to look up the FileSystem instance
     * @throws IOException if the file cannot be created or written
     */
    private static void normalFileWriter(Configuration conf, Path path, String uri) throws IOException {
        // try-with-resources closes the stream first, then the FileSystem.
        try (FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
             FSDataOutputStream fsDataOutputStream = fileSystem.create(path)) {
            fsDataOutputStream.writeChars("hello world!");
        }
    }

    /**
     * Reads a file from HDFS as raw bytes and copies it to standard output.
     *
     * @param conf Hadoop configuration pointing at the source cluster
     * @param path file to read on HDFS
     * @param uri  URI used to look up the FileSystem instance
     * @throws IOException if the file cannot be opened or read
     */
    private static void normalFileReader(Configuration conf, Path path, String uri) throws IOException {
        // FIX: the original leaked both the input stream and the FileSystem;
        // try-with-resources guarantees both are closed (stream first).
        try (FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
             FSDataInputStream fsDataInputStream = fileSystem.open(path)) {
            // Copy in 1 KiB chunks.
            byte[] buffer = new byte[1024];
            int len;
            while ((len = fsDataInputStream.read(buffer)) != -1) {
                System.out.write(buffer, 0, len);
            }
        }
    }
}