1.基本介绍和基础知识回顾

为了更好的理解这篇文章,建议先“仔细查看学习"官方文档的所有资料。快捷入口:canal官方资料

我们再简单回顾下canal以及canal-server相关的原理

1.1 canal的定位和功能

canal定位: 基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
典型的增量数据消费方式比如做分布式数据库的数据同步

1.2 canal-server的基本结构

canal采用netty实现了一个分布式的CS架构,client也被称作实例。在canal中有且仅有一个server,但是可以有多个client(或者称作instance),其基本结构如下:

注意:本节主要讲解server,因此instance当中的具体组件可以先不用关心

1.3 server与client的交互协议

canal的server模块主要职责:

  1. 实现server和client之间的通信协议 2.管理server的生命周期 3.异常处理

canal和server的主要交互过程如下图所示:

主要过程是以下几步:

  1. 连接:握手协议然后进行权限验证
  2. 订阅:订阅服务端的日志解析结果
  3. 消费:传递message,获取订阅的消息
  4. 消费结束:该阶段进行ack或者rollback(消费失败就回滚)
  5. 结束订阅
  6. 关闭连接

官方get/ack/rollback协议介绍

2. canal-server源码解析

canal-server模块的主要包如下所示,为了整体上对这些类和接口有个认识,我们需要回顾一些Netty相关的知识

2.1 netty知识回顾

netty是什么?我想官网的解释应该是比较权威的了:
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.

一句话来说就是:Netty是一个实现网络应用协议服务器的框架,方便你快速实现自己的服务器和CS交互协议。

2.1.1 netty主要部件

为了看懂canal-server的源码,只需要掌握的netty的核心知识即可,即了解netty的几个主要部件。

这些主要部件可以用一句话来概括:
ChannelEvents are processed by ChannelHandlers in a ChannelPipeline of a Channel

简单来说就是:client 传递过来的消息会有个Channel对象接收,然后产生一个Channel事件,并发送到ChannelPipeline。ChannelPipeline会选择一个ChannelHandler进行处理。这个ChannelHandler处理之后,可能会产生新的ChannelEvent,并流转到下一个ChannelHandler。

这里强力推荐看下官方文档关于类ChannelPipeline的介绍,源码中很多netty知识点与这个息息相关

2.1.2 channelPipeline流程

Netty的ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。

基本过程如下图:

downstream handler处理完毕后写入socket 提供给client访问,在canal-server中只用到了Downstream Handler

2.2 canal-server类功能浅析

2.2.1 CanaLifeCycle

  1. CanalLifeCycle接口 Canal的组件(包括service和instance)的生命周期全部由CanalLifeCycle管理(具体如图所示),服务器的生命周期可以自己扩展实现该接口。

顶级接口是在 canal.common包中

抽象类对顶级接口做一个基本实现

CanalServerWithNetty继承了这个生命周期基本实现的抽象类

2.2.2 CanalServerWithNetty

该类继承了抽象的生命周期类,和一个CanalServer接口。CanalServer接口的代码如下

聪明的你肯定纳闷了,怎么这里也有启动停止的方法。我们发现CanalServer里面的方法是包权限的,而AbstractCanalLifeCycle抽象类是公有的。因此他们职责不同,一个负责在包内实现接口使用,另外一个负责对外继承使用。

下面来重头戏了,让我们仔细分析下这个CanalServerWithNetty到底做了什么事情。这个类是这个server包的顶层的第一个具体实现类(更上面都是接口和抽象类),掌握了这个类基本上也知道server都在做神马事情了。

CanalServerWithNetty基本实现一览:

成员变量:

都很好理解对不对?

  1. 服务器要有IP和端口

  2. 总要有个服务端的 channel接受client 的消息

  3. IP和端口需要绑定服务端接受消息的Channel对象(这个就依靠ServerBootstrap这个辅助类了,他父类是Bootstrap专门为Channel、pipeline等提供一些工厂方法等辅助功能)

  4. 还记得我们前面的CS交互协议的图吗?这些ack,rollback等具体协议过程在CanalServerWithEmbedded类中和Sessionhandler当中都有进行实现

