1. 介绍

netty4的基本原理这里不再赘述。感兴趣的可以从CSDN下载netty in action的内容学习一下基础原理。

本节主要讨论InboundHandler和OutboundHandler的理解。

2. 术语回顾

2.1 基本术语

Channel和ChannelPipeline: 一个Channel对应一个ChannelPipeline。Channel代表一个传输通道,ChannelPipeline是ChannelHandler的容器
ChannelHandler: 用于对数据进行处理,其子类细分为ChannelInboundHandler和ChannelOutboundhandler

2.2 channel pipeline的结构

一个ChannelPipeline的结构如下:

网上很多对ChannelPipeline的结构都按照如下来描述,但是这个图其实画的不是很好。有一些重要的细节被掩盖了。这些重要细节如下:

2.3 pipeline只有一个头一个尾

一个ChannelPipeline只有一个头一个尾,上图左侧是头,右侧是尾。ChannelPipeline.addLast()就是往尾部加。

2.4 入站和出站事件流都会经过pipeline所有的Channel Handler

上图中把InboundChannelHandler和OutboundChannelHandler在一个pipeline里面分成了两行,代表收包和发包的事件流分开处理。这样确实没错,但是容易造成误解。实际上不管InboundChannelHandler还是OutboundChannelHandler本质上都属于ChannelHandler。所有ChannelHandler从pipeline的头部往尾部进行排列。入站的时候也会经过OutboundChannelHandler,只不过略过了这个ChannelHandler。同理出站的时候也会经过InboundChannelHandler,只不过忽略了。这个从源码的AbstractChannelHandlerContext中可以看到,有方法来判断是inboundHandler还是outboundHandler:

2.5 ChannelPipeline中一切皆为ChannelHandler

Channelhandler又分为入站和出站。也就是pipeline中的handler不是inbound handler就是outboundhandler。只有这两种handler及其子类才能被加入pipeline。这个很重要,我们使用的编码器解码器之所以能加入pipeline也是因为他们本质上是一个channel handler

3 handler 执行顺序

了解以上内容后,关于执行顺序只要记住:

  1. 入站事件流从头到尾执行所有的handler(当然会略过那些outbound handler)
  2. 出站事件流从尾到头执行所有的handler

3.1 关于duplexHandler

有人可能要问了,duplex handler到底属于出站还是入站?比如常见的LoggingHandler就是一个duplex handler。duplex handler的类定义如下:

public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
// more code...
}

所以duplex handler被看成是inbound handler还是outbound handler关键取决于你是调用了ChannelInboundHandlerAdapter里面的方法还是ChannelOutboundHandler里面的方法。例如你调用了channelRead()那么就把它看成ChannelInboundHandlerAdapter,例如调用了write()方法,就看成是ChannelOutboundHandler

3.2 入站和出站事件流分界点

分界点自然就是在尾部。当执行完毕最后一个入站的handler之后就开始出站事件流了。这时候往回执行所有的handler。

4. 实例

这里说两个例子来理解执行顺序。

4.1 例1

例1用的LengthFieldBasedFrameDecoder这个类里面注释的example

    @Test
    public  void testLengthFieldBasedFrameDecoder() {
        ByteBuf in = Unpooled.buffer(16);
        in.writeByte(0xCA);
        in.writeShort(0x0010);
        in.writeByte(0xFE);
        in.writeBytes("HELLO, WORLD".getBytes());

        EmbeddedChannel channel = new EmbeddedChannel();
        channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));    //log handler1
        channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 1, 2, -3, 3));
        channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO));  //log handler2
        channel.writeInbound(in);

    }

执行结果如下:

分下下执行顺序:
入站: log handler1->LengthFieldBasedFrameDecoder->log handler2

出站: 无出站的handler

4.2 例2

例2是我自己设计的一个测试。。一开始也困惑了我很久。

    @Test
    public void testProtobufDecoder(){


        //创建嵌入式Channel用于发送encode之后的数据
        EmbeddedChannel ch1 = new EmbeddedChannel();


        //准备一个需要Decode 的protobuf类的实例用于解码
        CanalModelPacket.CanalModelMessage msg = ProtobufUtils.getDefaultCanalModeMessage();


        //OutboundHandler子类放前面,放在InboundHandler子类之后的话,根据往回逆序调用的话,一个都OutboundHandler都不会调用的
        ch1.pipeline().addLast(new LoggingHandler(LogLevel.INFO));  //log handler1:一开始作为InboundHandler,会使用channelRead方法;之后会作为OutboundHandler再执行一遍,打印encoder之后的消息(write方法)并且flush
        ch1.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
        ch1.pipeline().addLast(new ProtobufEncoder());

        ch1.pipeline().addLast(new LoggingHandler(LogLevel.INFO));  //log handler2:一开始作为InboundHandler,会使用channelRead方法;之后会作为OutboundHandler再执行一遍,打印encoder之前的消息并且flush
        ch1.pipeline().addLast(new InboundHandler1());              //InboundHandler1会产生一个protobuf实例,PS:调用InboundHandler之后开始走Outbound的事件流了

        ch1.writeInbound("Give a example message to avtive the channel...");

    }

