好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Netty分布式NioEventLoop任务队列执行源码分析

前文传送门: NioEventLoop处理IO事件

执行任务队列

继续回到NioEventLoop的run()方法:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

protected void run() {

     for (;;) {

         try {

             switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

                 case SelectStrategy.CONTINUE:

                     continue ;

                 case SelectStrategy.SELECT:

                     //轮询io事件(1)

                     select(wakenUp.getAndSet( false ));

                     if (wakenUp.get()) {

                         selector.wakeup();

                     }

                 default :

             }

             cancelledKeys = 0 ;

             needsToSelectAgain = false ;

             //默认是50

             final int ioRatio = this .ioRatio;

             if (ioRatio == 100 ) {

                 try {

                     processSelectedKeys();

                 } finally {

                     runAllTasks();

                 }

             } else {

                 //记录下开始时间

                 final long ioStartTime = System.nanoTime();

                 try {

                     //处理轮询到的key(2)

                     processSelectedKeys();

                 } finally {

                     //计算耗时

                     final long ioTime = System.nanoTime() - ioStartTime;

                     //执行task(3)

                     runAllTasks(ioTime * ( 100 - ioRatio) / ioRatio);

                 }

             }

         } catch (Throwable t) {

             handleLoopException(t);

         }

         //代码省略

     }

}

我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务

我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:

跟进runAllTasks方法:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

protected boolean runAllTasks( long timeoutNanos) {

     //定时任务队列中聚合任务

     fetchFromScheduledTaskQueue();

     //从普通taskQ里面拿一个任务

     Runnable task = pollTask();

     //task为空, 则直接返回

     if (task == null ) {

         //跑完所有的任务执行收尾的操作

         afterRunningAllTasks();

         return false ;

     }

     //如果队列不为空

     //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)

     final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;

     long runTasks = 0 ;

     long lastExecutionTime;

     //执行每一个任务

     for (;;) {

         safeExecute(task);

         //标记当前跑完的任务

         runTasks ++;

         //当跑完64个任务的时候, 会计算一下当前时间

         if ((runTasks & 0x3F ) == 0 ) {

             //定时任务初始化到当前的时间

             lastExecutionTime = ScheduledFutureTask.nanoTime();

             //如果超过截止时间则不执行(nanoTime()是耗时的)

             if (lastExecutionTime >= deadline) {

                 break ;

             }

         }

         //如果没有超过这个时间, 则继续从普通任务队列拿任务

         task = pollTask();

         //直到没有任务执行

         if (task == null ) {

             //记录下最后执行时间

             lastExecutionTime = ScheduledFutureTask.nanoTime();

             break ;

         }

     }

     //收尾工作

     afterRunningAllTasks();

     this .lastExecutionTime = lastExecutionTime;

     return true ;

}

首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue中

我们跟进fetchFromScheduledTaskQueue()方法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

private boolean fetchFromScheduledTaskQueue() {

     long nanoTime = AbstractScheduledEventExecutor.nanoTime();

     //从定时任务队列中抓取第一个定时任务

     //寻找截止时间为nanoTime的任务

     Runnable scheduledTask  = pollScheduledTask(nanoTime);

     //如果该定时任务队列不为空, 则塞到普通任务队列里面

     while (scheduledTask != null ) {

         //如果添加到普通任务队列过程中失败

         if (!taskQueue.offer(scheduledTask)) {

             //则重新添加到定时任务队列中

             scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);

             return false ;

         }

         //继续从定时任务队列中拉取任务

         //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中

         scheduledTask = pollScheduledTask(nanoTime);

     }

     return true ;

}

 long nanoTime = AbstractScheduledEventExecutor.nanoTime()

 代表从定时任务初始化到现在过去了多长时间

 Runnable scheduledTask= pollScheduledTask(nanoTime) 

代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了

跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

protected final Runnable pollScheduledTask( long nanoTime) {

     assert inEventLoop();

     //拿到定时任务队列

     Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this .scheduledTaskQueue;

     //peek()方法拿到第一个任务

     ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();

     if (scheduledTask == null ) {

         return null ;

     }

     if (scheduledTask.deadlineNanos() <= nanoTime) {

         //从队列中删除

         scheduledTaskQueue.remove();

         //返回该任务

         return scheduledTask;

     }

     return null ;

}

我们看到首先获得当前类绑定的定时任务队列的成员变量

如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务

如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除

我们继续回到fetchFromScheduledTaskQueue()方法中:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

private boolean fetchFromScheduledTaskQueue() {

     long nanoTime = AbstractScheduledEventExecutor.nanoTime();

     //从定时任务队列中抓取第一个定时任务

     //寻找截止时间为nanoTime的任务

     Runnable scheduledTask  = pollScheduledTask(nanoTime);

     //如果该定时任务队列不为空, 则塞到普通任务队列里面

     while (scheduledTask != null ) {

         //如果添加到普通任务队列过程中失败

         if (!taskQueue.offer(scheduledTask)) {

             //则重新添加到定时任务队列中

             scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);

             return false ;

         }

         //继续从定时任务队列中拉取任务

         //方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中

         scheduledTask = pollScheduledTask(nanoTime);

     }

     return true ;

}

弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失败, 则通过

scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)

重新添加到定时任务队列中

如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务

这样就将定时任务队列需要执行的任务添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

protected boolean runAllTasks( long timeoutNanos) {

     //定时任务队列中聚合任务

     fetchFromScheduledTaskQueue();

     //从普通taskQ里面拿一个任务

     Runnable task = pollTask();

     //task为空, 则直接返回

     if (task == null ) {

         //跑完所有的任务执行收尾的操作

         afterRunningAllTasks();

         return false ;

     }

     //如果队列不为空

     //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)

     final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;

     long runTasks = 0 ;

     long lastExecutionTime;

     //执行每一个任务

     for (;;) {

         safeExecute(task);

         //标记当前跑完的任务

         runTasks ++;

         //当跑完64个任务的时候, 会计算一下当前时间

         if ((runTasks & 0x3F ) == 0 ) {

             //定时任务初始化到当前的时间

             lastExecutionTime = ScheduledFutureTask.nanoTime();

             //如果超过截止时间则不执行(nanoTime()是耗时的)

             if (lastExecutionTime >= deadline) {

                 break ;

             }

         }

         //如果没有超过这个时间, 则继续从普通任务队列拿任务

         task = pollTask();

         //直到没有任务执行

         if (task == null ) {

             //记录下最后执行时间

             lastExecutionTime = ScheduledFutureTask.nanoTime();

             break ;

         }

     }

     //收尾工作

     afterRunningAllTasks();

     this .lastExecutionTime = lastExecutionTime;

     return true ;

}

首先通过 Runnable task = pollTask() 从taskQueue中拿一个任务

任务不为空, 则通过

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 

计算一个截止时间, 任务的执行时间不能超过这个时间

然后在for循环中通过safeExecute(task)执行task

我们跟到safeExecute(task)中:

?

1

2

3

4

5

6

7

8

9

protected static void safeExecute(Runnable task) {

     try {

         //直接调用run()方法执行

         task.run();

     } catch (Throwable t) {

         //发生异常不终止

         logger.warn( "A task raised an exception. Task: {}" , task, t);

     }

}

这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行

回到runAllTasks(long timeoutNanos)方法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

protected boolean runAllTasks( long timeoutNanos) {

     //定时任务队列中聚合任务

     fetchFromScheduledTaskQueue();

     //从普通taskQ里面拿一个任务

     Runnable task = pollTask();

     //task为空, 则直接返回

     if (task == null ) {

         //跑完所有的任务执行收尾的操作

         afterRunningAllTasks();

         return false ;

     }

     //如果队列不为空

     //首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)

     final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;

     long runTasks = 0 ;

     long lastExecutionTime;

     //执行每一个任务

     for (;;) {

         safeExecute(task);

         //标记当前跑完的任务

         runTasks ++;

         //当跑完64个任务的时候, 会计算一下当前时间

         if ((runTasks & 0x3F ) == 0 ) {

             //定时任务初始化到当前的时间

             lastExecutionTime = ScheduledFutureTask.nanoTime();

             //如果超过截止时间则不执行(nanoTime()是耗时的)

             if (lastExecutionTime >= deadline) {

                 break ;

             }

         }

         //如果没有超过这个时间, 则继续从普通任务队列拿任务

         task = pollTask();

         //直到没有任务执行

         if (task == null ) {

             //记录下最后执行时间

             lastExecutionTime = ScheduledFutureTask.nanoTime();

             break ;

         }

     }

     //收尾工作

     afterRunningAllTasks();

     this .lastExecutionTime = lastExecutionTime;

     return true ;

}

每次执行完task, runTasks自增

这里 if ((runTasks & 0x3F) == 0) 代表是否执行了64个任务, 如果执行了64个任务, 则会通过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环

如果没有超过截止时间, 则通过 task = pollTask() 继续弹出任务执行

这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式

如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间

以上就是有关执行任务队列的相关逻辑

章节小结

本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 通过本章学习, 我们应该掌握如下内容:

        1.  NioEventLoopGroup如何选择分配NioEventLoop

        2.  NioEventLoop如何开启

        3.  NioEventLoop如何进行select操作

        4.  NioEventLoop如何执行task

以上就是Netty分布式NioEventLoop任务队列执行源码分析的详细内容,更多关于Netty分布式NioEventLoop执行任务队列的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/xiangnan6122/p/10203169.html

查看更多关于Netty分布式NioEventLoop任务队列执行源码分析的详细内容...

  阅读:14次