1.Parser介绍

在binlog日志解析开源工具canal源码浅析(2):instance模块一文中我们介绍了instance模块的作用。在启动一个instance之后,为了能够去解析mysql的binlog日志,我们首先要启动的核心组件就是EventParser了。

1.1 EventParser的功能定位

为了更好的理解Parser的作用,我们首先需要回顾下EventParser的定位:
instance中的核心组件
sink的上级组件,主要完成一些协议包的解析、位点管理等

1.2 EventParser与instance的联系

我们已经说过EventParser是Instance构成的核心组件之一。为了更好的理解EventParser和Instance的关系。我们需要回顾Instance源码中相关的内容。

在Instance的AbstractCanalInstance当中的start()方法里面启动了Instance的各个组件,当然也包括EventParser

这里用了两个方法来装饰eventParser的启动。也就是说在eventParser启动和停止之前需要一些额外的操作。

1.2.1 beforeStartEventParser

beforeStartEventParser主要是判断eventParser是GroupEventParser,如果是的话,也就是意味着这个eventParser里面有好多个Parser,他们共同合并成了一个Group

GroupEventParser的代码在Parser包中,代码如下。主要可以看到就是通过一个List对象来保存CanalEventParser对象

里面的方法也很简单,主要是启停控制和GET,SET方法,这个类就是一个EventParser的容器而已

1.2.2 afterStartEventParser

这个方法主要是在启动EventParser之后根据订阅情况做一些事件过滤。设置event过滤器等一些操作源码在Parser的包里

1.2.3 start

在beforeStart和after Start中间的eventParser.start()就是启动我们这个核心组件的语句了。

eventParser在AbstractCanalInstance当中是一个成员变量,类型是CanalEventParser。这是一个接口类型,在Parser包当中定义。至此,我们谈论instance和eventParser的联系已经讲完。根据自顶向下的分析理论,现在我们可以安心走入Parser包的世界了!

2. Parser基本结构

2.1 包结构

parser包的整体包结构如下。该包是canal中最重要的包之一,同时也很复杂。理解该包的源码可以让我们弄明白canal到底是如何实现去解析mysql的一些日志event,如何去伪装成一个mysql-slave去解析mysql-master的binlog日志。

之前也提过,mysql5.7的 I/O dump threads支持并发了, 如果以后需要改造canal提升其解析效率,弄明白这个包是十分需要的。

2.2 类设计

类图设计如下。我们从上到下来简单理解一番:

  1. CanalLifeCycle来控制整个组件的生命周期
  2. 位点管理和HA控制器接口与CanalEventParser对象(自然包括其子类)是一种关联关系,也就是说位点管理和HA控制器以成员变量的身份在CanalEventParser对象中出现
  3. CanalEventParser只是一个顶级接口,根据功能的细化不断衍生出更多子类。例如路线CanalEventParser->AbstractEventParser->AbstractMysqlEventParser->MysqlEventParser

2.3 EventParser基本设计

大致过程:

整个parser过程大致可分为几步:

  1. Connection获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)

  2. Connection建立链接,发送BINLOG_DUMP指令(BINLOG_DUMP指令介绍可以参考:binlog_dump指令官方介绍)
    // 0. write command number
    // 1. write 4 bytes bin-log position to start at
    // 2. write 2 bytes bin-log flags
    // 3. write 4 bytes server id of the slave
    // 4. write bin-log file name

3.Mysql开始推送Binay Log

4.接收到的Binaly Log的通过Binlog parser进行协议解析,补充一些特定信息
// 补充字段名字,字段类型,主键信息,unsigned类型处理

5.传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功
6.存储成功后,定时记录Binaly Log位置

mysql的Binlay Log网络协议:

说明:
图中的协议4byte header,主要是描述整个binlog网络包的length
binlog event structure,详细信息请参考:event structure官方资料

2.4 Binlog-Event

mysql数据库的操作会记录到binlog日志,然后触发一个Binlog Event使得其他slave可以根据event的内容获取binlog信息,详情可以看官方介绍:binlog event官方介绍

