PostgreSQL&Greenplum高性能写入——百倍性能提升

背景

最近CloudCanal用户和我们反馈写入gp的性能没有开源软件bireme好,因此花了一些时间研究了下bireme的写入方式看看是否可以优化下CloudCanal写入gp/pg的性能。经过验证,其采用的copyManager方式相比CloudCanal基于PreparedStatement的写入具备更好的性能。

CopyManager

quick start

快速体验copyManager的使用,可以参考下:Fast CSV and JSON Ingestion in PostgreSQL with COPY

原理

本质上可以理解为一种物理复制,类似于MySQL的LOAD DATA INFILE。由于是物理复制,直接追加内容,效率相比传统的INSERT/UPDATE/DELETE处理是效率高非常多的。数据库处理路径上省略了传统接收请求经过SQL解析、优化执行的路径,直接在底层写入数据,因此非常快速。并不是所有的数据库都实现了这种写入方式。像pg的jdbc driver有提供copy api,我们可以利用该API来提升写入效率

如何处理UPDATE/DELETE

copy方式从底层上只支持INSERT。针对混合负载的写入其实也是可以利用copy API的性能优势,需要针对UPDATE和DELETE做以下的专门处理。

处理DELETE

postgresql不支持pk in (xx,yyy)这样的表示,因此没法依靠PreparedStatement一次性下发SQL,如果单独一条条下发,势必有较大的性能开销。这里引入一种临时表方案配来进行删表的操作,通过多一次的临时表批量写入来换取高性能的删除,据图流程可以参考下图。pg临时表可以参考文档create tablecreate table as
image.png

处理UPDATE

针对update事件,则先配合上文提到的DELETE处理转换成批量delete,然后再转成INSERT利用copy API写入。如果要基于该实现方式优化写入性能,需要满足如下条件:

  • 有主键:无主键表执行delete的时候可能删除比预期多的数据
  • 需要可以拿到update事件没有更新过的列的旧值:写入的时候必须是全列写入,如果缺少旧值,则不能保证写入数据的完整性

format与分隔符冲突处理

官方文档COPY 有详细解释copy的使用细节。一般而言采用csv的format是比较好的,避免列名重复写入key,但是主要存在的副作用就是分隔符与value冲突的问题。这个官方文档中也有专门的描述如下。解决的方式就是根据定义的escape和quote符号,将需要写入的数据包裹起来,由于提前转义,就避免了分隔符冲突的问题。

1
The values in each record are separated by the DELIMITER character. If the value contains the delimiter character, the QUOTE character, the NULL string, a carriage return, or line feed character, then the whole value is prefixed and suffixed by the QUOTE character, and any occurrence within the value of a QUOTE character or the ESCAPE character is preceded by the escape character. You can also use FORCE QUOTE to force quotes when outputting non-NULL values in specific columns.

空值处理

根据如下官方描述,采用csv format的时候,pg无法分辨是NULL还是空字符串。因此默认情况下,写入"“,即表示NULL(注意写入null值时,不要在两边加quote,否则会被转义处理不会变成null),如果需要写入空字符串,则需要表示为”" (两边使用quote转义)

1
The CSV format has no standard way to distinguish a NULL value from an empty string. PostgreSQL's COPY handles this by quoting. A NULL is output as the NULL string and is not quoted, while a data value matching the NULL string is quoted. Therefore, using the default settings, a NULL is written as an unquoted empty string, while an empty string is written with double quotes (""). Reading values follows similar rules. You can use FORCE NOT NULL to prevent NULL input comparisons for specific columns.

测试

本节对比下传统基于preparedStatement的写入方式和采用copy API的方式之间的写入性能差距

测试环境

运行写入程序使用的机器和数据库在不同的测试组中均相同。本次测试在本地Mac OS上执行,数据库使用的是测试环境GreenPlum 6.8.1
测试表结构如下:

1
2
3
4
5
6
7
8
9
10
create table public.ak_sk_list
(
id bigint not null
primary key,
access_key varchar(256) not null,
secret_key varchar(256) not null,
name varchar(256) not null,
type integer not null,
comment varchar(90) default NULL::character varying
) distributed by (id);

