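1. Introduction
Avro is a data serialization system.
It provides:
- Rich data structures
- A fast, compact binary data format
- A container file for storing persistent data
- Remote procedure call (RPC)
- Simple integration with dynamic languages. Code generation is not required to read or write data files or to use the RPC protocol; it is an optional optimization that is only worth implementing for statically typed languages.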
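Avro relies on schemas. When Avro data is read, the schema that was used to write it is always available. As a result, each datum can be written with no per-value overhead, which makes serialization both fast and compact. Because data is always accompanied by its schema, it is fully self-describing, which also makes it easy to use from dynamic scripting languages.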
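To make the "no per-value overhead" point concrete, here is a minimal sketch that hand-encodes one record following the binary encoding rules of the Avro specification, without using the Avro library at all. It assumes a record whose schema lists a string field followed by an ["int", "null"] union and a ["string", "null"] union. The class and method names (AvroLayoutSketch, encodeRecord) are made up for illustration and are not part of Avro. Notice that no field names or type tags are written, only the values and the union branch indexes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroLayoutSketch {
    // Avro writes int/long as zig-zag + variable-length: small magnitudes take one byte.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long zz = (value << 1) ^ (value >> 63);   // zig-zag: 0,-1,1,-2 -> 0,1,2,3
        while ((zz & ~0x7FL) != 0) {              // more than 7 bits left
            out.write((int) ((zz & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            zz >>>= 7;
        }
        out.write((int) zz);
    }

    // A string is its UTF-8 length (as a long) followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // A record is just its fields in schema order; a union is branch index + value.
    static byte[] encodeRecord(String name, int favoriteNumber, String favoriteColor) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);
        writeLong(out, 0);               // union branch 0 = "int"
        writeLong(out, favoriteNumber);
        writeLong(out, 0);               // union branch 0 = "string"
        writeString(out, favoriteColor);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeRecord("Kami", 10, "Red");
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(bytes.length + " bytes: " + hex.toString().trim());
        // prints: 12 bytes: 08 4b 61 6d 69 00 14 00 06 52 65 64
    }
}
```

The reader can only make sense of these 12 bytes because it knows the schema, which is why Avro always keeps the writer's schema alongside the data.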
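Avro can be used in two ways: generic and specific. For the difference between the two, see "hadoop深入研究:(十六)——Avro序列化与反序列化" and "Avro Client-Server - generic vs specific".
The following sections show how to serialize and deserialize data with Avro.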
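2. specific
The specific approach is the simpler of the two and is the recommended one. The generic approach is reportedly a better fit for scenarios such as RPC proxies; see the discussion in "Avro Client-Server - generic vs specific".
2.1 Maven configuration
Create a Maven project in IDEA and add the following to the pom file: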
<build>
    <plugins>
        <!-- Avro Maven plugin: generates Java code from the schema files -->
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.1</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <!-- Where to read the schema files from, and where to write the generated code -->
                    <configuration>
                        <sourceDirectory>E:/JavaProjects/KafkaLearning/flume_test/src/main/resources</sourceDirectory>
                        <outputDirectory>E:/JavaProjects/KafkaLearning/flume_test/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <!-- Java version for the source code and for the compiled target -->
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
<dependencies>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>
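2.2 Creating the schema
Following the path configured in the pom file, add an .avsc file named after the class to the resources directory.
The content of Student.avsc is as follows: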
{"namespace": "com.best.avro.test.bean",
"type": "record",
"name": "Student",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Notes:
- namespace becomes the package name in a Java project
- name is the class name
- fields are the properties of the class
Note: type must be set to record.
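As a quick check of how these schema attributes map to what Avro sees, the following sketch parses the same schema with Schema.Parser and prints its parts. The schema text is inlined as a string so the example runs without the resource file; the class name SchemaInspectSketch and the helper parseStudentSchema are made up for illustration.

```java
import org.apache.avro.Schema;

public class SchemaInspectSketch {
    // Same content as Student.avsc, inlined so the sketch does not need the file
    static final String STUDENT_SCHEMA =
        "{\"namespace\": \"com.best.avro.test.bean\","
      + " \"type\": \"record\","
      + " \"name\": \"Student\","
      + " \"fields\": ["
      + "   {\"name\": \"name\", \"type\": \"string\"},"
      + "   {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},"
      + "   {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}"
      + " ]}";

    static Schema parseStudentSchema() {
        return new Schema.Parser().parse(STUDENT_SCHEMA);
    }

    public static void main(String[] args) {
        Schema schema = parseStudentSchema();
        System.out.println("type      = " + schema.getType());
        System.out.println("namespace = " + schema.getNamespace());
        System.out.println("name      = " + schema.getName());
        // namespace + name together give the fully qualified class name
        System.out.println("full name = " + schema.getFullName());
        for (Schema.Field field : schema.getFields()) {
            System.out.println(field.name() + " : " + field.schema());
        }
    }
}
```

If the schema text is malformed, parse throws an exception, so this is also a cheap way to validate an .avsc file before running the code generator.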
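2.3 Compiling to generate the target class
Run the Maven build (for example, mvn compile). The avro-maven-plugin is bound to the generate-sources phase, so it reads Student.avsc from the configured sourceDirectory and writes Student.java to the configured outputDirectory, in the package given by namespace.
2.4 Constructing object instances
There are three ways to construct objects with the generated Student class. The code is as follows: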
package com.best.avro.test.bean;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author Wan Kaiming on 2016/8/4
 * @version 1.0
 */
public class TestAvroSpecific {
    public static void main(String[] args) throws IOException {
        // Option 1: use the all-args constructor
        Student s1 = new Student("Kami", 10, "Red");
        // Option 2: use the setters
        Student s2 = new Student();
        s2.setName("David");
        s2.setFavoriteNumber(12);
        s2.setFavoriteColor("Green");
        // Option 3: use the builder
        Student s3 = Student.newBuilder().setName("Lucy").setFavoriteNumber(20).setFavoriteColor("Blue").build();
        // Collect the objects in a list
        List<Student> studentList = new ArrayList<>();
        studentList.add(s1);
        studentList.add(s2);
        studentList.add(s3);
        serializeAvroToFile(studentList, "myFile.txt");
        deserializeAvroFromFile("myFile.txt");
    }

    // Serialize the objects to an Avro data file
    private static void serializeAvroToFile(List<Student> studentList, String fileName) throws IOException {
        DatumWriter<Student> datumWriter = new SpecificDatumWriter<Student>(Student.class);
        // try-with-resources closes the writer even if an append fails
        try (DataFileWriter<Student> dataFileWriter = new DataFileWriter<Student>(datumWriter)) {
            // Take the schema from the generated class so that an empty list does not fail
            dataFileWriter.create(Student.getClassSchema(), new File(fileName));
            for (Student student : studentList) {
                dataFileWriter.append(student);
            }
        }
    }

    // Deserialize the objects from the Avro data file into memory and print them
    private static void deserializeAvroFromFile(String fileName) throws IOException {
        File file = new File(fileName);
        DatumReader<Student> datumReader = new SpecificDatumReader<Student>(Student.class);
        try (DataFileReader<Student> dataFileReader = new DataFileReader<Student>(file, datumReader)) {
            Student student = null;
            System.out.println("----------------deserializeAvroFromFile-------------------");
            while (dataFileReader.hasNext()) {
                // Passing the previous object lets next() reuse it instead of allocating a new one
                student = dataFileReader.next(student);
                System.out.println(student);
            }
        }
    }
}
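An Avro data file stores the writer's schema in its header, so a file like the one written above can also be read without the generated class. The sketch below illustrates this under a few assumptions: it writes its own small sample file to a temporary location (standing in for myFile.txt) so that it runs on its own, and it uses the generic API because no generated Student class is available inside the example. The names EmbeddedSchemaSketch, writeSample and readAll are made up for illustration.

```java
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class EmbeddedSchemaSketch {
    static final String STUDENT_SCHEMA =
        "{\"namespace\": \"com.best.avro.test.bean\", \"type\": \"record\", \"name\": \"Student\","
      + " \"fields\": ["
      + "   {\"name\": \"name\", \"type\": \"string\"},"
      + "   {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},"
      + "   {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}"
      + " ]}";

    // Writes one record to a temporary file so the sketch has something to read
    static File writeSample() throws IOException {
        Schema schema = new Schema.Parser().parse(STUDENT_SCHEMA);
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "Kami");
        record.put("favorite_number", 10);
        record.put("favorite_color", "Red");
        File file = File.createTempFile("students", ".avro");
        file.deleteOnExit();
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))) {
            writer.create(schema, file);
            writer.append(record);
        }
        return file;
    }

    // Reads every record using only the schema stored in the file header
    static List<GenericRecord> readAll(File file) throws IOException {
        List<GenericRecord> records = new ArrayList<>();
        // No schema is given to the datum reader; DataFileReader hands it the one from the header
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<GenericRecord>(file, new GenericDatumReader<GenericRecord>())) {
            System.out.println("schema in file: " + reader.getSchema().getFullName());
            for (GenericRecord record : reader) {
                records.add(record);
            }
        }
        return records;
    }

    public static void main(String[] args) throws IOException {
        for (GenericRecord record : readAll(writeSample())) {
            System.out.println(record);
        }
    }
}
```

String values come back as Avro's Utf8 type rather than java.lang.String, so call toString() on them before comparing with a Java string.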
3. generic
As before, first create a schema for a Teacher class, Teacher.avsc, in the configured directory:
{"namespace": "com.best.avro.test.bean",
"type": "record",
"name": "Teacher",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Then create a class to test serialization and deserialization. The code is as follows:
package com.best.avro.test.bean;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;

/**
 * @author Wan Kaiming on 2016/8/4
 * @version 1.0
 */
public class TestAvroGeneric {
    public static void main(String[] args) throws IOException {
        File myFile = new File("myTeacher.txt");

        // 1. Load the schema from Teacher.avsc on the classpath
        Schema schema;
        try (InputStream in = TestAvroGeneric.class.getResourceAsStream("/Teacher.avsc")) {
            schema = new Schema.Parser().parse(in);
        }

        // 2. Create a record instance from the schema; this is the record to serialize
        GenericRecord genericRecord = new GenericData.Record(schema);
        genericRecord.put("name", "Kami");
        genericRecord.put("favorite_number", 18);
        genericRecord.put("favorite_color", "Red");

        // 3. Serialize with Avro's IO classes; the schema object must be passed in
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
            dataFileWriter.create(schema, myFile);
            dataFileWriter.append(genericRecord);
        }

        // 4. Read back from the file, again passing the schema object
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
        try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(myFile, datumReader)) {
            // The schema is loaded at run time, so there is no generated class; use GenericRecord
            GenericRecord teacher = null;
            System.out.println("----------------deserializeAvroFromFile-------------------");
            while (dataFileReader.hasNext()) {
                teacher = dataFileReader.next(teacher);
                System.out.println(teacher);
            }
        }
    }
}
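The program above writes to a container file. When the bytes are meant to travel in a message or an RPC call instead, a GenericRecord can be encoded to a byte array in memory with Avro's BinaryEncoder and decoded again with BinaryDecoder. The following is a minimal sketch of that round trip, not a full messaging setup. It inlines the Teacher schema so it runs without the resource file, and the names InMemoryAvroSketch, toBytes and fromBytes are made up for illustration. Unlike a data file, the raw bytes do not carry the schema, so both sides must already have it.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class InMemoryAvroSketch {
    // Same content as Teacher.avsc, inlined so the sketch does not need the file
    static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"namespace\": \"com.best.avro.test.bean\", \"type\": \"record\", \"name\": \"Teacher\","
      + " \"fields\": ["
      + "   {\"name\": \"name\", \"type\": \"string\"},"
      + "   {\"name\": \"favorite_number\", \"type\": [\"int\", \"null\"]},"
      + "   {\"name\": \"favorite_color\", \"type\": [\"string\", \"null\"]}"
      + " ]}");

    // Encode one record to raw Avro binary (no file header, no embedded schema)
    static byte[] toBytes(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
        encoder.flush(); // the encoder buffers, so flush before reading the bytes
        return out.toByteArray();
    }

    // Decode raw Avro binary back into a record, using the same schema
    static GenericRecord fromBytes(byte[] bytes) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return new GenericDatumReader<GenericRecord>(SCHEMA).read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        GenericRecord teacher = new GenericData.Record(SCHEMA);
        teacher.put("name", "Kami");
        teacher.put("favorite_number", 18);
        teacher.put("favorite_color", "Red");

        byte[] bytes = toBytes(teacher);
        System.out.println("encoded size: " + bytes.length + " bytes");
        System.out.println("decoded: " + fromBytes(bytes));
    }
}
```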