3. AbstractEventParser源码分析

之所以采用了这个抽象类是希望最大化共用mysql/oracle的实现。在阿里内部,canal除了支持mysql日志增量消费还支持oracle日志的增量消费(阿里开源的yugong就支持Oracle的增量解析和同步)。

在AbstractEventParser当中有很多成员变量和方法,但是只要掌握主要的几个即可以理解该类的功能。

3.1 成员变量

成员变量的定义主要分以下几类:

  • 初始化核心组件,主要包括位点管理器CanalLogPositionManager、事件过滤器CanalEventFilter和CanalEventSink、报警器、BinlogParser

这里可以看到在mysql event解析之后是要首先进行过滤,然后再分发的。

  • 统计参数 一些原子变量,主要用于记录一些统计信息,例如解析的事件数,收到的事件数等。可以看到这里的一些统计变量都采用了原子变量来保证线程安全性,这是因为在解析时可能有多个parser来解析Mysql binary log event.

  • 其他 主要涉及一些事件缓存类EventTransactionBuffer和心跳控制相关的成员变量

3.2 start方法解析(重要!binlogParser涉及如何解析mysql binlog event)

最重要的成员方法就是start()方法,其他方法都是围绕该方法而产生的。start方法说明了如何初始化一个EventParser及其相关的所有组件。这里我们只简单说下start()方法做了什么事情。其源码如下:
start做的事情可以总结如下:

  1. 初始化event的缓冲队列,保存的是CanalEntry对象(保存在一个List对象,可以添加CanalEntry对象)。CanalEntry对象是canal封装的一个packet,里面保存mysql event等重要信息。EventTransactionBuffer则作为CanalEntry的缓冲队列,可以往其中添加CanalEntry.entry
  2. 初始化binlogParser组件:binlogParser只是一个接口,它的创建需要通过buildParser方法。buildParser方法的实现在AbstractMysqlEventParser中实现,当然如果能支持oracle,也肯定会在AbstarctOracleEventParser实现一个builderParser方法来创建binlogParser对象。binlogParser的构建主要是通过LogEventConvert这个类(该类实现了BinlogParser)完成解析event的功能。顾名思义,该类是负责将mysql传递过来的binlog-event封装成一个Entry对象。Entry对象相比binlog-event多了很多元数据信息,例如我们在canal配置文件配置的一些过滤信息等。在LogEventConvert当中,会完成对mysql binlog event的解析。
  3. 启动一个工作线程来完成“建立和mysql master的连接->启动心跳线程->dump准备->获取位点信息->sinkHandler准备好消费->dump->关闭连接”这一系列操作
