工具类CountDownLatch
CountDownLatch,字面意思倒计数,它的作用是允许1或N个线程等待其他线程完成后再执行,比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行。
介绍 注意 ,CountDownlatch与CyclicBarrier有那么点相似,但是他们还是存在一些区别的:
CountDownLatch的作用是允许1或N个线程等待其他线程完成后再执行;而CyclicBarrier则是实现让一组线程等待至某个状态之后再全部同时执行。
CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用(reset()
)。
内部实现
CountDownLatch依赖内部类Sync实现,而Sync继承AQS。
提供了一个构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new Sync (count); }private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; Sync(int count) { setState(count); } int getCount () { return getState(); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc = = 0 ; } } }
原理 CountDownLatch是采用共享锁 来实现的。
await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
1 2 3 public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); }
await内部调用AQS的acquireSharedInterruptibly
1 2 3 4 5 6 7 public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
tryAcquireShared由CountDownLatch内部类Sync重写实现:
1 2 3 protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; }
getState()获取同步状态,其值等于计数器的值,可以理解为重入锁的state数量,只不过初始化CountDownLatch的时候就由我们自己定义好了数值。
如果计数器值不等于0,则会调用doAcquireSharedInterruptibly(int arg),该方法为一个自旋方法会尝试一直去获取同步状态:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); }sync.acquireSharedInterruptibly(1 ); } } finally { if (failed) cancelAcquire(node); } }
countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
1 2 3 public void countDown () { sync.releaseShared(1 ); }
内部调用AQS的releaseShared(int arg)方法来释放共享锁同步状态:
1 2 3 4 5 6 7 8 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } } return false ; }
tryReleaseShared方法由CountDownLatch的内部类Sync重写实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected boolean tryReleaseShared (int releases) { for (;;) { int c = getState(); if (c == 0 ) return false ; int nextc = c-1 ; if (compareAndSetState(c, nextc)) return nextc = = 0 ; } } }
小结 CountDownLatch内部通过共享锁实现。在创建CountDownLatch实例时,需要传递一个int型的参数:count,该参数为计数器的初始值,也可以理解为该共享锁可获取的总次数。当某个线程调用await()方法,程序首先判断count的值是否为0,如果不会0的话则会一直等待,直到为0为止。当其他线程调用countDown()方法时,则执行释放共享锁状态,使count - 1,只到计数器count等于0(其他线程执行完毕),锁才会释放,前面等待的线程才会继续运行。注意CountDownLatch不能回滚重置。
示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class App { public static void main (String[] args) { CountDownLatch countDownLatch = new CountDownLatch (2 ); for (int i = 0 ; i < 2 ; i++){ new Thread (() -> { try { Thread.sleep(1000 ); System.out.println("任务" + Thread.currentThread().getName() + "开始执行" ); System.out.println("任务" + Thread.currentThread().getName() + "执行完毕" ); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } try { System.out.println("任务" + Thread.currentThread().getName() + "正在等待子线程完成任务" ); countDownLatch.await(); System.out.println("任务" + Thread.currentThread().getName() + "也执行完毕" ); } catch (InterruptedException e) { e.printStackTrace(); } } }
运行结果如下: