好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Java并发编程之Condition源码分析(推荐)

condition介绍

上篇文章讲了reentrantlock的加锁和释放锁的使用,这篇文章是对reentrantlock的补充。reentrantlock#newcondition()可以创建condition,在reentrantlock加锁过程中可以利用condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活"。condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现。

常用方法

condition中主要的方法有2个

(1)await()方法可以阻塞当前线程,并释放锁。 (2)在获取锁后可以调用signal()通知被await()阻塞的线程"激活"。

这里的await(),signal()必须在reentrantlock#lock()和reentrantlock#unlock()之间调用。

condition实现分析

condition的实现也是利用abstractqueuedsynchronizer队列来实现,await()在被调用后先将当前线程加入到等待队列中,然后释放锁,最后阻塞当前线程。signal()在被调用后会先获取等待队列中第一个节点,并将这个节点转化成reentrantlock中的节点并加入到同步阻塞队列的结尾,这样此节点的上个节点线程释放锁后会激活此节点线程取来获取锁。

await()方法源码分析

await()源码如下

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

public final void await() throws interruptedexception {

         //判断是否当前线程是否被中断中断则抛出中断异常

       if (thread.interrupted())

         throw new interruptedexception();

         //加入等待队列

       node node = addconditionwaiter();

         //释放当前线程锁

       int savedstate = fullyrelease(node);

       int interruptmode = 0 ;

         //判断是否在同步阻塞队列,如果不在一直循环到被加入

       while (!isonsyncqueue(node)) {

         //阻塞当前线程

         locksupport.park( this );

         //判断是否被中断

         if ((interruptmode = checkinterruptwhilewaiting(node)) != 0 )

           break ;

       }

         //获取锁,如果获取中被中断则设置中断状态

       if (acquirequeued(node, savedstate) && interruptmode != throw_ie)

         interruptmode = reinterrupt;

         //清除等待队列中被"激活"的节点

       if (node.nextwaiter != null ) // clean up if cancelled

         unlinkcancelledwaiters();

         //如果当前线程被中断,处理中断逻辑

       if (interruptmode != 0 )

         reportinterruptafterwait(interruptmode);

     }

主要分以下几步

(1)先判断是否当前线程是否被中断中断则抛出中断异常如果未中断调用addconditionwaiter()加入等待队列 (2)调用fullyrelease(node)释放锁使同步阻塞队列的下个节点线程能获取锁。 (3)调用isonsyncqueue(node)判断是否在同步阻塞队列,这里的加入同步阻塞队列操作是在另一个线程调用signal()后加入,如果不在同步阻塞队列会进行阻塞直到被激活。 (4)如果被激活然后调用checkinterruptwhilewaiting(node)判断是否被中断并获取中断模式。 (5)继续调用isonsyncqueue(node)判断是否在同步阻塞队列。 (6)是则调用acquirequeued(node, savedstate) 获取锁,这里如果获取不到也会被阻塞,获取不到原因是在第一次调用isonsyncqueue(node)前,可能另一个线程已经调用signal()后加入到同步阻塞队列,然后调用acquirequeued(node, savedstate) 获取不到锁并阻塞。acquirequeued(node, savedstate)也会返回当前线程是否被中断,如果被中断设置中断模式。 (7)在激活后调用unlinkcancelledwaiters()清理等待队列的已经被激活的节点。 (8)最后判断当前线程是否被中断,如果被中断则对中断线程做处理。

下面来看下addconditionwaiter()实现

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

private node addconditionwaiter() {

         //获取等待队列尾部节点

       node t = lastwaiter;

       //如果尾部状态不为condition,如果已经被"激活",清理之,然后重新获取尾部节点

       if (t != null && t.waitstatus != node.condition) {

         unlinkcancelledwaiters();

         t = lastwaiter;

       }

         //创建以当前线程为基础的节点,并将节点模式设置成condition

       node node = new node(thread.currentthread(), node.condition);

         //如果尾节点不存在,说明队列为空,将头节点设置成当前节点

       if (t == null )

         firstwaiter = node;

         //如果尾节点存在,将此节点设置成尾节点的下个节点

       else

         t.nextwaiter = node;

         //将尾节点设置成当前节点

       lastwaiter = node;

       return node;

     }

