基本概念

CyclicBarrier是一个同步的辅助类,允许一组线程相互之间等待,并设置一个公共屏障点,当组内线程达到这个屏障点的时候阻塞,阻塞在这个屏障点的线程数达到指定数量时,释放所有线程继续往下执行。CyclicBarrier在释放完线程后相当于重置之前的记录可以循环使用,所以称之为Cyclic(循环)Barrier(屏障)。

使用场景

开发经历有限,目前为止还真没用过CyclicBarrier,一般场景使用CountDownLatch就够了,就随便写点吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Test {

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("屏障点回调线程,执行者:" + Thread.currentThread().getName());
}
});

public static void main(String[] args) {

for(int i = 0; i< 5;i++){

Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "开始运行");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "结束运行");
} catch (Exception e) {
e.printStackTrace();
}
}
});

thread.setName("线程" + i);
thread.start();
}

}
}

打印结果:
线程1开始运行
线程0开始运行
线程2开始运行
线程3开始运行
线程4开始运行
屏障点回调线程,执行者:线程4
线程4结束运行
线程1结束运行
线程0结束运行
线程2结束运行
线程3结束运行

打印结果可以看出来,当指定数量(构造器参数决定)的线程到达屏障点(await代码行)后,才能继续往下执行。如果在构造器中指定了回调线程,还需要等待回调线程执行完才可以往下执行,回调线程由最后一个阻塞的线程执行。

构造器源码

1
2
3
4
5
6
7
8
9
10
11
12
13

// 设置屏障阈值
public CyclicBarrier(int parties) {
this(parties, null);
}

// 设置屏障阈值,同时增加回调线程功能
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

// lock对象
private final ReentrantLock lock = new ReentrantLock();

// 跳闸,可以理解为打开屏障
private final Condition trip = lock.newCondition();

// 屏障阈值
private final int parties;

// 回调线程
private final Runnable barrierCommand;

// 每次使用屏障都会生成,内部的broken标记屏障是否破损
private Generation generation = new Generation();

// 默认设置false
private static class Generation {
boolean broken = false;
}

await()源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

// 内部调用dowait()方法,并且参数传false,不支持超时
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

// 真正进入等待的逻辑
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 获取排他锁
lock.lock();
try {
final Generation g = generation;
// 屏障被破坏则抛异常
if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
// 线程中断 则退出屏障
breakBarrier();
throw new InterruptedException();
}

// 到达屏障的计数-1
int index = --count;
if (index == 0) { // tripped
// index == 0, 说明指定 count 的线程均到达屏障,此时可以打开屏障
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 若指定了 barrierCommand 则执行
command.run();
ranAction = true;
// 唤醒阻塞在屏障的线程并重置 generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
// 若未指定阻塞在屏障处的等待时间,则一直等待;直至最后一个线程到达屏障处的时候被唤醒
trip.await();
else if (nanos > 0L)
// 若指定了阻塞在屏障处的等待时间,则在指定时间到达时会返回
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
// 若等待过程中,线程发生了中断,则退出屏障
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

// 屏障被破坏 则抛出异常
if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
// g != generation 说明所有线程均到达屏障处 可直接返回
// 因为所有线程到达屏障处的时候,会重置 generation
// 参考 nextGeneration
return index;

if (timed && nanos <= 0L) {
// 说明指定时间内,还有线程未到达屏障处,也就是等待超时
// 退出屏障
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

nextGeneration()源码

1
2
3
4
5
6
7
8
9
10
11
private void nextGeneration() {

// 唤醒阻塞在等待队列的线程
trip.signalAll();

// 重置 count
count = parties;

// 重置 generation
generation = new Generation();
}

breakBarrier()源码

1
2
3
4
5
6
7
8
9
10
11
private void breakBarrier() {

// broken 设置为 true
generation.broken = true;

// 重置 count
count = parties;

// 唤醒等待队列的线程
trip.signalAll();
}

reset()源码

1
2
3
4
5
6
7
8
9
10
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 唤醒阻塞的线程
breakBarrier(); // break the current generation
// 重新设置 generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}

总结

CyclicBarrier依赖与Lock与Condition实现,await()方法使用Lock进行互斥,Condition对象负责挂起被屏障挡住的线程。Lock与Condition底层是基于AQS的,所以CyclicBarrier还是通过AQS实现。

CyclicBarrier内部有个屏障是否被打破的概念,维护在内部类Generation的broken属性中(默认是false),并且可以通过breakBarrier()方法进行打破(修改为true),调用这个方法的地方有三个,检测到中断、等待超时、reset()方法。当某个线程在等待过程中被中断或超时,会直接抛中断异常退出等待,不会对count执行-1操作,这会导致同一组线程会无限等待下去,因为count值永远无法到达0。使用reset()方法会重置count值,为了避免重置时还有残余线程没执行到await()方法,干扰重置后的count值导致下一轮提前结束。当遇到这些情况时,CyclicBarrier会修改broken=true来通知其他线程不要再等下去了。

CountDownLatch与CyclicBarrier区别:

CountDownLatch CyclicBarrier
一个线程(或多个)线程等待另N个线程完成某事后才能继续执行 N个线程相互在某个点等待,知道所有线程都到达这个点解除等待
无法重复利用,没有提供state属性的重置方法 可以重复利用,提供reset()方法重置

评论