start
public void start() {
    super.start();
    MDC.put("destination", destination);
    // 配置transaction buffer
    // 初始化缓冲队列
    transactionBuffer.setBufferSize(transactionSize);// 设置buffer大小
    transactionBuffer.start();
    // 构造bin log parser
    binlogParser = buildParser();// 初始化一下BinLogParser
    binlogParser.start();
    // 启动工作线程
    parseThread = new Thread(new Runnable() {

        public void run() {
            MDC.put("destination", String.valueOf(destination));
            ErosaConnection erosaConnection = null;
            while (running) {
                try {

                    // 开始执行replication
                    // 1. 构造Erosa连接
                    erosaConnection = buildErosaConnection();

                    // 2. 启动一个心跳线程
                    startHeartBeat(erosaConnection);

                    // 3. 执行dump前的准备工作
                    preDump(erosaConnection);

                    erosaConnection.connect();// 链接
                    // 4. 获取最后的位置信息
                    final EntryPosition startPosition = findStartPosition(erosaConnection);
                    if (startPosition == null) {
                        throw new CanalParseException("can't find start position for " + destination);
                    }
                    logger.info("find start position : {}", startPosition.toString());
                    // 重新链接,因为在找position过程中可能有状态,需要断开后重建
                    erosaConnection.reconnect();

                    final SinkFunction sinkHandler = new SinkFunction<EVENT>() {

                        private LogPosition lastPosition;

                        public boolean sink(EVENT event) {
                            try {
                                CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);

                                if (!running) {
                                    return false;
                                }

                                if (entry != null) {
                                    exception = null; // 有正常数据流过,清空exception
                                    transactionBuffer.add(entry);
                                    // 记录一下对应的positions
                                    this.lastPosition = buildLastPosition(entry);
                                    // 记录一下最后一次有数据的时间
                                    lastEntryTime = System.currentTimeMillis();
                                }
                                return running;
                            } catch (TableIdNotFoundException e) {
                                throw e;
                            } catch (Exception e) {
                                // 记录一下,出错的位点信息
                                processError(e,
                                    this.lastPosition,
                                    startPosition.getJournalName(),
                                    startPosition.getPosition());
                                throw new CanalParseException(e); // 继续抛出异常,让上层统一感知
                            }
                        }

                    };

                    // 4. 开始dump数据
                    if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
                        erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
                    } else {
                        erosaConnection.dump(startPosition.getJournalName(),
                            startPosition.getPosition(),
                            sinkHandler);
                    }

                } catch (TableIdNotFoundException e) {
                    exception = e;
                    // 特殊处理TableIdNotFound异常,出现这样的异常,一种可能就是起始的position是一个事务当中,导致tablemap
                    // Event时间没解析过
                    needTransactionPosition.compareAndSet(false, true);
                    logger.error(String.format("dump address %s has an error, retrying. caused by ",
                        runningInfo.getAddress().toString()), e);
                } catch (Throwable e) {
                    exception = e;
                    if (!running) {
                        if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
                            throw new CanalParseException(String.format("dump address %s has an error, retrying. ",
                                runningInfo.getAddress().toString()), e);
                        }
                    } else {
                        logger.error(String.format("dump address %s has an error, retrying. caused by ",
                            runningInfo.getAddress().toString()), e);
                        sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
                    }
                } finally {
                    // 重新置为中断状态
                    Thread.interrupted();
                    // 关闭一下链接
                    afterDump(erosaConnection);
                    try {
                        if (erosaConnection != null) {
                            erosaConnection.disconnect();
                        }
                    } catch (IOException e1) {
                        if (!running) {
                            throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ",
                                runningInfo.getAddress().toString()),
                                e1);
                        } else {
                            logger.error("disconnect address {} has an error, retrying., caused by ",
                                runningInfo.getAddress().toString(),
                                e1);
                        }
                    }
                }
                // 出异常了,退出sink消费,释放一下状态
                eventSink.interrupt();
                transactionBuffer.reset();// 重置一下缓冲队列,重新记录数据
                binlogParser.reset();// 重新置位

                if (running) {
                    // sleep一段时间再进行重试
                    try {
                        Thread.sleep(10000 + RandomUtils.nextInt(10000));
                    } catch (InterruptedException e) {
                    }
                }
            }
            MDC.remove("destination");
        }
    });

    parseThread.setUncaughtExceptionHandler(handler);
    parseThread.setName(String.format("destination = %s , address = %s , EventParser",
        destination,
        runningInfo == null ? null : runningInfo.getAddress().toString()));
    parseThread.start();
}

3.3 AbstractEventParser工作线程分析

start中工作线程的主要流程是:建立和mysql master的连接->启动心跳线程->dump准备->获取位点信息->sinkHandler准备好消费->dump->关闭连接

  1. 建立和mysql master的连接:这里使用了一个erosaConnection,是一个通用的连接对象,通过buildErosaConnection方法来创建。erosa的build方法根据使用具体的数据库来决定。使用mysql则在buildErosaConnection中调用buildMysqlConnection来创建连接。MysqlConnection则通过AuthenticationInfo来获取要访问的主库的基本信息
  2. 启动心跳线程:首次出现在AbstractEventParser的start方法中,然后调用路线是AbstractEventParser.start()->AbstractEventParser.buildHeartBeatTimeTask()->MysqlEventParser.buildHeartBeatTimeTask()。实现原理是通过TimerTask这个类来定期请求mysql-master执行detectingSQL。detectingSQL的配置可以在canal.properties当中配置
  3. dump准备:binlog format的判断;另外一件重要的事情是建立metaConnection专门用来查询元信息(这个在子类,比如MysqlEventParser当中实现)
  4. 获取位点信息:在protocol模块中实现了EntryPosition用来获取解析的开始位置,在AbstractEventParser中调用findStartPosition方法来获取EntryPosition对象
  5. sinkHandler: parser的下游组件,负责过滤、分发、合并等事宜
  6. dump: MysqlConnection里面有dump方法。通过DirectLogFetcher对象提供的fetch方法获取LogEvent对象。LogFetcher的相关实现在dbsync包中,这里先不展开 7.关闭连接:dump完毕后关闭连接

