好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

聊聊 CountDownLatch 闭锁源码分析

功能简介

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态【CPJ 3.4.2】。闭锁的作用相当于一扇门∶ 在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行,例如∶

确保某个计算在其需要的所有资源都被初始化之后才继续执行。二元闭锁(包括两个状态)可以用来表示"资源R已经被初始化",而所有需要 R 的操作都必须先在这个闭锁上等待。 确保某个服务在其依赖的所有其他服务都已经启动之后才启动。每个服务都有一个相关的二元闭锁。当启动服务S 时,将首先在S依赖的其他服务的闭锁上等待,在所有依赖的服务都启动后会释放闭锁S,这样其他依赖 S 的服务才能继续执行。 等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

CountDownLatch.jpg

CountDownLatch是一种灵活的闭锁实现,可以在上述各种情况中使用,它可以使一个或多个线程等待一组事件发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而 await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么 await 会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

使用案例

TestHarness 中给出了闭锁的两种常见用法。TestHarness 创建一定数量的线程,利用它们并发地执行指定的任务。它使用两个闭锁,分别表示"起始门(Starting Gate)"和"结束门(Ending Gate)"。起始门计数器的初始值为1,而结束门计数器的初始值为工作线程的数量。每个工作线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行。而每个线程要做的最后一件事情是将调用结束门的 countDown 方法减1,这能使主线程高效地等待直到所有工作线程都执行完成,因此可以统计所消耗的时间。

public  class TestHarness {         public  long timeTasks( int  nThreads, final Runnable task) throws InterruptedException {          final CountDownLatch startGate = new CountDownLatch(1);          final CountDownLatch endGate = new CountDownLatch(nThreads);             for  ( int  i = 0; i < nThreads; i++) {              Thread t = new Thread(() -> {                  try {                      startGate.await();                      try {                          task.run();                      } finally {                          endGate.countDown();                      }                    } catch (InterruptedException ignored) {                    }              });              t.start();          }            long start = System.nanoTime();          startGate.countDown();          endGate.await();          long  end  = System.nanoTime();           return   end  - start;      }         public   static  void main(String[] args) throws InterruptedException {          TestHarness testHarness = new TestHarness();          AtomicInteger num = new AtomicInteger(0);          long  time  = testHarness.timeTasks(10, () -> System. out .println(num.incrementAndGet()));          System. out .println( "cost time: "  +  time  +  "ms" );      }  }    //输出结果  1  10  9  8  7  5  6  4  3  2  cost  time : 2960900ms 

为什么要在 TestHarness 中使用闭锁,而不是在线程创建后就立即启动? 或许,我们希望测试 n 个线程并发执行某个任务时需要的时间。如果在创建线程后立即启动它们,那么先启动的线程将"领先"后启动的线程,并且活跃线程数量会随着时间的推移而增加或减少,竞争程度也在不断发生变化。启动门将使得主线程能够实时释放所有工作线程,而结束门则使主线程能够等待最后一个线程执行完成,而不是顺序地等待每个线程执行完成。

使用总结

CountDownLatch 是一次性的,计算器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch 使用完毕后,它不能再次被使用。

源码分析

代码分析

CountDownLatch 在底层还是采用 AbstractQueuedSynchronizer 实现。

CountDownLatch startGate = **new **CountDownLatch(1); 

我们先看它的构造方法, 创建了一个 sync 对象。

public  CountDownLatch( int   count ) {      if ( count  < 0) throw new IllegalArgumentException( "count < 0" );      this.sync = new Sync( count );  } 

Sync 是 AbstractQueuedSynchronizer 的一个实现, 按照字面意思我们可以猜到它是公平方式实现。

