PostgreSQL&Greenplum高性能写入——百倍性能提升

背景

最近 CloudCanal 用户和我们反馈写入 gp 的性能没有开源软件bireme好,因此花了一些时间研究了下 bireme 的写入方式看看是否可以优化下 CloudCanal 写入 gp/pg 的性能。经过验证,其采用的 copyManager 方式相比 CloudCanal 基于 PreparedStatement 的写入具备更好的性能。

CopyManager

quick start

快速体验 copyManager 的使用,可以参考下:Fast CSV and JSON Ingestion in PostgreSQL with COPY

原理

本质上可以理解为一种物理复制,类似于 MySQL 的 LOAD DATA INFILE。由于是物理复制,直接追加内容,效率相比传统的 INSERT/UPDATE/DELETE 处理是效率高非常多的。数据库处理路径上省略了传统接收请求经过 SQL 解析、优化执行的路径,直接在底层写入数据,因此非常快速。并不是所有的数据库都实现了这种写入方式。像 pg 的 jdbc driver 有提供 copy api,我们可以利用该 API 来提升写入效率

如何处理 UPDATE/DELETE

copy 方式从底层上只支持 INSERT。针对混合负载的写入其实也是可以利用 copy API 的性能优势,需要针对 UPDATE 和 DELETE 做以下的专门处理。

处理 DELETE

postgresql 不支持 pk in (xx,yyy)这样的表示,因此没法依靠 PreparedStatement 一次性下发 SQL,如果单独一条条下发,势必有较大的性能开销。这里引入一种临时表方案配来进行删表的操作,通过多一次的临时表批量写入来换取高性能的删除,据图流程可以参考下图。pg 临时表可以参考文档create tablecreate table as

处理 UPDATE

针对 update 事件,则先配合上文提到的 DELETE 处理转换成批量 delete,然后再转成 INSERT 利用 copy API 写入。如果要基于该实现方式优化写入性能,需要满足如下条件:

  • 有主键:无主键表执行 delete 的时候可能删除比预期多的数据
  • 需要可以拿到 update 事件没有更新过的列的旧值:写入的时候必须是全列写入,如果缺少旧值,则不能保证写入数据的完整性

format 与分隔符冲突处理

官方文档 COPY 有详细解释 copy 的使用细节。一般而言采用 csv 的 format 是比较好的,避免列名重复写入 key,但是主要存在的副作用就是分隔符与 value 冲突的问题。这个官方文档中也有专门的描述如下。解决的方式就是根据定义的 escape 和 quote 符号,将需要写入的数据包裹起来,由于提前转义,就避免了分隔符冲突的问题。

1
The values in each record are separated by the DELIMITER character. If the value contains the delimiter character, the QUOTE character, the NULL string, a carriage return, or line feed character, then the whole value is prefixed and suffixed by the QUOTE character, and any occurrence within the value of a QUOTE character or the ESCAPE character is preceded by the escape character. You can also use FORCE QUOTE to force quotes when outputting non-NULL values in specific columns.

空值处理

根据如下官方描述,采用 csv format 的时候,pg 无法分辨是 NULL 还是空字符串。因此默认情况下,写入"“,即表示 NULL(注意写入 null 值时,不要在两边加 quote,否则会被转义处理不会变成 null),如果需要写入空字符串,则需要表示为”" (两边使用 quote 转义)

1
The CSV format has no standard way to distinguish a NULL value from an empty string. PostgreSQL's COPY handles this by quoting. A NULL is output as the NULL string and is not quoted, while a data value matching the NULL string is quoted. Therefore, using the default settings, a NULL is written as an unquoted empty string, while an empty string is written with double quotes (""). Reading values follows similar rules. You can use FORCE NOT NULL to prevent NULL input comparisons for specific columns.

测试

本节对比下传统基于 preparedStatement 的写入方式和采用 copy API 的方式之间的写入性能差距

测试环境

运行写入程序使用的机器和数据库在不同的测试组中均相同。本次测试在本地 Mac OS 上执行,数据库使用的是测试环境 GreenPlum 6.8.1
测试表结构如下:

1
2
3
4
5
6
7
8
9
10
create table public.ak_sk_list
(
id bigint not null
primary key,
access_key varchar(256) not null,
secret_key varchar(256) not null,
name varchar(256) not null,
type integer not null,
comment varchar(90) default NULL::character varying
) distributed by (id);

测试代码

测试环境使用的主要代码如下,比较了基于 PreparedStatement 和 CopyManager 两种实现方式写入 80W+行数据的效率。具体获取 PgConnection 的方式,可以自行通过工具类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.clougence.cloudcanal.postgre.pg;

import java.io.StringReader;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
* @author wanshao <344277934@qq.com> create time is 2022/6/28
**/
@Slf4j
public class TestWriter {

private static long curPk = 1;

private static long batchSize = 1024;

private static int roundCount = 10;

@Test
public void testPsBatchInsert() {
String sql = buildSqlTemp();

try (DruidPooledConnection connection = (DruidPooledConnection) PgTestConnector.getTestConnection()) {
Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < roundCount; i++) {
doPsBatchExecuteRound(sql, connection);
}
stopwatch.stop();
log.warn("Execute cause " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void testCopyManager() {
try (DruidPooledConnection connection = (DruidPooledConnection) PgTestConnector.getTestConnection()) {
String copySql = getCopySql("public.ak_sk_list", Lists.newArrayList("id", "access_key", "secret_key", "name", "type", "comment"));
Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < roundCount; i++) {
doCopyRound(copySql, connection);
}
stopwatch.stop();
log.warn("Execute copy cause " + stopwatch.elapsed(TimeUnit.MILLISECONDS) + " ms");
} catch (Exception e) {
e.printStackTrace();
}
}

private final String DELIMITER = "|";
private final String ROW_END = "\n";

@SneakyThrows
private void doCopyRound(String copySql, DruidPooledConnection connection) {
StringBuilder dataSb = new StringBuilder();
for (int i = 0; i < batchSize; i++) {
dataSb.append(curPk + i).append(DELIMITER);
dataSb.append("accc").append(DELIMITER);
dataSb.append("ssss").append(DELIMITER);
dataSb.append("aaa").append(DELIMITER);
dataSb.append("1").append(DELIMITER);
dataSb.append("");
dataSb.append(ROW_END);
}
String data = dataSb.toString();

try (StringReader stringReader = new StringReader(data);) {

CopyManager copyManager = new CopyManager((BaseConnection) connection.getConnection());
copyManager.copyIn(copySql, stringReader);
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
curPk += batchSize;
}

}

private String getCopySql(String tableName, List<String> columnList) {
StringBuilder sb = new StringBuilder().append("COPY ")
.append(tableName)
.append(" (")
.append(StringUtils.join(columnList, ","))
// see '' as database null
.append(") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '\"' ESCAPE E'\\\\';");
String sql = sb.toString();
return sql;
}

private String buildSqlTemp() {
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO pipi_test.public.ak_sk_list (id, access_key, secret_key, name, type, comment) VALUES (?,?,?,?,?,?);");
return sb.toString();
}

@SneakyThrows
private void doPsBatchExecuteRound(String sql, DruidPooledConnection connection) {
try (PreparedStatement ps = connection.prepareStatement(sql)) {
for (int i = 0; i < batchSize; i++) {
ps.setLong( 1, curPk + i);
ps.setString( 2, "222");
ps.setString( 3, "333");
ps.setString( 4, "4444");
ps.setInt( 5, 1);
ps.setString( 6, "666");
ps.addBatch();
}
ps.executeBatch();

} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
curPk += batchSize;
}
}

}

测试结果

可以看到,如果直接单纯的 INSERT 写入,性能差距会在百倍以上。

写入方式 写入行数 执行耗时
PreparedStatement Batch Insert 10240 行 47910ms
CopyManager 819200 行 3387ms

迁移同步中的应用

高性能的写入在数据迁移、同步中均有重要的应用价值。CopyManager 对于 pg/gp 的写入性能优化具有重要意义。不过对于增量数据的写入,还需要做一些 SQL 合并的工作,避免 pk 约束冲突导致同步中断。总的来说,基于 copyManager 的写入方式在数据迁移同步中应用具备如下优势:

  • 更极致的性能
  • 避免约束冲突导致的性能劣化