构造函数

注意点和学习点:

  1. 使用静态内部类来实现单例模式,保证只有一个CanalServerWithNetty
  2. 私有构造器,保证对外只提供静态方法或者变量的访问
  3. 对外只能通过instance方法来获取server实例

start方法

注释已经写得很清楚,随便提几点注意点吧:

  1. 服务器 channel和IP和端口绑定后才是真的启动了服务
  2. 权限验证和会话这些ChannelHandler需要依赖embeddedServer中的具体协议实现
    //启动服务器的方法
    public void start() {
        super.start();      //CanalServer的生命周期也是由CanalLifeCircle来控制的,需要启动它,状态改为running

        if (!embeddedServer.isStart()) {
            embeddedServer.start();     //启动自己实现的CS交互协议服务
        }


        //面向连接的ServerBootstrap类,用于接入channel。channel采用支持NIO的工厂来创建,采用newCachedThreadPool来管理
        this.bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
            Executors.newCachedThreadPool()));

        // 构造对应的pipeline,pipeline即管理channelHandler的容器,所有channelHandler都注册到channelPipeline
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() throws Exception {
                //初始化channels管道对象
                ChannelPipeline pipelines = Channels.pipeline();

                //下面的几个addLast方法就是完成向pipeline注册各个channelHandler


                //1.在管道注册一个解码器(理解为特殊的handler吧),解码器会将缓存里面的东西提取出来转化成java对象
                pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
                //2.封装下握手阶段的canal协议包,并传递给响应的channel
                pipelines.addLast(HandshakeInitializationHandler.class.getName(), new HandshakeInitializationHandler());
                //3. 在管道上注册权限控制的Handler,所谓权限控制也就是看有没有订阅而已=。=,干嘛说那么抽象
                pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                    new ClientAuthenticationHandler(embeddedServer));

                //会话handler主要是处理CS交互的会话,会话需要的ack,get等方法可以从embeddedServer当中获取
                SessionHandler sessionHandler = new SessionHandler(embeddedServer);
                pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
                return pipelines;
            }
        });




        // 启动,之前仅仅只是完成了pipeline和各个handler的绑定,还没正式启动,将服务器channel和IP端口绑定后才算真正的启动了
        if (StringUtils.isNotEmpty(ip)) {
            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.ip, this.port));
        } else {
            this.serverChannel = bootstrap.bind(new InetSocketAddress(this.port));
        }
    }

Handlers方法
总共4个handler方法,主要功能已经清楚。这里不再赘述,自己看可以看得懂。这里仅仅提供几个注意点:

  1. 每个Handler都继承了一个SimpleHandler,这个 SimpleHandler分别实现了上游和下游处理器,意味着既可以接受消息,也可以给别的handler发送消息

  2. 在握手协议和权限验证两个handler处理完毕后,ClientAuthenticationHandler会删除这些没用的handler,然后引入一个IdleStateHandler来控制超时。这个超时Handler是利用NETTY自带的工具类hashedWheelTimer来完成的,这个对象在NettyUtils类中

  3. NettyUtils中封装了一些在Handler具体实现中需要用到的工具

  4. 第一个注册到管道的是一个解码器负责把缓存的内容以int为单位转换成java对象读取出来,具体可以看看文档关于ReplayingDecoder的介绍

NettyUtils类
这个类也不贴代码了,主要讲下其完成的功能,大家可以对照下代码看

  1. 写入channel缓存的write方法,这个在HandshakeInitializationHandler中会用到
  2. ack,error方法中会用到write方法,即组装相应的canal packet写入channel缓存

至此对服务端的源码已经有了基本认识,本节至此对handler的具体实现、CanalServerWithEmbedded的具体实现说的还是比较粗糙,后续会完善

参考资料:
http://ifeve.com/channel-pipeline/
http://blog.csdn.net/yanlinwang/article/details/18352299
http://netty.io/