1. Introduction

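Avro is a data serialization system.
It provides:

  1. Rich data structures
  2. A compact, fast binary data format
  3. A container file for storing persistent data
  4. Remote procedure call (RPC)
  5. Simple integration with dynamic languages. Reading and writing data files and using RPC protocols require no code generation; code generation is an optional optimization that is only worth implementing for statically typed languages.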

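Avro relies on schemas. Data is always written and read together with the schema that describes it, so no per-value type or field-name information has to be stored with each datum, which keeps serialization fast and compact. Because the data and its schema are self-describing, Avro is also convenient to use from dynamic scripting languages.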
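
To make the "no per-value overhead" point concrete, here is a minimal sketch of Avro's binary encoding for a hypothetical two-field record (a string followed by an int). It is hand-rolled with the JDK only and is not the Avro library's implementation; the class and method names are assumptions for illustration. It follows the encoding rules in the Avro specification: ints and longs are zig-zag varints, and a string is its UTF-8 length followed by the bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-rolled illustration of Avro's binary encoding; not part of the Avro library.
final class AvroBinarySketch {

    // Avro writes int/long as zig-zag varints, so small magnitudes take one byte.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);            // zig-zag: 0, -1, 1, -2 -> 0, 1, 2, 3
        while ((z & ~0x7FL) != 0) {               // more than 7 bits still to write
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits with the continuation bit set
            z >>>= 7;
        }
        out.write((int) z);
    }

    // A string is its UTF-8 byte length (encoded as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical record {name: string, number: int}.
    // Only the field values are written, in schema order; no names or type tags.
    static byte[] encode(String name, int number) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        writeLong(out, number);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encode("Kami", 10);
        System.out.println(bytes.length + " bytes"); // prints "6 bytes"
    }
}
```

The six bytes are 08 4B 61 6D 69 14: the length 4 (zig-zag 8), the four letters of "Kami", and the value 10 (zig-zag 20). Nothing in the output says which field is which, so a reader can only decode it with the schema in hand.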

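Avro can be used in two ways: generic and specific. For the differences between them, see the article "Hadoop in Depth (16): Avro Serialization and Deserialization" and the discussion "Avro Client-Server - generic vs specific".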

The rest of this post shows how to serialize and deserialize data with Avro.

2. specific

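specific is the simpler of the two to use and is the recommended choice. generic is said to be a better fit for scenarios such as RPC proxies. See the discussion in "Avro Client-Server - generic vs specific" for details.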

2.1 Maven configuration

Create a Maven project in IDEA and add the following to the pom file:

    <build>
        <plugins>
            <!-- Avro Maven plugin: generates Java code from the schema files -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <!-- Where to find the schema files, and where to write the generated code -->
                        <configuration>
                            <sourceDirectory>E:/JavaProjects/KafkaLearning/flume_test/src/main/resources</sourceDirectory>
                            <outputDirectory>E:/JavaProjects/KafkaLearning/flume_test/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Java version for the source code and for the compiled target -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>


    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>
    </dependencies>

2.2 Create the schema

Following the path configured in the pom file, add an .avsc file named after the class to the resources directory.

The content of Student.avsc is as follows:

{"namespace": "com.best.avro.test.bean",
 "type": "record",
 "name": "Student",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

Notes:

  1. namespace becomes the package name in the Java project
  2. name is the class name
  3. fields lists the properties of the class

The type must be set to record.
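
How these schema names surface in the generated Java code can be approximated with the sketch below. It is a simplified illustration, not the Avro compiler's actual logic: it assumes field names are lowercase words joined by underscores, and the class and helper names are hypothetical.

```java
// Simplified sketch of how schema names map to generated Java names.
// Not the Avro compiler's real logic; all names in this class are hypothetical.
final class GeneratedNameSketch {

    // namespace + "." + name gives the fully qualified class name.
    static String className(String namespace, String name) {
        return namespace + "." + name;
    }

    // favorite_number -> FavoriteNumber (assumes lowercase words joined by underscores).
    static String camel(String fieldName) {
        StringBuilder sb = new StringBuilder();
        for (String part : fieldName.split("_")) {
            if (part.isEmpty()) {
                continue; // skip empty pieces from leading or doubled underscores
            }
            sb.append(Character.toUpperCase(part.charAt(0))).append(part.substring(1));
        }
        return sb.toString();
    }

    static String setter(String fieldName) {
        return "set" + camel(fieldName);
    }

    static String getter(String fieldName) {
        return "get" + camel(fieldName);
    }

    public static void main(String[] args) {
        System.out.println(className("com.best.avro.test.bean", "Student")); // com.best.avro.test.bean.Student
        System.out.println(setter("favorite_number"));                       // setFavoriteNumber
        System.out.println(getter("name"));                                  // getName
    }
}
```

This is why a schema field called favorite_number is accessed through setFavoriteNumber on the generated Student class.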

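2.3 Compile to generate the target class

Run the Maven build, for example mvn generate-sources (the phase the Avro plugin is bound to in the pom file). The plugin reads Student.avsc and generates Student.java in the com.best.avro.test.bean package under the configured output directory.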

2.4 Construct object instances

The generated Student class can be instantiated in three ways, as the following code shows:

package com.best.avro.test.bean;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author Wan Kaiming on 2016/8/4
 * @version 1.0
 */
public class TestAvroSpecific {

    public static void main(String[] args) throws IOException {
        // Option 1: use the constructor
        Student s1 = new Student("Kami",10,"Red");

        // Option 2: use the setters
        Student s2 = new Student();
        s2.setName("David");
        s2.setFavoriteNumber(12);
        s2.setFavoriteColor("Green");

        // Option 3: use the builder
        Student s3 = Student.newBuilder().setName("Lucy").setFavoriteNumber(20).setFavoriteColor("Blue").build();


        // Collect the objects in a list
        List<Student> studentList = new ArrayList<>();
        studentList.add(s1);
        studentList.add(s2);
        studentList.add(s3);

        serializeAvroToFile(studentList,"myFile.txt");
        deserializeAvroFromFile("myFile.txt");

    }


    // Serialize the objects to a file in the Avro container format
    private static void serializeAvroToFile(List<Student> studentList, String fileName) throws IOException {

        DatumWriter<Student> datumWriter = new SpecificDatumWriter<>(Student.class);
        // try-with-resources closes the writer even if an append fails
        try (DataFileWriter<Student> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            // Use the schema of the generated class, so an empty list is also handled
            dataFileWriter.create(Student.getClassSchema(), new File(fileName));
            for (Student student : studentList) {
                dataFileWriter.append(student);
            }
        }
    }


    // Deserialize the objects from the Avro file into memory and print them
    private static void deserializeAvroFromFile(String fileName) throws IOException {
        File file = new File(fileName);
        DatumReader<Student> datumReader = new SpecificDatumReader<>(Student.class);
        System.out.println("----------------deserializeAvroFromFile-------------------");
        // try-with-resources closes the reader when done
        try (DataFileReader<Student> dataFileReader = new DataFileReader<>(file, datumReader)) {
            Student student = null;
            while (dataFileReader.hasNext()) {
                // Passing the previous object lets Avro reuse it instead of allocating a new one
                student = dataFileReader.next(student);
                System.out.println(student);
            }
        }
    }
}
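
Despite its .txt extension, the file written above is a binary Avro object container file. According to the Avro specification, such a file starts with the four bytes 'O', 'b', 'j', 1, followed by metadata that includes the writer's schema. A minimal sketch for checking that header with the JDK only; the class and method names are hypothetical and this is not part of the Avro API:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

// Checks whether a file starts with the Avro object container magic bytes.
final class AvroMagicCheck {

    // Magic header defined by the Avro specification: ASCII "Obj" followed by the byte 1.
    static final byte[] MAGIC = {'O', 'b', 'j', 1};

    static boolean hasAvroMagic(byte[] head) {
        return head.length >= MAGIC.length
                && Arrays.equals(Arrays.copyOf(head, MAGIC.length), MAGIC);
    }

    static boolean isAvroContainer(Path file) throws IOException {
        try (InputStream in = Files.newInputStream(file)) {
            byte[] head = new byte[MAGIC.length];
            int off = 0;
            while (off < head.length) {
                int n = in.read(head, off, head.length - off);
                if (n < 0) {
                    return false; // file is shorter than the header
                }
                off += n;
            }
            return hasAvroMagic(head);
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the file name used in the example above
        Path file = Paths.get(args.length > 0 ? args[0] : "myFile.txt");
        if (Files.exists(file)) {
            System.out.println(file + " is an Avro container: " + isAvroContainer(file));
        } else {
            System.out.println(file + " not found");
        }
    }
}
```

Because the schema travels in the file header, any Avro reader can decode the records without being told the schema separately.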

3. generic

As before, first create a schema in the configured directory, this time for a Teacher class:

{"namespace": "com.best.avro.test.bean",
 "type": "record",
 "name": "Teacher",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

Then create a class to test serialization and deserialization:

package com.best.avro.test.bean;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;

import java.io.File;
import java.io.IOException;

/**
 * @author Wan Kaiming on 2016/8/4
 * @version 1.0
 */
public class TestAvroGeneric {

    public static void main(String[] args) throws IOException {

        TestAvroGeneric testAvroGeneric = new TestAvroGeneric();
        File myFile = new File("myTeacher.txt");

        //1. Load the schema from the Teacher.avsc file on the classpath
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(testAvroGeneric.getClass().getResourceAsStream("/Teacher.avsc"));

        //2. Create a record instance from the schema; this is the record to serialize
        GenericRecord genericRecord = new GenericData.Record(schema);
        genericRecord.put("name", "Kami");
        genericRecord.put("favorite_number", 18);
        genericRecord.put("favorite_color", "Red");



        //3. Serialize with the IO classes Avro provides; the writer needs the schema object
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        // try-with-resources closes the writer even if the append fails
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(schema, myFile);
            dataFileWriter.append(genericRecord);
        }



        //4. Read back from the file; the reader also needs the schema object
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        System.out.println("----------------deserializeAvroFromFile-------------------");
        // try-with-resources closes the reader when done
        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(myFile, datumReader)) {
            // The schema is loaded dynamically, so the concrete type is unknown at compile time
            // and the records are read as GenericRecord objects
            GenericRecord teacher = null;
            while (dataFileReader.hasNext()) {
                teacher = dataFileReader.next(teacher);
                System.out.println(teacher);
            }
        }

    }
}