好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Netty分布式分隔符解码器逻辑源码剖析

前文传送门: Netty分布式行解码器逻辑源码解析

分隔符解码器

基于分隔符解码器DelimiterBasedFrameDecoder, 是按照指定分隔符进行解码的解码器, 通过分隔符, 可以将二进制流拆分成完整的数据包

同样继承了ByteToMessageDecoder并重写了decode方法

我们看其中的一个构造方法

?

1

2

3

public DelimiterBasedFrameDecoder( int maxFrameLength, ByteBuf... delimiters) {

     this (maxFrameLength, true , delimiters);

}

这里参数maxFrameLength代表最大长度, delimiters是个可变参数, 可以说可以支持多个分隔符进行解码

我们进入decode方法:

?

1

2

3

4

5

6

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

     Object decoded = decode(ctx, in);

     if (decoded != null ) {

         out.add(decoded);

     }

}

这里同样调用了其重载的decode方法并将解析好的数据添加到集合list中, 其父类就可以遍历out, 并将内容传播

我们跟到重载decode方法中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {

     //行处理器(1)

     if (lineBasedDecoder != null ) {

         return lineBasedDecoder.decode(ctx, buffer);

     }

     int minFrameLength = Integer.MAX_VALUE;

     ByteBuf minDelim = null ;

     //找到最小长度的分隔符(2)

     for (ByteBuf delim: delimiters) {

         //每个分隔符分隔的数据包长度

         int frameLength = indexOf(buffer, delim);

         if (frameLength >= 0 && frameLength < minFrameLength) {

             minFrameLength = frameLength;

             minDelim = delim;

         }

     }

     //解码(3)

     //已经找到分隔符

     if (minDelim != null ) {

         int minDelimLength = minDelim.capacity();

         ByteBuf frame;

         //当前分隔符否处于丢弃模式

         if (discardingTooLongFrame) {

             //首先设置为非丢弃模式

             discardingTooLongFrame = false ;

             //丢弃

             buffer.skipBytes(minFrameLength + minDelimLength);

 

             int tooLongFrameLength = this .tooLongFrameLength;

             this .tooLongFrameLength = 0 ;

             if (!failFast) {

                 fail(tooLongFrameLength);

             }

             return null ;

         }

         //处于非丢弃模式

         //当前找到的数据包, 大于允许的数据包

         if (minFrameLength > maxFrameLength) {

             //当前数据包+最小分隔符长度 全部丢弃

             buffer.skipBytes(minFrameLength + minDelimLength);

             //传递异常事件

             fail(minFrameLength);

             return null ;

         }

         //如果是正常的长度

         //解析出来的数据包是否忽略分隔符

         if (stripDelimiter) {

             //如果不包含分隔符

             //截取

             frame = buffer.readRetainedSlice(minFrameLength);

             //跳过分隔符

             buffer.skipBytes(minDelimLength);

         } else {

             //截取包含分隔符的长度

             frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);

         }

         return frame;

     } else {

         //如果没有找到分隔符

         //非丢弃模式

         if (!discardingTooLongFrame) {

             //可读字节大于允许的解析出来的长度

             if (buffer.readableBytes() > maxFrameLength) {

                 //将这个长度记录下

                 tooLongFrameLength = buffer.readableBytes();

                 //跳过这段长度

                 buffer.skipBytes(buffer.readableBytes());

                 //标记当前处于丢弃状态

                 discardingTooLongFrame = true ;

                 if (failFast) {

                     fail(tooLongFrameLength);

                 }

             }

         } else {

             tooLongFrameLength += buffer.readableBytes();

             buffer.skipBytes(buffer.readableBytes());

         }

         return null ;

     }

}

这里的方法也比较长, 这里也通过拆分进行剖析

(1). 行处理器

(2). 找到最小长度分隔符

(3). 解码

首先看第一步行处理器:

?

1

2

3

if (lineBasedDecoder != null ) {

     return lineBasedDecoder.decode(ctx, buffer);

}

这里首先判断成员变量lineBasedDecoder是否为空, 如果不为空则直接调用lineBasedDecoder的decode的方法进行解码, lineBasedDecoder实际上就是上一小节剖析的LineBasedFrameDecoder解码器

这个成员变量, 会在分隔符是\n和\r\n的时候进行初始化

我们看初始化该属性的构造方法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

public DelimiterBasedFrameDecoder(

         int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {

     //代码省略

     //如果是基于行的分隔

     if (isLineBased(delimiters) && !isSubclass()) {

         //初始化行处理器

         lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);

         this .delimiters = null ;

     } else {

         //代码省略

     }

     //代码省略

}

这里isLineBased(delimiters)会判断是否是基于行的分隔, 跟到isLineBased方法中:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

private static boolean isLineBased( final ByteBuf[] delimiters) {

     //分隔符长度不为2

     if (delimiters.length != 2 ) {

         return false ;

     }

     //拿到第一个分隔符

     ByteBuf a = delimiters[ 0 ];

     //拿到第二个分隔符

     ByteBuf b = delimiters[ 1 ];

     if (a.capacity() < b.capacity()) {

         a = delimiters[ 1 ];

         b = delimiters[ 0 ];

     }

     //确保a是/r/n分隔符, 确保b是/n分隔符

     return a.capacity() == 2 && b.capacity() == 1

             && a.getByte( 0 ) == '\r' && a.getByte( 1 ) == '\n'

             && b.getByte( 0 ) == '\n' ;

}