addconditionwaiter()的逻辑很简单,就是创建以当前线程为基础的节点并把节点加入等待队列的尾部待其他线程处理。

下面来看下fullyrelease(node node)实现

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

final int fullyrelease(node node) {

     boolean failed = true ;

     try {

         //获取阻塞队列中当前线程节点的锁状态值

       int savedstate = getstate();

         //释放当前线程节点锁

       if (release(savedstate)) {

         failed = false ;

         return savedstate;

       } else {

         throw new illegalmonitorstateexception();

       }

     } finally {

         //释放失败讲节点等待状态设置成关闭

       if (failed)

         node.waitstatus = node.cancelled;

     }

   }

调用getstate()先获取阻塞队列中当前线程节点的锁状态值,这个值可能大于1表示多次重入,然后调用release(savedstate)释放所有锁,如果释放成功返回锁状态值。

下面来看下isonsyncqueue(node node)实现

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

final boolean isonsyncqueue(node node) {

         //判断当前节点是否是condition或者前置节点是否为空如果为空直接返回false

     if (node.waitstatus == node.condition || node.prev == null )

       return false ;

         //如果下个节点存在,则在同步阻塞队列中返回true

     if (node.next != null ) // if has successor, it must be on queue

       return true ;

         //遍历查找当前节点是否在同步阻塞队列中

     return findnodefromtail(node);

   }

   private boolean findnodefromtail(node node) {

     node t = tail;

     for (;;) {

       if (t == node)

         return true ;

       if (t == null )

         return false ;

       t = t.prev;

     }

   }

此方法的功能是查找当前节点是否在同步阻塞队列中,方法先是快速判断,判断不了再进行遍历查找。

(1)第一步先判断次节点是否condition状态或者前置节点是否存在,如果是表明不在队列中返回false,阻塞队列中的状态一般是0或者signal状态而且如果当前如果当前节点在队列阻塞中且未被激活前置节点一定不为空。 (2)第二步判断节点的下个节点是否存在,如果存在则表明当前当前节点已加入到阻塞队列中。 (3)如果以上2点都没法判断,也有可能刚刚加入到同步阻塞队列中,所以调用findnodefromtail(node node)做最后的遍历查找。查找从队列尾部开始查,从尾部开始查的原因是可能刚刚加入到同步阻塞队列中,从尾部能快速定位。

下面看下checkinterruptwhilewaiting(node node)实现

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

private int checkinterruptwhilewaiting(node node) {

       return thread.interrupted() ?

         (transferaftercancelledwait(node) ? throw_ie : reinterrupt) :

         0 ;

     }

 

final boolean transferaftercancelledwait(node node) {

     if (compareandsetwaitstatus(node, node.condition, 0 )) {

       enq(node);

       return true ;

     }

     while (!isonsyncqueue(node))

       thread.yield();

     return false ;

   }

此方法在线程被激活后被调用,主要功能就是判断被激活的线程是否被中断。此方法会返回2种中断状态throw_ie和reinterrupt,throw_ie是调用signal()前被中断返回,reinterrupt在调用signal()后被中断返回。 此方法先判断是否被标记中断,是的话再调用transferaftercancelledwait(node)取判断是那种中断状态,transferaftercancelledwait(node)方法分2步

(1)用cas方式将节点状态改错等待状态改成condition,并加入到同步阻塞队列中返回true (2)如果不能加入到同步阻塞队列就自旋一直等待加入

如果使用await()方法上面2步其实是没什么作用其最后一定会返回false,因为await()被激活只能调用 signal()方法,而signal()方法肯定已经将节点加入到同步阻塞队列中。所以以上逻辑是给await(long time, timeunit unit)等带超时激活方法用的。

acquirequeued(node, savedstate)方法再上一章节已经讲过这边就不重复了,下面分析下unlinkcancelledwaiters()方法

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

