数据集成工程实现中mysql jdbc全量扫描的实现

背景

数据集成中全量扫描的实现需要注意以下两点内容:

  • 锁表问题:实现时需要关注事务隔离级别,考虑采用快照读的方式
  • 内存问题:流式读取、分页读取避免一次性捞取过多的数据到内存

解决锁表问题

快照读(一致性读)

优点:快照读不会有锁表问题。关于快照读的理解可以参考mysql glosaary中的consistent read
缺点:需要自己显式设置session的隔离级别到RR,否则当用户隔离级别默认为SERIALIABLE的时候会有锁表问题
参考源码:源码可以参考debezium中lockTablesForSchemaSnapshot的实现。像MySQL源端,debezium可以设置锁表也可以设置不锁表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, RelationalSnapshotContext snapshotContext)
throws SQLException, InterruptedException {
// Set the transaction isolation level to REPEATABLE READ. This is the default, but the default can be changed
// which is why we explicitly set it here.
//
// With REPEATABLE READ, all SELECT queries within the scope of a transaction (which we don't yet have) will read
// from the same MVCC snapshot. Thus each plain (non-locking) SELECT statements within the same transaction are
// consistent also with respect to each other.
//
// See: https://dev.mysql.com/doc/refman/5.7/en/set-transaction.html
// See: https://dev.mysql.com/doc/refman/5.7/en/innodb-transaction-isolation-levels.html
// See: https://dev.mysql.com/doc/refman/5.7/en/innodb-consistent-read.html
connection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
connection.executeWithoutCommitting("SET SESSION lock_wait_timeout=" + connectorConfig.snapshotLockTimeout().getSeconds());
try {
connection.executeWithoutCommitting("SET SESSION innodb_lock_wait_timeout=" + connectorConfig.snapshotLockTimeout().getSeconds());
}
catch (SQLException e) {
LOGGER.warn("Unable to set innodb_lock_wait_timeout", e);
}

// ------------------------------------
// LOCK TABLES
// ------------------------------------
// Obtain read lock on all tables. This statement closes all open tables and locks all tables
// for all databases with a global read lock, and it prevents ALL updates while we have this lock.
// It also ensures that everything we do while we have this lock will be consistent.
if (connectorConfig.getSnapshotLockingMode().usesLocking() && connectorConfig.useGlobalLock()) {
try {
globalLock();
metrics.globalLockAcquired();
}
catch (SQLException e) {
LOGGER.info("Unable to flush and acquire global read lock, will use table read locks after reading table names");
// Continue anyway, since RDS (among others) don't allow setting a global lock
assert !isGloballyLocked();
}
// FLUSH TABLES resets TX and isolation level
connection.executeWithoutCommitting("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
}
}

解决内存占用问题

应用层实现分页读

优点:通过select limit的方式分页读取,避免驱动一次性扫描过多的数据,可以避免内存占用问题。
缺点:自己需要控制offset的移动,代码实现复杂度上会高些;对大表scan的时候频繁的分页sql会占用源端数据库大量IOPS,网络RTT增多也会影响整体读取数据的吞吐

利用驱动读取

mysql jdbc api实现文档关于ResultSet中的介绍了JDBC驱动提供的能力。

方式一:驱动单条流式读取

优点:实现简单,可以避免内存占用问题,同时相比分页的读取性能更好。可以理解流式读取是个长连接,没有频繁重建tcp connection的开销。这个实现本质上是个Client端的行为,row-by-row的从socket buffer取数据。
缺点mysql jdbc api实现文档 中也提到了,就是流式读取的话,这个connection必须读取完执行sql的所有数据才可以继续使用,你无法对一个正在流式读取的connection进行复用或者close这个result set。

1
2
3
stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,
java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);

方式二:驱动分页读取

优点:实现简单,通过setFetchSize和useCursorFetch=true即可开启驱动层面分页读。和应用层分页读效果相同,也可以避免内存占用问题。
缺点:和应用层分页读的缺点相同,会占用源端数据库大量IOPS,网络RTT增多也会影响整体读取数据的吞吐

实际工程中的选择

在数据集成领域,还是可以考虑优先采用驱动层面的流式读取。因为实际工程中我们都有记录读取的offfset来确保整个进程退出后重启仍然从指定位置消费,这也就意味着我们没必要一定优雅停止去执行resultset.close等待其全部读完,可以直接退出进程。因此resultSet没法快速close掉并不是一个很严重的问题。

参考资料