好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

一文带你了解Java中的ForkJoin

前言:

ForkJoin 是在Java7中新加入的特性,大家可能对其比较陌生,但是Java8中 Stream 的并行流parallelStream就是依赖于ForkJoin。在ForkJoin体系中最为关键的就是ForkJoinTask和ForkJoinPool,ForkJoin就是利用分治的思想将大的任务按照一定规则Fork拆分成小任务,再通过Join聚合起来。

什么是ForkJoin?

ForkJoin 从字面上看Fork是分岔的意思,Join是结合的意思,我们可以理解为将大任务拆分成小任务进行计算求解,最后将小任务的结果进行结合求出大任务的解,这些裂变出来的小任务,我们就可以交给不同的线程去进行计算,这也就是分布式计算的一种思想。这与大数据中的分布式离线计算MapReduce类似,对ForkJoin最经典的一个应用就是Java8中的Stream,我们知道Stream分为串行流和并行流,其中并行流parallelStream就是依赖于ForkJoin来实现并行处理的。

下面我们一起来看一下最为核心的 ForkJoinTask 和 ForkJoinPool 。

ForkJoinTask 任务

ForkJoinTask本身的依赖关系并不复杂,它与异步任务计算FutureTask一样均实现了Future接口,FutureTask我们在之前的文章中有讲到感兴趣的可以阅读一下—— Java从源码看异步任务计算FutureTask

下面我们就ForkJoinTask的核心源码来研究一下,该任务是如何通过分治法进行计算。

ForkJoinTask最核心的莫过于fork()和join()方法了。

fork()

判断当前线程是不是ForkJoinWorkerThread线程 是 直接将当前线程push到工作队列中 否 调用ForkJoinPool 的externalPush方法

在 ForkJoinPool 构建了一个静态的common对象,这里调用的就是 common 的 externalPush()

join()

调用doJoin()方法,等待线程执行完成

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

public final ForkJoinTask<V> fork() {

     Thread t;

     if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)

         ((ForkJoinWorkerThread)t).workQueue.push( this );

     else

         ForkJoinPool.common.externalPush( this );

     return this ;

}

 

public final V join() {

     int s;

     if ((s = doJoin() & DONE_MASK) != NORMAL)

         reportException(s);

     return getRawResult();

}

 

private int doJoin() {

     int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;

     return (s = status) < 0 ? s :

         ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?

         (w = (wt = (ForkJoinWorkerThread)t).workQueue).

         tryUnpush( this ) && (s = doExec()) < 0 ? s :

         wt.pool.awaitJoin(w, this , 0L) :

         externalAwaitDone();

}

 

// 获取结果的方法由子类实现

public abstract V getRawResult();  

RecursiveTask 是ForkJoinTask的一个子类主要对获取结果的方法进行了实现,通过泛型约束结果。我们如果需要自己创建任务,仍需要实现RecursiveTask,并去编写最为核心的计算方法compute()。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {

     private static final long serialVersionUID = 5232453952276485270L;

 

     V result;

 

     protected abstract V compute();

 

     public final V getRawResult() {

         return result;

     }

 

     protected final void setRawResult(V value) {

         result = value;

     }

     protected final boolean exec() {

         result = compute();

         return true ;

     }

 

}

ForkJoinPool 线程池

ForkJoinTask 中许多功能都依赖于ForkJoinPool线程池,所以说ForkJoinTask运行离不开ForkJoinPool,ForkJoinPool与ThreadPoolExecutor有许多相似之处,他是专门用来执行ForkJoinTask任务的线程池,我之前也有文章对线程池技术进行了介绍,感兴趣的可以进行阅读—— 从java源码分析线程池(池化技术)的实现原理

ForkJoinPool与ThreadPoolExecutor的继承关系几乎是相同的,他们相当于兄弟关系。

工作窃取算法

