1. 介绍
netty4的基本原理这里不再赘述。感兴趣的可以从CSDN下载netty in action的内容学习一下基础原理。
本节主要讨论InboundHandler和OutboundHandler的理解。
2. 术语回顾
2.1 基本术语
Channel和ChannelPipeline: 一个Channel对应一个ChannelPipeline。Channel代表一个传输通道,ChannelPipeline是ChannelHandler的容器
ChannelHandler: 用于对数据进行处理,其子类细分为ChannelInboundHandler和ChannelOutboundhandler
2.2 channel pipeline的结构
一个ChannelPipeline的结构如下:
网上很多对ChannelPipeline的结构都按照如下来描述,但是这个图其实画的不是很好。有一些重要的细节被掩盖了。这些重要细节如下:
2.3 pipeline只有一个头一个尾
一个ChannelPipeline只有一个头一个尾,上图左侧是头,右侧是尾。ChannelPipeline.addLast()就是往尾部加。
2.4 入站和出站事件流都会经过pipeline所有的Channel Handler
上图中把InboundChannelHandler和OutboundChannelHandler在一个pipeline里面分成了两行,代表收包和发包的事件流分开处理。这样确实没错,但是容易造成误解。实际上不管InboundChannelHandler还是OutboundChannelHandler本质上都属于ChannelHandler。所有ChannelHandler从pipeline的头部往尾部进行排列。入站的时候也会经过OutboundChannelHandler,只不过略过了这个ChannelHandler。同理出站的时候也会经过InboundChannelHandler,只不过忽略了。这个从源码的AbstractChannelHandlerContext中可以看到,有方法来判断是inboundHandler还是outboundHandler:
2.5 ChannelPipeline中一切皆为ChannelHandler
Channelhandler又分为入站和出站。也就是pipeline中的handler不是inbound handler就是outboundhandler。只有这两种handler及其子类才能被加入pipeline。这个很重要,我们使用的编码器解码器之所以能加入pipeline也是因为他们本质上是一个channel handler
3 handler 执行顺序
了解以上内容后,关于执行顺序只要记住:
- 入站事件流从头到尾执行所有的handler(当然会略过那些outbound handler)
- 出站事件流从尾到头执行所有的handler
3.1 关于duplexHandler
有人可能要问了,duplex handler到底属于出站还是入站?比如常见的LoggingHandler就是一个duplex handler。duplex handler的类定义如下:
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
// more code...
}
所以duplex handler被看成是inbound handler还是outbound handler关键取决于你是调用了ChannelInboundHandlerAdapter里面的方法还是ChannelOutboundHandler里面的方法。例如你调用了channelRead()那么就把它看成ChannelInboundHandlerAdapter,例如调用了write()方法,就看成是ChannelOutboundHandler
3.2 入站和出站事件流分界点
分界点自然就是在尾部。当执行完毕最后一个入站的handler之后就开始出站事件流了。这时候往回执行所有的handler。
4. 实例
这里说两个例子来理解执行顺序。
4.1 例1
例1用的LengthFieldBasedFrameDecoder这个类里面注释的example
@Test
public void testLengthFieldBasedFrameDecoder() {
ByteBuf in = Unpooled.buffer(16);
in.writeByte(0xCA);
in.writeShort(0x0010);
in.writeByte(0xFE);
in.writeBytes("HELLO, WORLD".getBytes());
EmbeddedChannel channel = new EmbeddedChannel();
channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); //log handler1
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 1, 2, -3, 3));
channel.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); //log handler2
channel.writeInbound(in);
}
执行结果如下:
分下下执行顺序:
入站: log handler1->LengthFieldBasedFrameDecoder->log handler2
出站: 无出站的handler
4.2 例2
例2是我自己设计的一个测试。。一开始也困惑了我很久。
@Test
public void testProtobufDecoder(){
//创建嵌入式Channel用于发送encode之后的数据
EmbeddedChannel ch1 = new EmbeddedChannel();
//准备一个需要Decode 的protobuf类的实例用于解码
CanalModelPacket.CanalModelMessage msg = ProtobufUtils.getDefaultCanalModeMessage();
//OutboundHandler子类放前面,放在InboundHandler子类之后的话,根据往回逆序调用的话,一个都OutboundHandler都不会调用的
ch1.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); //log handler1:一开始作为InboundHandler,会使用channelRead方法;之后会作为OutboundHandler再执行一遍,打印encoder之后的消息(write方法)并且flush
ch1.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch1.pipeline().addLast(new ProtobufEncoder());
ch1.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); //log handler2:一开始作为InboundHandler,会使用channelRead方法;之后会作为OutboundHandler再执行一遍,打印encoder之前的消息并且flush
ch1.pipeline().addLast(new InboundHandler1()); //InboundHandler1会产生一个protobuf实例,PS:调用InboundHandler之后开始走Outbound的事件流了
ch1.writeInbound("Give a example message to avtive the channel...");
}
InboundHandler类
package netty;
import com.alibaba.otter.canal.instance.manager.protocol.CanalModelPacket;
import com.alibaba.otter.canal.instance.manager.protocol.ProtobufUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author Wan Kaiming on 2017/1/4
* @version 1.0
*/
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Reading msg in InboundHandler1");
//读完之后初始化一个protobuf 实例对象交给OutboundHandler
CanalModelPacket.CanalModelMessage canalModelMessage = ProtobufUtils.getDefaultCanalModeMessage();
System.out.println("Sending canalModelMessage to Encoder...");
ctx.write(canalModelMessage);
ctx.flush();
}
}
执行结果为:
2017-01-05 13:24:35.681 [main] INFO io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] RECEIVED: Give a example message to avtive the channel...
2017-01-05 13:24:35.684 [main] INFO io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] RECEIVED: Give a example message to avtive the channel...
Reading msg in InboundHandler1
Sending canalModelMessage to Encoder...
2017-01-05 13:24:35.796 [main] INFO io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: id: 1
name: "example"
canalParameterMessage {
basicConfig {
canalId: 1
}
zkConfig {
zkClusterId: 1
zkClusters: "10.45.10.33:2181/kaiming-canal"
}
storageConfig {
transactionSize: 1024
storageBatchMode: MEMSIZE
memoryStorageBufferSize: 16384
memoryStorageBufferMemUnit: 1024
storageScavengeMode: ON_ACK
}
replicationConfig {
}
netConfig {
port: 11111
defaultConnectionTimeoutInSeconds: 30
receiveBufferSize: 65536
sendBufferSize: 65536
}
charsetConfig {
connectionCharsetNumber: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
connectionCharset: "UTF-8"
}
dbConfig {
dbAddresses {
IPAddress: "10.45.10.33"
port: 3306
}
dbUsername: "canal"
dbPassword: "canal"
}
binlogConnectConfig {
fallbackIntervalInSeconds: 60
}
heartBeatConfig {
detectingEnable: true
detectingIntervalInSeconds: 3
detectingTimeoutThresholdInSeconds: 30
detectingRetryTimes: 3
}
ddlSupportConfig {
}
}
2017-01-05 13:24:35.905 [main] INFO io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 183B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| b5 01 08 01 12 07 65 78 61 6d 70 6c 65 2a a7 01 |......example*..|
|00000010| 0a 02 08 01 12 22 08 01 12 1e 31 30 2e 34 35 2e |....."....10.45.|
|00000020| 31 30 2e 33 33 3a 32 31 38 31 2f 6b 61 69 6d 69 |10.33:2181/kaimi|
|00000030| 6e 67 2d 63 61 6e 61 6c 1a 0e 08 80 08 18 01 20 |ng-canal....... |
|00000040| 80 80 01 28 80 08 50 01 22 00 2a 0d 08 e7 56 10 |...(..P.".*...V.|
|00000050| 1e 18 80 80 04 20 80 80 04 32 2a 0a 21 00 00 00 |..... ...2*.!...|
|00000060| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
|00000070| 00 00 00 00 00 00 00 00 00 00 00 00 00 00 12 05 |................|
|00000080| 55 54 46 2d 38 3a 20 0a 10 0a 0b 31 30 2e 34 35 |UTF-8: ....10.45|
|00000090| 2e 31 30 2e 33 33 10 ea 19 1a 05 63 61 6e 61 6c |.10.33.....canal|
|000000a0| 22 05 63 61 6e 61 6c 42 02 28 3c 4a 08 08 01 20 |".canalB.(<J... |
|000000b0| 03 28 1e 30 03 6a 00 |.(.0.j. |
+--------+-------------------------------------------------+----------------+
2017-01-05 13:24:35.906 [main] INFO io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
2017-01-05 13:24:35.906 [main] INFO io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] FLUSH
执行顺序分析:
入站:log handler1(作为inbound)->log handler2-> inboundHandler1(作为inbound)
出站:log handler2(作为outbound) -> ProtobufEncoder->ProtobufVarint32LengthFieldPrepender->log handler1(作为outbound)
至于这里log handler什么时候是inbound什么时候是outbound参考3.1节的说明。
以上部分源码可以看我的github: netty测试代码
PS: 这里用到了protobuf,生成了一个 CanalModelMessage实例,大家可以自己生成。