好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java8通过CompletableFuture实现异步回调

前言:

java5为我们提供了 Callable 和 Future ,使我们可以很容易的完成异步任务结果的获取,但是通过Future的get获取异步任务结果会导致主线程的阻塞,这样在某些场景下是非常消耗CPU资源的,进而Java8为我们提供了 CompletableFuture ,使我们可以轻松完成异步任务的回调。

1 什么是CompletableFuture?

CompletableFuture 是Java 8 中新增的一个类,它是对Future接口的扩展。从下方的类继承关系图中我们看到其不仅实现了Future接口,还有CompletionStage接口,当Future需要显示地完成时,可以使用CompletionStage接口去支持完成时触发的函数和操作,当2个以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。

CompletableFuture 主要作用就是简化我们异步编程的复杂性,支持函数式编程,可以通过回调的方式处理计算结果。

2 为什么会有CompletableFuture ?

在java5中,JDK为我们提供了Callable和Future,使我们可以很容易的完成异步任务结果的获取,但是通过Future的get获取异步任务结果会导致主线程的阻塞,这样在某些场景下是非常消耗CPU资源的,进而Java8为我们提供了CompletableFuture,使我们无需阻塞等待,而是通过回调的方式去处理结果,并且还支持流式处理、组合异步任务等操作。

如果不熟悉 Callable 和 Future 的,可以看小编之前更新的这篇文章 Java从源码看异步任务计算FutureTask

3 CompletableFuture 简单使用

下面我们就CompletableFuture 的使用进行简单分类:

创建任务:

supplyAsync/runAsync

异步回调:

thenApply/thenAccept/thenRun thenApplyAsync/thenAcceptAsync/thenRunAsync exceptionally handle/whenComplete

组合处理:

thenCombine / thenAcceptBoth / runAfterBoth applyToEither / acceptEither / runAfterEither thenCompose allOf / anyOf

具体内容请参照以下案例:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

public static void main(String[] args) throws Exception {

     // 1.带返回值的异步任务(不指定线程池,默认ForkJoinPool测试数据monPool(),单核ThreadPerTaskExecutor)

     CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {

         return 1 + 1 ;

     });

     System.out.println( "cf1 result: " + cf1.get());

     // 2.无返回值的异步任务(不指定线程池,默认ForkJoinPool测试数据monPool(),单核ThreadPerTaskExecutor)

     CompletableFuture cf2 = CompletableFuture.runAsync(() -> {

         int a = 1 + 1 ;

     });

     System.out.println( "cf2 result: " + cf2.get());

     // 3.指定线程池的带返回值的异步任务,runAsync同理

     CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {

         return 1 + 1 ;

     }, Executors.newCachedThreadPool());

     System.out.println( "cf3 result: " + cf3.get());

     // 4.回调,任务执行完成后执行的动作

     CompletableFuture<Integer> cf4 = cf1.thenApply((result) -> {

         System.out.println( "cf4回调拿到cf1的结果 result : " + result);

         return result + 1 ;

     });

     System.out.println( "cf4 result: " + cf4.get());

     // 5.异步回调(将回调任务提交到线程池),任务执行完成后执行的动作后异步执行

     CompletableFuture<Integer> cf5 = cf1.thenApplyAsync((result) -> {

         System.out.println( "cf5回调拿到cf1的结果 result : " + result);

         return result + 1 ;

     });

     System.out.println( "cf5 result: " + cf5.get());

     // 6.回调(同thenApply但无返回结果),任务执行完成后执行的动作

     CompletableFuture cf6 = cf1.thenAccept((result) -> {

         System.out.println( "cf6回调拿到cf1的结果 result : " + result);

     });

     System.out.println( "cf6 result: " + cf6.get());

     // 7.回调(同thenAccept但无入参),任务执行完成后执行的动作

     CompletableFuture cf7 = cf1.thenRun(() -> {

     });

     System.out.println( "cf7 result: " + cf7.get());

     // 8.异常回调,任务执行出现异常后执行的动作

     CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {

         throw new RuntimeException( "出现异常" );

     });

     CompletableFuture<Integer> cf8 = cf.exceptionally((result) -> {

         return - 1 ;

     });

     System.out.println( "cf8 result: " + cf8.get());

     // 9.当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法

     //   如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致;

     //   如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

     CompletableFuture<Integer> cf9 = cf1.handle((a, b) -> {

         if (b != null ) {

             b.printStackTrace();

         }

         return a;

     });

     System.out.println( "cf9 result: " + cf9.get());

     // 10 与handle类似,无返回值

     try {

         CompletableFuture<Integer> cf10 = cf.whenComplete((a, b) -> {

             if (b != null ) {

                 b.printStackTrace();

             }

         });

         System.out.println( "cf10 result: " + cf10.get());

     } catch (Exception e) {

         System.out.println( "cf10 出现异常!!!" );

     }

     // 11 组合处理(两个都完成,然后执行)有入参,有返回值

     CompletableFuture<Integer> cf11 = cf1.thenCombine(cf3, (r1, r2) -> {

         return r1 + r2;

     });

     System.out.println( "cf11 result: " + cf11.get());

     // 12 组合处理(两个都完成,然后执行)有入参,无返回值

     CompletableFuture cf12 = cf1.thenAcceptBoth(cf3, (r1, r2) -> {

     });

     System.out.println( "cf12 result: " + cf12.get());

     // 13 组合处理(两个都完成,然后执行)无入参,无返回值

     CompletableFuture cf13 = cf1.runAfterBoth(cf3, () -> {

     });

     System.out.println( "cf13 result: " + cf13.get());

     // 14 组合处理(有一个完成,然后执行)有入参,有返回值

     CompletableFuture<Integer> cf14 = cf1.applyToEither(cf3, (r) -> {

         return r;

     });

     System.out.println( "cf14 result: " + cf14.get());

     // 15 组合处理(有一个完成,然后执行)有入参,无返回值

     CompletableFuture cf15 = cf1.acceptEither(cf3, (r) -> {

     });

     System.out.println( "cf15 result: " + cf15.get());

     // 16 组合处理(有一个完成,然后执行)无入参,无返回值

     CompletableFuture cf16 = cf1.runAfterEither(cf3, () -> {

     });

     System.out.println( "cf16 result: " + cf16.get());

     // 17 方法执行后返回一个新的CompletableFuture

     CompletableFuture<Integer> cf17 = cf1.thenCompose((r) -> {

         return CompletableFuture.supplyAsync(() -> {

             return 1 + 1 ;

         });

     });

     System.out.println( "cf17 result: " + cf17.get());

     // 18 多个任务都执行成功才会继续执行

     CompletableFuture.allOf(cf1,cf2,cf3).whenComplete((r, t) -> {

         System.out.println(r);

     });

     // 18 多个任务任意一个执行成功就会继续执行

     CompletableFuture.anyOf(cf1,cf2,cf3).whenComplete((r, t) -> {

         System.out.println(r);

     });

}