测试代码

测试环境使用的主要代码如下,比较了基于PreparedStatement和CopyManager两种实现方式写入80W+行数据的效率。具体获取PgConnection的方式,可以自行通过工具类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.clougence.cloudcanal.postgre.pg;

import java.io.StringReader;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
* @author wanshao <344277934@qq.com> create time is 2022/6/28
**/
@Slf4j
public class TestWriter {

private static long curPk = 1;

private static long batchSize = 1024;

private static int roundCount = 10;

@Test
public void testPsBatchInsert() {
String sql = buildSqlTemp();

try (DruidPooledConnection connection = (DruidPooledConnection) PgTestConnector.getTestConnection()) {
Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < roundCount; i++) {
doPsBatchExecuteRound(sql, connection);
}
stopwatch.stop();
log.warn("Execute cause " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void testCopyManager() {
try (DruidPooledConnection connection = (DruidPooledConnection) PgTestConnector.getTestConnection()) {
String copySql = getCopySql("public.ak_sk_list", Lists.newArrayList("id", "access_key", "secret_key", "name", "type", "comment"));
Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < roundCount; i++) {
doCopyRound(copySql, connection);
}
stopwatch.stop();
log.warn("Execute copy cause " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
} catch (Exception e) {
e.printStackTrace();
}
}

private final String DELIMITER = "|";
private final String ROW_END = "\n";

@SneakyThrows
private void doCopyRound(String copySql, DruidPooledConnection connection) {
StringBuilder dataSb = new StringBuilder();
for (int i = 0; i < batchSize; i++) {
dataSb.append(curPk + i).append(DELIMITER);
dataSb.append("accc").append(DELIMITER);
dataSb.append("ssss").append(DELIMITER);
dataSb.append("aaa").append(DELIMITER);
dataSb.append("1").append(DELIMITER);
dataSb.append("");
dataSb.append(ROW_END);
}
String data = dataSb.toString();

try (StringReader stringReader = new StringReader(data);) {

CopyManager copyManager = new CopyManager((BaseConnection) connection.getConnection());
copyManager.copyIn(copySql, stringReader);
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
curPk += batchSize;
}

}

private String getCopySql(String tableName, List<String> columnList) {
StringBuilder sb = new StringBuilder().append("COPY ")
.append(tableName)
.append(" (")
.append(StringUtils.join(columnList, ","))
// see '' as database null
.append(") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '\"' ESCAPE E'\\\\';");
String sql = sb.toString();
return sql;
}

private String buildSqlTemp() {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO pipi_test.public.ak_sk_list (id, access_key, secret_key, name, type, comment) VALUES (?,?,?,?,?,?);");
return sb.toString();
}

@SneakyThrows
private void doPsBatchExecuteRound(String sql, DruidPooledConnection connection) {
try (PreparedStatement ps = connection.prepareStatement(sql)) {
for (int i = 0; i < batchSize; i++) {
ps.setLong( 1, curPk + i);
ps.setString( 2, "222");
ps.setString( 3, "333");
ps.setString( 4, "4444");
ps.setInt( 5, 1);
ps.setString( 6, "666");
ps.addBatch();
}
ps.executeBatch();

} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
curPk += batchSize;
}
}

}

测试结果

可以看到,如果直接单纯的INSERT写入,性能差距会在百倍以上。

写入方式 写入行数 执行耗时
PreparedStatement Batch Insert 10240行 47910ms
CopyManager 819200行 3387ms

迁移同步中的应用

高性能的写入在数据迁移、同步中均有重要的应用价值。CopyManager对于pg/gp的写入性能优化具有重要意义。不过对于增量数据的写入,还需要做一些SQL合并的工作,避免pk约束冲突导致同步中断。总的来说,基于copyManager的写入方式在数据迁移同步中应用具备如下优势:

  • 更极致的性能
  • 避免约束冲突导致的性能劣化