private void unlinkcancelledwaiters() {

         //获取等待队列头节点

       node t = firstwaiter;

       node trail = null ;

       while (t != null ) {

         //获取下个节点

         node next = t.nextwaiter;

         //如果状态不为condition说明已经加入阻塞队列需要清理掉

         if (t.waitstatus != node.condition) {

           t.nextwaiter = null ;

           if (trail == null )

             firstwaiter = next;

           else

             //获取下个节点

             trail.nextwaiter = next;

           if (next == null )

             lastwaiter = trail;

         }

         else

           trail = t;

         t = next;

       }

     }

此方法就是从头开始查找状态不为condition的节点并清理,状态不为condition节点说明此节点已经加入到阻塞队列,已经不需要维护。

下面来看下reportinterruptafterwait(interruptmode)方法

?

1

2

3

4

5

6

7

8

9

private void reportinterruptafterwait( int interruptmode)

       throws interruptedexception {

         //如果是throw_ie模式直接抛出异常

       if (interruptmode == throw_ie)

         throw new interruptedexception();

         //如果是reinterrupt模式标记线程中断由上层处理中断

       else if (interruptmode == reinterrupt)

         selfinterrupt();

     }

此方法处理中断逻辑。如果是throw_ie模式直接抛出异常,如果是reinterrupt模式标记线程中断由上层处理中断。

signal()方法源码分析

signal()源码如下

?

1

2

3

4

5

6

7

8

9

public final void signal() {

         //是否当前线程持有锁

       if (!isheldexclusively())

         throw new illegalmonitorstateexception();

       node first = firstwaiter;

         //通知"激活"头节点线程

       if (first != null )

         dosignal(first);

     }

先调用isheldexclusively()判断锁是否被当前线程持有,然后检查等待队列是否为空,不为空就是可以取第一个节点调用dosignal(first)去"激活",这里激活不是真正的激活而只是将节点加入到同步阻塞队列尾部,所以上下文中带""的激活都是这种解释。

下面看下isheldexclusively()实现

?

1

2

3

protected final boolean isheldexclusively() {

      return getexclusiveownerthread() == thread.currentthread();

    }

实现就是比较下当前线程和持有锁的线程是否同一个

下面看下dosignal(first)的实现

?

1

2

3

4

5

6

7

8

9

10

11

private void dosignal(node first) {

       do {

         //头指头后移一位,如果后面的节点为空,则将尾指头也指向空,说明队列为空了

         if ( (firstwaiter = first.nextwaiter) == null )

           lastwaiter = null ;

         //清空头节点的下个节点

         first.nextwaiter = null ;

         //如果"激活"失败者取下个继续,直到成功或者遍历完

       } while (!transferforsignal(first) &&

            (first = firstwaiter) != null );

     }

 此方法就是取当前头节点一直去尝试"激活",直到成功或者遍历完。

下面来看下transferforsignal(first)方法

?

1

2

3

4

5

6

7

8

9

10

11

12

final boolean transferforsignal(node node) {

         //将condition状态设置成0

     if (!compareandsetwaitstatus(node, node.condition, 0 ))

       return false ;

         //加入到同步阻塞队列

     node p = enq(node);

     int ws = p.waitstatus;

         //状态异常直接激活

     if (ws > 0 || !compareandsetwaitstatus(p, ws, node.signal))

       locksupport.unpark(node.thread);

     return true ;

   }

(1)此方法先先将condition状态设置成0,因为如果是condition状态加入到同步阻塞队列,激活的时候是不识别的。
(2)加入到同步阻塞队列的尾部。所以同步阻塞队列中前面如果有多个在排队,调用unlock()不会马上激活此节点。
(3)状态异常直接调用unpark激活,这边按理说如果状态异常情况下激活,await()在调用unlock()被激活后会进行相应的异常处理,但看await()代码没有处理则是正常执行。

这个方法主要就是把节点加入到同步阻塞队列的,真正的激活则是调用unlock()去处理。

以上所述是小编给大家介绍的java并发编程之condition源码分析详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!

原文链接:https://my.oschina.net/u/945573/blog/2995600

查看更多关于Java并发编程之Condition源码分析(推荐)的详细内容...

  阅读:11次