4 CompletableFuture 源码分析

首先我们可以从注释中看到,它对 CompletionStage 、 Future 接口扩展的一些描述,这些也是它的一些重点。

除了直接操作状态和结果的相关方法外, CompletableFuture 还实现了 CompletionStage 接口的如下策略:

(1)为非异步方法的依赖完成提供的操作,可以由完成当前 CompletableFuture 的线程执行,也可以由完成方法的任何其他调用方执行。 (2)所有没有显式Executor参数的异步方法都使用 ForkJoinPool测试数据monPool() 执行(除非它不支持至少两个并行级别,在这种情况下,将创建一个新线程来运行每个任务)。为了简化监视、调试和跟踪,所有生成的异步任务都是CompletableFuture的实例,异步完成任务。

不了解 ForkJoinPool 的可以阅读小编之前更新的这篇文章 一文带你了解Java中的ForkJoin 。

(3)所有 CompletionStage 方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法重写的影响。

CompletableFuture实现了Future接口的如下策略:

因为(与FutureTask不同)这个类对导致它完成的计算没有直接控制权,所以取消被视为另一种形式的异常完成,所以cancel操作被视为是另一种异常完成形式(new CancellationException()具有相同的效果。)。方法 isCompletedExceptionally() 可以用来确定一个CompletableFuture是否以任何异常的方式完成。 如果异常完成时出现 CompletionException ,方法get()和get(long,TimeUnit)会抛出一个 ExecutionException ,其原因与相应CompletionException中的原因相同。为了简化在大多数上下文中的使用,该类还定义了join()和getNow()方法,在这些情况下直接抛出CompletionException。

4.1 创建异步任务

我们先看一下 CompletableFuture 是如何创建异步任务的,我们可以看到起创建异步任务的核心实现是两个入参,一个入参是Executor,另一个入参是Supplier(函数式编程接口)。其中也提供了一个入参的重载,一个入参的重载方法会获取默认的Executor,当系统是单核的会使用ThreadPerTaskExecutor,多核时使用 ForkJoinPool测试数据monPool() 。