首先判断长度等于2, 直接返回false

然后拿到第一个分隔符a和第二个分隔符b, 然后判断a的第一个分隔符是不是\r, a的第二个分隔符是不是\n, b的第一个分隔符是不是\n, 如果都为true, 则条件成立

我们回到decode方法中, 看步骤2, 找到最小长度的分隔符:

这里最小长度的分隔符, 意思就是从读指针开始, 找到最近的分隔符

?

1

2

3

4

5

6

7

8

for (ByteBuf delim: delimiters) {

     //每个分隔符分隔的数据包长度

     int frameLength = indexOf(buffer, delim);

     if (frameLength >= 0 && frameLength < minFrameLength) {

         minFrameLength = frameLength;

         minDelim = delim;

     }

}

这里会遍历所有的分隔符, 然后找到每个分隔符到读指针到数据包长度

然后通过if判断, 找到长度最小的数据包的长度, 然后保存当前数据包的的分隔符, 如下图:

6-4-1

这里假设A和B同为分隔符, A分隔符到读指针的长度小于B分隔符到读指针的长度, 这里会找到最小的分隔符A, 分隔符的最小长度, 就readIndex到A的长度

我们继续看第3步, 解码:

 if (minDelim != null) 表示已经找到最小长度分隔符, 我们继续看if块中的逻辑:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

int minDelimLength = minDelim.capacity();

ByteBuf frame;

if (discardingTooLongFrame) {

     discardingTooLongFrame = false ;

     buffer.skipBytes(minFrameLength + minDelimLength);

     int tooLongFrameLength = this .tooLongFrameLength;

     this .tooLongFrameLength = 0 ;

     if (!failFast) {

         fail(tooLongFrameLength);

     }

     return null ;

}

if (minFrameLength > maxFrameLength) {

     buffer.skipBytes(minFrameLength + minDelimLength);

     fail(minFrameLength);

     return null ;

}

if (stripDelimiter) {

     frame = buffer.readRetainedSlice(minFrameLength);

     buffer.skipBytes(minDelimLength);

} else {

     frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);

}

return frame;

 if (discardingTooLongFrame) 表示当前是否处于非丢弃模式, 如果是丢弃模式, 则进入if块

因为第一个不是丢弃模式, 所以这里先分析if块后面的逻辑

 if (minFrameLength > maxFrameLength) 这里是判断当前找到的数据包长度大于最大长度, 这里的最大长度使我们创建解码器的时候设置的, 如果超过了最大长度, 就通过 buffer.skipBytes(minFrameLength + minDelimLength) 方式, 跳过数据包+分隔符的长度, 也就是将这部分数据进行完全丢弃

继续往下看, 如果长度不大最大允许长度, 则通过 if (stripDelimiter) 判断解析的出来的数据包是否包含分隔符, 如果不包含分隔符, 则截取数据包的长度之后, 跳过分隔符

我们再回头看 if (discardingTooLongFrame) 中的if块中的逻辑, 也就是丢弃模式:

首先将discardingTooLongFrame设置为false, 标记非丢弃模式

然后通过 buffer.skipBytes(minFrameLength + minDelimLength) 将数据包+分隔符长度的字节数跳过, 也就是进行丢弃, 之后再进行抛出异常

分析完成了找到分隔符之后的丢弃模式非丢弃模式的逻辑处理, 我们在分析没找到分隔符的逻辑处理, 也就是 if (minDelim != null) 中的else块:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

if (!discardingTooLongFrame) {

     if (buffer.readableBytes() > maxFrameLength) {

         tooLongFrameLength = buffer.readableBytes();

         buffer.skipBytes(buffer.readableBytes());

         discardingTooLongFrame = true ;

         if (failFast) {

             fail(tooLongFrameLength);

         }

     }

} else {

     tooLongFrameLength += buffer.readableBytes();

     buffer.skipBytes(buffer.readableBytes());

}

return null ;

首先通过 if (!discardingTooLongFrame) 判断是否为非丢弃模式, 如果是, 则进入if块:

在if块中, 首先通过 if (buffer.readableBytes() > maxFrameLength) 判断当前可读字节数是否大于最大允许的长度, 如果大于最大允许的长度, 则将可读字节数设置到tooLongFrameLength的属性中, 代表丢弃的字节数

然后通过 buffer.skipBytes(buffer.readableBytes()) 将累计器中所有的可读字节进行丢弃

最后将discardingTooLongFrame设置为true, 也就是丢弃模式, 之后抛出异常

如果 if (!discardingTooLongFrame) 为false, 也就是当前处于丢弃模式, 则追加tooLongFrameLength也就是丢弃的字节数的长度, 并通过 buffer.skipBytes(buffer.readableBytes()) 将所有的字节继续进行丢弃

以上就是分隔符解码器的相关逻辑

章节总结

本章介绍了抽象解码器ByteToMessageDecoder, 和其他几个实现了ByteToMessageDecoder类的解码器, 这个几个解码器逻辑都比较简单, 同学们可以根据其中的思想剖析其他的比较复杂的解码器, 或者根据其规则实现自己的自定义解码器

更多关于Netty分布式分隔符解码器的资料请关注其它相关文章!

原文链接:https://HdhCmsTestcnblogs测试数据/xiangnan6122/p/10206475.html

查看更多关于Netty分布式分隔符解码器逻辑源码剖析的详细内容...

  阅读:16次