好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Netty源码分析NioEventLoop执行select操作入口

分析完了 selector的创建和优化 的过程, 这一小节分析select相关操作

select操作的入口

NioEventLoop的run方法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

protected void run() {

     for (;;) {

         try {

             switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

                 case SelectStrategy.CONTINUE:

                     continue ;

                 case SelectStrategy.SELECT:

                     //轮询io事件(1)

                     select(wakenUp.getAndSet( false ));

                     if (wakenUp.get()) {

                         selector.wakeup();

                     }

                 default :

             }

             cancelledKeys = 0 ;

             needsToSelectAgain = false ;

             //默认是50

             final int ioRatio = this .ioRatio;

             if (ioRatio == 100 ) {

                 try {

                     processSelectedKeys();

                 } finally {

                     runAllTasks();

                 }

             } else {

                 //记录下开始时间

                 final long ioStartTime = System.nanoTime();

                 try {

                     //处理轮询到的key(2)

                     processSelectedKeys();

                 } finally {

                     //计算耗时

                     final long ioTime = System.nanoTime() - ioStartTime;

                     //执行task(3)

                     runAllTasks(ioTime * ( 100 - ioRatio) / ioRatio);

                 }

             }

         } catch (Throwable t) {

             handleLoopException(t);

         }

         //代码省略

     }

}

代码比较长, 其实主要分为三部分:

1. 轮询io事件

2. 处理轮询到的key

3. 执行task

这一小节, 主要剖析第一部分

轮询io事件

首先switch块中默认会走到SelectStrategy.SELECT中, 执行select(wakenUp.getAndSet(false))方法

参数wakenUp.getAndSet(false)代表当前select操作是未唤醒状态

进入到select(wakenUp.getAndSet(false))方法中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

private void select( boolean oldWakenUp) throws IOException {

     Selector selector = this .selector;

     try {

         int selectCnt = 0 ;

         //当前系统的纳秒数

         long currentTimeNanos = System.nanoTime();

         //截止时间=当前时间+队列第一个任务剩余执行时间

         long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

         for (;;) {

             //阻塞时间(毫秒)=(截止时间-当前时间+0.5毫秒)

             long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

             if (timeoutMillis <= 0 ) {

                 if (selectCnt == 0 ) {

                     selector.selectNow();

                     selectCnt = 1 ;

                 }

                 break ;

             }

             if (hasTasks() && wakenUp.compareAndSet( false , true )) {

                 selector.selectNow();

                 selectCnt = 1 ;

                 break ;

             }

             //进行阻塞式的select操作

             int selectedKeys = selector.select(timeoutMillis);

             //轮询次数

             selectCnt ++;

             //如果轮询到一个事件(selectedKeys != 0), 或者当前select操作需要唤醒(oldWakenUp),

             //或者在执行select操作时已经被外部线程唤醒(wakenUp.get()),

             //或者任务队列已经有任务(hasTask), 或者定时任务队列中有任务(hasScheduledTasks())

             if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {

                 break ;

             }

             //省略

             //记录下当前时间

             long time = System.nanoTime();

             //当前时间-开始时间>=超时时间(条件成立, 执行过一次select操作, 条件不成立, 有可能发生空轮询)

             if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {

                 //代表已经进行了一次阻塞式select操作, 操作次数重置为1

                 selectCnt = 1 ;

             } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

                 //省略日志代码

                 //如果空轮询的次数大于一个阈值(512), 解决空轮询的bug

                 rebuildSelector();

                 selector = this .selector;

                 selector.selectNow();

                 selectCnt = 1 ;

                 break ;

             }

             currentTimeNanos = time;

         }

         //代码省略

     } catch (CancelledKeyException e) {

         //省略

     }

}

首先通过 long currentTimeNanos = System.nanoTime() 获取系统的纳秒数

继续往下看:

?

1

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

delayNanos(currentTimeNanos)代表距定时任务中第一个任务剩余多长时间, 这个时间+当前时间代表这次操作不能超过的时间, 因为超过之后定时任务不能严格按照预定时间执行, 其中定时任务队列是已经按照执行时间有小到大排列好的队列, 所以第一个任务则是最近需要执行的任务, selectDeadLineNanos就代表了当前操作不能超过的时间

然后就进入到了无限for循环

for循环中我们关注:

?

1

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L

selectDeadLineNanos - currentTimeNanos+500000L 代表截止时间-当前时间+0.5毫秒的调整时间, 除以1000000表示将计算的时间转化为毫秒数

最后算出的时间就是selector操作的阻塞时间, 并赋值到局部变量的timeoutMillis中

后面有个判断 if(imeoutMillis<0) , 代表当前时间已经超过了最后截止时间+0.5毫秒,  selectCnt == 0 代表没有进行select操作, 满足这两个条件, 则执行selectNow()之后, 将selectCnt赋值为1之后跳出循环

