1. 介绍

EventStore是EventSink的下游组件。Sink组件处理解析数据完毕后则将数据交给store处理。

在Sink组件的实现类EntryEventSink的成员变量中已经包含了了EventStore对象:

在SINK阶段执行sinkData方法时调用的doSink方法中进行了将数据提交到store的操作。

至此,我们已经知道在代码上,eventStore是怎么和eventSink联系在一起的。

2. 源码解读

2.1 基本结构

2.2 model包

该包下主要定义一些数据模型,主要分为:

  1. batch: 批处理模型,可以获取大小信息 2.Event/Events: store存放对象的数据模型定义

2.3 CanalStoreConstants,CanalStoreException、StoreInfo和CanalStoreScavenge

这些类主要做一些异常控制、定义一些值对象,以及做一些清理工作

2.4 CanalEventStore(核心接口)

该接口中提供的接口方法可以完整表明Store需要完成的核心功能:

2.5 MemoryEventStoreWithBuffer

该类是实现Store在内存的实现类。

Store组件完整的最主要的功能就是提供server和client之间的订阅消费服务。store组件的核心方法是:
1.Put : Sink模块进行数据存储的最后一次写入位置
2.Get : 数据订阅获取的最后一次提取位置
3.Ack : 数据消费成功的最后一次消费位置

采用唤醒缓冲区来设计store。ringbuffer其实是个蛮重要的东西,其概念来源于Disruptor。这个后续我写一篇文章专门说说。其主要优点是互斥访问(只有一个GET指针),然后访问效率高,CPU友好(基于数组,例如可以设定2^n的大小)

canal允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.

流式API好处:

  1. get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
  2. get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)


执行过程:

  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cursor
  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

流式api带来的异步响应模型:

3.总结

store的设计主要围绕如何高效的进行get、put;主要使用了异步、并行的思想来达到这种目的。功能上主要完成了数据汇总并且在内存存储的功能。