CyclicBarrier

CyclicBarrier
CyclicBarrier,字面意思回环栅栏。通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

内部实现


CyclicBarrier的内部是使用重入锁ReentrantLock和Condition。

1
2
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();

两个构造函数:

1
2
3
4
5
6
7
8
9
10
private final Runnable barrierCommand;//自己定义的Runnable,用于在parties数量线程到达屏障时,优先执行barrierAction 
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;//要拦截的线程数
this.count = parties;
this.barrierCommand = barrierAction;
}

在CyclicBarrier中最重要的方法莫过于await()方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);//不超时等待
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//分代
final Generation g = generation;

//当前generation“已损坏”,抛出BrokenBarrierException异常
//抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrie
if (g.broken)
//当某个线程试图等待处于断开状态的 barrier 时,或者 barrier 进入断开状态而线程处于等待状态时,抛出该异常
throw new BrokenBarrierException();

//如果线程中断,终止CyclicBarrier
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

//进来一个线程 count - 1
int index = --count;
//count == 0 表示所有线程均已到位,触发Runnable任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//触发任务
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程,并更新generation,比如我CyclicBarrier只能放2个,但是我一共四个甚至更多的线程,每次执行2个,然后就重新往CyclicBarrier塞,而且要重置generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}


for (;;) {
try {
//如果不是超时等待,则调用Condition.await()方法等待
if (!timed)
trip.await();
else if (nanos > 0L)
//超时等待,调用Condition.awaitNanos()方法等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

//generation已经更新,返回index
if (g != generation)
return index;

//“超时等待”,并且时间已到,终止CyclicBarrier,并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放锁
lock.unlock();
}
}

其中有个Generation对象,broken标识该当前CyclicBarrier是否已经处于中断状态。默认CyclicBarrier是没有损坏的。

1
2
3
private static class Generation {
boolean broken = false;//broken标识该当前CyclicBarrier是否已经处于中断状态。
}

当barrier损坏了或者有一个线程中断了,则通过breakBarrier()来终止所有的线程,,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。:

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

当一批次CyclicBarrier执行完毕(parties==0),唤醒所有等待线程,并更新generation,然后进行下一批次(比如我CyclicBarrier只能放2个,但是我一共四个甚至更多的线程,每次执行2个,然后就重新往CyclicBarrier塞)

1
2
3
4
5
6
7
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

应用场景


CyclicBarrier试用与多线程结果合并的操作,用于多线程计算数据,最后合并计算结果的应用场景。比如我们需要统计多个Excel中的数据,然后等到一个总结果。

例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class App 
{
public static void main(String[] args){
CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{
System.out.println("任务集齐完毕");
});
for(int i = 0; i < 2; i++){
new Thread(() -> {
try{
System.out.println("任务" + Thread.currentThread().getName() + "开始执行");
cyclicBarrier.await();
System.out.println("任务" + Thread.currentThread().getName() + "执行完毕");
}catch (InterruptedException e){

}catch (BrokenBarrierException e){

}
}).start();
}
}
}

运行结果如下:

CyclicBarrier测试结果


CyclicBarrier
http://www.muzili.ren/2022/06/11/CyclicBarrier/
作者
jievhaha
发布于
2022年6月11日
许可协议