如果没超过截止时间, 就进行了 if(hasTasks() && wakenUp.compareAndSet(false, true)) 判断

这里我们关注hasTasks()方法, 这里是判断当前NioEventLoop所绑定的taskQueue是否有任务, 如果有任务, 则执行selectNow()之后, 将selectCnt赋值为1之后跳出循环(跳出循环之后去执行任务队列中的任务)

hasTasks()方法可以自己跟一下, 非常简单

如果没有满足上述条件, 就会执行 int selectedKeys = selector.select(timeoutMillis) 进行阻塞式轮询, 并且自增轮询次数, 而后会进行如下判断:

?

1

2

3

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {

     break ;

}

selectedKeys != 0代表已经有轮询到的事件, oldWakenUp代表当前select操作是否需要唤醒, wakenUp.get()说明已经被外部线程唤醒, hasTasks()代表任务队列是否有任务, hasScheduledTasks()代表定时任务队列是否任务, 满足条件之一, 就跳出循环

long time = System.nanoTime() 记录了当前的时间, 之后有个判断:

 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) 

这里的意思是当前时间-阻塞时间>方法开始执行的时间, 这里说明已经完整的执行完成了一个阻塞的select()操作, 将selectCnt设置成1

如果此条件不成立, 说明没有完整执行select()操作, 可能触发了一次空轮询, 根据前一个selectCnt++这步我们知道, 每触发一次空轮询selectCnt都会自增

之后会进入第二个判断

 SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD 

其中SELECTOR_AUTO_REBUILD_THRESHOLD默认是512, 这个判断意思就是空轮询的次数如果超过512次, 则会认为是发生了epoll bug, 这样会通过rebuildSelector()方法重新构建selector, 然后将重新构建的selector赋值到局部变量selector, 执行一次selectNow(), 将selectCnt初始化1, 跳出循环

rebuildSelector()方法

rebuildSelector()方法中, 看netty是如何解决epoll bug的

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

public void rebuildSelector() {

     //是否是由其他线程发起的

     if (!inEventLoop()) {

         //如果是其他线程发起的, 将rebuildSelector()封装成任务队列, 由NioEventLoop进行调用

         execute( new Runnable() {

             @Override

             public void run() {

                 rebuildSelector();

             }

         });

         return ;

     }

     final Selector oldSelector = selector;

     final Selector newSelector;

     if (oldSelector == null ) {

         return ;

     }

     try {

         //重新创建一个select

         newSelector = openSelector();

     } catch (Exception e) {

         logger.warn( "Failed to create a new Selector." , e);

         return ;

     }

     int nChannels = 0 ;

     for (;;) {

         try {

             //拿到旧select中所有的key

             for (SelectionKey key: oldSelector.keys()) {

                 Object a = key.attachment();

                 try {

                     Object a = key.attachment();

                     //代码省略

 

                     //获取key注册的事件

                     int interestOps = key.interestOps();

                     //将key注册的事件取消

                     key.cancel();

                     //注册到重新创建的新的selector中

                     SelectionKey newKey = key.channel().register(newSelector, interestOps, a);

                     //如果channel是NioChannel

                     if (a instanceof AbstractNioChannel) {

                         //重新赋值

                         ((AbstractNioChannel) a).selectionKey = newKey;

                     }

                     nChannels ++;

                 } catch (Exception e) {

                     //代码省略

                 }

             }

         } catch (ConcurrentModificationException e) {

             continue ;

         }

         break ;

     }

     selector = newSelector;

     //代码省略

}

首先会判断是不是当前NioEventLoop线程执行的, 如果不是, 则将构建方法封装成task由当前NioEventLoop执行

 final Selector oldSelector = selector 表示拿到旧的selector

然后通过 newSelector = openSelector() 创建新的selector

通过for循环遍历所有注册在selector中的key

 Object a = key.attachment() 是获取channel, 第一章讲过, 在注册时, 将自身作为属性绑定在key上

for循环体中, 通过 int interestOps = key.interestOps() 获取其注册的事件

key.cancel()将注册的事件进行取消

 SelectionKey newKey = key.channel().register(newSelector, interestOps, a) 

将channel以及注册的事件注册在新的selector中

 if (a instanceof AbstractNioChannel) 判断是不是NioChannel

如果是NioChannel, 则通过 ((AbstractNioChannel) a).selectionKey = newKey 将自身的属性selectionKey赋值为新返回的key

 selector = newSelector 将自身NioEventLoop属性selector赋值为新创建的newSelector

至此, 就是netty解决epoll bug的步骤, 其实就是创建一个新的selector, 将旧selector中注册的channel和事件重新注册到新的selector中, 然后将自身selector属性替换成新创建的selector

以上就是Netty源码分析NioEventLoop执行select操作入口的详细内容,更多关于Netty分布式NioEventLoop selector操作的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/xiangnan6122/p/10203139.html

查看更多关于Netty源码分析NioEventLoop执行select操作入口的详细内容...

  阅读:9次