CountDownLatch

工具类CountDownLatch
CountDownLatch,字面意思倒计数,它的作用是允许1或N个线程等待其他线程完成后再执行,比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行。

介绍

注意,CountDownlatch与CyclicBarrier有那么点相似,但是他们还是存在一些区别的:

  1. CountDownLatch的作用是允许1或N个线程等待其他线程完成后再执行;而CyclicBarrier则是实现让一组线程等待至某个状态之后再全部同时执行。
  2. CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用(reset())。

内部实现


CountDownLatch依赖内部类Sync实现,而Sync继承AQS。

CountDownLatch类UML图

提供了一个构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

//获取共享状态
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

//释放共享状态
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
原理

CountDownLatch是采用共享锁来实现的。

await()使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

await内部调用AQS的acquireSharedInterruptibly

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared由CountDownLatch内部类Sync重写实现:

1
2
3
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

getState()获取同步状态,其值等于计数器的值,可以理解为重入锁的state数量,只不过初始化CountDownLatch的时候就由我们自己定义好了数值。

如果计数器值不等于0,则会调用doAcquireSharedInterruptibly(int arg),该方法为一个自旋方法会尝试一直去获取同步状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}sync.acquireSharedInterruptibly(1);
}
} finally {
if (failed)
cancelAcquire(node);
}
}

countDown()递减锁存器的计数,如果计数到达零,则释放所有等待的线程。

1
2
3
public void countDown() {
sync.releaseShared(1);
}

内部调用AQS的releaseShared(int arg)方法来释放共享锁同步状态:

1
2
3
4
5
6
7
8
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
}
return false;
}

tryReleaseShared方法由CountDownLatch的内部类Sync重写实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected boolean tryReleaseShared(int releases) {
for (;;) {
//获取锁状态
int c = getState();
//c == 0 直接返回,释放锁成功
if (c == 0)
return false;
//计算新“锁计数器”
int nextc = c-1;
//更新锁状态(计数器)
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
小结

CountDownLatch内部通过共享锁实现。在创建CountDownLatch实例时,需要传递一个int型的参数:count,该参数为计数器的初始值,也可以理解为该共享锁可获取的总次数。当某个线程调用await()方法,程序首先判断count的值是否为0,如果不会0的话则会一直等待,直到为0为止。当其他线程调用countDown()方法时,则执行释放共享锁状态,使count - 1,只到计数器count等于0(其他线程执行完毕),锁才会释放,前面等待的线程才会继续运行。注意CountDownLatch不能回滚重置。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class App 
{
public static void main(String[] args){
CountDownLatch countDownLatch = new CountDownLatch(2);
for(int i = 0; i < 2; i++){
new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("任务" + Thread.currentThread().getName() + "开始执行");
System.out.println("任务" + Thread.currentThread().getName() + "执行完毕");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
try {
System.out.println("任务" + Thread.currentThread().getName() + "正在等待子线程完成任务");
countDownLatch.await();
System.out.println("任务" + Thread.currentThread().getName() + "也执行完毕");
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

运行结果如下:

CountDownLatch测试结果


CountDownLatch
http://www.muzili.ren/2022/06/11/CountDownLatch/
作者
jievhaha
发布于
2022年6月11日
许可协议