注意: 这里默认ForkJoinPool测试数据monPool()线程池,如果所有异步任务都使用该线程池话,出现问题不容易定位,如果长时间占用该线程池可能影响其他业务的正常操作,stream的并行流也是使用的该线程池。

其中还封装了静态内部类 AsyncSupply ,该类代表这个异步任务,实现了 Runnable ,重写了run方法。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

private static final Executor asyncPool = useCommonPool ?

     ForkJoinPool测试数据monPool() : new ThreadPerTaskExecutor();

 

private static final boolean useCommonPool =

     (ForkJoinPool.getCommonPoolParallelism() > 1 );

 

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {

     return asyncSupplyStage(asyncPool, supplier);

}

 

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,

                                                  Supplier<U> f) {

     if (f == null ) throw new NullPointerException();

     CompletableFuture<U> d = new CompletableFuture<U>();

     e.execute( new AsyncSupply<U>(d, f));

     return d;

}

 

/**

  * 静态内部类,继承了ForkJoinTask<Void>、实现了Runnable、AsynchronousCompletionTask

  */

static final class AsyncSupply<T> extends ForkJoinTask<Void>

         implements Runnable, AsynchronousCompletionTask {

     CompletableFuture<T> dep; Supplier<T> fn;

     AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {

         this .dep = dep; this .fn = fn;

     }

 

     public final Void getRawResult() { return null ; }

     public final void setRawResult(Void v) {}

     public final boolean exec() { run(); return true ; }

 

     public void run() {

         CompletableFuture<T> d; Supplier<T> f;

         if ((d = dep) != null && (f = fn) != null ) {

             dep = null ; fn = null ;

             if (d.result == null ) {

                 try {

                     d测试数据pleteValue(f.get());

                 } catch (Throwable ex) {

                     d测试数据pleteThrowable(ex);

                 }

             }

             d.postComplete();

         }

     }

}

Supplier类是一个函数式的接口, @FunctionalInterface 注解就是函数式编程的标记。

?

1

2

3

4

5

6

7

package java.util.function;

 

@FunctionalInterface

public interface Supplier<T> {

 

     T get();

}

4.2 异步任务回调

异步任务回调,我们以 thenApply/thenApplyAsync 为例来看一下其实现原理,方法名含有Async的会传入asyncPool。 uniApplyStage 方法通过判断e是否有值,来区分是从哪个方法进来的。 thenApply 不会传入 Executor,它优先让当前线程来执行后续 stage 的任务。

当发现前一个 stage 已经执行完毕时,直接让当前线程来执行后续 stage 的 task。 当发现前一个 stage 还没执行完毕时,则把当前 stage 包装成一个 UniApply 对象,放到前一个 stage 的栈中。执行前一个 stage 的线程,执行完毕后,接着执行后续 stage 的 task。

thenApplyAsync 会传入一个 Executor,它总是让 Executor 线程池里面的线程来执行后续 stage 的任务。

把当前 stage 包装成一个 UniApply 对象,放到前一个 stage 的栈中,直接让 Executor 来执行。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

public <U> CompletableFuture<U> thenApply(

     Function<? super T,? extends U> fn) {

     return uniApplyStage( null , fn);

}

 

public <U> CompletableFuture<U> thenApplyAsync(

     Function<? super T,? extends U> fn) {

     return uniApplyStage(asyncPool, fn);

}

 

private <V> CompletableFuture<V> uniApplyStage(

     Executor e, Function<? super T,? extends V> f) {

     if (f == null ) throw new NullPointerException();

     CompletableFuture<V> d =  new CompletableFuture<V>();

     // Async直接进入,不是Async执行uniApply尝试获取结果

     if (e != null || !d.uniApply( this , f, null )) {

         UniApply<T,V> c = new UniApply<T,V>(e, d, this , f);

         push(c);

         c.tryFire(SYNC);

     }

     return d;

}

 

final <S> boolean uniApply(CompletableFuture<S> a,

                            Function<? super S,? extends T> f,

                            UniApply<S,T> c) {

     Object r; Throwable x;

     // 判断当前CompletableFuture是否已完成,如果没完成则返回false;如果完成了则执行下面的逻辑。

     if (a == null || (r = a.result) == null || f == null )

         return false ;

     tryComplete: if (result == null ) {

         // 判断任务结果是否是AltResult类型

         if (r instanceof AltResult) {

             if ((x = ((AltResult)r).ex) != null ) {

                 completeThrowable(x, r);

                 break tryComplete;

             }

             r = null ;

         }

         try {

             // 判断当前任务是否可以执行

             if (c != null && !c.claim())

                 return false ;

             // 获取任务结果

             @SuppressWarnings ( "unchecked" ) S s = (S) r;

             // 执行

             completeValue(f.apply(s));

         } catch (Throwable ex) {

             completeThrowable(ex);

         }

     }

     return true ;

}

 