ForkJoinPool中采取工作窃取算法,如果每次fork子任务如果都去创建新线程去处理的话,对系统资源的开销是巨大的,所以必须采取线程池。一般的线程池只有一个任务队列,但是对于ForkJoinPool来说,由于同一个任务Fork出的各个子任务是平行关系,为了提高效率,减少线程的竞争,需要将这些平行的任务放到不同的队列中,由于线程处理不同任务的速度不同,这样就可能存在某个线程先执行完了自己队列中的任务,这时为了提升效率,就可以让该线程去[ 窃取 ]其它任务队列中的任务,这就是所谓的[ 工作窃取算法 ]。

对于一般的队列来说,入队元素都是在队尾,出队元素在队首,要满足[工作窃取]的需求,任务队列应该支持从[队尾]出队元素,这样可以减少与其它工作线程的冲突(因为其它工作线程会从队首获取自己任务队列中的任务),这时就需要使用双端阻塞队列来解决。

构造方法

首先我们来看ForkJoinPool线程池的构造方法,他为我们提供了三种形式的构造,其中最为复杂的是四个入参的构造,下面我们看一下它四个入参都代表什么?

int parallelism 可并行级别(不代表最多存在的线程数量) ForkJoinWorkerThreadFactory factory 线程创建工厂 UncaughtExceptionHandler handler 异常捕获处理器 boolean asyncMode 先进先出的工作模式 或者 后进先出的工作模式

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

public ForkJoinPool() {

     this (Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),

          defaultForkJoinWorkerThreadFactory, null , false );

}

 

public ForkJoinPool( int parallelism) {

     this (parallelism, defaultForkJoinWorkerThreadFactory, null , false );

}

 

public ForkJoinPool( int parallelism,

                     ForkJoinWorkerThreadFactory factory,

                     UncaughtExceptionHandler handler,

                     boolean asyncMode) {

     this (checkParallelism(parallelism),

          checkFactory(factory),

          handler,

          asyncMode ? FIFO_QUEUE : LIFO_QUEUE,

          "ForkJoinPool-" + nextPoolId() + "-worker-" );

     checkPermission();

}

提交方法

下面我们看一下提交任务的方法:

externalPush 这个方法我们很眼熟,它正是在fork的时候如果当前线程不是ForkJoinWorkerThread,新提交任务也是会通过这个方法去执行任务。由此可见,fork就是新建一个子任务进行提交。

externalSubmit 是最为核心的一个方法,它可以首次向池提交第一个任务,并执行二次初始化。它还可以检测外部线程的首次提交,并创建一个新的共享队列。

signalWork (ws, q)是发送工作信号,让工作队列进行运转。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

public ForkJoinTask<?> submit(Runnable task) {

     if (task == null )

         throw new NullPointerException();

     ForkJoinTask<?> job;

     if (task instanceof ForkJoinTask<?>) // avoid re-wrap

         job = (ForkJoinTask<?>) task;

     else

         job = new ForkJoinTask.AdaptedRunnableAction(task);

     externalPush(job);

     return job;

}

 