private  static  final class Sync extends AbstractQueuedSynchronizer {      private  static  final long serialVersionUID = 4982264981922014374L;        // 构造方法      Sync( int   count ) {          setState( count );      }        // 获取资源数       int  getCount() {           return  getState();      }        // 获取锁      protected  int  tryAcquireShared( int  acquires) {           return  (getState() == 0) ? 1 : -1;      }        // 释放锁      protected boolean tryReleaseShared( int  releases) {          // Decrement  count ; signal  when  transition  to  zero           for  (;;) {               int  c = getState();              if (c == 0)                   return   false ;               int  nextc = c-1;              // CAS 解锁              if (compareAndSetState(c, nextc))                   return  nextc == 0;          }      }  } 

在 await 方法中如果存在计算值, 那么当前线程将进入 AQS 队列生成 Node 节点, 线程进入阻塞状态。

public  void await() throws InterruptedException {      sync.acquireSharedInterruptibly(1);  } 

其实主要是获取共享锁。

public  final void acquireSharedInterruptibly( int  arg)      throws InterruptedException {      if (Thread.interrupted())          throw new InterruptedException();      if (tryAcquireShared(arg) < 0)          doAcquireSharedInterruptibly(arg);  } 

CountDownLatch.Sync 实现了 tryAcquireShared 方法 ,如果 getState() == 0 返回 1 , 否则返回 -1. 也就是说创建 CountDownLatch 实例后再执行 await 方法将继续调用 doAcquireSharedInterruptibly(arg);

// 是否可获取共享锁  protected  int  tryAcquireShared( int  acquires) {       return  (getState() == 0) ? 1 : -1;  }      // 尝试获取锁, 或者入队  private void doAcquireSharedInterruptibly( int  arg)      throws InterruptedException {      final Node node = addWaiter(Node.SHARED);      boolean failed =  true ;      try {           for  (;;) {              final Node p = node.predecessor();              if (p == head) {                   int  r = tryAcquireShared(arg);                  if (r >= 0) {                      setHeadAndPropagate(node, r);                      p. next  =  null ; // help GC                      failed =  false ;                       return ;                  }              }              if (shouldParkAfterFailedAcquire(p, node) &&                  parkAndCheckInterrupt())                  throw new InterruptedException();          }      } finally {          if (failed)              cancelAcquire(node);      }  } 

在 countDown 方法如果存在等待的线程, 将对其进行唤醒. 或者减少 CountDownLatch 资源数。

public  void countDown() {      sync.releaseShared(1);  } 

通过 releaseShared 对共享锁进行解锁。

public  final boolean releaseShared( int  arg) {      if (tryReleaseShared(arg)) {          doReleaseShared();           return   true ;      }       return   false ;  } 

最终会调用 doReleaseShared 唤醒 AQS 中的头节点。

private void doReleaseShared() {      /*           * Ensure that a release propagates, even if there are other           *  in -progress acquires/releases.  This proceeds  in  the usual           * way  of  trying  to  unparkSuccessor  of  head if it needs           * signal. But if it does  not , status  is   set   to  PROPAGATE  to            * ensure that upon release, propagation continues.           * Additionally, we must loop  in   case  a new node  is  added           * while we are doing this. Also, unlike other uses  of            * unparkSuccessor, we need  to  know if CAS  to  reset status           * fails, if so rechecking.           */       for  (;;) {          Node h = head;          if (h !=  null  && h != tail) {               int  ws = h.waitStatus;              if (ws == Node.SIGNAL) {                  if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                       continue ;            // loop  to  recheck cases                  unparkSuccessor(h);              }               else  if (ws == 0 &&                       !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                   continue ;                // loop  on  failed CAS          }          if (h == head)                   // loop if head changed              break;      }  } 

详细流程如下图:

源码流程图

CountDownLatch 闭锁源码分析.png

参考资料

《Java 并发编程实战》

https://www.cnblogs.com/Lee_xy_z/p/10470181.html

原文链接:https://mp.weixin.qq.com/s/7rn6NCPqIcGiDs3cVuVm2g

查看更多关于聊聊 CountDownLatch 闭锁源码分析的详细内容...

  阅读:15次