1. 介绍

采用hadoop 2.7.2的API

2. 代码

我们使用MAVEN工程,如果需要哪些依赖还不清楚,请参考我的文章idea中调试hadoop mapreduce程序(windows)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.net.URI;

/**
 *
 * 对HDFS进行读写的一个demo
 * @author Wan Kaiming on 2016/9/14
 * @version 1.0
 */
public class RWHDFSTest {

    //初始化需要存储到HDFS上的value
    private static String[] MYVALUE = {
            "One, two, buckle my shoe",
            "Three, four, shut the door",
            "Five, six, pick up sticks",
            "Seven, eight, lay them straight",
            "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws IOException {
        //设置使用root用户操作
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();

        //设置hdfs和yarn地址
        conf.set("fs.defaultFS", "hdfs://10.45.10.33:8020");
        conf.set("yarn.resourcemanager.hostname","10.45.10.33");

        //设置需要访问的文件在HDFS上的URL
        String uri = "/linecount/random_file.txt";

        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();


        //写到HDFS上,采用SequenceFile
        //seqFileWriter(conf,path,key,value);
        //从HDFS上读取SequenceFile文件
        //seqFileReader(conf,path);
        //normalFileWriter(conf,path,uri);
        normalFileReader(conf, path,uri);

    }



    //写seqFileWriter数据demo
    private static void seqFileWriter(Configuration conf, Path path, IntWritable key,Text value) throws IOException {



        SequenceFile.Writer writer = null;
        SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(path);
        SequenceFile.Writer.Option keyClassOption = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option valueClassOption = SequenceFile.Writer.valueClass(value.getClass());

        //只对value进行压缩,选用RECORD类型

        writer = SequenceFile.createWriter(conf, fileOption, keyClassOption, valueClassOption, SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD));



        try {
            for (int i = 0; i < 100; i++) {
                key.set(1 + i);
                value.set(MYVALUE[i % MYVALUE.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                //追加到文件
                writer.append(key, value);
            }

        }finally {
            IOUtils.closeStream(writer);
        }

    }

    //读seqFileWriter数据demo
    private static void seqFileReader(Configuration conf,Path path) throws IOException {

        SequenceFile.Reader.Option fileOption = SequenceFile.Reader.file(path);
        SequenceFile.Reader.Option lenOption = SequenceFile.Reader.length(1000);  //读取10个字节做测试

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(conf, fileOption, lenOption);

            //定义要读取的KEY VALUE
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

            long position = reader.getPosition();

            while (reader.next(key, value)) {
                String syncSeen = reader.syncSeen() ? "*" : ""; //做布尔值的替换,true换成*
                System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key,
                        value);
                position = reader.getPosition(); // beginning of next record
            }

        }finally {
            IOUtils.closeStream(reader);
        }

    }

    //写任意格式数据(底层其实还是会做序列化的,)
    private static void normalFileWriter(Configuration conf,Path path,String uri) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        FSDataOutputStream fsDataOutputStream = fileSystem.create(path);
        fsDataOutputStream.writeChars("hello world!");
        fsDataOutputStream.close();
        fileSystem.close();
    }

    //读取任意格式数据
    private static void normalFileReader(Configuration conf,Path path,String uri)throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(uri), conf);
        FSDataInputStream fsDataInputStream = fileSystem.open(path);

        //读取1K字节读取
        byte[] buffer = new byte[1024];
        //记录读取长度
        int len = fsDataInputStream.read(buffer);
        while (len != -1) {
            System.out.write(buffer, 0, len);
            len = fsDataInputStream.read(buffer);
        }
    }

}