4. MysqlEventParser

MysqlEventParser继承了AbstractEventParser,主要完成的事情如下:

  1. 对抽象父类中未实现的方法进行实现,例如preDump()和afterDump()方法。在preDump()方法中主要建立metaConnection连接,对connect、binlogParser和binlogFormat做一些类型判断,异常处理。
  2. 对抽象父类中的一些方法进行覆盖,例如心跳的开启。
  3. 主备切换逻辑的实现:主要就是通过心跳检查来判断是否有必要进行切换。线条检查失败则调用eventParser对象里的doSwitch方法

5. 心跳机制与热切换实现

之前我们已经知道在canal的eventPasrser当中我们已经启动了心跳方法,通过TimerTask定时的去执行查询。如果超时就说明和数据库的连接断开了,切换到备库。我们现在来看看代码上涉及HeartBeat的内容。

  1. AbstractEventParser里面的start方法里面的工作线程中启动了startHeartBeat工作线程
  2. startHeartBeat中调用了buildHeartBeatTimeTask
  3. MysqlEventParser中覆盖了buildHeartBeatTimeTask方法,并使用MysqlDetectingTimeTask的run方法来启动心跳探测
  4. 如果探测失败捕获SocketTimeoutException异常,则触发HeartBeatHAController的onFailed方法(多态)。HeartBeatHAController继承了CanalHAController和CanalHAController接口
  5. 控制器HeartBeatHAController去控制parser调用其doSwitch方法来进行具体的切换

6. 表的元数据

表的元数据主要包括表明和字段名。存储在TableMeta对象中。为了保证获取元数据的效率,采用了一个Map对象来保存TableMeta对象。

7. 本地的解析方式

可以直接解析本地的binlog日志。涉及local包下以及LocalBinlogConnection和LocalBinlogEventParser这些类。

8. 位点管理

index下是相关位点管理的类,根据你采用不同的instance模式,则会有不同的位点管理方式。例如采用zookeeper则使用ZooKeeperLogPositionManager。这些Manager主要都是实现了如下两个方法:
getLastestIndexBy:获取保存位点信息的对象LogPosition(在protocol 中定义)
persistLogPosition: 将LogPosition的信息交给下游对象 (例如写入zk)

9. protocol模块

之所以把protocol模块也放到这里来说,是因为protocol里面的代码就是对约定的协议的实现,主要是对协议基本对象的定义。

包结构如下:

包下面代码的功能主要分为3类:

  1. 异常处理:这个基本每个包都有
  2. 位点定义:前面在parser包中我们提到过位点管理器,而基本的位点定义则在这个包中
  3. CanalPacket,Message,CnalEntry的定义:这三者的关系是CanalPacket包含Message,Message包含CanalEntry

对于协议包的实现代码是通过google的开源工具Protobuf然后根据定义好的proto文件自动生成的。

了解canal-server和canal-client之间通信时的协议包的具体定义可以查看CanalProtocol.proto和EntryProtocol.proto的定义。

传送门:
CanalProtocol.proto
EntryProtocol.proto

10. 总结

EventParser模块的内容比较多,本文对主要对EventParser执行解析的步骤做了重点说明(start方法)。其他相关类也做了一些见解。对EventParser做的事情,已经各个类的功能都有了个基本的认识。