final void externalPush(ForkJoinTask<?> task) {

     WorkQueue[] ws; WorkQueue q; int m;

     int r = ThreadLocalRandom.getProbe();

     int rs = runState;

     if ((ws = workQueues) != null && (m = (ws.length - 1 )) >= 0 &&

         (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&

         U.compareAndSwapInt(q, QLOCK, 0 , 1 )) {

         ForkJoinTask<?>[] a; int am, n, s;

         if ((a = q.array) != null &&

             (am = a.length - 1 ) > (n = (s = q.top) - q.base)) {

             int j = ((am & s) << ASHIFT) + ABASE;

             U.putOrderedObject(a, j, task);

             U.putOrderedInt(q, QTOP, s + 1 );

             U.putOrderedInt(q, QLOCK, 0 );

             if (n <= 1 )

                 signalWork(ws, q);

             return ;

         }

         U.compareAndSwapInt(q, QLOCK, 1 , 0 );

     }

     externalSubmit(task);

}

 

private void externalSubmit(ForkJoinTask<?> task) {

     int r;                                    // initialize caller's probe

     if ((r = ThreadLocalRandom.getProbe()) == 0 ) {

         ThreadLocalRandom.localInit();

         r = ThreadLocalRandom.getProbe();

     }

     for (;;) {

         WorkQueue[] ws; WorkQueue q; int rs, m, k;

         boolean move = false ;

         if ((rs = runState) < 0 ) {

             tryTerminate( false , false );     // help terminate

             throw new RejectedExecutionException();

         }

         else if ((rs & STARTED) == 0 ||     // initialize

                  ((ws = workQueues) == null || (m = ws.length - 1 ) < 0 )) {

             int ns = 0 ;

             rs = lockRunState();

             try {

                 if ((rs & STARTED) == 0 ) {

                     U.compareAndSwapObject( this , STEALCOUNTER, null ,

                                            new AtomicLong());

                     // create workQueues array with size a power of two

                     int p = config & SMASK; // ensure at least 2 slots

                     int n = (p > 1 ) ? p - 1 : 1 ;

                     n |= n >>> 1 ; n |= n >>> 2 ;  n |= n >>> 4 ;

                     n |= n >>> 8 ; n |= n >>> 16 ; n = (n + 1 ) << 1 ;

                     workQueues = new WorkQueue[n];

                     ns = STARTED;

                 }

             } finally {

                 unlockRunState(rs, (rs & ~RSLOCK) | ns);

             }

         }

         else if ((q = ws[k = r & m & SQMASK]) != null ) {

             if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0 , 1 )) {

                 ForkJoinTask<?>[] a = q.array;

                 int s = q.top;

                 boolean submitted = false ; // initial submission or resizing

                 try {                      // locked version of push

                     if ((a != null && a.length > s + 1 - q.base) ||

                         (a = q.growArray()) != null ) {

                         int j = (((a.length - 1 ) & s) << ASHIFT) + ABASE;

                         U.putOrderedObject(a, j, task);

                         U.putOrderedInt(q, QTOP, s + 1 );

                         submitted = true ;

                     }

                 } finally {

                     U.compareAndSwapInt(q, QLOCK, 1 , 0 );

                 }

                 if (submitted) {

                     signalWork(ws, q);

                     return ;

                 }

             }

             move = true ;                   // move on failure

         }

         else if (((rs = runState) & RSLOCK) == 0 ) { // create new queue

             q = new WorkQueue( this , null );

             q.hint = r;

             q.config = k | SHARED_QUEUE;

             q.scanState = INACTIVE;

             rs = lockRunState();           // publish index

             if (rs > 0 &&  (ws = workQueues) != null &&

                 k < ws.length && ws[k] == null )

                 ws[k] = q;                 // else terminated

             unlockRunState(rs, rs & ~RSLOCK);

         }

         else

             move = true ;                   // move if busy

         if (move)

             r = ThreadLocalRandom.advanceProbe(r);

     }

}

创建工人(线程)

提交任务后,通过 signalWork (ws, q)方法,发送工作信号,当符合没有执行完毕,且没有出现异常的条件下,循环执行任务,根据控制变量尝试添加工人(线程),通过线程工厂,生成线程,并且启动线程,也控制着工人(线程)的下岗。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

