好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Netty分布式ByteBuf使用directArena分配缓冲区过程解析

上一小节简单分析了PooledByteBufAllocator中, 线程局部缓存和arean的相关逻辑, 这一小节简单分析下directArena分配缓冲区的相关过程

directArena分配缓冲区

回到newDirectBuffer中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

protected ByteBuf newDirectBuffer( int initialCapacity, int maxCapacity) {

     PoolThreadCache cache = threadCache.get();

     PoolArena<ByteBuffer> directArena = cache.directArena;

     ByteBuf buf;

     if (directArena != null ) {

         buf = directArena.allocate(cache, initialCapacity, maxCapacity);

     } else {

         if (PlatformDependent.hasUnsafe()) {

             buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf( this , initialCapacity, maxCapacity);

         } else {

             buf = new UnpooledDirectByteBuf( this , initialCapacity, maxCapacity);

         }

     }

     return toLeakAwareBuffer(buf);

}

获取了directArena对象之后, 通过allocate方法分配一个ByteBuf, 这里allocate方法是PoolArena类中的方法

跟到allocate方法中:

?

1

2

3

4

5

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {

     PooledByteBuf<T> buf = newByteBuf(maxCapacity);

     allocate(cache, buf, reqCapacity);

     return buf;

}

首先通过newByteBuf获得一个ByteBuf对象

再通过allocate方法进行分配, 这里要注意, 这里进行分配的时候是线程私有的directArena进行分配

我们跟到newByteBuf方法中

因为是directArena调用的newByteBuf, 所以这里会进入DirectArena类的newByteBuf中:

?

1

2

3

4

5

6

7

protected PooledByteBuf<ByteBuffer> newByteBuf( int maxCapacity) {

     if (HAS_UNSAFE) {

         return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);

     } else {

         return PooledDirectByteBuf.newInstance(maxCapacity);

     }

}

因为默认通常是有unsafe对象的, 所以这里会走到这一步中PooledUnsafeDirectByteBuf.newInstance(maxCapacity)

通过静态方法newInstance创建一个PooledUnsafeDirectByteBuf对象

跟到newInstance方法中:

?

1

2

3

4

5

static PooledUnsafeDirectByteBuf newInstance( int maxCapacity) {

     PooledUnsafeDirectByteBuf buf = RECYCLER.get();

     buf.reuse(maxCapacity);

     return buf;

}

这里通过RECYCLER.get()这种方式拿到一个ByteBuf对象, RECYCLER其实是一个对象回收站, 这部分内容会在后面的内容中详细剖析, 这里我们只需要知道, 这种方式能从回收站中拿到一个对象, 如果回收站里没有相关对象, 则创建一个新

因为这里有可能是从回收站中拿出的一个对象, 所以通过reuse进行复用

跟到reuse方法中

?

1

2

3

4

5

6

final void reuse( int maxCapacity) {

     maxCapacity(maxCapacity);

     setRefCnt( 1 );

     setIndex0( 0 , 0 );

     discardMarks();

}

这里设置了的最大可扩容内存, 对象的引用数量, 读写指针位置都重置为0, 以及读写指针的位置标记也都重置为0

我们回到PoolArena的allocate方法中:

?

1

2

3

4

5

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {

     PooledByteBuf<T> buf = newByteBuf(maxCapacity);

     allocate(cache, buf, reqCapacity);

     return buf;

}

拿到了ByteBuf对象, 就可以通过allocate(cache, buf, reqCapacity)方法进行内存分配了

跟到allocate方法中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {

     //规格化

     final int normCapacity = normalizeCapacity(reqCapacity);

     if (isTinyOrSmall(normCapacity)) {

         int tableIdx;

         PoolSubpage<T>[] table;

         //判断是不是tinty

         boolean tiny = isTiny(normCapacity);

         if (tiny) { // < 512

             //缓存分配

             if (cache.allocateTiny( this , buf, reqCapacity, normCapacity)) {

                 return ;

             }

             //通过tinyIdx拿到tableIdx

             tableIdx = tinyIdx(normCapacity);

             //subpage的数组

             table = tinySubpagePools;

         } else {

             if (cache.allocateSmall( this , buf, reqCapacity, normCapacity)) {

                 return ;

             }

             tableIdx = smallIdx(normCapacity);

             table = smallSubpagePools;

         }

         //拿到对应的节点

         final PoolSubpage<T> head = table[tableIdx];

 

         synchronized (head) {

             final PoolSubpage<T> s = head.next;

             //默认情况下, head的next也是自身

             if (s != head) {

                 assert s.doNotDestroy && s.elemSize == normCapacity;

                 long handle = s.allocate();

                 assert handle >= 0 ;

                 s.chunk.initBufWithSubpage(buf, handle, reqCapacity);

 

                 if (tiny) {

                     allocationsTiny.increment();

                 } else {

                     allocationsSmall.increment();

                 }

                 return ;

             }

         }

         allocateNormal(buf, reqCapacity, normCapacity);

         return ;

     }

     if (normCapacity <= chunkSize) {

         //首先在缓存上进行内存分配

         if (cache.allocateNormal( this , buf, reqCapacity, normCapacity)) {

             //分配成功, 返回

             return ;

         }

         //分配不成功, 做实际的内存分配

         allocateNormal(buf, reqCapacity, normCapacity);

     } else {

         //大于这个值, 就不在缓存上分配

         allocateHuge(buf, reqCapacity);

     }

}

这里看起来逻辑比较长, 其实主要步骤分为两步

1.首先在缓存上进行分配

对应步骤是:

  cache.allocateTiny(this, buf, reqCapacity, normCapacity)

  cache.allocateSmall(this, buf, reqCapacity, normCapacity)

  cache.allocateNormal(this, buf, reqCapacity, normCapacity)

2.如果在缓存上分配不成功, 则实际分配一块内存

对应步骤是

  allocateNormal(buf, reqCapacity, normCapacity)

在这里对几种类型的内存进行介绍:

之前的小节我们介绍过, 缓冲区内存类型分为tiny, small, 和normal, 其实还有种不常见的类型叫做huge, 那么这几种类型的内存有什么区别呢, 实际上这几种类型是按照缓冲区初始化空间的范围进行区分的, 具体区分如下:

tiny 类型对应的缓冲区范围为0-512B

small 类型对应的缓冲区范围为512B-8K

normal 类型对应的缓冲区范围为8K-16MB

huge 类型对应缓冲区范围为大于16MB

简单介绍下有关范围的含义:

16MB对应一个chunk, netty是以chunk为单位向操作系统申请内存的

8k对应一个page, page是将chunk切分后的结果, 一个chunk对应2048个page

8k以下对应一个subpage, subpage是page的切分, 一个page可以切分多个subpage, 具体切分几个需要根据subpage的大小而定, 比如只要分配1k的缓冲区, 则会将page切分成8个subpage

以上就是directArena内存分配的大概流程和相关概念,更多关于Netty分布式ByteBuf directArena分配缓冲区的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/xiangnan6122/p/10205478.html

查看更多关于Netty分布式ByteBuf使用directArena分配缓冲区过程解析的详细内容...

  阅读:14次