好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java从源码看异步任务计算FutureTask

前言:

大家是否熟悉FutureTask呢?或者说你有没有异步计算的需求呢?FutureTask就能够很好的帮助你实现异步计算,并且可以实现同步获取异步任务的计算结果。下面我们就一起从源码分析一下FutureTask。

了解一下什么是FutureTask?

FutureTask 是一个可取消的异步计算。

FutureTask 提供了对Future的基本实现,可以调用方法去开始和取消一个计算,可以查询计算是否完成,并且获取计算结果。

FutureTask只能在计算完成后获取到计算结果, 一旦计算完成,将不能重启或者取消,除非调用runAndReset方法。

FutureTask除了实现了Future接口以外,还实现了 Runnable 接口,因此FutureTask是可以交由线程池的Executor执行,也可以直接使用一个异步线程调用执行(futureTask.run())。

FutureTask 是如何实现的呢?

首先,我们看一下 FutureTask 类的继承结构,如下图,它实现的是 RunnableFuture 接口,而 RunnableFuture 继承自Future和函数式接口 Runnable ,所以说FutureTask本质就是一个可运行的Future。

Future 接口约定了一些异步计算类必须要实现的功能,源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

package java.util.concurrent;

public interface Future<V> {

     /**

     * 尝试取消任务的执行,并返回取消结果。

     * 参数mayInterruptIfRunning:是否中断线程。

     */

     boolean cancel( boolean mayInterruptIfRunning);

     /**

     * 判断任务是否被取消(正常结束之前被被取消返回true)

     */

     boolean isCancelled();

     /**

     * 判断当前任务是否执行完毕,包括正常执行完毕、执行异常或者任务取消。

     */

     boolean isDone();

     /**

     * 获取任务执行结果,任务结束之前会阻塞。

     */

     V get() throws InterruptedException, ExecutionException;

     /**

     * 在指定时间内尝试获取执行结果。若超时则抛出超时异常TimeoutException

     */

     V get( long timeout, TimeUnit unit)

         throws InterruptedException, ExecutionException, TimeoutException;

}

Runnable 接口我们都很熟悉,他就是一个函数式接口,我们常用其创建一个线程。

?

1

2

3

4

5

6

7

package java.lang;

@FunctionalInterface

public interface Runnable {

    

     public abstract void run();

}

FutureTask就是一个将要被执行的任务,它包含了以上接口具体的实现,FutureTask内部定义了任务的状态state和一些状态的常量,它的内部核心是一个Callable callable,我们通过构造函数可以传入callable或者是runnable,最后都会内部转为callable,因为我们需要获取异步任务的执行结果,只有通过Callable创建的线程才会返回结果。

我们可以通过此时的状态判断Future中 isCancelled() , isDone() 的返回结果。

以下为FutureTask源码,内含核心源码分析注释

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

286

287

288

289

290

291

292

293

294

295

296

297

298

299

300

301

302

303

304

305

package java.util.concurrent;

import java.util.concurrent.locks.LockSupport;

public class FutureTask<V> implements RunnableFuture<V> {

     /**

      * 任务的运行状态

      */

     private volatile int state;

     private static final int NEW          = 0 ; // 新建

     private static final int COMPLETING   = 1 ; // 完成

     private static final int NORMAL       = 2 ; // 正常

     private static final int EXCEPTIONAL  = 3 ; // 异常

     private static final int CANCELLED    = 4 ; // 取消

     private static final int INTERRUPTING = 5 ; // 中断中

     private static final int INTERRUPTED  = 6 ; // 中断的

     private Callable<V> callable;

     /**

      * 返回结果

      */

     private Object outcome;

     private volatile Thread runner;

     private volatile WaitNode waiters;

     ...

     public FutureTask(Callable<V> callable) {

         if (callable == null )

             throw new NullPointerException();

         this .callable = callable;

         this .state = NEW;      

     }

     public FutureTask(Runnable runnable, V result) {

         this .callable = Executors.callable(runnable, result);

         this .state = NEW;      

     }

     public boolean isCancelled() {

         return state >= CANCELLED;

     }

     public boolean isDone() {

         return state != NEW;

     }

     /*

      * 取消任务实现

      *   如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。

      *   如果任务已经启动,参数mayInterruptIfRunning将决定任务是否应该中断执行该任务的线程,以尝试中断该任务。

      *   如果任务任务已经取消、已经完成或者其他原因不能取消,尝试将失败。

      */

