好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Netty分布式抽象编码器MessageToByteEncoder逻辑分析

前文回顾: Netty分布式编码器及写数据事件处理

MessageToByteEncoder

同解码器一样, 编码器中也有一个抽象类叫MessageToByteEncoder, 其中定义了编码器的骨架方法, 具体编码逻辑交给子类实现

解码器同样也是个handler, 将写出的数据进行截取处理, 我们在学习pipeline中我们知道, 写数据的时候会传递write事件, 传递过程中会调用handler的write方法, 所以编码器码器可以重写write方法, 将数据编码成二进制字节流然后再继续传递write事件

首先看MessageToByteEncoder的类声明

?

1

2

3

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter{

     //省略类体

}

这里继承ChannelOutboundHandlerAdapter, 说明是个outBoundhandler, 我们知道write事件是个outBound事件, 而outBound事件只能通过outBoundHandler进行传输

write事件传播过程中要调用handler的write方法

我们跟到MessageToByteEncoder的write方法中:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

     ByteBuf buf = null ;

     try {

         if (acceptOutboundMessage(msg)) {

             @SuppressWarnings ( "unchecked" )

             I cast = (I) msg;

             buf = allocateBuffer(ctx, cast, preferDirect);

             try {

                 encode(ctx, cast, buf);

             } finally {

                 ReferenceCountUtil.release(cast);

             }

 

             if (buf.isReadable()) {

                 ctx.write(buf, promise);

             } else {

                 buf.release();

                 ctx.write(Unpooled.EMPTY_BUFFER, promise);

             }

             buf = null ;

         } else {

             ctx.write(msg, promise);

         }

     } catch (EncoderException e) {

         throw e;

     } catch (Throwable e) {

         throw new EncoderException(e);

     } finally {

         if (buf != null ) {

             buf.release();

         }

     }

}

首先通过 if (acceptOutboundMessage(msg)) 判断当前对象是否可处理

如果可处理, 则进入if块中的逻辑, 如果不能处理, 则进入else块, 通过ctx.write(msg, promise)继续传递write事件

我们看if块中

 I cast = (I) msg 这里是强制类型转换, 转换成I类型, I类型是个泛型, 具体类型由用户定义

 buf = allocateBuffer(ctx, cast, preferDirect) 这里进行缓冲区分配

跟到allocateBuffer方法中

?

1

2

3

4

5

6

7

8

protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings ( "unused" ) I msg,

                            boolean preferDirect) throws Exception {

     if (preferDirect) {

         return ctx.alloc().ioBuffer();

     } else {

         return ctx.alloc().heapBuffer();

     }

}

这里会直接通过ctx的内存分配器进行内存分配, 通过判断preferDirect来分配堆内存或者堆外内存, 默认情况下是分配堆外内存

有关内存分配, 我们之前已经做过相关的剖析

回到write方法中:

内存分配结束之后会调用encode(ctx, cast, buf)方法进行编码, 该类由子类实现

子类可以通过继承该类, 重写encode方法, 将参数对象cast编码成字节写入到传入的ByteBuf中, 就完成了编码工作

编码完成后后, 会通过ReferenceCountUtil.release(cast)将cast对象释放

 if (buf.isReadable()) 这里判断buf是否有可读字节, 如果有可读字节, 则继续传递write事件

如果没有可读字节, 则将buf进行释放, 继续传播write事件, 传递一个空的ByteBuf

最后将buf设置为空

以上就是有关抽象编码器的抽象逻辑, 具体的编码逻辑还需要其子类去做,更多关于Netty分布式抽象编码器MessageToByteEncoder的资料请关注其它相关文章!

原文链接:https://HdhCmsTestcnblogs测试数据/xiangnan6122/p/10208131.html

查看更多关于Netty分布式抽象编码器MessageToByteEncoder逻辑分析的详细内容...

  阅读:14次