1.sink模块介绍

1.1 基本结构功能

sink模块也是instance当中的一个重要构成部分。它是parser和store模块中间的一个组件,与parser和store的关系可以用下图表示:

主要功能是:

  1. 数据过滤:支持通配符的过滤模式,表名,字段内容等
  2. 数据路由/分发:解决1:n (1个parser对应多个store的模式)
  3. 数据归并:解决n:1 (多个parser对应1个store)
  4. 数据加工:在进入store之前进行额外的处理,比如join

1.2 parser中如何使用sink

1.2.1 将解析结果写入TransactionBuffer

应该还记得AbstarctEventParser的start方法完成了parser的主要功能。

start的主要过程再回忆下:
start中工作线程的主要流程是:建立和mysql master的连接->启动心跳线程->dump准备->获取位点信息->sinkHandler准备好消费->dump->关闭连接

在AbstarctEventParser当中的start方法中,定义了一个SinkFunction对象(即sinkHandler),该对象包含一个sink方法。sink方法负责将解析完成的transaction信息加入TransactionBuffer对象中(sink方法中调用了EventTransactionBuffer对象的sink方法)。

sink方法定义好了之后在dump的时候才调用(作为一个参数传递个dump方法)。

dump方法在MysqlConnection类中调用(实现了ErosaConnection)

至此,我们已经成功将dump出来的结果加入了TransactionBuffer对象

1.2.2 TransactionBuffer对象将结果交给CanalEventSink对象

解析完成后的内容封装成了Transaction信息,写入了TransactionBuffer对象(通过sinkHandler中的sink方法)。TransactionBuffer对象和CanalEventSink都在成员变量中定义:

1.2.1节我们已经提到过,transactionBuffer对象中可以通过add方法来加入解析完成的event。如果transactionBuffer缓存满了,择需要调用EvenTransactionBuffer类中的flush方法。值得注意的是flush方法则在AbstractEventParser的构造函数中重写了(原本是在EventTransactionBuffer中定义的flush方法):

可以看到,重写的flush方法中调用了consumeTheEventAndProfilingIfNecessary方法。

consumeTheEventAndProfilingIfNecessary方法也在AbstractEventParser当中定义。该方法就是真正去调用了CanalEventSink对象的sink方法,去消费了解析结果(即一个保存CanalEntry的list),将该结果交给EventSink组件。

至此,我们可以愉快的进入到Canal的sink模块,仔细去研究sink的功能和实现了

2.sink模块源码解析

2.1 包结构概览

sink模块的包结构如下图所示:

从包结构上我们可以发现一些问题:

  1. 没有过滤相关的类:按照之前的理解sink组件的一个重要功能是过滤,之所以这里没有过滤相关的操作是作者将过滤的功能单独剥离开来,放到了filter这个模块当中了
  2. 存在DownStreamHandler:在介绍server模块实现时,我们提到过 netty当中的handler。sink对event的处理引用了handler这样的概念。不过和netty的实现还是有区别的。sink中作者现在仅仅写了2个handler,他们做的事情是比较独立的,互相之间不像netty的hanlder一样存在紧密联系。
  3. 有hearbeat相关的event hanlder:这主要是在parser阶段,心跳也作为一个event交给了sink组件,在这里做一下出处理,去除心跳相关的event。我们可以查看AbstarctEventParser当中的以下代码:

2.2 下游处理器(DownStreamHandler)

可以看到作者设计了一个接口CanalEventStreamHandler和AbstractEventDownStreamHandler抽象类。它们的实现都很简单。图下图是相关实现。可以看到加工的时机主要是交给store之前,之后和重试的时候

如果使用者有需要,可以通过继承该抽象类,自由处理解析出来的event

这里的HeartBeatEntryEventHandler类就是继承了该抽象类。在交给下一个组件store时,先去除了心跳事件:

2.3 CanalEventSink,AbstractCanalEventSink和EntryEventSink

这三个类完成了sink合并分发的工作。sink我们也可以称它为canal event的消费者。

2.3.1 CanalEventSink

CanalEventSink: sink模块的顶级接口。主要就2个方法,一个是sink方法,负责提交内容到store组件,另外个方法是interrupt方法用来中断消费,例如解析模块发生了切换,想临时中断当前的merge请求。之前AbstractEventParser当中就有用到该方法

2.3.2 AbstarctCanalEventSink

CanalEventSink实现了AbstractCanalEventSink接口,相比接口多了一些Handler的处理函数。此外,还多了个CanalEventFilter成员变量。前文已经提过,sink将filter的功能单独剥离出来处理了,但是仍然作为sink组件的一部分。

此外可以看到采用handler这种方式,很灵活。

2.3.3 EntryEventSink

该类就是具体的去执行sink功能和filter功能的类了。

这些方法和成员变量主要可以归纳成几个核心功能模块:

  1. 生命周期控制:这个由CanalLifeCycle控制,各个组件模块都差不多。涉及方法:start(),stop()
  2. 过滤:由doFilter去完成,主要是根据table和schema名做一些过滤。具体的过滤方法在filter模块中
  3. sink: 就是处理后交给store组件。在处理时会使用自己定义的hanlder
  4. 其他:事务头尾的处理和过滤选项,获取schema或者table名、无数据等异常情况处理等辅助方法

