0%

CyclicBarrier源码分析

用来做多线程时候的同步,当所有线程都准备好了之后,才进行放行。例如现在有10个线程,在调用了cyclicBarrier的await()方法的线程数量不为10的时候,线程都会处于阻塞状态,只有达到10了才会进行释放,然后阻塞线程被唤醒,程序继续执行

使用

启动了十个线程,每个线程都会打印两个输出。

在调用了cyclicBarrier.await()的时候,当前线程会被阻塞,直到调用await的线程达到了设定的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TestCyclicBarrier
{

public static CyclicBarrier cyclicBarrier = new CyclicBarrier(10);

public static void startWait() throws Exception
{
System.out.printf("%s 开始等待 \n", Thread.currentThread().getName());
cyclicBarrier.await();
System.out.printf("%s 执行结束 \n", Thread.currentThread().getName());
}

public static void main(String[] args) throws Exception
{
for (int i = 0; i < 10; i++)
{
new Thread(() -> {
try
{
startWait();
} catch (Exception e)
{
e.printStackTrace();
}
}).start();
Thread.sleep(200);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Thread-0 开始等待 
Thread-1 开始等待
Thread-2 开始等待
Thread-3 开始等待
Thread-4 开始等待
Thread-5 开始等待
Thread-6 开始等待
Thread-7 开始等待
Thread-8 开始等待
Thread-9 开始等待
Thread-9 执行结束
Thread-1 执行结束
Thread-0 执行结束
Thread-7 执行结束
Thread-6 执行结束
Thread-5 执行结束
Thread-4 执行结束
Thread-3 执行结束
Thread-2 执行结束
Thread-8 执行结束

源码

CyclicBarrier跟CountDownlatch整体不同,CountDownLatch是通过直接重写了AQS的相关方法来实现的,而CyclicBarrier则是通过引入ReentrantLock来实现锁的,利用Condition来实现条件的等待以及线程的通知唤醒。如果中间某个线程出现了中断异常,则会导致屏障被打破,所有等待的线程被唤醒,然后抛出异常。

基础变量

每一次被使用的屏障,都会被看做一个周期。当一个周期结束的时候,期间所有被阻塞的线程才会被唤醒,然后CyclicBarrier进入下一个周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static class Generation {
Generation() {} // prevent access constructor creation
boolean broken; // initially false
}

//用来锁住屏障的锁
private final ReentrantLock lock = new ReentrantLock();
//需要等待的条件
private final Condition trip = lock.newCondition();
//参与等待的数量
private final int parties;
//当栅栏释放之后,需要执行的命令
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();

/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;

初始化

默认是传入数量,然后在数量为0的时候,释放屏障

也可以传入一个Runnable对象,在要释放屏障之前,先执行Runnable

1
2
3
4
5
6
7
8
9
10
public CyclicBarrier(int parties) {
this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties; //设定当前等待的总数量
this.count = parties; //设定当前的计数器
this.barrierCommand = barrierAction;
}

await

https://iotxing-1253163150.cos.ap-shanghai.myqcloud.com/CyclicBarrier.png

执行流程

  1. 先获取到锁(所以CyclicBarrier是线程安全的,同时只有一个线程能够获取到锁)
  2. 获取当前周期,判断是否正常
  3. 如果当前线程触发了中断,执行breakBarrier,打破当前屏障并唤醒其他的线程,其他的线程也会抛出异常
  4. 计数器减一
  5. 如果计数器减一之后等于0,如果传入了相关的命令则先执行命令,然后调用nextGeneration,进入下一个周期,并且会唤醒所有的等待线程
  6. 如果计数器不为0,进行自旋,然后使用await释放锁。如果被唤醒了,判断是否正常情况的唤醒,还是因为屏障被打破导致的唤醒
  7. 释放持有的锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock; //先获取到当前的锁,然后锁住当前的线程
lock.lock();
try {
final Generation g = generation; //获取到当前的

if (g.broken) //如果当前需要进行等待的cyclicBarrier已经被打破,会抛出异常
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier(); //如果当前线程被中断了,则会打破屏障,然后通知所有等待的线程
throw new InterruptedException();
}

int index = --count; //对count数量进行减一,然后获取到当前的序号
if (index == 0) { // tripped
Runnable command = barrierCommand;
if (command != null) { //如果count在自减之后为0,说明可以打破屏障了,执行传入的命令
try {
command.run();
} catch (Throwable ex) {
breakBarrier();
throw ex;
}
}
nextGeneration(); //唤醒所有的线程,然后准备进行下一个周期
return 0;
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed) //如果没有设置超时时间,就一直等待,释放锁
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException(); //如果是因为出现了异常而被唤醒的,则抛出异常

if (g != generation) //如果周期相同,说明没有进入到下一个周期,等待进入下一个周期
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

nextGeneration

通知所有的线程,然后重置当前的计数器,进入下一个周期

1
2
3
4
5
6
7
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

BreakBarrier

打破屏障,唤醒所有等待的线程,被唤醒的线程不会继续执行,而是会抛出一个异常

1
2
3
4
5
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}