1 简介
Condition 中的 await() 方法相当于 Object 的 wait() 方法, Condition 中的 signal() 方法相当于 Object 的 notify() 方法, Condition 中的 signalAll() 相当于 Object 的 notifyAll() 方法。
不同的是, Object 中的 wait() , notify() , notifyAll() 方法是和"同步锁"( synchronized 关键字)捆绑使用的;而 Condition 是需要与"互斥锁"/"共享锁"捆绑使用的。
2 Condition的实现分析
Condition 是同步器 AbstractQueuedSynchronized 的内部类,因为 Condition 的操作需要获取相关的锁,所以作为同步器的内部类比较合理。每个 Condition 对象都包含着一个队列(等待队列),该队列是 Condition 对象实现等待/通知功能的关键。
等待队列
等待队列是一个 FIFO 的队列,队列的每一个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了 await() 方法,该线程就会释放锁、构造成节点进入等待队列并进入等待状态。
这里的节点定义也就是 AbstractQueuedSynchronizer.Node 的定义。
一个 Condition 包含一个等待队列, Condition 拥有首节点( firstWaiter )和尾节点( lastWaiter )。当前线程调用 Condition.await() 方法时,将会以当前线程构造节点,并将节点从尾部加入等待队列。
在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而 Lock (同步器)拥有一个同步队列和多个等待队列。
等待(await):AbstractQueuedLongSynchronizer中实现
调用 Condition 的 await() 方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。
从队列的角度来看,相当于同步队列的首节点(获取了锁的节点)移动到 Condition 的等待队列中。
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过 Condition.signal() 方法唤醒,而是对等待线程进行中断,则抛出 InterruptedException 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park( this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } |
Condition等待通知的本质
总的来说, Condition 的本质就是等待队列和同步队列的交互:
当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:
构造一个新的等待队列节点加入到等待队列队尾
释放锁,也就是将它的同步队列节点从同步队列队首移除
自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用 signal() )或被中断
阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。
当一个持有锁的线程调用 Condition.signal() 时, 它会执行以下操作:
从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点 CANCELLED ,就尝试唤醒下一个节点;如果再 CANCELLED 则继续迭代。
对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。
然后分两种情况讨论:
如果先驱节点的状态为 CANCELLED(>0) 或设置先驱节点的状态为 SIGNAL 失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4.
如果成功把先驱节点的状态设置为了 SIGNAL ,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候 await() 的步骤3才执行完成,而且有很大概率快速完成步骤4.
通知(signal):AbstractQueuedLongSynchronizer中实现
调用 Condition 的 signal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
Condition 的 signalAll() 方法,相当于对等待队列中的每个节点均执行一次 signal() 方法,将等待队列中的节点全部移动到同步队列中,并唤醒每个节点的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignal(first); }
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null ) doSignalAll(first); } |
最后还要注意, Java 中有 signal 和 signalAll 两种方法, signal 是随机解除一个等待集中的线程的阻塞状态, signalAll 是解除所有等待集中的线程的阻塞状态。 signal 方法的效率会比 signalAll 高,但是它存在危险,因为它一次只解除一个线程的阻塞状态,因此,如果等待集中有多个线程都满足了条件,也只能唤醒一个,其他的线程可能会导致死锁
3 Condition 实例
消费生产者模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
public class ConditionTest { public static void main(String[] args) { // 仓库 Depot depot = new Depot( 100 ); // 消费者 Consumer consumer = new Consumer(depot); // 生产者 Produce produce = new Produce(depot); produce.produceThing( 5 ); consumer.consumerThing( 5 ); produce.produceThing( 2 ); consumer.consumerThing( 5 ); produce.produceThing( 3 ); } }
class Depot { private int capacity; private int size; private Lock lock; private Condition consumerCond; private Condition produceCond;
public Depot( int capacity) { this .capacity = capacity; this .size = 0 ; this .lock = new ReentrantLock(); this .consumerCond = lock.newCondition(); this .produceCond = lock.newCondition(); }
public void produce( int val) { lock.lock(); try { int left = val; while (left > 0 ) { while (size >= capacity) { produceCond.await(); } int produce = (left+size) > capacity ? (capacity-size) : left; size += produce; left -= produce; System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size); consumerCond.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
public void consumer( int val) { lock.lock(); try { int left = val; while (left > 0 ) { while (size <= 0 ) { consumerCond.await(); } int consumer = (size <= left) ? size : left; size -= consumer; left -= consumer; System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size); produceCond.signalAll(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } class Consumer { private Depot depot; public Consumer(Depot depot) { this .depot = depot; }
public void consumerThing( final int amount) { new Thread( new Runnable() { public void run() { depot.consumer(amount); } }).start(); } } class Produce { private Depot depot; public Produce(Depot depot) { this .depot = depot; }
public void produceThing( final int amount) { new Thread( new Runnable() { public void run() { depot.produce(amount); } }).start(); } } |
Thread-0, ProduceVal=5, produce=5, size=5
Thread-1, ConsumerVal=5, consumer=5, size=0
Thread-2, ProduceVal=2, produce=2, size=2
Thread-3, ConsumerVal=5, consumer=2, size=0
Thread-4, ProduceVal=3, produce=3, size=3
Thread-3, ConsumerVal=5, consumer=3, size=0
输出结果中, Thread-3 出现两次,就是因为要消费5个产品,但仓库中只有2个产品,所以先将库存的2个产品全部消费,然后这个线程进入等待队列,等待生产,随后生产出了3个产品,生产者生产后又执行 signalAll 方法将等待队列中所有的线程都唤醒, Thread-3 继续消费还需要的3个产品。
三个线程依次打印ABC
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
class Business { private Lock lock = new ReentrantLock(); private Condition conditionA = lock.newCondition(); private Condition conditionB = lock.newCondition(); private Condition conditionC = lock.newCondition(); private String type = "A" ; //内部状态
/* * 方法的基本要求为: * 1、该方法必须为原子的。 * 2、当前状态必须满足条件。若不满足,则等待;满足,则执行业务代码。 * 3、业务执行完毕后,修改状态,并唤醒指定条件下的线程。 */ public void printA() { lock.lock(); //锁,保证了线程安全。 try { while (type != "A" ) { //type不为A, try { conditionA.await(); //将当前线程阻塞于conditionA对象上,将被阻塞。 } catch (InterruptedException e) { e.printStackTrace(); } }
//type为A,则执行。 System.out.println(Thread.currentThread().getName() + " 正在打印A" ); type = "B" ; //将type设置为B。 conditionB.signal(); //唤醒在等待conditionB对象上的一个线程。将信号传递出去。 } finally { lock.unlock(); //解锁 } }
public void printB() { lock.lock(); //锁 try { while (type != "B" ) { //type不为B, try { conditionB.await(); //将当前线程阻塞于conditionB对象上,将被阻塞。 } catch (InterruptedException e) { e.printStackTrace(); } }
//type为B,则执行。 System.out.println(Thread.currentThread().getName() + " 正在打印B" ); type = "C" ; //将type设置为C。 conditionC.signal(); //唤醒在等待conditionC对象上的一个线程。将信号传递出去。 } finally { lock.unlock(); //解锁 } }
public void printC() { lock.lock(); //锁 try { while (type != "C" ) { try { conditionC.await(); } catch (InterruptedException e) { e.printStackTrace(); } }
System.out.println(Thread.currentThread().getName() + " 正在打印C" ); type = "A" ; conditionA.signal(); } finally { lock.unlock(); //解锁 } } }
public class ConditionTest{
public static void main(String[] args) { final Business business = new Business(); //业务对象。
//线程1号,打印10次A。 Thread ta = new Thread( new Runnable() {
@Override public void run() { for ( int i= 0 ;i< 10 ;i++){ business.printA(); } } });
//线程2号,打印10次B。 Thread tb = new Thread( new Runnable() {
@Override public void run() { for ( int i= 0 ;i< 10 ;i++){ business.printB(); } } });
//线程3号,打印10次C。 Thread tc = new Thread( new Runnable() {
@Override public void run() { for ( int i= 0 ;i< 10 ;i++){ business.printC(); } } });
//执行3条线程。 ta.start(); tb.start(); tc.start(); }
} |
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
虚假唤醒
所谓"虚假唤醒",即其他地方的代码触发了 condition.signal() ,唤醒 condition 上等待的线程。但被唤醒的线程仍然不满足执行条件。
condition 通常与条件语句一起使用:
1 2 3 |
if (!条件){ condition.await(); //不满足条件,当前线程等待; } |
更好的方法是使用while:
1 2 3 |
while (!条件){ condition.await(); //不满足条件,当前线程等待; } |
在等待 Condition 时,允许发生"虚假唤醒",这通常作为对基础平台语义的让步。若使用"if(!条件)"则被"虚假唤醒"的线程可能继续执行。所以" while (!条件)"可以防止"虚假唤醒"。建议总是假定这些"虚假唤醒"可能发生,因此总是在一个循环中等待。
4、总结
如果知道 Object 的等待通知机制, Condition 的使用是比较容易掌握的,因为和 Object 等待通知的使用基本一致。
对 Condition 的源码理解,主要就是理解等待队列,等待队列可以类比同步队列,而且等待队列比同步队列要简单,因为等待队列是单向队列,同步队列是双向队列。
以下是笔者对等待队列是单向队列、同步队列是双向队列的一些思考,欢迎提出不同意见:
之所以同步队列要设计成双向的,是因为在同步队列中,节点唤醒是接力式的,由每一个节点唤醒它的下一个节点,如果是由next指针获取下一个节点,是有可能获取失败的,因为虚拟队列每添加一个节点,是先用CAS把tail设置为新节点,然后才修改原tail的next指针到新节点的。因此用next向后遍历是不安全的,但是如果在设置新节点为tail前,为新节点设置prev,则可以保证从tail往前遍历是安全的。因此要安全的获取一个节点Node的下一个节点,先要看next是不是null,如果是null,还要从tail往前遍历看看能不能遍历到Node。
而等待队列就简单多了,等待的线程就是等待者,只负责等待,唤醒的线程就是唤醒者,只负责唤醒,因此每次要执行唤醒操作的时候,直接唤醒等待队列的首节点就行了。等待队列的实现中不需要遍历队列,因此也不需要 prev 指针。
到此这篇关于Java多线程之条件对象Condition的文章就介绍到这了,更多相关Java多线程 Condition内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
原文链接:https://juejin.cn/post/7019161855608225805
查看更多关于Java多线程之条件对象Condition的详细内容...