InboundHandler类

package netty;

import com.alibaba.otter.canal.instance.manager.protocol.CanalModelPacket;
import com.alibaba.otter.canal.instance.manager.protocol.ProtobufUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author Wan Kaiming on 2017/1/4
 * @version 1.0
 */
public class InboundHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("Reading msg in InboundHandler1");

        //读完之后初始化一个protobuf 实例对象交给OutboundHandler
        CanalModelPacket.CanalModelMessage canalModelMessage = ProtobufUtils.getDefaultCanalModeMessage();
        System.out.println("Sending  canalModelMessage to Encoder...");
        ctx.write(canalModelMessage);
        ctx.flush();


    }
}

执行结果为:

2017-01-05 13:24:35.681 [main] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] RECEIVED: Give a example message to avtive the channel...
2017-01-05 13:24:35.684 [main] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] RECEIVED: Give a example message to avtive the channel...
Reading msg in InboundHandler1
Sending  canalModelMessage to Encoder...
2017-01-05 13:24:35.796 [main] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: id: 1
name: "example"
canalParameterMessage {
  basicConfig {
    canalId: 1
  }
  zkConfig {
    zkClusterId: 1
    zkClusters: "10.45.10.33:2181/kaiming-canal"
  }
  storageConfig {
    transactionSize: 1024
    storageBatchMode: MEMSIZE
    memoryStorageBufferSize: 16384
    memoryStorageBufferMemUnit: 1024
    storageScavengeMode: ON_ACK
  }
  replicationConfig {
  }
  netConfig {
    port: 11111
    defaultConnectionTimeoutInSeconds: 30
    receiveBufferSize: 65536
    sendBufferSize: 65536
  }
  charsetConfig {
    connectionCharsetNumber: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
    connectionCharset: "UTF-8"
  }
  dbConfig {
    dbAddresses {
      IPAddress: "10.45.10.33"
      port: 3306
    }
    dbUsername: "canal"
    dbPassword: "canal"
  }
  binlogConnectConfig {
    fallbackIntervalInSeconds: 60
  }
  heartBeatConfig {
    detectingEnable: true
    detectingIntervalInSeconds: 3
    detectingTimeoutThresholdInSeconds: 30
    detectingRetryTimes: 3
  }
  ddlSupportConfig {
  }
}

2017-01-05 13:24:35.905 [main] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 183B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| b5 01 08 01 12 07 65 78 61 6d 70 6c 65 2a a7 01 |......example*..|
|00000010| 0a 02 08 01 12 22 08 01 12 1e 31 30 2e 34 35 2e |....."....10.45.|
|00000020| 31 30 2e 33 33 3a 32 31 38 31 2f 6b 61 69 6d 69 |10.33:2181/kaimi|
|00000030| 6e 67 2d 63 61 6e 61 6c 1a 0e 08 80 08 18 01 20 |ng-canal....... |
|00000040| 80 80 01 28 80 08 50 01 22 00 2a 0d 08 e7 56 10 |...(..P.".*...V.|
|00000050| 1e 18 80 80 04 20 80 80 04 32 2a 0a 21 00 00 00 |..... ...2*.!...|
|00000060| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
|00000070| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 12 05 |................|
|00000080| 55 54 46 2d 38 3a 20 0a 10 0a 0b 31 30 2e 34 35 |UTF-8: ....10.45|
|00000090| 2e 31 30 2e 33 33 10 ea 19 1a 05 63 61 6e 61 6c |.10.33.....canal|
|000000a0| 22 05 63 61 6e 61 6c 42 02 28 3c 4a 08 08 01 20 |".canalB.(<J... |
|000000b0| 03 28 1e 30 03 6a 00                            |.(.0.j.         |
+--------+-------------------------------------------------+----------------+
2017-01-05 13:24:35.906 [main] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
2017-01-05 13:24:35.906 [main] INFO  io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH

执行顺序分析:

入站:log handler1(作为inbound)->log handler2-> inboundHandler1(作为inbound)
出站:log handler2(作为outbound) -> ProtobufEncoder->ProtobufVarint32LengthFieldPrepender->log handler1(作为outbound)

至于这里log handler什么时候是inbound什么时候是outbound参考3.1节的说明。

以上部分源码可以看我的github: netty测试代码

PS: 这里用到了protobuf,生成了一个 CanalModelMessage实例,大家可以自己生成。