static final class UniApply<T,V> extends UniCompletion<T,V> {

     Function<? super T,? extends V> fn;

     UniApply(Executor executor, CompletableFuture<V> dep,

              CompletableFuture<T> src,

              Function<? super T,? extends V> fn) {

         super (executor, dep, src); this .fn = fn;

     }

     final CompletableFuture<V> tryFire( int mode) {

         CompletableFuture<V> d; CompletableFuture<T> a;

         if ((d = dep) == null ||

             !d.uniApply(a = src, fn, mode > 0 ? null : this ))

             return null ;

         dep = null ; src = null ; fn = null ;

         return d.postFire(a, mode);

     }

}

 

final void push(UniCompletion<?,?> c) {

     if (c != null ) {

         while (result == null && !tryPushStack(c))

             lazySetNext(c, null ); // clear on failure

     }

}

 

final boolean completeValue(T t) {

     return UNSAFE测试数据pareAndSwapObject( this , RESULT, null ,

                                        (t == null ) ? NIL : t);

}

4.3 异步任务组合

我们再 thenCombine 方法为例看一下 CompletableFuture 是如何处理组合任务的,我们可以看到thenCombine的源码与thenApply的源码基本上是一直的,只不过组合的时候不仅仅是判断一个,需要集合具体场景,判断多个 CompletableFuture 。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

public <U,V> CompletableFuture<V> thenCombine(

     CompletionStage<? extends U> other,

     BiFunction<? super T,? super U,? extends V> fn) {

     return biApplyStage( null , other, fn);

}

 

private <U,V> CompletableFuture<V> biApplyStage(

     Executor e, CompletionStage<U> o,

     BiFunction<? super T,? super U,? extends V> f) {

     CompletableFuture<U> b;

     if (f == null || (b = o.toCompletableFuture()) == null )

         throw new NullPointerException();

     CompletableFuture<V> d = new CompletableFuture<V>();

     if (e != null || !d.biApply( this , b, f, null )) {

         BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this , b, f);

         bipush(b, c);

         c.tryFire(SYNC);

     }

     return d;

}

 

final <R,S> boolean biApply(CompletableFuture<R> a,

                             CompletableFuture<S> b,

                             BiFunction<? super R,? super S,? extends T> f,

                             BiApply<R,S,T> c) {

     Object r, s; Throwable x;

     // 此处不止要判断a还得判断b

     if (a == null || (r = a.result) == null ||

         b == null || (s = b.result) == null || f == null )

         return false ;

     tryComplete: if (result == null ) {

         if (r instanceof AltResult) {

             if ((x = ((AltResult)r).ex) != null ) {

                 completeThrowable(x, r);

                 break tryComplete;

             }

             r = null ;

         }

         // 这里不止判断a的结果r还要判断b的结果s

         if (s instanceof AltResult) {

             if ((x = ((AltResult)s).ex) != null ) {

                 completeThrowable(x, s);

                 break tryComplete;

             }

             s = null ;

         }

         // 最后将rr, ss传入

         try {

             if (c != null && !c.claim())

                 return false ;

             @SuppressWarnings ( "unchecked" ) R rr = (R) r;

             @SuppressWarnings ( "unchecked" ) S ss = (S) s;

             completeValue(f.apply(rr, ss));

         } catch (Throwable ex) {

             completeThrowable(ex);

         }

     }

     return true ;

}

 

static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {

     BiFunction<? super T,? super U,? extends V> fn;

     BiApply(Executor executor, CompletableFuture<V> dep,

             CompletableFuture<T> src, CompletableFuture<U> snd,

             BiFunction<? super T,? super U,? extends V> fn) {

         super (executor, dep, src, snd); this .fn = fn;

     }

     // tryFire方法也同样的多可个b

     final CompletableFuture<V> tryFire( int mode) {

         CompletableFuture<V> d;

         CompletableFuture<T> a;

         CompletableFuture<U> b;

         if ((d = dep) == null ||

             !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this ))

             return null ;

         dep = null ; src = null ; snd = null ; fn = null ;

         return d.postFire(a, b, mode);

     }

}

到此这篇关于Java8通过 CompletableFuture 实现异步回调的文章就介绍到这了,更多相关Java8异步回调内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://juejin.cn/post/7088978415298019364

查看更多关于Java8通过CompletableFuture实现异步回调的详细内容...

  阅读:15次