final void signalWork(WorkQueue[] ws, WorkQueue q) {

      long c; int sp, i; WorkQueue v; Thread p;

      while ((c = ctl) < 0L) {                       // too few active

          if ((sp = ( int )c) == 0 ) {                  // no idle workers

              if ((c & ADD_WORKER) != 0L)            // too few workers

                  tryAddWorker(c);

              break ;

          }

          if (ws == null )                            // unstarted/terminated

              break ;

          if (ws.length <= (i = sp & SMASK))         // terminated

              break ;

          if ((v = ws[i]) == null )                   // terminating

              break ;

          int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState

          int d = sp - v.scanState;                  // screen CAS

          long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);

          if (d == 0 && U.compareAndSwapLong( this , CTL, c, nc)) {

              v.scanState = vs;                      // activate v

              if ((p = v.parker) != null )

                  U.unpark(p);

              break ;

          }

          if (q != null && q.base == q.top)          // no more work

              break ;

      }

  }

 

  private void tryAddWorker( long c) {

      boolean add = false ;

      do {

          long nc = ((AC_MASK & (c + AC_UNIT)) |

                     (TC_MASK & (c + TC_UNIT)));

          if (ctl == c) {

              int rs, stop;                 // check if terminating

              if ((stop = (rs = lockRunState()) & STOP) == 0 )

                  add = U.compareAndSwapLong( this , CTL, c, nc);

              unlockRunState(rs, rs & ~RSLOCK);

              if (stop != 0 )

                  break ;

              if (add) {

                  createWorker();

                  break ;

              }

          }

      } while (((c = ctl) & ADD_WORKER) != 0L && ( int )c == 0 );

  }

 

  private boolean createWorker() {

      ForkJoinWorkerThreadFactory fac = factory;

      Throwable ex = null ;

      ForkJoinWorkerThread wt = null ;

      try {

          if (fac != null && (wt = fac.newThread( this )) != null ) {

              wt.start();

              return true ;

          }

      } catch (Throwable rex) {

          ex = rex;

      }

      deregisterWorker(wt, ex);

      return false ;

  }

 

final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {

      WorkQueue w = null ;

      if (wt != null && (w = wt.workQueue) != null ) {

          WorkQueue[] ws;                           // remove index from array

          int idx = w.config & SMASK;

          int rs = lockRunState();

          if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)

              ws[idx] = null ;

          unlockRunState(rs, rs & ~RSLOCK);

      }

      long c;                                       // decrement counts

      do {} while (!U.compareAndSwapLong

                   ( this , CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |

                                         (TC_MASK & (c - TC_UNIT)) |

                                         (SP_MASK & c))));

      if (w != null ) {

          w.qlock = - 1 ;                             // ensure set

          w.transferStealCount( this );

          w.cancelAll();                            // cancel remaining tasks

      }

      for (;;) {                                    // possibly replace

          WorkQueue[] ws; int m, sp;

          if (tryTerminate( false , false ) || w == null || w.array == null ||

              (runState & STOP) != 0 || (ws = workQueues) == null ||

              (m = ws.length - 1 ) < 0 )              // already terminating

              break ;

          if ((sp = ( int )(c = ctl)) != 0 ) {         // wake up replacement

              if (tryRelease(c, ws[sp & m], AC_UNIT))

                  break ;

          }

          else if (ex != null && (c & ADD_WORKER) != 0L) {

              tryAddWorker(c);                      // create replacement

              break ;

          }

          else                                       // don't need replacement

              break ;

      }

      if (ex == null )                               // help clean on way out

          ForkJoinTask.helpExpungeStaleExceptions();

      else                                           // rethrow

          ForkJoinTask.rethrow(ex);

  }

 

  public static interface ForkJoinWorkerThreadFactory {

      public ForkJoinWorkerThread newThread(ForkJoinPool pool);

  }

  static final class DefaultForkJoinWorkerThreadFactory

      implements ForkJoinWorkerThreadFactory {

      public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {

          return new ForkJoinWorkerThread(pool);

      }

  }

  protected ForkJoinWorkerThread(ForkJoinPool pool) {

      // Use a placeholder until a useful name can be set in registerWorker

      super ( "aForkJoinWorkerThread" );

      this .pool = pool;

      this .workQueue = pool.registerWorker( this );

  }

 

  final WorkQueue registerWorker(ForkJoinWorkerThread wt) {

      UncaughtExceptionHandler handler;

      wt.setDaemon( true );                           // configure thread

      if ((handler = ueh) != null )

          wt.setUncaughtExceptionHandler(handler);

      WorkQueue w = new WorkQueue( this , wt);

      int i = 0 ;                                    // assign a pool index

      int mode = config & MODE_MASK;

      int rs = lockRunState();

      try {

          WorkQueue[] ws; int n;                    // skip if no array

          if ((ws = workQueues) != null && (n = ws.length) > 0 ) {

              int s = indexSeed += SEED_INCREMENT;  // unlikely to collide

              int m = n - 1 ;

              i = ((s << 1 ) | 1 ) & m;               // odd-numbered indices

              if (ws[i] != null ) {                  // collision

                  int probes = 0 ;                   // step by approx half n

                  int step = (n <= 4 ) ? 2 : ((n >>> 1 ) & EVENMASK) + 2 ;

                  while (ws[i = (i + step) & m] != null ) {

                      if (++probes >= n) {

                          workQueues = ws = Arrays.copyOf(ws, n <<= 1 );

                          m = n - 1 ;

                          probes = 0 ;

                      }

                  }

              }

              w.hint = s;                           // use as random seed

              w.config = i | mode;

              w.scanState = i;                      // publication fence

              ws[i] = w;

          }

      } finally {

          unlockRunState(rs, rs & ~RSLOCK);

      }

      wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1 )));

      return w;

  }