     public boolean cancel( boolean mayInterruptIfRunning) {

         if (!(state == NEW &&

               UNSAFE.compareAndSwapInt( this , stateOffset, NEW,

                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))

             return false ;

         try {    // in case call to interrupt throws exception

             if (mayInterruptIfRunning) {

                 try {

                     Thread t = runner;

                     if (t != null )

                         t.interrupt();

                 } finally { // final state

                     UNSAFE.putOrderedInt( this , stateOffset, INTERRUPTED);

                 }

             }

         } finally {

             finishCompletion();

         }

         return true ;

     }

         /*

      * 等待获取结果

      *   获取当前状态,判断是否执行完成。并且判断时间是否超时

      *   如果任务没有执行完成,就阻塞等待完成,若超时抛出超时等待异常。

      */

     public V get() throws InterruptedException, ExecutionException {

         int s = state;

         if (s <= COMPLETING)

             s = awaitDone( false , 0L);

         return report(s);

     }

   /*

      * 等待获取结果

      *   获取当前状态,判断是否执行完成。

      *   如果任务没有执行完成,就阻塞等待完成。

      */

     public V get( long timeout, TimeUnit unit)

         throws InterruptedException, ExecutionException, TimeoutException {

         if (unit == null )

             throw new NullPointerException();

         int s = state;

         if (s <= COMPLETING &&

             (s = awaitDone( true , unit.toNanos(timeout))) <= COMPLETING)

             throw new TimeoutException();

         return report(s);

     }

     /**

      * 根据状态判断返回结果还是异常

      */

     private V report( int s) throws ExecutionException {

         Object x = outcome;

         if (s == NORMAL)

             return (V)x;

         if (s >= CANCELLED)

             throw new CancellationException();

         throw new ExecutionException((Throwable)x);

     }

     protected void done() { }

     /**

      * 设置结果借助CAS确认状态是否完成状态

      */

     protected void set(V v) {

         if (UNSAFE.compareAndSwapInt( this , stateOffset, NEW, COMPLETING)) {

             outcome = v;

             UNSAFE.putOrderedInt( this , stateOffset, NORMAL); // final state

             finishCompletion();

         }

     }

     /**

      * 设置异常,当运行完成出现异常,设置异常状态

      */

     protected void setException(Throwable t) {

         if (UNSAFE.compareAndSwapInt( this , stateOffset, NEW, COMPLETING)) {

             outcome = t;

             UNSAFE.putOrderedInt( this , stateOffset, EXCEPTIONAL); // final state

             finishCompletion();

         }

     }

     /*

      * 执行callable获取结果,或者异常

      *   判断状态是不是启动过的,如果是新建才可以执行run方法

      */

     public void run() {

         if (state != NEW ||

             !UNSAFE.compareAndSwapObject( this , runnerOffset,

                                          null , Thread.currentThread()))

             return ;

         try {

             Callable<V> c = callable;

             if (c != null && state == NEW) {

                 V result;

                 boolean ran;

                 try {

                     result = c.call();

                     ran = true ;

                 } catch (Throwable ex) {

                     result = null ;

                     ran = false ;

                     setException(ex);

                 }

                 if (ran)

                     set(result);

             }

         } finally {

             runner = null ;

             int s = state;

             if (s >= INTERRUPTING)

                 handlePossibleCancellationInterrupt(s);

         }

     }

     /**

      * 重新执行

      */

     protected boolean runAndReset() {

         if (state != NEW ||

             !UNSAFE.compareAndSwapObject( this , runnerOffset,

                                          null , Thread.currentThread()))

             return false ;

         boolean ran = false ;

         int s = state;

         try {

             Callable<V> c = callable;

             if (c != null && s == NEW) {

                 try {

                     c.call(); // don't set result

                     ran = true ;

                 } catch (Throwable ex) {

                     setException(ex);

                 }

             }

         } finally {

             runner = null ;

             s = state;

             if (s >= INTERRUPTING)

                 handlePossibleCancellationInterrupt(s);

         }

         return ran && s == NEW;

     }

     /*

      * 处理可能取消的中断

      */

     private void handlePossibleCancellationInterrupt( int s) {

         if (s == INTERRUPTING)

             while (state == INTERRUPTING)

                 Thread.yield();

     }

     static final class WaitNode {

         volatile Thread thread;

         volatile WaitNode next;

         WaitNode() { thread = Thread.currentThread(); }

     }

     /**

      * 移除并唤醒所有等待线程,执行done,置空callable

      */

     private void finishCompletion() {

         // assert state > COMPLETING;

         for (WaitNode q; (q = waiters) != null ;) {

             if (UNSAFE.compareAndSwapObject( this , waitersOffset, q, null )) {

                 for (;;) {

                     Thread t = q.thread;

                     if (t != null ) {

                         q.thread = null ;

                         LockSupport.unpark(t);

                     }

                     WaitNode next = q.next;

                     if (next == null )

                         break ;

                     q.next = null ; // unlink to help gc

                     q = next;

                 }

                 break ;

             }

         }

         done();

         callable = null ;        // to reduce footprint

     }

     /**

      * 等待完成

      * 首先判断是否超时

      * 处理中断的,然后处理异常状态的,处理完成的...

      */

     private int awaitDone( boolean timed, long nanos)

         throws InterruptedException {

         final long deadline = timed ? System.nanoTime() + nanos : 0L;

         WaitNode q = null ;

         boolean queued = false ;

         for (;;) {

             if (Thread.interrupted()) {

                 removeWaiter(q);

                 throw new InterruptedException();

             }

             int s = state;

             if (s > COMPLETING) {

                 if (q != null )

                     q.thread = null ;

                 return s;

             }

             else if (s == COMPLETING) // cannot time out yet

                 Thread.yield();

             else if (q == null )

                 q = new WaitNode();

             else if (!queued)

                 queued = UNSAFE.compareAndSwapObject( this , waitersOffset,

                                                      q.next = waiters, q);

             else if (timed) {

                 nanos = deadline - System.nanoTime();

                 if (nanos <= 0L) {

                     removeWaiter(q);

                     return state;

                 }

                 LockSupport.parkNanos( this , nanos);

             }

             else

                 LockSupport.park( this );

         }

     }

     /**

      * 去除等待

      */

     private void removeWaiter(WaitNode node) {

         if (node != null ) {

             node.thread = null ;

             retry:

             for (;;) {          // restart on removeWaiter race

                 for (WaitNode pred = null , q = waiters, s; q != null ; q = s) {

                     s = q.next;

                     if (q.thread != null )

                         pred = q;

                     else if (pred != null ) {

                         pred.next = s;

                         if (pred.thread == null ) // check for race

                             continue retry;

                     }

                     else if (!UNSAFE.compareAndSwapObject( this , waitersOffset,

                                                           q, s))

                         continue retry;

                 }

                 break ;

             }

         }

     }

     // Unsafe mechanics

     private static final sun.misc.Unsafe UNSAFE;

     private static final long stateOffset;

     private static final long runnerOffset;

     private static final long waitersOffset;

     static {

         try {

             UNSAFE = sun.misc.Unsafe.getUnsafe();

             Class<?> k = FutureTask. class ;

             stateOffset = UNSAFE.objectFieldOffset

                 (k.getDeclaredField( "state" ));

             runnerOffset = UNSAFE.objectFieldOffset

                 (k.getDeclaredField( "runner" ));

             waitersOffset = UNSAFE.objectFieldOffset

                 (k.getDeclaredField( "waiters" ));

         } catch (Exception e) {

             throw new Error(e);

         }

     }

}

FutureTask 运行流程

一般来说,我们可以认为FutureTask具有以下三种状态:

未启动: 新建的FutureTask,在run()没执行之前,FutureTask处于未启动状态。

?

1

private static final int NEW          = 0 ; // 新建

已启动 : FutureTask 对象的run方法启动并执行的过程中,FutureTask处于已启动状态。

已完成: FutureTask正常执行结束,或者 FutureTask 执行被取消(FutureTask对象cancel方法),或者FutureTask对象run方法执行抛出异常而导致中断而结束,FutureTask都处于已完成状态。

?

1

2

3

4

5

6

private static final int COMPLETING   = 1 ; // 完成

private static final int NORMAL       = 2 ; // 完成后正常设置结果

private static final int EXCEPTIONAL  = 3 ; // 完成后异常设置异常

private static final int CANCELLED    = 4 ; // 执行取消

private static final int INTERRUPTING = 5 ; // 中断中

private static final int INTERRUPTED  = 6 ; // 中断的

FutureTask 的使用

使用一(直接新建一个线程调用):

?

1

2

3

4

5

6

7

8

FutureTask<Integer> task = new FutureTask<>( new Callable() {

     @Override

     public Integer call() throws Exception {

         return sum();

     }

});

new Thread(task).stat();

Integer result = task.get();

使用二(结合线程池使用)

?

1

2

3

4

5

6

7

8

FutureTask<Integer> task = new FutureTask<>( new Callable() {

     @Override

     public Integer call() throws Exception {

         return sum();

     }

});

Executors.newCachedThreadPool().submit(task);

Integer result = task.get();

到此这篇关于Java从源码看异步任务计算FutureTask的文章就介绍到这了,更多相关Java FutureTask内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

原文链接:https://juejin.cn/post/7062980018208669709

查看更多关于Java从源码看异步任务计算FutureTask的详细内容...

  阅读:12次