2.4 group包

之前在parser中,有GroupEventParser是产生多个parser对象。我们简单回顾下,多个parser对象对应着多个instance配置文件,产生独立的解析任务。在sink阶段可以对多个parser结果做一些处理(比如一些业务需要多库结果合并)

多个parser可以很自由的按序一个个启动,但是在sink阶段做归并的时候要稍微复杂点。我们看下sink的group包下主要有哪些内容:

其中除了GroupEventSink以外,其他的类和接口主要完成一个归并的判断。

这里需要再提一下数据路由和数据归并的问题:
数据路由/分发:为了合理利用数据库资源,业务会按照schema进行隔离。然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注。也就是说每个schema(由一个parser会去解析),然后交给多个store(因为一个schema就会有多个业务方关注)。所以这里的1:n指的是1个schema会有多个业务方关注,sink只是负责将1个schema的解析结果提供给多个store消费。所以称1:n,一份parser解析的结果交个sink分发给多个store。

数据归并:当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并。可见,在分表分库时,对多个parser解析出来的结果需要通过sink来完成归并的操作,然后才能作为完整的一个解析结果提供给store。这也就是n:1的业务场景了。

2.4.1 GroupBarrier

取名Barrier意思是把符合合并条件的数据进行合并处理(围起来)。

该接口下主要设计了4个方法:

2.4.2 TimelineBarrier

完成功能:按照时间戳进行排序和归并的操作。归并时不划分事务边界,也就是说无法在后续过程中识别事务。由于这里会造成并发访问,实现上采用了一些上锁的并发控制手段。

实现思路:
假设有n个队列(对应n个parser),然后每个队列提交一个timestamp。这些timestamp提交的时候是按照顺序递增的(一般情况下都是这样)。最小的时间戳设定为threshold。timestamp由一个阻塞的优先级队列控制。如果提交的时间戳个数没有达到和sink的groupsize相等,就会阻塞等待。直到满足后,在按照优先级从优先队列中取优先级较高(时间戳小的)的唤醒。唤醒后记得要更新threshold的值,使得后续的时间戳可以正确加入队列。

注意:
1.只有当多个队列都提交了一个timestamp,缺少任何一个提交,都会阻塞其他队列通过。(解决当一个库启动过慢或者发生主备切换时出现延迟等问题)
2.存在假定:
存在一个假定,认为提交的timestamp是一个顺序递增,但是在两种case下会出现时间回退
a. 大事务时,事务头的时间会晚于事务当中数据的时间,相当于出现一个时间回退(mysql本身造成的现象)
b. 出现主备切换,从备机上发过来的数据会回退几秒钟

  1. TimelineBarrie进行归并的粒度控制在Event级别,而不是事务级别(事务则是List对象)。按照Event抵达的时间戳进行合并。Event的个数由group size控制,到达组大小了则写给store。

也正是由于存在时间回退,所以第一个队列提交的时间戳,不一定是最小的。这时候就需要做一些判断,在队列提交时间戳的时候更新最小的时间戳作为threshold。

2.4.3 TimelineTransactionBarrier

归并时保留事务信息(事务在这里指的就是多个DML操作的集合,其他DDL和DCL属于非事务), DDL和DCL做为普通的event事务归并写入store;DML的集合通过event.type为BEGIN或者END来区分事务边界,保留事务结构归并写入store。注意和TimelineBarrier的区别。

实现思路:设置一个事务状态标识分别标识初始状态、事务处理中和非事务处理中三种状态。通过状态标识可以清楚当前是否在处理事务数据。如果在处理事务数据了(发现BEGIN类型EVENT)则允许数据通过,进行事务处理。如果处理非事务数据,则不允许DML数据经过,只允许DDL和DCL通过。

2.4.3 GroupEventSink

其实继承了EntryEventSink。在改方法中主要就是在doSink方法中调用了相关的Barrier类来对多个源产生的解析结果进行归并。当设置了filterTransactionEntry为true则采用TimelineBarrier,按照event的单位进行有序化的sink(无法从解析结果上看出事务的结构),否则的话,采用TimelineTransactionBarrier,则会把事务结构保留下来和DCL和DDL等event一起写入store。

  1. 总结

sink模块重要概念总结如下:

1.TimelineBarrier类功能:在event粒度级别对多个队列产生的结果进行结果合并,每一轮从各个队列取一个event
2.TimelineTransactionBarrier: DDL和DCL非事务的操作归并后写入store;事务则将对应的DML event集合归并写入store。注意和TimelineBarrier的区别
3.GroupEventSink: 如果设定了过滤事务头尾信息,则采用event级别的粒度进行结果合并。需要将事务区分开来,则使用TimelineTransactionBarrier
4.事务的概念:一定要明白事务是一组DML操作的集合,使用event type为"BEGIN"和"END"来区分DML操作集合的边界