例:ForkJoinTask实现归并排序

这里我们就用经典的归并排序为例,构建一个我们自己的ForkJoinTask,按照归并排序的思路,重写其核心的compute()方法,通过ForkJoinPool.submit(task)提交任务,通过get()同步获取任务执行结果。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

package com.zhj.interview;

 

import java.util.*;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.RecursiveTask;

 

public class Test16 {

 

     public static void main(String[] args) throws ExecutionException, InterruptedException {

         int [] bigArr = new int [ 10000000 ];

         for ( int i = 0 ; i < 10000000 ; i++) {

             bigArr[i] = ( int ) (Math.random() * 10000000 );

         }

         ForkJoinPool forkJoinPool = new ForkJoinPool();

         MyForkJoinTask task = new MyForkJoinTask(bigArr);

         long start = System.currentTimeMillis();

         forkJoinPool.submit(task).get();

         long end = System.currentTimeMillis();

         System.out.println( "耗时:" + (end-start));

     }

 

}

class MyForkJoinTask extends RecursiveTask< int []> {

 

     private int source[];

 

     public MyForkJoinTask( int source[]) {

         if (source == null ) {

             throw new RuntimeException( "参数有误!!!" );

         }

         this .source = source;

     }

 

     @Override

     protected int [] compute() {

         int l = source.length;

         if (l < 2 ) {

             return Arrays.copyOf(source, l);

         }

         if (l == 2 ) {

             if (source[ 0 ] > source[ 1 ]) {

                 int [] tar = new int [ 2 ];

                 tar[ 0 ] = source[ 1 ];

                 tar[ 1 ] = source[ 0 ];

                 return tar;

             } else {

                 return Arrays.copyOf(source, l);

             }

         }

         if (l > 2 ) {

             int mid = l / 2 ;

             MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid));

             task1.fork();

             MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l));

             task2.fork();

             int [] res1 = task1.join();

             int [] res2 = task2.join();

             int tar[] = merge(res1, res2);

             return tar;

         }

         return null ;

     }

     // 合并数组

     private int [] merge( int [] res1, int [] res2) {

         int l1 = res1.length;

         int l2 = res2.length;

         int l = l1 + l2;

         int tar[] = new int [l];

         for ( int i = 0 , i1 = 0 , i2 = 0 ; i < l; i++) {

             int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1];

             int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2];

             // 如果条件成立,说明应该取数组array1中的值

             if (v1 < v2) {

                 tar[i] = v1;

                 i1++;

             } else {

                 tar[i] = v2;

                 i2++;

             }

         }

         return tar;

     }

}

ForkJoin计算流程

通过ForkJoinPool提交任务,获取结果流程如下,拆分子任务不一定是二分的形式,可参照MapReduce的模式,也可以按照具体需求进行灵活的设计。

到此这篇关于一文带你了解Java中的ForkJoin的文章就介绍到这了,更多相关Java中的ForkJoin内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://juejin.cn/post/7079612671850446861

查看更多关于一文带你了解Java中的ForkJoin的详细内容...

  阅读:13次